
Commit c67e65d

Replace policy and action limiters with a checkin limiter (#3255)
* Replace policy and action limiters with a checkin limiter

Replace the separate policy and action limiters with a unified limiter in the CheckinT struct. The unified limiter is used when a response action (which includes the POLICY_CHANGE actions generated by the policy monitor) is detected in the checkin response and gzip is enabled.

---------

Co-authored-by: Julia Bardi <90178898+juliaElastic@users.noreply.github.com>
1 parent 144facb commit c67e65d

19 files changed: +99 −321 lines

changelog/fragments/1702940211-Replace-policy-throttle-with-limiter.yaml → changelog/fragments/1707246268-Move-action-limiter-to-checkin-response-writer.yaml

+8 −7

@@ -11,16 +11,17 @@
 kind: enhancement

 # Change summary; an 80ish characters long description of the change.
-summary: Replace policy throttle with limiter
+summary: Move action limiter to checkin response writer

 # Long description; in case the summary is not enough to describe the change
 # this field accommodates a description without length limits.
 # NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
 description: |
-  Replace the existing policy_throttle setting with a policy_limit
-  setting that controls the same thing, how fast policies are passed to
-  subscriptions to control mass responses. If no policy_limit settings
-  are specified the policy_throttle is used with burst 1.
+  Remove the policy_limiter that was added by 3008 and not released.
+  Move the action limiter to the checkin handler's write response method before a gzip.Writer is retrieved from the pool or allocated.
+  This allows fleet-server to control gzip.Writer access for responses that contain either actions from the action dispatcher, or a
+  POLICY_CHANGE action from the policy monitor.
+  Removing the throttle/limiter from the policy monitor also greatly improves performance of policy dispatches.

 # Affected component; a word indicating the component this changeset affects.
 component:

@@ -29,8 +30,8 @@ component:
 # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
 # NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
 # Please provide it if you are adding a fragment for a different PR.
-#pr: https://github.com/owner/repo/1234
+pr: 3255

 # Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
 # If not present is automatically filled by the tooling with the issue linked to the PR number.
-issue: 3008
+issue: 3254
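The pattern the changelog describes boils down to gating a pooled gzip.Writer behind a rate limiter, so a burst of action-bearing responses cannot allocate unbounded writers. Below is a minimal, self-contained Go sketch of that idea; the type and function names are illustrative, not fleet-server's actual API.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// limitedGzipPool gates access to pooled gzip.Writers behind a rate limiter.
type limitedGzipPool struct {
	limit *rate.Limiter
	pool  sync.Pool
}

func newLimitedGzipPool(interval time.Duration, burst, level int) *limitedGzipPool {
	return &limitedGzipPool{
		limit: rate.NewLimiter(rate.Every(interval), burst),
		pool: sync.Pool{New: func() any {
			zw, _ := gzip.NewWriterLevel(io.Discard, level) // expensive: ~1.2MB per writer
			return zw
		}},
	}
}

// compress waits on the limiter before taking a writer from the pool,
// compresses payload into dst, then returns the writer for reuse.
func (p *limitedGzipPool) compress(ctx context.Context, dst io.Writer, payload []byte) error {
	if err := p.limit.Wait(ctx); err != nil {
		return fmt.Errorf("response limiter error: %w", err)
	}
	zw, _ := p.pool.Get().(*gzip.Writer)
	defer p.pool.Put(zw)
	zw.Reset(dst)
	if _, err := zw.Write(payload); err != nil {
		return err
	}
	return zw.Close()
}

func main() {
	p := newLimitedGzipPool(5*time.Millisecond, 1, gzip.BestSpeed)
	var buf bytes.Buffer
	if err := p.compress(context.Background(), &buf, []byte("checkin response body")); err != nil {
		panic(err)
	}
	fmt.Println(buf.Len(), "compressed bytes")
}
```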

fleet-server.reference.yml

+3 −13

@@ -145,14 +145,14 @@ fleet:
 # # If specified a set of other limits is automatically loaded.
 # max_agents: 0
 # # policy_throttle is the duration that the fleet-server will wait in between attempts to dispatch policy updates to polling agents
-# # deprecated: replaced by policy_limit settings
-# policy_throttle: 5ms # 1ms min is forced
+# # deprecated: replaced by action_limit settings
+# policy_throttle: 5ms
 # # max_header_byte_size is the request header size limit
 # max_header_byte_size: 8192 # 8KiB
 # # max_connections is the maximum number of connections per API endpoint
 # max_connections: 0
 #
-# # action_limit is a limiter for the action dispatcher; it is added to control how fast the checkin endpoint writes responses when an action affecting multiple agents is detected.
+# # action_limit is a limiter for checkin responses that contain actions.
 # # This is done in order to be able to reuse gzip writers if gzip is requested, as allocating new writers is expensive (around 1.2MB for a new allocation).
 # # If the interval is too high it may negatively affect assumptions around server write timeouts and poll durations; if used we expect the value to be around 5ms.
 # # An interval value of 0 disables the limiter by using an infinite rate limit (default behavior).

@@ -161,16 +161,6 @@ fleet:
 # interval: 0
 # burst: 5
 #
-# # policy_limit is a limiter used to control how fast fleet-server sends POLICY_CHANGE actions to agents.
-# # As with the action_limit settings this is done to avoid allocating a lot of gzip writers.
-# # The default settings are to have the interval of 5ms with a burst of 1.
-# # A min burst value of 1 is always enforced.
-# # If no interval is specified, the policy_throttle may be used as the interval instead.
-# # If both interval and policy_throttle are 0, a value of 1ns is used instead.
-# policy_limit:
-# interval: 5ms
-# burst: 1
-#
 # # endpoint specific limits below
 # checkin_limit:
 # interval: 1ms
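As a hedged illustration of the comment above ("an interval value of 0 disables the limiter by using an infinite rate limit"), this is how an interval/burst pair typically maps onto golang.org/x/time/rate. newActionLimiter is a hypothetical helper, not fleet-server code.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// newActionLimiter shows how an interval/burst pair maps onto
// golang.org/x/time/rate: a zero interval yields an infinite rate,
// i.e. the limiter never blocks.
func newActionLimiter(interval time.Duration, burst int) *rate.Limiter {
	if interval <= 0 {
		return rate.NewLimiter(rate.Inf, burst) // interval 0 disables limiting
	}
	return rate.NewLimiter(rate.Every(interval), burst)
}

func main() {
	fmt.Println(newActionLimiter(0, 5).Limit() == rate.Inf)      // true
	fmt.Println(newActionLimiter(5*time.Millisecond, 5).Limit()) // 200 events/s
}
```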

internal/pkg/action/dispatcher.go

+4 −15

@@ -17,7 +17,6 @@ import (
 	"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

 	"github.com/rs/zerolog"
-	"golang.org/x/time/rate"
 )

 // Sub is an action subscription that will give a single agent all of its actions.

@@ -34,23 +33,17 @@ func (s Sub) Ch() chan []model.Action {

 // Dispatcher tracks agent subscriptions and emits actions to the subscriptions.
 type Dispatcher struct {
-	am    monitor.SimpleMonitor
-	limit *rate.Limiter
+	am monitor.SimpleMonitor

 	mx   sync.RWMutex
 	subs map[string]Sub
 }

 // NewDispatcher creates a Dispatcher using the provided monitor.
-func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher {
-	r := rate.Inf
-	if throttle > 0 {
-		r = rate.Every(throttle)
-	}
+func NewDispatcher(am monitor.SimpleMonitor) *Dispatcher {
 	return &Dispatcher{
-		am:    am,
-		limit: rate.NewLimiter(r, i),
-		subs:  make(map[string]Sub),
+		am:   am,
+		subs: make(map[string]Sub),
 	}
 }

@@ -129,10 +122,6 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
 	}

 	for agentID, actions := range agentActions {
-		if err := d.limit.Wait(ctx); err != nil {
-			zerolog.Ctx(ctx).Error().Err(err).Msg("action dispatcher rate limit error")
-			return
-		}
 		d.dispatch(ctx, agentID, actions)
 	}
 }

internal/pkg/action/dispatcher_test.go

+2 −43

@@ -15,7 +15,6 @@ import (
 	"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
-	"golang.org/x/time/rate"
 )

 type mockMonitor struct {

@@ -39,7 +38,7 @@ func (m *mockMonitor) GetCheckpoint() sqn.SeqNo {

 func TestNewDispatcher(t *testing.T) {
 	m := &mockMonitor{}
-	d := NewDispatcher(m, 0, 0)
+	d := NewDispatcher(m)

 	assert.NotNil(t, d.am)
 	assert.NotNil(t, d.subs)

@@ -128,41 +127,6 @@ func Test_Dispatcher_Run(t *testing.T) {
 			Type: "upgrade",
 		}},
 	},
-}, {
-	name:     "three agent action with limiter",
-	throttle: 1 * time.Second,
-	getMock: func() *mockMonitor {
-		m := &mockMonitor{}
-		ch := make(chan []es.HitT)
-		go func() {
-			ch <- []es.HitT{es.HitT{
-				Source: json.RawMessage(`{"action_id":"test-action","agents":["agent1","agent2","agent3"],"data":{"key":"value"},"type":"upgrade"}`),
-			}}
-		}()
-		var rch <-chan []es.HitT = ch
-		m.On("Output").Return(rch)
-		return m
-	},
-	expect: map[string][]model.Action{
-		"agent1": []model.Action{model.Action{
-			ActionID: "test-action",
-			Agents:   nil,
-			Data:     json.RawMessage(`{"key":"value"}`),
-			Type:     "upgrade",
-		}},
-		"agent2": []model.Action{model.Action{
-			ActionID: "test-action",
-			Agents:   nil,
-			Data:     json.RawMessage(`{"key":"value"}`),
-			Type:     "upgrade",
-		}},
-		"agent3": []model.Action{model.Action{
-			ActionID: "test-action",
-			Agents:   nil,
-			Data:     json.RawMessage(`{"key":"value"}`),
-			Type:     "upgrade",
-		}},
-	},
 }, {
 	name: "one agent action with scheduling",
 	getMock: func() *mockMonitor {

@@ -235,13 +199,8 @@ func Test_Dispatcher_Run(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := tt.getMock()
-			throttle := rate.Inf
-			if tt.throttle > 0 {
-				throttle = rate.Every(tt.throttle)
-			}
 			d := &Dispatcher{
-				am:    m,
-				limit: rate.NewLimiter(throttle, 1),
+				am: m,
 				subs: map[string]Sub{
 					"agent1": Sub{
 						agentID: "agent1",

internal/pkg/api/handleCheckin.go

+21 −1

@@ -34,6 +34,7 @@ import (
 	"github.com/hashicorp/go-version"
 	"github.com/miolini/datacounter"
 	"github.com/rs/zerolog"
+	"golang.org/x/time/rate"

 	"go.elastic.co/apm/module/apmhttp/v2"
 	"go.elastic.co/apm/v2"

@@ -76,6 +77,7 @@ type CheckinT struct {
 	// gwPool is a gzip.Writer pool intended to lower the amount of writers created when responding to checkin requests.
 	// gzip.Writer allocations are expensive (~1.2MB each) and can exhaust an instance's memory if a lot of concurrent responses are sent (this occurs when a mass-action such as an upgrade is detected).
 	// effectiveness of the pool is controlled by the rate limiter configured through the limit.action_limit attribute.
+	limit  *rate.Limiter
 	gwPool sync.Pool
 	bulker bulk.Bulk
 }

@@ -91,6 +93,13 @@ func NewCheckinT(
 	tr *action.TokenResolver,
 	bulker bulk.Bulk,
 ) *CheckinT {
+	rt := rate.Every(cfg.Limits.ActionLimit.Interval)
+	if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle != 0 {
+		rt = rate.Every(cfg.Limits.PolicyThrottle)
+	} else if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle == 0 {
+		rt = rate.Inf
+	}
+	zerolog.Ctx(context.TODO()).Debug().Any("event_rate", rt).Int("burst", cfg.Limits.ActionLimit.Burst).Msg("checkin response gzip limiter")
 	ct := &CheckinT{
 		verCon: verCon,
 		cfg:    cfg,

@@ -100,6 +109,7 @@ func NewCheckinT(
 		gcp: gcp,
 		ad:  ad,
 		tr:  tr,
+		limit: rate.NewLimiter(rt, cfg.Limits.ActionLimit.Burst),
 		gwPool: sync.Pool{
 			New: func() any {
 				zipper, err := gzip.NewWriterLevel(io.Discard, cfg.CompressionLevel)

@@ -549,7 +559,17 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r
 	compressionLevel := ct.cfg.CompressionLevel
 	compressThreshold := ct.cfg.CompressionThresh

-	if len(payload) > compressThreshold && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
+	if (len(fromPtr(resp.Actions)) > 0 || len(payload) > compressThreshold) && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
+		// We compress the response if it would be over the threshold (default 1kb) or we are sending an action.
+		// gzip.Writer allocations are expensive, and we can easily hit an OOM issue if we try to write a large number of responses at once.
+		// This is likely to happen on bulk upgrades, or policy changes.
+		// In order to avoid a massive memory allocation we do two things:
+		// 1. re-use gzip.Writer instances across responses (through a pool)
+		// 2. rate-limit access to the writer pool
+		if err := ct.limit.Wait(ctx); err != nil {
+			return fmt.Errorf("checkin response limiter error: %w", err)
+		}
+
 		wrCounter := datacounter.NewWriterCounter(w)

 		zipper, _ := ct.gwPool.Get().(*gzip.Writer)
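The updated predicate compresses whenever the response carries an action, not only when the payload exceeds the size threshold. A small sketch of that decision in isolation; shouldCompress and its parameters are illustrative, not the actual writeResponse signature.

```go
package main

import (
	"compress/flate"
	"fmt"
)

// shouldCompress mirrors the updated predicate in writeResponse: compress when
// the response carries any action or the payload exceeds the threshold,
// provided compression is enabled and the client accepts gzip.
func shouldCompress(numActions, payloadLen, threshold, level int, acceptsGzip bool) bool {
	return (numActions > 0 || payloadLen > threshold) &&
		level != flate.NoCompression &&
		acceptsGzip
}

func main() {
	// An action-bearing response is now compressed even below the threshold.
	fmt.Println(shouldCompress(1, 100, 1024, flate.BestSpeed, true)) // true
	fmt.Println(shouldCompress(0, 100, 1024, flate.BestSpeed, true)) // false
}
```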

internal/pkg/api/handleCheckin_test.go

+42 −2

@@ -267,12 +267,12 @@ func TestResolveSeqNo(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	verCon := mustBuildConstraints("8.0.0")
-	cfg := &config.Server{}
+	cfg := &config.Server{Limits: config.ServerLimits{ActionLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}}}
 	c, _ := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
 	bc := checkin.NewBulk(nil)
 	bulker := ftesting.NewMockBulk()
 	pim := mockmonitor.NewMockMonitor()
-	pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
+	pm := policy.NewMonitor(bulker, pim)
 	ct := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil, nil)

 	resp, _ := ct.resolveSeqNo(ctx, logger, tc.req, tc.agent)

@@ -623,6 +623,46 @@ func Test_CheckinT_writeResponse(t *testing.T) {
 	}
 }

+func Test_CheckinT_writeResponse_limiter(t *testing.T) {
+	verCon := mustBuildConstraints("8.0.0")
+	cfg := &config.Server{
+		CompressionLevel:  flate.BestSpeed,
+		CompressionThresh: 1,
+		Limits: config.ServerLimits{
+			ActionLimit: config.Limit{
+				Interval: 50 * time.Millisecond,
+				Burst:    1,
+			},
+		},
+	}
+
+	ct := NewCheckinT(verCon, cfg, nil, nil, nil, nil, nil, nil, ftesting.NewMockBulk())
+	wr1 := httptest.NewRecorder()
+	wr2 := httptest.NewRecorder()
+	req := &http.Request{
+		Header: http.Header{
+			"Accept-Encoding": []string{"gzip"},
+		},
+	}
+	cresp := CheckinResponse{
+		Action: "checkin",
+	}
+
+	ts := time.Now()
+	err1 := ct.writeResponse(testlog.SetLogger(t), wr1, req, &model.Agent{}, cresp)
+	err2 := ct.writeResponse(testlog.SetLogger(t), wr2, req, &model.Agent{}, cresp)
+	dur := time.Since(ts)
+	require.NoError(t, err1)
+	require.NoError(t, err2)
+
+	res1 := wr1.Result()
+	assert.Equal(t, "gzip", res1.Header.Get("Content-Encoding"))
+	res2 := wr2.Result()
+	assert.Equal(t, "gzip", res2.Header.Get("Content-Encoding"))
+
+	assert.GreaterOrEqual(t, dur, 50*time.Millisecond)
+}
+
 func Benchmark_CheckinT_writeResponse(b *testing.B) {
 	verCon := mustBuildConstraints("8.0.0")
 	cfg := &config.Server{
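The test's timing assertion follows from how golang.org/x/time/rate hands out tokens: with burst 1 and a 50ms interval, the second Wait call cannot return until one interval has elapsed. A standalone sketch of just that behavior, outside fleet-server:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Burst 1, one token per 50ms: mirrors the ActionLimit used in the test.
	lim := rate.NewLimiter(rate.Every(50*time.Millisecond), 1)
	start := time.Now()
	_ = lim.Wait(context.Background()) // first call: token available immediately
	_ = lim.Wait(context.Background()) // second call: blocks ~50ms for a new token
	fmt.Println(time.Since(start) >= 50*time.Millisecond) // true
}
```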

internal/pkg/api/server_test.go

+1 −1

@@ -50,7 +50,7 @@ func Test_server_Run(t *testing.T) {
 	require.NoError(t, err)
 	bulker := ftesting.NewMockBulk()
 	pim := mock.NewMockMonitor()
-	pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
+	pm := policy.NewMonitor(bulker, pim)
 	bc := checkin.NewBulk(nil)
 	ct := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil, nil)
 	et, err := NewEnrollerT(verCon, cfg, nil, c)

internal/pkg/config/defaults/lte10000_limits.yml

+1 −4

@@ -9,10 +9,7 @@ server_limits:
   max_connections: 22000
   action_limit:
     interval: 1ms
-    burst: 10
-  policy_interval:
-    interval: 5ms
-    burst: 1
+    burst: 100
   checkin_limit:
     interval: 1ms
     burst: 2000

internal/pkg/config/defaults/lte20000_limits.yml

−3

@@ -10,9 +10,6 @@ server_limits:
   action_limit:
     interval: 1ms
     burst: 100
-  policy_limit:
-    interval: 5ms
-    burst: 1
   checkin_limit:
     interval: 0.5ms
     burst: 4000

internal/pkg/config/defaults/lte2500_limite.yml

+1 −4

@@ -9,10 +9,7 @@ server_limits:
   max_connections: 7000
   action_limit:
     interval: 5ms
-    burst: 1
-  policy_limit:
-    interval: 5ms
-    burst: 1
+    burst: 20
   checkin_limit:
     interval: 5ms
     burst: 500

internal/pkg/config/defaults/lte40000_limits.yml

+1 −4

@@ -8,10 +8,7 @@ cache_limits:
 server_limits:
   action_limit:
     interval: 0.5ms
-    burst: 100
-  policy_limit:
-    interval: 2ms
-    burst: 1
+    burst: 200
   checkin_limit:
     interval: 0.5ms
     burst: 4000

internal/pkg/config/defaults/lte5000_limits.yml

−3

@@ -10,9 +10,6 @@ server_limits:
   action_limit:
     interval: 5ms
     burst: 5
-  policy_limit:
-    interval: 5ms
-    burst: 1
   checkin_limit:
     interval: 2ms
     burst: 1000

internal/pkg/config/defaults/max_limits.yml

+1 −4

@@ -7,10 +7,7 @@ cache_limits:
 server_limits:
   action_limit:
     interval: 0.25ms
-    burst: 100
-  policy_limit:
-    interval: 0.25ms
-    burst: 1
+    burst: 1000
   checkin_limit:
     interval: 0.25ms
     burst: 8000
