Skip to content

Commit 9f2b65c

Browse files
authored
Make cloudbeat wait for initial reconfiguration from Fleet server. (#196)
The reconfiguration logic makes cloudbeat align with the CIS rules enabled/disabled by the user. Prior to this change, the "default" behavior would kick off and send findings for all CIS rules to the index, before the Fleet server could tell it which findings to send. This is undesirable as it pollutes the index with unwanted findings. * Log time deltas.
1 parent a8dd275 commit 9f2b65c

File tree

4 files changed

+350
-47
lines changed

4 files changed

+350
-47
lines changed

beater/cloudbeat.go

+94-29
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ import (
3737
"github.com/gofrs/uuid"
3838
)
3939

40+
const (
41+
reconfigureWaitTimeout = 5 * time.Minute
42+
)
43+
4044
// cloudbeat configuration.
4145
type cloudbeat struct {
4246
ctx context.Context
@@ -122,12 +126,25 @@ func (bt *cloudbeat) Run(b *beat.Beat) error {
122126
bt.log.Info("cloudbeat is running! Hit CTRL-C to stop it.")
123127

124128
// Configure the beats Manager to start after all the reloadable hooks are initialized
125-
// and shutdown when the function return.
129+
// and shutdown when the function returns.
126130
if err := b.Manager.Start(); err != nil {
127131
return err
128132
}
129133
defer b.Manager.Stop()
130134

135+
// Wait for Fleet-side reconfiguration only if cloudbeat is running in Agent-managed mode.
136+
if b.Manager.Enabled() {
137+
bt.log.Infof("Waiting for initial reconfiguration from Fleet server...")
138+
update, err := bt.reconfigureWait(reconfigureWaitTimeout)
139+
if err != nil {
140+
return err
141+
}
142+
143+
if err := bt.configUpdate(update); err != nil {
144+
return fmt.Errorf("failed to update with initial reconfiguration from Fleet server: %w", err)
145+
}
146+
}
147+
131148
if err := bt.data.Run(bt.ctx); err != nil {
132149
return err
133150
}
@@ -154,49 +171,97 @@ func (bt *cloudbeat) Run(b *beat.Beat) error {
154171
return nil
155172

156173
case update := <-bt.configUpdates:
157-
if err := bt.config.Update(update); err != nil {
158-
bt.log.Errorf("Could not update cloudbeat config: %v", err)
159-
break
174+
if err := bt.configUpdate(update); err != nil {
175+
bt.log.Errorf("Failed to update cloudbeat config: %v", err)
160176
}
161177

162-
policies, err := csppolicies.CISKubernetes()
163-
if err != nil {
164-
bt.log.Errorf("Could not load CIS Kubernetes policies: %v", err)
165-
break
166-
}
178+
case fetchedResources := <-output:
179+
cycleId, _ := uuid.NewV4()
180+
cycleStart := time.Now()
181+
bt.log.Infof("Eval cycle %v has started", cycleId)
182+
183+
cycleMetadata := transformer.CycleMetadata{CycleId: cycleId}
184+
// TODO: send events through a channel and publish them by a configured threshold & time
185+
events := bt.transformer.ProcessAggregatedResources(fetchedResources, cycleMetadata)
186+
187+
bt.log.Infof("Publishing %d events to index", len(events))
188+
bt.client.PublishAll(events)
189+
190+
bt.log.Infof("Eval cycle %v published %d events after %s", cycleId, len(events), time.Since(cycleStart))
191+
}
192+
}
193+
}
194+
195+
// reconfigureWait will wait for and consume incoming reconfigurations from the Fleet server, and keep
196+
// discarding them until the incoming config contains the necessary information to start cloudbeat
197+
// properly, thereafter returning the valid config.
198+
func (bt *cloudbeat) reconfigureWait(timeout time.Duration) (*common.Config, error) {
199+
start := time.Now()
200+
timer := time.After(timeout)
201+
202+
for {
203+
select {
204+
case <-bt.ctx.Done():
205+
return nil, fmt.Errorf("cancelled via context")
167206

168-
if len(bt.config.Streams) == 0 {
169-
bt.log.Infof("Did not receive any input stream, skipping.")
170-
break
207+
case <-timer:
208+
return nil, fmt.Errorf("timed out waiting for reconfiguration")
209+
210+
case update, ok := <-bt.configUpdates:
211+
if !ok {
212+
return nil, fmt.Errorf("reconfiguration channel is closed")
171213
}
172214

173-
y, err := bt.config.DataYaml()
215+
c, err := config.New(update)
174216
if err != nil {
175-
bt.log.Errorf("Could not marshal to YAML: %v", err)
176-
break
217+
bt.log.Errorf("Could not parse reconfiguration %v, skipping with error: %v", update.FlattenedKeys(), err)
218+
continue
177219
}
178220

179-
if err := csppolicies.HostBundleWithDataYaml("bundle.tar.gz", policies, y); err != nil {
180-
bt.log.Errorf("Could not update bundle with dataYaml: %v", err)
181-
break
221+
if len(c.Streams) == 0 {
222+
bt.log.Infof("No streams received in reconfiguration %v", update.FlattenedKeys())
223+
continue
182224
}
183225

184-
bt.log.Infof("Bundle updated with dataYaml: %s", y)
226+
if c.Streams[0].DataYaml == nil {
227+
bt.log.Infof("data_yaml not present in reconfiguration %v", update.FlattenedKeys())
228+
continue
229+
}
185230

186-
case fetchedResources := <-output:
187-
cycleId, _ := uuid.NewV4()
188-
bt.log.Infof("Cycle %v has started", cycleId)
231+
bt.log.Infof("Received valid reconfiguration after waiting for %s", time.Since(start))
232+
return update, nil
233+
}
234+
}
235+
}
189236

190-
cycleMetadata := transformer.CycleMetadata{CycleId: cycleId}
191-
// TODO: send events through a channel and publish them by a configured threshold & time
192-
events := bt.transformer.ProcessAggregatedResources(fetchedResources, cycleMetadata)
237+
// configUpdate applies incoming reconfiguration from the Fleet server to the cloudbeat config,
238+
// and updates the hosted bundle with the new values.
239+
func (bt *cloudbeat) configUpdate(update *common.Config) error {
240+
if err := bt.config.Update(bt.log, update); err != nil {
241+
return err
242+
}
193243

194-
bt.log.Infof("Publishing %d events to index", len(events))
195-
bt.client.PublishAll(events)
244+
policies, err := csppolicies.CISKubernetes()
245+
if err != nil {
246+
return fmt.Errorf("could not load CIS Kubernetes policies: %w", err)
247+
}
196248

197-
bt.log.Infof("Cycle %v has ended", cycleId)
198-
}
249+
if len(bt.config.Streams) == 0 {
250+
bt.log.Infof("Did not receive any input stream from incoming config, skipping.")
251+
return nil
199252
}
253+
254+
y, err := bt.config.DataYaml()
255+
if err != nil {
256+
return fmt.Errorf("could not marshal to YAML: %w", err)
257+
}
258+
259+
if err := csppolicies.HostBundleWithDataYaml("bundle.tar.gz", policies, y); err != nil {
260+
return fmt.Errorf("could not update bundle with dataYaml: %w", err)
261+
}
262+
263+
bt.log.Infof("Bundle updated with dataYaml: %s", y)
264+
return nil
200265
}
201266

202267
func InitRegistry(log *logp.Logger, c config.Config) (manager.FetchersRegistry, error) {

beater/cloudbeat_test.go

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package beater
19+
20+
import (
21+
"context"
22+
"testing"
23+
"time"
24+
25+
"github.com/elastic/beats/v7/libbeat/common"
26+
"github.com/elastic/beats/v7/libbeat/logp"
27+
"github.com/stretchr/testify/suite"
28+
)
29+
30+
type BeaterTestSuite struct {
31+
suite.Suite
32+
33+
log *logp.Logger
34+
}
35+
36+
func TestBeaterTestSuite(t *testing.T) {
37+
s := new(BeaterTestSuite)
38+
s.log = logp.NewLogger("cloudbeat_beater_test_suite")
39+
40+
if err := logp.TestingSetup(); err != nil {
41+
t.Error(err)
42+
}
43+
44+
suite.Run(t, s)
45+
}
46+
47+
func (s *BeaterTestSuite) SetupTest() {
48+
}
49+
50+
func (s *BeaterTestSuite) TestReconfigureWait() {
51+
ctx, cancel := context.WithCancel(context.Background())
52+
53+
beat := &cloudbeat{
54+
ctx: ctx,
55+
cancel: cancel,
56+
log: s.log,
57+
}
58+
59+
configNoStreams, err := common.NewConfigFrom(`
60+
not_streams:
61+
- data_yaml:
62+
activated_rules:
63+
cis_k8s:
64+
- a
65+
- b
66+
- c
67+
- d
68+
- e
69+
`)
70+
s.NoError(err)
71+
72+
configNoDataYaml, err := common.NewConfigFrom(`
73+
streams:
74+
- not_data_yaml:
75+
activated_rules:
76+
cis_k8s:
77+
- a
78+
- b
79+
- c
80+
- d
81+
- e
82+
`)
83+
s.NoError(err)
84+
85+
configWithDataYaml, err := common.NewConfigFrom(`
86+
streams:
87+
- data_yaml:
88+
activated_rules:
89+
cis_k8s:
90+
- a
91+
- b
92+
- c
93+
- d
94+
- e
95+
`)
96+
s.NoError(err)
97+
98+
type incomingConfigs []struct {
99+
after time.Duration
100+
config *common.Config
101+
}
102+
103+
testcases := []struct {
104+
timeout time.Duration
105+
configs incomingConfigs
106+
expected *common.Config
107+
}{
108+
{
109+
5 * time.Millisecond,
110+
incomingConfigs{},
111+
nil,
112+
},
113+
{
114+
40 * time.Millisecond,
115+
incomingConfigs{
116+
{1 * time.Millisecond, configWithDataYaml},
117+
},
118+
configWithDataYaml,
119+
},
120+
{
121+
40 * time.Millisecond,
122+
incomingConfigs{
123+
{1 * time.Millisecond, configNoStreams},
124+
{1 * time.Millisecond, configNoDataYaml},
125+
{1 * time.Millisecond, configNoStreams},
126+
},
127+
nil,
128+
},
129+
{
130+
40 * time.Millisecond,
131+
incomingConfigs{
132+
{1 * time.Millisecond, configNoStreams},
133+
{1 * time.Millisecond, configNoDataYaml},
134+
{1 * time.Millisecond, configNoStreams},
135+
{1 * time.Millisecond, configWithDataYaml},
136+
},
137+
configWithDataYaml,
138+
},
139+
{
140+
40 * time.Millisecond,
141+
incomingConfigs{
142+
{1 * time.Millisecond, configNoStreams},
143+
{1 * time.Millisecond, configNoDataYaml},
144+
{1 * time.Millisecond, configNoStreams},
145+
{40 * time.Millisecond, configWithDataYaml},
146+
},
147+
nil,
148+
},
149+
{
150+
40 * time.Millisecond,
151+
incomingConfigs{
152+
{1 * time.Millisecond, configNoStreams},
153+
{40 * time.Millisecond, configNoDataYaml},
154+
{1 * time.Millisecond, configNoStreams},
155+
},
156+
nil,
157+
},
158+
{
159+
40 * time.Millisecond,
160+
incomingConfigs{
161+
{1 * time.Millisecond, configNoDataYaml},
162+
{1 * time.Millisecond, configWithDataYaml},
163+
{1 * time.Millisecond, configNoStreams},
164+
},
165+
configWithDataYaml,
166+
},
167+
{
168+
40 * time.Millisecond,
169+
incomingConfigs{
170+
{1 * time.Millisecond, configWithDataYaml},
171+
{1 * time.Millisecond, configNoStreams},
172+
},
173+
configWithDataYaml,
174+
},
175+
}
176+
177+
for _, tcase := range testcases {
178+
cu := make(chan *common.Config)
179+
beat.configUpdates = cu
180+
181+
go func(ic incomingConfigs) {
182+
defer close(cu)
183+
184+
for _, c := range ic {
185+
time.Sleep(c.after)
186+
cu <- c.config
187+
}
188+
}(tcase.configs)
189+
190+
u, err := beat.reconfigureWait(tcase.timeout)
191+
if tcase.expected == nil {
192+
s.Error(err)
193+
} else {
194+
s.Equal(tcase.expected, u)
195+
}
196+
}
197+
}

0 commit comments

Comments
 (0)