Skip to content

Commit 91e49b4

Browse files
committed
Make the replica that triggered the snapshot actually create it
1 parent 8bd6556 commit 91e49b4

File tree

3 files changed

+64
-35
lines changed

3 files changed

+64
-35
lines changed

app/src/main/java/io/apicurio/registry/config/RegistryStorageConfigCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void run() {
9595

9696
private void refresh() {
9797
Instant now = Instant.now();
98-
if (lastRefresh != null) {
98+
if (lastRefresh != null && this.delegate != null && this.delegate.isReady()) {
9999
List<DynamicConfigPropertyDto> staleConfigProperties = this.getStaleConfigProperties(lastRefresh);
100100
if (!staleConfigProperties.isEmpty()) {
101101
invalidateCache();

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBas
113113
private volatile boolean stopped = true;
114114
private volatile boolean snapshotProcessed = false;
115115

116+
//The snapshot id used to determine if this replica must process a snapshot message
117+
private volatile String lastTriggeredSnapshot = null;
116118

117119
@Override
118120
public String storageName() {
@@ -129,6 +131,7 @@ public void initialize() {
129131
}
130132

131133
//Try to restore the internal database from a snapshot
134+
final long bootstrapStart = System.currentTimeMillis();
132135
String snapshotId = consumeSnapshotsTopic(snapshotsConsumer);
133136

134137
//Once the topics are created, and the snapshots processed, initialize the internal SQL Storage.
@@ -137,7 +140,7 @@ public void initialize() {
137140

138141
//Once the SQL storage has been initialized, start the Kafka consumer thread.
139142
log.info("SQL store initialized, starting consumer thread.");
140-
startConsumerThread(journalConsumer, snapshotId);
143+
startConsumerThread(journalConsumer, snapshotId, bootstrapStart);
141144
}
142145

143146
@Override
@@ -198,6 +201,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
198201
try {
199202
String path = snapshotFound.value();
200203
if (null != path && !path.isBlank() && Files.exists(Path.of(snapshotFound.value()))) {
204+
log.info("Snapshot with path {} found.", snapshotFound.value());
201205
snapshotRecordKey = snapshotFound.key();
202206
mostRecentSnapshotPath = Path.of(snapshotFound.value());
203207
}
@@ -209,6 +213,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
209213

210214
//Here we have the most recent snapshot that we can find, try to restore the internal database from it.
211215
if (null != mostRecentSnapshotPath) {
216+
log.info("Restoring snapshot {} to the internal database...", mostRecentSnapshotPath);
212217
sqlStore.restoreFromSnapshot(mostRecentSnapshotPath.toString());
213218
}
214219
}
@@ -223,13 +228,12 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
223228
* consuming JournalRecord entries found on that topic, and applying those journal entries to
224229
* the internal data model.
225230
*/
226-
private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> consumer, String snapshotId) {
231+
private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSqlMessage> consumer, String snapshotId, long bootstrapStart) {
227232
log.info("Starting KSQL consumer thread on topic: {}", configuration.topic());
228233
log.info("Bootstrap servers: {}", configuration.bootstrapServers());
229234

230235
final String bootstrapId = UUID.randomUUID().toString();
231236
submitter.submitBootstrap(bootstrapId);
232-
final long bootstrapStart = System.currentTimeMillis();
233237

234238
Runnable runner = () -> {
235239
try (consumer) {
@@ -244,6 +248,7 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
244248
// Main consumer loop
245249
while (!stopped) {
246250
final ConsumerRecords<KafkaSqlMessageKey, KafkaSqlMessage> records = consumer.poll(Duration.ofMillis(configuration.pollTimeout()));
251+
247252
if (records != null && !records.isEmpty()) {
248253
log.debug("Consuming {} journal records.", records.count());
249254

@@ -253,10 +258,12 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
253258
while (it.hasNext() && !snapshotProcessed) {
254259
ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record = it.next();
255260
if (processSnapshot(snapshotId, record)) {
261+
log.debug("Snapshot marker found {} the new messages will be applied on top of the snapshot data.", record.key());
256262
snapshotProcessed = true;
257263
break;
258-
} else {
259-
log.debug("Discarding message with key {} as it was sent before a newer snapshot was created", record.key());
264+
}
265+
else {
266+
log.debug("Discarding message with key {} as it was sent before a newer snapshot was created.", record.key());
260267
}
261268
}
262269

@@ -308,6 +315,13 @@ private void processRecord(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> r
308315
return;
309316
}
310317

318+
// If the key is a CreateSnapshotMessage key, but this replica does not have the snapshotId, it means that it wasn't triggered here, so just skip the message.
319+
if (record.value() instanceof CreateSnapshot1Message && !((CreateSnapshot1Message) record.value()).getSnapshotId().equals(lastTriggeredSnapshot)) {
320+
log.debug("Snapshot trigger message with id {} being skipped since this replica did not trigger the creation.",
321+
((CreateSnapshot1Message) record.value()).getSnapshotId());
322+
return;
323+
}
324+
311325
// If the value is null, then this is a tombstone (or unrecognized) message and should not
312326
// be processed.
313327
if (record.value() == null) {
@@ -827,6 +841,7 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
827841
String snapshotId = UUID.randomUUID().toString();
828842
Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
829843
var message = new CreateSnapshot1Message(path.toString(), snapshotId);
844+
this.lastTriggeredSnapshot = snapshotId;
830845
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
831846
String snapshotLocation = (String) coordinator.waitForResponse(uuid);
832847

examples/rest-client/src/main/java/io/apicurio/registry/examples/RegistryLoader.java

+43-29
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,12 @@
1818

1919
import io.apicurio.registry.client.auth.VertXAuthFactory;
2020
import io.apicurio.registry.rest.client.RegistryClient;
21-
import io.apicurio.registry.rest.client.models.CreateArtifact;
22-
import io.apicurio.registry.rest.client.models.CreateVersion;
23-
import io.apicurio.registry.rest.client.models.IfArtifactExists;
24-
import io.apicurio.registry.rest.client.models.VersionContent;
25-
import io.apicurio.rest.client.util.IoUtil;
21+
import io.apicurio.registry.rest.client.models.ArtifactContent;
22+
import io.apicurio.registry.rest.client.models.Rule;
23+
import io.apicurio.registry.rest.client.models.RuleType;
2624
import io.kiota.http.vertx.VertXRequestAdapter;
2725

28-
import java.io.ByteArrayInputStream;
29-
import java.io.File;
30-
import java.io.FileInputStream;
31-
import java.io.InputStream;
32-
import java.nio.charset.StandardCharsets;
26+
import java.util.UUID;
3327

3428
/**
3529
* @author eric.wittmann@gmail.com
@@ -43,29 +37,49 @@ public static void main(String[] args) throws Exception {
4337
vertXRequestAdapter.setBaseUrl(registryUrl);
4438
RegistryClient client = new RegistryClient(vertXRequestAdapter);
4539

46-
File templateFile = new File("C:\\Temp\\registry.json");
47-
String template;
48-
try (InputStream templateIS = new FileInputStream(templateFile)) {
49-
template = IoUtil.toString(templateIS);
40+
String simpleAvro = """
41+
{
42+
"type" : "record",
43+
"name" : "userInfo",
44+
"namespace" : "my.example",
45+
"fields" : [{"name" : "age", "type" : "int"}]
46+
}""";
47+
48+
for (int i = 0; i < 600; i++) {
49+
Task task = new Task(simpleAvro, client, 1000, i + 800000);
50+
task.start();
5051
}
52+
}
5153

52-
for (int idx = 1; idx <= 1000; idx++) {
53-
System.out.println("Creating artifact #" + idx);
54-
String content = template.replaceFirst("Apicurio Registry API", "Apicurio Registry API :: Copy #" + idx);
55-
InputStream contentIS = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
54+
protected static class Task extends Thread {
5655

57-
CreateArtifact createArtifact = new CreateArtifact();
58-
createArtifact.setArtifactId("city");
59-
createArtifact.setType("JSON");
60-
createArtifact.setFirstVersion(new CreateVersion());
61-
createArtifact.getFirstVersion().setContent(new VersionContent());
62-
createArtifact.getFirstVersion().getContent().setContent(IoUtil.toString(contentIS));
63-
createArtifact.getFirstVersion().getContent().setContentType("application/json");
56+
private final RegistryClient client;
57+
private final String simpleAvro;
58+
private final int numArtifacts;
59+
private final int threadId;
6460

65-
client.groups().byGroupId("default").artifacts().post(createArtifact, config -> {
66-
config.queryParameters.ifExists = IfArtifactExists.FIND_OR_CREATE_VERSION;
67-
});
61+
public Task(String artifactContent, RegistryClient client, int numArtifacts, int threadId) {
62+
this.client = client;
63+
this.simpleAvro = artifactContent;
64+
this.numArtifacts = numArtifacts;
65+
this.threadId = threadId;
6866
}
69-
}
7067

68+
@Override
69+
public void run() {
70+
for (int idx = 0; idx < numArtifacts; idx++) {
71+
System.out.println("Iteration: " + idx);
72+
String artifactId = UUID.randomUUID().toString();
73+
ArtifactContent content = new ArtifactContent();
74+
content.setContent(simpleAvro.replace("userInfo", "userInfo" + threadId + numArtifacts));
75+
client.groups().byGroupId("default").artifacts().post(content, config -> {
76+
config.headers.add("X-Registry-ArtifactId", artifactId);
77+
});
78+
Rule rule = new Rule();
79+
rule.setType(RuleType.VALIDITY);
80+
rule.setConfig("SYNTAX_ONLY");
81+
client.groups().byGroupId("default").artifacts().byArtifactId(artifactId).rules().post(rule);
82+
}
83+
}
84+
}
7185
}

0 commit comments

Comments
 (0)