1
1
// Copyright 2015 The etcd Authors
2
+ // Modified work copyright 2018 The tiglabs Authors.
2
3
//
3
4
// Licensed under the Apache License, Version 2.0 (the "License");
4
5
// you may not use this file except in compliance with the License.
@@ -19,6 +20,7 @@ import (
19
20
"runtime"
20
21
"sync"
21
22
"sync/atomic"
23
+ "time"
22
24
"unsafe"
23
25
24
26
"github.com/tiglabs/raft/logger"
@@ -58,6 +60,11 @@ type peerState struct {
58
60
mu sync.RWMutex
59
61
}
60
62
63
+ type monitorStatus struct {
64
+ conErrCount uint8
65
+ replicasErrCnt map [uint64 ]uint8
66
+ }
67
+
61
68
func (s * peerState ) change (c * proto.ConfChange ) {
62
69
s .mu .Lock ()
63
70
switch c .Type {
@@ -102,6 +109,7 @@ type raft struct {
102
109
peerState peerState
103
110
pending map [uint64 ]* Future
104
111
snapping map [uint64 ]* snapshotStatus
112
+ mStatus * monitorStatus
105
113
propc chan * proposal
106
114
applyc chan * apply
107
115
recvc chan * proto.Message
@@ -130,10 +138,15 @@ func newRaft(config *Config, raftConfig *RaftConfig) (*raft, error) {
130
138
return nil , err
131
139
}
132
140
141
+ mStatus := & monitorStatus {
142
+ conErrCount : 0 ,
143
+ replicasErrCnt : make (map [uint64 ]uint8 ),
144
+ }
133
145
raft := & raft {
134
146
raftFsm : r ,
135
147
config : config ,
136
148
raftConfig : raftConfig ,
149
+ mStatus : mStatus ,
137
150
pending : make (map [uint64 ]* Future ),
138
151
snapping : make (map [uint64 ]* snapshotStatus ),
139
152
recvc : make (chan * proto.Message , config .ReqBufferSize ),
@@ -155,6 +168,7 @@ func newRaft(config *Config, raftConfig *RaftConfig) (*raft, error) {
155
168
156
169
util .RunWorker (raft .runApply , raft .handlePanic )
157
170
util .RunWorker (raft .run , raft .handlePanic )
171
+ util .RunWorker (raft .monitor , raft .handlePanic )
158
172
return raft , nil
159
173
}
160
174
@@ -388,6 +402,51 @@ func (s *raft) run() {
388
402
}
389
403
}
390
404
405
+ func (s * raft ) monitor () {
406
+ statusTicker := time .NewTicker (5 * time .Second )
407
+ leaderTicker := time .NewTicker (1 * time .Minute )
408
+ for {
409
+ select {
410
+ case <- s .stopc :
411
+ statusTicker .Stop ()
412
+ return
413
+
414
+ case <- statusTicker .C :
415
+ if s .raftFsm .leader == NoLeader || s .raftFsm .state == stateCandidate {
416
+ s .mStatus .conErrCount ++
417
+ } else {
418
+ s .mStatus .conErrCount = 0
419
+ }
420
+ if s .mStatus .conErrCount > 5 {
421
+ errMsg := fmt .Sprintf ("raft status not health partitionID[%d]_nodeID[%d]_leader[%v]_state[%v]_replicas[%v]" ,
422
+ s .raftFsm .id , s .raftFsm .config .NodeID , s .raftFsm .leader , s .raftFsm .state , s .raftFsm .peers ())
423
+ logger .Error (errMsg )
424
+
425
+ s .mStatus .conErrCount = 0
426
+ }
427
+ case <- leaderTicker .C :
428
+ if s .raftFsm .state == stateLeader {
429
+ for id , p := range s .raftFsm .replicas {
430
+ if id == s .raftFsm .config .NodeID {
431
+ continue
432
+ }
433
+ if p .active == false {
434
+ s .mStatus .replicasErrCnt [id ]++
435
+ } else {
436
+ s .mStatus .replicasErrCnt [id ] = 0
437
+ }
438
+ if s .mStatus .replicasErrCnt [id ] > 5 {
439
+ errMsg := fmt .Sprintf ("raft partitionID[%d] replicaID[%v] not active peer[%v]" ,
440
+ s .raftFsm .id , id , p .peer )
441
+ logger .Error (errMsg )
442
+ s .mStatus .replicasErrCnt [id ] = 0
443
+ }
444
+ }
445
+ }
446
+ }
447
+ }
448
+ }
449
+
391
450
func (s * raft ) tick () {
392
451
if s .restoringSnapshot .Get () {
393
452
return
@@ -446,6 +505,7 @@ func (s *raft) reciveMessage(m *proto.Message) {
446
505
case <- s .stopc :
447
506
case s .recvc <- m :
448
507
default :
508
+ logger .Warn (fmt .Sprintf ("raft[%v] discard message(%v)" , s .raftConfig .ID , m .ToString ()))
449
509
return
450
510
}
451
511
}
0 commit comments