Skip to content

Commit 4939aa2

Browse files
committed
Fix local test execution and wrong column being fetched
1 parent 5eb5e4c commit 4939aa2

File tree

4 files changed

+34
-23
lines changed

4 files changed

+34
-23
lines changed

integration-tests/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlLogCompactionIT.java

+15-14
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.slf4j.Logger;
4141
import org.slf4j.LoggerFactory;
4242
import org.testcontainers.containers.GenericContainer;
43+
import org.testcontainers.containers.Network;
4344
import org.testcontainers.containers.wait.strategy.Wait;
4445

4546
import java.util.Collections;
@@ -106,24 +107,28 @@ public int order() {
106107
public Map<String, String> start() {
107108
if (!Boolean.parseBoolean(System.getProperty("cluster.tests"))) {
108109

109-
String bootstrapServers = System.getProperty("bootstrap.servers");
110+
String externalBootstrapServers = System.getProperty("bootstrap.servers.external");
111+
String internalBootstrapServers = System.getProperty("bootstrap.servers.internal");
110112

111-
genericContainer = new GenericContainer("quay.io/apicurio/apicurio-registry-kafkasql:2.1.2.Final")
112-
.withEnv(Map.of("KAFKA_BOOTSTRAP_SERVERS", bootstrapServers, "QUARKUS_HTTP_PORT", "8081"));
113+
genericContainer = new GenericContainer<>("quay.io/apicurio/apicurio-registry-kafkasql:2.1.2.Final")
114+
.withEnv(Map.of("KAFKA_BOOTSTRAP_SERVERS", internalBootstrapServers, "QUARKUS_HTTP_PORT", "8081"))
115+
.withExposedPorts(8081)
116+
.withNetwork(Network.SHARED);
113117

118+
genericContainer.setPortBindings(List.of("8081:8081"));
119+
genericContainer.waitingFor(Wait.forHttp("/apis/registry/v2/search/artifacts").forStatusCode(200));
120+
genericContainer.start();
114121
//create the topic with agressive log compaction
115-
createTopic("kafkasql-journal", 1, bootstrapServers);
116-
122+
createTopic("kafkasql-journal", 1, externalBootstrapServers);
117123
genericContainer.start();
118-
genericContainer.waitingFor(Wait.forLogMessage(".*(KSQL Kafka Consumer Thread) KafkaSQL storage bootstrapped.*", 1));
119124

120125
var registryClient = RegistryClientFactory.create("http://localhost:8081");
121126

122127
try {
123-
RegistryWaitUtils.retry(registryClient, registryClient1 -> CustomTestsUtils.createArtifact(registryClient, ArtifactType.AVRO, ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "avro/multi-field_v1.json")));
128+
RegistryWaitUtils.retry(registryClient, registryClient1 -> CustomTestsUtils.createArtifact(registryClient, PREPARE_LOG_COMPACTION, ArtifactType.AVRO, ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "avro/multi-field_v1.json")));
124129

125-
var artifactdata = CustomTestsUtils.createArtifact(registryClient, ArtifactType.JSON, ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "jsonSchema/person_v1.json"));
126-
CustomTestsUtils.createArtifact(registryClient, ArtifactType.PROTOBUF, ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "protobuf/tutorial_v1.proto"));
130+
var artifactdata = CustomTestsUtils.createArtifact(registryClient, PREPARE_LOG_COMPACTION, ArtifactType.JSON, ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "jsonSchema/person_v1.json"));
131+
CustomTestsUtils.createArtifact(registryClient, PREPARE_LOG_COMPACTION, ArtifactType.PROTOBUF, ApicurioRegistryBaseIT.resourceToString("artifactTypes/" + "protobuf/tutorial_v1.proto"));
127132

128133
assertEquals(3, registryClient.listArtifactsInGroup(PREPARE_LOG_COMPACTION).getCount());
129134

