Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 82fe4a8

Browse files
committed Feb 7, 2024
historical_uptime: track missing observations by guardians
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent 43998b8 commit 82fe4a8

File tree

9 files changed

+793
-19
lines changed

9 files changed

+793
-19
lines changed
 

‎fly/cmd/historical_uptime/main.go

+47-19
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@ import (
1414
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
1515
"github.com/certusone/wormhole/node/pkg/supervisor"
1616
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
17-
eth_common "github.com/ethereum/go-ethereum/common"
1817
ipfslog "github.com/ipfs/go-log/v2"
1918
"github.com/joho/godotenv"
2019
"github.com/libp2p/go-libp2p/core/crypto"
2120
"github.com/prometheus/client_golang/prometheus"
2221
"github.com/prometheus/client_golang/prometheus/promauto"
2322
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
23+
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/db"
24+
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/historical_uptime"
2425
"github.com/wormhole-foundation/wormhole-monitor/fly/utils"
2526
"github.com/wormhole-foundation/wormhole/sdk/vaa"
2627
"go.uber.org/zap"
@@ -56,6 +57,14 @@ var (
5657
},
5758
[]string{"guardian", "chain"},
5859
)
60+
61+
guardianMissedObservations = promauto.NewCounterVec(
62+
prometheus.CounterOpts{
63+
Name: "guardian_missed_observations_total",
64+
Help: "Total number of observations missed by each guardian on each chain",
65+
},
66+
[]string{"guardian", "chain"},
67+
)
5968
)
6069

6170
const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet)
@@ -120,6 +129,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
120129
// adding this will make sure chain labels are present regardless
121130
for _, guardianName := range common.GetGuardianIndexToNameMap() {
122131
guardianObservations.WithLabelValues(guardianName, chainName).Add(0)
132+
guardianMissedObservations.WithLabelValues(guardianName, chainName).Add(0)
123133
}
124134
}
125135
err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger)
@@ -134,6 +144,37 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
134144
}
135145
}
136146

147+
func initObservationScraper(db *db.Database, logger *zap.Logger) {
148+
node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error {
149+
t := time.NewTicker(15 * time.Second)
150+
151+
for {
152+
select {
153+
case <-ctx.Done():
154+
return nil
155+
case <-t.C:
156+
messages, err := db.QueryMessagesByIndex(false, common.ExpiryDuration)
157+
if err != nil {
158+
logger.Error("QueryMessagesByIndex error", zap.Error(err))
159+
continue
160+
}
161+
162+
// Tally the number of messages for each chain
163+
messagesPerChain := historical_uptime.TallyMessagesPerChain(logger, messages)
164+
165+
// Initialize the missing observations count for each guardian for each chain
166+
guardianMissingObservations := historical_uptime.InitializeMissingObservationsCount(logger, messages, messagesPerChain)
167+
168+
// Decrement the missing observations count for each observed message
169+
historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messages)
170+
171+
// Update the metrics with the final count of missing observations
172+
historical_uptime.UpdateMetrics(guardianMissedObservations, guardianMissingObservations)
173+
}
174+
}
175+
})
176+
}
177+
137178
func main() {
138179
loadEnvVars()
139180
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
@@ -180,15 +221,18 @@ func main() {
180221
if err != nil {
181222
logger.Fatal("Failed to fetch guardian set", zap.Error(err))
182223
}
183-
logger.Info("guardian set", zap.Uint32("index", idx), zap.Any("gs", sgs))
224+
184225
gs := node_common.GuardianSet{
185226
Keys: sgs.Keys,
186227
Index: idx,
187228
}
188229
gst.Set(&gs)
189230

231+
db := db.OpenDb(logger, nil)
232+
190233
// Start Prometheus scraper
191234
initPromScraper(promRemoteURL, logger)
235+
initObservationScraper(db, logger)
192236

193237
// WIP(bing): add metrics for guardian observations
194238
go func() {
@@ -197,23 +241,7 @@ func main() {
197241
case <-rootCtx.Done():
198242
return
199243
case o := <-obsvC:
200-
// Ignore observations from pythnnet
201-
// Pythnet sends too many observations that could deteriorate the performance of the fly node
202-
if o.Msg.MessageId[:3] != strconv.Itoa(PYTHNET_CHAIN_ID) + "/" {
203-
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
204-
chainID := strings.Split(o.Msg.MessageId, "/")[0]
205-
ui64, err := strconv.ParseUint(chainID, 10, 16)
206-
if err != nil {
207-
panic(err)
208-
}
209-
chainName := vaa.ChainID(ui64).String()
210-
guardianName, ok := common.GetGuardianName(ga)
211-
if !ok {
212-
logger.Error("guardian name not found", zap.String("guardian", ga))
213-
continue // Skip setting the metric if guardianName is not found
214-
}
215-
guardianObservations.WithLabelValues(guardianName, chainName).Inc()
216-
}
244+
historical_uptime.ProcessObservation(*db, logger, *o)
217245
}
218246
}
219247
}()

‎fly/common/consts.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package common
2+
3+
import (
4+
"time"
5+
)
6+
7+
// ExpiryDuration is an exported, fixed duration of 30 hours.
//
// NOTE(review): presumably the window after which a tracked message is
// considered expired — confirm against the callers that consume it.
const ExpiryDuration = 30 * time.Hour

‎fly/pkg/db/db.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package db
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path"
7+
8+
"github.com/dgraph-io/badger/v3"
9+
"go.uber.org/zap"
10+
)
11+
12+
type badgerZapLogger struct {
13+
*zap.Logger
14+
}
15+
16+
func (l badgerZapLogger) Errorf(f string, v ...interface{}) {
17+
l.Error(fmt.Sprintf(f, v...))
18+
}
19+
20+
func (l badgerZapLogger) Warningf(f string, v ...interface{}) {
21+
l.Warn(fmt.Sprintf(f, v...))
22+
}
23+
24+
func (l badgerZapLogger) Infof(f string, v ...interface{}) {
25+
l.Info(fmt.Sprintf(f, v...))
26+
}
27+
28+
func (l badgerZapLogger) Debugf(f string, v ...interface{}) {
29+
l.Debug(fmt.Sprintf(f, v...))
30+
}
31+
32+
type Database struct {
33+
db *badger.DB
34+
}
35+
36+
func OpenDb(logger *zap.Logger, dataDir *string) *Database {
37+
var options badger.Options
38+
39+
if dataDir != nil {
40+
dbPath := path.Join(*dataDir, "db")
41+
if err := os.MkdirAll(dbPath, 0700); err != nil {
42+
logger.Fatal("failed to create database directory", zap.Error(err))
43+
}
44+
45+
options = badger.DefaultOptions(dbPath)
46+
} else {
47+
options = badger.DefaultOptions("").WithInMemory(true)
48+
}
49+
50+
options = options.WithLogger(badgerZapLogger{logger})
51+
52+
db, err := badger.Open(options)
53+
if err != nil {
54+
logger.Fatal("failed to open database", zap.Error(err))
55+
}
56+
57+
return &Database{
58+
db: db,
59+
}
60+
}
61+
62+
func (db *Database) Close() error {
63+
return db.db.Close()
64+
}

