Merge pull request #31856 from jim-minter/more_races

Resolve a set of race conditions in logging and attaching code
Tõnis Tiigi committed via GitHub on 2017-04-26 16:04:02 -07:00
commit 5eca7f7c5d
9 changed files with 67 additions and 44 deletions

@@ -114,7 +114,7 @@ type Backend interface {
 	// PullOnBuild tells Docker to pull image referenced by `name`.
 	PullOnBuild(ctx context.Context, name string, authConfigs map[string]types.AuthConfig, output io.Writer) (Image, error)
 	// ContainerAttachRaw attaches to container.
-	ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error
+	ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool, attached chan struct{}) error
 	// ContainerCreate creates a new Docker container and returns potential warnings
 	ContainerCreate(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
 	// ContainerRm removes a container specified by `id`.

@@ -573,11 +573,18 @@ func (b *Builder) create() (string, error) {
 var errCancelled = errors.New("build cancelled")
 
 func (b *Builder) run(cID string) (err error) {
+	attached := make(chan struct{})
 	errCh := make(chan error)
 	go func() {
-		errCh <- b.docker.ContainerAttachRaw(cID, nil, b.Stdout, b.Stderr, true)
+		errCh <- b.docker.ContainerAttachRaw(cID, nil, b.Stdout, b.Stderr, true, attached)
 	}()
+	select {
+	case err := <-errCh:
+		return err
+	case <-attached:
+	}
 
 	finished := make(chan struct{})
 	cancelErrCh := make(chan error, 1)
 	go func() {
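The handshake these two hunks set up is easy to misread in isolation, so here is a minimal runnable sketch of the pattern (a sketch only: the attach function and its timings below are illustrative; the channel idiom is what the diff actually adds):

package main

import (
	"fmt"
	"time"
)

// attach stands in for ContainerAttachRaw: it closes `attached` once the
// streams are wired up, then keeps streaming until the container is done.
func attach(attached chan struct{}) error {
	time.Sleep(10 * time.Millisecond) // pretend to set up the streams
	close(attached)                   // close-as-broadcast: every waiter wakes
	time.Sleep(50 * time.Millisecond) // pretend to copy container output
	return nil
}

func run() error {
	attached := make(chan struct{})
	errCh := make(chan error)
	go func() { errCh <- attach(attached) }()

	// Mirror of the builder change: wait until the attach is in place (or
	// has already failed) before starting the container, so that none of
	// its early output can be lost.
	select {
	case err := <-errCh:
		return err // attach failed before it ever signalled
	case <-attached:
	}

	fmt.Println("attach ready; safe to start the container")
	return <-errCh // wait for streaming to finish
}

func main() {
	if err := run(); err != nil {
		fmt.Println("error:", err)
	}
}

Closing the channel, rather than sending on it, is what makes the signal safe: it works no matter how many goroutines are waiting, including none.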

@@ -33,7 +33,7 @@ func (m *MockBackend) PullOnBuild(ctx context.Context, name string, authConfigs
 	return nil, nil
 }
 
-func (m *MockBackend) ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error {
+func (m *MockBackend) ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool, attached chan struct{}) error {
 	return nil
 }

@@ -73,7 +73,7 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
 }
 
 // ContainerAttachRaw attaches the provided streams to the container's stdio
-func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool) error {
+func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool, attached chan struct{}) error {
 	container, err := daemon.GetContainer(prefixOrName)
 	if err != nil {
 		return err
@@ -86,6 +86,7 @@ func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadClose
 		CloseStdin: container.Config.StdinOnce,
 	}
 	container.StreamConfig.AttachStreams(&cfg)
+	close(attached)
 	if cfg.UseStdin {
 		cfg.Stdin = stdin
 	}
@@ -101,15 +102,23 @@ func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadClose
 func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.AttachConfig, logs, doStream bool) error {
 	if logs {
-		logDriver, err := daemon.getLogger(c)
+		logDriver, logCreated, err := daemon.getLogger(c)
 		if err != nil {
 			return err
 		}
+		if logCreated {
+			defer func() {
+				if err = logDriver.Close(); err != nil {
+					logrus.Errorf("Error closing logger: %v", err)
+				}
+			}()
+		}
 		cLog, ok := logDriver.(logger.LogReader)
 		if !ok {
 			return logger.ErrReadLogsNotSupported
 		}
 		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
+		defer logs.Close()
 	LogLoop:
 		for {
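The `logCreated` flag establishes an ownership rule: the caller closes the log driver only if `getLogger` had to create one; a driver borrowed from a running container is left alone. A self-contained sketch of that rule, with hypothetical names:

package main

import "fmt"

type logDriver struct{ name string }

func (d *logDriver) Close() error {
	fmt.Println("closed:", d.name)
	return nil
}

// cached plays the role of container.LogDriver on a running container.
var cached = &logDriver{name: "cached"}

// getLogger mirrors the new signature: `created` reports whether the
// caller created, and therefore owns, the returned driver.
func getLogger(running bool) (d *logDriver, created bool) {
	if running {
		return cached, false
	}
	return &logDriver{name: "fresh"}, true
}

func attachLogs(running bool) {
	d, created := getLogger(running)
	if created {
		// Only close what we created; the cached driver belongs to the
		// container and must outlive this call.
		defer d.Close()
	}
	fmt.Println("reading logs via:", d.name)
}

func main() {
	attachLogs(true)  // borrows the cached driver, leaves it open
	attachLogs(false) // creates a fresh driver and closes it
}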

@@ -18,12 +18,13 @@ import (
 const name = "journald"
 
 type journald struct {
+	mu      sync.Mutex
 	vars    map[string]string // additional variables and values to send to the journal along with the log message
 	readers readerList
+	closed  bool
 }
 
 type readerList struct {
-	mu      sync.Mutex
 	readers map[*logger.LogWatcher]*logger.LogWatcher
 }
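The mutex moves from `readerList` up to `journald` itself so that one lock guards both the reader set and the new `closed` flag: registering a follower and shutting the logger down can no longer interleave. A standalone sketch of that invariant (the types below are stand-ins, not the real ones):

package main

import (
	"fmt"
	"sync"
)

// watcher stands in for logger.LogWatcher.
type watcher struct{ done chan struct{} }

func (w *watcher) Close() { close(w.done) }

type journaldLike struct {
	mu      sync.Mutex
	readers map[*watcher]*watcher
	closed  bool
}

// register adds a follower and reports, under the same lock that Close
// takes, whether the logger has already been shut down.
func (s *journaldLike) register(w *watcher) (alreadyClosed bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.readers[w] = w
	return s.closed
}

func (s *journaldLike) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	for w := range s.readers {
		w.Close() // wake every follower
	}
}

func main() {
	s := &journaldLike{readers: map[*watcher]*watcher{}}
	w := &watcher{done: make(chan struct{})}
	if s.register(w) {
		return // logger already closed: drain once and bail out
	}
	s.Close()
	<-w.done // follower was woken by Close
	fmt.Println("follower woken")
}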

@@ -161,11 +161,12 @@ import (
 )
 
 func (s *journald) Close() error {
-	s.readers.mu.Lock()
+	s.mu.Lock()
+	s.closed = true
 	for reader := range s.readers.readers {
 		reader.Close()
 	}
-	s.readers.mu.Unlock()
+	s.mu.Unlock()
 	return nil
 }
@@ -245,9 +246,16 @@ drain:
 }
 
 func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
-	s.readers.mu.Lock()
+	s.mu.Lock()
 	s.readers.readers[logWatcher] = logWatcher
-	s.readers.mu.Unlock()
+	if s.closed {
+		// the journald Logger is closed, presumably because the container has been
+		// reset. So we shouldn't follow, because we'll never be woken up. But we
+		// should make one more drainJournal call to be sure we've got all the logs.
+		// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
+		C.close(pfd[1])
+	}
+	s.mu.Unlock()
 
 	newCursor := make(chan *C.char)
@@ -274,22 +282,22 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
 		// Clean up.
 		C.close(pfd[0])
-		s.readers.mu.Lock()
+		s.mu.Lock()
 		delete(s.readers.readers, logWatcher)
-		s.readers.mu.Unlock()
+		s.mu.Unlock()
 		close(logWatcher.Msg)
 		newCursor <- cursor
 	}()
 
 	// Wait until we're told to stop.
 	select {
-	case cursor = <-newCursor:
 	case <-logWatcher.WatchClose():
 		// Notify the other goroutine that its work is done.
 		C.close(pfd[1])
-		cursor = <-newCursor
 	}
+	cursor = <-newCursor
 
 	return cursor
 }
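The shutdown choreography here is subtle: closing `pfd[1]` is the only wake-up signal the follower goroutine needs, whether it comes from the early `s.closed` check or from the `WatchClose` case, and the final cursor is always handed back over `newCursor`. A cgo-free sketch of the same close-to-wake idiom using an ordinary pipe (names are illustrative):

package main

import (
	"fmt"
	"io"
	"os"
)

// follow plays the worker goroutine: it blocks on the pipe the way the
// cgo code blocks in poll() on pfd[0], and treats EOF (write end closed)
// as "drain once more, then exit".
func follow(r *os.File, done chan<- struct{}) {
	buf := make([]byte, 1)
	for {
		_, err := r.Read(buf)
		fmt.Println("drainJournal: collect whatever is left")
		if err == io.EOF {
			close(done)
			return
		}
	}
}

func main() {
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	done := make(chan struct{})
	go follow(r, done)

	// Both shutdown paths above reduce to this: close the write end so
	// the follower wakes, performs one final drain, and cleans itself up.
	w.Close()
	<-done
	r.Close()
}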

@@ -27,6 +27,7 @@ type JSONFileLogger struct {
 	mu      sync.Mutex
 	readers map[*logger.LogWatcher]struct{} // stores the active log followers
 	extra   []byte                          // json-encoded extra attributes
+	closed  bool
 }
 
 func init() {
@@ -142,6 +143,7 @@ func (l *JSONFileLogger) LogPath() string {
 // Close closes underlying file and signals all readers to stop.
 func (l *JSONFileLogger) Close() error {
 	l.mu.Lock()
+	l.closed = true
 	err := l.writer.Close()
 	for r := range l.readers {
 		r.Close()

@@ -88,10 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		}
 	}
 
-	if !config.Follow {
-		if err := latestFile.Close(); err != nil {
-			logrus.Errorf("Error closing file: %v", err)
-		}
+	if !config.Follow || l.closed {
 		l.mu.Unlock()
 		return
 	}
@@ -100,17 +97,18 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		latestFile.Seek(0, os.SEEK_END)
 	}
 
+	notifyRotate := l.writer.NotifyRotate()
+	defer l.writer.NotifyRotateEvict(notifyRotate)
+
 	l.readers[logWatcher] = struct{}{}
 	l.mu.Unlock()
 
-	notifyRotate := l.writer.NotifyRotate()
 	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
 
 	l.mu.Lock()
 	delete(l.readers, logWatcher)
 	l.mu.Unlock()
-	l.writer.NotifyRotateEvict(notifyRotate)
 }
 
 func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
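Two fixes land here: a reader that arrives after `Close()` now bails out instead of following a dead logger, and the rotation subscription is taken while the logger lock is still held, with a deferred evict so every return path unsubscribes. A minimal sketch of why subscribing must happen before the unlock (the notifier below is hypothetical, not the real `NotifyRotate` implementation):

package main

import (
	"fmt"
	"sync"
)

// rotateNotifier stands in for the log file writer: followers subscribe
// to "file rotated" events and must unsubscribe when done.
type rotateNotifier struct {
	mu   sync.Mutex
	subs map[chan struct{}]struct{}
}

func (n *rotateNotifier) NotifyRotate() chan struct{} {
	ch := make(chan struct{}, 1)
	n.mu.Lock()
	n.subs[ch] = struct{}{}
	n.mu.Unlock()
	return ch
}

func (n *rotateNotifier) NotifyRotateEvict(ch chan struct{}) {
	n.mu.Lock()
	delete(n.subs, ch)
	n.mu.Unlock()
}

func (n *rotateNotifier) rotate() {
	n.mu.Lock()
	for ch := range n.subs {
		select {
		case ch <- struct{}{}: // non-blocking: buffer of one per subscriber
		default:
		}
	}
	n.mu.Unlock()
}

func main() {
	n := &rotateNotifier{subs: map[chan struct{}]struct{}{}}

	// Corrected ordering from the hunk above: subscribe first, evict via
	// defer, and only then start following. A rotation can no longer slip
	// in between opening the file and subscribing.
	notifyRotate := n.NotifyRotate()
	defer n.NotifyRotateEvict(notifyRotate)

	n.rotate()
	<-notifyRotate
	fmt.Println("rotation observed, no event lost")
}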

@@ -45,17 +45,24 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 		return nil, logger.ErrReadLogsNotSupported
 	}
 
-	cLog, err := daemon.getLogger(container)
+	cLog, cLogCreated, err := daemon.getLogger(container)
 	if err != nil {
 		return nil, err
 	}
+	if cLogCreated {
+		defer func() {
+			if err = cLog.Close(); err != nil {
+				logrus.Errorf("Error closing logger: %v", err)
+			}
+		}()
+	}
+
 	logReader, ok := cLog.(logger.LogReader)
 	if !ok {
 		return nil, logger.ErrReadLogsNotSupported
 	}
 
-	follow := config.Follow && container.IsRunning()
+	follow := config.Follow && !cLogCreated
 	tailLines, err := strconv.Atoi(config.Tail)
 	if err != nil {
 		tailLines = -1
@@ -85,23 +92,8 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 	messageChan := make(chan *backend.LogMessage, 1)
 	go func() {
-		// set up some defers
-		defer func() {
-			// ok so this function, originally, was placed right after that
-			// logger.ReadLogs call above. I THINK that means it sets off the
-			// chain of events that results in the logger needing to be closed.
-			// i do not know if an error in time parsing above causing an early
-			// return will result in leaking the logger. if that is the case,
-			// it would also have been a bug in the original code
-			logs.Close()
-			if cLog != container.LogDriver {
-				// Since the logger isn't cached in the container, which
-				// occurs if it is running, it must get explicitly closed
-				// here to avoid leaking it and any file handles it has.
-				if err := cLog.Close(); err != nil {
-					logrus.Errorf("Error closing logger: %v", err)
-				}
-			}
-		}()
+		defer logs.Close()
 		// close the messages channel. closing is the only way to signal above
 		// that we're done with logs (other than context cancel i guess).
 		defer close(messageChan)
@@ -148,11 +140,17 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 	return messageChan, nil
 }
 
-func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, error) {
-	if container.LogDriver != nil && container.IsRunning() {
-		return container.LogDriver, nil
+func (daemon *Daemon) getLogger(container *container.Container) (l logger.Logger, created bool, err error) {
+	container.Lock()
+	if container.State.Running {
+		l = container.LogDriver
 	}
-	return container.StartLogger()
+	container.Unlock()
+	if l == nil {
+		created = true
+		l, err = container.StartLogger()
+	}
+	return
 }
 
 // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.
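The rewritten `getLogger` reads the `Running`/`LogDriver` pair under the container lock (the old version called `IsRunning()` without it) and tells the caller whether it owns the result; `follow := config.Follow && !cLogCreated` then falls out naturally, since a freshly created logger means the container was not running and there is nothing new to follow. A runnable sketch with stand-in types:

package main

import (
	"fmt"
	"sync"
)

type driver struct{ name string }

type ctr struct {
	sync.Mutex
	Running   bool
	LogDriver *driver
}

func (c *ctr) StartLogger() (*driver, error) {
	return &driver{name: "fresh"}, nil
}

// getLogger mirrors the rewritten helper: the Running/LogDriver pair is
// read under the container lock, and `created` tells the caller whether
// it owns (and must close) the result.
func getLogger(c *ctr) (l *driver, created bool, err error) {
	c.Lock()
	if c.Running {
		l = c.LogDriver
	}
	c.Unlock()
	if l == nil {
		created = true
		l, err = c.StartLogger()
	}
	return
}

func main() {
	c := &ctr{Running: true, LogDriver: &driver{name: "cached"}}
	l, created, _ := getLogger(c)
	fmt.Println(l.name, "created:", created) // cached created: false

	c.Running = false
	l, created, _ = getLogger(c)
	fmt.Println(l.name, "created:", created) // fresh created: true
}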