Commit 70d8e1c

Committed Feb 27, 2024
Add two missing examples
1 parent 5a767dc commit 70d8e1c

File tree: 24 files changed (+1999, -8 lines)

examples/debezium-openshift/README.md

# Debezium and Apicurio Registry on OpenShift

This example contains a simple application that uses Debezium with Apicurio Registry, deployed on OpenShift.

## Prerequisites

1. Prepare or provision an OpenShift cluster.

2. Install the following operators:

   - AMQ Streams (tested on `2.5.0-0` / Kafka `3.4`)
   - Red Hat Integration - Service Registry Operator (tested on `2.2.2`)

3. Configure `oc`:

   ```shell
   oc login #...
   export NAMESPACE="example"
   oc new-project $NAMESPACE
   ```

4. Prepare an image repository for the example app images, and configure:

   ```shell
   export APP_IMAGE_GROUP="quay.io/myorg"
   ```

   which will result in the `quay.io/myorg/apicurio-registry-examples-debezium-openshift:latest` image name.

5. Prepare an image repository for customized Kafka Connect images, and configure:

   ```shell
   export KAFKA_CONNECT_IMAGE="$APP_IMAGE_GROUP/kafka-connect-example:latest"
   ```

   which will result in the `quay.io/myorg/kafka-connect-example:latest` image name.

6. Create a pull secret for the customized Kafka Connect image repository. This example command creates it from your local Docker config file:

   ```shell
   oc create secret generic example-components-pull-secret \
     --from-file=.dockerconfigjson=$HOME/.docker/config.json \
     --type=kubernetes.io/dockerconfigjson
   ```
## Deploy example components: MySQL, Kafka, and Debezium Kafka connector

Review the *example-components.yaml* template, then apply it:

```shell
oc process -f example-components.yaml \
  -p NAMESPACE=$NAMESPACE \
  -p KAFKA_CONNECT_IMAGE=$KAFKA_CONNECT_IMAGE \
  | oc apply -f -
```

Wait for all components to deploy (some pods may fail temporarily while the components they depend on are still starting).

After some time, you should be able to see the topics created by Debezium, for example:

```shell
oc get --no-headers -o custom-columns=":metadata.name" kafkatopic
```

```
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a
example
example.inventory.addresses
example.inventory.customers
example.inventory.orders
example.inventory.products
example.inventory.products-on-hand---406eef91b4bed15190ce4cbe31cee9b5db4c0133
kafkasql-journal
schema-changes.inventory
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b
```
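If you only care about the topics created by Debezium, a small filter helper can be piped onto the listing above. This is a sketch: the `example.` prefix comes from the connector's `topic.prefix` in *example-components.yaml*.

```shell
# Keep only the per-table topics created under the connector's topic.prefix
# ("example." in example-components.yaml).
list_debezium_topics() {
    grep '^example\.'
}
```

Usage: `oc get --no-headers -o custom-columns=":metadata.name" kafkatopic | list_debezium_topics`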

Apicurio Registry should contain the Avro schemas registered by Debezium. Get the Apicurio Registry URL by running `oc get route`, and configure it:

```shell
export REGISTRY_URL="http://example-components-registry.example.router-default.apps.mycluster.com"
```
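Instead of copying the hostname by hand, the route host can be read from the cluster. The label selector below is an assumption (the operator typically labels its resources with the CR name); if it does not match your cluster, use the manual export above.

```shell
# Assumption: the registry Route carries the label app=example-components-registry.
ROUTE_HOST="$(oc get route -l app=example-components-registry -o jsonpath='{.items[0].spec.host}' 2>/dev/null || true)"
# Fall back to the hostname pattern shown above if the lookup returned nothing:
export REGISTRY_URL="http://${ROUTE_HOST:-example-components-registry.${NAMESPACE:-example}.router-default.apps.mycluster.com}"
```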

Then, you can list the schemas using the following example command:

```shell
curl -s "$REGISTRY_URL/apis/registry/v2/search/artifacts?limit=50&order=asc&orderby=name" \
  | jq -r ".artifacts[] | .id" \
  | sort
```

```
event.block
example.inventory.addresses-key
example.inventory.addresses-value
example.inventory.addresses.Value
example.inventory.customers-key
example.inventory.customers-value
example.inventory.customers.Value
example.inventory.orders-key
example.inventory.orders-value
example.inventory.orders.Value
example.inventory.products-key
example.inventory.products_on_hand-key
example.inventory.products_on_hand-value
example.inventory.products_on_hand.Value
example.inventory.products-value
example.inventory.products.Value
example-key
example-value
io.debezium.connector.mysql.Source
io.debezium.connector.schema.Change
io.debezium.connector.schema.Column
io.debezium.connector.schema.Table
```

From the Apicurio Registry URL, we can extract the `INGRESS_ROUTER_CANONICAL_HOSTNAME` variable that will be used later:

```shell
export INGRESS_ROUTER_CANONICAL_HOSTNAME="router-default.apps.mycluster.com"
```
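The extraction can also be scripted with plain parameter expansion, assuming `REGISTRY_URL` has exactly the `<service>.<namespace>.<router-domain>` hostname shape shown above:

```shell
# Assumes REGISTRY_URL has the <service>.<namespace>.<router-domain> hostname shape.
REGISTRY_URL="${REGISTRY_URL:-http://example-components-registry.example.router-default.apps.mycluster.com}"
host="${REGISTRY_URL#http://}"  # drop the scheme
host="${host#*.}"               # drop the service-name label
host="${host#*.}"               # drop the namespace label
export INGRESS_ROUTER_CANONICAL_HOSTNAME="$host"
```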

## Build the example application

```shell
mvn clean install \
  -Dregistry.url="$REGISTRY_URL" \
  -Dquarkus.container-image.build=true \
  -Dquarkus.container-image.group=$APP_IMAGE_GROUP \
  -Dquarkus.container-image.tag=latest
```

Push the application image:

```shell
docker push $APP_IMAGE_GROUP/apicurio-registry-examples-debezium-openshift:latest
```

Apply the application template:

```shell
oc process -f example-app.yaml \
  -p NAMESPACE=$NAMESPACE \
  -p APP_IMAGE_GROUP=$APP_IMAGE_GROUP \
  -p INGRESS_ROUTER_CANONICAL_HOSTNAME=$INGRESS_ROUTER_CANONICAL_HOSTNAME \
  | oc apply -f -
```

## Run the example

```shell
curl -v -X POST -d 'start' http://example-app.$NAMESPACE.$INGRESS_ROUTER_CANONICAL_HOSTNAME/api/command
```

The application then inserts a random product into MySQL every 5 seconds; watch the application log to see the Debezium change events consumed from Kafka.
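The endpoint also understands `stop` (see the `command` handler in Api.java), which halts the data generator again; any other payload returns `Unknown command: ...`:

```shell
APP_URL="http://example-app.${NAMESPACE:-example}.${INGRESS_ROUTER_CANONICAL_HOSTNAME:-router-default.apps.mycluster.com}/api/command"
curl -v -m 5 -X POST -d 'stop' "$APP_URL" || true  # tolerate network errors if the app is not reachable
```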

examples/debezium-openshift/example-app.yaml

```yaml
apiVersion: template.openshift.io/v1
kind: Template
metadata:
  name: example-app-template
  labels:
    template: example-app-template
objects:

  - kind: Deployment
    apiVersion: apps/v1
    metadata:
      name: example-app
      namespace: ${NAMESPACE}
      labels:
        app: example-app
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: example-app
      template:
        metadata:
          labels:
            app: example-app
        spec:
          containers:
            - resources:
                limits:
                  cpu: 1000m
                  memory: 1024Mi
                requests:
                  cpu: 500m
                  memory: 512Mi
              terminationMessagePath: /dev/termination-log
              name: apicurio-registry-examples-debezium-openshift
              env:
                - name: MYSQL_USER
                  value: ${MYSQL_USER}
                - name: MYSQL_PASSWORD
                  value: ${MYSQL_PASSWORD}
                - name: MYSQL_JDBC_URL
                  value: jdbc:mysql://example-components-mysql.${NAMESPACE}.svc.cluster.local/${MYSQL_DATABASE}
                - name: KAFKA_BOOTSTRAP_SERVERS
                  value: example-components-kafka-kafka-bootstrap.${NAMESPACE}.svc:9092
                - name: REGISTRY_URL
                  value: http://example-components-registry-service.${NAMESPACE}.svc.cluster.local:8080/apis/registry/v2
              ports:
                - containerPort: 8080
                  protocol: TCP
              imagePullPolicy: Always
              terminationMessagePolicy: File
              image: ${APP_IMAGE_GROUP}/apicurio-registry-examples-debezium-openshift:latest
              securityContext:
                allowPrivilegeEscalation: false
                runAsNonRoot: true
                capabilities:
                  drop:
                    - ALL
                seccompProfile:
                  type: RuntimeDefault
          restartPolicy: Always
          terminationGracePeriodSeconds: 30
          dnsPolicy: ClusterFirst
          securityContext: { }
          schedulerName: default-scheduler
      strategy:
        type: RollingUpdate
        rollingUpdate:
          maxUnavailable: 25%
          maxSurge: 25%
      revisionHistoryLimit: 10
      progressDeadlineSeconds: 600

  - kind: Service
    apiVersion: v1
    metadata:
      name: example-app-service
      namespace: ${NAMESPACE}
      labels:
        app: example-app
    spec:
      selector:
        app: example-app
      ports:
        - protocol: TCP
          port: 8080
          targetPort: 8080

  - kind: Ingress
    apiVersion: networking.k8s.io/v1
    metadata:
      name: example-app
      namespace: ${NAMESPACE}
    spec:
      rules:
        - host: >-
            example-app.${NAMESPACE}.${INGRESS_ROUTER_CANONICAL_HOSTNAME}
          http:
            paths:
              - path: /
                pathType: Prefix
                backend:
                  service:
                    name: example-app-service
                    port:
                      number: 8080

parameters:
  - name: NAMESPACE
    required: true
  - name: MYSQL_DATABASE
    value: inventory
    required: true
  - name: MYSQL_USER
    value: mysqluser
    required: true
  - name: MYSQL_PASSWORD
    value: mysqlpassword
    required: true
  - name: APP_IMAGE_GROUP
    required: true
  - name: INGRESS_ROUTER_CANONICAL_HOSTNAME
    # Find out from a status block of any Ingress or Route resource, e.g.:
    # status:
    #   loadBalancer:
    #     ingress:
    #       - hostname: router-default.apps.mycluster.com
    required: true
```

examples/debezium-openshift/example-components.yaml

```yaml
apiVersion: template.openshift.io/v1
kind: Template
metadata:
  name: example-components-template
  labels:
    template: example-components-template
objects:

  # MySQL

  - kind: ConfigMap
    apiVersion: v1
    metadata:
      name: example-components-mysql-init
      namespace: ${NAMESPACE}
    immutable: false
    data:
      init.sql: |
        USE inventory;

        # Create and populate our products using a single insert with many rows

        CREATE TABLE products (
          id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
          name VARCHAR(255) NOT NULL,
          description VARCHAR(512),
          weight FLOAT
        );

        ALTER TABLE products AUTO_INCREMENT = 101;

        INSERT INTO products
        VALUES (default,"scooter","Small 2-wheel scooter",3.14),
               (default,"car battery","12V car battery",8.1),
               (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
               (default,"hammer","12oz carpenter's hammer",0.75),
               (default,"hammer","14oz carpenter's hammer",0.875),
               (default,"hammer","16oz carpenter's hammer",1.0),
               (default,"rocks","box of assorted rocks",5.3),
               (default,"jacket","water resistant black wind breaker",0.1),
               (default,"spare tire","24 inch spare tire",22.2);

        # Create and populate the products on hand using multiple inserts

        CREATE TABLE products_on_hand (
          product_id INTEGER NOT NULL PRIMARY KEY,
          quantity INTEGER NOT NULL,
          FOREIGN KEY (product_id) REFERENCES products(id)
        );

        INSERT INTO products_on_hand VALUES (101,3);
        INSERT INTO products_on_hand VALUES (102,8);
        INSERT INTO products_on_hand VALUES (103,18);
        INSERT INTO products_on_hand VALUES (104,4);
        INSERT INTO products_on_hand VALUES (105,5);
        INSERT INTO products_on_hand VALUES (106,0);
        INSERT INTO products_on_hand VALUES (107,44);
        INSERT INTO products_on_hand VALUES (108,2);
        INSERT INTO products_on_hand VALUES (109,5);

        # Create some customers ...

        CREATE TABLE customers (
          id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
          first_name VARCHAR(255) NOT NULL,
          last_name VARCHAR(255) NOT NULL,
          email VARCHAR(255) NOT NULL UNIQUE KEY
        ) AUTO_INCREMENT=1001;

        INSERT INTO customers
        VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
               (default,"George","Bailey","gbailey@foobar.com"),
               (default,"Edward","Walker","ed@walker.com"),
               (default,"Anne","Kretchmar","annek@noanswer.org");

        # Create some fake addresses

        CREATE TABLE addresses (
          id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
          customer_id INTEGER NOT NULL,
          street VARCHAR(255) NOT NULL,
          city VARCHAR(255) NOT NULL,
          state VARCHAR(255) NOT NULL,
          zip VARCHAR(255) NOT NULL,
          type enum('SHIPPING','BILLING','LIVING') NOT NULL,
          FOREIGN KEY address_customer (customer_id) REFERENCES customers(id)
        ) AUTO_INCREMENT = 10;

        INSERT INTO addresses
        VALUES (default,1001,'3183 Moore Avenue','Euless','Texas','76036','SHIPPING'),
               (default,1001,'2389 Hidden Valley Road','Harrisburg','Pennsylvania','17116','BILLING'),
               (default,1002,'281 Riverside Drive','Augusta','Georgia','30901','BILLING'),
               (default,1003,'3787 Brownton Road','Columbus','Mississippi','39701','SHIPPING'),
               (default,1003,'2458 Lost Creek Road','Bethlehem','Pennsylvania','18018','SHIPPING'),
               (default,1003,'4800 Simpson Square','Hillsdale','Oklahoma','73743','BILLING'),
               (default,1004,'1289 University Hill Road','Canehill','Arkansas','72717','LIVING');

        # Create some very simple orders

        CREATE TABLE orders (
          order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
          order_date DATE NOT NULL,
          purchaser INTEGER NOT NULL,
          quantity INTEGER NOT NULL,
          product_id INTEGER NOT NULL,
          FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
          FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
        ) AUTO_INCREMENT = 10001;

        INSERT INTO orders
        VALUES (default, '2016-01-16', 1001, 1, 102),
               (default, '2016-01-17', 1002, 2, 105),
               (default, '2016-02-19', 1002, 2, 106),
               (default, '2016-02-21', 1003, 1, 107);

  - kind: Deployment
    apiVersion: apps/v1
    metadata:
      name: example-components-mysql
      namespace: ${NAMESPACE}
      labels:
        app: example-components-mysql
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: example-components-mysql
      template:
        metadata:
          labels:
            app: example-components-mysql
        spec:
          containers:
            - resources:
                limits:
                  cpu: 1000m
                  memory: 1024Mi
                requests:
                  cpu: 500m
                  memory: 512Mi
              terminationMessagePath: /dev/termination-log
              name: mysql
              env:
                - name: MYSQL_ROOT_PASSWORD
                  value: ${MYSQL_ROOT_PASSWORD}
                - name: MYSQL_USER
                  value: ${MYSQL_USER}
                - name: MYSQL_PASSWORD
                  value: ${MYSQL_PASSWORD}
                - name: MYSQL_DATABASE
                  value: ${MYSQL_DATABASE}
              ports:
                - containerPort: 3306
                  protocol: TCP
              imagePullPolicy: IfNotPresent
              terminationMessagePolicy: File
              image: mysql:latest
              securityContext:
                allowPrivilegeEscalation: false
                runAsNonRoot: true
                capabilities:
                  drop:
                    - ALL
                seccompProfile:
                  type: RuntimeDefault
              livenessProbe:
                exec:
                  command: [ "mysqladmin", "--user=${MYSQL_ROOT_USER}", "--password=${MYSQL_ROOT_PASSWORD}", "ping" ]
                initialDelaySeconds: 30
                periodSeconds: 10
                timeoutSeconds: 5
              volumeMounts:
                - name: mysql-init-volume
                  mountPath: /docker-entrypoint-initdb.d
          volumes:
            - name: mysql-init-volume
              configMap:
                name: example-components-mysql-init
          restartPolicy: Always
          terminationGracePeriodSeconds: 30
          dnsPolicy: ClusterFirst
          securityContext: { }
          schedulerName: default-scheduler
      strategy:
        type: RollingUpdate
        rollingUpdate:
          maxUnavailable: 25%
          maxSurge: 25%
      revisionHistoryLimit: 10
      progressDeadlineSeconds: 600

  - kind: Service
    apiVersion: v1
    metadata:
      name: example-components-mysql
      namespace: ${NAMESPACE}
      labels:
        app: example-components-mysql
    spec:
      selector:
        app: example-components-mysql
      ports:
        - protocol: TCP
          port: 3306
          targetPort: 3306

  # Kafka

  - kind: Kafka
    apiVersion: kafka.strimzi.io/v1beta2
    metadata:
      name: example-components-kafka
      namespace: ${NAMESPACE}
    spec:
      entityOperator:
        topicOperator: { } # Required
        userOperator: { } # Required
      kafka:
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.min.isr: 1
          transaction.state.log.replication.factor: 1
        listeners:
          - name: plain
            port: 9092
            tls: false
            type: internal
          - name: tls
            port: 9093
            tls: true
            type: internal
        replicas: 1
        storage:
          type: ephemeral
        version: 3.4.0
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral

  - apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      annotations:
        strimzi.io/use-connector-resources: "true"
      name: example-components-kafka-connect
      namespace: ${NAMESPACE}
    spec:
      bootstrapServers: example-components-kafka-kafka-bootstrap.${NAMESPACE}.svc:9093
      build:
        output:
          image: ${KAFKA_CONNECT_IMAGE}
          type: docker
          pushSecret: example-components-pull-secret
        plugins:
          - name: debezium-connector-mysql
            artifacts:
              - type: zip
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.3.3.Final/debezium-connector-mysql-2.3.3.Final-plugin.zip
              - type: zip
                url: https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.3.3.Final/debezium-scripting-2.3.3.Final.zip
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.9/groovy-4.0.9.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json/4.0.9/groovy-json-4.0.9.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.9/groovy-jsr223-4.0.9.jar
              - type: zip
                url: https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final/apicurio-registry-distro-connect-converter-2.4.4.Final.zip
      config:
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      replicas: 1
      tls:
        trustedCertificates:
          - certificate: ca.crt
            secretName: example-components-kafka-cluster-ca-cert

  - apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: example-components-kafka-connect
      name: example-components-kafka-connector
      namespace: ${NAMESPACE}
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      config:

        value.converter: io.apicurio.registry.utils.converter.AvroConverter
        value.converter.apicurio.registry.auto-register: true
        value.converter.apicurio.registry.find-latest: true
        value.converter.apicurio.registry.url: http://example-components-registry-service.${NAMESPACE}.svc.cluster.local:8080/apis/registry/v2

        key.converter: io.apicurio.registry.utils.converter.AvroConverter
        key.converter.apicurio.registry.auto-register: true
        key.converter.apicurio.registry.find-latest: true
        key.converter.apicurio.registry.url: http://example-components-registry-service.${NAMESPACE}.svc.cluster.local:8080/apis/registry/v2

        database.server.id: 1
        database.hostname: example-components-mysql.${NAMESPACE}.svc.cluster.local
        database.port: 3306
        database.user: ${MYSQL_ROOT_USER}
        database.password: ${MYSQL_ROOT_PASSWORD}
        database.dbname: inventory
        database.cdcschema: ASNCDC

        schema.name.adjustment.mode: avro
        topic.prefix: example

        schema.history.internal.kafka.topic: schema-changes.inventory
        schema.history.internal.kafka.bootstrap.servers: example-components-kafka-kafka-bootstrap.${NAMESPACE}.svc:9092 # TODO TLS?
      tasksMax: 1

  # Apicurio Registry

  - kind: ApicurioRegistry
    apiVersion: registry.apicur.io/v1
    metadata:
      name: example-components-registry
      namespace: ${NAMESPACE}
    spec:
      configuration:
        persistence: kafkasql
        kafkasql:
          bootstrapServers: example-components-kafka-kafka-bootstrap.${NAMESPACE}.svc:9092
        logLevel: DEBUG
        registryLogLevel: DEBUG

parameters:
  - name: NAMESPACE
    required: true
  - name: MYSQL_ROOT_USER
    value: root # From MySQL image
    required: true
  - name: MYSQL_ROOT_PASSWORD
    value: debezium
    required: true
  - name: MYSQL_USER
    value: mysqluser
    required: true
  - name: MYSQL_PASSWORD
    value: mysqlpassword
    required: true
  - name: MYSQL_DATABASE
    value: inventory
    required: true
  - name: KAFKA_CONNECT_IMAGE
    required: true
```

examples/debezium-openshift/pom.xml

+316 lines (large diff not rendered)

io/apicurio/example/debezium/Operation.java

```java
package io.apicurio.example.debezium;

import lombok.Getter;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
public enum Operation {

    CREATE("c"),
    READ("r"), // Used for snapshots, i.e. writes the initial (or incremental) state of database tables to each topic
    UPDATE("u"),
    DELETE("d"),
    TRUNCATE("t");

    @Getter
    private final String op;

    Operation(String op) {
        this.op = op;
    }

    private final static Map<String, Operation> CONSTANTS = new HashMap<>();

    static {
        for (Operation c : values()) {
            CONSTANTS.put(c.op, c);
        }
    }

    public static Operation from(String value) {
        return CONSTANTS.get(value);
    }
}
```

io/apicurio/example/debezium/kafka/ExampleKafkaConsumer.java

```java
package io.apicurio.example.debezium.kafka;

import io.apicurio.example.debezium.Operation;
import io.apicurio.example.debezium.model.*;
import io.quarkus.runtime.StartupEvent;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@ApplicationScoped
public class ExampleKafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(ExampleKafkaConsumer.class);

    @Inject
    KafkaFactory kafkaFactory;

    void onStart(@Observes StartupEvent event) {

        Runnable runner = () -> {
            try (KafkaConsumer<Object, Object> consumer = kafkaFactory.createKafkaConsumer()) {

                var topics = List.of(
                        "example.inventory.addresses",
                        "example.inventory.customers",
                        "example.inventory.orders",
                        "example.inventory.products",
                        "example.inventory.products_on_hand"
                );
                var existingTopics = consumer.listTopics().keySet();
                if (!existingTopics.containsAll(topics)) {
                    throw new IllegalStateException("Some topics are not available. " +
                            "Expected: " + topics + ", actual: " + existingTopics);
                }

                consumer.subscribe(topics);

                while (true) {
                    try {
                        var records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS));
                        if (records != null && !records.isEmpty()) {
                            log.info("Consuming {} records:", records.count());
                            records.forEach(record -> {
                                if (record.key() == null) {
                                    log.debug("Discarded an unknown message");
                                    return;
                                }
                                if (record.value() == null) {
                                    log.debug("Discarded a tombstone message");
                                    return;
                                }

                                log.info("---");
                                log.info("Raw key: {}", record.key());
                                log.info("Raw key schema: {}", ((SpecificRecord) record.key()).getSchema());
                                log.info("Raw value: {}", record.value());
                                log.info("Raw value schema: {}", ((SpecificRecord) record.value()).getSchema());

                                switch (record.topic()) {
                                    case "example.inventory.addresses": {
                                        var key = (example.inventory.addresses.Key) record.key();
                                        var value = (example.inventory.addresses.Envelope) record.value();
                                        log.info("Operation {} on Address", Operation.from(value.getOp()));
                                        log.info("ID: {}", key.getId());
                                        log.info("Before: {}", Address.from(value.getBefore()));
                                        log.info("After: {}", Address.from(value.getAfter()));
                                        break;
                                    }
                                    case "example.inventory.customers": {
                                        var key = (example.inventory.customers.Key) record.key();
                                        var value = (example.inventory.customers.Envelope) record.value();
                                        log.info("Operation {} on Customer", Operation.from(value.getOp()));
                                        log.info("ID: {}", key.getId());
                                        log.info("Before: {}", Customer.from(value.getBefore()));
                                        log.info("After: {}", Customer.from(value.getAfter()));
                                        break;
                                    }
                                    case "example.inventory.orders": {
                                        var key = (example.inventory.orders.Key) record.key();
                                        var value = (example.inventory.orders.Envelope) record.value();
                                        log.info("Operation {} on Order", Operation.from(value.getOp()));
                                        log.info("Order number: {}", key.getOrderNumber());
                                        log.info("Before: {}", Order.from(value.getBefore()));
                                        log.info("After: {}", Order.from(value.getAfter()));
                                        break;
                                    }
                                    case "example.inventory.products": {
                                        var key = (example.inventory.products.Key) record.key();
                                        var value = (example.inventory.products.Envelope) record.value();
                                        log.info("Operation {} on Product", Operation.from(value.getOp()));
                                        log.info("ID: {}", key.getId());
                                        log.info("Before: {}", Product.from(value.getBefore()));
                                        log.info("After: {}", Product.from(value.getAfter()));
                                        break;
                                    }
                                    case "example.inventory.products_on_hand": {
                                        var key = (example.inventory.products_on_hand.Key) record.key();
                                        var value = (example.inventory.products_on_hand.Envelope) record.value();
                                        log.info("Operation {} on ProductOnHand", Operation.from(value.getOp()));
                                        log.info("Product ID: {}", key.getProductId());
                                        log.info("Before: {}", ProductOnHand.from(value.getBefore()));
                                        log.info("After: {}", ProductOnHand.from(value.getAfter()));
                                        break;
                                    }
                                    default:
                                        throw new IllegalStateException("Received a message from unexpected topic: " + record.topic());
                                }
                            });
                        }
                    } catch (Exception ex) {
                        log.error("Error reading records from Kafka", ex);
                    }
                }
            }
        };
        var thread = new Thread(runner);
        thread.setDaemon(true);
        thread.setName("Kafka Consumer Thread");
        thread.start();
    }
}
```

io/apicurio/example/debezium/kafka/KafkaFactory.java

```java
package io.apicurio.example.debezium.kafka;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import java.util.Properties;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@ApplicationScoped
public class KafkaFactory {

    private static final Logger log = LoggerFactory.getLogger(KafkaFactory.class);

    @ConfigProperty(name = "kafka.bootstrap.servers")
    String bootstrapServers;

    @ConfigProperty(name = "registry.url")
    String registryUrl;

    public KafkaConsumer<Object, Object> createKafkaConsumer() {

        Properties props = new Properties();

        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "consumer-example-debezium-openshift");
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

        log.debug("Registry URL: {}", registryUrl);
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
        // Deserialize into a specific class instead of GenericRecord
        props.putIfAbsent(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);

        return new KafkaConsumer<>(props);
    }
}
```

io/apicurio/example/debezium/model/Address.java

```java
package io.apicurio.example.debezium.model;

import example.inventory.addresses.Value;
import lombok.*;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Address {

    private Integer id;

    private Integer customerId;

    private String street;

    private String city;

    private String state;

    private String zip;

    private String type;

    public static Address from(Value value) {
        if (value == null) {
            return null;
        }
        return Address.builder()
                .id(value.getId())
                .customerId(value.getCustomerId())
                .street(value.getStreet())
                .city(value.getCity())
                .state(value.getState())
                .zip(value.getZip())
                .type(value.getType())
                .build();
    }
}
```

io/apicurio/example/debezium/model/Customer.java

```java
package io.apicurio.example.debezium.model;

import example.inventory.customers.Value;
import lombok.*;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Customer {

    private Integer id;

    private String firstName;

    private String lastName;

    private String email;

    public static Customer from(Value value) {
        if (value == null) {
            return null;
        }
        return Customer.builder()
                .id(value.getId())
                .firstName(value.getFirstName())
                .lastName(value.getLastName())
                .email(value.getEmail())
                .build();
    }
}
```

io/apicurio/example/debezium/model/Order.java

```java
package io.apicurio.example.debezium.model;

import example.inventory.orders.Value;
import lombok.*;

import java.time.Duration;
import java.time.Instant;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Order {

    private Integer orderNumber;

    private Instant orderDate;

    private Integer purchaser;

    private Integer quantity;

    private Integer productId;

    public static Order from(Value value) {
        if (value == null) {
            return null;
        }
        return Order.builder()
                .orderNumber(value.getOrderNumber())
                .orderDate(Instant.EPOCH.plus(Duration.ofDays(value.getOrderDate())))
                .purchaser(value.getPurchaser())
                .quantity(value.getQuantity())
                .productId(value.getProductId())
                .build();
    }
}
```

io/apicurio/example/debezium/model/Product.java

```java
package io.apicurio.example.debezium.model;

import example.inventory.products.Value;
import lombok.*;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Product {

    private Integer id;

    private String name;

    private String description;

    private Float weight;

    public static Product from(Value value) {
        if (value == null) {
            return null;
        }
        return Product.builder()
                .id(value.getId())
                .name(value.getName())
                .description(value.getDescription())
                .weight(value.getWeight())
                .build();
    }
}
```

io/apicurio/example/debezium/model/ProductOnHand.java

```java
package io.apicurio.example.debezium.model;

import example.inventory.products_on_hand.Value;
import lombok.*;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class ProductOnHand {

    private Integer productId;

    private Integer quantity;

    public static ProductOnHand from(Value value) {
        if (value == null) {
            return null;
        }
        return ProductOnHand.builder()
                .productId(value.getProductId())
                .quantity(value.getQuantity())
                .build();
    }
}
```

io/apicurio/example/debezium/rest/Api.java

```java
package io.apicurio.example.debezium.rest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Path("/api")
public class Api {

    private static final Logger log = LoggerFactory.getLogger(Api.class);

    @Inject
    ExampleRunner runner;

    @POST
    @Path("/command")
    public String command(String command) {
        log.info("Command received: {}", command);
        switch (command) {
            case "start":
                runner.setEnabled(true);
                return "OK";
            case "stop":
                runner.setEnabled(false);
                return "OK";
            default:
                return "Unknown command: " + command;
        }
    }
}
```
ExampleRunner.java

```java
package io.apicurio.example.debezium.rest;

import io.apicurio.example.debezium.model.Product;
import io.apicurio.example.debezium.sql.Database;
import io.quarkus.scheduler.Scheduled;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.Random;
import java.util.UUID;

import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP;

@ApplicationScoped
public class ExampleRunner {

    private static final Random RANDOM = new Random();

    private static final Logger log = LoggerFactory.getLogger(ExampleRunner.class);

    @Getter
    @Setter
    private boolean isEnabled;

    @Inject
    Database database;

    @Scheduled(every = "5s", concurrentExecution = SKIP)
    public void run() {
        if (isEnabled) {
            var product = Product.builder()
                    .name("name-" + UUID.randomUUID())
                    .description("description-" + UUID.randomUUID())
                    .weight(RANDOM.nextFloat() * 100 + 1)
                    .build();
            log.info("Inserting: {}", product);
            product.setId(database.insertProduct(product));
            product.setName("updated-" + product.getName());
            log.info("Updating: {}", product);
            database.updateProduct(product);
            log.info("Deleting: {}", product);
            database.deleteProduct(product);
        }
    }
}
```
Database.java

```java
package io.apicurio.example.debezium.sql;

import io.agroal.api.AgroalDataSource;
import io.apicurio.example.debezium.model.Product;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.sql.*;
import java.util.List;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@ApplicationScoped
public class Database {

    @Inject
    AgroalDataSource dataSource;

    public int insertProduct(Product product) {
        return executeUpdate("INSERT INTO products VALUES (default,?,?,?)", List.of(
                new SqlParam(0, product.getName(), SqlParamType.STRING),
                new SqlParam(1, product.getDescription(), SqlParamType.STRING),
                new SqlParam(2, product.getWeight(), SqlParamType.FLOAT)
        ));
    }

    public void updateProduct(Product product) {
        executeUpdate("UPDATE products SET name = ?, description = ?, weight = ? WHERE id = ?", List.of(
                new SqlParam(0, product.getName(), SqlParamType.STRING),
                new SqlParam(1, product.getDescription(), SqlParamType.STRING),
                new SqlParam(2, product.getWeight(), SqlParamType.FLOAT),
                new SqlParam(3, product.getId(), SqlParamType.INTEGER)
        ));
    }

    public void deleteProduct(Product product) {
        executeUpdate("DELETE FROM products WHERE id = ?", List.of(
                new SqlParam(0, product.getId(), SqlParamType.INTEGER)
        ));
    }

    private int executeUpdate(String sql, List<SqlParam> parameters) {
        try (Connection connection = dataSource.getConnection()) {
            try (PreparedStatement statement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
                parameters.forEach(p -> p.bindTo(statement));
                statement.executeUpdate();
                ResultSet rs = statement.getGeneratedKeys();
                if (rs.next()) {
                    return rs.getInt(1);
                } else {
                    return -1;
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
```
SqlParam.java

```java
/*
 * Copyright 2021 Red Hat
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.example.debezium.sql;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Date;

/**
 * @author eric.wittmann@gmail.com
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
public class SqlParam {

    private final int position;

    private final Object value;

    private final SqlParamType type;

    public SqlParam(int position, Object value, SqlParamType type) {
        this.position = position;
        this.value = value;
        this.type = type;
    }

    public void bindTo(PreparedStatement statement) {
        int position = this.position + 1; // Convert from sensible position (starts at 0) to JDBC position index (starts at 1)
        try {
            switch (type) {
                case BYTES:
                    statement.setBytes(position, (byte[]) value);
                    break;
                case DATE:
                    if (value == null) {
                        statement.setNull(position, Types.TIMESTAMP);
                    } else {
                        Timestamp ts = new Timestamp(((Date) value).getTime());
                        statement.setTimestamp(position, ts);
                    }
                    break;
                case ENUM:
                    if (value == null) {
                        statement.setNull(position, Types.VARCHAR);
                    } else {
                        statement.setString(position, ((Enum<?>) value).name());
                    }
                    break;
                case INTEGER:
                    if (value == null) {
                        statement.setNull(position, Types.INTEGER);
                    } else {
                        statement.setInt(position, (Integer) value);
                    }
                    break;
                case LONG:
                    if (value == null) {
                        statement.setNull(position, Types.INTEGER);
                    } else {
                        statement.setLong(position, (Long) value);
                    }
                    break;
                case STRING:
                    statement.setString(position, (String) value);
                    break;
                case FLOAT:
                    if (value == null) {
                        statement.setNull(position, Types.FLOAT);
                    } else {
                        statement.setFloat(position, (Float) value);
                    }
                    break;
                default:
                    throw new RuntimeException("bindTo not supported for SqlParamType: " + type);
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
```
SqlParamType.java

```java
/*
 * Copyright 2021 Red Hat
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.example.debezium.sql;

/**
 * @author eric.wittmann@gmail.com
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
public enum SqlParamType {

    STRING, INTEGER, LONG, DATE, BYTES, ENUM, FLOAT

}
```
application.properties

```properties
quarkus.datasource.username=${MYSQL_USER}
quarkus.datasource.password=${MYSQL_PASSWORD}

quarkus.datasource.jdbc.url=${MYSQL_JDBC_URL}
```

‎examples/pom.xml

Removed the `protobuf` profile (8 lines deleted):

```diff
             </plugins>
         </build>
     </profile>
-    <profile>
-        <id>protobuf</id>
-        <modules>
-            <module>simple-protobuf</module>
-            <module>protobuf-bean</module>
-            <module>protobuf-find-latest</module>
-        </modules>
-    </profile>
 </profiles>
 </project>
```
pom.xml (module apicurio-registry-tools-kafkasql-topic-import)

```xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
         xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>io.apicurio</groupId>
        <artifactId>apicurio-registry-examples</artifactId>
        <version>3.0.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>

    <artifactId>apicurio-registry-tools-kafkasql-topic-import</artifactId>
    <packaging>jar</packaging>

    <dependencies>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.15.2</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
        </dependency>

        <dependency>
            <groupId>info.picocli</groupId>
            <artifactId>picocli</artifactId>
            <version>4.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>io.apicurio.registry.tools.kafkasqltopicimport.Main</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
```
Envelope.java

```java
/*
 * Copyright 2020 Red Hat
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry.tools.kafkasqltopicimport;

import lombok.*;

import java.util.List;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Envelope {

    private String topic;

    private Integer partition;

    private Long offset;

    private String tstype;

    private Long ts;

    private Long broker;

    private List<String> headers;

    private String key;

    private String payload;
}
```
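One line of the dump file the import tool consumes might look like this (a hypothetical sample consistent with the `Envelope` fields above; `key` and `payload` are base64-encoded bytes, while `headers` is a flat key/value list of plain strings):

```json
{"topic":"kafkasql-journal","partition":0,"offset":42,"tstype":"create","ts":1693526400000,"broker":0,"headers":["source","example"],"key":"a2V5LWJ5dGVz","payload":"dmFsdWUtYnl0ZXM="}
```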
ImportCommand.java (note: the original commit spells the option `--bootstrap-sever`; corrected to `--bootstrap-server` here)

```java
/*
 * Copyright 2023 JBoss Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry.tools.kafkasqltopicimport;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Streams;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.simple.SimpleLogger;
import picocli.CommandLine.Command;

import java.io.BufferedReader;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static picocli.CommandLine.Option;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Command(name = "import", version = "0.1", mixinStandardHelpOptions = true)
public class ImportCommand implements Runnable {

    private static final ObjectMapper mapper = new ObjectMapper();

    @Option(names = {"-b", "--bootstrap-server"}, description = "Kafka bootstrap server URL.",
            required = true, defaultValue = "localhost:9092")
    private String kafkaBootstrapServer;

    @Option(names = {"-f", "--file"}, description = "Path to a kafkasql-journal topic dump file. " +
            "Messages must use a JSON envelope and have base64-encoded keys and values.", required = true)
    private String dumpFilePath;

    @Option(names = {"-d", "--debug"}, description = "Print debug log messages.", defaultValue = "false")
    private boolean debug;

    public void run() {

        if (debug) {
            System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "DEBUG");
        } else {
            System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "WARN");
        }

        try (Producer<byte[], byte[]> producer = createKafkaProducer()) {

            try (BufferedReader br = new BufferedReader(new FileReader(dumpFilePath))) {
                String line;
                while ((line = br.readLine()) != null) {
                    var envelope = mapper.readValue(line, Envelope.class);

                    if (envelope.getHeaders() == null) {
                        envelope.setHeaders(List.of());
                    }
                    if (envelope.getHeaders().size() % 2 != 0) {
                        throw new RuntimeException("Invalid length of the headers field: " + envelope.getHeaders().size());
                    }

                    var key = envelope.getKey() != null ? Base64.getDecoder().decode(envelope.getKey()) : null;
                    var value = envelope.getPayload() != null ? Base64.getDecoder().decode(envelope.getPayload()) : null;

                    var record = new ProducerRecord<>(
                            envelope.getTopic(),
                            envelope.getPartition(),
                            envelope.getTs(),
                            key,
                            value,
                            Streams.zip(
                                    Streams.zip(
                                            IntStream.range(0, Integer.MAX_VALUE).boxed(),
                                            envelope.getHeaders().stream(),
                                            Tuple::new
                                    ).filter(t -> t.getA() % 2 == 0).map(Tuple::getB), // Even indexes: 0,2,4,...
                                    Streams.zip(
                                            IntStream.range(0, Integer.MAX_VALUE).boxed(),
                                            envelope.getHeaders().stream(),
                                            Tuple::new
                                    ).filter(t -> t.getA() % 2 == 1).map(Tuple::getB), // Odd indexes: 1,3,5,...
                                    (k, v) -> new RecordHeader(k, v.getBytes(StandardCharsets.UTF_8)))
                                    .collect(Collectors.toList())
                    );
                    producer.send(record);
                }
            }

            producer.flush();
            System.err.println("Data imported successfully.");

        } catch (Exception ex) {
            System.err.println("Data import failed: " + ex.getMessage());
            ex.printStackTrace(System.err);
        }
    }

    private Producer<byte[], byte[]> createKafkaProducer() {

        Properties props = new Properties();

        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-kafkasql-journal");
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        return new KafkaProducer<>(props);
    }
}
```
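The nested Guava `Streams.zip` pipeline above rebuilds record headers from the envelope's flat list by pairing even-indexed entries (keys) with odd-indexed entries (values). A minimal stdlib-only sketch of the same pairing logic (the class name `HeaderPairing` is illustrative and not part of the tool):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class HeaderPairing {
    // Pairs flat.get(0) with flat.get(1), flat.get(2) with flat.get(3), etc.,
    // mirroring the even/odd index split done with Streams.zip in ImportCommand.
    static List<Map.Entry<String, String>> pair(List<String> flat) {
        if (flat.size() % 2 != 0) {
            throw new IllegalArgumentException("Invalid length of the headers field: " + flat.size());
        }
        return IntStream.range(0, flat.size() / 2)
                .mapToObj(i -> new SimpleEntry<>(flat.get(2 * i), flat.get(2 * i + 1)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(pair(List.of("app", "registry", "format", "json")));
        // prints [app=registry, format=json]
    }
}
```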
Main.java

```java
/*
 * Copyright 2023 JBoss Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry.tools.kafkasqltopicimport;

import picocli.CommandLine;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
public class Main {

    public static void main(String[] args) {
        CommandLine cmd = new CommandLine(new ImportCommand());
        int exitCode = cmd.execute(args);
        System.exit(exitCode);
    }
}
```
Tuple.java

```java
/*
 * Copyright 2020 Red Hat
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry.tools.kafkasqltopicimport;

import lombok.*;

/**
 * @author Jakub Senko <em>m@jsenko.net</em>
 */
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Tuple<A, B> {

    private A a;

    private B b;
}
```
