Skip to content

Commit 4998106

Browse files
committed
fix: concurrent access issue when metrics collection happen
1 parent d12356e commit 4998106

File tree

4 files changed

+23
-10
lines changed

4 files changed

+23
-10
lines changed

config/config.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,19 @@ import (
99
)
1010

1111
type Elasticsearch struct {
12-
Username string `yaml:"username"`
13-
Password string `yaml:"password"`
1412
BatchByteSizeLimit any `yaml:"batchByteSizeLimit"`
13+
BatchCommitTickerDuration *time.Duration `yaml:"batchCommitTickerDuration"`
1514
CollectionIndexMapping map[string]string `yaml:"collectionIndexMapping"`
1615
MaxConnsPerHost *int `yaml:"maxConnsPerHost"`
1716
MaxIdleConnDuration *time.Duration `yaml:"maxIdleConnDuration"`
1817
DiscoverNodesInterval *time.Duration `yaml:"discoverNodesInterval"`
1918
TypeName string `yaml:"typeName"`
20-
Urls []string `yaml:"urls"`
19+
Password string `yaml:"password"`
20+
Username string `yaml:"username"`
2121
RejectionLog RejectionLog `yaml:"rejectionLog"`
22+
Urls []string `yaml:"urls"`
2223
BatchSizeLimit int `yaml:"batchSizeLimit"`
2324
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
24-
BatchCommitTickerDuration *time.Duration `yaml:"batchCommitTickerDuration"`
2525
ConcurrentRequest int `yaml:"concurrentRequest"`
2626
CompressionEnabled bool `yaml:"compressionEnabled"`
2727
DisableDiscoverNodesOnStart bool `yaml:"disableDiscoverNodesOnStart"`

elasticsearch/bulk/bulk.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -439,19 +439,27 @@ func fillErrorDataWithBulkRequestError(batchActions []*document.ESActionDocument
439439
return errorData
440440
}
441441

442+
func (b *Bulk) LockMetrics() {
443+
b.metricCounterMutex.Lock()
444+
}
445+
446+
func (b *Bulk) UnlockMetrics() {
447+
b.metricCounterMutex.Unlock()
448+
}
449+
442450
func (b *Bulk) finalizeProcess(batchActions []*document.ESActionDocument, errorData map[string]string) {
443451
for _, action := range batchActions {
444452
key := getActionKey(*action)
445453
if _, ok := errorData[key]; ok {
446-
b.countError(action)
454+
go b.countError(action)
447455
if b.sinkResponseHandler != nil {
448456
b.sinkResponseHandler.OnError(&dcpElasticsearch.SinkResponseHandlerContext{
449457
Action: action,
450458
Err: fmt.Errorf(errorData[key]),
451459
})
452460
}
453461
} else {
454-
b.countSuccess(action)
462+
go b.countSuccess(action)
455463
if b.sinkResponseHandler != nil {
456464
b.sinkResponseHandler.OnSuccess(&dcpElasticsearch.SinkResponseHandlerContext{
457465
Action: action,
@@ -462,8 +470,8 @@ func (b *Bulk) finalizeProcess(batchActions []*document.ESActionDocument, errorD
462470
}
463471

464472
func (b *Bulk) countError(action *document.ESActionDocument) {
465-
b.metricCounterMutex.Lock()
466-
defer b.metricCounterMutex.Unlock()
473+
b.LockMetrics()
474+
defer b.UnlockMetrics()
467475

468476
if action.Type == document.Index || action.Type == document.DocUpdate || action.Type == document.ScriptUpdate {
469477
b.metric.IndexingErrorActionCounter[action.IndexName]++
@@ -473,8 +481,8 @@ func (b *Bulk) countError(action *document.ESActionDocument) {
473481
}
474482

475483
func (b *Bulk) countSuccess(action *document.ESActionDocument) {
476-
b.metricCounterMutex.Lock()
477-
defer b.metricCounterMutex.Unlock()
484+
b.LockMetrics()
485+
defer b.UnlockMetrics()
478486

479487
if action.Type == document.Index || action.Type == document.DocUpdate || action.Type == document.ScriptUpdate {
480488
b.metric.IndexingSuccessActionCounter[action.IndexName]++

elasticsearch/bulk/bulk_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"reflect"
88
"strings"
99
"testing"
10+
"time"
1011

1112
"github.com/Trendyol/go-dcp-elasticsearch/elasticsearch"
1213
"github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/document"
@@ -165,6 +166,7 @@ func TestBulk_executeSinkResponseHandler(t *testing.T) {
165166

166167
// When
167168
sut.finalizeProcess(batchActions, errorData)
169+
time.Sleep(1 * time.Second)
168170

169171
// Then
170172
if handler.successResult != "1:someIndex" {

metric/collector.go

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ func (s *Collector) Describe(ch chan<- *prometheus.Desc) {
1919
}
2020

2121
func (s *Collector) Collect(ch chan<- prometheus.Metric) {
22+
s.bulk.LockMetrics()
23+
defer s.bulk.UnlockMetrics()
24+
2225
bulkMetric := s.bulk.GetMetric()
2326

2427
ch <- prometheus.MustNewConstMetric(

0 commit comments

Comments
 (0)