KafkaEventSink.java
package io.apicurio.registry.events.kafka;

import java.time.Instant;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;

import io.apicurio.common.apps.config.Info;
import io.apicurio.registry.events.EventSink;
import io.apicurio.registry.utils.RegistryProperties;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ProducerActions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
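
/**
 * {@link EventSink} implementation that publishes registry events to a Kafka topic.
 * The sink is only considered configured when the "apicurio.events.kafka.topic"
 * property is set.
 */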
@ApplicationScoped
public class KafkaEventSink implements EventSink {

    @Inject
    Logger log;
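
    // Kafka producer configuration, collected from application properties under
    // the "apicurio.events.kafka.config" prefix.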
    @Inject
    @RegistryProperties(
            value = {"apicurio.events.kafka.config"},
            empties = {"ssl.endpoint.identification.algorithm="}
    )
    Properties producerProperties;

    private ProducerActions<String, byte[]> producer;
    private Integer partition;

    @ConfigProperty(name = "apicurio.events.kafka.topic")
    @Info(category = "kafka", description = "Events Kafka topic", availableSince = "2.0.0.Final")
    Optional<String> eventsTopic;

    @ConfigProperty(name = "apicurio.events.kafka.topic-partition")
    @Info(category = "kafka", description = "Events Kafka topic partition", availableSince = "2.0.0.Final")
    Optional<Integer> eventsTopicPartition;

    @PostConstruct
    void init() {
        partition = eventsTopicPartition.orElse(null);
    }

    @Override
    public String name() {
        return "Kafka Sink";
    }

    @Override
    public boolean isConfigured() {
        return eventsTopic.isPresent();
    }
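
    /*
     * Builds a Kafka record with CloudEvents-style "ce_*" headers and sends it
     * asynchronously to the configured topic.
     */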
    @Override
    public void handle(Message<Buffer> message) {
        String type = message.headers().get("type");
        String artifactId = message.headers().get("artifactId");

        log.info("Firing event " + type);

        UUID uuid = UUID.randomUUID();

        Headers headers = new RecordHeaders();
        headers.add("ce_id", uuid.toString().getBytes());
        headers.add("ce_specversion", "1.0".getBytes());
        headers.add("ce_source", "apicurio-registry".getBytes());
        headers.add("ce_type", type.getBytes());
        headers.add("ce_time", Instant.now().toString().getBytes());
        headers.add("content-type", "application/json".getBytes());

        // For artifact-related operations the message key is the artifactId, which places
        // all messages for a given artifact on the same partition.
        String key = artifactId;
        if (key == null) {
            key = uuid.toString();
        }

        getProducer()
                .apply(new ProducerRecord<String, byte[]>(
                        eventsTopic.get(),
                        partition, // partition is optional and may be null
                        key,
                        message.body().getBytes(),
                        headers));
    }
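
    // Lazily creates the async producer on first use; synchronized so that
    // initialization is thread-safe.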
    public synchronized ProducerActions<String, byte[]> getProducer() {
        if (producer == null) {
            producer = new AsyncProducer<String, byte[]>(
                    producerProperties,
                    Serdes.String().serializer(),
                    Serdes.ByteArray().serializer()
            );
        }
        return producer;
    }
}