controller.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package plugin // import "github.com/docker/docker/daemon/cluster/controllers/plugin"
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "github.com/containerd/containerd/log"
  7. "github.com/distribution/reference"
  8. "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/api/types/registry"
  10. "github.com/docker/docker/api/types/swarm/runtime"
  11. "github.com/docker/docker/errdefs"
  12. "github.com/docker/docker/plugin"
  13. v2 "github.com/docker/docker/plugin/v2"
  14. "github.com/gogo/protobuf/proto"
  15. "github.com/moby/swarmkit/v2/api"
  16. "github.com/pkg/errors"
  17. "github.com/sirupsen/logrus"
  18. )
  19. // Controller is the controller for the plugin backend.
  20. // Plugins are managed as a singleton object with a desired state (different from containers).
  21. // With the plugin controller instead of having a strict create->start->stop->remove
  22. // task lifecycle like containers, we manage the desired state of the plugin and let
  23. // the plugin manager do what it already does and monitor the plugin.
  24. // We'll also end up with many tasks all pointing to the same plugin ID.
  25. //
  26. // TODO(@cpuguy83): registry auth is intentionally not supported until we work out
  27. // the right way to pass registry credentials via secrets.
  28. type Controller struct {
  29. backend Backend
  30. spec runtime.PluginSpec
  31. logger *logrus.Entry
  32. pluginID string
  33. serviceID string
  34. // hook used to signal tests that `Wait()` is actually ready and waiting
  35. signalWaitReady func()
  36. }
  37. // Backend is the interface for interacting with the plugin manager
  38. // Controller actions are passed to the configured backend to do the real work.
  39. type Backend interface {
  40. Disable(name string, config *types.PluginDisableConfig) error
  41. Enable(name string, config *types.PluginEnableConfig) error
  42. Remove(name string, config *types.PluginRmConfig) error
  43. Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *registry.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
  44. Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *registry.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) error
  45. Get(name string) (*v2.Plugin, error)
  46. SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
  47. }
  48. // NewController returns a new cluster plugin controller
  49. func NewController(backend Backend, t *api.Task) (*Controller, error) {
  50. spec, err := readSpec(t)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return &Controller{
  55. backend: backend,
  56. spec: spec,
  57. serviceID: t.ServiceID,
  58. logger: log.G(context.TODO()).WithFields(log.Fields{
  59. "controller": "plugin",
  60. "task": t.ID,
  61. "plugin": spec.Name,
  62. }),
  63. }, nil
  64. }
  65. func readSpec(t *api.Task) (runtime.PluginSpec, error) {
  66. var cfg runtime.PluginSpec
  67. generic := t.Spec.GetGeneric()
  68. if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
  69. return cfg, errors.Wrap(err, "error reading plugin spec")
  70. }
  71. return cfg, nil
  72. }
  73. // Update is the update phase from swarmkit
  74. func (p *Controller) Update(ctx context.Context, t *api.Task) error {
  75. p.logger.Debug("Update")
  76. return nil
  77. }
  78. // Prepare is the prepare phase from swarmkit
  79. func (p *Controller) Prepare(ctx context.Context) (err error) {
  80. p.logger.Debug("Prepare")
  81. remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
  82. if err != nil {
  83. return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
  84. }
  85. if p.spec.Name == "" {
  86. p.spec.Name = remote.String()
  87. }
  88. var authConfig registry.AuthConfig
  89. privs := convertPrivileges(p.spec.Privileges)
  90. pl, err := p.backend.Get(p.spec.Name)
  91. defer func() {
  92. if pl != nil && err == nil {
  93. pl.Acquire()
  94. }
  95. }()
  96. if err == nil && pl != nil {
  97. if pl.SwarmServiceID != p.serviceID {
  98. return errors.Errorf("plugin already exists: %s", p.spec.Name)
  99. }
  100. if pl.IsEnabled() {
  101. if err := p.backend.Disable(pl.GetID(), &types.PluginDisableConfig{ForceDisable: true}); err != nil {
  102. p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
  103. }
  104. }
  105. p.pluginID = pl.GetID()
  106. return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, io.Discard)
  107. }
  108. if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, io.Discard, plugin.WithSwarmService(p.serviceID), plugin.WithEnv(p.spec.Env)); err != nil {
  109. return err
  110. }
  111. pl, err = p.backend.Get(p.spec.Name)
  112. if err != nil {
  113. return err
  114. }
  115. p.pluginID = pl.GetID()
  116. return nil
  117. }
  118. // Start is the start phase from swarmkit
  119. func (p *Controller) Start(ctx context.Context) error {
  120. p.logger.Debug("Start")
  121. pl, err := p.backend.Get(p.pluginID)
  122. if err != nil {
  123. return err
  124. }
  125. if p.spec.Disabled {
  126. if pl.IsEnabled() {
  127. return p.backend.Disable(p.pluginID, &types.PluginDisableConfig{ForceDisable: false})
  128. }
  129. return nil
  130. }
  131. if !pl.IsEnabled() {
  132. return p.backend.Enable(p.pluginID, &types.PluginEnableConfig{Timeout: 30})
  133. }
  134. return nil
  135. }
  136. // Wait causes the task to wait until returned
  137. func (p *Controller) Wait(ctx context.Context) error {
  138. p.logger.Debug("Wait")
  139. pl, err := p.backend.Get(p.pluginID)
  140. if err != nil {
  141. return err
  142. }
  143. events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
  144. defer cancel()
  145. if p.signalWaitReady != nil {
  146. p.signalWaitReady()
  147. }
  148. if !p.spec.Disabled != pl.IsEnabled() {
  149. return errors.New("mismatched plugin state")
  150. }
  151. for {
  152. select {
  153. case <-ctx.Done():
  154. return ctx.Err()
  155. case e := <-events:
  156. p.logger.Debugf("got event %T", e)
  157. switch e.(type) {
  158. case plugin.EventEnable:
  159. if p.spec.Disabled {
  160. return errors.New("plugin enabled")
  161. }
  162. case plugin.EventRemove:
  163. return errors.New("plugin removed")
  164. case plugin.EventDisable:
  165. if !p.spec.Disabled {
  166. return errors.New("plugin disabled")
  167. }
  168. }
  169. }
  170. }
  171. }
  172. func isNotFound(err error) bool {
  173. return errdefs.IsNotFound(err)
  174. }
  175. // Shutdown is the shutdown phase from swarmkit
  176. func (p *Controller) Shutdown(ctx context.Context) error {
  177. p.logger.Debug("Shutdown")
  178. return nil
  179. }
  180. // Terminate is the terminate phase from swarmkit
  181. func (p *Controller) Terminate(ctx context.Context) error {
  182. p.logger.Debug("Terminate")
  183. return nil
  184. }
  185. // Remove is the remove phase from swarmkit
  186. func (p *Controller) Remove(ctx context.Context) error {
  187. p.logger.Debug("Remove")
  188. pl, err := p.backend.Get(p.pluginID)
  189. if err != nil {
  190. if isNotFound(err) {
  191. return nil
  192. }
  193. return err
  194. }
  195. pl.Release()
  196. if pl.GetRefCount() > 0 {
  197. p.logger.Debug("skipping remove due to ref count")
  198. return nil
  199. }
  200. // This may error because we have exactly 1 plugin, but potentially multiple
  201. // tasks which are calling remove.
  202. err = p.backend.Remove(p.pluginID, &types.PluginRmConfig{ForceRemove: true})
  203. if isNotFound(err) {
  204. return nil
  205. }
  206. return err
  207. }
  208. // Close is the close phase from swarmkit
  209. func (p *Controller) Close() error {
  210. p.logger.Debug("Close")
  211. return nil
  212. }
  213. func convertPrivileges(ls []*runtime.PluginPrivilege) types.PluginPrivileges {
  214. var out types.PluginPrivileges
  215. for _, p := range ls {
  216. pp := types.PluginPrivilege{
  217. Name: p.Name,
  218. Description: p.Description,
  219. Value: p.Value,
  220. }
  221. out = append(out, pp)
  222. }
  223. return out
  224. }