Skip to content

Commit c6e71d3

Browse files
committed
Fix snapshotting unit test
1 parent 5ecb855 commit c6e71d3

File tree

8 files changed

+4510
-22138
lines changed

8 files changed

+4510
-22138
lines changed

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ public boolean isAlive() {
157157
@PreDestroy
158158
void onDestroy() {
159159
stopped = true;
160+
journalConsumer.close();
161+
snapshotsConsumer.close();
160162
}
161163

162164
/**
@@ -217,7 +219,6 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
217219
}
218220
}
219221

220-
snapshotsConsumer.commitSync();
221222
return snapshotRecordKey;
222223
}
223224

@@ -350,8 +351,10 @@ public void deleteConfigProperty(String propertyName) {
350351

351352
@Override
352353
public Pair<ArtifactMetaDataDto, ArtifactVersionMetaDataDto> createArtifact(String groupId, String artifactId,
353-
String artifactType, EditableArtifactMetaDataDto artifactMetaData, String version, ContentWrapperDto versionContent,
354-
EditableVersionMetaDataDto versionMetaData, List<String> versionBranches) throws RegistryStorageException {
354+
String artifactType, EditableArtifactMetaDataDto artifactMetaData, String version,
355+
ContentWrapperDto versionContent,
356+
EditableVersionMetaDataDto versionMetaData, List<String> versionBranches)
357+
throws RegistryStorageException {
355358
String content = versionContent != null ? versionContent.getContent().content() : null;
356359
String contentType = versionContent != null ? versionContent.getContentType() : null;
357360
List<ArtifactReferenceDto> references = versionContent != null ? versionContent.getReferences() : null;
@@ -384,7 +387,8 @@ public void deleteArtifacts(String groupId) throws RegistryStorageException {
384387

385388
@Override
386389
public ArtifactVersionMetaDataDto createArtifactVersion(String groupId, String artifactId, String version,
387-
String artifactType, ContentWrapperDto contentDto, EditableVersionMetaDataDto metaData, List<String> branches) throws RegistryStorageException {
390+
String artifactType, ContentWrapperDto contentDto, EditableVersionMetaDataDto metaData, List<String> branches)
391+
throws RegistryStorageException {
388392
String content = contentDto != null ? contentDto.getContent().content() : null;
389393
String contentType = contentDto != null ? contentDto.getContentType() : null;
390394
List<ArtifactReferenceDto> references = contentDto != null ? contentDto.getReferences() : null;
@@ -399,7 +403,7 @@ public ArtifactVersionMetaDataDto createArtifactVersion(String groupId, String a
399403
*/
400404
@Override
401405
public void updateArtifactMetaData(String groupId, String artifactId,
402-
EditableArtifactMetaDataDto metaData) throws ArtifactNotFoundException, RegistryStorageException {
406+
EditableArtifactMetaDataDto metaData) throws ArtifactNotFoundException, RegistryStorageException {
403407
var message = new UpdateArtifactMetaData3Message(groupId, artifactId, metaData);
404408
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
405409
coordinator.waitForResponse(uuid);

app/src/main/java/io/apicurio/registry/storage/impl/sql/SqlRegistryStorage.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@ public void initialize() {
3636
}
3737

3838
public void restoreFromSnapshot(String snapshotLocation) {
39-
handleFactory.withHandleNoException(handle -> {
40-
handle.createUpdate(sqlStatements.restoreFromSnapshot())
41-
.bind(0, snapshotLocation).execute();
42-
});
39+
handleFactory.withHandle(handle -> handle.createUpdate(sqlStatements.restoreFromSnapshot())
40+
.bind(0, snapshotLocation).execute());
4341
}
4442
}
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,27 @@
11
package io.apicurio.registry.storage.impl.kafkasql;
22

33
import io.apicurio.registry.AbstractResourceTestBase;
4-
import io.apicurio.registry.rest.client.models.ArtifactContent;
4+
import io.apicurio.registry.rest.client.models.CreateArtifact;
55
import io.apicurio.registry.rest.client.models.Rule;
66
import io.apicurio.registry.rest.client.models.RuleType;
7-
import io.apicurio.registry.utils.kafka.KafkaUtil;
8-
import io.apicurio.registry.utils.tests.KafkasqlSnapshotTestProfile;
9-
import io.quarkus.test.common.QuarkusTestResource;
10-
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
7+
import io.apicurio.registry.types.ArtifactType;
8+
import io.apicurio.registry.types.ContentTypes;
9+
import io.apicurio.registry.utils.tests.KafkasqlTestProfile;
10+
import io.apicurio.registry.utils.tests.TestUtils;
1111
import io.quarkus.test.junit.QuarkusTest;
1212
import io.quarkus.test.junit.TestProfile;
1313
import jakarta.inject.Inject;
14-
import lombok.SneakyThrows;
15-
import org.apache.kafka.clients.CommonClientConfigs;
16-
import org.apache.kafka.clients.producer.KafkaProducer;
17-
import org.apache.kafka.clients.producer.ProducerRecord;
18-
import org.apache.kafka.common.header.internals.RecordHeader;
19-
import org.apache.kafka.common.serialization.StringSerializer;
2014
import org.junit.jupiter.api.Assertions;
2115
import org.junit.jupiter.api.BeforeAll;
2216
import org.junit.jupiter.api.Test;
2317

2418
import java.io.IOException;
25-
import java.net.URISyntaxException;
26-
import java.net.URL;
27-
import java.nio.charset.StandardCharsets;
2819
import java.nio.file.Files;
2920
import java.nio.file.Path;
30-
import java.nio.file.Paths;
31-
import java.util.Collections;
32-
import java.util.List;
33-
import java.util.Map;
34-
import java.util.Properties;
35-
import java.util.Set;
3621
import java.util.UUID;
37-
import java.util.concurrent.ExecutionException;
3822

3923
@QuarkusTest
40-
@TestProfile(KafkasqlSnapshotTestProfile.class)
41-
@QuarkusTestResource(value = KafkaSqlSnapshotTest.KafkaSqlSnapshotTestInitializer.class, restrictToAnnotatedClass = true)
24+
@TestProfile(KafkasqlTestProfile.class)
4225
public class KafkaSqlSnapshotTest extends AbstractResourceTestBase {
4326

4427
private static final String NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID = "SNAPSHOT_TEST_GROUP_ID";
@@ -54,15 +37,15 @@ public void init() {
5437
for (int idx = 0; idx < 1000; idx++) {
5538
System.out.println("Iteration: " + idx);
5639
String artifactId = UUID.randomUUID().toString();
57-
ArtifactContent content = new ArtifactContent();
58-
content.setContent(simpleAvro);
59-
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().post(content, config -> {
60-
config.headers.add("X-Registry-ArtifactId", artifactId);
61-
});
40+
CreateArtifact createArtifact = TestUtils.clientCreateArtifact(artifactId, ArtifactType.AVRO, simpleAvro,
41+
ContentTypes.APPLICATION_JSON);
42+
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts()
43+
.post(createArtifact, config -> config.headers.add("X-Registry-ArtifactId", artifactId));
6244
Rule rule = new Rule();
6345
rule.setType(RuleType.VALIDITY);
6446
rule.setConfig("SYNTAX_ONLY");
65-
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().byArtifactId(artifactId).rules().post(rule); }
47+
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().byArtifactId(artifactId).rules().post(rule);
48+
}
6649
}
6750

6851
@Test
@@ -72,95 +55,4 @@ public void testSnapshotCreation() throws IOException {
7255
Assertions.assertTrue(Files.exists(path));
7356
Files.delete(path);
7457
}
75-
76-
77-
@Test
78-
public void testRecoverFromSnapshot() throws InterruptedException {
79-
//We expect 4001 artifacts coming from the snapshot
80-
Assertions.assertEquals(4001, clientV3.groups().byGroupId("default").artifacts().get().getCount());
81-
//We expect another 1000 artifacts coming added on top of the snapshot
82-
Assertions.assertEquals(1000, clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().get().getCount());
83-
}
84-
85-
protected static class KafkaSqlSnapshotTestInitializer implements QuarkusTestResourceLifecycleManager {
86-
87-
public KafkaSqlSnapshotTestInitializer() {
88-
}
89-
90-
@Override
91-
public int order() {
92-
return 10001;
93-
}
94-
95-
@Override
96-
@SneakyThrows
97-
public Map<String, String> start() {
98-
Properties props = connectionProperties();
99-
100-
KafkaUtil.createTopics(props, Set.of("kafkasql-snapshots", "kafkasql-journal"), Collections.emptyMap());
101-
102-
prepareSnapshotMarker(props);
103-
prepareSnapshotMessages(props);
104-
105-
return Collections.emptyMap();
106-
}
107-
108-
private void prepareSnapshotMarker(Properties props) throws ExecutionException, InterruptedException {
109-
StringSerializer keySerializer = new StringSerializer();
110-
StringSerializer valueSerializer = new StringSerializer();
111-
112-
//Create the data producer to send a snapshot marker
113-
KafkaProducer<String, String> dataProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
114-
RecordHeader messageTypeHeader = new RecordHeader("mt", "CreateSnapshot1Message".getBytes(StandardCharsets.UTF_8));
115-
ProducerRecord<String, String> snapshotMarkerRecord = new ProducerRecord<>("kafkasql-journal", 0,
116-
"{\"uuid\":\"1345b402-c707-457e-af76-10c1045e68e8\",\"messageType\":\"CreateSnapshot1Message\",\"partitionKey\":\"__GLOBAL_PARTITION__\"}", "{\n"
117-
+ " \"snapshotLocation\": \"/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8.sql\",\n"
118-
+ " \"snapshotId\": \"1302b402-c707-457e-af76-10c1045e68e8\",\n"
119-
+ " \"key\": {\n"
120-
+ " \"uuid\": \"1302b402-c707-457e-af76-10c1045e68e8\",\n"
121-
+ " \"messageType\": \"CreateSnapshot1Message\",\n"
122-
+ " \"partitionKey\": \"__GLOBAL_PARTITION__\"\n"
123-
+ " }\n"
124-
+ " }", List.of(messageTypeHeader));
125-
126-
//Send snapshot marker
127-
dataProducer.send(snapshotMarkerRecord).get();
128-
}
129-
130-
private void prepareSnapshotMessages(Properties props) throws URISyntaxException, ExecutionException, InterruptedException {
131-
StringSerializer keySerializer = new StringSerializer();
132-
StringSerializer valueSerializer = new StringSerializer();
133-
134-
URL resource = getClass().getResource("/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8.sql");
135-
String snapshotLocation = Paths.get(resource.toURI()).toFile().getAbsolutePath();
136-
137-
//Send three messages to the snapshots topic, two invalid, and one valid. Only the latest valid one must be processed.
138-
ProducerRecord<String, String> olderInvalidSnapshot = new ProducerRecord<>("kafkasql-snapshots", 0, "1312b402-c707-457e-af76-10c1045e68e8",
139-
"snapshotLocation",
140-
Collections.emptyList());
141-
ProducerRecord<String, String> record = new ProducerRecord<>("kafkasql-snapshots", 0, "1302b402-c707-457e-af76-10c1045e68e8", snapshotLocation,
142-
Collections.emptyList());
143-
ProducerRecord<String, String> newerInvalidSnaphot = new ProducerRecord<>("kafkasql-snapshots", 0, "1322b402-c707-457e-af76-10c1045e68e8", "",
144-
Collections.emptyList());
145-
146-
// Create the Kafka Producer
147-
KafkaProducer<String, String> snapshotsProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
148-
149-
snapshotsProducer.send(olderInvalidSnapshot).get();
150-
snapshotsProducer.send(record).get();
151-
snapshotsProducer.send(newerInvalidSnaphot).get();
152-
}
153-
154-
public Properties connectionProperties() {
155-
Properties properties = new Properties();
156-
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers.external"));
157-
properties.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
158-
properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 5000);
159-
return properties;
160-
}
161-
162-
@Override
163-
public void stop() {
164-
}
165-
}
16658
}

0 commit comments

Comments
 (0)