Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

historical uptime: missing signature tracking #199

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 47 additions & 19 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
eth_common "github.com/ethereum/go-ethereum/common"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/joho/godotenv"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/db"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/historical_uptime"
"github.com/wormhole-foundation/wormhole-monitor/fly/utils"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
Expand Down Expand Up @@ -56,6 +57,14 @@ var (
},
[]string{"guardian", "chain"},
)

// guardianMissedObservations is a Prometheus counter vector, labelled by
// guardian and chain, for the number of observations each guardian missed
// on each chain.
guardianMissedObservations = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "guardian_missed_observations_total",
Help: "Total number of observations missed by each guardian on each chain",
},
[]string{"guardian", "chain"},
)
)

// PYTHNET_CHAIN_ID is the Pythnet chain ID (vaa.ChainIDPythNet) converted to a plain int.
const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet)
Expand Down Expand Up @@ -120,6 +129,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
// adding this will make sure chain labels are present regardless
for _, guardianName := range common.GetGuardianIndexToNameMap() {
guardianObservations.WithLabelValues(guardianName, chainName).Add(0)
guardianMissedObservations.WithLabelValues(guardianName, chainName).Add(0)
}
}
err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger)
Expand All @@ -134,6 +144,37 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
}
}

// initObservationScraper starts the "observation_scraper" runnable on rootCtx.
//
// Every 15 seconds the runnable queries the database via
// db.QueryMessagesByIndex(false, common.ExpiryDuration), tallies the returned
// messages per chain, derives a per-guardian/per-chain missing-observation
// count, and pushes the result into the guardianMissedObservations metric.
// A failed query is logged and that tick is skipped. The runnable returns nil
// once its context is cancelled.
func initObservationScraper(db *db.Database, logger *zap.Logger) {
	node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error {
		t := time.NewTicker(15 * time.Second)
		// Release the ticker's resources when the runnable exits; without
		// this the ticker keeps firing after the context is cancelled.
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return nil
			case <-t.C:
				messages, err := db.QueryMessagesByIndex(false, common.ExpiryDuration)
				if err != nil {
					logger.Error("QueryMessagesByIndex error", zap.Error(err))
					continue
				}

				// Tally the number of messages for each chain
				messagesPerChain := historical_uptime.TallyMessagesPerChain(logger, messages)

				// Initialize the missing observations count for each guardian for each chain
				guardianMissingObservations := historical_uptime.InitializeMissingObservationsCount(logger, messages, messagesPerChain)

				// Decrement the missing observations count for each observed message
				historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messages)

				// Update the metrics with the final count of missing observations
				historical_uptime.UpdateMetrics(guardianMissedObservations, guardianMissingObservations)
			}
		}
	})
}

func main() {
loadEnvVars()
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
Expand Down Expand Up @@ -180,15 +221,18 @@ func main() {
if err != nil {
logger.Fatal("Failed to fetch guardian set", zap.Error(err))
}
logger.Info("guardian set", zap.Uint32("index", idx), zap.Any("gs", sgs))

gs := node_common.GuardianSet{
Keys: sgs.Keys,
Index: idx,
}
gst.Set(&gs)

db := db.OpenDb(logger, nil)

// Start Prometheus scraper
initPromScraper(promRemoteURL, logger)
initObservationScraper(db, logger)

// WIP(bing): add metrics for guardian observations
go func() {
Expand All @@ -197,23 +241,7 @@ func main() {
case <-rootCtx.Done():
return
case o := <-obsvC:
// Ignore observations from pythnnet
// Pythnet sends too many observations that could deteriorate the performance of the fly node
if o.Msg.MessageId[:3] != strconv.Itoa(PYTHNET_CHAIN_ID) + "/" {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
chainID := strings.Split(o.Msg.MessageId, "/")[0]
ui64, err := strconv.ParseUint(chainID, 10, 16)
if err != nil {
panic(err)
}
chainName := vaa.ChainID(ui64).String()
guardianName, ok := common.GetGuardianName(ga)
if !ok {
logger.Error("guardian name not found", zap.String("guardian", ga))
continue // Skip setting the metric if guardianName is not found
}
guardianObservations.WithLabelValues(guardianName, chainName).Inc()
}
historical_uptime.ProcessObservation(*db, logger, *o)
}
}
}()
Expand Down
9 changes: 9 additions & 0 deletions fly/common/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package common

import (
"time"
)

const (
// ExpiryDuration is a fixed duration of 30 hours.
// NOTE(review): presumably the age window applied when querying stored
// messages for missed observations — confirm against the callers.
ExpiryDuration = 30 * time.Hour
)
64 changes: 64 additions & 0 deletions fly/pkg/db/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package db

import (
"fmt"
"os"
"path"

"github.com/dgraph-io/badger/v3"
"go.uber.org/zap"
)

// badgerZapLogger embeds a *zap.Logger and adds printf-style Errorf, Warningf,
// Infof and Debugf methods so it can be passed to badger's Options.WithLogger.
type badgerZapLogger struct {
*zap.Logger
}

// Errorf renders the printf-style arguments and logs the result at Error level.
func (l badgerZapLogger) Errorf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Error(msg)
}

