- Introduction
- Environment Setup
- Database Initialization
- Prepare
pgstream
Configuration - Validate
pgstream
status - Run
pgstream
- Verify Data Transformation
- Troubleshooting
- Summary
This tutorial will showcase the use of pgstream to replicate data from PostgreSQL using transformers. We'll use a target PostgreSQL database as target, but the same would apply to any of the supported targets.
- A source PostgreSQL database
- A target PostgreSQL database
- pgstream (see installation instructions for more details)
tutorial_pg2pg_transformer_demo.mov
Youtube link here.
The first step is to start the two PostgreSQL databases that will be used as source and target for replication. The pgstream
repository provides a docker installation that will be used for the purposes of this tutorial, but can be replaced by any available PostgreSQL servers, as long as they have wal2json
installed.
To start the docker provided PostgreSQL servers, run the following command:
# Start two PostgreSQL databases using Docker.
# The source database will run on port 5432, and the target database will run on port 7654.
docker-compose -f build/docker/docker-compose.yml --profile pg2pg up
This will start two PostgreSQL databases on ports 5432
and 7654
.
Once both PostgreSQL servers are up and running, the next step is to initialise pgstream on the source database. This will create the pgstream
schema in the configured Postgres database, along with the tables/functions/triggers required to keep track of the schema changes. See Tracking schema changes section for more details. This step will also create a replication slot on the source database which will be used by the pgstream service.
The initialisation step allows to provide both the URL of the PostgreSQL database and the name of the replication slot to be created. The PostgreSQL URL is required, but the replication slot name is optional. If not provided, it will default to pgstream_<dbname>_slot
, where <dbname>
is the name of the PostgreSQL database. The configuration can be provided either by using the CLI supported parameters, or using the environment variables.
For this tutorial, we'll create a replication slot with the name pgstream_tutorial_slot
.
- Using CLI flags:
pgstream init --postgres-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --replication-slot pgstream_tutorial_slot
- Using environment variables:
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot PGSTREAM_POSTGRES_LISTENER_URL=postgres://postgres:postgres@localhost:5432?sslmode=disable pgstream init
Successful initialisation should prompt the following message:
SUCCESS pgstream initialisation complete
If at any point the initialisation performed by pgstream needs to be reverted, all state will be removed by running the tear-down
CLI command.
pgstream tear-down --postgres-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --replication-slot pgstream_tutorial_slot
The listener configuration will be the same as the one in the PostgreSQL to PostgreSQL replication tutorial. Check the details here.
- Without initial snapshot
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
- With initial snapshot
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
# Initial snapshot of all tables in the public schema
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"
The processor configuration will be the same as the one in the PostgreSQL to PostgreSQL replication tutorial. Check the details here.
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://postgres:postgres@localhost:7654?sslmode=disable"
PGSTREAM_POSTGRES_WRITER_BATCH_SIZE=25
PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT=5s
PGSTREAM_POSTGRES_WRITER_SCHEMALOG_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
In addition to the processor configuration above, we will add the transformer processor wrapper which will apply the column transformations. The only configuration required is the path to the transformer rules.
PGSTREAM_TRANSFORMER_RULES_FILE="tutorial_transformer_rules.yaml"
The transformer rules follow the pattern defined in the transfomer section of the documentation. To see all the transformers currently supported, check out the transformers section. For this tutorial we'll be using the following transformations:
transformations:
validation_mode: relaxed # Specifies the validation mode for the transformer.
table_transformers:
- schema: public
table: test
column_transformers:
email:
name: neosync_email
parameters:
preserve_length: true # Ensures the transformed email has the same length as the original.
preserve_domain: true # Keeps the domain of the original email intact.
email_type: fullname # Specifies the type of email transformation.
name:
name: greenmask_firstname
parameters:
generator: deterministic # Ensures the same input always produces the same output.
gender: Female # Generates female names for the transformation.
The full configuration for this tutorial can be put into a pg2pg_transformer_tutorial.env
file to be used in the next step. An equivalent pg2pg_transformer_tutorial.yaml
configuration can be found below the environment one, and can be used interchangeably.
- Without initial snapshot
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
# Processor config
PGSTREAM_TRANSFORMER_RULES_FILE="tutorial_transformer_rules.yaml"
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://postgres:postgres@localhost:7654?sslmode=disable"
PGSTREAM_POSTGRES_WRITER_BATCH_SIZE=25
PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT=5s
PGSTREAM_POSTGRES_WRITER_SCHEMALOG_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
source:
postgres:
url: "postgres://postgres:postgres@localhost:5432?sslmode=disable"
mode: replication # options are replication, snapshot or snapshot_and_replication
replication:
replication_slot: pgstream_tutorial_slot
target:
postgres:
url: "postgres://postgres:postgres@localhost:7654?sslmode=disable"
batch:
timeout: 5000 # batch timeout in milliseconds
size: 25 # number of messages in a batch
disable_triggers: false # whether to disable triggers on the target database
on_conflict_action: "nothing" # options are update, nothing or error
modifiers:
transformations:
validation_mode: relaxed
table_transformers:
- schema: public
table: test
column_transformers:
email:
name: neosync_email
parameters:
preserve_length: true
preserve_domain: true
email_type: fullname
name:
name: greenmask_firstname
parameters:
generator: deterministic
gender: Female
- With initial snapshot
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
# Initial snapshot of all tables in the public schema
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"
# Processor config
PGSTREAM_TRANSFORMER_RULES_FILE="tutorial_transformer_rules.yaml"
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://postgres:postgres@localhost:7654?sslmode=disable"
PGSTREAM_POSTGRES_WRITER_BATCH_SIZE=25
PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT=5s
PGSTREAM_POSTGRES_WRITER_SCHEMALOG_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
source:
postgres:
url: "postgres://postgres:postgres@localhost:5432?sslmode=disable"
mode: snapshot_and_replication # options are replication, snapshot or snapshot_and_replication
replication:
replication_slot: pgstream_tutorial_slot
snapshot: # when mode is snapshot or snapshot_and_replication
mode: full # options are data_and, schema or data
tables: ["*"] # tables to snapshot, can be a list of table names or a pattern
recorder:
repeatable_snapshots: true # whether to repeat snapshots that have already been taken
postgres_url: "postgres://postgres:postgres@localhost:5432?sslmode=disable" # URL of the database where the snapshot status is recorded
schema: # when mode is full or schema
mode: pgdump_pgrestore # options are pgdump_pgrestore or schemalog
pgdump_pgrestore:
clean_target_db: false # whether to clean the target database before restoring
target:
postgres:
url: "postgres://postgres:postgres@localhost:7654?sslmode=disable"
batch:
timeout: 5000 # batch timeout in milliseconds
size: 25 # number of messages in a batch
disable_triggers: false # whether to disable triggers on the target database
on_conflict_action: "nothing" # options are update, nothing or error
modifiers:
transformations:
validation_mode: relaxed
table_transformers:
- schema: public
table: test
column_transformers:
email:
name: neosync_email
parameters:
preserve_length: true
preserve_domain: true
email_type: fullname
name:
name: greenmask_firstname
parameters:
generator: deterministic
gender: Female
We can validate that the initialisation and the configuration are valid by running the status
command before starting pgstream
.
# using yaml configuration file
./pgstream status -c pg2pg_transformer_tutorial.yaml
# using env configuration file
./pgstream status -c pg2pg_transformer_tutorial.env
SUCCESS pgstream status check encountered no issues
Initialisation status:
- Pgstream schema exists: true
- Pgstream schema_log table exists: true
- Migration current version: 7
- Migration status: success
- Replication slot name: pgstream_tutorial_slot
- Replication slot plugin: wal2json
- Replication slot database: postgres
Config status:
- Valid: true
Transformation rules status:
- Valid: true
Source status:
- Reachable: true
With the configuration ready, we can now run pgstream. In this case we set the log level as trace to provide more context for debugging and have more visibility into what pgstream is doing under the hood.
Important: Ensure that the source and target databases are running before proceeding.
# with the environment configuration
pgstream run -c pg2pg_transformer_tutorial.env --log-level trace
# with the yaml configuration
pgstream run -c pg2pg_transformer_tutorial.yaml --log-level trace
# with the CLI flags and relying on defaults
PGSTREAM_TRANSFORMER_RULES_FILE="tutorial_transformer_rules.yaml" pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target postgres --target-url "postgres://postgres:postgres@localhost:7654?sslmode=disable" --log-level trace
Now we can connect to the source database, create a table and start inserting data.
➜ psql postgresql://postgres:postgres@localhost:5432/postgres
CREATE TABLE test(id SERIAL PRIMARY KEY, name TEXT, email TEXT);
INSERT INTO test(name,email) VALUES('alice','alice@test.com'),('bob','bob@test.com'),('charlie','charlie@test.com');
SELECT * FROM test;
+----+---------+------------------+
| id | name | email |
|----+---------+------------------|
| 1 | alice | alice@test.com |
| 2 | bob | bob@test.com |
| 3 | charlie | charlie@test.com |
+----+---------+------------------+
When we check the target PostgreSQL database, we should see the three rows have been inserted, but both the name and the emails have been anonymised.
➜ psql postgresql://postgres:postgres@localhost:7654/postgres
SELECT * FROM test;
+----+---------+------------------+
| id | name | email |
|----+---------+------------------|
| 1 | Herman | ecdox@test.com |
| 2 | Domingo | nab@test.com |
| 3 | Elwin | aafsack@test.com |
+----+---------+------------------+
Since we used the deterministic transformation for the name column, if we insert the same names again the value generated should be the same.
➜ psql postgresql://postgres:postgres@localhost:5432/postgres
INSERT INTO test(name,email) VALUES('alice','alice@test.com'),('bob','bob@test.com'),('charlie','charlie@test.com');
SELECT * FROM test;
+----+---------+------------------+
| id | name | email |
|----+---------+------------------|
| 1 | alice | alice@test.com |
| 2 | bob | bob@test.com |
| 3 | charlie | charlie@test.com |
| 4 | alice | alice@test.com |
| 5 | bob | bob@test.com |
| 6 | charlie | charlie@test.com |
+----+---------+------------------+
➜ psql postgresql://postgres:postgres@localhost:7654/postgres
SELECT * FROM test;
+----+---------+------------------+
| id | name | email |
|----+---------+------------------|
| 1 | Herman | ecdox@test.com |
| 2 | Domingo | nab@test.com |
| 3 | Elwin | aafsack@test.com |
| 4 | Herman | rwamy@test.com |
| 5 | Domingo | wie@test.com |
| 6 | Elwin | cailbni@test.com |
+----+---------+------------------+
To see all the transformers currently supported, check out the transformers section.
-
Error:
Connection refused
- Ensure the Docker containers for the source and target databases are running.
- Verify the database URLs in the configuration.
-
Error:
Replication slot not found
- Check that the replication slot was created successfully during initialization.
- Run the
pgstream status
command to validate the initialisation was successful. - Run the following query on the source database to verify:
SELECT slot_name FROM pg_replication_slots;
-
Error:
Transformation rules not applied
- Ensure the
PGSTREAM_TRANSFORMER_RULES_FILE
points to the correct YAML file. - Validate the syntax of the transformer rules using the pgstream documentation.
- Run the
pgstream status
command to validate the transformation rules are correct.
- Ensure the
In this tutorial, we successfully configured pgstream
to replicate data from a source PostgreSQL database to a target PostgreSQL database, applying transformations to anonymize sensitive data during replication. For more use cases, refer to the pgstream documentation.