Merge pull request #32030 from ehazlett/any-runtime

Support Swarmkit Generic Task Runtime
This commit is contained in:
Anusha Ragunathan 2017-04-11 17:53:20 -07:00 committed by GitHub
commit 3343653edb
11 changed files with 441 additions and 68 deletions

View file

@ -2097,6 +2097,12 @@ definitions:
ForceUpdate:
description: "A counter that triggers an update even if no relevant parameters have been changed."
type: "integer"
Runtime:
description: "Runtime is the type of runtime specified for the task executor."
type: "string"
RuntimeData:
description: "RuntimeData is the payload sent to be used with the runtime for the executor."
type: "array"
Networks:
type: "array"
items:

View file

@ -0,0 +1,19 @@
package swarm
// RuntimeType is the type of runtime used for the TaskSpec
type RuntimeType string
// RuntimeURL is the proto type url
type RuntimeURL string
const (
// RuntimeContainer is the container based runtime
RuntimeContainer RuntimeType = "container"
// RuntimePlugin is the plugin based runtime
RuntimePlugin RuntimeType = "plugin"
// RuntimeURLContainer is the proto url for the container type
RuntimeURLContainer RuntimeURL = "types.docker.com/RuntimeContainer"
// RuntimeURLPlugin is the proto url for the plugin type
RuntimeURLPlugin RuntimeURL = "types.docker.com/RuntimePlugin"
)

View file

@ -65,6 +65,11 @@ type TaskSpec struct {
// ForceUpdate is a counter that triggers an update even if no relevant
// parameters have been changed.
ForceUpdate uint64
Runtime RuntimeType `json:",omitempty"`
// TODO (ehazlett): this should be removed and instead
// use struct tags (proto) for the runtimes
RuntimeData []byte `json:",omitempty"`
}
// Resources represents resources (CPU/Memory).

View file

