Skip to content

Commit fefe64f

Browse files
authored
Fix issue where it's possible for a component to receive a unit without a config (#2138)
* Fix issue where checkinExpected channel might have outdated information. * Run mage fmt. * Add changelog entry. * Increase rate limit for failure in test for slow CI runners. * Cleanups from code review. * Refactor the design of ensuring an initial expected comes from the observed message.
1 parent 5bc81d9 commit fefe64f

12 files changed

+550
-76
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a description of the change, roughly 80 characters long.
14+
summary: Fix issue where it's possible for a component to receive a unit without a config
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
#description:
19+
20+
# Affected component; a word indicating the component this changeset affects.
21+
component:
22+
23+
# PR number; optional; the PR number that added the changeset.
24+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
25+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
26+
# Please provide it if you are adding a fragment for a different PR.
27+
pr: 2138
28+
29+
# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
30+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
31+
issue: 2086

internal/pkg/agent/control/v1/proto/control_v1.pb.go

+3-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/agent/control/v1/proto/control_v1_grpc.pb.go

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/agent/control/v2/cproto/control_v2.pb.go

+3-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/agent/control/v2/cproto/control_v2_grpc.pb.go

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/component/fake/component/main.go

+65-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"io"
1313
"os"
1414
"os/signal"
15+
"strconv"
1516
"syscall"
1617
"time"
1718

@@ -46,7 +47,7 @@ func main() {
4647
}
4748

4849
func run() error {
49-
logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
50+
logger := zerolog.New(os.Stderr).Level(zerolog.TraceLevel).With().Timestamp().Logger()
5051
ver := client.VersionInfo{
5152
Name: fake,
5253
Version: "1.0",
@@ -347,7 +348,8 @@ type fakeInput struct {
347348
state client.UnitState
348349
stateMsg string
349350

350-
canceller context.CancelFunc
351+
canceller context.CancelFunc
352+
killerCanceller context.CancelFunc
351353
}
352354

353355
func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager *stateManager, unit *client.Unit, cfg *proto.UnitExpectedConfig) (*fakeInput, error) {
@@ -399,7 +401,7 @@ func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager *
399401
}
400402
}()
401403
i.canceller = cancel
402-
404+
i.parseConfig(cfg)
403405
return i, nil
404406
}
405407

@@ -429,6 +431,7 @@ func (f *fakeInput) Update(u *client.Unit) error {
429431
return fmt.Errorf("unit type changed with the same unit ID: %s", config.Type)
430432
}
431433

434+
f.parseConfig(config)
432435
state, stateMsg, err := getStateFromConfig(config)
433436
if err != nil {
434437
return fmt.Errorf("unit config parsing error: %w", err)
@@ -440,6 +443,65 @@ func (f *fakeInput) Update(u *client.Unit) error {
440443
return nil
441444
}
442445

446+
func (f *fakeInput) parseConfig(config *proto.UnitExpectedConfig) {
447+
// handle a case for killing the component when the pid of the component
448+
// matches the current running PID
449+
cfg := config.Source.AsMap()
450+
killPIDRaw, kill := cfg["kill"]
451+
if kill {
452+
f.maybeKill(killPIDRaw)
453+
}
454+
455+
// handle a case where random killing of the component is enabled
456+
_, killOnInterval := cfg["kill_on_interval"]
457+
f.logger.Trace().Bool("kill_on_interval", killOnInterval).Msg("kill_on_interval config set value")
458+
if killOnInterval {
459+
f.logger.Info().Msg("starting interval killer")
460+
f.runKiller()
461+
} else {
462+
f.logger.Info().Msg("stopping interval killer")
463+
f.stopKiller()
464+
}
465+
}
466+
467+
func (f *fakeInput) maybeKill(pidRaw interface{}) {
468+
if killPID, ok := pidRaw.(string); ok {
469+
if pid, err := strconv.Atoi(killPID); err == nil {
470+
if pid == os.Getpid() {
471+
f.logger.Warn().Msg("killing from config pid")
472+
os.Exit(1)
473+
}
474+
}
475+
}
476+
}
477+
478+
func (f *fakeInput) runKiller() {
479+
if f.killerCanceller != nil {
480+
// already running
481+
return
482+
}
483+
ctx, canceller := context.WithCancel(context.Background())
484+
f.killerCanceller = canceller
485+
go func() {
486+
t := time.NewTimer(500 * time.Millisecond)
487+
defer t.Stop()
488+
select {
489+
case <-ctx.Done():
490+
return
491+
case <-t.C:
492+
f.logger.Warn().Msg("killer performing kill")
493+
os.Exit(1)
494+
}
495+
}()
496+
}
497+
498+
func (f *fakeInput) stopKiller() {
499+
if f.killerCanceller != nil {
500+
f.killerCanceller()
501+
f.killerCanceller = nil
502+
}
503+
}
504+
443505
type stateSetterAction struct {
444506
input *fakeInput
445507
}

pkg/component/fake/shipper/listener.go

+8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"net"
1212
"os"
13+
"path/filepath"
1314
"strings"
1415
)
1516

@@ -21,6 +22,13 @@ func createListener(path string) (net.Listener, error) {
2122
if _, err := os.Stat(path); !os.IsNotExist(err) {
2223
os.Remove(path)
2324
}
25+
dir := filepath.Dir(path)
26+
if _, err := os.Stat(dir); os.IsNotExist(err) {
27+
err := os.MkdirAll(dir, 0750)
28+
if err != nil {
29+
return nil, err
30+
}
31+
}
2432
lis, err := net.Listen("unix", path)
2533
if err != nil {
2634
return nil, err

pkg/component/runtime/command.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error {
151151
sendExpected := c.state.syncExpected(&newComp)
152152
changed := c.state.syncUnits(&newComp)
153153
if sendExpected || c.state.unsettled() {
154-
comm.CheckinExpected(c.state.toCheckinExpected())
154+
comm.CheckinExpected(c.state.toCheckinExpected(), nil)
155155
}
156156
if changed {
157157
c.sendObserved()
@@ -177,7 +177,7 @@ func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error {
177177
sendExpected = true
178178
}
179179
if sendExpected {
180-
comm.CheckinExpected(c.state.toCheckinExpected())
180+
comm.CheckinExpected(c.state.toCheckinExpected(), checkin)
181181
}
182182
if changed {
183183
c.sendObserved()
@@ -331,10 +331,6 @@ func (c *CommandRuntime) start(comm Communicator) error {
331331
c.lastCheckin = time.Time{}
332332
c.missedCheckins = 0
333333

334-
// Ensure there is no pending checkin expected message buffered to avoid sending the new process
335-
// the expected state of the previous process: https://github.com/elastic/beats/issues/34137
336-
comm.ClearPendingCheckinExpected()
337-
338334
proc, err := process.Start(path,
339335
process.WithArgs(args),
340336
process.WithEnv(env),

pkg/component/runtime/conn_info_server_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (c *mockCommunicator) WriteConnInfo(w io.Writer, services ...client.Service
5757
return nil
5858
}
5959

60-
func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected) {
60+
func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected, observed *proto.CheckinObserved) {
6161
}
6262

6363
func (c *mockCommunicator) ClearPendingCheckinExpected() {

0 commit comments

Comments
 (0)