Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

emit default values for non populated fields in protobuf #3022

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ label: ""
schema_registry_decode:
avro_raw_json: false
url: "" # No default (required)
emit_default_values: false
oauth:
enabled: false
consumer_key: ""
Expand Down Expand Up @@ -120,6 +121,15 @@ The base URL of the schema registry service.
*Type*: `string`


=== `emit_default_values`

Emit default values for non-populated fields in protobuf.


*Type*: `bool`

*Default*: `false`

=== `oauth`

Allows you to specify open authentication via OAuth version 1.
Expand Down
29 changes: 20 additions & 9 deletions internal/impl/confluent/processor_schema_registry_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
Field(service.NewBoolField("avro_raw_json").
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
Advanced().Default(false)).
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))
Field(service.NewURLField("url").
Description("The base URL of the schema registry service.")).
Field(service.NewBoolField("emit_default_values").
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we instead create a nested config object and add a boolean field for emit_defaults? I'd like to namespace these as over time I expect to have more protobuf knobs available.

That will look something like this:

service.NewObjectField("protobuf", service.NewBoolField("emit_defaults"))

Description("Emit default values for non populated fileds in protobuf.").
Advanced().Default(false))

for _, f := range service.NewHTTPRequestAuthSignerFields() {
spec = spec.Field(f.Version("4.7.0"))
Expand All @@ -88,8 +92,9 @@ func init() {
//------------------------------------------------------------------------------

type schemaRegistryDecoder struct {
avroRawJSON bool
client *sr.Client
avroRawJSON bool
emitDefaultValues bool
client *sr.Client

schemas map[int]*cachedSchemaDecoder
cacheMut sync.RWMutex
Expand Down Expand Up @@ -117,22 +122,28 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)
emitDefaultValues, err := conf.FieldBool("emit_default_values")
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, emitDefaultValues, mgr)
}

func newSchemaRegistryDecoder(
urlStr string,
reqSigner func(f fs.FS, req *http.Request) error,
tlsConf *tls.Config,
avroRawJSON bool,
emitDefaultValues bool,
mgr *service.Resources,
) (*schemaRegistryDecoder, error) {
s := &schemaRegistryDecoder{
avroRawJSON: avroRawJSON,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
avroRawJSON: avroRawJSON,
emitDefaultValues: emitDefaultValues,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
}
var err error
if s.client, err = sr.NewClient(urlStr, reqSigner, tlsConf, mgr); err != nil {
Expand Down
155 changes: 150 additions & 5 deletions internal/impl/confluent/processor_schema_registry_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ message users {
string gender = 4;
}`

// testProtoDefaultValuesSchema is a proto3 schema whose message consists only
// of scalar fields (int32, string, bool and float). The emit-default-values
// decoder tests serve it from the mock schema registry.
const testProtoDefaultValuesSchema = `
syntax = "proto3";
package ksql;

message users {
int32 registertime = 1;
string userid = 2;
bool regionid = 3;
float gender = 4;
}`

const testJSONSchema = `{
"type": "object",
"properties": {
Expand Down Expand Up @@ -241,7 +252,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -344,7 +355,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -422,7 +433,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) {
return nil, fmt.Errorf("nope")
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)
require.NoError(t, decoder.Close(context.Background()))

Expand Down Expand Up @@ -469,7 +480,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -515,6 +526,140 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
decoder.cacheMut.Unlock()
}

// TestSchemaRegistryDecodeProtobufWithEmitDefaultValuesFalse verifies protobuf
// decoding when the decoder is built with emitDefaultValues set to false: a
// message whose fields all carry zero values is expected to decode to an empty
// JSON object, while populated fields are emitted as usual.
func TestSchemaRegistryDecodeProtobufWithEmitDefaultValuesFalse(t *testing.T) {
	payload1, err := json.Marshal(struct {
		Type   string `json:"schemaType"`
		Schema string `json:"schema"`
	}{
		Type:   "PROTOBUF",
		Schema: testProtoDefaultValuesSchema,
	})
	require.NoError(t, err)

	// The mock registry asserts that schema 1 is requested at most once, so a
	// second fetch for the same ID fails the test.
	returnedSchema1 := false
	urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
		switch path {
		case "/schemas/ids/1":
			assert.False(t, returnedSchema1)
			returnedSchema1 = true
			return payload1, nil
		}
		return nil, nil
	})

	// Arguments four and five are avroRawJSON and emitDefaultValues.
	decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
	require.NoError(t, err)

	// NOTE(review): the inputs appear to be a Confluent wire-format header
	// (zero magic byte, schema ID 1, message index 0) followed by the encoded
	// protobuf fields - confirm against the decoder implementation.
	tests := []struct {
		name        string
		input       string
		output      string
		errContains string
	}{
		{
			name:   "successful message",
			input:  "\x00\x00\x00\x00\x01\x00\x08\x01\x12\x01\x61\x18\x01\x25\x00\x00\x80\x3f",
			output: `{"registertime": 1, "userid":"a" ,"regionid": true ,"gender":1.0}`,
		},
		{
			name:   "successful message with default values",
			input:  "\x00\x00\x00\x00\x01\x00\x08\x00\x12\x00\x18\x00\x25\x00\x00\x00\x00",
			output: `{}`,
		},
	}

	for _, test := range tests {
		test := test
		t.Run(test.name, func(t *testing.T) {
			outMsgs, err := decoder.Process(context.Background(), service.NewMessage([]byte(test.input)))
			if test.errContains != "" {
				require.Error(t, err)
				assert.Contains(t, err.Error(), test.errContains)
			} else {
				require.NoError(t, err)
				require.Len(t, outMsgs, 1)

				b, err := outMsgs[0].AsBytes()
				require.NoError(t, err)

				// One verb for one argument; the previous "%s: %s" format
				// had a second verb with nothing to fill it.
				assert.JSONEq(t, test.output, string(b), "test case: %s", test.name)
			}
		})
	}

	// After Close the schema cache is expected to be empty.
	require.NoError(t, decoder.Close(context.Background()))
	decoder.cacheMut.Lock()
	assert.Empty(t, decoder.schemas)
	decoder.cacheMut.Unlock()
}

