diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18e1152f96..2cb7ff88ac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.
 
 - `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
 - Field `content_type` added to the `amqp_1` output. (@timo102)
+- The `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs now support the `fetch_max_wait` configuration field.
 
 ### Fixed
 
diff --git a/docs/modules/components/pages/inputs/kafka_franz.adoc b/docs/modules/components/pages/inputs/kafka_franz.adoc
index ab35851710..54ca58a9d4 100644
--- a/docs/modules/components/pages/inputs/kafka_franz.adoc
+++ b/docs/modules/components/pages/inputs/kafka_franz.adoc
@@ -72,6 +72,7 @@ input:
     rack_id: ""
     start_from_oldest: true
     fetch_max_bytes: 50MiB
+    fetch_max_wait: 5s
     fetch_min_bytes: 1B
     fetch_max_partition_bytes: 1MiB
     consumer_group: "" # No default (optional)
@@ -571,6 +572,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
 
 *Default*: `"50MiB"`
 
+=== `fetch_max_wait`
+
+Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
+
+
+*Type*: `string`
+
+*Default*: `"5s"`
+
 === `fetch_min_bytes`
 
 Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
diff --git a/docs/modules/components/pages/inputs/ockam_kafka.adoc b/docs/modules/components/pages/inputs/ockam_kafka.adoc
index dc342740ba..2b2b4a1759 100644
--- a/docs/modules/components/pages/inputs/ockam_kafka.adoc
+++ b/docs/modules/components/pages/inputs/ockam_kafka.adoc
@@ -77,6 +77,7 @@ input:
       rack_id: ""
       start_from_oldest: true
       fetch_max_bytes: 50MiB
+      fetch_max_wait: 5s
       fetch_min_bytes: 1B
       fetch_max_partition_bytes: 1MiB
       consumer_group: "" # No default (optional)
@@ -368,6 +369,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
 
 *Default*: `"50MiB"`
 
+=== `kafka.fetch_max_wait`
+
+Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
+
+
+*Type*: `string`
+
+*Default*: `"5s"`
+
 === `kafka.fetch_min_bytes`
 
 Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
diff --git a/docs/modules/components/pages/inputs/redpanda.adoc b/docs/modules/components/pages/inputs/redpanda.adoc
index 381400ead6..af0dff4358 100644
--- a/docs/modules/components/pages/inputs/redpanda.adoc
+++ b/docs/modules/components/pages/inputs/redpanda.adoc
@@ -70,6 +70,7 @@ input:
     rack_id: ""
     start_from_oldest: true
    fetch_max_bytes: 50MiB
+    fetch_max_wait: 5s
    fetch_min_bytes: 1B
    fetch_max_partition_bytes: 1MiB
    consumer_group: "" # No default (optional)
@@ -591,6 +592,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
 
 *Default*: `"50MiB"`
 
+=== `fetch_max_wait`
+
+Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
+
+
+*Type*: `string`
+
+*Default*: `"5s"`
+
 === `fetch_min_bytes`
 
 Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
diff --git a/docs/modules/components/pages/inputs/redpanda_common.adoc b/docs/modules/components/pages/inputs/redpanda_common.adoc
index ab0b6a912c..ea11508757 100644
--- a/docs/modules/components/pages/inputs/redpanda_common.adoc
+++ b/docs/modules/components/pages/inputs/redpanda_common.adoc
@@ -58,6 +58,7 @@ input:
     rack_id: ""
     start_from_oldest: true
     fetch_max_bytes: 50MiB
+    fetch_max_wait: 5s
     fetch_min_bytes: 1B
     fetch_max_partition_bytes: 1MiB
     consumer_group: "" # No default (optional)
@@ -191,6 +192,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
 
 *Default*: `"50MiB"`
 
+=== `fetch_max_wait`
+
+Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
+
+
+*Type*: `string`
+
+*Default*: `"5s"`
+
 === `fetch_min_bytes`
 
 Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
diff --git a/docs/modules/components/pages/inputs/redpanda_migrator.adoc b/docs/modules/components/pages/inputs/redpanda_migrator.adoc
index efbb5b8034..94acea9fab 100644
--- a/docs/modules/components/pages/inputs/redpanda_migrator.adoc
+++ b/docs/modules/components/pages/inputs/redpanda_migrator.adoc
@@ -72,6 +72,7 @@ input:
     rack_id: ""
     start_from_oldest: true
     fetch_max_bytes: 50MiB
+    fetch_max_wait: 5s
     fetch_min_bytes: 1B
     fetch_max_partition_bytes: 1MiB
     consumer_group: "" # No default (optional)
@@ -578,6 +579,15 @@ Sets the maximum amount of bytes a broker will try to send during a fetch. Note
 
 *Default*: `"50MiB"`
 
+=== `fetch_max_wait`
+
+Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.
+
+
+*Type*: `string`
+
+*Default*: `"5s"`
+
 === `fetch_min_bytes`
 
 Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.
diff --git a/internal/impl/kafka/franz_reader.go b/internal/impl/kafka/franz_reader.go
index 98894805c1..345900e321 100644
--- a/internal/impl/kafka/franz_reader.go
+++ b/internal/impl/kafka/franz_reader.go
@@ -16,6 +16,7 @@ package kafka
 
 import (
     "fmt"
+    "time"
 
     "github.com/dustin/go-humanize"
     "github.com/twmb/franz-go/pkg/kgo"
@@ -53,6 +54,7 @@ const (
     kfrFieldFetchMaxBytes          = "fetch_max_bytes"
     kfrFieldFetchMinBytes          = "fetch_min_bytes"
     kfrFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
+    kfrFieldFetchMaxWait           = "fetch_max_wait"
 )
 
 // FranzConsumerFields returns a slice of fields specifically for customising
@@ -87,6 +89,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add
             Description("Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.").
             Advanced().
             Default("50MiB"),
+        service.NewDurationField(kfrFieldFetchMaxWait).
+            Description("Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.").
+            Advanced().
+            Default("5s"),
         service.NewStringField(kfrFieldFetchMinBytes).
             Description("Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.").
            Advanced().
@@ -109,6 +115,7 @@ type FranzConsumerDetails struct {
     FetchMinBytes          int32
     FetchMaxBytes          int32
     FetchMaxPartitionBytes int32
+    FetchMaxWait           time.Duration
 }
 
 // FranzConsumerDetailsFromConfig returns a summary of kafka consumer
@@ -171,6 +178,10 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
         return nil, err
     }
 
+    if d.FetchMaxWait, err = conf.FieldDuration(kfrFieldFetchMaxWait); err != nil {
+        return nil, err
+    }
+
     return &d, nil
 }
 
@@ -185,6 +196,7 @@ func (d *FranzConsumerDetails) FranzOpts() []kgo.Opt {
         kgo.FetchMaxBytes(d.FetchMaxBytes),
         kgo.FetchMinBytes(d.FetchMinBytes),
         kgo.FetchMaxPartitionBytes(d.FetchMaxPartitionBytes),
+        kgo.FetchMaxWait(d.FetchMaxWait),
     }
 
     if d.RegexPattern {
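
For illustration, a minimal sketch of how the new field might be configured on the `kafka_franz` input, assuming the field names and defaults documented above; the broker address, topic, and consumer group names are placeholders:

```yaml
input:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]  # placeholder broker address
    topics: [ "example_topic" ]         # placeholder topic
    consumer_group: "example_group"     # placeholder consumer group
    # Ask brokers to batch up to 1MiB per fetch, but respond after at most
    # 500ms with whatever records are available, rather than the 5s default.
    fetch_min_bytes: 1MiB
    fetch_max_wait: 500ms
```

Raising `fetch_min_bytes` together with a modest `fetch_max_wait` lets brokers batch more records per response without letting a quiet topic hold a fetch open for the full five-second default.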