@@ -3,37 +3,39 @@
package plugin

import (
-    "bytes"
+    "archive/tar"
+    "compress/gzip"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "os"
+    "path"
    "path/filepath"
-    "reflect"
-    "regexp"
+    "strings"

    "github.com/Sirupsen/logrus"
+    "github.com/docker/distribution/digest"
+    "github.com/docker/distribution/manifest/schema2"
    "github.com/docker/docker/api/types"
-    "github.com/docker/docker/pkg/archive"
+    "github.com/docker/docker/distribution"
+    progressutils "github.com/docker/docker/distribution/utils"
+    "github.com/docker/docker/distribution/xfer"
+    "github.com/docker/docker/image"
+    "github.com/docker/docker/layer"
    "github.com/docker/docker/pkg/chrootarchive"
-    "github.com/docker/docker/pkg/stringid"
-    "github.com/docker/docker/plugin/distribution"
+    "github.com/docker/docker/pkg/pools"
+    "github.com/docker/docker/pkg/progress"
    "github.com/docker/docker/plugin/v2"
    "github.com/docker/docker/reference"
    "github.com/pkg/errors"
    "golang.org/x/net/context"
)

-var (
-    validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
-    validPartialID = regexp.MustCompile(`^([a-f0-9]{1,64})$`)
-)
-
// Disable deactivates a plugin. This means resources (volumes, networks) cant use them.
-func (pm *Manager) Disable(name string, config *types.PluginDisableConfig) error {
-    p, err := pm.pluginStore.GetByName(name)
+func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) error {
+    p, err := pm.config.Store.GetV2Plugin(refOrID)
    if err != nil {
        return err
    }
@@ -48,13 +50,13 @@ func (pm *Manager) Disable(name string, config *types.PluginDisableConfig) error
    if err := pm.disable(p, c); err != nil {
        return err
    }
-    pm.pluginEventLogger(p.GetID(), name, "disable")
+    pm.config.LogPluginEvent(p.GetID(), refOrID, "disable")
    return nil
}

// Enable activates a plugin, which implies that they are ready to be used by containers.
-func (pm *Manager) Enable(name string, config *types.PluginEnableConfig) error {
-    p, err := pm.pluginStore.GetByName(name)
+func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) error {
+    p, err := pm.config.Store.GetV2Plugin(refOrID)
    if err != nil {
        return err
    }
@@ -63,71 +65,74 @@ func (pm *Manager) Enable(name string, config *types.PluginEnableConfig) error {
    if err := pm.enable(p, c, false); err != nil {
        return err
    }
-    pm.pluginEventLogger(p.GetID(), name, "enable")
+    pm.config.LogPluginEvent(p.GetID(), refOrID, "enable")
    return nil
}

// Inspect examines a plugin config
-func (pm *Manager) Inspect(refOrID string) (tp types.Plugin, err error) {
-    // Match on full ID
-    if validFullID.MatchString(refOrID) {
-        p, err := pm.pluginStore.GetByID(refOrID)
-        if err == nil {
-            return p.PluginObj, nil
-        }
+func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) {
+    p, err := pm.config.Store.GetV2Plugin(refOrID)
+    if err != nil {
+        return nil, err
    }

-    // Match on full name
-    if pluginName, err := getPluginName(refOrID); err == nil {
-        if p, err := pm.pluginStore.GetByName(pluginName); err == nil {
-            return p.PluginObj, nil
-        }
-    }
+    return &p.PluginObj, nil
+}

-    // Match on partial ID
-    if validPartialID.MatchString(refOrID) {
-        p, err := pm.pluginStore.Search(refOrID)
-        if err == nil {
-            return p.PluginObj, nil
-        }
-        return tp, err
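+// pull pulls the plugin image referenced by ref with the given pull config. If
+// outStream is non-nil, progress is buffered through a channel and written to it.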
+func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error {
+    if outStream != nil {
+        // Include a buffer so that slow client connections don't affect
+        // transfer performance.
+        progressChan := make(chan progress.Progress, 100)
+
+        writesDone := make(chan struct{})
+
+        defer func() {
+            close(progressChan)
+            <-writesDone
+        }()
+
+        var cancelFunc context.CancelFunc
+        ctx, cancelFunc = context.WithCancel(ctx)
+
+        go func() {
+            progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
+            close(writesDone)
+        }()
+
+        config.ProgressOutput = progress.ChanOutput(progressChan)
+    } else {
+        config.ProgressOutput = progress.DiscardOutput()
    }
+    return distribution.Pull(ctx, ref, config)
+}

-    return tp, fmt.Errorf("no such plugin name or ID associated with %q", refOrID)
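+// tempConfigStore is a minimal config store used by Privileges to capture the
+// plugin config blob (and its digest) during a config-only pull.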
+type tempConfigStore struct {
+    config []byte
+    configDigest digest.Digest
}

-func (pm *Manager) pull(name string, metaHeader http.Header, authConfig *types.AuthConfig) (reference.Named, distribution.PullData, error) {
-    ref, err := distribution.GetRef(name)
-    if err != nil {
-        logrus.Debugf("error in distribution.GetRef: %v", err)
-        return nil, nil, err
-    }
-    name = ref.String()
+func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) {
+    dgst := digest.FromBytes(c)

-    if p, _ := pm.pluginStore.GetByName(name); p != nil {
-        logrus.Debug("plugin already exists")
-        return nil, nil, fmt.Errorf("%s exists", name)
-    }
+    s.config = c
+    s.configDigest = dgst

-    pd, err := distribution.Pull(ref, pm.registryService, metaHeader, authConfig)
-    if err != nil {
-        logrus.Debugf("error in distribution.Pull(): %v", err)
-        return nil, nil, err
-    }
-    return ref, pd, nil
+    return dgst, nil
}

-func computePrivileges(pd distribution.PullData) (types.PluginPrivileges, error) {
-    config, err := pd.Config()
-    if err != nil {
-        return nil, err
+func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) {
+    if d != s.configDigest {
+        return nil, digest.ErrDigestNotFound
    }
+    return s.config, nil
+}

-    var c types.PluginConfig
-    if err := json.Unmarshal(config, &c); err != nil {
-        return nil, err
-    }
+func (s *tempConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
+    return configToRootFS(c)
+}

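+// computePrivileges inspects a plugin config and collects the privileges
+// (for example, access to a non-default network) that the plugin requires.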
+func computePrivileges(c types.PluginConfig) (types.PluginPrivileges, error) {
    var privileges types.PluginPrivileges
    if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" {
        privileges = append(privileges, types.PluginPrivilege{
@@ -173,67 +178,89 @@ func computePrivileges(pd distribution.PullData) (types.PluginPrivileges, error)
}

// Privileges pulls a plugin config and computes the privileges required to install it.
-func (pm *Manager) Privileges(name string, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
-    _, pd, err := pm.pull(name, metaHeader, authConfig)
-    if err != nil {
+func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
+    // create image store instance
+    cs := &tempConfigStore{}
+
+    // DownloadManager not defined because only pulling configuration.
+    pluginPullConfig := &distribution.ImagePullConfig{
+        Config: distribution.Config{
+            MetaHeaders: metaHeader,
+            AuthConfig: authConfig,
+            RegistryService: pm.config.RegistryService,
+            ImageEventLogger: func(string, string, string) {},
+            ImageStore: cs,
+        },
+        Schema2Types: distribution.PluginTypes,
+    }
+
+    if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil {
        return nil, err
    }
-    return computePrivileges(pd)
+
+    if cs.config == nil {
+        return nil, errors.New("no configuration pulled")
+    }
+    var config types.PluginConfig
+    if err := json.Unmarshal(cs.config, &config); err != nil {
+        return nil, err
+    }
+
+    return computePrivileges(config)
}

// Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
-func (pm *Manager) Pull(name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges) (err error) {
-    ref, pd, err := pm.pull(name, metaHeader, authConfig)
+func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
+    pm.muGC.RLock()
+    defer pm.muGC.RUnlock()
+
+    // revalidate because Pull is public
+    nameref, err := reference.ParseNamed(name)
    if err != nil {
-        return err
+        return errors.Wrapf(err, "failed to parse %q", name)
    }
+    name = reference.WithDefaultTag(nameref).String()

-    requiredPrivileges, err := computePrivileges(pd)
-    if err != nil {
+    if err := pm.config.Store.validateName(name); err != nil {
        return err
    }

-    if !reflect.DeepEqual(privileges, requiredPrivileges) {
-        return errors.New("incorrect privileges")
-    }
+    tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
+    defer os.RemoveAll(tmpRootFSDir)

-    pluginID := stringid.GenerateNonCryptoID()
-    pluginDir := filepath.Join(pm.libRoot, pluginID)
-    if err := os.MkdirAll(pluginDir, 0755); err != nil {
-        logrus.Debugf("error in MkdirAll: %v", err)
-        return err
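+    // dm doubles as the download manager and the image/config store for this
+    // pull, collecting the config digest and layer blobs into pm.blobStore.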
+    dm := &downloadManager{
+        tmpDir: tmpRootFSDir,
+        blobStore: pm.blobStore,
    }

-    defer func() {
-        if err != nil {
-            if delErr := os.RemoveAll(pluginDir); delErr != nil {
-                logrus.Warnf("unable to remove %q from failed plugin pull: %v", pluginDir, delErr)
-            }
-        }
-    }()
+    pluginPullConfig := &distribution.ImagePullConfig{
+        Config: distribution.Config{
+            MetaHeaders: metaHeader,
+            AuthConfig: authConfig,
+            RegistryService: pm.config.RegistryService,
+            ImageEventLogger: pm.config.LogPluginEvent,
+            ImageStore: dm,
+        },
+        DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
+        Schema2Types: distribution.PluginTypes,
+    }

-    err = distribution.WritePullData(pd, filepath.Join(pm.libRoot, pluginID), true)
+    err = pm.pull(ctx, ref, pluginPullConfig, outStream)
    if err != nil {
-        logrus.Debugf("error in distribution.WritePullData(): %v", err)
+        go pm.GC()
        return err
    }

-    tag := distribution.GetTag(ref)
-    p := v2.NewPlugin(ref.Name(), pluginID, pm.runRoot, pm.libRoot, tag)
-    err = p.InitPlugin()
-    if err != nil {
+    if _, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil {
        return err
    }
-    pm.pluginStore.Add(p)
-
-    pm.pluginEventLogger(pluginID, ref.String(), "pull")

    return nil
}

// List displays the list of plugins and associated metadata.
func (pm *Manager) List() ([]types.Plugin, error) {
-    plugins := pm.pluginStore.GetAll()
+    plugins := pm.config.Store.GetAll()
    out := make([]types.Plugin, 0, len(plugins))
    for _, p := range plugins {
        out = append(out, p.PluginObj)
@@ -242,38 +269,211 @@ func (pm *Manager) List() ([]types.Plugin, error) {
}

// Push pushes a plugin to the store.
-func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.AuthConfig) error {
-    p, err := pm.pluginStore.GetByName(name)
+func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error {
+    p, err := pm.config.Store.GetV2Plugin(name)
    if err != nil {
        return err
    }
-    dest := filepath.Join(pm.libRoot, p.GetID())
-    config, err := ioutil.ReadFile(filepath.Join(dest, "config.json"))
+
+    ref, err := reference.ParseNamed(p.Name())
    if err != nil {
-        return err
+        return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name())
    }

-    var dummy types.Plugin
-    err = json.Unmarshal(config, &dummy)
-    if err != nil {
-        return err
+    var po progress.Output
+    if outStream != nil {
+        // Include a buffer so that slow client connections don't affect
+        // transfer performance.
+        progressChan := make(chan progress.Progress, 100)
+
+        writesDone := make(chan struct{})
+
+        defer func() {
+            close(progressChan)
+            <-writesDone
+        }()
+
+        var cancelFunc context.CancelFunc
+        ctx, cancelFunc = context.WithCancel(ctx)
+
+        go func() {
+            progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
+            close(writesDone)
+        }()
+
+        po = progress.ChanOutput(progressChan)
+    } else {
+        po = progress.DiscardOutput()
+    }
+
+    // TODO: replace these with manager
+    is := &pluginConfigStore{
+        pm: pm,
+        plugin: p,
+    }
+    ls := &pluginLayerProvider{
+        pm: pm,
+        plugin: p,
+    }
+    rs := &pluginReference{
+        name: ref,
+        pluginID: p.Config,
    }

-    rootfs, err := archive.Tar(p.Rootfs, archive.Gzip)
+    uploadManager := xfer.NewLayerUploadManager(3)
+
+    imagePushConfig := &distribution.ImagePushConfig{
+        Config: distribution.Config{
+            MetaHeaders: metaHeader,
+            AuthConfig: authConfig,
+            ProgressOutput: po,
+            RegistryService: pm.config.RegistryService,
+            ReferenceStore: rs,
+            ImageEventLogger: pm.config.LogPluginEvent,
+            ImageStore: is,
+            RequireSchema2: true,
+        },
+        ConfigMediaType: schema2.MediaTypePluginConfig,
+        LayerStore: ls,
+        UploadManager: uploadManager,
+    }
+
+    return distribution.Push(ctx, ref, imagePushConfig)
+}
+
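+// pluginReference is a read-only reference store for push: it maps the plugin
+// name to the digest of its config and ignores all mutations.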
+type pluginReference struct {
+    name reference.Named
+    pluginID digest.Digest
+}
+
+func (r *pluginReference) References(id digest.Digest) []reference.Named {
+    if r.pluginID != id {
+        return nil
+    }
+    return []reference.Named{r.name}
+}
+
+func (r *pluginReference) ReferencesByName(ref reference.Named) []reference.Association {
+    return []reference.Association{
+        {
+            Ref: r.name,
+            ID: r.pluginID,
+        },
+    }
+}
+
+func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) {
+    if r.name.String() != ref.String() {
+        return digest.Digest(""), reference.ErrDoesNotExist
+    }
+    return r.pluginID, nil
+}
+
+func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error {
+    // Read only, ignore
+    return nil
+}
+func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error {
+    // Read only, ignore
+    return nil
+}
+func (r *pluginReference) Delete(ref reference.Named) (bool, error) {
+    // Read only, ignore
+    return false, nil
+}
+
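+// pluginConfigStore serves the plugin config blob out of the manager's blob
+// store during push; writes are rejected.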
+type pluginConfigStore struct {
+    pm *Manager
+    plugin *v2.Plugin
+}
+
+func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) {
+    return digest.Digest(""), errors.New("cannot store config on push")
+}
+
+func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) {
+    if s.plugin.Config != d {
+        return nil, errors.New("plugin not found")
+    }
+    rwc, err := s.pm.blobStore.Get(d)
    if err != nil {
-        return err
+        return nil, err
+    }
+    defer rwc.Close()
+    return ioutil.ReadAll(rwc)
+}
+
+func (s *pluginConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
+    return configToRootFS(c)
+}
+
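+// pluginLayerProvider maps a layer chain ID onto the plugin's rootfs diff IDs
+// and returns a pushable layer backed by the blob store.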
+type pluginLayerProvider struct {
+    pm *Manager
+    plugin *v2.Plugin
+}
+
+func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) {
+    rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs)
+    var i int
+    for i = 1; i <= len(rootFS.DiffIDs); i++ {
+        if layer.CreateChainID(rootFS.DiffIDs[:i]) == id {
+            break
+        }
+    }
+    if i > len(rootFS.DiffIDs) {
+        return nil, errors.New("layer not found")
+    }
+    return &pluginLayer{
+        pm: p.pm,
+        diffIDs: rootFS.DiffIDs[:i],
+        blobs: p.plugin.Blobsums[:i],
+    }, nil
+}
+
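+// pluginLayer adapts one of the plugin's rootfs blobs to the
+// distribution.PushLayer interface.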
+type pluginLayer struct {
+    pm *Manager
+    diffIDs []layer.DiffID
+    blobs []digest.Digest
+}
+
+func (l *pluginLayer) ChainID() layer.ChainID {
+    return layer.CreateChainID(l.diffIDs)
+}
+
+func (l *pluginLayer) DiffID() layer.DiffID {
+    return l.diffIDs[len(l.diffIDs)-1]
+}
+
+func (l *pluginLayer) Parent() distribution.PushLayer {
+    if len(l.diffIDs) == 1 {
+        return nil
+    }
+    return &pluginLayer{
+        pm: l.pm,
+        diffIDs: l.diffIDs[:len(l.diffIDs)-1],
+        blobs: l.blobs[:len(l.diffIDs)-1],
    }
-    defer rootfs.Close()
+}

-    _, err = distribution.Push(name, pm.registryService, metaHeader, authConfig, ioutil.NopCloser(bytes.NewReader(config)), rootfs)
-    // XXX: Ignore returning digest for now.
-    // Since digest needs to be written to the ProgressWriter.
-    return err
+func (l *pluginLayer) Open() (io.ReadCloser, error) {
+    return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1])
+}
+
+func (l *pluginLayer) Size() (int64, error) {
+    return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1])
+}
+
+func (l *pluginLayer) MediaType() string {
+    return schema2.MediaTypeLayer
+}
+
+func (l *pluginLayer) Release() {
+    // Nothing needs to be released, no references held
}

// Remove deletes plugin's root directory.
-func (pm *Manager) Remove(name string, config *types.PluginRmConfig) (err error) {
-    p, err := pm.pluginStore.GetByName(name)
+func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
+    p, err := pm.config.Store.GetV2Plugin(name)
    pm.mu.RLock()
    c := pm.cMap[p]
    pm.mu.RUnlock()
@@ -297,95 +497,194 @@ func (pm *Manager) Remove(name string, config *types.PluginRmConfig) (err error)
        }
    }

-    id := p.GetID()
-    pluginDir := filepath.Join(pm.libRoot, id)
-
    defer func() {
-        if err == nil || config.ForceRemove {
-            pm.pluginStore.Remove(p)
-            pm.pluginEventLogger(id, name, "remove")
-        }
+        go pm.GC()
    }()

-    if err = os.RemoveAll(pluginDir); err != nil {
-        return errors.Wrap(err, "failed to remove plugin directory")
+    id := p.GetID()
+    pm.config.Store.Remove(p)
+    pluginDir := filepath.Join(pm.config.Root, id)
+    if err := os.RemoveAll(pluginDir); err != nil {
+        logrus.Warnf("unable to remove %q from plugin remove: %v", pluginDir, err)
    }
+    pm.config.LogPluginEvent(id, name, "remove")
    return nil
}

// Set sets plugin args
func (pm *Manager) Set(name string, args []string) error {
-    p, err := pm.pluginStore.GetByName(name)
+    p, err := pm.config.Store.GetV2Plugin(name)
    if err != nil {
        return err
    }
-    return p.Set(args)
+    if err := p.Set(args); err != nil {
+        return err
+    }
+    return pm.save(p)
}

// CreateFromContext creates a plugin from the given pluginDir which contains
// both the rootfs and the config.json and a repoName with optional tag.
-func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.Reader, options *types.PluginCreateOptions) error {
-    repoName := options.RepoName
-    ref, err := distribution.GetRef(repoName)
+func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *types.PluginCreateOptions) (err error) {
+    pm.muGC.RLock()
+    defer pm.muGC.RUnlock()
+
+    ref, err := reference.ParseNamed(options.RepoName)
    if err != nil {
-        return err
+        return errors.Wrapf(err, "failed to parse reference %v", options.RepoName)
+    }
+    if _, ok := ref.(reference.Canonical); ok {
+        return errors.Errorf("canonical references are not permitted")
    }
+    name := reference.WithDefaultTag(ref).String()

-    name := ref.Name()
-    tag := distribution.GetTag(ref)
-    pluginID := stringid.GenerateNonCryptoID()
+    if err := pm.config.Store.validateName(name); err != nil { // fast check, real check is in createPlugin()
+        return err
+    }

-    p := v2.NewPlugin(name, pluginID, pm.runRoot, pm.libRoot, tag)
+    tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
+    defer os.RemoveAll(tmpRootFSDir)
+    if err != nil {
+        return errors.Wrap(err, "failed to create temp directory")
+    }
+    var configJSON []byte
+    rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON)

-    if v, _ := pm.pluginStore.GetByName(p.Name()); v != nil {
-        return fmt.Errorf("plugin %q already exists", p.Name())
+    rootFSBlob, err := pm.blobStore.New()
+    if err != nil {
+        return err
    }
+    defer rootFSBlob.Close()
+    gzw := gzip.NewWriter(rootFSBlob)
+    layerDigester := digest.Canonical.New()
+    rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash()))

-    pluginDir := filepath.Join(pm.libRoot, pluginID)
-    if err := os.MkdirAll(pluginDir, 0755); err != nil {
+    if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil {
+        return err
+    }
+    if err := rootFS.Close(); err != nil {
        return err
    }

-    // In case an error happens, remove the created directory.
-    if err := pm.createFromContext(ctx, tarCtx, pluginDir, repoName, p); err != nil {
-        if err := os.RemoveAll(pluginDir); err != nil {
-            logrus.Warnf("unable to remove %q from failed plugin creation: %v", pluginDir, err)
-        }
+    if configJSON == nil {
+        return errors.New("config not found")
+    }
+
+    if err := gzw.Close(); err != nil {
+        return errors.Wrap(err, "error closing gzip writer")
+    }
+
+    var config types.PluginConfig
+    if err := json.Unmarshal(configJSON, &config); err != nil {
+        return errors.Wrap(err, "failed to parse config")
+    }
+
+    if err := pm.validateConfig(config); err != nil {
        return err
    }

-    return nil
-}
+    pm.mu.Lock()
+    defer pm.mu.Unlock()

-func (pm *Manager) createFromContext(ctx context.Context, tarCtx io.Reader, pluginDir, repoName string, p *v2.Plugin) error {
-    if err := chrootarchive.Untar(tarCtx, pluginDir, nil); err != nil {
+    rootFSBlobsum, err := rootFSBlob.Commit()
+    if err != nil {
        return err
    }
+    defer func() {
+        if err != nil {
+            go pm.GC()
+        }
+    }()
+
+    config.Rootfs = &types.PluginConfigRootfs{
+        Type: "layers",
+        DiffIds: []string{layerDigester.Digest().String()},
+    }

-    if err := p.InitPlugin(); err != nil {
+    configBlob, err := pm.blobStore.New()
+    if err != nil {
+        return err
+    }
+    defer configBlob.Close()
+    if err := json.NewEncoder(configBlob).Encode(config); err != nil {
+        return errors.Wrap(err, "error encoding json config")
+    }
+    configBlobsum, err := configBlob.Commit()
+    if err != nil {
        return err
    }

-    if err := pm.pluginStore.Add(p); err != nil {
+    p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil)
+    if err != nil {
        return err
    }

-    pm.pluginEventLogger(p.GetID(), repoName, "create")
+    pm.config.LogPluginEvent(p.PluginObj.ID, name, "create")

    return nil
}

-func getPluginName(name string) (string, error) {
-    named, err := reference.ParseNamed(name) // FIXME: validate
-    if err != nil {
-        return "", err
-    }
-    if reference.IsNameOnly(named) {
-        named = reference.WithDefaultTag(named)
-    }
-    ref, ok := named.(reference.NamedTagged)
-    if !ok {
-        return "", fmt.Errorf("invalid name: %s", named.String())
-    }
-    return ref.String(), nil
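+// validateConfig is a placeholder; plugin config validation is still TODO.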
+func (pm *Manager) validateConfig(config types.PluginConfig) error {
+    return nil // TODO:
+}
+
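+// splitConfigRootFSFromTar reads the plugin build context from in, stores the
+// entry named configFileName into config, and re-roots everything under
+// rootFSFileName into the tar stream it returns.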
+func splitConfigRootFSFromTar(in io.ReadCloser, config *[]byte) io.ReadCloser {
+    pr, pw := io.Pipe()
+    go func() {
+        tarReader := tar.NewReader(in)
+        tarWriter := tar.NewWriter(pw)
+        defer in.Close()
+
+        hasRootFS := false
+
+        for {
+            hdr, err := tarReader.Next()
+            if err == io.EOF {
+                if !hasRootFS {
+                    pw.CloseWithError(errors.Wrap(err, "no rootfs found"))
+                    return
+                }
+                // Signals end of archive.
+                tarWriter.Close()
+                pw.Close()
+                return
+            }
+            if err != nil {
+                pw.CloseWithError(errors.Wrap(err, "failed to read from tar"))
+                return
+            }
+
+            content := io.Reader(tarReader)
+            name := path.Clean(hdr.Name)
+            if path.IsAbs(name) {
+                name = name[1:]
+            }
+            if name == configFileName {
+                dt, err := ioutil.ReadAll(content)
+                if err != nil {
+                    pw.CloseWithError(errors.Wrapf(err, "failed to read %s", configFileName))
+                    return
+                }
+                *config = dt
+            }
+            if parts := strings.Split(name, "/"); len(parts) != 0 && parts[0] == rootFSFileName {
+                hdr.Name = path.Clean(path.Join(parts[1:]...))
+                if hdr.Typeflag == tar.TypeLink && strings.HasPrefix(strings.ToLower(hdr.Linkname), rootFSFileName+"/") {
+                    hdr.Linkname = hdr.Linkname[len(rootFSFileName)+1:]
+                }
+                if err := tarWriter.WriteHeader(hdr); err != nil {
+                    pw.CloseWithError(errors.Wrap(err, "error writing tar header"))
+                    return
+                }
+                if _, err := pools.Copy(tarWriter, content); err != nil {
+                    pw.CloseWithError(errors.Wrap(err, "error copying tar data"))
+                    return
+                }
+                hasRootFS = true
+            } else {
+                io.Copy(ioutil.Discard, content)
+            }
+        }
+    }()
+    return pr
}