Skip to content

Commit 332252b

Browse files
committed
Add some log messages to improve traceability
1 parent d01e086 commit 332252b

File tree

2 files changed

+5
-11
lines changed

2 files changed

+5
-11
lines changed

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,8 @@ private void autoCreateTopics() {
187187
}
188188

189189
/**
190-
* Start the KSQL Kafka consumer thread which is responsible for subscribing to the kafka topic,
191-
* consuming JournalRecord entries found on that topic, and applying those journal entries to
192-
* the internal data model.
190+
* Consume the snapshots topic, looking for the most recent snapshots in the topic. Once found, it restores the internal h2 database using the snapshot's content.
191+
* WARNING: This has the limitation of processing the first 500 snapshots, which should be enough for most deployments.
193192
*/
194193
private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsConsumer) {
195194
// Subscribe to the snapshots topic
@@ -211,7 +210,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
211210
try {
212211
String path = snapshotFound.value();
213212
if (null != path && !path.isBlank() && Files.exists(Path.of(snapshotFound.value()))) {
214-
log.info("Snapshot with path {} found.", snapshotFound.value());
213+
log.debug("Snapshot with path {} found.", snapshotFound.value());
215214
snapshotRecordKey = snapshotFound.key();
216215
mostRecentSnapshotPath = Path.of(snapshotFound.value());
217216
}
@@ -229,7 +228,6 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
229228
}
230229

231230
snapshotsConsumer.commitSync();
232-
233231
return snapshotRecordKey;
234232
}
235233

@@ -248,9 +246,6 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
248246
Runnable runner = () -> {
249247
try (consumer) {
250248
log.info("Subscribing to {}", configuration.topic());
251-
252-
//TODO use the snapshot record metadata to put the journal consumer into the appropiate offset so it does not consume unneeded messages
253-
254249
// Subscribe to the journal topic
255250
Collection<String> topics = Collections.singleton(configuration.topic());
256251
consumer.subscribe(topics);
@@ -888,15 +883,13 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
888883
Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
889884
var message = new CreateSnapshot1Message(path.toString(), snapshotId);
890885
this.lastTriggeredSnapshot = snapshotId;
886+
log.debug("Snapshot with id {} triggered.", snapshotId);
891887
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
892888
String snapshotLocation = (String) coordinator.waitForResponse(uuid);
893-
894889
//Then we send a new message to the snapshots topic, using the snapshot id as the key of the snapshot message.
895890
ProducerRecord<String, String> record = new ProducerRecord<>(configuration.snapshotsTopic(), 0, snapshotId, snapshotLocation,
896891
Collections.emptyList());
897-
898892
RecordMetadata recordMetadata = ConcurrentUtil.get(snapshotsProducer.apply(record));
899-
900893
return snapshotLocation;
901894
}
902895

app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java

+1
Original file line numberDiff line numberDiff line change
@@ -3492,6 +3492,7 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
34923492

34933493
@Override
34943494
public String createSnapshot(String location) throws RegistryStorageException {
3495+
log.debug("Creating internal database snapshot to location {}.", location);
34953496
handles.withHandleNoException(handle -> {
34963497
handle.createQuery(sqlStatements.createDataSnapshot())
34973498
.bind(0, location).mapTo(Integer.class);

0 commit comments

Comments
 (0)