Commit d01e086 (parent 3479bc7)

Make the replica that triggered the snapshot actually create it

3 files changed: +64 -29 lines

app/src/main/java/io/apicurio/registry/config/RegistryStorageConfigCache.java (+1 -1)

@@ -95,7 +95,7 @@ void run() {

     private void refresh() {
         Instant now = Instant.now();
-        if (lastRefresh != null) {
+        if (lastRefresh != null && this.delegate != null && this.delegate.isReady()) {
             List<DynamicConfigPropertyDto> staleConfigProperties = this.getStaleConfigProperties(lastRefresh);
             if (!staleConfigProperties.isEmpty()) {
                 invalidateCache();
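The new guard skips the staleness check until the delegate storage reports it is ready, presumably so the config cache is not refreshed against a storage that is still bootstrapping (for example, while restoring a snapshot). A minimal sketch of the idea, assuming only a delegate type that exposes isReady(); the scheduler wiring here is illustrative and not the registry's actual code:

import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GuardedRefresher {

    // Stand-in for the storage delegate used by RegistryStorageConfigCache (assumption).
    interface ReadinessAware {
        boolean isReady();
    }

    private final ReadinessAware delegate;
    private volatile Instant lastRefresh;

    GuardedRefresher(ReadinessAware delegate) {
        this.delegate = delegate;
    }

    void start() {
        // Periodic refresh; the interval is arbitrary for this sketch.
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::refresh, 1, 1, TimeUnit.SECONDS);
    }

    private void refresh() {
        Instant now = Instant.now();
        // Same shape as the guard added above: do nothing until the storage is ready.
        if (lastRefresh != null && delegate != null && delegate.isReady()) {
            // ... look up stale config properties and invalidate the cache here ...
        }
        lastRefresh = now;
    }
}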

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java (+20 -5)

@@ -123,6 +123,8 @@ public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBas
     private volatile boolean stopped = true;
     private volatile boolean snapshotProcessed = false;

+    //The snapshot id used to determine if this replica must process a snapshot message
+    private volatile String lastTriggeredSnapshot = null;

     @Override
     public String storageName() {
@@ -139,6 +141,7 @@ public void initialize() {
         }

         //Try to restore the internal database from a snapshot
+        final long bootstrapStart = System.currentTimeMillis();
         String snapshotId = consumeSnapshotsTopic(snapshotsConsumer);

         //Once the topics are created, and the snapshots processed, initialize the internal SQL Storage.
@@ -147,7 +150,7 @@ public void initialize() {

         //Once the SQL storage has been initialized, start the Kafka consumer thread.
         log.info("SQL store initialized, starting consumer thread.");
-        startConsumerThread(journalConsumer, snapshotId);
+        startConsumerThread(journalConsumer, snapshotId, bootstrapStart);
     }

     @Override
@@ -208,6 +211,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
            try {
                String path = snapshotFound.value();
                if (null != path && !path.isBlank() && Files.exists(Path.of(snapshotFound.value()))) {
+                    log.info("Snapshot with path {} found.", snapshotFound.value());
                    snapshotRecordKey = snapshotFound.key();
                    mostRecentSnapshotPath = Path.of(snapshotFound.value());
                }
@@ -219,6 +223,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons

            //Here we have the most recent snapshot that we can find, try to restore the internal database from it.
            if (null != mostRecentSnapshotPath) {
+                log.info("Restoring snapshot {} to the internal database...", mostRecentSnapshotPath);
                sqlStore.restoreFromSnapshot(mostRecentSnapshotPath.toString());
            }
        }
@@ -233,13 +238,12 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
     * consuming JournalRecord entries found on that topic, and applying those journal entries to
     * the internal data model.
     */
-    private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> consumer, String snapshotId) {
+    private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> consumer, String snapshotId, long bootstrapStart) {
         log.info("Starting KSQL consumer thread on topic: {}", configuration.topic());
         log.info("Bootstrap servers: {}", configuration.bootstrapServers());

         final String bootstrapId = UUID.randomUUID().toString();
         submitter.submitBootstrap(bootstrapId);
-        final long bootstrapStart = System.currentTimeMillis();

         Runnable runner = () -> {
             try (consumer) {
@@ -254,6 +258,7 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
                // Main consumer loop
                while (!stopped) {
                    final ConsumerRecords<KafkaSqlMessageKey, KafkaSqlMessage> records = consumer.poll(Duration.ofMillis(configuration.pollTimeout()));
+
                    if (records != null && !records.isEmpty()) {
                        log.debug("Consuming {} journal records.", records.count());

@@ -263,10 +268,12 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
                        while (it.hasNext() && !snapshotProcessed) {
                            ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record = it.next();
                            if (processSnapshot(snapshotId, record)) {
+                                log.debug("Snapshot marker found {} the new messages will be applied on top of the snapshot data.", record.key());
                                snapshotProcessed = true;
                                break;
-                            } else {
-                                log.debug("Discarding message with key {} as it was sent before a newer snapshot was created", record.key());
+                            }
+                            else {
+                                log.debug("Discarding message with key {} as it was sent before a newer snapshot was created.", record.key());
                            }
                        }

@@ -318,6 +325,13 @@ private void processRecord(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> r
            return;
        }

+        // If the key is a CreateSnapshotMessage key, but this replica does not have the snapshotId, it means that it wasn't triggered here, so just skip the message.
+        if (record.value() instanceof CreateSnapshot1Message && !((CreateSnapshot1Message) record.value()).getSnapshotId().equals(lastTriggeredSnapshot)) {
+            log.debug("Snapshot trigger message with id {} being skipped since this replica did not trigger the creation.",
+                    ((CreateSnapshot1Message) record.value()).getSnapshotId());
+            return;
+        }
+
        // If the value is null, then this is a tombstone (or unrecognized) message and should not
        // be processed.
        if (record.value() == null) {
@@ -873,6 +887,7 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
        String snapshotId = UUID.randomUUID().toString();
        Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
        var message = new CreateSnapshot1Message(path.toString(), snapshotId);
+        this.lastTriggeredSnapshot = snapshotId;
        var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
        String snapshotLocation = (String) coordinator.waitForResponse(uuid);

examples/rest-client/src/main/java/io/apicurio/registry/examples/RegistryLoader.java (+43 -23)

@@ -16,18 +16,15 @@

 package io.apicurio.registry.examples;

-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
 import io.apicurio.registry.client.auth.VertXAuthFactory;
 import io.apicurio.registry.rest.client.RegistryClient;
 import io.apicurio.registry.rest.client.models.ArtifactContent;
-import io.apicurio.rest.client.util.IoUtil;
+import io.apicurio.registry.rest.client.models.Rule;
+import io.apicurio.registry.rest.client.models.RuleType;
 import io.kiota.http.vertx.VertXRequestAdapter;

+import java.util.UUID;
+
 /**
  * @author eric.wittmann@gmail.com
  */
@@ -40,26 +37,49 @@ public static void main(String[] args) throws Exception {
         vertXRequestAdapter.setBaseUrl(registryUrl);
         RegistryClient client = new RegistryClient(vertXRequestAdapter);

-        File templateFile = new File("C:\\Temp\\registry.json");
-        String template;
-        try (InputStream templateIS = new FileInputStream(templateFile)) {
-            template = IoUtil.toString(templateIS);
+        String simpleAvro = """
+                {
+                  "type" : "record",
+                  "name" : "userInfo",
+                  "namespace" : "my.example",
+                  "fields" : [{"name" : "age", "type" : "int"}]
+                }""";
+
+        for (int i = 0; i < 600; i++) {
+            Task task = new Task(simpleAvro, client, 1000, i + 800000);
+            task.start();
         }
+    }

-        for (int idx = 1; idx <= 1000; idx++) {
-            System.out.println("Creating artifact #" + idx);
-            String content = template.replaceFirst("Apicurio Registry API", "Apicurio Registry API :: Copy #" + idx);
-            InputStream contentIS = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+    protected static class Task extends Thread {

-            ArtifactContent artifactContent = new ArtifactContent();
-            artifactContent.setContent(IoUtil.toString(contentIS));
+        private final RegistryClient client;
+        private final String simpleAvro;
+        private final int numArtifacts;
+        private int threadId;

-            final io.apicurio.registry.rest.client.models.VersionMetaData amdCity = client.groups().byGroupId("default").artifacts().post(artifactContent, config -> {
-                config.queryParameters.ifExists = io.apicurio.registry.rest.client.models.IfExists.RETURN_OR_UPDATE;
-                config.headers.add("X-Registry-ArtifactId", "city");
-                config.headers.add("X-Registry-ArtifactType", "JSON");
-            });
+        public Task(String artifactContent, RegistryClient client, int numArtifacts, int threadId) {
+            this.client = client;
+            this.simpleAvro = artifactContent;
+            this.numArtifacts = numArtifacts;
+            this.threadId = threadId;
         }
-    }

+        @Override
+        public void run() {
+            for (int idx = 0; idx < numArtifacts; idx++) {
+                System.out.println("Iteration: " + idx);
+                String artifactId = UUID.randomUUID().toString();
+                ArtifactContent content = new ArtifactContent();
+                content.setContent(simpleAvro.replace("userInfo", "userInfo" + threadId + numArtifacts));
+                client.groups().byGroupId("default").artifacts().post(content, config -> {
+                    config.headers.add("X-Registry-ArtifactId", artifactId);
+                });
+                Rule rule = new Rule();
+                rule.setType(RuleType.VALIDITY);
+                rule.setConfig("SYNTAX_ONLY");
+                client.groups().byGroupId("default").artifacts().byArtifactId(artifactId).rules().post(rule);
+            }
+        }
+    }
 }
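The rewritten loader is intentionally heavy: 600 threads each register 1000 artifacts and attach a validity rule to every one of them. For a local smoke test, a smaller run might look like the following hypothetical drop-in replacement for the loop in main(); it reuses the Task class and the client and simpleAvro variables shown above, and the thread and artifact counts are arbitrary:

// Hypothetical smaller-scale run: 4 threads x 100 artifacts instead of 600 x 1000,
// which is much easier on a locally running registry.
for (int i = 0; i < 4; i++) {
    new Task(simpleAvro, client, 100, i + 1).start();
}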
