Skip to content

Commit 916ebec

Browse files
committed
Polish "Support relative topicPattern in PulsarListener"
Adds to the previous commit by - replacing commons StringUtils with Spring StringUtils - only resetting the topics pattern if it changed Signed-off-by: onobc <chris.bono@gmail.com>
1 parent cab35a7 commit 916ebec

File tree

4 files changed

+28
-17
lines changed

4 files changed

+28
-17
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.List;
2121
import java.util.regex.Pattern;
2222

23-
import org.apache.commons.lang3.StringUtils;
2423
import org.apache.pulsar.client.api.Schema;
2524
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
2625
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
@@ -29,6 +28,7 @@
2928

3029
import org.springframework.pulsar.core.PulsarTopicBuilder;
3130
import org.springframework.util.CollectionUtils;
31+
import org.springframework.util.StringUtils;
3232

3333
/**
3434
* Default implementation for {@link ReactivePulsarConsumerFactory}.
@@ -109,10 +109,12 @@ protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder<
109109
}
110110
var mutableSpec = consumerBuilder.getMutableSpec();
111111
var topicsPattern = mutableSpec.getTopicsPattern();
112-
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
112+
if (topicsPattern != null && StringUtils.hasText(topicsPattern.pattern())) {
113113
var topicsPatternStr = topicsPattern.pattern();
114-
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
115-
mutableSpec.setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
114+
var fqTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
115+
if (!topicsPatternStr.equals(fqTopicsPatternStr)) {
116+
mutableSpec.setTopicsPattern(Pattern.compile(fqTopicsPatternStr));
117+
}
116118
}
117119
}
118120

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void createConsumerEnsureTopicsPatternFullyQualified() {
138138
var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*";
139139
var consumer = consumerFactory.createConsumer(SCHEMA,
140140
Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern))));
141-
ReactiveMessageConsumerSpec reactiveMessageConsumerSpec = assertThat(consumer)
141+
var reactiveMessageConsumerSpec = assertThat(consumer)
142142
.extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class))
143143
.actual();
144144
assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern);

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.TreeMap;
2626
import java.util.regex.Pattern;
2727

28-
import org.apache.commons.lang3.StringUtils;
2928
import org.apache.pulsar.client.api.Consumer;
3029
import org.apache.pulsar.client.api.ConsumerBuilder;
3130
import org.apache.pulsar.client.api.PulsarClient;
@@ -36,6 +35,7 @@
3635

3736
import org.springframework.pulsar.PulsarException;
3837
import org.springframework.util.CollectionUtils;
38+
import org.springframework.util.StringUtils;
3939

4040
/**
4141
* Default implementation for {@link PulsarConsumerFactory}.
@@ -157,10 +157,12 @@ protected void ensureTopicsPatternFullyQualified(ConsumerBuilder<T> builder) {
157157
}
158158
var builderImpl = (ConsumerBuilderImpl<T>) builder;
159159
var topicsPattern = builderImpl.getConf().getTopicsPattern();
160-
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
160+
if (topicsPattern != null && StringUtils.hasText(topicsPattern.pattern())) {
161161
var topicsPatternStr = topicsPattern.pattern();
162-
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
163-
builderImpl.getConf().setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
162+
var fqTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
163+
if (!topicsPatternStr.equals(fqTopicsPatternStr)) {
164+
builderImpl.getConf().setTopicsPattern(Pattern.compile(fqTopicsPatternStr));
165+
}
164166
}
165167
}
166168

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.concurrent.CompletableFuture;
3131

32+
import org.apache.pulsar.client.api.Consumer;
3233
import org.apache.pulsar.client.api.ConsumerBuilder;
3334
import org.apache.pulsar.client.api.PulsarClient;
3435
import org.apache.pulsar.client.api.PulsarClientException;
@@ -280,7 +281,7 @@ void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientE
280281
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
281282
null);
282283
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
283-
try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
284+
try (Consumer<String> consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
284285
"with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) {
285286
assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1");
286287
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1");
@@ -302,16 +303,22 @@ void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClie
302303
var topicsPattern = patternMultiTopicsConsumer.getPattern();
303304
assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*");
304305
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*");
305-
CompletableFuture<?> watcherFuture = assertThat(patternMultiTopicsConsumer)
306-
.extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class))
307-
.actual();
308-
// Seems some bugs in PatternMultiTopicsConsumerImpl.closeAsync() method.
309-
// If watcherFuture is not completed, invoke pulsarClient.close() first
310-
// will cause exception.
311-
watcherFuture.join();
306+
temporarilyDealWithPulsar24698(patternMultiTopicsConsumer);
312307
}
313308
}
314309

310+
// TODO remove when Pulsar client updates to 4.2.0
311+
private void temporarilyDealWithPulsar24698(PatternMultiTopicsConsumerImpl<String> consumer) {
312+
// See https://github.com/apache/pulsar/pull/24698
313+
// If this is not here there will be numerous exceptions when
314+
// PulsarClient.close
315+
CompletableFuture<?> watcherFuture = assertThat(consumer)
316+
.extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class))
317+
.actual();
318+
watcherFuture.join();
319+
320+
}
321+
315322
}
316323

317324
}

0 commit comments

Comments
 (0)