Use containerd client Reconnect() API.
This fixes an issue where the containerd client is cached in a container object in libcontainerd and becomes stale after containerd is restarted.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
parent
30cb23360e
commit
2c682d5209
3 changed files with 9 additions and 27 deletions
|
@ -114,6 +114,13 @@ type client struct {
|
||||||
containers map[string]*container
|
containers map[string]*container
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *client) reconnect() error {
|
||||||
|
c.Lock()
|
||||||
|
err := c.remote.Reconnect()
|
||||||
|
c.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *client) setRemote(remote *containerd.Client) {
|
func (c *client) setRemote(remote *containerd.Client) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
c.remote = remote
|
c.remote = remote
|
||||||
|
|
|
@ -309,20 +309,17 @@ func (r *remote) monitorConnection(monitor *containerd.Client) {
|
||||||
}
|
}
|
||||||
<-r.daemonWaitCh
|
<-r.daemonWaitCh
|
||||||
|
|
||||||
monitor.Close()
|
|
||||||
os.Remove(r.GRPC.Address)
|
os.Remove(r.GRPC.Address)
|
||||||
if err := r.startContainerd(); err != nil {
|
if err := r.startContainerd(); err != nil {
|
||||||
r.logger.WithError(err).Error("failed restarting containerd")
|
r.logger.WithError(err).Error("failed restarting containerd")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
newMonitor, err := containerd.New(r.GRPC.Address)
|
if err := monitor.Reconnect(); err != nil {
|
||||||
if err != nil {
|
|
||||||
r.logger.WithError(err).Error("failed connect to containerd")
|
r.logger.WithError(err).Error("failed connect to containerd")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
monitor = newMonitor
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, c := range r.clients {
|
for _, c := range r.clients {
|
||||||
|
@ -331,18 +328,12 @@ func (r *remote) monitorConnection(monitor *containerd.Client) {
|
||||||
go func(c *client) {
|
go func(c *client) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
|
c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
|
||||||
c.remote.Close()
|
if err := c.reconnect(); err != nil {
|
||||||
|
|
||||||
remote, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(c.namespace))
|
|
||||||
if err != nil {
|
|
||||||
r.logger.WithError(err).Error("failed to connect to containerd")
|
r.logger.WithError(err).Error("failed to connect to containerd")
|
||||||
// TODO: Better way to handle this?
|
// TODO: Better way to handle this?
|
||||||
// This *shouldn't* happen, but this could wind up where the daemon
|
// This *shouldn't* happen, but this could wind up where the daemon
|
||||||
// is not able to communicate with an eventually up containerd
|
// is not able to communicate with an eventually up containerd
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.setRemote(remote)
|
|
||||||
}(c)
|
}(c)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
|
@ -16,19 +16,3 @@ func (o oomScore) Apply(r Remote) error {
|
||||||
}
|
}
|
||||||
return fmt.Errorf("WithOOMScore option not supported for this remote")
|
return fmt.Errorf("WithOOMScore option not supported for this remote")
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithSubreaper sets whether containerd should register itself as a
|
|
||||||
// subreaper
|
|
||||||
func WithSubreaper(reap bool) RemoteOption {
|
|
||||||
return subreaper(reap)
|
|
||||||
}
|
|
||||||
|
|
||||||
type subreaper bool
|
|
||||||
|
|
||||||
func (s subreaper) Apply(r Remote) error {
|
|
||||||
if remote, ok := r.(*remote); ok {
|
|
||||||
remote.NoSubreaper = !bool(s)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return fmt.Errorf("WithSubreaper option not supported for this remote")
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue