Skip to content

Commit ae2cbe1

Browse files
committed
Add handler for Actions.
1 parent 14a9a98 commit ae2cbe1

File tree

6 files changed

+377
-298
lines changed

6 files changed

+377
-298
lines changed

cmd/registry/main.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
7+
"github.com/cloudwebrtc/nats-discovery/pkg/registry"
78
"github.com/nats-io/nats.go"
89
log "github.com/pion/ion-log"
910
)
@@ -51,13 +52,20 @@ func main() {
5152
return
5253
}
5354

54-
reg, err := discovery.NewRegistry(nc)
55+
reg, err := registry.NewRegistry(nc)
5556
if err != nil {
5657
log.Errorf("%v", err)
5758
return
5859
}
59-
reg.Listen(func(action string, node discovery.Node) {
60+
reg.Listen(func(action discovery.Action, node discovery.Node) (bool, error) {
61+
//Add authentication here
6062
log.Infof("handle Node: %v, %v", action, node)
63+
//return false, fmt.Errorf("reject action: %v", action)
64+
return true, nil
65+
}, func(service string, params map[string]interface{}) ([]discovery.Node, error) {
66+
//Add load balancing here.
67+
log.Infof("handle get nodes: service %v, params %v", service, params)
68+
return reg.GetNodes(service)
6169
})
6270

6371
select {}

pkg/client/client.go

+49-30
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@ import (
1616
type NodeStateChangeCallback func(state discovery.NodeState, node *discovery.Node)
1717

1818
type Client struct {
19-
nc *nats.Conn
20-
sub *nats.Subscription
21-
nodes map[string]*discovery.Node
22-
nodeLock sync.Mutex
23-
ctx context.Context
24-
cancel context.CancelFunc
25-
handleNodeStateChange NodeStateChangeCallback
19+
nc *nats.Conn
20+
sub *nats.Subscription
21+
nodes map[string]*discovery.Node
22+
nodeLock sync.Mutex
23+
ctx context.Context
24+
cancel context.CancelFunc
2625
}
2726

2827
func (c *Client) Close() {
@@ -42,9 +41,11 @@ func NewClient(nc *nats.Conn) (*Client, error) {
4241
return c, nil
4342
}
4443

45-
func (c *Client) Get(service string) (*discovery.GetResponse, error) {
46-
data, err := util.Marshal(&discovery.KeepAlive{
47-
Action: discovery.Get, Node: discovery.Node{},
44+
func (c *Client) Get(service string, params map[string]interface{}) (*discovery.GetResponse, error) {
45+
data, err := util.Marshal(&discovery.Request{
46+
Action: discovery.Get,
47+
Service: service,
48+
Params: params,
4849
})
4950
if err != nil {
5051
log.Errorf("%v", err)
@@ -61,21 +62,21 @@ func (c *Client) Get(service string) (*discovery.GetResponse, error) {
6162
var resp discovery.GetResponse
6263
err = util.Unmarshal(msg.Data, &resp)
6364
if err != nil {
64-
log.Errorf("Get: error parsing offer: %v", err)
65+
log.Errorf("Get: error parsing discovery.GetResponse: %v", err)
6566
return nil, err
6667
}
6768

6869
log.Infof("nodes %v", resp.Nodes)
6970
return &resp, nil
7071
}
7172

72-
func (c *Client) handleMsg(msg *nats.Msg) error {
73+
func (c *Client) handleNatsMsg(msg *nats.Msg, callback NodeStateChangeCallback) error {
7374
log.Infof("handle discovery message: %v", msg.Subject)
7475

7576
c.nodeLock.Lock()
7677
defer c.nodeLock.Unlock()
7778

78-
var event discovery.KeepAlive
79+
var event discovery.Request
7980
err := util.Unmarshal(msg.Data, &event)
8081
if err != nil {
8182
log.Errorf("connect: error parsing offer: %v", err)
@@ -87,31 +88,38 @@ func (c *Client) handleMsg(msg *nats.Msg) error {
8788
if _, ok := c.nodes[nid]; !ok {
8889
log.Infof("node.save")
8990
c.nodes[nid] = &event.Node
90-
c.handleNodeStateChange(discovery.NodeUp, &event.Node)
91+
callback(discovery.NodeUp, &event.Node)
9192
}
9293
case discovery.Update:
9394
if _, ok := c.nodes[nid]; ok {
9495
log.Infof("node.update")
95-
c.handleNodeStateChange(discovery.NodeKeepalive, &event.Node)
96+
callback(discovery.NodeKeepalive, &event.Node)
9697
}
9798
case discovery.Delete:
9899
if _, ok := c.nodes[nid]; ok {
99100
log.Infof("node.delete")
100-
c.handleNodeStateChange(discovery.NodeDown, &event.Node)
101+
callback(discovery.NodeDown, &event.Node)
101102
}
102103
delete(c.nodes, nid)
103104
default:
104-
log.Warnf("unkonw message: %v", string(msg.Data))
105-
return fmt.Errorf("unkonw message: %v", msg.Data)
105+
err = fmt.Errorf("unkonw message: %v", msg.Data)
106+
log.Warnf("handleNatsMsg: err => %v", err)
107+
return err
106108
}
107109

108110
return nil
109111
}
110112

111-
func (c *Client) Watch(service string, onStateChange NodeStateChangeCallback) error {
113+
func (c *Client) Watch(service string, handleNodeState NodeStateChangeCallback) error {
112114
var err error
113-
subj := discovery.DefaultDiscoveryPrefix + "." + service + ".>"
114115

116+
if handleNodeState == nil {
117+
err = fmt.Errorf("Watch callback must be set for %v", service)
118+
log.Warnf("Watch: err => %v", err)
119+
return err
120+
}
121+
122+
subj := discovery.DefaultDiscoveryPrefix + "." + service + ".>"
115123
msgCh := make(chan *nats.Msg)
116124

117125
if c.sub, err = c.nc.Subscribe(subj, func(msg *nats.Msg) {
@@ -120,16 +128,14 @@ func (c *Client) Watch(service string, onStateChange NodeStateChangeCallback) er
120128
return err
121129
}
122130

123-
c.handleNodeStateChange = onStateChange
124-
125131
go func() error {
126132
for {
127133
select {
128134
case <-c.ctx.Done():
129135
return c.ctx.Err()
130136
case msg, ok := <-msgCh:
131137
if ok {
132-
err := c.handleMsg(msg)
138+
err := c.handleNatsMsg(msg, handleNodeState)
133139
if err != nil {
134140
return err
135141
}
@@ -147,11 +153,11 @@ func (c *Client) KeepAlive(node discovery.Node) error {
147153
t := time.NewTicker(discovery.DefaultLivecycle)
148154

149155
defer func() {
150-
c.SendAction(node, discovery.Delete)
156+
c.sendAction(node, discovery.Delete)
151157
t.Stop()
152158
}()
153159

154-
c.SendAction(node, discovery.Save)
160+
c.sendAction(node, discovery.Save)
155161

156162
for {
157163
select {
@@ -160,24 +166,37 @@ func (c *Client) KeepAlive(node discovery.Node) error {
160166
log.Errorf("keepalive abort: err %v", err)
161167
return err
162168
case <-t.C:
163-
c.SendAction(node, discovery.Update)
164-
break
169+
c.sendAction(node, discovery.Update)
165170
}
166171
}
167172
}
168173

169-
func (c *Client) SendAction(node discovery.Node, action string) error {
170-
data, err := util.Marshal(&discovery.KeepAlive{
174+
func (c *Client) sendAction(node discovery.Node, action discovery.Action) error {
175+
data, err := util.Marshal(&discovery.Request{
171176
Action: action, Node: node,
172177
})
173178
if err != nil {
174179
log.Errorf("%v", err)
175180
return err
176181
}
177182
subj := discovery.DefaultPublishPrefix + "." + node.Service + "." + node.ID()
178-
if err := c.nc.Publish(subj, data); err != nil {
183+
msg, err := c.nc.Request(subj, data, time.Duration(time.Second*15))
184+
if err != nil {
179185
log.Errorf("node start error: err=%v, id=%v", err, node.ID())
180186
return nil
181187
}
188+
189+
var resp discovery.Response
190+
err = util.Unmarshal(msg.Data, &resp)
191+
if err != nil {
192+
log.Errorf("sendAction: [%v] parsing discovery.Response error: %v", action, err)
193+
return err
194+
}
195+
196+
if !resp.Success {
197+
err := fmt.Errorf("[%v] response error %v", action, resp.Reason)
198+
log.Errorf("sendAction: error: %v", err)
199+
return err
200+
}
182201
return nil
183202
}

pkg/client/client_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ func TestWatch(t *testing.T) {
7070
go s.KeepAlive(node)
7171
wg.Wait()
7272

73-
res, err := s.Get("sfu")
73+
res, err := s.Get("sfu", map[string]interface{}{
74+
"nid": "11111",
75+
})
7476
if err != nil {
7577
t.Error(err)
7678
}
@@ -80,6 +82,6 @@ func TestWatch(t *testing.T) {
8082
assert.Equal(t, node.RPC, res.Nodes[0].RPC)
8183

8284
wg.Add(1)
83-
s.SendAction(node, discovery.Delete)
85+
s.sendAction(node, discovery.Delete)
8486
wg.Wait()
8587
}

pkg/discovery/discovery.go

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package discovery
2+
3+
import "time"
4+
5+
// NodeState define the node state type
6+
type NodeState int32
7+
8+
const (
9+
// NodeUp node starting up
10+
NodeUp NodeState = 0
11+
// NodeDown node shutdown
12+
NodeDown NodeState = 1
13+
// NodeKeepalive node keepalive
14+
NodeKeepalive NodeState = 2
15+
16+
DefaultPublishPrefix = "node.publish"
17+
DefaultDiscoveryPrefix = "node.discovery"
18+
19+
DefaultLivecycle = 2 * time.Second
20+
DefaultExpire = 5
21+
)
22+
23+
type Action string
24+
25+
const (
26+
Save Action = "save"
27+
Update Action = "update"
28+
Delete Action = "delete"
29+
Get Action = "get"
30+
)
31+
32+
type Protocol string
33+
34+
const (
35+
GRPC Protocol = "grpc"
36+
NGRPC Protocol = "nats-grpc"
37+
JSONRPC Protocol = "json-rpc"
38+
)
39+
40+
type RPC struct {
41+
Protocol Protocol
42+
Addr string
43+
Params map[string]string
44+
}
45+
46+
// Node represents a node info
47+
type Node struct {
48+
DC string
49+
Service string
50+
NID string
51+
RPC RPC
52+
ExtraInfo map[string]interface{}
53+
}
54+
55+
// ID return the node id with scheme prefix
56+
func (n *Node) ID() string {
57+
return n.DC + "." + n.NID
58+
}
59+
60+
type Request struct {
61+
Action Action
62+
Node Node
63+
Service string
64+
Params map[string]interface{}
65+
}
66+
67+
type Response struct {
68+
Success bool
69+
Reason string
70+
}
71+
72+
type GetResponse struct {
73+
Nodes []Node
74+
}

0 commit comments

Comments
 (0)