diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ad44c3b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,19 @@
+# default excludes
+
+# maven
+target
+
+# intellij
+*.iml
+.idea
+
+# eclipse
+*.project
+*.classpath
+.settings
+
+# mac os
+.DS_Store
+
+# local spring boot configuration
+/config
\ No newline at end of file
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..f87a804
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,4 @@
+Contributors:
+
+Alexander Palamarchuk
+Leo Kuzmanovic
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..95e3b0f
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2019 idealo internet GmbH
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ee49424
--- /dev/null
+++ b/README.md
@@ -0,0 +1,97 @@
+# Kafka EX1
+
+**Kafka EX1** is an autoconfigurable utility that makes your Spring Kafka listeners idempotent,
+maintaining "exactly-once" semantics (the name of the artifact is a play on these words).
+
+A typical use case is a Kafka consumer that executes one or more actions (e.g. sending an email) which must not be performed twice.
+Now suppose you discover that your live system has a bug which in some cases leads to non-retryable exceptions being thrown in the consumer.
+With Kafka you can simply reset the offsets of the consumer and re-process the events from the moment the bug went live. This however demands that you
+implement extra logic in your software, so that it knows which actions were already performed and which were not, executes the missing steps,
+and does not redo the completed ones. In many cases it is enough to simply remember a record and skip the whole consumer handler method call for it.
+**Kafka EX1** helps you do exactly that. The only thing you need is a persistence backend.
+
+## Requirements
+* Java 11
+* Spring Boot 2.2
+* Spring Kafka
+* Redis (the only supported persistence provider at the moment)
+
+## Installation
+1. Add the dependency to your pom.xml.
+We are currently working on publishing the Kafka EX1 artifact to Maven Central. Until it is published, you can build and install it locally with `mvn clean install`.
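+The coordinates below are taken from this repository's `pom.xml`; the version is simply whatever you built and installed locally, so it may differ:
+```xml
+<dependency>
+    <groupId>de.idealo.kafka</groupId>
+    <artifactId>kafka-ex1</artifactId>
+    <version>0.17.0</version>
+</dependency>
+```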
+
+2. Enable AspectJ proxying:
+```java
+@EnableAspectJAutoProxy
+@SpringBootApplication
+public class Application {
+ // ...
+}
+```
+
+3. Set Spring Redis autoconfiguration properties:
+```yaml
+spring:
+ redis:
+ sentinel:
+ master: as_op_redis
+ nodes:
+ - 'redis-01.example.net:26379'
+ - 'redis-02.example.net:26379'
+ - 'redis-03.example.net:26379'
+```
+
+## Usage
+In order to use the @IdempotentListener annotation you have to supply your Kafka listeners with a set of record parameters
+ that is unique at least within their consumer group.
+
+This can be a single surrogate event UUID taken from a header, if the publisher equips its records with one:
+```java
+class MyTopicListener {
+ @KafkaListener(topics = "my-topic")
+ @IdempotentListener(ttl = 86400)
+ public void onEvent(
+ @Payload MyEvent event,
+ @IdempotencyId @Header(name = "EVENT_ID") String eventId) {
+ // ...
+ }
+ // ...
+}
+```
+When this listener method is called, @IdempotentListener takes care of iterating over all arguments annotated
+ with @IdempotencyId, builds a unique key out of them and persists it in Redis with the configured TTL.
+ If the annotation parameter "ttl" is not set, the global default is used (also configurable; see below).
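+
+For example, the global default can be changed via the `idealo.kafka.idempotency.listener.ttl` property described in the Configuration section below (the value here is purely illustrative and assumes the standard Spring Boot duration syntax):
+```yaml
+idealo:
+  kafka:
+    idempotency:
+      listener:
+        ttl: 1d
+```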
+
+The most universal way of supplying unique record keys is to use the topic name, partition and offset. You don't need to do
+ anything on the producer side, because Spring Kafka provides these headers automatically:
+```java
+import org.springframework.kafka.support.KafkaHeaders;
+
+class MyTopicListener {
+ @KafkaListener(topics = "my-topic")
+ @IdempotentListener
+ public void onEvent(
+ @Payload MyEvent event,
+ @IdempotencyId @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @IdempotencyId @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
+ @IdempotencyId @Header(KafkaHeaders.OFFSET) Long offset) {
+ // ...
+ }
+ // ...
+}
+```
+
+Of course you can use a shared Redis even if there are multiple applications consuming from the same topic.
+ The library automatically prefixes all keys with the consumer group id taken from the autoconfigured Spring Kafka properties.
+
+## Configuration
+There are a number of properties that can be used to change the behavior of the library.
+
+| Property | Default | Description |
+| :--- | :---: | :--- |
+| `idealo.kafka.idempotency.enabled` | true | Use this to disable the library completely |
+| `idealo.kafka.idempotency.listener.checkEnabled` | true | Using this property you can completely ignore the idempotency check even if the annotation `@IdempotentListener` is set. This might be useful e.g. if you want to temporarily force the listeners to re-consume all the events, ignoring saved idempotency markers. Note that the idempotency markers continue to be persisted independently of this setting. |
+| `idealo.kafka.idempotency.listener.persistenceEnabled` | true | Using this property you can completely disable saving the idempotency markers. This might be useful if something in your setup is broken, but you do not want to block record consumption. Note that there will be no way to guarantee exactly-once semantics for records that were consumed while this setting was false. |
+| `idealo.kafka.idempotency.listener.ttl` | 7d | Duration of the guaranteed idempotency per record. After it expires, the information about a consumed record is removed from persistence. This can be overridden per listener, directly in `@IdempotentListener`. |
+| `idealo.kafka.idempotency.listener.keyPrefix` | kafkaidmp | Prefix used for all the keys persisted in Redis that contain the idempotency information. Since Redis supports the TTL feature only on top-level keys, Sets cannot be used to namespace the data handled by this library. The workaround is to store simple one-character strings in the root namespace, with the information needed for the lookup held by the keys themselves. This is how a typical key with the default prefix looks: `kafkaidmp_myconsumerid_1c9bb6f0-5b91-4be7-acad-6bf089ed0bef`. If the topics you use this library with carry a lot of traffic, you should monitor the memory footprint of the idempotency data in Redis; this property gives you extra means for optimisation. |
+| `idealo.kafka.idempotency.listener.suppressErrors` | false | If true, any exceptions during the lookup or persistence are logged, but not bubbled up to the listener container. A typical case where this matters is a short outage of the Redis cluster: the idempotency data cannot be persisted, hence cannot be looked up later, and therefore the idempotency is simply not maintained. Such behavior is inconsistent and should be avoided. However, in practice the error handling and acknowledgment logic are sometimes not properly configured, in which case such an outage leads to skipped records, which is normally worse than the inability to maintain exactly-once semantics. If this is your situation, you can set this option to true. Note that this only changes the behavior of the lookup hook, as it makes no sense to throw an exception after the record handler has correctly finished its work: a retry would make it process the same record again, whereas it relies on the idempotency check to maintain exactly-once semantics. |
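+
+Putting this together, a hypothetical `application.yml` fragment could look like the following (the property names are taken from the table above; the values shown are simply the defaults):
+```yaml
+idealo:
+  kafka:
+    idempotency:
+      enabled: true
+      listener:
+        checkEnabled: true
+        persistenceEnabled: true
+        ttl: 7d
+        keyPrefix: kafkaidmp
+        suppressErrors: false
+```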
diff --git a/jenkins/pipeline.groovy b/jenkins/pipeline.groovy
new file mode 100644
index 0000000..a9f89c2
--- /dev/null
+++ b/jenkins/pipeline.groovy
@@ -0,0 +1,42 @@
+@Library('idp@0.17.11')
+
+String sourceRepository = 'ssh://git@code.eu.idealo.com:7999/dk/kafka-consumer-idempotency.git'
+
+pipeline {
+ agent { node { label 'java' } }
+
+ environment {
+ IDP_SKIP_STAGES = true
+ }
+
+ stages {
+ stage('test and install new version') {
+ steps {
+ script {
+ idp_maven {
+ gitRepoUrl = sourceRepository
+ mavenGoals = "clean install"
+ jdkVersion = 'openjdk-11.0.2'
+ }
+ }
+ }
+ }
+ stage('release to artifactory') {
+ steps {
+ script {
+ idp_createBuildVersion {
+ gitRepoUrl = sourceRepository
+ }
+ }
+ script {
+ idp_buildReleasable {
+ mvnSkipTests = true
+ artifactoryTargetRepo = 'libs-commit-local'
+ jdkVersion = 'openjdk-11.0.2'
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b209512
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>de.idealo.kafka</groupId>
+    <artifactId>kafka-ex1</artifactId>
+    <version>0.17.0</version>
+    <packaging>jar</packaging>
+    <name>idempotency</name>
+    <description>Easy way to make Spring Kafka listeners idempotent</description>
+
+    <developers>
+        <developer>
+            <id>fankandin</id>
+            <name>Alexander Palamarchuk</name>
+            <email>a@palamarchuk.info</email>
+            <url>https://github.com/fankandin</url>
+            <roles>
+                <role>author</role>
+            </roles>
+        </developer>
+        <developer>
+            <id>FlokiOo</id>
+            <name>David Vinco</name>
+            <url>https://github.com/FlokiOo</url>
+            <roles>
+                <role>developer</role>
+                <role>maintainer</role>
+            </roles>
+        </developer>
+    </developers>
+
+    <licenses>
+        <license>
+            <name>Apache-2.0</name>
+            <url>https://opensource.org/licenses/Apache-2.0</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.11.RELEASE</version>
+    </parent>
+
+    <properties>
+        <java.version>11</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.aspectj</groupId>
+            <artifactId>aspectjrt</artifactId>
+            <version>1.9.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.aspectj</groupId>
+            <artifactId>aspectjweaver</artifactId>
+            <version>1.9.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.data</groupId>
+            <artifactId>spring-data-redis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.lettuce</groupId>
+            <artifactId>lettuce-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>it.ozimov</groupId>
+            <artifactId>embedded-redis</artifactId>
+            <version>0.7.3</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.7.0</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <scm>
+        <connection>scm:git:https://github.com/idealo/kafka-ex1.git</connection>
+        <url>https://github.com/idealo/kafka-ex1</url>
+    </scm>
+
+</project>
diff --git a/src/main/java/de/idealo/kafka/idempotency/IdempotencyCheckException.java b/src/main/java/de/idealo/kafka/idempotency/IdempotencyCheckException.java
new file mode 100644
index 0000000..e255d46
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/IdempotencyCheckException.java
@@ -0,0 +1,14 @@
+package de.idealo.kafka.idempotency;
+
+/**
+ * Indicates that an error has occurred while trying to execute the idempotency check demanded by {@link IdempotencyId}.
+ */
+public class IdempotencyCheckException extends Exception {
+ public IdempotencyCheckException(final String message) {
+ super(message);
+ }
+
+ public IdempotencyCheckException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/IdempotencyId.java b/src/main/java/de/idealo/kafka/idempotency/IdempotencyId.java
new file mode 100644
index 0000000..45f6527
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/IdempotencyId.java
@@ -0,0 +1,11 @@
+package de.idealo.kafka.idempotency;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.PARAMETER)
+public @interface IdempotencyId {
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/IdempotentListener.java b/src/main/java/de/idealo/kafka/idempotency/IdempotentListener.java
new file mode 100644
index 0000000..67884eb
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/IdempotentListener.java
@@ -0,0 +1,18 @@
+package de.idealo.kafka.idempotency;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface IdempotentListener {
+ /**
+ * Listener-specific TTL for the idempotency information, in seconds.
+ * It overrides the globally-set default TTL.
+ * Defaults to 0, which means the global property is used.
+ * @return TTL for the idempotency information, in seconds
+ */
+ int ttl() default 0;
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/IdempotentListenerAspect.java b/src/main/java/de/idealo/kafka/idempotency/IdempotentListenerAspect.java
new file mode 100644
index 0000000..c8c610f
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/IdempotentListenerAspect.java
@@ -0,0 +1,106 @@
+package de.idealo.kafka.idempotency;
+
+import java.time.Duration;
+
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+import de.idealo.kafka.idempotency.configuration.IdealoKafkaIdempotencyAutoconfiguration;
+import de.idealo.kafka.idempotency.persistence.RecordIdempotencyLookup;
+
+/**
+ * AOP aspect which defines pointcuts and advices for using {@link IdempotentListener} annotations.
+ * In order for this to work properly, this class must be a bean in the spring context and AOP needs to be enabled
+ * (see {@link org.springframework.context.annotation.EnableAspectJAutoProxy}).
+ * Also, an implementation of {@link RecordIdempotencyLookup}) must be in the classpath.
+ */
+@Aspect
+public class IdempotentListenerAspect {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IdempotentListenerAspect.class);
+
+ private final IdealoKafkaIdempotencyAutoconfiguration configuration;
+ private final RecordIdempotencyLookup idempotencyLookup;
+ private final RecordIdentityExtractor recordIdentityExtractor;
+
+ public IdempotentListenerAspect(final IdealoKafkaIdempotencyAutoconfiguration configuration,
+ RecordIdempotencyLookup idempotencyLookup, RecordIdentityExtractor recordIdentityExtractor) {
+ this.configuration = configuration;
+ this.idempotencyLookup = idempotencyLookup;
+ this.recordIdentityExtractor = recordIdentityExtractor;
+ }
+
+ /**
+     * Around-advice to intercept listener methods annotated with {@link IdempotentListener}.
+ *
+ * @param joinPoint the invocation's join point
+ * @return the result of the actual method invocation
+ * @throws Throwable any exception thrown during method invocation
+ */
+ @Around(value = "@annotation(de.idealo.kafka.idempotency.IdempotentListener)")
+ public Object check(ProceedingJoinPoint joinPoint) throws Throwable { // NOSONAR
+ if (!configuration.isCheckEnabled()) {
+ LOG.debug("Idempotency check is disabled. Skipped for the record {} at location {}");
+ return joinPoint.proceed();
+ }
+
+ try {
+ final var recordId = recordIdentityExtractor.extract(joinPoint);
+
+ if (idempotencyLookup.isLogged(recordId)) {
+ // skip method invocation
+ LOG.debug("Listener method invocation will be skipped for event due to idempotency check for the record {} at location {}",
+ recordId, joinPoint.getSignature().toString());
+ return null;
+ }
+ } catch (Throwable e) { // NOSONAR
+ LOG.error("Could not look up the idempotency information due to an error", e);
+ if (!configuration.isSuppressErrors()) {
+ throw new IdempotencyCheckException(e);
+ }
+ }
+ // proceed as normal
+ return joinPoint.proceed();
+ }
+
+ @AfterReturning(
+ value = "@annotation(idempotentListener)",
+ argNames = "joinPoint,idempotentListener"
+ )
+ public void persist(JoinPoint joinPoint, IdempotentListener idempotentListener) {
+ if (!configuration.isPersistenceEnabled()) {
+ LOG.debug("Saving idempotency markers is disabled. Skipped for the record {} at location {}");
+ return;
+ }
+
+ try {
+ final var recordId = recordIdentityExtractor.extract(joinPoint);
+            if (!StringUtils.isEmpty(recordId)) {
+ idempotencyLookup.log(recordId, selectTtl(idempotentListener.ttl()));
+ }
+ } catch (Throwable e) { // NOSONAR
+ LOG.error("Could not save idempotency marker for the record", e);
+ return;
+ }
+ }
+
+ /**
+ * Takes TTL given as an annotation parameter. If non-zero, it is converted from seconds to Duration and returned,
+ * otherwise the global default is used.
+     * @param ttl the TTL in seconds taken from the annotation, or 0 if not set
+     * @return the effective TTL as a {@link Duration}
+ */
+ private Duration selectTtl(final int ttl) {
+ if (ttl <= 0) {
+ return configuration.getTtl();
+ }
+
+ return Duration.ofSeconds(ttl);
+ }
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/RecordIdentity.java b/src/main/java/de/idealo/kafka/idempotency/RecordIdentity.java
new file mode 100644
index 0000000..d2258ab
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/RecordIdentity.java
@@ -0,0 +1,38 @@
+package de.idealo.kafka.idempotency;
+
+import java.util.List;
+
+public class RecordIdentity {
+
+ private static final String DELIMITER = "-";
+ private final String id;
+
+    public RecordIdentity(final List<String> recordIdComponents) throws IdempotencyCheckException {
+ this.id = buildId(recordIdComponents);
+ }
+
+    private String buildId(final List<String> recordIdComponents) throws IdempotencyCheckException {
+        if (recordIdComponents.isEmpty()) {
+            throw instantiateException();
+        }
+        final var joint = String.join(DELIMITER, recordIdComponents);
+        if (joint.isEmpty()) {
+            throw instantiateException();
+        }
+ return joint;
+ }
+
+ private IdempotencyCheckException instantiateException() {
+ return new IdempotencyCheckException(
+ "No idempotency id arguments found. There must be at least one listener argument annotated with @IdempotencyId and the value must not be empty or null");
+ }
+
+ public String toString(final String prefix) {
+ return prefix + this.toString();
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/RecordIdentityExtractor.java b/src/main/java/de/idealo/kafka/idempotency/RecordIdentityExtractor.java
new file mode 100644
index 0000000..fd14031
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/RecordIdentityExtractor.java
@@ -0,0 +1,50 @@
+package de.idealo.kafka.idempotency;
+
+import java.lang.reflect.AnnotatedElement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+
+/**
+ * Extracts all available listener method arguments annotated with {@link IdempotencyId} and forms RecordIdentity out of them.
+ */
+public class RecordIdentityExtractor {
+
+    private static final Set<Class<?>> SUPPORTED_ID_TYPES = new HashSet<>(Arrays.asList(
+ String.class, Long.class, Integer.class, int.class, long.class
+ ));
+
+ public RecordIdentity extract(final JoinPoint joinPoint) throws IdempotencyCheckException {
+ final var arguments = joinPoint.getArgs();
+ final var parameters = ((MethodSignature) joinPoint.getSignature()).getMethod().getParameters();
+        final List<String> idComponents = new ArrayList<>();
+
+ for (int i = 0; i < parameters.length; i++) {
+ if (isAnnotatedParam(parameters[i])) {
+ if (!SUPPORTED_ID_TYPES.contains(parameters[i].getType())) {
+                throw new IdempotencyCheckException("Idempotency check error: only the following argument types are currently supported for @IdempotencyId:"
+ + " String, Long, Integer, int, long.");
+ }
+ idComponents.add(String.valueOf(arguments[i]));
+ }
+ }
+
+ return new RecordIdentity(idComponents);
+ }
+
+ /**
+     * Checks if the parameter from the join point is annotated with {@link IdempotencyId}.
+     * @param parameter the listener method parameter to inspect
+     * @return true if the parameter is annotated with {@link IdempotencyId}
+ */
+    private boolean isAnnotatedParam(AnnotatedElement parameter) {
+ var eventIdempotentParam = parameter.getAnnotation(IdempotencyId.class);
+
+ return eventIdempotentParam != null;
+ }
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/configuration/IdealoKafkaIdempotencyAutoconfiguration.java b/src/main/java/de/idealo/kafka/idempotency/configuration/IdealoKafkaIdempotencyAutoconfiguration.java
new file mode 100644
index 0000000..ff095a3
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/configuration/IdealoKafkaIdempotencyAutoconfiguration.java
@@ -0,0 +1,100 @@
+package de.idealo.kafka.idempotency.configuration;
+
+import java.time.Duration;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.StringRedisTemplate;
+
+import de.idealo.kafka.idempotency.IdempotentListenerAspect;
+import de.idealo.kafka.idempotency.RecordIdentityExtractor;
+import de.idealo.kafka.idempotency.persistence.RecordIdempotencyLookup;
+import de.idealo.kafka.idempotency.persistence.RedisRecordIdempotencyLookup;
+
+@Configuration
+@AutoConfigureAfter({ KafkaAutoConfiguration.class })
+@EnableConfigurationProperties(KafkaListenerIdempotencyProperties.class)
+@ConditionalOnProperty(
+ prefix = IdealoKafkaIdempotencyAutoconfiguration.PROPERTY_PREFIX,
+ name = "enabled", havingValue = "true",
+ matchIfMissing = true
+)
+public class IdealoKafkaIdempotencyAutoconfiguration {
+
+ public static final String PROPERTY_PREFIX = "idealo.kafka.idempotency";
+
+ @Autowired
+ private KafkaProperties kafkaProperties;
+
+ @Autowired
+ private KafkaListenerIdempotencyProperties idempotencyProperties;
+
+ /**
+     * By combining conditionals, the autoconfiguration will in the future be able to use different persistence lookup providers.
+     * @param template the Redis template used to persist the idempotency markers
+     * @return the Redis-backed {@link RecordIdempotencyLookup}
+ */
+ @Bean
+ public RecordIdempotencyLookup redisRecordIdempotencyLookup(StringRedisTemplate template) {
+ return new RedisRecordIdempotencyLookup(this, template);
+ }
+
+ @Bean
+ public IdempotentListenerAspect idempotentListenerAspect(RecordIdempotencyLookup recordIdempotencyLookup) {
+ return new IdempotentListenerAspect(this, recordIdempotencyLookup, new RecordIdentityExtractor());
+ }
+
+ /**
+ * Whether the idempotency check on listeners is enabled.
+ * @return
+ */
+ public boolean isCheckEnabled() {
+ return idempotencyProperties.isCheckEnabled();
+ }
+
+ /**
+ * Whether the idempotency markers are persisted after a record has been consumed.
+ * @return
+ */
+ public boolean isPersistenceEnabled() {
+ return idempotencyProperties.isPersistenceEnabled();
+ }
+
+ /**
+ * Gets TTL for persistence markers.
+ * @return
+ */
+ public Duration getTtl() {
+ return idempotencyProperties.getTtl();
+ }
+
+ /**
+ * Gets the prefix used for the Redis keys that hold the idempotency information.
+ * @return
+ */
+ public String getKeyPrefix() {
+ return idempotencyProperties.getKeyPrefix();
+ }
+
+ /**
+ * Gets the consumer group id (set via the standard Spring Kafka properties).
+ * @return
+ */
+ public String getConsumerGroupId() {
+ return kafkaProperties.getConsumer().getGroupId();
+ }
+
+ /**
+ * Whether to suppress any exceptions or to bubble them up.
+ * @return
+ */
+ public boolean isSuppressErrors() {
+ return idempotencyProperties.isSuppressErrors();
+ }
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/configuration/KafkaListenerIdempotencyProperties.java b/src/main/java/de/idealo/kafka/idempotency/configuration/KafkaListenerIdempotencyProperties.java
new file mode 100644
index 0000000..227c6e7
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/configuration/KafkaListenerIdempotencyProperties.java
@@ -0,0 +1,99 @@
+package de.idealo.kafka.idempotency.configuration;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import de.idealo.kafka.idempotency.IdempotentListener;
+
+@ConfigurationProperties(prefix = IdealoKafkaIdempotencyAutoconfiguration.PROPERTY_PREFIX + ".listener")
+public class KafkaListenerIdempotencyProperties {
+
+ /**
+ * Using this property you can completely ignore the idempotency check even if the annotation {@link IdempotentListener} is set.
+     * This might be useful e.g. if you want to temporarily force the listeners to re-consume all the events, ignoring saved idempotency markers.
+     * Note that the idempotency markers continue to be persisted independently of this setting.
+ */
+ private boolean checkEnabled = true;
+
+ /**
+ * Using this property you can completely disable saving the idempotency markers.
+     * This might be useful if something in your setup is broken, but you do not want to block record consumption.
+     * Note that there will be no way to guarantee exactly-once semantics for records that were consumed while this setting was false.
+ */
+ private boolean persistenceEnabled = true;
+
+ /**
+ * Duration of the guaranteed idempotency per record.
+ * After expiring, the information about a consumed record is removed from persistence.
+ * If all Kafka topics in your domain have the same retention time, this property should be set equal to it.
+ * This can be overridden per listener, directly in @IdempotentListener.
+ */
+ private Duration ttl = Duration.of(7, ChronoUnit.DAYS);
+
+ /**
+ * Prefix used for all the keys persisted in Redis that contain the idempotency information.
+     * Since Redis supports the TTL feature only on top-level keys, we cannot use Sets to namespace the data
+     * handled by this library. The workaround is to store simple one-character strings in the root namespace, with the information
+     * needed for the lookup held by the keys themselves. This is how a typical key with the default prefix looks:
+     * kafkaidmp_myconsumerid_1c9bb6f0-5b91-4be7-acad-6bf089ed0bef
+     * If the topics you use this library with carry a lot of traffic, you should monitor the memory footprint of the idempotency
+     * data in Redis. This property gives you extra means for optimisation.
+ */
+ private String keyPrefix = "";
+
+ /**
+ * If true, any exceptions during the lookup or persistence are logged, but not bubbled up to the listener container.
+     * A typical case where this matters is a short outage of the Redis cluster. If this happens, the idempotency data cannot
+     * be persisted, hence cannot be looked up later, and therefore the idempotency is simply not maintained.
+     * Such behavior is inconsistent and should be avoided. However, in practice the error handling and acknowledgment logic
+     * are sometimes not properly configured, in which case such an outage leads to skipped records, which is normally worse than
+     * the inability to maintain exactly-once semantics. If this is your situation, you can set this option to true.
+     * Note that this only changes the behavior of the lookup hook, as it makes no sense to throw an exception after the record handler
+     * has correctly finished its work: a retry would make it process the same record again, whereas it relies on the idempotency check
+     * to maintain exactly-once semantics.
+ */
+ private boolean suppressErrors = false;
+
+ public Duration getTtl() {
+ return ttl;
+ }
+
+ public void setTtl(final Duration ttl) {
+ this.ttl = ttl;
+ }
+
+ public boolean isCheckEnabled() {
+ return checkEnabled;
+ }
+
+ public void setCheckEnabled(final boolean checkEnabled) {
+ this.checkEnabled = checkEnabled;
+ }
+
+ public boolean isPersistenceEnabled() {
+ return persistenceEnabled;
+ }
+
+ public void setPersistenceEnabled(final boolean persistenceEnabled) {
+ this.persistenceEnabled = persistenceEnabled;
+ }
+
+ public String getKeyPrefix() {
+ return keyPrefix;
+ }
+
+ public void setKeyPrefix(final String keyPrefix) {
+ this.keyPrefix = keyPrefix;
+ }
+
+
+ public boolean isSuppressErrors() {
+ return suppressErrors;
+ }
+
+ public void setSuppressErrors(final boolean suppressErrors) {
+ this.suppressErrors = suppressErrors;
+ }
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/persistence/RecordIdempotencyLookup.java b/src/main/java/de/idealo/kafka/idempotency/persistence/RecordIdempotencyLookup.java
new file mode 100644
index 0000000..7ceea21
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/persistence/RecordIdempotencyLookup.java
@@ -0,0 +1,12 @@
+package de.idealo.kafka.idempotency.persistence;
+
+import java.time.Duration;
+
+import de.idealo.kafka.idempotency.RecordIdentity;
+
+public interface RecordIdempotencyLookup {
+
+ boolean isLogged(RecordIdentity id);
+
+ void log(final RecordIdentity id, final Duration ttl);
+}
diff --git a/src/main/java/de/idealo/kafka/idempotency/persistence/RedisRecordIdempotencyLookup.java b/src/main/java/de/idealo/kafka/idempotency/persistence/RedisRecordIdempotencyLookup.java
new file mode 100644
index 0000000..3d49745
--- /dev/null
+++ b/src/main/java/de/idealo/kafka/idempotency/persistence/RedisRecordIdempotencyLookup.java
@@ -0,0 +1,39 @@
+package de.idealo.kafka.idempotency.persistence;
+
+import java.time.Duration;
+
+import org.springframework.data.redis.core.StringRedisTemplate;
+
+import de.idealo.kafka.idempotency.RecordIdentity;
+import de.idealo.kafka.idempotency.configuration.IdealoKafkaIdempotencyAutoconfiguration;
+
+public class RedisRecordIdempotencyLookup implements RecordIdempotencyLookup {
+
+ /**
+     * The actual value is irrelevant; only the presence of the key matters.
+ */
+ private static final String LOOKUP_VALUE = "1";
+
+ private static final String KEY_DELIMITER = "_";
+
+ private final StringRedisTemplate redisTemplate;
+
+ private final String prefix;
+
+ public RedisRecordIdempotencyLookup(final IdealoKafkaIdempotencyAutoconfiguration configuration,
+ final StringRedisTemplate template) {
+ this.redisTemplate = template;
+ this.prefix = configuration.getKeyPrefix() + KEY_DELIMITER + configuration.getConsumerGroupId() + KEY_DELIMITER;
+ }
+
+ @Override
+ public boolean isLogged(RecordIdentity id) {
+        // hasKey can return null (e.g. when executed in a pipeline/transaction), so unwrap defensively
+        return Boolean.TRUE.equals(redisTemplate.hasKey(id.toString(prefix)));
+ }
+
+ @Override
+ public void log(final RecordIdentity id, final Duration ttl) {
+ redisTemplate.opsForValue().set(id.toString(prefix), LOOKUP_VALUE, ttl);
+ }
+
+}
diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..01bc99c
--- /dev/null
+++ b/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+ de.idealo.kafka.idempotency.configuration.IdealoKafkaIdempotencyAutoconfiguration
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/main/resources/application.properties
@@ -0,0 +1 @@
+
diff --git a/src/test/java/de/idealo/kafka/idempotency/IdempotentListenerAspectTest.java b/src/test/java/de/idealo/kafka/idempotency/IdempotentListenerAspectTest.java
new file mode 100644
index 0000000..cece95f
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/IdempotentListenerAspectTest.java
@@ -0,0 +1,126 @@
+package de.idealo.kafka.idempotency;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import de.idealo.kafka.idempotency.configuration.IdealoKafkaIdempotencyAutoconfiguration;
+import de.idealo.kafka.idempotency.persistence.RecordIdempotencyLookup;
+
+public class IdempotentListenerAspectTest {
+
+ private final RecordIdempotencyLookup lookup = mock(RecordIdempotencyLookup.class);
+ private final RecordIdentityExtractor idExtractor = mock(RecordIdentityExtractor.class);
+ private final MethodSignature methodSignature = mock(MethodSignature.class);
+ private final ProceedingJoinPoint proceedingJoinPoint = mock(ProceedingJoinPoint.class);
+ private final IdealoKafkaIdempotencyAutoconfiguration configuration = mock(IdealoKafkaIdempotencyAutoconfiguration.class);
+ private final IdempotentListenerAspect aspect = new IdempotentListenerAspect(configuration, lookup, idExtractor);
+
+ private RecordIdentity recordId;
+ private IdempotentListener annotation = mock(IdempotentListener.class);
+
+ @BeforeEach
+ public void setUp() throws Throwable {
+ this.recordId = new RecordIdentity(Arrays.asList(UUID.randomUUID().toString()));
+
+ when(idExtractor.extract(proceedingJoinPoint)).thenReturn(recordId);
+ when(proceedingJoinPoint.proceed()).thenReturn(proceedingJoinPoint);
+ when(proceedingJoinPoint.getSignature()).thenReturn(methodSignature);
+ }
+
+ @Test
+ public void checks_record_feature_disabled() throws Throwable {
+ when(configuration.isCheckEnabled()).thenReturn(false);
+
+ assertThat(aspect.check(proceedingJoinPoint)).isEqualTo(proceedingJoinPoint);
+ verify(lookup, never()).isLogged(any());
+ }
+
+ @Test
+ public void checks_record_logged() throws Throwable {
+ when(configuration.isCheckEnabled()).thenReturn(true);
+ when(lookup.isLogged(any())).thenReturn(true);
+
+ assertThat(aspect.check(proceedingJoinPoint)).isNull();
+ }
+
+ @Test
+ public void checks_record_not_logged() throws Throwable {
+ when(configuration.isCheckEnabled()).thenReturn(true);
+ when(lookup.isLogged(any())).thenReturn(false);
+
+ assertThat(aspect.check(proceedingJoinPoint)).isNotNull();
+ }
+
+ @Test
+ public void bubbles_errors_on_check() throws Throwable {
+ when(configuration.isCheckEnabled()).thenReturn(true);
+ when(configuration.isSuppressErrors()).thenReturn(false);
+ when(idExtractor.extract(proceedingJoinPoint)).thenThrow(new IdempotencyCheckException("test"));
+
+ assertThrows(IdempotencyCheckException.class, () -> aspect.check(proceedingJoinPoint));
+ }
+
+ @Test
+ public void suppresses_errors_on_property() throws Throwable {
+ when(configuration.isCheckEnabled()).thenReturn(true);
+ when(configuration.isSuppressErrors()).thenReturn(true);
+ when(idExtractor.extract(proceedingJoinPoint)).thenThrow(new IdempotencyCheckException("test"));
+
+ assertThat(aspect.check(proceedingJoinPoint)).isNotNull();
+ }
+
+ @Test
+ public void logs() {
+ when(configuration.isPersistenceEnabled()).thenReturn(true);
+ when(configuration.getTtl()).thenReturn(Duration.ofSeconds(2));
+ when(annotation.ttl()).thenReturn(0);
+
+ aspect.persist(proceedingJoinPoint, annotation);
+
+ verify(lookup).log(recordId, Duration.ofSeconds(2));
+ }
+
+ @Test
+ public void logs_with_custom_ttl() {
+ when(configuration.isPersistenceEnabled()).thenReturn(true);
+ when(configuration.getTtl()).thenReturn(Duration.ofSeconds(86400));
+ when(annotation.ttl()).thenReturn(10);
+
+ aspect.persist(proceedingJoinPoint, annotation);
+
+ verify(lookup).log(recordId, Duration.ofSeconds(10));
+ }
+
+ @Test
+ public void logs_disabled() {
+ when(configuration.isPersistenceEnabled()).thenReturn(false);
+
+ aspect.persist(proceedingJoinPoint, annotation);
+
+ verify(lookup, never()).log(any(), any());
+ }
+
+ @Test
+ public void logs_no_param_extracted() throws Throwable {
+ when(configuration.isPersistenceEnabled()).thenReturn(true);
+ when(idExtractor.extract(proceedingJoinPoint)).thenThrow(new IdempotencyCheckException("test"));
+
+ aspect.persist(proceedingJoinPoint, annotation);
+
+ verify(lookup, never()).log(any(), any());
+ }
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/IdempotentListenerTest.java b/src/test/java/de/idealo/kafka/idempotency/IdempotentListenerTest.java
new file mode 100644
index 0000000..9d78318
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/IdempotentListenerTest.java
@@ -0,0 +1,83 @@
+package de.idealo.kafka.idempotency;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import redis.embedded.RedisServer;
+
+import de.idealo.kafka.idempotency.configuration.RedisProperties;
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.consumer.group-id: test-group",
+ "spring.kafka.consumer.auto-offset-reset: earliest",
+ "spring.kafka.bootstrap-servers: ${spring.embedded.kafka.brokers}",
+ "spring.kafka.listener.poll-timeout: 300ms"
+ }
+)
+@EmbeddedKafka(partitions = 1, topics = {TestPlainKafkaConsumer.TOPIC, TestIdempotentKafkaConsumer.TOPIC})
+public class IdempotentListenerTest {
+
+ @Autowired
+ private TestRecordHandler handler;
+
+ @Autowired
+ private TestPlainKafkaConsumer consumerPlain;
+
+ @Autowired
+ private TestIdempotentKafkaConsumer consumerIdempotent;
+
+ @Autowired
+ private TestKafkaProducer producer;
+
+ @Autowired
+ private RedisProperties redisProperties;
+
+ private RedisServer redisServer;
+
+ @BeforeEach
+ void setUp() {
+ this.redisServer = new RedisServer(redisProperties.getRedisPort());
+ redisServer.start();
+ }
+
+ @AfterEach
+ void tearDown() {
+ redisServer.stop();
+ }
+
+ @Test
+ void test() throws InterruptedException {
+ var payload1 = "message1";
+ var payload2 = "message2";
+
+ producer.sendWithHeaderId(TestPlainKafkaConsumer.TOPIC, payload1);
+ producer.sendWithHeaderId(TestPlainKafkaConsumer.TOPIC, payload2);
+ producer.sendWithHeaderId(TestIdempotentKafkaConsumer.TOPIC, payload1);
+ producer.sendWithHeaderId(TestIdempotentKafkaConsumer.TOPIC, payload2);
+
+ Thread.sleep(1000);
+ resetOffsets();
+ Thread.sleep(1000);
+ resetOffsets();
+ Thread.sleep(1000);
+
+ assertEquals(3, handler.getNumOfProcessed(TestPlainKafkaConsumer.TOPIC, payload1));
+ assertEquals(3, handler.getNumOfProcessed(TestPlainKafkaConsumer.TOPIC, payload2));
+
+ assertEquals(1, handler.getNumOfProcessed(TestIdempotentKafkaConsumer.TOPIC, payload1));
+ assertEquals(1, handler.getNumOfProcessed(TestIdempotentKafkaConsumer.TOPIC, payload2));
+ }
+
+ private void resetOffsets() {
+ consumerPlain.getConsumerSeekCallback().seek(TestPlainKafkaConsumer.TOPIC, 0, 0);
+ consumerIdempotent.getConsumerSeekCallback().seek(TestIdempotentKafkaConsumer.TOPIC, 0, 0);
+ }
+
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/TestApplication.java b/src/test/java/de/idealo/kafka/idempotency/TestApplication.java
new file mode 100644
index 0000000..25e585c
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/TestApplication.java
@@ -0,0 +1,13 @@
+package de.idealo.kafka.idempotency;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.EnableAspectJAutoProxy;
+
+@SpringBootApplication
+@EnableAspectJAutoProxy
+public class TestApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(TestApplication.class, args);
+ }
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/TestIdempotentKafkaConsumer.java b/src/test/java/de/idealo/kafka/idempotency/TestIdempotentKafkaConsumer.java
new file mode 100644
index 0000000..0d4a30d
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/TestIdempotentKafkaConsumer.java
@@ -0,0 +1,43 @@
+package de.idealo.kafka.idempotency;
+
+import static de.idealo.kafka.idempotency.TestKafkaProducer.HEADER_RECORD_ID;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerSeekAware;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TestIdempotentKafkaConsumer implements ConsumerSeekAware {
+
+ public static final String TOPIC = "test-topic-idemp";
+
+ private final TestRecordHandler handler;
+ private ConsumerSeekCallback consumerSeekCallback;
+
+ public TestIdempotentKafkaConsumer(final TestRecordHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void registerSeekCallback(ConsumerSeekCallback callback) {
+ this.consumerSeekCallback = callback;
+ }
+
+ public ConsumerSeekCallback getConsumerSeekCallback() {
+ return consumerSeekCallback;
+ }
+
+ @KafkaListener(topics = TOPIC)
+ @IdempotentListener(ttl = 60)
+    public void receive(ConsumerRecord<?, ?> consumerRecord,
+ @IdempotencyId @Header(HEADER_RECORD_ID) String recordId,
+ @IdempotencyId @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @IdempotencyId @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
+ @IdempotencyId @Header(KafkaHeaders.OFFSET) Long offset
+ ) {
+ handler.handle(consumerRecord.topic(), consumerRecord);
+ }
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/TestKafkaProducer.java b/src/test/java/de/idealo/kafka/idempotency/TestKafkaProducer.java
new file mode 100644
index 0000000..a444aed
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/TestKafkaProducer.java
@@ -0,0 +1,26 @@
+package de.idealo.kafka.idempotency;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TestKafkaProducer {
+
+ public static final String HEADER_RECORD_ID = "record-id";
+
+ @Autowired
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+ /**
+     * Sends a record with a header HEADER_RECORD_ID containing an identifier generated from the topic and payload.
+     * @param topic the topic to send the record to
+     * @param payload the record payload
+ */
+ public void sendWithHeaderId(String topic, String payload) {
+        var record = new ProducerRecord<String, String>(topic, payload);
+ record.headers().add(HEADER_RECORD_ID, (topic + payload).getBytes());
+ kafkaTemplate.send(record);
+ }
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/TestPlainKafkaConsumer.java b/src/test/java/de/idealo/kafka/idempotency/TestPlainKafkaConsumer.java
new file mode 100644
index 0000000..6ccd24e
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/TestPlainKafkaConsumer.java
@@ -0,0 +1,34 @@
+package de.idealo.kafka.idempotency;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerSeekAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TestPlainKafkaConsumer implements ConsumerSeekAware {
+
+ public static final String TOPIC = "test-topic-plain";
+
+ private final TestRecordHandler handler;
+ private ConsumerSeekCallback consumerSeekCallback;
+
+ public TestPlainKafkaConsumer(final TestRecordHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void registerSeekCallback(ConsumerSeekCallback callback) {
+ this.consumerSeekCallback = callback;
+ }
+
+ public ConsumerSeekCallback getConsumerSeekCallback() {
+ return consumerSeekCallback;
+ }
+
+ @KafkaListener(topics = TOPIC)
+    public void receivePlain(ConsumerRecord<?, ?> consumerRecord) {
+ handler.handle(consumerRecord.topic(), consumerRecord);
+ }
+
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/TestRecordHandler.java b/src/test/java/de/idealo/kafka/idempotency/TestRecordHandler.java
new file mode 100644
index 0000000..ac2de24
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/TestRecordHandler.java
@@ -0,0 +1,33 @@
+package de.idealo.kafka.idempotency;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.stereotype.Service;
+
+@Service
+public class TestRecordHandler {
+
+ private final Map> processingStats = new HashMap<>();
+
+ public TestRecordHandler() {
+ this.processingStats.put(TestPlainKafkaConsumer.TOPIC, new HashMap<>());
+ this.processingStats.put(TestIdempotentKafkaConsumer.TOPIC, new HashMap<>());
+ }
+
+    public void handle(final String topic, final ConsumerRecord<?, ?> record) {
+ if (!processingStats.containsKey(topic)) {
+ throw new RuntimeException("Processed a message from an unexpected topic " + topic);
+ }
+
+ final var counter = processingStats.get(topic);
+ var payload = record.value().toString();
+        counter.merge(payload, 1, Integer::sum);
+ }
+
+ public int getNumOfProcessed(final String topic, final String payload) {
+ final var counter = processingStats.get(topic);
+        return counter.getOrDefault(payload, 0);
+ }
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/configuration/RedisConfiguration.java b/src/test/java/de/idealo/kafka/idempotency/configuration/RedisConfiguration.java
new file mode 100644
index 0000000..fd76d29
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/configuration/RedisConfiguration.java
@@ -0,0 +1,26 @@
+package de.idealo.kafka.idempotency.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
+
+@Configuration
+@EnableRedisRepositories
+public class RedisConfiguration {
+
+ @Bean
+ public LettuceConnectionFactory redisConnectionFactory(RedisProperties redisProperties) {
+ return new LettuceConnectionFactory(
+ redisProperties.getRedisHost(),
+ redisProperties.getRedisPort());
+ }
+
+ @Bean
+    public RedisTemplate<?, ?> redisTemplate(LettuceConnectionFactory connectionFactory) {
+        RedisTemplate<?, ?> template = new RedisTemplate<>();
+ template.setConnectionFactory(connectionFactory);
+ return template;
+ }
+}
diff --git a/src/test/java/de/idealo/kafka/idempotency/configuration/RedisProperties.java b/src/test/java/de/idealo/kafka/idempotency/configuration/RedisProperties.java
new file mode 100644
index 0000000..cf49c16
--- /dev/null
+++ b/src/test/java/de/idealo/kafka/idempotency/configuration/RedisProperties.java
@@ -0,0 +1,26 @@
+package de.idealo.kafka.idempotency.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RedisProperties {
+
+ private int redisPort;
+ private String redisHost;
+
+ public RedisProperties(
+ @Value("${spring.redis.port}") int redisPort,
+ @Value("${spring.redis.host}") String redisHost) {
+ this.redisPort = redisPort;
+ this.redisHost = redisHost;
+ }
+
+ public int getRedisPort() {
+ return redisPort;
+ }
+
+ public String getRedisHost() {
+ return redisHost;
+ }
+}
diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties
new file mode 100644
index 0000000..e178a38
--- /dev/null
+++ b/src/test/resources/application.properties
@@ -0,0 +1,2 @@
+spring.redis.host=localhost
+spring.redis.port=6370