8
8
"strings"
9
9
"time"
10
10
11
+ "github.com/wormhole-foundation/wormhole-monitor/fly/common"
11
12
"github.com/dgraph-io/badger/v3"
12
13
"github.com/wormhole-foundation/wormhole/sdk/vaa"
13
14
)
@@ -154,42 +155,91 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob
154
155
})
155
156
}
156
157
157
- // QueryMessagesByIndex retrieves messages based on indexed attributes.
158
- func (db * Database ) QueryMessagesByIndex (metricsChecked bool , cutOffTime time.Duration ) ([]* Message , error ) {
159
- var messages []* Message
160
- now := time .Now ()
158
+ // batchUpdateMessages performs batch updates on a slice of messages.
159
+ func (db * Database ) batchUpdateMessages (messages []* Message ) error {
160
+ for i := 0 ; i < len (messages ); i += common .MessageUpdateBatchSize {
161
+ end := i + common .MessageUpdateBatchSize
162
+ if end > len (messages ) {
163
+ end = len (messages )
164
+ }
165
+ if err := db .updateMessagesBatch (messages [i :end ]); err != nil {
166
+ return err
167
+ }
168
+ }
169
+ return nil
170
+ }
161
171
162
- // Start a read-only transaction
163
- err := db .db .View (func (txn * badger.Txn ) error {
164
- opts := badger .DefaultIteratorOptions
165
- opts .PrefetchValues = false // Only keys are needed
172
+ // updateMessagesBatch updates a batch of messages in a single transaction.
173
+ func (db * Database ) updateMessagesBatch (messagesBatch []* Message ) error {
174
+ return db .db .Update (func (txn * badger.Txn ) error {
175
+ for _ , message := range messagesBatch {
176
+ data , err := json .Marshal (message )
177
+ if err != nil {
178
+ return fmt .Errorf ("failed to marshal message: %w" , err )
179
+ }
180
+ if err := txn .Set ([]byte (message .MessageID ), data ); err != nil {
181
+ return fmt .Errorf ("failed to save message: %w" , err )
182
+ }
183
+ }
184
+ return nil
185
+ })
186
+ }
187
+
188
+ // iterateIndex iterates over a metricsChecked index and applies a callback function to each item.
189
+ func (db * Database ) iterateIndex (metricsChecked bool , callback func (item * badger.Item ) error ) error {
190
+ opts := badger .DefaultIteratorOptions
191
+ opts .PrefetchValues = false // Only keys are needed
192
+ return db .db .View (func (txn * badger.Txn ) error {
166
193
it := txn .NewIterator (opts )
167
194
defer it .Close ()
168
195
169
196
metricsCheckedPrefix := fmt .Sprintf ("metricsChecked|%t|" , metricsChecked )
170
-
171
- // Iterate over the metricsChecked index
172
197
for it .Seek ([]byte (metricsCheckedPrefix )); it .ValidForPrefix ([]byte (metricsCheckedPrefix )); it .Next () {
173
- item := it .Item ()
174
- key := item .Key ()
175
-
176
- // Extract MessageID from the key and query lastObservedAt index
177
- _ , messageID , err := parseMetricsCheckedIndexKey (key )
178
- if err != nil {
179
- return fmt .Errorf ("failed to parse index key: %w" , err )
198
+ if err := callback (it .Item ()); err != nil {
199
+ return err
180
200
}
201
+ }
202
+ return nil
203
+ })
204
+ }
181
205
182
- message , err := db .GetMessage (messageID )
183
- if err != nil {
184
- return fmt .Errorf ("failed to get message by ID: %w" , err )
185
- }
186
206
187
- // Check if the last observed timestamp is before the specified hours
188
- if message .LastObservedAt .Before (now .Add (- cutOffTime )) {
189
- message .MetricsChecked = true
190
- db .SaveMessage (message )
191
- messages = append (messages , message )
192
- }
207
+ // processMessage retrieves a message from the database and applies an update function to it if the message is older than the cut-off time
208
+ func (db * Database ) processMessage (messageID string , now time.Time , cutOffTime time.Duration , updateFunc func (* Message ) bool ) (* Message , error ) {
209
+ message , err := db .GetMessage (messageID )
210
+ if err != nil {
211
+ return nil , fmt .Errorf ("failed to get message by ID: %w" , err )
212
+ }
213
+
214
+ if message .LastObservedAt .Before (now .Add (- cutOffTime )) && updateFunc (message ) {
215
+ return message , nil
216
+ }
217
+ return nil , nil
218
+ }
219
+
220
+ // QueryMessagesByIndex retrieves messages based on indexed attributes.
221
+ func (db * Database ) QueryMessagesByIndex (metricsChecked bool , cutOffTime time.Duration ) ([]* Message , error ) {
222
+ var messagesToUpdate []* Message
223
+ now := time .Now ()
224
+
225
+ err := db .iterateIndex (metricsChecked , func (item * badger.Item ) error {
226
+ key := item .Key ()
227
+ _ , messageID , err := parseMetricsCheckedIndexKey (key )
228
+ if err != nil {
229
+ return fmt .Errorf ("failed to parse index key: %w" , err )
230
+ }
231
+
232
+ message , err := db .processMessage (messageID , now , cutOffTime , func (m * Message ) bool {
233
+ m .MetricsChecked = true
234
+ return true
235
+ })
236
+
237
+ if err != nil {
238
+ return err
239
+ }
240
+
241
+ if message != nil {
242
+ messagesToUpdate = append (messagesToUpdate , message )
193
243
}
194
244
195
245
return nil
@@ -199,5 +249,44 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
199
249
return nil , fmt .Errorf ("failed to query messages: %w" , err )
200
250
}
201
251
202
- return messages , nil
252
+ if err := db .batchUpdateMessages (messagesToUpdate ); err != nil {
253
+ return nil , fmt .Errorf ("failed to batch update messages: %w" , err )
254
+ }
255
+
256
+ return messagesToUpdate , nil
257
+ }
258
+
259
+ // RemoveObservationsByIndex removes observations from messages based on indexed attributes.
260
+ func (db * Database ) RemoveObservationsByIndex (metricsChecked bool , cutOffTime time.Duration ) error {
261
+ var messagesToUpdate []* Message
262
+ now := time .Now ()
263
+
264
+ err := db .iterateIndex (metricsChecked , func (item * badger.Item ) error {
265
+ key := item .Key ()
266
+ _ , messageID , err := parseMetricsCheckedIndexKey (key )
267
+ if err != nil {
268
+ return fmt .Errorf ("failed to parse index key: %w" , err )
269
+ }
270
+
271
+ message , err := db .processMessage (messageID , now , cutOffTime , func (m * Message ) bool {
272
+ m .Observations = nil
273
+ return true
274
+ })
275
+
276
+ if err != nil {
277
+ return err
278
+ }
279
+
280
+ if message != nil {
281
+ messagesToUpdate = append (messagesToUpdate , message )
282
+ }
283
+
284
+ return nil
285
+ })
286
+
287
+ if err != nil {
288
+ return err
289
+ }
290
+
291
+ return db .batchUpdateMessages (messagesToUpdate )
203
292
}
0 commit comments