Add synchronization between jobs and server closing
Fixes #5154 Daemon waiting 15 seconds for finishing server jobs before shutdown. In this time it doesn't accept new jobs. After this time, it shutdown despite running jobs. Docker-DCO-1.1-Signed-off-by: Alexandr Morozov <lk4d4math@gmail.com> (github: LK4D4)
This commit is contained in:
parent
7470ebf30b
commit
f92d68a6ce
1 changed files with 24 additions and 2 deletions
|
@ -55,6 +55,17 @@ import (
|
||||||
"github.com/dotcloud/docker/utils"
|
"github.com/dotcloud/docker/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (srv *Server) handlerWrap(h engine.Handler) engine.Handler {
|
||||||
|
return func(job *engine.Job) engine.Status {
|
||||||
|
if !srv.IsRunning() {
|
||||||
|
return job.Errorf("Server is not running")
|
||||||
|
}
|
||||||
|
srv.tasks.Add(1)
|
||||||
|
defer srv.tasks.Done()
|
||||||
|
return h(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// jobInitApi runs the remote api server `srv` as a daemon,
|
// jobInitApi runs the remote api server `srv` as a daemon,
|
||||||
// Only one api server can run at the same time - this is enforced by a pidfile.
|
// Only one api server can run at the same time - this is enforced by a pidfile.
|
||||||
// The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
|
// The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
|
||||||
|
@ -136,7 +147,7 @@ func InitServer(job *engine.Job) engine.Status {
|
||||||
"push": srv.ImagePush,
|
"push": srv.ImagePush,
|
||||||
"containers": srv.Containers,
|
"containers": srv.Containers,
|
||||||
} {
|
} {
|
||||||
if err := job.Eng.Register(name, handler); err != nil {
|
if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil {
|
||||||
return job.Error(err)
|
return job.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -150,6 +161,7 @@ func InitServer(job *engine.Job) engine.Status {
|
||||||
if err := srv.daemon.Install(job.Eng); err != nil {
|
if err := srv.daemon.Install(job.Eng); err != nil {
|
||||||
return job.Error(err)
|
return job.Error(err)
|
||||||
}
|
}
|
||||||
|
srv.SetRunning(true)
|
||||||
return engine.StatusOK
|
return engine.StatusOK
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2365,7 +2377,6 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
|
||||||
pushingPool: make(map[string]chan struct{}),
|
pushingPool: make(map[string]chan struct{}),
|
||||||
events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
|
events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
|
||||||
listeners: make(map[int64]chan utils.JSONMessage),
|
listeners: make(map[int64]chan utils.JSONMessage),
|
||||||
running: true,
|
|
||||||
}
|
}
|
||||||
daemon.SetServer(srv)
|
daemon.SetServer(srv)
|
||||||
return srv, nil
|
return srv, nil
|
||||||
|
@ -2414,6 +2425,16 @@ func (srv *Server) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
srv.SetRunning(false)
|
srv.SetRunning(false)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
srv.tasks.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
// Waiting server jobs for 15 seconds, shutdown immediately after that time
|
||||||
|
case <-time.After(time.Second * 15):
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
if srv.daemon == nil {
|
if srv.daemon == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2429,4 +2450,5 @@ type Server struct {
|
||||||
listeners map[int64]chan utils.JSONMessage
|
listeners map[int64]chan utils.JSONMessage
|
||||||
Eng *engine.Engine
|
Eng *engine.Engine
|
||||||
running bool
|
running bool
|
||||||
|
tasks sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue