Skip to content

Commit c74186b

Browse files
committed
Improve snapshot creation process
1 parent 3d0cb5b commit c74186b

File tree

13 files changed

+105
-80
lines changed

13 files changed

+105
-80
lines changed

app/src/main/java/io/apicurio/registry/rest/v3/AdminResourceImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ public List<ArtifactTypeInfo> listArtifactTypes() {
133133

134134
}
135135

136+
@Override
137+
@Authorized(style=AuthorizedStyle.None, level=AuthorizedLevel.Admin)
138+
public void triggerSnapshot() {
139+
storage.triggerSnapshotCreation();
140+
}
141+
136142
/**
137143
* @see io.apicurio.registry.rest.v3.AdminResource#listGlobalRules()
138144
*/

app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -896,10 +896,9 @@ ArtifactVersionMetaDataDto createArtifactWithMetadata(String groupId, String art
896896
/**
897897
* Triggers a snapshot creation of the internal database.
898898
*
899-
* @param snapshotLocation
900899
* @throws RegistryStorageException
901900
*/
902-
String triggerSnapshotCreation(String snapshotLocation) throws RegistryStorageException;
901+
String triggerSnapshotCreation() throws RegistryStorageException;
903902

904903
/**
905904
* Creates the snapshot of the internal database based on configuration.

app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -471,9 +471,9 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
471471
}
472472

473473
@Override
474-
public String triggerSnapshotCreation(String snapshotLocation) throws RegistryStorageException {
474+
public String triggerSnapshotCreation() throws RegistryStorageException {
475475
checkReadOnly();
476-
return delegate.triggerSnapshotCreation(snapshotLocation);
476+
return delegate.triggerSnapshotCreation();
477477
}
478478

479479
@Override

app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,8 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
384384
}
385385

386386
@Override
387-
public String triggerSnapshotCreation(String snapshotLocation) throws RegistryStorageException {
388-
return delegate.triggerSnapshotCreation(snapshotLocation);
387+
public String triggerSnapshotCreation() throws RegistryStorageException {
388+
return delegate.triggerSnapshotCreation();
389389
}
390390

391391
@Override

app/src/main/java/io/apicurio/registry/storage/impl/gitops/GitOpsRegistryStorage.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,8 @@ public List<GAV> getArtifactBranch(GA ga, BranchId branchId, ArtifactRetrievalBe
500500
}
501501

502502
@Override
503-
public String triggerSnapshotCreation(String snapshotLocation) throws RegistryStorageException {
504-
return proxy((storage -> storage.triggerSnapshotCreation(snapshotLocation)));
503+
public String triggerSnapshotCreation() throws RegistryStorageException {
504+
return proxy((RegistryStorage::triggerSnapshotCreation));
505505
}
506506

507507
@Override

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java

+24-5
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77
import java.util.stream.Collectors;
88

99
import io.apicurio.registry.storage.impl.kafkasql.messages.*;
10+
import io.apicurio.registry.utils.kafka.ProducerActions;
1011
import jakarta.inject.Named;
1112
import org.apache.kafka.clients.CommonClientConfigs;
1213
import org.apache.kafka.clients.consumer.ConsumerRecord;
1314
import org.apache.kafka.clients.consumer.ConsumerRecords;
1415
import org.apache.kafka.clients.consumer.KafkaConsumer;
16+
import org.apache.kafka.clients.producer.ProducerRecord;
17+
import org.apache.kafka.clients.producer.RecordMetadata;
1518
import org.apache.kafka.common.errors.TopicExistsException;
1619
import org.slf4j.Logger;
1720

@@ -106,6 +109,10 @@ public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBas
106109
@Named("KafkaSqlSnapshotsConsumer")
107110
KafkaConsumer<String, String> snapshotsConsumer;
108111

112+
@Inject
113+
@Named("KafkaSqlSnapshotsProducer")
114+
ProducerActions<String, String> snapshotsProducer;
115+
109116
@Inject
110117
KafkaSqlSubmitter submitter;
111118

@@ -251,14 +258,15 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
251258
log.debug("Consuming {} journal records.", records.count());
252259

253260
if (null != snapshotMessageKey && !snapshotProcessed) {
254-
//If there is a snapshot key present, we process (and discard) all the messages until we found the snapshot marker that corresponds to the snapshot key.
261+
//If there is a snapshot key present, we process (and discard) all the messages until we find the snapshot marker that corresponds to the snapshot key.
255262
Iterator<ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage>> it = records.iterator();
256263
while (it.hasNext() && !snapshotProcessed) {
257264
ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> record = it.next();
258265
if (processSnapshot(snapshotMessageKey, record)) {
259-
log.info("Subscribing to {}", configuration.topic());
260266
snapshotProcessed = true;
261267
break;
268+
} else {
269+
log.info("Discarding message with key {} as it was sent before a snapshot was created", record.key());
262270
}
263271
}
264272

@@ -857,10 +865,21 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
857865
}
858866

859867
@Override
860-
public String triggerSnapshotCreation(String snapshotLocation) throws RegistryStorageException {
861-
var message = new CreateSnapshot1Message(snapshotLocation);
868+
public String triggerSnapshotCreation() throws RegistryStorageException {
869+
//First we generate an identifier for the snapshot, then we send a snapshot marker to the journal topic.
870+
String snapshotId = UUID.randomUUID().toString();
871+
Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
872+
var message = new CreateSnapshot1Message(path.toString());
862873
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
863-
return (String) coordinator.waitForResponse(uuid);
874+
String snapshotLocation = (String) coordinator.waitForResponse(uuid);
875+
876+
//Then we send a new message to the snapshots topic, using the marker uuid as the keyof the snapshot message.
877+
ProducerRecord<String, String> record = new ProducerRecord<>(configuration.snapshotsTopic(), 0, uuid.toString(), snapshotLocation,
878+
Collections.emptyList());
879+
880+
RecordMetadata recordMetadata = ConcurrentUtil.get(snapshotsProducer.apply(record));
881+
882+
return recordMetadata.toString();
864883
}
865884

866885
@Override

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSnapshotManager.java

-45
This file was deleted.

app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -3486,14 +3486,8 @@ public void importArtifactBranch(ArtifactBranchEntity entity) {
34863486
}
34873487

34883488
@Override
3489-
public String triggerSnapshotCreation(String location) throws RegistryStorageException {
3490-
//Snapshot creation is only supported in h2 and kafkasql. In this case, when this is invoked directly, it just creates the snapshot.
3491-
if ("h2".equals(sqlStatements.dbType())) {
3492-
return createSnapshot(location);
3493-
}
3494-
else {
3495-
return null;
3496-
}
3489+
public String triggerSnapshotCreation() throws RegistryStorageException {
3490+
throw new RegistryStorageException("Directly triggering the snapshot creation is not supported for sql storages.");
34973491
}
34983492

34993493
@Override

app/src/main/resources-unfiltered/META-INF/resources/api-specifications/registry/v3/openapi.json

+22
Original file line numberDiff line numberDiff line change
@@ -2801,6 +2801,28 @@
28012801
"description": "Gets a list of all the configured artifact types.\n\nThis operation can fail for the following reasons:\n\n* A server error occurred (HTTP error `500`)\n"
28022802
}
28032803
},
2804+
"/admin/config/triggerSnapshot": {
2805+
"summary": "Triggers a snapshot of the Registry storage. Only supported in KafkaSQL storage",
2806+
"get": {
2807+
"tags": [
2808+
"KafkaSQL",
2809+
"Admin",
2810+
"Snapshot"
2811+
],
2812+
"responses": {
2813+
"200": {
2814+
"content": {},
2815+
"description": "Empty content. A 200 means that the snapshot has been successfully triggered."
2816+
},
2817+
"500": {
2818+
"$ref": "#/components/responses/ServerError"
2819+
}
2820+
},
2821+
"operationId": "triggerSnapshot",
2822+
"summary": "Trigger storage snapshot",
2823+
"description": "Triggers the creation of a snapshot of the internal database for compatible storages.\n\nThis operation can fail for the following reasons:\n\n* A server error occurred (HTTP error `500`)\n"
2824+
}
2825+
},
28042826
"/groups/{groupId}/artifacts/{artifactId}": {
28052827
"summary": "Manage a single artifact.",
28062828
"get": {

app/src/test/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlSnapshotTest.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import io.apicurio.registry.AbstractResourceTestBase;
44
import io.apicurio.registry.rest.client.models.ArtifactContent;
5+
import io.apicurio.registry.rest.client.models.Rule;
6+
import io.apicurio.registry.rest.client.models.RuleType;
57
import io.apicurio.registry.utils.kafka.KafkaUtil;
68
import io.apicurio.registry.utils.tests.KafkasqlSnapshotTestProfile;
79
import io.quarkus.test.common.QuarkusTestResource;
@@ -10,18 +12,20 @@
1012
import io.quarkus.test.junit.TestProfile;
1113
import lombok.SneakyThrows;
1214
import org.apache.kafka.clients.CommonClientConfigs;
13-
import org.apache.kafka.clients.consumer.ConsumerConfig;
1415
import org.apache.kafka.clients.producer.KafkaProducer;
1516
import org.apache.kafka.clients.producer.ProducerRecord;
17+
import org.apache.kafka.common.header.internals.RecordHeader;
1618
import org.apache.kafka.common.serialization.StringSerializer;
1719
import org.junit.jupiter.api.Assertions;
1820
import org.junit.jupiter.api.BeforeAll;
1921
import org.junit.jupiter.api.Test;
2022

2123
import java.net.URISyntaxException;
2224
import java.net.URL;
25+
import java.nio.charset.StandardCharsets;
2326
import java.nio.file.Paths;
2427
import java.util.Collections;
28+
import java.util.List;
2529
import java.util.Map;
2630
import java.util.Properties;
2731
import java.util.Set;
@@ -37,9 +41,9 @@ public class KafkaSqlSnapshotTest extends AbstractResourceTestBase {
3741

3842
@BeforeAll
3943
public void init() {
40-
//Create a bunch of artifacts so they're added on top of the snapshot
44+
//Create a bunch of artifacts and rules, so they're added on top of the snapshot.
4145
String simpleAvro = resourceToString("avro.json");
42-
/*
46+
4347
for (int idx = 0; idx < 1000; idx++) {
4448
System.out.println("Iteration: " + idx);
4549
String artifactId = UUID.randomUUID().toString();
@@ -48,15 +52,18 @@ public void init() {
4852
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().post(content, config -> {
4953
config.headers.add("X-Registry-ArtifactId", artifactId);
5054
});
51-
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().byArtifactId(artifactId).delete();
52-
}
53-
*/
54-
55+
Rule rule = new Rule();
56+
rule.setType(RuleType.VALIDITY);
57+
rule.setConfig("SYNTAX_ONLY");
58+
clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().byArtifactId(artifactId).rules().post(rule); }
5559
}
5660

