123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- package container
- import (
- "encoding/base64"
- "encoding/json"
- "fmt"
- "io"
- "strings"
- "syscall"
- "time"
- "github.com/Sirupsen/logrus"
- "github.com/docker/docker/api/server/httputils"
- executorpkg "github.com/docker/docker/daemon/cluster/executor"
- "github.com/docker/engine-api/types"
- "github.com/docker/engine-api/types/events"
- "github.com/docker/engine-api/types/versions"
- "github.com/docker/libnetwork"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/log"
- "golang.org/x/net/context"
- )
- // containerAdapter conducts remote operations for a container. All calls
- // are mostly naked calls to the client API, seeded with information from
- // containerConfig.
- type containerAdapter struct {
- backend executorpkg.Backend
- container *containerConfig
- }
- func newContainerAdapter(b executorpkg.Backend, task *api.Task) (*containerAdapter, error) {
- ctnr, err := newContainerConfig(task)
- if err != nil {
- return nil, err
- }
- return &containerAdapter{
- container: ctnr,
- backend: b,
- }, nil
- }
- func (c *containerAdapter) pullImage(ctx context.Context) error {
- spec := c.container.spec()
- // if the image needs to be pulled, the auth config will be retrieved and updated
- var encodedAuthConfig string
- if spec.PullOptions != nil {
- encodedAuthConfig = spec.PullOptions.RegistryAuth
- }
- authConfig := &types.AuthConfig{}
- if encodedAuthConfig != "" {
- if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
- logrus.Warnf("invalid authconfig: %v", err)
- }
- }
- pr, pw := io.Pipe()
- metaHeaders := map[string][]string{}
- go func() {
- err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
- pw.CloseWithError(err)
- }()
- dec := json.NewDecoder(pr)
- m := map[string]interface{}{}
- for {
- if err := dec.Decode(&m); err != nil {
- if err == io.EOF {
- break
- }
- return err
- }
- // TODO(stevvooe): Report this status somewhere.
- logrus.Debugln("pull progress", m)
- }
- // if the final stream object contained an error, return it
- if errMsg, ok := m["error"]; ok {
- return fmt.Errorf("%v", errMsg)
- }
- return nil
- }
- func (c *containerAdapter) createNetworks(ctx context.Context) error {
- for _, network := range c.container.networks() {
- ncr, err := c.container.networkCreateRequest(network)
- if err != nil {
- return err
- }
- if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
- if _, ok := err.(libnetwork.NetworkNameError); ok {
- continue
- }
- return err
- }
- }
- return nil
- }
- func (c *containerAdapter) removeNetworks(ctx context.Context) error {
- for _, nid := range c.container.networks() {
- if err := c.backend.DeleteManagedNetwork(nid); err != nil {
- if _, ok := err.(*libnetwork.ActiveEndpointsError); ok {
- continue
- }
- log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
- return err
- }
- }
- return nil
- }
- func (c *containerAdapter) create(ctx context.Context, backend executorpkg.Backend) error {
- var cr types.ContainerCreateResponse
- var err error
- version := httputils.VersionFromContext(ctx)
- validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
- if cr, err = backend.CreateManagedContainer(types.ContainerCreateConfig{
- Name: c.container.name(),
- Config: c.container.config(),
- HostConfig: c.container.hostConfig(),
- // Use the first network in container create
- NetworkingConfig: c.container.createNetworkingConfig(),
- }, validateHostname); err != nil {
- return err
- }
- // Docker daemon currently doesn't support multiple networks in container create
- // Connect to all other networks
- nc := c.container.connectNetworkingConfig()
- if nc != nil {
- for n, ep := range nc.EndpointsConfig {
- if err := backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
- return err
- }
- }
- }
- if err := backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
- return err
- }
- return nil
- }
- func (c *containerAdapter) start(ctx context.Context) error {
- version := httputils.VersionFromContext(ctx)
- validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
- return c.backend.ContainerStart(c.container.name(), nil, validateHostname)
- }
- func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
- cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
- if ctx.Err() != nil {
- return types.ContainerJSON{}, ctx.Err()
- }
- if err != nil {
- return types.ContainerJSON{}, err
- }
- return *cs, nil
- }
- // events issues a call to the events API and returns a channel with all
- // events. The stream of events can be shutdown by cancelling the context.
- func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
- log.G(ctx).Debugf("waiting on events")
- buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
- eventsq := make(chan events.Message, len(buffer))
- for _, event := range buffer {
- eventsq <- event
- }
- go func() {
- defer c.backend.UnsubscribeFromEvents(l)
- for {
- select {
- case ev := <-l:
- jev, ok := ev.(events.Message)
- if !ok {
- log.G(ctx).Warnf("unexpected event message: %q", ev)
- continue
- }
- select {
- case eventsq <- jev:
- case <-ctx.Done():
- return
- }
- case <-ctx.Done():
- return
- }
- }
- }()
- return eventsq
- }
- func (c *containerAdapter) wait(ctx context.Context) error {
- return c.backend.ContainerWaitWithContext(ctx, c.container.name())
- }
- func (c *containerAdapter) shutdown(ctx context.Context) error {
- // Default stop grace period to 10s.
- stopgrace := 10
- spec := c.container.spec()
- if spec.StopGracePeriod != nil {
- stopgrace = int(spec.StopGracePeriod.Seconds)
- }
- return c.backend.ContainerStop(c.container.name(), stopgrace)
- }
- func (c *containerAdapter) terminate(ctx context.Context) error {
- return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
- }
- func (c *containerAdapter) remove(ctx context.Context) error {
- return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
- RemoveVolume: true,
- ForceRemove: true,
- })
- }
- func (c *containerAdapter) createVolumes(ctx context.Context, backend executorpkg.Backend) error {
- // Create plugin volumes that are embedded inside a Mount
- for _, mount := range c.container.task.Spec.GetContainer().Mounts {
- if mount.Type != api.MountTypeVolume {
- continue
- }
- if mount.VolumeOptions == nil {
- continue
- }
- if mount.VolumeOptions.DriverConfig == nil {
- continue
- }
- req := c.container.volumeCreateRequest(&mount)
- // Check if this volume exists on the engine
- if _, err := backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
- // TODO(amitshukla): Today, volume create through the engine api does not return an error
- // when the named volume with the same parameters already exists.
- // It returns an error if the driver name is different - that is a valid error
- return err
- }
- }
- return nil
- }
- // todo: typed/wrapped errors
- func isContainerCreateNameConflict(err error) bool {
- return strings.Contains(err.Error(), "Conflict. The name")
- }
- func isUnknownContainer(err error) bool {
- return strings.Contains(err.Error(), "No such container:")
- }
- func isStoppedContainer(err error) bool {
- return strings.Contains(err.Error(), "is already stopped")
- }
|