Skip to content

Commit 28e62a3

Browse files
committed
Fix snapshotting unit test
1 parent 5ecb855 commit 28e62a3

File tree

3 files changed

+179
-0
lines changed

3 files changed

+179
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package io.apicurio.registry.storage.impl.kafkasql;
2+
3+
import io.apicurio.registry.AbstractResourceTestBase;
4+
import io.apicurio.registry.rest.client.models.ArtifactContent;
5+
import io.apicurio.registry.rest.client.models.Rule;
6+
import io.apicurio.registry.rest.client.models.RuleType;
7+
import io.apicurio.registry.utils.kafka.KafkaUtil;
8+
import io.apicurio.registry.utils.tests.KafkasqlRecoverFromSnapshotTestProfile;
9+
import io.quarkus.test.common.QuarkusTestResource;
10+
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
11+
import io.quarkus.test.junit.QuarkusTest;
12+
import io.quarkus.test.junit.TestProfile;
13+
import lombok.SneakyThrows;
14+
import org.apache.kafka.clients.CommonClientConfigs;
15+
import org.apache.kafka.clients.producer.KafkaProducer;
16+
import org.apache.kafka.clients.producer.ProducerRecord;
17+
import org.apache.kafka.common.header.internals.RecordHeader;
18+
import org.apache.kafka.common.serialization.StringSerializer;
19+
import org.junit.jupiter.api.Assertions;
20+
import org.junit.jupiter.api.BeforeAll;
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.net.URISyntaxException;
24+
import java.net.URL;
25+
import java.nio.charset.StandardCharsets;
26+
import java.nio.file.Paths;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Properties;
31+
import java.util.Set;
32+
import java.util.UUID;
33+
import java.util.concurrent.ExecutionException;
34+
35+
@QuarkusTest
36+
@TestProfile(KafkasqlRecoverFromSnapshotTestProfile.class)
37+
@QuarkusTestResource(value = KafkasqlRecoverFromSnapshotTest.KafkaSqlSnapshotTestInitializer.class, restrictToAnnotatedClass = true)
38+
public class KafkasqlRecoverFromSnapshotTest extends AbstractResourceTestBase {
39+
40+
private static final String NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID = "SNAPSHOT_TEST_GROUP_ID";
41+
42+
@BeforeAll
43+
public void init() {
44+
//Create a bunch of artifacts and rules, so they're added on top of the snapshot.
45+
String simpleAvro = resourceToString("avro.json");
46+
47+
for (int idx = 0; idx < 1000; idx++) {
48+
System.out.println("Iteration: " + idx);
49+
String artifactId = UUID.randomUUID().toString();
50+
ArtifactContent content = new ArtifactContent();
51+
content.setContent(simpleAvro);
52+
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().post(content, config -> {
53+
config.headers.add("X-Registry-ArtifactId", artifactId);
54+
});
55+
Rule rule = new Rule();
56+
rule.setType(RuleType.VALIDITY);
57+
rule.setConfig("SYNTAX_ONLY");
58+
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().byArtifactId(artifactId).rules().post(rule);
59+
}
60+
}
61+
62+
@Test
63+
public void testRecoverFromSnapshot() throws InterruptedException {
64+
//We expect 4001 artifacts coming from the snapshot
65+
Assertions.assertEquals(4001, clientV3.groups().byGroupId("default").artifacts().get().getCount());
66+
//We expect another 1000 artifacts coming added on top of the snapshot
67+
Assertions.assertEquals(1000, clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().get().getCount());
68+
}
69+
70+
protected static class KafkaSqlSnapshotTestInitializer implements QuarkusTestResourceLifecycleManager {
71+
72+
public KafkaSqlSnapshotTestInitializer() {
73+
}
74+
75+
@Override
76+
public int order() {
77+
return 10001;
78+
}
79+
80+
@Override
81+
@SneakyThrows
82+
public Map<String, String> start() {
83+
Properties props = connectionProperties();
84+
85+
KafkaUtil.createTopics(props, Set.of("kafkasql-snapshots", "kafkasql-journal"), Collections.emptyMap());
86+
87+
prepareSnapshotMarker(props);
88+
prepareSnapshotMessages(props);
89+
90+
return Collections.emptyMap();
91+
}
92+
93+
private void prepareSnapshotMarker(Properties props) throws ExecutionException, InterruptedException {
94+
StringSerializer keySerializer = new StringSerializer();
95+
StringSerializer valueSerializer = new StringSerializer();
96+
97+
//Create the data producer to send a snapshot marker
98+
KafkaProducer<String, String> dataProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
99+
RecordHeader messageTypeHeader = new RecordHeader("mt", "CreateSnapshot1Message".getBytes(StandardCharsets.UTF_8));
100+
ProducerRecord<String, String> snapshotMarkerRecord = new ProducerRecord<>("kafkasql-journal", 0,
101+
"{\"uuid\":\"1345b402-c707-457e-af76-10c1045e68e8\",\"messageType\":\"CreateSnapshot1Message\",\"partitionKey\":\"__GLOBAL_PARTITION__\"}", "{\n"
102+
+ " \"snapshotLocation\": \"/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8.sql\",\n"
103+
+ " \"snapshotId\": \"1302b402-c707-457e-af76-10c1045e68e8\",\n"
104+
+ " \"key\": {\n"
105+
+ " \"uuid\": \"1302b402-c707-457e-af76-10c1045e68e8\",\n"
106+
+ " \"messageType\": \"CreateSnapshot1Message\",\n"
107+
+ " \"partitionKey\": \"__GLOBAL_PARTITION__\"\n"
108+
+ " }\n"
109+
+ " }", List.of(messageTypeHeader));
110+
111+
//Send snapshot marker
112+
dataProducer.send(snapshotMarkerRecord).get();
113+
}
114+
115+
private void prepareSnapshotMessages(Properties props) throws URISyntaxException, ExecutionException, InterruptedException {
116+
StringSerializer keySerializer = new StringSerializer();
117+
StringSerializer valueSerializer = new StringSerializer();
118+
119+
URL resource = getClass().getResource("/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8.sql");
120+
String snapshotLocation = Paths.get(resource.toURI()).toFile().getAbsolutePath();
121+
122+
//Send three messages to the snapshots topic, two invalid, and one valid. Only the latest valid one must be processed.
123+
ProducerRecord<String, String> olderInvalidSnapshot = new ProducerRecord<>("kafkasql-snapshots", 0, "1312b402-c707-457e-af76-10c1045e68e8",
124+
"snapshotLocation",
125+
Collections.emptyList());
126+
ProducerRecord<String, String> record = new ProducerRecord<>("kafkasql-snapshots", 0, "1302b402-c707-457e-af76-10c1045e68e8", snapshotLocation,
127+
Collections.emptyList());
128+
ProducerRecord<String, String> newerInvalidSnaphot = new ProducerRecord<>("kafkasql-snapshots", 0, "1322b402-c707-457e-af76-10c1045e68e8", "",
129+
Collections.emptyList());
130+
131+
// Create the Kafka Producer
132+
KafkaProducer<String, String> snapshotsProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
133+
134+
snapshotsProducer.send(olderInvalidSnapshot).get();
135+
snapshotsProducer.send(record).get();
136+
snapshotsProducer.send(newerInvalidSnaphot).get();
137+
}
138+
139+
public Properties connectionProperties() {
140+
Properties properties = new Properties();
141+
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers.external"));
142+
properties.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
143+
properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 5000);
144+
return properties;
145+
}
146+
147+
@Override
148+
public void stop() {
149+
}
150+
}
151+
}

utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public Map<String, String> start() {
5050
public void stop() {
5151
if (kafka != null) {
5252
log.info("Stopping the Kafka Test Container");
53+
kafka.close();
5354
kafka.stop();
5455
}
5556
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.apicurio.registry.utils.tests;
2+
3+
import io.quarkus.test.junit.QuarkusTestProfile;
4+
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
public class KafkasqlRecoverFromSnapshotTestProfile implements QuarkusTestProfile {
10+
11+
@Override
12+
public Map<String, String> getConfigOverrides() {
13+
return Map.of("apicurio.storage.kind", "kafkasql", "apicurio.kafkasql.snapshot.every.seconds", "2s");
14+
}
15+
16+
@Override
17+
public List<TestResourceEntry> testResources() {
18+
if (!Boolean.parseBoolean(System.getProperty("cluster.tests"))) {
19+
return List.of(
20+
new TestResourceEntry(KafkaTestContainerManager.class));
21+
}
22+
else {
23+
return Collections.emptyList();
24+
}
25+
}
26+
27+
}

0 commit comments

Comments
 (0)