Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

historical_uptime: regular db cleanup to optimise memory #200

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
rootCtx context.Context
rootCtxCancel context.CancelFunc

dataDir string
p2pNetworkID string
p2pPort uint
p2pBootstrap string
Expand Down Expand Up @@ -74,6 +75,7 @@ func loadEnvVars() {
if err != nil {
log.Fatal("Error loading .env file")
}
dataDir = verifyEnvVar("DATA_DIR")
p2pNetworkID = verifyEnvVar("P2P_NETWORK_ID")
port, err := strconv.ParseUint(verifyEnvVar("P2P_PORT"), 10, 32)
if err != nil {
Expand All @@ -95,7 +97,7 @@ func verifyEnvVar(key string) string {
return value
}

func initPromScraper(promRemoteURL string, logger *zap.Logger) {
func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error) {
usingPromRemoteWrite := promRemoteURL != ""
if usingPromRemoteWrite {
var info promremotew.PromTelemetryInfo
Expand All @@ -106,7 +108,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
}

promLogger := logger.With(zap.String("component", "prometheus_scraper"))
errC := make(chan error)

node_common.StartRunnable(rootCtx, errC, false, "prometheus_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

Expand Down Expand Up @@ -144,8 +146,8 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
}
}

func initObservationScraper(db *db.Database, logger *zap.Logger) {
node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error {
func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error) {
node_common.StartRunnable(rootCtx, errC, false, "observation_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

for {
Expand Down Expand Up @@ -175,6 +177,25 @@ func initObservationScraper(db *db.Database, logger *zap.Logger) {
})
}

// initDatabaseCleanUp starts a background runnable that periodically removes
// stale observations from the database so its on-disk/in-memory footprint
// stays bounded. Failures of the runnable itself are reported on errC;
// per-tick cleanup errors are logged and the loop keeps running.
func initDatabaseCleanUp(db *db.Database, logger *zap.Logger, errC chan error) {
	node_common.StartRunnable(rootCtx, errC, false, "db_cleanup", func(ctx context.Context) error {
		t := time.NewTicker(common.DatabaseCleanUpInterval)
		// Release the ticker's resources when the runnable exits.
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return nil
			case <-t.C:
				// Strip observations from messages whose metrics were already
				// checked (metricsChecked == true) and that are older than the
				// expiry cut-off.
				if err := db.RemoveObservationsByIndex(true, common.ExpiryDuration); err != nil {
					logger.Error("RemoveObservationsByIndex error", zap.Error(err))
				}
			}
		}
	})
}

func main() {
loadEnvVars()
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
Expand Down Expand Up @@ -228,11 +249,23 @@ func main() {
}
gst.Set(&gs)

db := db.OpenDb(logger, nil)

db := db.OpenDb(logger, &dataDir)
promErrC := make(chan error)
// Start Prometheus scraper
initPromScraper(promRemoteURL, logger)
initObservationScraper(db, logger)
initPromScraper(promRemoteURL, logger, promErrC)
initObservationScraper(db, logger, promErrC)
initDatabaseCleanUp(db, logger, promErrC)

go func() {
for {
select {
case <-rootCtx.Done():
return
case err := <-promErrC:
logger.Error("error from prometheus scraper", zap.Error(err))
}
}
}()

// WIP(bing): add metrics for guardian observations
go func() {
Expand Down
3 changes: 3 additions & 0 deletions fly/common/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ import (

const (
	// ExpiryDuration is the age cut-off used when selecting messages for
	// processing/cleanup: messages last observed more than this long ago
	// are considered expired (passed as cutOffTime to the db query/cleanup
	// functions).
	ExpiryDuration = 30 * time.Hour
	// DatabaseCleanUpInterval is how often the db_cleanup runnable fires its
	// ticker to remove stale observations from the database.
	DatabaseCleanUpInterval = 48 * time.Hour

	// MessageUpdateBatchSize caps how many messages are written per Badger
	// transaction when batch-updating messages.
	MessageUpdateBatchSize = 100
)
145 changes: 117 additions & 28 deletions fly/pkg/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/wormhole-foundation/wormhole-monitor/fly/common"
"github.com/dgraph-io/badger/v3"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
Expand Down Expand Up @@ -154,42 +155,91 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob
})
}

// QueryMessagesByIndex retrieves messages based on indexed attributes.
func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) {
var messages []*Message
now := time.Now()
// batchUpdateMessages persists the given messages in fixed-size chunks so
// that no single transaction holds more than common.MessageUpdateBatchSize
// entries. It stops and returns the first chunk error encountered.
func (db *Database) batchUpdateMessages(messages []*Message) error {
	total := len(messages)
	for start := 0; start < total; start += common.MessageUpdateBatchSize {
		end := start + common.MessageUpdateBatchSize
		if end > total {
			end = total
		}
		chunk := messages[start:end]
		if err := db.updateMessagesBatch(chunk); err != nil {
			return err
		}
	}
	return nil
}

// Start a read-only transaction
err := db.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // Only keys are needed
// updateMessagesBatch writes every message in messagesBatch inside a single
// Badger read-write transaction, keyed by each message's MessageID. The whole
// batch is aborted on the first marshal or write failure.
func (db *Database) updateMessagesBatch(messagesBatch []*Message) error {
	return db.db.Update(func(txn *badger.Txn) error {
		for _, msg := range messagesBatch {
			encoded, marshalErr := json.Marshal(msg)
			if marshalErr != nil {
				return fmt.Errorf("failed to marshal message: %w", marshalErr)
			}
			if setErr := txn.Set([]byte(msg.MessageID), encoded); setErr != nil {
				return fmt.Errorf("failed to save message: %w", setErr)
			}
		}
		return nil
	})
}

// iterateIndex iterates over a metricsChecked index and applies a callback function to each item.
func (db *Database) iterateIndex(metricsChecked bool, callback func(item *badger.Item) error) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // Only keys are needed
return db.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(opts)
defer it.Close()

metricsCheckedPrefix := fmt.Sprintf("metricsChecked|%t|", metricsChecked)

// Iterate over the metricsChecked index
for it.Seek([]byte(metricsCheckedPrefix)); it.ValidForPrefix([]byte(metricsCheckedPrefix)); it.Next() {
item := it.Item()
key := item.Key()

// Extract MessageID from the key and query lastObservedAt index
_, messageID, err := parseMetricsCheckedIndexKey(key)
if err != nil {
return fmt.Errorf("failed to parse index key: %w", err)
if err := callback(it.Item()); err != nil {
return err
}
}
return nil
})
}

