From f677f8212bfa0dc222f25e8d937008d270d7f825 Mon Sep 17 00:00:00 2001 From: Vaidotas Valuckas Date: Wed, 1 Nov 2023 16:49:27 +0200 Subject: [PATCH] Improve encapsulation, clear up some Javadoc warnings --- .../io/github/rieske/cdc/DatabaseChange.java | 102 ++++++++---------- .../cdc/JsonDeserializedDatabaseChange.java | 89 +++++++++++++++ .../rieske/cdc/JsonDeserializingConsumer.java | 2 +- .../rieske/cdc/ChangeDataCaptureTest.java | 88 +++++++-------- .../rieske/cdc/TransactionalOutboxTest.java | 16 +-- 5 files changed, 189 insertions(+), 108 deletions(-) create mode 100644 postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializedDatabaseChange.java diff --git a/postgres-cdc/src/main/java/io/github/rieske/cdc/DatabaseChange.java b/postgres-cdc/src/main/java/io/github/rieske/cdc/DatabaseChange.java index 808e50b..e932819 100644 --- a/postgres-cdc/src/main/java/io/github/rieske/cdc/DatabaseChange.java +++ b/postgres-cdc/src/main/java/io/github/rieske/cdc/DatabaseChange.java @@ -1,65 +1,57 @@ package io.github.rieske.cdc; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -public class DatabaseChange { - public final Action action; - public final String schema; - public final String table; - public final Map columns; - - @JsonCreator - DatabaseChange( - @JsonProperty("action") Action action, - @JsonProperty("schema") String schema, - @JsonProperty("table") String table, - @JsonProperty("columns") List columns - ) { - this.action = action; - this.schema = schema; - this.table = table; - Map mutableColumns = new HashMap<>(); - for (Column column : columns) { - mutableColumns.put(column.name, column.value); - } - this.columns = Collections.unmodifiableMap(mutableColumns); - } - - @Override - public String toString() { - return "DatabaseChange{" + - "action='" + action + '\'' + - ", schema='" + schema + '\'' + - ", table='" + table + '\'' + - ", columns=" + columns + - '}'; - } - - public enum Action { - @JsonProperty("I") +/** + * A record, representing a change in the database. + * Exposes the action (INSERT/UPDATE/DELETE/TRUNCATE), schema, table, and a map of column names and their values, all as Strings. + */ +public interface DatabaseChange { + + /** + * @return the INSERT/UPDATE/DELETE/TRUNCATE action that yielded this change. + */ + Action action(); + + /** + * @return the schema where this change originated. + */ + String schema(); + + /** + * @return the table where this change originated. + */ + String table(); + + /** + * @return a Map of column names and their values as Strings from the database change. + * Contains all columns from the changed table - both changed and unchanged. + */ + Map columns(); + + /** + * An action that was performed on the database to cause a change. + */ + enum Action { + + /** + * Indicates that the database change was created using INSERT command + */ INSERT, - @JsonProperty("U") + + /** + * Indicates that the database change was created using UPDATE command + */ UPDATE, - @JsonProperty("D") - DELETE, - @JsonProperty("T") - TRUNCATE - } - static class Column { - private final String name; - private final String value; + /** + * Indicates that the database change was created using DELETE command + */ + DELETE, - @JsonCreator - Column(@JsonProperty("name") String name, @JsonProperty("value") String value) { - this.name = name; - this.value = value; - } + /** + * Indicates that the database change was created using TRUNCATE command + */ + TRUNCATE } } diff --git a/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializedDatabaseChange.java b/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializedDatabaseChange.java new file mode 100644 index 0000000..e81c330 --- /dev/null +++ b/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializedDatabaseChange.java @@ -0,0 +1,89 @@ +package io.github.rieske.cdc; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class JsonDeserializedDatabaseChange implements DatabaseChange { + private final Action action; + private final String schema; + private final String table; + private final Map columns; + + @JsonCreator + JsonDeserializedDatabaseChange( + @JsonProperty("action") String action, + @JsonProperty("schema") String schema, + @JsonProperty("table") String table, + @JsonProperty("columns") List columns + ) { + switch (action) { + case "I": + this.action = Action.INSERT; + break; + case "U": + this.action = Action.UPDATE; + break; + case "D": + this.action = Action.DELETE; + break; + case "T": + this.action = Action.TRUNCATE; + break; + default: + throw new IllegalArgumentException("Unrecognized database change action: " + action); + } + this.schema = schema; + this.table = table; + Map mutableColumns = new HashMap<>(); + for (Column column : columns) { + mutableColumns.put(column.name, column.value); + } + this.columns = Collections.unmodifiableMap(mutableColumns); + } + + @Override + public String toString() { + return "DatabaseChange{" + + "action='" + action + '\'' + + ", schema='" + schema + '\'' + + ", table='" + table + '\'' + + ", columns=" + columns + + '}'; + } + + @Override + public Action action() { + return action; + } + + @Override + public String schema() { + return schema; + } + + @Override + public String table() { + return table; + } + + @Override + public Map columns() { + return columns; + } + + static class Column { + private final String name; + private final String value; + + @JsonCreator + Column(@JsonProperty("name") String name, @JsonProperty("value") String value) { + this.name = name; + this.value = value; + } + } +} diff --git a/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializingConsumer.java b/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializingConsumer.java index cb44b21..5763fee 100644 --- a/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializingConsumer.java +++ b/postgres-cdc/src/main/java/io/github/rieske/cdc/JsonDeserializingConsumer.java @@ -25,7 +25,7 @@ public void accept(ByteBuffer message) { byte[] source = message.array(); int length = source.length - offset; try { - DatabaseChange deserializedMessage = MAPPER.readValue(source, offset, length, DatabaseChange.class); + JsonDeserializedDatabaseChange deserializedMessage = MAPPER.readValue(source, offset, length, JsonDeserializedDatabaseChange.class); delegate.accept(deserializedMessage); } catch (IOException e) { throw new RuntimeException(e); diff --git a/postgres-cdc/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java b/postgres-cdc/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java index b1b6bcb..9af5ce9 100644 --- a/postgres-cdc/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java +++ b/postgres-cdc/src/test/java/io/github/rieske/cdc/ChangeDataCaptureTest.java @@ -64,30 +64,30 @@ void capturesInsertEvents() throws SQLException { await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(2)); DatabaseChange firstChange = gatheringConsumer.consumedMessages.get(0); - assertThat(firstChange.action).isEqualTo(DatabaseChange.Action.INSERT); - assertThat(firstChange.schema).isEqualTo("public"); - assertThat(firstChange.table).isEqualTo("test_table"); - assertThat(firstChange.columns.get("id")).isEqualTo(id1.toString()); - assertThat(firstChange.columns.get("integer_field")).isEqualTo("1"); - assertThat(firstChange.columns.get("text_field")).isEqualTo("text1"); - assertThat(firstChange.columns.get("varchar_field")).isEqualTo("varchar1"); - assertThat(firstChange.columns.get("char_field")).isEqualTo("char1 "); - assertThat(firstChange.columns.get("decimal_field")).isEqualTo("42.00"); - assertThat(firstChange.columns.get("bool_field")).isEqualTo("false"); - assertThat(firstChange.columns.get("updated_at")).isNotEmpty(); + assertThat(firstChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT); + assertThat(firstChange.schema()).isEqualTo("public"); + assertThat(firstChange.table()).isEqualTo("test_table"); + assertThat(firstChange.columns().get("id")).isEqualTo(id1.toString()); + assertThat(firstChange.columns().get("integer_field")).isEqualTo("1"); + assertThat(firstChange.columns().get("text_field")).isEqualTo("text1"); + assertThat(firstChange.columns().get("varchar_field")).isEqualTo("varchar1"); + assertThat(firstChange.columns().get("char_field")).isEqualTo("char1 "); + assertThat(firstChange.columns().get("decimal_field")).isEqualTo("42.00"); + assertThat(firstChange.columns().get("bool_field")).isEqualTo("false"); + assertThat(firstChange.columns().get("updated_at")).isNotEmpty(); DatabaseChange secondChange = gatheringConsumer.consumedMessages.get(1); - assertThat(secondChange.action).isEqualTo(DatabaseChange.Action.INSERT); - assertThat(secondChange.schema).isEqualTo("public"); - assertThat(secondChange.table).isEqualTo("test_table"); - assertThat(secondChange.columns.get("id")).isEqualTo(id2.toString()); - assertThat(secondChange.columns.get("integer_field")).isEqualTo("42"); - assertThat(secondChange.columns.get("text_field")).isEqualTo("text2"); - assertThat(secondChange.columns.get("varchar_field")).isEqualTo("varchar2"); - assertThat(secondChange.columns.get("char_field")).isEqualTo("char2 "); - assertThat(secondChange.columns.get("decimal_field")).isEqualTo("0.42"); - assertThat(secondChange.columns.get("bool_field")).isEqualTo("true"); - assertThat(secondChange.columns.get("updated_at")).isNotEmpty(); + assertThat(secondChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT); + assertThat(secondChange.schema()).isEqualTo("public"); + assertThat(secondChange.table()).isEqualTo("test_table"); + assertThat(secondChange.columns().get("id")).isEqualTo(id2.toString()); + assertThat(secondChange.columns().get("integer_field")).isEqualTo("42"); + assertThat(secondChange.columns().get("text_field")).isEqualTo("text2"); + assertThat(secondChange.columns().get("varchar_field")).isEqualTo("varchar2"); + assertThat(secondChange.columns().get("char_field")).isEqualTo("char2 "); + assertThat(secondChange.columns().get("decimal_field")).isEqualTo("0.42"); + assertThat(secondChange.columns().get("bool_field")).isEqualTo("true"); + assertThat(secondChange.columns().get("updated_at")).isNotEmpty(); } @Test @@ -105,30 +105,30 @@ void capturesUpdateEvents() throws SQLException { await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(2)); DatabaseChange firstChange = gatheringConsumer.consumedMessages.get(0); - assertThat(firstChange.action).isEqualTo(DatabaseChange.Action.INSERT); - assertThat(firstChange.schema).isEqualTo("public"); - assertThat(firstChange.table).isEqualTo("test_table"); - assertThat(firstChange.columns.get("id")).isEqualTo(id.toString()); - assertThat(firstChange.columns.get("integer_field")).isEqualTo("1"); - assertThat(firstChange.columns.get("text_field")).isEqualTo("text1"); - assertThat(firstChange.columns.get("varchar_field")).isEqualTo("varchar1"); - assertThat(firstChange.columns.get("char_field")).isEqualTo("char1 "); - assertThat(firstChange.columns.get("decimal_field")).isEqualTo("42.00"); - assertThat(firstChange.columns.get("bool_field")).isEqualTo("false"); - assertThat(firstChange.columns.get("updated_at")).isNotEmpty(); + assertThat(firstChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT); + assertThat(firstChange.schema()).isEqualTo("public"); + assertThat(firstChange.table()).isEqualTo("test_table"); + assertThat(firstChange.columns().get("id")).isEqualTo(id.toString()); + assertThat(firstChange.columns().get("integer_field")).isEqualTo("1"); + assertThat(firstChange.columns().get("text_field")).isEqualTo("text1"); + assertThat(firstChange.columns().get("varchar_field")).isEqualTo("varchar1"); + assertThat(firstChange.columns().get("char_field")).isEqualTo("char1 "); + assertThat(firstChange.columns().get("decimal_field")).isEqualTo("42.00"); + assertThat(firstChange.columns().get("bool_field")).isEqualTo("false"); + assertThat(firstChange.columns().get("updated_at")).isNotEmpty(); DatabaseChange secondChange = gatheringConsumer.consumedMessages.get(1); - assertThat(secondChange.action).isEqualTo(DatabaseChange.Action.UPDATE); - assertThat(secondChange.schema).isEqualTo("public"); - assertThat(secondChange.table).isEqualTo("test_table"); - assertThat(secondChange.columns.get("id")).isEqualTo(id.toString()); - assertThat(secondChange.columns.get("integer_field")).isEqualTo("1"); - assertThat(secondChange.columns.get("text_field")).isEqualTo("text1"); - assertThat(secondChange.columns.get("varchar_field")).isEqualTo("varchar1"); - assertThat(secondChange.columns.get("char_field")).isNull(); - assertThat(secondChange.columns.get("decimal_field")).isEqualTo("123.45"); - assertThat(secondChange.columns.get("bool_field")).isEqualTo("true"); - assertThat(secondChange.columns.get("updated_at")).isNotEmpty(); + assertThat(secondChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.UPDATE); + assertThat(secondChange.schema()).isEqualTo("public"); + assertThat(secondChange.table()).isEqualTo("test_table"); + assertThat(secondChange.columns().get("id")).isEqualTo(id.toString()); + assertThat(secondChange.columns().get("integer_field")).isEqualTo("1"); + assertThat(secondChange.columns().get("text_field")).isEqualTo("text1"); + assertThat(secondChange.columns().get("varchar_field")).isEqualTo("varchar1"); + assertThat(secondChange.columns().get("char_field")).isNull(); + assertThat(secondChange.columns().get("decimal_field")).isEqualTo("123.45"); + assertThat(secondChange.columns().get("bool_field")).isEqualTo("true"); + assertThat(secondChange.columns().get("updated_at")).isNotEmpty(); } @Test diff --git a/postgres-cdc/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java b/postgres-cdc/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java index b518b4c..89309af 100644 --- a/postgres-cdc/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java +++ b/postgres-cdc/src/test/java/io/github/rieske/cdc/TransactionalOutboxTest.java @@ -56,10 +56,10 @@ void capturesInsertEvent() throws SQLException { await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1)); DatabaseChange event = gatheringConsumer.consumedMessages.get(0); - assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT); - assertThat(event.schema).isEqualTo("public"); - assertThat(event.table).isEqualTo("test_entity_outbox"); - assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload); + assertThat(event.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT); + assertThat(event.schema()).isEqualTo("public"); + assertThat(event.table()).isEqualTo("test_entity_outbox"); + assertThat(event.columns().get("event_payload")).isEqualTo(eventPayload); } @Test @@ -75,10 +75,10 @@ void ignoresEventsFromAnotherTable() throws SQLException { await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1)); DatabaseChange event = gatheringConsumer.consumedMessages.get(0); - assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT); - assertThat(event.schema).isEqualTo("public"); - assertThat(event.table).isEqualTo("test_entity_outbox"); - assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload); + assertThat(event.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT); + assertThat(event.schema()).isEqualTo("public"); + assertThat(event.table()).isEqualTo("test_entity_outbox"); + assertThat(event.columns().get("event_payload")).isEqualTo(eventPayload); } private void insertIntoOutboxTable(Connection connection, String outboxTable, String eventPayload) {