From 80fc734630f7a63a8b714a88f733fab248497c01 Mon Sep 17 00:00:00 2001 From: Eduardo Alonso Date: Wed, 20 Nov 2024 17:03:08 +0100 Subject: [PATCH] emit default values for non populated fields in protobuf --- .../processors/schema_registry_decode.adoc | 10 ++ .../processor_schema_registry_decode.go | 29 +++- .../processor_schema_registry_decode_test.go | 155 +++++++++++++++++- internal/impl/confluent/serde_avro_test.go | 2 +- internal/impl/confluent/serde_json_test.go | 2 +- internal/impl/confluent/serde_protobuf.go | 2 +- .../impl/confluent/serde_protobuf_test.go | 4 +- 7 files changed, 185 insertions(+), 19 deletions(-) diff --git a/docs/modules/components/pages/processors/schema_registry_decode.adoc b/docs/modules/components/pages/processors/schema_registry_decode.adoc index f9f03cc8bf..d7c1b4c233 100644 --- a/docs/modules/components/pages/processors/schema_registry_decode.adoc +++ b/docs/modules/components/pages/processors/schema_registry_decode.adoc @@ -50,6 +50,7 @@ label: "" schema_registry_decode: avro_raw_json: false url: "" # No default (required) + emit_default_values: false oauth: enabled: false consumer_key: "" @@ -120,6 +121,15 @@ The base URL of the schema registry service. *Type*: `string` +=== `emit_default_values` + +Emit default values for non populated fields in protobuf. + + +*Type*: `bool` + +*Default*: `false` + === `oauth` Allows you to specify open authentication via OAuth version 1. diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index 46b4166152..11c388136e 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -65,7 +65,11 @@ This processor decodes protobuf messages to JSON documents, you can read more ab Field(service.NewBoolField("avro_raw_json"). 
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json."). Advanced().Default(false)). - Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) + Field(service.NewURLField("url"). + Description("The base URL of the schema registry service.")). + Field(service.NewBoolField("emit_default_values"). + Description("Emit default values for non populated fields in protobuf."). 
+ Advanced().Default(false)) for _, f := range service.NewHTTPRequestAuthSignerFields() { spec = spec.Field(f.Version("4.7.0")) @@ -88,8 +92,9 @@ func init() { //------------------------------------------------------------------------------ type schemaRegistryDecoder struct { - avroRawJSON bool - client *sr.Client + avroRawJSON bool + emitDefaultValues bool + client *sr.Client schemas map[int]*cachedSchemaDecoder cacheMut sync.RWMutex @@ -117,7 +122,11 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } - return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr) + emitDefaultValues, err := conf.FieldBool("emit_default_values") + if err != nil { + return nil, err + } + return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, emitDefaultValues, mgr) } func newSchemaRegistryDecoder( @@ -125,14 +134,16 @@ func newSchemaRegistryDecoder( reqSigner func(f fs.FS, req *http.Request) error, tlsConf *tls.Config, avroRawJSON bool, + emitDefaultValues bool, mgr *service.Resources, ) (*schemaRegistryDecoder, error) { s := &schemaRegistryDecoder{ - avroRawJSON: avroRawJSON, - schemas: map[int]*cachedSchemaDecoder{}, - shutSig: shutdown.NewSignaller(), - logger: mgr.Logger(), - mgr: mgr, + avroRawJSON: avroRawJSON, + emitDefaultValues: emitDefaultValues, + schemas: map[int]*cachedSchemaDecoder{}, + shutSig: shutdown.NewSignaller(), + logger: mgr.Logger(), + mgr: mgr, } var err error if s.client, err = sr.NewClient(urlStr, reqSigner, tlsConf, mgr); err != nil { diff --git a/internal/impl/confluent/processor_schema_registry_decode_test.go b/internal/impl/confluent/processor_schema_registry_decode_test.go index 9b0fd920e1..c38a933ccb 100644 --- a/internal/impl/confluent/processor_schema_registry_decode_test.go +++ b/internal/impl/confluent/processor_schema_registry_decode_test.go @@ -197,6 +197,17 @@ message users { string gender = 4; }` +const testProtoDefaultValuesSchema= ` 
+syntax = "proto3"; +package ksql; + +message users { + int32 registertime = 1; + string userid = 2; + bool regionid = 3; + float gender = 4; +}` + const testJSONSchema = `{ "type": "object", "properties": { @@ -241,7 +252,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -344,7 +355,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -422,7 +433,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) { return nil, fmt.Errorf("nope") }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) require.NoError(t, decoder.Close(context.Background())) @@ -469,7 +480,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -515,6 +526,140 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) { decoder.cacheMut.Unlock() } +func TestSchemaRegistryDecodeProtobufWithEmitDefaultValuesFalse(t *testing.T) { + payload1, err := json.Marshal(struct { + Type string `json:"schemaType"` + Schema string `json:"schema"` + }{ + Type: "PROTOBUF", + Schema: 
testProtoDefaultValuesSchema, + }) + require.NoError(t, err) + + returnedSchema1 := false + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/schemas/ids/1": + assert.False(t, returnedSchema1) + returnedSchema1 = true + return payload1, nil + } + return nil, nil + }) + + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains string + }{ + { + name: "successful message", + input: "\x00\x00\x00\x00\x01\x00\x08\x01\x12\x01\x61\x18\x01\x25\x00\x00\x80\x3f", + output: `{"registertime": 1, "userid":"a" ,"regionid": true ,"gender":1.0}`, + }, + { + name: "successful message with default values", + input: "\x00\x00\x00\x00\x01\x00\x08\x00\x12\x00\x18\x00\x25\x00\x00\x00\x00", + output: `{}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + outMsgs, err := decoder.Process(context.Background(), service.NewMessage([]byte(test.input))) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } else { + require.NoError(t, err) + require.Len(t, outMsgs, 1) + + b, err := outMsgs[0].AsBytes() + require.NoError(t, err) + + assert.JSONEq(t, test.output, string(b), "%s: %s", test.name) + } + }) + } + + require.NoError(t, decoder.Close(context.Background())) + decoder.cacheMut.Lock() + assert.Empty(t, decoder.schemas) + decoder.cacheMut.Unlock() +} + +func TestSchemaRegistryDecodeProtobufWithEmitDefaultValuesTrue(t *testing.T) { + payload1, err := json.Marshal(struct { + Type string `json:"schemaType"` + Schema string `json:"schema"` + }{ + Type: "PROTOBUF", + Schema: testProtoDefaultValuesSchema, + }) + require.NoError(t, err) + + returnedSchema1 := false + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/schemas/ids/1": + 
assert.False(t, returnedSchema1) + returnedSchema1 = true + return payload1, nil + } + return nil, nil + }) + + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, true, service.MockResources()) + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains string + }{ + { + name: "successful message", + input: "\x00\x00\x00\x00\x01\x00\x08\x01\x12\x01\x61\x18\x01\x25\x00\x00\x80\x3f", + output: `{"registertime": 1, "userid":"a" ,"regionid": true ,"gender":1.0}`, + }, + { + name: "successful message with default values", + input: "\x00\x00\x00\x00\x01\x00\x08\x00\x12\x00\x18\x00\x25\x00\x00\x00\x00", + output: `{"registertime": 0, "userid":"" ,"regionid": false ,"gender":0.0}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + outMsgs, err := decoder.Process(context.Background(), service.NewMessage([]byte(test.input))) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } else { + require.NoError(t, err) + require.Len(t, outMsgs, 1) + + b, err := outMsgs[0].AsBytes() + require.NoError(t, err) + + assert.JSONEq(t, test.output, string(b), "%s: %s", test.name) + } + }) + } + + require.NoError(t, decoder.Close(context.Background())) + decoder.cacheMut.Lock() + assert.Empty(t, decoder.schemas) + decoder.cacheMut.Unlock() +} + func TestSchemaRegistryDecodeJson(t *testing.T) { returnedSchema3 := false urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { @@ -532,7 +677,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { diff --git a/internal/impl/confluent/serde_avro_test.go 
b/internal/impl/confluent/serde_avro_test.go index cde3a55a09..733cfb8397 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_avro_test.go @@ -107,7 +107,7 @@ func TestAvroReferences(t *testing.T) { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_json_test.go b/internal/impl/confluent/serde_json_test.go index f1fd391cb2..f41386922d 100644 --- a/internal/impl/confluent/serde_json_test.go +++ b/internal/impl/confluent/serde_json_test.go @@ -110,7 +110,7 @@ func TestResolveJsonSchema(t *testing.T) { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_protobuf.go b/internal/impl/confluent/serde_protobuf.go index fc7696a132..cef8558cca 100644 --- a/internal/impl/confluent/serde_protobuf.go +++ b/internal/impl/confluent/serde_protobuf.go @@ -87,7 +87,7 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(ctx context.Context, info sr. 
return fmt.Errorf("failed to unmarshal protobuf message: %w", err) } - data, err := protojson.MarshalOptions{Resolver: types}.Marshal(dynMsg) + data, err := protojson.MarshalOptions{Resolver: types, EmitDefaultValues: s.emitDefaultValues}.Marshal(dynMsg) if err != nil { return fmt.Errorf("failed to marshal JSON protobuf message: %w", err) } diff --git a/internal/impl/confluent/serde_protobuf_test.go b/internal/impl/confluent/serde_protobuf_test.go index ab406c2aef..bf0023dd15 100644 --- a/internal/impl/confluent/serde_protobuf_test.go +++ b/internal/impl/confluent/serde_protobuf_test.go @@ -96,7 +96,7 @@ message bar { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -226,7 +226,7 @@ message bar { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() {