Skip to content

Commit

Permalink
Only update litestream_seq if size is below WAL header size
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Mar 5, 2024
1 parent 94f69a0 commit 80280fc
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 27 deletions.
16 changes: 10 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (db *DB) acquireReadLock() error {
}

// Execute read query to obtain read lock.
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
_ = tx.Rollback()
return err
}
Expand All @@ -649,6 +649,10 @@ func (db *DB) releaseReadLock() error {
// Rollback & clear read transaction.
err := db.rtx.Rollback()
db.rtx = nil

if errors.Is(err, context.Canceled) {
err = nil
}
return err
}

Expand Down Expand Up @@ -693,7 +697,7 @@ func (db *DB) createGeneration() (string, error) {

// Atomically write generation name as current generation.
generationNamePath := db.GenerationNamePath()
mode := os.FileMode(0600)
mode := os.FileMode(0o600)
if db.fileInfo != nil {
mode = db.fileInfo.Mode()
}
Expand Down Expand Up @@ -986,7 +990,7 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
}

// Write header to new WAL shadow file.
mode := os.FileMode(0600)
mode := os.FileMode(0o600)
if fi := db.fileInfo; fi != nil {
mode = fi.Mode()
}
Expand Down Expand Up @@ -1022,7 +1026,7 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
}
origWalSize = frameAlign(fi.Size(), db.pageSize)

w, err := os.OpenFile(filename, os.O_RDWR, 0666)
w, err := os.OpenFile(filename, os.O_RDWR, 0o666)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -1334,7 +1338,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
// a new page is written.
if err := db.execCheckpoint(mode); err != nil {
return err
} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
} else if err := db.ensureWALExists(); err != nil {
return err
}

Expand Down Expand Up @@ -1424,7 +1428,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {

// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err)
return fmt.Errorf("acquire read lock: %w", err)
}

return nil
Expand Down
16 changes: 2 additions & 14 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestDB_Sync(t *testing.T) {
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := os.ReadFile(shadowWALPath); err != nil {
t.Fatal(err)
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -552,12 +552,7 @@ func TestDB_Sync(t *testing.T) {
t.Fatal(err)
}

// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
// NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover.
})

// Ensure DB checkpoints after interval.
Expand All @@ -581,13 +576,6 @@ func TestDB_Sync(t *testing.T) {
} else if err := db.Sync(context.Background()); err != nil {
t.Fatal(err)
}

// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
}

Expand Down
16 changes: 9 additions & 7 deletions replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func TestReplica_Sync(t *testing.T) {
t.Fatal(err)
}

if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
}

c := file.NewReplicaClient(t.TempDir())
r := litestream.NewReplica(db, "")
c.Replica, r.Client = r, c
Expand Down Expand Up @@ -142,7 +146,7 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatal(err)
} else if info, err := r.Snapshot(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := info.Pos(), nextIndex(pos0); got != want {
} else if got, want := info.Pos(), pos0.Truncate(); got != want {
t.Fatalf("pos=%s, want %s", got, want)
}

Expand All @@ -166,20 +170,18 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatal(err)
} else if info, err := r.Snapshot(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := info.Pos(), nextIndex(pos1); got != want {
} else if got, want := info.Pos(), pos1.Truncate(); got != want {
t.Fatalf("pos=%v, want %v", got, want)
}

// Verify three snapshots exist.
// Verify snapshots exist.
if infos, err := r.Snapshots(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(infos), 3; got != want {
} else if got, want := len(infos), 2; got != want {
t.Fatalf("len=%v, want %v", got, want)
} else if got, want := infos[0].Pos(), pos0.Truncate(); got != want {
t.Fatalf("info[0]=%s, want %s", got, want)
} else if got, want := infos[1].Pos(), nextIndex(pos0); got != want {
} else if got, want := infos[1].Pos(), pos1.Truncate(); got != want {
t.Fatalf("info[1]=%s, want %s", got, want)
} else if got, want := infos[2].Pos(), nextIndex(pos1); got != want {
t.Fatalf("info[2]=%s, want %s", got, want)
}
}

0 comments on commit 80280fc

Please sign in to comment.