1.1.0 #16

Open · wants to merge 16 commits into master
8 changes: 7 additions & 1 deletion .gitignore
@@ -1,4 +1,10 @@
target/
.idea/
*.iml
-.DS_STORE
+.DS_STORE
+/.classpath
+*/.classpath
+/.project
+*/.project
+*/.settings
+bin/
2 changes: 1 addition & 1 deletion README.md
@@ -18,4 +18,4 @@ Happy learning!
- Word Count to learn the basic API
- Favourite Colour for a more advanced example (`Scala` version included)
- Bank Balance to demonstrate exactly once semantics
-- User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
+- User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
4 changes: 2 additions & 2 deletions bank-balance-exactly-once/pom.xml
@@ -16,15 +16,15 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
-            <version>0.11.0.1</version>
+            <version>1.1.0</version>
        </dependency>

        <!--to write the kafka producer-->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
-            <version>0.11.0.1</version>
+            <version>1.1.0</version>
        </dependency>
@@ -8,13 +8,15 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Instant;
import java.util.Properties;
@@ -33,18 +35,17 @@ public static void main(String[] args) {

        // Exactly once processing!!
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // json Serde
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

+        StreamsBuilder builder = new StreamsBuilder();

-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<String, JsonNode> bankTransactions =
-                builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
+        KStream<String, JsonNode> bankTransactions = builder.stream("bank-transactions",
+                Consumed.with(Serdes.String(), jsonSerde));

        // create the initial json object for balances
        ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
@@ -53,22 +54,23 @@ public static void main(String[] args) {
        initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

        KTable<String, JsonNode> bankBalance = bankTransactions
-            .groupByKey(Serdes.String(), jsonSerde)
+            .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
            .aggregate(
                () -> initialBalance,
                (key, transaction, balance) -> newBalance(transaction, balance),
-                jsonSerde,
-                "bank-balance-agg"
+                Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(jsonSerde)
            );

-        bankBalance.to(Serdes.String(), jsonSerde, "bank-balance-exactly-once");
+        bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

-        KafkaStreams streams = new KafkaStreams(builder, config);
+        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp();
        streams.start();

        // print the topology
-        System.out.println(streams.toString());
+        streams.localThreadsMetadata().forEach(data -> System.out.println(data));

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
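Taken together, this file shows the whole 0.11.x-to-1.1.0 migration pattern: KStreamBuilder gives way to StreamsBuilder, the positional serde/store-name overloads give way to the Consumed, Serialized, Materialized and Produced parameter objects, a KTable must be converted with toStream() before being written to a topic, and KafkaStreams now takes builder.build(). A minimal self-contained sketch of the same pattern, with placeholder topic names and a simple Long-summing aggregation (none of which come from this repository):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

public class MigrationSketch {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // 0.11.x: builder.stream(Serdes.String(), Serdes.Long(), "amounts")
        KStream<String, Long> amounts = builder.stream("amounts",
                Consumed.with(Serdes.String(), Serdes.Long()));

        // 0.11.x: .groupByKey(keySerde, valueSerde).aggregate(init, agg, valueSerde, "store-name")
        KTable<String, Long> totals = amounts
                .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
                .aggregate(
                        () -> 0L,
                        (key, amount, total) -> total + amount,
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));

        // 0.11.x: totals.to(keySerde, valueSerde, "totals")
        totals.toStream().to("totals", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}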
2 changes: 1 addition & 1 deletion favourite-colour-java/pom.xml
@@ -15,7 +15,7 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
-            <version>0.11.0.0</version>
+            <version>1.1.0</version>
        </dependency>
@@ -2,16 +2,20 @@

import java.util.Properties;
import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;

public class FavouriteColourApp {

@@ -26,8 +30,7 @@ public static void main(String[] args) {
        // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

-        KStreamBuilder builder = new KStreamBuilder();
-
+        StreamsBuilder builder = new StreamsBuilder();
        // Step 1: We create the topic of users keys to colours
        KStream<String, String> textLines = builder.stream("favourite-colour-input");

@@ -43,25 +46,30 @@ public static void main(String[] args) {

        usersAndColours.to("user-keys-and-colours");

+        Serde<String> stringSerde = Serdes.String();
+        Serde<Long> longSerde = Serdes.Long();
+
        // step 2 - we read that topic as a KTable so that updates are read correctly
        KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours");

        // step 3 - we count the occurences of colours
        KTable<String, Long> favouriteColours = usersAndColoursTable
            // 5 - we group by colour within the KTable
            .groupBy((user, colour) -> new KeyValue<>(colour, colour))
-            .count("CountsByColours");
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColours")
+                .withKeySerde(stringSerde)
+                .withValueSerde(longSerde));

        // 6 - we output the results to a Kafka Topic - don't forget the serializers
-        favouriteColours.to(Serdes.String(), Serdes.Long(), "favourite-colour-output");
+        favouriteColours.toStream().to("favourite-colour-output", Produced.with(Serdes.String(), Serdes.Long()));

-        KafkaStreams streams = new KafkaStreams(builder, config);
+        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        // only do this in dev - not in prod
        streams.cleanUp();
        streams.start();

        // print the topology
-        System.out.println(streams.toString());
+        streams.localThreadsMetadata().forEach(data -> System.out.println(data));

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
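Note the change at the "print the topology" step, repeated in every app in this PR: in 1.1.0, KafkaStreams#toString() no longer renders the topology (it was deprecated for that purpose in 1.0), so the code switches to localThreadsMetadata(), which reports thread and task assignments of the running instance. To inspect the processing graph itself, Topology#describe() works before start() is even called. A small sketch of both, with hypothetical topic names:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class DescribeTopologySketch {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "describe-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("some-input-topic").to("some-output-topic");

        Topology topology = builder.build();

        // static view of the processing graph - available before the app starts
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        // runtime view: one ThreadMetadata entry per stream thread, with its task assignments
        streams.localThreadsMetadata().forEach(System.out::println);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}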
2 changes: 1 addition & 1 deletion favourite-colour-scala/build.sbt
@@ -5,7 +5,7 @@ scalaVersion := "2.12.3"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies ++= Seq(
-  "org.apache.kafka" % "kafka-streams" % "0.11.0.0",
+  "org.apache.kafka" % "kafka-streams" % "1.1.0",
  "org.slf4j" % "slf4j-api" % "1.7.25",
  "org.slf4j" % "slf4j-log4j12" % "1.7.25"
)
@@ -4,9 +4,11 @@ import java.lang
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.common.serialization.{Serde, Serdes}
+import org.apache.kafka.common.utils.Bytes
+import org.apache.kafka.streams.kstream._
+import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}

object FavouriteColourAppScala {
  def main(args: Array[String]): Unit = {
@@ -21,7 +23,7 @@ object FavouriteColourAppScala {
    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")

-    val builder: KStreamBuilder = new KStreamBuilder
+    val builder: StreamsBuilder = new StreamsBuilder

    // Step 1: We create the topic of users keys to colours
    val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")
@@ -32,7 +34,9 @@ object FavouriteColourAppScala {

      // 2 - we select a key that will be the user id (lowercase for safety)
      .selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
      // 3 - we get the colour from the value (lowercase for safety)
-      .mapValues[String]((value: String) => value.split(",")(1).toLowerCase)
+      .mapValues[String](new ValueMapper[String, String] {
+        override def apply(value: String): String = { value.split(",")(1).toLowerCase }
+      })
      // 4 - we filter undesired colours (could be a data sanitization step)
      .filter((user: String, colour: String) => List("green", "blue", "red").contains(colour))
@@ -42,21 +46,29 @@ object FavouriteColourAppScala {

    // step 2 - we read that topic as a KTable so that updates are read correctly
    val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)

+    val stringSerde: Serde[String] = Serdes.String
+    val longSerde: Serde[lang.Long] = Serdes.Long
+
    // step 3 - we count the occurences of colours
    val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
      // 5 - we group by colour within the KTable
-      .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
-      .count("CountsByColours")
+      .groupBy(
+        (user: String, colour: String) => new KeyValue[String, String](colour, colour),
+        Serialized.`with`(stringSerde, stringSerde)
+      )
+      .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")
+        .withKeySerde(stringSerde)
+        .withValueSerde(longSerde))

    // 6 - we output the results to a Kafka Topic - don't forget the serializers
-    favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")
+    favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde))

-    val streams: KafkaStreams = new KafkaStreams(builder, config)
+    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams.cleanUp()
    streams.start()

    // print the topology
-    System.out.println(streams.toString)
+    streams.localThreadsMetadata().forEach(t => System.out.print(t.toString))

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime.addShutdownHook(new Thread {
2 changes: 1 addition & 1 deletion streams-starter-project/pom.xml
@@ -13,7 +13,7 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
-            <version>0.11.0.0</version>
+            <version>1.1.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
@@ -3,9 +3,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
@@ -20,18 +20,18 @@ public static void main(String[] args) {
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("input-topic-name");
        // do stuff
        kStream.to("word-count-output");

-        KafkaStreams streams = new KafkaStreams(builder, config);
+        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp(); // only do this in dev - not in prod
        streams.start();

        // print the topology
-        System.out.println(streams.toString());
+        streams.localThreadsMetadata().forEach(data -> System.out.println(data));

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
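The starter app deliberately leaves the pipeline as a pass-through ("do stuff"). Purely as an illustration of where the processing logic would slot in (this is not part of the PR), the three stream lines in main() could become a word-splitting step; the snippet assumes import java.util.Arrays; at the top of the file:

        KStream<String, String> kStream = builder.stream("input-topic-name");
        // illustrative "do stuff": split each line into lower-cased words,
        // emitting one record downstream per word
        KStream<String, String> words =
                kStream.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));
        words.to("word-count-output");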
2 changes: 1 addition & 1 deletion user-event-enricher/pom.xml
@@ -14,7 +14,7 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
-            <version>0.11.0.0</version>
+            <version>1.1.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
@@ -3,10 +3,10 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
@@ -20,7 +20,7 @@ public static void main(String[] args) {
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();

        // we get a global table out of Kafka. This table will be replicated on each Kafka Streams application
        // the key of our globalKTable is the user ID
@@ -55,12 +55,12 @@ public static void main(String[] args) {
        userPurchasesEnrichedLeftJoin.to("user-purchases-enriched-left-join");

-        KafkaStreams streams = new KafkaStreams(builder, config);
+        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp(); // only do this in dev - not in prod
        streams.start();

        // print the topology
-        System.out.println(streams.toString());
+        streams.localThreadsMetadata().forEach(data -> System.out.println(data));

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
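The hunks above touch only the plumbing; the join logic in this class is unchanged, since KStream-to-GlobalKTable joins have the same API in 0.11 and 1.1.0. For readers who have not seen one, here is a minimal sketch of such a join. The topic names are modeled on the left-join topic visible in this diff, but the value formats and the joiner are invented for the example:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalKTableJoinSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // user id -> user data, fully replicated on every application instance
        GlobalKTable<String, String> usersGlobalTable = builder.globalTable("user-table");

        // user id -> purchase
        KStream<String, String> userPurchases = builder.stream("user-purchases");

        // inner join: only purchases whose user id exists in the global table survive
        KStream<String, String> enriched = userPurchases.join(
                usersGlobalTable,
                (purchaseKey, purchaseValue) -> purchaseKey, // map each record to a table lookup key
                (purchase, userInfo) -> "purchase=" + purchase + ",user=" + userInfo
        );

        enriched.to("user-purchases-enriched-inner-join");

        // leftJoin would instead keep purchases with no matching user (userInfo arrives as null)
    }
}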
1 change: 1 addition & 0 deletions word-count/.gitignore
@@ -0,0 +1 @@
+/bin/
17 changes: 16 additions & 1 deletion word-count/pom.xml
@@ -15,7 +15,22 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
-            <version>0.11.0.0</version>
+            <version>1.1.0</version>
        </dependency>

+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams-test-utils</artifactId>
+            <version>1.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/junit/junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
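The two new test-scoped dependencies are the notable part of this PR beyond the version bumps: kafka-streams-test-utils, introduced in 1.1.0, provides TopologyTestDriver, which pipes records through a Topology synchronously, without a broker. A minimal JUnit 4 sketch of how these dependencies are typically used; the trivial lower-casing topology is a stand-in, not a test from this repository:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TopologySketchTest {

    private TopologyTestDriver testDriver;

    @Before
    public void setUp() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // stand-in topology: lower-cases every value on its way through
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("word-count-input")
                .mapValues(value -> value.toLowerCase())
                .to("word-count-output");

        testDriver = new TopologyTestDriver(builder.build(), config);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void lowerCasesValues() {
        ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
                "word-count-input", new StringSerializer(), new StringSerializer());

        testDriver.pipeInput(factory.create("word-count-input", "key", "Kafka Streams"));

        ProducerRecord<String, String> out = testDriver.readOutput(
                "word-count-output", new StringDeserializer(), new StringDeserializer());

        OutputVerifier.compareKeyValue(out, "key", "kafka streams");
    }
}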