Refresh containerd remotes when containerd is restarted

Before this patch, when containerd is restarted (due to a crash, a kill, or
anything else), the daemon would keep trying to process the event stream
against the old socket handles. This would lead to a CPU spin due to the
error handling when the client can't connect to containerd.
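The snippet below is a minimal, self-contained sketch of that failure mode, not
the daemon's actual code: the deadEventStream type and its Recv method are
hypothetical stand-ins for a containerd event subscription whose underlying
connection has died.

package main

import (
	"errors"
	"fmt"
	"time"
)

// deadEventStream is a hypothetical stand-in for the containerd event
// subscription after containerd has gone away: every Recv fails immediately.
type deadEventStream struct{}

func (s *deadEventStream) Recv() (string, error) {
	return "", errors.New("rpc error: transport is closing")
}

func main() {
	stream := &deadEventStream{}
	failures := 0

	// Retrying Recv against a dead connection without reconnecting (or at
	// least backing off) turns the loop into a busy-wait: the CPU spin
	// described above.
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		if _, err := stream.Recv(); err != nil {
			failures++
			continue
		}
	}
	fmt.Printf("%d failed Recv calls in one second\n", failures)
}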

This change makes sure the containerd remote client is updated for all
registered libcontainerd clients.
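As a rough, self-contained sketch of that pattern (remoteClient here is a
stand-in for *containerd.Client, and the namespace names are illustrative; the
real change is in the diff below): each libcontainerd client only touches its
containerd handle through a mutex-guarded getter/setter, so the connection
monitor can swap in a fresh client for every registered namespace once
containerd is back.

package main

import (
	"fmt"
	"sync"
)

// remoteClient is a stand-in for *containerd.Client in this sketch.
type remoteClient struct{ addr string }

// client mirrors the accessor pattern the patch introduces: the containerd
// handle is only read and replaced under a lock, so a monitor goroutine can
// refresh it while other goroutines keep calling getRemote().
type client struct {
	sync.RWMutex
	namespace string
	remote    *remoteClient
}

func (c *client) setRemote(remote *remoteClient) {
	c.Lock()
	c.remote = remote
	c.Unlock()
}

func (c *client) getRemote() *remoteClient {
	c.RLock()
	remote := c.remote
	c.RUnlock()
	return remote
}

func main() {
	old := &remoteClient{addr: "old connection"}
	registered := []*client{
		{namespace: "moby", remote: old},
		{namespace: "plugins.moby", remote: old},
	}

	// Once containerd is back, refresh every registered client, the way the
	// connection monitor does in this patch (one goroutine per client).
	var wg sync.WaitGroup
	for _, c := range registered {
		wg.Add(1)
		go func(c *client) {
			defer wg.Done()
			c.setRemote(&remoteClient{addr: "new connection"})
		}(c)
	}
	wg.Wait()

	for _, c := range registered {
		fmt.Printf("namespace %s now uses %q\n", c.namespace, c.getRemote().addr)
	}
}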

This is not necessarily the ideal fix, which would likely require a major
refactor, but it at least gets things back to a working state with a
minimal patch.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 2018-01-31 17:32:40 -05:00
parent 2e8ccbb49e
commit 400126f869
2 changed files with 81 additions and 36 deletions


@@ -114,8 +114,21 @@ type client struct {
 	containers map[string]*container
 }
 
+func (c *client) setRemote(remote *containerd.Client) {
+	c.Lock()
+	c.remote = remote
+	c.Unlock()
+}
+
+func (c *client) getRemote() *containerd.Client {
+	c.RLock()
+	remote := c.remote
+	c.RUnlock()
+	return remote
+}
+
 func (c *client) Version(ctx context.Context) (containerd.Version, error) {
-	return c.remote.Version(ctx)
+	return c.getRemote().Version(ctx)
 }
 
 func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
@@ -187,7 +200,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
 	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
 
-	cdCtr, err := c.remote.NewContainer(ctx, id,
+	cdCtr, err := c.getRemote().NewContainer(ctx, id,
 		containerd.WithSpec(ociSpec),
 		// TODO(mlaventure): when containerd support lcow, revisit runtime value
 		containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
@@ -230,7 +243,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 		// remove the checkpoint when we're done
 		defer func() {
 			if cp != nil {
-				err := c.remote.ContentStore().Delete(context.Background(), cp.Digest)
+				err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
 				if err != nil {
 					c.logger.WithError(err).WithFields(logrus.Fields{
 						"ref": checkpointDir,
@@ -528,14 +541,14 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 	}
 	// Whatever happens, delete the checkpoint from containerd
 	defer func() {
-		err := c.remote.ImageService().Delete(context.Background(), img.Name())
+		err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
 		if err != nil {
 			c.logger.WithError(err).WithField("digest", img.Target().Digest).
 				Warnf("failed to delete checkpoint image")
 		}
 	}()
 
-	b, err := content.ReadBlob(ctx, c.remote.ContentStore(), img.Target().Digest)
+	b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target().Digest)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
 	}
@@ -555,7 +568,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
 	}
 
-	rat, err := c.remote.ContentStore().ReaderAt(ctx, cpDesc.Digest)
+	rat, err := c.getRemote().ContentStore().ReaderAt(ctx, cpDesc.Digest)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
 	}
@@ -708,7 +721,7 @@ func (c *client) processEventStream(ctx context.Context) {
 		}
 	}()
 
-	eventStream, err = c.remote.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
+	eventStream, err = c.getRemote().EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
 		Filters: []string{
 			// Filter on both namespace *and* topic. To create an "and" filter,
 			// this must be a single, comma-separated string
@@ -719,6 +732,8 @@ func (c *client) processEventStream(ctx context.Context) {
 		return
 	}
 
+	c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
+
 	var oomKilled bool
 	for {
 		ev, err = eventStream.Recv()
@@ -822,7 +837,7 @@ func (c *client) processEventStream(ctx context.Context) {
 }
 
 func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
-	writer, err := c.remote.ContentStore().Writer(ctx, ref, 0, "")
+	writer, err := c.getRemote().ContentStore().Writer(ctx, ref, 0, "")
 	if err != nil {
 		return nil, err
 	}


@@ -260,7 +260,7 @@ func (r *remote) startContainerd() error {
 	return nil
 }
 
-func (r *remote) monitorConnection(client *containerd.Client) {
+func (r *remote) monitorConnection(monitor *containerd.Client) {
 	var transientFailureCount = 0
 
 	ticker := time.NewTicker(500 * time.Millisecond)
@@ -269,7 +269,7 @@ func (r *remote) monitorConnection(client *containerd.Client) {
 	for {
 		<-ticker.C
 		ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
-		_, err := client.IsServing(ctx)
+		_, err := monitor.IsServing(ctx)
 		cancel()
 		if err == nil {
 			transientFailureCount = 0
@@ -279,39 +279,69 @@ func (r *remote) monitorConnection(client *containerd.Client) {
 		select {
 		case <-r.shutdownContext.Done():
 			r.logger.Info("stopping healthcheck following graceful shutdown")
-			client.Close()
+			monitor.Close()
 			return
 		default:
 		}
 
 		r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
 
-		if r.daemonPid != -1 {
-			transientFailureCount++
-			if transientFailureCount >= maxConnectionRetryCount || !system.IsProcessAlive(r.daemonPid) {
-				transientFailureCount = 0
-				if system.IsProcessAlive(r.daemonPid) {
-					r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
-					// Try to get a stack trace
-					syscall.Kill(r.daemonPid, syscall.SIGUSR1)
-					<-time.After(100 * time.Millisecond)
-					system.KillProcess(r.daemonPid)
-				}
-				<-r.daemonWaitCh
-				var err error
-				client.Close()
-				os.Remove(r.GRPC.Address)
-				if err = r.startContainerd(); err != nil {
-					r.logger.WithError(err).Error("failed restarting containerd")
-				} else {
-					newClient, err := containerd.New(r.GRPC.Address)
-					if err != nil {
-						r.logger.WithError(err).Error("failed connect to containerd")
-					} else {
-						client = newClient
-					}
-				}
-			}
-		}
+		if r.daemonPid == -1 {
+			continue
+		}
+
+		transientFailureCount++
+		if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
+			continue
+		}
+
+		transientFailureCount = 0
+		if system.IsProcessAlive(r.daemonPid) {
+			r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
+			// Try to get a stack trace
+			syscall.Kill(r.daemonPid, syscall.SIGUSR1)
+			<-time.After(100 * time.Millisecond)
+			system.KillProcess(r.daemonPid)
+		}
+		<-r.daemonWaitCh
+
+		monitor.Close()
+		os.Remove(r.GRPC.Address)
+		if err := r.startContainerd(); err != nil {
+			r.logger.WithError(err).Error("failed restarting containerd")
+			continue
+		}
+
+		newMonitor, err := containerd.New(r.GRPC.Address)
+		if err != nil {
+			r.logger.WithError(err).Error("failed connect to containerd")
+			continue
+		}
+
+		monitor = newMonitor
+
+		var wg sync.WaitGroup
+
+		for _, c := range r.clients {
+			wg.Add(1)
+			go func(c *client) {
+				defer wg.Done()
+				c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
+				c.remote.Close()
+
+				remote, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(c.namespace))
+				if err != nil {
+					r.logger.WithError(err).Error("failed to connect to containerd")
+					// TODO: Better way to handle this?
+					// This *shouldn't* happen, but this could wind up where the daemon
+					// is not able to communicate with an eventually up containerd
+					return
+				}
+
+				c.setRemote(remote)
+			}(c)
+
+			wg.Wait()
+		}
 	}
 }