Skip to content

Commit df5d06a

Browse files
authored
Introduce isolated units for input spec (#4476)
* Introduce non grouped input components * Create fragment * Edit fragment * Additional comments * Fake test grouped * Additional tests * Addition of non grouped fake type * Removing unused * Fix case * Rename flag * Rename label * State change * Attempt without shipper * Trying output without shipper * Revert "Trying output without shipper" This reverts commit 62d3e93. * Revert "Attempt without shipper" This reverts commit fddaad6. * Trying single unit * Testing windows multi shipper inputs on same npipe * Rename and disable isolated on windows * Reduce duplication of code * Enable on windows and refactor grpc servers * Additional comments * Remove windows comments
1 parent 62316a3 commit df5d06a

File tree

12 files changed

+2551
-317
lines changed

12 files changed

+2551
-317
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Introduce isolate units for input spec.
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Introduce a new flag in the spec that will allow to disable grouping of same input type inputs into a single component.
21+
If that flag is being activated the inputs won't be grouped and we will create a separate component for each input that will run units for that particular input.
22+
23+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
24+
component: elastic-agent
25+
26+
# PR URL; optional; the PR number that added the changeset.
27+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
28+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
29+
# Please provide it if you are adding a fragment for a different PR.
30+
pr: https://github.com/elastic/elastic-agent/pull/4476
31+
32+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
33+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
34+
issue: https://github.com/elastic/security-team/issues/8669

internal/pkg/agent/application/coordinator/coordinator_test.go

+112-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,19 @@ var (
6161
},
6262
},
6363
}
64+
fakeIsolatedUnitsInputSpec = component.InputSpec{
65+
Name: "fake-isolated-units",
66+
Platforms: []string{fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH)},
67+
Shippers: []string{"fake-shipper"},
68+
Command: &component.CommandSpec{
69+
Timeouts: component.CommandTimeoutSpec{
70+
Checkin: 30 * time.Second,
71+
Restart: 10 * time.Millisecond, // quick restart during tests
72+
Stop: 30 * time.Second,
73+
},
74+
},
75+
IsolateUnits: true,
76+
}
6477
fakeShipperSpec = component.ShipperSpec{
6578
Name: "fake-shipper",
6679
Platforms: []string{fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH)},
@@ -547,6 +560,94 @@ func TestCoordinator_StateSubscribe(t *testing.T) {
547560
require.NoError(t, err)
548561
}
549562

