controller.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package plugin // import "github.com/docker/docker/daemon/cluster/controllers/plugin"
  2. import (
  3. "context"
  4. "io"
  5. "io/ioutil"
  6. "net/http"
  7. "github.com/docker/distribution/reference"
  8. enginetypes "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/api/types/swarm/runtime"
  10. "github.com/docker/docker/errdefs"
  11. "github.com/docker/docker/plugin"
  12. v2 "github.com/docker/docker/plugin/v2"
  13. "github.com/docker/swarmkit/api"
  14. "github.com/gogo/protobuf/proto"
  15. "github.com/pkg/errors"
  16. "github.com/sirupsen/logrus"
  17. )
  18. // Controller is the controller for the plugin backend.
  19. // Plugins are managed as a singleton object with a desired state (different from containers).
  20. // With the plugin controller instead of having a strict create->start->stop->remove
  21. // task lifecycle like containers, we manage the desired state of the plugin and let
  22. // the plugin manager do what it already does and monitor the plugin.
  23. // We'll also end up with many tasks all pointing to the same plugin ID.
  24. //
  25. // TODO(@cpuguy83): registry auth is intentionally not supported until we work out
  26. // the right way to pass registry credentials via secrets.
  27. type Controller struct {
  28. backend Backend
  29. spec runtime.PluginSpec
  30. logger *logrus.Entry
  31. pluginID string
  32. serviceID string
  33. // hook used to signal tests that `Wait()` is actually ready and waiting
  34. signalWaitReady func()
  35. }
  36. // Backend is the interface for interacting with the plugin manager
  37. // Controller actions are passed to the configured backend to do the real work.
  38. type Backend interface {
  39. Disable(name string, config *enginetypes.PluginDisableConfig) error
  40. Enable(name string, config *enginetypes.PluginEnableConfig) error
  41. Remove(name string, config *enginetypes.PluginRmConfig) error
  42. Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
  43. Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
  44. Get(name string) (*v2.Plugin, error)
  45. SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
  46. }
  47. // NewController returns a new cluster plugin controller
  48. func NewController(backend Backend, t *api.Task) (*Controller, error) {
  49. spec, err := readSpec(t)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return &Controller{
  54. backend: backend,
  55. spec: spec,
  56. serviceID: t.ServiceID,
  57. logger: logrus.WithFields(logrus.Fields{
  58. "controller": "plugin",
  59. "task": t.ID,
  60. "plugin": spec.Name,
  61. })}, nil
  62. }
  63. func readSpec(t *api.Task) (runtime.PluginSpec, error) {
  64. var cfg runtime.PluginSpec
  65. generic := t.Spec.GetGeneric()
  66. if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
  67. return cfg, errors.Wrap(err, "error reading plugin spec")
  68. }
  69. return cfg, nil
  70. }
  71. // Update is the update phase from swarmkit
  72. func (p *Controller) Update(ctx context.Context, t *api.Task) error {
  73. p.logger.Debug("Update")
  74. return nil
  75. }
  76. // Prepare is the prepare phase from swarmkit
  77. func (p *Controller) Prepare(ctx context.Context) (err error) {
  78. p.logger.Debug("Prepare")
  79. remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
  80. if err != nil {
  81. return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
  82. }
  83. if p.spec.Name == "" {
  84. p.spec.Name = remote.String()
  85. }
  86. var authConfig enginetypes.AuthConfig
  87. privs := convertPrivileges(p.spec.Privileges)
  88. pl, err := p.backend.Get(p.spec.Name)
  89. defer func() {
  90. if pl != nil && err == nil {
  91. pl.Acquire()
  92. }
  93. }()
  94. if err == nil && pl != nil {
  95. if pl.SwarmServiceID != p.serviceID {
  96. return errors.Errorf("plugin already exists: %s", p.spec.Name)
  97. }
  98. if pl.IsEnabled() {
  99. if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
  100. p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
  101. }
  102. }
  103. p.pluginID = pl.GetID()
  104. return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard)
  105. }
  106. if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID), plugin.WithEnv(p.spec.Env)); err != nil {
  107. return err
  108. }
  109. pl, err = p.backend.Get(p.spec.Name)
  110. if err != nil {
  111. return err
  112. }
  113. p.pluginID = pl.GetID()
  114. return nil
  115. }
  116. // Start is the start phase from swarmkit
  117. func (p *Controller) Start(ctx context.Context) error {
  118. p.logger.Debug("Start")
  119. pl, err := p.backend.Get(p.pluginID)
  120. if err != nil {
  121. return err
  122. }
  123. if p.spec.Disabled {
  124. if pl.IsEnabled() {
  125. return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
  126. }
  127. return nil
  128. }
  129. if !pl.IsEnabled() {
  130. return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
  131. }
  132. return nil
  133. }
  134. // Wait causes the task to wait until returned
  135. func (p *Controller) Wait(ctx context.Context) error {
  136. p.logger.Debug("Wait")
  137. pl, err := p.backend.Get(p.pluginID)
  138. if err != nil {
  139. return err
  140. }
  141. events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
  142. defer cancel()
  143. if p.signalWaitReady != nil {
  144. p.signalWaitReady()
  145. }
  146. if !p.spec.Disabled != pl.IsEnabled() {
  147. return errors.New("mismatched plugin state")
  148. }
  149. for {
  150. select {
  151. case <-ctx.Done():
  152. return ctx.Err()
  153. case e := <-events:
  154. p.logger.Debugf("got event %T", e)
  155. switch e.(type) {
  156. case plugin.EventEnable:
  157. if p.spec.Disabled {
  158. return errors.New("plugin enabled")
  159. }
  160. case plugin.EventRemove:
  161. return errors.New("plugin removed")
  162. case plugin.EventDisable:
  163. if !p.spec.Disabled {
  164. return errors.New("plugin disabled")
  165. }
  166. }
  167. }
  168. }
  169. }
  170. func isNotFound(err error) bool {
  171. return errdefs.IsNotFound(err)
  172. }
  173. // Shutdown is the shutdown phase from swarmkit
  174. func (p *Controller) Shutdown(ctx context.Context) error {
  175. p.logger.Debug("Shutdown")
  176. return nil
  177. }
  178. // Terminate is the terminate phase from swarmkit
  179. func (p *Controller) Terminate(ctx context.Context) error {
  180. p.logger.Debug("Terminate")
  181. return nil
  182. }
  183. // Remove is the remove phase from swarmkit
  184. func (p *Controller) Remove(ctx context.Context) error {
  185. p.logger.Debug("Remove")
  186. pl, err := p.backend.Get(p.pluginID)
  187. if err != nil {
  188. if isNotFound(err) {
  189. return nil
  190. }
  191. return err
  192. }
  193. pl.Release()
  194. if pl.GetRefCount() > 0 {
  195. p.logger.Debug("skipping remove due to ref count")
  196. return nil
  197. }
  198. // This may error because we have exactly 1 plugin, but potentially multiple
  199. // tasks which are calling remove.
  200. err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
  201. if isNotFound(err) {
  202. return nil
  203. }
  204. return err
  205. }
  206. // Close is the close phase from swarmkit
  207. func (p *Controller) Close() error {
  208. p.logger.Debug("Close")
  209. return nil
  210. }
  211. func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
  212. var out enginetypes.PluginPrivileges
  213. for _, p := range ls {
  214. pp := enginetypes.PluginPrivilege{
  215. Name: p.Name,
  216. Description: p.Description,
  217. Value: p.Value,
  218. }
  219. out = append(out, pp)
  220. }
  221. return out
  222. }