123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package loggerutils
- import (
- "bufio"
- "context"
- "io"
- "io/ioutil"
- "os"
- "strings"
- "testing"
- "time"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/pkg/tailfile"
- "gotest.tools/v3/assert"
- )
- func TestTailFiles(t *testing.T) {
- s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
- s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
- s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
- files := []SizeReaderAt{s1, s2, s3}
- watcher := logger.NewLogWatcher()
- createDecoder := func(r io.Reader) func() (*logger.Message, error) {
- scanner := bufio.NewScanner(r)
- return func() (*logger.Message, error) {
- if !scanner.Scan() {
- return nil, scanner.Err()
- }
- // some comment
- return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
- }
- }
- tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
- return tailfile.NewTailReader(ctx, r, lines)
- }
- for desc, config := range map[string]logger.ReadConfig{} {
- t.Run(desc, func(t *testing.T) {
- started := make(chan struct{})
- go func() {
- close(started)
- tailFiles(files, watcher, createDecoder, tailReader, config)
- }()
- <-started
- })
- }
- config := logger.ReadConfig{Tail: 2}
- started := make(chan struct{})
- go func() {
- close(started)
- tailFiles(files, watcher, createDecoder, tailReader, config)
- }()
- <-started
- select {
- case <-time.After(60 * time.Second):
- t.Fatal("timeout waiting for tail line")
- case err := <-watcher.Err:
- assert.NilError(t, err)
- case msg := <-watcher.Msg:
- assert.Assert(t, msg != nil)
- assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
- }
- select {
- case <-time.After(60 * time.Second):
- t.Fatal("timeout waiting for tail line")
- case err := <-watcher.Err:
- assert.NilError(t, err)
- case msg := <-watcher.Msg:
- assert.Assert(t, msg != nil)
- assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
- }
- }
- func TestFollowLogsConsumerGone(t *testing.T) {
- lw := logger.NewLogWatcher()
- f, err := ioutil.TempFile("", t.Name())
- assert.NilError(t, err)
- defer func() {
- f.Close()
- os.Remove(f.Name())
- }()
- makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
- return func() (*logger.Message, error) {
- return &logger.Message{}, nil
- }
- }
- followLogsDone := make(chan struct{})
- var since, until time.Time
- go func() {
- followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
- close(followLogsDone)
- }()
- select {
- case <-lw.Msg:
- case err := <-lw.Err:
- assert.NilError(t, err)
- case <-followLogsDone:
- t.Fatal("follow logs finished unexpectedly")
- case <-time.After(10 * time.Second):
- t.Fatal("timeout waiting for log message")
- }
- lw.ConsumerGone()
- select {
- case <-followLogsDone:
- case <-time.After(20 * time.Second):
- t.Fatal("timeout waiting for followLogs() to finish")
- }
- }
- func TestFollowLogsProducerGone(t *testing.T) {
- lw := logger.NewLogWatcher()
- f, err := ioutil.TempFile("", t.Name())
- assert.NilError(t, err)
- defer os.Remove(f.Name())
- var sent, received, closed int
- makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
- return func() (*logger.Message, error) {
- if closed == 1 {
- closed++
- t.Logf("logDecode() closed after sending %d messages\n", sent)
- return nil, io.EOF
- } else if closed > 1 {
- t.Fatal("logDecode() called after closing!")
- return nil, io.EOF
- }
- sent++
- return &logger.Message{}, nil
- }
- }
- var since, until time.Time
- followLogsDone := make(chan struct{})
- go func() {
- followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
- close(followLogsDone)
- }()
- // read 1 message
- select {
- case <-lw.Msg:
- received++
- case err := <-lw.Err:
- assert.NilError(t, err)
- case <-followLogsDone:
- t.Fatal("followLogs() finished unexpectedly")
- case <-time.After(10 * time.Second):
- t.Fatal("timeout waiting for log message")
- }
- // "stop" the "container"
- closed = 1
- lw.ProducerGone()
- // should receive all the messages sent
- readDone := make(chan struct{})
- go func() {
- defer close(readDone)
- for {
- select {
- case <-lw.Msg:
- received++
- if received == sent {
- return
- }
- case err := <-lw.Err:
- assert.NilError(t, err)
- }
- }
- }()
- select {
- case <-readDone:
- case <-time.After(30 * time.Second):
- t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
- }
- t.Logf("messages sent: %d, received: %d", sent, received)
- // followLogs() should be done by now
- select {
- case <-followLogsDone:
- case <-time.After(30 * time.Second):
- t.Fatal("timeout waiting for followLogs() to finish")
- }
- select {
- case <-lw.WatchConsumerGone():
- t.Fatal("consumer should not have exited")
- default:
- }
- }
|