diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c4475b5db..d6119d7850 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ All notable changes to this project will be documented in this file. ### Fixed -- Fixed `pg_stream` issue with discrepancies between replication and snapshot streaming for `UUID` type (@le-vlad) +- Fixed `pg_stream` issue with discrepancies between replication and snapshot streaming for `UUID` type. (@le-vlad) +- Fixed `avro` scanner bug introduced in v4.25.0. (@mihaitodor) ### Changed diff --git a/internal/impl/avro/resources/ocf.avro b/internal/impl/avro/resources/ocf.avro new file mode 100644 index 0000000000..66a8b71f6c Binary files /dev/null and b/internal/impl/avro/resources/ocf.avro differ diff --git a/internal/impl/avro/scanner.go b/internal/impl/avro/scanner.go index c90f2e8e10..f1a558b64e 100644 --- a/internal/impl/avro/scanner.go +++ b/internal/impl/avro/scanner.go @@ -17,6 +17,7 @@ package avro import ( "bufio" "context" + "fmt" "io" "github.com/linkedin/goavro/v2" @@ -115,14 +116,22 @@ func (c *avroScanner) NextBatch(ctx context.Context) (service.MessageBatch, erro return nil, io.EOF } + if !c.ocf.Scan() { + err := c.ocf.Err() + if err != nil { + return nil, fmt.Errorf("failed to scan OCF file: %s", err) + } + return nil, io.EOF + } + datum, err := c.ocf.Read() if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read OCF datum: %s", err) } jb, err := c.avroCodec.TextualFromNative(nil, datum) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode OCF datum to JSON: %s", err) } return service.MessageBatch{service.NewMessage(jb)}, nil } diff --git a/internal/impl/avro/scanner_test.go b/internal/impl/avro/scanner_test.go new file mode 100644 index 0000000000..729aa7a32f --- /dev/null +++ b/internal/impl/avro/scanner_test.go @@ -0,0 +1,99 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func TestScanner(t *testing.T) { + tests := []struct { + name string + rawJSON bool + output []string + }{ + { + name: "standard JSON", + rawJSON: false, + output: []string{ + `{"Price":{"double":12.32},"OrderDate":{"long.timestamp-millis":1687221496000},"OrderStatus":{"string":"Canceled"},"Email":{"string":"elizabeth.brown@example.com"},"Quantity":{"long":5}}`, + `{"Email":{"string":"james.wilson@example.com"},"Quantity":{"long":5},"Price":{"double":12.35},"OrderDate":{"long.timestamp-millis":1702926589000},"OrderStatus":{"string":"Pending"}}`, + `{"OrderDate":{"long.timestamp-millis":1708606337000},"OrderStatus":{"string":"Completed"},"Email":{"string":"kristin.walls@example.com"},"Quantity":{"long":6},"Price":{"double":10.3}}`, + }, + }, + { + name: "AVRO JSON", + rawJSON: true, + output: []string{ + `{"Email":"elizabeth.brown@example.com","OrderDate":1.687221496e+12,"OrderStatus":"Canceled","Price":12.32,"Quantity":5}`, + `{"Email":"james.wilson@example.com","OrderDate":1.702926589e+12,"OrderStatus":"Pending","Price":12.35,"Quantity":5}`, + `{"Email":"kristin.walls@example.com","OrderDate":1.708606337e+12,"OrderStatus":"Completed","Price":10.3,"Quantity":6}`, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + confSpec := service.NewConfigSpec().Field(service.NewScannerField("test")) + pConf, err := confSpec.ParseYAML(fmt.Sprintf(` +test: + avro: + raw_json: %t +`, test.rawJSON), nil) + require.NoError(t, err) + + rdr, err := pConf.FieldScanner("test") + require.NoError(t, err) + + b, err := os.ReadFile("./resources/ocf.avro") + require.NoError(t, err) + + buf := bytes.NewReader(b) + var acked bool + strm, err := rdr.Create(io.NopCloser(buf), func(ctx context.Context, err error) error { + acked = true + return nil + }, service.NewScannerSourceDetails()) + require.NoError(t, err) + + for _, s := range test.output { + m, aFn, err := strm.NextBatch(context.Background()) + require.NoError(t, err) + require.Len(t, m, 1) + mBytes, err := m[0].AsBytes() + require.NoError(t, err) + assert.JSONEq(t, s, string(mBytes)) + require.NoError(t, aFn(context.Background(), nil)) + assert.False(t, acked) + } + + _, _, err = strm.NextBatch(context.Background()) + require.Equal(t, io.EOF, err) + + require.NoError(t, strm.Close(context.Background())) + assert.True(t, acked) + }) + } +}