plugins.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. // Package plugins provides structures and helper functions to manage Docker
  2. // plugins.
  3. //
  4. // Docker discovers plugins by looking for them in the plugin directory whenever
  5. // a user or container tries to use one by name. UNIX domain socket files must
  6. // be located under /run/docker/plugins, whereas spec files can be located
  7. // either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
  8. // by the Registry interface, which lets you list all plugins or get a plugin by
  9. // its name if it exists.
  10. //
  11. // The plugins need to implement an HTTP server and bind this to the UNIX socket
  12. // or the address specified in the spec files.
  13. // A handshake is send at /Plugin.Activate, and plugins are expected to return
  14. // a Manifest with a list of Docker subsystems which this plugin implements.
  15. //
  16. // In order to use a plugins, you can use the `Get` with the name of the
  17. // plugin and the subsystem it implements.
  18. //
  19. // plugin, err := plugins.Get("example", "VolumeDriver")
  20. // if err != nil {
  21. // return fmt.Errorf("Error looking up volume plugin example: %v", err)
  22. // }
  23. package plugins // import "github.com/docker/docker/pkg/plugins"
  24. import (
  25. "errors"
  26. "fmt"
  27. "sync"
  28. "time"
  29. "github.com/docker/go-connections/tlsconfig"
  30. "github.com/sirupsen/logrus"
  31. )
  32. // ProtocolSchemeHTTPV1 is the name of the protocol used for interacting with plugins using this package.
  33. const ProtocolSchemeHTTPV1 = "moby.plugins.http/v1"
  34. var (
  35. // ErrNotImplements is returned if the plugin does not implement the requested driver.
  36. ErrNotImplements = errors.New("Plugin does not implement the requested driver")
  37. )
  38. type plugins struct {
  39. sync.Mutex
  40. plugins map[string]*Plugin
  41. }
  42. type extpointHandlers struct {
  43. sync.RWMutex
  44. extpointHandlers map[string][]func(string, *Client)
  45. }
  46. var (
  47. storage = plugins{plugins: make(map[string]*Plugin)}
  48. handlers = extpointHandlers{extpointHandlers: make(map[string][]func(string, *Client))}
  49. )
  50. // Manifest lists what a plugin implements.
  51. type Manifest struct {
  52. // List of subsystem the plugin implements.
  53. Implements []string
  54. }
  55. // Plugin is the definition of a docker plugin.
  56. type Plugin struct {
  57. // Name of the plugin
  58. name string
  59. // Address of the plugin
  60. Addr string
  61. // TLS configuration of the plugin
  62. TLSConfig *tlsconfig.Options
  63. // Client attached to the plugin
  64. client *Client
  65. // Manifest of the plugin (see above)
  66. Manifest *Manifest `json:"-"`
  67. // wait for activation to finish
  68. activateWait *sync.Cond
  69. // error produced by activation
  70. activateErr error
  71. // keeps track of callback handlers run against this plugin
  72. handlersRun bool
  73. }
  74. // Name returns the name of the plugin.
  75. func (p *Plugin) Name() string {
  76. return p.name
  77. }
  78. // Client returns a ready-to-use plugin client that can be used to communicate with the plugin.
  79. func (p *Plugin) Client() *Client {
  80. return p.client
  81. }
  82. // Protocol returns the protocol name/version used for plugins in this package.
  83. func (p *Plugin) Protocol() string {
  84. return ProtocolSchemeHTTPV1
  85. }
  86. // IsV1 returns true for V1 plugins and false otherwise.
  87. func (p *Plugin) IsV1() bool {
  88. return true
  89. }
  90. // NewLocalPlugin creates a new local plugin.
  91. func NewLocalPlugin(name, addr string) *Plugin {
  92. return &Plugin{
  93. name: name,
  94. Addr: addr,
  95. // TODO: change to nil
  96. TLSConfig: &tlsconfig.Options{InsecureSkipVerify: true},
  97. activateWait: sync.NewCond(&sync.Mutex{}),
  98. }
  99. }
  100. func (p *Plugin) activate() error {
  101. p.activateWait.L.Lock()
  102. if p.activated() {
  103. p.runHandlers()
  104. p.activateWait.L.Unlock()
  105. return p.activateErr
  106. }
  107. p.activateErr = p.activateWithLock()
  108. p.runHandlers()
  109. p.activateWait.L.Unlock()
  110. p.activateWait.Broadcast()
  111. return p.activateErr
  112. }
  113. // runHandlers runs the registered handlers for the implemented plugin types
  114. // This should only be run after activation, and while the activation lock is held.
  115. func (p *Plugin) runHandlers() {
  116. if !p.activated() {
  117. return
  118. }
  119. handlers.RLock()
  120. if !p.handlersRun {
  121. for _, iface := range p.Manifest.Implements {
  122. hdlrs, handled := handlers.extpointHandlers[iface]
  123. if !handled {
  124. continue
  125. }
  126. for _, handler := range hdlrs {
  127. handler(p.name, p.client)
  128. }
  129. }
  130. p.handlersRun = true
  131. }
  132. handlers.RUnlock()
  133. }
  134. // activated returns if the plugin has already been activated.
  135. // This should only be called with the activation lock held
  136. func (p *Plugin) activated() bool {
  137. return p.Manifest != nil
  138. }
  139. func (p *Plugin) activateWithLock() error {
  140. c, err := NewClient(p.Addr, p.TLSConfig)
  141. if err != nil {
  142. return err
  143. }
  144. p.client = c
  145. m := new(Manifest)
  146. if err = p.client.Call("Plugin.Activate", nil, m); err != nil {
  147. return err
  148. }
  149. p.Manifest = m
  150. return nil
  151. }
  152. func (p *Plugin) waitActive() error {
  153. p.activateWait.L.Lock()
  154. for !p.activated() && p.activateErr == nil {
  155. p.activateWait.Wait()
  156. }
  157. p.activateWait.L.Unlock()
  158. return p.activateErr
  159. }
  160. func (p *Plugin) implements(kind string) bool {
  161. if p.Manifest == nil {
  162. return false
  163. }
  164. for _, driver := range p.Manifest.Implements {
  165. if driver == kind {
  166. return true
  167. }
  168. }
  169. return false
  170. }
  171. func load(name string) (*Plugin, error) {
  172. return loadWithRetry(name, true)
  173. }
  174. func loadWithRetry(name string, retry bool) (*Plugin, error) {
  175. registry := newLocalRegistry()
  176. start := time.Now()
  177. var retries int
  178. for {
  179. pl, err := registry.Plugin(name)
  180. if err != nil {
  181. if !retry {
  182. return nil, err
  183. }
  184. timeOff := backoff(retries)
  185. if abort(start, timeOff) {
  186. return nil, err
  187. }
  188. retries++
  189. logrus.Warnf("Unable to locate plugin: %s, retrying in %v", name, timeOff)
  190. time.Sleep(timeOff)
  191. continue
  192. }
  193. storage.Lock()
  194. if pl, exists := storage.plugins[name]; exists {
  195. storage.Unlock()
  196. return pl, pl.activate()
  197. }
  198. storage.plugins[name] = pl
  199. storage.Unlock()
  200. err = pl.activate()
  201. if err != nil {
  202. storage.Lock()
  203. delete(storage.plugins, name)
  204. storage.Unlock()
  205. }
  206. return pl, err
  207. }
  208. }
  209. func get(name string) (*Plugin, error) {
  210. storage.Lock()
  211. pl, ok := storage.plugins[name]
  212. storage.Unlock()
  213. if ok {
  214. return pl, pl.activate()
  215. }
  216. return load(name)
  217. }
  218. // Get returns the plugin given the specified name and requested implementation.
  219. func Get(name, imp string) (*Plugin, error) {
  220. if name == "" {
  221. return nil, errors.New("Unable to find plugin without name")
  222. }
  223. pl, err := get(name)
  224. if err != nil {
  225. return nil, err
  226. }
  227. if err := pl.waitActive(); err == nil && pl.implements(imp) {
  228. logrus.Debugf("%s implements: %s", name, imp)
  229. return pl, nil
  230. }
  231. return nil, fmt.Errorf("%w: plugin=%q, requested implementation=%q", ErrNotImplements, name, imp)
  232. }
  233. // Handle adds the specified function to the extpointHandlers.
  234. func Handle(iface string, fn func(string, *Client)) {
  235. handlers.Lock()
  236. hdlrs, ok := handlers.extpointHandlers[iface]
  237. if !ok {
  238. hdlrs = []func(string, *Client){}
  239. }
  240. hdlrs = append(hdlrs, fn)
  241. handlers.extpointHandlers[iface] = hdlrs
  242. storage.Lock()
  243. for _, p := range storage.plugins {
  244. p.activateWait.L.Lock()
  245. if p.activated() && p.implements(iface) {
  246. p.handlersRun = false
  247. }
  248. p.activateWait.L.Unlock()
  249. }
  250. storage.Unlock()
  251. handlers.Unlock()
  252. }
  253. // GetAll returns all the plugins for the specified implementation
  254. func GetAll(imp string) ([]*Plugin, error) {
  255. pluginNames, err := Scan()
  256. if err != nil {
  257. return nil, err
  258. }
  259. type plLoad struct {
  260. pl *Plugin
  261. err error
  262. }
  263. chPl := make(chan *plLoad, len(pluginNames))
  264. var wg sync.WaitGroup
  265. for _, name := range pluginNames {
  266. storage.Lock()
  267. pl, ok := storage.plugins[name]
  268. storage.Unlock()
  269. if ok {
  270. chPl <- &plLoad{pl, nil}
  271. continue
  272. }
  273. wg.Add(1)
  274. go func(name string) {
  275. defer wg.Done()
  276. pl, err := loadWithRetry(name, false)
  277. chPl <- &plLoad{pl, err}
  278. }(name)
  279. }
  280. wg.Wait()
  281. close(chPl)
  282. var out []*Plugin
  283. for pl := range chPl {
  284. if pl.err != nil {
  285. logrus.Error(pl.err)
  286. continue
  287. }
  288. if err := pl.pl.waitActive(); err == nil && pl.pl.implements(imp) {
  289. out = append(out, pl.pl)
  290. }
  291. }
  292. return out, nil
  293. }