Skip to content

Commit 8aae0d6

Browse files
authored
Remove inputs when all streams are removed (#1869)
* Remove inputs when all streams are removed. * Add changelog. * Fix tests. * Remove processors from test.
1 parent a836d4f commit 8aae0d6

File tree

3 files changed

+110
-0
lines changed

3 files changed

+110
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Remove inputs when all streams are removed
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
#description:
19+
20+
# Affected component; a word indicating the component this changeset affects.
21+
component:
22+
23+
# PR number; optional; the PR number that added the changeset.
24+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
25+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
26+
# Please provide it if you are adding a fragment for a different PR.
27+
pr: 1869
28+
29+
# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
30+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
31+
issue: 1868

internal/pkg/agent/transpiler/utils.go

+41
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ import (
99
"fmt"
1010
)
1111

12+
const (
13+
// streamsKey is the name of the dictionary key for streams that an input can have. In the case that
14+
// an input defines a set of streams and after conditions are applied all the streams are removed then
15+
// the entire input is removed.
16+
streamsKey = "streams"
17+
)
18+
1219
// RenderInputs renders dynamic inputs section
1320
func RenderInputs(inputs Node, varsArray []*Vars) (Node, error) {
1421
l, ok := inputs.Value().(*List)
@@ -23,6 +30,10 @@ func RenderInputs(inputs Node, varsArray []*Vars) (Node, error) {
2330
if !ok {
2431
continue
2532
}
33+
hadStreams := false
34+
if streams := getStreams(dict); streams != nil {
35+
hadStreams = true
36+
}
2637
n, err := dict.Apply(vars)
2738
if errors.Is(err, ErrNoMatch) {
2839
// has a variable that didn't exist, so we ignore it
@@ -37,6 +48,13 @@ func RenderInputs(inputs Node, varsArray []*Vars) (Node, error) {
3748
continue
3849
}
3950
dict = n.(*Dict)
51+
if hadStreams {
52+
streams := getStreams(dict)
53+
if streams == nil {
54+
// conditions removed all streams (input is removed)
55+
continue
56+
}
57+
}
4058
hash := string(dict.Hash())
4159
_, exists := nodesMap[hash]
4260
if !exists {
@@ -85,6 +103,29 @@ type varIDMap struct {
85103
d *Dict
86104
}
87105

106+
func getStreams(dict *Dict) *List {
107+
node, ok := dict.Find(streamsKey)
108+
if !ok {
109+
return nil
110+
}
111+
key, ok := node.(*Key)
112+
if !ok {
113+
return nil
114+
}
115+
if key.value == nil {
116+
return nil
117+
}
118+
list, ok := key.value.(*List)
119+
if !ok {
120+
return nil
121+
}
122+
if len(list.value) == 0 {
123+
// didn't have any streams defined in the list (so no removal should be done)
124+
return nil
125+
}
126+
return list
127+
}
128+
88129
func promoteProcessors(dict *Dict) *Dict {
89130
p := dict.Processors()
90131
if p == nil {

internal/pkg/agent/transpiler/utils_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,44 @@ func TestRenderInputs(t *testing.T) {
731731
}),
732732
},
733733
},
734+
"input removal with stream conditions": {
735+
input: NewKey("inputs", NewList([]Node{
736+
NewDict([]Node{
737+
NewKey("type", NewStrVal("logfile")),
738+
NewKey("streams", NewList([]Node{
739+
NewDict([]Node{
740+
NewKey("paths", NewList([]Node{
741+
NewStrVal("/var/log/${var1.name}.log"),
742+
})),
743+
NewKey("condition", NewStrVal("${var1.name} != 'value1'")),
744+
}),
745+
NewDict([]Node{
746+
NewKey("paths", NewList([]Node{
747+
NewStrVal("/var/log/${var1.name}.log"),
748+
})),
749+
NewKey("condition", NewStrVal("${var1.name} != 'value1'")),
750+
}),
751+
})),
752+
}),
753+
})),
754+
expected: NewList([]Node{}),
755+
varsArray: []*Vars{
756+
mustMakeVarsP("value1", map[string]interface{}{
757+
"var1": map[string]interface{}{
758+
"name": "value1",
759+
},
760+
},
761+
"var1",
762+
nil),
763+
mustMakeVarsP("value2", map[string]interface{}{
764+
"var1": map[string]interface{}{
765+
"name": "value1",
766+
},
767+
},
768+
"var1",
769+
nil),
770+
},
771+
},
734772
}
735773

736774
for name, test := range testcases {

0 commit comments

Comments
 (0)