package container // import "github.com/docker/docker/daemon/cluster/executor/container"

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"syscall"
	"time"

	"github.com/containerd/log"
	"github.com/distribution/reference"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	containertypes "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/events"
	imagetypes "github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/registry"
	containerpkg "github.com/docker/docker/container"
	"github.com/docker/docker/daemon"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/libnetwork"
	volumeopts "github.com/docker/docker/volume/service/opts"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/moby/swarmkit/v2/agent/exec"
	"github.com/moby/swarmkit/v2/api"
	swarmlog "github.com/moby/swarmkit/v2/log"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"golang.org/x/time/rate"
)

// nodeAttachmentReadyInterval is the interval at which to poll the
// attachment store while waiting for the node's network attachments.
const nodeAttachmentReadyInterval = 100 * time.Millisecond

// containerAdapter conducts remote operations for a container. Calls are
// mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	backend       executorpkg.Backend
	imageBackend  executorpkg.ImageBackend
	volumeBackend executorpkg.VolumeBackend
	container     *containerConfig
	dependencies  exec.DependencyGetter
}

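// newContainerAdapter builds an adapter for the given task on the given
// node, wiring in the backends and dependency getter the adapter needs to
// operate. It fails if a container config cannot be built from the task.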
func newContainerAdapter(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
	ctnr, err := newContainerConfig(task, node)
	if err != nil {
		return nil, err
	}

	return &containerAdapter{
		container:     ctnr,
		backend:       b,
		imageBackend:  i,
		volumeBackend: v,
		dependencies:  dependencies,
	}, nil
}

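// pullImage pulls the image for the task's container spec, decoding registry
// credentials from the task's PullOptions and streaming rate-limited pull
// progress to the daemon log. The pull is skipped when the image is
// referenced by ID, or referenced by digest and already present locally; an
// unparseable image reference is returned as an error.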
func (c *containerAdapter) pullImage(ctx context.Context) error {
	spec := c.container.spec()

	// Skip pulling if the image is referenced by image ID.
	if _, err := digest.Parse(spec.Image); err == nil {
		return nil
	}

	named, err := reference.ParseNormalizedNamed(spec.Image)
	if err != nil {
		return errors.Wrapf(err, "could not parse image reference %q", spec.Image)
	}

	// Skip pulling if the image is referenced by digest and already
	// exists locally.
	if _, ok := named.(reference.Canonical); ok {
		_, err := c.imageBackend.GetImage(ctx, spec.Image, imagetypes.GetImageOpts{})
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return err
		}
		if err == nil {
			return nil
		}
	}

	// if the image needs to be pulled, the auth config will be retrieved and updated
	var encodedAuthConfig string
	if spec.PullOptions != nil {
		encodedAuthConfig = spec.PullOptions.RegistryAuth
	}

	authConfig := &registry.AuthConfig{}
	if encodedAuthConfig != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
			swarmlog.G(ctx).Warnf("invalid authconfig: %v", err)
		}
	}

	pr, pw := io.Pipe()
	metaHeaders := map[string][]string{}
	go func() {
		// TODO LCOW Support: This will need revisiting as
		// the stack is built up to include LCOW support for swarm.

		// Make sure the image has a tag, otherwise it will pull all tags.
		ref := reference.TagNameOnly(named)
		err := c.imageBackend.PullImage(ctx, ref, nil, metaHeaders, authConfig, pw)
		pw.CloseWithError(err)
	}()

	dec := json.NewDecoder(pr)
	dec.UseNumber()
	m := map[string]interface{}{}
	spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)

	lastStatus := ""
	for {
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		l := swarmlog.G(ctx)
		// limit pull progress logs unless the status changes
		if spamLimiter.Allow() || lastStatus != m["status"] {
			// if we have progress details, we have everything we need
			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
				// first, log the image and status
				l = l.WithFields(log.Fields{
					"image":  c.container.image(),
					"status": m["status"],
				})
				// then, if we have progress, log the progress
				if progress["current"] != nil && progress["total"] != nil {
					l = l.WithFields(log.Fields{
						"current": progress["current"],
						"total":   progress["total"],
					})
				}
			}
			l.Debug("pull in progress")
		}
		// sometimes, we get no useful information at all, and add no fields

		if status, ok := m["status"].(string); ok {
			lastStatus = status
		}
	}

	// if the final stream object contained an error, return it
	if errMsg, ok := m["error"]; ok {
		return fmt.Errorf("%v", errMsg)
	}
	return nil
}

// waitNodeAttachments validates that NetworkAttachments exist on this node
// for every network in use by this task. It blocks until the network
// attachments are ready, or the context times out. If it returns nil, then
// the node's network attachments are all there.
func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
	// to do this, we're going to get the attachment store and try getting the
	// IP address for each network. if any network comes back not existing,
	// we'll wait and try again.
	attachmentStore := c.backend.GetAttachmentStore()
	if attachmentStore == nil {
		return fmt.Errorf("error getting attachment store")
	}

	// essentially, we're long-polling here. this is really sub-optimal, but a
	// better solution based off signaling channels would require a more
	// substantial rearchitecture and probably not be worth our time in terms
	// of performance gains.
	poll := time.NewTicker(nodeAttachmentReadyInterval)
	defer poll.Stop()

	for {
		// set a flag ready to true. if we try to get a network IP that doesn't
		// exist yet, we will set this flag to "false"
		ready := true
		for _, attachment := range c.container.networksAttachments {
			// we only need node attachments (IP address) for overlay networks
			// TODO(dperny): unsure if this will work with other network
			// drivers, but i also don't think other network drivers use the
			// node attachment IP address.
			if attachment.Network.DriverState.Name == "overlay" {
				if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
					ready = false
				}
			}
		}

		// if everything is ready here, then we can just return no error
		if ready {
			return nil
		}

		// otherwise, try polling again, or wait for context canceled.
		select {
		case <-ctx.Done():
			return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
		case <-poll.C:
		}
	}
}

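// createNetworks creates the managed networks that this task is attached
// to. Networks that already exist, or that are predefined, are skipped.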
func (c *containerAdapter) createNetworks(ctx context.Context) error {
	for name := range c.container.networksAttachments {
		ncr, err := c.container.networkCreateRequest(name)
		if err != nil {
			return err
		}

		if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
			if _, ok := err.(libnetwork.NetworkNameError); ok {
				continue
			}
			// We will continue if CreateManagedNetwork returns a
			// PredefinedNetworkError. Other callers can still treat it as an
			// error.
			if _, ok := err.(daemon.PredefinedNetworkError); ok {
				continue
			}
			return err
		}
	}
	return nil
}

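// removeNetworks removes the managed networks that this task is attached
// to. Networks that still have active endpoints, or that no longer exist,
// are skipped; any other failure aborts the removal.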
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
	var (
		activeEndpointsError *libnetwork.ActiveEndpointsError
		errNoSuchNetwork     libnetwork.ErrNoSuchNetwork
	)
	for name, v := range c.container.networksAttachments {
		if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
			switch {
			case errors.As(err, &activeEndpointsError):
				continue
			case errors.As(err, &errNoSuchNetwork):
				continue
			default:
				swarmlog.G(ctx).Errorf("network %s remove failed: %v", name, err)
				return err
			}
		}
	}
	return nil
}

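// networkAttach updates the attachment for the task's network attachment
// container, using the first network found in the networking config.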
func (c *containerAdapter) networkAttach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
}

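// waitForDetach blocks until the backend reports that the task's network
// attachment container has been detached from its network, or until the
// context is canceled.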
func (c *containerAdapter) waitForDetach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
}

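// create creates the managed container for this task, then attaches the
// task's dependency store, secret and config references, and service
// config to it.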
func (c *containerAdapter) create(ctx context.Context) error {
	cr, err := c.backend.CreateManagedContainer(ctx, backend.ContainerCreateConfig{
		Name:       c.container.name(),
		Config:     c.container.config(),
		HostConfig: c.container.hostConfig(c.dependencies.Volumes()),
		// Use the first network in container create
		NetworkingConfig: c.container.createNetworkingConfig(c.backend),
	})
	if err != nil {
		return err
	}

	container := c.container.task.Spec.GetContainer()
	if container == nil {
		return errors.New("unable to get container from task spec")
	}

	if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
		return err
	}

	// configure secrets
	secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
	if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
		return err
	}

	configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
	if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
		return err
	}

	return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig())
}

// checkMounts ensures that the provided mounts won't have any host-specific
// problems at start up. For example, we disallow bind mounts without an
// existing path, which is slightly different from the container API.
func (c *containerAdapter) checkMounts() error {
	spec := c.container.spec()
	for _, mount := range spec.Mounts {
		switch mount.Type {
		case api.MountTypeBind:
			if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
				return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
			}
		}
	}
	return nil
}

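// start checks the task's mounts for host-specific problems, then starts
// the created container.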
func (c *containerAdapter) start(ctx context.Context) error {
	if err := c.checkMounts(); err != nil {
		return err
	}
	return c.backend.ContainerStart(ctx, c.container.name(), nil, "", "")
}

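// inspect returns the current state of the task's container. A context
// error takes precedence over an inspection error.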
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	cs, err := c.backend.ContainerInspectCurrent(ctx, c.container.name(), false)
	if ctx.Err() != nil {
		return types.ContainerJSON{}, ctx.Err()
	}
	if err != nil {
		return types.ContainerJSON{}, err
	}
	return *cs, nil
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shut down by canceling the context.
func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
	swarmlog.G(ctx).Debugf("waiting on events")
	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
	eventsq := make(chan events.Message, len(buffer))

	for _, event := range buffer {
		eventsq <- event
	}

	go func() {
		defer c.backend.UnsubscribeFromEvents(l)

		for {
			select {
			case ev := <-l:
				jev, ok := ev.(events.Message)
				if !ok {
					swarmlog.G(ctx).Warnf("unexpected event message: %q", ev)
					continue
				}
				select {
				case eventsq <- jev:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return eventsq
}

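// wait returns a channel that receives the container's state once it is no
// longer running.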
func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
	return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
}

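// shutdown stops the container, using the task's stop grace period as the
// timeout when one is set; otherwise the daemon falls back to the
// container's own stop timeout.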
func (c *containerAdapter) shutdown(ctx context.Context) error {
	options := containertypes.StopOptions{}
	// Default stop grace period to nil (daemon will use the stopTimeout of the container)
	if spec := c.container.spec(); spec.StopGracePeriod != nil {
		timeout := int(spec.StopGracePeriod.Seconds)
		options.Timeout = &timeout
	}
	return c.backend.ContainerStop(ctx, c.container.name(), options)
}

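// terminate kills the container with SIGKILL.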
func (c *containerAdapter) terminate(ctx context.Context) error {
	return c.backend.ContainerKill(c.container.name(), syscall.SIGKILL.String())
}

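// remove force-removes the container together with its volumes.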
func (c *containerAdapter) remove(ctx context.Context) error {
	return c.backend.ContainerRm(c.container.name(), &backend.ContainerRmConfig{
		RemoveVolume: true,
		ForceRemove:  true,
	})
}

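// createVolumes creates any plugin volumes that are embedded inside the
// task's mounts.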
func (c *containerAdapter) createVolumes(ctx context.Context) error {
	// Create plugin volumes that are embedded inside a Mount
	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
		mount := mount
		if mount.Type != api.MountTypeVolume {
			continue
		}
		if mount.VolumeOptions == nil {
			continue
		}
		if mount.VolumeOptions.DriverConfig == nil {
			continue
		}

		req := c.container.volumeCreateRequest(&mount)

		// Check if this volume exists on the engine
		if _, err := c.volumeBackend.Create(ctx, req.Name, req.Driver,
			volumeopts.WithCreateOptions(req.DriverOpts),
			volumeopts.WithCreateLabels(req.Labels),
		); err != nil {
			// TODO(amitshukla): Today, volume create through the engine api does not return an error
			// when the named volume with the same parameters already exists.
			// It returns an error if the driver name is different - that is a valid error
			return err
		}
	}
	return nil
}

// waitClusterVolumes blocks until the VolumeGetter returns a path for each
// cluster volume in use by this task
func (c *containerAdapter) waitClusterVolumes(ctx context.Context) error {
	for _, attached := range c.container.task.Volumes {
		// for every attachment, try until we succeed or until the context
		// is canceled.
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				// continue through the code.
			}

			path, err := c.dependencies.Volumes().Get(attached.ID)
			if err == nil && path != "" {
				// break out of the inner-most loop
				break
			}
		}
	}
	swarmlog.G(ctx).Debug("volumes ready")
	return nil
}

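// activateServiceBinding enables the service binding for the container, so
// that it starts receiving traffic for its service.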
func (c *containerAdapter) activateServiceBinding() error {
	return c.backend.ActivateContainerServiceBinding(c.container.name())
}

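// deactivateServiceBinding disables the service binding for the container,
// so that it stops receiving traffic for its service.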
func (c *containerAdapter) deactivateServiceBinding() error {
	return c.backend.DeactivateContainerServiceBinding(c.container.name())
}

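// logs subscribes to the container's log stream, translating the swarm log
// subscription options into engine API log options.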
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
	apiOptions := &containertypes.LogsOptions{
		Follow: options.Follow,

		// Always say yes to Timestamps and Details. We make the decision
		// of whether to return these to the user or not way higher up the
		// stack.
		Timestamps: true,
		Details:    true,
	}

	if options.Since != nil {
		since, err := gogotypes.TimestampFromProto(options.Since)
		if err != nil {
			return nil, err
		}
		// Format since as "seconds.nanoseconds" (e.g. "1136073600.000000042"),
		// because that is the format the docker container logs interface
		// expects; see github.com/docker/docker/api/types/time.ParseTimestamps.
		apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
	}

	if options.Tail < 0 {
		// See protobuf documentation for details of how this works.
		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
	} else if options.Tail > 0 {
		return nil, errors.New("tail relative to start of logs not supported via docker API")
	}

	if len(options.Streams) == 0 {
		// empty == all
		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
	} else {
		for _, stream := range options.Streams {
			switch stream {
			case api.LogStreamStdout:
				apiOptions.ShowStdout = true
			case api.LogStreamStderr:
				apiOptions.ShowStderr = true
			}
		}
	}
	msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
	if err != nil {
		return nil, err
	}
	return msgs, nil
}

// todo: typed/wrapped errors
func isContainerCreateNameConflict(err error) bool {
	return strings.Contains(err.Error(), "Conflict. The name")
}

func isUnknownContainer(err error) bool {
	return strings.Contains(err.Error(), "No such container:")
}

func isStoppedContainer(err error) bool {
	return strings.Contains(err.Error(), "is already stopped")
}