Просмотр исходного кода

Merge pull request #13222 from calavera/plugins_infra

Remote plugins plumbing.
Michael Crosby 10 лет назад
Родитель
Сommit
115b11ae69
5 измененных файлов с 449 добавлено и 0 удалено
  1. 100 0
      pkg/plugins/client.go
  2. 63 0
      pkg/plugins/client_test.go
  3. 78 0
      pkg/plugins/discovery.go
  4. 108 0
      pkg/plugins/discovery_test.go
  5. 100 0
      pkg/plugins/plugins.go

+ 100 - 0
pkg/plugins/client.go

@@ -0,0 +1,100 @@
+package plugins
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+)
+
+const (
+	versionMimetype = "appplication/vnd.docker.plugins.v1+json"
+	defaultTimeOut  = 30
+)
+
+func NewClient(addr string) *Client {
+	tr := &http.Transport{}
+	protoAndAddr := strings.Split(addr, "://")
+	configureTCPTransport(tr, protoAndAddr[0], protoAndAddr[1])
+	return &Client{&http.Client{Transport: tr}, protoAndAddr[1]}
+}
+
+type Client struct {
+	http *http.Client
+	addr string
+}
+
+func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(args); err != nil {
+		return err
+	}
+
+	req, err := http.NewRequest("POST", "/"+serviceMethod, &buf)
+	if err != nil {
+		return err
+	}
+	req.Header.Add("Accept", versionMimetype)
+	req.URL.Scheme = "http"
+	req.URL.Host = c.addr
+
+	var retries int
+	start := time.Now()
+
+	for {
+		resp, err := c.http.Do(req)
+		if err != nil {
+			timeOff := backoff(retries)
+			if timeOff+time.Since(start) > defaultTimeOut {
+				return err
+			}
+			retries++
+			logrus.Warn("Unable to connect to plugin: %s, retrying in %ds\n", c.addr, timeOff)
+			time.Sleep(timeOff)
+			continue
+		}
+
+		if resp.StatusCode != http.StatusOK {
+			remoteErr, err := ioutil.ReadAll(resp.Body)
+			if err != nil {
+				return nil
+			}
+			return fmt.Errorf("Plugin Error: %s", remoteErr)
+		}
+
+		return json.NewDecoder(resp.Body).Decode(&ret)
+	}
+}
+
+func backoff(retries int) time.Duration {
+	b, max := float64(1), float64(defaultTimeOut)
+	for b < max && retries > 0 {
+		b *= 2
+		retries--
+	}
+	if b > max {
+		b = max
+	}
+	return time.Duration(b)
+}
+
+func configureTCPTransport(tr *http.Transport, proto, addr string) {
+	// Why 32? See https://github.com/docker/docker/pull/8035.
+	timeout := 32 * time.Second
+	if proto == "unix" {
+		// No need for compression in local communications.
+		tr.DisableCompression = true
+		tr.Dial = func(_, _ string) (net.Conn, error) {
+			return net.DialTimeout(proto, addr, timeout)
+		}
+	} else {
+		tr.Proxy = http.ProxyFromEnvironment
+		tr.Dial = (&net.Dialer{Timeout: timeout}).Dial
+	}
+}

+ 63 - 0
pkg/plugins/client_test.go

