From 335ef085141167fe6d412207de975820bc060658 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Thu, 16 Jan 2025 10:52:05 +0100 Subject: [PATCH] fix: better protect against nil topicAttributeFunc set the default func in the common config instead of the metrics hook --- kafka/common.go | 7 +++++++ kafka/common_test.go | 1 + kafka/consumer_test.go | 36 ++++++++++++++++-------------------- kafka/metrics.go | 6 ------ 4 files changed, 24 insertions(+), 26 deletions(-) diff --git a/kafka/common.go b/kafka/common.go index d2284b3f..ae2ff716 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -34,6 +34,7 @@ import ( "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/plugin/kzap" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -247,6 +248,12 @@ func (cfg *CommonConfig) finalize() error { if cfg.TopicLogFieldFunc != nil { cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc) } + if cfg.TopicAttributeFunc == nil { + cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue { + return attribute.KeyValue{} + } + } + return errors.Join(errs...) } diff --git a/kafka/common_test.go b/kafka/common_test.go index 31c68d0e..7cddc7f4 100644 --- a/kafka/common_test.go +++ b/kafka/common_test.go @@ -47,6 +47,7 @@ func TestCommonConfig(t *testing.T) { t.Helper() err := in.finalize() require.NoError(t, err) + in.TopicAttributeFunc = nil in.TopicLogFieldFunc = nil in.hooks = nil assert.Equal(t, expected, in) diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index eeb08516..1dc613cf 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -780,10 +780,7 @@ func TestConsumerConfigFinalizer(t *testing.T) { } err := cfg.finalize() require.NoError(t, err) - assert.NotNil(t, cfg.Processor) - cfg.Processor = nil - assert.NotNil(t, cfg.Logger) - cfg.Logger = nil + assertNotNilOptions(t, &cfg) assert.Equal(t, ConsumerConfig{ CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, @@ -805,10 +802,7 @@ func TestConsumerConfigFinalizer(t *testing.T) { } err := cfg.finalize() require.NoError(t, err) - assert.NotNil(t, cfg.Processor) - cfg.Processor = nil - assert.NotNil(t, cfg.Logger) - cfg.Logger = nil + assertNotNilOptions(t, &cfg) assert.Equal(t, ConsumerConfig{ CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, @@ -830,10 +824,7 @@ func TestConsumerConfigFinalizer(t *testing.T) { } err := cfg.finalize() require.NoError(t, err) - assert.NotNil(t, cfg.Processor) - cfg.Processor = nil - assert.NotNil(t, cfg.Logger) - cfg.Logger = nil + assertNotNilOptions(t, &cfg) assert.Equal(t, ConsumerConfig{ CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, @@ -855,10 +846,7 @@ func TestConsumerConfigFinalizer(t *testing.T) { } err := cfg.finalize() require.NoError(t, err) - assert.NotNil(t, cfg.Processor) - cfg.Processor = nil - assert.NotNil(t, cfg.Logger) - cfg.Logger = nil + assertNotNilOptions(t, &cfg) assert.Equal(t, ConsumerConfig{ CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, @@ -880,10 +868,7 @@ func TestConsumerConfigFinalizer(t *testing.T) { } err := cfg.finalize() require.NoError(t, err) - assert.NotNil(t, cfg.Processor) - cfg.Processor = nil - assert.NotNil(t, cfg.Logger) - cfg.Logger = nil + assertNotNilOptions(t, &cfg) assert.Equal(t, ConsumerConfig{ CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, @@ -909,6 +894,17 @@ func TestConsumerConfigFinalizer(t *testing.T) { }) } +func assertNotNilOptions(t testing.TB, cfg *ConsumerConfig) { + t.Helper() + + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + assert.NotNil(t, cfg.TopicAttributeFunc) + cfg.TopicAttributeFunc = nil +} + func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer { if cfg.MaxPollWait <= 0 { // Lower MaxPollWait, ShutdownGracePeriod to speed up execution. diff --git a/kafka/metrics.go b/kafka/metrics.go index 2e883456..09f4fb89 100644 --- a/kafka/metrics.go +++ b/kafka/metrics.go @@ -299,12 +299,6 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string, return nil, formatMetricError(throttlingDurationKey, err) } - if topicAttributeFunc == nil { - topicAttributeFunc = func(topic string) attribute.KeyValue { - return attribute.KeyValue{} - } - } - return &metricHooks{ namespace: namespace, topicPrefix: topicPrefix,