message, err := db.GetMessage(messageID)
if err != nil {
return fmt.Errorf("failed to get message by ID: %w", err)
}

// Check if the last observed timestamp is before the specified hours
if message.LastObservedAt.Before(now.Add(-cutOffTime)) {
message.MetricsChecked = true
db.SaveMessage(message)
messages = append(messages, message)
}
// processMessage loads the message identified by messageID and, when its
// LastObservedAt timestamp is older than now minus cutOffTime, invokes
// updateFunc on it. The (possibly mutated) message is returned only when it
// was old enough AND updateFunc reported true; otherwise (nil, nil). The
// caller is responsible for persisting any returned message.
func (db *Database) processMessage(messageID string, now time.Time, cutOffTime time.Duration, updateFunc func(*Message) bool) (*Message, error) {
	msg, err := db.GetMessage(messageID)
	if err != nil {
		return nil, fmt.Errorf("failed to get message by ID: %w", err)
	}

	// Too recent: leave untouched. updateFunc is deliberately not called in
	// this case (short-circuit preserved from the original && expression).
	cutoff := now.Add(-cutOffTime)
	if !msg.LastObservedAt.Before(cutoff) {
		return nil, nil
	}
	if !updateFunc(msg) {
		return nil, nil
	}
	return msg, nil
}

// QueryMessagesByIndex retrieves messages based on indexed attributes.
func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) {
var messagesToUpdate []*Message
now := time.Now()

err := db.iterateIndex(metricsChecked, func(item *badger.Item) error {
key := item.Key()
_, messageID, err := parseMetricsCheckedIndexKey(key)
if err != nil {
return fmt.Errorf("failed to parse index key: %w", err)
}

message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool {
m.MetricsChecked = true
return true
})

if err != nil {
return err
}

if message != nil {
messagesToUpdate = append(messagesToUpdate, message)
}

return nil
Expand All @@ -199,5 +249,44 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
return nil, fmt.Errorf("failed to query messages: %w", err)
}

return messages, nil
if err := db.batchUpdateMessages(messagesToUpdate); err != nil {
return nil, fmt.Errorf("failed to batch update messages: %w", err)
}

return messagesToUpdate, nil
}

