Skip to content

Commit 6b29ab4

Browse files
Add timeout to bulker flush, add default case in failQueue (#3986)
* use poll timeout in es ctx * Add some SCALEDEBUG logs * add agent id to logs * debug logs in fleet.go * add default case, log deadline * 5m timeout * cleanup logs, move deadline to doFlush * move timeout before doFlush * cleanup logs * extracted const * remove log * exit bulker on checkin error * update to latest stack snapshot * revert break LOOP * move deadline inside doFlush * fix cancel * remove doFlush param * separate context * added changelog --------- Co-authored-by: Jill Guyonnet <jill.guyonnet@gmail.com>
1 parent d019e25 commit 6b29ab4

File tree

4 files changed

+66
-15
lines changed

4 files changed

+66
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Added context deadline around flush bulk queue
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component:
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

internal/pkg/api/handleAck.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -365,14 +365,14 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag
365365

366366
// Save action result document
367367
if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil {
368-
zlog.Error().Err(err).Msg("create action result")
368+
zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("create action result")
369369
return err
370370
}
371371

372372
if action.Type == TypeUpgrade {
373373
event, _ := ev.AsUpgradeEvent()
374374
if err := ack.handleUpgrade(ctx, zlog, agent, event); err != nil {
375-
zlog.Error().Err(err).Msg("handle upgrade event")
375+
zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("handle upgrade event")
376376
return err
377377
}
378378
}
@@ -634,6 +634,8 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
634634
zlog.Info().
635635
Str("lastReportedVersion", agent.Agent.Version).
636636
Str("upgradedAt", now).
637+
Str(logger.AgentID, agent.Agent.ID).
638+
Str(logger.ActionID, event.ActionId).
637639
Msg("ack upgrade")
638640

639641
return nil

internal/pkg/api/handleCheckin.go

+3
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
254254
}
255255

256256
func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent, ver string) error {
257+
zlog = zlog.With().
258+
Str(logger.AgentID, agent.Id).Logger()
257259
validated, err := ct.validateRequest(zlog, w, r, start, agent)
258260
if err != nil {
259261
return err
@@ -338,6 +340,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
338340
actions, ackToken = convertActions(zlog, agent.Id, pendingActions)
339341

340342
span, ctx := apm.StartSpan(r.Context(), "longPoll", "process")
343+
341344
if len(actions) == 0 {
342345
LOOP:
343346
for {

internal/pkg/bulk/engine.go

+27-13
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,14 @@ type Bulker struct {
9797
}
9898

9999
const (
100-
defaultFlushInterval = time.Second * 5
101-
defaultFlushThresholdCnt = 32768
102-
defaultFlushThresholdSz = 1024 * 1024 * 10
103-
defaultMaxPending = 32
104-
defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast
105-
defaultAPIKeyMaxParallel = 32
106-
defaultApikeyMaxReqSize = 100 * 1024 * 1024
100+
defaultFlushInterval = time.Second * 5
101+
defaultFlushThresholdCnt = 32768
102+
defaultFlushThresholdSz = 1024 * 1024 * 10
103+
defaultMaxPending = 32
104+
defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast
105+
defaultAPIKeyMaxParallel = 32
106+
defaultApikeyMaxReqSize = 100 * 1024 * 1024
107+
defaultFlushContextTimeout = time.Minute * 1
107108
)
108109

109110
func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
@@ -416,6 +417,7 @@ func (b *Bulker) Run(ctx context.Context) error {
416417
Int("itemCnt", itemCnt).
417418
Int("byteCnt", byteCnt).
418419
Msg("Flush on timer")
420+
419421
err = doFlush()
420422

421423
case <-ctx.Done():
@@ -443,7 +445,11 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
443445
Str("queue", queue.Type()).
444446
Msg("flushQueue Wait")
445447

446-
if err := w.Acquire(ctx, 1); err != nil {
448+
acquireCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
449+
defer cancel()
450+
451+
if err := w.Acquire(acquireCtx, 1); err != nil {
452+
zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error")
447453
return err
448454
}
449455

@@ -458,6 +464,10 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
458464
go func() {
459465
start := time.Now()
460466

467+
// deadline prevents bulker being blocked on flush
468+
flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
469+
defer cancel()
470+
461471
if b.tracer != nil {
462472
trans := b.tracer.StartTransaction(fmt.Sprintf("Flush queue %s", queue.Type()), "bulker")
463473
trans.Context.SetLabel("queue.size", queue.cnt)
@@ -471,13 +481,13 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
471481
var err error
472482
switch queue.ty {
473483
case kQueueRead, kQueueRefreshRead:
474-
err = b.flushRead(ctx, queue)
484+
err = b.flushRead(flushCtx, queue)
475485
case kQueueSearch, kQueueFleetSearch:
476-
err = b.flushSearch(ctx, queue)
486+
err = b.flushSearch(flushCtx, queue)
477487
case kQueueAPIKeyUpdate:
478-
err = b.flushUpdateAPIKey(ctx, queue)
488+
err = b.flushUpdateAPIKey(flushCtx, queue)
479489
default:
480-
err = b.flushBulk(ctx, queue)
490+
err = b.flushBulk(flushCtx, queue)
481491
}
482492

483493
if err != nil {
@@ -502,8 +512,12 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
502512
func failQueue(queue queueT, err error) {
503513
for n := queue.head; n != nil; {
504514
next := n.next // 'n' is invalid immediately on channel send
505-
n.ch <- respT{
515+
select {
516+
case n.ch <- respT{
506517
err: err,
518+
}:
519+
default:
520+
panic("Unexpected blocked response channel on failQueue")
507521
}
508522
n = next
509523
}

0 commit comments

Comments
 (0)