
Merge pull request #36173 from cpuguy83/fix_containerd_crash_spin

Refresh containerd remotes on containerd restarted
Yong Tang 7 years ago
commit 384ff69f2f
2 changed files with 81 additions and 36 deletions
  1. libcontainerd/client_daemon.go (+23 −8)
  2. libcontainerd/remote_daemon.go (+58 −28)
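
The gist of the change: each libcontainerd client now reaches containerd only through lock-guarded setRemote/getRemote accessors, so the connection monitor can swap in a fresh connection after containerd restarts without racing other goroutines. Below is a minimal, standalone sketch of that pattern, not the daemon's actual code; the conn type is a stand-in for *containerd.Client.

package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for *containerd.Client in this sketch.
type conn struct{ addr string }

type client struct {
	sync.RWMutex
	remote *conn
}

// setRemote swaps in a new connection; in the daemon this is called by the
// connection monitor after it has restarted containerd.
func (c *client) setRemote(r *conn) {
	c.Lock()
	c.remote = r
	c.Unlock()
}

// getRemote is what every call site uses instead of reading c.remote directly.
func (c *client) getRemote() *conn {
	c.RLock()
	r := c.remote
	c.RUnlock()
	return r
}

func main() {
	c := &client{remote: &conn{addr: "/run/containerd/containerd.sock"}}
	fmt.Println(c.getRemote().addr) // normal call path
	c.setRemote(&conn{addr: "/run/containerd/containerd.sock"})
	fmt.Println(c.getRemote().addr) // same address, refreshed connection
}
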

libcontainerd/client_daemon.go (+23 −8)

@@ -114,8 +114,21 @@ type client struct {
 	containers map[string]*container
 }

+func (c *client) setRemote(remote *containerd.Client) {
+	c.Lock()
+	c.remote = remote
+	c.Unlock()
+}
+
+func (c *client) getRemote() *containerd.Client {
+	c.RLock()
+	remote := c.remote
+	c.RUnlock()
+	return remote
+}
+
 func (c *client) Version(ctx context.Context) (containerd.Version, error) {
-	return c.remote.Version(ctx)
+	return c.getRemote().Version(ctx)
 }

 func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
@@ -187,7 +200,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run

 	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")

-	cdCtr, err := c.remote.NewContainer(ctx, id,
+	cdCtr, err := c.getRemote().NewContainer(ctx, id,
 		containerd.WithSpec(ociSpec),
 		// TODO(mlaventure): when containerd support lcow, revisit runtime value
 		containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
@@ -230,7 +243,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 		// remove the checkpoint when we're done
 		defer func() {
 			if cp != nil {
-				err := c.remote.ContentStore().Delete(context.Background(), cp.Digest)
+				err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
 				if err != nil {
 					c.logger.WithError(err).WithFields(logrus.Fields{
 						"ref":    checkpointDir,
@@ -528,14 +541,14 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 	}
 	// Whatever happens, delete the checkpoint from containerd
 	defer func() {
-		err := c.remote.ImageService().Delete(context.Background(), img.Name())
+		err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
 		if err != nil {
 			c.logger.WithError(err).WithField("digest", img.Target().Digest).
 				Warnf("failed to delete checkpoint image")
 		}
 	}()

-	b, err := content.ReadBlob(ctx, c.remote.ContentStore(), img.Target().Digest)
+	b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target().Digest)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
 	}
@@ -555,7 +568,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
 	}

-	rat, err := c.remote.ContentStore().ReaderAt(ctx, cpDesc.Digest)
+	rat, err := c.getRemote().ContentStore().ReaderAt(ctx, cpDesc.Digest)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
 	}
@@ -708,7 +721,7 @@ func (c *client) processEventStream(ctx context.Context) {
 		}
 	}()

-	eventStream, err = c.remote.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
+	eventStream, err = c.getRemote().EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
 		Filters: []string{
 			// Filter on both namespace *and* topic. To create an "and" filter,
 			// this must be a single, comma-separated string
@@ -719,6 +732,8 @@ func (c *client) processEventStream(ctx context.Context) {
 		return
 	}

+	c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
+
 	var oomKilled bool
 	for {
 		ev, err = eventStream.Recv()
@@ -822,7 +837,7 @@ func (c *client) processEventStream(ctx context.Context) {
 }

 func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
-	writer, err := c.remote.ContentStore().Writer(ctx, ref, 0, "")
+	writer, err := c.getRemote().ContentStore().Writer(ctx, ref, 0, "")
 	if err != nil {
 		return nil, err
 	}

libcontainerd/remote_daemon.go (+58 −28)

@@ -260,7 +260,7 @@ func (r *remote) startContainerd() error {
 	return nil
 }

-func (r *remote) monitorConnection(client *containerd.Client) {
+func (r *remote) monitorConnection(monitor *containerd.Client) {
 	var transientFailureCount = 0

 	ticker := time.NewTicker(500 * time.Millisecond)
@@ -269,7 +269,7 @@ func (r *remote) monitorConnection(client *containerd.Client) {
 	for {
 		<-ticker.C
 		ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
-		_, err := client.IsServing(ctx)
+		_, err := monitor.IsServing(ctx)
 		cancel()
 		if err == nil {
 			transientFailureCount = 0
@@ -279,39 +279,69 @@ func (r *remote) monitorConnection(client *containerd.Client) {
 		select {
 		case <-r.shutdownContext.Done():
 			r.logger.Info("stopping healthcheck following graceful shutdown")
-			client.Close()
+			monitor.Close()
 			return
 		default:
 		}

 		r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")

-		if r.daemonPid != -1 {
-			transientFailureCount++
-			if transientFailureCount >= maxConnectionRetryCount || !system.IsProcessAlive(r.daemonPid) {
-				transientFailureCount = 0
-				if system.IsProcessAlive(r.daemonPid) {
-					r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
-					// Try to get a stack trace
-					syscall.Kill(r.daemonPid, syscall.SIGUSR1)
-					<-time.After(100 * time.Millisecond)
-					system.KillProcess(r.daemonPid)
-				}
-				<-r.daemonWaitCh
-				var err error
-				client.Close()
-				os.Remove(r.GRPC.Address)
-				if err = r.startContainerd(); err != nil {
-					r.logger.WithError(err).Error("failed restarting containerd")
-				} else {
-					newClient, err := containerd.New(r.GRPC.Address)
-					if err != nil {
-						r.logger.WithError(err).Error("failed connect to containerd")
-					} else {
-						client = newClient
-					}
+		if r.daemonPid == -1 {
+			continue
+		}
+
+		transientFailureCount++
+		if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
+			continue
+		}
+
+		transientFailureCount = 0
+		if system.IsProcessAlive(r.daemonPid) {
+			r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
+			// Try to get a stack trace
+			syscall.Kill(r.daemonPid, syscall.SIGUSR1)
+			<-time.After(100 * time.Millisecond)
+			system.KillProcess(r.daemonPid)
+		}
+		<-r.daemonWaitCh
+
+		monitor.Close()
+		os.Remove(r.GRPC.Address)
+		if err := r.startContainerd(); err != nil {
+			r.logger.WithError(err).Error("failed restarting containerd")
+			continue
+		}
+
+		newMonitor, err := containerd.New(r.GRPC.Address)
+		if err != nil {
+			r.logger.WithError(err).Error("failed connect to containerd")
+			continue
+		}
+
+		monitor = newMonitor
+		var wg sync.WaitGroup
+
+		for _, c := range r.clients {
+			wg.Add(1)
+
+			go func(c *client) {
+				defer wg.Done()
+				c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
+				c.remote.Close()
+
+				remote, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(c.namespace))
+				if err != nil {
+					r.logger.WithError(err).Error("failed to connect to containerd")
+					// TODO: Better way to handle this?
+					// This *shouldn't* happen, but this could wind up where the daemon
+					// is not able to communicate with an eventually up containerd
+					return
 				}
-			}
+
+				c.setRemote(remote)
+			}(c)
+
+			wg.Wait()
 		}
 	}
 }
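
After restarting containerd, the monitor loop above re-dials the socket once per namespaced client and publishes the new connection through setRemote. A rough sketch of that refresh step, reusing the client and conn types from the sketch before the diff; dial here is a placeholder for the real containerd.New call, and this is an illustration of the idea rather than the commit's exact code.

// refreshRemotes re-connects every client to the restarted containerd and
// swaps the new connection in under the client's lock. On a failed dial the
// client keeps its stale connection; the daemon can only log and retry on a
// later health-check cycle.
func refreshRemotes(addr string, clients []*client, dial func(addr string) (*conn, error)) {
	var wg sync.WaitGroup
	for _, c := range clients {
		wg.Add(1)
		go func(c *client) {
			defer wg.Done()
			r, err := dial(addr)
			if err != nil {
				return // keep the old remote; nothing better to do here
			}
			c.setRemote(r)
		}(c)
	}
	wg.Wait()
}

Note that the diff itself calls wg.Wait() inside the per-client loop, which effectively refreshes clients one at a time; the sketch waits once after launching all of them.
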