Skip to content

Commit 8284ce2

Browse files
authored
Refactoring HTTP downloader progress reporter to accept multiple observers (#3542)
* Refactoring HTTP downlader progress reporter to accept multiple observers * Improving names * Running mage fmt * Fixing conflicts * Rename variable * Renaming receivers * Better variable name * Add comment about Report callers needing to cancel context * Add optimization * Remove context and handle cancellation internally instead * More optimizations * Add back context * Make test more robust * Print logs if assertions fail
1 parent 82896f5 commit 8284ce2

File tree

5 files changed

+322
-204
lines changed

5 files changed

+322
-204
lines changed

internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go

+7-145
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@ import (
1717
"strings"
1818
"time"
1919

20-
"github.com/docker/go-units"
21-
22-
"github.com/elastic/elastic-agent-libs/atomic"
2320
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
2421

2522
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
2623
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download"
2724
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
25+
"github.com/elastic/elastic-agent/pkg/core/logger"
2826
)
2927

3028
const (
@@ -46,13 +44,13 @@ const (
4644

4745
// Downloader is a downloader able to fetch artifacts from elastic.co web page.
4846
type Downloader struct {
49-
log progressLogger
47+
log *logger.Logger
5048
config *artifact.Config
5149
client http.Client
5250
}
5351

5452
// NewDownloader creates and configures Elastic Downloader
55-
func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, error) {
53+
func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) {
5654
client, err := config.HTTPTransportSettings.Client(
5755
httpcommon.WithAPMHTTPInstrumentation(),
5856
httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second},
@@ -66,7 +64,7 @@ func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, er
6664
}
6765

6866
// NewDownloaderWithClient creates Elastic Downloader with specific client used
69-
func NewDownloaderWithClient(log progressLogger, config *artifact.Config, client http.Client) *Downloader {
67+
func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader {
7068
return &Downloader{
7169
log: log,
7270
config: config,
@@ -208,152 +206,16 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
208206
}
209207
}
210208

211-
reportCtx, reportCancel := context.WithCancel(ctx)
212-
dp := newDownloadProgressReporter(e.log, sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize)
213-
dp.Report(reportCtx)
209+
loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout)
210+
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver)
211+
dp.Report(ctx)
214212
_, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp))
215213
if err != nil {
216-
reportCancel()
217214
dp.ReportFailed(err)
218215
// return path, file already exists and needs to be cleaned up
219216
return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
220217
}
221-
reportCancel()
222218
dp.ReportComplete()
223219

224220
return fullPath, nil
225221
}
226-
227-
type downloadProgressReporter struct {
228-
log progressLogger
229-
sourceURI string
230-
interval time.Duration
231-
warnTimeout time.Duration
232-
length float64
233-
234-
downloaded atomic.Int
235-
started time.Time
236-
}
237-
238-
func newDownloadProgressReporter(log progressLogger, sourceURI string, timeout time.Duration, length int) *downloadProgressReporter {
239-
interval := time.Duration(float64(timeout) * downloadProgressIntervalPercentage)
240-
if interval == 0 {
241-
interval = downloadProgressMinInterval
242-
}
243-
244-
return &downloadProgressReporter{
245-
log: log,
246-
sourceURI: sourceURI,
247-
interval: interval,
248-
warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage),
249-
length: float64(length),
250-
}
251-
}
252-
253-
func (dp *downloadProgressReporter) Write(b []byte) (int, error) {
254-
n := len(b)
255-
dp.downloaded.Add(n)
256-
return n, nil
257-
}
258-
259-
func (dp *downloadProgressReporter) Report(ctx context.Context) {
260-
started := time.Now()
261-
dp.started = started
262-
sourceURI := dp.sourceURI
263-
log := dp.log
264-
length := dp.length
265-
warnTimeout := dp.warnTimeout
266-
interval := dp.interval
267-
268-
go func() {
269-
t := time.NewTicker(interval)
270-
defer t.Stop()
271-
for {
272-
select {
273-
case <-ctx.Done():
274-
return
275-
case <-t.C:
276-
now := time.Now()
277-
timePast := now.Sub(started)
278-
downloaded := float64(dp.downloaded.Load())
279-
bytesPerSecond := downloaded / float64(timePast/time.Second)
280-
281-
var msg string
282-
var args []interface{}
283-
if length > 0 {
284-
// length of the download is known, so more detail can be provided
285-
percentComplete := downloaded / length * 100.0
286-
msg = "download progress from %s is %s/%s (%.2f%% complete) @ %sps"
287-
args = []interface{}{
288-
sourceURI, units.HumanSize(downloaded), units.HumanSize(length), percentComplete, units.HumanSize(bytesPerSecond),
289-
}
290-
} else {
291-
// length unknown so provide the amount downloaded and the speed
292-
msg = "download progress from %s has fetched %s @ %sps"
293-
args = []interface{}{
294-
sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond),
295-
}
296-
}
297-
298-
log.Infof(msg, args...)
299-
if timePast >= warnTimeout {
300-
// duplicate to warn when over the warnTimeout; this still has it logging to info that way if
301-
// they are filtering the logs to info they still see the messages when over the warnTimeout, but
302-
// when filtering only by warn they see these messages only
303-
log.Warnf(msg, args...)
304-
}
305-
}
306-
}
307-
}()
308-
}
309-
310-
func (dp *downloadProgressReporter) ReportComplete() {
311-
now := time.Now()
312-
timePast := now.Sub(dp.started)
313-
downloaded := float64(dp.downloaded.Load())
314-
bytesPerSecond := downloaded / float64(timePast/time.Second)
315-
msg := "download from %s completed in %s @ %sps"
316-
args := []interface{}{
317-
dp.sourceURI, units.HumanDuration(timePast), units.HumanSize(bytesPerSecond),
318-
}
319-
dp.log.Infof(msg, args...)
320-
if timePast >= dp.warnTimeout {
321-
// see reason in `Report`
322-
dp.log.Warnf(msg, args...)
323-
}
324-
}
325-
326-
func (dp *downloadProgressReporter) ReportFailed(err error) {
327-
now := time.Now()
328-
timePast := now.Sub(dp.started)
329-
downloaded := float64(dp.downloaded.Load())
330-
bytesPerSecond := downloaded / float64(timePast/time.Second)
331-
var msg string
332-
var args []interface{}
333-
if dp.length > 0 {
334-
// length of the download is known, so more detail can be provided
335-
percentComplete := downloaded / dp.length * 100.0
336-
msg = "download from %s failed at %s/%s (%.2f%% complete) @ %sps: %s"
337-
args = []interface{}{
338-
dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(dp.length), percentComplete, units.HumanSize(bytesPerSecond), err,
339-
}
340-
} else {
341-
// length unknown so provide the amount downloaded and the speed
342-
msg = "download from %s failed at %s @ %sps: %s"
343-
args = []interface{}{
344-
dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond), err,
345-
}
346-
}
347-
dp.log.Infof(msg, args...)
348-
if timePast >= dp.warnTimeout {
349-
// see reason in `Report`
350-
dp.log.Warnf(msg, args...)
351-
}
352-
}
353-
354-
// progressLogger is a logger that only needs to implement Infof and Warnf, as those are the only functions
355-
// that the downloadProgressReporter uses.
356-
type progressLogger interface {
357-
Infof(format string, args ...interface{})
358-
Warnf(format string, args ...interface{})
359-
}

0 commit comments

Comments
 (0)