From 1420c684ded258d4dd9d4b9907d8d7bcc09978a4 Mon Sep 17 00:00:00 2001
From: Carles Arnal
Date: Wed, 20 Dec 2023 18:57:32 +0900
Subject: [PATCH] Handle situations with complex schemas in json converter (#4144) (#4151)

---
 .../tests/converters/RegistryConverterIT.java | 112 +++++++++++++++++-
 .../utils/converter/ExtJsonConverter.java     |  39 ++++--
 2 files changed, 143 insertions(+), 8 deletions(-)

diff --git a/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java b/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java
index 2911de719c..b897827f23 100644
--- a/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java
+++ b/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java
@@ -239,6 +239,116 @@ public void testPrettyJson() throws Exception {
         );
     }
 
+    @Test
+    public void testConnectStruct() throws Exception {
+        try (ExtJsonConverter converter = new ExtJsonConverter()) {
+
+            converter.setFormatStrategy(new CompactFormatStrategy());
+            Map config = new HashMap<>();
+            config.put(SerdeConfig.REGISTRY_URL, getRegistryV2ApiUrl());
+            config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
+            converter.configure(config, false);
+
+            org.apache.kafka.connect.data.Schema envelopeSchema = buildEnvelopeSchema();
+
+            // Create a Struct object for the Envelope
+            Struct envelopeStruct = new Struct(envelopeSchema);
+
+            // Set values for the fields in the Envelope
+            envelopeStruct.put("before", buildValueStruct());
+            envelopeStruct.put("after", buildValueStruct());
+            envelopeStruct.put("source", buildSourceStruct());
+            envelopeStruct.put("op", "insert");
+            envelopeStruct.put("ts_ms", 1638362438000L); // Replace with the actual timestamp
+            envelopeStruct.put("transaction", buildTransactionStruct());
+
+
+            String subject = TestUtils.generateArtifactId();
+
+            byte[] bytes = converter.fromConnectData(subject, envelopeSchema, envelopeStruct);
+
+            // some impl details ...
+            TestUtils.waitForSchema(globalId -> registryClient.getContentByGlobalId(globalId) != null, bytes);
+
+
+            Struct ir = (Struct) converter.toConnectData(subject, bytes).value();
+            Assertions.assertEquals(envelopeStruct, ir);
+        }
+    }
+
+    private static org.apache.kafka.connect.data.Schema buildEnvelopeSchema() {
+        // Define the Envelope schema
+        return SchemaBuilder.struct()
+                .name("dbserver1.public.aviation.Envelope")
+                .version(1)
+                .field("before", buildValueSchema())
+                .field("after", buildValueSchema())
+                .field("source", buildSourceSchema())
+                .field("op", SchemaBuilder.STRING_SCHEMA)
+                .field("ts_ms", SchemaBuilder.OPTIONAL_INT64_SCHEMA)
+                .field("transaction", buildTransactionSchema())
+                .build();
+    }
+
+    private static org.apache.kafka.connect.data.Schema buildValueSchema() {
+        // Define the Value schema
+        return SchemaBuilder.struct()
+                .name("dbserver1.public.aviation.Value")
+                .version(1)
+                .field("id", SchemaBuilder.INT32_SCHEMA)
+                .build();
+    }
+
+    private static Struct buildValueStruct() {
+        // Create a Struct object for the Value
+        Struct valueStruct = new Struct(buildValueSchema());
+
+        // Set value for the "id" field
+        valueStruct.put("id", 123); // Replace with the actual ID value
+
+        return valueStruct;
+    }
+
+    private static org.apache.kafka.connect.data.Schema buildSourceSchema() {
+        // Define the Source schema
+        return SchemaBuilder.struct()
+                .name("io.debezium.connector.postgresql.Source")
+                .version(1)
+                .field("id", SchemaBuilder.STRING_SCHEMA)
+                .field("version", SchemaBuilder.STRING_SCHEMA)
+                .build();
+    }
+
+    private static Struct buildSourceStruct() {
+        // Create a Struct object for the Source
+        Struct sourceStruct = new Struct(buildSourceSchema());
+
+        // Set values for the fields in the Source
+        sourceStruct.put("id", "source_id");
+        sourceStruct.put("version", "1.0");
+
+        return sourceStruct;
+    }
+
+    private static org.apache.kafka.connect.data.Schema buildTransactionSchema() {
+        // Define the Transaction schema
+        return SchemaBuilder.struct()
+                .name("event.block")
+                .version(1)
+                .field("id", SchemaBuilder.STRING_SCHEMA)
+                .build();
+    }
+
+    private static Struct buildTransactionStruct() {
+        // Create a Struct object for the Transaction
+        Struct transactionStruct = new Struct(buildTransactionSchema());
+
+        // Set value for the "id" field in Transaction
+        transactionStruct.put("id", "transaction_id");
+
+        return transactionStruct;
+    }
+
     @Test
     public void testCompactJson() throws Exception {
         testJson(
@@ -270,7 +380,7 @@ private void testJson(RegistryClient restClient, FormatStrategy formatStrategy,
             TestUtils.waitForSchemaCustom(globalId -> restClient.getContentByGlobalId(globalId) != null, bytes, fn);
 
             //noinspection rawtypes
-            Map ir = (Map) converter.toConnectData("extjson", bytes).value();
+            Struct ir = (Struct) converter.toConnectData("extjson", bytes).value();
             Assertions.assertEquals("somebar", ir.get("bar").toString());
         }
     }
diff --git a/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ExtJsonConverter.java b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ExtJsonConverter.java
index 300325b13c..f9e4cb6691 100644
--- a/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ExtJsonConverter.java
+++ b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ExtJsonConverter.java
@@ -16,9 +16,11 @@
 
 package io.apicurio.registry.utils.converter;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.apicurio.registry.resolver.ParsedSchema;
 import io.apicurio.registry.resolver.ParsedSchemaImpl;
 import io.apicurio.registry.resolver.SchemaLookupResult;
@@ -33,12 +35,12 @@
 import io.apicurio.registry.utils.converter.json.JsonConverterMetadata;
 import io.apicurio.registry.utils.converter.json.JsonConverterRecord;
 import io.apicurio.registry.utils.converter.json.PrettyFormatStrategy;
-
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.json.JsonDeserializer;
 import org.apache.kafka.connect.storage.Converter;
 
 import java.io.IOException;
@@ -53,10 +55,13 @@
  */
 public class ExtJsonConverter extends SchemaResolverConfigurer implements Converter, SchemaParser, AutoCloseable {
     private final JsonConverter jsonConverter;
+    private final JsonConverter deserializingConverter;
     private final ObjectMapper mapper;
     private FormatStrategy formatStrategy;
     private boolean isKey;
 
+    private JsonDeserializer jsonDeserializer;
+
     public ExtJsonConverter() {
         this(null);
     }
@@ -64,8 +69,10 @@ public ExtJsonConverter() {
     public ExtJsonConverter(RegistryClient client) {
         super(client);
         this.jsonConverter = new JsonConverter();
+        this.deserializingConverter = new JsonConverter();
         this.mapper = new ObjectMapper();
         this.formatStrategy = new PrettyFormatStrategy();
+        this.jsonDeserializer = new JsonDeserializer();
     }
 
     public ExtJsonConverter setFormatStrategy(FormatStrategy formatStrategy) {
@@ -75,11 +82,16 @@ public ExtJsonConverter setFormatStrategy(FormatStrategy formatStrategy) {
 
     @Override
     public void configure(Map configs, boolean isKey) {
-        super.configure((Map)configs, isKey, this);
+        super.configure((Map) configs, isKey, this);
         this.isKey = isKey;
         Map wrapper = new HashMap<>(configs);
         wrapper.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
         jsonConverter.configure(wrapper, isKey);
+
+        Map deserializingConfig = new HashMap<>(configs);
+        deserializingConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true); // toConnectData() feeds this converter a schema/payload envelope
+        deserializingConverter.configure(deserializingConfig, false);
+        jsonDeserializer.configure(wrapper, false);
     }
 
     @Override
@@ -110,10 +122,24 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
 
         SchemaLookupResult schemaLookupResult = getSchemaResolver().resolveSchemaByArtifactReference(ArtifactReference.builder().globalId(globalId).build());
 
-        Schema schema = jsonConverter.asConnectSchema(schemaLookupResult.getParsedSchema().getParsedSchema());
+        JsonNode parsedSchema = schemaLookupResult.getParsedSchema().getParsedSchema();
+        JsonNode dataDeserialized = jsonDeserializer.deserialize(topic, ip.getPayload());
+
+        //Since the json converter is expecting the data to have the schema to fully validate it, we build an envelope object containing the schema from registry and the data deserialized
+        ObjectNode envelope = JsonNodeFactory.withExactBigDecimals(true).objectNode();
+        envelope.set("schema", parsedSchema);
+        envelope.set("payload", dataDeserialized);
+        dataDeserialized = envelope;
 
-        byte[] payload = ip.getPayload();
-        SchemaAndValue sav = jsonConverter.toConnectData(topic, payload);
+        SchemaAndValue sav;
+        try {
+            sav = deserializingConverter.toConnectData(topic, mapper.writeValueAsBytes(dataDeserialized));
+
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+
+        Schema schema = deserializingConverter.asConnectSchema(schemaLookupResult.getParsedSchema().getParsedSchema());
 
         return new SchemaAndValue(schema, sav.value());
     }
@@ -163,5 +189,4 @@ public ParsedSchema getSchemaFromData(Record data, boolean der
     public void close() throws Exception {
         jsonConverter.close();
     }
-
 }