@@ -165,15 +170,11 @@ public AdminClient adminClient(String bootstrapServers) {
165170
public void createTopic(String topic, int partitions, String bootstrapServers) {
166171
var journal = new NewTopic(topic, partitions, (short) 1);
167172

168-
/*journal.configs(Map.of(
173+
journal.configs(Map.of(
169174
"min.cleanable.dirty.ratio","0.000001",
170175
"cleanup.policy","compact",
171176
"segment.ms", "100",
172177
"delete.retention.ms", "100"
173-
));*/
174-
175-
journal.configs(Map.of(
176-
"cleanup.policy", "compact"
177178
));
178179

179180
adminClient(bootstrapServers).createTopics(List.of(journal));

integration-tests/src/test/java/io/apicurio/tests/dbupgrade/KafkaSqlStorageUpgradeIT.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static class KafkaSqlStorageUpgradeInitializer implements QuarkusTestReso
129129
public Map<String, String> start() {
130130
if (!Boolean.parseBoolean(System.getProperty("cluster.tests"))) {
131131

132-
String bootstrapServers = System.getProperty("bootstrap.servers");
132+
String bootstrapServers = System.getProperty("bootstrap.servers.internal");
133133
startOldRegistryVersion("quay.io/apicurio/apicurio-registry-kafkasql:2.1.2.Final", bootstrapServers);
134134

135135
try {
@@ -159,12 +159,13 @@ public void stop() {
159159
private void startOldRegistryVersion(String registryImage, String bootstrapServers) {
160160
genericContainer = new GenericContainer<>(registryImage)
161161
.withEnv(Map.of("KAFKA_BOOTSTRAP_SERVERS", bootstrapServers, "QUARKUS_HTTP_PORT", "8081"))
162+
.withExposedPorts(8081)
162163
.withNetwork(Network.SHARED);
163164

164165
genericContainer.setPortBindings(List.of("8081:8081"));
165166

167+
genericContainer.waitingFor(Wait.forHttp("/apis/registry/v2/search/artifacts").forStatusCode(200));
166168
genericContainer.start();
167-
genericContainer.waitingFor(Wait.forLogMessage(".*(KSQL Kafka Consumer Thread) KafkaSQL storage bootstrapped.*", 1));
168169
}
169170
}
170171
}

storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlAvroCanonicalizerUpgrader.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import jakarta.inject.Inject;
3535
import org.apache.commons.codec.digest.DigestUtils;
3636
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3738

3839
import java.io.ByteArrayOutputStream;
3940
import java.io.IOException;
@@ -63,7 +64,7 @@ public void upgrade(Handle dbHandle) throws Exception {
6364

6465
@Override
6566
public void upgrade(RegistryStorage registryStorage, Handle dbHandle) throws Exception {
66-
String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, a.type "
67+
String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, v.tenantId "
6768
+ "FROM versions v "
6869
+ "JOIN content c on c.contentId = v.contentId "
6970
+ "JOIN artifacts a ON v.tenantId = a.tenantId AND v.groupId = a.groupId AND v.artifactId = a.artifactId "
@@ -83,8 +84,12 @@ protected void updateCanonicalHash(TenantContentEntity contentEntity) {
8384
try {
8485

8586
String canonicalContentHash;
86-
byte[] referencesBytes = contentEntity.contentEntity.serializedReferences.getBytes(StandardCharsets.UTF_8);
87-
canonicalContentHash = DigestUtils.sha256Hex(concatContentAndReferences(this.canonicalizeContent(contentEntity.contentEntity, contentEntity.contentEntity.artifactType).bytes(), referencesBytes));
87+
if (contentEntity.contentEntity.serializedReferences != null) {
88+
byte[] referencesBytes = contentEntity.contentEntity.serializedReferences.getBytes(StandardCharsets.UTF_8);
89+
canonicalContentHash = DigestUtils.sha256Hex(concatContentAndReferences(this.canonicalizeContent(contentEntity.contentEntity, ArtifactType.AVRO).bytes(), referencesBytes));
90+
} else {
91+
canonicalContentHash = DigestUtils.sha256Hex(this.canonicalizeContent(contentEntity.contentEntity, contentEntity.contentEntity.artifactType).bytes());
92+
}
8893

8994
if (canonicalContentHash.equals(contentEntity.contentEntity.canonicalHash)) {
9095
logger.debug("Skipping content because the canonical hash is up to date, updating contentId {}", contentEntity.contentEntity.contentId);
@@ -121,6 +126,7 @@ public static class TenantContentEntity {
121126
}
122127

123128
public static class TenantContentEntityRowMapper implements RowMapper<TenantContentEntity> {
129+
124130
@Override
125131
public TenantContentEntity map(ResultSet rs) throws SQLException {
126132
TenantContentEntity e = new TenantContentEntity();

utils/kafka/src/test/java/io/apicurio/registry/test/utils/KafkaTestContainerManager.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,16 @@ public Map<String, String> start() {
5050
kafka.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-testcontainer")));
5151
kafka.start();
5252

53+
String externalBootstrapServers = kafka.getBootstrapServers();
54+
5355
String bootstrapServers = "redpanda:19092";
54-
System.setProperty("bootstrap.servers", bootstrapServers);
56+
System.setProperty("bootstrap.servers.internal", bootstrapServers);
57+
System.setProperty("bootstrap.servers.external", externalBootstrapServers);
5558

5659
return Map.of(
57-
"bootstrap.servers", bootstrapServers,
58-
"registry.events.kafka.config.bootstrap.servers", bootstrapServers,
59-
"registry.kafkasql.bootstrap.servers", bootstrapServers);
60+
"bootstrap.servers", externalBootstrapServers,
61+
"registry.events.kafka.config.bootstrap.servers", externalBootstrapServers,
62+
"registry.kafkasql.bootstrap.servers", externalBootstrapServers);
6063
} else {
6164
return Collections.emptyMap();
6265
}

0 commit comments

Comments
 (0)