Skip to content

Commit 01be414

Browse files
committed
Improve the code.
1 parent 8f9ea31 commit 01be414

File tree

4 files changed

+107
-82
lines changed

4 files changed

+107
-82
lines changed

cmd/registry/main.go

+49-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"sync"
45
"time"
56

67
"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
@@ -52,21 +53,58 @@ func main() {
5253
return
5354
}
5455

55-
reg, err := registry.NewRegistry(nc)
56+
reg, err := registry.NewRegistry(nc, discovery.DefaultExpire)
5657
if err != nil {
5758
log.Errorf("%v", err)
5859
return
5960
}
60-
reg.Listen(func(action discovery.Action, node discovery.Node) (bool, error) {
61-
//Add authentication here
62-
log.Infof("handle Node: %v, %v", action, node)
63-
//return false, fmt.Errorf("reject action: %v", action)
64-
return true, nil
65-
}, func(service string, params map[string]interface{}) ([]discovery.Node, error) {
66-
//Add load balancing here.
67-
log.Infof("handle get nodes: service %v, params %v", service, params)
68-
return reg.GetNodes(service)
69-
})
61+
62+
nodes := make(map[string]discovery.Node)
63+
mutex := sync.Mutex{}
64+
65+
reg.Listen(
66+
// handleNodeAction
67+
func(action discovery.Action, node discovery.Node) (bool, error) {
68+
log.Infof("handleNodeAction: action %v, node %v", action, node)
69+
70+
/*
71+
You can store node info in db like redis here.
72+
*/
73+
74+
mutex.Lock()
75+
defer mutex.Unlock()
76+
77+
switch action {
78+
case discovery.Save:
79+
fallthrough
80+
case discovery.Update:
81+
nodes[node.ID()] = node
82+
case discovery.Delete:
83+
delete(nodes, node.ID())
84+
}
85+
86+
return true, nil
87+
},
88+
//handleGetNodes
89+
func(service string, params map[string]interface{}) ([]discovery.Node, error) {
90+
log.Infof("handleGetNodes: service %v, params %v", service, params)
91+
92+
/*
93+
You can read the global node information from redis here.
94+
*/
95+
96+
mutex.Lock()
97+
defer mutex.Unlock()
98+
99+
nodesResp := []discovery.Node{}
100+
for _, item := range nodes {
101+
if item.Service == service || service == "*" {
102+
nodesResp = append(nodesResp, item)
103+
}
104+
}
105+
106+
return nodesResp, nil
107+
})
70108

71109
select {}
72110
}

pkg/client/client.go

+14-24
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"io"
7-
"sync"
87
"time"
98

