Skip to content

Commit 2ab68fa

Browse files
authored
Support kafka sql snapshotting (#4665)
* Add basic structure for kafkasql snapshotting * Use strimzi container for testing * Add snapshot creation and basic testing structure * Implement restoring h2 database from snapshot * Improve snapshot creation process * Finish coordination mechanism for snapshot creation and add snapshot creation test * Make the replica that triggered the snapshot actually create it * Add some log messages to improve traceability * Fix snapshotting unit test * Enable ryuk in ci * Randomize db test url * Fix registry loader * Add kafkasql snapshotting integration tests
1 parent cb2879b commit 2ab68fa

File tree

54 files changed

+5621
-227
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+5621
-227
lines changed

.github/workflows/integration-tests.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ jobs:
234234
- name: Run Integration Tests - migration - Kafkasql
235235
run: ./mvnw -T 1.5C verify -am --no-transfer-progress -Pintegration-tests -Pmigration -Dregistry-kafkasql-image=ttl.sh/${{ github.sha }}/apicurio/apicurio-registry:1d -Premote-kafka -pl integration-tests -Dmaven.javadoc.skip=true
236236

237+
- name: Run Integration Tests - snapshotting - Kafkasql
238+
run: ./mvnw -T 1.5C verify -am --no-transfer-progress -Pintegration-tests -Pkafkasql-snapshotting -Dregistry-kafkasql-image=ttl.sh/${{ github.sha }}/apicurio/apicurio-registry:1d -Premote-kafka -pl integration-tests -Dmaven.javadoc.skip=true
239+
237240
- name: Collect logs
238241
if: failure()
239242
run: sh ./.github/scripts/collect_logs.sh

.github/workflows/verify.yaml

-3
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ concurrency:
2121
group: ${{ github.workflow }}-${{ github.ref }}
2222
cancel-in-progress: true
2323

24-
env:
25-
TESTCONTAINERS_RYUK_DISABLED: true
26-
2724
jobs:
2825
build-verify:
2926
name: Verify Application Build

app/.gitignore

-1
This file was deleted.

app/pom.xml

+2-6
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@
9393
<groupId>io.quarkus</groupId>
9494
<artifactId>quarkus-resteasy-jackson</artifactId>
9595
</dependency>
96-
<dependency>
97-
<groupId>io.quarkus</groupId>
98-
<artifactId>quarkus-smallrye-jwt</artifactId>
99-
</dependency>
10096
<dependency>
10197
<groupId>io.quarkus</groupId>
10298
<artifactId>quarkus-smallrye-health</artifactId>
@@ -294,8 +290,8 @@
294290
<type>test-jar</type>
295291
</dependency>
296292
<dependency>
297-
<groupId>org.testcontainers</groupId>
298-
<artifactId>redpanda</artifactId>
293+
<groupId>io.strimzi</groupId>
294+
<artifactId>strimzi-test-container</artifactId>
299295
<scope>test</scope>
300296
</dependency>
301297
<dependency>

app/src/main/java/io/apicurio/registry/config/RegistryStorageConfigCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void run() {
9595

9696
private void refresh() {
9797
Instant now = Instant.now();
98-
if (lastRefresh != null) {
98+
if (lastRefresh != null && this.delegate != null && this.delegate.isReady()) {
9999
List<DynamicConfigPropertyDto> staleConfigProperties = this.getStaleConfigProperties(lastRefresh);
100100
if (!staleConfigProperties.isEmpty()) {
101101
invalidateCache();

app/src/main/java/io/apicurio/registry/rest/v3/AdminResourceImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ public List<ArtifactTypeInfo> listArtifactTypes() {
133133

134134
}
135135

136+
@Override
137+
@Authorized(style=AuthorizedStyle.None, level=AuthorizedLevel.Admin)
138+
public void triggerSnapshot() {
139+
storage.triggerSnapshotCreation();
140+
}
141+
136142
/**
137143
* @see io.apicurio.registry.rest.v3.AdminResource#listGlobalRules()
138144
*/

app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java

+18-32
Original file line numberDiff line numberDiff line change
@@ -7,36 +7,8 @@
77
import io.apicurio.registry.model.GA;
88
import io.apicurio.registry.model.GAV;
99
import io.apicurio.registry.model.VersionId;
10-
import io.apicurio.registry.storage.dto.ArtifactMetaDataDto;
11-
import io.apicurio.registry.storage.dto.ArtifactReferenceDto;
12-
import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto;
13-
import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto;
14-
import io.apicurio.registry.storage.dto.CommentDto;
15-
import io.apicurio.registry.storage.dto.ContentWrapperDto;
16-
import io.apicurio.registry.storage.dto.DownloadContextDto;
17-
import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto;
18-
import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto;
19-
import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto;
20-
import io.apicurio.registry.storage.dto.GroupMetaDataDto;
21-
import io.apicurio.registry.storage.dto.GroupSearchResultsDto;
22-
import io.apicurio.registry.storage.dto.OrderBy;
23-
import io.apicurio.registry.storage.dto.OrderDirection;
24-
import io.apicurio.registry.storage.dto.RoleMappingDto;
25-
import io.apicurio.registry.storage.dto.RoleMappingSearchResultsDto;
26-
import io.apicurio.registry.storage.dto.RuleConfigurationDto;
27-
import io.apicurio.registry.storage.dto.SearchFilter;
28-
import io.apicurio.registry.storage.dto.StoredArtifactVersionDto;
29-
import io.apicurio.registry.storage.dto.VersionSearchResultsDto;
30-
import io.apicurio.registry.storage.error.ArtifactAlreadyExistsException;
31-
import io.apicurio.registry.storage.error.ArtifactNotFoundException;
32-
import io.apicurio.registry.storage.error.ContentNotFoundException;
33-
import io.apicurio.registry.storage.error.GroupAlreadyExistsException;
34-
import io.apicurio.registry.storage.error.GroupNotFoundException;
35-
import io.apicurio.registry.storage.error.RegistryStorageException;
36-
import io.apicurio.registry.storage.error.RuleAlreadyExistsException;
37-
import io.apicurio.registry.storage.error.RuleNotFoundException;
38-
import io.apicurio.registry.storage.error.VersionAlreadyExistsException;
39-
import io.apicurio.registry.storage.error.VersionNotFoundException;
10+
import io.apicurio.registry.storage.dto.*;
11+
import io.apicurio.registry.storage.error.*;
4012
import io.apicurio.registry.storage.impexp.EntityInputStream;
4113
import io.apicurio.registry.types.RuleType;
4214
import io.apicurio.registry.utils.impexp.ArtifactBranchEntity;
@@ -399,7 +371,7 @@ VersionSearchResultsDto searchVersions(String groupId, String artifactId, OrderB
399371
/**
400372
* Gets the stored meta-data for a single version of an artifact. This will return all meta-data for the
401373
* version, including any user edited meta-data along with anything generated by the artifactStore.
402-
*
374+
*
403375
* @param globalId
404376
* @throws VersionNotFoundException
405377
* @throws RegistryStorageException
@@ -581,7 +553,7 @@ VersionSearchResultsDto searchVersions(String groupId, String artifactId, OrderB
581553
* @param limit the result size limit
582554
*/
583555
RoleMappingSearchResultsDto searchRoleMappings(int offset, int limit) throws RegistryStorageException;
584-
556+
585557
/**
586558
* Gets the details of a single role mapping.
587559
*
@@ -865,6 +837,20 @@ VersionSearchResultsDto searchVersions(String groupId, String artifactId, OrderB
865837
*/
866838
void deleteArtifactBranch(GA ga, BranchId branchId);
867839

840+
/**
841+
* Triggers a snapshot creation of the internal database.
842+
*
843+
* @throws RegistryStorageException
844+
*/
845+
String triggerSnapshotCreation() throws RegistryStorageException;
846+
847+
/**
848+
* Creates the snapshot of the internal database based on configuration.
849+
*
850+
* @param snapshotLocation
851+
* @throws RegistryStorageException
852+
*/
853+
String createSnapshot(String snapshotLocation) throws RegistryStorageException;
868854

869855
enum ArtifactRetrievalBehavior {
870856
DEFAULT,

app/src/main/java/io/apicurio/registry/storage/decorator/ReadOnlyRegistryStorageDecorator.java

+12
Original file line numberDiff line numberDiff line change
@@ -414,4 +414,16 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
414414
checkReadOnly();
415415
delegate.deleteArtifactBranch(ga, branchId);
416416
}
417+
418+
@Override
419+
public String triggerSnapshotCreation() throws RegistryStorageException {
420+
checkReadOnly();
421+
return delegate.triggerSnapshotCreation();
422+
}
423+
424+
@Override
425+
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
426+
checkReadOnly();
427+
return delegate.createSnapshot(snapshotLocation);
428+
}
417429
}

app/src/main/java/io/apicurio/registry/storage/decorator/RegistryStorageDecoratorBase.java

+10
Original file line numberDiff line numberDiff line change
@@ -326,4 +326,14 @@ public void createOrReplaceArtifactBranch(GA ga, BranchId branchId, List<Version
326326
public void deleteArtifactBranch(GA ga, BranchId branchId) {
327327
delegate.deleteArtifactBranch(ga, branchId);
328328
}
329+
330+
@Override
331+
public String triggerSnapshotCreation() throws RegistryStorageException {
332+
return delegate.triggerSnapshotCreation();
333+
}
334+
335+
@Override
336+
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
337+
return delegate.createSnapshot(snapshotLocation);
338+
}
329339
}

app/src/main/java/io/apicurio/registry/storage/impl/gitops/GitOpsRegistryStorage.java

+10
Original file line numberDiff line numberDiff line change
@@ -512,4 +512,14 @@ public GAV getArtifactBranchTip(GA ga, BranchId branchId, ArtifactRetrievalBehav
512512
public List<GAV> getArtifactBranch(GA ga, BranchId branchId, ArtifactRetrievalBehavior behavior) {
513513
return proxy(storage -> storage.getArtifactBranch(ga, branchId, behavior));
514514
}
515+
516+
@Override
517+
public String triggerSnapshotCreation() throws RegistryStorageException {
518+
return proxy((RegistryStorage::triggerSnapshotCreation));
519+
}
520+
521+
@Override
522+
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
523+
return proxy((storage -> storage.createSnapshot(snapshotLocation)));
524+
}
515525
}

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33
import java.util.Properties;
44

55
public interface KafkaSqlConfiguration {
6-
76
String bootstrapServers();
87
String topic();
8+
String snapshotsTopic();
9+
String snapshotEvery();
10+
String snapshotLocation();
911
Properties topicProperties();
1012
boolean isTopicAutoCreate();
1113
Integer pollTimeout();
1214
Integer responseTimeout();
1315
Properties producerProperties();
1416
Properties consumerProperties();
1517
Properties adminProperties();
16-
1718
}

0 commit comments

Comments
 (0)