Skip to content

Commit

Permalink
Improve encapsulation, clear up some Javadoc warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
rieske committed Nov 1, 2023
1 parent b82fcee commit f677f82
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 108 deletions.
102 changes: 47 additions & 55 deletions postgres-cdc/src/main/java/io/github/rieske/cdc/DatabaseChange.java
Original file line number Diff line number Diff line change
@@ -1,65 +1,57 @@
package io.github.rieske.cdc;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DatabaseChange {
public final Action action;
public final String schema;
public final String table;
public final Map<String, String> columns;

@JsonCreator
DatabaseChange(
@JsonProperty("action") Action action,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("columns") List<Column> columns
) {
this.action = action;
this.schema = schema;
this.table = table;
Map<String, String> mutableColumns = new HashMap<>();
for (Column column : columns) {
mutableColumns.put(column.name, column.value);
}
this.columns = Collections.unmodifiableMap(mutableColumns);
}

@Override
public String toString() {
return "DatabaseChange{" +
"action='" + action + '\'' +
", schema='" + schema + '\'' +
", table='" + table + '\'' +
", columns=" + columns +
'}';
}

public enum Action {
@JsonProperty("I")
/**
* A record, representing a change in the database.
* Exposes the action (INSERT/UPDATE/DELETE/TRUNCATE), schema, table, and a map of column names and their values, all as Strings.
*/
public interface DatabaseChange {

/**
* @return the INSERT/UPDATE/DELETE/TRUNCATE action that yielded this change.
*/
Action action();

/**
* @return the schema where this change originated.
*/
String schema();

/**
* @return the table where this change originated.
*/
String table();

/**
* @return a Map of column names and their values as Strings from the database change.
* Contains all columns from the changed table - both changed and unchanged.
*/
Map<String, String> columns();

/**
* An action that was performed on the database to cause a change.
*/
enum Action {

/**
* Indicates that the database change was created using INSERT command
*/
INSERT,
@JsonProperty("U")

/**
* Indicates that the database change was created using UPDATE command
*/
UPDATE,
@JsonProperty("D")
DELETE,
@JsonProperty("T")
TRUNCATE
}

static class Column {
private final String name;
private final String value;
/**
* Indicates that the database change was created using DELETE command
*/
DELETE,

@JsonCreator
Column(@JsonProperty("name") String name, @JsonProperty("value") String value) {
this.name = name;
this.value = value;
}
/**
* Indicates that the database change was created using TRUNCATE command
*/
TRUNCATE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.github.rieske.cdc;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class JsonDeserializedDatabaseChange implements DatabaseChange {
private final Action action;
private final String schema;
private final String table;
private final Map<String, String> columns;

@JsonCreator
JsonDeserializedDatabaseChange(
@JsonProperty("action") String action,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("columns") List<Column> columns
) {
switch (action) {
case "I":
this.action = Action.INSERT;
break;
case "U":
this.action = Action.UPDATE;
break;
case "D":
this.action = Action.DELETE;
break;
case "T":
this.action = Action.TRUNCATE;
break;
default:
throw new IllegalArgumentException("Unrecognized database change action: " + action);
}
this.schema = schema;
this.table = table;
Map<String, String> mutableColumns = new HashMap<>();
for (Column column : columns) {
mutableColumns.put(column.name, column.value);
}
this.columns = Collections.unmodifiableMap(mutableColumns);
}

@Override
public String toString() {
return "DatabaseChange{" +
"action='" + action + '\'' +
", schema='" + schema + '\'' +
", table='" + table + '\'' +
", columns=" + columns +
'}';
}

@Override
public Action action() {
return action;
}

@Override
public String schema() {
return schema;
}

@Override
public String table() {
return table;
}

@Override
public Map<String, String> columns() {
return columns;
}

static class Column {
private final String name;
private final String value;

@JsonCreator
Column(@JsonProperty("name") String name, @JsonProperty("value") String value) {
this.name = name;
this.value = value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void accept(ByteBuffer message) {
byte[] source = message.array();
int length = source.length - offset;
try {
DatabaseChange deserializedMessage = MAPPER.readValue(source, offset, length, DatabaseChange.class);
JsonDeserializedDatabaseChange deserializedMessage = MAPPER.readValue(source, offset, length, JsonDeserializedDatabaseChange.class);
delegate.accept(deserializedMessage);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,30 @@ void capturesInsertEvents() throws SQLException {
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(2));

DatabaseChange firstChange = gatheringConsumer.consumedMessages.get(0);
assertThat(firstChange.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(firstChange.schema).isEqualTo("public");
assertThat(firstChange.table).isEqualTo("test_table");
assertThat(firstChange.columns.get("id")).isEqualTo(id1.toString());
assertThat(firstChange.columns.get("integer_field")).isEqualTo("1");
assertThat(firstChange.columns.get("text_field")).isEqualTo("text1");
assertThat(firstChange.columns.get("varchar_field")).isEqualTo("varchar1");
assertThat(firstChange.columns.get("char_field")).isEqualTo("char1 ");
assertThat(firstChange.columns.get("decimal_field")).isEqualTo("42.00");
assertThat(firstChange.columns.get("bool_field")).isEqualTo("false");
assertThat(firstChange.columns.get("updated_at")).isNotEmpty();
assertThat(firstChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT);
assertThat(firstChange.schema()).isEqualTo("public");
assertThat(firstChange.table()).isEqualTo("test_table");
assertThat(firstChange.columns().get("id")).isEqualTo(id1.toString());
assertThat(firstChange.columns().get("integer_field")).isEqualTo("1");
assertThat(firstChange.columns().get("text_field")).isEqualTo("text1");
assertThat(firstChange.columns().get("varchar_field")).isEqualTo("varchar1");
assertThat(firstChange.columns().get("char_field")).isEqualTo("char1 ");
assertThat(firstChange.columns().get("decimal_field")).isEqualTo("42.00");
assertThat(firstChange.columns().get("bool_field")).isEqualTo("false");
assertThat(firstChange.columns().get("updated_at")).isNotEmpty();

DatabaseChange secondChange = gatheringConsumer.consumedMessages.get(1);
assertThat(secondChange.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(secondChange.schema).isEqualTo("public");
assertThat(secondChange.table).isEqualTo("test_table");
assertThat(secondChange.columns.get("id")).isEqualTo(id2.toString());
assertThat(secondChange.columns.get("integer_field")).isEqualTo("42");
assertThat(secondChange.columns.get("text_field")).isEqualTo("text2");
assertThat(secondChange.columns.get("varchar_field")).isEqualTo("varchar2");
assertThat(secondChange.columns.get("char_field")).isEqualTo("char2 ");
assertThat(secondChange.columns.get("decimal_field")).isEqualTo("0.42");
assertThat(secondChange.columns.get("bool_field")).isEqualTo("true");
assertThat(secondChange.columns.get("updated_at")).isNotEmpty();
assertThat(secondChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT);
assertThat(secondChange.schema()).isEqualTo("public");
assertThat(secondChange.table()).isEqualTo("test_table");
assertThat(secondChange.columns().get("id")).isEqualTo(id2.toString());
assertThat(secondChange.columns().get("integer_field")).isEqualTo("42");
assertThat(secondChange.columns().get("text_field")).isEqualTo("text2");
assertThat(secondChange.columns().get("varchar_field")).isEqualTo("varchar2");
assertThat(secondChange.columns().get("char_field")).isEqualTo("char2 ");
assertThat(secondChange.columns().get("decimal_field")).isEqualTo("0.42");
assertThat(secondChange.columns().get("bool_field")).isEqualTo("true");
assertThat(secondChange.columns().get("updated_at")).isNotEmpty();
}

@Test
Expand All @@ -105,30 +105,30 @@ void capturesUpdateEvents() throws SQLException {
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(2));

DatabaseChange firstChange = gatheringConsumer.consumedMessages.get(0);
assertThat(firstChange.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(firstChange.schema).isEqualTo("public");
assertThat(firstChange.table).isEqualTo("test_table");
assertThat(firstChange.columns.get("id")).isEqualTo(id.toString());
assertThat(firstChange.columns.get("integer_field")).isEqualTo("1");
assertThat(firstChange.columns.get("text_field")).isEqualTo("text1");
assertThat(firstChange.columns.get("varchar_field")).isEqualTo("varchar1");
assertThat(firstChange.columns.get("char_field")).isEqualTo("char1 ");
assertThat(firstChange.columns.get("decimal_field")).isEqualTo("42.00");
assertThat(firstChange.columns.get("bool_field")).isEqualTo("false");
assertThat(firstChange.columns.get("updated_at")).isNotEmpty();
assertThat(firstChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT);
assertThat(firstChange.schema()).isEqualTo("public");
assertThat(firstChange.table()).isEqualTo("test_table");
assertThat(firstChange.columns().get("id")).isEqualTo(id.toString());
assertThat(firstChange.columns().get("integer_field")).isEqualTo("1");
assertThat(firstChange.columns().get("text_field")).isEqualTo("text1");
assertThat(firstChange.columns().get("varchar_field")).isEqualTo("varchar1");
assertThat(firstChange.columns().get("char_field")).isEqualTo("char1 ");
assertThat(firstChange.columns().get("decimal_field")).isEqualTo("42.00");
assertThat(firstChange.columns().get("bool_field")).isEqualTo("false");
assertThat(firstChange.columns().get("updated_at")).isNotEmpty();

DatabaseChange secondChange = gatheringConsumer.consumedMessages.get(1);
assertThat(secondChange.action).isEqualTo(DatabaseChange.Action.UPDATE);
assertThat(secondChange.schema).isEqualTo("public");
assertThat(secondChange.table).isEqualTo("test_table");
assertThat(secondChange.columns.get("id")).isEqualTo(id.toString());
assertThat(secondChange.columns.get("integer_field")).isEqualTo("1");
assertThat(secondChange.columns.get("text_field")).isEqualTo("text1");
assertThat(secondChange.columns.get("varchar_field")).isEqualTo("varchar1");
assertThat(secondChange.columns.get("char_field")).isNull();
assertThat(secondChange.columns.get("decimal_field")).isEqualTo("123.45");
assertThat(secondChange.columns.get("bool_field")).isEqualTo("true");
assertThat(secondChange.columns.get("updated_at")).isNotEmpty();
assertThat(secondChange.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.UPDATE);
assertThat(secondChange.schema()).isEqualTo("public");
assertThat(secondChange.table()).isEqualTo("test_table");
assertThat(secondChange.columns().get("id")).isEqualTo(id.toString());
assertThat(secondChange.columns().get("integer_field")).isEqualTo("1");
assertThat(secondChange.columns().get("text_field")).isEqualTo("text1");
assertThat(secondChange.columns().get("varchar_field")).isEqualTo("varchar1");
assertThat(secondChange.columns().get("char_field")).isNull();
assertThat(secondChange.columns().get("decimal_field")).isEqualTo("123.45");
assertThat(secondChange.columns().get("bool_field")).isEqualTo("true");
assertThat(secondChange.columns().get("updated_at")).isNotEmpty();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ void capturesInsertEvent() throws SQLException {
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1));

DatabaseChange event = gatheringConsumer.consumedMessages.get(0);
assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(event.schema).isEqualTo("public");
assertThat(event.table).isEqualTo("test_entity_outbox");
assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload);
assertThat(event.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT);
assertThat(event.schema()).isEqualTo("public");
assertThat(event.table()).isEqualTo("test_entity_outbox");
assertThat(event.columns().get("event_payload")).isEqualTo(eventPayload);
}

@Test
Expand All @@ -75,10 +75,10 @@ void ignoresEventsFromAnotherTable() throws SQLException {
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertThat(gatheringConsumer.consumedMessages).hasSize(1));

DatabaseChange event = gatheringConsumer.consumedMessages.get(0);
assertThat(event.action).isEqualTo(DatabaseChange.Action.INSERT);
assertThat(event.schema).isEqualTo("public");
assertThat(event.table).isEqualTo("test_entity_outbox");
assertThat(event.columns.get("event_payload")).isEqualTo(eventPayload);
assertThat(event.action()).isEqualTo(JsonDeserializedDatabaseChange.Action.INSERT);
assertThat(event.schema()).isEqualTo("public");
assertThat(event.table()).isEqualTo("test_entity_outbox");
assertThat(event.columns().get("event_payload")).isEqualTo(eventPayload);
}

private void insertIntoOutboxTable(Connection connection, String outboxTable, String eventPayload) {
Expand Down

0 comments on commit f677f82

Please sign in to comment.