@@ -154,42 +154,91 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob
154
154
})
155
155
}
156
156
157
- // QueryMessagesByIndex retrieves messages based on indexed attributes.
158
- func (db * Database ) QueryMessagesByIndex (metricsChecked bool , cutOffTime time.Duration ) ([]* Message , error ) {
159
- var messages []* Message
160
- now := time .Now ()
157
+ // batchUpdateMessages performs batch updates on a slice of messages.
158
+ func (db * Database ) batchUpdateMessages (messages []* Message ) error {
159
+ batchSize := 100
160
+ for i := 0 ; i < len (messages ); i += batchSize {
161
+ end := i + batchSize
162
+ if end > len (messages ) {
163
+ end = len (messages )
164
+ }
165
+ if err := db .updateMessagesBatch (messages [i :end ]); err != nil {
166
+ return err
167
+ }
168
+ }
169
+ return nil
170
+ }
161
171
162
- // Start a read-only transaction
163
- err := db .db .View (func (txn * badger.Txn ) error {
164
- opts := badger .DefaultIteratorOptions
165
- opts .PrefetchValues = false // Only keys are needed
172
+ // updateMessagesBatch updates a batch of messages in a single transaction.
173
+ func (db * Database ) updateMessagesBatch (messagesBatch []* Message ) error {
174
+ return db .db .Update (func (txn * badger.Txn ) error {
175
+ for _ , message := range messagesBatch {
176
+ data , err := json .Marshal (message )
177
+ if err != nil {
178
+ return fmt .Errorf ("failed to marshal message: %w" , err )
179
+ }
180
+ if err := txn .Set ([]byte (message .MessageID ), data ); err != nil {
181
+ return fmt .Errorf ("failed to save message: %w" , err )
182
+ }
183
+ }
184
+ return nil
185
+ })
186
+ }
187
+
188
+ // iterateIndex iterates over a metricsChecked index and applies a callback function to each item.
189
+ func (db * Database ) iterateIndex (metricsChecked bool , callback func (item * badger.Item ) error ) error {
190
+ opts := badger .DefaultIteratorOptions
191
+ opts .PrefetchValues = false // Only keys are needed
192
+ return db .db .View (func (txn * badger.Txn ) error {
166
193
it := txn .NewIterator (opts )
167
194
defer it .Close ()
168
195
169
196
metricsCheckedPrefix := fmt .Sprintf ("metricsChecked|%t|" , metricsChecked )
170
-
171
- // Iterate over the metricsChecked index
172
197
for it .Seek ([]byte (metricsCheckedPrefix )); it .ValidForPrefix ([]byte (metricsCheckedPrefix )); it .Next () {
173
- item := it .Item ()
174
- key := item .Key ()
175
-
176
- // Extract MessageID from the key and query lastObservedAt index
177
- _ , messageID , err := parseMetricsCheckedIndexKey (key )
178
- if err != nil {
179
- return fmt .Errorf ("failed to parse index key: %w" , err )
198
+ if err := callback (it .Item ()); err != nil {
199
+ return err
180
200
}
201
+ }
202
+ return nil
203
+ })
204
+ }
181
205
182
- message , err := db .GetMessage (messageID )
183
- if err != nil {
184
- return fmt .Errorf ("failed to get message by ID: %w" , err )
185
- }
206
+ // processMessage applies the provided logic to the message if it meets the specified conditions.
207
+ func (db * Database ) processMessage (messageID string , now time.Time , cutOffTime time.Duration , updateFunc func (* Message ) bool ) (* Message , error ) {
208
+ message , err := db .GetMessage (messageID )
209
+ if err != nil {
210
+ return nil , fmt .Errorf ("failed to get message by ID: %w" , err )
211
+ }
186
212
187
- // Check if the last observed timestamp is before the specified hours
188
- if message .LastObservedAt .Before (now .Add (- cutOffTime )) {
189
- message .MetricsChecked = true
190
- db .SaveMessage (message )
191
- messages = append (messages , message )
192
- }
213
+ if message .LastObservedAt .Before (now .Add (- cutOffTime )) && updateFunc (message ) {
214
+ return message , nil
215
+ }
216
+ return nil , nil
217
+ }
218
+
219
+ // QueryMessagesByIndex retrieves messages based on indexed attributes.
220
+ func (db * Database ) QueryMessagesByIndex (metricsChecked bool , cutOffTime time.Duration ) ([]* Message , error ) {
221
+ var messagesToUpdate []* Message
222
+ now := time .Now ()
223
+
224
+ err := db .iterateIndex (metricsChecked , func (item * badger.Item ) error {
225
+ key := item .Key ()
226
+ _ , messageID , err := parseMetricsCheckedIndexKey (key )
227
+ if err != nil {
228
+ return fmt .Errorf ("failed to parse index key: %w" , err )
229
+ }
230
+
231
+ message , err := db .processMessage (messageID , now , cutOffTime , func (m * Message ) bool {
232
+ m .MetricsChecked = true
233
+ return true
234
+ })
235
+
236
+ if err != nil {
237
+ return err
238
+ }
239
+
240
+ if message != nil {
241
+ messagesToUpdate = append (messagesToUpdate , message )
193
242
}
194
243
195
244
return nil
@@ -199,5 +248,44 @@ func (db *Database) QueryMessagesByIndex(metricsChecked bool, cutOffTime time.Du
199
248
return nil , fmt .Errorf ("failed to query messages: %w" , err )
200
249
}
201
250
202
- return messages , nil
251
+ if err := db .batchUpdateMessages (messagesToUpdate ); err != nil {
252
+ return nil , fmt .Errorf ("failed to batch update messages: %w" , err )
253
+ }
254
+
255
+ return messagesToUpdate , nil
256
+ }
257
+
258
+ // RemoveObservationsByIndex removes observations from messages based on indexed attributes.
259
+ func (db * Database ) RemoveObservationsByIndex (metricsChecked bool , cutOffTime time.Duration ) error {
260
+ var messagesToUpdate []* Message
261
+ now := time .Now ()
262
+
263
+ err := db .iterateIndex (metricsChecked , func (item * badger.Item ) error {
264
+ key := item .Key ()
265
+ _ , messageID , err := parseMetricsCheckedIndexKey (key )
266
+ if err != nil {
267
+ return fmt .Errorf ("failed to parse index key: %w" , err )
268
+ }
269
+
270
+ message , err := db .processMessage (messageID , now , cutOffTime , func (m * Message ) bool {
271
+ m .Observations = nil
272
+ return true
273
+ })
274
+
275
+ if err != nil {
276
+ return err
277
+ }
278
+
279
+ if message != nil {
280
+ messagesToUpdate = append (messagesToUpdate , message )
281
+ }
282
+
283
+ return nil
284
+ })
285
+
286
+ if err != nil {
287
+ return err
288
+ }
289
+
290
+ return db .batchUpdateMessages (messagesToUpdate )
203
291
}
0 commit comments