Skip to content

Commit 485fad9

Browse files
authored
Fix file delivery order (#3283)
1 parent 5c244b3 commit 485fad9

File tree

3 files changed

+95
-0
lines changed

3 files changed

+95
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: fix file reassembly for large files
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; a word indicating the component this changeset affects.
22+
component:
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: 3283
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

internal/pkg/file/delivery/delivery.go

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"io"
13+
"sort"
1314

1415
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
1516
"github.com/elastic/fleet-server/v7/internal/pkg/file"
@@ -77,6 +78,9 @@ func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileI
7778
func (d *Deliverer) SendFile(ctx context.Context, zlog zerolog.Logger, w io.Writer, chunks []file.ChunkInfo, fileID string) error {
7879
span, ctx := apm.StartSpan(ctx, "response", "write")
7980
defer span.End()
81+
sort.SliceStable(chunks, func(i, j int) bool {
82+
return chunks[i].Pos < chunks[j].Pos
83+
})
8084
for _, chunkInfo := range chunks {
8185
body, err := readChunkStream(ctx, d.client, chunkInfo.Index, chunkInfo.ID)
8286
if err != nil {

internal/pkg/file/delivery/delivery_test.go

+59
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"fmt"
1313
"io"
1414
"net/http"
15+
"strconv"
1516
"strings"
1617
"testing"
1718

@@ -252,6 +253,64 @@ func TestSendFileMultipleChunksUsesBackingIndex(t *testing.T) {
252253
require.NoError(t, err)
253254
}
254255

256+
func TestSendFileHandlesDisorderedChunks(t *testing.T) {
257+
buf := bytes.NewBuffer(nil)
258+
259+
fakeBulk := itesting.NewMockBulk()
260+
esClient, esMock := mockESClient(t)
261+
262+
const fileID = "xyz"
263+
idx := fmt.Sprintf(FileDataIndexPattern, "endpoint") + "-0001"
264+
sampleDocBody := hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD")
265+
266+
chunks := []file.ChunkInfo{
267+
{Index: idx, ID: fileID + ".20", Pos: 20},
268+
{Index: idx, ID: fileID + ".21", Pos: 21},
269+
{Index: idx, ID: fileID + ".22", Pos: 22},
270+
{Index: idx, ID: fileID + ".9", Pos: 9},
271+
{Index: idx, ID: fileID + ".10", Pos: 10},
272+
{Index: idx, ID: fileID + ".11", Pos: 11},
273+
{Index: idx, ID: fileID + ".12", Pos: 12},
274+
{Index: idx, ID: fileID + ".13", Pos: 13},
275+
{Index: idx, ID: fileID + ".14", Pos: 14},
276+
{Index: idx, ID: fileID + ".15", Pos: 15},
277+
{Index: idx, ID: fileID + ".16", Pos: 16},
278+
{Index: idx, ID: fileID + ".17", Pos: 17},
279+
{Index: idx, ID: fileID + ".18", Pos: 18},
280+
{Index: idx, ID: fileID + ".19", Pos: 19},
281+
{Index: idx, ID: fileID + ".0", Pos: 0},
282+
{Index: idx, ID: fileID + ".1", Pos: 1},
283+
{Index: idx, ID: fileID + ".2", Pos: 2},
284+
{Index: idx, ID: fileID + ".3", Pos: 3},
285+
{Index: idx, ID: fileID + ".4", Pos: 4},
286+
{Index: idx, ID: fileID + ".5", Pos: 5},
287+
{Index: idx, ID: fileID + ".6", Pos: 6},
288+
{Index: idx, ID: fileID + ".7", Pos: 7},
289+
{Index: idx, ID: fileID + ".8", Pos: 8},
290+
}
291+
292+
expectedIdx := 0
293+
294+
esMock.RoundTripFn = func(req *http.Request) (*http.Response, error) {
295+
296+
// Parse out the chunk number requested
297+
parts := strings.Split(req.URL.Path, "/") // ["", ".fleet-filedelivery-data-endpoint-0001", "_doc", "xyz.1"]
298+
docIdx := strings.TrimPrefix(parts[3], fileID+".")
299+
docnum, err := strconv.Atoi(docIdx)
300+
require.NoError(t, err)
301+
302+
// should be our expected increasing counter
303+
assert.Equal(t, expectedIdx, docnum)
304+
expectedIdx += 1
305+
306+
return sendBodyBytes(sampleDocBody), nil
307+
}
308+
309+
d := New(esClient, fakeBulk, -1)
310+
err := d.SendFile(context.Background(), zerolog.Logger{}, buf, chunks, fileID)
311+
require.NoError(t, err)
312+
}
313+
255314
/*
256315
Setup to convert a *elasticsearch.Client as a harmless mock
257316
by replacing the Transport to nowhere

0 commit comments

Comments
 (0)