Merge pull request #29339 from dmcgowan/plugins-abstract-download-manager

Abstract layerstore from pull/push distribution code
This commit is contained in:
Tõnis Tiigi 2016-12-19 13:32:28 -08:00 committed by GitHub
commit aecb79ff98
20 changed files with 503 additions and 233 deletions

View file

@ -74,7 +74,7 @@ func runPull(dockerCli *command.DockerCli, opts pullOptions) error {
err = imagePullPrivileged(ctx, dockerCli, authConfig, distributionRef.String(), requestPrivilege, opts.all)
}
if err != nil {
if strings.Contains(err.Error(), "target is a plugin") {
if strings.Contains(err.Error(), "target is plugin") {
return errors.New(err.Error() + " - Use `docker plugin install`")
}
return err

View file

@ -89,15 +89,18 @@ func (daemon *Daemon) pullImageWithReference(ctx context.Context, ref reference.
}()
imagePullConfig := &distribution.ImagePullConfig{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
ImageStore: daemon.imageStore,
ReferenceStore: daemon.referenceStore,
DownloadManager: daemon.downloadManager,
Config: distribution.Config{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore),
ReferenceStore: daemon.referenceStore,
},
DownloadManager: daemon.downloadManager,
Schema2Types: distribution.ImageTypes,
}
err := distribution.Pull(ctx, ref, imagePullConfig)

View file

@ -3,6 +3,7 @@ package daemon
import (
"io"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution"
"github.com/docker/docker/pkg/progress"
@ -38,17 +39,20 @@ func (daemon *Daemon) PushImage(ctx context.Context, image, tag string, metaHead
}()
imagePushConfig := &distribution.ImagePushConfig{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
LayerStore: daemon.layerStore,
ImageStore: daemon.imageStore,
ReferenceStore: daemon.referenceStore,
TrustKey: daemon.trustKey,
UploadManager: daemon.uploadManager,
Config: distribution.Config{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore),
ReferenceStore: daemon.referenceStore,
},
ConfigMediaType: schema2.MediaTypeImageConfig,
LayerStore: distribution.NewLayerProviderFromStore(daemon.layerStore),
TrustKey: daemon.trustKey,
UploadManager: daemon.uploadManager,
}
err = distribution.Push(ctx, ref, imagePushConfig)

233
distribution/config.go Normal file
View file

@ -0,0 +1,233 @@
package distribution
import (
"encoding/json"
"fmt"
"io"
"runtime"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
// Config stores configuration for communicating
// with a registry.
type Config struct {
// MetaHeaders stores HTTP headers with metadata about the image
MetaHeaders map[string][]string
// AuthConfig holds authentication credentials for authenticating with
// the registry.
AuthConfig *types.AuthConfig
// ProgressOutput is the interface for showing the status of the pull
// operation.
ProgressOutput progress.Output
// RegistryService is the registry service to use for TLS configuration
// and endpoint lookup.
RegistryService registry.Service
// ImageEventLogger notifies events for a given image
ImageEventLogger func(id, name, action string)
// MetadataStore is the storage backend for distribution-specific
// metadata.
MetadataStore metadata.Store
// ImageStore manages images.
ImageStore ImageConfigStore
// ReferenceStore manages tags. This value is optional, when excluded
// content will not be tagged.
ReferenceStore reference.Store
// RequireSchema2 ensures that only schema2 manifests are used.
RequireSchema2 bool
}
// ImagePullConfig stores pull configuration.
type ImagePullConfig struct {
Config
// DownloadManager manages concurrent pulls.
DownloadManager RootFSDownloadManager
// Schema2Types is the valid schema2 configuration types allowed
// by the pull operation.
Schema2Types []string
}
// ImagePushConfig stores push configuration.
type ImagePushConfig struct {
Config
// ConfigMediaType is the configuration media type for
// schema2 manifests.
ConfigMediaType string
// LayerStore manages layers.
LayerStore PushLayerProvider
// TrustKey is the private key for legacy signatures. This is typically
// an ephemeral key, since these signatures are no longer verified.
TrustKey libtrust.PrivateKey
// UploadManager dispatches uploads.
UploadManager *xfer.LayerUploadManager
}
// ImageConfigStore handles storing and getting image configurations
// by digest. Allows getting an image configurations rootfs from the
// configuration.
type ImageConfigStore interface {
Put([]byte) (digest.Digest, error)
Get(digest.Digest) ([]byte, error)
RootFSFromConfig([]byte) (*image.RootFS, error)
}
// PushLayerProvider provides layers to be pushed by ChainID.
type PushLayerProvider interface {
Get(layer.ChainID) (PushLayer, error)
}
// PushLayer is a pushable layer with metadata about the layer
// and access to the content of the layer.
type PushLayer interface {
ChainID() layer.ChainID
DiffID() layer.DiffID
Parent() PushLayer
Open() (io.ReadCloser, error)
Size() (int64, error)
MediaType() string
Release()
}
// RootFSDownloadManager handles downloading of the rootfs
type RootFSDownloadManager interface {
// Download downloads the layers into the given initial rootfs and
// returns the final rootfs.
// Given progress output to track download progress
// Returns function to release download resources
Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error)
}
type imageConfigStore struct {
image.Store
}
// NewImageConfigStoreFromStore returns an ImageConfigStore backed
// by an image.Store for container images.
func NewImageConfigStoreFromStore(is image.Store) ImageConfigStore {
return &imageConfigStore{
Store: is,
}
}
func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) {
id, err := s.Store.Create(c)
return digest.Digest(id), err
}
func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) {
img, err := s.Store.Get(image.IDFromDigest(d))
if err != nil {
return nil, err
}
return img.RawJSON(), nil
}
func (s *imageConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
var unmarshalledConfig image.Image
if err := json.Unmarshal(c, &unmarshalledConfig); err != nil {
return nil, err
}
// fail immediately on windows
if runtime.GOOS == "windows" && unmarshalledConfig.OS == "linux" {
return nil, fmt.Errorf("image operating system %q cannot be used on this platform", unmarshalledConfig.OS)
}
return unmarshalledConfig.RootFS, nil
}
type storeLayerProvider struct {
ls layer.Store
}
// NewLayerProviderFromStore returns a layer provider backed by
// an instance of LayerStore. Only getting layers as gzipped
// tars is supported.
func NewLayerProviderFromStore(ls layer.Store) PushLayerProvider {
return &storeLayerProvider{
ls: ls,
}
}
func (p *storeLayerProvider) Get(lid layer.ChainID) (PushLayer, error) {
if lid == "" {
return &storeLayer{
Layer: layer.EmptyLayer,
}, nil
}
l, err := p.ls.Get(lid)
if err != nil {
return nil, err
}
sl := storeLayer{
Layer: l,
ls: p.ls,
}
if d, ok := l.(distribution.Describable); ok {
return &describableStoreLayer{
storeLayer: sl,
describable: d,
}, nil
}
return &sl, nil
}
type storeLayer struct {
layer.Layer
ls layer.Store
}
func (l *storeLayer) Parent() PushLayer {
p := l.Layer.Parent()
if p == nil {
return nil
}
return &storeLayer{
Layer: p,
ls: l.ls,
}
}
func (l *storeLayer) Open() (io.ReadCloser, error) {
return l.Layer.TarStream()
}
func (l *storeLayer) Size() (int64, error) {
return l.Layer.DiffSize()
}
func (l *storeLayer) MediaType() string {
// layer store always returns uncompressed tars
return schema2.MediaTypeUncompressedLayer
}
func (l *storeLayer) Release() {
if l.ls != nil {
layer.ReleaseAndLog(l.ls, l.Layer)
}
}
type describableStoreLayer struct {
storeLayer
describable distribution.Describable
}
func (l *describableStoreLayer) Descriptor() distribution.Descriptor {
return l.describable.Descriptor()
}

