Skip to content

Commit 8dd3652

Browse files
Revert "Replace policy and action limiters with a checkin limiter (#3255)" (#3274)
This reverts commit c67e65d.
1 parent c27a66b commit 8dd3652

19 files changed

+321
-99
lines changed

changelog/fragments/1707246268-Move-action-limiter-to-checkin-response-writer.yaml changelog/fragments/1702940211-Replace-policy-throttle-with-limiter.yaml

+7-8
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,16 @@
1111
kind: enhancement
1212

1313
# Change summary; a 80ish characters long description of the change.
14-
summary: Move action limiter to checkin response writer
14+
summary: Replace policy throttle with limiter
1515

1616
# Long description; in case the summary is not enough to describe the change
1717
# this field accommodate a description without length limits.
1818
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
1919
description: |
20-
Remove the policy_limiter that was added by 3008 and not released.
21-
Move the action limiter to the checkin handler's write response method before a gzip.Writer is retrieved from the pool or allocated.
22-
This allows fleet-server to control gzip.Writer access for responses that contain either actions from the action dispatcher, or a
23-
POLICY_CHANGE action from the policy monitor.
24-
Removing the throttle/limiter from the policy monitor also greatly improves performance of policy dispatches.
20+
Replace the existing policy_throttle setting with a policy_limit
21+
setting that controls the same thing, how fast policies are passed to
22+
subscriptions to control mass responses. If no policy_limit settings
23+
are specified the policy_throttle is used with burst 1.
2524
2625
# Affected component; a word indicating the component this changeset affects.
2726
component:
@@ -30,8 +29,8 @@ component:
3029
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
3130
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
3231
# Please provide it if you are adding a fragment for a different PR.
33-
pr: 3255
32+
#pr: https://github.com/owner/repo/1234
3433

3534
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
3635
# If not present is automatically filled by the tooling with the issue linked to the PR number.
37-
issue: 3254
36+
issue: 3008

fleet-server.reference.yml

+13-3
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,14 @@ fleet:
145145
# # If specified a set of other limits is automatically loaded.
146146
# max_agents: 0
147147
# # policy_throttle is the duration that the fleet-server will wait in between attempts to dispatch policy updates to polling agents
148-
# # deprecated: replaced by action_limit settings
149-
# policy_throttle: 5ms
148+
# # deprecated: replaced by policy_limit settings
149+
# policy_throttle: 5ms # 1ms min is forced
150150
# # max_header_byte_size is the request header size limit
151151
# max_header_byte_size: 8192 # 8Kib
152152
# # max_connections is the maximum number of connections per API endpoint
153153
# max_connections: 0
154154
#
155-
# # action_limit is a limiter for checkin responses that contain actions
155+
# # action_limit is a limiter for the action dispatcher, it is added to control how fast the checkin endpoint writes responses when an action affecting multiple agents is detected.
156156
# # This is done in order to be able to reuse gzip writers if gzip is requested as allocating new writers is expensive (around 1.2MB for a new allocation).
157157
# # If the interval is too high it may negatively affect assumptions around server write timeouts and poll durations, if used we expect the value to be around 5ms.
158158
# # An interval value of 0 disables the limiter by using an infinite rate limit (default behavior).
@@ -161,6 +161,16 @@ fleet:
161161
# interval: 0
162162
# burst: 5
163163
#
164+
# # policy_limit is a limiter used to control how fast fleet-server sends POLICY_CHANGE actions to agents.
165+
# # As with the action_limit settings this is done to avoid allocating a lot of gzip writers.
166+
# # The default settings are to have the interval of 5ms with a burst of 1.
167+
# # A min burst value of 1 is always enforced.
168+
# # If no interval is specified, the policy_throttle may be used as the interval instead.
169+
# # If both interval and policy_throttle are 0, a value of 1ns is used instead.
170+
# policy_limit:
171+
# interval: 5ms
172+
# burst: 1
173+
#
164174
# # endpoint specific limits below
165175
# checkin_limit:
166176
# interval: 1ms

internal/pkg/action/dispatcher.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
1818

1919
"github.com/rs/zerolog"
20+
"golang.org/x/time/rate"
2021
)
2122

