diff --git a/Gopkg.lock b/Gopkg.lock index 6ca026787a..8815e76e24 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -321,14 +321,14 @@ version = "v2.0.0" [[projects]] + branch = "v2" name = "gopkg.in/yaml.v2" packages = ["."] - revision = "7f97868eec74b32b0982dd158a51a446d1da7eb5" - version = "v2.1.1" + revision = "4fc5987536ef307a24ca299aee7ae301cde3d221" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "5f97500d7aef822a95e562d480528d26cba7b9dc7d7ddadf6694875bb434b996" + inputs-digest = "5532da6e48d2e97d5c147b14e1e6254bb779feb25e617e87605cd47e4c6e13a4" solver-name = "gps-cdcl" solver-version = 1 diff --git a/config/everything.yaml b/config/everything.yaml index 3e65760150..fe3faace4f 100644 --- a/config/everything.yaml +++ b/config/everything.yaml @@ -172,9 +172,13 @@ input: operator: equals_cs part: 0 arg: "" + jmespath: + part: 0 + query: "" not: {} or: [] resource: "" + xor: [] decompress: algorithm: gzip parts: [] @@ -191,6 +195,9 @@ input: insert_part: index: -1 content: "" + jmespath: + part: 0 + query: "" sample: retain: 0.1 seed: 0 @@ -354,9 +361,13 @@ resources: operator: equals_cs part: 0 arg: "" + jmespath: + part: 0 + query: "" not: {} or: [] resource: "" + xor: [] logger: prefix: service log_level: INFO diff --git a/lib/processor/condition/constructor.go b/lib/processor/condition/constructor.go index 308b73d667..978aaa5e28 100644 --- a/lib/processor/condition/constructor.go +++ b/lib/processor/condition/constructor.go @@ -51,13 +51,14 @@ var Constructors = map[string]TypeSpec{} // Config is the all encompassing configuration struct for all condition types. type Config struct { - Type string `json:"type" yaml:"type"` - And AndConfig `json:"and" yaml:"and"` - Content ContentConfig `json:"content" yaml:"content"` - Not NotConfig `json:"not" yaml:"not"` - Or OrConfig `json:"or" yaml:"or"` - Resource string `json:"resource" yaml:"resource"` - Xor XorConfig `json:"xor" yaml:"xor"` + Type string `json:"type" yaml:"type"` + And AndConfig `json:"and" yaml:"and"` + Content ContentConfig `json:"content" yaml:"content"` + JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"` + Not NotConfig `json:"not" yaml:"not"` + Or OrConfig `json:"or" yaml:"or"` + Resource string `json:"resource" yaml:"resource"` + Xor XorConfig `json:"xor" yaml:"xor"` } // NewConfig returns a configuration struct fully populated with default values. @@ -66,6 +67,7 @@ func NewConfig() Config { Type: "content", And: NewAndConfig(), Content: NewContentConfig(), + JMESPath: NewJMESPathConfig(), Not: NewNotConfig(), Or: NewOrConfig(), Resource: "", diff --git a/lib/processor/condition/jmespath.go b/lib/processor/condition/jmespath.go new file mode 100644 index 0000000000..fbe90870ba --- /dev/null +++ b/lib/processor/condition/jmespath.go @@ -0,0 +1,147 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package condition + +import ( + "fmt" + + "github.com/Jeffail/benthos/lib/types" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" + jmespath "github.com/jmespath/go-jmespath" +) + +//------------------------------------------------------------------------------ + +func init() { + Constructors["jmespath"] = TypeSpec{ + constructor: NewJMESPath, + description: ` +Parses a message part as a JSON blob and attempts to apply a JMESPath expression +to it, expecting a boolean response. If the response is true the condition +passes, otherwise it does not. Please refer to the +[JMESPath website](http://jmespath.org/) for information and tutorials regarding +the syntax of expressions. + +For example, with the following config: + +` + "``` yaml" + ` +jmespath: + part: 0 + query: a == 'foo' +` + "```" + ` + +If the initial jmespaths of part 0 were: + +` + "``` json" + ` +{ + "a": "foo" +} +` + "```" + ` + +Then the condition would pass. + +JMESPath is traditionally used for mutating JSON jmespath, in order to do this +please instead use the ` + "[`jmespath`](../processors/README.md#jmespath)" + ` +processor instead.`, + } +} + +//------------------------------------------------------------------------------ + +// JMESPathConfig is a configuration struct containing fields for the jmespath +// condition. +type JMESPathConfig struct { + Part int `json:"part" yaml:"part"` + Query string `json:"query" yaml:"query"` +} + +// NewJMESPathConfig returns a JMESPathConfig with default values. +func NewJMESPathConfig() JMESPathConfig { + return JMESPathConfig{ + Part: 0, + Query: "", + } +} + +//------------------------------------------------------------------------------ + +// JMESPath is a condition that checks message against a jmespath query. +type JMESPath struct { + stats metrics.Type + log log.Modular + part int + query *jmespath.JMESPath +} + +// NewJMESPath returns a JMESPath processor. +func NewJMESPath( + conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, +) (Type, error) { + query, err := jmespath.Compile(conf.JMESPath.Query) + if err != nil { + return nil, fmt.Errorf("failed to compile JMESPath query: %v", err) + } + + return &JMESPath{ + stats: stats, + log: log, + part: conf.JMESPath.Part, + query: query, + }, nil +} + +//------------------------------------------------------------------------------ + +// Check attempts to check a message part against a configured condition. +func (c *JMESPath) Check(msg types.Message) bool { + index := c.part + if index < 0 { + index = msg.Len() + index + } + + if index < 0 || index >= msg.Len() { + c.stats.Incr("condition.jmespath.skipped", 1) + return false + } + + jsonPart, err := msg.GetJSON(index) + if err != nil { + c.stats.Incr("condition.jmespath.error.json_parse", 1) + c.stats.Incr("condition.jmespath.dropped", 1) + c.log.Errorf("Failed to parse part into json: %v\n", err) + return false + } + + var result interface{} + if result, err = c.query.Search(jsonPart); err != nil { + c.stats.Incr("condition.jmespath.error.jmespath_search", 1) + c.stats.Incr("condition.jmespath.dropped", 1) + c.log.Errorf("Failed to search json: %v\n", err) + return false + } + c.stats.Incr("condition.jmespath.applied", 1) + + resultBool, _ := result.(bool) + return resultBool +} + +//------------------------------------------------------------------------------ diff --git a/lib/processor/condition/jmespath_test.go b/lib/processor/condition/jmespath_test.go new file mode 100644 index 0000000000..8f736b223d --- /dev/null +++ b/lib/processor/condition/jmespath_test.go @@ -0,0 +1,111 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package condition + +import ( + "os" + "testing" + + "github.com/Jeffail/benthos/lib/types" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" +) + +func TestJMESPathCheck(t *testing.T) { + testLog := log.NewLogger(os.Stdout, log.LoggerConfig{LogLevel: "NONE"}) + testMet := metrics.DudType{} + + type fields struct { + query string + part int + } + tests := []struct { + name string + fields fields + arg [][]byte + want bool + }{ + { + name: "bool result pos", + fields: fields{ + query: "foo == 'bar'", + part: 0, + }, + arg: [][]byte{ + []byte(`{"foo":"bar"}`), + }, + want: true, + }, + { + name: "bool result neg", + fields: fields{ + query: "foo == 'bar'", + part: 0, + }, + arg: [][]byte{ + []byte(`{"foo":"baz"}`), + }, + want: false, + }, + { + name: "str result neg", + fields: fields{ + query: "foo", + part: 0, + }, + arg: [][]byte{ + []byte(`{"foo":"baz"}`), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := NewConfig() + conf.Type = "jmespath" + conf.JMESPath.Query = tt.fields.query + conf.JMESPath.Part = tt.fields.part + + c, err := NewJMESPath(conf, nil, testLog, testMet) + if err != nil { + t.Error(err) + return + } + if got := c.Check(types.NewMessage(tt.arg)); got != tt.want { + t.Errorf("JMESPath.Check() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestJMESPathBadOperator(t *testing.T) { + testLog := log.NewLogger(os.Stdout, log.LoggerConfig{LogLevel: "NONE"}) + testMet := metrics.DudType{} + + conf := NewConfig() + conf.Type = "jmespath" + conf.JMESPath.Query = "this@#$@#$%@#%$@# is a bad query" + + _, err := NewJMESPath(conf, nil, testLog, testMet) + if err == nil { + t.Error("expected error from bad query") + } +} diff --git a/lib/processor/constructor.go b/lib/processor/constructor.go index 65c0a90841..9d8cab751c 100644 --- a/lib/processor/constructor.go +++ b/lib/processor/constructor.go @@ -61,6 +61,7 @@ type Config struct { Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"` HashSample HashSampleConfig `json:"hash_sample" yaml:"hash_sample"` InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"` + JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"` Sample SampleConfig `json:"sample" yaml:"sample"` SelectJSON SelectJSONConfig `json:"select_json" yaml:"select_json"` SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"` @@ -82,6 +83,7 @@ func NewConfig() Config { Dedupe: NewDedupeConfig(), HashSample: NewHashSampleConfig(), InsertPart: NewInsertPartConfig(), + JMESPath: NewJMESPathConfig(), Sample: NewSampleConfig(), SelectJSON: NewSelectJSONConfig(), SelectParts: NewSelectPartsConfig(), diff --git a/lib/processor/jmespath.go b/lib/processor/jmespath.go new file mode 100644 index 0000000000..3cdcd79985 --- /dev/null +++ b/lib/processor/jmespath.go @@ -0,0 +1,175 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package processor + +import ( + "fmt" + + "github.com/Jeffail/benthos/lib/types" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" + jmespath "github.com/jmespath/go-jmespath" +) + +//------------------------------------------------------------------------------ + +func init() { + Constructors["jmespath"] = TypeSpec{ + constructor: NewJMESPath, + description: ` +Parses a message part as a JSON blob and attempts to apply a JMESPath expression +to it, replacing the contents of the part with the result. Please refer to the +[JMESPath website](http://jmespath.org/) for information and tutorials regarding +the syntax of expressions. + +For example, with the following config: + +` + "``` yaml" + ` +jmespath: + part: 0 + query: locations[?state == 'WA'].name | sort(@) | {Cities: join(', ', @)} +` + "```" + ` + +If the initial contents of part 0 were: + +` + "``` json" + ` +{ + "locations": [ + {"name": "Seattle", "state": "WA"}, + {"name": "New York", "state": "NY"}, + {"name": "Bellevue", "state": "WA"}, + {"name": "Olympia", "state": "WA"} + ] +} +` + "```" + ` + +Then the resulting contents of part 0 would be: + +` + "``` json" + ` +{"Cities": "Bellevue, Olympia, Seattle"} +` + "```" + ` + +It is possible to create boolean queries with JMESPath, in order to filter +messages with boolean queries please instead use the +` + "[`jmespath`](../conditions/README.md#jmespath)" + ` condition instead. + +The part index can be negative, and if so the part will be selected from the end +counting backwards starting from -1. E.g. if part = -1 then the selected part +will be the last part of the message, if part = -2 then the part before the +last element with be selected, and so on.`, + } +} + +//------------------------------------------------------------------------------ + +// JMESPathConfig contains any configuration for the JMESPath processor. +type JMESPathConfig struct { + Part int `json:"part" yaml:"part"` + Query string `json:"query" yaml:"query"` +} + +// NewJMESPathConfig returns a JMESPathConfig with default values. +func NewJMESPathConfig() JMESPathConfig { + return JMESPathConfig{ + Part: 0, + Query: "", + } +} + +//------------------------------------------------------------------------------ + +// JMESPath is a processor that executes JMESPath queries on a message part and +// replaces the contents with the result. +type JMESPath struct { + part int + query *jmespath.JMESPath + + conf Config + log log.Modular + stats metrics.Type +} + +// NewJMESPath returns a JMESPath processor. +func NewJMESPath( + conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, +) (Type, error) { + query, err := jmespath.Compile(conf.JMESPath.Query) + if err != nil { + return nil, fmt.Errorf("failed to compile JMESPath query: %v", err) + } + j := &JMESPath{ + part: conf.JMESPath.Part, + query: query, + conf: conf, + log: log.NewModule(".processor.jmespath"), + stats: stats, + } + return j, nil +} + +//------------------------------------------------------------------------------ + +// ProcessMessage prepends a new message part to the message. +func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response) { + p.stats.Incr("processor.jmespath.count", 1) + + msgs := [1]types.Message{msg} + + index := p.part + if index < 0 { + index = msg.Len() + index + } + + if index < 0 || index >= msg.Len() { + p.stats.Incr("processor.jmespath.skipped", 1) + p.stats.Incr("processor.jmespath.dropped", 1) + return msgs[:], nil + } + + jsonPart, err := msg.GetJSON(index) + if err != nil { + p.stats.Incr("processor.jmespath.error.json_parse", 1) + p.stats.Incr("processor.jmespath.dropped", 1) + p.log.Errorf("Failed to parse part into json: %v\n", err) + return msgs[:], nil + } + + var result interface{} + if result, err = p.query.Search(jsonPart); err != nil { + p.stats.Incr("processor.jmespath.error.jmespath_search", 1) + p.stats.Incr("processor.jmespath.dropped", 1) + p.log.Errorf("Failed to search json: %v\n", err) + return msgs[:], nil + } + + newMsg := msg.ShallowCopy() + msgs[0] = newMsg + + if err = newMsg.SetJSON(index, result); err != nil { + p.stats.Incr("processor.jmespath.error.json_set", 1) + p.log.Errorf("Failed to convert jmespath result into part: %v\n", err) + } + + p.stats.Incr("processor.jmespath.sent", 1) + return msgs[:], nil +} + +//------------------------------------------------------------------------------ diff --git a/lib/processor/jmespath_test.go b/lib/processor/jmespath_test.go new file mode 100644 index 0000000000..4373942e9b --- /dev/null +++ b/lib/processor/jmespath_test.go @@ -0,0 +1,136 @@ +package processor + +import ( + "os" + "testing" + + "github.com/Jeffail/benthos/lib/types" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" +) + +func TestJMESPathValidation(t *testing.T) { + conf := NewConfig() + conf.JMESPath.Part = 0 + conf.JMESPath.Query = "foo.bar" + + testLog := log.NewLogger(os.Stdout, log.LoggerConfig{LogLevel: "NONE"}) + + jSet, err := NewJMESPath(conf, nil, testLog, metrics.DudType{}) + if err != nil { + t.Fatal(err) + } + + msgIn := types.NewMessage([][]byte{[]byte("this is bad json")}) + msgs, res := jSet.ProcessMessage(msgIn) + if len(msgs) != 1 { + t.Fatal("No passthrough for bad input data") + } + if res != nil { + t.Fatal("Non-nil result") + } + if exp, act := "this is bad json", string(msgs[0].GetAll()[0]); exp != act { + t.Errorf("Wrong output from bad json: %v != %v", act, exp) + } + + conf.JMESPath.Part = 5 + + jSet, err = NewJMESPath(conf, nil, testLog, metrics.DudType{}) + if err != nil { + t.Fatal(err) + } + + msgIn = types.NewMessage([][]byte{[]byte("{}")}) + msgs, res = jSet.ProcessMessage(msgIn) + if len(msgs) != 1 { + t.Fatal("No passthrough for bad index") + } + if res != nil { + t.Fatal("Non-nil result") + } + if exp, act := "{}", string(msgs[0].GetAll()[0]); exp != act { + t.Errorf("Wrong output from bad index: %v != %v", act, exp) + } +} + +func TestJMESPath(t *testing.T) { + tLog := log.NewLogger(os.Stdout, log.LoggerConfig{LogLevel: "NONE"}) + tStats := metrics.DudType{} + + type jTest struct { + name string + path string + input string + output string + } + + tests := []jTest{ + { + name: "select obj", + path: "foo.bar", + input: `{"foo":{"bar":{"baz":1}}}`, + output: `{"baz":1}`, + }, + { + name: "select array", + path: "foo.bar", + input: `{"foo":{"bar":["baz","qux"]}}`, + output: `["baz","qux"]`, + }, + { + name: "select obj as str", + path: "foo.bar", + input: `{"foo":{"bar":"{\"baz\":1}"}}`, + output: `"{\"baz\":1}"`, + }, + { + name: "select str", + path: "foo.bar", + input: `{"foo":{"bar":"hello world"}}`, + output: `"hello world"`, + }, + { + name: "select float", + path: "foo.bar", + input: `{"foo":{"bar":0.123}}`, + output: `0.123`, + }, + { + name: "select int", + path: "foo.bar", + input: `{"foo":{"bar":123}}`, + output: `123`, + }, + { + name: "select bool", + path: "foo.bar", + input: `{"foo":{"bar":true}}`, + output: `true`, + }, + } + + for _, test := range tests { + conf := NewConfig() + conf.JMESPath.Part = 0 + conf.JMESPath.Query = test.path + + jSet, err := NewJMESPath(conf, nil, tLog, tStats) + if err != nil { + t.Fatalf("Error for test '%v': %v", test.name, err) + } + + inMsg := types.NewMessage( + [][]byte{ + []byte(test.input), + }, + ) + msgs, _ := jSet.ProcessMessage(inMsg) + if len(msgs) != 1 { + t.Fatalf("Test '%v' did not succeed", test.name) + } + + if exp, act := test.output, string(msgs[0].GetAll()[0]); exp != act { + t.Errorf("Wrong result '%v': %v != %v", test.name, act, exp) + } + } +} diff --git a/resources/docs/conditions/README.md b/resources/docs/conditions/README.md index ff9d7e7521..72dbd2e92d 100644 --- a/resources/docs/conditions/README.md +++ b/resources/docs/conditions/README.md @@ -140,6 +140,36 @@ Checks whether the part ends with the argument (case sensitive.) Checks whether the part ends with the argument under unicode case-folding (case insensitive.) +## `jmespath` + +Parses a message part as a JSON blob and attempts to apply a JMESPath expression +to it, expecting a boolean response. If the response is true the condition +passes, otherwise it does not. Please refer to the +[JMESPath website](http://jmespath.org/) for information and tutorials regarding +the syntax of expressions. + +For example, with the following config: + +``` yaml +jmespath: + part: 0 + query: a == 'foo' +``` + +If the initial jmespaths of part 0 were: + +``` json +{ + "a": "foo" +} +``` + +Then the condition would pass. + +JMESPath is traditionally used for mutating JSON jmespath, in order to do this +please instead use the [`jmespath`](../processors/README.md#jmespath) +processor instead. + ## `not` Not is a condition that returns the opposite (NOT) of its child condition. The @@ -224,6 +254,12 @@ they will only be executed once per message, regardless of how many times they are referenced (unless the content is modified). Therefore, resource conditions can act as a runtime optimisation as well as a config optimisation. +## `xor` + +Xor is a condition that returns the logical XOR of its children conditions, +meaning it only resolves to true if _exactly_ one of its children conditions +resolves to true. + [0]: ../processors/README.md [1]: ../processors/README.md#condition [2]: #resource diff --git a/resources/docs/processors/README.md b/resources/docs/processors/README.md index a8bc20e66c..313196da6b 100644 --- a/resources/docs/processors/README.md +++ b/resources/docs/processors/README.md @@ -121,6 +121,49 @@ than the length of the existing parts it will be inserted at the beginning. This processor will interpolate functions within the 'content' field, you can find a list of functions [here](../config_interpolation.md#functions). +## `jmespath` + +Parses a message part as a JSON blob and attempts to apply a JMESPath expression +to it, replacing the contents of the part with the result. Please refer to the +[JMESPath website](http://jmespath.org/) for information and tutorials regarding +the syntax of expressions. + +For example, with the following config: + +``` yaml +jmespath: + part: 0 + query: locations[?state == 'WA'].name | sort(@) | {Cities: join(', ', @)} +``` + +If the initial contents of part 0 were: + +``` json +{ + "locations": [ + {"name": "Seattle", "state": "WA"}, + {"name": "New York", "state": "NY"}, + {"name": "Bellevue", "state": "WA"}, + {"name": "Olympia", "state": "WA"} + ] +} +``` + +Then the resulting contents of part 0 would be: + +``` json +{"Cities": "Bellevue, Olympia, Seattle"} +``` + +It is possible to create boolean queries with JMESPath, in order to filter +messages with boolean queries please instead use the +[`jmespath`](../conditions/README.md#jmespath) condition instead. + +The part index can be negative, and if so the part will be selected from the end +counting backwards starting from -1. E.g. if part = -1 then the selected part +will be the last part of the message, if part = -2 then the part before the +last element with be selected, and so on. + ## `noop` Noop is a no-op processor that does nothing, the message passes through