-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregator.go
140 lines (110 loc) · 3.36 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"log"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
// BlockStat holds the counters accumulated for a single sender block.
type BlockStat struct {
	// MessageCount is incremented for every sender-side and every
	// receiver-side message recorded against the block.
	MessageCount uint64
	// TotalLatency is the running sum of (receiver block timestamp - sender
	// block timestamp) over every matched message pair of the block.
	// NOTE(review): the unit is whatever GetBlockTimestamp returns — confirm.
	TotalLatency *big.Int
	// SentMesssages counts messages seen on the sender side. The spelling is
	// part of the exported name and is left untouched for compatibility.
	SentMesssages uint64
	// ReceivedMessages counts messages seen on the receiver side.
	ReceivedMessages uint64
}
// Aggregator aggregates stats for a single sender -> receiver pair.
//
// Sender-side and receiver-side logs can arrive in either order; whichever
// side arrives first is parked in the matching map until its counterpart
// shows up, at which point the pair is folded into BlockStats.
type Aggregator struct {
	ContractPair
	// messenger holds sender-side logs that have not yet been matched with a
	// receiver-side log, keyed by the sender message identifier.
	messenger map[Identifier]*types.Log
	// inbox holds receiver-side logs that have not yet been matched with a
	// sender-side log.
	inbox map[Identifier]*types.Log // the key in the map refers to the sender message that is being received
	// messengerContract parses sender-side events.
	messengerContract Contract
	// inboxContract parses receiver-side events.
	inboxContract Contract
	// BlockStats maps a block number to the stats accumulated for it.
	BlockStats map[uint64]BlockStat // with respect to sender blocknum
}
// MakeAggregator returns an Aggregator for messages travelling from sender to
// receiver. All internal maps start out empty, and the inbox and messenger
// contracts are taken (in that order) from GetContracts once both chains have
// been set.
func MakeAggregator(sender, receiver *Chain) (agg Aggregator) {
	// Bookkeeping maps: pending logs per side, plus per-block statistics.
	agg.BlockStats = map[uint64]BlockStat{}
	agg.inbox = map[Identifier]*types.Log{}
	agg.messenger = map[Identifier]*types.Log{}

	// The chains must be in place before the contracts are looked up.
	agg.Sender, agg.Receiver = sender, receiver
	agg.inboxContract, agg.messengerContract = agg.GetContracts()

	return agg
}
// AddInboxMessage records a message log observed on the receiver.
//
// The log is parsed with the inbox contract and the "id" entry of the parsed
// data is coerced into the Identifier of the sender message being received.
// The receipt is counted against the sender block named by that identifier.
// If the matching sender log is already pending, the pair is handed to
// AddMessagePair and the pending entry is dropped; otherwise msg is parked in
// the inbox map until the sender log arrives.
//
// It returns an error when the event cannot be parsed, when the identifier
// cannot be coerced, or when AddMessagePair fails. In the last case the
// counters have already been updated and the pending sender log has still
// been removed.
func (agg *Aggregator) AddInboxMessage(msg *types.Log) (err error) {
	name, data, err := agg.inboxContract.ParseEventToDic(*msg)
	if err != nil {
		return err
	}
	senderID, err := coerceToIdentifier(data["id"])
	if err != nil {
		return err
	}

	// Count the receipt against the block the sender message belongs to.
	bs := agg.GetBlockStats(senderID.BlockNumber)
	bs.MessageCount++
	bs.ReceivedMessages++
	agg.BlockStats[senderID.BlockNumber] = *bs

	// Check whether the sender side of this message has already been seen.
	if messageLog, ok := agg.messenger[senderID]; ok {
		// Surface pairing failures instead of silently dropping them. The
		// pending entry is removed regardless, because no further inbox log
		// will arrive to clear it later.
		err = agg.AddMessagePair(messageLog, msg)
		delete(agg.messenger, senderID)
	} else {
		agg.inbox[senderID] = msg
	}

	log.Printf("inbox: %s %v", name, data)
	return err
}
// AddMessengerMessage records a message log observed on the sender.
//
// The log is parsed with the messenger contract and its Identifier is
// obtained from the sender chain. The message is counted against its own
// block number. If the matching receiver log is already pending, the pair is
// handed to AddMessagePair and the pending entry is dropped; otherwise msg is
// parked in the messenger map until the receiver log arrives.
//
// It returns an error when the event cannot be parsed, when the identifier
// cannot be derived, or when AddMessagePair fails. In the last case the
// counters have already been updated and the pending receiver log has still
// been removed.
func (agg *Aggregator) AddMessengerMessage(msg *types.Log) (err error) {
	name, data, err := agg.messengerContract.ParseEventToDic(*msg)
	if err != nil {
		return err
	}
	id, err := agg.Sender.GetEventIdentifier(*msg)
	if err != nil {
		return err
	}

	// Count the send against the block the message was emitted in.
	bs := agg.GetBlockStats(msg.BlockNumber)
	bs.MessageCount++
	bs.SentMesssages++
	agg.BlockStats[msg.BlockNumber] = *bs

	// Check whether the receiver side of this message has already been seen.
	if messageLog, ok := agg.inbox[id]; ok {
		// Surface pairing failures instead of silently dropping them. The
		// pending entry is removed regardless, because no further messenger
		// log will arrive to clear it later.
		err = agg.AddMessagePair(msg, messageLog)
		delete(agg.inbox, id)
	} else {
		agg.messenger[id] = msg
	}

	log.Printf("messenger: %s %v %v", name, data, id)
	// NOTE: agg.inbox[id] is always absent here (it was either just deleted
	// or never present), so this line prints a nil entry.
	log.Printf("map currently: %v", agg.inbox[id])
	return err
}
// GetBlockStats returns a pointer to a copy of the stats stored for
// blockNumber. When the block has no entry yet, one is created with a zero
// TotalLatency and stored in the map first.
//
// Because the result points at a copy, changes to its scalar fields only take
// effect once the caller writes the value back into agg.BlockStats. The
// TotalLatency pointer, however, is shared with the stored entry.
func (agg Aggregator) GetBlockStats(blockNumber uint64) (bs *BlockStat) {
	if existing, found := agg.BlockStats[blockNumber]; found {
		return &existing
	}

	// First time this block is seen: register an empty entry.
	fresh := BlockStat{TotalLatency: big.NewInt(0)}
	agg.BlockStats[blockNumber] = fresh
	return &fresh
}
// AddMessagePair folds a matched sender/receiver log pair into the stats of
// the sender's block.
//
// The latency is the receiver block timestamp minus the sender block
// timestamp, and it is added to the block's TotalLatency. An error from
// either timestamp lookup is returned unchanged and TotalLatency is left
// untouched, although the stats entry for the sender block may already have
// been created by GetBlockStats.
func (agg Aggregator) AddMessagePair(senderMsg, receiverMsg *types.Log) (err error) {
	bs := agg.GetBlockStats(senderMsg.BlockNumber)

	// We don't want to rely on the reported identifier time, so we fetch the
	// block timestamps ourselves.
	// NOTE(review): lookups are said to be cached per block; the cache is not
	// visible from this function — confirm in GetBlockTimestamp.
	//
	// SetUint64 is used instead of big.NewInt(int64(...)) so block numbers
	// above math.MaxInt64 do not wrap around to negative values.
	senderTimestamp, err := agg.Sender.GetBlockTimestamp(new(big.Int).SetUint64(senderMsg.BlockNumber))
	if err != nil {
		return err
	}
	receiverTimestamp, err := agg.Receiver.GetBlockTimestamp(new(big.Int).SetUint64(receiverMsg.BlockNumber))
	if err != nil {
		return err
	}

	latency := new(big.Int).Sub(receiverTimestamp, senderTimestamp)
	bs.TotalLatency.Add(bs.TotalLatency, latency)
	agg.BlockStats[senderMsg.BlockNumber] = *bs

	log.Printf("addMessagePair: found pair, timestamps %d %d, stats: %v", senderTimestamp.Uint64(), receiverTimestamp.Uint64(), agg.BlockStats)
	return nil
}