Skip to content

Commit 0dcfcd1

Browse files
committed
Use a separate Subscription to watch.
1 parent ae94d70 commit 0dcfcd1

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

pkg/client/client.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type NodeStateChangeCallback func(state discovery.NodeState, node *discovery.Nod
2121

2222
type Client struct {
2323
nc *nats.Conn
24-
sub *nats.Subscription
2524
nodes map[string]*discovery.Node
2625
nodeLock sync.Mutex
2726
ctx context.Context
@@ -30,7 +29,6 @@ type Client struct {
3029

3130
func (c *Client) Close() {
3231
c.cancel()
33-
c.sub.Unsubscribe()
3432
}
3533

3634
// NewService create a service instance
@@ -92,19 +90,20 @@ func (c *Client) handleNatsMsg(msg *nats.Msg, callback NodeStateChangeCallback)
9290
if _, ok := c.nodes[nid]; !ok {
9391
logger.Infof("node.save")
9492
c.nodes[nid] = &event.Node
95-
callback(discovery.NodeUp, &event.Node)
9693
}
94+
callback(discovery.NodeUp, &event.Node)
9795
case discovery.Update:
9896
if _, ok := c.nodes[nid]; ok {
9997
logger.Infof("node.update")
100-
callback(discovery.NodeKeepalive, &event.Node)
98+
c.nodes[nid] = &event.Node
10199
}
100+
callback(discovery.NodeKeepalive, &event.Node)
102101
case discovery.Delete:
103102
if _, ok := c.nodes[nid]; ok {
104103
logger.Infof("node.delete")
105-
callback(discovery.NodeDown, &event.Node)
104+
delete(c.nodes, nid)
106105
}
107-
delete(c.nodes, nid)
106+
callback(discovery.NodeDown, &event.Node)
108107
default:
109108
err = fmt.Errorf("unkonw message: %v", msg.Data)
110109
logger.Warnf("handleNatsMsg: err => %v", err)
@@ -115,24 +114,26 @@ func (c *Client) handleNatsMsg(msg *nats.Msg, callback NodeStateChangeCallback)
115114
}
116115

117116
func (c *Client) Watch(service string, handleNodeState NodeStateChangeCallback) error {
118-
var err error
119-
120117
if handleNodeState == nil {
121-
err = fmt.Errorf("Watch callback must be set for %v", service)
118+
err := fmt.Errorf("Watch callback must be set for %v", service)
122119
logger.Warnf("Watch: err => %v", err)
123120
return err
124121
}
125122

126123
subj := discovery.DefaultDiscoveryPrefix + "." + service + ".>"
127124
msgCh := make(chan *nats.Msg)
128125

129-
if c.sub, err = c.nc.Subscribe(subj, func(msg *nats.Msg) {
126+
sub, err := c.nc.Subscribe(subj, func(msg *nats.Msg) {
130127
msgCh <- msg
131-
}); err != nil {
128+
})
129+
130+
if err != nil {
132131
return err
133132
}
134133

135134
go func() error {
135+
defer sub.Unsubscribe()
136+
136137
for {
137138
select {
138139
case <-c.ctx.Done():

0 commit comments

Comments
 (0)