// RemoveObservationsByIndex walks the metricsChecked index and, for every
// message whose LastObservedAt is older than cutOffTime, clears its
// Observations slice, then persists all modified messages in batches.
// Errors are wrapped with the same context strings as QueryMessagesByIndex
// for consistency.
func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime time.Duration) error {
	var messagesToUpdate []*Message
	now := time.Now()

	err := db.iterateIndex(metricsChecked, func(item *badger.Item) error {
		_, messageID, err := parseMetricsCheckedIndexKey(item.Key())
		if err != nil {
			return fmt.Errorf("failed to parse index key: %w", err)
		}

		// Drop the observations of sufficiently old messages; processMessage
		// only returns a non-nil message when the cut-off was exceeded.
		message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool {
			m.Observations = nil
			return true
		})
		if err != nil {
			return err
		}
		if message != nil {
			messagesToUpdate = append(messagesToUpdate, message)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to query messages: %w", err)
	}

	if err := db.batchUpdateMessages(messagesToUpdate); err != nil {
		return fmt.Errorf("failed to batch update messages: %w", err)
	}
	return nil
}
58 changes: 45 additions & 13 deletions fly/pkg/db/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,65 @@ func TestQueryMessagesByIndex(t *testing.T) {
db := OpenDb(zap.NewNop(), nil)
defer db.db.Close()

// Create a message with LastObservedAt set to 30 hours ago
message0 := getMessage(0, time.Now().Add(-30*time.Hour), false)
// Store the time for consistent comparison
observedTime := time.Now().Add(-30 * time.Hour)

// Create messages
message0 := getMessage(0, observedTime, false)
err := db.SaveMessage(message0)
require.NoError(t, err)

// Create a message with LastObservedAt set to 30 hours ago but with metrics checked
message1 := getMessage(1, time.Now().Add(-30*time.Hour), true)
message1 := getMessage(1, observedTime, true)
err = db.SaveMessage(message1)
require.NoError(t, err)

// Create a message with LastObservedAt set to 10 hours ago
message2 := getMessage(2, time.Now().Add(-10*time.Hour), false)
err = db.SaveMessage(message2)
require.NoError(t, err)

// Query messages
result, err := db.QueryMessagesByIndex(false, 30*time.Hour)
require.NoError(t, err)
require.Len(t, result, 1, "expected 1 message")

// Check if the message0 is in the result
require.Equal(t, message0.MessageID, result[0].MessageID)
require.True(t, observedTime.Equal(result[0].LastObservedAt), "message0 should be found in the result set")
}

func TestRemoveObservationsByIndex(t *testing.T) {
db := OpenDb(zap.NewNop(), nil)
defer db.db.Close()

length := len(result)
require.Equal(t, 1, length, "expected 1 message")
testCases := []struct {
messageID int
timeOffset time.Duration
metricsChecked bool
expectEmpty bool
}{
{0, -49 * time.Hour, true, true},
{1, -40 * time.Hour, true, false},
{2, -50 * time.Hour, false, false},
{3, -72 * time.Hour, true, true},
{4, -96 * time.Hour, true, true},
}

for _, tc := range testCases {
message := getMessage(tc.messageID, time.Now().Add(tc.timeOffset), tc.metricsChecked)
err := db.SaveMessage(message)
require.NoError(t, err)
}

err := db.RemoveObservationsByIndex(true, 48*time.Hour)
require.NoError(t, err)

found := false
for _, msg := range result {
if msg.MessageID == message0.MessageID && msg.LastObservedAt.Equal(message0.LastObservedAt) && msg.MetricsChecked == true {
found = true
break
for _, tc := range testCases {
messageFromDb, err := db.GetMessage(fmt.Sprintf("messageId%d", tc.messageID))
require.NoError(t, err)
if tc.expectEmpty {
require.Empty(t, messageFromDb.Observations, "expected observations to be removed for message", tc.messageID)
} else {
require.NotEmpty(t, messageFromDb.Observations, "expected observations to be present for message", tc.messageID)
}
}
require.True(t, found, "message found in the result set")
}
Loading