Merge pull request #3060 from redpanda-data/snow-once
rockwotj authored Jan 7, 2025
2 parents 589d841 + 1975bcd commit ccf4086
Showing 18 changed files with 1,739 additions and 482 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,9 @@ All notable changes to this project will be documented in this file.
- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
- Field `content_type` added to the `amqp_1` output. (@timo102)
- `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` now support the `fetch_max_wait` configuration field.
- `snowpipe_streaming` now supports interpolating table names. (@rockwotj)
- `snowpipe_streaming` now supports interpolating channel names. (@rockwotj)
- `snowpipe_streaming` now supports exactly-once delivery using `offset_token`. (@rockwotj)

### Fixed

207 changes: 174 additions & 33 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -42,9 +42,9 @@ output:
account: ORG-ACCOUNT # No default (required)
user: "" # No default (required)
role: ACCOUNTADMIN # No default (required)
database: "" # No default (required)
schema: "" # No default (required)
table: "" # No default (required)
database: MY_DATABASE # No default (required)
schema: PUBLIC # No default (required)
table: MY_TABLE # No default (required)
private_key: "" # No default (optional)
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
@@ -83,9 +83,9 @@ output:
account: ORG-ACCOUNT # No default (required)
user: "" # No default (required)
role: ACCOUNTADMIN # No default (required)
database: "" # No default (required)
schema: "" # No default (required)
table: "" # No default (required)
database: MY_DATABASE # No default (required)
schema: PUBLIC # No default (required)
table: MY_TABLE # No default (required)
private_key: "" # No default (optional)
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
@@ -113,7 +113,9 @@ output:
check: ""
processors: [] # No default (optional)
max_in_flight: 4
channel_prefix: "" # No default (optional)
channel_prefix: channel-${HOST} # No default (optional)
channel_name: partition-${!@kafka_partition} # No default (optional)
offset_token: offset-${!"%016X".format(@kafka_offset)} # No default (optional)
```
--
@@ -155,23 +157,70 @@ You can monitor the output batch size using the `snowflake_compressed_output_size
[tabs]
======
Ingesting data from Redpanda::
Exactly once CDC into Snowflake::
+
--
How to ingest data from Redpanda with consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake.
How to send data from a PostgreSQL table into Snowflake exactly once using Postgres Logical Replication.
NOTE: If attempting exactly-once delivery, it's important that rows are delivered to the output in order. Be sure to read the documentation for `offset_token` first.
Removing `offset_token` is a safer option that instructs Redpanda Connect to use its default at-least-once delivery model instead.
```yaml
input:
postgres_cdc:
dsn: postgres://foouser:foopass@localhost:5432/foodb
schema: "public"
tables: ["my_pg_table"]
    # We want very large batches - each batch will be sent to Snowflake individually,
    # so to optimize query performance we want files as large as we have memory for
batching:
count: 50000
period: 45s
# Prevent multiple batches from being in flight at once, so that we never send
    # a batch while another batch is being retried. This is important to ensure that
# the Snowflake Snowpipe Streaming channel does not see older data - as it will
# assume that the older data is already committed.
checkpoint_limit: 1
output:
snowflake_streaming:
# We use the log sequence number in the WAL from Postgres to ensure we
    # only upload data exactly once; these are already lexicographically
# ordered.
offset_token: "${!@lsn}"
# Since we're sending a single ordered log, we can only send one thing
# at a time to ensure that we're properly incrementing our offset_token
# and only using a single channel at a time.
max_in_flight: 1
account: "MYSNOW-ACCOUNT"
user: MYUSER
role: ACCOUNTADMIN
database: "MYDATABASE"
schema: "PUBLIC"
table: "MY_PG_TABLE"
private_key_file: "my/private/key.p8"
```
--
Ingesting data exactly once from Redpanda::
+
--
How to ingest data from Redpanda with consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake exactly once.
NOTE: If attempting exactly-once delivery, it's important that records are delivered to the output in order and correctly partitioned. Be sure to read the documentation for
`channel_name` and `offset_token` first. Removing `offset_token` is a safer option that instructs Redpanda Connect to use its default at-least-once delivery model instead.
```yaml
input:
kafka_franz:
seed_brokers: ["redpanda.example.com:9092"]
redpanda_common:
topics: ["my_topic_going_to_snow"]
consumer_group: "redpanda_connect_to_snowflake"
tls: {enabled: true}
sasl:
- mechanism: SCRAM-SHA-256
username: MY_USER_NAME
password: "${TODO}"
    # We want very large batches - each batch will be sent to Snowflake individually,
    # so to optimize query performance we want files as large as we have memory for
fetch_max_bytes: 100MiB
fetch_min_bytes: 50MiB
partition_buffer_bytes: 100MiB
pipeline:
processors:
- schema_registry_decode:
@@ -181,25 +230,34 @@ pipeline:
username: MY_USER_NAME
password: "${TODO}"
output:
snowflake_streaming:
# By default there is only a single channel per output table allowed
# if we want to have multiple Redpanda Connect streams writing data
# then we need a unique channel prefix per stream. We'll use the host
# name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"
account: "MYSNOW-ACCOUNT"
user: MYUSER
role: ACCOUNTADMIN
database: "MYDATABASE"
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"
schema_evolution:
enabled: true
fallback:
- snowflake_streaming:
        # To ensure that we write an ordered stream, each partition in Kafka gets its own
        # channel.
channel_name: "partition-${!@kafka_partition}"
# Ensure that our offsets are lexicographically sorted in string form by padding with
# leading zeros
offset_token: offset-${!"%016X".format(@kafka_offset)}
account: "MYSNOW-ACCOUNT"
user: MYUSER
role: ACCOUNTADMIN
database: "MYDATABASE"
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"
schema_evolution:
enabled: true
    # To prevent retried deliveries from messing with the order of delivered records,
    # it's important that failures are immediately sent to a dead letter queue and not retried
# to Snowflake. See the ordering documentation for the "redpanda" input for more details.
- retry:
output:
redpanda_common:
topic: "dead_letter_queue"
```
--
HTTP Sidecar to push data to Snowflake::
HTTP Server to push data to Snowflake::
+
--
@@ -231,6 +289,13 @@ output:
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"
    # By default only a single channel per output table is allowed.
    # If we want to have multiple Redpanda Connect streams writing data,
    # then we need a unique channel prefix per stream. We'll use the host
    # name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"
schema_evolution:
enabled: true
```
--
@@ -282,6 +347,12 @@ The Snowflake database to ingest data into.
*Type*: `string`
```yml
# Examples
database: MY_DATABASE
```
=== `schema`
The Snowflake schema to ingest data into.
@@ -290,14 +361,27 @@ The Snowflake schema to ingest data into.
*Type*: `string`
```yml
# Examples
schema: PUBLIC
```
=== `table`
The Snowflake table to ingest data into.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
table: MY_TABLE
```
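Because `table` supports interpolation, the target table can also be derived from message metadata rather than hard-coded. A minimal, hypothetical sketch (the `kafka_topic` metadata key and the per-message routing are assumptions about the upstream input, not guarantees made on this page):

```yml
# Hypothetical: name the target table after the Kafka topic a message came from.
# Assumes the input attaches a `kafka_topic` metadata field to each message.
table: ${! @kafka_topic.uppercase() }
```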
=== `private_key`
The PEM encoded private RSA key to use for authenticating with Snowflake. Either this or `private_key_file` must be specified.
@@ -527,14 +611,71 @@ The maximum number of messages to have in flight at a given time. Increase this
The prefix to use when creating a channel name.
Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.
By default this will create a channel name that is based on the table FQN so there will only be a single stream per table.
By default, if neither `channel_prefix` nor `channel_name` is specified, the output will create a channel name that is based on the table FQN, so there will only be a single stream per table.
At most `max_in_flight` channels will be opened.
This option is mutually exclusive with `channel_name`.
NOTE: There is a limit of 10,000 streams per table - if using more than 10k streams please reach out to Snowflake support.
*Type*: `string`
```yml
# Examples
channel_prefix: channel-${HOST}
```
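As a rough sketch of how `channel_prefix` interacts with `max_in_flight` (the exact suffix the output appends to the prefix is an implementation detail not documented here):

```yml
# Each Redpanda Connect instance gets a unique prefix via ${HOST}, so their
# channel names cannot collide, and each instance opens at most
# `max_in_flight` channels derived from its own prefix.
channel_prefix: channel-${HOST}
max_in_flight: 4
```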
=== `channel_name`
The channel name to use.
Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.
Note that batches are assumed to all contain messages for the same channel, so this interpolation is only executed on the first
message in each batch. It's recommended to batch at the input level to ensure that batches contain messages for the same channel
if using an input that is partitioned (such as an Apache Kafka topic).
This option is mutually exclusive with `channel_prefix`.
NOTE: There is a limit of 10,000 streams per table - if using more than 10k streams please reach out to Snowflake support.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
channel_name: partition-${!@kafka_partition}
```
=== `offset_token`
The offset token to use for exactly-once delivery of data in the pipeline. When data is sent on a channel, the offset token of each message in a batch
is compared to the latest token for that channel. If the offset token is lexicographically less than the latest in the channel, the message is assumed to be a duplicate and
is dropped. This means it is *very important* to have ordered delivery to the output; any out-of-order messages will be seen as duplicates and dropped.
Specifically, this means that retried messages could be seen as duplicates if later messages have succeeded in the meantime, so in most circumstances a dead letter queue
output should be employed for failed messages.
NOTE: It's assumed that messages within a batch are in increasing order by offset token. Additionally, if you're using a numeric value as an offset token, make sure to pad
the value so that it's lexicographically ordered in its string representation, since offset tokens are compared in string form.
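For example, here is a sketch of why padding matters when the offset token wraps a numeric offset; the comparisons below are plain string comparisons, which is how tokens are evaluated according to the description above:

```yml
# Unpadded decimal offsets do not sort correctly as strings:
#   "9"  >  "10"          (string order, even though 9 < 10 numerically)
# Fixed-width padding keeps string order aligned with numeric order:
#   "0000000000000009"  <  "0000000000000010"
#   "0000000000000009"  <  "000000000000000A"   (fixed-width hex, as in the example below)
offset_token: offset-${!"%016X".format(@kafka_offset)}
```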
For more information about offset tokens, see https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokens[^Snowflake Documentation]
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
offset_token: offset-${!"%016X".format(@kafka_offset)}
offset_token: postgres-${!@lsn}
```