Selaa lähdekoodia

Make plugin emit strongly typed, consumable events

Enables other subsystems to watch actions for a plugin(s).

This will be used specifically for implementing plugins on swarm where a
swarm controller needs to watch the state of a plugin.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 8 vuotta sitten
vanhempi
commit
72c3bcf2a5
37 muutettua tiedostoa jossa 2197 lisäystä ja 206 poistoa
  1. 2 1
      api/server/router/plugin/backend.go
  2. 1 1
      api/server/router/swarm/helpers.go
  3. 29 1
      api/swagger.yaml
  4. 3 0
      api/types/swarm/runtime/gen.go
  5. 712 0
      api/types/swarm/runtime/plugin.pb.go
  6. 18 0
      api/types/swarm/runtime/plugin.proto
  7. 10 2
      api/types/swarm/task.go
  8. 75 31
      client/service_create.go
  9. 2 2
      client/service_create_test.go
  10. 33 13
      client/service_update.go
  11. 1 0
      cmd/dockerd/daemon.go
  12. 2 0
      daemon/cluster/cluster.go
  13. 213 31
      daemon/cluster/controllers/plugin/controller.go
  14. 390 0
      daemon/cluster/controllers/plugin/controller_test.go
  15. 6 3
      daemon/cluster/convert/container.go
  16. 56 21
      daemon/cluster/convert/service.go
  17. 4 2
      daemon/cluster/convert/service_test.go
  18. 9 6
      daemon/cluster/convert/task.go
  19. 8 6
      daemon/cluster/executor/container/executor.go
  20. 2 0
      daemon/cluster/filters.go
  21. 1 1
      daemon/cluster/noderunner.go
  22. 91 63
      daemon/cluster/services.go
  23. 12 6
      daemon/cluster/tasks.go
  24. 1 0
      docs/api/version-history.md
  25. 51 9
      integration-cli/daemon/daemon_swarm.go
  26. 78 0
      integration-cli/docker_api_swarm_service_test.go
  27. 8 2
      integration-cli/docker_api_swarm_test.go
  28. 34 0
      integration-cli/fixtures/plugin/basic/basic.go
  29. 183 0
      integration-cli/fixtures/plugin/plugin.go
  30. 10 0
      pkg/pubsub/publisher.go
  31. 14 3
      plugin/backend_linux.go
  32. 1 1
      plugin/backend_unsupported.go
  33. 11 0
      plugin/defs.go
  34. 111 0
      plugin/events.go
  35. 9 0
      plugin/manager.go
  36. 4 1
      plugin/manager_linux.go
  37. 2 0
      plugin/v2/plugin.go

+ 2 - 1
api/server/router/plugin/backend.go

@@ -7,6 +7,7 @@ import (
 	"github.com/docker/distribution/reference"
 	enginetypes "github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/plugin"
 	"golang.org/x/net/context"
 )
 
@@ -19,7 +20,7 @@ type Backend interface {
 	Remove(name string, config *enginetypes.PluginRmConfig) error
 	Set(name string, args []string) error
 	Privileges(ctx context.Context, ref reference.Named, metaHeaders http.Header, authConfig *enginetypes.AuthConfig) (enginetypes.PluginPrivileges, error)
-	Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
+	Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
 	Push(ctx context.Context, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, outStream io.Writer) error
 	Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
 	CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *enginetypes.PluginCreateOptions) error

+ 1 - 1
api/server/router/swarm/helpers.go

@@ -44,7 +44,7 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *
 			// maybe should return some context with this error?
 			return err
 		}
