Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL CDC Plugin #3014

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
76ef366
feat(mysql_cdc): working on mysql_cdc plugin
le-vlad Nov 6, 2024
19dab6e
chore(): continue working on mysql cdc
le-vlad Nov 8, 2024
29c950f
chore(): removed in test secret
le-vlad Nov 8, 2024
3391a64
chore(): working on mysql cdc
le-vlad Nov 13, 2024
b0747e2
chore(): added mysql tests
le-vlad Nov 13, 2024
77fbcc2
chore(): multi-version testing for mysql cdc
le-vlad Nov 14, 2024
2e9b57c
chore(): implemented snapshot support
le-vlad Nov 18, 2024
901b4fe
chore(): reset dumper config
le-vlad Nov 19, 2024
f275aa7
fix(): table filtering for mysql
le-vlad Nov 19, 2024
1927dbe
fix(): golangci-lint
le-vlad Nov 20, 2024
f074e50
chore(): added table validation
le-vlad Nov 21, 2024
882c8d1
fix(): lint
le-vlad Nov 21, 2024
86238d5
fix(): added integration.CheckSkip(t) to mysql cdc tests
le-vlad Nov 21, 2024
dadabe0
chore(): work on pr notes
le-vlad Nov 27, 2024
193c574
chore(): updated comments and use ctx from shutdown
le-vlad Nov 28, 2024
8a8eb8b
chore(): removed rows count && small pr notes
le-vlad Dec 4, 2024
2373dee
mysqlcdc: rename component
rockwotj Dec 10, 2024
166f1dc
mysql: seperate cache key vs cache that is used in configs
rockwotj Dec 10, 2024
6643ea4
snapshot: cleanup context usage
rockwotj Dec 10, 2024
78af575
mysql: fold mode into operation
rockwotj Dec 10, 2024
786480c
mysql: fix ack fn
rockwotj Dec 10, 2024
eead6dc
mysql: use error return type
rockwotj Dec 10, 2024
7498891
mysql: simplify constructor
rockwotj Dec 10, 2024
3b3998d
mysql: cleaner import name
rockwotj Dec 10, 2024
586b387
mysql: escape table regex
rockwotj Dec 10, 2024
c5328df
mysql: simplify checkpointer
rockwotj Dec 10, 2024
b5fad95
mycdc: use lexicographically ordered binlog position
rockwotj Dec 10, 2024
5555c92
mycdc: draw the rest of the owl
rockwotj Dec 10, 2024
dbbad4a
mycdc: make the linter happy
rockwotj Dec 10, 2024
63c320e
mycdc: cleanup integration tests
rockwotj Dec 10, 2024
b06d218
mycdc: decode stream message by type
rockwotj Dec 10, 2024
8ede4a6
mycdc: add more types
rockwotj Dec 10, 2024
b0f8ea7
mycdc: fix snapshot cleanup and missing pks
rockwotj Dec 20, 2024
30148d0
mycdc: fix shutdown hang
rockwotj Dec 20, 2024
243eee5
mycdc: fix snapshot types
rockwotj Dec 20, 2024
87986f8
mycdc: handle all types :tada:
rockwotj Dec 20, 2024
0f6efb5
mycdc: cleanup tests for MySQL CDC
rockwotj Dec 20, 2024
a2dd9ca
mycdc: handle nil streaming values
rockwotj Dec 20, 2024
f34111c
add changelog entry
rockwotj Dec 20, 2024
e118968
make linter happy
rockwotj Dec 20, 2024
e3d2b7d
mycdc: add comment about txn
rockwotj Jan 11, 2025
a873c23
Fix some nitpicks
mihaitodor Jan 12, 2025
a144500
Add to public components and generate docs
mihaitodor Jan 12, 2025
673f51a
Re-enable the rowserrcheck linter
mihaitodor Jan 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
- `snowpipe_streaming` now supports exactly once delivery using `offset_token`. (@rockwotj)
- `ollama_chat` now supports tool calling. (@rockwotj)
- New `ollama_moderation` which allows using LlamaGuard or ShieldGemma to check if LLM responses are safe. (@rockwotj)
- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)

### Fixed

Expand Down
281 changes: 281 additions & 0 deletions docs/modules/components/pages/inputs/mysql_cdc.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
= mysql_cdc
:type: input
:status: beta
:categories: ["Services"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Enables MySQL streaming for RedPanda Connect.

Introduced in version 4.45.0.


[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
input:
label: ""
mysql_cdc:
dsn: user:password@tcp(localhost:3306)/database # No default (required)
tables: [] # No default (required)
checkpoint_cache: "" # No default (required)
checkpoint_key: mysql_binlog_position
snapshot_max_batch_size: 1000
stream_snapshot: false # No default (required)
auto_replay_nacks: true
checkpoint_limit: 1024
batching:
count: 0
byte_size: 0
period: ""
check: ""
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
input:
label: ""
mysql_cdc:
dsn: user:password@tcp(localhost:3306)/database # No default (required)
tables: [] # No default (required)
checkpoint_cache: "" # No default (required)
checkpoint_key: mysql_binlog_position
snapshot_max_batch_size: 1000
stream_snapshot: false # No default (required)
auto_replay_nacks: true
checkpoint_limit: 1024
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
```

--
======

== Metadata

This input adds the following metadata fields to each message:

- operation
- table
- binlog_position


== Fields

=== `dsn`

The DSN of the MySQL database to connect to.


*Type*: `string`


```yml
# Examples

dsn: user:password@tcp(localhost:3306)/database
```

=== `tables`

A list of tables to stream from the database.


*Type*: `array`


```yml
# Examples

tables:
- table1
- table2
```

=== `checkpoint_cache`

A https://www.docs.redpanda.com/redpanda-connect/components/caches/about[cache resource^] to use for storing the current latest BinLog Position that has been successfully delivered, this allows Redpanda Connect to continue from that BinLog Position upon restart, rather than consume the entire state of the table.


*Type*: `string`


=== `checkpoint_key`

The key to use to store the snapshot position in `checkpoint_cache`. An alternative key can be provided if multiple CDC inputs share the same cache.


*Type*: `string`

*Default*: `"mysql_binlog_position"`

=== `snapshot_max_batch_size`

The maximum number of rows to be streamed in a single batch when taking a snapshot.


*Type*: `int`

*Default*: `1000`

=== `stream_snapshot`

If set to true, the connector will query all the existing data as a part of snapshot process. Otherwise, it will start from the current binlog position.


*Type*: `bool`


=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`

=== `checkpoint_limit`

The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given BinLog Position will not be acknowledged unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.


*Type*: `int`

*Default*: `1024`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].


*Type*: `object`


```yml
# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. If `0` disables count based batching.


*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0` disables size based batching.


*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.


*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.


*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.


*Type*: `array`


```yml
# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array
```


9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ require (
github.com/getsentry/sentry-go v0.28.1
github.com/go-faker/faker/v4 v4.4.2
github.com/go-jose/go-jose/v3 v3.0.3
github.com/go-mysql-org/go-mysql v1.10.0
github.com/go-resty/resty/v2 v2.15.3
github.com/go-sql-driver/mysql v1.8.1
github.com/gocql/gocql v1.6.0
Expand Down Expand Up @@ -161,8 +162,10 @@ require (
cloud.google.com/go/spanner v1.73.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c // indirect
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.3 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect
Expand All @@ -177,8 +180,14 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/gomega v1.34.2 // indirect
github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect
github.com/pingcap/tidb/pkg/parser v0.0.0-20241118164214-4f047be191be // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
Expand Down
Loading
Loading