From 46c005c542c17d65143400366853bd3e76388e30 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Thu, 5 Dec 2024 15:50:19 +0000 Subject: [PATCH 1/2] Fix panic in redpanda_migrator output Signed-off-by: Mihai Todor --- .../enterprise/redpanda_migrator_bundle_output.tmpl.yaml | 5 ++++- internal/impl/kafka/enterprise/redpanda_migrator_output.go | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml index 65a5d296ec..99873d77df 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml +++ b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml @@ -50,7 +50,8 @@ mapping: | # Exclude metadata fields which start with `kafka_` "^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)" ] - } + }, + "translate_schema_ids": this.schema_registry.length() != 0 } ) @@ -172,6 +173,7 @@ tests: metadata: include_patterns: - ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*) + translate_schema_ids: true processors: - mapping: | meta input_label = deleted() @@ -234,6 +236,7 @@ tests: metadata: include_patterns: - ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*) + translate_schema_ids: false processors: - mapping: | meta input_label = deleted() diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_output.go index 866388e0b2..a852c5dfb8 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_output.go @@ -253,7 +253,7 @@ func (w *RedpandaMigratorWriter) Connect(ctx context.Context) error { if res, ok := w.mgr.GetGeneric(w.schemaRegistryOutputResource); ok { w.schemaRegistryOutput = res.(*schemaRegistryOutput) } else { - return fmt.Errorf("schema_registry output resource %q not found", w.schemaRegistryOutputResource) + w.mgr.Logger().Warnf("schema_registry output resource %q not found; skipping schema ID translation", w.schemaRegistryOutputResource) } } @@ -275,7 +275,7 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa } var ch franz_sr.ConfluentHeader - if w.translateSchemaIDs { + if w.translateSchemaIDs && w.schemaRegistryOutput != nil { for recordIdx, record := range records { schemaID, _, err := ch.DecodeID(record.Value) if err != nil { From cfd81614e7a6c998168df79da7e715de3a964855 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 5 Dec 2024 15:57:58 +0000 Subject: [PATCH 2/2] Update CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7be34880b6..5f416415ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ All notable changes to this project will be documented in this file. - The `pg_stream` input has been renamed to `postgres_cdc`. The old name will continue to function as an alias. (@rockwotj) +### Fixed + +- The `redpanda_migrator_bundle` output no longer attempts to translate schema IDs when a schema registry is not configured. (@mihaitodor) + ## 4.42.0 - 2024-12-02 ### Added