5761
@Test
5862
public void testRecoverFromSnapshot() throws InterruptedException {
63+
//We expect 4001 artifacts coming from the snapshot
5964
Assertions.assertEquals(4001, clientV3.groups().byGroupId("default").artifacts().get().getCount());
65+
//We expect another 1000 artifacts coming added on top of the snapshot
66+
Assertions.assertEquals(1000, clientV3.groups().byGroupId(NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID).artifacts().get().getCount());
6067
}
6168

6269
protected static class KafkaSqlSnapshotTestInitializer implements QuarkusTestResourceLifecycleManager {
@@ -73,7 +80,6 @@ public int order() {
7380
@SneakyThrows
7481
public Map<String, String> start() {
7582
Properties props = connectionProperties();
76-
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
7783

7884
KafkaUtil.createTopics(props, Set.of("kafkasql-snapshots", "kafkasql-journal"), Collections.emptyMap());
7985

@@ -89,6 +95,7 @@ private void prepareSnapshotMarker(Properties props) throws ExecutionException,
8995

9096
//Create the data producer to send a snapshot marker
9197
KafkaProducer<String, String> dataProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
98+
RecordHeader messageTypeHeader = new RecordHeader("mt", "CreateSnapshot1Message".getBytes(StandardCharsets.UTF_8));
9299
ProducerRecord<String, String> snapshotMarkerRecord = new ProducerRecord<>("kafkasql-journal", 0,
93100
"{\"uuid\":\"1302b402-c707-457e-af76-10c1045e68e8\",\"messageType\":\"CreateSnapshot1Message\",\"partitionKey\":\"__GLOBAL_PARTITION__\"}", "{\n"
94101
+ " \"snapshotLocation\": \"/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8.sql\",\n"
@@ -97,9 +104,10 @@ private void prepareSnapshotMarker(Properties props) throws ExecutionException,
97104
+ " \"messageType\": \"CreateSnapshot1Message\",\n"
98105
+ " \"partitionKey\": \"__GLOBAL_PARTITION__\"\n"
99106
+ " }\n"
100-
+ " }", Collections.emptyList());
107+
+ " }", List.of(messageTypeHeader));
101108

102109
//Send snapshot marker
110+
103111
dataProducer.send(snapshotMarkerRecord).get();
104112
}
105113

@@ -129,7 +137,7 @@ private void prepareSnapshotMessages(Properties props) throws URISyntaxException
129137

130138
public Properties connectionProperties() {
131139
Properties properties = new Properties();
132-
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers.external"));
140+
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
133141
properties.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
134142
properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 5000);
135143
return properties;

app/src/test/java/io/apicurio/registry/storage/impl/readonly/ReadOnlyRegistryStorageTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public class ReadOnlyRegistryStorageTest {
147147
entry("updateGlobalRule2", new State(true, s -> s.updateGlobalRule(null, null))),
148148
entry("updateGroupMetaData2", new State(true, s -> s.updateGroupMetaData(null, null))),
149149
entry("updateRoleMapping2", new State(true, s -> s.updateRoleMapping(null, null))),
150-
entry("triggerSnapshotCreation1", new State(true, s -> s.triggerSnapshotCreation(null))),
150+
entry("triggerSnapshotCreation1", new State(true, s -> s.triggerSnapshotCreation())),
151151
entry("createSnapshot1", new State(true, s -> s.createSnapshot(null)))
152152
);
153153

common/src/main/resources/META-INF/openapi.json

+22
Original file line numberDiff line numberDiff line change
@@ -2737,6 +2737,28 @@
27372737
"description": "Gets a list of all the configured artifact types.\n\nThis operation can fail for the following reasons:\n\n* A server error occurred (HTTP error `500`)\n"
27382738
}
27392739
},
2740+
"/admin/config/triggerSnapshot": {
2741+
"summary": "Triggers a snapshot of the Registry storage. Only supported in KafkaSQL storage",
2742+
"get": {
2743+
"tags": [
2744+
"KafkaSQL",
2745+
"Admin",
2746+
"Snapshot"
2747+
],
2748+
"responses": {
2749+
"200": {
2750+
"content": {},
2751+
"description": "Empty content. A 200 means that the snapshot has been successfully triggered."
2752+
},
2753+
"500": {
2754+
"$ref": "#/components/responses/ServerError"
2755+
}
2756+
},
2757+
"operationId": "triggerSnapshot",
2758+
"summary": "Trigger storage snapshot",
2759+
"description": "Triggers the creation of a snapshot of the internal database for compatible storages.\n\nThis operation can fail for the following reasons:\n\n* A server error occurred (HTTP error `500`)\n"
2760+
}
2761+
},
27402762
"/groups/{groupId}/artifacts/{artifactId}": {
27412763
"summary": "Manage a single artifact.",
27422764
"get": {

utils/tests/src/main/java/io/apicurio/registry/utils/tests/KafkaTestContainerManager.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public Map<String, String> start() {
3838
System.setProperty("bootstrap.servers.external", externalBootstrapServers);
3939

4040
return Map.of(
41-
"bootstrap.servers", externalBootstrapServers,
42-
"apicurio.events.kafka.config.bootstrap.servers", externalBootstrapServers,
43-
"apicurio.kafkasql.bootstrap.servers", externalBootstrapServers);
41+
"bootstrap.servers", "localhost:9092",
42+
"apicurio.events.kafka.config.bootstrap.servers", "localhost:9092",
43+
"apicurio.kafkasql.bootstrap.servers", "localhost:9092");
4444
} else {
4545
return Collections.emptyMap();
4646
}

0 commit comments

Comments
 (0)