Skip to content

Commit

Permalink
Add transactional outbox example tests
Browse files Browse the repository at this point in the history
And require to specify the tables to listen to explicitly.
  • Loading branch information
rieske committed Nov 1, 2023
1 parent c442d9c commit 4284ce2
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,7 +35,14 @@ public class PostgresReplicationListener {
private final ReplicationStreamConsumer replicationStreamConsumer;


public PostgresReplicationListener(String jdbcUrl, String databaseUser, String databasePassword, String replicationSlotName, Consumer<ByteBuffer> consumer) {
public PostgresReplicationListener(
String jdbcUrl,
String databaseUser,
String databasePassword,
String replicationSlotName,
Set<String> tablesToListenTo,
Consumer<ByteBuffer> consumer
) {
this.jdbcUrl = jdbcUrl;
this.replicationSlotName = replicationSlotName;

Expand All @@ -51,7 +59,7 @@ public PostgresReplicationListener(String jdbcUrl, String databaseUser, String d
return thread;
});

this.replicationStreamConsumer = new ReplicationStreamConsumer(this::createConnection, replicationSlotName, consumer);
this.replicationStreamConsumer = new ReplicationStreamConsumer(this::createConnection, replicationSlotName, tablesToListenTo, consumer);
Runtime.getRuntime().addShutdownHook(new Thread(replicationStreamConsumer::stop));

replicationStreamExecutor.submit(replicationStreamConsumer);
Expand Down Expand Up @@ -119,13 +127,20 @@ class ReplicationStreamConsumer implements Runnable {

private final Supplier<PgConnection> connectionSupplier;
private final String replicationSlotName;
private final Set<String> tablesToListenTo;
private final Consumer<ByteBuffer> consumer;

private volatile boolean running = false;

ReplicationStreamConsumer(Supplier<PgConnection> connectionSupplier, String replicationSlotName, Consumer<ByteBuffer> consumer) {
ReplicationStreamConsumer(
Supplier<PgConnection> connectionSupplier,
String replicationSlotName,
Set<String> tablesToListenTo,
Consumer<ByteBuffer> consumer
) {
this.connectionSupplier = connectionSupplier;
this.replicationSlotName = replicationSlotName;
this.tablesToListenTo = tablesToListenTo;
this.consumer = consumer;
}

Expand Down Expand Up @@ -196,6 +211,7 @@ private PGReplicationStream getStream(PGConnection connection) throws SQLExcepti
.withSlotOption("format-version", 2)
.withSlotOption("include-transaction", false)
.withSlotOption("include-timestamp", true)
.withSlotOption("add-tables", String.join(",", tablesToListenTo))
.withStatusInterval(10, TimeUnit.SECONDS)
.start();
}
Expand Down
39 changes: 11 additions & 28 deletions lib/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -27,30 +24,16 @@ class ChangeDataCaptureTest {
final DatabaseExtension database = new DatabaseExtension();

private final String replicationSlotName = "cdc_stream";
private final Consumer<ByteBuffer> printingConsumer = msg -> {
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
System.out.println(new String(source, offset, length));
};

private static class GatheringConsumer<T> implements Consumer<T> {
final List<T> consumedMessages = new CopyOnWriteArrayList<>();

@Override
public void accept(T message) {
consumedMessages.add(message);
}
}

private final GatheringConsumer<DatabaseChange> gatheringConsumer = new GatheringConsumer<>();
private final GatheringConsumer<DatabaseChange> gatheringConsumer = TestConsumers.gathering();

private final PostgresReplicationListener listener = new PostgresReplicationListener(
database.jdbcUrl(),
database.databaseUsername(),
database.databasePassword(),
replicationSlotName,
printingConsumer.andThen(new JsonDeserializingConsumer(gatheringConsumer))
Set.of("public.test_table"),
TestConsumers.printing().andThen(new JsonDeserializingConsumer(gatheringConsumer))
);

@BeforeEach
Expand Down Expand Up @@ -173,9 +156,9 @@ private void insertIntoTestTable(
Instant updatedAt
) {
try (PreparedStatement statement = connection.prepareStatement(
"INSERT INTO test_table" +
"(id, integer_field, text_field, varchar_field, char_field, decimal_field, bool_field, updated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
"INSERT INTO test_table" +
"(id, integer_field, text_field, varchar_field, char_field, decimal_field, bool_field, updated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
) {
statement.setObject(1, id);
statement.setInt(2, integerField);
Expand Down Expand Up @@ -203,10 +186,10 @@ private void updateTestTable(
Instant updatedAt
) {
try (PreparedStatement statement = connection.prepareStatement(
"UPDATE test_table SET " +
"integer_field=?, text_field=?, varchar_field=?, char_field=?, " +
"decimal_field=?, bool_field=?, updated_at=? " +
"WHERE id=?")
"UPDATE test_table SET " +
"integer_field=?, text_field=?, varchar_field=?, char_field=?, " +
"decimal_field=?, bool_field=?, updated_at=? " +
"WHERE id=?")
) {
statement.setInt(1, integerField);
statement.setString(2, textField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DatabaseExtension implements BeforeEachCallback, AfterEachCallback
private static final int DB_PORT = 5432;
private static final String DEFAULT_DATABASE = "postgres";

private static String POSTGRES_VERSION = System.getenv("POSTGRES_VERSION");
private static final String POSTGRES_VERSION = System.getenv("POSTGRES_VERSION");

private static final GenericContainer<?> DB_CONTAINER = new GenericContainer<>(
new ImageFromDockerfile("postgres-cdc-test")
Expand Down
33 changes: 33 additions & 0 deletions lib/src/test/java/io/github/rieske/cdc/TestConsumers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.github.rieske.cdc;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class TestConsumers {
private TestConsumers() {
}

static Consumer<ByteBuffer> printing() {
return msg -> {
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
System.out.println(new String(source, offset, length));
};
}

static <T> GatheringConsumer<T> gathering() {
return new GatheringConsumer<>();
}
}

class GatheringConsumer<T> implements Consumer<T> {
final List<T> consumedMessages = new CopyOnWriteArrayList<>();

@Override
public void accept(T message) {
consumedMessages.add(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.github.rieske.cdc;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class TransactionalOutboxTest {

@RegisterExtension
final DatabaseExtension database = new DatabaseExtension();

private final String replicationSlotName = "cdc_stream";
private final String testEntityOutboxTable = "test_entity_outbox";
private final String anotherOutboxTable = "another_outbox";

private final GatheringConsumer<DatabaseChange> gatheringConsumer = TestConsumers.gathering();

private final PostgresReplicationListener listener = new PostgresReplicationListener(
database.jdbcUrl(),
database.databaseUsername(),
database.databasePassword(),
replicationSlotName,
Set.of("public." + testEntityOutboxTable),
TestConsumers.printing().andThen(new JsonDeserializingConsumer(gatheringConsumer))
);

@BeforeEach
void setup() {
listener.createReplicationSlot();
listener.start();
}

@AfterEach
void tearDown() {
listener.stop();
listener.dropReplicationSlot();
}

@Test
void capturesInsertEvent() throws SQLException {
String eventPayload = "{\"foo\":\"bar\"}";
try (Connection connection = database.getDataSource().getConnection()) {
insertIntoOutboxTable(connection, testEntityOutboxTable, eventPayload);
}

await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1));

DatabaseChange event = gatheringConsumer.consumedMessages.get(0);
assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(event.schema).isEqualTo("public");
assertThat(event.table).isEqualTo("test_entity_outbox");
assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload);
}

@Test
void ignoresEventsFromAnotherTable() throws SQLException {
try (Connection connection = database.getDataSource().getConnection()) {
insertIntoOutboxTable(connection, anotherOutboxTable, "{}");
}
String eventPayload = "{\"foo\":\"bar\"}";
try (Connection connection = database.getDataSource().getConnection()) {
insertIntoOutboxTable(connection, testEntityOutboxTable, eventPayload);
}

await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1));

DatabaseChange event = gatheringConsumer.consumedMessages.get(0);
assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(event.schema).isEqualTo("public");
assertThat(event.table).isEqualTo("test_entity_outbox");
assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload);
}

private void insertIntoOutboxTable(Connection connection, String outboxTable, String eventPayload) {
try (PreparedStatement statement = connection.prepareStatement(
"INSERT INTO " + outboxTable + " (event_payload) VALUES(?::json)"
)) {
statement.setString(1, eventPayload);
statement.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
10 changes: 10 additions & 0 deletions lib/src/test/resources/db/migration/V1__init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,13 @@ CREATE TABLE test_table(
bool_field BOOL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);

CREATE TABLE test_entity_outbox(
id SERIAL NOT NULL PRIMARY KEY,
event_payload JSON
);

CREATE TABLE another_outbox(
id SERIAL NOT NULL PRIMARY KEY,
event_payload JSON
);

0 comments on commit 4284ce2

Please sign in to comment.