diff --git a/postgres-cdc/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java b/postgres-cdc/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java index 6f9cc78..e1e71ac 100644 --- a/postgres-cdc/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java +++ b/postgres-cdc/src/main/java/io/github/rieske/cdc/PostgresReplicationListener.java @@ -80,7 +80,7 @@ public void createReplicationSlot() { if (SQLSTATE_DUPLICATE_OBJECT.equals(e.getSQLState())) { LOGGER.info("Replication slot {} already exists", replicationSlotName); } else { - throw new RuntimeException("Could not create replication slot", e); + throw new RuntimeException("Could not create replication slot " + replicationSlotName, e); } } } @@ -92,7 +92,7 @@ public void dropReplicationSlot() { connection.getReplicationAPI().dropReplicationSlot(replicationSlotName); LOGGER.info("Dropped replications slot {}", replicationSlotName); } catch (SQLException e) { - throw new RuntimeException("Could not drop replication slot", e); + throw new RuntimeException("Could not drop replication slot " + replicationSlotName, e); } } @@ -110,7 +110,7 @@ public void stop() { try { if (!replicationStreamExecutor.awaitTermination(10, TimeUnit.SECONDS)) { replicationStreamExecutor.shutdownNow(); - LOGGER.warn("Replication stream executor was shut down forcefully"); + LOGGER.warn("Replication stream executor for slot {} was shut down forcefully", replicationSlotName); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -121,7 +121,7 @@ private PgConnection createConnection() { try { return DriverManager.getConnection(jdbcUrl, databaseConnectionProperties).unwrap(PgConnection.class); } catch (SQLException e) { - throw new RuntimeException("Could not create connection", e); + throw new RuntimeException("Could not create database connection", e); } } } @@ -192,7 +192,7 @@ private void consumeStream(PGReplicationStream stream) throws SQLException { TimeUnit.MILLISECONDS.sleep(10L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException("Thread was interrupted", e); + throw new RuntimeException("Replication slot " + replicationSlotName + " consumer thread was interrupted", e); } continue; } @@ -204,7 +204,7 @@ private void consumeStream(PGReplicationStream stream) throws SQLException { stream.setFlushedLSN(stream.getLastReceiveLSN()); stream.forceUpdateStatus(); } catch (Exception e) { - LOGGER.warn("Could not consume database change event", e); + LOGGER.warn("Could not consume database change event from replication slot" + replicationSlotName, e); } } LOGGER.info("Replication slot {} consumer was stopped", replicationSlotName);