2223
// Sub is an action subscription that will give a single agent all of it's actions.
@@ -33,17 +34,23 @@ func (s Sub) Ch() chan []model.Action {
3334

3435
// Dispatcher tracks agent subscriptions and emits actions to the subscriptions.
3536
type Dispatcher struct {
36-
am monitor.SimpleMonitor
37+
am monitor.SimpleMonitor
38+
limit *rate.Limiter
3739

3840
mx sync.RWMutex
3941
subs map[string]Sub
4042
}
4143

4244
// NewDispatcher creates a Dispatcher using the provided monitor.
43-
func NewDispatcher(am monitor.SimpleMonitor) *Dispatcher {
45+
func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher {
46+
r := rate.Inf
47+
if throttle > 0 {
48+
r = rate.Every(throttle)
49+
}
4450
return &Dispatcher{
45-
am: am,
46-
subs: make(map[string]Sub),
51+
am: am,
52+
limit: rate.NewLimiter(r, i),
53+
subs: make(map[string]Sub),
4754
}
4855
}
4956

@@ -122,6 +129,10 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
122129
}
123130

124131
for agentID, actions := range agentActions {
132+
if err := d.limit.Wait(ctx); err != nil {
133+
zerolog.Ctx(ctx).Error().Err(err).Msg("action dispatcher rate limit error")
134+
return
135+
}
125136
d.dispatch(ctx, agentID, actions)
126137
}
127138
}

internal/pkg/action/dispatcher_test.go

+43-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
1616
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/mock"
18+
"golang.org/x/time/rate"
1819
)
1920

