Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d871946

Browse files
committedFeb 8, 2024
historical_uptime: add db cleanup to optimise memory
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent b2f23e2 commit d871946

File tree

4 files changed

+202
-48
lines changed

4 files changed

+202
-48
lines changed
 

‎fly/cmd/historical_uptime/main.go

+38-7
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func verifyEnvVar(key string) string {
9595
return value
9696
}
9797

98-
func initPromScraper(promRemoteURL string, logger *zap.Logger) {
98+
func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error) {
9999
usingPromRemoteWrite := promRemoteURL != ""
100100
if usingPromRemoteWrite {
101101
var info promremotew.PromTelemetryInfo
@@ -106,7 +106,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
106106
}
107107

108108
promLogger := logger.With(zap.String("component", "prometheus_scraper"))
109-
errC := make(chan error)
109+
110110
node_common.StartRunnable(rootCtx, errC, false, "prometheus_scraper", func(ctx context.Context) error {
111111
t := time.NewTicker(15 * time.Second)
112112

@@ -144,8 +144,8 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
144144
}
145145
}
146146

147-
func initObservationScraper(db *db.Database, logger *zap.Logger) {
148-
node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error {
147+
func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error) {
148+
node_common.StartRunnable(rootCtx, errC, false, "observation_scraper", func(ctx context.Context) error {
149149
t := time.NewTicker(15 * time.Second)
150150

151151
for {
@@ -175,6 +175,25 @@ func initObservationScraper(db *db.Database, logger *zap.Logger) {
175175
})
176176
}
177177

178+
func initDatabaseCleanUp(db *db.Database, logger *zap.Logger, errC chan error) {
179+
node_common.StartRunnable(rootCtx, errC, false, "db_cleanup", func(ctx context.Context) error {
180+
t := time.NewTicker(common.DatabaseCleanUpInterval)
181+
182+
for {
183+
select {
184+
case <-ctx.Done():
185+
return nil
186+
case <-t.C:
187+
err := db.RemoveObservationsByIndex(true, common.ExpiryDuration)
188+
if err != nil {
189+
logger.Error("RemoveObservationsByIndex error", zap.Error(err))
190+
}
191+
}
192+
}
193+
})
194+
195+
}
196+
178197
func main() {
179198
loadEnvVars()
180199
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
@@ -229,10 +248,22 @@ func main() {
229248
gst.Set(&gs)
230249

231250
db := db.OpenDb(logger, nil)
232-
251+
promErrC := make(chan error)
233252
// Start Prometheus scraper
234-
initPromScraper(promRemoteURL, logger)
235-
initObservationScraper(db, logger)
253+
initPromScraper(promRemoteURL, logger, promErrC)
254+
initObservationScraper(db, logger, promErrC)
255+
initDatabaseCleanUp(db, logger, promErrC)
256+
257+
go func() {
258+
for {
259+
select {
260+
case <-rootCtx.Done():
261+
return
262+
case err := <-promErrC:
263+
logger.Error("error from prometheus scraper", zap.Error(err))
264+
}
265+
}
266+
}()
236267

237268
// WIP(bing): add metrics for guardian observations
238269
go func() {

‎fly/common/consts.go

+3
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,7 @@ import (
66

77
const (
88
ExpiryDuration = 30 * time.Hour
9+
DatabaseCleanUpInterval = 48 * time.Hour
10+
11+
MessageUpdateBatchSize = 100
912
)

‎fly/pkg/db/message.go

+116-28
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"time"
1010

11+
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
1112
"github.com/dgraph-io/badger/v3"
1213
"github.com/wormhole-foundation/wormhole/sdk/vaa"
1314
)
@@ -154,42 +155,90 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob
154155
})
155156
}
156157

157-
// QueryMessagesByIndex retrieves messages based on indexed attributes.
158-
func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) {
159-
var messages []*Message
160-
now := time.Now()
158+
// batchUpdateMessages performs batch updates on a slice of messages.
159+
func (db *Database) batchUpdateMessages(messages []*Message) error {
160+
for i := 0; i < len(messages); i += common.MessageUpdateBatchSize {
161+
end := i + common.MessageUpdateBatchSize
162+
if end > len(messages) {
163+
end = len(messages)
164+
}
165+
if err := db.updateMessagesBatch(messages[i:end]); err != nil {
166+
return err
167+
}
168+
}
169+
return nil
170+
}
161171

162-
// Start a read-only transaction
163-
err := db.db.View(func(txn *badger.Txn) error {
164-
opts := badger.DefaultIteratorOptions
165-
opts.PrefetchValues = false // Only keys are needed
172+
// updateMessagesBatch updates a batch of messages in a single transaction.
173+
func (db *Database) updateMessagesBatch(messagesBatch []*Message) error {
174+
return db.db.Update(func(txn *badger.Txn) error {
175+
for _, message := range messagesBatch {
176+
data, err := json.Marshal(message)
177+
if err != nil {
178+
return fmt.Errorf("failed to marshal message: %w", err)
179+
}
180+
if err := txn.Set([]byte(message.MessageID), data); err != nil {
181+
return fmt.Errorf("failed to save message: %w", err)
182+
}
183+
}
184+
return nil
185+
})
186+
}
187+
188+
// iterateIndex iterates over a metricsChecked index and applies a callback function to each item.
189+
func (db *Database) iterateIndex(metricsChecked bool, callback func(item *badger.Item) error) error {
190+
opts := badger.DefaultIteratorOptions
191+
opts.PrefetchValues = false // Only keys are needed
192+
return db.db.View(func(txn *badger.Txn) error {
166193
it := txn.NewIterator(opts)
167194
defer it.Close()
168195

169196
metricsCheckedPrefix := fmt.Sprintf("metricsChecked|%t|", metricsChecked)
170-
171-
// Iterate over the metricsChecked index
172197
for it.Seek([]byte(metricsCheckedPrefix)); it.ValidForPrefix([]byte(metricsCheckedPrefix)); it.Next() {
173-
item := it.Item()
174-
key := item.Key()
175-
176-
// Extract MessageID from the key and query lastObservedAt index
177-
_, messageID, err := parseMetricsCheckedIndexKey(key)
178-
if err != nil {
179-
return fmt.Errorf("failed to parse index key: %w", err)
198+
if err := callback(it.Item()); err != nil {
199+
return err
180200
}
201+
}
202+
return nil
203+
})
204+
}
181205

182-
message, err := db.GetMessage(messageID)
183-
if err != nil {
184-
return fmt.Errorf("failed to get message by ID: %w", err)
185-
}
206+
// processMessage applies the provided logic to the message if it meets the specified conditions.
207+
func (db *Database) processMessage(messageID string, now time.Time, cutOffTime time.Duration, updateFunc func(*Message) bool) (*Message, error) {
208+
message, err := db.GetMessage(messageID)
209+
if err != nil {
210+
return nil, fmt.Errorf("failed to get message by ID: %w", err)
211+
}
186212

187-
// Check if the last observed timestamp is before the specified hours
188-
if message.LastObservedAt.Before(now.Add(-cutOffTime)) {
189-
message.MetricsChecked = true
190-
db.SaveMessage(message)
191-
messages = append(messages, message)
192-
}
213+
if message.LastObservedAt.Before(now.Add(-cutOffTime)) && updateFunc(message) {
214+
return message, nil
215+
}
216+
return nil, nil
217+
}
218+
219+
// QueryMessagesByIndex retrieves messages based on indexed attributes.
220+
func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) ([]*Message, error) {
221+
var messagesToUpdate []*Message
222+
now := time.Now()
223+
224+
err := db.iterateIndex(metricsChecked, func(item *badger.Item) error {
225+
key := item.Key()
226+
_, messageID, err := parseMetricsCheckedIndexKey(key)
227+
if err != nil {
228+
return fmt.Errorf("failed to parse index key: %w", err)
229+
}
230+
231+
message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool {
232+
m.MetricsChecked = true
233+
return true
234+
})
235+
236+
if err != nil {
237+
return err
238+
}
239+
240+
if message != nil {
241+
messagesToUpdate = append(messagesToUpdate, message)
193242
}
194243

195244
return nil
@@ -199,5 +248,44 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
199248
return nil, fmt.Errorf("failed to query messages: %w", err)
200249
}
201250

202-
return messages, nil
251+
if err := db.batchUpdateMessages(messagesToUpdate); err != nil {
252+
return nil, fmt.Errorf("failed to batch update messages: %w", err)
253+
}
254+
255+
return messagesToUpdate, nil
256+
}
257+
258+
// RemoveObservationsByIndex removes observations from messages based on indexed attributes.
259+
func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime time.Duration) error {
260+
var messagesToUpdate []*Message
261+
now := time.Now()
262+
263+
err := db.iterateIndex(metricsChecked, func(item *badger.Item) error {
264+
key := item.Key()
265+
_, messageID, err := parseMetricsCheckedIndexKey(key)
266+
if err != nil {
267+
return fmt.Errorf("failed to parse index key: %w", err)
268+
}
269+
270+
message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool {
271+
m.Observations = nil
272+
return true
273+
})
274+
275+
if err != nil {
276+
return err
277+
}
278+
279+
if message != nil {
280+
messagesToUpdate = append(messagesToUpdate, message)
281+
}
282+
283+
return nil
284+
})
285+
286+
if err != nil {
287+
return err
288+
}
289+
290+
return db.batchUpdateMessages(messagesToUpdate)
203291
}

‎fly/pkg/db/message_test.go

+45-13
Original file line numberDiff line numberDiff line change
@@ -63,33 +63,65 @@ func TestQueryMessagesByIndex(t *testing.T) {
6363
db := OpenDb(zap.NewNop(), nil)
6464
defer db.db.Close()
6565

66-
// Create a message with LastObservedAt set to 30 hours ago
67-
message0 := getMessage(0, time.Now().Add(-30*time.Hour), false)
66+
// Store the time for consistent comparison
67+
observedTime := time.Now().Add(-30 * time.Hour)
68+
69+
// Create messages
70+
message0 := getMessage(0, observedTime, false)
6871
err := db.SaveMessage(message0)
6972
require.NoError(t, err)
7073

71-
// Create a message with LastObservedAt set to 30 hours ago but with metrics checked
72-
message1 := getMessage(1, time.Now().Add(-30*time.Hour), true)
74+
message1 := getMessage(1, observedTime, true)
7375
err = db.SaveMessage(message1)
7476
require.NoError(t, err)
7577

76-
// Create a message with LastObservedAt set to 10 hours ago
7778
message2 := getMessage(2, time.Now().Add(-10*time.Hour), false)
7879
err = db.SaveMessage(message2)
7980
require.NoError(t, err)
8081

82+
// Query messages
8183
result, err := db.QueryMessagesByIndex(false, 30*time.Hour)
8284
require.NoError(t, err)
85+
require.Len(t, result, 1, "expected 1 message")
86+
87+
// Check if the message0 is in the result
88+
require.Equal(t, message0.MessageID, result[0].MessageID)
89+
require.True(t, observedTime.Equal(result[0].LastObservedAt), "message0 should be found in the result set")
90+
}
91+
92+
func TestRemoveObservationsByIndex(t *testing.T) {
93+
db := OpenDb(zap.NewNop(), nil)
94+
defer db.db.Close()
8395

84-
length := len(result)
85-
require.Equal(t, 1, length, "expected 1 message")
96+
testCases := []struct {
97+
messageID int
98+
timeOffset time.Duration
99+
metricsChecked bool
100+
expectEmpty bool
101+
}{
102+
{0, -49 * time.Hour, true, true},
103+
{1, -40 * time.Hour, true, false},
104+
{2, -50 * time.Hour, false, false},
105+
{3, -72 * time.Hour, true, true},
106+
{4, -96 * time.Hour, true, true},
107+
}
108+
109+
for _, tc := range testCases {
110+
message := getMessage(tc.messageID, time.Now().Add(tc.timeOffset), tc.metricsChecked)
111+
err := db.SaveMessage(message)
112+
require.NoError(t, err)
113+
}
114+
115+
err := db.RemoveObservationsByIndex(true, 48*time.Hour)
116+
require.NoError(t, err)
86117

87-
found := false
88-
for _, msg := range result {
89-
if msg.MessageID == message0.MessageID && msg.LastObservedAt.Equal(message0.LastObservedAt) && msg.MetricsChecked == true {
90-
found = true
91-
break
118+
for _, tc := range testCases {
119+
messageFromDb, err := db.GetMessage(fmt.Sprintf("messageId%d", tc.messageID))
120+
require.NoError(t, err)
121+
if tc.expectEmpty {
122+
require.Empty(t, messageFromDb.Observations, "expected observations to be removed for message", tc.messageID)
123+
} else {
124+
require.NotEmpty(t, messageFromDb.Observations, "expected observations to be present for message", tc.messageID)
92125
}
93126
}
94-
require.True(t, found, "message found in the result set")
95127
}

0 commit comments

Comments
 (0)