563+
func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
564+
coordCh := make(chan error)
565+
ctx, cancel := context.WithCancel(context.Background())
566+
defer cancel()
567+
568+
coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithComponentInputSpec(fakeIsolatedUnitsInputSpec))
569+
go func() {
570+
err := coord.Run(ctx)
571+
if errors.Is(err, context.Canceled) {
572+
// allowed error
573+
err = nil
574+
}
575+
coordCh <- err
576+
}()
577+
578+
resultChan := make(chan error)
579+
go func() {
580+
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
581+
defer cancel()
582+
583+
subChan := coord.StateSubscribe(ctx, 32)
584+
for {
585+
select {
586+
case <-ctx.Done():
587+
resultChan <- ctx.Err()
588+
return
589+
case state := <-subChan:
590+
t.Logf("%+v", state)
591+
if len(state.Components) == 3 {
592+
compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0")
593+
compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1")
594+
if compState0 != nil && compState1 != nil {
595+
unit0, ok0 := compState0.State.Units[runtime.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-0-unit"}]
596+
unit1, ok1 := compState1.State.Units[runtime.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-1-unit"}]
597+
if ok0 && ok1 {
598+
if (unit0.State == client.UnitStateHealthy && unit0.Message == "Healthy From Fake Isolated Units 0 Config") &&
599+
(unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") {
600+
resultChan <- nil
601+
return
602+
}
603+
}
604+
}
605+
}
606+
}
607+
}
608+
}()
609+
610+
// no vars used by the config
611+
varsMgr.Vars(ctx, []*transpiler.Vars{{}})
612+
613+
// set the configuration to run a fake input
614+
cfg, err := config.NewConfigFrom(map[string]interface{}{
615+
"outputs": map[string]interface{}{
616+
"default": map[string]interface{}{
617+
"type": "fake-action-output",
618+
"shipper": map[string]interface{}{
619+
"enabled": true,
620+
},
621+
},
622+
},
623+
"inputs": []interface{}{
624+
map[string]interface{}{
625+
"id": "fake-isolated-units-0",
626+
"type": "fake-isolated-units",
627+
"use_output": "default",
628+
"state": client.UnitStateHealthy,
629+
"message": "Healthy From Fake Isolated Units 0 Config",
630+
},
631+
map[string]interface{}{
632+
"id": "fake-isolated-units-1",
633+
"type": "fake-isolated-units",
634+
"use_output": "default",
635+
"state": client.UnitStateHealthy,
636+
"message": "Healthy From Fake Isolated Units 1 Config",
637+
},
638+
},
639+
})
640+
require.NoError(t, err)
641+
cfgMgr.Config(ctx, cfg)
642+
643+
err = <-resultChan
644+
require.NoError(t, err)
645+
cancel()
646+
647+
err = <-coordCh
648+
require.NoError(t, err)
649+
}
650+
550651
func TestCollectManagerErrorsTimeout(t *testing.T) {
551652
handlerChan, _, _, _, _ := setupManagerShutdownChannels(time.Millisecond)
552653
// Don't send anything to the shutdown channels, causing a timeout
@@ -757,6 +858,7 @@ func TestCoordinator_UpgradeDetails(t *testing.T) {
757858
type createCoordinatorOpts struct {
758859
managed bool
759860
upgradeManager UpgradeManager
861+
compInputSpec component.InputSpec
760862
}
761863

762864
type CoordinatorOpt func(o *createCoordinatorOpts)
@@ -773,13 +875,21 @@ func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt {
773875
}
774876
}
775877

878+
func WithComponentInputSpec(spec component.InputSpec) CoordinatorOpt {
879+
return func(o *createCoordinatorOpts) {
880+
o.compInputSpec = spec
881+
}
882+
}
883+
776884
// createCoordinator creates a coordinator that using a fake config manager and a fake vars manager.
777885
//
778886
// The runtime specifications is set up to use both the fake component and fake shipper.
779887
func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt) (*Coordinator, *fakeConfigManager, *fakeVarsManager) {
780888
t.Helper()
781889

782-
o := &createCoordinatorOpts{}
890+
o := &createCoordinatorOpts{
891+
compInputSpec: fakeInputSpec,
892+
}
783893
for _, opt := range opts {
784894
opt(o)
785895
}
@@ -793,7 +903,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
793903
InputType: "fake",
794904
BinaryName: "",
795905
BinaryPath: testBinary(t, "component"),
796-
Spec: fakeInputSpec,
906+
Spec: o.compInputSpec,
797907
}
798908
shipperSpec := component.ShipperRuntimeSpec{
799909
ShipperType: "fake-shipper",

pkg/component/component.go

+114-44
Original file line numberDiff line numberDiff line change
@@ -351,18 +351,13 @@ func unitForShipperOutput(output outputI, id string, shipperType string) Unit {
351351
}
352352
}
353353

354-
// Collect all inputs of the given type going to the given output and return
355-
// the resulting Component. The returned Component may have no units if no
356-
// active inputs were found.
357-
func (r *RuntimeSpecs) componentForInputType(
358-
inputType string,
354+
// createShipperReference creates a ShipperReference for the given output and input spec.
355+
func (r *RuntimeSpecs) createShipperReference(
359356
output outputI,
360-
featureFlags *features.Flags,
361-
componentConfig *ComponentConfig,
362-
) Component {
363-
componentID := fmt.Sprintf("%s-%s", inputType, output.name)
364-
365-
inputSpec, componentErr := r.GetInput(inputType)
357+
inputSpec InputRuntimeSpec,
358+
componentID string,
359+
componentErr error,
360+
) (*ShipperReference, error) {
366361
var shipperRef *ShipperReference
367362
if componentErr == nil {
368363
if output.shipperEnabled {
@@ -393,58 +388,133 @@ func (r *RuntimeSpecs) componentForInputType(
393388
}
394389
}
395390
}
391+
396392
// If there's an error at this point we still proceed with assembling the
397393
// policy into a component, we just attach the error to its Err field to
398394
// indicate that it can't be run.
395+
return shipperRef, componentErr
396+
}
399397

400-
var units []Unit
401-
for _, input := range output.inputs[inputType] {
402-
if input.enabled {
403-
unitID := fmt.Sprintf("%s-%s", componentID, input.id)
404-
units = append(units, unitForInput(input, unitID))
398+
// populateOutputUnitsForInput adds the output units to the given slice.
399+
func populateOutputUnitsForInput(
400+
units *[]Unit,
401+
output outputI,
402+
componentID string,
403+
componentErr error,
404+
shipperRef *ShipperReference,
405+
) {
406+
if shipperRef != nil {
407+
// Shipper units are skipped if componentErr isn't nil, because in that
408+
// case we generally don't have a valid shipper type to base it on.
409+
if componentErr == nil {
410+
*units = append(*units, unitForShipperOutput(output, componentID, shipperRef.ShipperType))
405411
}
412+
} else {
413+
*units = append(*units, unitForOutput(output, componentID))
406414
}
407-
if len(units) > 0 {
408-
if shipperRef != nil {
409-
// Shipper units are skipped if componentErr isn't nil, because in that
410-
// case we generally don't have a valid shipper type to base it on.
411-
if componentErr == nil {
412-
units = append(units,
413-
unitForShipperOutput(output, componentID, shipperRef.ShipperType))
415+
}
416+
417+
// Collect all inputs of the given type going to the given output and return
418+
// the resulting Components. The returned Components may have no units if no
419+
// active inputs were found.
420+
func (r *RuntimeSpecs) componentsForInputType(
421+
inputType string,
422+
output outputI,
423+
featureFlags *features.Flags,
424+
componentConfig *ComponentConfig,
425+
) []Component {
426+
var components []Component
427+
inputSpec, componentErr := r.GetInput(inputType)
428+
429+
// Treat as non isolated units component on error of reading the input spec
430+
if componentErr != nil || !inputSpec.Spec.IsolateUnits {
431+
componentID := fmt.Sprintf("%s-%s", inputType, output.name)
432+
shipperRef, componentErr := r.createShipperReference(output, inputSpec, componentID, componentErr)
433+
434+
var units []Unit
435+
for _, input := range output.inputs[inputType] {
436+
if input.enabled {
437+
unitID := fmt.Sprintf("%s-%s", componentID, input.id)
438+
units = append(units, unitForInput(input, unitID))
414439
}
415-
} else {
416-
units = append(units, unitForOutput(output, componentID))
440+
}
441+
442+
if len(units) > 0 {
443+
// Populate the output units for this component
444+
populateOutputUnitsForInput(
445+
&units,
446+
output,
447+
componentID,
448+
componentErr,
449+
shipperRef,
450+
)
451+
}
452+
453+
components = append(components, Component{
454+
ID: componentID,
455+
Err: componentErr,
456+
InputSpec: &inputSpec,
457+
InputType: inputType,
458+
OutputType: output.outputType,
459+
Units: units,
460+
Features: featureFlags.AsProto(),
461+
Component: componentConfig.AsProto(),
462+
ShipperRef: shipperRef,
463+
})
464+
} else {
465+
for _, input := range output.inputs[inputType] {
466+
// Units are being mapped to components, so we need a unique ID for each.
467+
componentID := fmt.Sprintf("%s-%s-%s", inputType, output.name, input.id)
468+
shipperRef, componentErr := r.createShipperReference(output, inputSpec, componentID, componentErr)
469+
470+
var units []Unit
471+
if input.enabled {
472+
unitID := fmt.Sprintf("%s-unit", componentID)
473+
units = append(units, unitForInput(input, unitID))
474+
// Populate the output units for this component
475+
populateOutputUnitsForInput(
476+
&units,
477+
output,
478+
componentID,
479+
componentErr,
480+
shipperRef,
481+
)
482+
}
483+
484+
components = append(components, Component{
485+
ID: componentID,
486+
Err: componentErr,
487+
InputSpec: &inputSpec,
488+
InputType: inputType,
489+
OutputType: output.outputType,
490+
Units: units,
491+
Features: featureFlags.AsProto(),
492+
Component: componentConfig.AsProto(),
493+
ShipperRef: shipperRef,
494+
})
417495
}
418496
}
419-
return Component{
420-
ID: componentID,
421-
Err: componentErr,
422-
InputSpec: &inputSpec,
423-
InputType: inputType,
424-
OutputType: output.outputType,
425-
Units: units,
426-
Features: featureFlags.AsProto(),
427-
Component: componentConfig.AsProto(),
428-
ShipperRef: shipperRef,
429-
}
497+
return components
430498
}
431499

432500
func (r *RuntimeSpecs) componentsForOutput(output outputI, featureFlags *features.Flags, componentConfig *ComponentConfig) []Component {
433501
var components []Component
434502
shipperTypes := make(map[string]bool)
435503
for inputType := range output.inputs {
436504
// No need for error checking at this stage -- we are guaranteed
437-
// to get a Component back. If there is an error that prevents it
505+
// to get a Component/s back. If there is an error that prevents it/them
438506
// from running then it will be in the Component's Err field and
439-
// we will report it later. The only thing we skip is a component
507+
// we will report it later. The only thing we skip is a component/s
440508
// with no units.
441-
component := r.componentForInputType(inputType, output, featureFlags, componentConfig)
442-
if len(component.Units) > 0 {
443-
if component.ShipperRef != nil {
444-
// If this component uses a shipper, mark that shipper type as active
445-
shipperTypes[component.ShipperRef.ShipperType] = true
509+
typeComponents := r.componentsForInputType(inputType, output, featureFlags, componentConfig)
510+
for _, component := range typeComponents {
511+
if len(component.Units) > 0 {
512+
if component.ShipperRef != nil {
513+
// If this component uses a shipper, mark that shipper type as active
514+
shipperTypes[component.ShipperRef.ShipperType] = true
515+
}
516+
components = append(components, component)
446517
}
447-
components = append(components, component)
448518
}
449519
}
450520

0 commit comments

Comments
 (0)