controller.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package plugin
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "net/http"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/docker/distribution/reference"
  8. enginetypes "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/api/types/swarm/runtime"
  10. "github.com/docker/docker/plugin"
  11. "github.com/docker/docker/plugin/v2"
  12. "github.com/docker/swarmkit/api"
  13. "github.com/gogo/protobuf/proto"
  14. "github.com/pkg/errors"
  15. "golang.org/x/net/context"
  16. )
  17. // Controller is the controller for the plugin backend.
  18. // Plugins are managed as a singleton object with a desired state (different from containers).
  19. // With the the plugin controller instead of having a strict create->start->stop->remove
  20. // task lifecycle like containers, we manage the desired state of the plugin and let
  21. // the plugin manager do what it already does and monitor the plugin.
  22. // We'll also end up with many tasks all pointing to the same plugin ID.
  23. //
  24. // TODO(@cpuguy83): registry auth is intentionally not supported until we work out
  25. // the right way to pass registry crednetials via secrets.
  26. type Controller struct {
  27. backend Backend
  28. spec runtime.PluginSpec
  29. logger *logrus.Entry
  30. pluginID string
  31. serviceID string
  32. taskID string
  33. // hook used to signal tests that `Wait()` is actually ready and waiting
  34. signalWaitReady func()
  35. }
  36. // Backend is the interface for interacting with the plugin manager
  37. // Controller actions are passed to the configured backend to do the real work.
  38. type Backend interface {
  39. Disable(name string, config *enginetypes.PluginDisableConfig) error
  40. Enable(name string, config *enginetypes.PluginEnableConfig) error
  41. Remove(name string, config *enginetypes.PluginRmConfig) error
  42. Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
  43. Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
  44. Get(name string) (*v2.Plugin, error)
  45. SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
  46. }
  47. // NewController returns a new cluster plugin controller
  48. func NewController(backend Backend, t *api.Task) (*Controller, error) {
  49. spec, err := readSpec(t)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return &Controller{
  54. backend: backend,
  55. spec: spec,
  56. serviceID: t.ServiceID,
  57. logger: logrus.WithFields(logrus.Fields{
  58. "controller": "plugin",
  59. "task": t.ID,
  60. "plugin": spec.Name,
  61. })}, nil
  62. }
  63. func readSpec(t *api.Task) (runtime.PluginSpec, error) {
  64. var cfg runtime.PluginSpec
  65. generic := t.Spec.GetGeneric()
  66. if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
  67. return cfg, errors.Wrap(err, "error reading plugin spec")
  68. }
  69. return cfg, nil
  70. }
  71. // Update is the update phase from swarmkit
  72. func (p *Controller) Update(ctx context.Context, t *api.Task) error {
  73. p.logger.Debug("Update")
  74. return nil
  75. }
  76. // Prepare is the prepare phase from swarmkit
  77. func (p *Controller) Prepare(ctx context.Context) (err error) {
  78. p.logger.Debug("Prepare")
  79. remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
  80. if err != nil {
  81. return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
  82. }
  83. if p.spec.Name == "" {
  84. p.spec.Name = remote.String()
  85. }
  86. var authConfig enginetypes.AuthConfig
  87. privs := convertPrivileges(p.spec.Privileges)
  88. pl, err := p.backend.Get(p.spec.Name)
  89. defer func() {
  90. if pl != nil && err == nil {
  91. pl.Acquire()
  92. }
  93. }()
  94. if err == nil && pl != nil {
  95. if pl.SwarmServiceID != p.serviceID {
  96. return errors.Errorf("plugin already exists: %s", p.spec.Name)
  97. }
  98. if pl.IsEnabled() {
  99. if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
  100. p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
  101. }
  102. }
  103. p.pluginID = pl.GetID()
  104. return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard)
  105. }
  106. if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID)); err != nil {
  107. return err
  108. }
  109. pl, err = p.backend.Get(p.spec.Name)
  110. if err != nil {
  111. return err
  112. }
  113. p.pluginID = pl.GetID()
  114. return nil
  115. }
  116. // Start is the start phase from swarmkit
  117. func (p *Controller) Start(ctx context.Context) error {
  118. p.logger.Debug("Start")
  119. pl, err := p.backend.Get(p.pluginID)
  120. if err != nil {
  121. return err
  122. }
  123. if p.spec.Disabled {
  124. if pl.IsEnabled() {
  125. return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
  126. }
  127. return nil
  128. }
  129. if !pl.IsEnabled() {
  130. return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
  131. }
  132. return nil
  133. }
  134. // Wait causes the task to wait until returned
  135. func (p *Controller) Wait(ctx context.Context) error {
  136. p.logger.Debug("Wait")
  137. pl, err := p.backend.Get(p.pluginID)
  138. if err != nil {
  139. return err
  140. }
  141. events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
  142. defer cancel()
  143. if p.signalWaitReady != nil {
  144. p.signalWaitReady()
  145. }
  146. if !p.spec.Disabled != pl.IsEnabled() {
  147. return errors.New("mismatched plugin state")
  148. }
  149. for {
  150. select {
  151. case <-ctx.Done():
  152. return ctx.Err()
  153. case e := <-events:
  154. p.logger.Debugf("got event %#T", e)
  155. switch e.(type) {
  156. case plugin.EventEnable:
  157. if p.spec.Disabled {
  158. return errors.New("plugin enabled")
  159. }
  160. case plugin.EventRemove:
  161. return errors.New("plugin removed")
  162. case plugin.EventDisable:
  163. if !p.spec.Disabled {
  164. return errors.New("plugin disabled")
  165. }
  166. }
  167. }
  168. }
  169. }
  170. func isNotFound(err error) bool {
  171. _, ok := errors.Cause(err).(plugin.ErrNotFound)
  172. return ok
  173. }
  174. // Shutdown is the shutdown phase from swarmkit
  175. func (p *Controller) Shutdown(ctx context.Context) error {
  176. p.logger.Debug("Shutdown")
  177. return nil
  178. }
  179. // Terminate is the terminate phase from swarmkit
  180. func (p *Controller) Terminate(ctx context.Context) error {
  181. p.logger.Debug("Terminate")
  182. return nil
  183. }
  184. // Remove is the remove phase from swarmkit
  185. func (p *Controller) Remove(ctx context.Context) error {
  186. p.logger.Debug("Remove")
  187. pl, err := p.backend.Get(p.pluginID)
  188. if err != nil {
  189. if isNotFound(err) {
  190. return nil
  191. }
  192. return err
  193. }
  194. pl.Release()
  195. if pl.GetRefCount() > 0 {
  196. p.logger.Debug("skipping remove due to ref count")
  197. return nil
  198. }
  199. // This may error because we have exactly 1 plugin, but potentially multiple
  200. // tasks which are calling remove.
  201. err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
  202. if isNotFound(err) {
  203. return nil
  204. }
  205. return err
  206. }
  207. // Close is the close phase from swarmkit
  208. func (p *Controller) Close() error {
  209. p.logger.Debug("Close")
  210. return nil
  211. }
  212. func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
  213. var out enginetypes.PluginPrivileges
  214. for _, p := range ls {
  215. pp := enginetypes.PluginPrivilege{
  216. Name: p.Name,
  217. Description: p.Description,
  218. Value: p.Value,
  219. }
  220. out = append(out, pp)
  221. }
  222. return out
  223. }