Browse Source

Add func to get an io.Reader for tail operations

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 7 years ago
parent
commit
874867d4e5
2 changed files with 378 additions and 43 deletions
  1. 192 36
      pkg/tailfile/tailfile.go
  2. 186 7
      pkg/tailfile/tailfile_test.go

+ 192 - 36
pkg/tailfile/tailfile.go

@@ -3,7 +3,9 @@
 package tailfile // import "github.com/docker/docker/pkg/tailfile"
 
 import (
+	"bufio"
 	"bytes"
+	"context"
 	"errors"
 	"io"
 	"os"
@@ -16,51 +18,205 @@ var eol = []byte("\n")
 // ErrNonPositiveLinesNumber is an error returned if the lines number was negative.
 var ErrNonPositiveLinesNumber = errors.New("The number of lines to extract from the file must be positive")
 
-//TailFile returns last n lines of reader f (could be a nil).
-func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
-	if n <= 0 {
-		return nil, ErrNonPositiveLinesNumber
+//TailFile returns last n lines of the passed in file.
+func TailFile(f *os.File, n int) ([][]byte, error) {
+	size, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return nil, err
 	}
-	size, err := f.Seek(0, os.SEEK_END)
+
+	rAt := io.NewSectionReader(f, 0, size)
+	r, nLines, err := NewTailReader(context.Background(), rAt, n)
 	if err != nil {
 		return nil, err
 	}
-	block := -1
-	var data []byte
-	var cnt int
+
+	buf := make([][]byte, 0, nLines)
+	scanner := bufio.NewScanner(r)
+
+	for scanner.Scan() {
+		buf = append(buf, scanner.Bytes())
+	}
+	return buf, nil
+}
+
+// SizeReaderAt is an interface used to get a ReaderAt as well as the size of the underlying reader.
+// Note that the size of the underlying reader should not change when using this interface.
+type SizeReaderAt interface {
+	io.ReaderAt
+	Size() int64
+}
+
+// NewTailReader scopes the passed in reader to just the last N lines passed in
+func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader, int, error) {
+	return NewTailReaderWithDelimiter(ctx, r, reqLines, eol)
+}
+
+// NewTailReaderWithDelimiter scopes the passed in reader to just the last N lines passed in
+// In this case a "line" is defined by the passed in delimiter.
+//
+// Delimiter lengths should be generally small, no more than 12 bytes
+func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (io.Reader, int, error) {
+	if reqLines < 1 {
+		return nil, 0, ErrNonPositiveLinesNumber
+	}
+	if len(delimiter) == 0 {
+		return nil, 0, errors.New("must provide a delimiter")
+	}
+	var (
+		size      = r.Size()
+		tailStart int64
+		tailEnd   = size
+		found     int
+	)
+
+	if int64(len(delimiter)) >= size {
+		return bytes.NewReader(nil), 0, nil
+	}
+
+	scanner := newScanner(r, delimiter)
+	for scanner.Scan(ctx) {
+		if err := scanner.Err(); err != nil {
+			return nil, 0, scanner.Err()
+		}
+
+		found++
+		if found == 1 {
+			tailEnd = scanner.End()
+		}
+		if found == reqLines {
+			break
+		}
+	}
+
+	tailStart = scanner.Start(ctx)
+
+	if found == 0 {
+		return bytes.NewReader(nil), 0, nil
+	}
+
+	if found < reqLines && tailStart != 0 {
+		tailStart = 0
+	}
+	return io.NewSectionReader(r, tailStart, tailEnd-tailStart), found, nil
+}
+
+func newScanner(r SizeReaderAt, delim []byte) *scanner {
+	size := r.Size()
+	readSize := blockSize
+	if readSize > int(size) {
+		readSize = int(size)
+	}
+	// silly case...
+	if len(delim) >= readSize/2 {
+		readSize = len(delim)*2 + 2
+	}
+
+	return &scanner{
+		r:     r,
+		pos:   size,
+		buf:   make([]byte, readSize),
+		delim: delim,
+	}
+}
+
+type scanner struct {
+	r     SizeReaderAt
+	pos   int64
+	buf   []byte
+	delim []byte
+	err   error
+	idx   int
+	done  bool
+}
+
+func (s *scanner) Start(ctx context.Context) int64 {
+	if s.idx > 0 {
+		idx := bytes.LastIndex(s.buf[:s.idx], s.delim)
+		if idx >= 0 {
+			return s.pos + int64(idx) + int64(len(s.delim))
+		}
+	}
+
+	// slow path
+	buf := make([]byte, len(s.buf))
+	copy(buf, s.buf)
+
+	readAhead := &scanner{
+		r:     s.r,
+		pos:   s.pos,
+		delim: s.delim,
+		idx:   s.idx,
+		buf:   buf,
+	}
+
+	if !readAhead.Scan(ctx) {
+		return 0
+	}
+	return readAhead.End()
+}
+
+func (s *scanner) End() int64 {
+	return s.pos + int64(s.idx) + int64(len(s.delim))
+}
+
+func (s *scanner) Err() error {
+	return s.err
+}
+
+func (s *scanner) Scan(ctx context.Context) bool {
+	if s.err != nil {
+		return false
+	}
+
 	for {
-		var b []byte
-		step := int64(block * blockSize)
-		left := size + step // how many bytes to beginning
-		if left < 0 {
-			if _, err := f.Seek(0, os.SEEK_SET); err != nil {
-				return nil, err
-			}
-			b = make([]byte, blockSize+left)
-			if _, err := f.Read(b); err != nil {
-				return nil, err
+		select {
+		case <-ctx.Done():
+			s.err = ctx.Err()
+			return false
+		default:
+		}
+
+		idx := s.idx - len(s.delim)
+		if idx < 0 {
+			readSize := int(s.pos)
+			if readSize > len(s.buf) {
+				readSize = len(s.buf)
 			}
-			data = append(b, data...)
-			break
-		} else {
-			b = make([]byte, blockSize)
-			if _, err := f.Seek(left, os.SEEK_SET); err != nil {
-				return nil, err
+
+			if readSize < len(s.delim) {
+				return false
 			}
-			if _, err := f.Read(b); err != nil {
-				return nil, err
+
+			offset := s.pos - int64(readSize)
+			n, err := s.r.ReadAt(s.buf[:readSize], offset)
+			if err != nil && err != io.EOF {
+				s.err = err
+				return false
 			}
-			data = append(b, data...)
+
+			s.pos -= int64(n)
+			idx = n
 		}
-		cnt += bytes.Count(b, eol)
-		if cnt > n {
-			break
+
+		s.idx = bytes.LastIndex(s.buf[:idx], s.delim)
+		if s.idx >= 0 {
+			return true
 		}
-		block--
-	}
-	lines := bytes.Split(data, eol)
-	if n < len(lines) {
-		return lines[len(lines)-n-1 : len(lines)-1], nil
+
+		if len(s.delim) > 1 && s.pos > 0 {
+			// in this case, there may be a partial delimiter at the front of the buffer, so set the position forward
+			// up to the maximum size partial that could be there so it can be read again in the next iteration with any
+			// potential remainder.
+			// An example where delimiter is `####`:
+			// [##asdfqwerty]
+			//    ^
+			// This resets the position to where the arrow is pointing.
+			// It could actually check if a partial exists and at the front, but that is pretty similar to the indexing
+			// code above though a bit more complex since each byte has to be checked (`len(delimiter)-1`) factorial).
+			// It's much simpler and cleaner to just re-read `len(delimiter)-1` bytes again.
+			s.pos += int64(len(s.delim)) - 1
+		}
+
 	}
-	return lines[:len(lines)-1], nil
 }

+ 186 - 7
pkg/tailfile/tailfile_test.go

@@ -1,9 +1,17 @@
 package tailfile // import "github.com/docker/docker/pkg/tailfile"
 
 import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
+	"strings"
 	"testing"
+
+	"gotest.tools/assert"
 )
 
 func TestTailFile(t *testing.T) {
@@ -42,7 +50,7 @@ truncated line`)
 	if _, err := f.Write(testFile); err != nil {
 		t.Fatal(err)
 	}
-	if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+	if _, err := f.Seek(0, io.SeekStart); err != nil {
 		t.Fatal(err)
 	}
 	expected := []string{"last fourth line", "last fifth line"}
@@ -50,10 +58,12 @@ truncated line`)
 	if err != nil {
 		t.Fatal(err)
 	}
+	if len(res) != len(expected) {
+		t.Fatalf("\nexpected:\n%s\n\nactual:\n%s", expected, res)
+	}
 	for i, l := range res {
-		t.Logf("%s", l)
 		if expected[i] != string(l) {
-			t.Fatalf("Expected line %s, got %s", expected[i], l)
+			t.Fatalf("Expected line %q, got %q", expected[i], l)
 		}
 	}
 }
@@ -71,7 +81,7 @@ truncated line`)
 	if _, err := f.Write(testFile); err != nil {
 		t.Fatal(err)
 	}
-	if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+	if _, err := f.Seek(0, io.SeekStart); err != nil {
 		t.Fatal(err)
 	}
 	expected := []string{"first line", "second line"}
@@ -79,8 +89,10 @@ truncated line`)
 	if err != nil {
 		t.Fatal(err)
 	}
+	if len(expected) != len(res) {
+		t.Fatalf("\nexpected:\n%s\n\nactual:\n%s", expected, res)
+	}
 	for i, l := range res {
-		t.Logf("%s", l)
 		if expected[i] != string(l) {
 			t.Fatalf("Expected line %s, got %s", expected[i], l)
 		}
@@ -116,11 +128,11 @@ truncated line`)
 	if _, err := f.Write(testFile); err != nil {
 		t.Fatal(err)
 	}
-	if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+	if _, err := f.Seek(0, io.SeekStart); err != nil {
 		t.Fatal(err)
 	}
 	if _, err := TailFile(f, -1); err != ErrNonPositiveLinesNumber {
-		t.Fatalf("Expected ErrNonPositiveLinesNumber, got %s", err)
+		t.Fatalf("Expected ErrNonPositiveLinesNumber, got %v", err)
 	}
 	if _, err := TailFile(f, 0); err != ErrNonPositiveLinesNumber {
 		t.Fatalf("Expected ErrNonPositiveLinesNumber, got %s", err)
@@ -146,3 +158,170 @@ func BenchmarkTail(b *testing.B) {
 		}
 	}
 }
+
+func TestNewTailReader(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+
+	for dName, delim := range map[string][]byte{
+		"no delimiter":          {},
+		"single byte delimiter": {'\n'},
+		"2 byte delimiter":      []byte(";\n"),
+		"4 byte delimiter":      []byte("####"),
+		"8 byte delimiter":      []byte("########"),
+		"12 byte delimiter":     []byte("############"),
+	} {
+		t.Run(dName, func(t *testing.T) {
+			delim := delim
+			t.Parallel()
+
+			s1 := "Hello world."
+			s2 := "Today is a fine day."
+			s3 := "So long, and thanks for all the fish!"
+			s4 := strings.Repeat("a", blockSize/2) // same as block size
+			s5 := strings.Repeat("a", blockSize)   // just to make sure
+			s6 := strings.Repeat("a", blockSize*2) // bigger than block size
+			s7 := strings.Repeat("a", blockSize-1) // single line same as block
+
+			s8 := `{"log":"Don't panic!\n","stream":"stdout","time":"2018-04-04T20:28:44.7207062Z"}`
+			jsonTest := make([]string, 0, 20)
+			for i := 0; i < 20; i++ {
+				jsonTest = append(jsonTest, s8)
+			}
+
+			for _, test := range []struct {
+				desc string
+				data []string
+			}{
+				{desc: "one small entry", data: []string{s1}},
+				{desc: "several small entries", data: []string{s1, s2, s3}},
+				{desc: "various sizes", data: []string{s1, s2, s3, s4, s5, s1, s2, s3, s7, s6}},
+				{desc: "multiple lines with one more than block", data: []string{s5, s5, s5, s5, s5}},
+				{desc: "multiple lines much bigger than block", data: []string{s6, s6, s6, s6, s6}},
+				{desc: "multiple lines same as block", data: []string{s4, s4, s4, s4, s4}},
+				{desc: "single line same as block", data: []string{s7}},
+				{desc: "single line half block", data: []string{s4}},
+				{desc: "single line twice block", data: []string{s6}},
+				{desc: "json encoded values", data: jsonTest},
+				{desc: "no lines", data: []string{}},
+				{desc: "same length as delimiter", data: []string{strings.Repeat("a", len(delim))}},
+			} {
+				t.Run(test.desc, func(t *testing.T) {
+					test := test
+					t.Parallel()
+
+					max := len(test.data)
+					if max > 10 {
+						max = 10
+					}
+
+					s := strings.Join(test.data, string(delim))
+					if len(test.data) > 0 {
+						s += string(delim)
+					}
+
+					for i := 1; i <= max; i++ {
+						t.Run(fmt.Sprintf("%d lines", i), func(t *testing.T) {
+							i := i
+							t.Parallel()
+
+							r := strings.NewReader(s)
+							tr, lines, err := NewTailReaderWithDelimiter(ctx, r, i, delim)
+							if len(delim) == 0 {
+								assert.Assert(t, err != nil)
+								assert.Assert(t, lines == 0)
+								return
+							}
+							assert.Assert(t, err)
+							assert.Check(t, lines == i, "%d -- %d", lines, i)
+
+							b, err := ioutil.ReadAll(tr)
+							assert.Assert(t, err)
+
+							expectLines := test.data[len(test.data)-i:]
+							assert.Check(t, len(expectLines) == i)
+							expect := strings.Join(expectLines, string(delim)) + string(delim)
+							assert.Check(t, string(b) == expect, "\n%v\n%v", b, []byte(expect))
+						})
+					}
+
+					t.Run("request more lines than available", func(t *testing.T) {
+						t.Parallel()
+
+						r := strings.NewReader(s)
+						tr, lines, err := NewTailReaderWithDelimiter(ctx, r, len(test.data)*2, delim)
+						if len(delim) == 0 {
+							assert.Assert(t, err != nil)
+							assert.Assert(t, lines == 0)
+							return
+						}
+						if len(test.data) == 0 {
+							assert.Assert(t, err == ErrNonPositiveLinesNumber, err)
+							return
+						}
+
+						assert.Assert(t, err)
+						assert.Check(t, lines == len(test.data), "%d -- %d", lines, len(test.data))
+						b, err := ioutil.ReadAll(tr)
+						assert.Assert(t, err)
+						assert.Check(t, bytes.Equal(b, []byte(s)), "\n%v\n%v", b, []byte(s))
+					})
+				})
+			}
+		})
+	}
+	t.Run("truncated last line", func(t *testing.T) {
+		t.Run("more than available", func(t *testing.T) {
+			tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 3)
+			assert.Assert(t, err)
+			assert.Check(t, nLines == 2, nLines)
+
+			rdr := bufio.NewReader(tail)
+			data, _, err := rdr.ReadLine()
+			assert.Assert(t, err)
+			assert.Check(t, string(data) == "a", string(data))
+
+			data, _, err = rdr.ReadLine()
+			assert.Assert(t, err)
+			assert.Check(t, string(data) == "b", string(data))
+
+			_, _, err = rdr.ReadLine()
+			assert.Assert(t, err == io.EOF, err)
+		})
+	})
+	t.Run("truncated last line", func(t *testing.T) {
+		t.Run("exact", func(t *testing.T) {
+			tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 2)
+			assert.Assert(t, err)
+			assert.Check(t, nLines == 2, nLines)
+
+			rdr := bufio.NewReader(tail)
+			data, _, err := rdr.ReadLine()
+			assert.Assert(t, err)
+			assert.Check(t, string(data) == "a", string(data))
+
+			data, _, err = rdr.ReadLine()
+			assert.Assert(t, err)
+			assert.Check(t, string(data) == "b", string(data))
+
+			_, _, err = rdr.ReadLine()
+			assert.Assert(t, err == io.EOF, err)
+		})
+	})
+
+	t.Run("truncated last line", func(t *testing.T) {
+		t.Run("one line", func(t *testing.T) {
+			tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 1)
+			assert.Assert(t, err)
+			assert.Check(t, nLines == 1, nLines)
+
+			rdr := bufio.NewReader(tail)
+			data, _, err := rdr.ReadLine()
+			assert.Assert(t, err)
+			assert.Check(t, string(data) == "b", string(data))
+
+			_, _, err = rdr.ReadLine()
+			assert.Assert(t, err == io.EOF, err)
+		})
+	})
+}