
Merge pull request #39946 from cpuguy83/plugins_use_containerd_dist

Replace plugin distribution code w/ containerd
Akihiro Suda 5 years ago
commit ba8129b28a

+ 4 - 0
integration/plugin/common/main_test.go

@@ -5,12 +5,16 @@ import (
 	"os"
 	"testing"
 
+	"github.com/docker/docker/pkg/reexec"
 	"github.com/docker/docker/testutil/environment"
 )
 
 var testEnv *environment.Execution
 
 func TestMain(m *testing.M) {
+	if reexec.Init() {
+		return
+	}
 	var err error
 	testEnv, err = environment.New()
 	if err != nil {

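For context, reexec.Init() returns true when the test binary has been re-invoked as one of its registered helpers; the plugin fixtures used below rely on chrootarchive, which re-executes the test binary, so TestMain has to bail out early in that case. A minimal sketch of the pkg/reexec pattern, with an illustrative helper name:

    package main

    import (
    	"fmt"

    	"github.com/docker/docker/pkg/reexec"
    )

    func init() {
    	// Runs instead of main() when the binary is re-executed with argv[0] == "test-helper".
    	reexec.Register("test-helper", func() {
    		fmt.Println("running as helper")
    	})
    }

    func main() {
    	if reexec.Init() {
    		// argv[0] matched a registered helper and it already ran; skip normal startup.
    		return
    	}
    	out, _ := reexec.Command("test-helper").CombinedOutput() // re-invokes this binary
    	fmt.Print(string(out))
    }
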
+ 121 - 0
integration/plugin/common/plugin_test.go

@@ -1,12 +1,25 @@
 package common // import "github.com/docker/docker/integration/plugin/common"
 
 import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"io"
+	"io/ioutil"
+	"net"
 	"net/http"
+	"path"
+	"strings"
 	"testing"
 
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/testutil/daemon"
+	"github.com/docker/docker/testutil/fixtures/plugin"
+	"github.com/docker/docker/testutil/registry"
 	"github.com/docker/docker/testutil/request"
 	"gotest.tools/v3/assert"
 	is "gotest.tools/v3/assert/cmp"
+	"gotest.tools/v3/skip"
 )
 
 func TestPluginInvalidJSON(t *testing.T) {
@@ -36,3 +49,111 @@ func TestPluginInvalidJSON(t *testing.T) {
 		})
 	}
 }
+
+func TestPluginInstall(t *testing.T) {
+	skip.If(t, testEnv.IsRemoteDaemon, "cannot start a daemon when testing against a remote daemon")
+	skip.If(t, testEnv.OSType == "windows")
+	skip.If(t, testEnv.IsRootless, "rootless mode has different view of localhost")
+
+	ctx := context.Background()
+	client := testEnv.APIClient()
+
+	t.Run("no auth", func(t *testing.T) {
+		defer setupTest(t)()
+
+		reg := registry.NewV2(t)
+		defer reg.Close()
+
+		name := "test-" + strings.ToLower(t.Name())
+		repo := path.Join(registry.DefaultURL, name+":latest")
+		assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil))
+
+		rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo})
+		assert.NilError(t, err)
+		defer rdr.Close()
+
+		_, err = io.Copy(ioutil.Discard, rdr)
+		assert.NilError(t, err)
+
+		_, _, err = client.PluginInspectWithRaw(ctx, repo)
+		assert.NilError(t, err)
+	})
+
+	t.Run("with htpasswd", func(t *testing.T) {
+		defer setupTest(t)()
+
+		reg := registry.NewV2(t, registry.Htpasswd)
+		defer reg.Close()
+
+		name := "test-" + strings.ToLower(t.Name())
+		repo := path.Join(registry.DefaultURL, name+":latest")
+		auth := &types.AuthConfig{ServerAddress: registry.DefaultURL, Username: "testuser", Password: "testpassword"}
+		assert.NilError(t, plugin.CreateInRegistry(ctx, repo, auth))
+
+		authEncoded, err := json.Marshal(auth)
+		assert.NilError(t, err)
+
+		rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{
+			RegistryAuth: base64.URLEncoding.EncodeToString(authEncoded),
+			Disabled:     true,
+			RemoteRef:    repo,
+		})
+		assert.NilError(t, err)
+		defer rdr.Close()
+
+		_, err = io.Copy(ioutil.Discard, rdr)
+		assert.NilError(t, err)
+
+		_, _, err = client.PluginInspectWithRaw(ctx, repo)
+		assert.NilError(t, err)
+	})
+	t.Run("with insecure", func(t *testing.T) {
+		skip.If(t, !testEnv.IsLocalDaemon())
+
+		addrs, err := net.InterfaceAddrs()
+		assert.NilError(t, err)
+
+		var bindTo string
+		for _, addr := range addrs {
+			ip, ok := addr.(*net.IPNet)
+			if !ok {
+				continue
+			}
+			if ip.IP.IsLoopback() || ip.IP.To4() == nil {
+				continue
+			}
+			bindTo = ip.IP.String()
+		}
+
+		if bindTo == "" {
+			t.Skip("No suitable interface to bind registry to")
+		}
+
+		regURL := bindTo + ":5000"
+
+		d := daemon.New(t)
+
+		d.Start(t, "--insecure-registry="+regURL)
+		defer d.Stop(t)
+
+		reg := registry.NewV2(t, registry.URL(regURL))
+		defer reg.Close()
+
+		name := "test-" + strings.ToLower(t.Name())
+		repo := path.Join(regURL, name+":latest")
+		assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil, plugin.WithInsecureRegistry(regURL)))
+
+		client := d.NewClientT(t)
+		rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo})
+		assert.NilError(t, err)
+		defer rdr.Close()
+
+		_, err = io.Copy(ioutil.Discard, rdr)
+		assert.NilError(t, err)
+
+		_, _, err = client.PluginInspectWithRaw(ctx, repo)
+		assert.NilError(t, err)
+	})
+	// TODO: test insecure registry with https
+}

+ 249 - 297
plugin/backend_linux.go

@@ -2,6 +2,7 @@ package plugin // import "github.com/docker/docker/plugin"
 
 import (
 	"archive/tar"
+	"bytes"
 	"compress/gzip"
 	"context"
 	"encoding/json"
@@ -11,27 +12,27 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"runtime"
 	"strings"
+	"time"
 
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/images"
+	"github.com/containerd/containerd/platforms"
+	"github.com/containerd/containerd/remotes"
+	"github.com/containerd/containerd/remotes/docker"
 	"github.com/docker/distribution/manifest/schema2"
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/filters"
-	"github.com/docker/docker/distribution"
-	progressutils "github.com/docker/docker/distribution/utils"
-	"github.com/docker/docker/distribution/xfer"
 	"github.com/docker/docker/dockerversion"
 	"github.com/docker/docker/errdefs"
-	"github.com/docker/docker/image"
-	"github.com/docker/docker/layer"
 	"github.com/docker/docker/pkg/authorization"
 	"github.com/docker/docker/pkg/chrootarchive"
 	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/progress"
+	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/system"
 	v2 "github.com/docker/docker/plugin/v2"
-	refstore "github.com/docker/docker/reference"
 	"github.com/moby/sys/mount"
 	digest "github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
@@ -98,64 +99,6 @@ func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) {
 	return &p.PluginObj, nil
 }
 
-func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error {
-	if outStream != nil {
-		// Include a buffer so that slow client connections don't affect
-		// transfer performance.
-		progressChan := make(chan progress.Progress, 100)
-
-		writesDone := make(chan struct{})
-
-		defer func() {
-			close(progressChan)
-			<-writesDone
-		}()
-
-		var cancelFunc context.CancelFunc
-		ctx, cancelFunc = context.WithCancel(ctx)
-
-		go func() {
-			progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
-			close(writesDone)
-		}()
-
-		config.ProgressOutput = progress.ChanOutput(progressChan)
-	} else {
-		config.ProgressOutput = progress.DiscardOutput()
-	}
-	return distribution.Pull(ctx, ref, config)
-}
-
-type tempConfigStore struct {
-	config       []byte
-	configDigest digest.Digest
-}
-
-func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) {
-	dgst := digest.FromBytes(c)
-
-	s.config = c
-	s.configDigest = dgst
-
-	return dgst, nil
-}
-
-func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) {
-	if d != s.configDigest {
-		return nil, errNotFound("digest not found")
-	}
-	return s.config, nil
-}
-
-func (s *tempConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
-	return configToRootFS(c)
-}
-
-func (s *tempConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) {
-	// TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS
-	return &specs.Platform{OS: runtime.GOOS}, nil
-}
-
 func computePrivileges(c types.PluginConfig) types.PluginPrivileges {
 	var privileges types.PluginPrivileges
 	if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" {
@@ -217,37 +160,53 @@ func computePrivileges(c types.PluginConfig) types.PluginPrivileges {
 
 // Privileges pulls a plugin config and computes the privileges required to install it.
 func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
-	// create image store instance
-	cs := &tempConfigStore{}
-
-	// DownloadManager not defined because only pulling configuration.
-	pluginPullConfig := &distribution.ImagePullConfig{
-		Config: distribution.Config{
-			MetaHeaders:      metaHeader,
-			AuthConfig:       authConfig,
-			RegistryService:  pm.config.RegistryService,
-			ImageEventLogger: func(string, string, string) {},
-			ImageStore:       cs,
-		},
-		Schema2Types: distribution.PluginTypes,
-	}
-
-	if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil {
-		return nil, err
+	var (
+		config     types.PluginConfig
+		configSeen bool
+	)
+
+	h := func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		switch desc.MediaType {
+		case schema2.MediaTypeManifest, specs.MediaTypeImageManifest:
+			data, err := content.ReadBlob(ctx, pm.blobStore, desc)
+			if err != nil {
+				return nil, errors.Wrapf(err, "error reading image manifest from blob store for %s", ref)
+			}
+
+			var m specs.Manifest
+			if err := json.Unmarshal(data, &m); err != nil {
+				return nil, errors.Wrapf(err, "error unmarshaling image manifest for %s", ref)
+			}
+			return []specs.Descriptor{m.Config}, nil
+		case schema2.MediaTypePluginConfig:
+			configSeen = true
+			data, err := content.ReadBlob(ctx, pm.blobStore, desc)
+			if err != nil {
+				return nil, errors.Wrapf(err, "error reading plugin config from blob store for %s", ref)
+			}
+
+			if err := json.Unmarshal(data, &config); err != nil {
+				return nil, errors.Wrapf(err, "error unmarshaling plugin config for %s", ref)
+			}
+		}
+
+		return nil, nil
 	}
 
-	if cs.config == nil {
-		return nil, errors.New("no configuration pulled")
+	if err := pm.fetch(ctx, ref, authConfig, progress.DiscardOutput(), metaHeader, images.HandlerFunc(h)); err != nil {
+		return types.PluginPrivileges{}, err
 	}
-	var config types.PluginConfig
-	if err := json.Unmarshal(cs.config, &config); err != nil {
-		return nil, errdefs.System(err)
+
+	if !configSeen {
+		return types.PluginPrivileges{}, errors.Errorf("did not find plugin config for specified reference %s", ref)
 	}
 
 	return computePrivileges(config), nil
 }
 
 // Upgrade upgrades a plugin
+//
+// TODO: replace reference package usage with simpler url.Parse semantics
 func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
 	p, err := pm.config.Store.GetV2Plugin(name)
 	if err != nil {
@@ -258,44 +217,35 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string
 		return errors.Wrap(enabledError(p.Name()), "plugin must be disabled before upgrading")
 	}
 
-	pm.muGC.RLock()
-	defer pm.muGC.RUnlock()
-
 	// revalidate because Pull is public
 	if _, err := reference.ParseNormalizedNamed(name); err != nil {
 		return errors.Wrapf(errdefs.InvalidParameter(err), "failed to parse %q", name)
 	}
 
+	pm.muGC.RLock()
+	defer pm.muGC.RUnlock()
+
 	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
 	if err != nil {
-		return errors.Wrap(errdefs.System(err), "error preparing upgrade")
+		return errors.Wrap(err, "error creating tmp dir for plugin rootfs")
 	}
-	defer os.RemoveAll(tmpRootFSDir)
 
-	dm := &downloadManager{
-		tmpDir:    tmpRootFSDir,
-		blobStore: pm.blobStore,
-	}
+	var md fetchMeta
 
-	pluginPullConfig := &distribution.ImagePullConfig{
-		Config: distribution.Config{
-			MetaHeaders:      metaHeader,
-			AuthConfig:       authConfig,
-			RegistryService:  pm.config.RegistryService,
-			ImageEventLogger: pm.config.LogPluginEvent,
-			ImageStore:       dm,
-		},
-		DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
-		Schema2Types:    distribution.PluginTypes,
+	ctx, cancel := context.WithCancel(ctx)
+	out, waitProgress := setupProgressOutput(outStream, cancel)
+	defer waitProgress()
+
+	if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil {
+		return err
 	}
+	pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull")
 
-	err = pm.pull(ctx, ref, pluginPullConfig, outStream)
-	if err != nil {
-		go pm.GC()
+	if err := validateFetchedMetadata(md); err != nil {
 		return err
 	}
 
-	if err := pm.upgradePlugin(p, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil {
+	if err := pm.upgradePlugin(p, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges); err != nil {
 		return err
 	}
 	p.PluginObj.PluginReference = ref.String()
@@ -303,6 +253,8 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string
 }
 
 // Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
+//
+// TODO: replace reference package usage with simpler url.Parse semantics
 func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) {
 	pm.muGC.RLock()
 	defer pm.muGC.RUnlock()
@@ -320,30 +272,22 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m
 
 	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
 	if err != nil {
-		return errors.Wrap(errdefs.System(err), "error preparing pull")
+		return errors.Wrap(errdefs.System(err), "error creating tmp dir for plugin rootfs")
 	}
 	defer os.RemoveAll(tmpRootFSDir)
 
-	dm := &downloadManager{
-		tmpDir:    tmpRootFSDir,
-		blobStore: pm.blobStore,
-	}
+	var md fetchMeta
 
-	pluginPullConfig := &distribution.ImagePullConfig{
-		Config: distribution.Config{
-			MetaHeaders:      metaHeader,
-			AuthConfig:       authConfig,
-			RegistryService:  pm.config.RegistryService,
-			ImageEventLogger: pm.config.LogPluginEvent,
-			ImageStore:       dm,
-		},
-		DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
-		Schema2Types:    distribution.PluginTypes,
+	ctx, cancel := context.WithCancel(ctx)
+	out, waitProgress := setupProgressOutput(outStream, cancel)
+	defer waitProgress()
+
+	if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil {
+		return err
 	}
+	pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull")
 
-	err = pm.pull(ctx, ref, pluginPullConfig, outStream)
-	if err != nil {
-		go pm.GC()
+	if err := validateFetchedMetadata(md); err != nil {
 		return err
 	}
 
@@ -354,12 +298,14 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m
 	optsList = append(optsList, opts...)
 	optsList = append(optsList, refOpt)
 
-	p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...)
+	// TODO: tmpRootFSDir is empty but should have layers in it
+	p, err := pm.createPlugin(name, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges, optsList...)
 	if err != nil {
 		return err
 	}
 
 	pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
+
 	return nil
 }
 
@@ -404,7 +350,7 @@ next:
 	return out, nil
 }
 
-// Push pushes a plugin to the store.
+// Push pushes a plugin to the registry.
 func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error {
 	p, err := pm.config.Store.GetV2Plugin(name)
 	if err != nil {
@@ -416,201 +362,197 @@ func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header
 		return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name())
 	}
 
-	var po progress.Output
-	if outStream != nil {
-		// Include a buffer so that slow client connections don't affect
-		// transfer performance.
-		progressChan := make(chan progress.Progress, 100)
-
-		writesDone := make(chan struct{})
+	statusTracker := docker.NewInMemoryTracker()
 
-		defer func() {
-			close(progressChan)
-			<-writesDone
-		}()
-
-		var cancelFunc context.CancelFunc
-		ctx, cancelFunc = context.WithCancel(ctx)
+	resolver, err := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, false)
+	if err != nil {
+		return err
+	}
 
-		go func() {
-			progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
-			close(writesDone)
-		}()
+	pusher, err := resolver.Pusher(ctx, ref.String())
+	if err != nil {
 
-		po = progress.ChanOutput(progressChan)
-	} else {
-		po = progress.DiscardOutput()
+		return errors.Wrap(err, "error creating plugin pusher")
 	}
 
-	// TODO: replace these with manager
-	is := &pluginConfigStore{
-		pm:     pm,
-		plugin: p,
-	}
-	lss := make(map[string]distribution.PushLayerProvider)
-	lss[runtime.GOOS] = &pluginLayerProvider{
-		pm:     pm,
-		plugin: p,
-	}
-	rs := &pluginReference{
-		name:     ref,
-		pluginID: p.Config,
-	}
+	pj := newPushJobs(statusTracker)
+
+	ctx, cancel := context.WithCancel(ctx)
+	out, waitProgress := setupProgressOutput(outStream, cancel)
+	defer waitProgress()
 
-	uploadManager := xfer.NewLayerUploadManager(3)
+	progressHandler := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		logrus.WithField("mediaType", desc.MediaType).WithField("digest", desc.Digest.String()).Debug("Preparing to push plugin layer")
+		id := stringid.TruncateID(desc.Digest.String())
+		pj.add(remotes.MakeRefKey(ctx, desc), id)
+		progress.Update(out, id, "Preparing")
+		return nil, nil
+	})
 
-	imagePushConfig := &distribution.ImagePushConfig{
-		Config: distribution.Config{
-			MetaHeaders:      metaHeader,
-			AuthConfig:       authConfig,
-			ProgressOutput:   po,
-			RegistryService:  pm.config.RegistryService,
-			ReferenceStore:   rs,
-			ImageEventLogger: pm.config.LogPluginEvent,
-			ImageStore:       is,
-			RequireSchema2:   true,
-		},
-		ConfigMediaType: schema2.MediaTypePluginConfig,
-		LayerStores:     lss,
-		UploadManager:   uploadManager,
+	desc, err := pm.getManifestDescriptor(ctx, p)
+	if err != nil {
+		return errors.Wrap(err, "error reading plugin manifest")
 	}
 
-	return distribution.Push(ctx, ref, imagePushConfig)
-}
+	progress.Messagef(out, "", "The push refers to repository [%s]", reference.FamiliarName(ref))
 
-type pluginReference struct {
-	name     reference.Named
-	pluginID digest.Digest
-}
+	// TODO: If a layer already exists on the registry, the progress output just says "Preparing"
+	go func() {
+		timer := time.NewTimer(100 * time.Millisecond)
+		defer timer.Stop()
+		if !timer.Stop() {
+			<-timer.C
+		}
+		var statuses []contentStatus
+		for {
+			timer.Reset(100 * time.Millisecond)
+			select {
+			case <-ctx.Done():
+				return
+			case <-timer.C:
+				statuses = pj.status()
+			}
 
-func (r *pluginReference) References(id digest.Digest) []reference.Named {
-	if r.pluginID != id {
-		return nil
-	}
-	return []reference.Named{r.name}
-}
+			for _, s := range statuses {
+				out.WriteProgress(progress.Progress{ID: s.Ref, Current: s.Offset, Total: s.Total, Action: s.Status, LastUpdate: s.Offset == s.Total})
+			}
+		}
+	}()
 
-func (r *pluginReference) ReferencesByName(ref reference.Named) []refstore.Association {
-	return []refstore.Association{
-		{
-			Ref: r.name,
-			ID:  r.pluginID,
-		},
+	// Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
+	ctx = docker.WithScope(ctx, scope(ref, true))
+	if err := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler {
+		return images.Handlers(progressHandler, h)
+	}); err != nil {
+		// Try fallback to http.
+		// This is needed because the containerd pusher will only attempt the first registry config we pass, which would
+		// typically be https.
+		// If there are no http-only host configs found we'll error out anyway.
+		resolver, _ := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, true)
+		if resolver != nil {
+			pusher, _ := resolver.Pusher(ctx, ref.String())
+			if pusher != nil {
+				logrus.WithField("ref", ref).Debug("Re-attmpting push with http-fallback")
+				err2 := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler {
+					return images.Handlers(progressHandler, h)
+				})
+				if err2 == nil {
+					err = nil
+				} else {
+					logrus.WithError(err2).WithField("ref", ref).Debug("Error while attempting push with http-fallback")
+				}
+			}
+		}
+		if err != nil {
+			return errors.Wrap(err, "error pushing plugin")
+		}
 	}
-}
 
-func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) {
-	if r.name.String() != ref.String() {
-		return digest.Digest(""), refstore.ErrDoesNotExist
+	// For blobs that already exist in the registry we need to make sure to update the progress otherwise it will just say "pending"
+	// TODO: How to check if the layer already exists? Is it worth it?
+	for _, j := range pj.jobs {
+		progress.Update(out, pj.names[j], "Upload complete")
 	}
-	return r.pluginID, nil
-}
 
-func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error {
-	// Read only, ignore
-	return nil
-}
-func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error {
-	// Read only, ignore
+	// Signal the client for content trust verification
+	progress.Aux(out, types.PushResult{Tag: ref.(reference.Tagged).Tag(), Digest: desc.Digest.String(), Size: int(desc.Size)})
+
 	return nil
 }
-func (r *pluginReference) Delete(ref reference.Named) (bool, error) {
-	// Read only, ignore
-	return false, nil
-}
 
-type pluginConfigStore struct {
-	pm     *Manager
-	plugin *v2.Plugin
+// manifest wraps an OCI manifest because, historically, the registry does not
+// accept plugins unless the media type on the manifest is specifically
+// schema2.MediaTypeManifest, so the OCI manifest media type cannot be used.
+// Additionally, docker schema2 manifest validation requires a mediaType field on
+// the manifest itself, not just on the descriptor, and the OCI types do not have
+// this field.
+type manifest struct {
+	specs.Manifest
+	MediaType string `json:"mediaType,omitempty"`
 }
 
-func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) {
-	return digest.Digest(""), errors.New("cannot store config on push")
-}
+func buildManifest(ctx context.Context, s content.Manager, config digest.Digest, layers []digest.Digest) (manifest, error) {
+	var m manifest
+	m.MediaType = images.MediaTypeDockerSchema2Manifest
+	m.SchemaVersion = 2
 
-func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) {
-	if s.plugin.Config != d {
-		return nil, errors.New("plugin not found")
-	}
-	rwc, err := s.pm.blobStore.Get(d)
+	configInfo, err := s.Info(ctx, config)
 	if err != nil {
-		return nil, err
+		return m, errors.Wrapf(err, "error reading plugin config content for digest %s", config)
+	}
+	m.Config = specs.Descriptor{
+		MediaType: mediaTypePluginConfig,
+		Size:      configInfo.Size,
+		Digest:    configInfo.Digest,
 	}
-	defer rwc.Close()
-	return ioutil.ReadAll(rwc)
-}
-
-func (s *pluginConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
-	return configToRootFS(c)
-}
-
-func (s *pluginConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) {
-	// TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS
-	return &specs.Platform{OS: runtime.GOOS}, nil
-}
 
-type pluginLayerProvider struct {
-	pm     *Manager
-	plugin *v2.Plugin
+	for _, l := range layers {
+		info, err := s.Info(ctx, l)
+		if err != nil {
+			return m, errors.Wrapf(err, "error fetching info for content digest %s", l)
+		}
+		m.Layers = append(m.Layers, specs.Descriptor{
+			MediaType: specs.MediaTypeImageLayerGzip, // TODO: This is assuming everything is a gzip compressed layer, but that may not be true.
+			Digest:    l,
+			Size:      info.Size,
+		})
+	}
+	return m, nil
 }
 
-func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) {
-	rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs)
-	var i int
-	for i = 1; i <= len(rootFS.DiffIDs); i++ {
-		if layer.CreateChainID(rootFS.DiffIDs[:i]) == id {
-			break
+// getManifestDescriptor gets the OCI descriptor for a manifest
+// It will generate a manifest if one does not exist
+func (pm *Manager) getManifestDescriptor(ctx context.Context, p *v2.Plugin) (specs.Descriptor, error) {
+	logger := logrus.WithField("plugin", p.Name()).WithField("digest", p.Manifest)
+	if p.Manifest != "" {
+		info, err := pm.blobStore.Info(ctx, p.Manifest)
+		if err == nil {
+			desc := specs.Descriptor{
+				Size:      info.Size,
+				Digest:    info.Digest,
+				MediaType: images.MediaTypeDockerSchema2Manifest,
+			}
+			return desc, nil
 		}
+		logger.WithError(err).Debug("Could not find plugin manifest in content store")
+	} else {
+		logger.Info("Plugin does not have manifest digest")
 	}
-	if i > len(rootFS.DiffIDs) {
-		return nil, errors.New("layer not found")
-	}
-	return &pluginLayer{
-		pm:      p.pm,
-		diffIDs: rootFS.DiffIDs[:i],
-		blobs:   p.plugin.Blobsums[:i],
-	}, nil
-}
+	logger.Info("Building a new plugin manifest")
 
-type pluginLayer struct {
-	pm      *Manager
-	diffIDs []layer.DiffID
-	blobs   []digest.Digest
-}
+	manifest, err := buildManifest(ctx, pm.blobStore, p.Config, p.Blobsums)
+	if err != nil {
+		return specs.Descriptor{}, err
+	}
 
-func (l *pluginLayer) ChainID() layer.ChainID {
-	return layer.CreateChainID(l.diffIDs)
-}
+	desc, err := writeManifest(ctx, pm.blobStore, &manifest)
+	if err != nil {
+		return desc, err
+	}
 
-func (l *pluginLayer) DiffID() layer.DiffID {
-	return l.diffIDs[len(l.diffIDs)-1]
+	if err := pm.save(p); err != nil {
+		logger.WithError(err).Error("Could not save plugin with manifest digest")
+	}
+	return desc, nil
 }
 
-func (l *pluginLayer) Parent() distribution.PushLayer {
-	if len(l.diffIDs) == 1 {
-		return nil
+func writeManifest(ctx context.Context, cs content.Store, m *manifest) (specs.Descriptor, error) {
+	platform := platforms.DefaultSpec()
+	desc := specs.Descriptor{
+		MediaType: images.MediaTypeDockerSchema2Manifest,
+		Platform:  &platform,
 	}
-	return &pluginLayer{
-		pm:      l.pm,
-		diffIDs: l.diffIDs[:len(l.diffIDs)-1],
-		blobs:   l.blobs[:len(l.diffIDs)-1],
+	data, err := json.Marshal(m)
+	if err != nil {
+		return desc, errors.Wrap(err, "error encoding manifest")
 	}
-}
-
-func (l *pluginLayer) Open() (io.ReadCloser, error) {
-	return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1])
-}
-
-func (l *pluginLayer) Size() (int64, error) {
-	return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1])
-}
-
-func (l *pluginLayer) MediaType() string {
-	return schema2.MediaTypeLayer
-}
+	desc.Digest = digest.FromBytes(data)
+	desc.Size = int64(len(data))
 
-func (l *pluginLayer) Release() {
-	// Nothing needs to be release, no references held
+	if err := content.WriteBlob(ctx, cs, remotes.MakeRefKey(ctx, desc), bytes.NewReader(data), desc); err != nil {
+		return desc, errors.Wrap(err, "error writing plugin manifest")
+	}
+	return desc, nil
 }
 
 // Remove deletes plugin's root directory.
@@ -700,14 +642,14 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
 	var configJSON []byte
 	rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON)
 
-	rootFSBlob, err := pm.blobStore.New()
+	rootFSBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name))
 	if err != nil {
 		return err
 	}
 	defer rootFSBlob.Close()
+
 	gzw := gzip.NewWriter(rootFSBlob)
-	layerDigester := digest.Canonical.Digester()
-	rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash()))
+	rootFSReader := io.TeeReader(rootFS, gzw)
 
 	if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil {
 		return err
@@ -736,8 +678,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
 	pm.mu.Lock()
 	defer pm.mu.Unlock()
 
-	rootFSBlobsum, err := rootFSBlob.Commit()
-	if err != nil {
+	if err := rootFSBlob.Commit(ctx, 0, ""); err != nil {
 		return err
 	}
 	defer func() {
@@ -748,12 +689,12 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
 
 	config.Rootfs = &types.PluginConfigRootfs{
 		Type:    "layers",
-		DiffIds: []string{layerDigester.Digest().String()},
+		DiffIds: []string{rootFSBlob.Digest().String()},
 	}
 
 	config.DockerVersion = dockerversion.Version
 
-	configBlob, err := pm.blobStore.New()
+	configBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name+"-config.json"))
 	if err != nil {
 		return err
 	}
@@ -761,12 +702,23 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
 	if err := json.NewEncoder(configBlob).Encode(config); err != nil {
 		return errors.Wrap(err, "error encoding json config")
 	}
-	configBlobsum, err := configBlob.Commit()
+	if err := configBlob.Commit(ctx, 0, ""); err != nil {
+		return err
+	}
+
+	configDigest := configBlob.Digest()
+	layers := []digest.Digest{rootFSBlob.Digest()}
+
+	manifest, err := buildManifest(ctx, pm.blobStore, configDigest, layers)
 	if err != nil {
 		return err
 	}
+	desc, err := writeManifest(ctx, pm.blobStore, &manifest)
+	if err != nil {
+		return err
+	}
 
-	p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil)
+	p, err := pm.createPlugin(name, configDigest, desc.Digest, layers, tmpRootFSDir, nil)
 	if err != nil {
 		return err
 	}

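A sketch of what the manifest wrapper above buys: marshaling the wrapped type emits the mediaType field that the registry's schema2 validation expects, which the OCI Manifest type (at the time of this change) did not carry.

    package main

    import (
    	"encoding/json"
    	"fmt"

    	specs "github.com/opencontainers/image-spec/specs-go/v1"
    )

    // Same shape as the manifest wrapper in plugin/backend_linux.go.
    type manifest struct {
    	specs.Manifest
    	MediaType string `json:"mediaType,omitempty"`
    }

    func main() {
    	var m manifest
    	m.SchemaVersion = 2
    	m.MediaType = "application/vnd.docker.distribution.manifest.v2+json"

    	plain, _ := json.Marshal(m.Manifest) // no mediaType field in the output
    	wrapped, _ := json.Marshal(m)        // mediaType present, so schema2 validation passes

    	fmt.Println(string(plain))
    	fmt.Println(string(wrapped))
    }
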
+ 0 - 190
plugin/blobstore.go

@@ -1,190 +0,0 @@
-package plugin // import "github.com/docker/docker/plugin"
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"os"
-	"path/filepath"
-	"runtime"
-
-	"github.com/docker/docker/distribution/xfer"
-	"github.com/docker/docker/image"
-	"github.com/docker/docker/layer"
-	"github.com/docker/docker/pkg/archive"
-	"github.com/docker/docker/pkg/chrootarchive"
-	"github.com/docker/docker/pkg/progress"
-	digest "github.com/opencontainers/go-digest"
-	specs "github.com/opencontainers/image-spec/specs-go/v1"
-	"github.com/pkg/errors"
-	"github.com/sirupsen/logrus"
-)
-
-type blobstore interface {
-	New() (WriteCommitCloser, error)
-	Get(dgst digest.Digest) (io.ReadCloser, error)
-	Size(dgst digest.Digest) (int64, error)
-}
-
-type basicBlobStore struct {
-	path string
-}
-
-func newBasicBlobStore(p string) (*basicBlobStore, error) {
-	tmpdir := filepath.Join(p, "tmp")
-	if err := os.MkdirAll(tmpdir, 0700); err != nil {
-		return nil, errors.Wrapf(err, "failed to mkdir %v", p)
-	}
-	return &basicBlobStore{path: p}, nil
-}
-
-func (b *basicBlobStore) New() (WriteCommitCloser, error) {
-	f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion")
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to create temp file")
-	}
-	return newInsertion(f), nil
-}
-
-func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
-	return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
-}
-
-func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
-	stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
-	if err != nil {
-		return 0, err
-	}
-	return stat.Size(), nil
-}
-
-func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
-	for _, alg := range []string{string(digest.Canonical)} {
-		items, err := ioutil.ReadDir(filepath.Join(b.path, alg))
-		if err != nil {
-			continue
-		}
-		for _, fi := range items {
-			if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists {
-				p := filepath.Join(b.path, alg, fi.Name())
-				err := os.RemoveAll(p)
-				logrus.Debugf("cleaned up blob %v: %v", p, err)
-			}
-		}
-	}
-
-}
-
-// WriteCommitCloser defines object that can be committed to blobstore.
-type WriteCommitCloser interface {
-	io.WriteCloser
-	Commit() (digest.Digest, error)
-}
-
-type insertion struct {
-	io.Writer
-	f        *os.File
-	digester digest.Digester
-	closed   bool
-}
-
-func newInsertion(tempFile *os.File) *insertion {
-	digester := digest.Canonical.Digester()
-	return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
-}
-
-func (i *insertion) Commit() (digest.Digest, error) {
-	p := i.f.Name()
-	d := filepath.Join(filepath.Join(p, "../../"))
-	i.f.Sync()
-	defer os.RemoveAll(p)
-	if err := i.f.Close(); err != nil {
-		return "", err
-	}
-	i.closed = true
-	dgst := i.digester.Digest()
-	if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
-		return "", errors.Wrapf(err, "failed to mkdir %v", d)
-	}
-	if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
-		return "", errors.Wrapf(err, "failed to rename %v", p)
-	}
-	return dgst, nil
-}
-
-func (i *insertion) Close() error {
-	if i.closed {
-		return nil
-	}
-	defer os.RemoveAll(i.f.Name())
-	return i.f.Close()
-}
-
-type downloadManager struct {
-	blobStore    blobstore
-	tmpDir       string
-	blobs        []digest.Digest
-	configDigest digest.Digest
-}
-
-func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os string, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
-	for _, l := range layers {
-		b, err := dm.blobStore.New()
-		if err != nil {
-			return initialRootFS, nil, err
-		}
-		defer b.Close()
-		rc, _, err := l.Download(ctx, progressOutput)
-		if err != nil {
-			return initialRootFS, nil, errors.Wrap(err, "failed to download")
-		}
-		defer rc.Close()
-		r := io.TeeReader(rc, b)
-		inflatedLayerData, err := archive.DecompressStream(r)
-		if err != nil {
-			return initialRootFS, nil, err
-		}
-		defer inflatedLayerData.Close()
-		digester := digest.Canonical.Digester()
-		if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
-			return initialRootFS, nil, err
-		}
-		initialRootFS.Append(layer.DiffID(digester.Digest()))
-		d, err := b.Commit()
-		if err != nil {
-			return initialRootFS, nil, err
-		}
-		dm.blobs = append(dm.blobs, d)
-	}
-	return initialRootFS, nil, nil
-}
-
-func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
-	b, err := dm.blobStore.New()
-	if err != nil {
-		return "", err
-	}
-	defer b.Close()
-	n, err := b.Write(dt)
-	if err != nil {
-		return "", err
-	}
-	if n != len(dt) {
-		return "", io.ErrShortWrite
-	}
-	d, err := b.Commit()
-	dm.configDigest = d
-	return d, err
-}
-
-func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
-	return nil, fmt.Errorf("digest not found")
-}
-func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) {
-	return configToRootFS(c)
-}
-func (dm *downloadManager) PlatformFromConfig(c []byte) (*specs.Platform, error) {
-	// TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS
-	return &specs.Platform{OS: runtime.GOOS}, nil
-}

+ 288 - 0
plugin/fetch_linux.go

@@ -0,0 +1,288 @@
+package plugin
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"time"
+
+	"github.com/containerd/containerd/content"
+	c8derrdefs "github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/images"
+	"github.com/containerd/containerd/remotes"
+	"github.com/containerd/containerd/remotes/docker"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/docker/api/types"
+	progressutils "github.com/docker/docker/distribution/utils"
+	"github.com/docker/docker/pkg/chrootarchive"
+	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/progress"
+	"github.com/docker/docker/pkg/stringid"
+	digest "github.com/opencontainers/go-digest"
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json"
+
+// setupProgressOutput sets up the passed in writer to stream progress.
+//
+// The passed in cancel function is used by the progress writer to signal callers that there
+// is an issue writing to the stream.
+//
+// The returned function is used to wait for the progress writer to finish.
+// Call it before returning from your function to make sure all progress has been written.
+func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) {
+	var out progress.Output
+	f := func() {}
+
+	if outStream != nil {
+		ch := make(chan progress.Progress, 100)
+		out = progress.ChanOutput(ch)
+
+		ctx, retCancel := context.WithCancel(context.Background())
+		go func() {
+			progressutils.WriteDistributionProgress(cancel, outStream, ch)
+			retCancel()
+		}()
+
+		f = func() {
+			close(ch)
+			<-ctx.Done()
+		}
+	} else {
+		out = progress.DiscardOutput()
+	}
+	return out, f
+}
+
+// fetch pulls the content for the given reference into the blob store and dispatches the provided images.Handlers over it.
+// There is no need to pass in remotes.FetchHandler since it is already set up here.
+func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *types.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) {
+	// We need to make sure we have a domain on the reference
+	withDomain, err := reference.ParseNormalizedNamed(ref.String())
+	if err != nil {
+		return errors.Wrap(err, "error parsing plugin image reference")
+	}
+
+	// Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
+	ctx = docker.WithScope(ctx, scope(ref, false))
+
+	// Make sure the fetch handler knows how to set a ref key for the plugin media type.
+	// Without this the ref key is "unknown" and we see a nasty warning message in the logs
+	ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin")
+
+	resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false)
+	if err != nil {
+		return err
+	}
+	resolved, desc, err := resolver.Resolve(ctx, withDomain.String())
+	if err != nil {
+		// This is backwards compatible with older versions of the distribution registry.
+		// The containerd client will add its own accept header as a comma-separated list of supported manifests.
+		// This is perfectly fine, unless you are talking to an older registry which does not split the comma-separated list,
+		//   so it is never able to match a media type, falls back to schema1 (yuck), and then fails because the schema1
+		//   fallback does not support plugin configs...
+		logrus.WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format")
+		headers := http.Header{}
+		headers.Add("Accept", images.MediaTypeDockerSchema2Manifest)
+		headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList)
+		headers.Add("Accept", specs.MediaTypeImageManifest)
+		headers.Add("Accept", specs.MediaTypeImageIndex)
+		resolver, _ = pm.newResolver(ctx, nil, auth, headers, false)
+		if resolver != nil {
+			resolved, desc, err = resolver.Resolve(ctx, withDomain.String())
+			if err != nil {
+				logrus.WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format")
+			}
+		}
+		if err != nil {
+			return errors.Wrap(err, "error resolving plugin reference")
+		}
+	}
+
+	fetcher, err := resolver.Fetcher(ctx, resolved)
+	if err != nil {
+		return errors.Wrap(err, "error creating plugin image fetcher")
+	}
+
+	fp := withFetchProgress(pm.blobStore, out, ref)
+	handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...)
+	if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
+		return err
+	}
+	return nil
+}
+
+// applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory.
+//
+// TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however
+// if there are multiple layers to fetch we may end up extracting layers in the wrong
+// order.
+func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc {
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		switch desc.MediaType {
+		case
+			specs.MediaTypeImageLayer,
+			images.MediaTypeDockerSchema2Layer,
+			specs.MediaTypeImageLayerGzip,
+			images.MediaTypeDockerSchema2LayerGzip:
+		default:
+			return nil, nil
+		}
+
+		ra, err := cs.ReaderAt(ctx, desc)
+		if err != nil {
+			return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest)
+		}
+
+		id := stringid.TruncateID(desc.Digest.String())
+
+		rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close)
+		pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting")
+		defer pr.Close()
+
+		if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil {
+			return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest)
+		}
+		progress.Update(out, id, "Complete")
+		return nil, nil
+	}
+}
+
+func childrenHandler(cs content.Store) images.HandlerFunc {
+	ch := images.ChildrenHandler(cs)
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		switch desc.MediaType {
+		case mediaTypePluginConfig:
+			return nil, nil
+		default:
+			return ch(ctx, desc)
+		}
+	}
+}
+
+type fetchMeta struct {
+	blobs    []digest.Digest
+	config   digest.Digest
+	manifest digest.Digest
+}
+
+func storeFetchMetadata(m *fetchMeta) images.HandlerFunc {
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		switch desc.MediaType {
+		case
+			images.MediaTypeDockerSchema2LayerForeignGzip,
+			images.MediaTypeDockerSchema2Layer,
+			specs.MediaTypeImageLayer,
+			specs.MediaTypeImageLayerGzip:
+			m.blobs = append(m.blobs, desc.Digest)
+		case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
+			m.manifest = desc.Digest
+		case mediaTypePluginConfig:
+			m.config = desc.Digest
+		}
+		return nil, nil
+	}
+}
+
+func validateFetchedMetadata(md fetchMeta) error {
+	if md.config == "" {
+		return errors.New("fetched plugin image but plugin config is missing")
+	}
+	if md.manifest == "" {
+		return errors.New("fetched plugin image but manifest is missing")
+	}
+	return nil
+}
+
+// withFetchProgress is a fetch handler which registers a descriptor with the progress output
+func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc {
+	return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		switch desc.MediaType {
+		case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
+			tn := reference.TagNameOnly(ref)
+			tagged := tn.(reference.Tagged)
+			progress.Messagef(out, tagged.Tag(), "Pulling from %s", reference.FamiliarName(ref))
+			progress.Messagef(out, "", "Digest: %s", desc.Digest.String())
+			return nil, nil
+		case
+			images.MediaTypeDockerSchema2LayerGzip,
+			images.MediaTypeDockerSchema2Layer,
+			specs.MediaTypeImageLayer,
+			specs.MediaTypeImageLayerGzip:
+		default:
+			return nil, nil
+		}
+
+		id := stringid.TruncateID(desc.Digest.String())
+
+		if _, err := cs.Info(ctx, desc.Digest); err == nil {
+			out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true})
+			return nil, nil
+		}
+
+		progress.Update(out, id, "Waiting")
+
+		key := remotes.MakeRefKey(ctx, desc)
+
+		go func() {
+			timer := time.NewTimer(100 * time.Millisecond)
+			if !timer.Stop() {
+				<-timer.C
+			}
+			defer timer.Stop()
+
+			var pulling bool
+			var ctxErr error
+
+			for {
+				timer.Reset(100 * time.Millisecond)
+
+				select {
+				case <-ctx.Done():
+					ctxErr = ctx.Err()
+					// make sure we can still fetch from the content store
+					// TODO: Might need to add some sort of timeout
+					ctx = context.Background()
+				case <-timer.C:
+				}
+
+				s, err := cs.Status(ctx, key)
+				if err != nil {
+					if !c8derrdefs.IsNotFound(err) {
+						logrus.WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull")
+						progress.Update(out, id, err.Error())
+						return
+					}
+
+					if _, err := cs.Info(ctx, desc.Digest); err == nil {
+						progress.Update(out, id, "Download complete")
+						return
+					}
+
+					if ctxErr != nil {
+						progress.Update(out, id, ctxErr.Error())
+						return
+					}
+
+					continue
+				}
+
+				if !pulling {
+					progress.Update(out, id, "Pulling fs layer")
+					pulling = true
+				}
+
+				if s.Offset == s.Total {
+					out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true})
+					return
+				}
+
+				out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total})
+			}
+		}()
+		return nil, nil
+	}
+}

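The setupProgressOutput contract is subtle; a sketch of the intended call pattern, mirroring Pull and Upgrade above (demoFetch is hypothetical):

    package plugin

    import (
    	"context"
    	"io"

    	"github.com/docker/docker/pkg/progress"
    )

    // demoFetch is a hypothetical caller illustrating the contract: the cancel
    // function is invoked by the progress writer when the stream breaks, and
    // waitProgress must run before returning so buffered updates are flushed.
    func demoFetch(ctx context.Context, outStream io.Writer) error {
    	ctx, cancel := context.WithCancel(ctx)
    	defer cancel()

    	out, waitProgress := setupProgressOutput(outStream, cancel)
    	defer waitProgress()

    	progress.Messagef(out, "", "starting fetch")
    	// ... perform the fetch using ctx, reporting to out ...
    	return ctx.Err()
    }
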
+ 15 - 31
plugin/manager.go

@@ -1,6 +1,7 @@
 package plugin // import "github.com/docker/docker/plugin"
 
 import (
+	"context"
 	"encoding/json"
 	"io"
 	"io/ioutil"
@@ -12,10 +13,10 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/content/local"
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types"
-	"github.com/docker/docker/image"
-	"github.com/docker/docker/layer"
 	"github.com/docker/docker/pkg/authorization"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/pubsub"
@@ -72,7 +73,7 @@ type Manager struct {
 	mu        sync.RWMutex // protects cMap
 	muGC      sync.RWMutex // protects blobstore deletions
 	cMap      map[*v2.Plugin]*controller
-	blobStore *basicBlobStore
+	blobStore content.Store
 	publisher *pubsub.Publisher
 	executor  Executor
 }
@@ -117,9 +118,9 @@ func NewManager(config ManagerConfig) (*Manager, error) {
 		return nil, err
 	}
 
-	manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
+	manager.blobStore, err = local.NewStore(filepath.Join(manager.config.Root, "storage"))
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "error creating plugin blob store")
 	}
 
 	manager.cMap = make(map[*v2.Plugin]*controller)
@@ -305,7 +306,15 @@ func (pm *Manager) GC() {
 		}
 	}
 
-	pm.blobStore.gc(whitelist)
+	ctx := context.TODO()
+	pm.blobStore.Walk(ctx, func(info content.Info) error {
+		_, ok := whitelist[info.Digest]
+		if ok {
+			return nil
+		}
+
+		return pm.blobStore.Delete(ctx, info.Digest)
+	})
 }
 
 type logHook struct{ id string }
@@ -357,28 +366,3 @@ func isEqualPrivilege(a, b types.PluginPrivilege) bool {
 
 	return reflect.DeepEqual(a.Value, b.Value)
 }
-
-func configToRootFS(c []byte) (*image.RootFS, error) {
-	var pluginConfig types.PluginConfig
-	if err := json.Unmarshal(c, &pluginConfig); err != nil {
-		return nil, err
-	}
-	// validation for empty rootfs is in distribution code
-	if pluginConfig.Rootfs == nil {
-		return nil, nil
-	}
-
-	return rootFSFromPlugin(pluginConfig.Rootfs), nil
-}
-
-func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
-	rootFS := image.RootFS{
-		Type:    pluginfs.Type,
-		DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
-	}
-	for i := range pluginfs.DiffIds {
-		rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
-	}
-
-	return &rootFS
-}

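The removed basicBlobStore is replaced with containerd's local content store; a minimal sketch of the Writer/Commit flow it provides (path and ref name are illustrative):

    package main

    import (
    	"context"
    	"fmt"

    	"github.com/containerd/containerd/content"
    	"github.com/containerd/containerd/content/local"
    )

    func main() {
    	ctx := context.Background()

    	cs, err := local.NewStore("/tmp/plugin-storage") // same store type NewManager opens
    	if err != nil {
    		panic(err)
    	}

    	w, err := cs.Writer(ctx, content.WithRef("example-blob")) // ref keys the in-flight ingest
    	if err != nil {
    		panic(err)
    	}
    	defer w.Close()

    	if _, err := w.Write([]byte(`{"hello":"world"}`)); err != nil {
    		panic(err)
    	}
    	// Committing with size 0 and digest "" lets the store compute both,
    	// exactly as the rootfs and config blobs are committed above.
    	if err := w.Commit(ctx, 0, ""); err != nil {
    		panic(err)
    	}
    	fmt.Println("stored:", w.Digest())
    }
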
+ 12 - 5
plugin/manager_linux.go

@@ -1,12 +1,14 @@
 package plugin // import "github.com/docker/docker/plugin"
 
 import (
+	"context"
 	"encoding/json"
 	"net"
 	"os"
 	"path/filepath"
 	"time"
 
+	"github.com/containerd/containerd/content"
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/daemon/initlayer"
 	"github.com/docker/docker/errdefs"
@@ -17,6 +19,7 @@ import (
 	v2 "github.com/docker/docker/plugin/v2"
 	"github.com/moby/sys/mount"
 	digest "github.com/opencontainers/go-digest"
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/sys/unix"
@@ -213,7 +216,7 @@ func (pm *Manager) Shutdown() {
 	}
 }
 
-func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) {
+func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) {
 	config, err := pm.setupNewPlugin(configDigest, blobsums, privileges)
 	if err != nil {
 		return err
@@ -261,19 +264,22 @@ func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobs
 	}
 
 	p.PluginObj.Config = config
+	p.Manifest = manifestDigest
 	err = pm.save(p)
 	return errors.Wrap(err, "error saving upgraded plugin config")
 }
 
 func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.Digest, privileges *types.PluginPrivileges) (types.PluginConfig, error) {
-	configRC, err := pm.blobStore.Get(configDigest)
+	configRA, err := pm.blobStore.ReaderAt(context.TODO(), specs.Descriptor{Digest: configDigest})
 	if err != nil {
 		return types.PluginConfig{}, err
 	}
-	defer configRC.Close()
+	defer configRA.Close()
+
+	configR := content.NewReader(configRA)
 
 	var config types.PluginConfig
-	dec := json.NewDecoder(configRC)
+	dec := json.NewDecoder(configR)
 	if err := dec.Decode(&config); err != nil {
 		return types.PluginConfig{}, errors.Wrapf(err, "failed to parse config")
 	}
@@ -292,7 +298,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.
 }
 
 // createPlugin creates a new plugin. take lock before calling.
-func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) {
+func (pm *Manager) createPlugin(name string, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) {
 	if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store
 		return nil, errdefs.InvalidParameter(err)
 	}
@@ -310,6 +316,7 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum
 		},
 		Config:   configDigest,
 		Blobsums: blobsums,
+		Manifest: manifestDigest,
 	}
 	p.InitEmptySettings()
 	for _, o := range opts {

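The read side, as used by setupNewPlugin above: ReaderAt returns random-access content, and content.NewReader adapts it into a stream for the JSON decoder. A hypothetical helper:

    package plugin

    import (
    	"context"
    	"encoding/json"

    	"github.com/containerd/containerd/content"
    	"github.com/docker/docker/api/types"
    	digest "github.com/opencontainers/go-digest"
    	specs "github.com/opencontainers/image-spec/specs-go/v1"
    )

    // readPluginConfig is a hypothetical helper showing the ReaderAt/NewReader pattern.
    func readPluginConfig(ctx context.Context, cs content.Store, dgst digest.Digest) (types.PluginConfig, error) {
    	var config types.PluginConfig

    	ra, err := cs.ReaderAt(ctx, specs.Descriptor{Digest: dgst})
    	if err != nil {
    		return config, err
    	}
    	defer ra.Close()

    	// content.NewReader turns the random-access reader into a stream.
    	err = json.NewDecoder(content.NewReader(ra)).Decode(&config)
    	return config, err
    }
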
+ 74 - 0
plugin/progress.go

@@ -0,0 +1,74 @@
+package plugin
+
+import (
+	"sync"
+	"time"
+
+	"github.com/containerd/containerd/remotes/docker"
+)
+
+func newPushJobs(tracker docker.StatusTracker) *pushJobs {
+	return &pushJobs{
+		names: make(map[string]string),
+		t:     tracker,
+	}
+}
+
+type pushJobs struct {
+	t docker.StatusTracker
+
+	mu   sync.Mutex
+	jobs []string
+	// maps job ref to a name
+	names map[string]string
+}
+
+func (p *pushJobs) add(id, name string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if _, ok := p.names[id]; ok {
+		return
+	}
+	p.jobs = append(p.jobs, id)
+	p.names[id] = name
+}
+
+func (p *pushJobs) status() []contentStatus {
+	statuses := make([]contentStatus, 0, len(p.jobs))
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for _, j := range p.jobs {
+		var s contentStatus
+		s.Ref = p.names[j]
+
+		status, err := p.t.GetStatus(j)
+		if err != nil {
+			s.Status = "Waiting"
+		} else {
+			s.Total = status.Total
+			s.Offset = status.Offset
+			s.StartedAt = status.StartedAt
+			s.UpdatedAt = status.UpdatedAt
+			if status.UploadUUID == "" {
+				s.Status = "Upload complete"
+			} else {
+				s.Status = "Uploading"
+			}
+		}
+		statuses = append(statuses, s)
+	}
+
+	return statuses
+}
+
+type contentStatus struct {
+	Status    string
+	Total     int64
+	Offset    int64
+	StartedAt time.Time
+	UpdatedAt time.Time
+	Ref       string
+}

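A sketch of how Push drives these types: descriptors are registered as they are encountered, and a polling loop renders whatever the containerd status tracker currently knows (ref key and name are illustrative):

    package plugin

    import (
    	"fmt"

    	"github.com/containerd/containerd/remotes/docker"
    )

    // demoPushStatus is a hypothetical driver for the types above.
    func demoPushStatus() {
    	tracker := docker.NewInMemoryTracker()
    	pj := newPushJobs(tracker)

    	// Normally done in the progress handler as each descriptor is pushed.
    	pj.add("example-ref-key", "4bf5122f3445")

    	// Normally done on a timer until the context is cancelled.
    	for _, s := range pj.status() {
    		fmt.Printf("%s: %s (%d/%d)\n", s.Ref, s.Status, s.Offset, s.Total)
    	}
    }
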
+ 111 - 0
plugin/registry.go

@@ -0,0 +1,111 @@
+package plugin
+
+import (
+	"context"
+	"crypto/tls"
+	"net"
+	"net/http"
+	"time"
+
+	"github.com/sirupsen/logrus"
+
+	"github.com/docker/docker/dockerversion"
+
+	"github.com/pkg/errors"
+
+	"github.com/containerd/containerd/remotes"
+	"github.com/containerd/containerd/remotes/docker"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/docker/api/types"
+)
+
+// scope builds the correct auth scope for the registry client to authorize against.
+// By default the client currently only builds a "repository:" scope, without a classifier such as "(plugin)".
+// Without this, the client would not be able to authorize the request.
+func scope(ref reference.Named, push bool) string {
+	scope := "repository(plugin):" + reference.Path(reference.TrimNamed(ref)) + ":pull"
+	if push {
+		scope += ",push"
+	}
+	return scope
+}
+
+func (pm *Manager) newResolver(ctx context.Context, tracker docker.StatusTracker, auth *types.AuthConfig, headers http.Header, httpFallback bool) (remotes.Resolver, error) {
+	if headers == nil {
+		headers = http.Header{}
+	}
+	headers.Add("User-Agent", dockerversion.DockerUserAgent(ctx))
+
+	return docker.NewResolver(docker.ResolverOptions{
+		Tracker: tracker,
+		Headers: headers,
+		Hosts:   pm.registryHostsFn(auth, httpFallback),
+	}), nil
+}
+
+func registryHTTPClient(config *tls.Config) *http.Client {
+	return &http.Client{
+		Transport: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   30 * time.Second,
+				KeepAlive: 30 * time.Second,
+			}).DialContext,
+			TLSClientConfig:     config,
+			TLSHandshakeTimeout: 10 * time.Second,
+			IdleConnTimeout:     30 * time.Second,
+		},
+	}
+}
+
+func (pm *Manager) registryHostsFn(auth *types.AuthConfig, httpFallback bool) docker.RegistryHosts {
+	return func(hostname string) ([]docker.RegistryHost, error) {
+		eps, err := pm.config.RegistryService.LookupPullEndpoints(hostname)
+		if err != nil {
+			return nil, errors.Wrapf(err, "error resolving repository for %s", hostname)
+		}
+
+		hosts := make([]docker.RegistryHost, 0, len(eps))
+
+		for _, ep := range eps {
+			// The forced http fallback is used only for push, since the containerd pusher only ever uses the first host
+			// we pass to it, so it is the caller's responsibility to retry with this flag set.
+			if httpFallback && ep.URL.Scheme != "http" {
+				logrus.WithField("registryHost", hostname).WithField("endpoint", ep).Debugf("Skipping non-http endpoint")
+				continue
+			}
+
+			caps := docker.HostCapabilityPull | docker.HostCapabilityResolve
+			if !ep.Mirror {
+				caps = caps | docker.HostCapabilityPush
+			}
+
+			host, err := docker.DefaultHost(ep.URL.Host)
+			if err != nil {
+				return nil, err
+			}
+
+			client := registryHTTPClient(ep.TLSConfig)
+			hosts = append(hosts, docker.RegistryHost{
+				Host:         host,
+				Scheme:       ep.URL.Scheme,
+				Client:       client,
+				Path:         "/v2",
+				Capabilities: caps,
+				Authorizer: docker.NewDockerAuthorizer(
+					docker.WithAuthClient(client),
+					docker.WithAuthCreds(func(_ string) (string, string, error) {
+						if auth.IdentityToken != "" {
+							return "", auth.IdentityToken, nil
+						}
+						return auth.Username, auth.Password, nil
+					}),
+				),
+			})
+		}
+		logrus.WithField("registryHost", hostname).WithField("hosts", hosts).Debug("Resolved registry hosts")
+
+		return hosts, nil
+	}
+}

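For reference, a sketch of the scope strings this produces (repository name illustrative, scope logic duplicated from above):

    package main

    import (
    	"fmt"

    	"github.com/docker/distribution/reference"
    )

    // scope duplicated from plugin/registry.go for illustration.
    func scope(ref reference.Named, push bool) string {
    	s := "repository(plugin):" + reference.Path(reference.TrimNamed(ref)) + ":pull"
    	if push {
    		s += ",push"
    	}
    	return s
    }

    func main() {
    	ref, _ := reference.ParseNormalizedNamed("myorg/myplugin:latest")
    	fmt.Println(scope(ref, false)) // repository(plugin):myorg/myplugin:pull
    	fmt.Println(scope(ref, true))  // repository(plugin):myorg/myplugin:pull,push
    }
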
+ 1 - 0
plugin/v2/plugin.go

@@ -25,6 +25,7 @@ type Plugin struct {
 
 	Config   digest.Digest
 	Blobsums []digest.Digest
+	Manifest digest.Digest
 
 	modifyRuntimeSpec func(*specs.Spec)
 

+ 15 - 2
testutil/fixtures/plugin/plugin.go

@@ -26,7 +26,15 @@ type CreateOpt func(*Config)
 // create the plugin with.
 type Config struct {
 	*types.PluginConfig
-	binPath string
+	binPath        string
+	RegistryConfig registry.ServiceOptions
+}
+
+// WithInsecureRegistry specifies that the given registry can skip TLS verification as well as fall back to plain http
+func WithInsecureRegistry(url string) CreateOpt {
+	return func(cfg *Config) {
+		cfg.RegistryConfig.InsecureRegistries = append(cfg.RegistryConfig.InsecureRegistries, url)
+	}
 }
 
 // WithBinary is a CreateOpt to set a custom binary to create the plugin with.
@@ -82,6 +90,11 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig,
 		return errors.Wrap(err, "error creating plugin root")
 	}
 
+	var cfg Config
+	for _, o := range opts {
+		o(&cfg)
+	}
+
 	tar, err := makePluginBundle(inPath, opts...)
 	if err != nil {
 		return err
@@ -92,7 +105,7 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig,
 		return nil, nil
 	}
 
-	regService, err := registry.NewService(registry.ServiceOptions{})
+	regService, err := registry.NewService(cfg.RegistryConfig)
 	if err != nil {
 		return err
 	}

+ 16 - 0
testutil/registry/ops.go

@@ -1,5 +1,7 @@
 package registry
 
+import "io"
+
 // Schema1 sets the registry to serve schema1 manifests
 func Schema1(c *Config) {
 	c.schema1 = true
@@ -24,3 +26,17 @@ func URL(registryURL string) func(*Config) {
 		c.registryURL = registryURL
 	}
 }
+
+// WithStdout sets the stdout of the registry command to the passed in writer.
+func WithStdout(w io.Writer) func(c *Config) {
+	return func(c *Config) {
+		c.stdout = w
+	}
+}
+
+// WithStderr sets the stderr of the registry command to the passed in writer.
+func WithStderr(w io.Writer) func(c *Config) {
+	return func(c *Config) {
+		c.stderr = w
+	}
+}

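A sketch of wiring the new options into a test (hypothetical test, buffers illustrative):

    package common

    import (
    	"bytes"
    	"testing"

    	"github.com/docker/docker/testutil/registry"
    )

    // TestRegistryLogs is a hypothetical test showing the new output options.
    func TestRegistryLogs(t *testing.T) {
    	var stdout, stderr bytes.Buffer

    	reg := registry.NewV2(t, registry.WithStdout(&stdout), registry.WithStderr(&stderr))
    	defer reg.Close()

    	// ... exercise the registry ...

    	t.Logf("registry stdout:\n%s\nregistry stderr:\n%s", stdout.String(), stderr.String())
    }
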
+ 5 - 0
testutil/registry/registry.go

@@ -2,6 +2,7 @@ package registry // import "github.com/docker/docker/testutil/registry"
 
 import (
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"os"
@@ -40,6 +41,8 @@ type Config struct {
 	auth        string
 	tokenURL    string
 	registryURL string
+	stdout      io.Writer
+	stderr      io.Writer
 }
 
 // NewV2 creates a v2 registry server
@@ -109,6 +112,8 @@ http:
 		binary = V2binarySchema1
 	}
 	cmd := exec.Command(binary, confPath)
+	cmd.Stdout = c.stdout
+	cmd.Stderr = c.stderr
 	if err := cmd.Start(); err != nil {
 		// FIXME(vdemeester) use a defer/clean func
 		os.RemoveAll(tmp)