diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go index 2f127c699f0..84e30dd40df 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go @@ -6,7 +6,10 @@ package handlers import ( "context" + "errors" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" @@ -25,8 +28,15 @@ import ( ) type mockUpgradeManager struct { - msgChan chan string - completedChan chan struct{} + UpgradeFn func( + ctx context.Context, + version string, + sourceURI string, + action *fleetapi.ActionUpgrade, + details *details.Details, + skipVerifyOverride bool, + skipDefaultPgp bool, + pgpBytes ...string) (reexec.ShutdownCallbackFn, error) } func (u *mockUpgradeManager) Upgradeable() bool { @@ -37,15 +47,25 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error { return nil } -func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { - select { - case <-u.completedChan: - u.msgChan <- "completed " + version - return nil, nil - case <-ctx.Done(): - u.msgChan <- "canceled " + version - return nil, ctx.Err() - } +func (u *mockUpgradeManager) Upgrade( + ctx context.Context, + version string, + sourceURI string, + action *fleetapi.ActionUpgrade, + details *details.Details, + skipVerifyOverride bool, + skipDefaultPgp bool, + pgpBytes ...string) (reexec.ShutdownCallbackFn, error) { + + return u.UpgradeFn( + ctx, + version, + sourceURI, + action, + details, + skipVerifyOverride, + skipDefaultPgp, + pgpBytes...) } func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error { @@ -65,8 +85,7 @@ func TestUpgradeHandler(t *testing.T) { log, _ := logger.New("", false) agentInfo := &info.AgentInfo{} - msgChan := make(chan string) - completedChan := make(chan struct{}) + upgradeCalledChan := make(chan struct{}) // Create and start the coordinator c := coordinator.New( @@ -76,7 +95,21 @@ func TestUpgradeHandler(t *testing.T) { agentInfo, component.RuntimeSpecs{}, nil, - &mockUpgradeManager{msgChan: msgChan, completedChan: completedChan}, + &mockUpgradeManager{ + UpgradeFn: func( + ctx context.Context, + version string, + sourceURI string, + action *fleetapi.ActionUpgrade, + details *details.Details, + skipVerifyOverride bool, + skipDefaultPgp bool, + pgpBytes ...string) (reexec.ShutdownCallbackFn, error) { + + upgradeCalledChan <- struct{}{} + return nil, nil + }, + }, nil, nil, nil, nil, nil, false) //nolint:errcheck // We don't need the termination state of the Coordinator go c.Run(ctx) @@ -86,11 +119,14 @@ func TestUpgradeHandler(t *testing.T) { Version: "8.3.0", SourceURI: "http://localhost"}} ack := noopacker.New() err := u.Handle(ctx, &a, ack) - // indicate that upgrade is completed - close(completedChan) require.NoError(t, err) - msg := <-msgChan - require.Equal(t, "completed 8.3.0", msg) + + // Make sure this test does not dead lock or wait for too long + select { + case <-time.Tick(50 * time.Millisecond): + t.Fatal("mockUpgradeManager.Upgrade was not called") + case <-upgradeCalledChan: + } } func TestUpgradeHandlerSameVersion(t *testing.T) { @@ -102,10 +138,10 @@ func TestUpgradeHandlerSameVersion(t *testing.T) { log, _ := logger.New("", false) agentInfo := &info.AgentInfo{} - msgChan := make(chan string) - completedChan := make(chan struct{}) + upgradeCalledChan := make(chan struct{}) // Create and start the Coordinator + upgradeCalled := atomic.Bool{} c := coordinator.New( log, configuration.DefaultConfiguration(), @@ -113,7 +149,26 @@ func TestUpgradeHandlerSameVersion(t *testing.T) { agentInfo, component.RuntimeSpecs{}, nil, - &mockUpgradeManager{msgChan: msgChan, completedChan: completedChan}, + &mockUpgradeManager{ + UpgradeFn: func( + ctx context.Context, + version string, + sourceURI string, + action *fleetapi.ActionUpgrade, + details *details.Details, + skipVerifyOverride bool, + skipDefaultPgp bool, + pgpBytes ...string) (reexec.ShutdownCallbackFn, error) { + + if upgradeCalled.CompareAndSwap(false, true) { + upgradeCalledChan <- struct{}{} + return nil, nil + } + err := errors.New("mockUpgradeManager.Upgrade called more than once") + t.Error(err.Error()) + return nil, err + }, + }, nil, nil, nil, nil, nil, false) //nolint:errcheck // We don't need the termination state of the Coordinator go c.Run(ctx) @@ -126,10 +181,13 @@ func TestUpgradeHandlerSameVersion(t *testing.T) { err2 := u.Handle(ctx, &a, ack) require.NoError(t, err1) require.NoError(t, err2) - // indicate that upgrade is completed - close(completedChan) - msg := <-msgChan - require.Equal(t, "completed 8.3.0", msg) + + // Make sure this test does not dead lock or wait for too long + select { + case <-time.Tick(50 * time.Millisecond): + t.Fatal("mockUpgradeManager.Upgrade was not called") + case <-upgradeCalledChan: + } } func TestUpgradeHandlerNewVersion(t *testing.T) { @@ -139,10 +197,9 @@ func TestUpgradeHandlerNewVersion(t *testing.T) { defer cancel() log, _ := logger.New("", false) + upgradeCalledChan := make(chan string) agentInfo := &info.AgentInfo{} - msgChan := make(chan string) - completedChan := make(chan struct{}) // Create and start the Coordinator c := coordinator.New( @@ -152,7 +209,27 @@ func TestUpgradeHandlerNewVersion(t *testing.T) { agentInfo, component.RuntimeSpecs{}, nil, - &mockUpgradeManager{msgChan: msgChan, completedChan: completedChan}, + &mockUpgradeManager{ + UpgradeFn: func( + ctx context.Context, + version string, + sourceURI string, + action *fleetapi.ActionUpgrade, + details *details.Details, + skipVerifyOverride bool, + skipDefaultPgp bool, + pgpBytes ...string) (reexec.ShutdownCallbackFn, error) { + + defer func() { + upgradeCalledChan <- version + }() + if version == "8.2.0" { + return nil, errors.New("upgrade to 8.2.0 will always fail") + } + + return nil, nil + }, + }, nil, nil, nil, nil, nil, false) //nolint:errcheck // We don't need the termination state of the Coordinator go c.Run(ctx) @@ -163,14 +240,25 @@ func TestUpgradeHandlerNewVersion(t *testing.T) { a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{ Version: "8.5.0", SourceURI: "http://localhost"}} ack := noopacker.New() + + checkMsg := func(c <-chan string, expected, errMsg string) { + t.Helper() + // Make sure this test does not dead lock or wait for too long + // For some reason < 1s sometimes makes the test fail. + select { + case <-time.Tick(1300 * time.Millisecond): + t.Fatal("timed out waiting for Upgrade to return") + case msg := <-c: + require.Equal(t, expected, msg, errMsg) + } + } + + // Send both upgrade actions, a1 will error before a2 succeeds err1 := u.Handle(ctx, &a1, ack) require.NoError(t, err1) + checkMsg(upgradeCalledChan, "8.2.0", "first call must be with version 8.2.0") + err2 := u.Handle(ctx, &a2, ack) require.NoError(t, err2) - msg1 := <-msgChan - require.Equal(t, "canceled 8.2.0", msg1) - // indicate that upgrade is completed - close(completedChan) - msg2 := <-msgChan - require.Equal(t, "completed 8.5.0", msg2) + checkMsg(upgradeCalledChan, "8.5.0", "second call to Upgrade must be with version 8.5.0") } diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index bdd05e32723..512181494c9 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/internal/pkg/agent/install" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/control" "github.com/elastic/elastic-agent/pkg/control/v2/client" @@ -1209,7 +1210,7 @@ func createTempDir(t *testing.T) string { cleanup := func() { if !t.Failed() { - if err := os.RemoveAll(tempDir); err != nil { + if err := install.RemovePath(tempDir); err != nil { t.Errorf("could not remove temp dir '%s': %s", tempDir, err) } } else { diff --git a/testing/integration/event_logging_test.go b/testing/integration/event_logging_test.go index aecd1eb9de2..11a90da8bd5 100644 --- a/testing/integration/event_logging_test.go +++ b/testing/integration/event_logging_test.go @@ -75,7 +75,6 @@ func TestEventLogFile(t *testing.T) { Local: true, Sudo: false, }) - t.Skip("Flaky test: https://github.com/elastic/elastic-agent/issues/5397") ctx, cancel := testcontext.WithDeadline( t, context.Background(),