Skip to content

Commit

Permalink
Improve log and exception messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rieske committed Nov 2, 2023
1 parent 9c519c5 commit 663b970
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void createReplicationSlot() {
if (SQLSTATE_DUPLICATE_OBJECT.equals(e.getSQLState())) {
LOGGER.info("Replication slot {} already exists", replicationSlotName);
} else {
throw new RuntimeException("Could not create replication slot", e);
throw new RuntimeException("Could not create replication slot " + replicationSlotName, e);
}
}
}
Expand All @@ -92,7 +92,7 @@ public void dropReplicationSlot() {
connection.getReplicationAPI().dropReplicationSlot(replicationSlotName);
LOGGER.info("Dropped replications slot {}", replicationSlotName);
} catch (SQLException e) {
throw new RuntimeException("Could not drop replication slot", e);
throw new RuntimeException("Could not drop replication slot " + replicationSlotName, e);
}
}

Expand All @@ -110,7 +110,7 @@ public void stop() {
try {
if (!replicationStreamExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
replicationStreamExecutor.shutdownNow();
LOGGER.warn("Replication stream executor was shut down forcefully");
LOGGER.warn("Replication stream executor for slot {} was shut down forcefully", replicationSlotName);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -121,7 +121,7 @@ private PgConnection createConnection() {
try {
return DriverManager.getConnection(jdbcUrl, databaseConnectionProperties).unwrap(PgConnection.class);
} catch (SQLException e) {
throw new RuntimeException("Could not create connection", e);
throw new RuntimeException("Could not create database connection", e);
}
}
}
Expand Down Expand Up @@ -192,7 +192,7 @@ private void consumeStream(PGReplicationStream stream) throws SQLException {
TimeUnit.MILLISECONDS.sleep(10L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread was interrupted", e);
throw new RuntimeException("Replication slot " + replicationSlotName + " consumer thread was interrupted", e);
}
continue;
}
Expand All @@ -204,7 +204,7 @@ private void consumeStream(PGReplicationStream stream) throws SQLException {
stream.setFlushedLSN(stream.getLastReceiveLSN());
stream.forceUpdateStatus();
} catch (Exception e) {
LOGGER.warn("Could not consume database change event", e);
LOGGER.warn("Could not consume database change event from replication slot" + replicationSlotName, e);
}
}
LOGGER.info("Replication slot {} consumer was stopped", replicationSlotName);
Expand Down

0 comments on commit 663b970

Please sign in to comment.