Skip to content

Commit

Permalink
kafka consumer: Limit MaxBrokerReadBytes (#606)
Browse files Browse the repository at this point in the history
Limits the `MaxBrokerReadBytes` and `MaxFetchBytes` to the maximum value
that is accepted by `franz-go` library.

---------

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
  • Loading branch information
marclop authored Dec 10, 2024
1 parent 8dfaf4c commit cc45977
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
29 changes: 29 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -150,6 +151,34 @@ func (cfg *ConsumerConfig) finalize() error {
if cfg.FetchMinBytes < 0 {
errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative"))
}
if cfg.BrokerMaxReadBytes < 0 {
errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
}
if cfg.MaxPollPartitionBytes > 1<<30 {
cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB")
cfg.MaxPollPartitionBytes = 1 << 30
}
if cfg.BrokerMaxReadBytes > 1<<30 {
cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
cfg.BrokerMaxReadBytes = 1 << 30
}
if cfg.MaxPollBytes > 0 {
// math.MaxInt32 is 1<<31-1.
if cfg.MaxPollBytes > 1<<30 {
cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB")
cfg.MaxPollBytes = 1 << 30
}
if cfg.BrokerMaxReadBytes == 0 {
cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest")
cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30))
}
if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes {
errs = append(errs, fmt.Errorf(
"kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)",
cfg.BrokerMaxReadBytes, cfg.MaxPollBytes,
))
}
}
return errors.Join(errs...)
}

Expand Down
146 changes: 146 additions & 0 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,152 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) {
})
}

func TestConsumerConfigFinalizer(t *testing.T) {
proc := apmqueue.ProcessorFunc(func(context.Context, apmqueue.Record) error { return nil })
ccfg := CommonConfig{
Brokers: []string{"localhost:9092"},
Logger: zapTest(t),
}
t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 20,
MaxPollPartitionBytes: 1 << 20,
}
err := cfg.finalize()
require.NoError(t, err)
assert.NotNil(t, cfg.Processor)
cfg.Processor = nil
assert.NotNil(t, cfg.Logger)
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 20,
MaxPollPartitionBytes: 1 << 20,
BrokerMaxReadBytes: 1 << 21,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 28,
MaxPollPartitionBytes: 1 << 28,
}
err := cfg.finalize()
require.NoError(t, err)
assert.NotNil(t, cfg.Processor)
cfg.Processor = nil
assert.NotNil(t, cfg.Logger)
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 28,
MaxPollPartitionBytes: 1 << 28,
BrokerMaxReadBytes: 1 << 29,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 29,
MaxPollPartitionBytes: 1 << 29,
}
err := cfg.finalize()
require.NoError(t, err)
assert.NotNil(t, cfg.Processor)
cfg.Processor = nil
assert.NotNil(t, cfg.Logger)
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 29,
MaxPollPartitionBytes: 1 << 29,
BrokerMaxReadBytes: 1 << 30,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
MaxPollPartitionBytes: 1 << 30,
}
err := cfg.finalize()
require.NoError(t, err)
assert.NotNil(t, cfg.Processor)
cfg.Processor = nil
assert.NotNil(t, cfg.Logger)
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
MaxPollPartitionBytes: 1 << 30,
BrokerMaxReadBytes: 1 << 30,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1<<31 - 1,
MaxPollPartitionBytes: 1<<31 - 1,
}
err := cfg.finalize()
require.NoError(t, err)
assert.NotNil(t, cfg.Processor)
cfg.Processor = nil
assert.NotNil(t, cfg.Logger)
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
MaxPollPartitionBytes: 1 << 30,
BrokerMaxReadBytes: 1 << 30,
}, cfg)
})
t.Run("BrokerMaxReadBytes is less than MaxPollBytes", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
BrokerMaxReadBytes: 1,
MaxPollBytes: 1<<31 - 1,
MaxPollPartitionBytes: 1<<31 - 1,
}
err := cfg.finalize()
assert.EqualError(t, err, "kafka: BrokerMaxReadBytes (1) cannot be less than MaxPollBytes (1073741824)")
})
}

func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {
if cfg.MaxPollWait <= 0 {
// Lower MaxPollWait, ShutdownGracePeriod to speed up execution.
Expand Down

0 comments on commit cc45977

Please sign in to comment.