Skip to content

Commit d217855

Browse files
authored
Consume dataYaml in config, add config testing (#61)
1 parent a634129 commit d217855

File tree

4 files changed

+164
-15
lines changed

4 files changed

+164
-15
lines changed

beater/cloudbeat.go

+23-15
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ type cloudbeat struct {
4040
ctx context.Context
4141
cancel context.CancelFunc
4242

43-
config config.Config
44-
client beat.Client
45-
data *manager.Data
46-
evaluator evaluator.Evaluator
47-
transformer transformer.Transformer
48-
log *logp.Logger
43+
config config.Config
44+
configUpdates <-chan *common.Config
45+
client beat.Client
46+
data *manager.Data
47+
evaluator evaluator.Evaluator
48+
transformer transformer.Transformer
49+
log *logp.Logger
4950
}
5051

5152
// New creates an instance of cloudbeat.
@@ -54,8 +55,8 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
5455

5556
ctx, cancel := context.WithCancel(context.Background())
5657

57-
c := config.DefaultConfig
58-
if err := cfg.Unpack(&c); err != nil {
58+
c, err := config.New(cfg)
59+
if err != nil {
5960
cancel()
6061
return nil, fmt.Errorf("error reading config file: %w", err)
6162
}
@@ -102,13 +103,14 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
102103
t := transformer.NewTransformer(ctx, eval, commonData, resultsIndex)
103104

104105
bt := &cloudbeat{
105-
ctx: ctx,
106-
cancel: cancel,
107-
config: c,
108-
evaluator: eval,
109-
data: data,
110-
transformer: t,
111-
log: log,
106+
ctx: ctx,
107+
cancel: cancel,
108+
config: c,
109+
configUpdates: config.Updates(ctx),
110+
evaluator: eval,
111+
data: data,
112+
transformer: t,
113+
log: log,
112114
}
113115
return bt, nil
114116
}
@@ -148,6 +150,12 @@ func (bt *cloudbeat) Run(b *beat.Beat) error {
148150
select {
149151
case <-bt.ctx.Done():
150152
return nil
153+
154+
case update := <-bt.configUpdates:
155+
if err := bt.config.Update(update); err != nil {
156+
logp.L().Errorf("could not update cloudbeat config: %v", err)
157+
}
158+
151159
case fetchedResources := <-output:
152160
cycleId, _ := uuid.NewV4()
153161
bt.log.Debugf("Cycle % has started", cycleId)

config/config.go

+28
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,40 @@ type Config struct {
3636
Period time.Duration `config:"period"`
3737
Processors processors.PluginConfig `config:"processors"`
3838
Fetchers []*common.Config `config:"fetchers"`
39+
40+
Streams []Stream `config:"streams"`
41+
}
42+
43+
type Stream struct {
44+
DataYaml struct {
45+
ActivatedRules struct {
46+
CISK8S []string `config:"cis_k8s"`
47+
} `config:"activated_rules"`
48+
} `config:"data_yaml"`
3949
}
4050

4151
var DefaultConfig = Config{
4252
Period: 10 * time.Second,
4353
}
4454

55+
func New(cfg *common.Config) (Config, error) {
56+
c := DefaultConfig
57+
58+
if err := cfg.Unpack(&c); err != nil {
59+
return c, err
60+
}
61+
62+
return c, nil
63+
}
64+
65+
func (c *Config) Update(cfg *common.Config) error {
66+
if err := cfg.Unpack(&c); err != nil {
67+
return err
68+
}
69+
70+
return nil
71+
}
72+
4573
// Datastream function to generate the datastream value
4674
func Datastream(namespace string, indexPrefix string) string {
4775
if namespace == "" {

config/config_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,52 @@
1919
// +build !integration
2020

2121
package config
22+
23+
import (
24+
"testing"
25+
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
"github.com/stretchr/testify/suite"
28+
)
29+
30+
type ConfigTestSuite struct {
31+
suite.Suite
32+
}
33+
34+
func TestConfigTestSuite(t *testing.T) {
35+
suite.Run(t, new(ConfigTestSuite))
36+
}
37+
38+
func (s *ConfigTestSuite) SetupTest() {
39+
}
40+
41+
func (s *ConfigTestSuite) TestNew() {
42+
var tests = []struct {
43+
config string
44+
expectedPatterns []string
45+
}{
46+
{
47+
`
48+
streams:
49+
- data_yaml:
50+
activated_rules:
51+
cis_k8s:
52+
- a
53+
- b
54+
- c
55+
- d
56+
`,
57+
[]string{"a", "b", "c", "d"},
58+
},
59+
}
60+
61+
for _, test := range tests {
62+
cfg, err := common.NewConfigFrom(test.config)
63+
s.NoError(err)
64+
65+
c, err := New(cfg)
66+
s.NoError(err)
67+
68+
s.Equal(test.expectedPatterns, c.Streams[0].DataYaml.ActivatedRules.CISK8S)
69+
}
70+
}

config/updates.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Config is put into a different package to prevent cyclic imports in case
19+
// it is needed in several locations
20+
21+
package config
22+
23+
import (
24+
"context"
25+
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
"github.com/elastic/beats/v7/libbeat/common/reload"
28+
"github.com/elastic/beats/v7/libbeat/logp"
29+
)
30+
31+
type reloader struct {
32+
ctx context.Context
33+
ch chan<- *common.Config
34+
}
35+
36+
func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
37+
if len(configs) == 0 {
38+
return nil
39+
}
40+
41+
logp.L().Infof("Received %v new configs for reload.", len(configs))
42+
43+
select {
44+
case <-r.ctx.Done():
45+
default:
46+
// TODO(yashtewari): Based on limitations elsewhere, such as the CSP integration,
47+
// don't think we should receive more than one Config here. Need to confirm and handle.
48+
r.ch <- configs[len(configs)-1].Config
49+
}
50+
51+
return nil
52+
}
53+
54+
func Updates(ctx context.Context) <-chan *common.Config {
55+
ch := make(chan *common.Config)
56+
r := &reloader{
57+
ctx: ctx,
58+
ch: ch,
59+
}
60+
61+
reload.Register.MustRegisterList("inputs", r)
62+
63+
return ch
64+
}

0 commit comments

Comments
 (0)