Add Redpanda Migrator offset metadata
- New `redpanda_migrator_offsets` input.
- Field `offset_metadata` added to the `redpanda_migrator_offsets` output.

Signed-off-by: Mihai Todor <todormihai@gmail.com>
mihaitodor committed Nov 21, 2024
1 parent 4461a69 commit 34421d0
Showing 8 changed files with 221 additions and 41 deletions.
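Taken together, the new `redpanda_migrator_offsets` input and the output's `offset_metadata` field let a Migrator pipeline carry consumer group offset metadata end to end. A minimal config sketch (broker addresses and the group name are placeholders, not part of this commit):

```yaml
input:
  redpanda_migrator_offsets:
    seed_brokers: [ "127.0.0.1:9092" ] # hypothetical source cluster
    consumer_group: "migrator"

output:
  redpanda_migrator_offsets:
    seed_brokers: [ "127.0.0.1:9093" ] # hypothetical destination cluster
    kafka_key: "${! @kafka_key }" # existing field, shown with its default
    offset_metadata: "${! @kafka_offset_metadata }" # new field, shown with its default
```

Both interpolations are just the field defaults, so they can be omitted; they are spelled out here to show where the input's `kafka_offset_metadata` metadata gets consumed.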
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -7,9 +7,11 @@ All notable changes to this project will be documented in this file.

### Added

- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL (@le-vlad)
- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL. (@le-vlad)
- Field `metadata_max_age` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
- Field `kafka_timestamp_ms` added to the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` outputs. (@mihaitodor)
- New `redpanda_migrator_offsets` input. (@mihaitodor)
- Field `offset_metadata` added to the `redpanda_migrator_offsets` output. (@mihaitodor)

### Changed

2 changes: 1 addition & 1 deletion internal/impl/kafka/enterprise/redpanda_common_input.go
@@ -114,7 +114,7 @@ func init() {
time.Sleep(time.Millisecond * 100)
}
return
})
}, nil)
if err != nil {
return nil, err
}
@@ -38,7 +38,7 @@ fields:
mapping: |
#!blobl
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "tls", "sasl").assign({"topics": ["__consumer_offsets"]})
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "fetch_max_bytes", "fetch_min_bytes", "fetch_max_partition_bytes", "tls", "sasl")
root = if this.redpanda_migrator.length() == 0 {
throw("the redpanda_migrator input must be configured")
@@ -63,7 +63,7 @@ mapping: |
- redpanda_migrator: %s
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz: %s
- redpanda_migrator_offsets: %s
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
""".format(this.schema_registry.string(), this.redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
@@ -86,7 +86,7 @@ mapping: |
- redpanda_migrator: %s
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz: %s
- redpanda_migrator_offsets: %s
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
""".format(this.schema_registry.string(), this.redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
@@ -97,7 +97,7 @@ mapping: |
- redpanda_migrator: %s
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz: %s
- redpanda_migrator_offsets: %s
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
""".format(this.redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
@@ -137,9 +137,8 @@ tests:
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz:
- redpanda_migrator_offsets:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
@@ -177,9 +176,8 @@ tests:
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz:
- redpanda_migrator_offsets:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
@@ -200,9 +198,8 @@ tests:
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz:
- redpanda_migrator_offsets:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
148 changes: 148 additions & 0 deletions internal/impl/kafka/enterprise/redpanda_migrator_offsets_input.go
@@ -0,0 +1,148 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed as a Redpanda Enterprise file under the Redpanda Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

package enterprise

import (
"slices"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/connect/v4/internal/impl/kafka"
)

const (
// Consumer fields
rmoiFieldRackID = "rack_id"
rmoiFieldFetchMaxBytes = "fetch_max_bytes"
rmoiFieldFetchMinBytes = "fetch_min_bytes"
rmoiFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
)

func redpandaInputConfig() *service.ConfigSpec {
return service.NewConfigSpec().
Beta().
Categories("Services").
Summary(`Redpanda Migrator consumer group offsets input using the https://github.com/twmb/franz-go[Franz Kafka client library^].`).
Description(`
TODO: Description
== Metadata
This input adds the following metadata fields to each message:
` + "```text" + `
- kafka_key
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp_unix
- kafka_timestamp_ms
- kafka_tombstone_message
- kafka_offset_metadata
` + "```" + `
`).
Fields(redpandaInputConfigFields()...)
}

func redpandaInputConfigFields() []*service.ConfigField {
return slices.Concat(
kafka.FranzConnectionFields(),
[]*service.ConfigField{
service.NewStringField(rmoiFieldRackID).
Description("A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.").
Default("").
Advanced(),
service.NewStringField(rmoiFieldFetchMaxBytes).
Description("Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.").
Advanced().
Default("50MiB"),
service.NewStringField(rmoiFieldFetchMinBytes).
Description("Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.").
Advanced().
Default("1B"),
service.NewStringField(rmoiFieldFetchMaxPartitionBytes).
Description("Sets the maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This is the equivalent to the Java fetch.max.partition.bytes setting.").
Advanced().
Default("1MiB"),
},
kafka.FranzReaderOrderedConfigFields(),
[]*service.ConfigField{
service.NewAutoRetryNacksToggleField(),
},
)
}
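The fetch tuning fields accept human-readable byte sizes, parsed by `BytesFromStrFieldAsInt32` (exported from `franz_reader.go` below). A sketch that simply restates the documented defaults (the broker address is a placeholder):

```yaml
input:
  redpanda_migrator_offsets:
    seed_brokers: [ "127.0.0.1:9092" ] # placeholder
    fetch_max_bytes: 50MiB
    fetch_min_bytes: 1B
    fetch_max_partition_bytes: 1MiB
```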

func init() {
err := service.RegisterBatchInput("redpanda_migrator_offsets", redpandaInputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
tmpOpts, err := kafka.FranzConnectionOptsFromConfig(conf, mgr.Logger())
if err != nil {
return nil, err
}
clientOpts := append([]kgo.Opt{}, tmpOpts...)

d := kafka.FranzConsumerDetails{}

if d.RackID, err = conf.FieldString(rmoiFieldRackID); err != nil {
return nil, err
}

d.Topics = []string{`__consumer_offsets`}

if d.FetchMaxBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMaxBytes, conf); err != nil {
return nil, err
}
if d.FetchMinBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMinBytes, conf); err != nil {
return nil, err
}
if d.FetchMaxPartitionBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMaxPartitionBytes, conf); err != nil {
return nil, err
}

clientOpts = append(clientOpts, d.FranzOpts()...)

rdr, err := kafka.NewFranzReaderOrderedFromConfig(conf, mgr, func() ([]kgo.Opt, error) {
return clientOpts, nil
}, func(record *kgo.Record) *service.Message {
key := kmsg.NewOffsetCommitKey()
// Check the version to ensure that we process only offset commit keys
if err := key.ReadFrom(record.Key); err != nil || (key.Version != 0 && key.Version != 1) {
return nil
}

offsetCommitValue := kmsg.NewOffsetCommitValue()
if err := offsetCommitValue.ReadFrom(record.Value); err != nil {
// Omit records we can't decode
return nil
}

msg := service.NewMessage(record.Value)
msg.MetaSetMut("kafka_key", record.Key)
msg.MetaSetMut("kafka_topic", record.Topic)
msg.MetaSetMut("kafka_partition", int(record.Partition))
msg.MetaSetMut("kafka_offset", int(record.Offset))
msg.MetaSetMut("kafka_timestamp_unix", record.Timestamp.Unix())
msg.MetaSetMut("kafka_timestamp_ms", record.Timestamp.UnixMilli())
msg.MetaSetMut("kafka_tombstone_message", record.Value == nil)
msg.MetaSetMut("kafka_offset_metadata", offsetCommitValue.Metadata)

return msg
})
if err != nil {
return nil, err
}

return service.AutoRetryNacksBatchedToggled(conf, rdr)
})
if err != nil {
panic(err)
}
}
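Because the input decodes each `__consumer_offsets` record up front, downstream processors can inspect the commit without re-parsing the binary value. A hedged Bloblang sketch using the metadata fields listed in the config spec above:

```yaml
pipeline:
  processors:
    - mapping: |
        # Surface a few of the input's metadata fields alongside the payload.
        root.topic     = @kafka_topic
        root.partition = @kafka_partition
        root.metadata  = @kafka_offset_metadata
        root.tombstone = @kafka_tombstone_message
```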
52 changes: 38 additions & 14 deletions internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go
@@ -27,8 +27,9 @@ import (
)

const (
rmooFieldMaxInFlight = "max_in_flight"
rmooFieldKafkaKey = "kafka_key"
rmooFieldKafkaKey = "kafka_key"
rmooFieldOffsetMetadata = "offset_metadata"
rmooFieldMaxInFlight = "max_in_flight"
)

func redpandaMigratorOffsetsOutputConfig() *service.ConfigSpec {
@@ -49,6 +50,9 @@ func RedpandaMigratorOffsetsOutputConfigFields() []*service.ConfigField {
[]*service.ConfigField{
service.NewInterpolatedStringField(rmooFieldKafkaKey).
Description("Kafka key.").Default("${! @kafka_key }"),
service.NewInterpolatedStringField(rmooFieldOffsetMetadata).
Description("The offset metadata value.").
Default(`${! @kafka_offset_metadata }`),
service.NewIntField(rmooFieldMaxInFlight).
Description("The maximum number of batches to be sending in parallel at any given time.").
Default(1),
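Because `offset_metadata` is an interpolated string, the metadata written with each translated commit can be derived per message instead of copied verbatim. A speculative override (the `migrated/` prefix is illustrative only, not part of this commit):

```yaml
output:
  redpanda_migrator_offsets:
    seed_brokers: [ "127.0.0.1:9093" ] # placeholder broker
    offset_metadata: "migrated/${! @kafka_offset_metadata }"
```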
@@ -80,10 +84,11 @@ func init() {

// RedpandaMigratorOffsetsWriter implements a Redpanda Migrator offsets writer using the franz-go library.
type RedpandaMigratorOffsetsWriter struct {
clientDetails *kafka.FranzConnectionDetails
clientOpts []kgo.Opt
kafkaKey *service.InterpolatedString
backoffCtor func() backoff.BackOff
clientDetails *kafka.FranzConnectionDetails
clientOpts []kgo.Opt
kafkaKey *service.InterpolatedString
offsetMetadata *service.InterpolatedString
backoffCtor func() backoff.BackOff

connMut sync.Mutex
client *kadm.Client
@@ -106,6 +111,10 @@ func NewRedpandaMigratorOffsetsWriterFromConfig(conf *service.ParsedConfig, mgr
return nil, err
}

if w.offsetMetadata, err = conf.FieldInterpolatedString(rmooFieldOffsetMetadata); err != nil {
return nil, err
}

if w.clientOpts, err = kafka.FranzProducerLimitsOptsFromConfig(conf); err != nil {
return nil, err
}
@@ -167,7 +176,6 @@ func (w *RedpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

var kafkaKey []byte
var err error
// TODO: The `kafka_key` metadata field is cast from `[]byte` to string in the `kafka_franz` input, which is wrong.
if kafkaKey, err = w.kafkaKey.TryBytes(msg); err != nil {
return fmt.Errorf("failed to extract kafka key: %w", err)
}
@@ -183,13 +191,20 @@ func (w *RedpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
return fmt.Errorf("failed to get message bytes: %s", err)
}

val := kmsg.NewOffsetCommitValue()
if err := val.ReadFrom(msgBytes); err != nil {
offsetCommitValue := kmsg.NewOffsetCommitValue()
if err := offsetCommitValue.ReadFrom(msgBytes); err != nil {
return fmt.Errorf("failed to decode offset commit value: %s", err)
}

var offsetMetadata string
if w.offsetMetadata != nil {
if offsetMetadata, err = w.offsetMetadata.TryString(msg); err != nil {
return fmt.Errorf("failed to extract offset metadata: %w", err)
}
}

updateConsumerOffsets := func() error {
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, val.CommitTimestamp, key.Topic)
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, offsetCommitValue.CommitTimestamp, key.Topic)
if err != nil {
return fmt.Errorf("failed to translate consumer offsets: %s", err)
}
@@ -198,11 +213,20 @@ func (w *RedpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
return fmt.Errorf("listed offsets returned and error: %s", err)
}

// TODO: Add metadata to offsets!
offsets := listedOffsets.Offsets()
offsets.KeepFunc(func(o kadm.Offset) bool {
return o.Partition == key.Partition
})
// Logic extracted from offsets.KeepFunc() and adjusted to set the metadata.
for topic, partitionOffsets := range offsets {
for partition, offset := range partitionOffsets {
if offset.Partition != key.Partition {
delete(partitionOffsets, partition)
continue // don't fall through and re-insert the entry we just deleted
}
offset.Metadata = offsetMetadata
partitionOffsets[partition] = offset
}
if len(partitionOffsets) == 0 {
delete(offsets, topic)
}
}

offsetResponses, err := w.client.CommitOffsets(ctx, key.Group, offsets)
if err != nil {
9 changes: 5 additions & 4 deletions internal/impl/kafka/franz_reader.go
@@ -36,7 +36,8 @@ func bytesFromStrField(name string, pConf *service.ParsedConfig) (uint64, error)
return fieldAsBytes, nil
}

func bytesFromStrFieldAsInt32(name string, pConf *service.ParsedConfig) (int32, error) {
// BytesFromStrFieldAsInt32 attempts to parse a string field containing a human-readable byte size and returns it as an int32.
func BytesFromStrFieldAsInt32(name string, pConf *service.ParsedConfig) (int32, error) {
ui64, err := bytesFromStrField(name, pConf)
if err != nil {
return 0, err
@@ -161,13 +162,13 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
return nil, err
}

if d.FetchMaxBytes, err = bytesFromStrFieldAsInt32(kfrFieldFetchMaxBytes, conf); err != nil {
if d.FetchMaxBytes, err = BytesFromStrFieldAsInt32(kfrFieldFetchMaxBytes, conf); err != nil {
return nil, err
}
if d.FetchMinBytes, err = bytesFromStrFieldAsInt32(kfrFieldFetchMinBytes, conf); err != nil {
if d.FetchMinBytes, err = BytesFromStrFieldAsInt32(kfrFieldFetchMinBytes, conf); err != nil {
return nil, err
}
if d.FetchMaxPartitionBytes, err = bytesFromStrFieldAsInt32(kfrFieldFetchMaxPartitionBytes, conf); err != nil {
if d.FetchMaxPartitionBytes, err = BytesFromStrFieldAsInt32(kfrFieldFetchMaxPartitionBytes, conf); err != nil {
return nil, err
}
