Skip to content

Commit 840b002

Browse files
authored
Added a new config for the Kafka consumer group name (#5479)
1 parent 1511c18 commit 840b002

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ public class KafkaSqlFactory {
8989
"ssl.endpoint.identification.algorithm=" })
9090
Properties consumerProperties;
9191

92+
@ConfigProperty(name = "apicurio.kafkasql.consumer.group-prefix", defaultValue = "apicurio-")
93+
@Info(category = "storage", description = "Kafka sql storage prefix for consumer group name")
94+
String groupPrefix;
95+
9296
@Inject
9397
@RegistryProperties(value = { "apicurio.kafka.common", "apicurio.kafkasql.admin" }, empties = {
9498
"ssl.endpoint.identification.algorithm=" })
@@ -256,8 +260,10 @@ public ProducerActions<KafkaSqlMessageKey, KafkaSqlMessage> createKafkaJournalPr
256260
public KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> createKafkaJournalConsumer() {
257261
Properties props = (Properties) consumerProperties.clone();
258262

263+
String consumerGroupId = groupPrefix + UUID.randomUUID().toString();
264+
259265
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
260-
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
266+
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
261267
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
262268
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
263269
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -305,8 +311,10 @@ public ProducerActions<String, String> createKafkaSnapshotsProducer() {
305311
public KafkaConsumer<String, String> createKafkaSnapshotsConsumer() {
306312
Properties props = (Properties) consumerProperties.clone();
307313

314+
String consumerGroupId = groupPrefix + UUID.randomUUID().toString();
315+
308316
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
309-
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
317+
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
310318
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
311319
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
312320
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

0 commit comments

Comments
 (0)