cff4f20c44
The github.com/containerd/containerd/log package was moved to a separate module, which will also be used by upcoming (patch) releases of containerd. This patch moves our own uses of the package to use the new module. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
137 lines
3.1 KiB
Go
137 lines
3.1 KiB
Go
package logger // import "github.com/docker/docker/daemon/logger"
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/containerd/log"
|
|
"github.com/docker/docker/api/types/plugins/logdriver"
|
|
"github.com/docker/docker/pkg/plugingetter"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// pluginAdapter takes a plugin and implements the Logger interface for logger
|
|
// instances
|
|
type pluginAdapter struct {
|
|
driverName string
|
|
id string
|
|
plugin logPlugin
|
|
fifoPath string
|
|
capabilities Capability
|
|
logInfo Info
|
|
|
|
// synchronize access to the log stream and shared buffer
|
|
mu sync.Mutex
|
|
enc logdriver.LogEntryEncoder
|
|
stream io.WriteCloser
|
|
// buf is shared for each `Log()` call to reduce allocations.
|
|
// buf must be protected by mutex
|
|
buf logdriver.LogEntry
|
|
}
|
|
|
|
func (a *pluginAdapter) Log(msg *Message) error {
|
|
a.mu.Lock()
|
|
|
|
a.buf.Line = msg.Line
|
|
a.buf.TimeNano = msg.Timestamp.UnixNano()
|
|
a.buf.Partial = msg.PLogMetaData != nil
|
|
a.buf.Source = msg.Source
|
|
if msg.PLogMetaData != nil {
|
|
a.buf.PartialLogMetadata = &logdriver.PartialLogEntryMetadata{
|
|
Id: msg.PLogMetaData.ID,
|
|
Last: msg.PLogMetaData.Last,
|
|
Ordinal: int32(msg.PLogMetaData.Ordinal),
|
|
}
|
|
}
|
|
|
|
err := a.enc.Encode(&a.buf)
|
|
a.buf.Reset()
|
|
|
|
a.mu.Unlock()
|
|
|
|
PutMessage(msg)
|
|
return err
|
|
}
|
|
|
|
func (a *pluginAdapter) Name() string {
|
|
return a.driverName
|
|
}
|
|
|
|
func (a *pluginAdapter) Close() error {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
|
|
if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := a.stream.Close(); err != nil {
|
|
log.G(context.TODO()).WithError(err).Error("error closing plugin fifo")
|
|
}
|
|
if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
|
|
log.G(context.TODO()).WithError(err).Error("error cleaning up plugin fifo")
|
|
}
|
|
|
|
// may be nil, especially for unit tests
|
|
if pluginGetter != nil {
|
|
pluginGetter.Get(a.Name(), extName, plugingetter.Release)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type pluginAdapterWithRead struct {
|
|
*pluginAdapter
|
|
}
|
|
|
|
func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
|
|
watcher := NewLogWatcher()
|
|
|
|
go func() {
|
|
defer close(watcher.Msg)
|
|
stream, err := a.plugin.ReadLogs(a.logInfo, config)
|
|
if err != nil {
|
|
watcher.Err <- errors.Wrap(err, "error getting log reader")
|
|
return
|
|
}
|
|
defer stream.Close()
|
|
|
|
dec := logdriver.NewLogEntryDecoder(stream)
|
|
for {
|
|
var buf logdriver.LogEntry
|
|
if err := dec.Decode(&buf); err != nil {
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
watcher.Err <- errors.Wrap(err, "error decoding log message")
|
|
return
|
|
}
|
|
|
|
msg := &Message{
|
|
Timestamp: time.Unix(0, buf.TimeNano),
|
|
Line: buf.Line,
|
|
Source: buf.Source,
|
|
}
|
|
|
|
// plugin should handle this, but check just in case
|
|
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
|
|
continue
|
|
}
|
|
if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
|
|
return
|
|
}
|
|
|
|
// send the message unless the consumer is gone
|
|
select {
|
|
case watcher.Msg <- msg:
|
|
case <-watcher.WatchConsumerGone():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return watcher
|
|
}
|