// Warningf renders the printf-style arguments and logs the result at Warn level.
func (l badgerZapLogger) Warningf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Warn(msg)
}

// Infof renders the printf-style arguments and logs the result at Info level.
func (l badgerZapLogger) Infof(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Info(msg)
}

// Debugf renders the printf-style arguments and logs the result at Debug level.
func (l badgerZapLogger) Debugf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Debug(msg)
}

// Database wraps the underlying badger.DB handle used by this package.
type Database struct {
// db is the badger handle opened by OpenDb and closed by Close.
db *badger.DB
}

// OpenDb opens a badger store and returns it wrapped in a Database.
//
// With a nil dataDir the store is opened in in-memory mode. Otherwise the
// store lives in the "db" subdirectory of *dataDir, which is created with
// mode 0700 if it does not exist. badger's own log output is routed through
// logger. Failure to create the directory or to open the store is reported
// through logger.Fatal.
func OpenDb(logger *zap.Logger, dataDir *string) *Database {
	var opts badger.Options

	if dataDir == nil {
		// No directory supplied: keep everything in memory.
		opts = badger.DefaultOptions("").WithInMemory(true)
	} else {
		dir := path.Join(*dataDir, "db")
		mkErr := os.MkdirAll(dir, 0700)
		if mkErr != nil {
			logger.Fatal("failed to create database directory", zap.Error(mkErr))
		}

		opts = badger.DefaultOptions(dir)
	}

	handle, err := badger.Open(opts.WithLogger(badgerZapLogger{logger}))
	if err != nil {
		logger.Fatal("failed to open database", zap.Error(err))
	}

	return &Database{db: handle}
}

// Close closes the underlying badger store and returns whatever error that reports.
func (d *Database) Close() error {
	return d.db.Close()
}
46 changes: 46 additions & 0 deletions fly/pkg/db/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package db

import (
"fmt"
"strconv"
"strings"

"github.com/dgraph-io/badger/v3"
)

// Index key formats.
const (
// metricsCheckedIndexKeyFmt is the fmt layout used to build a metricsChecked
// index key: metricsChecked|<bool>|<messageID>
metricsCheckedIndexKeyFmt = "metricsChecked|%t|%s"
)

// CreateOrUpdateIndex writes the index entries for message inside txn.
//
// At present there is a single index: a key built from the message's
// MetricsChecked flag and MessageID, whose value is the MessageID bytes.
// Only a Set is issued here. NOTE(review): an entry written earlier under the
// other MetricsChecked value is not removed by this function — confirm the
// callers handle that.
func CreateOrUpdateIndex(txn *badger.Txn, message *Message) error {
	id := message.MessageID
	indexKey := []byte(fmt.Sprintf(metricsCheckedIndexKeyFmt, message.MetricsChecked, id))

	err := txn.Set(indexKey, []byte(id))
	if err != nil {
		return fmt.Errorf("failed to set metricsChecked index: %w", err)
	}

	return nil
}

// parseMetricsCheckedIndexKey splits a metricsChecked index key of the form
// metricsChecked|<bool>|<messageID> into its flag and message ID.
//
// It returns an error when the key does not consist of exactly three
// "|"-separated fields, when the first field is not "metricsChecked", or when
// the second field cannot be parsed by strconv.ParseBool.
func parseMetricsCheckedIndexKey(key []byte) (bool, string, error) {
	fields := strings.Split(string(key), "|")

	// Exactly three fields are required: prefix, flag, message ID.
	if len(fields) != 3 {
		return false, "", fmt.Errorf("invalid key format")
	}
	if fields[0] != "metricsChecked" {
		return false, "", fmt.Errorf("invalid key format")
	}

	flagText, messageID := fields[1], fields[2]

	checked, parseErr := strconv.ParseBool(flagText)
	if parseErr != nil {
		return false, "", fmt.Errorf("error parsing metricsChecked value from key: %w", parseErr)
	}

	return checked, messageID, nil
}
Loading