-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstream.go
77 lines (68 loc) · 1.79 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package dgnats
import (
"errors"
dgcoll "github.com/darwinOrg/go-common/collection"
dgctx "github.com/darwinOrg/go-common/context"
dglogger "github.com/darwinOrg/go-logger"
"github.com/nats-io/nats.go"
"strings"
)
// streamCache holds, keyed by subject ID, the stream info that InitStream
// recorded for that subject; InitStream returns early when a non-nil entry is
// present. It is a plain map with no locking visible in this file.
var streamCache = make(map[string]*nats.StreamInfo)
// InitStream makes sure a JetStream stream named subject.Category exists and
// lists subject.Name among its subjects, adding the stream or appending the
// subject to it when needed.
//
// A successful result is remembered in streamCache under subject.GetId(), so
// later calls for the same subject return immediately. Failed attempts are
// never cached, which lets the next call retry instead of reporting a stale
// success.
//
// It returns the error from getJs, UpdateStream or AddStream. An AddStream
// error that indicates the stream name is already in use is treated as
// success.
func InitStream(ctx *dgctx.DgContext, subject *NatsSubject) error {
	subjectId := subject.GetId()
	if streamCache[subjectId] != nil {
		return nil
	}

	js, err := getJs()
	if err != nil {
		return err
	}

	// The lookup error is deliberately dropped: a failed lookup leaves
	// streamInfo nil and falls through to AddStream, whose own error is
	// reported below.
	streamInfo, _ := js.StreamInfo(subject.Category)
	if streamInfo == nil {
		dglogger.Debugf(ctx, "add stream %s", subject.Category)
		created, addErr := js.AddStream(buildStreamConfig(subject))
		if addErr != nil {
			if errors.Is(addErr, nats.ErrStreamNameAlreadyInUse) || strings.Contains(addErr.Error(), "existing") {
				// The stream apparently exists already. Nothing is cached
				// here, so the next call looks the stream up again.
				return nil
			}
			dglogger.Errorf(ctx, "add stream[%s] error: %v", subject.Category, addErr)
			return addErr
		}
		streamCache[subjectId] = created
		return nil
	}

	if dgcoll.AnyMatch(streamInfo.Config.Subjects, func(s string) bool {
		return s == subject.Name
	}) {
		streamCache[subjectId] = streamInfo
		return nil
	}

	dglogger.Debugf(ctx, "update stream[%s] for %s", subject.Category, subject.Name)
	streamInfo.Config.Subjects = append(streamInfo.Config.Subjects, subject.Name)
	if _, err = js.UpdateStream(&streamInfo.Config); err != nil {
		// Do not cache on failure: the server has not accepted the new
		// subject, so a cached entry would make later calls skip the retry.
		dglogger.Errorf(ctx, "update stream[%s] error: %v", subject.Category, err)
		return err
	}
	streamCache[subjectId] = streamInfo
	return nil
}
// DeleteStream deletes the JetStream stream named category.
//
// It returns the error from getJs unchanged, or the error from the delete
// call after logging it; nil is returned when the delete succeeds.
func DeleteStream(ctx *dgctx.DgContext, category string) error {
	jetStream, jsErr := getJs()
	if jsErr != nil {
		return jsErr
	}

	if delErr := jetStream.DeleteStream(category); delErr != nil {
		dglogger.Errorf(ctx, "delete stream[%s] error: %v", category, delErr)
		return delErr
	}
	return nil
}
// buildStreamConfig returns the configuration used to add a stream for
// subject: the stream is named after subject.Category, carries subject.Name
// as its only subject, and uses file storage.
func buildStreamConfig(subject *NatsSubject) *nats.StreamConfig {
	cfg := new(nats.StreamConfig)
	cfg.Name = subject.Category
	cfg.Subjects = []string{subject.Name}
	cfg.Storage = nats.FileStorage
	return cfg
}