diff --git a/lib/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java b/lib/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java index 7beb798..ab6473e 100644 --- a/lib/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java +++ b/lib/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java @@ -11,6 +11,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -34,7 +35,14 @@ public class PostgresReplicationListener { private final ReplicationStreamConsumer replicationStreamConsumer; - public PostgresReplicationListener(String jdbcUrl, String databaseUser, String databasePassword, String replicationSlotName, Consumer consumer) { + public PostgresReplicationListener( + String jdbcUrl, + String databaseUser, + String databasePassword, + String replicationSlotName, + Set tablesToListenTo, + Consumer consumer + ) { this.jdbcUrl = jdbcUrl; this.replicationSlotName = replicationSlotName; @@ -51,7 +59,7 @@ public PostgresReplicationListener(String jdbcUrl, String databaseUser, String d return thread; }); - this.replicationStreamConsumer = new ReplicationStreamConsumer(this::createConnection, replicationSlotName, consumer); + this.replicationStreamConsumer = new ReplicationStreamConsumer(this::createConnection, replicationSlotName, tablesToListenTo, consumer); Runtime.getRuntime().addShutdownHook(new Thread(replicationStreamConsumer::stop)); replicationStreamExecutor.submit(replicationStreamConsumer); @@ -119,13 +127,20 @@ class ReplicationStreamConsumer implements Runnable { private final Supplier connectionSupplier; private final String replicationSlotName; + private final Set tablesToListenTo; private final Consumer consumer; private volatile boolean running = false; - ReplicationStreamConsumer(Supplier connectionSupplier, String replicationSlotName, Consumer consumer) { + ReplicationStreamConsumer( + Supplier connectionSupplier, + String replicationSlotName, + Set tablesToListenTo, + Consumer consumer + ) { this.connectionSupplier = connectionSupplier; this.replicationSlotName = replicationSlotName; + this.tablesToListenTo = tablesToListenTo; this.consumer = consumer; } @@ -196,6 +211,7 @@ private PGReplicationStream getStream(PGConnection connection) throws SQLExcepti .withSlotOption("format-version", 2) .withSlotOption("include-transaction", false) .withSlotOption("include-timestamp", true) + .withSlotOption("add-tables", String.join(",", tablesToListenTo)) .withStatusInterval(10, TimeUnit.SECONDS) .start(); } diff --git a/lib/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java b/lib/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java index 65cce09..b1b6bcb 100644 --- a/lib/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java +++ b/lib/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java @@ -6,17 +6,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Duration; import java.time.Instant; -import java.util.List; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -27,30 +24,16 @@ class ChangeDataCaptureTest { final DatabaseExtension database = new DatabaseExtension(); private final String replicationSlotName = "cdc_stream"; - private final Consumer printingConsumer = msg -> { - int offset = msg.arrayOffset(); - byte[] source = msg.array(); - int length = source.length - offset; - System.out.println(new String(source, offset, length)); - }; - - private static class GatheringConsumer implements Consumer { - final List consumedMessages = new CopyOnWriteArrayList<>(); - - @Override - public void accept(T message) { - consumedMessages.add(message); - } - } - private final GatheringConsumer gatheringConsumer = new GatheringConsumer<>(); + private final GatheringConsumer gatheringConsumer = TestConsumers.gathering(); private final PostgresReplicationListener listener = new PostgresReplicationListener( database.jdbcUrl(), database.databaseUsername(), database.databasePassword(), replicationSlotName, - printingConsumer.andThen(new JsonDeserializingConsumer(gatheringConsumer)) + Set.of("public.test_table"), + TestConsumers.printing().andThen(new JsonDeserializingConsumer(gatheringConsumer)) ); @BeforeEach @@ -173,9 +156,9 @@ private void insertIntoTestTable( Instant updatedAt ) { try (PreparedStatement statement = connection.prepareStatement( - "INSERT INTO test_table" + - "(id, integer_field, text_field, varchar_field, char_field, decimal_field, bool_field, updated_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + "INSERT INTO test_table" + + "(id, integer_field, text_field, varchar_field, char_field, decimal_field, bool_field, updated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)") ) { statement.setObject(1, id); statement.setInt(2, integerField); @@ -203,10 +186,10 @@ private void updateTestTable( Instant updatedAt ) { try (PreparedStatement statement = connection.prepareStatement( - "UPDATE test_table SET " + - "integer_field=?, text_field=?, varchar_field=?, char_field=?, " + - "decimal_field=?, bool_field=?, updated_at=? " + - "WHERE id=?") + "UPDATE test_table SET " + + "integer_field=?, text_field=?, varchar_field=?, char_field=?, " + + "decimal_field=?, bool_field=?, updated_at=? " + + "WHERE id=?") ) { statement.setInt(1, integerField); statement.setString(2, textField); diff --git a/lib/src/test/java/io/github/rieske/cdc/DatabaseExtension.java b/lib/src/test/java/io/github/rieske/cdc/DatabaseExtension.java index 0082cec..6b0d4e6 100644 --- a/lib/src/test/java/io/github/rieske/cdc/DatabaseExtension.java +++ b/lib/src/test/java/io/github/rieske/cdc/DatabaseExtension.java @@ -28,7 +28,7 @@ public class DatabaseExtension implements BeforeEachCallback, AfterEachCallback private static final int DB_PORT = 5432; private static final String DEFAULT_DATABASE = "postgres"; - private static String POSTGRES_VERSION = System.getenv("POSTGRES_VERSION"); + private static final String POSTGRES_VERSION = System.getenv("POSTGRES_VERSION"); private static final GenericContainer DB_CONTAINER = new GenericContainer<>( new ImageFromDockerfile("postgres-cdc-test") diff --git a/lib/src/test/java/io/github/rieske/cdc/TestConsumers.java b/lib/src/test/java/io/github/rieske/cdc/TestConsumers.java new file mode 100644 index 0000000..23bafcd --- /dev/null +++ b/lib/src/test/java/io/github/rieske/cdc/TestConsumers.java @@ -0,0 +1,33 @@ +package io.github.rieske.cdc; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +final class TestConsumers { + private TestConsumers() { + } + + static Consumer printing() { + return msg -> { + int offset = msg.arrayOffset(); + byte[] source = msg.array(); + int length = source.length - offset; + System.out.println(new String(source, offset, length)); + }; + } + + static GatheringConsumer gathering() { + return new GatheringConsumer<>(); + } +} + +class GatheringConsumer implements Consumer { + final List consumedMessages = new CopyOnWriteArrayList<>(); + + @Override + public void accept(T message) { + consumedMessages.add(message); + } +} diff --git a/lib/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java b/lib/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java new file mode 100644 index 0000000..b518b4c --- /dev/null +++ b/lib/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java @@ -0,0 +1,94 @@ +package io.github.rieske.cdc; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Duration; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class TransactionalOutboxTest { + + @RegisterExtension + final DatabaseExtension database = new DatabaseExtension(); + + private final String replicationSlotName = "cdc_stream"; + private final String testEntityOutboxTable = "test_entity_outbox"; + private final String anotherOutboxTable = "another_outbox"; + + private final GatheringConsumer gatheringConsumer = TestConsumers.gathering(); + + private final PostgresReplicationListener listener = new PostgresReplicationListener( + database.jdbcUrl(), + database.databaseUsername(), + database.databasePassword(), + replicationSlotName, + Set.of("public." + testEntityOutboxTable), + TestConsumers.printing().andThen(new JsonDeserializingConsumer(gatheringConsumer)) + ); + + @BeforeEach + void setup() { + listener.createReplicationSlot(); + listener.start(); + } + + @AfterEach + void tearDown() { + listener.stop(); + listener.dropReplicationSlot(); + } + + @Test + void capturesInsertEvent() throws SQLException { + String eventPayload = "{\"foo\":\"bar\"}"; + try (Connection connection = database.getDataSource().getConnection()) { + insertIntoOutboxTable(connection, testEntityOutboxTable, eventPayload); + } + + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1)); + + DatabaseChange event = gatheringConsumer.consumedMessages.get(0); + assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT); + assertThat(event.schema).isEqualTo("public"); + assertThat(event.table).isEqualTo("test_entity_outbox"); + assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload); + } + + @Test + void ignoresEventsFromAnotherTable() throws SQLException { + try (Connection connection = database.getDataSource().getConnection()) { + insertIntoOutboxTable(connection, anotherOutboxTable, "{}"); + } + String eventPayload = "{\"foo\":\"bar\"}"; + try (Connection connection = database.getDataSource().getConnection()) { + insertIntoOutboxTable(connection, testEntityOutboxTable, eventPayload); + } + + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1)); + + DatabaseChange event = gatheringConsumer.consumedMessages.get(0); + assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT); + assertThat(event.schema).isEqualTo("public"); + assertThat(event.table).isEqualTo("test_entity_outbox"); + assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload); + } + + private void insertIntoOutboxTable(Connection connection, String outboxTable, String eventPayload) { + try (PreparedStatement statement = connection.prepareStatement( + "INSERT INTO " + outboxTable + " (event_payload) VALUES(?::json)" + )) { + statement.setString(1, eventPayload); + statement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/lib/src/test/resources/db/migration/V1__init.sql b/lib/src/test/resources/db/migration/V1__init.sql index d67e03e..dc71a62 100644 --- a/lib/src/test/resources/db/migration/V1__init.sql +++ b/lib/src/test/resources/db/migration/V1__init.sql @@ -10,3 +10,13 @@ CREATE TABLE test_table( bool_field BOOL, updated_at TIMESTAMP WITH TIME ZONE NOT NULL ); + +CREATE TABLE test_entity_outbox( + id SERIAL NOT NULL PRIMARY KEY, + event_payload JSON +); + +CREATE TABLE another_outbox( + id SERIAL NOT NULL PRIMARY KEY, + event_payload JSON +);