diff --git a/api/types/swarm/runtime.go b/api/types/swarm/runtime.go new file mode 100644 index 0000000000..c4c731dc82 --- /dev/null +++ b/api/types/swarm/runtime.go @@ -0,0 +1,19 @@ +package swarm + +// RuntimeType is the type of runtime used for the TaskSpec +type RuntimeType string + +// RuntimeURL is the proto type url +type RuntimeURL string + +const ( + // RuntimeContainer is the container based runtime + RuntimeContainer RuntimeType = "container" + // RuntimePlugin is the plugin based runtime + RuntimePlugin RuntimeType = "plugin" + + // RuntimeURLContainer is the proto url for the container type + RuntimeURLContainer RuntimeURL = "types.docker.com/RuntimeContainer" + // RuntimeURLPlugin is the proto url for the plugin type + RuntimeURLPlugin RuntimeURL = "types.docker.com/RuntimePlugin" +) diff --git a/api/types/swarm/task.go b/api/types/swarm/task.go index 1769b6082b..8d5792d3df 100644 --- a/api/types/swarm/task.go +++ b/api/types/swarm/task.go @@ -65,6 +65,9 @@ type TaskSpec struct { // ForceUpdate is a counter that triggers an update even if no relevant // parameters have been changed. ForceUpdate uint64 + + Runtime RuntimeType `json:",omitempty"` + RuntimeData []byte `json:",omitempty"` } // Resources represents resources (CPU/Memory). diff --git a/daemon/cluster/controllers/plugin/controller.go b/daemon/cluster/controllers/plugin/controller.go new file mode 100644 index 0000000000..de7eb2c00f --- /dev/null +++ b/daemon/cluster/controllers/plugin/controller.go @@ -0,0 +1,79 @@ +package plugin + +import ( + "github.com/Sirupsen/logrus" + "github.com/docker/swarmkit/api" + "golang.org/x/net/context" +) + +// Controller is the controller for the plugin backend +type Controller struct{} + +// NewController returns a new cluster plugin controller +func NewController() (*Controller, error) { + return &Controller{}, nil +} + +// Update is the update phase from swarmkit +func (p *Controller) Update(ctx context.Context, t *api.Task) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Update") + return nil +} + +// Prepare is the prepare phase from swarmkit +func (p *Controller) Prepare(ctx context.Context) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Prepare") + return nil +} + +// Start is the start phase from swarmkit +func (p *Controller) Start(ctx context.Context) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Start") + return nil +} + +// Wait causes the task to wait until returned +func (p *Controller) Wait(ctx context.Context) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Wait") + return nil +} + +// Shutdown is the shutdown phase from swarmkit +func (p *Controller) Shutdown(ctx context.Context) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Shutdown") + return nil +} + +// Terminate is the terminate phase from swarmkit +func (p *Controller) Terminate(ctx context.Context) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Terminate") + return nil +} + +// Remove is the remove phase from swarmkit +func (p *Controller) Remove(ctx context.Context) error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Remove") + return nil +} + +// Close is the close phase from swarmkit +func (p *Controller) Close() error { + logrus.WithFields(logrus.Fields{ + "controller": "plugin", + }).Debug("Close") + return nil +} diff --git a/daemon/cluster/convert/service.go b/daemon/cluster/convert/service.go index 98ea226355..fdddf98335 100644 --- a/daemon/cluster/convert/service.go +++ b/daemon/cluster/convert/service.go @@ -11,11 +11,19 @@ import ( ) // ServiceFromGRPC converts a grpc Service to a Service. -func ServiceFromGRPC(s swarmapi.Service) types.Service { +func ServiceFromGRPC(s swarmapi.Service) (types.Service, error) { + curSpec, err := serviceSpecFromGRPC(&s.Spec) + if err != nil { + return types.Service{}, err + } + prevSpec, err := serviceSpecFromGRPC(s.PreviousSpec) + if err != nil { + return types.Service{}, err + } service := types.Service{ ID: s.ID, - Spec: *serviceSpecFromGRPC(&s.Spec), - PreviousSpec: serviceSpecFromGRPC(s.PreviousSpec), + Spec: *curSpec, + PreviousSpec: prevSpec, Endpoint: endpointFromGRPC(s.Endpoint), } @@ -56,12 +64,12 @@ func ServiceFromGRPC(s swarmapi.Service) types.Service { service.UpdateStatus.Message = s.UpdateStatus.Message } - return service + return service, nil } -func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec { +func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) (*types.ServiceSpec, error) { if spec == nil { - return nil + return nil, nil } serviceNetworks := make([]types.NetworkAttachmentConfig, 0, len(spec.Networks)) @@ -69,9 +77,29 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec { serviceNetworks = append(serviceNetworks, types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases}) } + taskTemplate := taskSpecFromGRPC(spec.Task) + + switch t := spec.Task.Runtime.(type) { + case *swarmapi.TaskSpec_Container: + containerConfig := t.Container + taskTemplate.ContainerSpec = containerSpecFromGRPC(containerConfig) + taskTemplate.Runtime = types.RuntimeContainer + case *swarmapi.TaskSpec_Generic: + switch t.Generic.Payload.TypeUrl { + case string(types.RuntimeURLPlugin): + taskTemplate.Runtime = types.RuntimePlugin + default: + return nil, fmt.Errorf("unknown task runtime type: %s", t.Generic.Payload.TypeUrl) + } + + taskTemplate.RuntimeData = t.Generic.Payload.Value + default: + return nil, fmt.Errorf("error creating service; unsupported runtime %T", t) + } + convertedSpec := &types.ServiceSpec{ Annotations: annotationsFromGRPC(spec.Annotations), - TaskTemplate: taskSpecFromGRPC(spec.Task), + TaskTemplate: taskTemplate, Networks: serviceNetworks, EndpointSpec: endpointSpecFromGRPC(spec.Endpoint), } @@ -90,7 +118,7 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec { } } - return convertedSpec + return convertedSpec, nil } // ServiceSpecToGRPC converts a ServiceSpec to a grpc ServiceSpec. @@ -124,11 +152,26 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) { Networks: serviceNetworks, } - containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec) - if err != nil { - return swarmapi.ServiceSpec{}, err + switch s.TaskTemplate.Runtime { + case types.RuntimeContainer, "": // if empty runtime default to container + containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec) + if err != nil { + return swarmapi.ServiceSpec{}, err + } + spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec} + case types.RuntimePlugin: + spec.Task.Runtime = &swarmapi.TaskSpec_Generic{ + Generic: &swarmapi.GenericRuntimeSpec{ + Kind: string(types.RuntimePlugin), + Payload: &gogotypes.Any{ + TypeUrl: string(types.RuntimeURLPlugin), + Value: s.TaskTemplate.RuntimeData, + }, + }, + } + default: + return swarmapi.ServiceSpec{}, fmt.Errorf("error creating service; unsupported runtime %q", s.TaskTemplate.Runtime) } - spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec} restartPolicy, err := restartPolicyToGRPC(s.TaskTemplate.RestartPolicy) if err != nil { diff --git a/daemon/cluster/executor/container/executor.go b/daemon/cluster/executor/container/executor.go index 6be0f3156c..f889b994af 100644 --- a/daemon/cluster/executor/container/executor.go +++ b/daemon/cluster/executor/container/executor.go @@ -1,18 +1,23 @@ package container import ( + "fmt" "sort" "strings" + "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" + swarmtypes "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/daemon/cluster/controllers/plugin" executorpkg "github.com/docker/docker/daemon/cluster/executor" clustertypes "github.com/docker/docker/daemon/cluster/provider" networktypes "github.com/docker/libnetwork/types" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/agent/secrets" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/api/naming" "golang.org/x/net/context" ) @@ -156,9 +161,35 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) { return newNetworkAttacherController(e.backend, t, e.secrets) } - ctlr, err := newController(e.backend, t, secrets.Restrict(e.secrets, t)) - if err != nil { - return nil, err + var ctlr exec.Controller + switch r := t.Spec.GetRuntime().(type) { + case *api.TaskSpec_Generic: + logrus.WithFields(logrus.Fields{ + "kind": r.Generic.Kind, + "runtimeUrl": r.Generic.Payload.TypeUrl, + }).Debug("custom runtime requested") + runtimeKind, err := naming.Runtime(t.Spec) + if err != nil { + return ctlr, err + } + switch runtimeKind { + case string(swarmtypes.RuntimePlugin): + c, err := plugin.NewController() + if err != nil { + return ctlr, err + } + ctlr = c + default: + return ctlr, fmt.Errorf("unsupported runtime type: %q", r.Generic.Kind) + } + case *api.TaskSpec_Container: + c, err := newController(e.backend, t, secrets.Restrict(e.secrets, t)) + if err != nil { + return nil, err + } + ctlr = c + default: + return nil, fmt.Errorf("unsupported runtime: %q", r) } return ctlr, nil diff --git a/daemon/cluster/services.go b/daemon/cluster/services.go index 8d5d4a5edd..69e3441cd5 100644 --- a/daemon/cluster/services.go +++ b/daemon/cluster/services.go @@ -80,7 +80,11 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv continue } } - services = append(services, convert.ServiceFromGRPC(*service)) + svcs, err := convert.ServiceFromGRPC(*service) + if err != nil { + return nil, err + } + services = append(services, svcs) } return services, nil @@ -99,7 +103,11 @@ func (c *Cluster) GetService(input string, insertDefaults bool) (types.Service, }); err != nil { return types.Service{}, err } - return convert.ServiceFromGRPC(*service), nil + svc, err := convert.ServiceFromGRPC(*service) + if err != nil { + return types.Service{}, err + } + return svc, nil } // CreateService creates a new service in a managed swarm cluster. @@ -116,58 +124,65 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity return apierrors.NewBadRequestError(err) } - ctnr := serviceSpec.Task.GetContainer() - if ctnr == nil { - return errors.New("service does not use container tasks") - } - - if encodedAuth != "" { - ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} - } - - // retrieve auth config from encoded auth - authConfig := &apitypes.AuthConfig{} - if encodedAuth != "" { - if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { - logrus.Warnf("invalid authconfig: %v", err) - } - } - resp = &apitypes.ServiceCreateResponse{} - // pin image by digest - if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { - digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig) - if err != nil { - logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()) - // warning in the client response should be concise - resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image)) - } else if ctnr.Image != digestImage { - logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage) - ctnr.Image = digestImage - } else { - logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image) + switch serviceSpec.Task.Runtime.(type) { + // handle other runtimes here + case *swarmapi.TaskSpec_Container: + ctnr := serviceSpec.Task.GetContainer() + if ctnr == nil { + return errors.New("service does not use container tasks") + } + if encodedAuth != "" { + ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} } - // Replace the context with a fresh one. - // If we timed out while communicating with the - // registry, then "ctx" will already be expired, which - // would cause UpdateService below to fail. Reusing - // "ctx" could make it impossible to create a service - // if the registry is slow or unresponsive. - var cancel func() - ctx, cancel = c.getRequestContext() - defer cancel() - } + // retrieve auth config from encoded auth + authConfig := &apitypes.AuthConfig{} + if encodedAuth != "" { + if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { + logrus.Warnf("invalid authconfig: %v", err) + } + } - r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) - if err != nil { - return err - } + // pin image by digest + if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { + digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig) + if err != nil { + logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()) + // warning in the client response should be concise + resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image)) - resp.ID = r.Service.ID + } else if ctnr.Image != digestImage { + logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage) + ctnr.Image = digestImage + + } else { + logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image) + + } + + // Replace the context with a fresh one. + // If we timed out while communicating with the + // registry, then "ctx" will already be expired, which + // would cause UpdateService below to fail. Reusing + // "ctx" could make it impossible to create a service + // if the registry is slow or unresponsive. + var cancel func() + ctx, cancel = c.getRequestContext() + defer cancel() + } + + r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) + if err != nil { + return err + } + + resp.ID = r.Service.ID + } return nil }) + return resp, err }