-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcached.go
96 lines (76 loc) · 1.72 KB
/
cached.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package logger
import (
"context"
"io"
"sync"
"sync/atomic"
)
var (
_ io.Closer = &CachedLogging[any]{}
_ Log[any] = &CachedLogging[any]{}
)
type CachedLogging[T any] struct {
cancel context.CancelFunc
logger Log[T]
chs []chan T
chsLen uint64
idx uint64
wg *sync.WaitGroup
}
func NewCached[T any](ctx context.Context, log Log[T], mods ...ModifierCached) *CachedLogging[T] {
config := defaultCachedConfig
ctx, cancel := context.WithCancel(ctx)
for _, mod := range mods {
mod(&config)
}
if config.retryCount <= 0 {
config.retryCount = 1
}
chs := make([]chan T, config.workers)
cancelFns := make([]context.CancelFunc, 0, config.workers)
wg := &sync.WaitGroup{}
wg.Add(config.workers)
for i := 0; i < config.workers; i++ {
chs[i] = make(chan T, 4096)
workerCtx, cancel := context.WithCancel(ctx)
cancelFns = append(cancelFns, cancel)
go logWorker(workerCtx, wg, log, chs[i], config.bufferSize, config.retryCount)
}
go func(ctx context.Context) {
<-ctx.Done()
for i := 0; i < config.workers; i++ {
cancel := cancelFns[i]
close(chs[i])
cancel()
}
}(ctx)
return &CachedLogging[T]{
logger: log,
chs: chs,
chsLen: uint64(len(chs)),
cancel: cancel,
wg: wg,
}
}
func (l *CachedLogging[T]) Log(log T) error {
idx := atomic.AddUint64(&l.idx, 1) % l.chsLen
l.chs[idx] <- log
return nil
}
func (l *CachedLogging[T]) LogMultiple(logs []T) error {
length := uint64(len(logs))
for i := uint64(0); i < length; i++ {
// Evenly distribute the logs to the workers
l.chs[i%l.chsLen] <- logs[i]
}
return nil
}
func (l *CachedLogging[T]) Close() error {
c := l.cancel
c()
l.wg.Wait()
if logger, ok := l.logger.(io.Closer); ok {
return logger.Close()
}
return nil
}