Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds functionality to schema_registry_decode to emit defaults or not #2855

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ label: ""
schema_registry_decode:
avro_raw_json: false
url: "" # No default (required)
protojson_marshaler_opts:
emit_defaults: false
oauth:
enabled: false
consumer_key: ""
Expand Down Expand Up @@ -120,6 +122,23 @@ The base URL of the schema registry service.
*Type*: `string`


=== `protojson_marshaler_opts`

Configuration options for the protojson marshaler.


*Type*: `object`


=== `protojson_marshaler_opts.emit_defaults`

specifies whether to render fields with zero values.


*Type*: `bool`

*Default*: `false`

=== `oauth`

Allows you to specify open authentication via OAuth version 1.
Expand Down
41 changes: 32 additions & 9 deletions internal/impl/confluent/processor_schema_registry_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/redpanda-data/connect/v4/internal/impl/confluent/sr"
)

const protoJSONMarshalerField = "protojson_marshaler_opts"

func schemaRegistryDecoderConfig() *service.ConfigSpec {
spec := service.NewConfigSpec().
Beta().
Expand Down Expand Up @@ -65,7 +67,17 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
Field(service.NewBoolField("avro_raw_json").
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
Advanced().Default(false)).
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))
Field(service.NewURLField("url").Description("The base URL of the schema registry service.")).
Fields(
service.NewObjectField(protoJSONMarshalerField,
service.NewBoolField("emit_defaults").
Description("specifies whether to render fields with zero values.").
Default(false).
Advanced(),
).
Description("Configuration options for the protojson marshaler.").
Advanced(),
)

for _, f := range service.NewHTTPRequestAuthSignerFields() {
spec = spec.Field(f.Version("4.7.0"))
Expand All @@ -88,8 +100,9 @@ func init() {
//------------------------------------------------------------------------------

type schemaRegistryDecoder struct {
avroRawJSON bool
client *sr.Client
avroRawJSON bool
protoJSONEmitDefaults bool
client *sr.Client

schemas map[int]*cachedSchemaDecoder
cacheMut sync.RWMutex
Expand Down Expand Up @@ -117,22 +130,32 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)

protoJSONMarshalerConf := conf.Namespace(protoJSONMarshalerField)

protoJSONEmitDefaults, err := protoJSONMarshalerConf.FieldBool("emit_defaults")
if err != nil {
return nil, err
}

return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, protoJSONEmitDefaults, mgr)
}

func newSchemaRegistryDecoder(
urlStr string,
reqSigner func(f fs.FS, req *http.Request) error,
tlsConf *tls.Config,
avroRawJSON bool,
protoJSONEmitsDefaults bool,
mgr *service.Resources,
) (*schemaRegistryDecoder, error) {
s := &schemaRegistryDecoder{
avroRawJSON: avroRawJSON,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
avroRawJSON: avroRawJSON,
protoJSONEmitDefaults: protoJSONEmitsDefaults,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
}
var err error
if s.client, err = sr.NewClient(urlStr, reqSigner, tlsConf, mgr); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/impl/confluent/processor_schema_registry_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) {
return nil, fmt.Errorf("nope")
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)
require.NoError(t, decoder.Close(context.Background()))

Expand Down Expand Up @@ -469,7 +469,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -532,7 +532,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/confluent/serde_avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestAvroReferences(t *testing.T) {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/confluent/serde_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestResolveJsonSchema(t *testing.T) {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
7 changes: 6 additions & 1 deletion internal/impl/confluent/serde_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(ctx context.Context, info sr.
return nil, err
}

protoJSONMarshalOpts := protojson.MarshalOptions{
Resolver: types,
EmitDefaultValues: s.protoJSONEmitDefaults,
}

msgTypes := targetFile.Messages()
return func(m *service.Message) error {
b, err := m.AsBytes()
Expand Down Expand Up @@ -87,7 +92,7 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(ctx context.Context, info sr.
return fmt.Errorf("failed to unmarshal protobuf message: %w", err)
}

data, err := protojson.MarshalOptions{Resolver: types}.Marshal(dynMsg)
data, err := protoJSONMarshalOpts.Marshal(dynMsg)
if err != nil {
return fmt.Errorf("failed to marshal JSON protobuf message: %w", err)
}
Expand Down
120 changes: 118 additions & 2 deletions internal/impl/confluent/serde_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ message bar {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -226,7 +226,7 @@ message bar {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -275,6 +275,122 @@ message bar {
}
}

func TestDecodeEmitsDefaultValues(t *testing.T) {
tCtx, done := context.WithTimeout(context.Background(), time.Second*10)
defer done()

thingsSchema := `
syntax = "proto3";
package things;

message foo {
float a = 1;
string b = 2;
}

message bar {
string b = 1;
}
`
urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
switch path {
case "/subjects/things/versions/latest", "/schemas/ids/1":
return mustJBytes(t, map[string]any{
"id": 1,
"version": 10,
"schema": thingsSchema,
"schemaType": "PROTOBUF",
}), nil
}
return nil, nil
})

subj, err := service.NewInterpolatedString("${! @subject }")
require.NoError(t, err)

tests := []struct {
name string
subject string
emitDefaults bool
input string
output string
errContains []string
}{
{
name: "things omitted on foo when defaults NOT emitted",
subject: "things",
emitDefaults: false,
input: `{"a":0, "b":"hello world"}`,
output: `{"b":"hello world"}`,
},
{
name: "things NOT omitted on foo when defaults emitted",
subject: "things",
emitDefaults: true,
input: `{"a":0, "b":"hello world"}`,
output: `{"a":0,"b":"hello world"}`,
},
}

for _, test := range tests {

test := test

t.Run(test.name, func(t *testing.T) {

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, test.emitDefaults, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
_ = encoder.Close(tCtx)
_ = decoder.Close(tCtx)
})

inMsg := service.NewMessage([]byte(test.input))
inMsg.MetaSetMut("subject", test.subject)

encodedMsgs, err := encoder.ProcessBatch(tCtx, service.MessageBatch{inMsg})
require.NoError(t, err)
require.Len(t, encodedMsgs, 1)
require.Len(t, encodedMsgs[0], 1)

encodedMsg := encodedMsgs[0][0]

if len(test.errContains) > 0 {
require.Error(t, encodedMsg.GetError())
for _, errStr := range test.errContains {
assert.Contains(t, encodedMsg.GetError().Error(), errStr)
}
return
}

b, err := encodedMsg.AsBytes()
require.NoError(t, err)

require.NoError(t, encodedMsg.GetError())
require.NotEqual(t, test.input, string(b))

var n any
require.Error(t, json.Unmarshal(b, &n), "message contents should no longer be valid JSON")

decodedMsgs, err := decoder.Process(tCtx, encodedMsg)
require.NoError(t, err)
require.Len(t, decodedMsgs, 1)

decodedMsg := decodedMsgs[0]

b, err = decodedMsg.AsBytes()
require.NoError(t, err)

require.NoError(t, decodedMsg.GetError())
require.JSONEq(t, test.output, string(b))
})
}
}

func runEncoderAgainstInputsMultiple(t testing.TB, urlStr, subject string, inputs [][]byte) {
tCtx, done := context.WithTimeout(context.Background(), time.Second*10)
defer done()
Expand Down
Loading