Skip to content

Commit c9fdefe

Browse files
committed
Fix avro upgrader
1 parent 961032b commit c9fdefe

10 files changed

+440
-21
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package io.apicurio.tests.dbupgrade;
2+
3+
import io.apicurio.registry.rest.client.RegistryClientFactory;
4+
import io.apicurio.registry.test.utils.KafkaTestContainerManager;
5+
import io.apicurio.registry.types.ArtifactType;
6+
import io.apicurio.registry.utils.IoUtil;
7+
import io.apicurio.registry.utils.tests.SimpleDisplayName;
8+
import io.apicurio.tests.ApicurioRegistryBaseIT;
9+
import io.apicurio.tests.utils.Constants;
10+
import io.apicurio.tests.utils.CustomTestsUtils;
11+
import io.apicurio.tests.utils.TestSeparator;
12+
import io.quarkus.test.common.QuarkusTestResource;
13+
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
14+
import io.quarkus.test.junit.QuarkusIntegrationTest;
15+
import org.junit.jupiter.api.DisplayNameGeneration;
16+
import org.junit.jupiter.api.Tag;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.TestInstance;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
import org.testcontainers.containers.GenericContainer;
22+
import org.testcontainers.containers.Network;
23+
import org.testcontainers.containers.wait.strategy.Wait;
24+
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.stream.Collectors;
29+
30+
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.PREPARE_AVRO_GROUP;
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
33+
@DisplayNameGeneration(SimpleDisplayName.class)
34+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
35+
@Tag(Constants.DB_UPGRADE)
36+
@Tag(Constants.KAFKA_SQL)
37+
@QuarkusTestResource(value = KafkaTestContainerManager.class, restrictToAnnotatedClass = true)
38+
@QuarkusTestResource(value = KafkaSqlAvroUpgraderIT.KafkaSqlAvroUpgraderInitializer.class, restrictToAnnotatedClass = true)
39+
@QuarkusIntegrationTest
40+
public class KafkaSqlAvroUpgraderIT extends ApicurioRegistryBaseIT implements TestSeparator, Constants {
41+
42+
static final Logger logger = LoggerFactory.getLogger(KafkaSqlLogCompactionIT.class);
43+
protected static CustomTestsUtils.ArtifactData avroData;
44+
45+
@Override
46+
public void cleanArtifacts() throws Exception {
47+
//Don't clean artifacts for this test
48+
}
49+
50+
@Test
51+
public void testStorageUpgradeAvroUpgraderKafkaSql() throws Exception {
52+
//The check must be retried so the kafka storage has been bootstrapped
53+
retry(() -> assertEquals(3, registryClient.listArtifactsInGroup(PREPARE_AVRO_GROUP).getCount()));
54+
55+
var searchResults = registryClient.listArtifactsInGroup(PREPARE_AVRO_GROUP);
56+
57+
var avros = searchResults.getArtifacts().stream()
58+
.filter(ar -> ar.getType().equals(ArtifactType.AVRO))
59+
.collect(Collectors.toList());
60+
61+
System.out.println("Avro artifacts are " + avros.size());
62+
assertEquals(1, avros.size());
63+
var avroMetadata = registryClient.getArtifactMetaData(avros.get(0).getGroupId(), avros.get(0).getId());
64+
var content = registryClient.getContentByGlobalId(avroMetadata.getGlobalId());
65+
66+
//search with canonicalize
67+
var versionMetadata = registryClient.getArtifactVersionMetaDataByContent(avros.get(0).getGroupId(), avros.get(0).getId(), true, null, content);
68+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
69+
70+
String test1content = ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "avro/multi-field_v1.json");
71+
72+
//search with canonicalize
73+
versionMetadata = registryClient.getArtifactVersionMetaDataByContent(avros.get(0).getGroupId(), avros.get(0).getId(), true, null, IoUtil.toStream(test1content));
74+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
75+
76+
//search without canonicalize
77+
versionMetadata = registryClient.getArtifactVersionMetaDataByContent(avros.get(0).getGroupId(), avros.get(0).getId(), false, null, IoUtil.toStream(test1content));
78+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
79+
80+
//create one more avro artifact and verify
81+
String test2content = ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "avro/multi-field_v2.json");
82+
avroData = CustomTestsUtils.createArtifact(registryClient, PREPARE_AVRO_GROUP, ArtifactType.AVRO, test2content);
83+
versionMetadata = registryClient.getArtifactVersionMetaDataByContent(PREPARE_AVRO_GROUP, avroData.meta.getId(), true, null, IoUtil.toStream(test2content));
84+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
85+
86+
//assert total num of artifacts
87+
assertEquals(4, registryClient.listArtifactsInGroup(PREPARE_AVRO_GROUP).getCount());
88+
}
89+
90+
public static class KafkaSqlAvroUpgraderInitializer implements QuarkusTestResourceLifecycleManager {
91+
private GenericContainer genericContainer;
92+
@Override
93+
public Map<String, String> start() {
94+
if (!Boolean.parseBoolean(System.getProperty("cluster.tests"))) {
95+
96+
String bootstrapServers = System.getProperty("bootstrap.servers.internal");
97+
startOldRegistryVersion("quay.io/apicurio/apicurio-registry-kafkasql:2.3.0.Final", bootstrapServers);
98+
99+
try {
100+
var registryClient = RegistryClientFactory.create("http://localhost:8081/");
101+
UpgradeTestsDataInitializer.prepareAvroCanonicalHashUpgradeData(registryClient);
102+
} catch (Exception e) {
103+
logger.warn("Error filling old registry with information: ", e);
104+
}
105+
}
106+
return Collections.emptyMap();
107+
}
108+
109+
@Override
110+
public int order() {
111+
return 10000;
112+
}
113+
114+
@Override
115+
public void stop() {
116+
if (genericContainer != null && genericContainer.isRunning()) {
117+
genericContainer.stop();
118+
}
119+
}
120+
121+
private void startOldRegistryVersion(String registryImage, String bootstrapServers) {
122+
genericContainer = new GenericContainer<>(registryImage)
123+
.withEnv(Map.of("KAFKA_BOOTSTRAP_SERVERS", bootstrapServers, "QUARKUS_HTTP_PORT", "8081"))
124+
.withExposedPorts(8081)
125+
.withNetwork(Network.SHARED);
126+
127+
genericContainer.setPortBindings(List.of("8081:8081"));
128+
129+
genericContainer.waitingFor(Wait.forHttp("/apis/registry/v2/search/artifacts").forStatusCode(200));
130+
genericContainer.start();
131+
}
132+
}
133+
}