View file

@ -6,42 +6,13 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/docker/api"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"golang.org/x/net/context"
)
// ImagePullConfig stores pull configuration.
type ImagePullConfig struct {
// MetaHeaders stores HTTP headers with metadata about the image
MetaHeaders map[string][]string
// AuthConfig holds authentication credentials for authenticating with
// the registry.
AuthConfig *types.AuthConfig
// ProgressOutput is the interface for showing the status of the pull
// operation.
ProgressOutput progress.Output
// RegistryService is the registry service to use for TLS configuration
// and endpoint lookup.
RegistryService registry.Service
// ImageEventLogger notifies events for a given image
ImageEventLogger func(id, name, action string)
// MetadataStore is the storage backend for distribution-specific
// metadata.
MetadataStore metadata.Store
// ImageStore manages images.
ImageStore image.Store
// ReferenceStore manages tags.
ReferenceStore reference.Store
// DownloadManager manages concurrent pulls.
DownloadManager *xfer.LayerDownloadManager
}
// Puller is an interface that abstracts pulling for different API versions.
type Puller interface {
// Pull tries to pull the image referenced by `tag`
@ -117,6 +88,10 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
confirmedTLSRegistries = make(map[string]struct{})
)
for _, endpoint := range endpoints {
if imagePullConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
continue
}
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue

