Skip to content

Commit cf41f38

Browse files
Replace most context.TODO calls, comment on other Background and TODOs (#4168)
1 parent 67b26e5 commit cf41f38

29 files changed

+247
-191
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: other
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Replace use of context.TODO
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: fleet-server
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/fleet-server/issues/3087

cmd/fleet/main.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/elastic/fleet-server/v7/internal/pkg/signal"
2222
"github.com/elastic/fleet-server/v7/internal/pkg/state"
2323

24+
"github.com/rs/zerolog"
2425
"github.com/rs/zerolog/log"
2526
"github.com/spf13/cobra"
2627
)
@@ -117,12 +118,13 @@ func getRunCommand(bi build.Info) func(cmd *cobra.Command, args []string) error
117118
return err
118119
}
119120

120-
srv, err := server.NewFleet(bi, state.NewLog(), true)
121+
ctx := installSignalHandler()
122+
srv, err := server.NewFleet(bi, state.NewLog(zerolog.Ctx(ctx)), true)
121123
if err != nil {
122124
return err
123125
}
124126

125-
if err := srv.Run(installSignalHandler(), cfg); err != nil && !errors.Is(err, context.Canceled) {
127+
if err := srv.Run(ctx, cfg); err != nil && !errors.Is(err, context.Canceled) {
126128
log.Error().Err(err).Msg("Exiting")
127129
l.Sync()
128130
return err

internal/pkg/action/dispatcher.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
7070

7171
// Subscribe generates a new subscription with the Dispatcher using the provided agentID and seqNo.
7272
// There is no check to ensure that the agentID has not been used; using the same one twice results in undefined behaviour.
73-
func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
73+
func (d *Dispatcher) Subscribe(log zerolog.Logger, agentID string, seqNo sqn.SeqNo) *Sub {
7474
cbCh := make(chan []model.Action, 1)
7575

7676
sub := Sub{
@@ -84,14 +84,14 @@ func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
8484
sz := len(d.subs)
8585
d.mx.Unlock()
8686

87-
zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")
87+
log.Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")
8888

8989
return &sub
9090
}
9191

9292
// Unsubscribe removes the given subscription from the dispatcher.
9393
// Note that the channel sub.Ch() provides is not closed in this event.
94-
func (d *Dispatcher) Unsubscribe(sub *Sub) {
94+
func (d *Dispatcher) Unsubscribe(log zerolog.Logger, sub *Sub) {
9595
if sub == nil {
9696
return
9797
}
@@ -101,7 +101,7 @@ func (d *Dispatcher) Unsubscribe(sub *Sub) {
101101
sz := len(d.subs)
102102
d.mx.Unlock()
103103

104-
zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
104+
log.Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
105105
}
106106

107107
// process gathers actions from the monitor and dispatches them to the corresponding subscriptions.

internal/pkg/api/handleArtifacts.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ func (at ArtifactT) fetchArtifact(ctx context.Context, zlog zerolog.Logger, iden
213213
span, ctx := apm.StartSpan(ctx, "fetchArtifact", "search")
214214
defer span.End()
215215
// Throttle prevents more than N outstanding requests to elastic globally and per sha2.
216-
if token := at.esThrottle.Acquire(sha2, defaultThrottleTTL); token == nil {
216+
if token := at.esThrottle.Acquire(zlog, sha2, defaultThrottleTTL); token == nil {
217217
return nil, ErrorThrottle
218218
} else {
219-
defer token.Release()
219+
defer token.Release(zlog)
220220
}
221221

222222
start := time.Now()

internal/pkg/api/handleCheckin.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
277277
}
278278

279279
// Subscribe to actions dispatcher
280-
aSub := ct.ad.Subscribe(agent.Id, seqno)
281-
defer ct.ad.Unsubscribe(aSub)
280+
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
281+
defer ct.ad.Unsubscribe(zlog, aSub)
282282
actCh := aSub.Ch()
283283

284284
// use revision_idx=0 if the agent has a single output where no API key is defined

internal/pkg/api/handleFileDelivery.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@
55
package api
66

77
import (
8-
"context"
98
"errors"
109
"net/http"
1110
"strconv"
1211

12+
"github.com/rs/zerolog"
13+
1314
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
1415
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
1516
"github.com/elastic/fleet-server/v7/internal/pkg/config"
1617
"github.com/elastic/fleet-server/v7/internal/pkg/file/delivery"
1718
"github.com/elastic/fleet-server/v7/internal/pkg/model"
1819
"github.com/elastic/go-elasticsearch/v8"
19-
"github.com/rs/zerolog"
2020
)
2121

2222
type FileDeliveryT struct {
@@ -28,11 +28,6 @@ type FileDeliveryT struct {
2828
}
2929

3030
func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT {
31-
zerolog.Ctx(context.TODO()).Info().
32-
Interface("limits", cfg.Limits.ArtifactLimit).
33-
Int64("maxFileSize", maxFileSize).
34-
Msg("upload limits")
35-
3631
return &FileDeliveryT{
3732
chunkClient: chunkClient,
3833
bulker: bulker,

internal/pkg/api/handleUpload.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616
"strings"
1717
"time"
1818

19+
"github.com/rs/zerolog"
20+
"go.elastic.co/apm/v2"
21+
1922
"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
2023
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
2124
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
@@ -25,8 +28,6 @@ import (
2528
"github.com/elastic/fleet-server/v7/internal/pkg/file/uploader"
2629
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2730
"github.com/elastic/go-elasticsearch/v8"
28-
"github.com/rs/zerolog"
29-
"go.elastic.co/apm/v2"
3031
)
3132

3233
const (
@@ -53,11 +54,6 @@ type UploadT struct {
5354
}
5455

5556
func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
56-
zerolog.Ctx(context.TODO()).Info().
57-
Interface("limits", cfg.Limits.ArtifactLimit).
58-
Int64("maxFileSize", maxFileSize).
59-
Msg("upload limits")
60-
6157
return &UploadT{
6258
chunkClient: chunkClient,
6359
bulker: bulker,

internal/pkg/api/metrics.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ import (
1010
"fmt"
1111
"sync"
1212

13-
"github.com/elastic/elastic-agent-libs/api"
14-
cfglib "github.com/elastic/elastic-agent-libs/config"
15-
"github.com/elastic/elastic-agent-libs/monitoring"
16-
"github.com/elastic/elastic-agent-system-metrics/report"
1713
"github.com/prometheus/client_golang/prometheus"
1814
"github.com/prometheus/client_golang/prometheus/promhttp"
1915
"github.com/rs/zerolog"
2016
apmprometheus "go.elastic.co/apm/module/apmprometheus/v2"
2117
"go.elastic.co/apm/v2"
2218

19+
"github.com/elastic/elastic-agent-libs/api"
20+
cfglib "github.com/elastic/elastic-agent-libs/config"
21+
"github.com/elastic/elastic-agent-libs/monitoring"
22+
"github.com/elastic/elastic-agent-system-metrics/report"
23+
2324
"github.com/elastic/fleet-server/v7/internal/pkg/build"
2425
"github.com/elastic/fleet-server/v7/internal/pkg/config"
2526
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
@@ -56,7 +57,7 @@ var (
5657
func init() {
5758
err := report.SetupMetrics(logger.NewZapStub("instance-metrics"), build.ServiceName, version.DefaultVersion)
5859
if err != nil {
59-
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics")
60+
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics") // TODO is used because this may logged during the package load
6061
}
6162

6263
registry = newMetricsRegistry("http_server")

internal/pkg/api/server.go

+22-20
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (s *server) Run(ctx context.Context) error {
6060
MaxHeaderBytes: mhbz,
6161
BaseContext: func(net.Listener) context.Context { return ctx },
6262
ErrorLog: errLogger(ctx),
63-
ConnState: diagConn,
63+
ConnState: getDiagConnFunc(ctx),
6464
}
6565

6666
var listenCfg net.ListenConfig
@@ -117,7 +117,7 @@ func (s *server) Run(ctx context.Context) error {
117117
}
118118
// Do a clean shutdown if the context is cancelled
119119
case <-ctx.Done():
120-
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain)
120+
sCtx, cancel := context.WithTimeout(context.Background(), s.cfg.Timeouts.Drain) // Background context to allow connections to drain when server context is cancelled.
121121
defer cancel()
122122
if err := srv.Shutdown(sCtx); err != nil {
123123
cErr := srv.Close() // force it closed
@@ -128,24 +128,26 @@ func (s *server) Run(ctx context.Context) error {
128128
return nil
129129
}
130130

131-
func diagConn(c net.Conn, s http.ConnState) {
132-
if c == nil {
133-
return
134-
}
131+
func getDiagConnFunc(ctx context.Context) func(c net.Conn, s http.ConnState) {
132+
return func(c net.Conn, s http.ConnState) {
133+
if c == nil {
134+
return
135+
}
135136

136-
zerolog.Ctx(context.TODO()).Trace().
137-
Str("local", c.LocalAddr().String()).
138-
Str("remote", c.RemoteAddr().String()).
139-
Str("state", s.String()).
140-
Msg("connection state change")
141-
142-
switch s {
143-
case http.StateNew:
144-
cntHTTPNew.Inc()
145-
cntHTTPActive.Inc()
146-
case http.StateClosed:
147-
cntHTTPClose.Inc()
148-
cntHTTPActive.Dec()
137+
zerolog.Ctx(ctx).Trace().
138+
Str("local", c.LocalAddr().String()).
139+
Str("remote", c.RemoteAddr().String()).
140+
Str("state", s.String()).
141+
Msg("connection state change")
142+
143+
switch s {
144+
case http.StateNew:
145+
cntHTTPNew.Inc()
146+
cntHTTPActive.Inc()
147+
case http.StateClosed:
148+
cntHTTPClose.Inc()
149+
cntHTTPActive.Dec()
150+
}
149151
}
150152
}
151153

@@ -157,7 +159,7 @@ func wrapConnLimitter(ctx context.Context, ln net.Listener, cfg *config.Server)
157159
Int("hardConnLimit", hardLimit).
158160
Msg("server hard connection limiter installed")
159161

160-
ln = limit.Listener(ln, hardLimit)
162+
ln = limit.Listener(ln, hardLimit, zerolog.Ctx(ctx))
161163
} else {
162164
zerolog.Ctx(ctx).Info().Msg("server hard connection limiter disabled")
163165
}

internal/pkg/bulk/engine.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ import (
2121
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
2222
"github.com/elastic/go-ucfg"
2323

24-
"github.com/elastic/go-elasticsearch/v8"
25-
"github.com/elastic/go-elasticsearch/v8/esapi"
2624
"github.com/rs/zerolog"
2725
"go.elastic.co/apm/v2"
2826
"golang.org/x/sync/semaphore"
27+
28+
"github.com/elastic/go-elasticsearch/v8"
29+
"github.com/elastic/go-elasticsearch/v8/esapi"
2930
)
3031

3132
type APIKey = apikey.APIKey
@@ -164,7 +165,7 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou
164165
cancelFn()
165166
}
166167
}
167-
bulkCtx, bulkCancel := context.WithCancel(context.Background())
168+
bulkCtx, bulkCancel := context.WithCancel(context.Background()) // background context used to allow bulker to flush on exit, exits when config changes or primary bulker exits.
168169
es, err := b.createRemoteEsClient(bulkCtx, outputName, outputMap)
169170
if err != nil {
170171
defer bulkCancel()

0 commit comments

Comments
 (0)