Skip to content

Commit

Permalink
backend: initial work on using buf protocompile packages
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Jan 10, 2025
1 parent 387a0c2 commit 9187cbd
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 136 deletions.
2 changes: 1 addition & 1 deletion backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
connectrpc.com/grpcreflect v1.2.0
github.com/aws/aws-sdk-go-v2/config v1.28.6
github.com/basgys/goxml2json v1.1.0
github.com/bufbuild/protocompile v0.14.1
github.com/bufbuild/protovalidate-go v0.8.0
github.com/carlmjohnson/requests v0.24.3
github.com/cloudhut/common v0.10.0
Expand Down Expand Up @@ -94,7 +95,6 @@ require (
github.com/aws/smithy-go v1.22.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/bufbuild/protocompile v0.14.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.5.0 // indirect
Expand Down
190 changes: 86 additions & 104 deletions backend/pkg/proto/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ import (
"time"

//nolint:staticcheck // Switching to the google golang protojson comes with a few breaking changes.
"github.com/golang/protobuf/jsonpb"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/msgregistry"
"github.com/bufbuild/protocompile"
"github.com/bufbuild/protocompile/linker"
"github.com/bufbuild/protocompile/reporter"
"github.com/twmb/franz-go/pkg/sr"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/protobuf/runtime/protoiface"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/filesystem"
"github.com/redpanda-data/console/backend/pkg/git"
"github.com/redpanda-data/console/backend/pkg/schema"
"github.com/redpanda-data/console/backend/pkg/schema/embed"
"google.golang.org/protobuf/types/dynamicpb"
)

// RecordPropertyType determines whether the to be recorded payload is either a
Expand Down Expand Up @@ -61,11 +63,11 @@ type Service struct {

// fileDescriptorsBySchemaID are used to find the right schema type for messages at deserialization time. The type
// index is encoded as part of the serialized message.
fileDescriptorsBySchemaID map[int]*desc.FileDescriptor
fileDescriptorsBySchemaID map[int]linker.File
fileDescriptorsBySchemaIDMutex sync.RWMutex

registryMutex sync.RWMutex
registry *msgregistry.MessageRegistry
registry *protoregistry.Types

sfGroup singleflight.Group
}
Expand Down Expand Up @@ -210,17 +212,19 @@ func (s *Service) unmarshalConfluentMessage(payload []byte) ([]byte, int, error)
}

// DeserializeProtobufMessageToJSON deserializes the protobuf message to JSON.
func (s *Service) DeserializeProtobufMessageToJSON(payload []byte, md *desc.MessageDescriptor) ([]byte, error) {
msg := dynamic.NewMessage(md)
err := msg.Unmarshal(payload)
func (s *Service) DeserializeProtobufMessageToJSON(payload []byte, md protoreflect.MessageDescriptor) ([]byte, error) {
msg := dynamicpb.NewMessage(md)
err := proto.Unmarshal(payload, msg)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal payload into protobuf message: %w", err)
}

jsonBytes, err := msg.MarshalJSONPB(&jsonpb.Marshaler{
AnyResolver: &anyResolver{s.registry},
EmitDefaults: true,
})
marshaller := protojson.MarshalOptions{
EmitDefaultValues: true,
Resolver: s.registry,
}

jsonBytes, err := marshaller.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("failed to marshal protobuf message to JSON: %w", err)
}
Expand All @@ -229,35 +233,36 @@ func (s *Service) DeserializeProtobufMessageToJSON(payload []byte, md *desc.Mess
}

