plugins.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // Package plugins provides structures and helper functions to manage Docker
  2. // plugins.
  3. //
  4. // Docker discovers plugins by looking for them in the plugin directory whenever
  5. // a user or container tries to use one by name. UNIX domain socket files must
  6. // be located under /run/docker/plugins, whereas spec files can be located
  7. // either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
  8. // by the Registry interface, which lets you list all plugins or get a plugin by
  9. // its name if it exists.
  10. //
  11. // The plugins need to implement an HTTP server and bind this to the UNIX socket
  12. // or the address specified in the spec files.
  13. // A handshake is sent at /Plugin.Activate, and plugins are expected to return
  14. // a Manifest with a list of Docker subsystems which this plugin implements.
  15. //
  16. // In order to use a plugin, you can use ``Get`` with the name of the
  17. // plugin and the subsystem it implements.
  18. //
  19. // plugin, err := plugins.Get("example", "VolumeDriver")
  20. // if err != nil {
  21. // return fmt.Errorf("Error looking up volume plugin example: %v", err)
  22. // }
  23. package plugins
  24. import (
  25. "errors"
  26. "sync"
  27. "time"
  28. "github.com/Sirupsen/logrus"
  29. "github.com/docker/go-connections/tlsconfig"
  30. )
  31. var (
  32. // ErrNotImplements is returned if the plugin does not implement the requested driver.
  33. ErrNotImplements = errors.New("Plugin does not implement the requested driver")
  34. )
  35. type plugins struct {
  36. sync.Mutex
  37. plugins map[string]*Plugin
  38. }
  39. var (
  40. storage = plugins{plugins: make(map[string]*Plugin)}
  41. extpointHandlers = make(map[string]func(string, *Client))
  42. )
  43. // Manifest lists what a plugin implements.
  44. type Manifest struct {
  45. // List of subsystem the plugin implements.
  46. Implements []string
  47. }
  48. // Plugin is the definition of a docker plugin.
  49. type Plugin struct {
  50. // Name of the plugin
  51. name string
  52. // Address of the plugin
  53. Addr string
  54. // TLS configuration of the plugin
  55. TLSConfig *tlsconfig.Options
  56. // Client attached to the plugin
  57. client *Client
  58. // Manifest of the plugin (see above)
  59. Manifest *Manifest `json:"-"`
  60. // error produced by activation
  61. activateErr error
  62. // specifies if the activation sequence is completed (not if it is successful or not)
  63. activated bool
  64. // wait for activation to finish
  65. activateWait *sync.Cond
  66. }
  67. // Name returns the name of the plugin.
  68. func (p *Plugin) Name() string {
  69. return p.name
  70. }
  71. // Client returns a ready-to-use plugin client that can be used to communicate with the plugin.
  72. func (p *Plugin) Client() *Client {
  73. return p.client
  74. }
  75. // NewLocalPlugin creates a new local plugin.
  76. func NewLocalPlugin(name, addr string) *Plugin {
  77. return &Plugin{
  78. name: name,
  79. Addr: addr,
  80. // TODO: change to nil
  81. TLSConfig: &tlsconfig.Options{InsecureSkipVerify: true},
  82. activateWait: sync.NewCond(&sync.Mutex{}),
  83. }
  84. }
  85. func (p *Plugin) activate() error {
  86. p.activateWait.L.Lock()
  87. if p.activated {
  88. p.activateWait.L.Unlock()
  89. return p.activateErr
  90. }
  91. p.activateErr = p.activateWithLock()
  92. p.activated = true
  93. p.activateWait.L.Unlock()
  94. p.activateWait.Broadcast()
  95. return p.activateErr
  96. }
  97. func (p *Plugin) activateWithLock() error {
  98. c, err := NewClient(p.Addr, p.TLSConfig)
  99. if err != nil {
  100. return err
  101. }
  102. p.client = c
  103. m := new(Manifest)
  104. if err = p.client.Call("Plugin.Activate", nil, m); err != nil {
  105. return err
  106. }
  107. p.Manifest = m
  108. for _, iface := range m.Implements {
  109. handler, handled := extpointHandlers[iface]
  110. if !handled {
  111. continue
  112. }
  113. handler(p.name, p.client)
  114. }
  115. return nil
  116. }
  117. func (p *Plugin) waitActive() error {
  118. p.activateWait.L.Lock()
  119. for !p.activated {
  120. p.activateWait.Wait()
  121. }
  122. p.activateWait.L.Unlock()
  123. return p.activateErr
  124. }
  125. func (p *Plugin) implements(kind string) bool {
  126. if err := p.waitActive(); err != nil {
  127. return false
  128. }
  129. for _, driver := range p.Manifest.Implements {
  130. if driver == kind {
  131. return true
  132. }
  133. }
  134. return false
  135. }
  136. func load(name string) (*Plugin, error) {
  137. return loadWithRetry(name, true)
  138. }
  139. func loadWithRetry(name string, retry bool) (*Plugin, error) {
  140. registry := newLocalRegistry()
  141. start := time.Now()
  142. var retries int
  143. for {
  144. pl, err := registry.Plugin(name)
  145. if err != nil {
  146. if !retry {
  147. return nil, err
  148. }
  149. timeOff := backoff(retries)
  150. if abort(start, timeOff) {
  151. return nil, err
  152. }
  153. retries++
  154. logrus.Warnf("Unable to locate plugin: %s, retrying in %v", name, timeOff)
  155. time.Sleep(timeOff)
  156. continue
  157. }
  158. storage.Lock()
  159. storage.plugins[name] = pl
  160. storage.Unlock()
  161. err = pl.activate()
  162. if err != nil {
  163. storage.Lock()
  164. delete(storage.plugins, name)
  165. storage.Unlock()
  166. }
  167. return pl, err
  168. }
  169. }
  170. func get(name string) (*Plugin, error) {
  171. storage.Lock()
  172. pl, ok := storage.plugins[name]
  173. storage.Unlock()
  174. if ok {
  175. return pl, pl.activate()
  176. }
  177. return load(name)
  178. }
  179. // Get returns the plugin given the specified name and requested implementation.
  180. func Get(name, imp string) (*Plugin, error) {
  181. pl, err := get(name)
  182. if err != nil {
  183. return nil, err
  184. }
  185. if pl.implements(imp) {
  186. logrus.Debugf("%s implements: %s", name, imp)
  187. return pl, nil
  188. }
  189. return nil, ErrNotImplements
  190. }
  191. // Handle adds the specified function to the extpointHandlers.
  192. func Handle(iface string, fn func(string, *Client)) {
  193. extpointHandlers[iface] = fn
  194. }
  195. // GetAll returns all the plugins for the specified implementation
  196. func GetAll(imp string) ([]*Plugin, error) {
  197. pluginNames, err := Scan()
  198. if err != nil {
  199. return nil, err
  200. }
  201. type plLoad struct {
  202. pl *Plugin
  203. err error
  204. }
  205. chPl := make(chan *plLoad, len(pluginNames))
  206. var wg sync.WaitGroup
  207. for _, name := range pluginNames {
  208. if pl, ok := storage.plugins[name]; ok {
  209. chPl <- &plLoad{pl, nil}
  210. continue
  211. }
  212. wg.Add(1)
  213. go func(name string) {
  214. defer wg.Done()
  215. pl, err := loadWithRetry(name, false)
  216. chPl <- &plLoad{pl, err}
  217. }(name)
  218. }
  219. wg.Wait()
  220. close(chPl)
  221. var out []*Plugin
  222. for pl := range chPl {
  223. if pl.err != nil {
  224. logrus.Error(pl.err)
  225. continue
  226. }
  227. if pl.pl.implements(imp) {
  228. out = append(out, pl.pl)
  229. }
  230. }
  231. return out, nil
  232. }