Skip to content

Commit 4ad7824

Browse files
authored
Fix failing upgrade command when gRPC server interrupts connection (#4519)
The upgrade CLI command creates a client, connects to the Elastic Agent server, and triggers the upgrade process, which is supposed to shut down the agent and start the upgrade. Sometimes the agent shuts down before the client receives a successful response, so the upgrade command returns an error code even though the upgrade succeeded. This change checks whether the connection was interrupted and, if so, treats it as a successful result instead of an error.
1 parent b71f072 commit 4ad7824

File tree

4 files changed

+120
-3
lines changed

4 files changed

+120
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
kind: bug-fix
2+
summary: Fix failing upgrade command when gRPC server interrupts connection
3+
component: "elastic-agent"
4+
pr: https://github.com/elastic/elastic-agent/pull/4519
5+
issue: https://github.com/elastic/elastic-agent/issues/3890

internal/pkg/agent/cmd/upgrade.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import (
88
"context"
99
"fmt"
1010
"os"
11+
"strings"
1112

1213
"github.com/spf13/cobra"
14+
"google.golang.org/grpc/codes"
15+
"google.golang.org/grpc/status"
1316

1417
"github.com/elastic/elastic-agent/pkg/control"
1518
"github.com/elastic/elastic-agent/pkg/control/v2/client"
@@ -55,10 +58,14 @@ func newUpgradeCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Comman
5558
}
5659

5760
func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
61+
c := client.New()
62+
return upgradeCmdWithClient(streams, cmd, args, c)
63+
}
64+
65+
func upgradeCmdWithClient(streams *cli.IOStreams, cmd *cobra.Command, args []string, c client.Client) error {
5866
version := args[0]
5967
sourceURI, _ := cmd.Flags().GetString(flagSourceURI)
6068

61-
c := client.New()
6269
err := c.Connect(context.Background())
6370
if err != nil {
6471
return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address()))
@@ -106,7 +113,14 @@ func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) error
106113
skipDefaultPgp, _ := cmd.Flags().GetBool(flagSkipDefaultPgp)
107114
version, err = c.Upgrade(context.Background(), version, sourceURI, skipVerification, skipDefaultPgp, pgpChecks...)
108115
if err != nil {
109-
return errors.New(err, "Failed trigger upgrade of daemon")
116+
s, ok := status.FromError(err)
117+
// Sometimes the gRPC server shuts down before replying to the command which is expected
118+
// we can determine this state by the EOF error coming from the server.
119+
// If the server is just unavailable/not running, we should not succeed.
120+
isConnectionInterrupted := ok && s.Code() == codes.Unavailable && strings.Contains(s.Message(), "EOF")
121+
if !isConnectionInterrupted {
122+
return errors.New(err, "Failed trigger upgrade of daemon")
123+
}
110124
}
111125
fmt.Fprintf(streams.Out, "Upgrade triggered to version %s, Elastic Agent is currently restarting\n", version)
112126
return nil
+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package cmd
6+
7+
import (
8+
"context"
9+
"net"
10+
"sync/atomic"
11+
"testing"
12+
"time"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"google.golang.org/grpc"
17+
18+
"github.com/elastic/elastic-agent/internal/pkg/cli"
19+
"github.com/elastic/elastic-agent/pkg/control/v2/client"
20+
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
21+
)
22+
23+
func TestUpgradeCmd(t *testing.T) {
24+
t.Run("no error when connection gets interrupted", func(t *testing.T) {
25+
tcpServer, err := net.Listen("tcp", "127.0.0.1:")
26+
require.NoError(t, err)
27+
defer tcpServer.Close()
28+
29+
s := grpc.NewServer()
30+
defer s.Stop()
31+
32+
upgradeCh := make(chan struct{})
33+
mock := &mockServer{upgradeStop: upgradeCh}
34+
cproto.RegisterElasticAgentControlServer(s, mock)
35+
go func() {
36+
err := s.Serve(tcpServer)
37+
assert.NoError(t, err)
38+
}()
39+
40+
clientCh := make(chan struct{})
41+
// use HTTP prefix for the dialer to use TCP, otherwise it's a unix socket/named pipe
42+
c := client.New(client.WithAddress("http://" + tcpServer.Addr().String()))
43+
args := []string{"--skip-verify", "8.13.0"}
44+
streams := cli.NewIOStreams()
45+
cmd := newUpgradeCommandWithArgs(args, streams)
46+
47+
// the upgrade command will hang until the server shut down
48+
go func() {
49+
err = upgradeCmdWithClient(streams, cmd, args, c)
50+
assert.NoError(t, err)
51+
// verify that we actually talked to the server
52+
counter := atomic.LoadInt32(&mock.upgrades)
53+
assert.Equal(t, int32(1), counter, "server should have handled one upgrade")
54+
// unblock the further test execution
55+
close(clientCh)
56+
}()
57+
58+
// we will know that the client reached the server watching the `mock.upgrades` counter
59+
require.Eventually(t, func() bool {
60+
counter := atomic.LoadInt32(&mock.upgrades)
61+
return counter > 0
62+
}, 5*time.Second, 100*time.Millisecond)
63+
64+
// then we close the tcp server which is supposed to interrupt the connection
65+
s.Stop()
66+
// this stops the mock server
67+
close(upgradeCh)
68+
// this makes sure all client assertions are done
69+
<-clientCh
70+
})
71+
}
72+
73+
type mockServer struct {
74+
cproto.ElasticAgentControlServer
75+
upgradeStop <-chan struct{}
76+
upgrades int32
77+
}
78+
79+
func (s *mockServer) Upgrade(ctx context.Context, r *cproto.UpgradeRequest) (resp *cproto.UpgradeResponse, err error) {
80+
atomic.AddInt32(&s.upgrades, 1)
81+
<-s.upgradeStop
82+
return nil, nil
83+
}
84+
85+
func (s *mockServer) State(ctx context.Context, r *cproto.Empty) (resp *cproto.StateResponse, err error) {
86+
return &cproto.StateResponse{
87+
State: cproto.State_HEALTHY,
88+
Info: &cproto.StateAgentInfo{},
89+
}, nil
90+
}

testing/upgradetest/upgrader.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os"
1313
"path/filepath"
1414
"runtime"
15+
"strings"
1516
"time"
1617

1718
"github.com/otiai10/copy"
@@ -325,7 +326,14 @@ func PerformUpgrade(
325326

326327
upgradeOutput, err := startFixture.Exec(ctx, upgradeCmdArgs)
327328
if err != nil {
328-
return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endVersionInfo.Binary.Version, err, upgradeOutput)
329+
// Sometimes the gRPC server shuts down before replying to the command which is expected
330+
// we can determine this state by the EOF error coming from the server.
331+
// If the server is just unavailable/not running, we should not succeed.
332+
// Starting with version 8.13.2, this is handled by the upgrade command itself.
333+
isConnectionInterrupted := strings.Contains(err.Error(), "Unavailable") && strings.Contains(err.Error(), "EOF")
334+
if !isConnectionInterrupted {
335+
return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endVersionInfo.Binary.Version, err, upgradeOutput)
336+
}
329337
}
330338

331339
// wait for the watcher to show up

0 commit comments

Comments
 (0)