|
22 | 22 | import io.apicurio.registry.rest.client.models.IfArtifactExists;
|
23 | 23 | import io.apicurio.registry.rest.client.models.VersionContent;
|
24 | 24 | import io.apicurio.registry.rest.client.models.VersionMetaData;
|
| 25 | +import io.apicurio.registry.serde.config.KafkaSerdeConfig; |
25 | 26 | import io.apicurio.registry.serde.config.SerdeConfig;
|
26 | 27 | import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
|
27 | 28 | import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
|
@@ -184,6 +185,8 @@ private static Producer<Object, Object> createKafkaProducer() {
|
184 | 185 | // Use the Apicurio Registry provided Kafka Serializer for JSON Schema
|
185 | 186 | props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
186 | 187 | JsonSchemaKafkaSerializer.class.getName());
|
| 188 | + // Send metadata in message headers (includes the schema coordinates and the message bean type) |
| 189 | + props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, true); |
187 | 190 |
|
188 | 191 | // Configure Service Registry location
|
189 | 192 | props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
|
@@ -215,6 +218,8 @@ private static KafkaConsumer<Long, MessageBean> createKafkaConsumer() {
|
215 | 218 | // Use the Apicurio Registry provided Kafka Deserializer for JSON Schema
|
216 | 219 | props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
|
217 | 220 | JsonSchemaKafkaDeserializer.class.getName());
|
| 221 | + // Read metadata from message headers (includes the schema coordinates and the message bean type) |
| 222 | + props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, true); |
218 | 223 |
|
219 | 224 | // Configure Service Registry location
|
220 | 225 | props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
|
|
0 commit comments