Skip to content

Commit 5ecb855

Browse files
committed
Add some log messages to improve traceability
1 parent 91e49b4 commit 5ecb855

File tree

2 files changed

+5
-11
lines changed

2 files changed

+5
-11
lines changed

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,8 @@ private void autoCreateTopics() {
177177
}
178178

179179
/**
180-
* Start the KSQL Kafka consumer thread which is responsible for subscribing to the kafka topic,
181-
* consuming JournalRecord entries found on that topic, and applying those journal entries to
182-
* the internal data model.
180+
* Consume the snapshots topic, looking for the most recent snapshots in the topic. Once found, it restores the internal h2 database using the snapshot's content.
181+
* WARNING: This has the limitation of processing the first 500 snapshots, which should be enough for most deployments.
183182
*/
184183
private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsConsumer) {
185184
// Subscribe to the snapshots topic
@@ -201,7 +200,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
201200
try {
202201
String path = snapshotFound.value();
203202
if (null != path && !path.isBlank() && Files.exists(Path.of(snapshotFound.value()))) {
204-
log.info("Snapshot with path {} found.", snapshotFound.value());
203+
log.debug("Snapshot with path {} found.", snapshotFound.value());
205204
snapshotRecordKey = snapshotFound.key();
206205
mostRecentSnapshotPath = Path.of(snapshotFound.value());
207206
}
@@ -219,7 +218,6 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
219218
}
220219

221220
snapshotsConsumer.commitSync();
222-
223221
return snapshotRecordKey;
224222
}
225223

@@ -238,9 +236,6 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
238236
Runnable runner = () -> {
239237
try (consumer) {
240238
log.info("Subscribing to {}", configuration.topic());
241-
242-
//TODO use the snapshot record metadata to put the journal consumer into the appropiate offset so it does not consume unneeded messages
243-
244239
// Subscribe to the journal topic
245240
Collection<String> topics = Collections.singleton(configuration.topic());
246241
consumer.subscribe(topics);
@@ -842,15 +837,13 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
842837
Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
843838
var message = new CreateSnapshot1Message(path.toString(), snapshotId);
844839
this.lastTriggeredSnapshot = snapshotId;
840+
log.debug("Snapshot with id {} triggered.", snapshotId);
845841
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
846842
String snapshotLocation = (String) coordinator.waitForResponse(uuid);
847-
848843
//Then we send a new message to the snapshots topic, using the snapshot id as the key of the snapshot message.
849844
ProducerRecord<String, String> record = new ProducerRecord<>(configuration.snapshotsTopic(), 0, snapshotId, snapshotLocation,
850845
Collections.emptyList());
851-
852846
RecordMetadata recordMetadata = ConcurrentUtil.get(snapshotsProducer.apply(record));
853-
854847
return snapshotLocation;
855848
}
856849

app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java

+1
Original file line numberDiff line numberDiff line change
@@ -3393,6 +3393,7 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
33933393

33943394
@Override
33953395
public String createSnapshot(String location) throws RegistryStorageException {
3396+
log.debug("Creating internal database snapshot to location {}.", location);
33963397
handles.withHandleNoException(handle -> {
33973398
handle.createQuery(sqlStatements.createDataSnapshot())
33983399
.bind(0, location).mapTo(Integer.class);

0 commit comments

Comments
 (0)