Commit 92a71ba

Finish coordination mechanism for snapshot creation and add snapshot creation test
1 parent c74186b commit 92a71ba

5 files changed (+43 -23 lines)


app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java (+19 -16)

@@ -139,15 +139,15 @@ public void initialize() {
         }

         //Try to restore the internal database from a snapshot
-        String snapshotMessageKey = consumeSnapshotsTopic(snapshotsConsumer);
+        String snapshotId = consumeSnapshotsTopic(snapshotsConsumer);

         //Once the topics are created, and the snapshots processed, initialize the internal SQL Storage.
         sqlStore.initialize();
         setDelegate(sqlStore);

         //Once the SQL storage has been initialized, start the Kafka consumer thread.
         log.info("SQL store initialized, starting consumer thread.");
-        startConsumerThread(journalConsumer, snapshotMessageKey);
+        startConsumerThread(journalConsumer, snapshotId);
     }

     @Override
@@ -233,7 +233,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
      * consuming JournalRecord entries found on that topic, and applying those journal entries to
      * the internal data model.
      */
-    private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> consumer, String snapshotMessageKey) {
+    private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> consumer, String snapshotId) {
         log.info("Starting KSQL consumer thread on topic: {}", configuration.topic());
         log.info("Bootstrap servers: {}", configuration.bootstrapServers());

@@ -257,23 +257,26 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
         if (records != null && !records.isEmpty()) {
             log.debug("Consuming {} journal records.", records.count());

-            if (null != snapshotMessageKey && !snapshotProcessed) {
+            if (null != snapshotId && !snapshotProcessed) {
                 //If there is a snapshot key present, we process (and discard) all the messages until we find the snapshot marker that corresponds to the snapshot key.
                 Iterator<ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage>> it = records.iterator();
                 while (it.hasNext() && !snapshotProcessed) {
                     ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record = it.next();
-                    if (processSnapshot(snapshotMessageKey, record)) {
+                    if (processSnapshot(snapshotId, record)) {
                         snapshotProcessed = true;
                         break;
                     } else {
-                        log.info("Discarding message with key {} as it was sent before a snapshot was created", record.key());
+                        log.debug("Discarding message with key {} as it was sent before a newer snapshot was created", record.key());
                     }
                 }

-                //Once the snapshot marker message has been found, we can process the rest of the messages as usual, applying the new changes on top of the existing ones in the snapshot.
-                while (it.hasNext()) {
-                    ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record = it.next();
-                    processRecord(record, bootstrapId, bootstrapStart);
+                //If the snapshot marker has not been found, continue with message skipping until we find it.
+                if (snapshotProcessed) {
+                    //Once the snapshot marker message has been found, we can process the rest of the messages as usual, applying the new changes on top of the existing ones in the snapshot.
+                    while (it.hasNext()) {
+                        ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record = it.next();
+                        processRecord(record, bootstrapId, bootstrapStart);
+                    }
                 }
             }
             else {
@@ -291,8 +294,8 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
         thread.start();
     }

-    private boolean processSnapshot(String snapshotKey, ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record) {
-        return record.key() != null && snapshotKey.equals(record.key().getUuid());
+    private boolean processSnapshot(String snapshotId, ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record) {
+        return record.value() instanceof CreateSnapshot1Message && snapshotId.equals(((CreateSnapshot1Message) record.value()).getSnapshotId());
     }

     private void processRecord(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record, String bootstrapId, long bootstrapStart) {
@@ -869,17 +872,17 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
         //First we generate an identifier for the snapshot, then we send a snapshot marker to the journal topic.
         String snapshotId = UUID.randomUUID().toString();
         Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
-        var message = new CreateSnapshot1Message(path.toString());
+        var message = new CreateSnapshot1Message(path.toString(), snapshotId);
         var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
         String snapshotLocation = (String) coordinator.waitForResponse(uuid);

-        //Then we send a new message to the snapshots topic, using the marker uuid as the keyof the snapshot message.
-        ProducerRecord<String, String> record = new ProducerRecord<>(configuration.snapshotsTopic(), 0, uuid.toString(), snapshotLocation,
+        //Then we send a new message to the snapshots topic, using the snapshot id as the key of the snapshot message.
+        ProducerRecord<String, String> record = new ProducerRecord<>(configuration.snapshotsTopic(), 0, snapshotId, snapshotLocation,
                 Collections.emptyList());

         RecordMetadata recordMetadata = ConcurrentUtil.get(snapshotsProducer.apply(record));

-        return recordMetadata.toString();
+        return snapshotLocation;
     }

     @Override
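
The heart of this change is the skip-until-marker logic in the consumer thread: every journal record is discarded until the CreateSnapshot1Message whose snapshotId matches the id returned by consumeSnapshotsTopic() is seen, and only the records after that marker are applied on top of the restored snapshot; per the new comment, skipping simply continues across poll batches until the marker turns up. A minimal, self-contained sketch of that control flow, with the registry's Kafka record types replaced by a hypothetical SimpleRecord pair purely for illustration:

import java.util.Iterator;
import java.util.List;

// Minimal sketch of the skip-until-marker flow; SimpleRecord stands in for the real
// ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> purely for illustration.
public class SnapshotSkipSketch {

    record SimpleRecord(String markerSnapshotId, String payload) {}

    // Returns how many records were applied; everything up to and including the marker is discarded.
    static int applyAfterMarker(String snapshotId, List<SimpleRecord> batch) {
        boolean snapshotProcessed = false;
        int applied = 0;
        Iterator<SimpleRecord> it = batch.iterator();
        while (it.hasNext() && !snapshotProcessed) {
            SimpleRecord record = it.next();
            if (snapshotId.equals(record.markerSnapshotId())) { // analogue of processSnapshot(...)
                snapshotProcessed = true;                       // marker found, stop discarding
            }
        }
        if (snapshotProcessed) {                                // analogue of the new guard in the diff
            while (it.hasNext()) {
                it.next();                                      // analogue of processRecord(...)
                applied++;
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        List<SimpleRecord> batch = List.of(
                new SimpleRecord(null, "journal write sent before the snapshot"),
                new SimpleRecord("snap-1", "snapshot marker"),
                new SimpleRecord(null, "journal write sent after the snapshot"));
        System.out.println(applyAfterMarker("snap-1", batch)); // prints 1
    }
}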

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/messages/CreateSnapshot1Message.java (+1)

@@ -20,6 +20,7 @@
 public class CreateSnapshot1Message extends AbstractMessage {

     private String snapshotLocation;
+    private String snapshotId;

     /**
      * @see io.apicurio.registry.storage.impl.kafkasql.KafkaSqlMessage#dispatchTo(io.apicurio.registry.storage.RegistryStorage)
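
With the id carried on the message, the journal consumer can match a marker against the snapshot id it read from the snapshots topic via getSnapshotId(), and triggerSnapshotCreation() can pass the id alongside the file location. The commit only shows the new field; a rough sketch of how the class plausibly fits together follows, where the getters and the omitted AbstractMessage/dispatchTo plumbing are assumptions and only the two-argument constructor and getSnapshotId() are visible elsewhere in the diff:

// Sketch only: the real class extends AbstractMessage and implements dispatchTo(...),
// which is omitted here; getters are assumed, and the constructor shape matches the
// call site new CreateSnapshot1Message(path.toString(), snapshotId) in the storage diff.
public class CreateSnapshot1Message {

    private String snapshotLocation;
    private String snapshotId;

    public CreateSnapshot1Message(String snapshotLocation, String snapshotId) {
        this.snapshotLocation = snapshotLocation;
        this.snapshotId = snapshotId;
    }

    public String getSnapshotLocation() {
        return snapshotLocation;
    }

    public String getSnapshotId() {
        // used by processSnapshot(...) to compare against the id read from the snapshots topic
        return snapshotId;
    }
}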

app/src/test/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSnapshotTest.java (+19 -3)

@@ -10,6 +10,7 @@
 import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.TestProfile;
+import jakarta.inject.Inject;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -20,9 +21,12 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;

+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
@@ -39,6 +43,9 @@ public class KafkaSqlSnapshotTest extends AbstractResourceTestBase {

     private static final String NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID = "SNAPSHOT_TEST_GROUP_ID";

+    @Inject
+    KafkaSqlRegistryStorage kafkaSqlRegistryStorage;
+
     @BeforeAll
     public void init() {
         //Create a bunch of artifacts and rules, so they're added on top of the snapshot.
@@ -58,6 +65,15 @@ public void init() {
             clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().byArtifactId(artifactId).rules().post(rule); }
     }

+    @Test
+    public void testSnapshotCreation() throws IOException {
+        String snapshotLocation = kafkaSqlRegistryStorage.triggerSnapshotCreation();
+        Path path = Path.of(snapshotLocation);
+        Assertions.assertTrue(Files.exists(path));
+        Files.delete(path);
+    }
+
+
     @Test
     public void testRecoverFromSnapshot() throws InterruptedException {
         //We expect 4001 artifacts coming from the snapshot
@@ -97,8 +113,9 @@ private void prepareSnapshotMarker(Properties props) throws ExecutionException,
         KafkaProducer<String, String> dataProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
         RecordHeader messageTypeHeader = new RecordHeader("mt", "CreateSnapshot1Message".getBytes(StandardCharsets.UTF_8));
         ProducerRecord<String, String> snapshotMarkerRecord = new ProducerRecord<>("kafkasql-journal", 0,
-                "{\"uuid\":\"1302b402-c707-457e-af76-10c1045e68e8\",\"messageType\":\"CreateSnapshot1Message\",\"partitionKey\":\"__GLOBAL_PARTITION__\"}", "{\n"
+                "{\"uuid\":\"1345b402-c707-457e-af76-10c1045e68e8\",\"messageType\":\"CreateSnapshot1Message\",\"partitionKey\":\"__GLOBAL_PARTITION__\"}", "{\n"
                         + " \"snapshotLocation\": \"/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8.sql\",\n"
+                        + " \"snapshotId\": \"1302b402-c707-457e-af76-10c1045e68e8\",\n"
                         + " \"key\": {\n"
                         + " \"uuid\": \"1302b402-c707-457e-af76-10c1045e68e8\",\n"
                         + " \"messageType\": \"CreateSnapshot1Message\",\n"
@@ -107,7 +124,6 @@ private void prepareSnapshotMarker(Properties props) throws ExecutionException,
                         + " }", List.of(messageTypeHeader));

         //Send snapshot marker
-
         dataProducer.send(snapshotMarkerRecord).get();
     }

@@ -137,7 +153,7 @@ private void prepareSnapshotMessages(Properties props) throws URISyntaxException

     public Properties connectionProperties() {
         Properties properties = new Properties();
-        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers.external"));
         properties.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
         properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 5000);
         return properties;
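
For context on the coordination path this test exercises: triggerSnapshotCreation() publishes the snapshot id as the record key and the snapshot file location as the record value on the snapshots topic, and consumeSnapshotsTopic() returns the id the journal consumer must wait for. A hedged sketch of what that read side could look like with a plain String consumer; the topic name "kafkasql-snapshots", the group id, and the single-poll read are assumptions for illustration, not values taken from the registry configuration:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch of reading the snapshots topic: key = snapshot id, value = snapshot location,
// mirroring the ProducerRecord built in triggerSnapshotCreation(). Topic and group id are placeholders.
public class SnapshotsTopicReaderSketch {

    static String latestSnapshotId(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "snapshots-reader-sketch");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        String snapshotId = null;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("kafkasql-snapshots"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                snapshotId = record.key(); // last key seen = most recently published snapshot id
            }
        }
        return snapshotId;
    }
}

Keeping only the last key seen matches the "newest snapshot wins" behaviour suggested by the discard log message in the consumer loop.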

app/src/test/java/io/apicurio/registry/storage/impl/readonly/ReadOnlyRegistryStorageTest.java (+1 -1)

@@ -147,7 +147,7 @@ public class ReadOnlyRegistryStorageTest {
             entry("updateGlobalRule2", new State(true, s -> s.updateGlobalRule(null, null))),
             entry("updateGroupMetaData2", new State(true, s -> s.updateGroupMetaData(null, null))),
             entry("updateRoleMapping2", new State(true, s -> s.updateRoleMapping(null, null))),
-            entry("triggerSnapshotCreation1", new State(true, s -> s.triggerSnapshotCreation())),
+            entry("triggerSnapshotCreation0", new State(true, RegistryStorage::triggerSnapshotCreation)),
             entry("createSnapshot1", new State(true, s -> s.createSnapshot(null)))
     );


utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java (+3 -3)

@@ -38,9 +38,9 @@ public Map<String, String> start() {
             System.setProperty("bootstrap.servers.external", externalBootstrapServers);

             return Map.of(
-                    "bootstrap.servers", "localhost:9092",
-                    "apicurio.events.kafka.config.bootstrap.servers", "localhost:9092",
-                    "apicurio.kafkasql.bootstrap.servers", "localhost:9092");
+                    "bootstrap.servers", externalBootstrapServers,
+                    "apicurio.events.kafka.config.bootstrap.servers", externalBootstrapServers,
+                    "apicurio.kafkasql.bootstrap.servers", externalBootstrapServers);
         } else {
             return Collections.emptyMap();
         }
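
These properties now point the registry and the event producer at the Testcontainers broker, while the bootstrap.servers.external system property set just above is what KafkaSqlSnapshotTest#connectionProperties() reads. A sketch of the overall wiring pattern for such a Quarkus test resource; the container field, image tag, and class name are assumptions, only the returned property keys match the diff:

import java.util.Map;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Sketch of a Quarkus test resource that exposes the containerized broker address both as
// Quarkus config (return value) and as a system property for plain Kafka clients in tests.
public class KafkaTestResourceSketch implements QuarkusTestResourceLifecycleManager {

    private KafkaContainer kafka;

    @Override
    public Map<String, String> start() {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); // image tag assumed
        kafka.start();
        String externalBootstrapServers = kafka.getBootstrapServers();

        // Plain Kafka clients in tests read this system property directly.
        System.setProperty("bootstrap.servers.external", externalBootstrapServers);

        // Quarkus config: the same address replaces the old hard-coded localhost:9092.
        return Map.of(
                "bootstrap.servers", externalBootstrapServers,
                "apicurio.events.kafka.config.bootstrap.servers", externalBootstrapServers,
                "apicurio.kafkasql.bootstrap.servers", externalBootstrapServers);
    }

    @Override
    public void stop() {
        if (kafka != null) {
            kafka.stop();
        }
    }
}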
