Merge pull request #2918 from redpanda-data/redpanda-input-output
Refactor kafka components to reduce duplication
Jeffail authored Nov 7, 2024
2 parents 44d711a + 6776618, commit ad34c85
Showing 49 changed files with 7,635 additions and 3,744 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
@@ -39,6 +39,11 @@ jobs:
- name: Test
run: make test

- name: Fmt
run: |
go install golang.org/x/tools/cmd/goimports@latest
make fmt && test -z "$(git ls-files --others --modified --exclude-standard)" || { >&2 echo "Unformatted files detected. This can be fixed with 'make fmt'."; exit 1; }
golangci-lint:
if: ${{ github.repository == 'redpanda-data/connect' || github.event_name != 'schedule' }}
runs-on: ubuntu-latest
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -13,7 +13,10 @@ All notable changes to this project will be documented in this file.
- (Benthos) Field `follow_redirects` added to the `http` processor. (@ooesili)
- New CLI flag `--secrets` added. (@Jeffail)
- New CLI flag `--disable-telemetry` added. (@Jeffail)
- New experimental `spicedb` watch input.
- New experimental `spicedb` watch input. (@simon0191)
- New `redpanda_common` input and output. (@Jeffail)
- New `redpanda` input and output. (@Jeffail)
- New `snowflake_streaming` output. (@rockwotj)

### Fixed

8 changes: 6 additions & 2 deletions cmd/tools/docs_gen/schema_test.go
@@ -21,12 +21,16 @@ import (

"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/public/schema"

_ "github.com/redpanda-data/connect/v4/public/components/all"
)

func TestComponentExamples(t *testing.T) {
env := service.GlobalEnvironment()
linter := env.FullConfigSchema("", "").NewStreamConfigLinter()
sch := schema.Standard("", "")
env := sch.Environment()

linter := sch.NewStreamConfigLinter()
linter.SetRejectDeprecated(true)
linter.SetSkipEnvVarCheck(true)

255 changes: 143 additions & 112 deletions docs/modules/components/pages/inputs/kafka_franz.adoc
@@ -57,15 +57,7 @@ input:
label: ""
kafka_franz:
seed_brokers: [] # No default (required)
topics: [] # No default (required)
regexp_topics: false
consumer_group: "" # No default (optional)
client_id: benthos
rack_id: ""
checkpoint_limit: 1024
auto_replay_nacks: true
commit_period: 5s
start_from_oldest: true
tls:
enabled: false
skip_cert_verify: false
@@ -74,14 +66,25 @@
root_cas_file: ""
client_certs: []
sasl: [] # No default (optional)
metadata_max_age: 5m
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_min_bytes: 1B
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
checkpoint_limit: 1024
commit_period: 5s
multi_header: false
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
metadata_max_age: 5m
auto_replay_nacks: true
```
--
Expand All @@ -100,6 +103,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp
- kafka_timestamp_unix
- kafka_tombstone_message
- All record headers
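For example, a `mapping` processor can lift these fields out of metadata and into the payload. A minimal sketch (the field names are taken from the list above):
```yml
pipeline:
  processors:
    - mapping: |
        # Wrap the raw payload and annotate it with Kafka metadata.
        root.value = content().string()
        root.topic = @kafka_topic
        root.partition = @kafka_partition
        root.offset = @kafka_offset
```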
@@ -130,60 +134,6 @@ seed_brokers:
- foo:9092,bar:9092
```
=== `topics`
A list of topics to consume from. Multiple comma-separated topics can be listed in a single element. When a `consumer_group` is specified, partitions are automatically distributed across consumers of a topic; otherwise all partitions are consumed.
Alternatively, it's possible to specify explicit partitions to consume from with a colon after the topic name, e.g. `foo:0` would consume partition 0 of the topic foo. This syntax supports ranges, e.g. `foo:0-10` would consume partitions 0 through 10 inclusive.
Finally, it's also possible to specify an explicit offset to consume from by adding another colon after the partition, e.g. `foo:0:10` would consume partition 0 of the topic foo starting from offset 10. If the offset is not present (or remains unspecified) then the field `start_from_oldest` determines which offset to start from.
*Type*: `array`
```yml
# Examples
topics:
- foo
- bar
topics:
- things.*
topics:
- foo,bar
topics:
- foo:0
- bar:1
- bar:3
topics:
- foo:0,bar:1,bar:3
topics:
- foo:0-5
```
=== `regexp_topics`
Whether listed topics should be interpreted as regular expression patterns for matching multiple topics. When topics are specified with explicit partitions, this field must remain set to `false`.
*Type*: `bool`
*Default*: `false`
=== `consumer_group`
An optional consumer group to consume as. When specified, the partitions of specified topics are automatically distributed across consumers sharing a consumer group, and partition offsets are automatically committed and resumed under this name. Consumer groups are not supported when specifying explicit partitions to consume from in the `topics` field.
*Type*: `string`
=== `client_id`
An identifier for the client connection.
@@ -193,51 +143,6 @@ An identifier for the client connection.
*Default*: `"benthos"`
=== `rack_id`
A rack identifier for this client.
*Type*: `string`
*Default*: `""`
=== `checkpoint_limit`
Determines how many messages of the same partition can be processed in parallel before applying back pressure. When a message of a given offset is delivered to the output, the offset is only committed once all messages of prior offsets have also been delivered; this ensures at-least-once delivery guarantees. However, this mechanism also increases the likelihood of duplicates in the event of crashes or server faults, so reducing the checkpoint limit will mitigate this.
*Type*: `int`
*Default*: `1024`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false`, these messages will instead be deleted. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.
*Type*: `bool`
*Default*: `true`
=== `commit_period`
The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown.
*Type*: `string`
*Default*: `"5s"`
=== `start_from_oldest`
Determines whether to consume from the oldest available offset; otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or when the saved offset no longer exists.
*Type*: `bool`
*Default*: `true`
=== `tls`
Custom TLS settings can be used to override system defaults.
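A sketch enabling TLS with a custom certificate authority (the broker address and file path are placeholders):
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9093 ]
    topics: [ foo ]
    tls:
      enabled: true
      root_cas_file: /etc/ssl/certs/my_ca.pem # hypothetical CA bundle path
```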
@@ -584,6 +489,132 @@ An external ID to provide when assuming a role.
*Default*: `""`
=== `metadata_max_age`
The maximum age of metadata before it is refreshed.
*Type*: `string`
*Default*: `"5m"`
=== `topics`
A list of topics to consume from. Multiple comma-separated topics can be listed in a single element. When a `consumer_group` is specified, partitions are automatically distributed across consumers of a topic; otherwise all partitions are consumed.
Alternatively, it's possible to specify explicit partitions to consume from with a colon after the topic name, e.g. `foo:0` would consume partition 0 of the topic foo. This syntax supports ranges, e.g. `foo:0-10` would consume partitions 0 through 10 inclusive.
Finally, it's also possible to specify an explicit offset to consume from by adding another colon after the partition, e.g. `foo:0:10` would consume partition 0 of the topic foo starting from offset 10. If the offset is not present (or remains unspecified) then the field `start_from_oldest` determines which offset to start from.
*Type*: `array`
```yml
# Examples
topics:
- foo
- bar
topics:
- things.*
topics:
- foo,bar
topics:
- foo:0
- bar:1
- bar:3
topics:
- foo:0,bar:1,bar:3
topics:
- foo:0-5
```
=== `regexp_topics`
Whether listed topics should be interpreted as regular expression patterns for matching multiple topics. When topics are specified with explicit partitions, this field must remain set to `false`.
*Type*: `bool`
*Default*: `false`
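As a sketch, consuming every topic that matches a pattern (the broker address is a placeholder):
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ 'things.*' ] # interpreted as a regular expression
    regexp_topics: true
```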
=== `rack_id`
A rack identifies where the client is physically located, and directs fetch requests to consume from the closest replica rather than the leader replica.
*Type*: `string`
*Default*: `""`
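For example, a client running in a hypothetical availability zone could be pinned to nearby replicas; the identifier is typically matched against the brokers' `broker.rack` configuration:
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    rack_id: us-east-1a # hypothetical zone identifier
```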
=== `start_from_oldest`
Determines whether to consume from the oldest available offset; otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or when the saved offset no longer exists.
*Type*: `bool`
*Default*: `true`
=== `fetch_max_bytes`
Sets the maximum number of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if they have records larger than it. This is equivalent to the Java fetch.max.bytes setting.
*Type*: `string`
*Default*: `"50MiB"`
=== `fetch_min_bytes`
Sets the minimum number of bytes a broker will try to send during a fetch. This is equivalent to the Java fetch.min.bytes setting.
*Type*: `string`
*Default*: `"1B"`
=== `fetch_max_partition_bytes`
Sets the maximum number of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This is equivalent to the Java fetch.max.partition.bytes setting.
*Type*: `string`
*Default*: `"1MiB"`
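Taken together, the three fetch fields trade latency against throughput. A sketch with non-default limits (the values are illustrative only):
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    fetch_max_bytes: 100MiB          # allow larger overall fetch responses
    fetch_min_bytes: 1KiB            # wait for at least 1KiB before responding
    fetch_max_partition_bytes: 4MiB  # cap any single partition's share of a fetch
```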
=== `consumer_group`
An optional consumer group to consume as. When specified, the partitions of specified topics are automatically distributed across consumers sharing a consumer group, and partition offsets are automatically committed and resumed under this name. Consumer groups are not supported when specifying explicit partitions to consume from in the `topics` field.
*Type*: `string`
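A sketch of a consumer-group configuration, where partitions of the listed topics are balanced across all instances sharing the (hypothetical) group name:
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    consumer_group: my_consumer_group # offsets are committed and resumed under this name
```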
=== `checkpoint_limit`
Determines how many messages of the same partition can be processed in parallel before applying back pressure. When a message of a given offset is delivered to the output, the offset is only committed once all messages of prior offsets have also been delivered; this ensures at-least-once delivery guarantees. However, this mechanism also increases the likelihood of duplicates in the event of crashes or server faults, so reducing the checkpoint limit will mitigate this.
*Type*: `int`
*Default*: `1024`
=== `commit_period`
The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown.
*Type*: `string`
*Default*: `"5s"`
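As a sketch, a pipeline that prefers fewer duplicates after a crash over raw throughput might lower the checkpoint limit and commit more frequently (the values are illustrative):
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    consumer_group: my_consumer_group
    checkpoint_limit: 128 # fewer in-flight messages per partition, fewer potential duplicates
    commit_period: 1s     # commit offsets more often than the 5s default
```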
=== `multi_header`
Decode headers into lists to allow handling of multiple values with the same key.
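With the sketch below, a record carrying two headers with the same (hypothetical) key `trace` would surface in metadata as a list such as `["a","b"]` rather than a single value:
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    multi_header: true # each header key maps to a list of its values
```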
@@ -695,13 +726,13 @@ processors:
format: json_array
```
=== `metadata_max_age`
=== `auto_replay_nacks`
The maximum age of metadata before it is refreshed.
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false`, these messages will instead be deleted. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.
*Type*: `string`
*Type*: `bool`
*Default*: `"5m"`
*Default*: `true`
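A sketch disabling automatic replays, so that rejected messages are dropped rather than retried indefinitely:
```yml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    auto_replay_nacks: false # nacked messages are deleted instead of replayed
```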