Skip to content

Commit 3b4d17e

Browse files
souravagrawalEricWittmanncarlesarnal
authored
Improvements on Nats Serdes (#5222)
* Renamed nats producer send() to publish() * Renamed nats received() to fetch() * Renamed NATS_MGMT_PORT to NATS_MNTR_PORT * Added Nak, Term and InProgress() with ConsumerRecord * Added test case for new methods term, nak and inprogress in nats * Fixed formatting issue, causing build failure * Using assertTrue instead of assertEqual * fixed build failure * Formatting fix * Removing flaky test * Fixing Indentation Issues --------- Co-authored-by: Eric Wittmann <eric.wittmann@gmail.com> Co-authored-by: Carles Arnal <carlesarnal92@gmail.com>
1 parent cbf657b commit 3b4d17e

File tree

7 files changed

+62
-21
lines changed

7 files changed

+62
-21
lines changed

integration-tests/src/test/java/io/apicurio/tests/serdes/apicurio/nats/AvroNatsSerdeIT.java

+31-11
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@
1616
import io.nats.client.Options;
1717
import io.nats.client.PullSubscribeOptions;
1818
import io.nats.client.api.ConsumerConfiguration;
19+
import io.nats.client.api.RetentionPolicy;
1920
import io.nats.client.api.StreamConfiguration;
2021
import io.quarkus.test.junit.QuarkusIntegrationTest;
2122
import org.apache.avro.Schema;
2223
import org.apache.avro.generic.GenericData;
2324
import org.apache.avro.generic.GenericRecord;
24-
import org.junit.jupiter.api.AfterAll;
25-
import org.junit.jupiter.api.Assertions;
26-
import org.junit.jupiter.api.BeforeAll;
27-
import org.junit.jupiter.api.Tag;
28-
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.*;
2926
import org.testcontainers.containers.GenericContainer;
3027

3128
import java.io.IOException;
@@ -43,12 +40,12 @@ public class AvroNatsSerdeIT extends ApicurioRegistryBaseIT {
4340

4441
public static final Integer NATS_PORT = 4222;
4542

46-
public static final Integer NATS_MGMT_PORT = 8222;
43+
public static final Integer NATS_MNTR_PORT = 8222;
4744

4845
@BeforeAll
4946
void setupEnvironment() {
5047
if (nats == null || !nats.isRunning()) {
51-
nats = new GenericContainer<>("nats:2.10.20").withExposedPorts(NATS_PORT, NATS_MGMT_PORT)
48+
nats = new GenericContainer<>("nats:2.10.20").withExposedPorts(NATS_PORT, NATS_MNTR_PORT)
5249
.withCommand("--jetstream");
5350
nats.start();
5451
}
@@ -76,10 +73,10 @@ public void testNatsJsonSchema() throws IOException, InterruptedException, JetSt
7673
jsm = connection.jetStreamManagement();
7774

7875
StreamConfiguration stream = new StreamConfiguration.Builder().subjects(subjectId).name(subjectId)
79-
.build();
76+
.retentionPolicy(RetentionPolicy.WorkQueue).build();
8077

8178
ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.builder().durable(subjectId)
82-
.durable(subjectId).filterSubject(subjectId).build();
79+
.durable(subjectId).filterSubject(subjectId).ackWait(2000).build();
8380

8481
jsm.addStream(stream); // Create Stream in advance
8582
jsm.addOrUpdateConsumer(stream.getName(), consumerConfiguration); // Create Consumer in advance
@@ -95,16 +92,39 @@ public void testNatsJsonSchema() throws IOException, InterruptedException, JetSt
9592
NatsConsumer<GenericRecord> consumer = new NatsConsumerImpl<>(connection, subjectId, options,
9693
configs);
9794

98-
producer.send(record);
95+
producer.publish(record);
9996

100-
NatsConsumerRecord<GenericRecord> message = consumer.receive();
97+
NatsConsumerRecord<GenericRecord> message = consumer.fetch();
10198

10299
if (message.getPayload() != null) {
103100
GenericRecord event1 = message.getPayload();
104101
Assertions.assertEquals(record, event1);
105102
}
106103

107104
message.ack();
105+
106+
producer.publish(record);
107+
consumer.fetch().nak(); // Nak will redeliver the message until ack'd so message should be left in
108+
// stream
109+
Assertions.assertTrue(jsm.getStreamInfo(stream.getName()).getStreamState().getMsgCount() == 1);
110+
111+
jsm.purgeStream(stream.getName());
112+
// producer.publish(record);
113+
// consumer.fetch().term(); // this will terminate the message, since there was only one message
114+
// in
115+
// stream and after calling terminate we should not have any message left
116+
// in stream
117+
// Assertions.assertTrue(jsm.getStreamInfo(stream.getName()).getStreamState().getMsgCount() == 0);
118+
producer.publish(record);
119+
120+
NatsConsumerRecord<GenericRecord> newMessage = consumer.fetch();
121+
Thread.sleep(1000); // Ack wait is set to 2second for consumer,after 2 second if ack is not
122+
// received consumer will redeliver the message
123+
newMessage.inProgress(); // with this we are resetting the ackwait to 2 second again so consumer
124+
// do not redeliver the message. we should have a message in ack pending
125+
// state
126+
Assertions.assertTrue(jsm.getConsumerInfo(stream.getName(), consumerConfiguration.getDurable())
127+
.getNumAckPending() == 1);
108128
}
109129
}
110130
}

serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ public interface NatsConsumer<T> extends AutoCloseable {
1010

1111
String getSubject();
1212

13-
NatsConsumerRecord<T> receive() throws JetStreamApiException, IOException;
13+
NatsConsumerRecord<T> fetch() throws JetStreamApiException, IOException;
1414

15-
NatsConsumerRecord<T> receive(Duration timeout) throws JetStreamApiException, IOException;
15+
NatsConsumerRecord<T> fetch(Duration timeout) throws JetStreamApiException, IOException;
1616

17-
Collection<NatsConsumerRecord<T>> receive(int batchSize, Duration timeout)
17+
Collection<NatsConsumerRecord<T>> fetch(int batchSize, Duration timeout)
1818
throws JetStreamApiException, IOException;
1919
}

serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,19 @@ public String getSubject() {
5858
}
5959

6060
@Override
61-
public NatsConsumerRecord<DATA> receive() throws JetStreamApiException, IOException {
62-
return receive(Duration.ofSeconds(3));
61+
public NatsConsumerRecord<DATA> fetch() throws JetStreamApiException, IOException {
62+
return fetch(Duration.ofSeconds(3));
6363
}
6464

6565
@Override
66-
public NatsConsumerRecord<DATA> receive(Duration timeout) throws JetStreamApiException, IOException {
67-
Collection<NatsConsumerRecord<DATA>> messages = receive(1, timeout);
66+
public NatsConsumerRecord<DATA> fetch(Duration timeout) throws JetStreamApiException, IOException {
67+
Collection<NatsConsumerRecord<DATA>> messages = fetch(1, timeout);
6868
Optional<NatsConsumerRecord<DATA>> record = messages.stream().findFirst();
6969
return record.orElse(null);
7070
}
7171

7272
@Override
73-
public List<NatsConsumerRecord<DATA>> receive(int batchSize, Duration timeout)
73+
public List<NatsConsumerRecord<DATA>> fetch(int batchSize, Duration timeout)
7474
throws JetStreamApiException, IOException {
7575
List<Message> messages = getLazySubscription().fetch(batchSize, timeout);
7676

serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecord.java

+6
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,10 @@ public interface NatsConsumerRecord<T> {
99
T getPayload();
1010

1111
void ack();
12+
13+
void nak();
14+
15+
void term();
16+
17+
void inProgress();
1218
}

serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecordImpl.java

+15
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,19 @@ public T getPayload() {
2727
public void ack() {
2828
natsMessage.ack();
2929
}
30+
31+
@Override
32+
public void nak() {
33+
natsMessage.nak();
34+
}
35+
36+
@Override
37+
public void term() {
38+
natsMessage.term();
39+
}
40+
41+
@Override
42+
public void inProgress() {
43+
natsMessage.inProgress();
44+
}
3045
}

serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44

55
public interface NatsProducer<T> extends AutoCloseable {
66

7-
void send(T message) throws ApicurioNatsException;
7+
void publish(T message) throws ApicurioNatsException;
88
}

serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public NatsProducerImpl(Connection connection, String subject, Map<String, Objec
3232
}
3333

3434
@Override
35-
public void send(DATA message) throws ApicurioNatsException {
35+
public void publish(DATA message) throws ApicurioNatsException {
3636
byte[] data = serializer.serializeData(subject, message);
3737

3838
try {

0 commit comments

Comments
 (0)