|
@@ -4,13 +4,11 @@ import (
|
|
"bufio"
|
|
"bufio"
|
|
"bytes"
|
|
"bytes"
|
|
"context"
|
|
"context"
|
|
- "errors"
|
|
|
|
"fmt"
|
|
"fmt"
|
|
"io"
|
|
"io"
|
|
"os"
|
|
"os"
|
|
"path/filepath"
|
|
"path/filepath"
|
|
"strings"
|
|
"strings"
|
|
- "sync"
|
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
"testing"
|
|
"testing"
|
|
"text/tabwriter"
|
|
"text/tabwriter"
|
|
@@ -24,8 +22,9 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
type testDecoder struct {
|
|
type testDecoder struct {
|
|
- rdr io.Reader
|
|
|
|
- scanner *bufio.Scanner
|
|
|
|
|
|
+ rdr io.Reader
|
|
|
|
+ scanner *bufio.Scanner
|
|
|
|
+ resetCount int
|
|
}
|
|
}
|
|
|
|
|
|
func (d *testDecoder) Decode() (*logger.Message, error) {
|
|
func (d *testDecoder) Decode() (*logger.Message, error) {
|
|
@@ -42,6 +41,7 @@ func (d *testDecoder) Decode() (*logger.Message, error) {
|
|
func (d *testDecoder) Reset(rdr io.Reader) {
|
|
func (d *testDecoder) Reset(rdr io.Reader) {
|
|
d.rdr = rdr
|
|
d.rdr = rdr
|
|
d.scanner = bufio.NewScanner(rdr)
|
|
d.scanner = bufio.NewScanner(rdr)
|
|
|
|
+ d.resetCount++
|
|
}
|
|
}
|
|
|
|
|
|
func (d *testDecoder) Close() {
|
|
func (d *testDecoder) Close() {
|
|
@@ -246,77 +246,6 @@ func TestFollowLogsProducerGone(t *testing.T) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-type lineDecoder struct {
|
|
|
|
- r *bufio.Reader
|
|
|
|
- resetCount int
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (d *lineDecoder) Decode() (*logger.Message, error) {
|
|
|
|
- line, err := d.r.ReadString('\n')
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- m := logger.NewMessage()
|
|
|
|
- m.Line = []byte(line)
|
|
|
|
- return m, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (d *lineDecoder) Reset(r io.Reader) {
|
|
|
|
- d.r = bufio.NewReader(r)
|
|
|
|
- d.resetCount++
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (d *lineDecoder) Close() {
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func TestFollowLogsHandleDecodeErr(t *testing.T) {
|
|
|
|
- lw := logger.NewLogWatcher()
|
|
|
|
- defer lw.ConsumerGone()
|
|
|
|
-
|
|
|
|
- fw, err := os.CreateTemp("", t.Name())
|
|
|
|
- assert.NilError(t, err)
|
|
|
|
- defer os.Remove(fw.Name())
|
|
|
|
-
|
|
|
|
- fr, err := os.Open(fw.Name())
|
|
|
|
- assert.NilError(t, err)
|
|
|
|
-
|
|
|
|
- dec := &lineDecoder{}
|
|
|
|
- dec.Reset(fr)
|
|
|
|
-
|
|
|
|
- var since, until time.Time
|
|
|
|
- rotate := make(chan interface{})
|
|
|
|
- evict := make(chan interface{})
|
|
|
|
-
|
|
|
|
- var wg sync.WaitGroup
|
|
|
|
- wg.Add(1)
|
|
|
|
- go func() {
|
|
|
|
- defer wg.Done()
|
|
|
|
- followLogs(fr, lw, rotate, evict, dec, since, until)
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- sendReceive := func(f io.Writer, message string) {
|
|
|
|
- _, err = f.Write([]byte(message))
|
|
|
|
- assert.NilError(t, err)
|
|
|
|
- m := <-lw.Msg
|
|
|
|
- assert.Equal(t, message, string(m.Line))
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- sendReceive(fw, "log1\n")
|
|
|
|
- sendReceive(fw, "log2\n")
|
|
|
|
-
|
|
|
|
- ft, err := os.OpenFile(fw.Name(), os.O_WRONLY|os.O_TRUNC, 0600)
|
|
|
|
- assert.NilError(t, err)
|
|
|
|
-
|
|
|
|
- sendReceive(ft, "log3\n")
|
|
|
|
-
|
|
|
|
- evict <- errors.New("stop followLogs")
|
|
|
|
- wg.Wait()
|
|
|
|
-
|
|
|
|
- // followLogs calls Reset() in the beginning,
|
|
|
|
- // each 3 writes result Reset(), then handleDecodeErr() calles Reset().
|
|
|
|
- assert.Equal(t, 5, dec.resetCount)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func TestCheckCapacityAndRotate(t *testing.T) {
|
|
func TestCheckCapacityAndRotate(t *testing.T) {
|
|
dir, err := os.MkdirTemp("", t.Name())
|
|
dir, err := os.MkdirTemp("", t.Name())
|
|
assert.NilError(t, err)
|
|
assert.NilError(t, err)
|