@@ -154,58 +154,6 @@ func (db *Database) AppendObservationIfNotExist(messageID string, observation Ob
154
154
})
155
155
}
156
156
157
- // QueryMessagesByIndex retrieves messages based on indexed attributes.
158
- func (db * Database ) QueryMessagesByIndex (metricsChecked bool , cutOffTime time.Duration ) ([]* Message , error ) {
159
- var messagesToUpdate []* Message
160
- now := time .Now ()
161
-
162
- // Start a read-only transaction
163
- err := db .db .View (func (txn * badger.Txn ) error {
164
- opts := badger .DefaultIteratorOptions
165
- opts .PrefetchValues = false // Only keys are needed
166
- it := txn .NewIterator (opts )
167
- defer it .Close ()
168
-
169
- metricsCheckedPrefix := fmt .Sprintf ("metricsChecked|%t|" , metricsChecked )
170
-
171
- // Iterate over the metricsChecked index
172
- for it .Seek ([]byte (metricsCheckedPrefix )); it .ValidForPrefix ([]byte (metricsCheckedPrefix )); it .Next () {
173
- item := it .Item ()
174
- key := item .Key ()
175
-
176
- // Extract MessageID from the key and query lastObservedAt index
177
- _ , messageID , err := parseMetricsCheckedIndexKey (key )
178
- if err != nil {
179
- return fmt .Errorf ("failed to parse index key: %w" , err )
180
- }
181
-
182
- message , err := db .GetMessage (messageID )
183
- if err != nil {
184
- return fmt .Errorf ("failed to get message by ID: %w" , err )
185
- }
186
-
187
- // Check if the last observed timestamp is before the specified hours
188
- if message .LastObservedAt .Before (now .Add (- cutOffTime )) {
189
- message .MetricsChecked = true
190
- messagesToUpdate = append (messagesToUpdate , message )
191
- }
192
- }
193
-
194
- return nil
195
- })
196
-
197
- if err != nil {
198
- return nil , fmt .Errorf ("failed to query messages: %w" , err )
199
- }
200
-
201
- // Perform batch updates in smaller transactions
202
- if err := db .batchUpdateMessages (messagesToUpdate ); err != nil {
203
- return nil , fmt .Errorf ("failed to batch update messages: %w" , err )
204
- }
205
-
206
- return messagesToUpdate , nil
207
- }
208
-
209
157
// batchUpdateMessages performs batch updates on a slice of messages.
210
158
func (db * Database ) batchUpdateMessages (messages []* Message ) error {
211
159
batchSize := 100 // Adjust the batch size as needed
@@ -237,50 +185,107 @@ func (db *Database) updateMessagesBatch(messagesBatch []*Message) error {
237
185
})
238
186
}
239
187
240
- func (db * Database ) RemoveObservationsByIndex (metricsChecked bool , cutOffTime time.Duration ) error {
241
- now := time .Now ()
242
- var messagesToUpdate []* Message
243
-
244
- // Start a read-write transaction
245
- err := db .db .Update (func (txn * badger.Txn ) error {
246
- opts := badger .DefaultIteratorOptions
247
- opts .PrefetchValues = false // Only keys are needed
188
+ // iterateIndex iterates over a metricsChecked index and applies a callback function to each item.
189
+ func (db * Database ) iterateIndex (metricsChecked bool , callback func (item * badger.Item ) error ) error {
190
+ opts := badger .DefaultIteratorOptions
191
+ opts .PrefetchValues = false // Only keys are needed
192
+ return db .db .View (func (txn * badger.Txn ) error {
248
193
it := txn .NewIterator (opts )
249
194
defer it .Close ()
250
195
251
196
metricsCheckedPrefix := fmt .Sprintf ("metricsChecked|%t|" , metricsChecked )
252
-
253
- // Iterate over the metricsChecked index
254
197
for it .Seek ([]byte (metricsCheckedPrefix )); it .ValidForPrefix ([]byte (metricsCheckedPrefix )); it .Next () {
255
- item := it .Item ()
256
- key := item .Key ()
257
-
258
- // Extract MessageID from the key
259
- _ , messageID , err := parseMetricsCheckedIndexKey (key )
260
- if err != nil {
261
- return fmt .Errorf ("failed to parse index key: %w" , err )
198
+ if err := callback (it .Item ()); err != nil {
199
+ return err
262
200
}
201
+ }
202
+ return nil
203
+ })
204
+ }
263
205
264
- message , err := db .GetMessage (messageID )
265
- if err != nil {
266
- return fmt .Errorf ("failed to get message by ID: %w" , err )
267
- }
206
+ // processMessage applies the provided logic to the message if it meets the specified conditions.
207
+ func (db * Database ) processMessage (messageID string , now time.Time , cutOffTime time.Duration , updateFunc func (* Message ) bool ) (* Message , error ) {
208
+ message , err := db .GetMessage (messageID )
209
+ if err != nil {
210
+ return nil , fmt .Errorf ("failed to get message by ID: %w" , err )
211
+ }
268
212
269
- // Check if the last observed timestamp is before the specified cutOffTime
270
- if message .LastObservedAt .Before (now .Add (- cutOffTime )) {
271
- // Remove observations array from the message
272
- message .Observations = nil
273
- messagesToUpdate = append (messagesToUpdate , message )
274
- }
213
+ if message .LastObservedAt .Before (now .Add (- cutOffTime )) && updateFunc (message ) {
214
+ return message , nil
215
+ }
216
+ return nil , nil
217
+ }
218
+
219
+ // QueryMessagesByIndex retrieves messages based on indexed attributes.
220
+ func (db * Database ) QueryMessagesByIndex (metricsChecked bool , cutOffTime time.Duration ) ([]* Message , error ) {
221
+ var messagesToUpdate []* Message
222
+ now := time .Now ()
223
+
224
+ err := db .iterateIndex (metricsChecked , func (item * badger.Item ) error {
225
+ key := item .Key ()
226
+ _ , messageID , err := parseMetricsCheckedIndexKey (key )
227
+ if err != nil {
228
+ return fmt .Errorf ("failed to parse index key: %w" , err )
229
+ }
230
+
231
+ message , err := db .processMessage (messageID , now , cutOffTime , func (m * Message ) bool {
232
+ m .MetricsChecked = true
233
+ return true
234
+ })
235
+
236
+ if err != nil {
237
+ return err
238
+ }
239
+
240
+ if message != nil {
241
+ messagesToUpdate = append (messagesToUpdate , message )
242
+ }
243
+
244
+ return nil
245
+ })
246
+
247
+ if err != nil {
248
+ return nil , fmt .Errorf ("failed to query messages: %w" , err )
249
+ }
250
+
251
+ if err := db .batchUpdateMessages (messagesToUpdate ); err != nil {
252
+ return nil , fmt .Errorf ("failed to batch update messages: %w" , err )
253
+ }
254
+
255
+ return messagesToUpdate , nil
256
+ }
257
+
258
+ // RemoveObservationsByIndex removes observations from messages based on indexed attributes.
259
+ func (db * Database ) RemoveObservationsByIndex (metricsChecked bool , cutOffTime time.Duration ) error {
260
+ var messagesToUpdate []* Message
261
+ now := time .Now ()
262
+
263
+ err := db .iterateIndex (metricsChecked , func (item * badger.Item ) error {
264
+ key := item .Key ()
265
+ _ , messageID , err := parseMetricsCheckedIndexKey (key )
266
+ if err != nil {
267
+ return fmt .Errorf ("failed to parse index key: %w" , err )
268
+ }
269
+
270
+ message , err := db .processMessage (messageID , now , cutOffTime , func (m * Message ) bool {
271
+ m .Observations = nil
272
+ return true
273
+ })
274
+
275
+ if err != nil {
276
+ return err
275
277
}
276
278
279
+ if message != nil {
280
+ messagesToUpdate = append (messagesToUpdate , message )
281
+ }
282
+
277
283
return nil
278
284
})
279
285
280
286
if err != nil {
281
287
return err
282
288
}
283
289
284
- // Perform batch updates in smaller transactions
285
290
return db .batchUpdateMessages (messagesToUpdate )
286
291
}
0 commit comments