Skip to content

Commit

Permalink
Merge pull request #3100 from redpanda-data/jb/redpanda-input-fetch-m…
Browse files Browse the repository at this point in the history
…ax-wait

inputs/redpanda: add fetch_max_wait option
  • Loading branch information
rockwotj authored Jan 3, 2025
2 parents 21a9d97 + b706752 commit b2697c6
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
- Field `content_type` added to the `amqp_1` output. (@timo102)
- `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, `redpanda_migrator` now support `fetch_max_wait` configuration field.

### Fixed

Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ input:
rack_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
Expand Down Expand Up @@ -571,6 +572,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
*Default*: `"50MiB"`
=== `fetch_max_wait`
Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
*Type*: `string`
*Default*: `"5s"`
=== `fetch_min_bytes`
Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/ockam_kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ input:
rack_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
Expand Down Expand Up @@ -368,6 +369,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
*Default*: `"50MiB"`
=== `kafka.fetch_max_wait`
Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
*Type*: `string`
*Default*: `"5s"`
=== `kafka.fetch_min_bytes`
Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ input:
rack_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
Expand Down Expand Up @@ -591,6 +592,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
*Default*: `"50MiB"`
=== `fetch_max_wait`
Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
*Type*: `string`
*Default*: `"5s"`
=== `fetch_min_bytes`
Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ input:
rack_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
Expand Down Expand Up @@ -191,6 +192,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
*Default*: `"50MiB"`
=== `fetch_max_wait`
Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
*Type*: `string`
*Default*: `"5s"`
=== `fetch_min_bytes`
Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ input:
rack_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
Expand Down Expand Up @@ -578,6 +579,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
*Default*: `"50MiB"`
=== `fetch_max_wait`
Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
*Type*: `string`
*Default*: `"5s"`
=== `fetch_min_bytes`
Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
Expand Down
12 changes: 12 additions & 0 deletions internal/impl/kafka/franz_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafka

import (
"fmt"
"time"

"github.com/dustin/go-humanize"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -53,6 +54,7 @@ const (
kfrFieldFetchMaxBytes = "fetch_max_bytes"
kfrFieldFetchMinBytes = "fetch_min_bytes"
kfrFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
kfrFieldFetchMaxWait = "fetch_max_wait"
)

// FranzConsumerFields returns a slice of fields specifically for customising
Expand Down Expand Up @@ -87,6 +89,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add
Description("Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.").
Advanced().
Default("50MiB"),
service.NewDurationField(kfrFieldFetchMaxWait).
Description("Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.").
Advanced().
Default("5s"),
service.NewStringField(kfrFieldFetchMinBytes).
Description("Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.").
Advanced().
Expand All @@ -109,6 +115,7 @@ type FranzConsumerDetails struct {
FetchMinBytes int32
FetchMaxBytes int32
FetchMaxPartitionBytes int32
FetchMaxWait time.Duration
}

// FranzConsumerDetailsFromConfig returns a summary of kafka consumer
Expand Down Expand Up @@ -171,6 +178,10 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
return nil, err
}

if d.FetchMaxWait, err = conf.FieldDuration(kfrFieldFetchMaxWait); err != nil {
return nil, err
}

return &d, nil
}

Expand All @@ -185,6 +196,7 @@ func (d *FranzConsumerDetails) FranzOpts() []kgo.Opt {
kgo.FetchMaxBytes(d.FetchMaxBytes),
kgo.FetchMinBytes(d.FetchMinBytes),
kgo.FetchMaxPartitionBytes(d.FetchMaxPartitionBytes),
kgo.FetchMaxWait(d.FetchMaxWait),
}

if d.RegexPattern {
Expand Down

0 comments on commit b2697c6

Please sign in to comment.