controller.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package plugin
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "net/http"
  6. "github.com/docker/distribution/reference"
  7. enginetypes "github.com/docker/docker/api/types"
  8. "github.com/docker/docker/api/types/swarm/runtime"
  9. "github.com/docker/docker/errdefs"
  10. "github.com/docker/docker/plugin"
  11. "github.com/docker/docker/plugin/v2"
  12. "github.com/docker/swarmkit/api"
  13. "github.com/gogo/protobuf/proto"
  14. "github.com/pkg/errors"
  15. "github.com/sirupsen/logrus"
  16. "golang.org/x/net/context"
  17. )
  18. // Controller is the controller for the plugin backend.
  19. // Plugins are managed as a singleton object with a desired state (different from containers).
  20. // With the the plugin controller instead of having a strict create->start->stop->remove
  21. // task lifecycle like containers, we manage the desired state of the plugin and let
  22. // the plugin manager do what it already does and monitor the plugin.
  23. // We'll also end up with many tasks all pointing to the same plugin ID.
  24. //
  25. // TODO(@cpuguy83): registry auth is intentionally not supported until we work out
  26. // the right way to pass registry crednetials via secrets.
  27. type Controller struct {
  28. backend Backend
  29. spec runtime.PluginSpec
  30. logger *logrus.Entry
  31. pluginID string
  32. serviceID string
  33. taskID string
  34. // hook used to signal tests that `Wait()` is actually ready and waiting
  35. signalWaitReady func()
  36. }
  37. // Backend is the interface for interacting with the plugin manager
  38. // Controller actions are passed to the configured backend to do the real work.
  39. type Backend interface {
  40. Disable(name string, config *enginetypes.PluginDisableConfig) error
  41. Enable(name string, config *enginetypes.PluginEnableConfig) error
  42. Remove(name string, config *enginetypes.PluginRmConfig) error
  43. Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
  44. Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
  45. Get(name string) (*v2.Plugin, error)
  46. SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
  47. }
  48. // NewController returns a new cluster plugin controller
  49. func NewController(backend Backend, t *api.Task) (*Controller, error) {
  50. spec, err := readSpec(t)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return &Controller{
  55. backend: backend,
  56. spec: spec,
  57. serviceID: t.ServiceID,
  58. logger: logrus.WithFields(logrus.Fields{
  59. "controller": "plugin",
  60. "task": t.ID,
  61. "plugin": spec.Name,
  62. })}, nil
  63. }
  64. func readSpec(t *api.Task) (runtime.PluginSpec, error) {
  65. var cfg runtime.PluginSpec
  66. generic := t.Spec.GetGeneric()
  67. if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
  68. return cfg, errors.Wrap(err, "error reading plugin spec")
  69. }
  70. return cfg, nil
  71. }
  72. // Update is the update phase from swarmkit
  73. func (p *Controller) Update(ctx context.Context, t *api.Task) error {
  74. p.logger.Debug("Update")
  75. return nil
  76. }
  77. // Prepare is the prepare phase from swarmkit
  78. func (p *Controller) Prepare(ctx context.Context) (err error) {
  79. p.logger.Debug("Prepare")
  80. remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
  81. if err != nil {
  82. return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
  83. }
  84. if p.spec.Name == "" {
  85. p.spec.Name = remote.String()
  86. }
  87. var authConfig enginetypes.AuthConfig
  88. privs := convertPrivileges(p.spec.Privileges)
  89. pl, err := p.backend.Get(p.spec.Name)
  90. defer func() {
  91. if pl != nil && err == nil {
  92. pl.Acquire()
  93. }
  94. }()
  95. if err == nil && pl != nil {
  96. if pl.SwarmServiceID != p.serviceID {
  97. return errors.Errorf("plugin already exists: %s", p.spec.Name)
  98. }
  99. if pl.IsEnabled() {
  100. if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
  101. p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
  102. }
  103. }
  104. p.pluginID = pl.GetID()
  105. return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard)
  106. }
  107. if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID)); err != nil {
  108. return err
  109. }
  110. pl, err = p.backend.Get(p.spec.Name)
  111. if err != nil {
  112. return err
  113. }
  114. p.pluginID = pl.GetID()
  115. return nil
  116. }
  117. // Start is the start phase from swarmkit
  118. func (p *Controller) Start(ctx context.Context) error {
  119. p.logger.Debug("Start")
  120. pl, err := p.backend.Get(p.pluginID)
  121. if err != nil {
  122. return err
  123. }
  124. if p.spec.Disabled {
  125. if pl.IsEnabled() {
  126. return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
  127. }
  128. return nil
  129. }
  130. if !pl.IsEnabled() {
  131. return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
  132. }
  133. return nil
  134. }
  135. // Wait causes the task to wait until returned
  136. func (p *Controller) Wait(ctx context.Context) error {
  137. p.logger.Debug("Wait")
  138. pl, err := p.backend.Get(p.pluginID)
  139. if err != nil {
  140. return err
  141. }
  142. events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
  143. defer cancel()
  144. if p.signalWaitReady != nil {
  145. p.signalWaitReady()
  146. }
  147. if !p.spec.Disabled != pl.IsEnabled() {
  148. return errors.New("mismatched plugin state")
  149. }
  150. for {
  151. select {
  152. case <-ctx.Done():
  153. return ctx.Err()
  154. case e := <-events:
  155. p.logger.Debugf("got event %#T", e)
  156. switch e.(type) {
  157. case plugin.EventEnable:
  158. if p.spec.Disabled {
  159. return errors.New("plugin enabled")
  160. }
  161. case plugin.EventRemove:
  162. return errors.New("plugin removed")
  163. case plugin.EventDisable:
  164. if !p.spec.Disabled {
  165. return errors.New("plugin disabled")
  166. }
  167. }
  168. }
  169. }
  170. }
  171. func isNotFound(err error) bool {
  172. return errdefs.IsNotFound(err)
  173. }
  174. // Shutdown is the shutdown phase from swarmkit
  175. func (p *Controller) Shutdown(ctx context.Context) error {
  176. p.logger.Debug("Shutdown")
  177. return nil
  178. }
  179. // Terminate is the terminate phase from swarmkit
  180. func (p *Controller) Terminate(ctx context.Context) error {
  181. p.logger.Debug("Terminate")
  182. return nil
  183. }
  184. // Remove is the remove phase from swarmkit
  185. func (p *Controller) Remove(ctx context.Context) error {
  186. p.logger.Debug("Remove")
  187. pl, err := p.backend.Get(p.pluginID)
  188. if err != nil {
  189. if isNotFound(err) {
  190. return nil
  191. }
  192. return err
  193. }
  194. pl.Release()
  195. if pl.GetRefCount() > 0 {
  196. p.logger.Debug("skipping remove due to ref count")
  197. return nil
  198. }
  199. // This may error because we have exactly 1 plugin, but potentially multiple
  200. // tasks which are calling remove.
  201. err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
  202. if isNotFound(err) {
  203. return nil
  204. }
  205. return err
  206. }
  207. // Close is the close phase from swarmkit
  208. func (p *Controller) Close() error {
  209. p.logger.Debug("Close")
  210. return nil
  211. }
  212. func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
  213. var out enginetypes.PluginPrivileges
  214. for _, p := range ls {
  215. pp := enginetypes.PluginPrivilege{
  216. Name: p.Name,
  217. Description: p.Description,
  218. Value: p.Value,
  219. }
  220. out = append(out, pp)
  221. }
  222. return out
  223. }