Skip to content

Commit b1432ad

Browse files
committed
Use the new p2p.Run API
1 parent d63b975 commit b1432ad

File tree

11 files changed

+285
-512
lines changed

11 files changed

+285
-512
lines changed

fly/cmd/fly/main.go

+17-59
Original file line numberDiff line numberDiff line change
@@ -194,15 +194,6 @@ func main() {
194194
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
195195
defer rootCtxCancel()
196196

197-
// Outbound gossip message queue
198-
sendC := make(chan []byte)
199-
200-
// Inbound observations
201-
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
202-
203-
// Inbound observation requests
204-
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
205-
206197
// Inbound signed VAAs
207198
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
208199

@@ -241,29 +232,6 @@ func main() {
241232
mu := sync.Mutex{}
242233
pythNetSeqs := map[string]map[uint64]time.Time{}
243234

244-
// Ignore observations
245-
go func() {
246-
for {
247-
select {
248-
case <-rootCtx.Done():
249-
return
250-
case <-obsvC:
251-
}
252-
}
253-
}()
254-
255-
// Ignore observation requests
256-
// Note: without this, the whole program hangs on observation requests
257-
go func() {
258-
for {
259-
select {
260-
case <-rootCtx.Done():
261-
return
262-
case <-obsvReqC:
263-
}
264-
}
265-
}()
266-
267235
// Write signed VAAs to bigtable periodically
268236
go func() {
269237
signedVAAs := map[string][]byte{}
@@ -534,36 +502,26 @@ func main() {
534502
components.GossipParams.Dlo = 1 // default: 5
535503
components.GossipParams.Dhi = 2 // default: 12
536504
components.GossipParams.Dout = 1 // default: 2
505+
506+
params, err := p2p.NewRunParams(
507+
p2pBootstrap,
508+
p2pNetworkID,
509+
priv,
510+
gst,
511+
rootCtxCancel,
512+
p2p.WithComponents(components),
513+
p2p.WithSignedVAAListener(signedInC),
514+
p2p.WithChainGovernorConfigListener(govConfigC),
515+
p2p.WithChainGovernorStatusListener(govStatusC),
516+
)
517+
if err != nil {
518+
logger.Fatal("Failed to create RunParams", zap.Error(err))
519+
}
520+
537521
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
538522
if err := supervisor.Run(ctx,
539523
"p2p",
540-
p2p.Run(obsvC,
541-
obsvReqC,
542-
nil,
543-
sendC,
544-
signedInC,
545-
priv,
546-
nil,
547-
gst,
548-
p2pNetworkID,
549-
p2pBootstrap,
550-
"",
551-
false,
552-
rootCtxCancel,
553-
nil,
554-
nil,
555-
govConfigC,
556-
govStatusC,
557-
components,
558-
nil,
559-
false,
560-
false,
561-
nil,
562-
nil,
563-
"",
564-
0,
565-
"",
566-
)); err != nil {
524+
p2p.Run(params)); err != nil {
567525
return err
568526
}
569527

fly/cmd/healthcheck/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func main() {
3939
flag.StringVar(&pubKey, "pubKey", "", "A guardian public key")
4040
flag.StringVar(&url, "url", "", "The public web url of a guardian")
4141
flag.DurationVar(&timeout, "timeout", 15*time.Second, "The duration to wait for a heartbeat and observations")
42-
flag.StringVar(&p2pNetworkID, "network", "/wormhole/mainnet/2", "P2P network identifier")
43-
flag.StringVar(&p2pBootstrap, "bootstrap", "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1", "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
42+
flag.StringVar(&p2pNetworkID, "network", p2p.MainnetNetworkId, "P2P network identifier")
43+
flag.StringVar(&p2pBootstrap, "bootstrap", p2p.MainnetBootstrapPeers, "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
4444
flag.UintVar(&p2pPort, "port", p2p.DefaultPort, "P2P UDP listener port")
4545
flag.StringVar(&nodeKeyPath, "nodeKeyPath", "/tmp/health_check.key", "A libp2p node key. Will be created if it does not exist.")
4646
flag.StringVar(&logLevel, "logLevel", "error", "The logging level. Valid values are error, warn, info, and debug.")

fly/cmd/heartbeats/main.go

+69-38
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ func main() {
184184

185185
const (
186186
GSM_signedObservation GossipMsgType = iota
187+
GSM_signedObservationInBatch
188+
GSM_signedObservationBatch
187189
GSM_tbObservation
188190
GSM_signedHeartbeat
189191
GSM_signedVaaWithQuorum
@@ -193,11 +195,9 @@ func main() {
193195
GSM_maxTypeVal
194196
)
195197

196-
// Outbound gossip message queue
197-
sendC := make(chan []byte)
198-
199198
// Inbound observations
200199
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
200+
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)
201201

202202
// Inbound observation requests
203203
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
@@ -266,7 +266,7 @@ func main() {
266266

267267
gossipMsgTable := table.NewWriter()
268268
gossipMsgTable.SetOutputMirror(os.Stdout)
269-
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
269+
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
270270
gossipMsgTable.SetStyle(table.StyleColoredDark)
271271

272272
obsvRateTable := table.NewWriter()
@@ -316,9 +316,11 @@ func main() {
316316
guardianTable.Render()
317317
prompt()
318318
case "m":
319+
gossipLock.Lock()
319320
activeTable = 2
320321
resetTerm(true)
321322
gossipMsgTable.Render()
323+
gossipLock.Unlock()
322324
prompt()
323325
case "o":
324326
activeTable = 3
@@ -337,11 +339,12 @@ func main() {
337339
// Just count observations
338340
go func() {
339341
uniqueObs := map[string]struct{}{}
342+
uniqueObsInBatch := map[string]struct{}{}
340343
for {
341344
select {
342345
case <-rootCtx.Done():
343346
return
344-
case o := <-obsvC:
347+
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
345348
spl := strings.Split(o.Msg.MessageId, "/")
346349
emitter := strings.ToLower(spl[1])
347350
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
@@ -367,9 +370,44 @@ func main() {
367370
gossipLock.Lock()
368371
gossipMsgTable.ResetRows()
369372
for idx, r := range gossipCounter {
370-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
373+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
371374
}
372375
gossipLock.Unlock()
376+
case batch := <-batchObsvC:
377+
for _, o := range batch.Msg.Observations {
378+
spl := strings.Split(o.MessageId, "/")
379+
emitter := strings.ToLower(spl[1])
380+
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
381+
idx := guardianIndexMap[strings.ToLower(addr)]
382+
if knownEmitters[emitter] {
383+
gossipCounter[idx][GSM_tbObservation]++
384+
gossipCounter[totalsRow][GSM_tbObservation]++
385+
}
386+
if handleObsv(uint(idx)) {
387+
obsvRateTable.ResetRows()
388+
for i := 0; i < numGuardians; i++ {
389+
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount,
390+
obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2],
391+
obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5],
392+
obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8],
393+
obsvRateRows[uint(i)].percents[9]})
394+
}
395+
}
396+
gossipCounter[idx][GSM_signedObservationInBatch]++
397+
gossipCounter[totalsRow][GSM_signedObservationInBatch]++
398+
399+
if *loadTesting {
400+
uniqueObsInBatch[hex.EncodeToString(o.Hash)] = struct{}{}
401+
gossipCounter[uniqueRow][GSM_signedObservationInBatch] = len(uniqueObsInBatch)
402+
}
403+
404+
gossipLock.Lock()
405+
gossipMsgTable.ResetRows()
406+
for idx, r := range gossipCounter {
407+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
408+
}
409+
gossipLock.Unlock()
410+
}
373411
}
374412
}
375413
}()
@@ -388,7 +426,7 @@ func main() {
388426
gossipLock.Lock()
389427
gossipMsgTable.ResetRows()
390428
for idx, r := range gossipCounter {
391-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
429+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
392430
}
393431
gossipLock.Unlock()
394432
}
@@ -420,7 +458,7 @@ func main() {
420458
gossipLock.Lock()
421459
gossipMsgTable.ResetRows()
422460
for idx, r := range gossipCounter {
423-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
461+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
424462
}
425463
gossipLock.Unlock()
426464
}
@@ -489,7 +527,7 @@ func main() {
489527
gossipLock.Lock()
490528
gossipMsgTable.ResetRows()
491529
for idx, r := range gossipCounter {
492-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
530+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
493531
}
494532
gossipLock.Unlock()
495533
if activeTable == 0 {
@@ -524,7 +562,7 @@ func main() {
524562
gossipLock.Lock()
525563
gossipMsgTable.ResetRows()
526564
for idx, r := range gossipCounter {
527-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
565+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
528566
}
529567
gossipLock.Unlock()
530568
}
@@ -545,7 +583,7 @@ func main() {
545583
gossipLock.Lock()
546584
gossipMsgTable.ResetRows()
547585
for idx, r := range gossipCounter {
548-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
586+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
549587
}
550588
gossipLock.Unlock()
551589
}
@@ -562,36 +600,29 @@ func main() {
562600
// Run supervisor.
563601
components := p2p.DefaultComponents()
564602
components.Port = *p2pPort
603+
604+
params, err := p2p.NewRunParams(
605+
*p2pBootstrap,
606+
*p2pNetworkID,
607+
priv,
608+
gst,
609+
rootCtxCancel,
610+
p2p.WithComponents(components),
611+
p2p.WithSignedObservationListener(obsvC),
612+
p2p.WithSignedObservationBatchListener(batchObsvC),
613+
p2p.WithSignedVAAListener(signedInC),
614+
p2p.WithObservationRequestListener(obsvReqC),
615+
p2p.WithChainGovernorConfigListener(govConfigC),
616+
p2p.WithChainGovernorStatusListener(govStatusC),
617+
)
618+
if err != nil {
619+
logger.Fatal("Failed to create RunParams", zap.Error(err))
620+
}
621+
565622
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
566623
if err := supervisor.Run(ctx,
567624
"p2p",
568-
p2p.Run(obsvC,
569-
obsvReqC,
570-
nil,
571-
sendC,
572-
signedInC,
573-
priv,
574-
nil,
575-
gst,
576-
*p2pNetworkID,
577-
*p2pBootstrap,
578-
"",
579-
false,
580-
rootCtxCancel,
581-
nil,
582-
nil,
583-
govConfigC,
584-
govStatusC,
585-
components,
586-
nil,
587-
false,
588-
false,
589-
nil,
590-
nil,
591-
"",
592-
0,
593-
"",
594-
)); err != nil {
625+
p2p.Run(params)); err != nil {
595626
return err
596627
}
597628

0 commit comments

Comments
 (0)