plugins.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Package plugins provides structures and helper functions to manage Docker
  2. // plugins.
  3. //
  4. // Docker discovers plugins by looking for them in the plugin directory whenever
  5. // a user or container tries to use one by name. UNIX domain socket files must
  6. // be located under /run/docker/plugins, whereas spec files can be located
  7. // either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
  8. // by the Registry interface, which lets you list all plugins or get a plugin by
  9. // its name if it exists.
  10. //
  11. // The plugins need to implement an HTTP server and bind this to the UNIX socket
  12. // or the address specified in the spec files.
  13. // A handshake is send at /Plugin.Activate, and plugins are expected to return
  14. // a Manifest with a list of Docker subsystems which this plugin implements.
  15. //
  16. // In order to use a plugins, you can use the `Get` with the name of the
  17. // plugin and the subsystem it implements.
  18. //
  19. // plugin, err := plugins.Get("example", "VolumeDriver")
  20. // if err != nil {
  21. // return fmt.Errorf("Error looking up volume plugin example: %v", err)
  22. // }
  23. package plugins // import "github.com/docker/docker/pkg/plugins"
  24. import (
  25. "context"
  26. "errors"
  27. "fmt"
  28. "sync"
  29. "time"
  30. "github.com/containerd/containerd/log"
  31. "github.com/docker/go-connections/tlsconfig"
  32. )
  33. // ProtocolSchemeHTTPV1 is the name of the protocol used for interacting with plugins using this package.
  34. const ProtocolSchemeHTTPV1 = "moby.plugins.http/v1"
  35. // ErrNotImplements is returned if the plugin does not implement the requested driver.
  36. var ErrNotImplements = errors.New("Plugin does not implement the requested driver")
  37. type plugins struct {
  38. sync.Mutex
  39. plugins map[string]*Plugin
  40. }
  41. type extpointHandlers struct {
  42. sync.RWMutex
  43. extpointHandlers map[string][]func(string, *Client)
  44. }
  45. var (
  46. storage = plugins{plugins: make(map[string]*Plugin)}
  47. handlers = extpointHandlers{extpointHandlers: make(map[string][]func(string, *Client))}
  48. )
  49. // Manifest lists what a plugin implements.
  50. type Manifest struct {
  51. // List of subsystem the plugin implements.
  52. Implements []string
  53. }
  54. // Plugin is the definition of a docker plugin.
  55. type Plugin struct {
  56. // Name of the plugin
  57. name string
  58. // Address of the plugin
  59. Addr string
  60. // TLS configuration of the plugin
  61. TLSConfig *tlsconfig.Options
  62. // Client attached to the plugin
  63. client *Client
  64. // Manifest of the plugin (see above)
  65. Manifest *Manifest `json:"-"`
  66. // wait for activation to finish
  67. activateWait *sync.Cond
  68. // error produced by activation
  69. activateErr error
  70. // keeps track of callback handlers run against this plugin
  71. handlersRun bool
  72. }
  73. // Name returns the name of the plugin.
  74. func (p *Plugin) Name() string {
  75. return p.name
  76. }
  77. // Client returns a ready-to-use plugin client that can be used to communicate with the plugin.
  78. func (p *Plugin) Client() *Client {
  79. return p.client
  80. }
  81. // Protocol returns the protocol name/version used for plugins in this package.
  82. func (p *Plugin) Protocol() string {
  83. return ProtocolSchemeHTTPV1
  84. }
  85. // IsV1 returns true for V1 plugins and false otherwise.
  86. func (p *Plugin) IsV1() bool {
  87. return true
  88. }
  89. // ScopedPath returns the path scoped to the plugin's rootfs.
  90. // For v1 plugins, this always returns the path unchanged as v1 plugins run directly on the host.
  91. func (p *Plugin) ScopedPath(s string) string {
  92. return s
  93. }
  94. // NewLocalPlugin creates a new local plugin.
  95. func NewLocalPlugin(name, addr string) *Plugin {
  96. return &Plugin{
  97. name: name,
  98. Addr: addr,
  99. // TODO: change to nil
  100. TLSConfig: &tlsconfig.Options{InsecureSkipVerify: true},
  101. activateWait: sync.NewCond(&sync.Mutex{}),
  102. }
  103. }
  104. func (p *Plugin) activate() error {
  105. p.activateWait.L.Lock()
  106. if p.activated() {
  107. p.runHandlers()
  108. p.activateWait.L.Unlock()
  109. return p.activateErr
  110. }
  111. p.activateErr = p.activateWithLock()
  112. p.runHandlers()
  113. p.activateWait.L.Unlock()
  114. p.activateWait.Broadcast()
  115. return p.activateErr
  116. }
  117. // runHandlers runs the registered handlers for the implemented plugin types
  118. // This should only be run after activation, and while the activation lock is held.
  119. func (p *Plugin) runHandlers() {
  120. if !p.activated() {
  121. return
  122. }
  123. handlers.RLock()
  124. if !p.handlersRun {
  125. for _, iface := range p.Manifest.Implements {
  126. hdlrs, handled := handlers.extpointHandlers[iface]
  127. if !handled {
  128. continue
  129. }
  130. for _, handler := range hdlrs {
  131. handler(p.name, p.client)
  132. }
  133. }
  134. p.handlersRun = true
  135. }
  136. handlers.RUnlock()
  137. }
  138. // activated returns if the plugin has already been activated.
  139. // This should only be called with the activation lock held
  140. func (p *Plugin) activated() bool {
  141. return p.Manifest != nil
  142. }
  143. func (p *Plugin) activateWithLock() error {
  144. c, err := NewClient(p.Addr, p.TLSConfig)
  145. if err != nil {
  146. return err
  147. }
  148. p.client = c
  149. m := new(Manifest)
  150. if err = p.client.Call("Plugin.Activate", nil, m); err != nil {
  151. return err
  152. }
  153. p.Manifest = m
  154. return nil
  155. }
  156. func (p *Plugin) waitActive() error {
  157. p.activateWait.L.Lock()
  158. for !p.activated() && p.activateErr == nil {
  159. p.activateWait.Wait()
  160. }
  161. p.activateWait.L.Unlock()
  162. return p.activateErr
  163. }
  164. func (p *Plugin) implements(kind string) bool {
  165. if p.Manifest == nil {
  166. return false
  167. }
  168. for _, driver := range p.Manifest.Implements {
  169. if driver == kind {
  170. return true
  171. }
  172. }
  173. return false
  174. }
  175. func loadWithRetry(name string, retry bool) (*Plugin, error) {
  176. registry := NewLocalRegistry()
  177. start := time.Now()
  178. var testTimeOut int
  179. if name == testNonExistingPlugin {
  180. // override the timeout in tests
  181. testTimeOut = 2
  182. }
  183. var retries int
  184. for {
  185. pl, err := registry.Plugin(name)
  186. if err != nil {
  187. if !retry {
  188. return nil, err
  189. }
  190. timeOff := backoff(retries)
  191. if abort(start, timeOff, testTimeOut) {
  192. return nil, err
  193. }
  194. retries++
  195. log.G(context.TODO()).Warnf("Unable to locate plugin: %s, retrying in %v", name, timeOff)
  196. time.Sleep(timeOff)
  197. continue
  198. }
  199. storage.Lock()
  200. if pl, exists := storage.plugins[name]; exists {
  201. storage.Unlock()
  202. return pl, pl.activate()
  203. }
  204. storage.plugins[name] = pl
  205. storage.Unlock()
  206. err = pl.activate()
  207. if err != nil {
  208. storage.Lock()
  209. delete(storage.plugins, name)
  210. storage.Unlock()
  211. }
  212. return pl, err
  213. }
  214. }
  215. func get(name string) (*Plugin, error) {
  216. storage.Lock()
  217. pl, ok := storage.plugins[name]
  218. storage.Unlock()
  219. if ok {
  220. return pl, pl.activate()
  221. }
  222. return loadWithRetry(name, true)
  223. }
  224. // Get returns the plugin given the specified name and requested implementation.
  225. func Get(name, imp string) (*Plugin, error) {
  226. if name == "" {
  227. return nil, errors.New("Unable to find plugin without name")
  228. }
  229. pl, err := get(name)
  230. if err != nil {
  231. return nil, err
  232. }
  233. if err := pl.waitActive(); err == nil && pl.implements(imp) {
  234. log.G(context.TODO()).Debugf("%s implements: %s", name, imp)
  235. return pl, nil
  236. }
  237. return nil, fmt.Errorf("%w: plugin=%q, requested implementation=%q", ErrNotImplements, name, imp)
  238. }
  239. // Handle adds the specified function to the extpointHandlers.
  240. func Handle(iface string, fn func(string, *Client)) {
  241. handlers.Lock()
  242. hdlrs, ok := handlers.extpointHandlers[iface]
  243. if !ok {
  244. hdlrs = []func(string, *Client){}
  245. }
  246. hdlrs = append(hdlrs, fn)
  247. handlers.extpointHandlers[iface] = hdlrs
  248. storage.Lock()
  249. for _, p := range storage.plugins {
  250. p.activateWait.L.Lock()
  251. if p.activated() && p.implements(iface) {
  252. p.handlersRun = false
  253. }
  254. p.activateWait.L.Unlock()
  255. }
  256. storage.Unlock()
  257. handlers.Unlock()
  258. }
  259. // GetAll returns all the plugins for the specified implementation
  260. func (l *LocalRegistry) GetAll(imp string) ([]*Plugin, error) {
  261. pluginNames, err := l.Scan()
  262. if err != nil {
  263. return nil, err
  264. }
  265. type plLoad struct {
  266. pl *Plugin
  267. err error
  268. }
  269. chPl := make(chan *plLoad, len(pluginNames))
  270. var wg sync.WaitGroup
  271. for _, name := range pluginNames {
  272. storage.Lock()
  273. pl, ok := storage.plugins[name]
  274. storage.Unlock()
  275. if ok {
  276. chPl <- &plLoad{pl, nil}
  277. continue
  278. }
  279. wg.Add(1)
  280. go func(name string) {
  281. defer wg.Done()
  282. pl, err := loadWithRetry(name, false)
  283. chPl <- &plLoad{pl, err}
  284. }(name)
  285. }
  286. wg.Wait()
  287. close(chPl)
  288. var out []*Plugin
  289. for pl := range chPl {
  290. if pl.err != nil {
  291. log.G(context.TODO()).Error(pl.err)
  292. continue
  293. }
  294. if err := pl.pl.waitActive(); err == nil && pl.pl.implements(imp) {
  295. out = append(out, pl.pl)
  296. }
  297. }
  298. return out, nil
  299. }