Skip to content

Add micronaut integration #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ hs_err_pid*
!.yarn/sdks
!.yarn/versions

!gradle-wrapper.jar
!gradle-wrapper.jar

.micronaut/
1 change: 1 addition & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ lombok.config
.husky

org.springframework.boot.autoconfigure.AutoConfiguration.imports
io.micronaut.inject.annotation.AnnotationTransformer

doc/resources/**/*.xml
90 changes: 86 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,81 @@ See the [Core Kotlin Example](examples/core-kotlin-example) for a full example r

See the [Ktor Core Example](examples/ktor-example) for a full example running a Ktor framework that listens to a local ElasticMQ SQS Server.

### Micronaut Quick Guide

1. Include the Micronaut core dependency with Maven `<dependencies>`:

```xml
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-micronaut-core</artifactId>
<version>${sqs.listener.version}</version>
</dependency>
```

Or with Gradle:

```kotlin
dependencies {
implementation("com.jashmore:java-dynamic-sqs-listener-micronaut-core:${sqs.listener.version}")
}
```

1. Also, include the Micronaut annotation processor with Maven:

```xml
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.version}</version>
<configuration>
<annotationProcessorPaths>
<annotationProcessorPath>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-micronaut-inject-java</artifactId>
<version>${sqs.listener.version}</version>
</annotationProcessorPath>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</pluginManagement>
```

Or with Gradle:

```kotlin
dependencies {
annotationProcessor("com.jashmore:java-dynamic-sqs-listener-micronaut-inject-java:${sqs.listener.version}")
}
```

Micronaut will use this at compile time to transform usages of core listener annotations to enable
method processors to register annotated methods as message listeners.

1. In one of your beans, attach a
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java) or other supported annotation
to a method indicating that it should process messages from a queue.

```java
@Singleton
public class MyMessageListener {

// The queue here can point to your SQS server, e.g. a
// local SQS server or one on AWS
@QueueListener("${insert.queue.url.here}")
public void processMessage(@Payload final String payload) {
// process the message payload here
}
}

```

This will use any configured `SqsAsyncClient` in the application context for connecting to the queue, otherwise a default
will be provided that will look for AWS credentials/region from multiple areas, like the environment variables.

## Core Infrastructure

