diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index c33dd2f48..13ae005b3 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -31,11 +31,14 @@ type CheckPoint interface { Load() error // Save saves checkpoint information. - Save(commitTS int64, secondaryTS int64, consistent bool) error + Save(commitTS int64, secondaryTS int64, consistent bool, version int64) error // TS gets checkpoint commit timestamp. TS() int64 + // SchemaVersion gets the schema version saved in the checkpoint. + SchemaVersion() int64 + // IsConsistent return the Consistent status saved. IsConsistent() bool @@ -61,7 +64,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) { return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", cfg.CheckpointType, cfg) } - log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg)) + log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Int64("version", cp.SchemaVersion()), zap.Reflect("cfg", cfg)) return cp, nil } diff --git a/drainer/checkpoint/file.go b/drainer/checkpoint/file.go index 133eca8a5..eaa3754e7 100644 --- a/drainer/checkpoint/file.go +++ b/drainer/checkpoint/file.go @@ -33,6 +33,7 @@ type FileCheckPoint struct { ConsistentSaved bool `toml:"consistent" json:"consistent"` CommitTS int64 `toml:"commitTS" json:"commitTS"` + Version int64 `toml:"schema-version" json:"schema-version"` } // NewFile creates a new FileCheckpoint. 
@@ -82,7 +83,7 @@ func (sp *FileCheckPoint) Load() error { } // Save implements CheckPoint.Save interface -func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error { +func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error { sp.Lock() defer sp.Unlock() @@ -92,6 +93,9 @@ func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error { sp.CommitTS = ts sp.ConsistentSaved = consistent + if version > sp.Version { + sp.Version = version + } var buf bytes.Buffer e := toml.NewEncoder(&buf) @@ -116,6 +120,14 @@ func (sp *FileCheckPoint) TS() int64 { return sp.CommitTS } +// SchemaVersion implements CheckPoint.SchemaVersion interface. +func (sp *FileCheckPoint) SchemaVersion() int64 { + sp.RLock() + defer sp.RUnlock() + + return sp.Version +} + // IsConsistent implements CheckPoint interface func (sp *FileCheckPoint) IsConsistent() bool { sp.RLock() diff --git a/drainer/checkpoint/file_test.go b/drainer/checkpoint/file_test.go index 225830215..12f9d665d 100644 --- a/drainer/checkpoint/file_test.go +++ b/drainer/checkpoint/file_test.go @@ -33,7 +33,7 @@ func (t *testCheckPointSuite) TestFile(c *C) { testTs := int64(1) // save ts - err = meta.Save(testTs, 0, false) + err = meta.Save(testTs, 0, false, 0) c.Assert(err, IsNil) // check ts ts := meta.TS() @@ -41,7 +41,7 @@ func (t *testCheckPointSuite) TestFile(c *C) { c.Assert(meta.IsConsistent(), Equals, false) // check consistent true case. 
- err = meta.Save(testTs, 0, true) + err = meta.Save(testTs, 0, true, 0) c.Assert(err, IsNil) ts = meta.TS() c.Assert(ts, Equals, testTs) @@ -70,6 +70,6 @@ func (t *testCheckPointSuite) TestFile(c *C) { err = meta.Close() c.Assert(err, IsNil) c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed) - c.Assert(errors.Cause(meta.Save(0, 0, true)), Equals, ErrCheckPointClosed) + c.Assert(errors.Cause(meta.Save(0, 0, true, 0)), Equals, ErrCheckPointClosed) c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed) } diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go index df0c58ee8..dbcc6aa16 100644 --- a/drainer/checkpoint/mysql.go +++ b/drainer/checkpoint/mysql.go @@ -14,9 +14,11 @@ package checkpoint import ( + "context" "database/sql" "encoding/json" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -25,6 +27,7 @@ import ( // mysql driver _ "github.com/go-sql-driver/mysql" "github.com/pingcap/tidb-binlog/pkg/loader" + "github.com/pingcap/tidb-binlog/pkg/util" ) // MysqlCheckPoint is a local savepoint struct for mysql @@ -41,6 +44,7 @@ type MysqlCheckPoint struct { ConsistentSaved bool `toml:"consistent" json:"consistent"` CommitTS int64 `toml:"commitTS" json:"commitTS"` TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` + Version int64 `toml:"schema-version" json:"schema-version"` } var _ CheckPoint = &MysqlCheckPoint{} @@ -126,7 +130,7 @@ func (sp *MysqlCheckPoint) Load() error { } // Save implements checkpoint.Save interface -func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error { +func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error { sp.Lock() defer sp.Unlock() @@ -136,6 +140,9 @@ func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error { sp.CommitTS = ts sp.ConsistentSaved = consistent + if version > sp.Version { + sp.Version = version + } if secondaryTS > 0 { sp.TsMap["primary-ts"] = ts @@ -148,12 +155,13 @@ func (sp 
*MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error { } sql := genReplaceSQL(sp, string(b)) - _, err = sp.db.Exec(sql) - if err != nil { - return errors.Annotatef(err, "query sql failed: %s", sql) - } - - return nil + return util.RetryContext(context.TODO(), 5, time.Second, 1, func(context.Context) error { + _, err = sp.db.Exec(sql) + if err != nil { + return errors.Annotatef(err, "query sql failed: %s", sql) + } + return nil + }) } // IsConsistent implements CheckPoint interface @@ -172,6 +180,14 @@ func (sp *MysqlCheckPoint) TS() int64 { return sp.CommitTS } +// SchemaVersion implements CheckPoint.SchemaVersion interface. +func (sp *MysqlCheckPoint) SchemaVersion() int64 { + sp.RLock() + defer sp.RUnlock() + + return sp.Version +} + // Close implements CheckPoint.Close interface func (sp *MysqlCheckPoint) Close() error { sp.Lock() diff --git a/drainer/checkpoint/mysql_test.go b/drainer/checkpoint/mysql_test.go index 30b06146c..8a2ee3090 100644 --- a/drainer/checkpoint/mysql_test.go +++ b/drainer/checkpoint/mysql_test.go @@ -50,7 +50,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) { c.Assert(err, IsNil) mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0)) cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"} - err = cp.Save(1111, 0, false) + err = cp.Save(1111, 0, false, 0) c.Assert(err, IsNil) } @@ -64,7 +64,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) { table: "tbl", TsMap: make(map[string]int64), } - err = cp.Save(65536, 3333, false) + err = cp.Save(65536, 3333, false, 0) c.Assert(err, IsNil) c.Assert(cp.TsMap["primary-ts"], Equals, int64(65536)) c.Assert(cp.TsMap["secondary-ts"], Equals, int64(3333)) diff --git a/drainer/relay.go b/drainer/relay.go index 2f2d7ccb9..48a2f37f4 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -150,7 +150,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) return errors.Trace(readerErr) } - err := cp.Save(lastSuccessTS, 0 /* 
secondaryTS */, true /*consistent*/) + err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/, 0) if err != nil { return errors.Trace(err) } diff --git a/drainer/relay_test.go b/drainer/relay_test.go index d3838a4fc..3fc31fc27 100644 --- a/drainer/relay_test.go +++ b/drainer/relay_test.go @@ -74,7 +74,7 @@ var _ loader.Loader = &noOpLoader{} func (s *relaySuite) TestFeedByRealyLog(c *check.C) { cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint") c.Assert(err, check.IsNil) - err = cp.Save(0, 0, false) + err = cp.Save(0, 0, false, 0) c.Assert(err, check.IsNil) c.Assert(cp.IsConsistent(), check.Equals, false) diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go index 921ccfb8b..a225a6398 100644 --- a/drainer/sync/syncer.go +++ b/drainer/sync/syncer.go @@ -28,6 +28,11 @@ type Item struct { Table string RelayLogPos pb.Pos + // Each item carries a SchemaVersion. With the amend txn feature, the SchemaVersion of a prewrite DML can change, + // which makes reloading history DDLs with the previous SchemaVersion after a restart unreliable, + // so we save this version in the checkpoint. 
+ SchemaVersion int64 + // the applied TS executed in downstream, only for tidb AppliedTS int64 // should skip to replicate this item at downstream diff --git a/drainer/syncer.go b/drainer/syncer.go index 5038fd531..de06707cc 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -184,6 +184,7 @@ func (s *Syncer) enableSafeModeInitializationPhase() { func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) { successes := s.dsyncer.Successes() var lastSaveTS int64 + var latestVersion int64 lastSaveTime := time.Now() for { @@ -208,6 +209,7 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) { if ts > atomic.LoadInt64(lastTS) { atomic.StoreInt64(lastTS, ts) } + latestVersion = item.SchemaVersion // save ASAP for DDL, and if FinishTS > 0, we should save the ts map if item.Binlog.DdlJobId > 0 || item.AppliedTS > 0 { @@ -229,7 +231,7 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) { ts := atomic.LoadInt64(lastTS) if ts > lastSaveTS { if saveNow || time.Since(lastSaveTime) > 3*time.Second { - s.savePoint(ts, appliedTS) + s.savePoint(ts, appliedTS, latestVersion) lastSaveTime = time.Now() lastSaveTS = ts appliedTS = 0 @@ -242,22 +244,22 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) { ts := atomic.LoadInt64(lastTS) if ts > lastSaveTS { - s.savePoint(ts, 0) + s.savePoint(ts, 0, latestVersion) eventCounter.WithLabelValues("savepoint").Add(1) } log.Info("handleSuccess quit") } -func (s *Syncer) savePoint(ts, secondaryTS int64) { +func (s *Syncer) savePoint(ts, secondaryTS, version int64) { if ts < s.cp.TS() { log.Error("save ts is less than checkpoint ts %d", zap.Int64("save ts", ts), zap.Int64("checkpoint ts", s.cp.TS())) } - log.Info("write save point", zap.Int64("ts", ts)) - err := s.cp.Save(ts, secondaryTS, false) + log.Info("write save point", zap.Int64("ts", ts), zap.Int64("version", version)) + err := s.cp.Save(ts, secondaryTS, false, version) if err != nil { - 
log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err)) + log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Int64("version", version), zap.Error(err)) } checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(ts)))) @@ -282,6 +284,12 @@ func (s *Syncer) run() error { s.enableSafeModeInitializationPhase() + err = s.schema.handlePreviousDDLJobIfNeed(s.cp.SchemaVersion() + 1) + if err != nil { + err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed") + return err + } + var lastDDLSchemaVersion int64 var b *binlogItem @@ -354,6 +362,7 @@ ForLoop: err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed") break ForLoop } + var isFilterTransaction = false var err1 error if s.loopbackSync != nil && s.loopbackSync.LoopbackControl { @@ -375,7 +384,7 @@ ForLoop: s.addDMLEventMetrics(preWrite.GetMutations()) beginTime := time.Now() lastAddComitTS = binlog.GetCommitTs() - err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite}) + err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite, SchemaVersion: preWrite.SchemaVersion}) if err != nil { err = errors.Annotatef(err, "failed to add item") break ForLoop @@ -396,7 +405,6 @@ ForLoop: log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion)) lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion - err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion) if err != nil { err = errors.Trace(err) @@ -443,7 +451,7 @@ ForLoop: log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed", zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs)) - err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip}) + err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip, SchemaVersion: lastDDLSchemaVersion}) if err != nil { err = 
errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs) break ForLoop @@ -473,7 +481,7 @@ ForLoop: return cerr } - return s.cp.Save(s.cp.TS(), 0, true /*consistent*/) + return s.cp.Save(s.cp.TS(), 0, true /*consistent*/, lastDDLSchemaVersion) } func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) {