// SerializeJSONToProtobufMessage serializes the JSON data to Protobuf message.
func (s *Service) SerializeJSONToProtobufMessage(json []byte, md *desc.MessageDescriptor) ([]byte, error) {
msg := dynamic.NewMessage(md)
err := msg.UnmarshalJSONPB(&jsonpb.Unmarshaler{
AnyResolver: &anyResolver{s.registry},
AllowUnknownFields: true,
}, json)
func (s *Service) SerializeJSONToProtobufMessage(json []byte, md protoreflect.MessageDescriptor) ([]byte, error) {
msg := dynamicpb.NewMessage(md)
unmarshaller := protojson.UnmarshalOptions{
DiscardUnknown: false,
Resolver: s.registry,
}
err := unmarshaller.Unmarshal(json, msg)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal protobuf message from JSON: %w", err)
}

return msg.Marshal()
return protojson.Marshal(msg)
}

// GetMessageDescriptorForSchema gets the Protobuf message descriptor for the schema ID and message index.
// TODO consolidate this with getMessageDescriptorFromConfluentMessage
func (s *Service) GetMessageDescriptorForSchema(schemaID int, index []int) (*desc.MessageDescriptor, error) {
func (s *Service) GetMessageDescriptorForSchema(schemaID int, index []int) (protoreflect.MessageDescriptor, error) {
fd, exists := s.GetFileDescriptorBySchemaID(schemaID)
if !exists {
return nil, fmt.Errorf("schema ID %+v not found", schemaID)
}

messageTypes := fd.GetMessageTypes()
var messageDescriptor *desc.MessageDescriptor
messageTypes := fd.Messages()
var messageDescriptor protoreflect.MessageDescriptor
for _, idx := range index {
if idx > len(messageTypes) {
if idx > messageTypes.Len() {
return nil, fmt.Errorf("message index is larger than the message types array length")
}
messageDescriptor = messageTypes[idx]
messageTypes = messageDescriptor.GetNestedMessageTypes()
messageDescriptor = messageTypes.Get(idx)
messageTypes = messageDescriptor.Messages()
}

if messageDescriptor == nil {
Expand All @@ -279,21 +284,22 @@ func (s *Service) SerializeJSONToConfluentProtobufMessage(json []byte, schemaID
return nil, err
}

msg := dynamic.NewMessage(messageDescriptor)
err = msg.UnmarshalJSONPB(&jsonpb.Unmarshaler{
AnyResolver: &anyResolver{s.registry},
AllowUnknownFields: true,
}, json)
msg := dynamicpb.NewMessage(messageDescriptor)
unmarshaller := protojson.UnmarshalOptions{
DiscardUnknown: false,
Resolver: s.registry,
}
unmarshaller.Unmarshal(json, msg)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal protobuf message from JSON: %w", err)
}

var srSerde sr.Serde
srSerde.Register(
schemaID,
&dynamic.Message{},
&dynamicpb.Message{},
sr.EncodeFn(func(v any) ([]byte, error) {
return v.(*dynamic.Message).Marshal()
return protojson.Marshal(v.(*dynamicpb.Message))
}),
sr.Index(index...),
)
Expand Down Expand Up @@ -332,7 +338,7 @@ func (s *Service) IsProtobufSchemaRegistryEnabled() bool {
}

// GetMessageDescriptor tries to find the Protobuf message descriptor that is mapped to the given topic and record property.
func (s *Service) GetMessageDescriptor(topicName string, property RecordPropertyType) (*desc.MessageDescriptor, error) {
func (s *Service) GetMessageDescriptor(topicName string, property RecordPropertyType) (protoreflect.MessageDescriptor, error) {
// 1. Otherwise check if the user has configured a mapping to a local proto type for this topic and record type
mapping, err := s.getMatchingMapping(topicName)
if err != nil {
Expand All @@ -354,17 +360,17 @@ func (s *Service) GetMessageDescriptor(topicName string, property RecordProperty

s.registryMutex.RLock()
defer s.registryMutex.RUnlock()
messageDescriptor, err := s.registry.FindMessageTypeByUrl(protoTypeURL)
messageType, err := s.registry.FindMessageByURL(protoTypeURL)
if err != nil {
return nil, fmt.Errorf("failed to find the proto type %s in the proto registry: %w", protoTypeURL, err)
}
if messageDescriptor == nil {
if messageType == nil {
// If this happens the user should already know that because we check the existence of all mapped types
// when we create the proto registry. A log message is printed if a mapping can't be found in the registry.
return nil, fmt.Errorf("failed to find the proto type %s in the proto registry: message descriptor is nil", protoTypeURL)
}

return messageDescriptor, nil
return messageType.Descriptor(), nil
}

type confluentEnvelope struct {
Expand Down Expand Up @@ -496,9 +502,14 @@ func (s *Service) createProtoRegistry(ctx context.Context) error {
}

// Create registry and add types from file descriptors
registry := msgregistry.NewMessageRegistryWithDefaults()
registry := new(protoregistry.Types)
for _, descriptor := range fileDescriptors {
registry.AddFile("", descriptor)
mds := descriptor.Messages()

for i := 0; i < mds.Len(); i++ {
mt := dynamicpb.NewMessageType(mds.Get(i))
registry.RegisterMessage(mt)
}
}
s.logger.Info("registered proto types in Console's local proto registry", zap.Int("registered_types", len(fileDescriptors)))

Expand All @@ -511,32 +522,33 @@ func (s *Service) createProtoRegistry(ctx context.Context) error {
missingTypes := 0
for _, mapping := range s.cfg.Mappings {
if mapping.ValueProtoType != "" {
messageDesc, err := s.registry.FindMessageTypeByUrl(mapping.ValueProtoType)
_, err := s.registry.FindMessageByURL(mapping.ValueProtoType)
if err != nil {
if err == protoregistry.NotFound {
s.logger.Warn("protobuf type from configured topic mapping does not exist",
zap.String("topic_name", mapping.TopicName.String()),
zap.String("value_proto_type", mapping.ValueProtoType))
missingTypes++
}
return fmt.Errorf("failed to get proto type from registry: %w", err)
}
if messageDesc == nil {
s.logger.Warn("protobuf type from configured topic mapping does not exist",
zap.String("topic_name", mapping.TopicName.String()),
zap.String("value_proto_type", mapping.ValueProtoType))
missingTypes++
} else {
foundTypes++
}

foundTypes++

}
if mapping.KeyProtoType != "" {
messageDesc, err := s.registry.FindMessageTypeByUrl(mapping.KeyProtoType)
_, err := s.registry.FindMessageByURL(mapping.ValueProtoType)
if err != nil {
if err == protoregistry.NotFound {
s.logger.Info("protobuf type from configured topic mapping does not exist",
zap.String("topic_name", mapping.TopicName.String()),
zap.String("key_proto_type", mapping.KeyProtoType))
missingTypes++
}
return fmt.Errorf("failed to get proto type from registry: %w", err)
}
if messageDesc == nil {
s.logger.Info("protobuf type from configured topic mapping does not exist",
zap.String("topic_name", mapping.TopicName.String()),
zap.String("key_proto_type", mapping.KeyProtoType))
missingTypes++
} else {
foundTypes++
}

foundTypes++
}
}

Expand All @@ -554,7 +566,7 @@ func (s *Service) createProtoRegistry(ctx context.Context) error {
// protoFileToDescriptor parses the given .proto files and compiles them in-process to file descriptors.
// No external protoc binary is required.
// Imported dependencies (such as Protobuf timestamp) are included so that the descriptors are self-contained.
func (s *Service) protoFileToDescriptor(files map[string]filesystem.File) ([]*desc.FileDescriptor, error) {
func (s *Service) protoFileToDescriptor(files map[string]filesystem.File) ([]linker.File, error) {
filesStr := make(map[string]string, len(files))
filePaths := make([]string, 0, len(filesStr))
for _, file := range files {
Expand All @@ -570,14 +582,12 @@ func (s *Service) protoFileToDescriptor(files map[string]filesystem.File) ([]*de
if strings.HasPrefix(trimmedFilepath, prefix) {
trimmedFilepath = strings.TrimPrefix(trimmedFilepath, prefix)
trimmedFilepath = strings.TrimPrefix(trimmedFilepath, "/")
filesStr[trimmedFilepath] = string(file.Payload)
filePaths = append(filePaths, trimmedFilepath)
}
}
} else {
filesStr[trimmedFilepath] = string(file.Payload)
filePaths = append(filePaths, trimmedFilepath)
}
filesStr[trimmedFilepath] = string(file.Payload)
filePaths = append(filePaths, trimmedFilepath)

}

// Add common proto types
Expand All @@ -595,7 +605,7 @@ func (s *Service) protoFileToDescriptor(files map[string]filesystem.File) ([]*de
}
}

errorReporter := func(err protoparse.ErrorWithPos) error {
errorReporter := func(err reporter.ErrorWithPos) error {
position := err.GetPosition()
s.logger.Warn("failed to parse proto file to descriptor",
zap.String("file", position.Filename),
Expand All @@ -604,61 +614,33 @@ func (s *Service) protoFileToDescriptor(files map[string]filesystem.File) ([]*de
return nil
}

parser := protoparse.Parser{
Accessor: protoparse.FileContentsFromMap(filesStr),
ImportPaths: []string{"."},
InferImportPaths: true,
ValidateUnlinkedFiles: true,
IncludeSourceCodeInfo: true,
ErrorReporter: errorReporter,
compiler := protocompile.Compiler{
Resolver: &protocompile.SourceResolver{
Accessor: protocompile.SourceAccessorFromMap(filesStr),
},
Reporter: reporter.NewReporter(errorReporter, nil),
}
descriptors, err := parser.ParseFiles(filePaths...)

compiledFiles, err := compiler.Compile(context.Background(), filePaths...)
if err != nil {
return nil, fmt.Errorf("failed to parse proto files to descriptors: %w", err)
}

return descriptors, nil
return compiledFiles, nil
}

func (s *Service) setFileDescriptorsBySchemaID(descriptors map[int]*desc.FileDescriptor) {
func (s *Service) setFileDescriptorsBySchemaID(descriptors map[int]linker.File) {
s.fileDescriptorsBySchemaIDMutex.Lock()
defer s.fileDescriptorsBySchemaIDMutex.Unlock()

s.fileDescriptorsBySchemaID = descriptors
}

// GetFileDescriptorBySchemaID gets the file descriptor by schema ID.
func (s *Service) GetFileDescriptorBySchemaID(schemaID int) (*desc.FileDescriptor, bool) {
func (s *Service) GetFileDescriptorBySchemaID(schemaID int) (linker.File, bool) {
s.fileDescriptorsBySchemaIDMutex.Lock()
defer s.fileDescriptorsBySchemaIDMutex.Unlock()

desc, exists := s.fileDescriptorsBySchemaID[schemaID]
return desc, exists
}

// anyResolver is used to resolve the google.protobuf.Any type.
// It takes a type URL, present in an Any message, and resolves
// it into an instance of the associated message.
//
// This custom resolver is required because the built-in / default
// any resolver in the protoreflect library does not consider
// types that are used in referenced types but are not directly
// part of the schema that is being deserialized. This is described in
// more detail in the pull request that addresses the
// deserialization issue with Any types:
// https://github.com/redpanda-data/console/pull/425
type anyResolver struct {
// mr is the message registry that type names are resolved against.
mr *msgregistry.MessageRegistry
}

// Resolve returns the message registered for the given type URL, as looked up
// in the wrapped message registry.
func (r *anyResolver) Resolve(typeURL string) (protoiface.MessageV1, error) {
	// Protoreflect registers types under the part of the URL that follows the
	// last slash, so the lookup key is derived the same way. When the URL has
	// no slash, LastIndex yields -1 and the whole string is used unchanged.
	lastSlash := strings.LastIndex(typeURL, "/")
	return r.mr.Resolve(typeURL[lastSlash+1:])
}
Loading

0 comments on commit 9187cbd

Please sign in to comment.