Skip to content

Commit 4f3a7d1

Browse files
committed
historical_uptime: dynamic batch update and delete messages entirely to reduce memory usage
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent ae37cdb commit 4f3a7d1

File tree

3 files changed

+48
-14
lines changed

3 files changed

+48
-14
lines changed

fly/cmd/historical_uptime/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func initDatabaseCleanUp(db *db.Database, logger *zap.Logger, errC chan error) {
220220
case <-ctx.Done():
221221
return nil
222222
case <-t.C:
223-
err := db.RemoveObservationsByIndex(true, common.ExpiryDuration)
223+
err := db.RemoveMessagesByIndex(true, common.ExpiryDuration)
224224
if err != nil {
225225
logger.Error("RemoveObservationsByIndex error", zap.Error(err))
226226
}

fly/pkg/db/message.go

+42-8
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"strings"
99
"time"
1010

11-
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
1211
"github.com/dgraph-io/badger/v3"
12+
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
1313
"github.com/wormhole-foundation/wormhole/sdk/vaa"
1414
)
1515

@@ -203,7 +203,6 @@ func (db *Database) iterateIndex(metricsChecked bool, callback func(item *badger
203203
})
204204
}
205205

206-
207206
// processMessage retrieves a message from the database and applies an update function to it if the message is older than the cut-off time
208207
func (db *Database) processMessage(messageID string, now time.Time, cutOffTime time.Duration, updateFunc func(*Message) bool) (*Message, error) {
209208
message, err := db.GetMessage(messageID)
@@ -240,6 +239,13 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
240239

241240
if message != nil {
242241
messagesToUpdate = append(messagesToUpdate, message)
242+
243+
if (len(messagesToUpdate) % common.MessageUpdateBatchSize) == 0 {
244+
if err := db.batchUpdateMessages(messagesToUpdate); err != nil {
245+
return fmt.Errorf("failed to batch update messages: %w", err)
246+
}
247+
messagesToUpdate = messagesToUpdate[:0]
248+
}
243249
}
244250

245251
return nil
@@ -256,11 +262,12 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
256262
return messagesToUpdate, nil
257263
}
258264

259-
// RemoveObservationsByIndex removes observations from messages based on indexed attributes.
260-
func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime time.Duration) error {
261-
var messagesToUpdate []*Message
265+
// removeMessagesByIndex dynamically deletes messages in batches based on indexed attributes.
266+
func (db *Database) RemoveMessagesByIndex(metricsChecked bool, cutOffTime time.Duration) error {
267+
messageIDsToDelete := make([]string, 0)
262268
now := time.Now()
263269

270+
// Iterate over the index to find messages to delete
264271
err := db.iterateIndex(metricsChecked, func(item *badger.Item) error {
265272
key := item.Key()
266273
_, messageID, err := parseMetricsCheckedIndexKey(key)
@@ -269,7 +276,7 @@ func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime ti
269276
}
270277

271278
message, err := db.processMessage(messageID, now, cutOffTime, func(m *Message) bool {
272-
m.Observations = nil
279+
// return true since we just want to delete the message, no updating is needed
273280
return true
274281
})
275282

@@ -278,7 +285,15 @@ func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime ti
278285
}
279286

280287
if message != nil {
281-
messagesToUpdate = append(messagesToUpdate, message)
288+
messageIDsToDelete = append(messageIDsToDelete, messageID)
289+
}
290+
291+
// Delete messages in batches to reduce total memory usage at a time
292+
if len(messageIDsToDelete) >= common.MessageUpdateBatchSize {
293+
if err := db.deleteMessagesBatch(messageIDsToDelete); err != nil {
294+
return fmt.Errorf("failed to delete messages: %w", err)
295+
}
296+
messageIDsToDelete = make([]string, 0)
282297
}
283298

284299
return nil
@@ -288,5 +303,24 @@ func (db *Database) RemoveObservationsByIndex(metricsChecked bool, cutOffTime ti
288303
return err
289304
}
290305

291-
return db.batchUpdateMessages(messagesToUpdate)
306+
return db.deleteMessagesBatch(messageIDsToDelete)
307+
}
308+
309+
func (db *Database) deleteMessagesBatch(messageIDs []string) error {
310+
return db.db.Update(func(txn *badger.Txn) error {
311+
for _, messageID := range messageIDs {
312+
err := txn.Delete([]byte(messageID))
313+
// deleting the message should delete the index as well
314+
mcKey := fmt.Sprintf(metricsCheckedIndexKeyFmt, true, messageID)
315+
if err := txn.Delete([]byte(mcKey)); err != nil {
316+
return fmt.Errorf("failed to delete index for messageID %s: %w", messageID, err)
317+
}
318+
if err != nil {
319+
// Depending on your error handling strategy, you might want to log this error
320+
// and continue, or you might want to return the error immediately.
321+
return fmt.Errorf("failed to delete messageID %s: %w", messageID, err)
322+
}
323+
}
324+
return nil
325+
})
292326
}

fly/pkg/db/message_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,16 @@ func TestRemoveObservationsByIndex(t *testing.T) {
112112
require.NoError(t, err)
113113
}
114114

115-
err := db.RemoveObservationsByIndex(true, 48*time.Hour)
115+
err := db.RemoveMessagesByIndex(true, 48*time.Hour)
116116
require.NoError(t, err)
117117

118118
for _, tc := range testCases {
119-
messageFromDb, err := db.GetMessage(fmt.Sprintf("messageId%d", tc.messageID))
120-
require.NoError(t, err)
119+
_, err := db.GetMessage(fmt.Sprintf("messageId%d", tc.messageID))
120+
// err should return message not found
121121
if tc.expectEmpty {
122-
require.Empty(t, messageFromDb.Observations, "expected observations to be removed for message", tc.messageID)
122+
require.ErrorIs(t, err, ErrMessageNotFound)
123123
} else {
124-
require.NotEmpty(t, messageFromDb.Observations, "expected observations to be present for message", tc.messageID)
124+
require.NoError(t, err)
125125
}
126126
}
127127
}

0 commit comments

Comments
 (0)