Skip to content

Commit ee43553

Browse files
authored
Port implement dereference proto (#4636)
* Implement protobuf dereferencing for dependencies within the same package * Add dereferencing notes to docs
1 parent 6616662 commit ee43553

File tree

5 files changed

+110
-141
lines changed

5 files changed

+110
-141
lines changed

app/src/test/java/io/apicurio/registry/noprofile/serde/ProtobufSerdeTest.java

+39-83
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,11 @@
11
package io.apicurio.registry.noprofile.serde;
22

3-
import static io.apicurio.registry.utils.tests.TestUtils.waitForSchema;
4-
import static org.junit.jupiter.api.Assertions.assertEquals;
5-
6-
import java.io.IOException;
7-
import java.util.HashMap;
8-
import java.util.Map;
9-
10-
import org.apache.kafka.common.serialization.Deserializer;
11-
import org.apache.kafka.common.serialization.Serializer;
12-
import org.junit.jupiter.api.Assertions;
13-
import org.junit.jupiter.api.BeforeEach;
14-
import org.junit.jupiter.api.Test;
15-
163
import com.google.protobuf.Descriptors;
174
import com.google.protobuf.DynamicMessage;
18-
195
import io.api.sample.TableNotification;
206
import io.apicurio.registry.AbstractResourceTestBase;
217
import io.apicurio.registry.client.auth.VertXAuthFactory;
8+
import io.apicurio.registry.resolver.SchemaResolverConfig;
229
import io.apicurio.registry.rest.client.RegistryClient;
2310
import io.apicurio.registry.rest.client.models.VersionMetaData;
2411
import io.apicurio.registry.serde.SerdeConfig;
@@ -29,13 +16,23 @@
2916
import io.apicurio.registry.utils.tests.TestUtils;
3017
import io.kiota.http.vertx.VertXRequestAdapter;
3118
import io.quarkus.test.junit.QuarkusTest;
19+
import org.apache.kafka.common.serialization.Deserializer;
20+
import org.apache.kafka.common.serialization.Serializer;
21+
import org.junit.jupiter.api.Assertions;
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.io.IOException;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import static io.apicurio.registry.utils.tests.TestUtils.waitForSchema;
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
3231

3332
@QuarkusTest
3433
public class ProtobufSerdeTest extends AbstractResourceTestBase {
3534

3635
private RegistryClient restClient;
37-
// Isolating this test in it's own groupId:
38-
// io.apicurio.registry.rest.client.exception.ArtifactAlreadyExistsException: An artifact with ID 'google/protobuf/timestamp.proto' in group 'default' already exists.
3936
private String groupId = "protobuf-serde-test";
4037

4138
@BeforeEach
@@ -45,72 +42,11 @@ public void createIsolatedClient() {
4542
restClient = new RegistryClient(adapter);
4643
}
4744

48-
//FIXME
49-
//test not working because of getArtifactVersionMetaDataByContent does not find the schema for somereason
50-
// @Test
51-
// public void testConfiguration() throws Exception {
52-
//
53-
// TestCmmn.UUID record = TestCmmn.UUID.newBuilder().setLsb(2).setMsb(1).build();
54-
// byte[] schema = toSchemaProto(record.getDescriptorForType().getFile()).toByteArray();
55-
//// String schema = IoUtil.toString(toSchemaProto(record));
56-
//
57-
// String groupId = TestUtils.generateGroupId();
58-
// String topic = generateArtifactId();
59-
//
60-
// createArtifact(groupId, topic, ArtifactType.PROTOBUF_FD, IoUtil.toString(schema));
61-
//
62-
// System.out.println("artifaaact " + clientV2.listArtifactsInGroup(groupId).getArtifacts().get(0).getId());
63-
//
64-
// Map<String, Object> config = new HashMap<>();
65-
// config.put(SerdeConfigKeys.REGISTRY_URL, TestUtils.getRegistryV2ApiUrl());
66-
// config.put(SerdeConfigKeys.ARTIFACT_GROUP_ID, groupId);
67-
// config.put(SerdeConfigKeys.ARTIFACT_ID_STRATEGY, new SimpleTopicIdStrategy<>());
68-
// Serializer<TestCmmn.UUID> serializer = new ProtobufKafkaSerializer<>();
69-
// serializer.configure(config, true);
70-
//
71-
// byte[] bytes = serializer.serialize(topic, record);
72-
//
73-
// Map<String, Object> deserializerConfig = new HashMap<>();
74-
// deserializerConfig.put(SerdeConfigKeys.REGISTRY_URL, TestUtils.getRegistryV2ApiUrl());
75-
// Deserializer<DynamicMessage> deserializer = new ProtobufKafkaDeserializer();
76-
// deserializer.configure(deserializerConfig, true);
77-
//
78-
// DynamicMessage deserializedRecord = deserializer.deserialize(topic, bytes);
79-
// assertProtobufEquals(record, deserializedRecord);
80-
//
81-
// config.put(SerdeConfigKeys.ARTIFACT_ID_STRATEGY, SimpleTopicIdStrategy.class);
82-
// serializer.configure(config, true);
83-
// bytes = serializer.serialize(topic, record);
84-
//
85-
// deserializer.configure(deserializerConfig, true);
86-
// deserializedRecord = deserializer.deserialize(topic, bytes);
87-
// assertProtobufEquals(record, deserializedRecord);
88-
//
89-
// config.put(SerdeConfigKeys.ARTIFACT_ID_STRATEGY, SimpleTopicIdStrategy.class.getName());
90-
// serializer.configure(config, true);
91-
// bytes = serializer.serialize(topic, record);
92-
// deserializer.configure(deserializerConfig, true);
93-
// deserializedRecord = deserializer.deserialize(topic, bytes);
94-
// assertProtobufEquals(record, deserializedRecord);
95-
//
96-
// serializer.close();
97-
// deserializer.close();
98-
// }
99-
//
100-
// private Serde.Schema toSchemaProto(Descriptors.FileDescriptor file) {
101-
// Serde.Schema.Builder b = Serde.Schema.newBuilder();
102-
// b.setFile(file.toProto());
103-
// for (Descriptors.FileDescriptor d : file.getDependencies()) {
104-
// b.addImport(toSchemaProto(d));
105-
// }
106-
// return b.build();
107-
// }
108-
10945
@SuppressWarnings({ "rawtypes", "unchecked" })
11046
@Test
11147
public void testProto() throws Exception {
11248
try (Serializer<TestCmmn.UUID> serializer = new ProtobufKafkaSerializer<>(restClient);
113-
Deserializer<DynamicMessage> deserializer = new ProtobufKafkaDeserializer(restClient)) {
49+
Deserializer<DynamicMessage> deserializer = new ProtobufKafkaDeserializer(restClient)) {
11450

11551
Map<String, Object> config = new HashMap<>();
11652
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class);
@@ -129,11 +65,13 @@ public void testProto() throws Exception {
12965
waitForSchema(globalId -> {
13066
try {
13167
if (restClient.ids().globalIds().byGlobalId(globalId).get().readAllBytes().length > 0) {
132-
VersionMetaData artifactMetadata = restClient.groups().byGroupId(groupId).artifacts().byArtifactId(topic).versions().byVersionExpression("branch=latest").get();
68+
VersionMetaData artifactMetadata = restClient.groups().byGroupId(groupId).artifacts().byArtifactId(topic).versions()
69+
.byVersionExpression("branch=latest").get();
13370
assertEquals(globalId, artifactMetadata.getGlobalId());
13471
return true;
13572
}
136-
} catch (IOException e) {
73+
}
74+
catch (IOException e) {
13775
throw new RuntimeException(e);
13876
}
13977
return false;
@@ -147,9 +85,8 @@ public void testProto() throws Exception {
14785
@SuppressWarnings({ "rawtypes", "unchecked" })
14886
@Test
14987
public void testProtobufSchemaWithReferences() {
150-
15188
try (Serializer<TableNotification> serializer = new ProtobufKafkaSerializer<>(restClient);
152-
Deserializer<TableNotification> deserializer = new ProtobufKafkaDeserializer(restClient)) {
89+
Deserializer<TableNotification> deserializer = new ProtobufKafkaDeserializer(restClient)) {
15390

15491
Map<String, Object> config = new HashMap<>();
15592
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class);
@@ -159,11 +96,30 @@ public void testProtobufSchemaWithReferences() {
15996
serializer.configure(config, false);
16097
deserializer.configure(config, false);
16198

162-
byte[] data = serializer.serialize("test", TableNotification.newBuilder().build());
99+
byte[] data = serializer.serialize("test", TableNotification.newBuilder().build());
163100
deserializer.deserialize("test", data);
164101

165102
}
103+
}
104+
105+
@SuppressWarnings({ "rawtypes", "unchecked" })
106+
@Test
107+
public void testProtobufSchemaWithReferencesDereferenced() {
108+
try (Serializer<TableNotification> serializer = new ProtobufKafkaSerializer<>(restClient);
109+
Deserializer<TableNotification> deserializer = new ProtobufKafkaDeserializer(restClient)) {
166110

111+
Map<String, Object> config = new HashMap<>();
112+
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class);
113+
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
114+
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
115+
config.put(SerdeConfig.FALLBACK_ARTIFACT_GROUP_ID, groupId);
116+
config.put(SchemaResolverConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
117+
serializer.configure(config, false);
118+
deserializer.configure(config, false);
119+
120+
byte[] data = serializer.serialize("test", TableNotification.newBuilder().build());
121+
deserializer.deserialize("test", data);
122+
}
167123
}
168124

169125
private void assertProtobufEquals(TestCmmn.UUID record, DynamicMessage dm) {

docs/modules/ROOT/partials/getting-started/proc-managing-artifact-references-using-rest-api.adoc

+9
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,15 @@ $ curl -H "Authorization: Bearer $ACCESS_TOKEN" MY-REGISTRY-URL/apis/registry/v2
146146
[{"groupId":"my-group","artifactId":"ItemId","version":"1","name":"com.example.common.ItemId"}]
147147
----
148148

149+
**Dereference**
150+
151+
There are some situations where having an artifact's content with the referenced content inlined might be helpful. For those situations, the Core Registry API v2 supports the _dereference parameter_ in certain operations.
152+
153+
This support is currently implemented for Avro and Protobuf when the parameter is present in a particular API operation. The parameter is not supported for any other schema type.
154+
155+
#In Protobuf, dereferencing content is only supported when all the schemas in the tree belong to the same package.#
156+
157+
149158
[role="_additional-resources"]
150159
.Additional resources
151160
* For more details, see the {registry-rest-api}.

schema-util/protobuf/src/main/java/io/apicurio/registry/content/dereference/ProtobufDereferencer.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,43 @@
11
package io.apicurio.registry.content.dereference;
22

3+
import com.google.protobuf.DescriptorProtos;
4+
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
35
import io.apicurio.registry.content.ContentHandle;
6+
import io.apicurio.registry.utils.protobuf.schema.FileDescriptorUtils;
7+
import io.apicurio.registry.utils.protobuf.schema.ProtobufFile;
48

9+
import java.io.ByteArrayOutputStream;
10+
import java.io.IOException;
11+
import java.util.Collections;
512
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.stream.Collectors;
615

716
public class ProtobufDereferencer implements ContentDereferencer {
817

918
@Override
1019
public ContentHandle dereference(ContentHandle content, Map<String, ContentHandle> resolvedReferences) {
11-
throw new DereferencingNotSupportedException("Content dereferencing is not supported for Protobuf");
20+
final ProtoFileElement protoFileElement = ProtobufFile.toProtoFileElement(content.content());
21+
final Map<String, String> schemaDefs = Collections.unmodifiableMap(resolvedReferences.entrySet()
22+
.stream()
23+
.collect(Collectors.toMap(
24+
Map.Entry::getKey,
25+
e -> e.getValue().content()
26+
)));
27+
28+
DescriptorProtos.FileDescriptorProto fileDescriptorProto = FileDescriptorUtils.toFileDescriptorProto(content.content(), FileDescriptorUtils.firstMessage(protoFileElement).getName(), Optional.ofNullable(protoFileElement.getPackageName()), schemaDefs);
29+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
30+
31+
try {
32+
fileDescriptorProto.writeTo(outputStream);
33+
} catch (IOException e) {
34+
throw new RuntimeException(e);
35+
}
36+
37+
//Dereference returns the whole file descriptor bytes representing the main protobuf schema with the required dependencies.
38+
return ContentHandle.create(outputStream.toByteArray());
1239
}
13-
40+
1441
/**
1542
* @see io.apicurio.registry.content.dereference.ContentDereferencer#rewriteReferences(io.apicurio.registry.content.ContentHandle, java.util.Map)
1643
*/

serdes/protobuf-serde/src/main/java/io/apicurio/registry/serde/protobuf/ProtobufSchemaParser.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.apicurio.registry.serde.protobuf;
22

3+
import com.google.protobuf.DescriptorProtos;
34
import com.google.protobuf.Descriptors;
45
import com.google.protobuf.Descriptors.DescriptorValidationException;
56
import com.google.protobuf.Descriptors.FileDescriptor;
7+
import com.google.protobuf.InvalidProtocolBufferException;
68
import com.google.protobuf.Message;
79
import com.squareup.wire.schema.internal.parser.MessageElement;
810
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
@@ -61,6 +63,20 @@ public ProtobufSchema parseSchema(byte[] rawSchema, Map<String, ParsedSchema<Pro
6163
}
6264
} catch (DescriptorValidationException pe) {
6365
throw new SerializationException("Error parsing protobuf schema ", pe);
66+
} catch (IllegalStateException illegalStateException) {
67+
//If we get here, the server likely returned the full descriptor; try to parse it.
68+
return parseDescriptor(rawSchema);
69+
}
70+
}
71+
72+
private ProtobufSchema parseDescriptor(byte[] rawSchema) {
73+
//Try to parse the binary format, in case the server has returned the descriptor format.
74+
try {
75+
DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto.parseFrom(rawSchema);
76+
ProtoFileElement protoFileElement = FileDescriptorUtils.fileDescriptorToProtoFile(fileDescriptorProto);
77+
return new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(fileDescriptorProto), protoFileElement);
78+
} catch (InvalidProtocolBufferException | DescriptorValidationException e) {
79+
throw new RuntimeException(e);
6480
}
6581
}
6682

@@ -99,7 +115,6 @@ public ParsedSchema<ProtobufSchema> getSchemaFromData(Record<U> data) {
99115

100116
@Override
101117
public ParsedSchema<ProtobufSchema> getSchemaFromData(Record<U> data, boolean dereference) {
102-
//dereferencing not supported, just extract with references
103118
return getSchemaFromData(data);
104119
}
105120

0 commit comments

Comments
 (0)