Merge pull request #7449 from shykes/cleanup-shutdown

Cleanup: refactor shutdown and signal handling facility
Michael Crosby 11 years ago
parent
commit
2c90fde982
11 files changed, 269 additions and 157 deletions
  1. builtins/builtins.go       + 0 - 3
  2. daemon/container.go        + 4 - 7
  3. daemon/daemon.go           + 36 - 31
  4. daemon/server.go           + 0 - 5
  5. docker/daemon.go           + 2 - 8
  6. engine/engine.go           + 85 - 8
  7. engine/job.go              + 7 - 1
  8. engine/shutdown_test.go    + 80 - 0
  9. pkg/signal/trap.go         + 53 - 0
  10. server/init.go            + 2 - 58
  11. server/server.go          + 0 - 36

+ 0 - 3
builtins/builtins.go

@@ -54,9 +54,6 @@ func remote(eng *engine.Engine) error {
 // These components should be broken off into plugins of their own.
 //
 func daemon(eng *engine.Engine) error {
-	if err := eng.Register("initserverpidfile", server.InitPidfile); err != nil {
-		return err
-	}
 	if err := eng.Register("initserver", server.InitServer); err != nil {
 		return err
 	}

+ 4 - 7
daemon/container.go

@@ -514,13 +514,10 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
 	if container.Config.OpenStdin {
 		container.stdin, container.stdinPipe = io.Pipe()
 	}
-	if container.daemon != nil && container.daemon.srv != nil {
-		container.LogEvent("die")
-	}
-	if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
-		// FIXME: here is race condition between two RUN instructions in Dockerfile
-		// because they share same runconfig and change image. Must be fixed
-		// in builder/builder.go
+	container.LogEvent("die")
+	// If the engine is shutting down, don't save the container state as stopped.
+	// This will cause it to be restarted when the engine is restarted.
+	if container.daemon != nil && container.daemon.eng != nil && !container.daemon.eng.IsShutdown() {
 		if err := container.toDisk(); err != nil {
 			utils.Errorf("Error dumping container %s state to disk: %s\n", container.ID, err)
 		}

+ 36 - 31
daemon/daemon.go

@@ -94,7 +94,6 @@ type Daemon struct {
 	idIndex        *truncindex.TruncIndex
 	sysInfo        *sysinfo.SysInfo
 	volumes        *graph.Graph
-	srv            Server
 	eng            *engine.Engine
 	config         *daemonconfig.Config
 	containerGraph *graphdb.Database
@@ -667,6 +666,20 @@ func NewDaemon(config *daemonconfig.Config, eng *engine.Engine) (*Daemon, error)
 }
 
 func NewDaemonFromDirectory(config *daemonconfig.Config, eng *engine.Engine) (*Daemon, error) {
+	// Claim the pidfile first, to avoid any and all unexpected race conditions.
+	// Some of the init doesn't need a pidfile lock - but let's not try to be smart.
+	if config.Pidfile != "" {
+		if err := utils.CreatePidFile(config.Pidfile); err != nil {
+			return nil, err
+		}
+		eng.OnShutdown(func() {
+			// Always release the pidfile last, just in case
+			utils.RemovePidFile(config.Pidfile)
+		})
+	}
+
+	// Check that the system is supported and we have sufficient privileges
+	// FIXME: return errors instead of calling Fatal
 	if runtime.GOOS != "linux" {
 		log.Fatalf("The Docker daemon is only supported on linux")
 	}
@@ -819,13 +832,32 @@ func NewDaemonFromDirectory(config *daemonconfig.Config, eng *engine.Engine) (*D
 		eng:            eng,
 		Sockets:        config.Sockets,
 	}
-
 	if err := daemon.checkLocaldns(); err != nil {
 		return nil, err
 	}
 	if err := daemon.restore(); err != nil {
 		return nil, err
 	}
+	// Setup shutdown handlers
+	// FIXME: can these shutdown handlers be registered closer to their source?
+	eng.OnShutdown(func() {
+		// FIXME: if these cleanup steps can be called concurrently, register
+		// them as separate handlers to speed up total shutdown time
+		// FIXME: use engine logging instead of utils.Errorf
+		if err := daemon.shutdown(); err != nil {
+			utils.Errorf("daemon.shutdown(): %s", err)
+		}
+		if err := portallocator.ReleaseAll(); err != nil {
+			utils.Errorf("portallocator.ReleaseAll(): %s", err)
+		}
+		if err := daemon.driver.Cleanup(); err != nil {
+			utils.Errorf("daemon.driver.Cleanup(): %s", err.Error())
+		}
+		if err := daemon.containerGraph.Close(); err != nil {
+			utils.Errorf("daemon.containerGraph.Close(): %s", err.Error())
+		}
+	})
+
 	return daemon, nil
 }
 
@@ -853,30 +885,6 @@ func (daemon *Daemon) shutdown() error {
 	return nil
 }
 
-func (daemon *Daemon) Close() error {
-	errorsStrings := []string{}
-	if err := daemon.shutdown(); err != nil {
-		utils.Errorf("daemon.shutdown(): %s", err)
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if err := portallocator.ReleaseAll(); err != nil {
-		utils.Errorf("portallocator.ReleaseAll(): %s", err)
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if err := daemon.driver.Cleanup(); err != nil {
-		utils.Errorf("daemon.driver.Cleanup(): %s", err.Error())
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if err := daemon.containerGraph.Close(); err != nil {
-		utils.Errorf("daemon.containerGraph.Close(): %s", err.Error())
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if len(errorsStrings) > 0 {
-		return fmt.Errorf("%s", strings.Join(errorsStrings, ", "))
-	}
-	return nil
-}
-
 func (daemon *Daemon) Mount(container *Container) error {
 	dir, err := daemon.driver.Get(container.ID, container.GetMountLabel())
 	if err != nil {
@@ -967,6 +975,8 @@ func (daemon *Daemon) Kill(c *Container, sig int) error {
 // from the content root, including images, volumes and
 // container filesystems.
 // Again: this will remove your entire docker daemon!
+// FIXME: this is deprecated, and only used in legacy
+// tests. Please remove.
 func (daemon *Daemon) Nuke() error {
 	var wg sync.WaitGroup
 	for _, container := range daemon.List() {
@@ -977,7 +987,6 @@ func (daemon *Daemon) Nuke() error {
 		}(container)
 	}
 	wg.Wait()
-	daemon.Close()
 
 	return os.RemoveAll(daemon.config.Root)
 }
@@ -1022,10 +1031,6 @@ func (daemon *Daemon) ContainerGraph() *graphdb.Database {
 	return daemon.containerGraph
 }
 
-func (daemon *Daemon) SetServer(server Server) {
-	daemon.srv = server
-}
-
 func (daemon *Daemon) checkLocaldns() error {
 	resolvConf, err := resolvconf.Get()
 	if err != nil {

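Note on the FIXME above: Shutdown (added in engine/engine.go below) runs every registered handler in its own goroutine, so independent cleanup steps registered via separate OnShutdown calls run in parallel. A toy sketch of the difference, using only the engine API added in this PR (the sleeps stand in for real cleanup work; everything else is illustrative, not Docker code):

package main

import (
	"log"
	"time"

	"github.com/docker/docker/engine"
)

func main() {
	eng := engine.New()

	// Two independent cleanup steps, registered as separate handlers...
	eng.OnShutdown(func() { time.Sleep(time.Second) }) // e.g. release ports
	eng.OnShutdown(func() { time.Sleep(time.Second) }) // e.g. close the graph db

	// ...are run concurrently by Shutdown, so this takes roughly one
	// second rather than two (bounded by Shutdown's internal timeout).
	start := time.Now()
	eng.Shutdown()
	log.Printf("shutdown took %v", time.Since(start))
}

Registered as a single handler, as in the hunk above, the same steps run back-to-back.
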
+ 0 - 5
daemon/server.go

@@ -1,5 +0,0 @@
-package daemon
-
-type Server interface {
-	IsRunning() bool // returns true if the server is currently in operation
-}

+ 2 - 8
docker/daemon.go

@@ -10,6 +10,7 @@ import (
 	"github.com/docker/docker/dockerversion"
 	"github.com/docker/docker/engine"
 	flag "github.com/docker/docker/pkg/mflag"
+	"github.com/docker/docker/pkg/signal"
 	"github.com/docker/docker/sysinit"
 )
 
@@ -39,19 +40,12 @@ func mainDaemon() {
 	}
 
 	eng := engine.New()
+	signal.Trap(eng.Shutdown)
 	// Load builtins
 	if err := builtins.Register(eng); err != nil {
 		log.Fatal(err)
 	}
 
-	// handle the pidfile early. https://github.com/docker/docker/issues/6973
-	if len(*pidfile) > 0 {
-		job := eng.Job("initserverpidfile", *pidfile)
-		if err := job.Run(); err != nil {
-			log.Fatal(err)
-		}
-	}
-
 	// load the daemon in the background so we can immediately start
 	// the http api so that connections don't fail while the daemon
 	// is booting

+ 85 - 8
engine/engine.go

@@ -7,6 +7,8 @@ import (
 	"os"
 	"sort"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/docker/docker/utils"
 )
@@ -43,14 +45,18 @@ func unregister(name string) {
 // It acts as a store for *containers*, and allows manipulation of these
 // containers by executing *jobs*.
 type Engine struct {
-	handlers map[string]Handler
-	catchall Handler
-	hack     Hack // data for temporary hackery (see hack.go)
-	id       string
-	Stdout   io.Writer
-	Stderr   io.Writer
-	Stdin    io.Reader
-	Logging  bool
+	handlers   map[string]Handler
+	catchall   Handler
+	hack       Hack // data for temporary hackery (see hack.go)
+	id         string
+	Stdout     io.Writer
+	Stderr     io.Writer
+	Stdin      io.Reader
+	Logging    bool
+	tasks      sync.WaitGroup
+	l          sync.RWMutex // lock for shutdown
+	shutdown   bool
+	onShutdown []func() // shutdown handlers
 }
 
 func (eng *Engine) Register(name string, handler Handler) error {
@@ -130,6 +136,77 @@ func (eng *Engine) Job(name string, args ...string) *Job {
 	return job
 }
 
+// OnShutdown registers a new callback to be called by Shutdown.
+// This is typically used by services to perform cleanup.
+func (eng *Engine) OnShutdown(h func()) {
+	eng.l.Lock()
+	eng.onShutdown = append(eng.onShutdown, h)
+	eng.l.Unlock()
+}
+
+// Shutdown permanently shuts down eng as follows:
+// - It refuses all new jobs, permanently.
+// - It waits for all active jobs to complete, with a 5 second timeout.
+// - It calls all shutdown handlers concurrently (if any), and returns
+//	when they all complete, or after a 10 second timeout,
+//	whichever happens first.
+func (eng *Engine) Shutdown() {
+	eng.l.Lock()
+	if eng.shutdown {
+		eng.l.Unlock()
+		return
+	}
+	eng.shutdown = true
+	eng.l.Unlock()
+	// We don't need to protect the rest with a lock, to allow
+	// for other calls to immediately fail with "shutdown" instead
+	// of hanging for 15 seconds.
+	// This requires all concurrent calls to check for shutdown, otherwise
+	// it might cause a race.
+
+	// Wait for all jobs to complete.
+	// Timeout after 5 seconds.
+	tasksDone := make(chan struct{})
+	go func() {
+		eng.tasks.Wait()
+		close(tasksDone)
+	}()
+	select {
+	case <-time.After(time.Second * 5):
+	case <-tasksDone:
+	}
+
+	// Call shutdown handlers, if any.
+	// Timeout after 10 seconds.
+	var wg sync.WaitGroup
+	for _, h := range eng.onShutdown {
+		wg.Add(1)
+		go func(h func()) {
+			defer wg.Done()
+			h()
+		}(h)
+	}
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-time.After(time.Second * 10):
+	case <-done:
+	}
+	return
+}
+
+// IsShutdown returns true if the engine is in the process
+// of shutting down, or already shut down.
+// Otherwise it returns false.
+func (eng *Engine) IsShutdown() bool {
+	eng.l.RLock()
+	defer eng.l.RUnlock()
+	return eng.shutdown
+}
+
 // ParseJob creates a new job from a text description using a shell-like syntax.
 //
 // The following syntax is used to parse `input`:

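For readers skimming the diff, here is a minimal sketch of how a caller drives the new shutdown facility, using only names added or already present in this file (the job name and log messages are illustrative):

package main

import (
	"log"

	"github.com/docker/docker/engine"
)

func main() {
	eng := engine.New()

	// A trivial job handler, registered the same way the builtins are.
	if err := eng.Register("hello", func(job *engine.Job) engine.Status {
		job.Logf("hello from a job")
		return engine.StatusOK
	}); err != nil {
		log.Fatal(err)
	}

	// Cleanup to run when the engine shuts down.
	eng.OnShutdown(func() { log.Println("shutdown handler called") })

	if err := eng.Job("hello").Run(); err != nil {
		log.Fatal(err)
	}

	// Shutdown waits for in-flight jobs, then calls the handlers above.
	eng.Shutdown()

	// IsShutdown now reports true and new jobs are refused.
	log.Printf("IsShutdown: %v", eng.IsShutdown())
	if err := eng.Job("hello").Run(); err != nil {
		log.Printf("as expected: %v", err)
	}
}
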
+ 7 - 1
engine/job.go

@@ -32,7 +32,6 @@ type Job struct {
 	handler Handler
 	status  Status
 	end     time.Time
-	onExit  []func()
 }
 
 type Status int
@@ -47,6 +46,13 @@ const (
 // If the job returns a failure status, an error is returned
 // which includes the status.
 func (job *Job) Run() error {
+	if job.Eng.IsShutdown() {
+		return fmt.Errorf("engine is shutdown")
+	}
+	job.Eng.l.Lock()
+	job.Eng.tasks.Add(1)
+	job.Eng.l.Unlock()
+	defer job.Eng.tasks.Done()
 	// FIXME: make this thread-safe
 	// FIXME: implement wait
 	if !job.end.IsZero() {

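The check added to Run is a small instance of a general pattern: refuse new work once a shutdown flag is set, and track in-flight work in a WaitGroup so the shutdown path can wait for it. Isolated from the engine types, a sketch of that pattern looks like this (all names here are illustrative, not from the diff):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type gate struct {
	mu       sync.Mutex
	closed   bool
	inflight sync.WaitGroup
}

// run executes work unless shutdown has started, and registers it so
// that close() can wait for whatever is still running.
func (g *gate) run(work func()) error {
	g.mu.Lock()
	if g.closed {
		g.mu.Unlock()
		return errors.New("shutting down")
	}
	g.inflight.Add(1)
	g.mu.Unlock()

	defer g.inflight.Done()
	work()
	return nil
}

// close flips the flag and waits for in-flight work to finish.
func (g *gate) close() {
	g.mu.Lock()
	g.closed = true
	g.mu.Unlock()
	g.inflight.Wait()
}

func main() {
	var g gate
	_ = g.run(func() { fmt.Println("job ran") })
	g.close()
	fmt.Println(g.run(func() {})) // shutting down
}
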
+ 80 - 0
engine/shutdown_test.go

@@ -0,0 +1,80 @@
+package engine
+
+import (
+	"testing"
+	"time"
+)
+
+func TestShutdownEmpty(t *testing.T) {
+	eng := New()
+	if eng.IsShutdown() {
+		t.Fatalf("IsShutdown should be false")
+	}
+	eng.Shutdown()
+	if !eng.IsShutdown() {
+		t.Fatalf("IsShutdown should be true")
+	}
+}
+
+func TestShutdownAfterRun(t *testing.T) {
+	eng := New()
+	var called bool
+	eng.Register("foo", func(job *Job) Status {
+		called = true
+		return StatusOK
+	})
+	if err := eng.Job("foo").Run(); err != nil {
+		t.Fatal(err)
+	}
+	eng.Shutdown()
+	if err := eng.Job("foo").Run(); err == nil {
+		t.Fatalf("running a job after shutdown should fail: %#v", *eng)
+	}
+}
+
+// An approximate and racy, but better-than-nothing test that Shutdown
+// waits for a job already in flight to complete before returning.
+func TestShutdownDuringRun(t *testing.T) {
+	var (
+		jobDelay     time.Duration = 500 * time.Millisecond
+		jobDelayLow  time.Duration = 100 * time.Millisecond
+		jobDelayHigh time.Duration = 700 * time.Millisecond
+	)
+	eng := New()
+	var completed bool
+	eng.Register("foo", func(job *Job) Status {
+		time.Sleep(jobDelay)
+		completed = true
+		return StatusOK
+	})
+	go eng.Job("foo").Run()
+	time.Sleep(50 * time.Millisecond)
+	done := make(chan struct{})
+	var startShutdown time.Time
+	go func() {
+		startShutdown = time.Now()
+		eng.Shutdown()
+		close(done)
+	}()
+	time.Sleep(50 * time.Millisecond)
+	if err := eng.Job("foo").Run(); err == nil {
+		t.Fatalf("run on shutdown should fail: %#v", *eng)
+	}
+	<-done
+	// Verify that Shutdown() blocks for roughly 500ms, instead
+	// of returning almost instantly.
+	//
+	// We use >100ms to leave ample margin for race conditions between
+	// goroutines. It's possible (but unlikely in reasonable testing
+	// conditions), that this test will cause a false positive or false
+	// negative. But it's probably better than not having any test
+	// for the 99.999% of time where testing conditions are reasonable.
+	if d := time.Since(startShutdown); d.Nanoseconds() < jobDelayLow.Nanoseconds() {
+		t.Fatalf("shutdown did not block long enough: %v", d)
+	} else if d.Nanoseconds() > jobDelayHigh.Nanoseconds() {
+		t.Fatalf("shutdown blocked too long: %v", d)
+	}
+	if !completed {
+		t.Fatalf("job did not complete")
+	}
+}

+ 53 - 0
pkg/signal/trap.go

@@ -0,0 +1,53 @@
+package signal
+
+import (
+	"log"
+	"os"
+	gosignal "os/signal"
+	"sync/atomic"
+	"syscall"
+)
+
+// Trap sets up a simplified signal "trap", appropriate for common
+// behavior expected from a vanilla unix command-line tool in general
+// (and the Docker engine in particular).
+//
+// * If SIGINT or SIGTERM are received, `cleanup` is called, then the process is terminated.
+// * If SIGINT or SIGTERM are repeated 3 times before cleanup is complete, then cleanup is
+// skipped and the process terminated directly.
+// * If "DEBUG" is set in the environment, SIGQUIT causes an exit without cleanup.
+//
+func Trap(cleanup func()) {
+	c := make(chan os.Signal, 1)
+	signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
+	if os.Getenv("DEBUG") == "" {
+		signals = append(signals, syscall.SIGQUIT)
+	}
+	gosignal.Notify(c, signals...)
+	go func() {
+		interruptCount := uint32(0)
+		for sig := range c {
+			go func(sig os.Signal) {
+				log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
+				switch sig {
+				case os.Interrupt, syscall.SIGTERM:
+					// If the user really wants to interrupt, let him do so.
+					if atomic.LoadUint32(&interruptCount) < 3 {
+						atomic.AddUint32(&interruptCount, 1)
+						// Initiate the cleanup only once
+						if atomic.LoadUint32(&interruptCount) == 1 {
+							// Call cleanup handler
+							cleanup()
+						} else {
+							return
+						}
+					} else {
+						log.Printf("Force shutdown of docker, interrupting cleanup\n")
+					}
+				case syscall.SIGQUIT:
+				}
+				os.Exit(128 + int(sig.(syscall.Signal)))
+			}(sig)
+		}
+	}()
+}

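In this PR the only caller of Trap is docker/daemon.go above, which passes eng.Shutdown as the cleanup function. For illustration, a stand-alone sketch with a placeholder cleanup (the import path is the one added to docker/daemon.go; the body is hypothetical):

package main

import (
	"log"

	"github.com/docker/docker/pkg/signal"
)

func main() {
	signal.Trap(func() {
		// Called once, on the first SIGINT/SIGTERM; the process exits
		// when this returns, or immediately after three repeated signals.
		log.Println("cleaning up before exit")
	})

	// Stand-in for the daemon's main loop.
	select {}
}
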
+ 2 - 58
server/init.go

@@ -5,83 +5,29 @@
 package server
 
 import (
-	"fmt"
-	"log"
-	"os"
-	gosignal "os/signal"
-	"sync/atomic"
-	"syscall"
-
 	"github.com/docker/docker/daemon"
 	"github.com/docker/docker/daemonconfig"
 	"github.com/docker/docker/engine"
-	"github.com/docker/docker/utils"
 )
 
 func (srv *Server) handlerWrap(h engine.Handler) engine.Handler {
 	return func(job *engine.Job) engine.Status {
-		if !srv.IsRunning() {
-			return job.Errorf("Server is not running")
-		}
 		srv.tasks.Add(1)
 		defer srv.tasks.Done()
 		return h(job)
 	}
 }
 
-func InitPidfile(job *engine.Job) engine.Status {
-	if len(job.Args) == 0 {
-		return job.Error(fmt.Errorf("no pidfile provided to initialize"))
-	}
-	job.Logf("Creating pidfile")
-	if err := utils.CreatePidFile(job.Args[0]); err != nil {
-		return job.Error(err)
-	}
-	return engine.StatusOK
-}
-
 // jobInitApi runs the remote api server `srv` as a daemon,
 // Only one api server can run at the same time - this is enforced by a pidfile.
 // The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
 func InitServer(job *engine.Job) engine.Status {
 	job.Logf("Creating server")
-	srv, err := NewServer(job.Eng, daemonconfig.ConfigFromJob(job))
+	cfg := daemonconfig.ConfigFromJob(job)
+	srv, err := NewServer(job.Eng, cfg)
 	if err != nil {
 		return job.Error(err)
 	}
-	job.Logf("Setting up signal traps")
-	c := make(chan os.Signal, 1)
-	signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
-	if os.Getenv("DEBUG") == "" {
-		signals = append(signals, syscall.SIGQUIT)
-	}
-	gosignal.Notify(c, signals...)
-	go func() {
-		interruptCount := uint32(0)
-		for sig := range c {
-			go func(sig os.Signal) {
-				log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
-				switch sig {
-				case os.Interrupt, syscall.SIGTERM:
-					// If the user really wants to interrupt, let him do so.
-					if atomic.LoadUint32(&interruptCount) < 3 {
-						atomic.AddUint32(&interruptCount, 1)
-						// Initiate the cleanup only once
-						if atomic.LoadUint32(&interruptCount) == 1 {
-							utils.RemovePidFile(srv.daemon.Config().Pidfile)
-							srv.Close()
-						} else {
-							return
-						}
-					} else {
-						log.Printf("Force shutdown of docker, interrupting cleanup\n")
-					}
-				case syscall.SIGQUIT:
-				}
-				os.Exit(128 + int(sig.(syscall.Signal)))
-			}(sig)
-		}
-	}()
 	job.Eng.Hack_SetGlobalVar("httpapi.server", srv)
 	job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon)
 
@@ -104,7 +50,6 @@ func InitServer(job *engine.Job) engine.Status {
 	if err := srv.daemon.Install(job.Eng); err != nil {
 		return job.Error(err)
 	}
-	srv.SetRunning(true)
 	return engine.StatusOK
 }
 
@@ -119,6 +64,5 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
 		pullingPool: make(map[string]chan struct{}),
 		pushingPool: make(map[string]chan struct{}),
 	}
-	daemon.SetServer(srv)
 	return srv, nil
 }

+ 0 - 36
server/server.go

@@ -23,52 +23,16 @@ package server
 
 import (
 	"sync"
-	"time"
 
 	"github.com/docker/docker/daemon"
 	"github.com/docker/docker/engine"
 )
 
-func (srv *Server) SetRunning(status bool) {
-	srv.Lock()
-	defer srv.Unlock()
-
-	srv.running = status
-}
-
-func (srv *Server) IsRunning() bool {
-	srv.RLock()
-	defer srv.RUnlock()
-	return srv.running
-}
-
-func (srv *Server) Close() error {
-	if srv == nil {
-		return nil
-	}
-	srv.SetRunning(false)
-	done := make(chan struct{})
-	go func() {
-		srv.tasks.Wait()
-		close(done)
-	}()
-	select {
-	// Waiting server jobs for 15 seconds, shutdown immediately after that time
-	case <-time.After(time.Second * 15):
-	case <-done:
-	}
-	if srv.daemon == nil {
-		return nil
-	}
-	return srv.daemon.Close()
-}
-
 type Server struct {
 	sync.RWMutex
 	daemon      *daemon.Daemon
 	pullingPool map[string]chan struct{}
 	pushingPool map[string]chan struct{}
 	Eng         *engine.Engine
-	running     bool
 	tasks       sync.WaitGroup
 }