
Commit 51f953e

Backport examples to old branch (#4710)

1 parent: 35261f4

File tree: 170 files changed, +13340 −0 lines


examples/README.md (+71 lines)
The examples in this repository have been moved to the [main Apicurio Registry repository](https://github.com/Apicurio/apicurio-registry/examples).
# Apicurio Registry Example Applications

This repository contains a set of example applications (mostly Kafka applications) that use Apicurio Registry as part of their workflow. The registry is typically used to store schemas used by Kafka serializer and deserializer classes. These serdes classes fetch the schema from the registry during produce and consume operations, in order to serialize, deserialize, or validate the Kafka message payload.

Each example in this repository attempts to demonstrate a specific use case or configuration. There are numerous options available when integrating with the registry, so the set of examples found here may not cover every configuration permutation.
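The workflow described above (the serializer embeds a schema reference in the message, and the deserializer resolves it against the registry) can be sketched in plain Java. This is a conceptual sketch only, using a `Map` in place of a real registry; it is not the Apicurio serde API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class GlobalIdSketch {
    // Stand-in for the registry: globalId -> schema text.
    static final Map<Long, String> REGISTRY = new HashMap<>();

    // "Serialize": register the schema, then prepend its globalId to the payload.
    static byte[] serialize(long globalId, String schema, String payload) {
        REGISTRY.put(globalId, schema);
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(8 + body.length).putLong(globalId).put(body).array();
    }

    // "Deserialize": read the globalId, look the schema up, then decode the body.
    static String deserialize(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        long globalId = buf.getLong();
        String schema = REGISTRY.get(globalId); // fetched from the "registry"
        byte[] body = new byte[buf.remaining()];
        buf.get(body);
        return "schema=" + schema + " payload=" + new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] msg = serialize(42L, "GreetingSchema", "hello");
        System.out.println(deserialize(msg)); // prints: schema=GreetingSchema payload=hello
    }
}
```

The real serdes work the same way in spirit: only a small identifier travels with each message, and the (potentially large) schema is fetched once and cached.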
# List of Examples

A list of examples is included below, with a description and explanation of each covered use case.
## Simple Avro Example

This example application demonstrates the basics of using the registry in a very simple Kafka publish/subscribe application, using Apache Avro as the schema technology to serialize and deserialize message payloads.
## Simple JSON Schema Example

This example application demonstrates the basics of using the registry in a very simple Kafka publish/subscribe application, using JSON Schema to validate message payloads when both producing and consuming them. JSON Schema is not a serialization technology; it is only used for validation, and can therefore be enabled or disabled in the serializer and deserializer.
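Because JSON Schema only validates, validation is conceptually a separate, toggleable step wrapped around produce and consume. A toy sketch of that idea (not the Apicurio API; the "schema" here is just a required-field list):

```java
import java.util.List;
import java.util.Map;

public class JsonValidationSketch {

    // Toy "schema": a list of required field names the payload must contain.
    static boolean validate(Map<String, Object> payload, List<String> requiredFields) {
        return payload.keySet().containsAll(requiredFields);
    }

    public static void main(String[] args) {
        List<String> schema = List.of("message", "time");
        Map<String, Object> good = Map.of("message", "hello", "time", 123L);
        Map<String, Object> bad = Map.of("message", "hello");

        boolean validationEnabled = true; // toggled by configuration in the real serdes
        if (validationEnabled) {
            System.out.println(validate(good, schema)); // true
            System.out.println(validate(bad, schema));  // false
        }
    }
}
```

In the real serdes the payload still travels as ordinary JSON either way; disabling validation simply skips the check.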
## Confluent Serdes Integration

This example shows how Apicurio Registry serdes classes can be used alongside Confluent serdes classes in a mixed application environment. In other words, some applications can use Confluent classes while others use Apicurio Registry classes, and they all work together seamlessly with just a little extra configuration. This example is essentially the same as the Simple Avro Example, but uses a Confluent serializer with an Apicurio Registry deserializer.
## Avro Bean Example

This example demonstrates how to use Avro as the schema and serialization technology while using a Java bean as the Kafka message payload. It is essentially the same as the Simple Avro Example, but uses a Java bean instead of a `GenericRecord` as the message payload.
## Custom ID Strategy Example

This example demonstrates how to use a custom Global ID strategy. The Global ID strategy is used by a producer (serializer) application to look up (or create) the schema it uses for serialization. Apicurio Registry ships with some useful implementations of the Global ID strategy out of the box, but it is possible to create your own. This example is essentially the same as the Simple Avro Example, except that it uses a custom Global ID strategy instead of one of the defaults.
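The idea behind a custom strategy can be illustrated with a small, self-contained sketch. The interface and both implementations below are hypothetical (the real Apicurio strategy interface has a different signature); the sketch only shows the concept of deciding which registered artifact a serializer should use:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class CustomIdStrategySketch {

    // Hypothetical strategy contract: given topic and schema, pick an artifact id.
    interface GlobalIdStrategy {
        long resolveId(String topic, String schema);
    }

    // Default-style strategy: one artifact per topic, created on first use.
    static class TopicIdStrategy implements GlobalIdStrategy {
        final Map<String, Long> ids = new ConcurrentHashMap<>();
        final AtomicLong nextId = new AtomicLong(1);

        @Override
        public long resolveId(String topic, String schema) {
            return ids.computeIfAbsent(topic, t -> nextId.getAndIncrement());
        }
    }

    // Custom strategy: key the artifact on the schema content instead of the
    // topic, so identical schemas share one artifact across topics.
    static class SchemaHashStrategy implements GlobalIdStrategy {
        @Override
        public long resolveId(String topic, String schema) {
            return Integer.toUnsignedLong(schema.hashCode());
        }
    }

    public static void main(String[] args) {
        GlobalIdStrategy custom = new SchemaHashStrategy();
        // Same schema on two topics resolves to the same artifact id.
        System.out.println(custom.resolveId("topic-a", "{\"type\":\"record\"}")
                == custom.resolveId("topic-b", "{\"type\":\"record\"}")); // true
    }
}
```

The example in the repository plugs its custom strategy into the serializer via configuration; see the example source for the actual interface to implement.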
## Simple Avro Maven Example

This example application demonstrates how to use the Apicurio Registry Maven plugin to pre-register an Avro schema so that it does not need to be embedded within the producer application. Note that this example will fail unless the Maven plugin is executed before the Java application. See the Javadoc in the example for details.
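Registering a schema at build time is typically wired up via the Apicurio Registry Maven plugin's `register` goal. A hedged sketch of the plugin configuration follows; the plugin coordinates and goal name are the plugin's own, but the registry URL, artifact coordinates, and schema file path here are illustrative assumptions, not taken from this example:

```xml
<plugin>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-maven-plugin</artifactId>
    <version>${apicurio-registry.version}</version>
    <executions>
        <execution>
            <goals>
                <goal>register</goal>
            </goals>
            <configuration>
                <registryUrl>http://localhost:8080/apis/registry/v2</registryUrl>
                <artifacts>
                    <artifact>
                        <groupId>default</groupId>
                        <artifactId>Greeting</artifactId>
                        <type>AVRO</type>
                        <file>${project.basedir}/src/main/resources/schemas/greeting.avsc</file>
                    </artifact>
                </artifacts>
            </configuration>
        </execution>
    </executions>
</plugin>
```

With the schema pre-registered, the producer only needs to reference it, rather than carry and auto-register it at runtime.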
## REST Client Example

This example application demonstrates how to use the Apicurio Registry REST client to create, delete, or fetch schemas. It contains two basic Java applications: one showing how to improve logging by logging all request and response headers, and a basic example of how to use the client.
## Mix Avro Schemas Example

This example application showcases a scenario in which Apache Avro messages are published to the same Kafka topic using different Avro schemas. It uses the Apicurio Registry serdes classes to serialize and deserialize Apache Avro messages using different schemas, even when they are received on the same Kafka topic.
## Cloud Events PoC

This example application implements a REST API that consumes and produces CloudEvents. It showcases an experimental library from the apicurio-registry project, which is used to validate incoming and outgoing CloudEvents messages in the REST API. The validation is performed against JSON Schemas stored in Apicurio Registry. For a more detailed explanation, go [here](../apicurio-registry/examples/cloudevents/README.md).

examples/avro-bean/pom.xml (+43 lines)
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>io.apicurio</groupId>
        <artifactId>apicurio-registry-examples</artifactId>
        <version>2.5.12-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>apicurio-registry-examples-avro-bean</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
            <version>${apicurio-registry.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>io.strimzi</groupId>
            <artifactId>kafka-oauth-client</artifactId>
            <version>0.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>

</project>
AvroBeanExample.java (+211 lines)
/*
 * Copyright 2023 JBoss Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry.examples.avro.bean;

import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider;

/**
 * This example demonstrates how to use the Apicurio Registry in a very simple publish/subscribe
 * scenario with Avro as the serialization type. The following aspects are demonstrated:
 *
 * <ol>
 *   <li>Configuring a Kafka Serializer for use with Apicurio Registry</li>
 *   <li>Configuring a Kafka Deserializer for use with Apicurio Registry</li>
 *   <li>Auto-registering the Avro schema in the registry (registered by the producer)</li>
 *   <li>Data sent as a {@link GreetingBean}</li>
 * </ol>
 *
 * Pre-requisites:
 *
 * <ul>
 *   <li>Kafka must be running on localhost:9092</li>
 *   <li>Apicurio Registry must be running on localhost:8080</li>
 * </ul>
 *
 * @author eric.wittmann@gmail.com
 * @author carles.arnal@redhat.com
 */
public class AvroBeanExample {

    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
    private static final String SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = AvroBeanExample.class.getSimpleName();
    private static final String SUBJECT_NAME = "Greeting";

    public static void main(String[] args) throws Exception {
        System.out.println("Starting example " + AvroBeanExample.class.getSimpleName());
        String topicName = TOPIC_NAME;
        String subjectName = SUBJECT_NAME;

        // Create the producer.
        Producer<Object, Object> producer = createKafkaProducer();
        // Produce 5 messages.
        int producedMessages = 0;
        try {
            System.out.println("Producing (5) messages.");
            for (int idx = 0; idx < 5; idx++) {
                GreetingBean greeting = new GreetingBean();
                greeting.setMessage("Hello (" + producedMessages++ + ")!");
                greeting.setTime(System.currentTimeMillis());

                // Send/produce the message on the Kafka Producer
                ProducerRecord<Object, Object> producedRecord = new ProducerRecord<>(topicName, subjectName, greeting);
                producer.send(producedRecord);

                Thread.sleep(100);
            }
            System.out.println("Messages successfully produced.");
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }

        // Create the consumer. Note: keys are produced with a StringSerializer,
        // so the consumer's key type must be String to match.
        System.out.println("Creating the consumer.");
        KafkaConsumer<String, GreetingBean> consumer = createKafkaConsumer();

        // Subscribe to the topic
        System.out.println("Subscribing to topic " + topicName);
        consumer.subscribe(Collections.singletonList(topicName));

        // Consume the 5 messages.
        try {
            int messageCount = 0;
            System.out.println("Consuming (5) messages.");
            while (messageCount < 5) {
                final ConsumerRecords<String, GreetingBean> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Do nothing - no messages waiting.
                    System.out.println("No messages waiting...");
                } else records.forEach(record -> {
                    GreetingBean greeting = record.value();
                    System.out.println("Consumed a message: " + greeting.getMessage() + " @ " + new Date(greeting.getTime()));
                });
            }
        } finally {
            consumer.close();
        }

        System.out.println("Done (success).");
    }

    /**
     * Creates the Kafka producer.
     */
    private static Producer<Object, Object> createKafkaProducer() {
        Properties props = new Properties();

        // Configure Kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Use the Apicurio Registry provided Kafka Serializer for Avro
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

        // Configure the Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
        // Use Java reflection as the Avro Datum Provider - this also generates an Avro schema from the Java bean
        props.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());

        // Configure security settings only if the relevant environment variables are present.
        configureSecurityIfPresent(props);

        // Create the Kafka producer
        return new KafkaProducer<>(props);
    }

    /**
     * Creates the Kafka consumer.
     */
    private static KafkaConsumer<String, GreetingBean> createKafkaConsumer() {
        Properties props = new Properties();

        // Configure Kafka
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for Avro
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

        // Configure the Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Use Java reflection as the Avro Datum Provider
        props.putIfAbsent(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
        // No other configuration is needed for the deserializer, because the globalId of the schema
        // the deserializer should use is sent as part of the payload. The deserializer simply
        // extracts that globalId and uses it to look up the schema in the registry.

        // Configure security settings only if the relevant environment variables are present.
        configureSecurityIfPresent(props);

        // Create the Kafka Consumer
        return new KafkaConsumer<>(props);
    }

    public static void configureSecurityIfPresent(Properties props) {
        final String tokenEndpoint = System.getenv(SerdeConfig.AUTH_TOKEN_ENDPOINT);
        if (tokenEndpoint != null) {

            final String authClient = System.getenv(SerdeConfig.AUTH_CLIENT_ID);
            final String authSecret = System.getenv(SerdeConfig.AUTH_CLIENT_SECRET);

            props.putIfAbsent(SerdeConfig.AUTH_CLIENT_SECRET, authSecret);
            props.putIfAbsent(SerdeConfig.AUTH_CLIENT_ID, authClient);
            props.putIfAbsent(SerdeConfig.AUTH_TOKEN_ENDPOINT, tokenEndpoint);
            props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
            props.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
            props.putIfAbsent("security.protocol", "SASL_SSL");

            props.putIfAbsent(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
                    " oauth.client.id=\"%s\" " +
                    " oauth.client.secret=\"%s\" " +
                    " oauth.token.endpoint.uri=\"%s\" ;", authClient, authSecret, tokenEndpoint));
        }
    }
}
GreetingBean.java (+61 lines)
/*
 * Copyright 2020 Red Hat
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry.examples.avro.bean;

/**
 * @author eric.wittmann@gmail.com
 */
public class GreetingBean {

    private String message;
    private long time;

    /**
     * Constructor.
     */
    public GreetingBean() {
    }

    /**
     * @return the message
     */
    public String getMessage() {
        return message;
    }

    /**
     * @param message the message to set
     */
    public void setMessage(String message) {
        this.message = message;
    }

    /**
     * @return the time
     */
    public long getTime() {
        return time;
    }

    /**
     * @param time the time to set
     */
    public void setTime(long time) {
        this.time = time;
    }

}
