
Make sure timers are stopped after use.

`time.After` keeps a timer running until the specified duration has
elapsed, and it allocates a new timer on each call. This can wind up
leaving lots of unnecessary timers running in the background, consuming
resources.

Instead of `time.After`, use `time.NewTimer` so the timer can actually
be stopped once it is no longer needed.
In some of these cases it's not a big deal since the duration is really
short, but in others it is much worse.
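
As a rough sketch of the pattern applied throughout this commit (the
`waitForEvent` helpers and the demo in `main` are hypothetical, not part
of the tree):

```go
// Minimal sketch of the time.After -> time.NewTimer change; the helper
// names here are illustrative only.
package main

import (
	"errors"
	"fmt"
	"time"
)

// Before: time.After allocates a timer that keeps running until the full
// duration elapses, even when the event arrives first.
func waitForEventLeaky(event <-chan struct{}, d time.Duration) error {
	select {
	case <-event:
		return nil
	case <-time.After(d):
		return errors.New("timed out")
	}
}

// After: an explicit timer is stopped when the function returns, so it does
// not linger in the background once the event has arrived.
func waitForEvent(event <-chan struct{}, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-event:
		return nil
	case <-timer.C:
		return errors.New("timed out")
	}
}

func main() {
	event := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(event)
	}()
	// Prints <nil>; the timer is released on return instead of running for a minute.
	fmt.Println(waitForEvent(event, time.Minute))
}
```

Deferring `timer.Stop()` releases the timer as soon as the surrounding
function returns, instead of leaving it to fire in the background.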

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
commit eaad3ee3cf

+ 3 - 1
api/server/router/system/system_routes.go

@@ -165,7 +165,9 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
 
 		if !onlyPastEvents {
 			dur := until.Sub(now)
-			timeout = time.After(dur)
+			timer := time.NewTimer(dur)
+			defer timer.Stop()
+			timeout = timer.C
 		}
 	}
 

+ 5 - 1
cmd/dockerd/daemon.go

@@ -378,10 +378,14 @@ func shutdownDaemon(d *daemon.Daemon) {
 		logrus.Debug("Clean shutdown succeeded")
 		return
 	}
+
+	timeout := time.NewTimer(time.Duration(shutdownTimeout) * time.Second)
+	defer timeout.Stop()
+
 	select {
 	case <-ch:
 		logrus.Debug("Clean shutdown succeeded")
-	case <-time.After(time.Duration(shutdownTimeout) * time.Second):
+	case <-timeout.C:
 		logrus.Error("Force shutdown daemon")
 	}
 }

+ 4 - 1
container/monitor.go

@@ -33,8 +33,11 @@ func (container *Container) Reset(lock bool) {
 				container.LogCopier.Wait()
 				close(exit)
 			}()
+
+			timer := time.NewTimer(loggerCloseTimeout)
+			defer timer.Stop()
 			select {
-			case <-time.After(loggerCloseTimeout):
+			case <-timer.C:
 				logrus.Warn("Logger didn't exit in time: logs may be truncated")
 			case <-exit:
 			}

+ 4 - 1
daemon/cluster/cluster.go

@@ -186,8 +186,11 @@ func (c *Cluster) Start() error {
 	}
 	c.nr = nr
 
+	timer := time.NewTimer(swarmConnectTimeout)
+	defer timer.Stop()
+
 	select {
-	case <-time.After(swarmConnectTimeout):
+	case <-timer.C:
 		logrus.Error("swarm component could not be started before timeout was reached")
 	case err := <-nr.Ready():
 		if err != nil {

+ 4 - 1
daemon/cluster/swarm.go

@@ -194,8 +194,11 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 	c.nr = nr
 	c.mu.Unlock()
 
+	timeout := time.NewTimer(swarmConnectTimeout)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(swarmConnectTimeout):
+	case <-timeout.C:
 		return errSwarmJoinTimeoutReached
 	case err := <-nr.Ready():
 		if err != nil {

+ 18 - 4
daemon/daemon.go

@@ -42,6 +42,7 @@ import (
 	"github.com/moby/buildkit/util/resolver"
 	"github.com/moby/buildkit/util/tracing"
 	"github.com/sirupsen/logrus"
+
 	// register graph drivers
 	_ "github.com/docker/docker/daemon/graphdriver/register"
 	"github.com/docker/docker/daemon/stats"
@@ -479,12 +480,14 @@ func (daemon *Daemon) restore() error {
 			// ignore errors here as this is a best effort to wait for children to be
 			//   running before we try to start the container
 			children := daemon.children(c)
-			timeout := time.After(5 * time.Second)
+			timeout := time.NewTimer(5 * time.Second)
+			defer timeout.Stop()
+
 			for _, child := range children {
 				if notifier, exists := restartContainers[child]; exists {
 					select {
 					case <-notifier:
-					case <-timeout:
+					case <-timeout.C:
 					}
 				}
 			}
@@ -602,6 +605,7 @@ func (daemon *Daemon) waitForNetworks(c *container.Container) {
 	if daemon.discoveryWatcher == nil {
 		return
 	}
+
 	// Make sure if the container has a network that requires discovery that the discovery service is available before starting
 	for netName := range c.NetworkSettings.Networks {
 		// If we get `ErrNoSuchNetwork` here, we can assume that it is due to discovery not being ready
@@ -610,13 +614,19 @@ func (daemon *Daemon) waitForNetworks(c *container.Container) {
 			if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
 				continue
 			}
+
 			// use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host
 			// FIXME: why is this slow???
+			dur := 60 * time.Second
+			timer := time.NewTimer(dur)
+
 			logrus.Debugf("Container %s waiting for network to be ready", c.Name)
 			select {
 			case <-daemon.discoveryWatcher.ReadyCh():
-			case <-time.After(60 * time.Second):
+			case <-timer.C:
 			}
+			timer.Stop()
+
 			return
 		}
 	}
@@ -666,10 +676,14 @@ func (daemon *Daemon) DaemonLeavesCluster() {
 	// This is called also on graceful daemon shutdown. We need to
 	// wait, because the ingress release has to happen before the
 	// network controller is stopped.
+
 	if done, err := daemon.ReleaseIngress(); err == nil {
+		timeout := time.NewTimer(5 * time.Second)
+		defer timeout.Stop()
+
 		select {
 		case <-done:
-		case <-time.After(5 * time.Second):
+		case <-timeout.C:
 			logrus.Warn("timeout while waiting for ingress network removal")
 		}
 	} else {

+ 4 - 2
daemon/discovery/discovery.go

@@ -148,12 +148,14 @@ func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
 	// Setup a short ticker until the first heartbeat has succeeded
 	t := time.NewTicker(500 * time.Millisecond)
 	defer t.Stop()
+
 	// timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
-	timeout := time.After(60 * time.Second)
+	timeout := time.NewTimer(60 * time.Second)
+	defer timeout.Stop()
 
 	for {
 		select {
-		case <-timeout:
+		case <-timeout.C:
 			return errors.New("timeout waiting for initial discovery")
 		case <-d.term:
 			return errors.New("terminated")

+ 7 - 3
daemon/exec.go

@@ -22,7 +22,7 @@ import (
 )
 
 // Seconds to wait after sending TERM before trying KILL
-const termProcessTimeout = 10
+const termProcessTimeout = 10 * time.Second
 
 func (d *Daemon) registerExecCommand(container *container.Container, config *exec.Config) {
 	// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
@@ -265,9 +265,13 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
 	case <-ctx.Done():
 		logrus.Debugf("Sending TERM signal to process %v in container %v", name, c.ID)
 		d.containerd.SignalProcess(ctx, c.ID, name, int(signal.SignalMap["TERM"]))
+
+		timeout := time.NewTimer(termProcessTimeout)
+		defer timeout.Stop()
+
 		select {
-		case <-time.After(termProcessTimeout * time.Second):
-			logrus.Infof("Container %v, process %v failed to exit within %d seconds of signal TERM - using the force", c.ID, name, termProcessTimeout)
+		case <-timeout.C:
+			logrus.Infof("Container %v, process %v failed to exit within %v of signal TERM - using the force", c.ID, name, termProcessTimeout)
 			d.containerd.SignalProcess(ctx, c.ID, name, int(signal.SignalMap["KILL"]))
 		case <-attachErr:
 			// TERM signal worked

+ 7 - 1
daemon/health.go

@@ -187,12 +187,18 @@ func handleProbeResult(d *Daemon, c *container.Container, result *types.Healthch
 func monitor(d *Daemon, c *container.Container, stop chan struct{}, probe probe) {
 	probeTimeout := timeoutWithDefault(c.Config.Healthcheck.Timeout, defaultProbeTimeout)
 	probeInterval := timeoutWithDefault(c.Config.Healthcheck.Interval, defaultProbeInterval)
+
+	intervalTimer := time.NewTimer(probeInterval)
+	defer intervalTimer.Stop()
+
 	for {
+		intervalTimer.Reset(probeInterval)
+
 		select {
 		case <-stop:
 			logrus.Debugf("Stop healthcheck monitoring for container %s (received while idle)", c.ID)
 			return
-		case <-time.After(probeInterval):
+		case <-intervalTimer.C:
 			logrus.Debugf("Running health check for container %s ...", c.ID)
 			startTime := time.Now()
 			ctx, cancelProbe := context.WithTimeout(context.Background(), probeTimeout)

+ 5 - 2
daemon/resize.go

@@ -38,13 +38,16 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
 	if err != nil {
 		return err
 	}
+
 	// TODO: the timeout is hardcoded here, it would be more flexible to make it
 	// a parameter in resize request context, which would need API changes.
-	timeout := 10 * time.Second
+	timeout := time.NewTimer(10 * time.Second)
+	defer timeout.Stop()
+
 	select {
 	case <-ec.Started:
 		return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height)
-	case <-time.After(timeout):
+	case <-timeout.C:
 		return fmt.Errorf("timeout waiting for exec session ready")
 	}
 }

+ 30 - 17
libcontainerd/supervisor/remote_daemon.go

@@ -89,8 +89,11 @@ func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Da
 
 	go r.monitorDaemon(ctx)
 
+	timeout := time.NewTimer(startupTimeout)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(startupTimeout):
+	case <-timeout.C:
 		return nil, errors.New("timeout waiting for containerd to start")
 	case err := <-r.daemonStartCh:
 		if err != nil {
@@ -101,8 +104,11 @@ func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Da
 	return r, nil
 }
 func (r *remote) WaitTimeout(d time.Duration) error {
+	timeout := time.NewTimer(d)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(d):
+	case <-timeout.C:
 		return errors.New("timeout waiting for containerd to stop")
 	case <-r.daemonStopCh:
 	}
@@ -230,7 +236,8 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		transientFailureCount = 0
 		client                *containerd.Client
 		err                   error
-		delay                 <-chan time.Time
+		delay                 time.Duration
+		timer                 = time.NewTimer(0)
 		started               bool
 	)
 
@@ -245,19 +252,25 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		r.platformCleanup()
 
 		close(r.daemonStopCh)
+		timer.Stop()
 	}()
 
+	// ensure no races on sending to timer.C even though there is a 0 duration.
+	if !timer.Stop() {
+		<-timer.C
+	}
+
 	for {
-		if delay != nil {
-			select {
-			case <-ctx.Done():
-				r.logger.Info("stopping healthcheck following graceful shutdown")
-				if client != nil {
-					client.Close()
-				}
-				return
-			case <-delay:
+		timer.Reset(delay)
+
+		select {
+		case <-ctx.Done():
+			r.logger.Info("stopping healthcheck following graceful shutdown")
+			if client != nil {
+				client.Close()
 			}
+			return
+		case <-timer.C:
 		}
 
 		if r.daemonPid == -1 {
@@ -277,14 +290,14 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 					return
 				}
 				r.logger.WithError(err).Error("failed restarting containerd")
-				delay = time.After(50 * time.Millisecond)
+				delay = 50 * time.Millisecond
 				continue
 			}
 
 			client, err = containerd.New(r.GRPC.Address, containerd.WithTimeout(60*time.Second))
 			if err != nil {
 				r.logger.WithError(err).Error("failed connecting to containerd")
-				delay = time.After(100 * time.Millisecond)
+				delay = 100 * time.Millisecond
 				continue
 			}
 		}
@@ -300,7 +313,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 				}
 
 				transientFailureCount = 0
-				delay = time.After(500 * time.Millisecond)
+				delay = 500 * time.Millisecond
 				continue
 			}
 
@@ -308,7 +321,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 
 			transientFailureCount++
 			if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
-				delay = time.After(time.Duration(transientFailureCount) * 200 * time.Millisecond)
+				delay = time.Duration(transientFailureCount) * 200 * time.Millisecond
 				continue
 			}
 		}
@@ -321,7 +334,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		client.Close()
 		client = nil
 		r.daemonPid = -1
-		delay = nil
+		delay = 0
 		transientFailureCount = 0
 	}
 }

+ 10 - 1
pkg/filenotify/poller.go

@@ -146,9 +146,18 @@ func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
 // upon finding changes to a file or errors, sendEvent/sendErr is called
 func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
 	defer f.Close()
+
+	timer := time.NewTimer(watchWaitTime)
+	if !timer.Stop() {
+		<-timer.C
+	}
+	defer timer.Stop()
+
 	for {
+		timer.Reset(watchWaitTime)
+
 		select {
-		case <-time.After(watchWaitTime):
+		case <-timer.C:
 		case <-chClose:
 			logrus.Debugf("watch for %s closed", f.Name())
 			return

+ 4 - 1
pkg/pubsub/publisher.go

@@ -107,9 +107,12 @@ func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg
 
 	// send under a select as to not block if the receiver is unavailable
 	if p.timeout > 0 {
+		timeout := time.NewTimer(p.timeout)
+		defer timeout.Stop()
+
 		select {
 		case sub <- v:
-		case <-time.After(p.timeout):
+		case <-timeout.C:
 		}
 		return
 	}

+ 12 - 3
plugin/manager_linux.go

@@ -146,6 +146,8 @@ func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
 	return nil
 }
 
+const shutdownTimeout = 10 * time.Second
+
 func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	pluginID := p.GetID()
 
@@ -153,19 +155,26 @@ func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	if err != nil {
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
 	} else {
+
+		timeout := time.NewTimer(shutdownTimeout)
+		defer timeout.Stop()
+
 		select {
 		case <-ec:
 			logrus.Debug("Clean shutdown of plugin")
-		case <-time.After(time.Second * 10):
+		case <-timeout.C:
 			logrus.Debug("Force shutdown plugin")
 			if err := executor.Signal(pluginID, int(unix.SIGKILL)); err != nil {
 				logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
 			}
+
+			timeout.Reset(shutdownTimeout)
+
 			select {
 			case <-ec:
 				logrus.Debug("SIGKILL plugin shutdown")
-			case <-time.After(time.Second * 10):
-				logrus.Debug("Force shutdown plugin FAILED")
+			case <-timeout.C:
+				logrus.WithField("plugin", p.Name).Warn("Force shutdown plugin FAILED")
 			}
 		}
 	}

+ 4 - 1
restartmanager/restartmanager.go

@@ -107,11 +107,14 @@ func (rm *restartManager) ShouldRestart(exitCode uint32, hasBeenManuallyStopped
 
 	ch := make(chan error)
 	go func() {
+		timeout := time.NewTimer(rm.timeout)
+		defer timeout.Stop()
+
 		select {
 		case <-rm.cancel:
 			ch <- ErrRestartCanceled
 			close(ch)
-		case <-time.After(rm.timeout):
+		case <-timeout.C:
 			rm.Lock()
 			close(ch)
 			rm.active = false