Skip to content

Commit ffda747

Browse files
authored
Implement derefence proto (#4272)
* Implement protobuf dereferencing for dependencies within the same package * Add dereferencing notes to docs
1 parent 0aa2d7e commit ffda747

File tree

5 files changed

+107
-140
lines changed

5 files changed

+107
-140
lines changed

app/src/test/java/io/apicurio/registry/noprofile/serde/ProtobufSerdeTest.java

+36-81
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,32 @@
1616

1717
package io.apicurio.registry.noprofile.serde;
1818

19-
import static io.apicurio.registry.utils.tests.TestUtils.waitForSchema;
20-
import static org.junit.jupiter.api.Assertions.assertEquals;
21-
22-
import java.util.HashMap;
23-
import java.util.Map;
24-
25-
import io.apicurio.registry.serde.SerdeConfig;
26-
import io.api.sample.TableNotification;
27-
import org.apache.kafka.common.serialization.Deserializer;
28-
import org.apache.kafka.common.serialization.Serializer;
29-
import org.junit.jupiter.api.Assertions;
30-
import org.junit.jupiter.api.BeforeEach;
31-
import org.junit.jupiter.api.Test;
32-
3319
import com.google.protobuf.Descriptors;
3420
import com.google.protobuf.DynamicMessage;
21+
import io.api.sample.TableNotification;
3522
import io.apicurio.registry.AbstractResourceTestBase;
23+
import io.apicurio.registry.resolver.SchemaResolverConfig;
3624
import io.apicurio.registry.rest.client.RegistryClient;
3725
import io.apicurio.registry.rest.client.RegistryClientFactory;
3826
import io.apicurio.registry.rest.v2.beans.ArtifactMetaData;
27+
import io.apicurio.registry.serde.SerdeConfig;
3928
import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
4029
import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
4130
import io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy;
4231
import io.apicurio.registry.support.TestCmmn;
4332
import io.apicurio.registry.utils.tests.TestUtils;
4433
import io.quarkus.test.junit.QuarkusTest;
34+
import org.apache.kafka.common.serialization.Deserializer;
35+
import org.apache.kafka.common.serialization.Serializer;
36+
import org.junit.jupiter.api.Assertions;
37+
import org.junit.jupiter.api.BeforeEach;
38+
import org.junit.jupiter.api.Test;
39+
40+
import java.util.HashMap;
41+
import java.util.Map;
42+
43+
import static io.apicurio.registry.utils.tests.TestUtils.waitForSchema;
44+
import static org.junit.jupiter.api.Assertions.assertEquals;
4545

4646
/**
4747
* @author Fabian Martinez
@@ -50,77 +50,14 @@
5050
public class ProtobufSerdeTest extends AbstractResourceTestBase {
5151

5252
private RegistryClient restClient;
53-
// Isolating this test in it's own groupId:
54-
// io.apicurio.registry.rest.client.exception.ArtifactAlreadyExistsException: An artifact with ID 'google/protobuf/timestamp.proto' in group 'default' already exists.
5553
private String groupId = "protobuf-serde-test";
5654

5755
@BeforeEach
5856
public void createIsolatedClient() {
5957
restClient = RegistryClientFactory.create(TestUtils.getRegistryV2ApiUrl(testPort));
6058
}
6159

62-
//FIXME
63-
//test not working because of getArtifactVersionMetaDataByContent does not find the schema for somereason
64-
// @Test
65-
// public void testConfiguration() throws Exception {
66-
//
67-
// TestCmmn.UUID record = TestCmmn.UUID.newBuilder().setLsb(2).setMsb(1).build();
68-
// byte[] schema = toSchemaProto(record.getDescriptorForType().getFile()).toByteArray();
69-
//// String schema = IoUtil.toString(toSchemaProto(record));
70-
//
71-
// String groupId = TestUtils.generateGroupId();
72-
// String topic = generateArtifactId();
73-
//
74-
// createArtifact(groupId, topic, ArtifactType.PROTOBUF_FD, IoUtil.toString(schema));
75-
//
76-
// System.out.println("artifaaact " + clientV2.listArtifactsInGroup(groupId).getArtifacts().get(0).getId());
77-
//
78-
// Map<String, Object> config = new HashMap<>();
79-
// config.put(SerdeConfigKeys.REGISTRY_URL, TestUtils.getRegistryV2ApiUrl());
80-
// config.put(SerdeConfigKeys.ARTIFACT_GROUP_ID, groupId);
81-
// config.put(SerdeConfigKeys.ARTIFACT_ID_STRATEGY, new SimpleTopicIdStrategy<>());
82-
// Serializer<TestCmmn.UUID> serializer = new ProtobufKafkaSerializer<>();
83-
// serializer.configure(config, true);
84-
//
85-
// byte[] bytes = serializer.serialize(topic, record);
86-
//
87-
// Map<String, Object> deserializerConfig = new HashMap<>();
88-
// deserializerConfig.put(SerdeConfigKeys.REGISTRY_URL, TestUtils.getRegistryV2ApiUrl());
89-
// Deserializer<DynamicMessage> deserializer = new ProtobufKafkaDeserializer();
90-
// deserializer.configure(deserializerConfig, true);
91-
//
92-
// DynamicMessage deserializedRecord = deserializer.deserialize(topic, bytes);
93-
// assertProtobufEquals(record, deserializedRecord);
94-
//
95-
// config.put(SerdeConfigKeys.ARTIFACT_ID_STRATEGY, SimpleTopicIdStrategy.class);
96-
// serializer.configure(config, true);
97-
// bytes = serializer.serialize(topic, record);
98-
//
99-
// deserializer.configure(deserializerConfig, true);
100-
// deserializedRecord = deserializer.deserialize(topic, bytes);
101-
// assertProtobufEquals(record, deserializedRecord);
102-
//
103-
// config.put(SerdeConfigKeys.ARTIFACT_ID_STRATEGY, SimpleTopicIdStrategy.class.getName());
104-
// serializer.configure(config, true);
105-
// bytes = serializer.serialize(topic, record);
106-
// deserializer.configure(deserializerConfig, true);
107-
// deserializedRecord = deserializer.deserialize(topic, bytes);
108-
// assertProtobufEquals(record, deserializedRecord);
109-
//
110-
// serializer.close();
111-
// deserializer.close();
112-
// }
113-
//
114-
// private Serde.Schema toSchemaProto(Descriptors.FileDescriptor file) {
115-
// Serde.Schema.Builder b = Serde.Schema.newBuilder();
116-
// b.setFile(file.toProto());
117-
// for (Descriptors.FileDescriptor d : file.getDependencies()) {
118-
// b.addImport(toSchemaProto(d));
119-
// }
120-
// return b.build();
121-
// }
122-
123-
@SuppressWarnings({ "rawtypes", "unchecked" })
60+
@SuppressWarnings({"rawtypes", "unchecked"})
12461
@Test
12562
public void testProto() throws Exception {
12663
try (Serializer<TestCmmn.UUID> serializer = new ProtobufKafkaSerializer<>(restClient);
@@ -154,10 +91,9 @@ public void testProto() throws Exception {
15491
}
15592
}
15693

157-
@SuppressWarnings({ "rawtypes", "unchecked" })
94+
@SuppressWarnings({"rawtypes", "unchecked"})
15895
@Test
15996
public void testProtobufSchemaWithReferences() {
160-
16197
try (Serializer<TableNotification> serializer = new ProtobufKafkaSerializer<>(restClient);
16298
Deserializer<TableNotification> deserializer = new ProtobufKafkaDeserializer(restClient)) {
16399

@@ -169,11 +105,30 @@ public void testProtobufSchemaWithReferences() {
169105
serializer.configure(config, false);
170106
deserializer.configure(config, false);
171107

172-
byte[] data = serializer.serialize("test", TableNotification.newBuilder().build());
108+
byte[] data = serializer.serialize("test", TableNotification.newBuilder().build());
173109
deserializer.deserialize("test", data);
174110

175111
}
112+
}
113+
114+
@SuppressWarnings({"rawtypes", "unchecked"})
115+
@Test
116+
public void testProtobufSchemaWithReferencesDereferenced() {
117+
try (Serializer<TableNotification> serializer = new ProtobufKafkaSerializer<>(restClient);
118+
Deserializer<TableNotification> deserializer = new ProtobufKafkaDeserializer(restClient)) {
176119

120+
Map<String, Object> config = new HashMap<>();
121+
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class);
122+
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
123+
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
124+
config.put(SerdeConfig.FALLBACK_ARTIFACT_GROUP_ID, groupId);
125+
config.put(SchemaResolverConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
126+
serializer.configure(config, false);
127+
deserializer.configure(config, false);
128+
129+
byte[] data = serializer.serialize("test", TableNotification.newBuilder().build());
130+
deserializer.deserialize("test", data);
131+
}
177132
}
178133

179134
private void assertProtobufEquals(TestCmmn.UUID record, DynamicMessage dm) {

docs/modules/ROOT/partials/getting-started/proc-managing-artifact-references-using-rest-api.adoc

+9
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,15 @@ $ curl -H "Authorization: Bearer $ACCESS_TOKEN" MY-REGISTRY-URL/apis/registry/v2
146146
[{"groupId":"my-group","artifactId":"ItemId","version":"1","name":"com.example.common.ItemId"}]
147147
----
148148

149+
**Dereference**
150+
151+
There are some situations where having an artifact's content with the referenced content inlined might be helpful. For those situations, the Core Registry API v2 supports the _dereference parameter_ in certain operations.
152+
153+
This support is currently implemented for Avro and Protobuf when the parameter is present in a particular API operation. The parameter is not supported in any other schema type.
154+
155+
#In Protobuf dereferencing content is only supported when all the schemas in the try belong to the same package.#
156+
157+
149158
[role="_additional-resources"]
150159
.Additional resources
151160
* For more details, see the {registry-rest-api}.

schema-util/protobuf/src/main/java/io/apicurio/registry/content/dereference/ProtobufDereferencer.java

+29-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,18 @@
1616

1717
package io.apicurio.registry.content.dereference;
1818

19+
import com.google.protobuf.DescriptorProtos;
20+
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
1921
import io.apicurio.registry.content.ContentHandle;
22+
import io.apicurio.registry.utils.protobuf.schema.FileDescriptorUtils;
23+
import io.apicurio.registry.utils.protobuf.schema.ProtobufFile;
2024

25+
import java.io.ByteArrayOutputStream;
26+
import java.io.IOException;
27+
import java.util.Collections;
2128
import java.util.Map;
29+
import java.util.Optional;
30+
import java.util.stream.Collectors;
2231

2332
/**
2433
* @author carnalca@redhat.com
@@ -27,15 +36,32 @@ public class ProtobufDereferencer implements ContentDereferencer {
2736

2837
@Override
2938
public ContentHandle dereference(ContentHandle content, Map<String, ContentHandle> resolvedReferences) {
30-
throw new DereferencingNotSupportedException("Content dereferencing is not supported for Protobuf");
39+
final ProtoFileElement protoFileElement = ProtobufFile.toProtoFileElement(content.content());
40+
final Map<String, String> schemaDefs = Collections.unmodifiableMap(resolvedReferences.entrySet()
41+
.stream()
42+
.collect(Collectors.toMap(
43+
Map.Entry::getKey,
44+
e -> e.getValue().content()
45+
)));
46+
47+
DescriptorProtos.FileDescriptorProto fileDescriptorProto = FileDescriptorUtils.toFileDescriptorProto(content.content(), FileDescriptorUtils.firstMessage(protoFileElement).getName(), Optional.ofNullable(protoFileElement.getPackageName()), schemaDefs);
48+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
49+
50+
try {
51+
fileDescriptorProto.writeTo(outputStream);
52+
} catch (IOException e) {
53+
throw new RuntimeException(e);
54+
}
55+
56+
//Dereference returns the whole file descriptor bytes representing the main protobuf schema with the required dependencies.
57+
return ContentHandle.create(outputStream.toByteArray());
3158
}
32-
59+
3360
/**
3461
* @see io.apicurio.registry.content.dereference.ContentDereferencer#rewriteReferences(io.apicurio.registry.content.ContentHandle, java.util.Map)
3562
*/
3663
@Override
3764
public ContentHandle rewriteReferences(ContentHandle content, Map<String, String> resolvedReferenceUrls) {
38-
// TODO not yet implemented (perhaps cannot be implemented?)
3965
return content;
4066
}
4167
}

