diff --git a/kafka/consumer.go b/kafka/consumer.go index ad31b227..3d603eae 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math" "strings" "sync" "time" @@ -150,6 +151,34 @@ func (cfg *ConsumerConfig) finalize() error { if cfg.FetchMinBytes < 0 { errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative")) } + if cfg.BrokerMaxReadBytes < 0 { + errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative")) + } + if cfg.MaxPollPartitionBytes > 1<<30 { + cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB") + cfg.MaxPollPartitionBytes = 1 << 30 + } + if cfg.BrokerMaxReadBytes > 1<<30 { + cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB") + cfg.BrokerMaxReadBytes = 1 << 30 + } + if cfg.MaxPollBytes > 0 { + // math.MaxInt32 is 1<<31-1; clamping to 1GiB keeps MaxPollBytes*2 below within int32 range. + if cfg.MaxPollBytes > 1<<30 { + cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB") + cfg.MaxPollBytes = 1 << 30 + } + if cfg.BrokerMaxReadBytes == 0 { + cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smaller") + cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30)) + } + if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes { + errs = append(errs, fmt.Errorf( + "kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)", + cfg.BrokerMaxReadBytes, cfg.MaxPollBytes, + )) + } + } return errors.Join(errs...) 
} diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index a3ed2f1b..eeb08516 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -763,6 +763,152 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) { }) } +func TestConsumerConfigFinalizer(t *testing.T) { + proc := apmqueue.ProcessorFunc(func(context.Context, apmqueue.Record) error { return nil }) + ccfg := CommonConfig{ + Brokers: []string{"localhost:9092"}, + Logger: zapTest(t), + } + t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + MaxPollPartitionBytes: 1 << 20, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + MaxPollPartitionBytes: 1 << 20, + BrokerMaxReadBytes: 1 << 21, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + MaxPollPartitionBytes: 1 << 28, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + MaxPollPartitionBytes: 1 << 28, + BrokerMaxReadBytes: 1 << 29, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + 
MaxPollBytes: 1 << 29, + MaxPollPartitionBytes: 1 << 29, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + MaxPollPartitionBytes: 1 << 29, + BrokerMaxReadBytes: 1 << 30, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1<<31 - 1, + MaxPollPartitionBytes: 1<<31 - 1, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, + }, cfg) + }) + t.Run("BrokerMaxReadBytes is less than MaxPollBytes", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: 
[]apmqueue.Topic{"topic"}, + GroupID: "groupid", + BrokerMaxReadBytes: 1, + MaxPollBytes: 1<<31 - 1, + MaxPollPartitionBytes: 1<<31 - 1, + } + err := cfg.finalize() + assert.EqualError(t, err, "kafka: BrokerMaxReadBytes (1) cannot be less than MaxPollBytes (1073741824)") + }) +} + func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer { if cfg.MaxPollWait <= 0 { // Lower MaxPollWait, ShutdownGracePeriod to speed up execution.