Commit 5c3d7a8

*: add more metrics and some temporary adjustment (#276)

IANTHEREAL authored May 21, 2018
1 parent e5e7ab8 commit 5c3d7a8

Showing 9 changed files with 63 additions and 32 deletions.
3 changes: 1 addition & 2 deletions cmd/drainer/main.go
@@ -24,11 +24,10 @@ func main() {
 		log.Fatalf("verifying flags error, See 'drainer --help'. %s", errors.ErrorStack(err))
 	}
 
+	drainer.InitLogger(cfg)
 	version.PrintVersionInfo()
 	log.Infof("use config: %+v", cfg)
 
-	drainer.InitLogger(cfg)
-
 	bs, err := drainer.NewServer(cfg)
 	if err != nil {
 		log.Fatalf("create drainer server error, %s", errors.ErrorStack(err))
13 changes: 11 additions & 2 deletions drainer/collector.go
@@ -167,6 +167,11 @@ func (c *Collector) updateCollectStatus(synced bool) {
 // updateStatus queries pumps' status, deletes the offline pumps
 // and updates pumps' latest ts
 func (c *Collector) updateStatus(ctx context.Context) error {
+	begin := time.Now()
+	defer func() {
+		publishBinlogHistogram.WithLabelValues("drainer").Observe(time.Since(begin).Seconds())
+	}()
+
 	if err := c.updatePumpStatus(ctx); err != nil {
 		log.Errorf("DetectPumps error: %v", errors.ErrorStack(err))
 		c.updateCollectStatus(false)
@@ -303,6 +308,7 @@ func (c *Collector) LoadHistoryDDLJobs() ([]*model.Job, error) {
 
 // publishBinlogs collects binlogs whose commitTS are in (minTS, maxTS], then publishes them in ascending commitTS order
 func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
+	begin := time.Now()
 	// multi-way merge sort:
 	// 1. get every pump's sorted binlogs
 	// 2. use a heap to merge-sort them
@@ -316,11 +322,13 @@ func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
 			bss[id] = bs
 			binlogOffsets[id] = 1
 			// first, push every pump's first item into the heap
-			c.bh.push(ctx, bs[0])
+			c.bh.push(ctx, bs[0], false)
 		}
 		total += bs.Len()
 	}
+	publishBinlogHistogram.WithLabelValues("drainer_collector").Observe(time.Since(begin).Seconds())
 
+	begin = time.Now()
 	item := c.bh.pop()
 	for item != nil {
 		c.syncer.Add(item)
@@ -329,11 +337,12 @@ func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
 			delete(bss, item.nodeID)
 		} else {
 			// push next item into heap and increase the offset
-			c.bh.push(ctx, bss[item.nodeID][binlogOffsets[item.nodeID]])
+			c.bh.push(ctx, bss[item.nodeID][binlogOffsets[item.nodeID]], false)
 			binlogOffsets[item.nodeID] = binlogOffsets[item.nodeID] + 1
 		}
 		item = c.bh.pop()
 	}
+	publishBinlogHistogram.WithLabelValues("drainer_merge_sort").Observe(time.Since(begin).Seconds())
 
 	publishBinlogCounter.WithLabelValues("drainer").Add(float64(total))
 }
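The "multi-way merge sort" comment above is the core of publishBinlogs: every pump hands back a slice already sorted by commitTS, and drainer merges those runs through a min-heap (seed the heap with each pump's first item, pop the global minimum, push that pump's next item). A minimal, self-contained sketch of that k-way merge, using a hypothetical item type and the standard container/heap rather than the repo's binlogHeap:

    package main

    import (
        "container/heap"
        "fmt"
    )

    // item is a hypothetical stand-in for binlogItem: one entry from one sorted source.
    type item struct {
        commitTS int64
        source   int // index of the pump this entry came from
    }

    // minHeap orders items by commitTS; it implements container/heap.Interface.
    type minHeap []item

    func (h minHeap) Len() int            { return len(h) }
    func (h minHeap) Less(i, j int) bool  { return h[i].commitTS < h[j].commitTS }
    func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
    func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
    func (h *minHeap) Pop() interface{} {
        old := *h
        x := old[len(old)-1]
        *h = old[:len(old)-1]
        return x
    }

    // mergeSorted mirrors publishBinlogs: seed the heap with each source's first
    // element, then repeatedly pop the minimum and push that source's next
    // element, yielding one ascending stream.
    func mergeSorted(sources [][]int64) []int64 {
        h := &minHeap{}
        offsets := make([]int, len(sources))
        for i, s := range sources {
            if len(s) > 0 {
                heap.Push(h, item{commitTS: s[0], source: i})
                offsets[i] = 1
            }
        }
        var out []int64
        for h.Len() > 0 {
            top := heap.Pop(h).(item)
            out = append(out, top.commitTS)
            if off := offsets[top.source]; off < len(sources[top.source]) {
                heap.Push(h, item{commitTS: sources[top.source][off], source: top.source})
                offsets[top.source]++
            }
        }
        return out
    }

    func main() {
        fmt.Println(mergeSorted([][]int64{{1, 4, 7}, {2, 5}, {3, 6}})) // [1 2 3 4 5 6 7]
    }

Popping the minimum and immediately refilling from the same source keeps the heap at one entry per pump, so the merge's memory is bounded by the number of pumps rather than the batch size.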
19 changes: 2 additions & 17 deletions drainer/heap.go
@@ -67,14 +67,14 @@ func newBinlogHeap(size int) *binlogHeap {
 	}
 }
 
-func (b *binlogHeap) push(ctx context.Context, item *binlogItem) {
+func (b *binlogHeap) push(ctx context.Context, item *binlogItem, check bool) {
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		default:
 			b.Lock()
-			if b.bh.Len() == b.size {
+			if check && b.bh.Len() == b.size {
 				b.Unlock()
 				time.Sleep(pushRetryTime)
 				continue
@@ -97,18 +97,3 @@ func (b *binlogHeap) pop() *binlogItem {
 	b.Unlock()
 	return item.(*binlogItem)
 }
-
-func (b *binlogHeap) peek() *binlogItem {
-	b.Lock()
-	if b.bh.Len() == 0 {
-		b.Unlock()
-		return nil
-	}
-
-	item := heap.Pop(b.bh)
-	heap.Push(b.bh, item)
-	b.Unlock()
-
-	return item.(*binlogItem)
-
-}
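The new check parameter makes the heap's capacity limit opt-in: with check true, push retries (sleeping pushRetryTime between attempts) while the heap is at size; with check false it inserts unconditionally. Producers filling the heap (putIntoHeap) keep the bound, while callers that re-insert an item they just popped (the collector's merge, and collectBinlogs pushing back an overshooting item) pass false, presumably so they can never block on the limit they are themselves draining. A sketch of that pattern on a hypothetical locked queue, not the repo's type:

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    var pushRetryTime = time.Millisecond

    // boundedQueue is a hypothetical analogue of binlogHeap: a locked container
    // whose capacity check the caller can skip.
    type boundedQueue struct {
        sync.Mutex
        items []int
        size  int
    }

    // push spins while the queue is full, but only when check is true; a caller
    // re-inserting an element it just popped passes check=false and can never
    // block against the limit it is draining itself.
    func (q *boundedQueue) push(ctx context.Context, v int, check bool) {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                q.Lock()
                if check && len(q.items) == q.size {
                    q.Unlock()
                    time.Sleep(pushRetryTime)
                    continue
                }
                q.items = append(q.items, v)
                q.Unlock()
                return
            }
        }
    }

    func main() {
        q := &boundedQueue{size: 1}
        q.push(context.Background(), 1, true)  // fills the single slot
        q.push(context.Background(), 2, false) // bypasses the full check
        fmt.Println(q.items)                   // [1 2]
    }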
6 changes: 3 additions & 3 deletions drainer/heap_test.go
@@ -46,7 +46,7 @@ func (s *testDrainerSuite) TestHeap(c *C) {
 		wg.Add(1)
 		defer wg.Done()
 		for _, cs := range testCase {
-			bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"))
+			bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"), true)
 		}
 	}()
 
@@ -64,12 +64,12 @@ func (s *testDrainerSuite) TestHeap(c *C) {
 
 	// test that push blocks when the heap is full and that a blocked push can be canceled
 	bh = newBinlogHeap(1)
-	bh.push(ctx, newBinlogItem(testCase[0], pb.Pos{}, "testnode"))
+	bh.push(ctx, newBinlogItem(testCase[0], pb.Pos{}, "testnode"), true)
 	go func() {
 		wg.Add(1)
 		defer wg.Done()
 		for _, cs := range testCase {
-			bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"))
+			bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"), true)
 		}
 	}()
 
20 changes: 20 additions & 0 deletions drainer/metrics.go
@@ -26,6 +26,24 @@ var (
 			Help: "offset for each pump.",
 		}, []string{"nodeID"})
 
+	findMatchedBinlogHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "binlog",
+			Subsystem: "drainer",
+			Name:      "find_matched_binlog_duration_time",
+			Help:      "Bucketed histogram of the time to find a matched binlog.",
+			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
+		}, []string{"nodeID"})
+
+	publishBinlogHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "binlog",
+			Subsystem: "drainer",
+			Name:      "publish_binlog_duration_time",
+			Help:      "Bucketed histogram of the time to publish a binlog.",
+			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
+		}, []string{"nodeID"})
+
 	publishBinlogCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "binlog",
@@ -114,6 +132,8 @@ func init() {
 	prometheus.MustRegister(txnHistogram)
 	prometheus.MustRegister(readBinlogHistogram)
 	prometheus.MustRegister(readBinlogSizeHistogram)
+	prometheus.MustRegister(publishBinlogHistogram)
+	prometheus.MustRegister(findMatchedBinlogHistogram)
 }
 
 type metricClient struct {
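Both new histograms use prometheus.ExponentialBuckets(0.00005, 2, 18): 18 upper bounds starting at 50µs and doubling each step, so the largest finite bucket is 0.00005 × 2^17 ≈ 6.55s and anything slower lands in the implicit +Inf bucket. A quick check of that range:

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        buckets := prometheus.ExponentialBuckets(0.00005, 2, 18)
        // Prints: 18 5e-05 6.5536
        fmt.Println(len(buckets), buckets[0], buckets[len(buckets)-1])
    }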
22 changes: 17 additions & 5 deletions drainer/pump.go
@@ -1,6 +1,7 @@
 package drainer
 
 import (
+	"fmt"
 	"io"
 	"strconv"
 	"sync"
@@ -21,6 +22,9 @@ import (
 	pb "github.com/pingcap/tipb/go-binlog"
 )
 
+// sleep 10 milliseconds to wait for the matched binlog
+var waitMatchedTime = 10 * time.Millisecond
+
 type binlogEntity struct {
 	tp      pb.BinlogType
 	startTS int64
@@ -157,10 +161,12 @@ func (p *Pump) publish(t *tikv.LockResolver) {
 		case entity = <-p.binlogChan:
 		}
 
+		begin := time.Now()
 		switch entity.tp {
 		case pb.BinlogType_Prewrite:
 			// when we meet the prewrite binlog we must find its matched commit binlog
 			p.mustFindCommitBinlog(t, entity.startTS)
+			findMatchedBinlogHistogram.WithLabelValues(p.nodeID).Observe(time.Since(begin).Seconds())
 		case pb.BinlogType_Commit, pb.BinlogType_Rollback:
 			// if the commitTs is larger than maxCommitTs,
 			// we would publish all binlogs:
@@ -175,6 +181,7 @@ func (p *Pump) publish(t *tikv.LockResolver) {
 			} else {
 				binlogs = make(map[int64]*binlogItem)
 			}
+			publishBinlogHistogram.WithLabelValues(p.nodeID).Observe(time.Since(begin).Seconds())
 		}
 	}
 
Expand All @@ -195,7 +202,7 @@ func (p *Pump) mustFindCommitBinlog(t *tikv.LockResolver, startTS int64) {

b, ok := p.getPrewriteBinlogEntity(startTS)
if ok {
time.Sleep(waitTime)
time.Sleep(waitMatchedTime)
// check again after sleep a moment
b, ok = p.getPrewriteBinlogEntity(startTS)
if ok {
@@ -296,7 +303,7 @@ func (p *Pump) putIntoHeap(items map[int64]*binlogItem) {
 			// if we meet a smaller binlog, we should ignore it, because we have published binlogs before the window's low boundary
 			continue
 		}
-		p.bh.push(p.ctx, item)
+		p.bh.push(p.ctx, item, true)
 	}
 
 	errorBinlogCount.Add(float64(errorBinlogs))
@@ -352,8 +359,9 @@ func (p *Pump) getDDLJob(id int64) (*model.Job, error) {
 }
 
 func (p *Pump) collectBinlogs(windowLower, windowUpper int64) binlogItems {
+	begin := time.Now()
 	var bs binlogItems
-	item := p.bh.peek()
+	item := p.bh.pop()
 	for item != nil && item.binlog.CommitTs <= windowUpper {
 		// make sure to discard old binlogs whose commitTS is earlier than or equal to minTS
 		if item.binlog.CommitTs > windowLower {
@@ -363,10 +371,14 @@ func (p *Pump) collectBinlogs(windowLower, windowUpper int64) binlogItems {
 		if ComparePos(p.currentPos, item.pos) == -1 {
 			p.currentPos = item.pos
 		}
-		_ = p.bh.pop()
-		item = p.bh.peek()
+		item = p.bh.pop()
 	}
+	if item != nil {
+		p.bh.push(p.ctx, item, false)
+	}
 
+	publishBinlogHistogram.WithLabelValues(fmt.Sprintf("%s_collect_binlogs", p.nodeID)).Observe(time.Since(begin).Seconds())
+
 	return bs
 }
 
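waitMatchedTime replaces the more general waitTime in mustFindCommitBinlog: a prewrite binlog cannot be published until its commit or rollback arrives, so the reader sleeps a cheap 10ms and re-checks before falling back to the expensive path (the surrounding code queries the TiKV lock resolver). A hedged sketch of that wait-then-escalate shape, with hypothetical callbacks rather than the repo's API:

    package main

    import (
        "fmt"
        "time"
    )

    var waitMatchedTime = 10 * time.Millisecond

    // waitForMatch sketches mustFindCommitBinlog's retry shape: if the prewrite
    // is still unmatched, sleep briefly and check once more before resorting to
    // the slow resolution path.
    func waitForMatch(stillUnmatched func(startTS int64) bool, resolveSlow func(startTS int64), startTS int64) {
        if stillUnmatched(startTS) {
            time.Sleep(waitMatchedTime) // give the commit binlog 10ms to show up
            if stillUnmatched(startTS) {
                resolveSlow(startTS)
            }
        }
    }

    func main() {
        commitArrives := time.Now().Add(5 * time.Millisecond)
        waitForMatch(
            func(int64) bool { return time.Now().Before(commitArrives) }, // matched after ~5ms
            func(ts int64) { fmt.Println("slow path for startTS", ts) },  // not reached here
            42,
        )
    }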
5 changes: 4 additions & 1 deletion drainer/util.go
@@ -116,7 +116,10 @@ func genDrainerID(listenAddr string) (string, error) {
 }
 
 func execute(executor executor.Executor, sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error {
-	// compute txn duration
+	if len(sqls) == 0 {
+		return nil
+	}
+
 	beginTime := time.Now()
 	defer func() {
 		txnHistogram.Observe(time.Since(beginTime).Seconds())
2 changes: 2 additions & 0 deletions pkg/offsets/offset.go
@@ -163,6 +163,8 @@ func (ks *KafkaSeeker) getAndCompare(topic string, partition int32, offset int64
 	for msg := range pc.Messages() {
 		bp, err := ks.operator.Decode(msg)
 		if err != nil {
+			//log.Errorf("decode message(offset %d) error %v", message.Offset, err)
+			//return 1, -2, nil
 			return 0, bp, errors.Annotatef(err, "decode %s", msg)
 		}
 
5 changes: 3 additions & 2 deletions pkg/sql/sql.go
@@ -41,7 +41,7 @@ func ExecuteSQLs(db *sql.DB, sqls []string, args [][]interface{}, isDDL bool) er
 		time.Sleep(RetryWaitTime)
 	}
 
-	err = appleTxn(db, sqls, args)
+	err = ExecuteTxn(db, sqls, args)
 	if err == nil {
 		return nil
 	}
@@ -50,7 +50,8 @@ func ExecuteSQLs(db *sql.DB, sqls []string, args [][]interface{}, isDDL bool) er
 	return errors.Trace(err)
 }
 
-func appleTxn(db *sql.DB, sqls []string, args [][]interface{}) error {
+// ExecuteTxn executes SQLs in a single transaction
+func ExecuteTxn(db *sql.DB, sqls []string, args [][]interface{}) error {
 	txn, err := db.Begin()
 	if err != nil {
 		log.Errorf("exec sqls[%v] begin failed %v", sqls, errors.ErrorStack(err))
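Renaming appleTxn (presumably a typo of applyTxn) to the exported ExecuteTxn opens the single-transaction path to other packages, while ExecuteSQLs keeps wrapping it with retries. A usage sketch, assuming the import path github.com/pingcap/tidb-binlog (not stated on this page) and a reachable MySQL-protocol endpoint; the DSN and table are illustrative only:

    package main

    import (
        "database/sql"
        "log"

        _ "github.com/go-sql-driver/mysql"
        pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
    )

    func main() {
        // Hypothetical DSN; any MySQL-protocol endpoint works.
        db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/test")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        sqls := []string{
            "INSERT INTO t (id, name) VALUES (?, ?)",
            "UPDATE t SET name = ? WHERE id = ?",
        }
        args := [][]interface{}{{1, "a"}, {"b", 1}}

        // Both statements commit together or roll back together.
        if err := pkgsql.ExecuteTxn(db, sqls, args); err != nil {
            log.Fatal(err)
        }
    }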