@@ -0,0 +1,63 @@
+package plugins
+
+import (
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"testing"
+)
+
+var (
+	mux    *http.ServeMux
+	server *httptest.Server
+)
+
+func setupRemotePluginServer() string {
+	mux = http.NewServeMux()
+	server = httptest.NewServer(mux)
+	return server.URL
+}
+
+func teardownRemotePluginServer() {
+	if server != nil {
+		server.Close()
+	}
+}
+
+func TestFailedConnection(t *testing.T) {
+	c := NewClient("tcp://127.0.0.1:1")
+	err := c.Call("Service.Method", nil, nil)
+	if err == nil {
+		t.Fatal("Unexpected successful connection")
+	}
+}
+
+func TestEchoInputOutput(t *testing.T) {
+	addr := setupRemotePluginServer()
+	defer teardownRemotePluginServer()
+
+	m := Manifest{[]string{"VolumeDriver", "NetworkDriver"}}
+
+	mux.HandleFunc("/Test.Echo", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != "POST" {
+			t.Fatalf("Expected POST, got %s\n", r.Method)
+		}
+
+		header := w.Header()
+		header.Set("Content-Type", versionMimetype)
+
+		io.Copy(w, r.Body)
+	})
+
+	c := NewClient(addr)
+	var output Manifest
+	err := c.Call("Test.Echo", m, &output)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !reflect.DeepEqual(output, m) {
+		t.Fatalf("Expected %v, was %v\n", m, output)
+	}
+}

+ 78 - 0
pkg/plugins/discovery.go

@@ -0,0 +1,78 @@
+package plugins
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/url"
+	"os"
+	"path/filepath"
+	"strings"
+)
+
+const defaultLocalRegistry = "/usr/share/docker/plugins"
+
+var (
+	ErrNotFound = errors.New("Plugin not found")
+)
+
+type Registry interface {
+	Plugins() ([]*Plugin, error)
+	Plugin(name string) (*Plugin, error)
+}
+
+type LocalRegistry struct {
+	path string
+}
+
+func newLocalRegistry(path string) *LocalRegistry {
+	if len(path) == 0 {
+		path = defaultLocalRegistry
+	}
+
+	return &LocalRegistry{path}
+}
+
+func (l *LocalRegistry) Plugin(name string) (*Plugin, error) {
+	filepath := filepath.Join(l.path, name)
+	specpath := filepath + ".spec"
+	if fi, err := os.Stat(specpath); err == nil {
+		return readPluginInfo(specpath, fi)
+	}
+	socketpath := filepath + ".sock"
+	if fi, err := os.Stat(socketpath); err == nil {
+		return readPluginInfo(socketpath, fi)
+	}
+	return nil, ErrNotFound
+}
+
+func readPluginInfo(path string, fi os.FileInfo) (*Plugin, error) {
+	name := strings.Split(fi.Name(), ".")[0]
+
+	if fi.Mode()&os.ModeSocket != 0 {
+		return &Plugin{
+			Name: name,
+			Addr: "unix://" + path,
+		}, nil
+	}
+
+	content, err := ioutil.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+	addr := strings.TrimSpace(string(content))
+
+	u, err := url.Parse(addr)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(u.Scheme) == 0 {
+		return nil, fmt.Errorf("Unknown protocol")
+	}
+
+	return &Plugin{
+		Name: name,
+		Addr: addr,
+	}, nil
+}

+ 108 - 0
pkg/plugins/discovery_test.go

