Skip to content

Commit 62b1672

Browse files
committed
Backport enhanced avro canonicalizer
1 parent 0c4216d commit 62b1672

File tree

18 files changed

+463
-228
lines changed

18 files changed

+463
-228
lines changed

app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ private void applyUpgrader(Handle handle, String cname) {
368368
@SuppressWarnings("unchecked")
369369
Class<IDbUpgrader> upgraderClass = (Class<IDbUpgrader>) Class.forName(cname);
370370
IDbUpgrader upgrader = upgraderClass.getConstructor().newInstance();
371-
upgrader.upgrade(handle);
371+
upgrader.upgrade(this, handle);
372372
} catch (Exception e) {
373373
throw new RuntimeException(e);
374374
}

app/src/main/java/io/apicurio/registry/storage/impl/sql/IDbUpgrader.java

+8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.apicurio.registry.storage.impl.sql;
1818

19+
import io.apicurio.registry.storage.RegistryStorage;
1920
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
2021

2122
/**
@@ -29,4 +30,11 @@ public interface IDbUpgrader {
2930
*/
3031
public void upgrade(Handle dbHandle) throws Exception;
3132

33+
/**
34+
* Called by the {@link AbstractSqlRegistryStorage} class when upgrading the database.
35+
* @param registryStorage
36+
* @param dbHandle
37+
*/
38+
public void upgrade(RegistryStorage registryStorage, Handle dbHandle) throws Exception;
39+
3240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2021 Red Hat
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.apicurio.registry.storage.impl.sql.upgrader;
18+
19+
import io.apicurio.registry.content.ContentHandle;
20+
import io.apicurio.registry.content.canon.ContentCanonicalizer;
21+
import io.apicurio.registry.storage.RegistryStorage;
22+
import io.apicurio.registry.storage.dto.ArtifactReferenceDto;
23+
import io.apicurio.registry.storage.impl.sql.IDbUpgrader;
24+
import io.apicurio.registry.storage.impl.sql.SqlUtil;
25+
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
26+
import io.apicurio.registry.storage.impl.sql.jdb.RowMapper;
27+
import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper;
28+
import io.apicurio.registry.types.ArtifactType;
29+
import io.apicurio.registry.types.provider.ArtifactTypeUtilProviderFactory;
30+
import io.apicurio.registry.types.provider.DefaultArtifactTypeUtilProviderImpl;
31+
import io.apicurio.registry.utils.impexp.ContentEntity;
32+
import io.quarkus.runtime.annotations.RegisterForReflection;
33+
import org.apache.commons.codec.digest.DigestUtils;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import java.io.ByteArrayOutputStream;
38+
import java.io.IOException;
39+
import java.nio.charset.StandardCharsets;
40+
import java.sql.ResultSet;
41+
import java.sql.SQLException;
42+
import java.util.List;
43+
import java.util.stream.Stream;
44+
45+
/**
46+
* @author Carles Arnal
47+
*/
48+
@RegisterForReflection
49+
public class AvroCanonicalHashUpgrader implements IDbUpgrader {
50+
51+
private static final Logger logger = LoggerFactory.getLogger(ReferencesContentHashUpgrader.class);
52+
53+
private static final ArtifactTypeUtilProviderFactory factory = new DefaultArtifactTypeUtilProviderImpl();
54+
55+
private RegistryStorage storage;
56+
57+
/**
58+
* @see io.apicurio.registry.storage.impl.sql.IDbUpgrader#upgrade(io.apicurio.registry.storage.impl.sql.jdb.Handle)
59+
*/
60+
@Override
61+
public void upgrade(Handle dbHandle) throws Exception {
62+
//Do nothing, this is just implemented for backward compatibility reasons.
63+
}
64+
65+
@Override
66+
public void upgrade(RegistryStorage registryStorage, Handle dbHandle) throws Exception {
67+
this.storage = registryStorage;
68+
String sql = "SELECT c.contentId, c.content, c.canonicalHash, c.contentHash, c.artifactreferences, a.type "
69+
+ "FROM versions v "
70+
+ "JOIN content c on c.contentId = v.contentId "
71+
+ "JOIN artifacts a ON v.tenantId = a.tenantId AND v.groupId = a.groupId AND v.artifactId = a.artifactId "
72+
+ "WHERE a.type = ?";
73+
74+
Stream<TypeContentEntity> stream = dbHandle.createQuery(sql)
75+
.setFetchSize(50)
76+
.bind(0, ArtifactType.AVRO)
77+
.map(new TenantContentEntityRowMapper())
78+
.stream();
79+
try (stream) {
80+
stream.forEach(entity -> updateCanonicalHash(entity, dbHandle));
81+
}
82+
}
83+
84+
private void updateCanonicalHash(TypeContentEntity contentEntity, Handle dbHandle) {
85+
try {
86+
String canonicalContentHash;
87+
byte[] referencesBytes = contentEntity.contentEntity.serializedReferences.getBytes(StandardCharsets.UTF_8);
88+
canonicalContentHash = DigestUtils.sha256Hex(concatContentAndReferences(this.canonicalizeContent(contentEntity.contentEntity, contentEntity.type).bytes(), referencesBytes));
89+
90+
if (canonicalContentHash.equals(contentEntity.contentEntity.canonicalHash)) {
91+
logger.debug("Skipping content because the canonical hash is up to date, updating contentId {}", contentEntity.contentEntity.contentId);
92+
return;
93+
}
94+
95+
logger.debug("Avro content canonicalHash outdated value detected, updating contentId {}", contentEntity.contentEntity.contentId);
96+
97+
String update = "UPDATE content SET canonicalHash = ? WHERE contentId = ? AND contentHash = ?";
98+
int rowCount = dbHandle.createUpdate(update)
99+
.bind(0, canonicalContentHash)
100+
.bind(1, contentEntity.contentEntity.contentId)
101+
.bind(2, contentEntity.contentEntity.contentHash)
102+
.execute();
103+
104+
if (rowCount == 0) {
105+
logger.warn("content row not matched for canonical hash upgrade contentId {} contentHash {}", contentEntity.contentEntity.contentId, contentEntity.contentEntity.contentHash);
106+
}
107+
108+
} catch (Exception e) {
109+
logger.warn("Error found processing content with id {} and hash {}", contentEntity.contentEntity.contentId, contentEntity.contentEntity.contentHash, e);
110+
}
111+
}
112+
113+
private byte[] concatContentAndReferences(byte[] contentBytes, byte[] referencesBytes) throws IOException {
114+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(contentBytes.length + referencesBytes.length);
115+
outputStream.write(contentBytes);
116+
outputStream.write(referencesBytes);
117+
return outputStream.toByteArray();
118+
}
119+
120+
private ContentHandle canonicalizeContent(ContentEntity contentEntity, String type) {
121+
ContentHandle contentHandle = ContentHandle.create(contentEntity.contentBytes);
122+
List<ArtifactReferenceDto> artifactReferenceDtos = SqlUtil.deserializeReferences(contentEntity.serializedReferences);
123+
ContentCanonicalizer canonicalizer = factory.getArtifactTypeProvider(type).getContentCanonicalizer();
124+
return canonicalizer.canonicalize(contentHandle, storage.resolveReferences(artifactReferenceDtos));
125+
}
126+
127+
128+
public static class TypeContentEntity {
129+
String type;
130+
ContentEntity contentEntity;
131+
}
132+
133+
public static class TenantContentEntityRowMapper implements RowMapper<TypeContentEntity> {
134+
@Override
135+
public TypeContentEntity map(ResultSet rs) throws SQLException {
136+
TypeContentEntity e = new TypeContentEntity();
137+
e.type = rs.getString("type");
138+
e.contentEntity = ContentEntityMapper.instance.map(rs);
139+
return e;
140+
}
141+
}
142+
}

