Skip to content

Commit 23d265a

Browse files
tgummerer and Luap99 committed
fix race conditions in tail library
This fixes a couple of race conditions in the tail library, combining the ideas of nxadm/tail#70 and nxadm/tail#71. - Currently, when StopAtEOF is called and we have already encountered an EOF, we stop reading the file immediately. However, when tailing a file, new data might have become available in the meantime, before StopAtEOF is called, and the watcher might not have notified us about it yet. Instead of exiting immediately in that case and leaving data that is already in the file unread, continue iterating until we get the next EOF, as we can be reasonably sure that that is the EOF the user meant to stop at; this makes sure we read all the data that has been written by the time StopAtEOF is called. - When StopAtEOF() is called, the code should continue to send all lines to the Lines channel. The issue here is that if the caller is not ready to receive a new line, the code blocks, as it is using an unbuffered channel. However, <-tail.Dying() would return in this case, so the line was skipped. This means that the caller did not get all lines until EOF. We still want to skip in case any other reason for the kill was given, so add special logic to ignore the Dying channel only in the EOF case. The one downside is that StopAtEOF() could block forever if the caller never reads new Lines, but this seems logical to me: if the caller wants to wait for EOF but never reads the remaining Lines, that would be a bug on their end. Co-authored-by: Paul Holzinger <pholzing@redhat.com>
1 parent 258035f commit 23d265a

File tree

2 files changed

+74
-14
lines changed

2 files changed

+74
-14
lines changed

sdk/go/common/tail/tail.go

+27-1
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ func (tail *Tail) tailFileSync() {
291291

292292
tail.openReader()
293293

294+
stopOnNextEOF := false
294295
// Read line by line.
295296
for {
296297
// do not seek in named pipes
@@ -339,11 +340,24 @@ func (tail *Tail) tailFileSync() {
339340
}
340341
}
341342

343+
if stopOnNextEOF {
344+
return
345+
}
346+
342347
// When EOF is reached, wait for more data to become
343348
// available. Wait strategy is based on the `tail.watcher`
344349
// implementation (inotify or polling).
345350
err := tail.waitForChanges()
346351
if err != nil {
352+
// When StopAtEOF() is called, we
353+
// might have more data to read, that
354+
// the filewatcher might not have
355+
// notified us about. Continue
356+
// reading until we find an EOF
357+
// again, and then exit.
358+
if err == ErrStop && tail.Err() == errStopAtEOF {
359+
stopOnNextEOF = true
360+
}
347361
if err != ErrStop {
348362
tail.Kill(err)
349363
}
@@ -448,12 +462,24 @@ func (tail *Tail) sendLine(line string) bool {
448462
lines = util.PartitionString(line, tail.MaxLineSize)
449463
}
450464

465+
// This is a bit weird here: when a user requests StopAtEOF we
466+
// must keep sending all lines however <-tail.Dying() will return
467+
// immediately at this point so the select below may not have a
468+
// chance to send the line if the reader side is not yet ready.
469+
// But if StopAtEOF was not set and it is a "normal" Kill then we
470+
// should exit right away still thus the special logic here.
471+
earlyExitChan := tail.Dying()
472+
if tail.Err() == errStopAtEOF {
473+
// Note that receive from a nil channel blocks forever so
474+
// below we know it can only take the tail.Lines case.
475+
earlyExitChan = nil
476+
}
451477
for _, line := range lines {
452478
tail.lineNum++
453479
offset, _ := tail.Tell()
454480
select {
455481
case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
456-
case <-tail.Dying():
482+
case <-earlyExitChan:
457483
return true
458484
}
459485
}

sdk/go/common/tail/tail_test.go

+47-13
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestWaitsForFileToExistRelativePath(t *testing.T) {
9696
go tailTest.VerifyTailOutput(tail, []string{"hello", "world"}, false)
9797

9898
<-time.After(100 * time.Millisecond)
99-
if err := ioutil.WriteFile("test.txt", []byte("hello\nworld\n"), 0600); err != nil {
99+
if err := ioutil.WriteFile("test.txt", []byte("hello\nworld\n"), 0o600); err != nil {
100100
tailTest.Fatal(err)
101101
}
102102
tailTest.Cleanup(tail, true)
@@ -313,7 +313,8 @@ func TestReSeekWithCursor(t *testing.T) {
313313
Config{Follow: true, ReOpen: false, Poll: false})
314314

315315
go tailTest.VerifyTailOutputUsingCursor(tail, []string{
316-
"a really long string goes here", "hello", "world", "but", "not", "me"}, false)
316+
"a really long string goes here", "hello", "world", "but", "not", "me",
317+
}, false)
317318

318319
// truncate now
319320
<-time.After(100 * time.Millisecond)
@@ -336,7 +337,8 @@ func TestRateLimiting(t *testing.T) {
336337
tailTest.CreateFile("test.txt", "hello\nworld\nagain\nextra\n")
337338
config := Config{
338339
Follow: true,
339-
RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second)}
340+
RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second),
341+
}
340342
leakybucketFull := "Too much log activity; waiting a second before resuming tailing"
341343
tail := tailTest.StartTail("test.txt", config)
342344

@@ -345,7 +347,8 @@ func TestRateLimiting(t *testing.T) {
345347
"hello", "world", "again",
346348
leakybucketFull,
347349
"more", "data",
348-
leakybucketFull}, false)
350+
leakybucketFull,
351+
}, false)
349352