-		tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty
+		tty = (s.Spec.TaskTemplate.ContainerSpec != nil && s.Spec.TaskTemplate.ContainerSpec.TTY) || tty
 	}
 	for _, task := range selector.Tasks {
 		t, err := sr.backend.GetTask(task)

+ 29 - 1
api/swagger.yaml

@@ -1975,11 +1975,39 @@ definitions:
     description: "User modifiable task configuration."
     type: "object"
     properties:
+      PluginSpec:
+        type: "object"
+        description: "Invalid when specified with `ContainerSpec`."
+        properties:
+          Name:
+            description: "The name or 'alias' to use for the plugin."
+            type: "string"
+          Remote:
+            description: "The plugin image reference to use."
+            type: "string"
+          Disabled:
+            description: "Disable the plugin once scheduled."
+            type: "boolean"
+          PluginPrivilege:
+            type: "array"
+            items:
+              description: "Describes a permission accepted by the user upon installing the plugin."
+              type: "object"
+              properties:
+                Name:
+                  type: "string"
+                Description:
+                  type: "string"
+                Value:
+                  type: "array"
+                  items:
+                    type: "string"
       ContainerSpec:
         type: "object"
+        description: "Invalid when specified with `PluginSpec`."
         properties:
           Image:
-            description: "The image name to use for the container."
+            description: "The image name to use for the container"
             type: "string"
           Labels:
             description: "User-defined key/value data."

+ 3 - 0
api/types/swarm/runtime/gen.go

@@ -0,0 +1,3 @@
+//go:generate protoc -I . --gogofast_out=import_path=github.com/docker/docker/api/types/swarm/runtime:. plugin.proto
+
+package runtime

+ 712 - 0
api/types/swarm/runtime/plugin.pb.go

@@ -0,0 +1,712 @@
+// Code generated by protoc-gen-gogo.
+// source: plugin.proto
+// DO NOT EDIT!
+
+/*
+	Package runtime is a generated protocol buffer package.
+
+	It is generated from these files:
+		plugin.proto
+
+	It has these top-level messages:
+		PluginSpec
+		PluginPrivilege
+*/
+package runtime
+
+import proto "github.com/gogo/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import io "io"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
+
+// PluginSpec defines the base payload which clients can specify for creating
+// a service with the plugin runtime.
+type PluginSpec struct {
+	Name       string             `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Remote     string             `protobuf:"bytes,2,opt,name=remote,proto3" json:"remote,omitempty"`
+	Privileges []*PluginPrivilege `protobuf:"bytes,3,rep,name=privileges" json:"privileges,omitempty"`
+	Disabled   bool               `protobuf:"varint,4,opt,name=disabled,proto3" json:"disabled,omitempty"`
+}
+
+func (m *PluginSpec) Reset()                    { *m = PluginSpec{} }
+func (m *PluginSpec) String() string            { return proto.CompactTextString(m) }
+func (*PluginSpec) ProtoMessage()               {}
+func (*PluginSpec) Descriptor() ([]byte, []int) { return fileDescriptorPlugin, []int{0} }
+
+func (m *PluginSpec) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+func (m *PluginSpec) GetRemote() string {
+	if m != nil {
+		return m.Remote
+	}
+	return ""
+}
+
+func (m *PluginSpec) GetPrivileges() []*PluginPrivilege {
+	if m != nil {
+		return m.Privileges
+	}
+	return nil
+}
+
+func (m *PluginSpec) GetDisabled() bool {
+	if m != nil {
+		return m.Disabled
+	}
+	return false
+}
+
+// PluginPrivilege describes a permission the user has to accept
+// upon installing a plugin.
+type PluginPrivilege struct {
+	Name        string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Description string   `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"`
+	Value       []string `protobuf:"bytes,3,rep,name=value" json:"value,omitempty"`
+}
+
+func (m *PluginPrivilege) Reset()                    { *m = PluginPrivilege{} }
+func (m *PluginPrivilege) String() string            { return proto.CompactTextString(m) }
+func (*PluginPrivilege) ProtoMessage()               {}
+func (*PluginPrivilege) Descriptor() ([]byte, []int) { return fileDescriptorPlugin, []int{1} }
+
+func (m *PluginPrivilege) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+func (m *PluginPrivilege) GetDescription() string {
+	if m != nil {
+		return m.Description
+	}
+	return ""
+}
+
+func (m *PluginPrivilege) GetValue() []string {
+	if m != nil {
+		return m.Value
+	}
+	return nil
+}
+
+func init() {
+	proto.RegisterType((*PluginSpec)(nil), "PluginSpec")
+	proto.RegisterType((*PluginPrivilege)(nil), "PluginPrivilege")
+}
+func (m *PluginSpec) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *PluginSpec) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Name) > 0 {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintPlugin(dAtA, i, uint64(len(m.Name)))
+		i += copy(dAtA[i:], m.Name)
+	}
+	if len(m.Remote) > 0 {
+		dAtA[i] = 0x12
+		i++
+		i = encodeVarintPlugin(dAtA, i, uint64(len(m.Remote)))
+		i += copy(dAtA[i:], m.Remote)
+	}
+	if len(m.Privileges) > 0 {
+		for _, msg := range m.Privileges {
+			dAtA[i] = 0x1a
+			i++
+			i = encodeVarintPlugin(dAtA, i, uint64(msg.Size()))
+			n, err := msg.MarshalTo(dAtA[i:])
+			if err != nil {
+				return 0, err
+			}
+			i += n
+		}
+	}
+	if m.Disabled {
+		dAtA[i] = 0x20
+		i++
+		if m.Disabled {
+			dAtA[i] = 1
+		} else {
+			dAtA[i] = 0
+		}
+		i++
+	}
+	return i, nil
+}
+
+func (m *PluginPrivilege) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *PluginPrivilege) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Name) > 0 {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintPlugin(dAtA, i, uint64(len(m.Name)))
+		i += copy(dAtA[i:], m.Name)
+	}
+	if len(m.Description) > 0 {
+		dAtA[i] = 0x12
+		i++
+		i = encodeVarintPlugin(dAtA, i, uint64(len(m.Description)))
+		i += copy(dAtA[i:], m.Description)
+	}
+	if len(m.Value) > 0 {
+		for _, s := range m.Value {
+			dAtA[i] = 0x1a
+			i++
+			l = len(s)
+			for l >= 1<<7 {
+				dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
+				l >>= 7
+				i++
+			}
+			dAtA[i] = uint8(l)
+			i++
+			i += copy(dAtA[i:], s)
+		}
+	}
+	return i, nil
+}
+
+func encodeFixed64Plugin(dAtA []byte, offset int, v uint64) int {
+	dAtA[offset] = uint8(v)
+	dAtA[offset+1] = uint8(v >> 8)
+	dAtA[offset+2] = uint8(v >> 16)
+	dAtA[offset+3] = uint8(v >> 24)
+	dAtA[offset+4] = uint8(v >> 32)
+	dAtA[offset+5] = uint8(v >> 40)
+	dAtA[offset+6] = uint8(v >> 48)
+	dAtA[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Plugin(dAtA []byte, offset int, v uint32) int {
+	dAtA[offset] = uint8(v)
+	dAtA[offset+1] = uint8(v >> 8)
+	dAtA[offset+2] = uint8(v >> 16)
+	dAtA[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintPlugin(dAtA []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		dAtA[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	dAtA[offset] = uint8(v)
+	return offset + 1
+}
+func (m *PluginSpec) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Name)
+	if l > 0 {
+		n += 1 + l + sovPlugin(uint64(l))
+	}
+	l = len(m.Remote)
+	if l > 0 {
+		n += 1 + l + sovPlugin(uint64(l))
+	}
+	if len(m.Privileges) > 0 {
+		for _, e := range m.Privileges {
+			l = e.Size()
+			n += 1 + l + sovPlugin(uint64(l))
+		}
+	}
+	if m.Disabled {
+		n += 2
+	}
+	return n
+}
+
+func (m *PluginPrivilege) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Name)
+	if l > 0 {
+		n += 1 + l + sovPlugin(uint64(l))
+	}
+	l = len(m.Description)
+	if l > 0 {
+		n += 1 + l + sovPlugin(uint64(l))
+	}
+	if len(m.Value) > 0 {
+		for _, s := range m.Value {
+			l = len(s)
+			n += 1 + l + sovPlugin(uint64(l))
+		}
+	}
+	return n
+}
+
+func sovPlugin(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozPlugin(x uint64) (n int) {
+	return sovPlugin(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *PluginSpec) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowPlugin
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: PluginSpec: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: PluginSpec: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Name = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Remote", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Remote = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Privileges", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Privileges = append(m.Privileges, &PluginPrivilege{})
+			if err := m.Privileges[len(m.Privileges)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 4:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Disabled", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Disabled = bool(v != 0)
+		default:
+			iNdEx = preIndex
+			skippy, err := skipPlugin(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *PluginPrivilege) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowPlugin
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: PluginPrivilege: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: PluginPrivilege: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Name = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Description", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Description = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Value = append(m.Value, string(dAtA[iNdEx:postIndex]))
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipPlugin(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthPlugin
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func skipPlugin(dAtA []byte) (n int, err error) {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return 0, ErrIntOverflowPlugin
+			}
+			if iNdEx >= l {
+				return 0, io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		wireType := int(wire & 0x7)
+		switch wireType {
+		case 0:
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				iNdEx++
+				if dAtA[iNdEx-1] < 0x80 {
+					break
+				}
+			}
+			return iNdEx, nil
+		case 1:
+			iNdEx += 8
+			return iNdEx, nil
+		case 2:
+			var length int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowPlugin
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				length |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			iNdEx += length
+			if length < 0 {
+				return 0, ErrInvalidLengthPlugin
+			}
+			return iNdEx, nil
+		case 3:
+			for {
+				var innerWire uint64
+				var start int = iNdEx
+				for shift := uint(0); ; shift += 7 {
+					if shift >= 64 {
+						return 0, ErrIntOverflowPlugin
+					}
+					if iNdEx >= l {
+						return 0, io.ErrUnexpectedEOF
+					}
+					b := dAtA[iNdEx]
+					iNdEx++
+					innerWire |= (uint64(b) & 0x7F) << shift
+					if b < 0x80 {
+						break
+					}
+				}
+				innerWireType := int(innerWire & 0x7)
+				if innerWireType == 4 {
+					break
+				}
+				next, err := skipPlugin(dAtA[start:])
+				if err != nil {
+					return 0, err
+				}
+				iNdEx = start + next
+			}
+			return iNdEx, nil
+		case 4:
+			return iNdEx, nil
+		case 5:
+			iNdEx += 4
+			return iNdEx, nil
+		default:
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+		}
+	}
+	panic("unreachable")
+}
+
+var (
+	ErrInvalidLengthPlugin = fmt.Errorf("proto: negative length found during unmarshaling")
+	ErrIntOverflowPlugin   = fmt.Errorf("proto: integer overflow")
+)
+
+func init() { proto.RegisterFile("plugin.proto", fileDescriptorPlugin) }
+
+var fileDescriptorPlugin = []byte{
+	// 196 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0xc8, 0x29, 0x4d,
+	0xcf, 0xcc, 0xd3, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x6a, 0x63, 0xe4, 0xe2, 0x0a, 0x00, 0x0b,
+	0x04, 0x17, 0xa4, 0x26, 0x0b, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30,
+	0x6a, 0x70, 0x06, 0x81, 0xd9, 0x42, 0x62, 0x5c, 0x6c, 0x45, 0xa9, 0xb9, 0xf9, 0x25, 0xa9, 0x12,
+	0x4c, 0x60, 0x51, 0x28, 0x4f, 0xc8, 0x80, 0x8b, 0xab, 0xa0, 0x28, 0xb3, 0x2c, 0x33, 0x27, 0x35,
+	0x3d, 0xb5, 0x58, 0x82, 0x59, 0x81, 0x59, 0x83, 0xdb, 0x48, 0x40, 0x0f, 0x62, 0x58, 0x00, 0x4c,
+	0x22, 0x08, 0x49, 0x8d, 0x90, 0x14, 0x17, 0x47, 0x4a, 0x66, 0x71, 0x62, 0x52, 0x4e, 0x6a, 0x8a,
+	0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, 0x9c, 0xaf, 0x14, 0xcb, 0xc5, 0x8f, 0xa6, 0x15, 0xab,
+	0x63, 0x14, 0xb8, 0xb8, 0x53, 0x52, 0x8b, 0x93, 0x8b, 0x32, 0x0b, 0x4a, 0x32, 0xf3, 0xf3, 0xa0,
+	0x2e, 0x42, 0x16, 0x12, 0x12, 0xe1, 0x62, 0x2d, 0x4b, 0xcc, 0x29, 0x4d, 0x05, 0xbb, 0x88, 0x33,
+	0x08, 0xc2, 0x71, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4,
+	0x18, 0x93, 0xd8, 0xc0, 0x9e, 0x37, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x84, 0xad, 0x79,
+	0x0c, 0x01, 0x00, 0x00,
+}

+ 18 - 0
api/types/swarm/runtime/plugin.proto

@@ -0,0 +1,18 @@
+syntax = "proto3";
+
+// PluginSpec defines the base payload which clients can specify for creating
+// a service with the plugin runtime.
+message PluginSpec {
+	string name = 1;
+	string remote = 2;
+	repeated PluginPrivilege privileges = 3;
+	bool disabled = 4;
+}
+
+// PluginPrivilege describes a permission the user has to accept
+// upon installing a plugin.
+message PluginPrivilege {
+	string name = 1;
+	string description = 2;
+	repeated string value = 3;
+}

+ 10 - 2
api/types/swarm/task.go

@@ -1,6 +1,10 @@
 package swarm
 
-import "time"
+import (
+	"time"
+
+	"github.com/docker/docker/api/types/swarm/runtime"
+)
 
 // TaskState represents the state of a task.
 type TaskState string
@@ -51,7 +55,11 @@ type Task struct {
 
 // TaskSpec represents the spec of a task.
 type TaskSpec struct {
-	ContainerSpec ContainerSpec             `json:",omitempty"`
+	// ContainerSpec and PluginSpec are mutually exclusive.
+	// PluginSpec will only be used when the `Runtime` field is set to `plugin`
+	ContainerSpec *ContainerSpec      `json:",omitempty"`
+	PluginSpec    *runtime.PluginSpec `json:",omitempty"`
+
 	Resources     *ResourceRequirements     `json:",omitempty"`
 	RestartPolicy *RestartPolicy            `json:",omitempty"`
 	Placement     *Placement                `json:",omitempty"`

+ 75 - 31
client/service_create.go

@@ -6,9 +6,9 @@ import (
 
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types"
-	registrytypes "github.com/docker/docker/api/types/registry"
 	"github.com/docker/docker/api/types/swarm"
 	"github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 )
 
@@ -24,24 +24,51 @@ func (cli *Client) ServiceCreate(ctx context.Context, service swarm.ServiceSpec,
 		headers["X-Registry-Auth"] = []string{options.EncodedRegistryAuth}
 	}
 
-	// ensure that the image is tagged
-	if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
-		service.TaskTemplate.ContainerSpec.Image = taggedImg
+	// Make sure containerSpec is not nil when no runtime is set or the runtime is set to container
+	if service.TaskTemplate.ContainerSpec == nil && (service.TaskTemplate.Runtime == "" || service.TaskTemplate.Runtime == swarm.RuntimeContainer) {
+		service.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{}
+	}
+
+	if err := validateServiceSpec(service); err != nil {
+		return types.ServiceCreateResponse{}, err
 	}
 
-	// Contact the registry to retrieve digest and platform information
-	if options.QueryRegistry {
-		distributionInspect, err := cli.DistributionInspect(ctx, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
-		distErr = err
-		if err == nil {
-			// now pin by digest if the image doesn't already contain a digest
-			if img := imageWithDigestString(service.TaskTemplate.ContainerSpec.Image, distributionInspect.Descriptor.Digest); img != "" {
+	// ensure that the image is tagged
+	var imgPlatforms []swarm.Platform
+	if service.TaskTemplate.ContainerSpec != nil {
+		if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
+			service.TaskTemplate.ContainerSpec.Image = taggedImg
+		}
+		if options.QueryRegistry {
+			var img string
+			img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
+			if img != "" {
 				service.TaskTemplate.ContainerSpec.Image = img
 			}
-			// add platforms that are compatible with the service
-			service.TaskTemplate.Placement = setServicePlatforms(service.TaskTemplate.Placement, distributionInspect)
 		}
 	}
+
+	// ensure that the image is tagged
+	if service.TaskTemplate.PluginSpec != nil {
+		if taggedImg := imageWithTagString(service.TaskTemplate.PluginSpec.Remote); taggedImg != "" {
+			service.TaskTemplate.PluginSpec.Remote = taggedImg
+		}
+		if options.QueryRegistry {
+			var img string
+			img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.PluginSpec.Remote, options.EncodedRegistryAuth)
+			if img != "" {
+				service.TaskTemplate.PluginSpec.Remote = img
+			}
+		}
+	}
+
+	if service.TaskTemplate.Placement == nil && len(imgPlatforms) > 0 {
+		service.TaskTemplate.Placement = &swarm.Placement{}
+	}
+	if len(imgPlatforms) > 0 {
+		service.TaskTemplate.Placement.Platforms = imgPlatforms
+	}
+
 	var response types.ServiceCreateResponse
 	resp, err := cli.post(ctx, "/services/create", nil, service, headers)
 	if err != nil {
@@ -58,6 +85,28 @@ func (cli *Client) ServiceCreate(ctx context.Context, service swarm.ServiceSpec,
 	return response, err
 }
 
+func imageDigestAndPlatforms(ctx context.Context, cli *Client, image, encodedAuth string) (string, []swarm.Platform, error) {
+	distributionInspect, err := cli.DistributionInspect(ctx, image, encodedAuth)
+	imageWithDigest := image
+	var platforms []swarm.Platform
+	if err != nil {
+		return "", nil, err
+	}
+
+	imageWithDigest = imageWithDigestString(image, distributionInspect.Descriptor.Digest)
+
+	if len(distributionInspect.Platforms) > 0 {
+		platforms = make([]swarm.Platform, 0, len(distributionInspect.Platforms))
+		for _, p := range distributionInspect.Platforms {
+			platforms = append(platforms, swarm.Platform{
+				Architecture: p.Architecture,
+				OS:           p.OS,
+			})
+		}
+	}
+	return imageWithDigest, platforms, err
+}
+
 // imageWithDigestString takes an image string and a digest, and updates
 // the image string if it didn't originally contain a digest. It returns
 // an empty string if there are no updates.
@@ -86,27 +135,22 @@ func imageWithTagString(image string) string {
 	return ""
 }
 
-// setServicePlatforms sets Platforms in swarm.Placement to list all
-// compatible platforms for the service, as found in distributionInspect
-// and returns a pointer to the new or updated swarm.Placement struct.
-func setServicePlatforms(placement *swarm.Placement, distributionInspect registrytypes.DistributionInspect) *swarm.Placement {
-	if placement == nil {
-		placement = &swarm.Placement{}
-	}
-	// reset any existing listed platforms
-	placement.Platforms = []swarm.Platform{}
-	for _, p := range distributionInspect.Platforms {
-		placement.Platforms = append(placement.Platforms, swarm.Platform{
-			Architecture: p.Architecture,
-			OS:           p.OS,
-		})
-	}
-	return placement
-}
-
 // digestWarning constructs a formatted warning string using the
 // image name that could not be pinned by digest. The formatting
 // is hardcoded, but could me made smarter in the future
 func digestWarning(image string) string {
 	return fmt.Sprintf("image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n", image, image)
 }
+
+func validateServiceSpec(s swarm.ServiceSpec) error {
+	if s.TaskTemplate.ContainerSpec != nil && s.TaskTemplate.PluginSpec != nil {
+		return errors.New("must not specify both a container spec and a plugin spec in the task template")
+	}
+	if s.TaskTemplate.PluginSpec != nil && s.TaskTemplate.Runtime != swarm.RuntimePlugin {
+		return errors.New("mismatched runtime with plugin spec")
+	}
+	if s.TaskTemplate.ContainerSpec != nil && (s.TaskTemplate.Runtime != "" && s.TaskTemplate.Runtime != swarm.RuntimeContainer) {
+		return errors.New("mismatched runtime with container spec")
+	}
+	return nil
+}

+ 2 - 2
client/service_create_test.go

@@ -112,7 +112,7 @@ func TestServiceCreateCompatiblePlatforms(t *testing.T) {
 		}),
 	}
 
-	spec := swarm.ServiceSpec{TaskTemplate: swarm.TaskSpec{ContainerSpec: swarm.ContainerSpec{Image: "foobar:1.0"}}}
+	spec := swarm.ServiceSpec{TaskTemplate: swarm.TaskSpec{ContainerSpec: &swarm.ContainerSpec{Image: "foobar:1.0"}}}
 
 	r, err := client.ServiceCreate(context.Background(), spec, types.ServiceCreateOptions{QueryRegistry: true})
 	assert.NoError(t, err)
@@ -189,7 +189,7 @@ func TestServiceCreateDigestPinning(t *testing.T) {
 	for _, p := range pinByDigestTests {
 		r, err := client.ServiceCreate(context.Background(), swarm.ServiceSpec{
 			TaskTemplate: swarm.TaskSpec{
-				ContainerSpec: swarm.ContainerSpec{
+				ContainerSpec: &swarm.ContainerSpec{
 					Image: p.img,
 				},
 			},

+ 33 - 13
client/service_update.go

@@ -35,26 +35,46 @@ func (cli *Client) ServiceUpdate(ctx context.Context, serviceID string, version
 
 	query.Set("version", strconv.FormatUint(version.Index, 10))
 
-	// ensure that the image is tagged
-	if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
-		service.TaskTemplate.ContainerSpec.Image = taggedImg
+	if err := validateServiceSpec(service); err != nil {
+		return types.ServiceUpdateResponse{}, err
 	}
 
-	// Contact the registry to retrieve digest and platform information
-	// This happens only when the image has changed
-	if options.QueryRegistry {
-		distributionInspect, err := cli.DistributionInspect(ctx, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
-		distErr = err
-		if err == nil {
-			// now pin by digest if the image doesn't already contain a digest
-			if img := imageWithDigestString(service.TaskTemplate.ContainerSpec.Image, distributionInspect.Descriptor.Digest); img != "" {
+	var imgPlatforms []swarm.Platform
+	// ensure that the image is tagged
+	if service.TaskTemplate.ContainerSpec != nil {
+		if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
+			service.TaskTemplate.ContainerSpec.Image = taggedImg
+		}
+		if options.QueryRegistry {
+			var img string
+			img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
+			if img != "" {
 				service.TaskTemplate.ContainerSpec.Image = img
 			}
-			// add platforms that are compatible with the service
-			service.TaskTemplate.Placement = setServicePlatforms(service.TaskTemplate.Placement, distributionInspect)
 		}
 	}
 
+	// ensure that the image is tagged
+	if service.TaskTemplate.PluginSpec != nil {
+		if taggedImg := imageWithTagString(service.TaskTemplate.PluginSpec.Remote); taggedImg != "" {
+			service.TaskTemplate.PluginSpec.Remote = taggedImg
+		}
+		if options.QueryRegistry {
+			var img string
+			img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.PluginSpec.Remote, options.EncodedRegistryAuth)
+			if img != "" {
+				service.TaskTemplate.PluginSpec.Remote = img
+			}
+		}
+	}
+
+	if service.TaskTemplate.Placement == nil && len(imgPlatforms) > 0 {
+		service.TaskTemplate.Placement = &swarm.Placement{}
+	}
+	if len(imgPlatforms) > 0 {
+		service.TaskTemplate.Placement.Platforms = imgPlatforms
+	}
+
 	var response types.ServiceUpdateResponse
 	resp, err := cli.post(ctx, "/services/"+serviceID+"/update", query, service, headers)
 	if err != nil {

+ 1 - 0
cmd/dockerd/daemon.go

@@ -253,6 +253,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
 		Root:                   cli.Config.Root,
 		Name:                   name,
 		Backend:                d,
+		PluginBackend:          d.PluginManager(),
 		NetworkSubnetsProvider: d,
 		DefaultAdvertiseAddr:   cli.Config.SwarmDefaultAdvertiseAddr,
 		RuntimeRoot:            cli.getSwarmRunRoot(),

+ 2 - 0
daemon/cluster/cluster.go

@@ -49,6 +49,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/types/network"
 	types "github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/daemon/cluster/controllers/plugin"
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
 	"github.com/docker/docker/pkg/signal"
 	lncluster "github.com/docker/libnetwork/cluster"
@@ -97,6 +98,7 @@ type Config struct {
 	Root                   string
 	Name                   string
 	Backend                executorpkg.Backend
+	PluginBackend          plugin.Backend
 	NetworkSubnetsProvider NetworkSubnetsProvider
 
 	// DefaultAdvertiseAddr is the default host/IP or network interface to use

+ 213 - 31
daemon/cluster/controllers/plugin/controller.go

@@ -1,79 +1,261 @@
 package plugin
 
 import (
+	"io"
+	"io/ioutil"
+	"net/http"
+
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/reference"
+	enginetypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/swarm/runtime"
+	"github.com/docker/docker/plugin"
+	"github.com/docker/docker/plugin/v2"
 	"github.com/docker/swarmkit/api"
+	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 )
 
-// Controller is the controller for the plugin backend
-type Controller struct{}
+// Controller is the controller for the plugin backend.
+// Plugins are managed as a singleton object with a desired state (different from containers).
+// With the the plugin controller instead of having a strict create->start->stop->remove
+// task lifecycle like containers, we manage the desired state of the plugin and let
+// the plugin manager do what it already does and monitor the plugin.
+// We'll also end up with many tasks all pointing to the same plugin ID.
+//
+// TODO(@cpuguy83): registry auth is intentionally not supported until we work out
+// the right way to pass registry crednetials via secrets.
+type Controller struct {
+	backend Backend
+	spec    runtime.PluginSpec
+	logger  *logrus.Entry
+
+	pluginID  string
+	serviceID string
+	taskID    string
+
+	// hook used to signal tests that `Wait()` is actually ready and waiting
+	signalWaitReady func()
+}
+
+// Backend is the interface for interacting with the plugin manager
+// Controller actions are passed to the configured backend to do the real work.
+type Backend interface {
+	Disable(name string, config *enginetypes.PluginDisableConfig) error
+	Enable(name string, config *enginetypes.PluginEnableConfig) error
+	Remove(name string, config *enginetypes.PluginRmConfig) error
+	Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
+	Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
+	Get(name string) (*v2.Plugin, error)
+	SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
+}
 
 // NewController returns a new cluster plugin controller
-func NewController() (*Controller, error) {
-	return &Controller{}, nil
+func NewController(backend Backend, t *api.Task) (*Controller, error) {
+	spec, err := readSpec(t)
+	if err != nil {
+		return nil, err
+	}
+	return &Controller{
+		backend:   backend,
+		spec:      spec,
+		serviceID: t.ServiceID,
+		logger: logrus.WithFields(logrus.Fields{
+			"controller": "plugin",
+			"task":       t.ID,
+			"plugin":     spec.Name,
+		})}, nil
+}
+
+func readSpec(t *api.Task) (runtime.PluginSpec, error) {
+	var cfg runtime.PluginSpec
+
+	generic := t.Spec.GetGeneric()
+	if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
+		return cfg, errors.Wrap(err, "error reading plugin spec")
+	}
+	return cfg, nil
 }
 
 // Update is the update phase from swarmkit
 func (p *Controller) Update(ctx context.Context, t *api.Task) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Update")
+	p.logger.Debug("Update")
 	return nil
 }
 
 // Prepare is the prepare phase from swarmkit
-func (p *Controller) Prepare(ctx context.Context) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Prepare")
+func (p *Controller) Prepare(ctx context.Context) (err error) {
+	p.logger.Debug("Prepare")
+
+	remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
+	if err != nil {
+		return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
+	}
+
+	if p.spec.Name == "" {
+		p.spec.Name = remote.String()
+	}
+
+	var authConfig enginetypes.AuthConfig
+	privs := convertPrivileges(p.spec.Privileges)
+
+	pl, err := p.backend.Get(p.spec.Name)
+
+	defer func() {
+		if pl != nil && err == nil {
+			pl.Acquire()
+		}
+	}()
+
+	if err == nil && pl != nil {
+		if pl.SwarmServiceID != p.serviceID {
+			return errors.Errorf("plugin already exists: %s", p.spec.Name)
+		}
+		if pl.IsEnabled() {
+			if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
+				p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
+			}
+		}
+		p.pluginID = pl.GetID()
+		return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard)
+	}
+
+	if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID)); err != nil {
+		return err
+	}
+	pl, err = p.backend.Get(p.spec.Name)
+	if err != nil {
+		return err
+	}
+	p.pluginID = pl.GetID()
+
 	return nil
 }
 
 // Start is the start phase from swarmkit
 func (p *Controller) Start(ctx context.Context) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Start")
+	p.logger.Debug("Start")
+
+	pl, err := p.backend.Get(p.pluginID)
+	if err != nil {
+		return err
+	}
+
+	if p.spec.Disabled {
+		if pl.IsEnabled() {
+			return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
+		}
+		return nil
+	}
+	if !pl.IsEnabled() {
+		return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
+	}
 	return nil
 }
 
 // Wait causes the task to wait until returned
 func (p *Controller) Wait(ctx context.Context) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Wait")
-	return nil
+	p.logger.Debug("Wait")
+
+	pl, err := p.backend.Get(p.pluginID)
+	if err != nil {
+		return err
+	}
+
+	events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
+	defer cancel()
+
+	if p.signalWaitReady != nil {
+		p.signalWaitReady()
+	}
+
+	if !p.spec.Disabled != pl.IsEnabled() {
+		return errors.New("mismatched plugin state")
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case e := <-events:
+			p.logger.Debugf("got event %#T", e)
+
+			switch e.(type) {
+			case plugin.EventEnable:
+				if p.spec.Disabled {
+					return errors.New("plugin enabled")
+				}
+			case plugin.EventRemove:
+				return errors.New("plugin removed")
+			case plugin.EventDisable:
+				if !p.spec.Disabled {
+					return errors.New("plugin disabled")
+				}
+			}
+		}
+	}
+}
+
+func isNotFound(err error) bool {
+	_, ok := errors.Cause(err).(plugin.ErrNotFound)
+	return ok
 }
 
 // Shutdown is the shutdown phase from swarmkit
 func (p *Controller) Shutdown(ctx context.Context) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Shutdown")
+	p.logger.Debug("Shutdown")
 	return nil
 }
 
 // Terminate is the terminate phase from swarmkit
 func (p *Controller) Terminate(ctx context.Context) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Terminate")
+	p.logger.Debug("Terminate")
 	return nil
 }
 
 // Remove is the remove phase from swarmkit
 func (p *Controller) Remove(ctx context.Context) error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Remove")
-	return nil
+	p.logger.Debug("Remove")
+
+	pl, err := p.backend.Get(p.pluginID)
+	if err != nil {
+		if isNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	pl.Release()
+	if pl.GetRefCount() > 0 {
+		p.logger.Debug("skipping remove due to ref count")
+		return nil
+	}
+
+	// This may error because we have exactly 1 plugin, but potentially multiple
+	// tasks which are calling remove.
+	err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
+	if isNotFound(err) {
+		return nil
+	}
+	return err
 }
 
 // Close is the close phase from swarmkit
 func (p *Controller) Close() error {
-	logrus.WithFields(logrus.Fields{
-		"controller": "plugin",
-	}).Debug("Close")
+	p.logger.Debug("Close")
 	return nil
 }
+
+func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
+	var out enginetypes.PluginPrivileges
+	for _, p := range ls {
+		pp := enginetypes.PluginPrivilege{
+			Name:        p.Name,
+			Description: p.Description,
+			Value:       p.Value,
+		}
+		out = append(out, pp)
+	}
+	return out
+}

+ 390 - 0
daemon/cluster/controllers/plugin/controller_test.go

@@ -0,0 +1,390 @@
+package plugin
+
+import (
+	"errors"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/reference"
+	enginetypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/swarm/runtime"
+	"github.com/docker/docker/pkg/pubsub"
+	"github.com/docker/docker/plugin"
+	"github.com/docker/docker/plugin/v2"
+	"golang.org/x/net/context"
+)
+
+const (
+	pluginTestName          = "test"
+	pluginTestRemote        = "testremote"
+	pluginTestRemoteUpgrade = "testremote2"
+)
+
+func TestPrepare(t *testing.T) {
+	b := newMockBackend()
+	c := newTestController(b, false)
+	ctx := context.Background()
+
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if b.p == nil {
+		t.Fatal("pull not performed")
+	}
+
+	c = newTestController(b, false)
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if b.p == nil {
+		t.Fatal("unexpected nil")
+	}
+	if b.p.PluginObj.PluginReference != pluginTestRemoteUpgrade {
+		t.Fatal("upgrade not performed")
+	}
+
+	c = newTestController(b, false)
+	c.serviceID = "1"
+	if err := c.Prepare(ctx); err == nil {
+		t.Fatal("expected error on prepare")
+	}
+}
+
+func TestStart(t *testing.T) {
+	b := newMockBackend()
+	c := newTestController(b, false)
+	ctx := context.Background()
+
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if !b.p.IsEnabled() {
+		t.Fatal("expected plugin to be enabled")
+	}
+
+	c = newTestController(b, true)
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if b.p.IsEnabled() {
+		t.Fatal("expected plugin to be disabled")
+	}
+
+	c = newTestController(b, false)
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if !b.p.IsEnabled() {
+		t.Fatal("expected plugin to be enabled")
+	}
+}
+
+func TestWaitCancel(t *testing.T) {
+	b := newMockBackend()
+	c := newTestController(b, true)
+	ctx := context.Background()
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	ctxCancel, cancel := context.WithCancel(ctx)
+	chErr := make(chan error)
+	go func() {
+		chErr <- c.Wait(ctxCancel)
+	}()
+	cancel()
+	select {
+	case err := <-chErr:
+		if err != context.Canceled {
+			t.Fatal(err)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for cancelation")
+	}
+}
+
+func TestWaitDisabled(t *testing.T) {
+	b := newMockBackend()
+	c := newTestController(b, true)
+	ctx := context.Background()
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	chErr := make(chan error)
+	go func() {
+		chErr <- c.Wait(ctx)
+	}()
+
+	if err := b.Enable("test", nil); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case err := <-chErr:
+		if err == nil {
+			t.Fatal("expected error")
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for event")
+	}
+
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	ctxWaitReady, cancelCtxWaitReady := context.WithTimeout(ctx, 30*time.Second)
+	c.signalWaitReady = cancelCtxWaitReady
+	defer cancelCtxWaitReady()
+
+	go func() {
+		chErr <- c.Wait(ctx)
+	}()
+
+	chEvent, cancel := b.SubscribeEvents(1)
+	defer cancel()
+
+	if err := b.Disable("test", nil); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-chEvent:
+		<-ctxWaitReady.Done()
+		if err := ctxWaitReady.Err(); err == context.DeadlineExceeded {
+			t.Fatal(err)
+		}
+		select {
+		case <-chErr:
+			t.Fatal("wait returned unexpectedly")
+		default:
+			// all good
+		}
+	case <-chErr:
+		t.Fatal("wait returned unexpectedly")
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for event")
+	}
+
+	if err := b.Remove("test", nil); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case err := <-chErr:
+		if err == nil {
+			t.Fatal("expected error")
+		}
+		if !strings.Contains(err.Error(), "removed") {
+			t.Fatal(err)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for event")
+	}
+}
+
+func TestWaitEnabled(t *testing.T) {
+	b := newMockBackend()
+	c := newTestController(b, false)
+	ctx := context.Background()
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	chErr := make(chan error)
+	go func() {
+		chErr <- c.Wait(ctx)
+	}()
+
+	if err := b.Disable("test", nil); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case err := <-chErr:
+		if err == nil {
+			t.Fatal("expected error")
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for event")
+	}
+
+	if err := c.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	ctxWaitReady, ctxWaitCancel := context.WithCancel(ctx)
+	c.signalWaitReady = ctxWaitCancel
+	defer ctxWaitCancel()
+
+	go func() {
+		chErr <- c.Wait(ctx)
+	}()
+
+	chEvent, cancel := b.SubscribeEvents(1)
+	defer cancel()
+
+	if err := b.Enable("test", nil); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-chEvent:
+		<-ctxWaitReady.Done()
+		if err := ctxWaitReady.Err(); err == context.DeadlineExceeded {
+			t.Fatal(err)
+		}
+		select {
+		case <-chErr:
+			t.Fatal("wait returned unexpectedly")
+		default:
+			// all good
+		}
+	case <-chErr:
+		t.Fatal("wait returned unexpectedly")
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for event")
+	}
+
+	if err := b.Remove("test", nil); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case err := <-chErr:
+		if err == nil {
+			t.Fatal("expected error")
+		}
+		if !strings.Contains(err.Error(), "removed") {
+			t.Fatal(err)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for event")
+	}
+}
+
+func TestRemove(t *testing.T) {
+	b := newMockBackend()
+	c := newTestController(b, false)
+	ctx := context.Background()
+
+	if err := c.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Shutdown(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	c2 := newTestController(b, false)
+	if err := c2.Prepare(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := c.Remove(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if b.p == nil {
+		t.Fatal("plugin removed unexpectedly")
+	}
+	if err := c2.Shutdown(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if err := c2.Remove(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if b.p != nil {
+		t.Fatal("expected plugin to be removed")
+	}
+}
+
+func newTestController(b Backend, disabled bool) *Controller {
+	return &Controller{
+		logger:  &logrus.Entry{Logger: &logrus.Logger{Out: ioutil.Discard}},
+		backend: b,
+		spec: runtime.PluginSpec{
+			Name:     pluginTestName,
+			Remote:   pluginTestRemote,
+			Disabled: disabled,
+		},
+	}
+}
+
+func newMockBackend() *mockBackend {
+	return &mockBackend{
+		pub: pubsub.NewPublisher(0, 0),
+	}
+}
+
+type mockBackend struct {
+	p   *v2.Plugin
+	pub *pubsub.Publisher
+}
+
+func (m *mockBackend) Disable(name string, config *enginetypes.PluginDisableConfig) error {
+	m.p.PluginObj.Enabled = false
+	m.pub.Publish(plugin.EventDisable{})
+	return nil
+}
+
+func (m *mockBackend) Enable(name string, config *enginetypes.PluginEnableConfig) error {
+	m.p.PluginObj.Enabled = true
+	m.pub.Publish(plugin.EventEnable{})
+	return nil
+}
+
+func (m *mockBackend) Remove(name string, config *enginetypes.PluginRmConfig) error {
+	m.p = nil
+	m.pub.Publish(plugin.EventRemove{})
+	return nil
+}
+
+func (m *mockBackend) Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error {
+	m.p = &v2.Plugin{
+		PluginObj: enginetypes.Plugin{
+			ID:              "1234",
+			Name:            name,
+			PluginReference: ref.String(),
+		},
+	}
+	return nil
+}
+
+func (m *mockBackend) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error {
+	m.p.PluginObj.PluginReference = pluginTestRemoteUpgrade
+	return nil
+}
+
+func (m *mockBackend) Get(name string) (*v2.Plugin, error) {
+	if m.p == nil {
+		return nil, errors.New("not found")
+	}
+	return m.p, nil
+}
+
+func (m *mockBackend) SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func()) {
+	ch := m.pub.SubscribeTopicWithBuffer(nil, buffer)
+	cancel = func() { m.pub.Evict(ch) }
+	return ch, cancel
+}

+ 6 - 3
daemon/cluster/convert/container.go

@@ -13,8 +13,11 @@ import (
 	gogotypes "github.com/gogo/protobuf/types"
 )
 
-func containerSpecFromGRPC(c *swarmapi.ContainerSpec) types.ContainerSpec {
-	containerSpec := types.ContainerSpec{
+func containerSpecFromGRPC(c *swarmapi.ContainerSpec) *types.ContainerSpec {
+	if c == nil {
+		return nil
+	}
+	containerSpec := &types.ContainerSpec{
 		Image:      c.Image,
 		Labels:     c.Labels,
 		Command:    c.Command,
@@ -211,7 +214,7 @@ func configReferencesFromGRPC(sr []*swarmapi.ConfigReference) []*types.ConfigRef
 	return refs
 }
 
-func containerToGRPC(c types.ContainerSpec) (*swarmapi.ContainerSpec, error) {
+func containerToGRPC(c *types.ContainerSpec) (*swarmapi.ContainerSpec, error) {
 	containerSpec := &swarmapi.ContainerSpec{
 		Image:      c.Image,
 		Labels:     c.Labels,

+ 56 - 21
daemon/cluster/convert/service.go

@@ -1,14 +1,16 @@
 package convert
 
 import (
-	"errors"
 	"fmt"
 	"strings"
 
 	types "github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/api/types/swarm/runtime"
 	"github.com/docker/docker/pkg/namesgenerator"
 	swarmapi "github.com/docker/swarmkit/api"
+	"github.com/gogo/protobuf/proto"
 	gogotypes "github.com/gogo/protobuf/types"
+	"github.com/pkg/errors"
 )
 
 var (
@@ -85,7 +87,10 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) (*types.ServiceSpec, error)
 
 	}
 
-	taskTemplate := taskSpecFromGRPC(spec.Task)
+	taskTemplate, err := taskSpecFromGRPC(spec.Task)
+	if err != nil {
+		return nil, err
+	}
 
 	switch t := spec.Task.GetRuntime().(type) {
 	case *swarmapi.TaskSpec_Container:
@@ -164,19 +169,34 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) {
 
 	switch s.TaskTemplate.Runtime {
 	case types.RuntimeContainer, "": // if empty runtime default to container
-		containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
-		if err != nil {
-			return swarmapi.ServiceSpec{}, err
+		if s.TaskTemplate.ContainerSpec != nil {
+			containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
+			if err != nil {
+				return swarmapi.ServiceSpec{}, err
+			}
+			spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
 		}
-		spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
 	case types.RuntimePlugin:
-		spec.Task.Runtime = &swarmapi.TaskSpec_Generic{
-			Generic: &swarmapi.GenericRuntimeSpec{
-				Kind: string(types.RuntimePlugin),
-				Payload: &gogotypes.Any{
-					TypeUrl: string(types.RuntimeURLPlugin),
+		if s.Mode.Replicated != nil {
+			return swarmapi.ServiceSpec{}, errors.New("plugins must not use replicated mode")
+		}
+
+		s.Mode.Global = &types.GlobalService{} // must always be global
+
+		if s.TaskTemplate.PluginSpec != nil {
+			pluginSpec, err := proto.Marshal(s.TaskTemplate.PluginSpec)
+			if err != nil {
+				return swarmapi.ServiceSpec{}, err
+			}
+			spec.Task.Runtime = &swarmapi.TaskSpec_Generic{
+				Generic: &swarmapi.GenericRuntimeSpec{
+					Kind: string(types.RuntimePlugin),
+					Payload: &gogotypes.Any{
+						TypeUrl: string(types.RuntimeURLPlugin),
+						Value:   pluginSpec,
+					},
 				},
-			},
+			}
 		}
 	default:
 		return swarmapi.ServiceSpec{}, ErrUnsupportedRuntime
@@ -507,21 +527,14 @@ func updateConfigToGRPC(updateConfig *types.UpdateConfig) (*swarmapi.UpdateConfi
 	return converted, nil
 }
 
-func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec {
+func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) (types.TaskSpec, error) {
 	taskNetworks := make([]types.NetworkAttachmentConfig, 0, len(taskSpec.Networks))
 	for _, n := range taskSpec.Networks {
 		netConfig := types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases, DriverOpts: n.DriverAttachmentOpts}
 		taskNetworks = append(taskNetworks, netConfig)
 	}
 
-	c := taskSpec.GetContainer()
-	cSpec := types.ContainerSpec{}
-	if c != nil {
-		cSpec = containerSpecFromGRPC(c)
-	}
-
-	return types.TaskSpec{
-		ContainerSpec: cSpec,
+	t := types.TaskSpec{
 		Resources:     resourcesFromGRPC(taskSpec.Resources),
 		RestartPolicy: restartPolicyFromGRPC(taskSpec.Restart),
 		Placement:     placementFromGRPC(taskSpec.Placement),
@@ -529,4 +542,26 @@ func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec {
 		Networks:      taskNetworks,
 		ForceUpdate:   taskSpec.ForceUpdate,
 	}
+
+	switch taskSpec.GetRuntime().(type) {
+	case *swarmapi.TaskSpec_Container, nil:
+		c := taskSpec.GetContainer()
+		if c != nil {
+			t.ContainerSpec = containerSpecFromGRPC(c)
+		}
+	case *swarmapi.TaskSpec_Generic:
+		g := taskSpec.GetGeneric()
+		if g != nil {
+			switch g.Kind {
+			case string(types.RuntimePlugin):
+				var p runtime.PluginSpec
+				if err := proto.Unmarshal(g.Payload.Value, &p); err != nil {
+					return t, errors.Wrap(err, "error unmarshalling plugin spec")
+				}
+				t.PluginSpec = &p
+			}
+		}
+	}
+
+	return t, nil
 }

+ 4 - 2
daemon/cluster/convert/service_test.go

@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	swarmtypes "github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/api/types/swarm/runtime"
 	swarmapi "github.com/docker/swarmkit/api"
 	google_protobuf3 "github.com/gogo/protobuf/types"
 )
@@ -82,7 +83,8 @@ func TestServiceConvertFromGRPCGenericRuntimePlugin(t *testing.T) {
 func TestServiceConvertToGRPCGenericRuntimePlugin(t *testing.T) {
 	s := swarmtypes.ServiceSpec{
 		TaskTemplate: swarmtypes.TaskSpec{
-			Runtime: swarmtypes.RuntimePlugin,
+			Runtime:    swarmtypes.RuntimePlugin,
+			PluginSpec: &runtime.PluginSpec{},
 		},
 		Mode: swarmtypes.ServiceMode{
 			Global: &swarmtypes.GlobalService{},
@@ -108,7 +110,7 @@ func TestServiceConvertToGRPCContainerRuntime(t *testing.T) {
 	image := "alpine:latest"
 	s := swarmtypes.ServiceSpec{
 		TaskTemplate: swarmtypes.TaskSpec{
-			ContainerSpec: swarmtypes.ContainerSpec{
+			ContainerSpec: &swarmtypes.ContainerSpec{
 				Image: image,
 			},
 		},

+ 9 - 6
daemon/cluster/convert/task.go

@@ -9,19 +9,22 @@ import (
 )
 
 // TaskFromGRPC converts a grpc Task to a Task.
-func TaskFromGRPC(t swarmapi.Task) types.Task {
+func TaskFromGRPC(t swarmapi.Task) (types.Task, error) {
 	if t.Spec.GetAttachment() != nil {
-		return types.Task{}
+		return types.Task{}, nil
 	}
 	containerStatus := t.Status.GetContainer()
-
+	taskSpec, err := taskSpecFromGRPC(t.Spec)
+	if err != nil {
+		return types.Task{}, err
+	}
 	task := types.Task{
 		ID:          t.ID,
 		Annotations: annotationsFromGRPC(t.Annotations),
 		ServiceID:   t.ServiceID,
 		Slot:        int(t.Slot),
 		NodeID:      t.NodeID,
-		Spec:        taskSpecFromGRPC(t.Spec),
+		Spec:        taskSpec,
 		Status: types.TaskStatus{
 			State:   types.TaskState(strings.ToLower(t.Status.State.String())),
 			Message: t.Status.Message,
@@ -49,7 +52,7 @@ func TaskFromGRPC(t swarmapi.Task) types.Task {
 	}
 
 	if t.Status.PortStatus == nil {
-		return task
+		return task, nil
 	}
 
 	for _, p := range t.Status.PortStatus.Ports {
@@ -62,5 +65,5 @@ func TaskFromGRPC(t swarmapi.Task) types.Task {
 		})
 	}
 
-	return task
+	return task, nil
 }

+ 8 - 6
daemon/cluster/executor/container/executor.go

@@ -22,15 +22,17 @@ import (
 )
 
 type executor struct {
-	backend      executorpkg.Backend
-	dependencies exec.DependencyManager
+	backend       executorpkg.Backend
+	pluginBackend plugin.Backend
+	dependencies  exec.DependencyManager
 }
 
 // NewExecutor returns an executor from the docker client.
-func NewExecutor(b executorpkg.Backend) exec.Executor {
+func NewExecutor(b executorpkg.Backend, p plugin.Backend) exec.Executor {
 	return &executor{
-		backend:      b,
-		dependencies: agent.NewDependencyManager(),
+		backend:       b,
+		pluginBackend: p,
+		dependencies:  agent.NewDependencyManager(),
 	}
 }
 
@@ -181,7 +183,7 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
 		}
 		switch runtimeKind {
 		case string(swarmtypes.RuntimePlugin):
-			c, err := plugin.NewController()
+			c, err := plugin.NewController(e.pluginBackend, t)
 			if err != nil {
 				return ctlr, err
 			}

+ 2 - 0
daemon/cluster/filters.go

@@ -57,6 +57,7 @@ func newListTasksFilters(filter filters.Args, transformFunc func(filters.Args) e
 		// internal use in checking create/update progress. Therefore,
 		// we prefix it with a '_'.
 		"_up-to-date": true,
+		"runtime":     true,
 	}
 	if err := filter.Validate(accepted); err != nil {
 		return nil, err
@@ -73,6 +74,7 @@ func newListTasksFilters(filter filters.Args, transformFunc func(filters.Args) e
 		ServiceIDs:   filter.Get("service"),
 		NodeIDs:      filter.Get("node"),
 		UpToDate:     len(filter.Get("_up-to-date")) != 0,
+		Runtimes:     filter.Get("runtime"),
 	}
 
 	for _, s := range filter.Get("desired-state") {

+ 1 - 1
daemon/cluster/noderunner.go

@@ -118,7 +118,7 @@ func (n *nodeRunner) start(conf nodeStartConfig) error {
 		JoinAddr:           joinAddr,
 		StateDir:           n.cluster.root,
 		JoinToken:          conf.joinToken,
-		Executor:           container.NewExecutor(n.cluster.config.Backend),
+		Executor:           container.NewExecutor(n.cluster.config.Backend, n.cluster.config.PluginBackend),
 		HeartbeatTick:      1,
 		ElectionTick:       3,
 		UnlockKey:          conf.lockKey,

+ 91 - 63
daemon/cluster/services.go

@@ -50,14 +50,16 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
 		return nil, err
 	}
 
+	if len(options.Filters.Get("runtime")) == 0 {
+		// Default to using the container runtime filter
+		options.Filters.Add("runtime", string(types.RuntimeContainer))
+	}
+
 	filters := &swarmapi.ListServicesRequest_Filters{
 		NamePrefixes: options.Filters.Get("name"),
 		IDPrefixes:   options.Filters.Get("id"),
 		Labels:       runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
-		// (ehazlett): hardcode runtime for now. eventually we will
-		// be able to filter for the desired runtimes once more
-		// are supported.
-		Runtimes: []string{string(types.RuntimeContainer)},
+		Runtimes:     options.Filters.Get("runtime"),
 	}
 
 	ctx, cancel := c.getRequestContext()
@@ -134,6 +136,20 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRe
 
 		switch serviceSpec.Task.Runtime.(type) {
 		// handle other runtimes here
+		case *swarmapi.TaskSpec_Generic:
+			switch serviceSpec.Task.GetGeneric().Kind {
+			case string(types.RuntimePlugin):
+				if s.TaskTemplate.PluginSpec == nil {
+					return errors.New("plugin spec must be set")
+				}
+			}
+
+			r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
+			if err != nil {
+				return err
+			}
+
+			resp.ID = r.Service.ID
 		case *swarmapi.TaskSpec_Container:
 			ctnr := serviceSpec.Task.GetContainer()
 			if ctnr == nil {
@@ -146,7 +162,9 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRe
 			// retrieve auth config from encoded auth
 			authConfig := &apitypes.AuthConfig{}
 			if encodedAuth != "" {
-				if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
+				authReader := strings.NewReader(encodedAuth)
+				dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader))
+				if err := dec.Decode(authConfig); err != nil {
 					logrus.Warnf("invalid authconfig: %v", err)
 				}
 			}
@@ -216,75 +234,85 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ
 			return err
 		}
 
-		newCtnr := serviceSpec.Task.GetContainer()
-		if newCtnr == nil {
-			return errors.New("service does not use container tasks")
-		}
+		resp = &apitypes.ServiceUpdateResponse{}
 
-		encodedAuth := flags.EncodedRegistryAuth
-		if encodedAuth != "" {
-			newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
-		} else {
-			// this is needed because if the encodedAuth isn't being updated then we
-			// shouldn't lose it, and continue to use the one that was already present
-			var ctnr *swarmapi.ContainerSpec
-			switch flags.RegistryAuthFrom {
-			case apitypes.RegistryAuthFromSpec, "":
-				ctnr = currentService.Spec.Task.GetContainer()
-			case apitypes.RegistryAuthFromPreviousSpec:
-				if currentService.PreviousSpec == nil {
-					return errors.New("service does not have a previous spec")
+		switch serviceSpec.Task.Runtime.(type) {
+		case *swarmapi.TaskSpec_Generic:
+			switch serviceSpec.Task.GetGeneric().Kind {
+			case string(types.RuntimePlugin):
+				if spec.TaskTemplate.PluginSpec == nil {
+					return errors.New("plugin spec must be set")
 				}
-				ctnr = currentService.PreviousSpec.Task.GetContainer()
-			default:
-				return errors.New("unsupported registryAuthFrom value")
 			}
-			if ctnr == nil {
+		case *swarmapi.TaskSpec_Container:
+			newCtnr := serviceSpec.Task.GetContainer()
+			if newCtnr == nil {
 				return errors.New("service does not use container tasks")
 			}
-			newCtnr.PullOptions = ctnr.PullOptions
-			// update encodedAuth so it can be used to pin image by digest
-			if ctnr.PullOptions != nil {
-				encodedAuth = ctnr.PullOptions.RegistryAuth
+
+			encodedAuth := flags.EncodedRegistryAuth
+			if encodedAuth != "" {
+				newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
+			} else {
+				// this is needed because if the encodedAuth isn't being updated then we
+				// shouldn't lose it, and continue to use the one that was already present
+				var ctnr *swarmapi.ContainerSpec
+				switch flags.RegistryAuthFrom {
+				case apitypes.RegistryAuthFromSpec, "":
+					ctnr = currentService.Spec.Task.GetContainer()
+				case apitypes.RegistryAuthFromPreviousSpec:
+					if currentService.PreviousSpec == nil {
+						return errors.New("service does not have a previous spec")
+					}
+					ctnr = currentService.PreviousSpec.Task.GetContainer()
+				default:
+					return errors.New("unsupported registryAuthFrom value")
+				}
+				if ctnr == nil {
+					return errors.New("service does not use container tasks")
+				}
+				newCtnr.PullOptions = ctnr.PullOptions
+				// update encodedAuth so it can be used to pin image by digest
+				if ctnr.PullOptions != nil {
+					encodedAuth = ctnr.PullOptions.RegistryAuth
+				}
 			}
-		}
 
-		// retrieve auth config from encoded auth
-		authConfig := &apitypes.AuthConfig{}
-		if encodedAuth != "" {
-			if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
-				logrus.Warnf("invalid authconfig: %v", err)
+			// retrieve auth config from encoded auth
+			authConfig := &apitypes.AuthConfig{}
+			if encodedAuth != "" {
+				if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
+					logrus.Warnf("invalid authconfig: %v", err)
+				}
 			}
-		}
 
-		resp = &apitypes.ServiceUpdateResponse{}
+			// pin image by digest for API versions < 1.30
+			// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
+			// should be removed in the future. Since integration tests only use the
+			// latest API version, so this is no longer required.
+			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
+				digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
+				if err != nil {
+					logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
+					// warning in the client response should be concise
+					resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
+				} else if newCtnr.Image != digestImage {
+					logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
+					newCtnr.Image = digestImage
+				} else {
+					logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
+				}
 
-		// pin image by digest for API versions < 1.30
-		// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
-		// should be removed in the future. Since integration tests only use the
-		// latest API version, so this is no longer required.
-		if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
-			digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
-			if err != nil {
-				logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
-				// warning in the client response should be concise
-				resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
-			} else if newCtnr.Image != digestImage {
-				logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
-				newCtnr.Image = digestImage
-			} else {
-				logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
+				// Replace the context with a fresh one.
+				// If we timed out while communicating with the
+				// registry, then "ctx" will already be expired, which
+				// would cause UpdateService below to fail. Reusing
+				// "ctx" could make it impossible to update a service
+				// if the registry is slow or unresponsive.
+				var cancel func()
+				ctx, cancel = c.getRequestContext()
+				defer cancel()
 			}
-
-			// Replace the context with a fresh one.
-			// If we timed out while communicating with the
-			// registry, then "ctx" will already be expired, which
-			// would cause UpdateService below to fail. Reusing
-			// "ctx" could make it impossible to update a service
-			// if the registry is slow or unresponsive.
-			var cancel func()
-			ctx, cancel = c.getRequestContext()
-			defer cancel()
 		}
 
 		var rollback swarmapi.UpdateServiceRequest_Rollback

+ 12 - 6
daemon/cluster/tasks.go

@@ -19,7 +19,7 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
 		return nil, c.errNoManager(state)
 	}
 
-	byName := func(filter filters.Args) error {
+	filterTransform := func(filter filters.Args) error {
 		if filter.Include("service") {
 			serviceFilters := filter.Get("service")
 			for _, serviceFilter := range serviceFilters {
@@ -42,10 +42,15 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
 				filter.Add("node", node.ID)
 			}
 		}
+		if !filter.Include("runtime") {
+			// default to only showing container tasks
+			filter.Add("runtime", "container")
+			filter.Add("runtime", "")
+		}
 		return nil
 	}
 
-	filters, err := newListTasksFilters(options.Filters, byName)
+	filters, err := newListTasksFilters(options.Filters, filterTransform)
 	if err != nil {
 		return nil, err
 	}
@@ -61,11 +66,12 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
 	}
 
 	tasks := make([]types.Task, 0, len(r.Tasks))
-
 	for _, task := range r.Tasks {
-		if task.Spec.GetContainer() != nil {
-			tasks = append(tasks, convert.TaskFromGRPC(*task))
+		t, err := convert.TaskFromGRPC(*task)
+		if err != nil {
+			return nil, err
 		}
+		tasks = append(tasks, t)
 	}
 	return tasks, nil
 }
@@ -83,5 +89,5 @@ func (c *Cluster) GetTask(input string) (types.Task, error) {
 	}); err != nil {
 		return types.Task{}, err
 	}
-	return convert.TaskFromGRPC(*task), nil
+	return convert.TaskFromGRPC(*task)
 }

+ 1 - 0
docs/api/version-history.md

@@ -28,6 +28,7 @@ keywords: "API, Docker, rcli, REST, documentation"
 * `GET /images/(name)/get` now includes an `ImageMetadata` field which contains image metadata that is local to the engine and not part of the image config.
 * `POST /swarm/init` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic
 * `POST /swarm/join` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic
+* `POST /services/create` now accepts a `PluginSpec` when `TaskTemplate.Runtime` is set to `plugin`
 
 ## v1.30 API changes
 

+ 51 - 9
integration-cli/daemon/daemon_swarm.go

@@ -1,6 +1,7 @@
 package daemon
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -10,6 +11,7 @@ import (
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/client"
 	"github.com/docker/docker/integration-cli/checker"
 	"github.com/go-check/check"
 	"github.com/pkg/errors"
@@ -124,20 +126,29 @@ type ConfigConstructor func(*swarm.Config)
 // SpecConstructor defines a swarm spec constructor
 type SpecConstructor func(*swarm.Spec)
 
-// CreateService creates a swarm service given the specified service constructor
-func (d *Swarm) CreateService(c *check.C, f ...ServiceConstructor) string {
+// CreateServiceWithOptions creates a swarm service given the specified service constructors
+// and auth config
+func (d *Swarm) CreateServiceWithOptions(c *check.C, opts types.ServiceCreateOptions, f ...ServiceConstructor) string {
+	cl, err := client.NewClient(d.Sock(), "", nil, nil)
+	c.Assert(err, checker.IsNil, check.Commentf("failed to create client"))
+	defer cl.Close()
+
 	var service swarm.Service
 	for _, fn := range f {
 		fn(&service)
 	}
-	status, out, err := d.SockRequest("POST", "/services/create", service.Spec)
 
-	c.Assert(err, checker.IsNil, check.Commentf(string(out)))
-	c.Assert(status, checker.Equals, http.StatusCreated, check.Commentf("output: %q", string(out)))
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
 
-	var scr types.ServiceCreateResponse
-	c.Assert(json.Unmarshal(out, &scr), checker.IsNil)
-	return scr.ID
+	res, err := cl.ServiceCreate(ctx, service.Spec, opts)
+	c.Assert(err, checker.IsNil)
+	return res.ID
+}
+
+// CreateService creates a swarm service given the specified service constructor
+func (d *Swarm) CreateService(c *check.C, f ...ServiceConstructor) string {
+	return d.CreateServiceWithOptions(c, types.ServiceCreateOptions{}, f...)
 }
 
 // GetService returns the swarm service corresponding to the specified id
@@ -200,6 +211,37 @@ func (d *Swarm) CheckServiceUpdateState(service string) func(*check.C) (interfac
 	}
 }
 
+// CheckPluginRunning returns the runtime state of the plugin
+func (d *Swarm) CheckPluginRunning(plugin string) func(c *check.C) (interface{}, check.CommentInterface) {
+	return func(c *check.C) (interface{}, check.CommentInterface) {
+		status, out, err := d.SockRequest("GET", "/plugins/"+plugin+"/json", nil)
+		c.Assert(err, checker.IsNil, check.Commentf(string(out)))
+		if status != http.StatusOK {
+			return false, nil
+		}
+
+		var p types.Plugin
+		c.Assert(json.Unmarshal(out, &p), checker.IsNil, check.Commentf(string(out)))
+
+		return p.Enabled, check.Commentf("%+v", p)
+	}
+}
+
+// CheckPluginImage returns the runtime state of the plugin
+func (d *Swarm) CheckPluginImage(plugin string) func(c *check.C) (interface{}, check.CommentInterface) {
+	return func(c *check.C) (interface{}, check.CommentInterface) {
+		status, out, err := d.SockRequest("GET", "/plugins/"+plugin+"/json", nil)
+		c.Assert(err, checker.IsNil, check.Commentf(string(out)))
+		if status != http.StatusOK {
+			return false, nil
+		}
+
+		var p types.Plugin
+		c.Assert(json.Unmarshal(out, &p), checker.IsNil, check.Commentf(string(out)))
+		return p.PluginReference, check.Commentf("%+v", p)
+	}
+}
+
 // CheckServiceTasks returns the number of tasks for the specified service
 func (d *Swarm) CheckServiceTasks(service string) func(*check.C) (interface{}, check.CommentInterface) {
 	return func(c *check.C) (interface{}, check.CommentInterface) {
@@ -247,7 +289,7 @@ func (d *Swarm) CheckRunningTaskImages(c *check.C) (interface{}, check.CommentIn
 
 	result := make(map[string]int)
 	for _, task := range tasks {
-		if task.Status.State == swarm.TaskStateRunning {
+		if task.Status.State == swarm.TaskStateRunning && task.Spec.ContainerSpec != nil {
 			result[task.Spec.ContainerSpec.Image]++
 		}
 	}

+ 78 - 0
integration-cli/docker_api_swarm_service_test.go

@@ -4,15 +4,19 @@ package main
 
 import (
 	"fmt"
+	"path"
 	"strconv"
 	"strings"
 	"syscall"
 	"time"
 
 	"github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/api/types/swarm/runtime"
 	"github.com/docker/docker/integration-cli/checker"
 	"github.com/docker/docker/integration-cli/daemon"
+	"github.com/docker/docker/integration-cli/fixtures/plugin"
 	"github.com/go-check/check"
+	"golang.org/x/net/context"
 )
 
 func setPortConfig(portConfig []swarm.PortConfig) daemon.ServiceConstructor {
@@ -596,3 +600,77 @@ func (s *DockerSwarmSuite) TestAPISwarmServicesStateReporting(c *check.C) {
 		}
 	}
 }
+
+// Test plugins deployed via swarm services
+func (s *DockerSwarmSuite) TestAPISwarmServicesPlugin(c *check.C) {
+	testRequires(c, DaemonIsLinux, IsAmd64)
+	reg := setupRegistry(c, false, "", "")
+	defer reg.Close()
+
+	repo := path.Join(privateRegistryURL, "swarm", "test:v1")
+	repo2 := path.Join(privateRegistryURL, "swarm", "test:v2")
+	name := "test"
+
+	err := plugin.CreateInRegistry(context.Background(), repo, nil)
+	c.Assert(err, checker.IsNil, check.Commentf("failed to create plugin"))
+	err = plugin.CreateInRegistry(context.Background(), repo2, nil)
+	c.Assert(err, checker.IsNil, check.Commentf("failed to create plugin"))
+
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, true)
+	d3 := s.AddDaemon(c, true, false)
+
+	makePlugin := func(repo, name string, constraints []string) func(*swarm.Service) {
+		return func(s *swarm.Service) {
+			s.Spec.TaskTemplate.Runtime = "plugin"
+			s.Spec.TaskTemplate.PluginSpec = &runtime.PluginSpec{
+				Name:   name,
+				Remote: repo,
+			}
+			if constraints != nil {
+				s.Spec.TaskTemplate.Placement = &swarm.Placement{
+					Constraints: constraints,
+				}
+			}
+		}
+	}
+
+	id := d1.CreateService(c, makePlugin(repo, name, nil))
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.True)
+
+	service := d1.GetService(c, id)
+	d1.UpdateService(c, service, makePlugin(repo2, name, nil))
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginImage(name), checker.Equals, repo2)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginImage(name), checker.Equals, repo2)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginImage(name), checker.Equals, repo2)
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.True)
+
+	d1.RemoveService(c, id)
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.False)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.False)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False)
+
+	// constrain to managers only
+	id = d1.CreateService(c, makePlugin(repo, name, []string{"node.role==manager"}))
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False) // Not a manager, not running it
+	d1.RemoveService(c, id)
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.False)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.False)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False)
+
+	// with no name
+	id = d1.CreateService(c, makePlugin(repo, "", nil))
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(repo), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(repo), checker.True)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(repo), checker.True)
+	d1.RemoveService(c, id)
+	waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(repo), checker.False)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(repo), checker.False)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(repo), checker.False)
+}

+ 8 - 2
integration-cli/docker_api_swarm_test.go

@@ -560,7 +560,7 @@ func simpleTestService(s *swarm.Service) {
 
 	s.Spec = swarm.ServiceSpec{
 		TaskTemplate: swarm.TaskSpec{
-			ContainerSpec: swarm.ContainerSpec{
+			ContainerSpec: &swarm.ContainerSpec{
 				Image:   "busybox:latest",
 				Command: []string{"/bin/top"},
 			},
@@ -583,7 +583,7 @@ func serviceForUpdate(s *swarm.Service) {
 
 	s.Spec = swarm.ServiceSpec{
 		TaskTemplate: swarm.TaskSpec{
-			ContainerSpec: swarm.ContainerSpec{
+			ContainerSpec: &swarm.ContainerSpec{
 				Image:   "busybox:latest",
 				Command: []string{"/bin/top"},
 			},
@@ -641,6 +641,9 @@ func setRollbackOrder(order string) daemon.ServiceConstructor {
 
 func setImage(image string) daemon.ServiceConstructor {
 	return func(s *swarm.Service) {
+		if s.Spec.TaskTemplate.ContainerSpec == nil {
+			s.Spec.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{}
+		}
 		s.Spec.TaskTemplate.ContainerSpec.Image = image
 	}
 }
@@ -921,6 +924,9 @@ func (s *DockerSwarmSuite) TestAPISwarmHealthcheckNone(c *check.C) {
 
 	instances := 1
 	d.CreateService(c, simpleTestService, setInstances(instances), func(s *swarm.Service) {
+		if s.Spec.TaskTemplate.ContainerSpec == nil {
+			s.Spec.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{}
+		}
 		s.Spec.TaskTemplate.ContainerSpec.Healthcheck = &container.HealthConfig{}
 		s.Spec.TaskTemplate.Networks = []swarm.NetworkAttachmentConfig{
 			{Target: "lb"},

+ 34 - 0
integration-cli/fixtures/plugin/basic/basic.go

@@ -0,0 +1,34 @@
+package main
+
+import (
+	"fmt"
+	"net"
+	"net/http"
+	"os"
+	"path/filepath"
+)
+
+func main() {
+	p, err := filepath.Abs(filepath.Join("run", "docker", "plugins"))
+	if err != nil {
+		panic(err)
+	}
+	if err := os.MkdirAll(p, 0755); err != nil {
+		panic(err)
+	}
+	l, err := net.Listen("unix", filepath.Join(p, "basic.sock"))
+	if err != nil {
+		panic(err)
+	}
+
+	mux := http.NewServeMux()
+	server := http.Server{
+		Addr:    l.Addr().String(),
+		Handler: http.NewServeMux(),
+	}
+	mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1.1+json")
+		fmt.Println(w, `{"Implements": ["dummy"]}`)
+	})
+	server.Serve(l)
+}

+ 183 - 0
integration-cli/fixtures/plugin/plugin.go

@@ -0,0 +1,183 @@
+package plugin
+
+import (
+	"encoding/json"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/libcontainerd"
+	"github.com/docker/docker/pkg/archive"
+	"github.com/docker/docker/plugin"
+	"github.com/docker/docker/registry"
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+)
+
+// CreateOpt is is passed used to change the defualt plugin config before
+// creating it
+type CreateOpt func(*Config)
+
+// Config wraps types.PluginConfig to provide some extra state for options
+// extra customizations on the plugin details, such as using a custom binary to
+// create the plugin with.
+type Config struct {
+	*types.PluginConfig
+	binPath string
+}
+
+// WithBinary is a CreateOpt to set an custom binary to create the plugin with.
+// This binary must be statically compiled.
+func WithBinary(bin string) CreateOpt {
+	return func(cfg *Config) {
+		cfg.binPath = bin
+	}
+}
+
+// CreateClient is the interface used for `BuildPlugin` to interact with the
+// daemon.
+type CreateClient interface {
+	PluginCreate(context.Context, io.Reader, types.PluginCreateOptions) error
+}
+
+// Create creates a new plugin with the specified name
+func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
+	tmpDir, err := ioutil.TempDir("", "create-test-plugin")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(tmpDir)
+
+	tar, err := makePluginBundle(tmpDir, opts...)
+	if err != nil {
+		return err
+	}
+	defer tar.Close()
+
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name})
+}
+
+// TODO(@cpuguy83): we really shouldn't have to do this...
+// The manager panics on init when `Executor` is not set.
+type dummyExecutor struct{}
+
+func (dummyExecutor) Client(libcontainerd.Backend) (libcontainerd.Client, error) { return nil, nil }
+func (dummyExecutor) Cleanup()                                                   {}
+func (dummyExecutor) UpdateOptions(...libcontainerd.RemoteOption) error          { return nil }
+
+// CreateInRegistry makes a plugin (locally) and pushes it to a registry.
+// This does not use a dockerd instance to create or push the plugin.
+// If you just want to create a plugin in some daemon, use `Create`.
+//
+// This can be useful when testing plugins on swarm where you don't really want
+// the plugin to exist on any of the daemons (immediately) and there needs to be
+// some way to distribute the plugin.
+func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
+	tmpDir, err := ioutil.TempDir("", "create-test-plugin-local")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(tmpDir)
+
+	inPath := filepath.Join(tmpDir, "plugin")
+	if err := os.MkdirAll(inPath, 0755); err != nil {
+		return errors.Wrap(err, "error creating plugin root")
+	}
+
+	tar, err := makePluginBundle(inPath, opts...)
+	if err != nil {
+		return err
+	}
+	defer tar.Close()
+
+	managerConfig := plugin.ManagerConfig{
+		Store:           plugin.NewStore(),
+		RegistryService: registry.NewService(registry.ServiceOptions{V2Only: true}),
+		Root:            filepath.Join(tmpDir, "root"),
+		ExecRoot:        "/run/docker", // manager init fails if not set
+		Executor:        dummyExecutor{},
+		LogPluginEvent:  func(id, name, action string) {}, // panics when not set
+	}
+	manager, err := plugin.NewManager(managerConfig)
+	if err != nil {
+		return errors.Wrap(err, "error creating plugin manager")
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+	if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil {
+		return err
+	}
+
+	if auth == nil {
+		auth = &types.AuthConfig{}
+	}
+	err = manager.Push(ctx, repo, nil, auth, ioutil.Discard)
+	return errors.Wrap(err, "error pushing plugin")
+}
+
+func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) {
+	p := &types.PluginConfig{
+		Interface: types.PluginConfigInterface{
+			Socket: "basic.sock",
+			Types:  []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}},
+		},
+		Entrypoint: []string{"/basic"},
+	}
+	cfg := &Config{
+		PluginConfig: p,
+	}
+	for _, o := range opts {
+		o(cfg)
+	}
+	if cfg.binPath == "" {
+		binPath, err := ensureBasicPluginBin()
+		if err != nil {
+			return nil, err
+		}
+		cfg.binPath = binPath
+	}
+
+	configJSON, err := json.Marshal(p)
+	if err != nil {
+		return nil, err
+	}
+	if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil {
+		return nil, err
+	}
+	if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil {
+		return nil, errors.Wrap(err, "error creating plugin rootfs dir")
+	}
+	if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil {
+		return nil, errors.Wrap(err, "error copying plugin binary to rootfs path")
+	}
+	tar, err := archive.Tar(inPath, archive.Uncompressed)
+	return tar, errors.Wrap(err, "error making plugin archive")
+}
+
+func ensureBasicPluginBin() (string, error) {
+	name := "docker-basic-plugin"
+	p, err := exec.LookPath(name)
+	if err == nil {
+		return p, nil
+	}
+
+	goBin, err := exec.LookPath("go")
+	if err != nil {
+		return "", err
+	}
+	installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name)
+	cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic"))
+	cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
+	if out, err := cmd.CombinedOutput(); err != nil {
+		return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out))
+	}
+	return installPath, nil
+}

+ 10 - 0
pkg/pubsub/publisher.go

@@ -53,6 +53,16 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
 	return ch
 }
 
+// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic.
+// The returned channel has a buffer of the specified size.
+func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
+	ch := make(chan interface{}, buffer)
+	p.m.Lock()
+	p.subscribers[ch] = topic
+	p.m.Unlock()
+	return ch
+}
+
 // Evict removes the specified subscriber from receiving any more messages.
 func (p *Publisher) Evict(sub chan interface{}) {
 	p.m.Lock()

+ 14 - 3
plugin/backend_linux.go

@@ -67,6 +67,7 @@ func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) er
 	if err := pm.disable(p, c); err != nil {
 		return err
 	}
+	pm.publisher.Publish(EventDisable{Plugin: p.PluginObj})
 	pm.config.LogPluginEvent(p.GetID(), refOrID, "disable")
 	return nil
 }
@@ -82,6 +83,7 @@ func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) erro
 	if err := pm.enable(p, c, false); err != nil {
 		return err
 	}
+	pm.publisher.Publish(EventEnable{Plugin: p.PluginObj})
 	pm.config.LogPluginEvent(p.GetID(), refOrID, "enable")
 	return nil
 }
@@ -296,7 +298,7 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string
 }
 
 // Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
-func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
+func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) {
 	pm.muGC.RLock()
 	defer pm.muGC.RUnlock()
 
@@ -340,12 +342,19 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m
 		return err
 	}
 
-	p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges)
+	refOpt := func(p *v2.Plugin) {
+		p.PluginObj.PluginReference = ref.String()
+	}
+	optsList := make([]CreateOpt, 0, len(opts)+1)
+	optsList = append(optsList, opts...)
+	optsList = append(optsList, refOpt)
+
+	p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...)
 	if err != nil {
 		return err
 	}
-	p.PluginObj.PluginReference = ref.String()
 
+	pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
 	return nil
 }
 
@@ -640,6 +649,7 @@ func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
 	}
 	pm.config.Store.Remove(p)
 	pm.config.LogPluginEvent(id, name, "remove")
+	pm.publisher.Publish(EventRemove{Plugin: p.PluginObj})
 	return nil
 }
 
@@ -771,6 +781,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
 	}
 	p.PluginObj.PluginReference = name
 
+	pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
 	pm.config.LogPluginEvent(p.PluginObj.ID, name, "create")
 
 	return nil

+ 1 - 1
plugin/backend_unsupported.go

@@ -36,7 +36,7 @@ func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHead
 }
 
 // Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
-func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, out io.Writer) error {
+func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, out io.Writer, opts ...CreateOpt) error {
 	return errNotSupported
 }
 

+ 11 - 0
plugin/defs.go

@@ -24,3 +24,14 @@ func NewStore() *Store {
 		handlers: make(map[string][]func(string, *plugins.Client)),
 	}
 }
+
+// CreateOpt is used to configure specific plugin details when created
+type CreateOpt func(p *v2.Plugin)
+
+// WithSwarmService is a CreateOpt that flags the passed in a plugin as a plugin
+// managed by swarm
+func WithSwarmService(id string) CreateOpt {
+	return func(p *v2.Plugin) {
+		p.SwarmServiceID = id
+	}
+}

+ 111 - 0
plugin/events.go

@@ -0,0 +1,111 @@
+package plugin
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/docker/docker/api/types"
+)
+
+// Event is emitted for actions performed on the plugin manager
+type Event interface {
+	matches(Event) bool
+}
+
+// EventCreate is an event which is emitted when a plugin is created
+// This is either by pull or create from context.
+//
+// Use the `Interfaces` field to match only plugins that implement a specific
+// interface.
+// These are matched against using "or" logic.
+// If no interfaces are listed, all are matched.
+type EventCreate struct {
+	Interfaces map[string]bool
+	Plugin     types.Plugin
+}
+
+func (e EventCreate) matches(observed Event) bool {
+	oe, ok := observed.(EventCreate)
+	if !ok {
+		return false
+	}
+	if len(e.Interfaces) == 0 {
+		return true
+	}
+
+	var ifaceMatch bool
+	for _, in := range oe.Plugin.Config.Interface.Types {
+		if e.Interfaces[in.Capability] {
+			ifaceMatch = true
+			break
+		}
+	}
+	return ifaceMatch
+}
+
+// EventRemove is an event which is emitted when a plugin is removed
+// It maches on the passed in plugin's ID only.
+type EventRemove struct {
+	Plugin types.Plugin
+}
+
+func (e EventRemove) matches(observed Event) bool {
+	oe, ok := observed.(EventRemove)
+	if !ok {
+		return false
+	}
+	return e.Plugin.ID == oe.Plugin.ID
+}
+
+// EventDisable is an event that is emitted when a plugin is disabled
+// It maches on the passed in plugin's ID only.
+type EventDisable struct {
+	Plugin types.Plugin
+}
+
+func (e EventDisable) matches(observed Event) bool {
+	oe, ok := observed.(EventDisable)
+	if !ok {
+		return false
+	}
+	return e.Plugin.ID == oe.Plugin.ID
+}
+
+// EventEnable is an event that is emitted when a plugin is disabled
+// It maches on the passed in plugin's ID only.
+type EventEnable struct {
+	Plugin types.Plugin
+}
+
+func (e EventEnable) matches(observed Event) bool {
+	oe, ok := observed.(EventEnable)
+	if !ok {
+		return false
+	}
+	return e.Plugin.ID == oe.Plugin.ID
+}
+
+// SubscribeEvents provides an event channel to listen for structured events from
+// the plugin manager actions, CRUD operations.
+// The caller must call the returned `cancel()` function once done with the channel
+// or this will leak resources.
+func (pm *Manager) SubscribeEvents(buffer int, watchEvents ...Event) (eventCh <-chan interface{}, cancel func()) {
+	topic := func(i interface{}) bool {
+		observed, ok := i.(Event)
+		if !ok {
+			panic(fmt.Sprintf("unexpected type passed to event channel: %v", reflect.TypeOf(i)))
+		}
+		for _, e := range watchEvents {
+			if e.matches(observed) {
+				return true
+			}
+		}
+		// If no specific events are specified always assume a matched event
+		// If some events were specified and none matched above, then the event
+		// doesn't match
+		return watchEvents == nil
+	}
+	ch := pm.publisher.SubscribeTopicWithBuffer(topic, buffer)
+	cancelFunc := func() { pm.publisher.Evict(ch) }
+	return ch, cancelFunc
+}

+ 9 - 0
plugin/manager.go

@@ -22,6 +22,7 @@ import (
 	"github.com/docker/docker/pkg/authorization"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/mount"
+	"github.com/docker/docker/pkg/pubsub"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/plugin/v2"
 	"github.com/docker/docker/registry"
@@ -63,6 +64,7 @@ type Manager struct {
 	cMap             map[*v2.Plugin]*controller
 	containerdClient libcontainerd.Client
 	blobStore        *basicBlobStore
+	publisher        *pubsub.Publisher
 }
 
 // controller represents the manager's control on a plugin.
@@ -117,6 +119,8 @@ func NewManager(config ManagerConfig) (*Manager, error) {
 	if err := manager.reload(); err != nil {
 		return nil, errors.Wrap(err, "failed to restore plugins")
 	}
+
+	manager.publisher = pubsub.NewPublisher(0, 0)
 	return manager, nil
 }
 
@@ -268,6 +272,11 @@ func (pm *Manager) reload() error { // todo: restore
 	return nil
 }
 
+// Get looks up the requested plugin in the store.
+func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
+	return pm.config.Store.GetV2Plugin(idOrName)
+}
+
 func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
 	p := filepath.Join(pm.config.Root, id, configFileName)
 	dt, err := ioutil.ReadFile(p)

+ 4 - 1
plugin/manager_linux.go

@@ -274,7 +274,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.
 }
 
 // createPlugin creates a new plugin. take lock before calling.
-func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges) (p *v2.Plugin, err error) {
+func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) {
 	if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store
 		return nil, err
 	}
@@ -294,6 +294,9 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum
 		Blobsums: blobsums,
 	}
 	p.InitEmptySettings()
+	for _, o := range opts {
+		o(p)
+	}
 
 	pdir := filepath.Join(pm.config.Root, p.PluginObj.ID)
 	if err := os.MkdirAll(pdir, 0700); err != nil {

+ 2 - 0
plugin/v2/plugin.go

@@ -22,6 +22,8 @@ type Plugin struct {
 
 	Config   digest.Digest
 	Blobsums []digest.Digest
+
+	SwarmServiceID string
 }
 
 const defaultPluginRuntimeDestination = "/run/docker/plugins"