app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ProtobufCanonicalHashUpgrader.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@
1616

1717
package io.apicurio.registry.storage.impl.sql.upgrader;
1818

19-
import java.util.Collections;
20-
import java.util.stream.Stream;
21-
22-
import org.apache.commons.codec.digest.DigestUtils;
23-
import org.slf4j.Logger;
24-
import org.slf4j.LoggerFactory;
25-
2619
import io.apicurio.registry.content.ContentHandle;
2720
import io.apicurio.registry.content.canon.ContentCanonicalizer;
2821
import io.apicurio.registry.content.canon.ProtobufContentCanonicalizer;
22+
import io.apicurio.registry.storage.RegistryStorage;
2923
import io.apicurio.registry.storage.impl.sql.IDbUpgrader;
3024
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
3125
import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper;
3226
import io.apicurio.registry.types.ArtifactType;
3327
import io.apicurio.registry.utils.impexp.ContentEntity;
3428
import io.quarkus.runtime.annotations.RegisterForReflection;
29+
import org.apache.commons.codec.digest.DigestUtils;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.util.Collections;
34+
import java.util.stream.Stream;
3535

3636
/**
3737
* @author Fabian Martinez
@@ -66,6 +66,11 @@ public void upgrade(Handle dbHandle) throws Exception {
6666

6767
}
6868

69+
@Override
70+
public void upgrade(RegistryStorage registryStorage, Handle dbHandle) throws Exception {
71+
this.upgrade(dbHandle);
72+
}
73+
6974
private void updateCanonicalHash(ContentEntity contentEntity, Handle dbHandle) {
7075

7176
ContentHandle canonicalContent = this.canonicalizeContent(ContentHandle.create(contentEntity.contentBytes));

app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ReferencesCanonicalHashUpgrader.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.apicurio.registry.content.ContentHandle;
2020
import io.apicurio.registry.content.canon.ContentCanonicalizer;
21+
import io.apicurio.registry.storage.RegistryStorage;
2122
import io.apicurio.registry.storage.impl.sql.IDbUpgrader;
2223
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
2324
import io.apicurio.registry.storage.impl.sql.jdb.RowMapper;
@@ -63,12 +64,15 @@ public void upgrade(Handle dbHandle) throws Exception {
6364
try (stream) {
6465
stream.forEach(entity -> updateHash(entity, dbHandle));
6566
}
67+
}
6668

69+
@Override
70+
public void upgrade(RegistryStorage registryStorage, Handle dbHandle) throws Exception {
71+
this.upgrade(dbHandle);
6772
}
6873

6974
private void updateHash(TypeContentEntity typeContentEntity, Handle dbHandle) {
7075
try {
71-
7276
String canonicalContentHash;
7377
if (typeContentEntity.contentEntity.serializedReferences != null) {
7478
byte[] referencesBytes = typeContentEntity.contentEntity.serializedReferences.getBytes(StandardCharsets.UTF_8);

app/src/main/java/io/apicurio/registry/storage/impl/sql/upgrader/ReferencesContentHashUpgrader.java

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.apicurio.registry.storage.impl.sql.upgrader;
1818

19+
import io.apicurio.registry.storage.RegistryStorage;
1920
import io.apicurio.registry.storage.impl.sql.IDbUpgrader;
2021
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
2122
import io.apicurio.registry.storage.impl.sql.mappers.ContentEntityMapper;
@@ -56,7 +57,11 @@ public void upgrade(Handle dbHandle) throws Exception {
5657
try (stream) {
5758
stream.forEach(entity -> updateHash(entity, dbHandle));
5859
}
60+
}
5961

62+
@Override
63+
public void upgrade(RegistryStorage registryStorage, Handle dbHandle) throws Exception {
64+
this.upgrade(dbHandle);
6065
}
6166

6267
private void updateHash(ContentEntity contentEntity, Handle dbHandle) {
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
14
1+
15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- *********************************************************************
2+
-- DDL for the Apicurio Registry - Database: H2
3+
-- Upgrades the DB schema from version 14 to version 15.
4+
-- *********************************************************************
5+
6+
UPDATE apicurio SET prop_value = 15 WHERE prop_name = 'db_version';
7+
8+
UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.AvroCanonicalHashUpgrader;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- *********************************************************************
2+
-- DDL for the Apicurio Registry - Database: PostgreSQL
3+
-- Upgrades the DB schema from version 14 to version 15.
4+
-- *********************************************************************
5+
6+
UPDATE apicurio SET prop_value = 15 WHERE prop_name = 'db_version';
7+
8+
UPGRADER:io.apicurio.registry.storage.impl.sql.upgrader.AvroCanonicalHashUpgrader;

0 commit comments

Comments
 (0)