Skip to content

Commit

Permalink
Re-enable the rowserrcheck linter
Browse files Browse the repository at this point in the history
Seems to work fine on Go 1.23.4 ¯\_(ツ)_/¯

Signed-off-by: Mihai Todor <todormihai@gmail.com>
  • Loading branch information
mihaitodor committed Jan 12, 2025
1 parent 48a8fa9 commit 7e0c4c4
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,6 @@ linters:
- tenv
- predeclared
- mirror
# - rowserrcheck
- rowserrcheck
- bodyclose
- nolintlint
17 changes: 10 additions & 7 deletions internal/impl/mysql/input_mysql_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,12 @@ func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot)
return err
}
i.logger.Tracef("primary keys for table %s: %v", table, tablePks)
var numRowsProcessed int
lastSeenPksValues := map[string]any{}
for _, pk := range tablePks {
lastSeenPksValues[pk] = nil
}

var numRowsProcessed int
for {
var batchRows *sql.Rows
if numRowsProcessed == 0 {
Expand All @@ -326,21 +327,19 @@ func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot)
batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, &lastSeenPksValues, i.fieldSnapshotMaxBatchSize)
}
if err != nil {
return err
return fmt.Errorf("failed to execute snapshot table query: %s", err)
}

types, err := batchRows.ColumnTypes()
if err != nil {
_ = batchRows.Close()
return err
return fmt.Errorf("failed to fetch column types: %s", err)
}

values, mappers := prepSnapshotScannerAndMappers(types)

columns, err := batchRows.Columns()
if err != nil {
_ = batchRows.Close()
return err
return fmt.Errorf("failed to fetch columns: %s", err)
}

var batchRowsCount int
Expand All @@ -349,7 +348,6 @@ func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot)
batchRowsCount++

if err := batchRows.Scan(values...); err != nil {
_ = batchRows.Close()
return err
}

Expand All @@ -376,6 +374,11 @@ func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot)
return ctx.Err()
}
}

if err := batchRows.Err(); err != nil {
return fmt.Errorf("failed to iterate snapshot table: %s", err)
}

// TODO(cdc): Save checkpoint
if batchRowsCount < i.fieldSnapshotMaxBatchSize {
break
Expand Down
4 changes: 4 additions & 0 deletions internal/impl/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ ORDER BY ORDINAL_POSITION;
pks = append(pks, pk)
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate table: %s", err)
}

if len(pks) == 0 {
return nil, fmt.Errorf("unable to find primary key for table %s - does the table exist and does it have a primary key set?", table)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/impl/timeplus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (d *driver) Run(sql string) error {
return err
}

if err := rows.Err(); err != nil {
return err
}

columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
Expand Down

0 comments on commit 7e0c4c4

Please sign in to comment.