Skip to content

Commit

Permalink
feat: add missing project.id to consumer logger (#576)
Browse files Browse the repository at this point in the history
## πŸ§‘β€πŸ’» What

the main consumer logger is passed to internal topiconsumer which are
not logging the additional fields from the field func.
The topicconsumer doesn't have access to the config in the public
Consumer.
As a solution, the logfieldfunc is passed as well to delegate logging of
the addtional field

## ❓ Why

Logs should be consistent so they can be correlated

## βœ… How

Look at log messages before and after
  • Loading branch information
kruskall authored Sep 27, 2024
1 parent 9df9e40 commit c9aa4c8
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
namespacePrefix := cfg.namespacePrefix()
consumer := &consumer{
topicPrefix: namespacePrefix,
logFieldFn: cfg.TopicLogFieldFunc,
assignments: make(map[topicPartition]*pc),
processor: cfg.Processor,
logger: cfg.Logger.Named("partition"),
Expand Down Expand Up @@ -394,6 +395,7 @@ type consumer struct {
processor apmqueue.Processor
logger *zap.Logger
delivery apmqueue.DeliveryType
logFieldFn TopicLogFieldFunc
// ctx contains the graceful cancellation context that is passed to the
// partition consumers.
ctx context.Context
Expand All @@ -412,11 +414,16 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[
for topic, partitions := range assigned {
for _, partition := range partitions {
t := strings.TrimPrefix(topic, c.topicPrefix)
logger := c.logger.With(
zap.String("topic", t),
zap.Int32("partition", partition),
)
if c.logFieldFn != nil {
logger = logger.With(c.logFieldFn(t))
}

pc := newPartitionConsumer(c.ctx, client, c.processor,
c.delivery, t, c.logger.With(
zap.String("topic", t),
zap.Int32("partition", partition),
),
c.delivery, t, logger,
)
c.assignments[topicPartition{topic: topic, partition: partition}] = pc
}
Expand Down Expand Up @@ -492,12 +499,17 @@ func (c *consumer) processFetch(fetches kgo.Fetches) {
// NOTE(marclop) While possible, this is unlikely to happen given the
// locking that's in place in the caller.
if c.delivery == apmqueue.AtMostOnceDeliveryType {
c.logger.Warn(
topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
logger := c.logger
if c.logFieldFn != nil {
logger = logger.With(c.logFieldFn(topicName))
}
logger.Warn(
"data loss: failed to send records to process after commit",
zap.Error(errors.New(
"attempted to process records for revoked partition",
)),
zap.String("topic", strings.TrimPrefix(ftp.Topic, c.topicPrefix)),
zap.String("topic", topicName),
zap.Int32("partition", ftp.Partition),
zap.Int64("offset", ftp.HighWatermark),
zap.Int("records", len(ftp.Records)),
Expand Down

0 comments on commit c9aa4c8

Please sign in to comment.