350353
// Add more data only after reasonable delay.
351354
<-time.After(1200 * time.Millisecond)
@@ -365,7 +368,8 @@ func TestTell(t *testing.T) {
365368
tailTest.CreateFile("test.txt", "hello\nworld\nagain\nmore\n")
366369
config := Config{
367370
Follow: false,
368-
Location: &SeekInfo{0, io.SeekStart}}
371+
Location: &SeekInfo{0, io.SeekStart},
372+
}
369373
tail := tailTest.StartTail("test.txt", config)
370374
// read one line
371375
line := <-tail.Lines
@@ -380,7 +384,8 @@ func TestTell(t *testing.T) {
380384

381385
config = Config{
382386
Follow: false,
383-
Location: &SeekInfo{offset, io.SeekStart}}
387+
Location: &SeekInfo{offset, io.SeekStart},
388+
}
384389
tail = tailTest.StartTail("test.txt", config)
385390
for l := range tail.Lines {
386391
// it may have read one line in the chan(tail.Lines),
@@ -422,6 +427,26 @@ func TestBlockUntilExists(t *testing.T) {
422427
tail.Cleanup()
423428
}
424429

430+
func TestFollowUntilEof(t *testing.T) {
431+
tailTest, cleanup := NewTailTest("incomplete-lines-no-follow", t)
432+
defer cleanup()
433+
filename := "test.txt"
434+
config := Config{
435+
Follow: false,
436+
}
437+
tailTest.CreateFile(filename, "hello\nworld\n")
438+
tail := tailTest.StartTail(filename, config)
439+
440+
// StopAtEOF blocks until the read is done and in order to do so
441+
// we have to drain the lines channel first which ReadLinesWithError does.
442+
go tail.StopAtEOF()
443+
tailTest.ReadLinesWithError(tail, []string{"hello", "world"}, false, errStopAtEOF)
444+
445+
tailTest.RemoveFile(filename)
446+
tail.Stop()
447+
tail.Cleanup()
448+
}
449+
425450
func maxLineSize(t *testing.T, follow bool, fileContent string, expected []string) {
426451
tailTest, cleanup := NewTailTest("maxlinesize", t)
427452
defer cleanup()
@@ -637,7 +662,8 @@ func reSeek(t *testing.T, poll bool) {
637662
Config{Follow: true, ReOpen: false, Poll: poll})
638663

639664
go tailTest.VerifyTailOutput(tail, []string{
640-
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"}, false)
665+
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld",
666+
}, false)
641667

642668
// truncate now
643669
<-time.After(100 * time.Millisecond)
@@ -677,14 +703,14 @@ func NewTailTest(name string, t *testing.T) (TailTest, func()) {
677703
}
678704

679705
func (t TailTest) CreateFile(name string, contents string) {
680-
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0600)
706+
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0o600)
681707
if err != nil {
682708
t.Fatal(err)
683709
}
684710
}
685711

686712
func (t TailTest) AppendToFile(name string, contents string) {
687-
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0600|os.ModeAppend)
713+
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0o600|os.ModeAppend)
688714
if err != nil {
689715
t.Fatal(err)
690716
}
@@ -707,7 +733,7 @@ func (t TailTest) RenameFile(oldname string, newname string) {
707733
}
708734

709735
func (t TailTest) AppendFile(name string, contents string) {
710-
f, err := os.OpenFile(t.path+"/"+name, os.O_APPEND|os.O_WRONLY, 0600)
736+
f, err := os.OpenFile(t.path+"/"+name, os.O_APPEND|os.O_WRONLY, 0o600)
711737
if err != nil {
712738
t.Fatal(err)
713739
}
@@ -719,7 +745,7 @@ func (t TailTest) AppendFile(name string, contents string) {
719745
}
720746

721747
func (t TailTest) TruncateFile(name string, contents string) {
722-
f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0600)
748+
f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0o600)
723749
if err != nil {
724750
t.Fatal(err)
725751
}
@@ -765,6 +791,14 @@ func (t TailTest) VerifyTailOutputUsingCursor(tail *Tail, lines []string, expect
765791
}
766792

767793
func (t TailTest) ReadLines(tail *Tail, lines []string, useCursor bool) {
794+
t.readLines(tail, lines, useCursor, nil)
795+
}
796+
797+
func (t TailTest) ReadLinesWithError(tail *Tail, lines []string, useCursor bool, err error) {
798+
t.readLines(tail, lines, useCursor, err)
799+
}
800+
801+
func (t TailTest) readLines(tail *Tail, lines []string, useCursor bool, expectedErr error) {
768802
cursor := 1
769803

770804
for _, line := range lines {
@@ -773,8 +807,8 @@ func (t TailTest) ReadLines(tail *Tail, lines []string, useCursor bool) {
773807
if !ok {
774808
// tail.Lines is closed and empty.
775809
err := tail.Err()
776-
if err != nil {
777-
t.Fatalf("tail ended with error: %v", err)
810+
if err != expectedErr {
811+
t.Fatalf("tail ended with unexpected error: %v", err)
778812
}
779813
t.Fatalf("tail ended early; expecting more: %v", lines[cursor:])
780814
}

0 commit comments

Comments
 (0)