109
"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
@@ -20,11 +19,9 @@ var (
2019
type NodeStateChangeCallback func(state discovery.NodeState, node *discovery.Node)
2120

2221
type Client struct {
23-
nc *nats.Conn
24-
nodes map[string]*discovery.Node
25-
nodeLock sync.Mutex
26-
ctx context.Context
27-
cancel context.CancelFunc
22+
nc *nats.Conn
23+
ctx context.Context
24+
cancel context.CancelFunc
2825
}
2926

3027
func (c *Client) Close() {
@@ -35,8 +32,7 @@ func (c *Client) Close() {
3532
func NewClient(nc *nats.Conn) (*Client, error) {
3633

3734
c := &Client{
38-
nc: nc,
39-
nodes: make(map[string]*discovery.Node),
35+
nc: nc,
4036
}
4137

4238
c.ctx, c.cancel = context.WithCancel(context.Background())
@@ -73,36 +69,26 @@ func (c *Client) Get(service string, params map[string]interface{}) (*discovery.
7369
}
7470

7571
func (c *Client) handleNatsMsg(msg *nats.Msg, callback NodeStateChangeCallback) error {
76-
logger.Infof("handle discovery message: %v", msg.Subject)
7772

78-
c.nodeLock.Lock()
79-
defer c.nodeLock.Unlock()
73+
logger.Infof("handle discovery message: %v", msg.Subject)
8074

8175
var event discovery.Request
8276
err := util.Unmarshal(msg.Data, &event)
8377
if err != nil {
8478
logger.Errorf("connect: error parsing offer: %v", err)
8579
return err
8680
}
81+
8782
nid := event.Node.ID()
8883
switch event.Action {
8984
case discovery.Save:
90-
if _, ok := c.nodes[nid]; !ok {
91-
logger.Infof("node.save")
92-
c.nodes[nid] = &event.Node
93-
}
85+
logger.Infof("node.save: %v", nid)
9486
callback(discovery.NodeUp, &event.Node)
9587
case discovery.Update:
96-
if _, ok := c.nodes[nid]; ok {
97-
logger.Infof("node.update")
98-
c.nodes[nid] = &event.Node
99-
}
88+
logger.Infof("node.update: %v", nid)
10089
callback(discovery.NodeKeepalive, &event.Node)
10190
case discovery.Delete:
102-
if _, ok := c.nodes[nid]; ok {
103-
logger.Infof("node.delete")
104-
delete(c.nodes, nid)
105-
}
91+
logger.Infof("node.delete: %v", nid)
10692
callback(discovery.NodeDown, &event.Node)
10793
default:
10894
err = fmt.Errorf("unkonw message: %v", msg.Data)
@@ -132,7 +118,11 @@ func (c *Client) Watch(ctx context.Context, service string, handleNodeState Node
132118
}
133119

134120
go func() error {
135-
defer sub.Unsubscribe()
121+
122+
defer func() {
123+
sub.Unsubscribe()
124+
close(msgCh)
125+
}()
136126

137127
for {
138128
select {

pkg/client/client_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ func TestWatch(t *testing.T) {
6666
}
6767
})
6868

69+
s.Watch(context.Background(), "*", func(state discovery.NodeState, n *discovery.Node) {
70+
if state == discovery.NodeUp {
71+
log.Infof("NodeUp2 => %v", *n)
72+
} else if state == discovery.NodeDown {
73+
log.Infof("NodeDown2 => %v", *n)
74+
}
75+
})
76+
6977
wg.Add(1)
7078

7179
go s.KeepAlive(node)

pkg/registry/registry.go

+36-47
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"io"
77
"strings"
8-
"sync"
98
"time"
109

1110
"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
@@ -26,34 +25,32 @@ type NodeItem struct {
2625

2726
type Registry struct {
2827
nc *nats.Conn
29-
sub *nats.Subscription
30-
nodes map[string]*NodeItem
3128
ctx context.Context
3229
cancel context.CancelFunc
33-
mutex sync.Mutex
30+
expire int64
3431
}
3532

3633
func (s *Registry) Close() {
3734
s.cancel()
38-
s.sub.Unsubscribe()
3935
}
4036

4137
// NewService create a service instance
42-
func NewRegistry(nc *nats.Conn) (*Registry, error) {
38+
func NewRegistry(nc *nats.Conn, expire int64) (*Registry, error) {
4339
s := &Registry{
44-
nc: nc,
45-
nodes: make(map[string]*NodeItem),
40+
nc: nc,
41+
expire: expire,
42+
}
43+
44+
if s.expire <= 0 {
45+
s.expire = discovery.DefaultExpire
4646
}
4747

4848
s.ctx, s.cancel = context.WithCancel(context.Background())
4949
return s, nil
5050
}
5151

52-
func (s *Registry) checkExpires(now int64, handleNodeAction func(action discovery.Action, node discovery.Node) (bool, error)) error {
53-
s.mutex.Lock()
54-
defer s.mutex.Unlock()
55-
56-
for key, item := range s.nodes {
52+
func (s *Registry) checkExpires(nodes map[string]*NodeItem, now int64, handleNodeAction func(action discovery.Action, node discovery.Node) (bool, error)) error {
53+
for key, item := range nodes {
5754
if item.expire <= now {
5855
discoverySubj := strings.ReplaceAll(item.subj, discovery.DefaultPublishPrefix, discovery.DefaultDiscoveryPrefix)
5956
logger.Infof("node.delete %v, %v", discoverySubj, key)
@@ -70,50 +67,38 @@ func (s *Registry) checkExpires(now int64, handleNodeAction func(action discover
7067
return nil
7168
}
7269
handleNodeAction(discovery.Delete, *item.node)
73-
delete(s.nodes, key)
70+
delete(nodes, key)
7471
}
7572
}
7673
return nil
7774
}
7875

79-
func (s *Registry) GetNodes(service string) ([]discovery.Node, error) {
80-
s.mutex.Lock()
81-
defer s.mutex.Unlock()
82-
nodes := []discovery.Node{}
83-
for _, item := range s.nodes {
84-
if item.node.Service == service || service == "*" {
85-
nodes = append(nodes, *item.node)
86-
}
87-
}
88-
return nodes, nil
89-
}
90-
9176
func (s *Registry) Listen(
9277
handleNodeAction func(action discovery.Action, node discovery.Node) (bool, error),
9378
handleGetNodes func(service string, params map[string]interface{}) ([]discovery.Node, error)) error {
94-
var err error
9579

9680
if handleNodeAction == nil || handleGetNodes == nil {
97-
err = fmt.Errorf("Listen callback must be set for Registry.Listen")
81+
err := fmt.Errorf("Listen callback must be set for Registry.Listen")
9882
logger.Warnf("Listen: err => %v", err)
9983
return err
10084
}
10185

10286
subj := discovery.DefaultPublishPrefix + ".>"
10387
msgCh := make(chan *nats.Msg)
10488

105-
if s.sub, err = s.nc.Subscribe(subj, func(msg *nats.Msg) {
89+
sub, err := s.nc.Subscribe(subj, func(msg *nats.Msg) {
10690
msgCh <- msg
107-
}); err != nil {
91+
})
92+
93+
if err != nil {
10894
return err
10995
}
11096

111-
logger.Infof("Registry: listen prefix => %v", subj)
97+
logger.Infof("Registry: listen subj prefix => %v", subj)
11298

113-
handleNatsMsg := func(msg *nats.Msg) error {
114-
s.mutex.Lock()
115-
defer s.mutex.Unlock()
99+
nodes := make(map[string]*NodeItem)
116100

101+
handleNatsMsg := func(msg *nats.Msg) error {
117102
logger.Debugf("handle storage key: %v", msg.Subject)
118103
var req discovery.Request
119104
err := util.Unmarshal(msg.Data, &req)
@@ -128,7 +113,7 @@ func (s *Registry) Listen(
128113
}
129114
switch req.Action {
130115
case discovery.Save:
131-
if _, ok := s.nodes[nid]; !ok {
116+
if _, ok := nodes[nid]; !ok {
132117
logger.Infof("node.save")
133118
// accept or reject
134119
if ok, err := handleNodeAction(req.Action, req.Node); !ok {
@@ -141,16 +126,16 @@ func (s *Registry) Listen(
141126
discoverySubj := strings.ReplaceAll(msg.Subject, discovery.DefaultPublishPrefix, discovery.DefaultDiscoveryPrefix)
142127
s.nc.Publish(discoverySubj, msg.Data)
143128

144-
s.nodes[nid] = &NodeItem{
145-
expire: time.Now().Unix() + discovery.DefaultExpire,
129+
nodes[nid] = &NodeItem{
130+
expire: time.Now().Unix() + s.expire,
146131
node: &req.Node,
147132
subj: msg.Subject,
148133
}
149134
}
150135
case discovery.Update:
151-
if node, ok := s.nodes[nid]; ok {
136+
if node, ok := nodes[nid]; ok {
152137
logger.Debugf("node.update")
153-
node.expire = time.Now().Unix() + discovery.DefaultExpire
138+
node.expire = time.Now().Unix() + s.expire
154139
if ok, err := handleNodeAction(req.Action, req.Node); !ok {
155140
logger.Errorf("aciont %v, rejected %v", req.Action, err)
156141
resp.Success = false
@@ -166,14 +151,14 @@ func (s *Registry) Listen(
166151
}
167152
discoverySubj := strings.ReplaceAll(msg.Subject, discovery.DefaultPublishPrefix, discovery.DefaultDiscoveryPrefix)
168153
s.nc.Publish(discoverySubj, msg.Data)
169-
s.nodes[nid] = &NodeItem{
170-
expire: time.Now().Unix() + discovery.DefaultExpire,
154+
nodes[nid] = &NodeItem{
155+
expire: time.Now().Unix() + s.expire,
171156
node: &req.Node,
172157
subj: msg.Subject,
173158
}
174159
}
175160
case discovery.Delete:
176-
if _, ok := s.nodes[nid]; ok {
161+
if _, ok := nodes[nid]; ok {
177162
logger.Infof("node.delete")
178163
if ok, err := handleNodeAction(req.Action, req.Node); !ok {
179164
logger.Errorf("aciont %v, rejected %v", req.Action, err)
@@ -184,14 +169,12 @@ func (s *Registry) Listen(
184169
discoverySubj := strings.ReplaceAll(msg.Subject, discovery.DefaultPublishPrefix, discovery.DefaultDiscoveryPrefix)
185170
s.nc.Publish(discoverySubj, msg.Data)
186171
}
187-
delete(s.nodes, nid)
172+
delete(nodes, nid)
188173
case discovery.Get:
189174
resp := &discovery.GetResponse{}
190-
s.mutex.Unlock()
191175
if nodes, err := handleGetNodes(req.Service, req.Params); err == nil {
192176
resp.Nodes = nodes
193177
}
194-
s.mutex.Lock()
195178
data, err := util.Marshal(resp)
196179
if err != nil {
197180
logger.Errorf("%v", err)
@@ -209,12 +192,18 @@ func (s *Registry) Listen(
209192
logger.Errorf("%v", err)
210193
return err
211194
}
195+
212196
s.nc.Publish(msg.Reply, data)
213197
return nil
214198
}
215199

216200
go func() error {
217-
defer close(msgCh)
201+
202+
defer func() {
203+
sub.Unsubscribe()
204+
close(msgCh)
205+
}()
206+
218207
now := time.Now().Unix()
219208
t := time.NewTicker(time.Second * 1)
220209
for {
@@ -223,7 +212,7 @@ func (s *Registry) Listen(
223212
return s.ctx.Err()
224213
case <-t.C:
225214
now++
226-
if err := s.checkExpires(now, handleNodeAction); err != nil {
215+
if err := s.checkExpires(nodes, now, handleNodeAction); err != nil {
227216
logger.Warnf("checkExpires err: %v", err)
228217
return err
229218
}

0 commit comments

Comments
 (0)