Skip to content

Commit

Permalink
Add JMESPath processor and condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Mar 26, 2018
1 parent f199a18 commit b409a2d
Show file tree
Hide file tree
Showing 10 changed files with 673 additions and 10 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions config/everything.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ input:
operator: equals_cs
part: 0
arg: ""
jmespath:
part: 0
query: ""
not: {}
or: []
resource: ""
xor: []
decompress:
algorithm: gzip
parts: []
Expand All @@ -191,6 +195,9 @@ input:
insert_part:
index: -1
content: ""
jmespath:
part: 0
query: ""
sample:
retain: 0.1
seed: 0
Expand Down Expand Up @@ -354,9 +361,13 @@ resources:
operator: equals_cs
part: 0
arg: ""
jmespath:
part: 0
query: ""
not: {}
or: []
resource: ""
xor: []
logger:
prefix: service
log_level: INFO
Expand Down
16 changes: 9 additions & 7 deletions lib/processor/condition/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ var Constructors = map[string]TypeSpec{}

// Config is the all encompassing configuration struct for all condition types.
type Config struct {
Type string `json:"type" yaml:"type"`
And AndConfig `json:"and" yaml:"and"`
Content ContentConfig `json:"content" yaml:"content"`
Not NotConfig `json:"not" yaml:"not"`
Or OrConfig `json:"or" yaml:"or"`
Resource string `json:"resource" yaml:"resource"`
Xor XorConfig `json:"xor" yaml:"xor"`
Type string `json:"type" yaml:"type"`
And AndConfig `json:"and" yaml:"and"`
Content ContentConfig `json:"content" yaml:"content"`
JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"`
Not NotConfig `json:"not" yaml:"not"`
Or OrConfig `json:"or" yaml:"or"`
Resource string `json:"resource" yaml:"resource"`
Xor XorConfig `json:"xor" yaml:"xor"`
}

