@@ -7,19 +7,27 @@ package checkin
7
7
8
8
import (
9
9
"context"
10
+ _ "embed"
10
11
"encoding/json"
12
+ "errors"
13
+ "fmt"
11
14
"sync"
12
15
"time"
13
16
14
17
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
15
18
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
16
19
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
17
20
21
+ estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types"
22
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/scriptlanguage"
18
23
"github.com/rs/zerolog"
19
24
)
20
25
21
26
const defaultFlushInterval = 10 * time .Second
22
27
28
+ //go:embed deleteAuditFieldsOnCheckin.painless
29
+ var deleteAuditAttributesScript string
30
+
23
31
type optionsT struct {
24
32
flushInterval time.Duration
25
33
}
@@ -33,10 +41,11 @@ func WithFlushInterval(d time.Duration) Opt {
33
41
}
34
42
35
43
type extraT struct {
36
- meta []byte
37
- seqNo sqn.SeqNo
38
- ver string
39
- components []byte
44
+ meta []byte
45
+ seqNo sqn.SeqNo
46
+ ver string
47
+ components []byte
48
+ deleteAudit bool
40
49
}
41
50
42
51
// Minimize the size of this structure.
@@ -102,17 +111,18 @@ func (bc *Bulk) timestamp() string {
102
111
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
103
112
// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down.
104
113
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
105
- func (bc * Bulk ) CheckIn (id string , status string , message string , meta []byte , components []byte , seqno sqn.SeqNo , newVer string , unhealthyReason * []string ) error {
114
+ func (bc * Bulk ) CheckIn (id string , status string , message string , meta []byte , components []byte , seqno sqn.SeqNo , newVer string , unhealthyReason * []string , deleteAudit bool ) error {
106
115
// Separate out the extra data to minimize
107
116
// the memory footprint of the 90% case of just
108
117
// updating the timestamp.
109
118
var extra * extraT
110
- if meta != nil || seqno .IsSet () || newVer != "" || components != nil {
119
+ if meta != nil || seqno .IsSet () || newVer != "" || components != nil || deleteAudit {
111
120
extra = & extraT {
112
- meta : meta ,
113
- seqNo : seqno ,
114
- ver : newVer ,
115
- components : components ,
121
+ meta : meta ,
122
+ seqNo : seqno ,
123
+ ver : newVer ,
124
+ components : components ,
125
+ deleteAudit : deleteAudit ,
116
126
}
117
127
}
118
128
@@ -182,7 +192,6 @@ func (bc *Bulk) flush(ctx context.Context) error {
182
192
// JSON body containing just the timestamp updates.
183
193
var body []byte
184
194
if pendingData .extra == nil {
185
-
186
195
var ok bool
187
196
body , ok = simpleCache [pendingData ]
188
197
if ! ok {
@@ -198,8 +207,27 @@ func (bc *Bulk) flush(ctx context.Context) error {
198
207
}
199
208
simpleCache [pendingData ] = body
200
209
}
210
+ } else if pendingData .extra .deleteAudit {
211
+ // Use a script instead of a partial doc to update if attributes need to be removed
212
+ params , err := encodeParams (nowTimestamp , pendingData )
213
+ if err != nil {
214
+ return err
215
+ }
216
+ action := & estypes.UpdateAction {
217
+ Script : & estypes.InlineScript {
218
+ Lang : & scriptlanguage .Painless ,
219
+ Source : deleteAuditAttributesScript ,
220
+ Params : params ,
221
+ },
222
+ }
223
+ body , err = json .Marshal (& action )
224
+ if err != nil {
225
+ return fmt .Errorf ("could not marshall script action: %w" , err )
226
+ }
227
+ if pendingData .extra .seqNo .IsSet () {
228
+ needRefresh = true
229
+ }
201
230
} else {
202
-
203
231
fields := bulk.UpdateFields {
204
232
dl .FieldLastCheckin : pendingData .ts , // Set the checkin timestamp
205
233
dl .FieldUpdatedAt : nowTimestamp , // Set "updated_at" to the current timestamp
@@ -265,3 +293,58 @@ func (bc *Bulk) flush(ctx context.Context) error {
265
293
266
294
return err
267
295
}
296
+
297
+ func encodeParams (now string , data pendingT ) (map [string ]json.RawMessage , error ) {
298
+ var (
299
+ tsNow json.RawMessage
300
+ ts json.RawMessage
301
+ status json.RawMessage
302
+ message json.RawMessage
303
+ reason json.RawMessage
304
+ ver json.RawMessage
305
+ meta json.RawMessage
306
+ components json.RawMessage
307
+ isSet json.RawMessage
308
+ seqNo json.RawMessage
309
+ err error
310
+ )
311
+ tsNow , err = json .Marshal (now )
312
+ Err := errors .Join (err )
313
+ ts , err = json .Marshal (data .ts )
314
+ Err = errors .Join (Err , err )
315
+ status , err = json .Marshal (data .status )
316
+ Err = errors .Join (Err , err )
317
+ message , err = json .Marshal (data .message )
318
+ Err = errors .Join (Err , err )
319
+ reason , err = json .Marshal (data .unhealthyReason )
320
+ Err = errors .Join (Err , err )
321
+ ver , err = json .Marshal (data .extra .ver )
322
+ Err = errors .Join (Err , err )
323
+ isSet , err = json .Marshal (data .extra .seqNo .IsSet ())
324
+ Err = errors .Join (Err , err )
325
+ seqNo , err = json .Marshal (data .extra .seqNo )
326
+ Err = errors .Join (Err , err )
327
+ if data .extra .meta != nil {
328
+ meta , err = json .Marshal (data .extra .meta )
329
+ Err = errors .Join (Err , err )
330
+ }
331
+ if data .extra .components != nil {
332
+ components , err = json .Marshal (data .extra .components )
333
+ Err = errors .Join (Err , err )
334
+ }
335
+ if Err != nil {
336
+ return nil , Err
337
+ }
338
+ return map [string ]json.RawMessage {
339
+ "Now" : tsNow ,
340
+ "TS" : ts ,
341
+ "Status" : status ,
342
+ "Message" : message ,
343
+ "UnhealthyReason" : reason ,
344
+ "Ver" : ver ,
345
+ "Meta" : meta ,
346
+ "Components" : components ,
347
+ "SeqNoSet" : isSet ,
348
+ "SeqNo" : seqNo ,
349
+ }, nil
350
+ }
0 commit comments