Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Redpanda Migrator components #3026

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
component_type_dropdown::[]


Redpanda Migrator consumer group offsets output using the https://github.com/twmb/franz-go[Franz Kafka client library^].
Redpanda Migrator consumer group offsets input using the https://github.com/twmb/franz-go[Franz Kafka client library^].

Introduced in version 4.44.0.
Introduced in version 4.45.0.


[tabs]
Expand Down Expand Up @@ -70,9 +70,6 @@ input:
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
fetch_max_bytes: 50MiB
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
Expand Down Expand Up @@ -533,33 +530,6 @@ A rack specifies where the client is physically located and changes fetch reques

*Default*: `""`

=== `fetch_max_bytes`

Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.


*Type*: `string`

*Default*: `"50MiB"`

=== `fetch_min_bytes`

Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.


*Type*: `string`

*Default*: `"1B"`

=== `fetch_max_partition_bytes`

Sets the maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This is the equivalent to the Java fetch.max.partition.bytes setting.


*Type*: `string`

*Default*: `"1MiB"`

=== `consumer_group`

An optional consumer group to consume as. When specified the partitions of specified topics are automatically distributed across consumers sharing a consumer group, and partition offsets are automatically committed and resumed under this name. Consumer groups are not supported when specifying explicit partitions to consume from in the `topics` field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fields:
mapping: |
#!blobl

let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "topics", "regexp_topics", "consumer_group", "client_id", "rack_id", "fetch_max_bytes", "fetch_min_bytes", "fetch_max_partition_bytes", "tls", "sasl")
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "topics", "regexp_topics", "consumer_group", "topic_lag_refresh_period", "client_id", "rack_id", "tls", "sasl")

root = if this.redpanda_migrator.length() == 0 {
throw("the redpanda_migrator input must be configured")
Expand Down
51 changes: 16 additions & 35 deletions internal/impl/kafka/enterprise/redpanda_migrator_offsets_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,17 @@ import (

const (
// Consumer fields
rmoiFieldTopics = "topics"
rmoiFieldRegexpTopics = "regexp_topics"
rmoiFieldRackID = "rack_id"
rmoiFieldFetchMaxBytes = "fetch_max_bytes"
rmoiFieldFetchMinBytes = "fetch_min_bytes"
rmoiFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
rmoiFieldTopics = "topics"
rmoiFieldRegexpTopics = "regexp_topics"
rmoiFieldRackID = "rack_id"
)

func redpandaMigratorOffsetsInputConfig() *service.ConfigSpec {
return service.NewConfigSpec().
Beta().
Categories("Services").
Version("4.44.0").
Summary(`Redpanda Migrator consumer group offsets output using the https://github.com/twmb/franz-go[Franz Kafka client library^].`).
Version("4.45.0").
Summary(`Redpanda Migrator consumer group offsets input using the https://github.com/twmb/franz-go[Franz Kafka client library^].`).
Description(`
TODO: Description

Expand Down Expand Up @@ -81,18 +78,6 @@ A list of topics to consume from. Multiple comma separated topics can be listed
Description("A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.").
Default("").
Advanced(),
service.NewStringField(rmoiFieldFetchMaxBytes).
Description("Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.").
Advanced().
Default("50MiB"),
service.NewStringField(rmoiFieldFetchMinBytes).
Description("Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.").
Advanced().
Default("1B"),
service.NewStringField(rmoiFieldFetchMaxPartitionBytes).
Description("Sets the maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This is the equivalent to the Java fetch.max.partition.bytes setting.").
Advanced().
Default("1MiB"),
},
kafka.FranzReaderOrderedConfigFields(),
[]*service.ConfigField{
Expand All @@ -104,13 +89,12 @@ A list of topics to consume from. Multiple comma separated topics can be listed
func init() {
err := service.RegisterBatchInput("redpanda_migrator_offsets", redpandaMigratorOffsetsInputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
tmpOpts, err := kafka.FranzConnectionOptsFromConfig(conf, mgr.Logger())
clientOpts, err := kafka.FranzConnectionOptsFromConfig(conf, mgr.Logger())
if err != nil {
return nil, err
}
clientOpts := append([]kgo.Opt{}, tmpOpts...)

d := kafka.FranzConsumerDetails{}
clientOpts = append(clientOpts, kgo.ClientID("xxxxxxxxxxxxxxxxxxxxxx"))

var topics []string
if topicList, err := conf.FieldStringList(rmoiFieldTopics); err != nil {
Expand Down Expand Up @@ -139,23 +123,20 @@ func init() {
}
}

if d.RackID, err = conf.FieldString(rmoiFieldRackID); err != nil {
var rackID string
if rackID, err = conf.FieldString(rmoiFieldRackID); err != nil {
return nil, err
}
clientOpts = append(clientOpts, kgo.Rack(rackID))

if d.FetchMaxBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMaxBytes, conf); err != nil {
return nil, err
}
if d.FetchMinBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMinBytes, conf); err != nil {
return nil, err
}
if d.FetchMaxPartitionBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMaxPartitionBytes, conf); err != nil {
return nil, err
}
// Configure `start_from_oldest: true`
// This is probably not necessary since `__consumer_offsets` is a compacted topic
mihaitodor marked this conversation as resolved.
Show resolved Hide resolved
clientOpts = append(clientOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()))

// Consume messages from the `__consumer_offsets` topic
d.Topics = []string{`__consumer_offsets`}
clientOpts = append(clientOpts, d.FranzOpts()...)
clientOpts = append(clientOpts, kgo.ConsumeTopics("__consumer_offsets"))

clientOpts = append(clientOpts, kgo.WithLogger(&kafka.KGoLogger{L: mgr.Logger()}))

rdr, err := kafka.NewFranzReaderOrderedFromConfig(conf, mgr, func() ([]kgo.Opt, error) {
return clientOpts, nil
Expand Down