plugins.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // Package plugins provides structures and helper functions to manage Docker
  2. // plugins.
  3. //
  4. // Docker discovers plugins by looking for them in the plugin directory whenever
  5. // a user or container tries to use one by name. UNIX domain socket files must
  6. // be located under /run/docker/plugins, whereas spec files can be located
  7. // either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
  8. // by the Registry interface, which lets you list all plugins or get a plugin by
  9. // its name if it exists.
  10. //
  11. // The plugins need to implement an HTTP server and bind this to the UNIX socket
  12. // or the address specified in the spec files.
  13. // A handshake is sent at /Plugin.Activate, and plugins are expected to return
  14. // a Manifest with a list of Docker subsystems which this plugin implements.
  15. //
  16. // In order to use a plugin, you can use ``Get`` with the name of the
  17. // plugin and the subsystem it implements.
  18. //
  19. // plugin, err := plugins.Get("example", "VolumeDriver")
  20. // if err != nil {
  21. // return fmt.Errorf("Error looking up volume plugin example: %v", err)
  22. // }
  23. package plugins
  24. import (
  25. "errors"
  26. "sync"
  27. "time"
  28. "github.com/Sirupsen/logrus"
  29. "github.com/docker/go-connections/tlsconfig"
  30. )
  31. var (
  32. // ErrNotImplements is returned if the plugin does not implement the requested driver.
  33. ErrNotImplements = errors.New("Plugin does not implement the requested driver")
  34. )
  35. type plugins struct {
  36. sync.Mutex
  37. plugins map[string]*Plugin
  38. }
  39. type extpointHandlers struct {
  40. sync.RWMutex
  41. extpointHandlers map[string][]func(string, *Client)
  42. }
  43. var (
  44. storage = plugins{plugins: make(map[string]*Plugin)}
  45. handlers = extpointHandlers{extpointHandlers: make(map[string][]func(string, *Client))}
  46. )
  47. // Manifest lists what a plugin implements.
  48. type Manifest struct {
  49. // List of subsystem the plugin implements.
  50. Implements []string
  51. }
  52. // Plugin is the definition of a docker plugin.
  53. type Plugin struct {
  54. // Name of the plugin
  55. name string
  56. // Address of the plugin
  57. Addr string
  58. // TLS configuration of the plugin
  59. TLSConfig *tlsconfig.Options
  60. // Client attached to the plugin
  61. client *Client
  62. // Manifest of the plugin (see above)
  63. Manifest *Manifest `json:"-"`
  64. // wait for activation to finish
  65. activateWait *sync.Cond
  66. // error produced by activation
  67. activateErr error
  68. // keeps track of callback handlers run against this plugin
  69. handlersRun bool
  70. }
  71. // BasePath returns the path to which all paths returned by the plugin are relative to.
  72. // For v1 plugins, this always returns the host's root directory.
  73. func (p *Plugin) BasePath() string {
  74. return "/"
  75. }
  76. // Name returns the name of the plugin.
  77. func (p *Plugin) Name() string {
  78. return p.name
  79. }
  80. // Client returns a ready-to-use plugin client that can be used to communicate with the plugin.
  81. func (p *Plugin) Client() *Client {
  82. return p.client
  83. }
  84. // IsV1 returns true for V1 plugins and false otherwise.
  85. func (p *Plugin) IsV1() bool {
  86. return true
  87. }
  88. // NewLocalPlugin creates a new local plugin.
  89. func NewLocalPlugin(name, addr string) *Plugin {
  90. return &Plugin{
  91. name: name,
  92. Addr: addr,
  93. // TODO: change to nil
  94. TLSConfig: &tlsconfig.Options{InsecureSkipVerify: true},
  95. activateWait: sync.NewCond(&sync.Mutex{}),
  96. }
  97. }
  98. func (p *Plugin) activate() error {
  99. p.activateWait.L.Lock()
  100. if p.activated() {
  101. p.runHandlers()
  102. p.activateWait.L.Unlock()
  103. return p.activateErr
  104. }
  105. p.activateErr = p.activateWithLock()
  106. p.runHandlers()
  107. p.activateWait.L.Unlock()
  108. p.activateWait.Broadcast()
  109. return p.activateErr
  110. }
  111. // runHandlers runs the registered handlers for the implemented plugin types
  112. // This should only be run after activation, and while the activation lock is held.
  113. func (p *Plugin) runHandlers() {
  114. if !p.activated() {
  115. return
  116. }
  117. handlers.RLock()
  118. if !p.handlersRun {
  119. for _, iface := range p.Manifest.Implements {
  120. hdlrs, handled := handlers.extpointHandlers[iface]
  121. if !handled {
  122. continue
  123. }
  124. for _, handler := range hdlrs {
  125. handler(p.name, p.client)
  126. }
  127. }
  128. p.handlersRun = true
  129. }
  130. handlers.RUnlock()
  131. }
  132. // activated returns if the plugin has already been activated.
  133. // This should only be called with the activation lock held
  134. func (p *Plugin) activated() bool {
  135. return p.Manifest != nil
  136. }
  137. func (p *Plugin) activateWithLock() error {
  138. c, err := NewClient(p.Addr, p.TLSConfig)
  139. if err != nil {
  140. return err
  141. }
  142. p.client = c
  143. m := new(Manifest)
  144. if err = p.client.Call("Plugin.Activate", nil, m); err != nil {
  145. return err
  146. }
  147. p.Manifest = m
  148. return nil
  149. }
  150. func (p *Plugin) waitActive() error {
  151. p.activateWait.L.Lock()
  152. for !p.activated() {
  153. p.activateWait.Wait()
  154. }
  155. p.activateWait.L.Unlock()
  156. return p.activateErr
  157. }
  158. func (p *Plugin) implements(kind string) bool {
  159. if p.Manifest == nil {
  160. return false
  161. }
  162. for _, driver := range p.Manifest.Implements {
  163. if driver == kind {
  164. return true
  165. }
  166. }
  167. return false
  168. }
  169. func load(name string) (*Plugin, error) {
  170. return loadWithRetry(name, true)
  171. }
  172. func loadWithRetry(name string, retry bool) (*Plugin, error) {
  173. registry := newLocalRegistry()
  174. start := time.Now()
  175. var retries int
  176. for {
  177. pl, err := registry.Plugin(name)
  178. if err != nil {
  179. if !retry {
  180. return nil, err
  181. }
  182. timeOff := backoff(retries)
  183. if abort(start, timeOff) {
  184. return nil, err
  185. }
  186. retries++
  187. logrus.Warnf("Unable to locate plugin: %s, retrying in %v", name, timeOff)
  188. time.Sleep(timeOff)
  189. continue
  190. }
  191. storage.Lock()
  192. storage.plugins[name] = pl
  193. storage.Unlock()
  194. err = pl.activate()
  195. if err != nil {
  196. storage.Lock()
  197. delete(storage.plugins, name)
  198. storage.Unlock()
  199. }
  200. return pl, err
  201. }
  202. }
  203. func get(name string) (*Plugin, error) {
  204. storage.Lock()
  205. pl, ok := storage.plugins[name]
  206. storage.Unlock()
  207. if ok {
  208. return pl, pl.activate()
  209. }
  210. return load(name)
  211. }
  212. // Get returns the plugin given the specified name and requested implementation.
  213. func Get(name, imp string) (*Plugin, error) {
  214. pl, err := get(name)
  215. if err != nil {
  216. return nil, err
  217. }
  218. if err := pl.waitActive(); err == nil && pl.implements(imp) {
  219. logrus.Debugf("%s implements: %s", name, imp)
  220. return pl, nil
  221. }
  222. return nil, ErrNotImplements
  223. }
  224. // Handle adds the specified function to the extpointHandlers.
  225. func Handle(iface string, fn func(string, *Client)) {
  226. handlers.Lock()
  227. hdlrs, ok := handlers.extpointHandlers[iface]
  228. if !ok {
  229. hdlrs = []func(string, *Client){}
  230. }
  231. hdlrs = append(hdlrs, fn)
  232. handlers.extpointHandlers[iface] = hdlrs
  233. storage.Lock()
  234. for _, p := range storage.plugins {
  235. p.activateWait.L.Lock()
  236. if p.activated() && p.implements(iface) {
  237. p.handlersRun = false
  238. }
  239. p.activateWait.L.Unlock()
  240. }
  241. storage.Unlock()
  242. handlers.Unlock()
  243. }
  244. // GetAll returns all the plugins for the specified implementation
  245. func GetAll(imp string) ([]*Plugin, error) {
  246. pluginNames, err := Scan()
  247. if err != nil {
  248. return nil, err
  249. }
  250. type plLoad struct {
  251. pl *Plugin
  252. err error
  253. }
  254. chPl := make(chan *plLoad, len(pluginNames))
  255. var wg sync.WaitGroup
  256. for _, name := range pluginNames {
  257. if pl, ok := storage.plugins[name]; ok {
  258. chPl <- &plLoad{pl, nil}
  259. continue
  260. }
  261. wg.Add(1)
  262. go func(name string) {
  263. defer wg.Done()
  264. pl, err := loadWithRetry(name, false)
  265. chPl <- &plLoad{pl, err}
  266. }(name)
  267. }
  268. wg.Wait()
  269. close(chPl)
  270. var out []*Plugin
  271. for pl := range chPl {
  272. if pl.err != nil {
  273. logrus.Error(pl.err)
  274. continue
  275. }
  276. if err := pl.pl.waitActive(); err == nil && pl.pl.implements(imp) {
  277. out = append(out, pl.pl)
  278. }
  279. }
  280. return out, nil
  281. }