// TestSchemaRegistryDecodeProtobufWithEmitDefaultValuesTrue verifies protobuf
// decoding when the decoder is built with emitDefaultValues set to true: a
// message whose fields all carry zero values is expected to decode to a JSON
// object that lists every field with its zero value.
func TestSchemaRegistryDecodeProtobufWithEmitDefaultValuesTrue(t *testing.T) {
	payload1, err := json.Marshal(struct {
		Type   string `json:"schemaType"`
		Schema string `json:"schema"`
	}{
		Type:   "PROTOBUF",
		Schema: testProtoDefaultValuesSchema,
	})
	require.NoError(t, err)

	// The mock registry asserts that schema 1 is requested at most once, so a
	// second fetch for the same ID fails the test.
	returnedSchema1 := false
	urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
		switch path {
		case "/schemas/ids/1":
			assert.False(t, returnedSchema1)
			returnedSchema1 = true
			return payload1, nil
		}
		return nil, nil
	})

	// Arguments four and five are avroRawJSON and emitDefaultValues.
	decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, true, service.MockResources())
	require.NoError(t, err)

	// NOTE(review): the inputs appear to be a Confluent wire-format header
	// (zero magic byte, schema ID 1, message index 0) followed by the encoded
	// protobuf fields - confirm against the decoder implementation.
	tests := []struct {
		name        string
		input       string
		output      string
		errContains string
	}{
		{
			name:   "successful message",
			input:  "\x00\x00\x00\x00\x01\x00\x08\x01\x12\x01\x61\x18\x01\x25\x00\x00\x80\x3f",
			output: `{"registertime": 1, "userid":"a" ,"regionid": true ,"gender":1.0}`,
		},
		{
			name:   "successful message with default values",
			input:  "\x00\x00\x00\x00\x01\x00\x08\x00\x12\x00\x18\x00\x25\x00\x00\x00\x00",
			output: `{"registertime": 0, "userid":"" ,"regionid": false ,"gender":0.0}`,
		},
	}

	for _, test := range tests {
		test := test
		t.Run(test.name, func(t *testing.T) {
			outMsgs, err := decoder.Process(context.Background(), service.NewMessage([]byte(test.input)))
			if test.errContains != "" {
				require.Error(t, err)
				assert.Contains(t, err.Error(), test.errContains)
			} else {
				require.NoError(t, err)
				require.Len(t, outMsgs, 1)

				b, err := outMsgs[0].AsBytes()
				require.NoError(t, err)

				// One verb for one argument; the previous "%s: %s" format
				// had a second verb with nothing to fill it.
				assert.JSONEq(t, test.output, string(b), "test case: %s", test.name)
			}
		})
	}

	// After Close the schema cache is expected to be empty.
	require.NoError(t, decoder.Close(context.Background()))
	decoder.cacheMut.Lock()
	assert.Empty(t, decoder.schemas)
	decoder.cacheMut.Unlock()
}

func TestSchemaRegistryDecodeJson(t *testing.T) {
returnedSchema3 := false
urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
Expand All @@ -532,7 +677,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/confluent/serde_avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestAvroReferences(t *testing.T) {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/confluent/serde_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestResolveJsonSchema(t *testing.T) {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/confluent/serde_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(ctx context.Context, info sr.
return fmt.Errorf("failed to unmarshal protobuf message: %w", err)
}

data, err := protojson.MarshalOptions{Resolver: types}.Marshal(dynMsg)
data, err := protojson.MarshalOptions{Resolver: types, EmitDefaultValues: s.emitDefaultValues}.Marshal(dynMsg)
if err != nil {
return fmt.Errorf("failed to marshal JSON protobuf message: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/confluent/serde_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ message bar {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -226,7 +226,7 @@ message bar {
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
Loading