View file

@ -243,13 +243,15 @@ func (p *v1Puller) pullImage(ctx context.Context, v1ID, endpoint string, localNa
return err
}
imageID, err := p.config.ImageStore.Create(config)
imageID, err := p.config.ImageStore.Put(config)
if err != nil {
return err
}
if err := p.config.ReferenceStore.AddTag(localNameRef, imageID.Digest(), true); err != nil {
return err
if p.config.ReferenceStore != nil {
if err := p.config.ReferenceStore.AddTag(localNameRef, imageID, true); err != nil {
return err
}
}
return nil

View file

@ -33,9 +33,8 @@ import (
)
var (
errRootFSMismatch = errors.New("layers from manifest don't match image configuration")
errMediaTypePlugin = errors.New("target is a plugin")
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
errRootFSMismatch = errors.New("layers from manifest don't match image configuration")
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
)
// ImageConfigPullError is an error pulling the image config blob
@ -355,8 +354,19 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
}
if m, ok := manifest.(*schema2.DeserializedManifest); ok {
if m.Manifest.Config.MediaType == schema2.MediaTypePluginConfig {
return false, errMediaTypePlugin
var allowedMediatype bool
for _, t := range p.config.Schema2Types {
if m.Manifest.Config.MediaType == t {
allowedMediatype = true
break
}
}
if !allowedMediatype {
configClass := mediaTypeClasses[m.Manifest.Config.MediaType]
if configClass == "" {
configClass = "unknown"
}
return false, fmt.Errorf("target is %s", configClass)
}
}
@ -374,6 +384,9 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
switch v := manifest.(type) {
case *schema1.SignedManifest:
if p.config.RequireSchema2 {
return false, fmt.Errorf("invalid manifest: not schema2")
}
id, manifestDigest, err = p.pullSchema1(ctx, ref, v)
if err != nil {
return false, err
@ -394,25 +407,27 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
progress.Message(p.config.ProgressOutput, "", "Digest: "+manifestDigest.String())
oldTagID, err := p.config.ReferenceStore.Get(ref)
if err == nil {
if oldTagID == id {
return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id)
if p.config.ReferenceStore != nil {
oldTagID, err := p.config.ReferenceStore.Get(ref)
if err == nil {
if oldTagID == id {
return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id)
}
} else if err != reference.ErrDoesNotExist {
return false, err
}
} else if err != reference.ErrDoesNotExist {
return false, err
}
if canonical, ok := ref.(reference.Canonical); ok {
if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil {
return false, err
}
} else {
if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
return false, err
}
if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil {
return false, err
if canonical, ok := ref.(reference.Canonical); ok {
if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil {
return false, err
}
} else {
if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
return false, err
}
if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil {
return false, err
}
}
}
return true, nil
@ -481,14 +496,14 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Named, unverif
return "", "", err
}
imageID, err := p.config.ImageStore.Create(config)
imageID, err := p.config.ImageStore.Put(config)
if err != nil {
return "", "", err
}
manifestDigest = digest.FromBytes(unverifiedManifest.Canonical)
return imageID.Digest(), manifestDigest, nil
return imageID, manifestDigest, nil
}
func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest) (id digest.Digest, manifestDigest digest.Digest, err error) {
@ -498,7 +513,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
}
target := mfst.Target()
if _, err := p.config.ImageStore.Get(image.IDFromDigest(target.Digest)); err == nil {
if _, err := p.config.ImageStore.Get(target.Digest); err == nil {
// If the image already exists locally, no need to pull
// anything.
return target.Digest, manifestDigest, nil
@ -537,9 +552,9 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
}()
var (
configJSON []byte // raw serialized image config
unmarshalledConfig image.Image // deserialized image config
downloadRootFS image.RootFS // rootFS to use for registering layers.
configJSON []byte // raw serialized image config
downloadedRootFS *image.RootFS // rootFS from registered layers
configRootFS *image.RootFS // rootFS from configuration
)
// https://github.com/docker/docker/issues/24766 - Err on the side of caution,
@ -551,84 +566,87 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
// check to block Windows images being pulled on Linux is implemented, it
// may be necessary to perform the same type of serialisation.
if runtime.GOOS == "windows" {
configJSON, unmarshalledConfig, err = receiveConfig(configChan, errChan)
configJSON, configRootFS, err = receiveConfig(p.config.ImageStore, configChan, errChan)
if err != nil {
return "", "", err
}
if unmarshalledConfig.RootFS == nil {
if configRootFS == nil {
return "", "", errRootFSInvalid
}
if unmarshalledConfig.OS == "linux" {
return "", "", fmt.Errorf("image operating system %q cannot be used on this platform", unmarshalledConfig.OS)
}
}
downloadRootFS = *image.NewRootFS()
rootFS, release, err := p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput)
if err != nil {
if configJSON != nil {
// Already received the config
return "", "", err
}
select {
case err = <-errChan:
return "", "", err
default:
cancel()
select {
case <-configChan:
case <-errChan:
if p.config.DownloadManager != nil {
downloadRootFS := *image.NewRootFS()
rootFS, release, err := p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput)
if err != nil {
if configJSON != nil {
// Already received the config
return "", "", err
}
select {
case err = <-errChan:
return "", "", err
default:
cancel()
select {
case <-configChan:
case <-errChan:
}
return "", "", err
}
return "", "", err
}
if release != nil {
defer release()
}
downloadedRootFS = &rootFS
}
defer release()
if configJSON == nil {
configJSON, unmarshalledConfig, err = receiveConfig(configChan, errChan)
configJSON, configRootFS, err = receiveConfig(p.config.ImageStore, configChan, errChan)
if err != nil {
return "", "", err
}
if unmarshalledConfig.RootFS == nil {
if configRootFS == nil {
return "", "", errRootFSInvalid
}
}
// The DiffIDs returned in rootFS MUST match those in the config.
// Otherwise the image config could be referencing layers that aren't
// included in the manifest.
if len(rootFS.DiffIDs) != len(unmarshalledConfig.RootFS.DiffIDs) {
return "", "", errRootFSMismatch
}
for i := range rootFS.DiffIDs {
if rootFS.DiffIDs[i] != unmarshalledConfig.RootFS.DiffIDs[i] {
if downloadedRootFS != nil {
// The DiffIDs returned in rootFS MUST match those in the config.
// Otherwise the image config could be referencing layers that aren't
// included in the manifest.
if len(downloadedRootFS.DiffIDs) != len(configRootFS.DiffIDs) {
return "", "", errRootFSMismatch
}
for i := range downloadedRootFS.DiffIDs {
if downloadedRootFS.DiffIDs[i] != configRootFS.DiffIDs[i] {
return "", "", errRootFSMismatch
}
}
}
imageID, err := p.config.ImageStore.Create(configJSON)
imageID, err := p.config.ImageStore.Put(configJSON)
if err != nil {
return "", "", err
}
return imageID.Digest(), manifestDigest, nil
return imageID, manifestDigest, nil
}
func receiveConfig(configChan <-chan []byte, errChan <-chan error) ([]byte, image.Image, error) {
func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan error) ([]byte, *image.RootFS, error) {
select {
case configJSON := <-configChan:
var unmarshalledConfig image.Image
if err := json.Unmarshal(configJSON, &unmarshalledConfig); err != nil {
return nil, image.Image{}, err
rootfs, err := s.RootFSFromConfig(configJSON)
if err != nil {
return nil, nil, err
}
return configJSON, unmarshalledConfig, nil
return configJSON, rootfs, nil
case err := <-errChan:
return nil, image.Image{}, err
return nil, nil, err
// Don't need a case for ctx.Done in the select because cancellation
// will trigger an error in p.pullSchema2ImageConfig.
}

View file

@ -7,49 +7,13 @@ import (
"io"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
// ImagePushConfig stores push configuration.
type ImagePushConfig struct {
// MetaHeaders store HTTP headers with metadata about the image
MetaHeaders map[string][]string
// AuthConfig holds authentication credentials for authenticating with
// the registry.
AuthConfig *types.AuthConfig
// ProgressOutput is the interface for showing the status of the push
// operation.
ProgressOutput progress.Output
// RegistryService is the registry service to use for TLS configuration
// and endpoint lookup.
RegistryService registry.Service
// ImageEventLogger notifies events for a given image
ImageEventLogger func(id, name, action string)
// MetadataStore is the storage backend for distribution-specific
// metadata.
MetadataStore metadata.Store
// LayerStore manages layers.
LayerStore layer.Store
// ImageStore manages images.
ImageStore image.Store
// ReferenceStore manages tags.
ReferenceStore reference.Store
// TrustKey is the private key for legacy signatures. This is typically
// an ephemeral key, since these signatures are no longer verified.
TrustKey libtrust.PrivateKey
// UploadManager dispatches uploads.
UploadManager *xfer.LayerUploadManager
}
// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
// Push tries to push the image configured at the creation of Pusher.
@ -127,6 +91,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
)
for _, endpoint := range endpoints {
if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
continue
}
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue

View file

@ -137,7 +137,7 @@ func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) *v1Dependenc
}
// Retrieve the all the images to be uploaded in the correct order
func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []layer.Layer, err error) {
func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) {
tagsByImage = make(map[image.ID][]string)
// Ignore digest references
@ -202,25 +202,31 @@ func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID
return
}
func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]layer.Layer) (imageListForThisTag []v1Image, err error) {
img, err := p.config.ImageStore.Get(imgID)
func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) {
ics, ok := p.config.ImageStore.(*imageConfigStore)
if !ok {
return nil, fmt.Errorf("only image store images supported for v1 push")
}
img, err := ics.Store.Get(imgID)
if err != nil {
return nil, err
}
topLayerID := img.RootFS.ChainID()
var l layer.Layer
if topLayerID == "" {
l = layer.EmptyLayer
} else {
l, err = p.config.LayerStore.Get(topLayerID)
*referencedLayers = append(*referencedLayers, l)
if err != nil {
return nil, fmt.Errorf("failed to get top layer from image: %v", err)
}
pl, err := p.config.LayerStore.Get(topLayerID)
*referencedLayers = append(*referencedLayers, pl)
if err != nil {
return nil, fmt.Errorf("failed to get top layer from image: %v", err)
}
// V1 push is deprecated, only support existing layerstore layers
lsl, ok := pl.(*storeLayer)
if !ok {
return nil, fmt.Errorf("only layer store layers supported for v1 push")
}
l := lsl.Layer
dependencyImages, parent := generateDependencyImages(l.Parent(), dependenciesSeen)
topImage, err := newV1TopImage(imgID, img, l, parent)
@ -365,7 +371,7 @@ func (p *v1Pusher) pushRepository(ctx context.Context) error {
imgList, tags, referencedLayers, err := p.getImageList()
defer func() {
for _, l := range referencedLayers {
p.config.LayerStore.Release(l)
l.Release()
}
}()
if err != nil {

View file

@ -20,7 +20,6 @@ import (
"github.com/docker/distribution/registry/client"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
@ -123,24 +122,22 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
logrus.Debugf("Pushing repository: %s", ref.String())
img, err := p.config.ImageStore.Get(image.IDFromDigest(id))
imgConfig, err := p.config.ImageStore.Get(id)
if err != nil {
return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
}
var l layer.Layer
topLayerID := img.RootFS.ChainID()
if topLayerID == "" {
l = layer.EmptyLayer
} else {
l, err = p.config.LayerStore.Get(topLayerID)
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err)
}
defer layer.ReleaseAndLog(p.config.LayerStore, l)
rootfs, err := p.config.ImageStore.RootFSFromConfig(imgConfig)
if err != nil {
return fmt.Errorf("unable to get rootfs for image %s: %s", ref.String(), err)
}
l, err := p.config.LayerStore.Get(rootfs.ChainID())
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err)
}
defer l.Release()
hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
if err != nil {
return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
@ -158,7 +155,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
}
// Loop bounds condition is to avoid pushing the base layer on Windows.
for i := 0; i < len(img.RootFS.DiffIDs); i++ {
for i := 0; i < len(rootfs.DiffIDs); i++ {
descriptor := descriptorTemplate
descriptor.layer = l
descriptor.checkedDigests = make(map[digest.Digest]struct{})
@ -172,7 +169,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
}
// Try schema2 first
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
manifest, err := manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
@ -185,7 +182,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
if runtime.GOOS == "windows" {
if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 {
logrus.Warnf("failed to upload schema2 manifest: %v", err)
return err
}
@ -196,7 +193,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
if err != nil {
return err
}
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig)
manifest, err = manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
@ -246,7 +243,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
}
type v2PushDescriptor struct {
layer layer.Layer
layer PushLayer
v2MetadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
@ -425,26 +422,32 @@ func (pd *v2PushDescriptor) uploadUsingSession(
diffID layer.DiffID,
layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
arch, err := pd.layer.TarStream()
if err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
var reader io.ReadCloser
contentReader, err := pd.layer.Open()
size, _ := pd.layer.Size()
reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing")
switch m := pd.layer.MediaType(); m {
case schema2.MediaTypeUncompressedLayer:
compressedReader, compressionDone := compress(reader)
defer func(closer io.Closer) {
closer.Close()
<-compressionDone
}(reader)
reader = compressedReader
case schema2.MediaTypeLayer:
default:
reader.Close()
return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m)
}
// don't care if this fails; best effort
size, _ := pd.layer.DiffSize()
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
compressedReader, compressionDone := compress(reader)
defer func() {
reader.Close()
<-compressionDone
}()
digester := digest.Canonical.New()
tee := io.TeeReader(compressedReader, digester.Hash())
tee := io.TeeReader(reader, digester.Hash())
nn, err := layerUpload.ReadFrom(tee)
compressedReader.Close()
reader.Close()
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
@ -568,8 +571,8 @@ attempts:
// repository and whether the check shall be done also with digests mapped to different repositories. The
// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
// of upload does not outweigh a latency.
func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
size, err := layer.DiffSize()
func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
size, err := layer.Size()
switch {
// big blob
case size > middleLayerMaximumSize:

View file

@ -387,9 +387,11 @@ func TestLayerAlreadyExists(t *testing.T) {
ctx := context.Background()
ms := &mockV2MetadataService{}
pd := &v2PushDescriptor{
hmacKey: []byte(tc.hmacKey),
repoInfo: repoInfo,
layer: layer.EmptyLayer,
hmacKey: []byte(tc.hmacKey),
repoInfo: repoInfo,
layer: &storeLayer{
Layer: layer.EmptyLayer,
},
repo: repo,
v2MetadataService: ms,
pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema2"
distreference "github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/auth"
@ -18,6 +19,34 @@ import (
"golang.org/x/net/context"
)
// ImageTypes represents the schema2 config types for images
var ImageTypes = []string{
schema2.MediaTypeImageConfig,
// Handle unexpected values from https://github.com/docker/distribution/issues/1621
"application/octet-stream",
// Treat defaulted values as images, newer types cannot be implied
"",
}
// PluginTypes represents the schema2 config types for plugins
var PluginTypes = []string{
schema2.MediaTypePluginConfig,
}
var mediaTypeClasses map[string]string
func init() {
// initialize media type classes with all know types for
// plugin
mediaTypeClasses = map[string]string{}
for _, t := range ImageTypes {
mediaTypeClasses[t] = "image"
}
for _, t := range PluginTypes {
mediaTypeClasses[t] = "plugin"
}
}
// NewV2Repository returns a repository (v2 only). It creates an HTTP transport
// providing timeout settings and authentication support, and also verifies the
// remote API version.

View file

@ -70,10 +70,13 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
Official: false,
}
imagePullConfig := &ImagePullConfig{
MetaHeaders: http.Header{},
AuthConfig: &types.AuthConfig{
RegistryToken: secretRegistryToken,
Config: Config{
MetaHeaders: http.Header{},
AuthConfig: &types.AuthConfig{
RegistryToken: secretRegistryToken,
},
},
Schema2Types: ImageTypes,
}
puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
if err != nil {

View file

@ -44,7 +44,7 @@ github.com/boltdb/bolt fff57c100f4dea1905678da7e90d92429dff2904
github.com/miekg/dns 75e6e86cc601825c5dbcd4e0c209eab180997cd7
# get graph and distribution packages
github.com/docker/distribution a6bf3dd064f15598166bca2d66a9962a9555139e
github.com/docker/distribution 28602af35aceda2f8d571bad7ca37a54cf0250bc
github.com/vbatts/tar-split v0.10.1
# get go-zfs packages

View file

@ -80,6 +80,11 @@ func FromBytes(p []byte) Digest {
return Canonical.FromBytes(p)
}
// FromString digests the input and returns a Digest.
func FromString(s string) Digest {
return Canonical.FromString(s)
}
// Validate checks that the contents of d is a valid digest, returning an
// error if not.
func (d Digest) Validate() error {

View file

@ -129,6 +129,11 @@ func (a Algorithm) FromBytes(p []byte) Digest {
return digester.Digest()
}
// FromString digests the string input and returns a Digest.
func (a Algorithm) FromString(s string) Digest {
return a.FromBytes([]byte(s))
}
// TODO(stevvooe): Allow resolution of verifiers using the digest type and
// this registration system.

View file

@ -240,8 +240,13 @@ func (mb *configManifestBuilder) emptyTar(ctx context.Context) (digest.Digest, e
// AppendReference adds a reference to the current ManifestBuilder
func (mb *configManifestBuilder) AppendReference(d distribution.Describable) error {
// todo: verification here?
mb.descriptors = append(mb.descriptors, d.Descriptor())
descriptor := d.Descriptor()
if err := descriptor.Digest.Validate(); err != nil {
return err
}
mb.descriptors = append(mb.descriptors, descriptor)
return nil
}

View file

@ -11,21 +11,25 @@ type builder struct {
// bs is a BlobService used to publish the configuration blob.
bs distribution.BlobService
// configMediaType is media type used to describe configuration
configMediaType string
// configJSON references
configJSON []byte
// layers is a list of layer descriptors that gets built by successive
// calls to AppendReference.
layers []distribution.Descriptor
// dependencies is a list of descriptors that gets built by successive
// calls to AppendReference. In case of image configuration these are layers.
dependencies []distribution.Descriptor
}
// NewManifestBuilder is used to build new manifests for the current schema
// version. It takes a BlobService so it can publish the configuration blob
// as part of the Build process.
func NewManifestBuilder(bs distribution.BlobService, configJSON []byte) distribution.ManifestBuilder {
func NewManifestBuilder(bs distribution.BlobService, configMediaType string, configJSON []byte) distribution.ManifestBuilder {
mb := &builder{
bs: bs,
configJSON: make([]byte, len(configJSON)),
bs: bs,
configMediaType: configMediaType,
configJSON: make([]byte, len(configJSON)),
}
copy(mb.configJSON, configJSON)
@ -36,9 +40,9 @@ func NewManifestBuilder(bs distribution.BlobService, configJSON []byte) distribu
func (mb *builder) Build(ctx context.Context) (distribution.Manifest, error) {
m := Manifest{
Versioned: SchemaVersion,
Layers: make([]distribution.Descriptor, len(mb.layers)),
Layers: make([]distribution.Descriptor, len(mb.dependencies)),
}
copy(m.Layers, mb.layers)
copy(m.Layers, mb.dependencies)
configDigest := digest.FromBytes(mb.configJSON)
@ -48,7 +52,7 @@ func (mb *builder) Build(ctx context.Context) (distribution.Manifest, error) {
case nil:
// Override MediaType, since Put always replaces the specified media
// type with application/octet-stream in the descriptor it returns.
m.Config.MediaType = MediaTypeConfig
m.Config.MediaType = mb.configMediaType
return FromStruct(m)
case distribution.ErrBlobUnknown:
// nop
@ -57,10 +61,10 @@ func (mb *builder) Build(ctx context.Context) (distribution.Manifest, error) {
}
// Add config to the blob store
m.Config, err = mb.bs.Put(ctx, MediaTypeConfig, mb.configJSON)
m.Config, err = mb.bs.Put(ctx, mb.configMediaType, mb.configJSON)
// Override MediaType, since Put always replaces the specified media
// type with application/octet-stream in the descriptor it returns.
m.Config.MediaType = MediaTypeConfig
m.Config.MediaType = mb.configMediaType
if err != nil {
return nil, err
}
@ -70,11 +74,11 @@ func (mb *builder) Build(ctx context.Context) (distribution.Manifest, error) {
// AppendReference adds a reference to the current ManifestBuilder.
func (mb *builder) AppendReference(d distribution.Describable) error {
mb.layers = append(mb.layers, d.Descriptor())
mb.dependencies = append(mb.dependencies, d.Descriptor())
return nil
}
// References returns the current references added to this builder.
func (mb *builder) References() []distribution.Descriptor {
return mb.layers
return mb.dependencies
}

View file

@ -14,8 +14,8 @@ const (
// MediaTypeManifest specifies the mediaType for the current version.
MediaTypeManifest = "application/vnd.docker.distribution.manifest.v2+json"
// MediaTypeConfig specifies the mediaType for the image configuration.
MediaTypeConfig = "application/vnd.docker.container.image.v1+json"
// MediaTypeImageConfig specifies the mediaType for the image configuration.
MediaTypeImageConfig = "application/vnd.docker.container.image.v1+json"
// MediaTypePluginConfig specifies the mediaType for plugin configuration.
MediaTypePluginConfig = "application/vnd.docker.plugin.v1+json"
@ -27,6 +27,10 @@ const (
// MediaTypeForeignLayer is the mediaType used for layers that must be
// downloaded from foreign URLs.
MediaTypeForeignLayer = "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip"
// MediaTypeUncompressedLayer is the mediaType used for layers which
// are not compressed.
MediaTypeUncompressedLayer = "application/vnd.docker.image.rootfs.diff.tar"
)
var (

View file

@ -155,7 +155,9 @@ type RepositoryScope struct {
// using the scope grammar
func (rs RepositoryScope) String() string {
repoType := "repository"
if rs.Class != "" {
// Keep existing format for image class to maintain backwards compatibility
// with authorization servers which do not support the expanded grammar.
if rs.Class != "" && rs.Class != "image" {
repoType = fmt.Sprintf("%s(%s)", repoType, rs.Class)
}
return fmt.Sprintf("%s:%s:%s", repoType, rs.Repository, strings.Join(rs.Actions, ","))