Skip to content

Commit 9ccf356

Browse files
authored
Add support for autoregister schemas in the json serde (#4127) (#4150)
1 parent 78ab748 commit 9ccf356

File tree

9 files changed

+114
-19
lines changed

9 files changed

+114
-19
lines changed

app/src/test/java/io/apicurio/registry/noprofile/serde/JsonSchemaSerdeTest.java

+48
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,54 @@ public void testJsonSchemaSerde() throws Exception {
116116
}
117117
}
118118

119+
@Test
120+
public void testJsonSchemaSerdeAutoRegister() throws Exception {
121+
String groupId = TestUtils.generateGroupId();
122+
String artifactId = generateArtifactId();
123+
124+
Person person = new Person("Carles", "Arnal", 30);
125+
126+
try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
127+
Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
128+
129+
Map<String, Object> config = new HashMap<>();
130+
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
131+
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
132+
config.put(SerdeConfig.SCHEMA_LOCATION, "/io/apicurio/registry/util/json-schema.json");
133+
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
134+
serializer.configure(config, false);
135+
136+
deserializer.configure(Collections.emptyMap(), false);
137+
138+
Headers headers = new RecordHeaders();
139+
byte[] bytes = serializer.serialize(artifactId, headers, person);
140+
141+
person = deserializer.deserialize(artifactId, headers, bytes);
142+
143+
Assertions.assertEquals("Carles", person.getFirstName());
144+
Assertions.assertEquals("Arnal", person.getLastName());
145+
Assertions.assertEquals(30, person.getAge());
146+
147+
person.setAge(-1);
148+
149+
try {
150+
serializer.serialize(artifactId, new RecordHeaders(), person);
151+
Assertions.fail();
152+
} catch (Exception ignored) {
153+
}
154+
155+
serializer.setValidationEnabled(false); // disable validation
156+
// create invalid person bytes
157+
bytes = serializer.serialize(artifactId, headers, person);
158+
159+
try {
160+
deserializer.deserialize(artifactId, headers, bytes);
161+
Assertions.fail();
162+
} catch (Exception ignored) {
163+
}
164+
}
165+
}
166+
119167
@Test
120168
public void testJsonSchemaSerdeHeaders() throws Exception {
121169
InputStream jsonSchema = getClass().getResourceAsStream("/io/apicurio/registry/util/json-schema.json");

schema-resolver/src/main/java/io/apicurio/registry/resolver/DefaultSchemaResolver.java

+17-10
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,23 @@ private Optional<SchemaLookupResult<S>> getSchemaFromCache(ArtifactReference art
9191

9292
private SchemaLookupResult<S> getSchemaFromRegistry(ParsedSchema<S> parsedSchema, Record<T> data, ArtifactReference artifactReference) {
9393

94-
if (autoCreateArtifact && schemaParser.supportsExtractSchemaFromData()) {
95-
if (parsedSchema == null) {
96-
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
97-
}
98-
99-
if (parsedSchema.hasReferences()) {
100-
//List of references lookup, to be used to create the references for the artifact
101-
final List<SchemaLookupResult<S>> schemaLookupResults = handleArtifactReferences(data, parsedSchema);
102-
return handleAutoCreateArtifact(parsedSchema, artifactReference, schemaLookupResults);
103-
} else {
94+
if (autoCreateArtifact) {
95+
96+
if (schemaParser.supportsExtractSchemaFromData()) {
97+
98+
if (parsedSchema == null) {
99+
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
100+
}
101+
102+
if (parsedSchema.hasReferences()) {
103+
//List of references lookup, to be used to create the references for the artifact
104+
final List<SchemaLookupResult<S>> schemaLookupResults = handleArtifactReferences(data, parsedSchema);
105+
return handleAutoCreateArtifact(parsedSchema, artifactReference, schemaLookupResults);
106+
} else {
107+
return handleAutoCreateArtifact(parsedSchema, artifactReference);
108+
}
109+
} else if (config.getExplicitSchemaLocation() != null && schemaParser.supportsGetSchemaFromLocation()) {
110+
parsedSchema = schemaParser.getSchemaFromLocation(config.getExplicitSchemaLocation());
104111
return handleAutoCreateArtifact(parsedSchema, artifactReference);
105112
}
106113
}

schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaParser.java

+17
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,27 @@ public interface SchemaParser<S, U> {
3131
*/
3232
ParsedSchema<S> getSchemaFromData(Record<U> data, boolean dereference);
3333

34+
/**
35+
* In some artifact types, such as Json, we allow defining a local place for the schema.
36+
*
37+
* @param location the schema location
38+
* @return the ParsedSchema, containing both the raw schema (bytes) and the parsed schema. Can be null.
39+
*/
40+
default ParsedSchema<S> getSchemaFromLocation(String location) {
41+
return null;
42+
}
43+
3444
/**
3545
* Flag that indicates if {@link SchemaParser#getSchemaFromData(Record)} is implemented or not.
3646
*/
3747
default boolean supportsExtractSchemaFromData() {
3848
return true;
3949
}
50+
51+
/**
52+
* Flag that indicates if {@link SchemaParser#getSchemaFromLocation(String)} is implemented or not.
53+
*/
54+
default boolean supportsGetSchemaFromLocation() {
55+
return false;
56+
}
4057
}

schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ public class SchemaResolverConfig {
7373
*/
7474
public static final String EXPLICIT_ARTIFACT_ID = "apicurio.registry.artifact.artifact-id";
7575

76+
/**
77+
* Only applicable for serializers
78+
* Optional, set explicitly the schema location in the classpath for the schema to be used for serializing the data.
79+
*/
80+
public static final String SCHEMA_LOCATION = "apicurio.registry.artifact.schema.location";
81+
7682
/**
7783
* Only applicable for serializers
7884
* Optional, set explicitly the version used for querying/creating an artifact.

schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ public String getExplicitArtifactId() {
112112
return getString(EXPLICIT_ARTIFACT_ID);
113113
}
114114

115+
public String getExplicitSchemaLocation() {
116+
return getString(SCHEMA_LOCATION);
117+
}
118+
115119
public String getExplicitArtifactVersion() {
116120
return getString(EXPLICIT_ARTIFACT_VERSION);
117121
}

serdes/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaSerializer.java

-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ public SchemaParser<JsonSchema, T> schemaParser() {
104104
*/
105105
@Override
106106
protected void serializeData(ParsedSchema<JsonSchema> schema, T data, OutputStream out) throws IOException {
107-
//TODO add property to specify a jsonschema to allow for auto-register json schemas
108107
serializeData(null, schema, data, out);
109108
}
110109

serdes/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaSerializerConfig.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
public class JsonSchemaKafkaSerializerConfig extends BaseKafkaSerDeConfig {
1414

1515
private static ConfigDef configDef() {
16-
ConfigDef configDef = new ConfigDef()
16+
return new ConfigDef()
1717
.define(VALIDATION_ENABLED, Type.BOOLEAN, VALIDATION_ENABLED_DEFAULT, Importance.MEDIUM, "Whether to validate the data against the json schema");
18-
return configDef;
1918
}
2019

2120
/**
@@ -30,5 +29,4 @@ public JsonSchemaKafkaSerializerConfig(Map<?, ?> originals) {
3029
public boolean validationEnabled() {
3130
return this.getBoolean(VALIDATION_ENABLED);
3231
}
33-
3432
}

serdes/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaParser.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.apicurio.registry.serde.jsonschema;
22

33
import io.apicurio.registry.resolver.ParsedSchema;
4+
import io.apicurio.registry.resolver.ParsedSchemaImpl;
45
import io.apicurio.registry.resolver.SchemaParser;
56
import io.apicurio.registry.resolver.data.Record;
67
import io.apicurio.registry.types.ArtifactType;
@@ -28,11 +29,6 @@ public JsonSchema parseSchema(byte[] rawSchema, Map<String, ParsedSchema<JsonSch
2829
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getParsedSchema())), 0);
2930
}
3031

31-
//TODO we could implement some way of providing the jsonschema beforehand:
32-
// - via annotation in the object being serialized
33-
// - via config property
34-
//if we do this users will be able to automatically registering the schema when using this serde
35-
3632
/**
3733
* @see io.apicurio.registry.resolver.SchemaParser#getSchemaFromData(java.lang.Object)
3834
*/
@@ -48,8 +44,22 @@ public ParsedSchema<JsonSchema> getSchemaFromData(Record<T> data, boolean derefe
4844
return null;
4945
}
5046

47+
@Override
48+
public ParsedSchema<JsonSchema> getSchemaFromLocation(String location) {
49+
String rawSchema = IoUtil.toString(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
50+
51+
return new ParsedSchemaImpl<JsonSchema>()
52+
.setParsedSchema(new JsonSchema(rawSchema))
53+
.setRawSchema(rawSchema.getBytes());
54+
}
55+
5156
@Override
5257
public boolean supportsExtractSchemaFromData() {
5358
return false;
5459
}
60+
61+
@Override
62+
public boolean supportsGetSchemaFromLocation() {
63+
return true;
64+
}
5565
}

serdes/serde-common/src/main/java/io/apicurio/registry/serde/SerdeConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public class SerdeConfig {
7575
*/
7676
public static final String EXPLICIT_ARTIFACT_ID = SchemaResolverConfig.EXPLICIT_ARTIFACT_ID;
7777

78+
/**
79+
* Only applicable for serializers
80+
* Optional, set explicitly the schema used for serialization.
81+
*/
82+
public static final String SCHEMA_LOCATION = SchemaResolverConfig.SCHEMA_LOCATION;
83+
7884
/**
7985
* Only applicable for serializers
8086
* Optional, set explicitly the version used for querying/creating an artifact.

0 commit comments

Comments
 (0)