Skip to content

Commit

Permalink
Switch the redpanda_migrator to the new ack mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <todormihai@gmail.com>
  • Loading branch information
mihaitodor committed Dec 11, 2024
1 parent 2fcbb8c commit a86bdbd
Show file tree
Hide file tree
Showing 19 changed files with 542 additions and 839 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@ All notable changes to this project will be documented in this file.
- The `redpanda_migrator_bundle` output now skips schema ID translation when `translate_schema_ids: false` and `schema_registry` is configured. (@mihaitodor)
- The `redpanda_migrator` output no longer rejects messages if it can't perform schema ID translation. (@mihaitodor)
- The `redpanda_migrator` input no longer converts the kafka key to string. (@mihaitodor)
- Field `multi_header` for the `redpanda_migrator` input is now deprecated. (@mihaitodor)

### Added

- New `redpanda_migrator_offsets` input. (@mihaitodor)
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
- Fields `kafka_key` and `max_in_flight` for the `redpanda_migrator_offsets` output are now deprecated. (@mihaitodor)
- Fields `batching` for the `redpanda_migrator` output is now deprecated. (@mihaitodor)
- Field `topic_lag_refresh_period` added to the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- Metric `redpanda_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- Metadata `kafka_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor)

### Changed

- The `kafka_key` and `max_in_flight` fields of the `redpanda_migrator_offsets` output have been deprecated.
- Fields `batch_size` and `multi_header` for the `redpanda_migrator` input are now deprecated. (@mihaitodor)

## 4.43.0 - 2024-12-05

Expand Down
15 changes: 15 additions & 0 deletions docs/modules/components/pages/inputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```
Expand Down Expand Up @@ -114,6 +115,10 @@ output:
Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor.
== Metrics
Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic.
== Metadata
This input adds the following metadata fields to each message:
Expand All @@ -123,6 +128,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
Expand Down Expand Up @@ -635,6 +641,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que
*Default*: `"1MB"`
=== `topic_lag_refresh_period`
The period of time between each topic lag refresh cycle.
*Type*: `string`
*Default*: `"5s"`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Expand Down
15 changes: 15 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```
Expand Down Expand Up @@ -100,6 +101,10 @@ output:
Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor.
== Metrics
Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic.
== Metadata
This input adds the following metadata fields to each message:
Expand All @@ -109,6 +114,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
Expand Down Expand Up @@ -235,6 +241,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que
*Default*: `"1MB"`
=== `topic_lag_refresh_period`
The period of time between each topic lag refresh cycle.
*Type*: `string`
*Default*: `"5s"`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Expand Down
38 changes: 14 additions & 24 deletions docs/modules/components/pages/inputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ input:
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
commit_period: 5s
batch_size: 1024
auto_replay_nacks: true
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
output_resource: redpanda_migrator_output
replication_factor_override: true
replication_factor: 3
multi_header: false
```
--
Expand Down Expand Up @@ -613,32 +612,32 @@ The period of time between each commit of the current partition offsets. Offsets
*Default*: `"5s"`
=== `batch_size`
=== `partition_buffer_bytes`
The maximum number of messages that should be accumulated into each batch.
A buffer size (in bytes) for each consumed partition, allowing records to be queued internally before flushing. Increasing this may improve throughput at the cost of higher memory utilisation. Note that each buffer can grow slightly beyond this value.
*Type*: `int`
*Type*: `string`
*Default*: `1024`
*Default*: `"1MB"`
=== `auto_replay_nacks`
=== `topic_lag_refresh_period`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
The period of time between each topic lag refresh cycle.
*Type*: `bool`
*Type*: `string`
*Default*: `true`
*Default*: `"5s"`
=== `topic_lag_refresh_period`
=== `auto_replay_nacks`
The period of time between each topic lag refresh cycle.
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
*Type*: `string`
*Type*: `bool`
*Default*: `"5s"`
*Default*: `true`
=== `output_resource`
Expand Down Expand Up @@ -667,13 +666,4 @@ Replication factor for created topics. This is only used when `replication_facto
*Default*: `3`
=== `multi_header`
Decode headers into lists to allow handling of multiple values with the same key
*Type*: `bool`
*Default*: `false`
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```
Expand Down Expand Up @@ -585,6 +586,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que
*Default*: `"1MB"`
=== `topic_lag_refresh_period`
The period of time between each topic lag refresh cycle.
*Type*: `string`
*Default*: `"5s"`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Expand Down
119 changes: 3 additions & 116 deletions docs/modules/components/pages/outputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,7 @@ output:
metadata:
include_prefixes: []
include_patterns: []
max_in_flight: 10
batching:
count: 0
byte_size: 0
period: ""
check: ""
max_in_flight: 256
```
--
Expand Down Expand Up @@ -82,13 +77,7 @@ output:
include_prefixes: []
include_patterns: []
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
max_in_flight: 10
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
max_in_flight: 256
input_resource: redpanda_migrator_input
replication_factor_override: true
replication_factor: 3
Expand Down Expand Up @@ -641,109 +630,7 @@ The maximum number of batches to be sending in parallel at any given time.
*Type*: `int`
*Default*: `10`
=== `batching`
Allows you to configure a xref:configuration:batching.adoc[batching policy].
*Type*: `object`
```yml
# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```
=== `batching.count`
A number of messages at which the batch should be flushed. If `0` disables count based batching.
*Type*: `int`
*Default*: `0`
=== `batching.byte_size`
An amount of bytes at which the batch should be flushed. If `0` disables size based batching.
*Type*: `int`
*Default*: `0`
=== `batching.period`
A period in which an incomplete batch should be flushed regardless of its size.
*Type*: `string`
*Default*: `""`
```yml
# Examples
period: 1s
period: 1m
period: 500ms
```
=== `batching.check`
A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.
*Type*: `string`
*Default*: `""`
```yml
# Examples
check: this.type == "end_of_transaction"
```
=== `batching.processors`
A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
*Type*: `array`
```yml
# Examples
processors:
- archive:
format: concatenate
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
```
*Default*: `256`
=== `input_resource`
Expand Down
2 changes: 1 addition & 1 deletion internal/asyncroutine/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewPeriodicWithContext(duration time.Duration, work func(context.Context))

// Start starts the `Periodic` work.
//
// It does not do work immedately, only after the time has passed.
// It does not do work immediately, only after the time has passed.
func (p *Periodic) Start() {
if p.cancel != nil {
return
Expand Down
Loading

0 comments on commit a86bdbd

Please sign in to comment.