@@ -0,0 +1,108 @@
+package plugins
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net"
+	"os"
+	"path"
+	"path/filepath"
+	"reflect"
+	"testing"
+)
+
+func TestUnknownLocalPath(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "docker-test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tmpdir)
+
+	l := newLocalRegistry(filepath.Join(tmpdir, "unknown"))
+	_, err = l.Plugin("foo")
+	if err == nil || err != ErrNotFound {
+		t.Fatalf("Expected error for unknown directory")
+	}
+}
+
+func TestLocalSocket(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "docker-test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tmpdir)
+	l, err := net.Listen("unix", filepath.Join(tmpdir, "echo.sock"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Close()
+
+	r := newLocalRegistry(tmpdir)
+	p, err := r.Plugin("echo")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pp, err := r.Plugin("echo")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(p, pp) {
+		t.Fatalf("Expected %v, was %v\n", p, pp)
+	}
+
+	if p.Name != "echo" {
+		t.Fatalf("Expected plugin `echo`, got %s\n", p.Name)
+	}
+
+	addr := fmt.Sprintf("unix://%s/echo.sock", tmpdir)
+	if p.Addr != addr {
+		t.Fatalf("Expected plugin addr `%s`, got %s\n", addr, p.Addr)
+	}
+}
+
+func TestFileSpecPlugin(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "docker-test")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cases := []struct {
+		path string
+		name string
+		addr string
+		fail bool
+	}{
+		{filepath.Join(tmpdir, "echo.spec"), "echo", "unix://var/lib/docker/plugins/echo.sock", false},
+		{filepath.Join(tmpdir, "foo.spec"), "foo", "tcp://localhost:8080", false},
+		{filepath.Join(tmpdir, "bar.spec"), "bar", "localhost:8080", true}, // unknown transport
+	}
+
+	for _, c := range cases {
+		if err = os.MkdirAll(path.Dir(c.path), 0755); err != nil {
+			t.Fatal(err)
+		}
+		if err = ioutil.WriteFile(c.path, []byte(c.addr), 0644); err != nil {
+			t.Fatal(err)
+		}
+
+		r := newLocalRegistry(tmpdir)
+		p, err := r.Plugin(c.name)
+		if c.fail && err == nil {
+			continue
+		}
+
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if p.Name != c.name {
+			t.Fatalf("Expected plugin `%s`, got %s\n", c.name, p.Name)
+		}
+
+		if p.Addr != c.addr {
+			t.Fatalf("Expected plugin addr `%s`, got %s\n", c.addr, p.Addr)
+		}
+		os.Remove(c.path)
+	}
+}

+ 100 - 0
pkg/plugins/plugins.go

@@ -0,0 +1,100 @@
+package plugins
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/Sirupsen/logrus"
+)
+
+var (
+	ErrNotImplements = errors.New("Plugin does not implement the requested driver")
+)
+
+type plugins struct {
+	sync.Mutex
+	plugins map[string]*Plugin
+}
+
+var (
+	storage          = plugins{plugins: make(map[string]*Plugin)}
+	extpointHandlers = make(map[string]func(string, *Client))
+)
+
+type Manifest struct {
+	Implements []string
+}
+
+type Plugin struct {
+	Name     string
+	Addr     string
+	Client   *Client
+	Manifest *Manifest
+}
+
+func (p *Plugin) activate() error {
+	m := new(Manifest)
+	p.Client = NewClient(p.Addr)
+	err := p.Client.Call("Plugin.Activate", nil, m)
+	if err != nil {
+		return err
+	}
+
+	logrus.Debugf("%s's manifest: %v", p.Name, m)
+	p.Manifest = m
+	for _, iface := range m.Implements {
+		handler, handled := extpointHandlers[iface]
+		if !handled {
+			continue
+		}
+		handler(p.Name, p.Client)
+	}
+	return nil
+}
+
+func load(name string) (*Plugin, error) {
+	registry := newLocalRegistry("")
+	pl, err := registry.Plugin(name)
+	if err != nil {
+		return nil, err
+	}
+	if err := pl.activate(); err != nil {
+		return nil, err
+	}
+	return pl, nil
+}
+
+func get(name string) (*Plugin, error) {
+	storage.Lock()
+	defer storage.Unlock()
+	pl, ok := storage.plugins[name]
+	if ok {
+		return pl, nil
+	}
+	pl, err := load(name)
+	if err != nil {
+		return nil, err
+	}
+
+	logrus.Debugf("Plugin: %v", pl)
+	storage.plugins[name] = pl
+	return pl, nil
+}
+
+func Get(name, imp string) (*Plugin, error) {
+	pl, err := get(name)
+	if err != nil {
+		return nil, err
+	}
+	for _, driver := range pl.Manifest.Implements {
+		logrus.Debugf("%s implements: %s", name, driver)
+		if driver == imp {
+			return pl, nil
+		}
+	}
+	return nil, ErrNotImplements
+}
+
+func Handle(iface string, fn func(string, *Client)) {
+	extpointHandlers[iface] = fn
+}