123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- package plugin // import "github.com/docker/docker/daemon/cluster/controllers/plugin"
- import (
- "context"
- "io"
- "net/http"
- "github.com/containerd/containerd/log"
- "github.com/distribution/reference"
- "github.com/docker/docker/api/types"
- "github.com/docker/docker/api/types/registry"
- "github.com/docker/docker/api/types/swarm/runtime"
- "github.com/docker/docker/errdefs"
- "github.com/docker/docker/plugin"
- v2 "github.com/docker/docker/plugin/v2"
- "github.com/gogo/protobuf/proto"
- "github.com/moby/swarmkit/v2/api"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- )
- // Controller is the controller for the plugin backend.
- // Plugins are managed as a singleton object with a desired state (different from containers).
- // With the plugin controller instead of having a strict create->start->stop->remove
- // task lifecycle like containers, we manage the desired state of the plugin and let
- // the plugin manager do what it already does and monitor the plugin.
- // We'll also end up with many tasks all pointing to the same plugin ID.
- //
- // TODO(@cpuguy83): registry auth is intentionally not supported until we work out
- // the right way to pass registry credentials via secrets.
- type Controller struct {
- backend Backend
- spec runtime.PluginSpec
- logger *logrus.Entry
- pluginID string
- serviceID string
- // hook used to signal tests that `Wait()` is actually ready and waiting
- signalWaitReady func()
- }
- // Backend is the interface for interacting with the plugin manager
- // Controller actions are passed to the configured backend to do the real work.
- type Backend interface {
- Disable(name string, config *types.PluginDisableConfig) error
- Enable(name string, config *types.PluginEnableConfig) error
- Remove(name string, config *types.PluginRmConfig) error
- Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *registry.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
- Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *registry.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) error
- Get(name string) (*v2.Plugin, error)
- SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
- }
- // NewController returns a new cluster plugin controller
- func NewController(backend Backend, t *api.Task) (*Controller, error) {
- spec, err := readSpec(t)
- if err != nil {
- return nil, err
- }
- return &Controller{
- backend: backend,
- spec: spec,
- serviceID: t.ServiceID,
- logger: log.G(context.TODO()).WithFields(log.Fields{
- "controller": "plugin",
- "task": t.ID,
- "plugin": spec.Name,
- }),
- }, nil
- }
- func readSpec(t *api.Task) (runtime.PluginSpec, error) {
- var cfg runtime.PluginSpec
- generic := t.Spec.GetGeneric()
- if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
- return cfg, errors.Wrap(err, "error reading plugin spec")
- }
- return cfg, nil
- }
- // Update is the update phase from swarmkit
- func (p *Controller) Update(ctx context.Context, t *api.Task) error {
- p.logger.Debug("Update")
- return nil
- }
- // Prepare is the prepare phase from swarmkit
- func (p *Controller) Prepare(ctx context.Context) (err error) {
- p.logger.Debug("Prepare")
- remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
- if err != nil {
- return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
- }
- if p.spec.Name == "" {
- p.spec.Name = remote.String()
- }
- var authConfig registry.AuthConfig
- privs := convertPrivileges(p.spec.Privileges)
- pl, err := p.backend.Get(p.spec.Name)
- defer func() {
- if pl != nil && err == nil {
- pl.Acquire()
- }
- }()
- if err == nil && pl != nil {
- if pl.SwarmServiceID != p.serviceID {
- return errors.Errorf("plugin already exists: %s", p.spec.Name)
- }
- if pl.IsEnabled() {
- if err := p.backend.Disable(pl.GetID(), &types.PluginDisableConfig{ForceDisable: true}); err != nil {
- p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
- }
- }
- p.pluginID = pl.GetID()
- return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, io.Discard)
- }
- if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, io.Discard, plugin.WithSwarmService(p.serviceID), plugin.WithEnv(p.spec.Env)); err != nil {
- return err
- }
- pl, err = p.backend.Get(p.spec.Name)
- if err != nil {
- return err
- }
- p.pluginID = pl.GetID()
- return nil
- }
- // Start is the start phase from swarmkit
- func (p *Controller) Start(ctx context.Context) error {
- p.logger.Debug("Start")
- pl, err := p.backend.Get(p.pluginID)
- if err != nil {
- return err
- }
- if p.spec.Disabled {
- if pl.IsEnabled() {
- return p.backend.Disable(p.pluginID, &types.PluginDisableConfig{ForceDisable: false})
- }
- return nil
- }
- if !pl.IsEnabled() {
- return p.backend.Enable(p.pluginID, &types.PluginEnableConfig{Timeout: 30})
- }
- return nil
- }
- // Wait causes the task to wait until returned
- func (p *Controller) Wait(ctx context.Context) error {
- p.logger.Debug("Wait")
- pl, err := p.backend.Get(p.pluginID)
- if err != nil {
- return err
- }
- events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
- defer cancel()
- if p.signalWaitReady != nil {
- p.signalWaitReady()
- }
- if !p.spec.Disabled != pl.IsEnabled() {
- return errors.New("mismatched plugin state")
- }
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case e := <-events:
- p.logger.Debugf("got event %T", e)
- switch e.(type) {
- case plugin.EventEnable:
- if p.spec.Disabled {
- return errors.New("plugin enabled")
- }
- case plugin.EventRemove:
- return errors.New("plugin removed")
- case plugin.EventDisable:
- if !p.spec.Disabled {
- return errors.New("plugin disabled")
- }
- }
- }
- }
- }
- func isNotFound(err error) bool {
- return errdefs.IsNotFound(err)
- }
- // Shutdown is the shutdown phase from swarmkit
- func (p *Controller) Shutdown(ctx context.Context) error {
- p.logger.Debug("Shutdown")
- return nil
- }
- // Terminate is the terminate phase from swarmkit
- func (p *Controller) Terminate(ctx context.Context) error {
- p.logger.Debug("Terminate")
- return nil
- }
- // Remove is the remove phase from swarmkit
- func (p *Controller) Remove(ctx context.Context) error {
- p.logger.Debug("Remove")
- pl, err := p.backend.Get(p.pluginID)
- if err != nil {
- if isNotFound(err) {
- return nil
- }
- return err
- }
- pl.Release()
- if pl.GetRefCount() > 0 {
- p.logger.Debug("skipping remove due to ref count")
- return nil
- }
- // This may error because we have exactly 1 plugin, but potentially multiple
- // tasks which are calling remove.
- err = p.backend.Remove(p.pluginID, &types.PluginRmConfig{ForceRemove: true})
- if isNotFound(err) {
- return nil
- }
- return err
- }
- // Close is the close phase from swarmkit
- func (p *Controller) Close() error {
- p.logger.Debug("Close")
- return nil
- }
- func convertPrivileges(ls []*runtime.PluginPrivilege) types.PluginPrivileges {
- var out types.PluginPrivileges
- for _, p := range ls {
- pp := types.PluginPrivilege{
- Name: p.Name,
- Description: p.Description,
- Value: p.Value,
- }
- out = append(out, pp)
- }
- return out
- }
|