This library has been divided into isolated components each with distinct responsibilities. The following is a diagram describing a simple flow of a
Expand Down Expand Up @@ -261,14 +336,16 @@ for compatibility.
- [AWS SQS SDK](https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/welcome.html)
- [Jackson Databind](https://github.com/FasterXML/jackson-databind)
- [SLF4J API](https://github.com/qos-ch/slf4j)

The following require all the core dependencies above.

- [Spring Framework](./spring)
- All the core dependencies above
- [Spring Boot](https://github.com/spring-projects/spring-boot)
- [Ktor Framework](./ktor)

- All the core dependencies above
- [Kotlin](https://github.com/JetBrains/kotlin)
- [Ktor](https://github.com/ktorio/ktor)
- [Micronaut Framework](./micronaut)
- [Micronaut](https://github.com/micronaut-projects/micronaut-core)

See the [gradle.properties](gradle.properties) for the specific versions of these dependencies.

Expand Down Expand Up @@ -363,6 +440,11 @@ public class MyMessageListener {

```

### Micronaut

The `micronaut-core` library is applied pretty much the same way as `spring-starter`,
so for Micronaut it will be useful to look through the Spring guides and examples.

### Spring - Adding a custom argument resolver

There are some core [ArgumentResolvers](./api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java) provided in the
Expand Down Expand Up @@ -576,7 +658,7 @@ finish in 10 milliseconds but one takes 10 seconds no other messages will be pic
provides the same basic functionality, but it also provides a timeout where it will eventually request for more messages when there are threads that are
ready for another message.

#### Core/Spring Boot
#### Core/Spring Boot/Micronaut

```java
public class MyMessageListener {
Expand Down
5 changes: 4 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ subprojects {
// these classes are better handled by integration tests
"com.jashmore.sqs.container.fifo*" ,
"com.jashmore.sqs.container.batching*",
"com.jashmore.sqs.container.prefetching*"
"com.jashmore.sqs.container.prefetching*",
"com.jashmore.sqs.micronaut.configuration*",
"com.jashmore.sqs.micronaut.container*",
"com.jashmore.sqs.micronaut.placeholder*"
)
element = "PACKAGE"
limit {
Expand Down
10 changes: 10 additions & 0 deletions examples/micronaut-integration-test-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Micronaut Integration Test Example

There are many examples in this codebase to run integration tests for this application (look in the src/test/java/integrationTest folder of
the module) but this shows a minimal use case to copy from.

## Usage

```bash
gradle integrationTest
```
22 changes: 22 additions & 0 deletions examples/micronaut-integration-test-example/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

description = "Contains an example for writing an integration test for the SQS Listener"

plugins {
id("io.micronaut.minimal.application") version "4.4.2"
}

val micronautVersion: String by project

dependencies {
implementation("io.micronaut:micronaut-http-server-netty")
runtimeOnly("ch.qos.logback:logback-classic")
runtimeOnly("org.yaml:snakeyaml")
implementation(project(":java-dynamic-sqs-listener-micronaut-core"))
annotationProcessor(project(":java-dynamic-sqs-listener-micronaut-inject-java"))

testImplementation(project(":elasticmq-sqs-client"))
testImplementation(project(":expected-test-exception"))
testImplementation("io.micronaut.test:micronaut-test-junit5")
testImplementation("org.mockito:mockito-core")
testImplementation("org.mockito:mockito-junit-jupiter")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.jashmore.sqs.examples.integrationtests;

import com.jashmore.sqs.annotations.core.basic.QueueListener;
import com.jashmore.sqs.argument.payload.Payload;
import io.micronaut.runtime.Micronaut;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;

public class TestApplication {

public static void main(String[] args) {
Micronaut.build(args).mainClass(TestApplication.class).start();
}

@Singleton
public static class SomeService {

/**
* Process a message payload, no-op.
*
* @param payload the payload of the message
*/
public void run(final String payload) {
// do nothing
}
}

@Singleton
@AllArgsConstructor
public static class MessageListener {

private final SomeService someService;

/**
* We specifically override the visibility timeout here from the default of 30 to decrease the time
* for the tests to run.
*
* @param message the payload of the message
*/

@QueueListener(value = "${sqs.queues.integrationTestingQueue}", messageVisibilityTimeoutInSeconds = 2)
public void messageListener(@Payload final String message) {
someService.run(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
java-dynamic-sqs-listener-micronaut:
auto-start-containers-enabled: true # default
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<!-- We reduce the log messages from ElasticMQ, Akka and Netty to reduce the amount of unnecessary log messages being published -->
<logger name="org.elasticmq" level="OFF" />
<logger name="akka" level="OFF" />
<logger name="io.netty" level="ERROR" />

<logger name="com.jashmore" level="DEBUG" />

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package com.jashmore.sqs.examples.integrationtests;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;

import com.jashmore.sqs.elasticmq.ElasticMqSqsAsyncClient;
import com.jashmore.sqs.util.ExpectedTestException;
import com.jashmore.sqs.util.LocalSqsAsyncClient;
import com.jashmore.sqs.util.SqsQueuesConfig;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Property;
import io.micronaut.test.annotation.MockBean;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Property(name = "sqs.queues.integrationTestingQueue", value = SqsListenerExampleIntegrationTest.QUEUE_NAME)
class SqsListenerExampleIntegrationTest {

static final String QUEUE_NAME = "SqsListenerExampleIntegrationTest";
private static final int QUEUE_MAX_RECEIVE_COUNT = 3;
private static final int VISIBILITY_TIMEOUT_IN_SECONDS = 2;

@Inject
private LocalSqsAsyncClient localSqsAsyncClient;

@MockBean(TestApplication.SomeService.class)
TestApplication.SomeService someService() {
return mock(TestApplication.SomeService.class);
}

@Inject
private TestApplication.SomeService mockSomeService;

@Factory
public static class TestConfig {

@Singleton
public LocalSqsAsyncClient localSqsAsyncClient() {
return new ElasticMqSqsAsyncClient(
Collections.singletonList(
SqsQueuesConfig.QueueConfig
.builder()
.queueName(QUEUE_NAME)
.maxReceiveCount(QUEUE_MAX_RECEIVE_COUNT)
.visibilityTimeout(VISIBILITY_TIMEOUT_IN_SECONDS)
.build()
)
);
}
}

@AfterEach
void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
localSqsAsyncClient.purgeAllQueues().get(5, TimeUnit.SECONDS);
}

@Test
void messagesPlacedOntoQueueArePickedUpMessageListener() throws Exception {
// arrange
final CountDownLatch messageReceivedCountDownLatch = new CountDownLatch(1);
doAnswer(invocationOnMock -> {
messageReceivedCountDownLatch.countDown();
return null;
})
.when(mockSomeService)
.run(anyString());

// act
localSqsAsyncClient.sendMessage(QUEUE_NAME, "my message");
messageReceivedCountDownLatch.await(VISIBILITY_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);

// assert
verify(mockSomeService).run("my message");
}

@Test
void messageFailingToProcessWillBeProcessedAgain() throws Exception {
// arrange
final CountDownLatch messageReceivedCountDownLatch = new CountDownLatch(1);
final AtomicBoolean processedMessageOnce = new AtomicBoolean();
doAnswer(invocationOnMock -> {
if (!processedMessageOnce.getAndSet(true)) {
throw new ExpectedTestException();
}
messageReceivedCountDownLatch.countDown();
return null;
})
.when(mockSomeService)
.run(anyString());

// act
localSqsAsyncClient.sendMessage(QUEUE_NAME, "my message");
messageReceivedCountDownLatch.await(3 * VISIBILITY_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);

// assert
verify(mockSomeService, times(2)).run("my message");
}

@Test
void messageThatContinuesToFailWillBePlacedIntoDlq() throws Exception {
// arrange
final CountDownLatch messageReceivedCountDownLatch = new CountDownLatch(QUEUE_MAX_RECEIVE_COUNT);
doAnswer(invocationOnMock -> {
messageReceivedCountDownLatch.countDown();
throw new ExpectedTestException();
})
.when(mockSomeService)
.run(anyString());

// act
localSqsAsyncClient.sendMessage(QUEUE_NAME, "my message");
messageReceivedCountDownLatch.await(VISIBILITY_TIMEOUT_IN_SECONDS * (QUEUE_MAX_RECEIVE_COUNT + 1), TimeUnit.SECONDS);
waitForMessageVisibilityToExpire();

// assert
final int numberOfMessages = localSqsAsyncClient.getApproximateMessages(QUEUE_NAME + "-dlq").get();
assertThat(numberOfMessages).isEqualTo(1);
}

private void waitForMessageVisibilityToExpire() throws InterruptedException {
Thread.sleep(Duration.ofSeconds(VISIBILITY_TIMEOUT_IN_SECONDS + 1).toMillis());
}
}
Loading
Loading