2021
type mockMonitor struct {
@@ -38,7 +39,7 @@ func (m *mockMonitor) GetCheckpoint() sqn.SeqNo {
3839

3940
func TestNewDispatcher(t *testing.T) {
4041
m := &mockMonitor{}
41-
d := NewDispatcher(m)
42+
d := NewDispatcher(m, 0, 0)
4243

4344
assert.NotNil(t, d.am)
4445
assert.NotNil(t, d.subs)
@@ -127,6 +128,41 @@ func Test_Dispatcher_Run(t *testing.T) {
127128
Type: "upgrade",
128129
}},
129130
},
131+
}, {
132+
name: "three agent action with limiter",
133+
throttle: 1 * time.Second,
134+
getMock: func() *mockMonitor {
135+
m := &mockMonitor{}
136+
ch := make(chan []es.HitT)
137+
go func() {
138+
ch <- []es.HitT{es.HitT{
139+
Source: json.RawMessage(`{"action_id":"test-action","agents":["agent1","agent2","agent3"],"data":{"key":"value"},"type":"upgrade"}`),
140+
}}
141+
}()
142+
var rch <-chan []es.HitT = ch
143+
m.On("Output").Return(rch)
144+
return m
145+
},
146+
expect: map[string][]model.Action{
147+
"agent1": []model.Action{model.Action{
148+
ActionID: "test-action",
149+
Agents: nil,
150+
Data: json.RawMessage(`{"key":"value"}`),
151+
Type: "upgrade",
152+
}},
153+
"agent2": []model.Action{model.Action{
154+
ActionID: "test-action",
155+
Agents: nil,
156+
Data: json.RawMessage(`{"key":"value"}`),
157+
Type: "upgrade",
158+
}},
159+
"agent3": []model.Action{model.Action{
160+
ActionID: "test-action",
161+
Agents: nil,
162+
Data: json.RawMessage(`{"key":"value"}`),
163+
Type: "upgrade",
164+
}},
165+
},
130166
}, {
131167
name: "one agent action with scheduling",
132168
getMock: func() *mockMonitor {
@@ -199,8 +235,13 @@ func Test_Dispatcher_Run(t *testing.T) {
199235
for _, tt := range tests {
200236
t.Run(tt.name, func(t *testing.T) {
201237
m := tt.getMock()
238+
throttle := rate.Inf
239+
if tt.throttle > 0 {
240+
throttle = rate.Every(tt.throttle)
241+
}
202242
d := &Dispatcher{
203-
am: m,
243+
am: m,
244+
limit: rate.NewLimiter(throttle, 1),
204245
subs: map[string]Sub{
205246
"agent1": Sub{
206247
agentID: "agent1",

internal/pkg/api/handleCheckin.go

+1-21
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"github.com/hashicorp/go-version"
3535
"github.com/miolini/datacounter"
3636
"github.com/rs/zerolog"
37-
"golang.org/x/time/rate"
3837

3938
"go.elastic.co/apm/module/apmhttp/v2"
4039
"go.elastic.co/apm/v2"
@@ -77,7 +76,6 @@ type CheckinT struct {
7776
// gwPool is a gzip.Writer pool intended to lower the amount of writers created when responding to checkin requests.
7877
// gzip.Writer allocations are expensive (~1.2MB each) and can exhaust an instance's memory if a lot of concurrent responses are sent (this occurs when a mass-action such as an upgrade is detected).
7978
// effectiveness of the pool is controlled by rate limiter configured through the limit.action_limit attribute.
80-
limit *rate.Limiter
8179
gwPool sync.Pool
8280
bulker bulk.Bulk
8381
}
@@ -93,13 +91,6 @@ func NewCheckinT(
9391
tr *action.TokenResolver,
9492
bulker bulk.Bulk,
9593
) *CheckinT {
96-
rt := rate.Every(cfg.Limits.ActionLimit.Interval)
97-
if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle != 0 {
98-
rt = rate.Every(cfg.Limits.PolicyThrottle)
99-
} else if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle == 0 {
100-
rt = rate.Inf
101-
}
102-
zerolog.Ctx(context.TODO()).Debug().Any("event_rate", rt).Int("burst", cfg.Limits.ActionLimit.Burst).Msg("checkin response gzip limiter")
10394
ct := &CheckinT{
10495
verCon: verCon,
10596
cfg: cfg,
@@ -109,7 +100,6 @@ func NewCheckinT(
109100
gcp: gcp,
110101
ad: ad,
111102
tr: tr,
112-
limit: rate.NewLimiter(rt, cfg.Limits.ActionLimit.Burst),
113103
gwPool: sync.Pool{
114104
New: func() any {
115105
zipper, err := gzip.NewWriterLevel(io.Discard, cfg.CompressionLevel)
@@ -567,17 +557,7 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r
567557
compressionLevel := ct.cfg.CompressionLevel
568558
compressThreshold := ct.cfg.CompressionThresh
569559

570-
if (len(fromPtr(resp.Actions)) > 0 || len(payload) > compressThreshold) && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
571-
// We compress the response if it would be over the threshold (default 1kb) or we are sending an action.
572-
// gzip.Writer allocations are expensive, and we can easily hit an OOM issue if we try to write a large number of responses at once.
573-
// This is likely to happen on bulk upgrades, or policy changes.
574-
// In order to avoid a massive memory allocation we do two things:
575-
// 1. re-use gzip.Writer instances across responses (through a pool)
576-
// 2. rate-limit access to the writer pool
577-
if err := ct.limit.Wait(ctx); err != nil {
578-
return fmt.Errorf("checkin response limiter error: %w", err)
579-
}
580-
560+
if len(payload) > compressThreshold && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
581561
wrCounter := datacounter.NewWriterCounter(w)
582562

583563
zipper, _ := ct.gwPool.Get().(*gzip.Writer)

internal/pkg/api/handleCheckin_test.go

+2-42
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,12 @@ func TestResolveSeqNo(t *testing.T) {
267267
ctx, cancel := context.WithCancel(context.Background())
268268
defer cancel()
269269
verCon := mustBuildConstraints("8.0.0")
270-
cfg := &config.Server{Limits: config.ServerLimits{ActionLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}}}
270+
cfg := &config.Server{}
271271
c, _ := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
272272
bc := checkin.NewBulk(nil)
273273
bulker := ftesting.NewMockBulk()
274274
pim := mockmonitor.NewMockMonitor()
275-
pm := policy.NewMonitor(bulker, pim)
275+
pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
276276
ct := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil, nil)
277277

278278
resp, _ := ct.resolveSeqNo(ctx, logger, tc.req, tc.agent)
@@ -623,46 +623,6 @@ func Test_CheckinT_writeResponse(t *testing.T) {
623623
}
624624
}
625625

626-
func Test_CheckinT_writeResponse_limiter(t *testing.T) {
627-
verCon := mustBuildConstraints("8.0.0")
628-
cfg := &config.Server{
629-
CompressionLevel: flate.BestSpeed,
630-
CompressionThresh: 1,
631-
Limits: config.ServerLimits{
632-
ActionLimit: config.Limit{
633-
Interval: 50 * time.Millisecond,
634-
Burst: 1,
635-
},
636-
},
637-
}
638-
639-
ct := NewCheckinT(verCon, cfg, nil, nil, nil, nil, nil, nil, ftesting.NewMockBulk())
640-
wr1 := httptest.NewRecorder()
641-
wr2 := httptest.NewRecorder()
642-
req := &http.Request{
643-
Header: http.Header{
644-
"Accept-Encoding": []string{"gzip"},
645-
},
646-
}
647-
cresp := CheckinResponse{
648-
Action: "checkin",
649-
}
650-
651-
ts := time.Now()
652-
err1 := ct.writeResponse(testlog.SetLogger(t), wr1, req, &model.Agent{}, cresp)
653-
err2 := ct.writeResponse(testlog.SetLogger(t), wr2, req, &model.Agent{}, cresp)
654-
dur := time.Since(ts)
655-
require.NoError(t, err1)
656-
require.NoError(t, err2)
657-
658-
res1 := wr1.Result()
659-
assert.Equal(t, "gzip", res1.Header.Get("Content-Encoding"))
660-
res2 := wr2.Result()
661-
assert.Equal(t, "gzip", res2.Header.Get("Content-Encoding"))
662-
663-
assert.GreaterOrEqual(t, dur, 50*time.Millisecond)
664-
}
665-
666626
func Benchmark_CheckinT_writeResponse(b *testing.B) {
667627
verCon := mustBuildConstraints("8.0.0")
668628
cfg := &config.Server{

internal/pkg/api/server_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func Test_server_Run(t *testing.T) {
5151
require.NoError(t, err)
5252
bulker := ftesting.NewMockBulk()
5353
pim := mock.NewMockMonitor()
54-
pm := policy.NewMonitor(bulker, pim)
54+
pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
5555
bc := checkin.NewBulk(nil)
5656
ct := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil, nil)
5757
et, err := NewEnrollerT(verCon, cfg, nil, c)

internal/pkg/config/defaults/lte10000_limits.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ server_limits:
99
max_connections: 22000
1010
action_limit:
1111
interval: 1ms
12-
burst: 100
12+
burst: 10
13+
policy_interval:
14+
interval: 5ms
15+
burst: 1
1316
checkin_limit:
1417
interval: 1ms
1518
burst: 2000

internal/pkg/config/defaults/lte20000_limits.yml

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ server_limits:
1010
action_limit:
1111
interval: 1ms
1212
burst: 100
13+
policy_limit:
14+
interval: 5ms
15+
burst: 1
1316
checkin_limit:
1417
interval: 0.5ms
1518
burst: 4000

internal/pkg/config/defaults/lte2500_limite.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ server_limits:
99
max_connections: 7000
1010
action_limit:
1111
interval: 5ms
12-
burst: 20
12+
burst: 1
13+
policy_limit:
14+
interval: 5ms
15+
burst: 1
1316
checkin_limit:
1417
interval: 5ms
1518
burst: 500

internal/pkg/config/defaults/lte40000_limits.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ cache_limits:
88
server_limits:
99
action_limit:
1010
interval: 0.5ms
11-
burst: 200
11+
burst: 100
12+
policy_limit:
13+
interval: 2ms
14+
burst: 1
1215
checkin_limit:
1316
interval: 0.5ms
1417
burst: 4000

internal/pkg/config/defaults/lte5000_limits.yml

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ server_limits:
1010
action_limit:
1111
interval: 5ms
1212
burst: 5
13+
policy_limit:
14+
interval: 5ms
15+
burst: 1
1316
checkin_limit:
1417
interval: 2ms
1518
burst: 1000

internal/pkg/config/defaults/max_limits.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ cache_limits:
77
server_limits:
88
action_limit:
99
interval: 0.25ms
10-
burst: 1000
10+
burst: 100
11+
policy_limit:
12+
interval: 0.25ms
13+
burst: 1
1114
checkin_limit:
1215
interval: 0.25ms
1316
burst: 8000

0 commit comments

Comments
 (0)