integration-tests/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java integration-tests/src/test/java/io/apicurio/tests/dbupgrade/kafka/KafkaSqlProtobufUpgraderIT.java

+4-19
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.quarkus.test.common.QuarkusTestResource;
3030
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
3131
import io.quarkus.test.junit.QuarkusIntegrationTest;
32-
import org.junit.jupiter.api.Assertions;
3332
import org.junit.jupiter.api.DisplayNameGeneration;
3433
import org.junit.jupiter.api.Tag;
3534
import org.junit.jupiter.api.Test;
@@ -45,8 +44,7 @@
4544
import java.util.Map;
4645
import java.util.stream.Collectors;
4746

48-
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.ARTIFACT_CONTENT;
49-
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.PREPARE_PROTO_GROUP;
47+
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.*;
5048
import static org.junit.jupiter.api.Assertions.assertEquals;
5149

5250
/**
@@ -57,12 +55,11 @@
5755
@Tag(Constants.DB_UPGRADE)
5856
@Tag(Constants.KAFKA_SQL)
5957
@QuarkusTestResource(value = KafkaTestContainerManager.class, restrictToAnnotatedClass = true)
60-
@QuarkusTestResource(value = KafkaSqlStorageUpgradeIT.KafkaSqlStorageUpgradeInitializer.class, restrictToAnnotatedClass = true)
58+
@QuarkusTestResource(value = KafkaSqlProtobufUpgraderIT.KafkaSqlStorageProtobufUpgraderInitializer.class, restrictToAnnotatedClass = true)
6159
@QuarkusIntegrationTest
62-
public class KafkaSqlStorageUpgradeIT extends ApicurioRegistryBaseIT implements TestSeparator, Constants {
60+
public class KafkaSqlProtobufUpgraderIT extends ApicurioRegistryBaseIT implements TestSeparator, Constants {
6361

6462
static final Logger logger = LoggerFactory.getLogger(KafkaSqlLogCompactionIT.class);
65-
6663
protected static CustomTestsUtils.ArtifactData artifactWithReferences;
6764
protected static List<ArtifactReference> artifactReferences;
6865
protected static CustomTestsUtils.ArtifactData protoData;
@@ -112,17 +109,7 @@ public void testStorageUpgradeProtobufUpgraderKafkaSql() throws Exception {
112109
assertEquals(4, registryClient.listArtifactsInGroup(PREPARE_PROTO_GROUP).getCount());
113110
}
114111

115-
@Test
116-
public void testStorageUpgradeReferencesContentHash() throws Exception {
117-
//The check must be retried so the kafka storage has been bootstrapped
118-
retry(() -> Assertions.assertTrue(registryClient.listArtifactsInGroup(artifactWithReferences.meta.getGroupId()).getCount() > 0), 1000L);
119-
//Once the storage is filled with the proper information, if we try to create the same artifact with the same references, no new version will be created and the same ids are used.
120-
CustomTestsUtils.ArtifactData upgradedArtifact = CustomTestsUtils.createArtifactWithReferences(artifactWithReferences.meta.getGroupId(), artifactWithReferences.meta.getId(), registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences);
121-
assertEquals(artifactWithReferences.meta.getGlobalId(), upgradedArtifact.meta.getGlobalId());
122-
assertEquals(artifactWithReferences.meta.getContentId(), upgradedArtifact.meta.getContentId());
123-
}
124-
125-
public static class KafkaSqlStorageUpgradeInitializer implements QuarkusTestResourceLifecycleManager {
112+
public static class KafkaSqlStorageProtobufUpgraderInitializer implements QuarkusTestResourceLifecycleManager {
126113
private GenericContainer genericContainer;
127114

128115
@Override
@@ -134,9 +121,7 @@ public Map<String, String> start() {
134121

135122
try {
136123
var registryClient = RegistryClientFactory.create("http://localhost:8081/");
137-
138124
UpgradeTestsDataInitializer.prepareProtobufHashUpgradeTest(registryClient);
139-
UpgradeTestsDataInitializer.prepareReferencesUpgradeTest(registryClient);
140125
} catch (Exception e) {
141126
logger.warn("Error filling old registry with information: ", e);
142127
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package io.apicurio.tests.dbupgrade;
2+
3+
import io.apicurio.registry.rest.client.RegistryClientFactory;
4+
import io.apicurio.registry.rest.v2.beans.ArtifactReference;
5+
import io.apicurio.registry.test.utils.KafkaTestContainerManager;
6+
import io.apicurio.registry.types.ArtifactType;
7+
import io.apicurio.registry.utils.tests.SimpleDisplayName;
8+
import io.apicurio.tests.ApicurioRegistryBaseIT;
9+
import io.apicurio.tests.utils.Constants;
10+
import io.apicurio.tests.utils.CustomTestsUtils;
11+
import io.apicurio.tests.utils.TestSeparator;
12+
import io.quarkus.test.common.QuarkusTestResource;
13+
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
14+
import io.quarkus.test.junit.QuarkusIntegrationTest;
15+
import org.junit.jupiter.api.Assertions;
16+
import org.junit.jupiter.api.DisplayNameGeneration;
17+
import org.junit.jupiter.api.Tag;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.api.TestInstance;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import org.testcontainers.containers.GenericContainer;
23+
import org.testcontainers.containers.Network;
24+
import org.testcontainers.containers.wait.strategy.Wait;
25+
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.ARTIFACT_CONTENT;
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
33+
@DisplayNameGeneration(SimpleDisplayName.class)
34+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
35+
@Tag(Constants.DB_UPGRADE)
36+
@Tag(Constants.KAFKA_SQL)
37+
@QuarkusTestResource(value = KafkaTestContainerManager.class, restrictToAnnotatedClass = true)
38+
@QuarkusTestResource(value = KafkaSqlReferencesUpgraderIT.KafkaSqlStorageReferencesUpgraderInitializer.class, restrictToAnnotatedClass = true)
39+
@QuarkusIntegrationTest
40+
public class KafkaSqlReferencesUpgraderIT extends ApicurioRegistryBaseIT implements TestSeparator, Constants {
41+
42+
static final Logger logger = LoggerFactory.getLogger(KafkaSqlLogCompactionIT.class);
43+
protected static CustomTestsUtils.ArtifactData artifactWithReferences;
44+
protected static List<ArtifactReference> artifactReferences;
45+
46+
@Override
47+
public void cleanArtifacts() throws Exception {
48+
//Don't clean artifacts for this test
49+
}
50+
51+
@Test
52+
public void testStorageUpgradeReferencesContentHash() throws Exception {
53+
//The check must be retried so the kafka storage has been bootstrapped
54+
retry(() -> Assertions.assertTrue(registryClient.listArtifactsInGroup(artifactWithReferences.meta.getGroupId()).getCount() > 0), 1000L);
55+
//Once the storage is filled with the proper information, if we try to create the same artifact with the same references, no new version will be created and the same ids are used.
56+
CustomTestsUtils.ArtifactData upgradedArtifact = CustomTestsUtils.createArtifactWithReferences(artifactWithReferences.meta.getGroupId(), artifactWithReferences.meta.getId(), registryClient, ArtifactType.AVRO, ARTIFACT_CONTENT, artifactReferences);
57+
assertEquals(artifactWithReferences.meta.getGlobalId(), upgradedArtifact.meta.getGlobalId());
58+
assertEquals(artifactWithReferences.meta.getContentId(), upgradedArtifact.meta.getContentId());
59+
}
60+
61+
public static class KafkaSqlStorageReferencesUpgraderInitializer implements QuarkusTestResourceLifecycleManager {
62+
private GenericContainer genericContainer;
63+
64+
@Override
65+
public Map<String, String> start() {
66+
if (!Boolean.parseBoolean(System.getProperty("cluster.tests"))) {
67+
68+
String bootstrapServers = System.getProperty("bootstrap.servers.internal");
69+
startOldRegistryVersion("quay.io/apicurio/apicurio-registry-kafkasql:2.3.0.Final", bootstrapServers);
70+
71+
try {
72+
var registryClient = RegistryClientFactory.create("http://localhost:8081/");
73+
UpgradeTestsDataInitializer.prepareReferencesUpgradeTest(registryClient);
74+
} catch (Exception e) {
75+
logger.warn("Error filling old registry with information: ", e);
76+
}
77+
}
78+
return Collections.emptyMap();
79+
}
80+
81+
@Override
82+
public int order() {
83+
return 10000;
84+
}
85+
86+
@Override
87+
public void stop() {
88+
if (genericContainer != null && genericContainer.isRunning()) {
89+
genericContainer.stop();
90+
}
91+
}
92+
93+
private void startOldRegistryVersion(String registryImage, String bootstrapServers) {
94+
genericContainer = new GenericContainer<>(registryImage)
95+
.withEnv(Map.of("KAFKA_BOOTSTRAP_SERVERS", bootstrapServers, "QUARKUS_HTTP_PORT", "8081"))
96+
.withExposedPorts(8081)
97+
.withNetwork(Network.SHARED);
98+
99+
genericContainer.setPortBindings(List.of("8081:8081"));
100+
101+
genericContainer.waitingFor(Wait.forHttp("/apis/registry/v2/search/artifacts").forStatusCode(200));
102+
genericContainer.start();
103+
}
104+
}
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
package io.apicurio.tests.dbupgrade.sql;public class SqlAvroUpgraderIt {
2+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
package io.apicurio.tests.dbupgrade.sql;public class SqlProtobufCanonicalHashUpgraderIT {
2+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
package io.apicurio.tests.dbupgrade.sql;public class SqlReferencesUpgraderIT {
2+
}

integration-tests/src/test/java/io/apicurio/tests/dbupgrade/SqlStorageUpgradeIT.java integration-tests/src/test/java/io/apicurio/tests/dbupgrade/sql/SqlStorageUpgradeIT.java

+47
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.UUID;
5959
import java.util.stream.Collectors;
6060

61+
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.PREPARE_AVRO_GROUP;
6162
import static io.apicurio.tests.dbupgrade.UpgradeTestsDataInitializer.PREPARE_PROTO_GROUP;
6263
import static org.junit.jupiter.api.Assertions.assertEquals;
6364

@@ -86,6 +87,7 @@ public class SqlStorageUpgradeIT extends ApicurioRegistryBaseIT implements TestS
8687
protected static CustomTestsUtils.ArtifactData artifactWithReferences;
8788
protected static List<ArtifactReference> artifactReferences;
8889
protected static CustomTestsUtils.ArtifactData protoData;
90+
protected static CustomTestsUtils.ArtifactData avroData;
8991

9092
public static RegistryClient upgradeTenantClient;
9193

@@ -121,6 +123,50 @@ public void testStorageUpgradeProtobufUpgraderKafkaSql() throws Exception {
121123
testStorageUpgradeProtobufUpgrader("protobufCanonicalHashKafkaSql");
122124
}
123125

126+
@Test
127+
public void testStorageUpgradeAvroUpgraderKafkaSql() throws Exception {
128+
testStorageUpgradeAvroUpgrader("testStorageUpgradeAvroUpgraderKafkaSql");
129+
}
130+
131+
public void testStorageUpgradeAvroUpgrader(String testName) throws Exception {
132+
//The check must be retried so the kafka storage has been bootstrapped
133+
retry(() -> assertEquals(3, upgradeTenantClient.listArtifactsInGroup(PREPARE_AVRO_GROUP).getCount()));
134+
135+
var searchResults = upgradeTenantClient.listArtifactsInGroup(PREPARE_AVRO_GROUP);
136+
137+
var avros = searchResults.getArtifacts().stream()
138+
.filter(ar -> ar.getType().equals(ArtifactType.AVRO))
139+
.collect(Collectors.toList());
140+
141+
System.out.println("Avro artifacts are " + avros.size());
142+
assertEquals(1, avros.size());
143+
var avroMetadata = upgradeTenantClient.getArtifactMetaData(avros.get(0).getGroupId(), avros.get(0).getId());
144+
var content = upgradeTenantClient.getContentByGlobalId(avroMetadata.getGlobalId());
145+
146+
//search with canonicalize
147+
var versionMetadata = upgradeTenantClient.getArtifactVersionMetaDataByContent(avros.get(0).getGroupId(), avros.get(0).getId(), true, null, content);
148+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
149+
150+
String test1content = ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "avro/multi-field_v1.json");
151+
152+
//search with canonicalize
153+
versionMetadata = upgradeTenantClient.getArtifactVersionMetaDataByContent(avros.get(0).getGroupId(), avros.get(0).getId(), true, null, IoUtil.toStream(test1content));
154+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
155+
156+
//search without canonicalize
157+
versionMetadata = upgradeTenantClient.getArtifactVersionMetaDataByContent(avros.get(0).getGroupId(), avros.get(0).getId(), false, null, IoUtil.toStream(test1content));
158+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
159+
160+
//create one more avro artifact and verify
161+
String test2content = ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "avro/multi-field_v2.json");
162+
avroData = CustomTestsUtils.createArtifact(upgradeTenantClient, PREPARE_AVRO_GROUP, ArtifactType.AVRO, test2content);
163+
versionMetadata = upgradeTenantClient.getArtifactVersionMetaDataByContent(PREPARE_AVRO_GROUP, avroData.meta.getId(), true, null, IoUtil.toStream(test2content));
164+
assertEquals(avroData.meta.getContentId(), versionMetadata.getContentId());
165+
166+
//assert total num of artifacts
167+
assertEquals(4, upgradeTenantClient.listArtifactsInGroup(PREPARE_AVRO_GROUP).getCount());
168+
}
169+
124170
public void testStorageUpgradeProtobufUpgrader(String testName) throws Exception {
125171
//The check must be retried so the kafka storage has been bootstrapped
126172
retry(() -> assertEquals(3, upgradeTenantClient.listArtifactsInGroup(PREPARE_PROTO_GROUP).getCount()));
@@ -200,6 +246,7 @@ public Map<String, String> start() {
200246
//Prepare the data for the content and canonical hash upgraders using an isolated tenant so we don't have data conflicts.
201247
UpgradeTestsDataInitializer.prepareProtobufHashUpgradeTest(tenantUpgradeClient.client);
202248
UpgradeTestsDataInitializer.prepareReferencesUpgradeTest(tenantUpgradeClient.client);
249+
UpgradeTestsDataInitializer.prepareAvroCanonicalHashUpgradeData(tenantUpgradeClient.client);
203250

204251
upgradeTenantClient = tenantUpgradeClient.client;
205252

0 commit comments

Comments
 (0)