// NewConfig returns a configuration struct fully populated with default values.
Expand All @@ -66,6 +67,7 @@ func NewConfig() Config {
Type: "content",
And: NewAndConfig(),
Content: NewContentConfig(),
JMESPath: NewJMESPathConfig(),
Not: NewNotConfig(),
Or: NewOrConfig(),
Resource: "",
Expand Down
147 changes: 147 additions & 0 deletions lib/processor/condition/jmespath.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package condition

import (
"fmt"

"github.com/Jeffail/benthos/lib/types"
"github.com/Jeffail/benthos/lib/util/service/log"
"github.com/Jeffail/benthos/lib/util/service/metrics"
jmespath "github.com/jmespath/go-jmespath"
)

//------------------------------------------------------------------------------

func init() {
Constructors["jmespath"] = TypeSpec{
constructor: NewJMESPath,
description: `
Parses a message part as a JSON blob and attempts to apply a JMESPath expression
to it, expecting a boolean response. If the response is true the condition
passes, otherwise it does not. Please refer to the
[JMESPath website](http://jmespath.org/) for information and tutorials regarding
the syntax of expressions.
For example, with the following config:
` + "``` yaml" + `
jmespath:
part: 0
query: a == 'foo'
` + "```" + `
If the initial jmespaths of part 0 were:
` + "``` json" + `
{
"a": "foo"
}
` + "```" + `
Then the condition would pass.
JMESPath is traditionally used for mutating JSON jmespath, in order to do this
please instead use the ` + "[`jmespath`](../processors/README.md#jmespath)" + `
processor instead.`,
}
}

//------------------------------------------------------------------------------

// JMESPathConfig is a configuration struct containing fields for the jmespath
// condition.
type JMESPathConfig struct {
Part int `json:"part" yaml:"part"`
Query string `json:"query" yaml:"query"`
}

// NewJMESPathConfig returns a JMESPathConfig with default values.
func NewJMESPathConfig() JMESPathConfig {
return JMESPathConfig{
Part: 0,
Query: "",
}
}

//------------------------------------------------------------------------------

// JMESPath is a condition that checks message against a jmespath query.
type JMESPath struct {
stats metrics.Type
log log.Modular
part int
query *jmespath.JMESPath
}

// NewJMESPath returns a JMESPath processor.
func NewJMESPath(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
query, err := jmespath.Compile(conf.JMESPath.Query)
if err != nil {
return nil, fmt.Errorf("failed to compile JMESPath query: %v", err)
}

return &JMESPath{
stats: stats,
log: log,
part: conf.JMESPath.Part,
query: query,
}, nil
}

//------------------------------------------------------------------------------

// Check attempts to check a message part against a configured condition.
func (c *JMESPath) Check(msg types.Message) bool {
index := c.part
if index < 0 {
index = msg.Len() + index
}

if index < 0 || index >= msg.Len() {
c.stats.Incr("condition.jmespath.skipped", 1)
return false
}

jsonPart, err := msg.GetJSON(index)
if err != nil {
c.stats.Incr("condition.jmespath.error.json_parse", 1)
c.stats.Incr("condition.jmespath.dropped", 1)
c.log.Errorf("Failed to parse part into json: %v\n", err)
return false
}

var result interface{}
if result, err = c.query.Search(jsonPart); err != nil {
c.stats.Incr("condition.jmespath.error.jmespath_search", 1)
c.stats.Incr("condition.jmespath.dropped", 1)
c.log.Errorf("Failed to search json: %v\n", err)
return false
}
c.stats.Incr("condition.jmespath.applied", 1)

resultBool, _ := result.(bool)
return resultBool
}

//------------------------------------------------------------------------------
111 changes: 111 additions & 0 deletions lib/processor/condition/jmespath_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package condition

import (
"os"
"testing"

"github.com/Jeffail/benthos/lib/types"
"github.com/Jeffail/benthos/lib/util/service/log"
"github.com/Jeffail/benthos/lib/util/service/metrics"
)

func TestJMESPathCheck(t *testing.T) {
testLog := log.NewLogger(os.Stdout, log.LoggerConfig{LogLevel: "NONE"})
testMet := metrics.DudType{}

type fields struct {
query string
part int
}
tests := []struct {
name string
fields fields
arg [][]byte
want bool
}{
{
name: "bool result pos",
fields: fields{
query: "foo == 'bar'",
part: 0,
},
arg: [][]byte{
[]byte(`{"foo":"bar"}`),
},
want: true,
},
{
name: "bool result neg",
fields: fields{
query: "foo == 'bar'",
part: 0,
},
arg: [][]byte{
[]byte(`{"foo":"baz"}`),
},
want: false,
},
{
name: "str result neg",
fields: fields{
query: "foo",
part: 0,
},
arg: [][]byte{
[]byte(`{"foo":"baz"}`),
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := NewConfig()
conf.Type = "jmespath"
conf.JMESPath.Query = tt.fields.query
conf.JMESPath.Part = tt.fields.part

c, err := NewJMESPath(conf, nil, testLog, testMet)
if err != nil {
t.Error(err)
return
}
if got := c.Check(types.NewMessage(tt.arg)); got != tt.want {
t.Errorf("JMESPath.Check() = %v, want %v", got, tt.want)
}
})
}
}

func TestJMESPathBadOperator(t *testing.T) {
testLog := log.NewLogger(os.Stdout, log.LoggerConfig{LogLevel: "NONE"})
testMet := metrics.DudType{}

conf := NewConfig()
conf.Type = "jmespath"
conf.JMESPath.Query = "this@#$@#$%@#%$@# is a bad query"

_, err := NewJMESPath(conf, nil, testLog, testMet)
if err == nil {
t.Error("expected error from bad query")
}
}
2 changes: 2 additions & 0 deletions lib/processor/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"`
HashSample HashSampleConfig `json:"hash_sample" yaml:"hash_sample"`
InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"`
JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"`
Sample SampleConfig `json:"sample" yaml:"sample"`
SelectJSON SelectJSONConfig `json:"select_json" yaml:"select_json"`
SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
Expand All @@ -82,6 +83,7 @@ func NewConfig() Config {
Dedupe: NewDedupeConfig(),
HashSample: NewHashSampleConfig(),
InsertPart: NewInsertPartConfig(),
JMESPath: NewJMESPathConfig(),
Sample: NewSampleConfig(),
SelectJSON: NewSelectJSONConfig(),
SelectParts: NewSelectPartsConfig(),
Expand Down
Loading

0 comments on commit b409a2d

Please sign in to comment.