-
Notifications
You must be signed in to change notification settings - Fork 847
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
examples: add a postgres replication example using CDC
Note that we can't dynamically do anything with the schema at the moment, we don't emit schema changes and creating the migration statements would require a complicated sql_raw output.
- Loading branch information
Showing
1 changed file
with
54 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
input: | ||
postgres_cdc: | ||
dsn: postgres://me:foobar@localhost:5432?sslmode=disable | ||
include_transaction_markers: true | ||
slot_name: test_slot_native_decoder | ||
stream_snapshot: true | ||
schema: public | ||
tables: [my_src_table] | ||
# Group by transaction, each message batch is all rows changed in a transaction | ||
# this might be massive, but might be required for foreign key constraints | ||
batching: | ||
check: '@operation == "commit"' | ||
period: 10s | ||
processors: | ||
# But drop the placeholder messages for start/end transaction | ||
- mapping: | | ||
root = if @operation == "begin" || @operation == "commit" { | ||
deleted() | ||
} else { | ||
this | ||
} | ||
output: | ||
# Dispatch the write based on the operation metadata | ||
switch: | ||
strict_mode: true | ||
cases: | ||
- check: '@operation == "read" || @operation == "insert"' | ||
output: | ||
sql_insert: | ||
driver: postgres | ||
dsn: postgres://me:foobar@localhost:5432?sslmode=disable | ||
table: my_dst_table | ||
columns: [id, foo, bar] | ||
args_mapping: root = [this.id, this.foo, this.bar] | ||
init_statement: | | ||
CREATE TABLE IF NOT EXISTS my_dst_table ( | ||
id serial PRIMARY KEY, | ||
foo text, | ||
bar timestamp | ||
); | ||
- check: '@operation == "update"' | ||
output: | ||
sql_raw: | ||
driver: postgres | ||
dsn: postgres://me:foobar@localhost:5432?sslmode=disable | ||
query: UPDATE my_dst_table SET foo = $1, bar = $2 WHERE id = $3 | ||
args_mapping: root = [this.foo, this.bar, this.id] | ||
- check: '@operation == "delete"' | ||
output: | ||
sql_raw: | ||
driver: postgres | ||
dsn: postgres://me:foobar@localhost:5432?sslmode=disable | ||
query: DELETE FROM my_dst_table WHERE id = $1 | ||
args_mapping: root = [this.id] |