From 7ce5d998646ae57b9b6eae6ae6782f841d6cfd12 Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 11:51:41 +0100 Subject: [PATCH 01/10] fix(): parse uuid type to string during the replication --- .../replication_message_decoders.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index 0c1ed3b236..339044d437 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go @@ -10,9 +10,9 @@ package pglogicalstream import ( "fmt" - "log" - + "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" + "log" ) // ---------------------------------------------------------------------------- @@ -147,7 +147,18 @@ func decodePgOutput(WALData []byte, relations map[uint32]*RelationMessage, typeM func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interface{}, error) { if dt, ok := mi.TypeForOID(dataType); ok { - return dt.Codec.DecodeValue(mi, dataType, pgtype.TextFormatCode, data) + val, err := dt.Codec.DecodeValue(mi, dataType, pgtype.TextFormatCode, data) + if err != nil { + return val, err + } + + if dt.Name == "uuid" { + typesValueForUUID := val.([16]uint8) + return uuid.UUID(typesValueForUUID).String(), err + } + + return val, err } + return string(data), nil } From f3503c9d1ea6319e6ec043b7090f48be1913c951 Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 12:52:37 +0100 Subject: [PATCH 02/10] chore(): added support for JSONB/INET/TSRANGE/INT[]/TEXT[] types --- .../replication_message_decoders.go | 11 ++++ .../postgresql/pglogicalstream/snapshotter.go | 59 +++++++++++++++++++ .../impl/postgresql/pglogicalstream/types.go | 26 ++++++++ 3 files changed, 96 insertions(+) diff --git
a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index 339044d437..981e66fe49 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go @@ -11,6 +11,7 @@ package pglogicalstream import ( "fmt" "github.com/google/uuid" + pgtypes "github.com/jackc/pgtype" "github.com/jackc/pgx/v5/pgtype" "log" ) @@ -157,6 +158,16 @@ func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interfa return uuid.UUID(typesValueForUUID).String(), err } + if dt.Name == "tsrange" { + newArray := pgtypes.Tsrange{} + if err := newArray.Scan(data); err != nil { + return nil, err + } + + vv, _ := newArray.Value() + return vv, err + } + return val, err } diff --git a/internal/impl/postgresql/pglogicalstream/snapshotter.go b/internal/impl/postgresql/pglogicalstream/snapshotter.go index 8f6c737da6..55646e23d5 100644 --- a/internal/impl/postgresql/pglogicalstream/snapshotter.go +++ b/internal/impl/postgresql/pglogicalstream/snapshotter.go @@ -11,7 +11,9 @@ package pglogicalstream import ( "context" "database/sql" + "encoding/json" "fmt" + "github.com/jackc/pgtype" "strings" "errors" @@ -156,6 +158,63 @@ func (s *Snapshotter) prepareScannersAndGetters(columnTypes []*sql.ColumnType) ( case "INT4": scanArgs[i] = new(sql.NullInt64) valueGetters[i] = func(v interface{}) interface{} { return v.(*sql.NullInt64).Int64 } + case "JSONB": + scanArgs[i] = new(sql.NullString) + valueGetters[i] = func(v interface{}) interface{} { + payload := v.(*sql.NullString).String + if payload == "" { + return nil + } + var dst any + if err := json.Unmarshal([]byte(v.(*sql.NullString).String), &dst); err != nil { + s.logger.Warnf("Failed to unmarshal JSONB value: %v", err) + } + + return dst + } + case "INET": + scanArgs[i] = new(sql.NullString) + valueGetters[i] = func(v interface{}) interface{} { + 
payload := v.(*sql.NullString).String + return formatIP(payload) + } + case "TSRANGE": + scanArgs[i] = new(sql.NullString) + valueGetters[i] = func(v interface{}) interface{} { + newArray := pgtype.Tsrange{} + val := v.(*sql.NullString).String + if err := newArray.Scan(val); err != nil { + s.logger.Warnf("Failed to scan array of TEXT values: %v", err) + return nil + } + + vv, _ := newArray.Value() + return vv + } + case "_INT4": + scanArgs[i] = new(sql.NullString) + valueGetters[i] = func(v interface{}) interface{} { + newArray := pgtype.Int4Array{} + val := v.(*sql.NullString).String + if err := newArray.Scan(val); err != nil { + s.logger.Warnf("Failed to scan array of INT4 values: %v", err) + return nil + } + + return newArray.Elements + } + case "_TEXT": + scanArgs[i] = new(sql.NullString) + valueGetters[i] = func(v interface{}) interface{} { + newArray := pgtype.TextArray{} + val := v.(*sql.NullString).String + if err := newArray.Scan(val); err != nil { + s.logger.Warnf("Failed to scan array of TEXT values: %v", err) + return nil + } + + return newArray.Elements + } default: scanArgs[i] = new(sql.NullString) valueGetters[i] = func(v interface{}) interface{} { return v.(*sql.NullString).String } diff --git a/internal/impl/postgresql/pglogicalstream/types.go b/internal/impl/postgresql/pglogicalstream/types.go index 2d1d0ff3ad..9b9bea1c90 100644 --- a/internal/impl/postgresql/pglogicalstream/types.go +++ b/internal/impl/postgresql/pglogicalstream/types.go @@ -7,3 +7,29 @@ // https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md package pglogicalstream + +import ( + "fmt" + "net" + "strings" +) + +func formatIP(ipStr string) string { + if strings.Contains(ipStr, "/") { + // If it has a mask, use ParseCIDR + ip, network, err := net.ParseCIDR(ipStr) + if err != nil { + return "" // or handle error as needed + } + + maskLen, _ := network.Mask.Size() + return fmt.Sprintf("%s/%d", ip.String(), maskLen) + } else { + // Just an IP without mask + ip := 
net.ParseIP(ipStr) + if ip == nil { + return "" // or handle error as needed + } + return ip.String() + } +} From 367b713637d36750c245e52e97f63faa71d850d3 Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 13:11:03 +0100 Subject: [PATCH 03/10] chore(): added integration tests for complex datatypes --- internal/impl/postgresql/integration_test.go | 135 ++++++++++++++++++ .../replication_message_decoders.go | 3 +- .../postgresql/pglogicalstream/snapshotter.go | 13 +- .../impl/postgresql/pglogicalstream/types.go | 26 ---- 4 files changed, 147 insertions(+), 30 deletions(-) diff --git a/internal/impl/postgresql/integration_test.go b/internal/impl/postgresql/integration_test.go index f22878d2cb..5af3944d37 100644 --- a/internal/impl/postgresql/integration_test.go +++ b/internal/impl/postgresql/integration_test.go @@ -105,6 +105,22 @@ func ResourceWithPostgreSQLVersion(t *testing.T, pool *dockertest.Pool, version return err } + // Creating table with complex PG types + _, err = db.Exec(`CREATE TABLE complex_types_example ( + id SERIAL PRIMARY KEY, + json_data JSONB, + tags TEXT[], + ip_addr INET, + search_text TSVECTOR, + time_range TSRANGE, + location POINT, + uuid_col UUID, + int_array INTEGER[] + );`) + if err != nil { + return err + } + _, err = db.Exec(` CREATE TABLE IF NOT EXISTS flights_composite_pks ( id serial, seq integer, name VARCHAR(50), created_at TIMESTAMP, @@ -471,6 +487,125 @@ file: }) } +func TestIntegrationPgCDCForPgOutputStreamComplexTypesPlugin(t *testing.T) { + integration.CheckSkip(t) + tmpDir := t.TempDir() + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + var ( + resource *dockertest.Resource + db *sql.DB + ) + + resource, db, err = ResourceWithPostgreSQLVersion(t, pool, "16") + require.NoError(t, err) + require.NoError(t, resource.Expire(120)) + + hostAndPort := resource.GetHostPort("5432/tcp") + hostAndPortSplited := strings.Split(hostAndPort, ":") + password := "l]YLSc|4[i56%{gY" + + // inserting data + _, 
err = db.Exec(`INSERT INTO complex_types_example ( + json_data, + tags, + ip_addr, + search_text, + time_range, + location, + uuid_col, + int_array + ) VALUES ( + '{"name": "test", "value": 42}'::jsonb, + ARRAY['tag1', 'tag2', 'tag3'], + '192.168.1.1', + to_tsvector('english', 'The quick brown fox jumps over the lazy dog'), + tsrange('2024-01-01', '2024-12-31'), + point(45.5, -122.6), + 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', + ARRAY[1, 2, 3, 4, 5] + );`) + require.NoError(t, err) + + databaseURL := fmt.Sprintf("user=user_name password=%s dbname=dbname sslmode=disable host=%s port=%s", password, hostAndPortSplited[0], hostAndPortSplited[1]) + template := fmt.Sprintf(` +pg_stream: + dsn: %s + slot_name: test_slot_native_decoder + snapshot_batch_size: 100 + stream_snapshot: true + batch_transactions: true + schema: public + tables: + - complex_types_example +`, databaseURL) + + cacheConf := fmt.Sprintf(` +label: pg_stream_cache +file: + directory: %v +`, tmpDir) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`)) + require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + + var outBatches []string + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + msgBytes, err := mb[0].AsBytes() + require.NoError(t, err) + outBatchMut.Lock() + outBatches = append(outBatches, string(msgBytes)) + outBatchMut.Unlock() + return nil + })) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*25, time.Millisecond*100) + + messageWithComplexTypes := outBatches[0] + + // producing change to non-complex type to trigger 
replication and receive updated row so we can check the complex types again + // but after they have been produced by replication to ensure the consistency + _, err = db.Exec("UPDATE complex_types_example SET id = 2 WHERE id = 1") + require.NoError(t, err) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 2 + }, time.Second*25, time.Millisecond*100) + + // replacing update with insert to remove replication messages type differences + // so we will be checking only the data + lastMessage := outBatches[len(outBatches)-1] + lastMessage = strings.Replace(lastMessage, "update", "insert", 1) + messageWithComplexTypes = strings.Replace(messageWithComplexTypes, "\"table_snapshot_progress\":0,", "", 1) + + require.Equal(t, messageWithComplexTypes, strings.Replace(lastMessage, ":2", ":1", 1)) + + require.NoError(t, streamOut.StopWithin(time.Second*10)) + t.Log("All the conditions are met 🎉") + + t.Cleanup(func() { + db.Close() + }) +} + func TestIntegrationPgMultiVersionsCDCForPgOutputStreamUncomitedPlugin(t *testing.T) { integration.CheckSkip(t) // running tests in the look to test different PostgreSQL versions diff --git a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index 981e66fe49..27f413d9e5 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go @@ -10,10 +10,11 @@ package pglogicalstream import ( "fmt" + "log" + "github.com/google/uuid" pgtypes "github.com/jackc/pgtype" "github.com/jackc/pgx/v5/pgtype" - "log" ) // ---------------------------------------------------------------------------- diff --git a/internal/impl/postgresql/pglogicalstream/snapshotter.go b/internal/impl/postgresql/pglogicalstream/snapshotter.go index 55646e23d5..6ab3b14a6e 100644 --- 
a/internal/impl/postgresql/pglogicalstream/snapshotter.go +++ b/internal/impl/postgresql/pglogicalstream/snapshotter.go @@ -13,9 +13,10 @@ import ( "database/sql" "encoding/json" "fmt" - "github.com/jackc/pgtype" "strings" + "github.com/jackc/pgtype" + "errors" _ "github.com/lib/pq" @@ -175,8 +176,14 @@ func (s *Snapshotter) prepareScannersAndGetters(columnTypes []*sql.ColumnType) ( case "INET": scanArgs[i] = new(sql.NullString) valueGetters[i] = func(v interface{}) interface{} { - payload := v.(*sql.NullString).String - return formatIP(payload) + inet := pgtype.Inet{} + val := v.(*sql.NullString).String + if err := inet.Scan(val); err != nil { + s.logger.Warnf("Failed to scan array of INT4 values: %v", err) + return nil + } + + return inet.IPNet.String() } case "TSRANGE": scanArgs[i] = new(sql.NullString) diff --git a/internal/impl/postgresql/pglogicalstream/types.go b/internal/impl/postgresql/pglogicalstream/types.go index 9b9bea1c90..2d1d0ff3ad 100644 --- a/internal/impl/postgresql/pglogicalstream/types.go +++ b/internal/impl/postgresql/pglogicalstream/types.go @@ -7,29 +7,3 @@ // https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md package pglogicalstream - -import ( - "fmt" - "net" - "strings" -) - -func formatIP(ipStr string) string { - if strings.Contains(ipStr, "/") { - // If it has a mask, use ParseCIDR - ip, network, err := net.ParseCIDR(ipStr) - if err != nil { - return "" // or handle error as needed - } - - maskLen, _ := network.Mask.Size() - return fmt.Sprintf("%s/%d", ip.String(), maskLen) - } else { - // Just an IP without mask - ip := net.ParseIP(ipStr) - if ip == nil { - return "" // or handle error as needed - } - return ip.String() - } -} From f076b337bf6f6e6b57b60619c090de6b2ccd1dda Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 13:16:12 +0100 Subject: [PATCH 04/10] fix(): deps --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 8b78e1cd66..2f2c60bed2 100644 
--- a/go.mod +++ b/go.mod @@ -319,7 +319,7 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgtype v1.14.3 // indirect + github.com/jackc/pgtype v1.14.3 github.com/jackc/puddle v1.3.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect From bfb2b4c93f824fa756e1c3d2941498ea3ba9915a Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 14:57:31 +0100 Subject: [PATCH 05/10] chore(): added error return for value getters on snapshot stage --- .../pglogicalstream/logical_stream.go | 9 +++- .../replication_message_decoders.go | 2 +- .../postgresql/pglogicalstream/snapshotter.go | 46 +++++++++---------- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/internal/impl/postgresql/pglogicalstream/logical_stream.go b/internal/impl/postgresql/pglogicalstream/logical_stream.go index a9f769626b..e17f4367fc 100644 --- a/internal/impl/postgresql/pglogicalstream/logical_stream.go +++ b/internal/impl/postgresql/pglogicalstream/logical_stream.go @@ -500,9 +500,14 @@ func (s *Stream) processSnapshot() error { var data = make(map[string]any) for i, getter := range valueGetters { - data[columnNames[i]] = getter(scanArgs[i]) + if data[columnNames[i]], err = getter(scanArgs[i]); err != nil { + return err + } + if _, ok := lastPrimaryKey[columnNames[i]]; ok { - lastPkVals[columnNames[i]] = getter(scanArgs[i]) + if lastPkVals[columnNames[i]], err = getter(scanArgs[i]); err != nil { + return err + } } } diff --git a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index a33e76aa0a..e95a3337be 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go 
@@ -157,7 +157,7 @@ func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interfa if dt.Name == "uuid" { typesValueForUUID := val.([16]uint8) - return uuid.UUID(typesValueForUUID).String(), err + return uuid.UUID(typesValueForUUID).String(), nil } if dt.Name == "tsrange" { diff --git a/internal/impl/postgresql/pglogicalstream/snapshotter.go b/internal/impl/postgresql/pglogicalstream/snapshotter.go index 6ab3b14a6e..97167bb27f 100644 --- a/internal/impl/postgresql/pglogicalstream/snapshotter.go +++ b/internal/impl/postgresql/pglogicalstream/snapshotter.go @@ -144,87 +144,83 @@ func (s *Snapshotter) findAvgRowSize(ctx context.Context, table string) (sql.Nul return avgRowSize, nil } -func (s *Snapshotter) prepareScannersAndGetters(columnTypes []*sql.ColumnType) ([]interface{}, []func(interface{}) interface{}) { +func (s *Snapshotter) prepareScannersAndGetters(columnTypes []*sql.ColumnType) ([]interface{}, []func(interface{}) (interface{}, error)) { scanArgs := make([]interface{}, len(columnTypes)) - valueGetters := make([]func(interface{}) interface{}, len(columnTypes)) + valueGetters := make([]func(interface{}) (interface{}, error), len(columnTypes)) for i, v := range columnTypes { switch v.DatabaseTypeName() { case "VARCHAR", "TEXT", "UUID", "TIMESTAMP": scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { return v.(*sql.NullString).String } + valueGetters[i] = func(v interface{}) (interface{}, error) { return v.(*sql.NullString).String, nil } case "BOOL": scanArgs[i] = new(sql.NullBool) - valueGetters[i] = func(v interface{}) interface{} { return v.(*sql.NullBool).Bool } + valueGetters[i] = func(v interface{}) (interface{}, error) { return v.(*sql.NullBool).Bool, nil } case "INT4": scanArgs[i] = new(sql.NullInt64) - valueGetters[i] = func(v interface{}) interface{} { return v.(*sql.NullInt64).Int64 } + valueGetters[i] = func(v interface{}) (interface{}, error) { return v.(*sql.NullInt64).Int64, nil } case 
"JSONB": scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { + valueGetters[i] = func(v interface{}) (interface{}, error) { payload := v.(*sql.NullString).String if payload == "" { - return nil + return payload, nil } var dst any if err := json.Unmarshal([]byte(v.(*sql.NullString).String), &dst); err != nil { s.logger.Warnf("Failed to unmarshal JSONB value: %v", err) } - return dst + return dst, nil } case "INET": scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { + valueGetters[i] = func(v interface{}) (interface{}, error) { inet := pgtype.Inet{} val := v.(*sql.NullString).String if err := inet.Scan(val); err != nil { - s.logger.Warnf("Failed to scan array of INT4 values: %v", err) - return nil + return nil, err } - return inet.IPNet.String() + return inet.IPNet.String(), nil } case "TSRANGE": scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { + valueGetters[i] = func(v interface{}) (interface{}, error) { newArray := pgtype.Tsrange{} val := v.(*sql.NullString).String if err := newArray.Scan(val); err != nil { - s.logger.Warnf("Failed to scan array of TEXT values: %v", err) - return nil + return nil, err } vv, _ := newArray.Value() - return vv + return vv, nil } case "_INT4": scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { + valueGetters[i] = func(v interface{}) (interface{}, error) { newArray := pgtype.Int4Array{} val := v.(*sql.NullString).String if err := newArray.Scan(val); err != nil { - s.logger.Warnf("Failed to scan array of INT4 values: %v", err) - return nil + return nil, err } - return newArray.Elements + return newArray.Elements, nil } case "_TEXT": scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { + valueGetters[i] = func(v interface{}) (interface{}, error) { newArray := pgtype.TextArray{} val := v.(*sql.NullString).String if err := newArray.Scan(val); err != nil { - 
s.logger.Warnf("Failed to scan array of TEXT values: %v", err) - return nil + return nil, err } - return newArray.Elements + return newArray.Elements, nil } default: scanArgs[i] = new(sql.NullString) - valueGetters[i] = func(v interface{}) interface{} { return v.(*sql.NullString).String } + valueGetters[i] = func(v interface{}) (interface{}, error) { return v.(*sql.NullString).String, nil } } } From 4d47ca34bd3fb09141386ec493098c3d670ebd29 Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 14:59:32 +0100 Subject: [PATCH 06/10] chore(): added include_transaction_markers field --- internal/impl/postgresql/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/postgresql/integration_test.go b/internal/impl/postgresql/integration_test.go index d08fafc8ec..a59c4b1761 100644 --- a/internal/impl/postgresql/integration_test.go +++ b/internal/impl/postgresql/integration_test.go @@ -531,7 +531,7 @@ pg_stream: slot_name: test_slot_native_decoder snapshot_batch_size: 100 stream_snapshot: true - batch_transactions: true + include_transaction_markers: false schema: public tables: - complex_types_example From b7426693942d51295c7752c574ef0f4c9be1dd12 Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 15:00:59 +0100 Subject: [PATCH 07/10] chore(): return error on jsonb parsing failed --- internal/impl/postgresql/pglogicalstream/snapshotter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/postgresql/pglogicalstream/snapshotter.go b/internal/impl/postgresql/pglogicalstream/snapshotter.go index 97167bb27f..72a76eb815 100644 --- a/internal/impl/postgresql/pglogicalstream/snapshotter.go +++ b/internal/impl/postgresql/pglogicalstream/snapshotter.go @@ -168,7 +168,7 @@ func (s *Snapshotter) prepareScannersAndGetters(columnTypes []*sql.ColumnType) ( } var dst any if err := json.Unmarshal([]byte(v.(*sql.NullString).String), &dst); err != nil { - s.logger.Warnf("Failed to 
unmarshal JSONB value: %v", err) + return nil, err } return dst, nil From 1496a6969a85f775179d38feba40422c2e453943 Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 15:02:56 +0100 Subject: [PATCH 08/10] chore(): prevent type casting from causing panic --- .../pglogicalstream/replication_message_decoders.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index e95a3337be..46a0aa925a 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go @@ -9,8 +9,8 @@ package pglogicalstream import ( + "errors" "fmt" - "github.com/google/uuid" pgtypes "github.com/jackc/pgtype" "github.com/jackc/pgx/v5/pgtype" @@ -156,7 +156,11 @@ func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interfa } if dt.Name == "uuid" { - typesValueForUUID := val.([16]uint8) + typesValueForUUID, ok := val.([16]uint8) + if !ok { + return nil, errors.New("unable to convert uuid to string. 
type casting failed") + } + return uuid.UUID(typesValueForUUID).String(), nil } From 925454952205645977b5d6d6a960e1d06d5b14da Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 15:15:43 +0100 Subject: [PATCH 09/10] chore(): format file --- .../postgresql/pglogicalstream/replication_message_decoders.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index 46a0aa925a..b82202e4d6 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go @@ -11,6 +11,7 @@ package pglogicalstream import ( "errors" "fmt" + "github.com/google/uuid" pgtypes "github.com/jackc/pgtype" "github.com/jackc/pgx/v5/pgtype" From d7cac5f6d643821d120c0385126a2a6b65d0bdaf Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Thu, 28 Nov 2024 15:49:50 +0100 Subject: [PATCH 10/10] chore(): updated changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74cfa70b67..6c4475b5db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ All notable changes to this project will be documented in this file. ### Added - Add support for `spanner` driver to SQL plugins. (@yufeng-deng) +- Add support for complex database types (JSONB, TEXT[], INET, TSVECTOR, TSRANGE, POINT, INTEGER[]) for `pg_stream` input. (@le-vlad) + +### Fixed + +- Fixed `pg_stream` issue with discrepancies between replication and snapshot streaming for `UUID` type (@le-vlad) ### Changed