arbiter: discard the repeated kafka message (#837) (#838)
sre-bot authored and suzaku committed Dec 4, 2019
1 parent 948d8f3 commit a7b869c
Showing 2 changed files with 22 additions and 6 deletions.
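
In short, syncBinlogs now remembers the largest CommitTs it has accepted and skips any Kafka message whose CommitTs is not greater than that value, so binlogs that Kafka re-delivers are not pushed into the loader a second time. The sketch below illustrates the same filtering idea in isolation; it is a minimal stand-alone example, and the msg type and channel plumbing are hypothetical stand-ins, not the arbiter's real reader.Message or loader APIs.

package main

import "fmt"

// msg is a hypothetical stand-in for the arbiter's Kafka message type.
type msg struct {
	commitTs int64
	offset   int64
}

// dedupByCommitTs forwards only messages whose commitTs is strictly greater
// than the largest commitTs accepted so far; anything else is treated as a
// re-delivered duplicate and skipped, mirroring the check added in syncBinlogs.
func dedupByCommitTs(in <-chan msg, out chan<- msg) {
	var receivedTs int64
	for m := range in {
		if m.commitTs <= receivedTs {
			fmt.Printf("skip repeated binlog ts=%d offset=%d\n", m.commitTs, m.offset)
			continue
		}
		receivedTs = m.commitTs
		out <- m
	}
	close(out)
}

func main() {
	in := make(chan msg, 8)
	out := make(chan msg, 8)
	// Offsets 2 and 3 carry commit timestamps that were already delivered.
	for _, m := range []msg{{1, 0}, {2, 1}, {1, 2}, {2, 3}, {3, 4}} {
		in <- m
	}
	close(in)
	dedupByCommitTs(in, out)
	for m := range out {
		fmt.Printf("forward binlog ts=%d offset=%d\n", m.commitTs, m.offset)
	}
}

The <= comparison assumes the consumed topic delivers binlogs in non-decreasing CommitTs order, so a repeated message can only arrive with a timestamp at or below the last accepted one.
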
arbiter/server.go: 8 additions & 0 deletions
@@ -287,8 +287,16 @@ func (s *Server) loadStatus() (int, error) {
 func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.Loader) (err error) {
 	dest := ld.Input()
 	defer ld.Close()
+	var receivedTs int64
 	for msg := range source {
 		log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
+
+		if msg.Binlog.CommitTs <= receivedTs {
+			log.Info("skip repeated binlog", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
+			continue
+		}
+		receivedTs = msg.Binlog.CommitTs
+
 		txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
 		if err != nil {
 			log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
arbiter/server_test.go: 14 additions & 6 deletions
@@ -380,23 +380,31 @@ type syncBinlogsSuite struct{}
 
 var _ = Suite(&syncBinlogsSuite{})
 
-func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message {
+func (s *syncBinlogsSuite) createMsg(schema, table, sql string, commitTs int64) *reader.Message {
 	return &reader.Message{
 		Binlog: &pb.Binlog{
 			DdlData: &pb.DDLData{
 				SchemaName: &schema,
 				TableName:  &table,
 				DdlQuery:   []byte(sql),
 			},
+			CommitTs: commitTs,
 		},
 	}
 }
 
 func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
 	source := make(chan *reader.Message, 1)
 	msgs := []*reader.Message{
-		s.createMsg("test42", "users", "alter table users add column gender smallint"),
-		s.createMsg("test42", "operations", "alter table operations drop column seq"),
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
 	}
+	expectMsgs := []*reader.Message{
+		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
+		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
+	}
 	dest := make(chan *loader.Txn, len(msgs))
 	go func() {
@@ -410,8 +418,8 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
 	err := syncBinlogs(context.Background(), source, &ld)
 	c.Assert(err, IsNil)
 
-	c.Assert(len(dest), Equals, 2)
-	for _, m := range msgs {
+	c.Assert(len(dest), Equals, len(expectMsgs))
+	for _, m := range expectMsgs {
 		txn := <-dest
 		c.Assert(txn.Metadata.(*reader.Message), DeepEquals, m)
 	}
@@ -426,7 +434,7 @@ func (s *syncBinlogsSuite) TestShouldQuitWhenSomeErrorOccurs(c *C) {
 		// input is set small to trigger blocking easily
 		input: make(chan *loader.Txn, 1),
 	}
-	msg := s.createMsg("test42", "users", "alter table users add column gender smallint")
+	msg := s.createMsg("test42", "users", "alter table users add column gender smallint", 1)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// start a routine keep sending msgs to kafka reader
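Taken together, the test changes give createMsg an explicit commitTs parameter, feed TestShouldSendBinlogToLoader a stream that repeats the same binlogs, and assert against expectMsgs instead of msgs, so the test now verifies that each distinct CommitTs reaches the loader exactly once while re-delivered duplicates are dropped.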
