Skip to content

Commit

Permalink
Merge pull request #2840 from redpanda-data/mihaitodor-update-kafka-m…
Browse files Browse the repository at this point in the history
…igrator

Make Schema Registry optional in the kafka_migrator_bundle components
  • Loading branch information
Jeffail authored Sep 6, 2024
2 parents 50d1da3 + 184fb4a commit 7ec354c
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ mapping: |
let kafkaMigratorOffsets = this.kafka_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "tls", "sasl").assign({"topics": ["__consumer_offsets"]})
root = if this.kafka_migrator.length() == 0 || this.schema_registry.length() == 0 {
throw("both kafka_migrator and schema_registry inputs must be configured")
} else if this.migrate_schemas_before_data {
root = if this.kafka_migrator.length() == 0 {
throw("the kafka_migrator input must be configured")
} else if this.migrate_schemas_before_data && this.schema_registry.length() > 0 {
"""
sequence:
inputs:
Expand All @@ -67,7 +67,7 @@ mapping: |
processors:
- mapping: meta input_label = "kafka_migrator_offsets"
""".format(this.schema_registry.string(), this.kafka_migrator.string(), $kafkaMigratorOffsets.string()).parse_yaml()
} else {
} else if this.schema_registry.length() > 0 {
"""
broker:
inputs:
Expand All @@ -90,17 +90,30 @@ mapping: |
processors:
- mapping: meta input_label = "kafka_migrator_offsets"
""".format(this.schema_registry.string(), this.kafka_migrator.string(), $kafkaMigratorOffsets.string()).parse_yaml()
} else {
"""
broker:
inputs:
- kafka_migrator: %s
processors:
- mapping: meta input_label = "kafka_migrator"
- kafka_franz: %s
processors:
- mapping: meta input_label = "kafka_migrator_offsets"
""".format(this.kafka_migrator.string(), $kafkaMigratorOffsets.string()).parse_yaml()
}
tests:
- name: Migrate both data and schemas simultaneously
- name: Migrate messages, offsets and schemas simultaneously
config:
kafka_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
schema_registry:
url: http://localhost:8081

migrate_schemas_before_data: false

expected:
broker:
inputs:
Expand Down Expand Up @@ -128,14 +141,13 @@ tests:
processors:
- mapping: meta input_label = "kafka_migrator_offsets"

- name: Migrate schemas first
- name: Migrate schemas first, then messages and offsets
config:
kafka_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
schema_registry:
url: http://localhost:8081
migrate_schemas_before_data: true

expected:
sequence:
Expand Down Expand Up @@ -167,3 +179,25 @@ tests:
topics: [ "__consumer_offsets" ]
processors:
- mapping: meta input_label = "kafka_migrator_offsets"

- name: Migrate messages and offsets without schemas
config:
kafka_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]

expected:
broker:
inputs:
- kafka_migrator:
seed_brokers:
- 127.0.0.1:9092
topics:
- foobar
processors:
- mapping: meta input_label = "kafka_migrator"
- kafka_franz:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
processors:
- mapping: meta input_label = "kafka_migrator_offsets"
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ mapping: |
if this.schema_registry.keys().contains("subject") {
root = throw("The subject field of the schema_registry output must not be set")
}
let schema_registry = this.schema_registry.assign({"subject": "${! @schema_registry_subject }"})
let schema_registry = if this.schema_registry.length() > 0 { this.schema_registry.assign({"subject": "${! @schema_registry_subject }"}) }
if ["topic", "key", "partition", "partitioner", "timestamp"].any(f -> this.kafka_migrator.keys().contains(f)) {
root = throw("The topic, key, partition, partitioner and timestamp fields of the kafka_migrator output must be left empty")
}
root = if this.kafka_migrator.length() == 0 || this.schema_registry.length() == 0 {
throw("both kafka_migrator and schema_registry inputs must be configured")
} else {
root = if this.kafka_migrator.length() == 0 {
throw("the kafka_migrator output must be configured")
} else if this.schema_registry.length() > 0 {
"""
switch:
cases:
Expand Down Expand Up @@ -96,10 +96,35 @@ mapping: |
- output:
reject: ${! @fallback_error }
""".format($kafka_migrator.string(), $kafkaMigratorOffsets.string(), $schema_registry.string()).parse_yaml()
} else {
"""
switch:
cases:
- check: metadata("input_label") == "kafka_migrator"
output:
fallback:
- kafka_migrator: %s
# TODO: Use a DLQ
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
- check: metadata("input_label") == "kafka_migrator_offsets"
output:
fallback:
- kafka_migrator_offsets: %s
# TODO: Use a DLQ
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
""".format($kafka_migrator.string(), $kafkaMigratorOffsets.string()).parse_yaml()
}
tests:
- name: Migrate both data and schemas
- name: Migrate messages, offsets and schemas
config:
kafka_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
Expand Down Expand Up @@ -133,7 +158,6 @@ tests:
- kafka_migrator_offsets:
seed_brokers:
- 127.0.0.1:9092
max_in_flight: 1
- drop: {}
processors:
- log:
Expand All @@ -156,3 +180,41 @@ tests:
Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
- output:
reject: ${! @fallback_error }

- name: Migrate only messages and offsets
config:
kafka_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
max_in_flight: 1

expected:
switch:
cases:
- check: metadata("input_label") == "kafka_migrator"
output:
fallback:
- kafka_migrator:
key: ${! metadata("kafka_key") }
max_in_flight: 1
partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) }
partitioner: manual
seed_brokers:
- 127.0.0.1:9092
timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) }
topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) }
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
- check: metadata("input_label") == "kafka_migrator_offsets"
output:
fallback:
- kafka_migrator_offsets:
seed_brokers:
- 127.0.0.1:9092
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }

0 comments on commit 7ec354c

Please sign in to comment.