Skip to content

Commit

Permalink
kafka: manager: List topics before creating them
Browse files Browse the repository at this point in the history
Updates the manager `CreateTopics` method to only issue the CreateTopics
call if the topics don't exist in Kafka, preventing undesirable errors
from being returned and logged.

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
  • Loading branch information
marclop committed Oct 30, 2023
1 parent 8c6a813 commit 27f8c0e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 44 deletions.
100 changes: 62 additions & 38 deletions kafka/topiccreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,51 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
for i, topic := range topics {
topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
}
responses, err := c.m.adminClient.CreateTopics(
ctx,

existing, err := c.m.adminClient.ListTopics(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to list kafka topics: %w", err)
}

// Build two lists, one for the topics that haven't been created yet, and
// another one that can be used to update a topic's partitions.
missingTopics := make([]string, 0, len(topicNames))
updatePartitions := make([]string, 0, len(topicNames))
for _, wantTopic := range topicNames {
if !existing.Has(wantTopic) {
missingTopics = append(missingTopics, wantTopic)
continue
}
if len(existing[wantTopic].Partitions) < c.partitionCount {
updatePartitions = append(updatePartitions, wantTopic)
}
}
fmt.Printf("%+v\n", missingTopics)

responses, err := c.m.adminClient.CreateTopics(ctx,
int32(c.partitionCount),
-1, // default.replication.factor
c.topicConfigs,
topicNames...,
missingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to create kafka topics: %w", err)
}
createTopicParamsFields := []zap.Field{
loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
if len(c.origTopicConfigs) > 0 {
createTopicParamsFields = append(createTopicParamsFields,
loggerFields = append(loggerFields,
zap.Reflect("topic_configs", c.origTopicConfigs),
)
}

existingTopics := make([]string, 0, len(topicNames))
var updateErrors []error
logger := c.m.cfg.Logger.With(createTopicParamsFields...)
logger := c.m.cfg.Logger.With(loggerFields...)
for _, response := range responses.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
if err := response.Err; err != nil {
Expand All @@ -137,7 +158,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
span.AddEvent("kafka topic already exists", trace.WithAttributes(
semconv.MessagingDestinationKey.String(topicName),
))
existingTopics = append(existingTopics, response.Topic)
} else {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -150,13 +170,17 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
logger.Info("created kafka topic", zap.String("topic", topicName))
}

if len(existingTopics) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx, c.partitionCount, existingTopics...)
// Update the topic partitions.
if len(updatePartitions) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
c.partitionCount,
updatePartitions...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to update partitions for kafka topics: %v: %w",
existingTopics, err,
updatePartitions, err,
)
}
for _, response := range updateResp.Sorted() {
Expand All @@ -180,37 +204,37 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
zap.String("topic", topicName),
)
}
if len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg,
existingTopics...,
}
if len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
alterCfg, topicNames...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
topicNames, err,
)
if err != nil {
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
}
}
return errors.Join(updateErrors...)
Expand Down
48 changes: 42 additions & 6 deletions kafka/topiccreator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
commonConfig.Logger = zap.New(core)
commonConfig.TracerProvider = tp

m, err := NewManager(ManagerConfig{
CommonConfig: commonConfig,
})
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
require.NoError(t, err)
t.Cleanup(func() { m.Close() })
c, err := m.NewTopicCreator(TopicCreatorConfig{
Expand All @@ -80,6 +78,19 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
})
require.NoError(t, err)

cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.MetadataResponse{
Version: r.GetVersion(),
Topics: []kmsg.MetadataResponseTopic{{
Topic: kmsg.StringPtr("name_space-topic0"),
TopicID: [16]byte{111},
Partitions: []kmsg.MetadataResponseTopicPartition{{
Partition: 0,
}},
}},
}, nil, true
})

// Simulate a situation where topic1, topic4 exists, topic2 is invalid and
// topic3 is successfully created.
var createTopicsRequest *kmsg.CreateTopicsRequest
Expand Down Expand Up @@ -111,8 +122,9 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest)
return &kmsg.CreatePartitionsResponse{
Version: createPartitionsRequest.Version,
Version: req.GetVersion(),
Topics: []kmsg.CreatePartitionsResponseTopic{
{Topic: "name_space-topic0"},
{Topic: "name_space-topic1"},
{Topic: "name_space-topic4"},
},
Expand All @@ -123,8 +135,12 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest)
return &kmsg.IncrementalAlterConfigsResponse{
Version: alterConfigsRequest.Version,
Version: alterConfigsRequest.GetVersion(),
Resources: []kmsg.IncrementalAlterConfigsResponseResource{
{
ResourceName: "name_space-topic0",
ResourceType: kmsg.ConfigResourceTypeTopic,
},
{
ResourceName: "name_space-topic1",
ResourceType: kmsg.ConfigResourceTypeTopic,
Expand All @@ -136,7 +152,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
},
}, nil, true
})
err = c.CreateTopics(context.Background(), "topic1", "topic2", "topic3", "topic4")
err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4")
require.Error(t, err)
assert.EqualError(t, err, `failed to create topic "topic2": `+
`INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`,
Expand Down Expand Up @@ -177,6 +193,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
}},
}}, createTopicsRequest.Topics)

assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{
{Topic: "name_space-topic0", Count: 123},
}, createPartitionsRequest.Topics)

matchingLogs := observedLogs.FilterFieldKey("topic")
for _, ml := range matchingLogs.AllUntimed() {
t.Log(ml.Message)
Expand Down Expand Up @@ -241,6 +261,22 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic4"),
},
}, {
Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic0"),
},
}, {
Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic0"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
Expand Down

0 comments on commit 27f8c0e

Please sign in to comment.