|
@@ -64,6 +64,7 @@ type ContainerConfig struct {
|
|
|
t *tomb.Tomb
|
|
|
logger *log.Entry
|
|
|
Labels map[string]string
|
|
|
+ Tty bool
|
|
|
}
|
|
|
|
|
|
func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
|
|
@@ -284,9 +285,14 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
return err
|
|
|
}
|
|
|
// we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
|
|
|
- reader := dlog.NewReader(dockerReader)
|
|
|
foundOne = true
|
|
|
- scanner := bufio.NewScanner(reader)
|
|
|
+ var scanner *bufio.Scanner
|
|
|
+ if containerConfig.Tty {
|
|
|
+ scanner = bufio.NewScanner(dockerReader)
|
|
|
+ } else {
|
|
|
+ reader := dlog.NewReader(dockerReader)
|
|
|
+ scanner = bufio.NewScanner(reader)
|
|
|
+ }
|
|
|
for scanner.Scan() {
|
|
|
line := scanner.Text()
|
|
|
if line == "" {
|
|
@@ -304,6 +310,10 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
out <- evt
|
|
|
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
|
|
|
}
|
|
|
+ err = scanner.Err()
|
|
|
+ if err != nil {
|
|
|
+ d.logger.Errorf("Got error from docker read: %s", err)
|
|
|
+ }
|
|
|
d.runningContainerState[container.ID] = containerConfig
|
|
|
}
|
|
|
}
|
|
@@ -333,10 +343,18 @@ func (d *DockerSource) CanRun() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (d *DockerSource) getContainerTTY(containerId string) bool {
|
|
|
+ containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
|
|
|
+ if err != nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return containerDetails.Config.Tty
|
|
|
+}
|
|
|
+
|
|
|
func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*ContainerConfig, bool) {
|
|
|
for _, containerID := range d.Config.ContainerID {
|
|
|
if containerID == container.ID {
|
|
|
- return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true
|
|
|
+ return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -346,7 +364,7 @@ func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*Containe
|
|
|
name = name[1:]
|
|
|
}
|
|
|
if name == containerName {
|
|
|
- return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true
|
|
|
+ return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -354,14 +372,14 @@ func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*Containe
|
|
|
|
|
|
for _, cont := range d.compiledContainerID {
|
|
|
if matched := cont.Match([]byte(container.ID)); matched {
|
|
|
- return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true
|
|
|
+ return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for _, cont := range d.compiledContainerName {
|
|
|
for _, name := range container.Names {
|
|
|
if matched := cont.Match([]byte(name)); matched {
|
|
|
- return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true
|
|
|
+ return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -454,9 +472,15 @@ func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types
|
|
|
container.logger.Errorf("unable to read logs from container: %+v", err)
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
+ var scanner *bufio.Scanner
|
|
|
// we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
|
|
|
- reader := dlog.NewReader(dockerReader)
|
|
|
- scanner := bufio.NewScanner(reader)
|
|
|
+ if container.Tty {
|
|
|
+ scanner = bufio.NewScanner(dockerReader)
|
|
|
+ } else {
|
|
|
+ reader := dlog.NewReader(dockerReader)
|
|
|
+ scanner = bufio.NewScanner(reader)
|
|
|
+ }
|
|
|
readerChan := make(chan string)
|
|
|
readerTomb := &tomb.Tomb{}
|
|
|
readerTomb.Go(func() error {
|