123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- // Package plugins provides structures and helper functions to manage Docker
- // plugins.
- //
- // Docker discovers plugins by looking for them in the plugin directory whenever
- // a user or container tries to use one by name. UNIX domain socket files must
- // be located under /run/docker/plugins, whereas spec files can be located
- // either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
- // by the Registry interface, which lets you list all plugins or get a plugin by
- // its name if it exists.
- //
- // The plugins need to implement an HTTP server and bind this to the UNIX socket
- // or the address specified in the spec files.
- // A handshake is sent at /Plugin.Activate, and plugins are expected to return
- // a Manifest with a list of Docker subsystems which this plugin implements.
- //
- // In order to use a plugin, call Get with the name of the plugin and the
- // subsystem it implements.
- //
- // plugin, err := plugins.Get("example", "VolumeDriver")
- // if err != nil {
- // return fmt.Errorf("Error looking up volume plugin example: %v", err)
- // }
- package plugins // import "github.com/docker/docker/pkg/plugins"
- import (
- "errors"
- "sync"
- "time"
- "github.com/docker/go-connections/tlsconfig"
- "github.com/sirupsen/logrus"
- )
// ProtocolSchemeHTTPV1 names the protocol this package uses to talk to
// plugins; it is the value reported by (*Plugin).Protocol.
const (
	ProtocolSchemeHTTPV1 = "moby.plugins.http/v1"
)
// ErrNotImplements is the sentinel error reported when a plugin does not
// implement the driver that was asked for.
var ErrNotImplements = errors.New("Plugin does not implement the requested driver")
- type plugins struct {
- sync.Mutex
- plugins map[string]*Plugin
- }
- type extpointHandlers struct {
- sync.RWMutex
- extpointHandlers map[string][]func(string, *Client)
- }
- var (
- storage = plugins{plugins: make(map[string]*Plugin)}
- handlers = extpointHandlers{extpointHandlers: make(map[string][]func(string, *Client))}
- )
// Manifest describes the capabilities a plugin declares.
type Manifest struct {
	// Implements is the list of subsystems the plugin implements.
	Implements []string
}
- // Plugin is the definition of a docker plugin.
- type Plugin struct {
- // Name of the plugin
- name string
- // Address of the plugin
- Addr string
- // TLS configuration of the plugin
- TLSConfig *tlsconfig.Options
- // Client attached to the plugin
- client *Client
- // Manifest of the plugin (see above)
- Manifest *Manifest `json:"-"`
- // wait for activation to finish
- activateWait *sync.Cond
- // error produced by activation
- activateErr error
- // keeps track of callback handlers run against this plugin
- handlersRun bool
- }
- // Name returns the name of the plugin.
- func (p *Plugin) Name() string {
- return p.name
- }
- // Client returns a ready-to-use plugin client that can be used to communicate with the plugin.
- func (p *Plugin) Client() *Client {
- return p.client
- }
- // Protocol returns the protocol name/version used for plugins in this package.
- func (p *Plugin) Protocol() string {
- return ProtocolSchemeHTTPV1
- }
- // IsV1 returns true for V1 plugins and false otherwise.
- func (p *Plugin) IsV1() bool {
- return true
- }
- // NewLocalPlugin creates a new local plugin.
- func NewLocalPlugin(name, addr string) *Plugin {
- return &Plugin{
- name: name,
- Addr: addr,
- // TODO: change to nil
- TLSConfig: &tlsconfig.Options{InsecureSkipVerify: true},
- activateWait: sync.NewCond(&sync.Mutex{}),
- }
- }
- func (p *Plugin) activate() error {
- p.activateWait.L.Lock()
- if p.activated() {
- p.runHandlers()
- p.activateWait.L.Unlock()
- return p.activateErr
- }
- p.activateErr = p.activateWithLock()
- p.runHandlers()
- p.activateWait.L.Unlock()
- p.activateWait.Broadcast()
- return p.activateErr
- }
- // runHandlers runs the registered handlers for the implemented plugin types
- // This should only be run after activation, and while the activation lock is held.
- func (p *Plugin) runHandlers() {
- if !p.activated() {
- return
- }
- handlers.RLock()
- if !p.handlersRun {
- for _, iface := range p.Manifest.Implements {
- hdlrs, handled := handlers.extpointHandlers[iface]
- if !handled {
- continue
- }
- for _, handler := range hdlrs {
- handler(p.name, p.client)
- }
- }
- p.handlersRun = true
- }
- handlers.RUnlock()
- }
- // activated returns if the plugin has already been activated.
- // This should only be called with the activation lock held
- func (p *Plugin) activated() bool {
- return p.Manifest != nil
- }
- func (p *Plugin) activateWithLock() error {
- c, err := NewClient(p.Addr, p.TLSConfig)
- if err != nil {
- return err
- }
- p.client = c
- m := new(Manifest)
- if err = p.client.Call("Plugin.Activate", nil, m); err != nil {
- return err
- }
- p.Manifest = m
- return nil
- }
- func (p *Plugin) waitActive() error {
- p.activateWait.L.Lock()
- for !p.activated() && p.activateErr == nil {
- p.activateWait.Wait()
- }
- p.activateWait.L.Unlock()
- return p.activateErr
- }
- func (p *Plugin) implements(kind string) bool {
- if p.Manifest == nil {
- return false
- }
- for _, driver := range p.Manifest.Implements {
- if driver == kind {
- return true
- }
- }
- return false
- }
- func load(name string) (*Plugin, error) {
- return loadWithRetry(name, true)
- }
- func loadWithRetry(name string, retry bool) (*Plugin, error) {
- registry := newLocalRegistry()
- start := time.Now()
- var retries int
- for {
- pl, err := registry.Plugin(name)
- if err != nil {
- if !retry {
- return nil, err
- }
- timeOff := backoff(retries)
- if abort(start, timeOff) {
- return nil, err
- }
- retries++
- logrus.Warnf("Unable to locate plugin: %s, retrying in %v", name, timeOff)
- time.Sleep(timeOff)
- continue
- }
- storage.Lock()
- if pl, exists := storage.plugins[name]; exists {
- storage.Unlock()
- return pl, pl.activate()
- }
- storage.plugins[name] = pl
- storage.Unlock()
- err = pl.activate()
- if err != nil {
- storage.Lock()
- delete(storage.plugins, name)
- storage.Unlock()
- }
- return pl, err
- }
- }
- func get(name string) (*Plugin, error) {
- storage.Lock()
- pl, ok := storage.plugins[name]
- storage.Unlock()
- if ok {
- return pl, pl.activate()
- }
- return load(name)
- }
- // Get returns the plugin given the specified name and requested implementation.
- func Get(name, imp string) (*Plugin, error) {
- pl, err := get(name)
- if err != nil {
- return nil, err
- }
- if err := pl.waitActive(); err == nil && pl.implements(imp) {
- logrus.Debugf("%s implements: %s", name, imp)
- return pl, nil
- }
- return nil, ErrNotImplements
- }
- // Handle adds the specified function to the extpointHandlers.
- func Handle(iface string, fn func(string, *Client)) {
- handlers.Lock()
- hdlrs, ok := handlers.extpointHandlers[iface]
- if !ok {
- hdlrs = []func(string, *Client){}
- }
- hdlrs = append(hdlrs, fn)
- handlers.extpointHandlers[iface] = hdlrs
- storage.Lock()
- for _, p := range storage.plugins {
- p.activateWait.L.Lock()
- if p.activated() && p.implements(iface) {
- p.handlersRun = false
- }
- p.activateWait.L.Unlock()
- }
- storage.Unlock()
- handlers.Unlock()
- }
- // GetAll returns all the plugins for the specified implementation
- func GetAll(imp string) ([]*Plugin, error) {
- pluginNames, err := Scan()
- if err != nil {
- return nil, err
- }
- type plLoad struct {
- pl *Plugin
- err error
- }
- chPl := make(chan *plLoad, len(pluginNames))
- var wg sync.WaitGroup
- for _, name := range pluginNames {
- storage.Lock()
- pl, ok := storage.plugins[name]
- storage.Unlock()
- if ok {
- chPl <- &plLoad{pl, nil}
- continue
- }
- wg.Add(1)
- go func(name string) {
- defer wg.Done()
- pl, err := loadWithRetry(name, false)
- chPl <- &plLoad{pl, err}
- }(name)
- }
- wg.Wait()
- close(chPl)
- var out []*Plugin
- for pl := range chPl {
- if pl.err != nil {
- logrus.Error(pl.err)
- continue
- }
- if err := pl.pl.waitActive(); err == nil && pl.pl.implements(imp) {
- out = append(out, pl.pl)
- }
- }
- return out, nil
- }
|