-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathris.go
154 lines (140 loc) · 4.19 KB
/
ris.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package main
import (
"bufio"
"encoding/binary"
"encoding/hex"
"encoding/json"
"flag"
"net"
"net/http"
"os"
"strconv"
"strings"
"github.com/golang/glog"
"github.com/sbezverk/gobmp/pkg/bmp"
)
// Package-level settings of the relay.
var (
	// stream is the URL of the RIS Live HTTP stream; the query string asks
	// for JSON formatted messages.
	stream = "https://ris-live.ripe.net/v1/stream/?format=json"
	// bmpAddress is the host:port of the gobmp server the relay dials.
	bmpAddress string
	// bgpID is the textual BGP ID placed into the per-peer header of every
	// generated BMP message.
	bgpID string
)
func init() {
flag.StringVar(&bmpAddress, "bmp-server", "localhost:5000", "IP or DNS address and port of gobmp server")
flag.StringVar(&bgpID, "bgp-id", "1.1.1.1", "BGP ID to use in BMP messages")
}
// RISData is the "data" object of a streamed RIS message. Only the fields
// needed to build a BMP message are decoded.
type RISData struct {
	// Type is the "type" attribute of the data object (presumably the BGP
	// message type; verify against the RIS Live documentation).
	Type string `json:"type,omitempty"`
	// Timestamp is kept as json.Number so the textual representation can be
	// split into its integral and fractional parts without float rounding.
	Timestamp json.Number `json:"timestamp,omitempty"`
	// Peer is the textual IP address (IPv4 or IPv6) of the BGP peer.
	Peer string `json:"peer,omitempty"`
	// PeerASN is the AS number of the BGP peer in decimal text form.
	PeerASN string `json:"peer_asn,omitempty"`
	// ID is the "id" attribute of the message; it is not used by the relay.
	ID string `json:"id,omitempty"`
	// Raw is the hex encoded BGP message.
	Raw string `json:"raw,omitempty"`
	// Host is the "host" attribute of the message; it is not used by the
	// relay.
	Host string `json:"host,omitempty"`
}
// RIS is the envelope of a single line read from the RIS stream.
type RIS struct {
	// Type is the "type" attribute of the envelope.
	Type string `json:"type,omitempty"`
	// Data is the payload of the message; it stays nil when the streamed
	// object carries no "data" attribute.
	Data *RISData `json:"data,omitempty"`
}
// Message groups the two BMP headers that precede the raw BGP message in the
// BMP message sent to the server.
type Message struct {
	// CommonHeader is the BMP common header (version, length, type).
	CommonHeader *bmp.CommonHeader
	// PerPeerHeader is the BMP per-peer header describing the BGP peer.
	PerPeerHeader *bmp.PerPeerHeader
}
func main() {
flag.Parse()
_ = flag.Set("logtostderr", "true")
ris, err := http.Get(stream)
if err != nil {
glog.Errorf("failed to connect to RIS source with error: %+v", err)
os.Exit(1)
}
defer ris.Body.Close()
bmpSrv, err := net.Dial("tcp", bmpAddress)
if err != nil {
glog.Errorf("failed to connect to destination with error: %+v", err)
os.Exit(1)
}
defer bmpSrv.Close()
glog.Infof("connection to bmp %v established", bmpSrv.RemoteAddr())
reader := bufio.NewReader(ris.Body)
errorCh := make(chan error)
for {
m := &RIS{}
b, err := reader.ReadBytes('\n')
if err != nil {
glog.Errorf("failed to read message with error: %+v", err)
os.Exit(1)
}
go func(b []byte, errorCh chan error) {
if err := json.Unmarshal(b, m); err != nil {
glog.Errorf("failed to decode streamed message with error: %+v", err)
errorCh <- err
return
}
bmpMsg := Message{}
bmpMsg.CommonHeader = &bmp.CommonHeader{
Version: 3,
MessageType: bmp.RouteMonitorMsg,
}
bmpMsg.PerPeerHeader = &bmp.PerPeerHeader{
PeerType: 0, // * Peer Type = 0: Global Instance Peer
PeerDistinguisher: make([]byte, 8),
PeerAddress: make([]byte, 16),
PeerBGPID: net.ParseIP(bgpID).To4(),
PeerTimestamp: make([]byte, 8),
}
// Populating peer address
pa := net.ParseIP(m.Data.Peer)
if pa.To4() != nil {
bmpMsg.PerPeerHeader.FlagV = false
copy(bmpMsg.PerPeerHeader.PeerAddress[12:16], pa.To4())
} else if pa.To16() != nil {
bmpMsg.PerPeerHeader.FlagV = true
copy(bmpMsg.PerPeerHeader.PeerAddress, pa.To16())
} else {
glog.Warningf("invalid peer address %s", m.Data.Peer)
errorCh <- err
return
}
// Populating Peer ASN
asn, err := strconv.Atoi(m.Data.PeerASN)
if err != nil {
glog.Warningf("invalid peer asn %s", m.Data.PeerASN)
}
bmpMsg.PerPeerHeader.PeerAS = int32(asn)
// Populating peer timestamp
t := strings.Split(m.Data.Timestamp.String(), ".")
sec, _ := strconv.Atoi(t[0])
msec := 0
if len(t) > 1 {
msec, _ = strconv.Atoi(t[1])
}
binary.BigEndian.PutUint32(bmpMsg.PerPeerHeader.PeerTimestamp[0:4], uint32(sec))
binary.BigEndian.PutUint32(bmpMsg.PerPeerHeader.PeerTimestamp[4:8], uint32(msec))
raw, err := hex.DecodeString(m.Data.Raw)
if err != nil {
glog.Warningf("invalid raw data, failed to decode with error: %+v", err)
}
bmpMsg.CommonHeader.MessageLength = int32(6 + 42 + len(raw))
b1, _ := bmpMsg.CommonHeader.Serialize()
b2, _ := bmpMsg.PerPeerHeader.Serialize()
fullMsg := make([]byte, bmpMsg.CommonHeader.MessageLength)
copy(fullMsg, b1)
copy(fullMsg[6:], b2)
copy(fullMsg[48:], raw)
if _, err := bmpSrv.Write(fullMsg); err != nil {
glog.Errorf("fail to write to server %+v with error: %+v", bmpSrv.RemoteAddr(), err)
errorCh <- err
return
}
}(b, errorCh)
// Check if any goroutine reported error, if it is the case, then exit the loop
select {
case err := <-errorCh:
glog.Errorf("go routine failed with error: %+v, exiting the loop", err)
os.Exit(1)
default:
}
}
}