0e5eaf8ee3
Before this change, volume management was relying on the fact that everything the plugin mounts is visible on the host within the plugin's rootfs. In practice this caused some issues with mount leaks, so we changed the behavior such that mounts are not visible on the plugin's rootfs, but available outside of it, which breaks volume management. To fix the issue, allow the plugin to scope the path correctly rather than assuming that everything is visible in `p.Rootfs`. In practice this is just scoping the `PropagatedMount` paths to the correct host path. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
139 lines
2.9 KiB
Go
139 lines
2.9 KiB
Go
package logger
|
|
|
|
import (
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/docker/docker/api/types/plugins/logdriver"
|
|
"github.com/docker/docker/pkg/plugingetter"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// pluginAdapter takes a plugin and implements the Logger interface for logger
|
|
// instances
|
|
type pluginAdapter struct {
|
|
driverName string
|
|
id string
|
|
plugin logPlugin
|
|
fifoPath string
|
|
capabilities Capability
|
|
logInfo Info
|
|
|
|
// synchronize access to the log stream and shared buffer
|
|
mu sync.Mutex
|
|
enc logdriver.LogEntryEncoder
|
|
stream io.WriteCloser
|
|
// buf is shared for each `Log()` call to reduce allocations.
|
|
// buf must be protected by mutex
|
|
buf logdriver.LogEntry
|
|
}
|
|
|
|
func (a *pluginAdapter) Log(msg *Message) error {
|
|
a.mu.Lock()
|
|
|
|
a.buf.Line = msg.Line
|
|
a.buf.TimeNano = msg.Timestamp.UnixNano()
|
|
a.buf.Partial = msg.Partial
|
|
a.buf.Source = msg.Source
|
|
|
|
err := a.enc.Encode(&a.buf)
|
|
a.buf.Reset()
|
|
|
|
a.mu.Unlock()
|
|
|
|
PutMessage(msg)
|
|
return err
|
|
}
|
|
|
|
func (a *pluginAdapter) Name() string {
|
|
return a.driverName
|
|
}
|
|
|
|
func (a *pluginAdapter) Close() error {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
|
|
if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := a.stream.Close(); err != nil {
|
|
logrus.WithError(err).Error("error closing plugin fifo")
|
|
}
|
|
if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
|
|
logrus.WithError(err).Error("error cleaning up plugin fifo")
|
|
}
|
|
|
|
// may be nil, especially for unit tests
|
|
if pluginGetter != nil {
|
|
pluginGetter.Get(a.Name(), extName, plugingetter.Release)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type pluginAdapterWithRead struct {
|
|
*pluginAdapter
|
|
}
|
|
|
|
func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
|
|
watcher := NewLogWatcher()
|
|
|
|
go func() {
|
|
defer close(watcher.Msg)
|
|
stream, err := a.plugin.ReadLogs(a.logInfo, config)
|
|
if err != nil {
|
|
watcher.Err <- errors.Wrap(err, "error getting log reader")
|
|
return
|
|
}
|
|
defer stream.Close()
|
|
|
|
dec := logdriver.NewLogEntryDecoder(stream)
|
|
for {
|
|
select {
|
|
case <-watcher.WatchClose():
|
|
return
|
|
default:
|
|
}
|
|
|
|
var buf logdriver.LogEntry
|
|
if err := dec.Decode(&buf); err != nil {
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
select {
|
|
case watcher.Err <- errors.Wrap(err, "error decoding log message"):
|
|
case <-watcher.WatchClose():
|
|
}
|
|
return
|
|
}
|
|
|
|
msg := &Message{
|
|
Timestamp: time.Unix(0, buf.TimeNano),
|
|
Line: buf.Line,
|
|
Source: buf.Source,
|
|
}
|
|
|
|
// plugin should handle this, but check just in case
|
|
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
|
|
continue
|
|
}
|
|
if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case watcher.Msg <- msg:
|
|
case <-watcher.WatchClose():
|
|
// make sure the message we consumed is sent
|
|
watcher.Msg <- msg
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return watcher
|
|
}
|