a705e166cf
When a user tries to restart a restarting container, the docker client reports the error "container is already active", and the container is stopped instead of being restarted, which is seriously wrong. More critically, when the user then tries to start this container again, it always fails. The error can also be reproduced with `docker stop` + `docker start`. This commit fixes the bug. Signed-off-by: Zhang Wei <zhangwei555@huawei.com>
402 lines
9.6 KiB
Go
402 lines
9.6 KiB
Go
package libcontainerd
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
containerd "github.com/docker/containerd/api/grpc/types"
|
|
"github.com/docker/docker/pkg/idtools"
|
|
"github.com/docker/docker/pkg/mount"
|
|
"github.com/opencontainers/specs/specs-go"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
type client struct {
|
|
clientCommon
|
|
|
|
// Platform specific properties below here.
|
|
remote *remote
|
|
q queue
|
|
exitNotifiers map[string]*exitNotifier
|
|
}
|
|
|
|
func (clnt *client) AddProcess(containerID, processFriendlyName string, specp Process) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
spec, err := container.spec()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sp := spec.Process
|
|
sp.Args = specp.Args
|
|
sp.Terminal = specp.Terminal
|
|
if specp.Env != nil {
|
|
sp.Env = specp.Env
|
|
}
|
|
if specp.Cwd != nil {
|
|
sp.Cwd = *specp.Cwd
|
|
}
|
|
if specp.User != nil {
|
|
sp.User = specs.User{
|
|
UID: specp.User.UID,
|
|
GID: specp.User.GID,
|
|
AdditionalGids: specp.User.AdditionalGids,
|
|
}
|
|
}
|
|
if specp.Capabilities != nil {
|
|
sp.Capabilities = specp.Capabilities
|
|
}
|
|
|
|
p := container.newProcess(processFriendlyName)
|
|
|
|
r := &containerd.AddProcessRequest{
|
|
Args: sp.Args,
|
|
Cwd: sp.Cwd,
|
|
Terminal: sp.Terminal,
|
|
Id: containerID,
|
|
Env: sp.Env,
|
|
User: &containerd.User{
|
|
Uid: sp.User.UID,
|
|
Gid: sp.User.GID,
|
|
AdditionalGids: sp.User.AdditionalGids,
|
|
},
|
|
Pid: processFriendlyName,
|
|
Stdin: p.fifo(syscall.Stdin),
|
|
Stdout: p.fifo(syscall.Stdout),
|
|
Stderr: p.fifo(syscall.Stderr),
|
|
Capabilities: sp.Capabilities,
|
|
ApparmorProfile: sp.ApparmorProfile,
|
|
SelinuxLabel: sp.SelinuxLabel,
|
|
NoNewPrivileges: sp.NoNewPrivileges,
|
|
Rlimits: convertRlimits(sp.Rlimits),
|
|
}
|
|
|
|
iopipe, err := p.openFifos(sp.Terminal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := clnt.remote.apiClient.AddProcess(context.Background(), r); err != nil {
|
|
p.closeFifos(iopipe)
|
|
return err
|
|
}
|
|
|
|
container.processes[processFriendlyName] = p
|
|
|
|
clnt.unlock(containerID)
|
|
|
|
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
|
|
return err
|
|
}
|
|
clnt.lock(containerID)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
|
|
root, err := filepath.Abs(clnt.remote.stateDir)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if uid == 0 && gid == 0 {
|
|
return root, nil
|
|
}
|
|
p := string(filepath.Separator)
|
|
for _, d := range strings.Split(root, string(filepath.Separator))[1:] {
|
|
p = filepath.Join(p, d)
|
|
fi, err := os.Stat(p)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
return "", err
|
|
}
|
|
if os.IsNotExist(err) || fi.Mode()&1 == 0 {
|
|
p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
|
|
if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) {
|
|
return "", err
|
|
}
|
|
}
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
|
|
if ctr, err := clnt.getContainer(containerID); err == nil {
|
|
if ctr.restarting {
|
|
ctr.restartManager.Cancel()
|
|
ctr.clean()
|
|
} else {
|
|
return fmt.Errorf("Container %s is already active", containerID)
|
|
}
|
|
}
|
|
|
|
uid, gid, err := getRootIDs(specs.Spec(spec))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dir, err := clnt.prepareBundleDir(uid, gid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
container := clnt.newContainer(filepath.Join(dir, containerID), options...)
|
|
if err := container.clean(); err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
container.clean()
|
|
clnt.deleteContainer(containerID)
|
|
}
|
|
}()
|
|
|
|
// uid/gid
|
|
rootfsDir := filepath.Join(container.dir, "rootfs")
|
|
if err := idtools.MkdirAllAs(rootfsDir, 0700, uid, gid); err != nil && !os.IsExist(err) {
|
|
return err
|
|
}
|
|
if err := syscall.Mount(spec.Root.Path, rootfsDir, "bind", syscall.MS_REC|syscall.MS_BIND, ""); err != nil {
|
|
return err
|
|
}
|
|
spec.Root.Path = "rootfs"
|
|
|
|
f, err := os.Create(filepath.Join(container.dir, configFilename))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
if err := json.NewEncoder(f).Encode(spec); err != nil {
|
|
return err
|
|
}
|
|
|
|
return container.start()
|
|
}
|
|
|
|
func (clnt *client) Signal(containerID string, sig int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
_, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Signal: uint32(sig),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
if _, err := clnt.getContainer(containerID); err != nil {
|
|
return err
|
|
}
|
|
_, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
|
|
Id: containerID,
|
|
Pid: processFriendlyName,
|
|
Width: uint32(width),
|
|
Height: uint32(height),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) Pause(containerID string) error {
|
|
return clnt.setState(containerID, StatePause)
|
|
}
|
|
|
|
func (clnt *client) setState(containerID, state string) error {
|
|
clnt.lock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
clnt.unlock(containerID)
|
|
return err
|
|
}
|
|
if container.systemPid == 0 {
|
|
clnt.unlock(containerID)
|
|
return fmt.Errorf("No active process for container %s", containerID)
|
|
}
|
|
st := "running"
|
|
if state == StatePause {
|
|
st = "paused"
|
|
}
|
|
chstate := make(chan struct{})
|
|
_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Status: st,
|
|
})
|
|
if err != nil {
|
|
clnt.unlock(containerID)
|
|
return err
|
|
}
|
|
container.pauseMonitor.append(state, chstate)
|
|
clnt.unlock(containerID)
|
|
<-chstate
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) Resume(containerID string) error {
|
|
return clnt.setState(containerID, StateResume)
|
|
}
|
|
|
|
func (clnt *client) Stats(containerID string) (*Stats, error) {
|
|
resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return (*Stats)(resp), nil
|
|
}
|
|
|
|
func (clnt *client) setExited(containerID string) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
|
|
var exitCode uint32
|
|
if event, ok := clnt.remote.pastEvents[containerID]; ok {
|
|
exitCode = event.Status
|
|
delete(clnt.remote.pastEvents, containerID)
|
|
}
|
|
|
|
err := clnt.backend.StateChanged(containerID, StateInfo{
|
|
CommonStateInfo: CommonStateInfo{
|
|
State: StateExit,
|
|
ExitCode: exitCode,
|
|
}})
|
|
|
|
// Unmount and delete the bundle folder
|
|
if mts, err := mount.GetMounts(); err == nil {
|
|
for _, mts := range mts {
|
|
if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
|
|
if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
|
|
os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
|
|
cont, err := clnt.getContainerdContainer(containerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pids := make([]int, len(cont.Pids))
|
|
for i, p := range cont.Pids {
|
|
pids[i] = int(p)
|
|
}
|
|
return pids, nil
|
|
}
|
|
|
|
// Summary returns a summary of the processes running in a container.
|
|
// This is a no-op on Linux.
|
|
func (clnt *client) Summary(containerID string) ([]Summary, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
|
|
resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, cont := range resp.Containers {
|
|
if cont.Id == containerID {
|
|
return cont, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("invalid state response")
|
|
}
|
|
|
|
func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
|
|
container := &container{
|
|
containerCommon: containerCommon{
|
|
process: process{
|
|
dir: dir,
|
|
processCommon: processCommon{
|
|
containerID: filepath.Base(dir),
|
|
client: clnt,
|
|
friendlyName: InitFriendlyName,
|
|
},
|
|
},
|
|
processes: make(map[string]*process),
|
|
},
|
|
}
|
|
for _, option := range options {
|
|
if err := option.Apply(container); err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
}
|
|
return container
|
|
}
|
|
|
|
func (clnt *client) UpdateResources(containerID string, resources Resources) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if container.systemPid == 0 {
|
|
return fmt.Errorf("No active process for container %s", containerID)
|
|
}
|
|
_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Resources: (*containerd.UpdateResource)(&resources),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
|
|
clnt.mapMutex.RLock()
|
|
defer clnt.mapMutex.RUnlock()
|
|
return clnt.exitNotifiers[containerID]
|
|
}
|
|
|
|
func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
|
|
clnt.mapMutex.Lock()
|
|
w, ok := clnt.exitNotifiers[containerID]
|
|
defer clnt.mapMutex.Unlock()
|
|
if !ok {
|
|
w = &exitNotifier{c: make(chan struct{}), client: clnt}
|
|
clnt.exitNotifiers[containerID] = w
|
|
}
|
|
return w
|
|
}
|
|
|
|
type exitNotifier struct {
|
|
id string
|
|
client *client
|
|
c chan struct{}
|
|
once sync.Once
|
|
}
|
|
|
|
func (en *exitNotifier) close() {
|
|
en.once.Do(func() {
|
|
close(en.c)
|
|
en.client.mapMutex.Lock()
|
|
if en == en.client.exitNotifiers[en.id] {
|
|
delete(en.client.exitNotifiers, en.id)
|
|
}
|
|
en.client.mapMutex.Unlock()
|
|
})
|
|
}
|
|
func (en *exitNotifier) wait() <-chan struct{} {
|
|
return en.c
|
|
}
|