|
@@ -2,6 +2,7 @@ package logger
|
|
|
|
|
|
import (
|
|
import (
|
|
"bufio"
|
|
"bufio"
|
|
|
|
+ "bytes"
|
|
"io"
|
|
"io"
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
@@ -40,14 +41,27 @@ func (c *Copier) Run() {
|
|
|
|
|
|
func (c *Copier) copySrc(name string, src io.Reader) {
|
|
func (c *Copier) copySrc(name string, src io.Reader) {
|
|
defer c.copyJobs.Done()
|
|
defer c.copyJobs.Done()
|
|
- scanner := bufio.NewScanner(src)
|
|
|
|
- for scanner.Scan() {
|
|
|
|
- if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil {
|
|
|
|
- logrus.Errorf("Failed to log msg %q for logger %s: %s", scanner.Bytes(), c.dst.Name(), err)
|
|
|
|
|
|
+ reader := bufio.NewReader(src)
|
|
|
|
+
|
|
|
|
+ for {
|
|
|
|
+ line, err := reader.ReadBytes('\n')
|
|
|
|
+ line = bytes.TrimSuffix(line, []byte{'\n'})
|
|
|
|
+
|
|
|
|
+ // ReadBytes can return full or partial output even when it failed.
|
|
|
|
+ // e.g. it can return a full entry and EOF.
|
|
|
|
+ if err == nil || len(line) > 0 {
|
|
|
|
+ if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
|
|
|
|
+ logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- if err := scanner.Err(); err != nil {
|
|
|
|
- logrus.Errorf("Error scanning log stream: %s", err)
|
|
|
|
|
|
+
|
|
|
|
+ if err != nil {
|
|
|
|
+ if err != io.EOF {
|
|
|
|
+ logrus.Errorf("Error scanning log stream: %s", err)
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|