Skip to content

Commit 1d3bc8c

Browse files
authored
azure-event hub: improve error handling and stop input if the event has not been processed correctly (#16215)
* work on GA * update changelog * go vet * work on error * error handling * error message * temp * temp * undo tests * update * integration * fmt update * upgrade * upgrade * mage vendor * notice
1 parent b131405 commit 1d3bc8c

File tree

13 files changed

+75
-53
lines changed

13 files changed

+75
-53
lines changed

CHANGELOG.next.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
170170
- Add ingress nginx controller fileset {pull}16197[16197]
171171
- move create-[module,fileset,fields] to mage and enable in x-pack/filebeat {pull}15836[15836]
172172
- Add ECS tls and categorization fields to apache module. {issue}16032[16032] {pull}16121[16121]
173+
- Work on e2e ACK's for the azure-eventhub input {issue}15671[15671] {pull}16215[16215]
173174
- Add MQTT input. {issue}15602[15602] {pull}16204[16204]
174175
- Add ECS categorization fields to activemq module. {issue}16151[16151] {pull}16201[16201]
175176
- Add a TLS test and more debug output to httpjson input {pull}16315[16315]

NOTICE.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ License type (autodetected): MIT
518518

519519
--------------------------------------------------------------------
520520
Dependency: github.com/Azure/azure-event-hubs-go/v3
521-
Version: v3.1.0
521+
Version: v3.1.2
522522
License type (autodetected): MIT
523523
./vendor/github.com/Azure/azure-event-hubs-go/v3/LICENSE:
524524
--------------------------------------------------------------------

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee // indirect
1111
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible
1212
code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a // indirect
13-
github.com/Azure/azure-event-hubs-go/v3 v3.1.0
13+
github.com/Azure/azure-event-hubs-go/v3 v3.1.2
1414
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible
1515
github.com/Azure/azure-storage-blob-go v0.8.0
1616
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect

go.sum

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8
3131
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
3232
github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQYOug7CLpSyBBodc=
3333
github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg=
34-
github.com/Azure/azure-event-hubs-go/v3 v3.1.0 h1:j+/WXzke3PTRu5gAgSpWgWJVfpwIyaedIqqgdgkjAe0=
35-
github.com/Azure/azure-event-hubs-go/v3 v3.1.0/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k=
34+
github.com/Azure/azure-event-hubs-go/v3 v3.1.2 h1:S/NjCZ1Z2R4rHJd2Hbbad6rIhxJ4lZZebKTsKHweX4A=
35+
github.com/Azure/azure-event-hubs-go/v3 v3.1.2/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k=
3636
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
3737
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
3838
github.com/Azure/azure-pipeline-go v0.2.1 h1:OLBdZJ3yvOn2MezlWvbrBMTEUQC72zAftRZOMdj5HYo=
@@ -107,8 +107,8 @@ github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2Z
107107
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg=
108108
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43 h1:WFwa9pqou0Nb4DdfBOyaBTH0GqLE74Qwdf61E7ITHwQ=
109109
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43/go.mod h1:tJPYQG4mnMeUtQvQKNkbsFrnmZOg59Qnf8CcctFv5v4=
110-
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
111110
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
111+
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
112112
github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 h1:nkZ9axP+MvUFCu8JRN/MCY+DmTfs6lY7hE0QnJbxSdI=
113113
github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5/go.mod h1:T7PbCXFs94rrTttyxjbyT5+/1V8T2TYDejxUfHJjw1Y=
114114
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
@@ -730,8 +730,8 @@ golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR
730730
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
731731
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
732732
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
733-
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
734733
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
734+
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
735735
golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
736736
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
737737
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=

vendor/github.com/Azure/azure-event-hubs-go/v3/changelog.md

+7-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go

+28-22
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/Azure/azure-event-hubs-go/v3/storage/credential.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/Azure/azure-event-hubs-go/v3/version.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/modules.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ github.com/Azure/azure-amqp-common-go/v3/internal/tracing
4444
github.com/Azure/azure-amqp-common-go/v3/rpc
4545
github.com/Azure/azure-amqp-common-go/v3/sas
4646
github.com/Azure/azure-amqp-common-go/v3/uuid
47-
# github.com/Azure/azure-event-hubs-go/v3 v3.1.0
47+
# github.com/Azure/azure-event-hubs-go/v3 v3.1.2
4848
github.com/Azure/azure-event-hubs-go/v3
4949
github.com/Azure/azure-event-hubs-go/v3/atom
5050
github.com/Azure/azure-event-hubs-go/v3/eph

x-pack/filebeat/input/azureeventhub/eph.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package azureeventhub
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011

1112
eventhub "github.com/Azure/azure-event-hubs-go/v3"
@@ -48,8 +49,15 @@ func (a *azureInput) runWithEPH() error {
4849
// register a message handler -- many can be registered
4950
handlerID, err := a.processor.RegisterHandler(a.workerCtx,
5051
func(c context.Context, e *eventhub.Event) error {
52+
var onEventErr error
5153
// partitionID is not yet mapped in the azure-eventhub sdk
52-
return a.processEvents(e, "")
54+
ok := a.processEvents(e, "")
55+
if !ok {
56+
onEventErr = errors.New("OnEvent function returned false. Stopping input worker")
57+
a.log.Debug(onEventErr.Error())
58+
a.Stop()
59+
}
60+
return onEventErr
5361
})
5462
if err != nil {
5563
return err

x-pack/filebeat/input/azureeventhub/input.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type azureInput struct {
4242
workerWg sync.WaitGroup // waits on worker goroutine.
4343
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
4444
hub *eventhub.Hub // hub will be assigned
45+
ackChannel chan int
4546
}
4647

4748
const (
@@ -66,14 +67,6 @@ func NewInput(
6667
if err := cfg.Unpack(&config); err != nil {
6768
return nil, errors.Wrapf(err, "reading %s input config", inputName)
6869
}
69-
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
70-
Processing: beat.ProcessingConfig{
71-
DynamicFields: inputContext.DynamicFields,
72-
},
73-
})
74-
if err != nil {
75-
return nil, err
76-
}
7770

7871
inputCtx, cancelInputCtx := context.WithCancel(context.Background())
7972
go func() {
@@ -88,17 +81,24 @@ func NewInput(
8881
// to be recreated with each restart.
8982
workerCtx, workerCancel := context.WithCancel(inputCtx)
9083

91-
input := &azureInput{
84+
in := &azureInput{
9285
config: config,
9386
log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", config.ConnectionString),
94-
outlet: out,
9587
context: inputContext,
9688
workerCtx: workerCtx,
9789
workerCancel: workerCancel,
9890
}
99-
100-
input.log.Infof("Initialized %s input.", inputName)
101-
return input, nil
91+
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
92+
Processing: beat.ProcessingConfig{
93+
DynamicFields: inputContext.DynamicFields,
94+
},
95+
})
96+
if err != nil {
97+
return nil, err
98+
}
99+
in.outlet = out
100+
in.log.Infof("Initialized %s input.", inputName)
101+
return in, nil
102102
}
103103

104104
// Run starts the input worker then returns. Only the first invocation
@@ -176,7 +176,7 @@ func (a *azureInput) Wait() {
176176
a.Stop()
177177
}
178178

179-
func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) error {
179+
func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool {
180180
timestamp := time.Now()
181181
azure := common.MapStr{
182182
// partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable
@@ -195,12 +195,13 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) er
195195
"message": msg,
196196
"azure": azure,
197197
},
198+
Private: event.Data,
198199
})
199200
if !ok {
200-
return errors.New("event has not been sent")
201+
return ok
201202
}
202203
}
203-
return nil
204+
return true
204205
}
205206

206207
// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
@@ -209,7 +210,6 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
209210
err := json.Unmarshal(bMessage, &obj)
210211
if err != nil {
211212
a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err)
212-
return []string{string(bMessage)}
213213
}
214214
var messages []string
215215
if len(obj[expandEventListFromField]) > 0 {

x-pack/filebeat/input/azureeventhub/input_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ func TestProcessEvents(t *testing.T) {
5757
Data: []byte(msg),
5858
SystemProperties: &properties,
5959
}
60-
err = input.processEvents(&ev, "0")
61-
if err != nil {
62-
t.Fatal(err)
60+
ok := input.processEvents(&ev, "0")
61+
if !ok {
62+
t.Fatal("OnEvent function returned false")
6363
}
6464
assert.Equal(t, len(o.Events), 1)
6565
message, err := o.Events[0].Fields.GetValue("message")

0 commit comments

Comments
 (0)