‎fly/pkg/db/index.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package db
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
8+
"github.com/dgraph-io/badger/v3"
9+
)
10+
11+
// Index key formats.

// metricsCheckedIndexKeyFmt is the fmt layout of a metricsChecked index key:
// metricsChecked|<bool>|<messageID>
const metricsCheckedIndexKeyFmt = "metricsChecked|%t|%s"
16+
17+
// CreateOrUpdateIndex creates or updates indexes for a message
18+
func CreateOrUpdateIndex(txn *badger.Txn, message *Message) error {
19+
// Index for metricsChecked
20+
mcKey := fmt.Sprintf(metricsCheckedIndexKeyFmt, message.MetricsChecked, message.MessageID)
21+
if err := txn.Set([]byte(mcKey), []byte(message.MessageID)); err != nil {
22+
return fmt.Errorf("failed to set metricsChecked index: %w", err)
23+
}
24+
25+
return nil
26+
}
27+
28+
// parseMetricsCheckedIndexKey parses a metricsChecked index key of the form
// metricsChecked|<bool>|<messageID> and returns the flag value and the
// message ID.
//
// The key is split into at most three fields, so everything after the second
// '|' is treated as the message ID. This keeps parsing consistent with key
// construction, which embeds the message ID verbatim, even if the ID itself
// contains a '|'.
//
// An error is returned when the key has fewer than three fields, does not
// start with the "metricsChecked" prefix, or carries a flag that
// strconv.ParseBool cannot parse.
func parseMetricsCheckedIndexKey(key []byte) (bool, string, error) {
	parts := strings.SplitN(string(key), "|", 3)
	if len(parts) != 3 || parts[0] != "metricsChecked" {
		return false, "", fmt.Errorf("invalid key format")
	}

	// Parse the metricsChecked value from the string to a bool
	metricsChecked, err := strconv.ParseBool(parts[1])
	if err != nil {
		return false, "", fmt.Errorf("error parsing metricsChecked value from key: %w", err)
	}

	// The MessageID is the remainder of the key
	return metricsChecked, parts[2], nil
}

0 commit comments

Comments
 (0)
Please sign in to comment.