Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transactional outbox example tests #5

Merged
merged 6 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# postgres-cdc

[![Actions Status](https://github.com/rieske/postgres-cdc/workflows/master/badge.svg)](https://github.com/rieske/postgres-cdc/actions)

Java library that utilizes [PostgreSQL logical replication](https://www.postgresql.org/docs/current/logical-replication.html)
feature to implement [Change Data Capture](https://en.wikipedia.org/wiki/Change_data_capture).

Once logical replication is configured on the PostgreSQL server, this library can subscribe to changes
in the specified tables.
The change events are streamed in real time and can be relayed to message brokers
as they occur, allowing to implement the [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html)
pattern.

## Prerequisites

PostgreSQL version 13.12 or later.
Note 13.12 is the earliest one that this library is tested against at the time of writing.
In theory, it may work with PostgreSQL 9.5 and above.

Logical replication must be [configured](https://www.postgresql.org/docs/current/logical-replication-config.html#LOGICAL-REPLICATION-CONFIG-PUBLISHER)
on the PostgreSQL server.

If you are using AWS Aurora, see [here](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraPostgreSQL.Replication.Logical.html#AuroraPostgreSQL.Replication.Logical.Configure)
for instructions to enable logical replication.

## Usage

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,7 +35,14 @@ public class PostgresReplicationListener {
private final ReplicationStreamConsumer replicationStreamConsumer;


public PostgresReplicationListener(String jdbcUrl, String databaseUser, String databasePassword, String replicationSlotName, Consumer<ByteBuffer> consumer) {
public PostgresReplicationListener(
String jdbcUrl,
String databaseUser,
String databasePassword,
String replicationSlotName,
Set<String> tablesToListenTo,
Consumer<ByteBuffer> consumer
) {
this.jdbcUrl = jdbcUrl;
this.replicationSlotName = replicationSlotName;

Expand All @@ -51,7 +59,7 @@ public PostgresReplicationListener(String jdbcUrl, String databaseUser, String d
return thread;
});

this.replicationStreamConsumer = new ReplicationStreamConsumer(this::createConnection, replicationSlotName, consumer);
this.replicationStreamConsumer = new ReplicationStreamConsumer(this::createConnection, replicationSlotName, tablesToListenTo, consumer);
Runtime.getRuntime().addShutdownHook(new Thread(replicationStreamConsumer::stop));

replicationStreamExecutor.submit(replicationStreamConsumer);
Expand Down Expand Up @@ -119,13 +127,20 @@ class ReplicationStreamConsumer implements Runnable {

private final Supplier<PgConnection> connectionSupplier;
private final String replicationSlotName;
private final Set<String> tablesToListenTo;
private final Consumer<ByteBuffer> consumer;

private volatile boolean running = false;

ReplicationStreamConsumer(Supplier<PgConnection> connectionSupplier, String replicationSlotName, Consumer<ByteBuffer> consumer) {
ReplicationStreamConsumer(
Supplier<PgConnection> connectionSupplier,
String replicationSlotName,
Set<String> tablesToListenTo,
Consumer<ByteBuffer> consumer
) {
this.connectionSupplier = connectionSupplier;
this.replicationSlotName = replicationSlotName;
this.tablesToListenTo = tablesToListenTo;
this.consumer = consumer;
}

Expand Down Expand Up @@ -196,6 +211,7 @@ private PGReplicationStream getStream(PGConnection connection) throws SQLExcepti
.withSlotOption("format-version", 2)
.withSlotOption("include-transaction", false)
.withSlotOption("include-timestamp", true)
.withSlotOption("add-tables", String.join(",", tablesToListenTo))
.withStatusInterval(10, TimeUnit.SECONDS)
.start();
}
Expand Down
39 changes: 11 additions & 28 deletions lib/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -27,30 +24,16 @@ class ChangeDataCaptureTest {
final DatabaseExtension database = new DatabaseExtension();

private final String replicationSlotName = "cdc_stream";
private final Consumer<ByteBuffer> printingConsumer = msg -> {
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
System.out.println(new String(source, offset, length));
};

private static class GatheringConsumer<T> implements Consumer<T> {
final List<T> consumedMessages = new CopyOnWriteArrayList<>();

@Override
public void accept(T message) {
consumedMessages.add(message);
}
}

private final GatheringConsumer<DatabaseChange> gatheringConsumer = new GatheringConsumer<>();
private final GatheringConsumer<DatabaseChange> gatheringConsumer = TestConsumers.gathering();

private final PostgresReplicationListener listener = new PostgresReplicationListener(
database.jdbcUrl(),
database.databaseUsername(),
database.databasePassword(),
replicationSlotName,
printingConsumer.andThen(new JsonDeserializingConsumer(gatheringConsumer))
Set.of("public.test_table"),
TestConsumers.printing().andThen(new JsonDeserializingConsumer(gatheringConsumer))
);

@BeforeEach
Expand Down Expand Up @@ -173,9 +156,9 @@ private void insertIntoTestTable(
Instant updatedAt
) {
try (PreparedStatement statement = connection.prepareStatement(
"INSERT INTO test_table" +
"(id, integer_field, text_field, varchar_field, char_field, decimal_field, bool_field, updated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
"INSERT INTO test_table" +
"(id, integer_field, text_field, varchar_field, char_field, decimal_field, bool_field, updated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
) {
statement.setObject(1, id);
statement.setInt(2, integerField);
Expand Down Expand Up @@ -203,10 +186,10 @@ private void updateTestTable(
Instant updatedAt
) {
try (PreparedStatement statement = connection.prepareStatement(
"UPDATE test_table SET " +
"integer_field=?, text_field=?, varchar_field=?, char_field=?, " +
"decimal_field=?, bool_field=?, updated_at=? " +
"WHERE id=?")
"UPDATE test_table SET " +
"integer_field=?, text_field=?, varchar_field=?, char_field=?, " +
"decimal_field=?, bool_field=?, updated_at=? " +
"WHERE id=?")
) {
statement.setInt(1, integerField);
statement.setString(2, textField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DatabaseExtension implements BeforeEachCallback, AfterEachCallback
private static final int DB_PORT = 5432;
private static final String DEFAULT_DATABASE = "postgres";

private static String POSTGRES_VERSION = System.getenv("POSTGRES_VERSION");
private static final String POSTGRES_VERSION = System.getenv("POSTGRES_VERSION");

private static final GenericContainer<?> DB_CONTAINER = new GenericContainer<>(
new ImageFromDockerfile("postgres-cdc-test")
Expand Down
33 changes: 33 additions & 0 deletions lib/src/test/java/io/github/rieske/cdc/TestConsumers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.github.rieske.cdc;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class TestConsumers {
private TestConsumers() {
}

static Consumer<ByteBuffer> printing() {
return msg -> {
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
System.out.println(new String(source, offset, length));
};
}

static <T> GatheringConsumer<T> gathering() {
return new GatheringConsumer<>();
}
}

class GatheringConsumer<T> implements Consumer<T> {
final List<T> consumedMessages = new CopyOnWriteArrayList<>();

@Override
public void accept(T message) {
consumedMessages.add(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.github.rieske.cdc;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class TransactionalOutboxTest {

@RegisterExtension
final DatabaseExtension database = new DatabaseExtension();

private final String replicationSlotName = "cdc_stream";
private final String testEntityOutboxTable = "test_entity_outbox";
private final String anotherOutboxTable = "another_outbox";

private final GatheringConsumer<DatabaseChange> gatheringConsumer = TestConsumers.gathering();

private final PostgresReplicationListener listener = new PostgresReplicationListener(
database.jdbcUrl(),
database.databaseUsername(),
database.databasePassword(),
replicationSlotName,
Set.of("public." + testEntityOutboxTable),
TestConsumers.printing().andThen(new JsonDeserializingConsumer(gatheringConsumer))
);

@BeforeEach
void setup() {
listener.createReplicationSlot();
listener.start();
}

@AfterEach
void tearDown() {
listener.stop();
listener.dropReplicationSlot();
}

@Test
void capturesInsertEvent() throws SQLException {
String eventPayload = "{\"foo\":\"bar\"}";
try (Connection connection = database.getDataSource().getConnection()) {
insertIntoOutboxTable(connection, testEntityOutboxTable, eventPayload);
}

await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1));

DatabaseChange event = gatheringConsumer.consumedMessages.get(0);
assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(event.schema).isEqualTo("public");
assertThat(event.table).isEqualTo("test_entity_outbox");
assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload);
}

@Test
void ignoresEventsFromAnotherTable() throws SQLException {
try (Connection connection = database.getDataSource().getConnection()) {
insertIntoOutboxTable(connection, anotherOutboxTable, "{}");
}
String eventPayload = "{\"foo\":\"bar\"}";
try (Connection connection = database.getDataSource().getConnection()) {
insertIntoOutboxTable(connection, testEntityOutboxTable, eventPayload);
}

await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1));

DatabaseChange event = gatheringConsumer.consumedMessages.get(0);
assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(event.schema).isEqualTo("public");
assertThat(event.table).isEqualTo("test_entity_outbox");
assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload);
}

private void insertIntoOutboxTable(Connection connection, String outboxTable, String eventPayload) {
try (PreparedStatement statement = connection.prepareStatement(
"INSERT INTO " + outboxTable + " (event_payload) VALUES(?::json)"
)) {
statement.setString(1, eventPayload);
statement.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
10 changes: 10 additions & 0 deletions lib/src/test/resources/db/migration/V1__init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,13 @@ CREATE TABLE test_table(
bool_field BOOL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);

CREATE TABLE test_entity_outbox(
id SERIAL NOT NULL PRIMARY KEY,
event_payload JSON
);

CREATE TABLE another_outbox(
id SERIAL NOT NULL PRIMARY KEY,
event_payload JSON
);
Loading