Merge pull request #3111 from redpanda-data/pgcdc
rockwotj authored Jan 11, 2025
2 parents e3f9827 + 85be188 commit 03fc96e
Showing 7 changed files with 216 additions and 92 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,8 @@ All notable changes to this project will be documented in this file.
- The `code` and `file` fields on the `javascript` processor docs no longer erroneously mention interpolation support. (@mihaitodor)
- The `postgres_cdc` input now correctly handles `null` values. (@rockwotj)
- Fix an issue in `aws_sqs` with refreshing in-flight message leases which could prevent acks from being processed. (@rockwotj)
- Fix an issue in `postgres_cdc` where TOAST values were not propagated under `REPLICA IDENTITY FULL`. (@rockwotj)
- Fix an initial snapshot streaming consistency issue with `postgres_cdc`. (@rockwotj)

## 4.44.0 - 2024-12-13

6 changes: 4 additions & 2 deletions internal/impl/postgresql/input_pg_stream.go
@@ -325,9 +325,11 @@ func (p *pgStreamInput) processStream(pgStream *pglogicalstream.Stream, batcher
})
monitorLoop.Start()
defer monitorLoop.Stop()
ctx, _ := p.stopSig.SoftStopCtx(context.Background())
ctx, cancel := p.stopSig.SoftStopCtx(context.Background())
defer cancel()
defer func() {
ctx, _ := p.stopSig.HardStopCtx(context.Background())
ctx, cancel := p.stopSig.HardStopCtx(context.Background())
defer cancel()
if err := batcher.Close(ctx); err != nil {
p.logger.Errorf("unable to close batcher: %s", err)
}
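Both hunks in this file (and several below) apply the same one-line fix: capture the cancel function returned alongside the context instead of discarding it. A minimal sketch of why this matters, using the standard library's `context.WithCancel` as a stand-in for `SoftStopCtx`/`HardStopCtx` (an assumption; the real helpers live on the shutdown signaller):

```go
package main

import (
	"context"
	"fmt"
)

// process shows the pattern applied throughout this PR. Before the fix the
// code read `ctx, _ := p.stopSig.SoftStopCtx(...)`, discarding the cancel
// function, so the derived context lingered until its parent ended; go
// vet's lostcancel check flags exactly this shape.
func process(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // releases the context's resources on every return path

	fmt.Println(ctx.Err()) // <nil> while the context is still live
}

func main() { process(context.Background()) }
```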
108 changes: 108 additions & 0 deletions internal/impl/postgresql/integration_test.go
@@ -137,6 +137,11 @@ func ResourceWithPostgreSQLVersion(t *testing.T, pool *dockertest.Pool, version
return err
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS large_values (id serial PRIMARY KEY, value TEXT);")
if err != nil {
return err
}

// flights_non_streamed is a control table with data that should not be streamed or queried by snapshot streaming
_, err = db.Exec("CREATE TABLE IF NOT EXISTS flights_non_streamed (id serial PRIMARY KEY, name VARCHAR(50), created_at TIMESTAMP);")

@@ -757,3 +762,106 @@ file:
})
}
}

func TestIntegrationTOASTValues(t *testing.T) {
t.Parallel()
integration.CheckSkip(t)
tmpDir := t.TempDir()
pool, err := dockertest.NewPool("")
require.NoError(t, err)

var (
resource *dockertest.Resource
db *sql.DB
)

resource, db, err = ResourceWithPostgreSQLVersion(t, pool, "16")
require.NoError(t, err)
require.NoError(t, resource.Expire(120))

_, err = db.Exec(`ALTER TABLE large_values REPLICA IDENTITY FULL;`)
require.NoError(t, err)

const stringSize = 400_000

hostAndPort := resource.GetHostPort("5432/tcp")
hostAndPortSplit := strings.Split(hostAndPort, ":")
password := "l]YLSc|4[i56%{gY"

require.NoError(t, err)

// Insert a large >1MiB value
_, err = db.Exec(`INSERT INTO large_values (id, value) VALUES ($1, $2);`, 1, strings.Repeat("foo", stringSize))
require.NoError(t, err)

databaseURL := fmt.Sprintf("user=user_name password=%s dbname=dbname sslmode=disable host=%s port=%s", password, hostAndPortSplit[0], hostAndPortSplit[1])
template := fmt.Sprintf(`
pg_stream:
dsn: %s
slot_name: test_slot_native_decoder
stream_snapshot: true
snapshot_batch_size: 1
schema: public
tables:
- large_values
`, databaseURL)

cacheConf := fmt.Sprintf(`
label: pg_stream_cache
file:
directory: %v
`, tmpDir)

streamOutBuilder := service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: TRACE`))
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
require.NoError(t, streamOutBuilder.AddInputYAML(template))

var outBatches []string
var outBatchMut sync.Mutex
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
msgBytes, err := mb[0].AsBytes()
require.NoError(t, err)
outBatchMut.Lock()
outBatches = append(outBatches, string(msgBytes))
outBatchMut.Unlock()
return nil
}))

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
_ = streamOut.Run(context.Background())
}()

assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1
}, time.Second*10, time.Millisecond*100)

_, err = db.Exec(`UPDATE large_values SET value=$1;`, strings.Repeat("bar", stringSize))
require.NoError(t, err)
_, err = db.Exec(`UPDATE large_values SET id=$1;`, 3)
require.NoError(t, err)
_, err = db.Exec(`DELETE FROM large_values`)
require.NoError(t, err)
_, err = db.Exec(`INSERT INTO large_values (id, value) VALUES ($1, $2);`, 2, strings.Repeat("qux", stringSize))
require.NoError(t, err)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
outBatchMut.Lock()
defer outBatchMut.Unlock()
assert.Len(c, outBatches, 5, "got: %#v", outBatches)
}, time.Second*10, time.Millisecond*100)
require.JSONEq(t, `{"id":1, "value": "`+strings.Repeat("foo", stringSize)+`"}`, outBatches[0], "GOT: %s", outBatches[0])
require.JSONEq(t, `{"id":1, "value": "`+strings.Repeat("bar", stringSize)+`"}`, outBatches[1], "GOT: %s", outBatches[1])
require.JSONEq(t, `{"id":3, "value": "`+strings.Repeat("bar", stringSize)+`"}`, outBatches[2], "GOT: %s", outBatches[2])
require.JSONEq(t, `{"id":3, "value": "`+strings.Repeat("bar", stringSize)+`"}`, outBatches[3], "GOT: %s", outBatches[3])
require.JSONEq(t, `{"id":2, "value": "`+strings.Repeat("qux", stringSize)+`"}`, outBatches[4], "GOT: %s", outBatches[4])

require.NoError(t, streamOut.StopWithin(time.Second*10))
}
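The test hinges on `REPLICA IDENTITY FULL`, which makes Postgres log the entire old row for UPDATE and DELETE; that old tuple is what lets the decoder recover TOAST values marked as unchanged. A hedged helper sketch in the same `database/sql` style the test uses (`ensureFullReplicaIdentity` is illustrative, not part of the PR):

```go
package pgtestutil

import (
	"database/sql"
	"fmt"
)

// ensureFullReplicaIdentity enables FULL replica identity for a table and
// reads back pg_class.relreplident to confirm it: 'f' = FULL, 'd' =
// default (primary key only), 'n' = nothing, 'i' = a named index.
func ensureFullReplicaIdentity(db *sql.DB, table string) (string, error) {
	// ALTER TABLE cannot take bind parameters; table comes from trusted
	// test code here, not user input.
	if _, err := db.Exec(fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL;", table)); err != nil {
		return "", err
	}
	var ident string
	err := db.QueryRow(
		"SELECT relreplident FROM pg_class WHERE relname = $1;", table,
	).Scan(&ident)
	return ident, err
}
```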
31 changes: 20 additions & 11 deletions internal/impl/postgresql/pglogicalstream/logical_stream.go
@@ -250,7 +250,8 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {
stream.errors <- fmt.Errorf("failed to process snapshot: %w", err)
return
}
ctx, _ := stream.shutSig.SoftStopCtx(context.Background())
ctx, done := stream.shutSig.SoftStopCtx(context.Background())
defer done()
if err := stream.startLr(ctx, lsnrestart); err != nil {
stream.errors <- fmt.Errorf("failed to start logical replication: %w", err)
return
@@ -338,7 +339,8 @@ func (s *Stream) streamMessages(currentLSN LSN) error {
lastEmittedCommitLSN := currentLSN

commitLSN := func(force bool) error {
ctx, _ := s.shutSig.HardStopCtx(context.Background())
ctx, done := s.shutSig.HardStopCtx(context.Background())
defer done()
ackedLSN := s.getAckedLSN()
if ackedLSN == lastEmittedLSN {
ackedLSN = lastEmittedCommitLSN
@@ -358,7 +360,8 @@ func (s *Stream) streamMessages(currentLSN LSN) error {
}
}()

ctx, _ := s.shutSig.SoftStopCtx(context.Background())
ctx, done := s.shutSig.SoftStopCtx(context.Background())
defer done()
for !s.shutSig.IsSoftStopSignalled() {
if err := commitLSN(time.Now().After(s.nextStandbyMessageDeadline)); err != nil {
return err
@@ -388,7 +391,6 @@ func (s *Stream) streamMessages(currentLSN LSN) error {
s.logger.Warn("received malformed message with no data")
continue
}

switch msg.Data[0] {
case PrimaryKeepaliveMessageByteID:
pkm, err := ParsePrimaryKeepaliveMessage(msg.Data[1:])
@@ -420,6 +422,8 @@ func (s *Stream) streamMessages(currentLSN LSN) error {
lastEmittedLSN = msgLSN
lastEmittedCommitLSN = msgLSN
}
default:
return fmt.Errorf("unknown message type: %c", msg.Data[0])
}
}
// clean shutdown, return nil
@@ -465,10 +469,13 @@ func (s *Stream) processChange(ctx context.Context, msgLSN LSN, xld XLogData, re
}

func (s *Stream) processSnapshot() error {
if err := s.snapshotter.prepare(); err != nil {
ctx, done := s.shutSig.SoftStopCtx(context.Background())
defer done()
if err := s.snapshotter.prepare(ctx); err != nil {
return fmt.Errorf("failed to prepare database snapshot - snapshot may be expired: %w", err)
}
defer func() {
s.logger.Debugf("Finished snapshot processing")
if err := s.snapshotter.releaseSnapshot(); err != nil {
s.logger.Warnf("Failed to release database snapshot: %v", err.Error())
}
@@ -491,8 +498,6 @@ func (s *Stream) processSnapshot() error {
offset = 0
)

ctx, _ := s.shutSig.SoftStopCtx(context.Background())

avgRowSizeBytes, err = s.snapshotter.findAvgRowSize(ctx, table)
if err != nil {
return fmt.Errorf("failed to calculate average row size for table %v: %w", table, err)
@@ -549,7 +554,7 @@ func (s *Stream) processSnapshot() error {
var rowsCount = 0
rowsStart := time.Now()
totalScanDuration := time.Duration(0)
totalWaitingFromBenthos := time.Duration(0)
sendDuration := time.Duration(0)

for snapshotRows.Next() {
rowsCount += 1
@@ -596,14 +601,17 @@ func (s *Stream) processSnapshot() error {
case <-s.shutSig.SoftStopChan():
return nil
}
totalWaitingFromBenthos += time.Since(waitingFromBenthos)
sendDuration += time.Since(waitingFromBenthos)
}

if snapshotRows.Err() != nil {
return fmt.Errorf("failed to close snapshot data iterator for table %v: %w", table, snapshotRows.Err())
}

batchEnd := time.Since(rowsStart)
s.logger.Debugf("Batch duration: %v %s \n", batchEnd, tableName)
s.logger.Debugf("Scan duration %v %s\n", totalScanDuration, tableName)
s.logger.Debugf("Waiting from benthos duration %v %s\n", totalWaitingFromBenthos, tableName)
s.logger.Debugf("Send duration %v %s\n", sendDuration, tableName)

offset += batchSize

@@ -672,7 +680,8 @@ func (s *Stream) getPrimaryKeyColumn(ctx context.Context, table TableFQN) (map[s
func (s *Stream) Stop(ctx context.Context) error {
s.shutSig.TriggerSoftStop()
var wg errgroup.Group
stopNowCtx, _ := s.shutSig.HardStopCtx(ctx)
stopNowCtx, done := s.shutSig.HardStopCtx(ctx)
defer done()
wg.Go(func() error {
return s.pgConn.Close(stopNowCtx)
})
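The `commitLSN` closure above encodes the consistency rule this PR tightens: the position reported back to the server only advances to a commit boundary once downstream has acked everything emitted so far. Distilled as a standalone function (the wrapper is illustrative; the names follow the diff, and LSN is assumed to be uint64-backed):

```go
package main

import "fmt"

// nextReportLSN mirrors the bookkeeping in streamMessages: if downstream
// has acked exactly the last message we emitted, it is safe to advance the
// server-visible position to the enclosing commit's LSN; otherwise keep
// reporting only what has actually been acked.
func nextReportLSN(acked, lastEmitted, lastCommit uint64) uint64 {
	if acked == lastEmitted {
		return lastCommit
	}
	return acked
}

func main() {
	// Downstream acked everything emitted: report the commit boundary.
	fmt.Println(nextReportLSN(42, 42, 50)) // 50
	// Downstream is behind: hold the reported position back.
	fmt.Println(nextReportLSN(40, 42, 50)) // 40
}
```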
2 changes: 1 addition & 1 deletion internal/impl/postgresql/pglogicalstream/pglogrepl.go
@@ -271,7 +271,7 @@ func CreateReplicationSlot(
var snapshotResponse SnapshotCreationResponse
if options.SnapshotAction == "export" {
var err error
snapshotResponse, err = snapshotter.initSnapshotTransaction()
snapshotResponse, err = snapshotter.initSnapshotTransaction(ctx)
if err != nil {
return CreateReplicationSlotResult{}, err
}
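For context on the `initSnapshotTransaction(ctx)` call: logical slots created with the `EXPORT_SNAPSHOT` option return a snapshot name, and any other session that imports it sees the exact database state at the slot's consistent point, which is what makes a gap-free handoff from initial table scan to WAL stream possible. A sketch of the importing side (`importSnapshot` and `snapshotName` are illustrative, not the PR's code):

```go
package pgsnapshot

import (
	"context"
	"database/sql"
	"fmt"
)

// importSnapshot pins a transaction to a snapshot exported during slot
// creation. SET TRANSACTION SNAPSHOT must be the first statement in the
// transaction and requires REPEATABLE READ or stricter isolation.
func importSnapshot(ctx context.Context, db *sql.DB, snapshotName string) (*sql.Tx, error) {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
	if err != nil {
		return nil, err
	}
	// The command takes no bind parameters, hence the Sprintf.
	if _, err := tx.ExecContext(ctx,
		fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", snapshotName)); err != nil {
		_ = tx.Rollback()
		return nil, err
	}
	// Reads in this tx now see the tables exactly as of the slot's
	// consistent point; the WAL stream picks up from that same point.
	return tx, nil
}
```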
@@ -74,10 +74,8 @@ func decodePgOutput(WALData []byte, relations map[uint32]*RelationMessage, typeM
for idx, col := range logicalMsg.Tuple.Columns {
colName := rel.Columns[idx].Name
switch col.DataType {
case 'n': // null
case 'n', 'u': // null or unchanged toast
values[colName] = nil
case 'u': // unchanged toast
// This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you.
case 't': //text
val, err := decodeTextColumnData(typeMap, col.Data, rel.Columns[idx].DataType)
if err != nil {
@@ -104,13 +102,29 @@ func decodePgOutput(WALData []byte, relations map[uint32]*RelationMessage, typeM
case 'n': // null
values[colName] = nil
case 'u': // unchanged toast
// This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you.
// When an update leaves a TOAST value unchanged and the replica identity is
// set to FULL, we need to look at the old tuple to get the data; the new
// tuple just marks the value as unchanged.
if idx < len(logicalMsg.OldTuple.Columns) {
col = logicalMsg.OldTuple.Columns[idx]
switch col.DataType {
case 'n', 'u':
values[colName] = nil
continue
case 't':
default:
return nil, fmt.Errorf("unable to decode column data, unknown data type: %d", col.DataType)
}
}
fallthrough
case 't': //text
val, err := decodeTextColumnData(typeMap, col.Data, rel.Columns[idx].DataType)
if err != nil {
return nil, fmt.Errorf("unable to decode column data: %w", err)
}
values[colName] = val
default:
return nil, fmt.Errorf("unable to decode column data, unknown data type: %d", col.DataType)
}
}
message.Data = values
@@ -126,16 +140,15 @@ func decodePgOutput(WALData []byte, relations map[uint32]*RelationMessage, typeM
for idx, col := range logicalMsg.OldTuple.Columns {
colName := rel.Columns[idx].Name
switch col.DataType {
case 'n': // null
case 'n', 'u': // null or unchanged toast
values[colName] = nil
case 'u': // unchanged toast
// This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you.
case 't': //text
val, err := decodeTextColumnData(typeMap, col.Data, rel.Columns[idx].DataType)
if err != nil {
return nil, fmt.Errorf("unable to decode column data: %w", err)
}
values[colName] = val
default:
}
}
message.Data = values
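The hunks above are the heart of the TOAST fix: a `'u'` marker means the value was unchanged and omitted from the tuple, and with `REPLICA IDENTITY FULL` it can be recovered from the old tuple instead of being dropped. The update-path rule, restated as a simplified sketch (`resolveColumn` is illustrative, not the real decoder):

```go
package pgdecode

// resolveColumn distills the update-path rule from decodePgOutput above.
// pgoutput column markers: 'n' = SQL NULL, 'u' = unchanged TOAST value
// (data omitted), 't' = text-format data.
func resolveColumn(newKind byte, newData []byte, oldKind byte, oldData []byte) []byte {
	switch newKind {
	case 't':
		// Value shipped in the new tuple; decode it directly.
		return newData
	case 'u':
		// Unchanged TOAST: under REPLICA IDENTITY FULL the old tuple
		// carries the whole row, so take the value from there.
		if oldKind == 't' {
			return oldData
		}
		// No usable old tuple (default replica identity): nothing to
		// recover, treat as null.
		return nil
	default: // 'n'
		return nil
	}
}
```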