@ -45,7 +45,9 @@ func runList(dockerCli *command.DockerCli, opts listOptions) error {
ctx := context.Background()
client := dockerCli.Client()
services, err := client.ServiceList(ctx, types.ServiceListOptions{Filters: opts.filter.Value()})
serviceFilters := opts.filter.Value()
serviceFilters.Add("runtime", string(swarm.RuntimeContainer))
services, err := client.ServiceList(ctx, types.ServiceListOptions{Filters: serviceFilters})
if err != nil {
return err
}

View file

@ -7,6 +7,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/cli"
"github.com/docker/docker/cli/command"
"github.com/docker/docker/cli/command/formatter"
@ -58,8 +59,11 @@ func runPS(dockerCli *command.DockerCli, opts psOptions) error {
serviceIDFilter := filters.NewArgs()
serviceNameFilter := filters.NewArgs()
for _, service := range opts.services {
// default to container runtime
serviceIDFilter.Add("id", service)
serviceIDFilter.Add("runtime", string(swarmtypes.RuntimeContainer))
serviceNameFilter.Add("name", service)
serviceNameFilter.Add("runtime", string(swarmtypes.RuntimeContainer))
}
serviceByIDList, err := client.ServiceList(ctx, types.ServiceListOptions{Filters: serviceIDFilter})
if err != nil {

View file

@ -17,6 +17,12 @@ func getStackFilter(namespace string) filters.Args {
return filter
}
func getServiceFilter(namespace string) filters.Args {
filter := getStackFilter(namespace)
filter.Add("runtime", string(swarm.RuntimeContainer))
return filter
}
func getStackFilterFromOpt(namespace string, opt opts.FilterOpt) filters.Args {
filter := opt.Value()
filter.Add("label", convert.LabelNamespace+"="+namespace)
@ -36,7 +42,7 @@ func getServices(
) ([]swarm.Service, error) {
return apiclient.ServiceList(
ctx,
types.ServiceListOptions{Filters: getStackFilter(namespace)})
types.ServiceListOptions{Filters: getServiceFilter(namespace)})
}
func getStackNetworks(

View file

@ -0,0 +1,79 @@
package plugin
import (
"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"golang.org/x/net/context"
)
// Controller is the controller for the plugin backend
type Controller struct{}
// NewController returns a new cluster plugin controller
func NewController() (*Controller, error) {
return &Controller{}, nil
}
// Update is the update phase from swarmkit
func (p *Controller) Update(ctx context.Context, t *api.Task) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Update")
return nil
}
// Prepare is the prepare phase from swarmkit
func (p *Controller) Prepare(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Prepare")
return nil
}
// Start is the start phase from swarmkit
func (p *Controller) Start(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Start")
return nil
}
// Wait causes the task to wait until returned
func (p *Controller) Wait(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Wait")
return nil
}
// Shutdown is the shutdown phase from swarmkit
func (p *Controller) Shutdown(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Shutdown")
return nil
}
// Terminate is the terminate phase from swarmkit
func (p *Controller) Terminate(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Terminate")
return nil
}
// Remove is the remove phase from swarmkit
func (p *Controller) Remove(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Remove")
return nil
}
// Close is the close phase from swarmkit
func (p *Controller) Close() error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Close")
return nil
}

View file

@ -1,6 +1,7 @@
package convert
import (
"errors"
"fmt"
"strings"
@ -10,12 +11,25 @@ import (
gogotypes "github.com/gogo/protobuf/types"
)
var (
// ErrUnsupportedRuntime returns an error if the runtime is not supported by the daemon
ErrUnsupportedRuntime = errors.New("unsupported runtime")
)
// ServiceFromGRPC converts a grpc Service to a Service.
func ServiceFromGRPC(s swarmapi.Service) types.Service {
func ServiceFromGRPC(s swarmapi.Service) (types.Service, error) {
curSpec, err := serviceSpecFromGRPC(&s.Spec)
if err != nil {
return types.Service{}, err
}
prevSpec, err := serviceSpecFromGRPC(s.PreviousSpec)
if err != nil {
return types.Service{}, err
}
service := types.Service{
ID: s.ID,
Spec: *serviceSpecFromGRPC(&s.Spec),
PreviousSpec: serviceSpecFromGRPC(s.PreviousSpec),
Spec: *curSpec,
PreviousSpec: prevSpec,
Endpoint: endpointFromGRPC(s.Endpoint),
}
@ -56,12 +70,12 @@ func ServiceFromGRPC(s swarmapi.Service) types.Service {
service.UpdateStatus.Message = s.UpdateStatus.Message
}
return service
return service, nil
}
func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec {
func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) (*types.ServiceSpec, error) {
if spec == nil {
return nil
return nil, nil
}
serviceNetworks := make([]types.NetworkAttachmentConfig, 0, len(spec.Networks))
@ -69,9 +83,29 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec {
serviceNetworks = append(serviceNetworks, types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases})
}
taskTemplate := taskSpecFromGRPC(spec.Task)
switch t := spec.Task.GetRuntime().(type) {
case *swarmapi.TaskSpec_Container:
containerConfig := t.Container
taskTemplate.ContainerSpec = containerSpecFromGRPC(containerConfig)
taskTemplate.Runtime = types.RuntimeContainer
case *swarmapi.TaskSpec_Generic:
switch t.Generic.Kind {
case string(types.RuntimePlugin):
taskTemplate.Runtime = types.RuntimePlugin
default:
return nil, fmt.Errorf("unknown task runtime type: %s", t.Generic.Payload.TypeUrl)
}
taskTemplate.RuntimeData = t.Generic.Payload.Value
default:
return nil, fmt.Errorf("error creating service; unsupported runtime %T", t)
}
convertedSpec := &types.ServiceSpec{
Annotations: annotationsFromGRPC(spec.Annotations),
TaskTemplate: taskSpecFromGRPC(spec.Task),
TaskTemplate: taskTemplate,
Networks: serviceNetworks,
EndpointSpec: endpointSpecFromGRPC(spec.Endpoint),
}
@ -90,7 +124,7 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) *types.ServiceSpec {
}
}
return convertedSpec
return convertedSpec, nil
}
// ServiceSpecToGRPC converts a ServiceSpec to a grpc ServiceSpec.
@ -124,11 +158,26 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) {
Networks: serviceNetworks,
}
containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
if err != nil {
return swarmapi.ServiceSpec{}, err
switch s.TaskTemplate.Runtime {
case types.RuntimeContainer, "": // if empty runtime default to container
containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
if err != nil {
return swarmapi.ServiceSpec{}, err
}
spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
case types.RuntimePlugin:
spec.Task.Runtime = &swarmapi.TaskSpec_Generic{
Generic: &swarmapi.GenericRuntimeSpec{
Kind: string(types.RuntimePlugin),
Payload: &gogotypes.Any{
TypeUrl: string(types.RuntimeURLPlugin),
Value: s.TaskTemplate.RuntimeData,
},
},
}
default:
return swarmapi.ServiceSpec{}, ErrUnsupportedRuntime
}
spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
restartPolicy, err := restartPolicyToGRPC(s.TaskTemplate.RestartPolicy)
if err != nil {
@ -446,8 +495,14 @@ func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec {
taskNetworks = append(taskNetworks, types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases})
}
c := taskSpec.GetContainer()
cSpec := types.ContainerSpec{}
if c != nil {
cSpec = containerSpecFromGRPC(c)
}
return types.TaskSpec{
ContainerSpec: containerSpecFromGRPC(taskSpec.GetContainer()),
ContainerSpec: cSpec,
Resources: resourcesFromGRPC(taskSpec.Resources),
RestartPolicy: restartPolicyFromGRPC(taskSpec.Restart),
Placement: placementFromGRPC(taskSpec.Placement),

View file

@ -0,0 +1,148 @@
package convert
import (
"testing"
swarmtypes "github.com/docker/docker/api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
google_protobuf3 "github.com/gogo/protobuf/types"
)
func TestServiceConvertFromGRPCRuntimeContainer(t *testing.T) {
gs := swarmapi.Service{
Meta: swarmapi.Meta{
Version: swarmapi.Version{
Index: 1,
},
CreatedAt: nil,
UpdatedAt: nil,
},
SpecVersion: &swarmapi.Version{
Index: 1,
},
Spec: swarmapi.ServiceSpec{
Task: swarmapi.TaskSpec{
Runtime: &swarmapi.TaskSpec_Container{
Container: &swarmapi.ContainerSpec{
Image: "alpine:latest",
},
},
},
},
}
svc, err := ServiceFromGRPC(gs)
if err != nil {
t.Fatal(err)
}
if svc.Spec.TaskTemplate.Runtime != swarmtypes.RuntimeContainer {
t.Fatalf("expected type %s; received %T", swarmtypes.RuntimeContainer, svc.Spec.TaskTemplate.Runtime)
}
}
func TestServiceConvertFromGRPCGenericRuntimePlugin(t *testing.T) {
kind := string(swarmtypes.RuntimePlugin)
url := swarmtypes.RuntimeURLPlugin
gs := swarmapi.Service{
Meta: swarmapi.Meta{
Version: swarmapi.Version{
Index: 1,
},
CreatedAt: nil,
UpdatedAt: nil,
},
SpecVersion: &swarmapi.Version{
Index: 1,
},
Spec: swarmapi.ServiceSpec{
Task: swarmapi.TaskSpec{
Runtime: &swarmapi.TaskSpec_Generic{
Generic: &swarmapi.GenericRuntimeSpec{
Kind: kind,
Payload: &google_protobuf3.Any{
TypeUrl: string(url),
},
},
},
},
},
}
svc, err := ServiceFromGRPC(gs)
if err != nil {
t.Fatal(err)
}
if svc.Spec.TaskTemplate.Runtime != swarmtypes.RuntimePlugin {
t.Fatalf("expected type %s; received %T", swarmtypes.RuntimePlugin, svc.Spec.TaskTemplate.Runtime)
}
}
func TestServiceConvertToGRPCGenericRuntimePlugin(t *testing.T) {
s := swarmtypes.ServiceSpec{
TaskTemplate: swarmtypes.TaskSpec{
Runtime: swarmtypes.RuntimePlugin,
},
Mode: swarmtypes.ServiceMode{
Global: &swarmtypes.GlobalService{},
},
}
svc, err := ServiceSpecToGRPC(s)
if err != nil {
t.Fatal(err)
}
v, ok := svc.Task.Runtime.(*swarmapi.TaskSpec_Generic)
if !ok {
t.Fatal("expected type swarmapi.TaskSpec_Generic")
}
if v.Generic.Payload.TypeUrl != string(swarmtypes.RuntimeURLPlugin) {
t.Fatalf("expected url %s; received %s", swarmtypes.RuntimeURLPlugin, v.Generic.Payload.TypeUrl)
}
}
func TestServiceConvertToGRPCContainerRuntime(t *testing.T) {
image := "alpine:latest"
s := swarmtypes.ServiceSpec{
TaskTemplate: swarmtypes.TaskSpec{
ContainerSpec: swarmtypes.ContainerSpec{
Image: image,
},
},
Mode: swarmtypes.ServiceMode{
Global: &swarmtypes.GlobalService{},
},
}
svc, err := ServiceSpecToGRPC(s)
if err != nil {
t.Fatal(err)
}
v, ok := svc.Task.Runtime.(*swarmapi.TaskSpec_Container)
if !ok {
t.Fatal("expected type swarmapi.TaskSpec_Container")
}
if v.Container.Image != image {
t.Fatalf("expected image %s; received %s", image, v.Container.Image)
}
}
func TestServiceConvertToGRPCGenericRuntimeCustom(t *testing.T) {
s := swarmtypes.ServiceSpec{
TaskTemplate: swarmtypes.TaskSpec{
Runtime: "customruntime",
},
Mode: swarmtypes.ServiceMode{
Global: &swarmtypes.GlobalService{},
},
}
if _, err := ServiceSpecToGRPC(s); err != ErrUnsupportedRuntime {
t.Fatal(err)
}
}

View file

@ -1,18 +1,23 @@
package container
import (
"fmt"
"sort"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/daemon/cluster/controllers/plugin"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
networktypes "github.com/docker/libnetwork/types"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/agent/secrets"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/naming"
"golang.org/x/net/context"
)
@ -156,9 +161,35 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
return newNetworkAttacherController(e.backend, t, e.secrets)
}
ctlr, err := newController(e.backend, t, secrets.Restrict(e.secrets, t))
if err != nil {
return nil, err
var ctlr exec.Controller
switch r := t.Spec.GetRuntime().(type) {
case *api.TaskSpec_Generic:
logrus.WithFields(logrus.Fields{
"kind": r.Generic.Kind,
"type_url": r.Generic.Payload.TypeUrl,
}).Debug("custom runtime requested")
runtimeKind, err := naming.Runtime(t.Spec)
if err != nil {
return ctlr, err
}
switch runtimeKind {
case string(swarmtypes.RuntimePlugin):
c, err := plugin.NewController()
if err != nil {
return ctlr, err
}
ctlr = c
default:
return ctlr, fmt.Errorf("unsupported runtime type: %q", r.Generic.Kind)
}
case *api.TaskSpec_Container:
c, err := newController(e.backend, t, secrets.Restrict(e.secrets, t))
if err != nil {
return ctlr, err
}
ctlr = c
default:
return ctlr, fmt.Errorf("unsupported runtime: %q", r)
}
return ctlr, nil

View file

@ -40,18 +40,21 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
// be good to have accepted file check in the same file as
// the filter processing (in the for loop below).
accepted := map[string]bool{
"name": true,
"id": true,
"label": true,
"mode": true,
"name": true,
"id": true,
"label": true,
"mode": true,
"runtime": true,
}
if err := options.Filters.Validate(accepted); err != nil {
return nil, err
}
filters := &swarmapi.ListServicesRequest_Filters{
NamePrefixes: options.Filters.Get("name"),
IDPrefixes: options.Filters.Get("id"),
Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
Runtimes: options.Filters.Get("runtime"),
}
ctx, cancel := c.getRequestContext()
@ -80,7 +83,11 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
continue
}
}
services = append(services, convert.ServiceFromGRPC(*service))
svcs, err := convert.ServiceFromGRPC(*service)
if err != nil {
return nil, err
}
services = append(services, svcs)
}
return services, nil
@ -99,7 +106,11 @@ func (c *Cluster) GetService(input string, insertDefaults bool) (types.Service,
}); err != nil {
return types.Service{}, err
}
return convert.ServiceFromGRPC(*service), nil
svc, err := convert.ServiceFromGRPC(*service)
if err != nil {
return types.Service{}, err
}
return svc, nil
}
// CreateService creates a new service in a managed swarm cluster.
@ -116,58 +127,65 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity
return apierrors.NewBadRequestError(err)
}
ctnr := serviceSpec.Task.GetContainer()
if ctnr == nil {
return errors.New("service does not use container tasks")
}
if encodedAuth != "" {
ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
}
// retrieve auth config from encoded auth
authConfig := &apitypes.AuthConfig{}
if encodedAuth != "" {
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
logrus.Warnf("invalid authconfig: %v", err)
}
}
resp = &apitypes.ServiceCreateResponse{}
// pin image by digest
if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
if err != nil {
logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
// warning in the client response should be concise
resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
} else if ctnr.Image != digestImage {
logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
ctnr.Image = digestImage
} else {
logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
switch serviceSpec.Task.Runtime.(type) {
// handle other runtimes here
case *swarmapi.TaskSpec_Container:
ctnr := serviceSpec.Task.GetContainer()
if ctnr == nil {
return errors.New("service does not use container tasks")
}
if encodedAuth != "" {
ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
}
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to create a service
// if the registry is slow or unresponsive.
var cancel func()
ctx, cancel = c.getRequestContext()
defer cancel()
}
// retrieve auth config from encoded auth
authConfig := &apitypes.AuthConfig{}
if encodedAuth != "" {
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
logrus.Warnf("invalid authconfig: %v", err)
}
}
r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
if err != nil {
return err
}
// pin image by digest
if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
if err != nil {
logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
// warning in the client response should be concise
resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
resp.ID = r.Service.ID
} else if ctnr.Image != digestImage {
logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
ctnr.Image = digestImage
} else {
logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
}
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to create a service
// if the registry is slow or unresponsive.
var cancel func()
ctx, cancel = c.getRequestContext()
defer cancel()
}
r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
if err != nil {
return err
}
resp.ID = r.Service.ID
}
return nil
})
return resp, err
}