serdes/protobuf-serde/src/main/java/io/apicurio/registry/serde/protobuf/ProtobufSchemaParser.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package io.apicurio.registry.serde.protobuf;
1818

19+
import com.google.protobuf.DescriptorProtos;
1920
import com.google.protobuf.Descriptors;
2021
import com.google.protobuf.Descriptors.DescriptorValidationException;
2122
import com.google.protobuf.Descriptors.FileDescriptor;
23+
import com.google.protobuf.InvalidProtocolBufferException;
2224
import com.google.protobuf.Message;
2325
import com.squareup.wire.schema.internal.parser.MessageElement;
2426
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
@@ -80,6 +82,20 @@ public ProtobufSchema parseSchema(byte[] rawSchema, Map<String, ParsedSchema<Pro
8082
}
8183
} catch (DescriptorValidationException pe) {
8284
throw new SerializationException("Error parsing protobuf schema ", pe);
85+
} catch (IllegalStateException illegalStateException) {
86+
//If qe get here the server likely returned the full descriptor, try to parse it.
87+
return parseDescriptor(rawSchema);
88+
}
89+
}
90+
91+
private ProtobufSchema parseDescriptor(byte[] rawSchema) {
92+
//Try to parse the binary format, in case the server has returned the descriptor format.
93+
try {
94+
DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto.parseFrom(rawSchema);
95+
ProtoFileElement protoFileElement = FileDescriptorUtils.fileDescriptorToProtoFile(fileDescriptorProto);
96+
return new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(fileDescriptorProto), protoFileElement);
97+
} catch (InvalidProtocolBufferException | DescriptorValidationException e) {
98+
throw new RuntimeException(e);
8399
}
84100
}
85101

@@ -118,7 +134,6 @@ public ParsedSchema<ProtobufSchema> getSchemaFromData(Record<U> data) {
118134

119135
@Override
120136
public ParsedSchema<ProtobufSchema> getSchemaFromData(Record<U> data, boolean dereference) {
121-
//dereferencing not supported, just extract with references
122137
return getSchemaFromData(data);
123138
}
124139

0 commit comments

Comments
 (0)