123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616 |
- package libcontainerd
import (
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"

	containerd "github.com/containerd/containerd/api/grpc/types"
	containerd_runtime_types "github.com/containerd/containerd/runtime"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/mount"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/timestamp"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"golang.org/x/sys/unix"
)
// client is the Linux libcontainerd client. It embeds the shared
// clientCommon state and adds what is needed to talk to the containerd
// daemon over its gRPC API.
type client struct {
	clientCommon

	// Platform specific properties below here.
	remote        *remote                  // connection to the containerd daemon (apiClient, event timestamps, lock)
	q             queue                    // work queue used by this client
	exitNotifiers map[string]*exitNotifier // per-container exit signaling; guarded by clientCommon.mapMutex
	liveRestore   bool                     // when true, running containers are re-attached on Restore instead of killed
}
- // GetServerVersion returns the connected server version information
- func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
- resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
- if err != nil {
- return nil, err
- }
- sv := &ServerVersion{
- GetServerVersionResponse: *resp,
- }
- return sv, nil
- }
// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (pid int, err error) {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		return -1, err
	}
	// Start from the container's own OCI process spec and overlay the
	// exec-specific settings supplied by the caller.
	spec, err := container.spec()
	if err != nil {
		return -1, err
	}
	sp := spec.Process
	sp.Args = specp.Args
	sp.Terminal = specp.Terminal
	if len(specp.Env) > 0 {
		sp.Env = specp.Env
	}
	if specp.Cwd != nil {
		sp.Cwd = *specp.Cwd
	}
	if specp.User != nil {
		sp.User = specs.User{
			UID:            specp.User.UID,
			GID:            specp.User.GID,
			AdditionalGids: specp.User.AdditionalGids,
		}
	}
	if specp.Capabilities != nil {
		// An explicit capability list replaces all four capability sets.
		sp.Capabilities.Bounding = specp.Capabilities
		sp.Capabilities.Effective = specp.Capabilities
		sp.Capabilities.Inheritable = specp.Capabilities
		sp.Capabilities.Permitted = specp.Capabilities
	}

	p := container.newProcess(processFriendlyName)

	r := &containerd.AddProcessRequest{
		Args:     sp.Args,
		Cwd:      sp.Cwd,
		Terminal: sp.Terminal,
		Id:       containerID,
		Env:      sp.Env,
		User: &containerd.User{
			Uid:            sp.User.UID,
			Gid:            sp.User.GID,
			AdditionalGids: sp.User.AdditionalGids,
		},
		Pid:             processFriendlyName,
		Stdin:           p.fifo(unix.Stdin),
		Stdout:          p.fifo(unix.Stdout),
		Stderr:          p.fifo(unix.Stderr),
		Capabilities:    sp.Capabilities.Effective,
		ApparmorProfile: sp.ApparmorProfile,
		SelinuxLabel:    sp.SelinuxLabel,
		NoNewPrivileges: sp.NoNewPrivileges,
		Rlimits:         convertRlimits(sp.Rlimits),
	}

	// The fifo context outlives this call on success; it is cancelled
	// only when we return an error, so the fifos are torn down.
	fifoCtx, cancel := context.WithCancel(context.Background())
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	iopipe, err := p.openFifos(fifoCtx, sp.Terminal)
	if err != nil {
		return -1, err
	}

	resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
	if err != nil {
		// The exec was never started; release the opened fifos.
		p.closeFifos(iopipe)
		return -1, err
	}

	// Wrap stdin so that the first Close both closes the pipe and tells
	// containerd that stdin is done; sync.Once guards against double close.
	var stdinOnce sync.Once
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
			err = stdin.Close()
			if err2 := p.sendCloseStdin(); err == nil {
				err = err2
			}
		})
		return err
	})

	container.processes[processFriendlyName] = p

	if err := attachStdio(*iopipe); err != nil {
		p.closeFifos(iopipe)
		return -1, err
	}

	return int(resp.SystemPid), nil
}
- func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
- Id: containerID,
- Pid: pid,
- Signal: uint32(sig),
- })
- return err
- }
- func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- if _, err := clnt.getContainer(containerID); err != nil {
- return err
- }
- _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
- Id: containerID,
- Pid: processFriendlyName,
- Width: uint32(width),
- Height: uint32(height),
- })
- return err
- }
// Pause pauses the container via containerd and, through setState,
// blocks until the pause state change has been observed.
func (clnt *client) Pause(containerID string) error {
	return clnt.setState(containerID, StatePause)
}
// setState asks containerd to move the container's init process to the
// requested state (StatePause or StateResume) and blocks until the
// corresponding notification is delivered via the pauseMonitor channel.
func (clnt *client) setState(containerID, state string) error {
	clnt.lock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		clnt.unlock(containerID)
		return err
	}
	if container.systemPid == 0 {
		clnt.unlock(containerID)
		return fmt.Errorf("No active process for container %s", containerID)
	}
	// containerd's UpdateContainer expects the target status as a string.
	st := "running"
	if state == StatePause {
		st = "paused"
	}
	chstate := make(chan struct{})
	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
		Id:     containerID,
		Pid:    InitFriendlyName,
		Status: st,
	})
	if err != nil {
		clnt.unlock(containerID)
		return err
	}
	container.pauseMonitor.append(state, chstate)
	// Release the container lock before blocking; the notification that
	// closes chstate is produced on another goroutine.
	clnt.unlock(containerID)
	<-chstate
	return nil
}
// Resume resumes a paused container via containerd and, through
// setState, blocks until the resume state change has been observed.
func (clnt *client) Resume(containerID string) error {
	return clnt.setState(containerID, StateResume)
}
- func (clnt *client) Stats(containerID string) (*Stats, error) {
- resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
- if err != nil {
- return nil, err
- }
- return (*Stats)(resp), nil
- }
- // Take care of the old 1.11.0 behavior in case the version upgrade
- // happened without a clean daemon shutdown
- func (clnt *client) cleanupOldRootfs(containerID string) {
- // Unmount and delete the bundle folder
- if mts, err := mount.GetMounts(); err == nil {
- for _, mts := range mts {
- if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
- if err := unix.Unmount(mts.Mountpoint, unix.MNT_DETACH); err == nil {
- os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
- }
- break
- }
- }
- }
- }
- func (clnt *client) setExited(containerID string, exitCode uint32) error {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- err := clnt.backend.StateChanged(containerID, StateInfo{
- CommonStateInfo: CommonStateInfo{
- State: StateExit,
- ExitCode: exitCode,
- }})
- clnt.cleanupOldRootfs(containerID)
- return err
- }
- func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
- cont, err := clnt.getContainerdContainer(containerID)
- if err != nil {
- return nil, err
- }
- pids := make([]int, len(cont.Pids))
- for i, p := range cont.Pids {
- pids[i] = int(p)
- }
- return pids, nil
- }
// Summary returns a summary of the processes running in a container.
// This is a no-op on Linux.
func (clnt *client) Summary(containerID string) ([]Summary, error) {
	return nil, nil // no per-process summaries on this platform
}
- func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
- resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
- if err != nil {
- return nil, err
- }
- for _, cont := range resp.Containers {
- if cont.Id == containerID {
- return cont, nil
- }
- }
- return nil, fmt.Errorf("invalid state response")
- }
- func (clnt *client) UpdateResources(containerID string, resources Resources) error {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- container, err := clnt.getContainer(containerID)
- if err != nil {
- return err
- }
- if container.systemPid == 0 {
- return fmt.Errorf("No active process for container %s", containerID)
- }
- _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
- Id: containerID,
- Pid: InitFriendlyName,
- Resources: (*containerd.UpdateResource)(&resources),
- })
- return err
- }
- func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
- clnt.mapMutex.RLock()
- defer clnt.mapMutex.RUnlock()
- return clnt.exitNotifiers[containerID]
- }
- func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
- clnt.mapMutex.Lock()
- w, ok := clnt.exitNotifiers[containerID]
- defer clnt.mapMutex.Unlock()
- if !ok {
- w = &exitNotifier{c: make(chan struct{}), client: clnt}
- clnt.exitNotifiers[containerID] = w
- }
- return w
- }
// restore re-attaches the daemon to a container that is still running in
// containerd: it rebuilds the in-memory container object, reopens and
// reattaches the stdio fifos, and reports a StateRestore (and possibly a
// trailing pause/resume) state change to the backend.
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
	clnt.lock(cont.Id)
	defer clnt.unlock(cont.Id)

	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)

	containerID := cont.Id
	if _, err := clnt.getContainer(containerID); err == nil {
		return fmt.Errorf("container %s is already active", containerID)
	}

	// Undo the registration below if anything fails past this point.
	defer func() {
		if err != nil {
			clnt.deleteContainer(cont.Id)
		}
	}()

	container := clnt.newContainer(cont.BundlePath, options...)
	container.systemPid = systemPid(cont)

	// Recover the terminal setting from the init process's entry.
	var terminal bool
	for _, p := range cont.Processes {
		if p.Pid == InitFriendlyName {
			terminal = p.Terminal
		}
	}

	// The fifo context outlives this call on success; cancel it only on
	// error so the fifos are torn down.
	fifoCtx, cancel := context.WithCancel(context.Background())
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	iopipe, err := container.openFifos(fifoCtx, terminal)
	if err != nil {
		return err
	}
	// Guard stdin against a double Close: on error from attach we don't
	// know if stdin was already closed.
	var stdinOnce sync.Once
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
			err = stdin.Close()
		})
		return err
	})

	if err := attachStdio(*iopipe); err != nil {
		container.closeFifos(iopipe)
		return err
	}

	clnt.appendContainer(container)

	err = clnt.backend.StateChanged(containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State: StateRestore,
			Pid:   container.systemPid,
		}})

	if err != nil {
		container.closeFifos(iopipe)
		return err
	}

	if lastEvent != nil {
		// This should only be a pause or resume event
		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
			return clnt.backend.StateChanged(containerID, StateInfo{
				CommonStateInfo: CommonStateInfo{
					State: lastEvent.Type,
					Pid:   container.systemPid,
				}})
		}

		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
	}

	return nil
}
- func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
- er := &containerd.EventsRequest{
- Timestamp: tsp,
- StoredOnly: true,
- Id: id,
- }
- events, err := clnt.remote.apiClient.Events(context.Background(), er)
- if err != nil {
- logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
- return nil, err
- }
- var ev *containerd.Event
- for {
- e, err := events.Recv()
- if err != nil {
- if err.Error() == "EOF" {
- break
- }
- logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
- return nil, err
- }
- ev = e
- logrus.Debugf("libcontainerd: received past event %#v", ev)
- }
- return ev, nil
- }
- func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
- ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
- if err == nil && ev == nil {
- // If ev is nil and the container is running in containerd,
- // we already consumed all the event of the
- // container, included the "exit" one.
- // Thus, we request all events containerd has in memory for
- // this container in order to get the last one (which should
- // be an exit event)
- logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
- // Request all events since beginning of time
- t := time.Unix(0, 0)
- tsp, err := ptypes.TimestampProto(t)
- if err != nil {
- logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
- return nil, err
- }
- return clnt.getContainerLastEventSinceTime(id, tsp)
- }
- return ev, err
- }
// Restore brings the daemon's view of a container back in sync with
// containerd after a daemon restart. Depending on the container's state
// and the liveRestore setting it either marks the container exited,
// re-attaches to it (clnt.restore), or terminates it.
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
	// Synchronize with live events
	clnt.remote.Lock()
	defer clnt.remote.Unlock()
	// Check that containerd still knows this container.
	//
	// In the unlikely event that Restore for this container process
	// the its past event before the main loop, the event will be
	// processed twice. However, this is not an issue as all those
	// events will do is change the state of the container to be
	// exactly the same.
	cont, err := clnt.getContainerdContainer(containerID)
	// Get its last event
	ev, eerr := clnt.getContainerLastEvent(containerID)
	if err != nil || containerd_runtime_types.State(cont.Status) == containerd_runtime_types.Stopped {
		if err != nil {
			logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
		}
		if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
			// Wait a while for the exit event
			timeout := time.NewTimer(10 * time.Second)
			tick := time.NewTicker(100 * time.Millisecond)
		stop:
			for {
				select {
				case <-timeout.C:
					break stop
				case <-tick.C:
					ev, eerr = clnt.getContainerLastEvent(containerID)
					if eerr != nil {
						break stop
					}
					if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
						break stop
					}
				}
			}
			timeout.Stop()
			tick.Stop()
		}

		// get the exit status for this container, if we don't have
		// one, indicate an error
		ec := uint32(255)
		if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
			ec = ev.Status
		}
		clnt.setExited(containerID, ec)

		return nil
	}

	// container is still alive
	if clnt.liveRestore {
		if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
		}
		return nil
	}

	// Kill the container if liveRestore == false
	w := clnt.getOrCreateExitNotifier(containerID)
	clnt.lock(cont.Id)
	container := clnt.newContainer(cont.BundlePath)
	container.systemPid = systemPid(cont)
	clnt.appendContainer(container)
	clnt.unlock(cont.Id)

	container.discardFifos()

	if err := clnt.Signal(containerID, int(unix.SIGTERM)); err != nil {
		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
	}

	// Let the main loop handle the exit event
	clnt.remote.Unlock()

	if ev != nil && ev.Type == StatePause {
		// resume container, it depends on the main loop, so we do it after Unlock()
		logrus.Debugf("libcontainerd: %s was paused, resuming it so it can die", containerID)
		if err := clnt.Resume(containerID); err != nil {
			return fmt.Errorf("failed to resume container: %v", err)
		}
	}

	// Escalate SIGTERM -> SIGKILL if the container has not exited within
	// 10 seconds, then give the kill 2 more seconds to take effect.
	select {
	case <-time.After(10 * time.Second):
		if err := clnt.Signal(containerID, int(unix.SIGKILL)); err != nil {
			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
		}
		select {
		case <-time.After(2 * time.Second):
		case <-w.wait():
			// relock because of the defer
			clnt.remote.Lock()
			return nil
		}
	case <-w.wait():
		// relock because of the defer
		clnt.remote.Lock()
		return nil
	}

	// relock because of the defer
	clnt.remote.Lock()

	clnt.deleteContainer(containerID)

	return clnt.setExited(containerID, uint32(255))
}
- func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- if _, err := clnt.getContainer(containerID); err != nil {
- return err
- }
- _, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
- Id: containerID,
- Checkpoint: &containerd.Checkpoint{
- Name: checkpointID,
- Exit: exit,
- Tcp: true,
- UnixSockets: true,
- Shell: false,
- EmptyNS: []string{"network"},
- },
- CheckpointDir: checkpointDir,
- })
- return err
- }
- func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- if _, err := clnt.getContainer(containerID); err != nil {
- return err
- }
- _, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
- Id: containerID,
- Name: checkpointID,
- CheckpointDir: checkpointDir,
- })
- return err
- }
- func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
- clnt.lock(containerID)
- defer clnt.unlock(containerID)
- if _, err := clnt.getContainer(containerID); err != nil {
- return nil, err
- }
- resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
- Id: containerID,
- CheckpointDir: checkpointDir,
- })
- if err != nil {
- return nil, err
- }
- return (*Checkpoints)(resp), nil
- }
|