123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- package distribution
- import (
- "fmt"
- "sync"
- "github.com/Sirupsen/logrus"
- "github.com/docker/distribution/reference"
- "github.com/docker/distribution/registry/client/transport"
- "github.com/docker/docker/distribution/metadata"
- "github.com/docker/docker/dockerversion"
- "github.com/docker/docker/image"
- "github.com/docker/docker/image/v1"
- "github.com/docker/docker/layer"
- "github.com/docker/docker/pkg/ioutils"
- "github.com/docker/docker/pkg/progress"
- "github.com/docker/docker/pkg/stringid"
- "github.com/docker/docker/registry"
- "github.com/opencontainers/go-digest"
- "golang.org/x/net/context"
- )
- type v1Pusher struct {
- v1IDService *metadata.V1IDService
- endpoint registry.APIEndpoint
- ref reference.Named
- repoInfo *registry.RepositoryInfo
- config *ImagePushConfig
- session *registry.Session
- }
- func (p *v1Pusher) Push(ctx context.Context) error {
- tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
- if err != nil {
- return err
- }
- // Adds Docker-specific headers as well as user-specified headers (metaHeaders)
- tr := transport.NewTransport(
- // TODO(tiborvass): was NoTimeout
- registry.NewTransport(tlsConfig),
- registry.DockerHeaders(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)...,
- )
- client := registry.HTTPClient(tr)
- v1Endpoint, err := p.endpoint.ToV1Endpoint(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)
- if err != nil {
- logrus.Debugf("Could not get v1 endpoint: %v", err)
- return fallbackError{err: err}
- }
- p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
- if err != nil {
- // TODO(dmcgowan): Check if should fallback
- return fallbackError{err: err}
- }
- if err := p.pushRepository(ctx); err != nil {
- // TODO(dmcgowan): Check if should fallback
- return err
- }
- return nil
- }
- // v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an
- // image being pushed to a v1 registry.
- type v1Image interface {
- Config() []byte
- Layer() layer.Layer
- V1ID() string
- }
- type v1ImageCommon struct {
- layer layer.Layer
- config []byte
- v1ID string
- }
- func (common *v1ImageCommon) Config() []byte {
- return common.config
- }
- func (common *v1ImageCommon) V1ID() string {
- return common.v1ID
- }
- func (common *v1ImageCommon) Layer() layer.Layer {
- return common.layer
- }
- // v1TopImage defines a runnable (top layer) image being pushed to a v1
- // registry.
- type v1TopImage struct {
- v1ImageCommon
- imageID image.ID
- }
- func newV1TopImage(imageID image.ID, img *image.Image, l layer.Layer, parent *v1DependencyImage) (*v1TopImage, error) {
- v1ID := imageID.Digest().Hex()
- parentV1ID := ""
- if parent != nil {
- parentV1ID = parent.V1ID()
- }
- config, err := v1.MakeV1ConfigFromConfig(img, v1ID, parentV1ID, false)
- if err != nil {
- return nil, err
- }
- return &v1TopImage{
- v1ImageCommon: v1ImageCommon{
- v1ID: v1ID,
- config: config,
- layer: l,
- },
- imageID: imageID,
- }, nil
- }
- // v1DependencyImage defines a dependency layer being pushed to a v1 registry.
- type v1DependencyImage struct {
- v1ImageCommon
- }
- func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) *v1DependencyImage {
- v1ID := digest.Digest(l.ChainID()).Hex()
- var config string
- if parent != nil {
- config = fmt.Sprintf(`{"id":"%s","parent":"%s"}`, v1ID, parent.V1ID())
- } else {
- config = fmt.Sprintf(`{"id":"%s"}`, v1ID)
- }
- return &v1DependencyImage{
- v1ImageCommon: v1ImageCommon{
- v1ID: v1ID,
- config: []byte(config),
- layer: l,
- },
- }
- }
- // Retrieve the all the images to be uploaded in the correct order
- func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) {
- tagsByImage = make(map[image.ID][]string)
- // Ignore digest references
- if _, isCanonical := p.ref.(reference.Canonical); isCanonical {
- return
- }
- tagged, isTagged := p.ref.(reference.NamedTagged)
- if isTagged {
- // Push a specific tag
- var imgID image.ID
- var dgst digest.Digest
- dgst, err = p.config.ReferenceStore.Get(p.ref)
- if err != nil {
- return
- }
- imgID = image.IDFromDigest(dgst)
- imageList, err = p.imageListForTag(imgID, nil, &referencedLayers)
- if err != nil {
- return
- }
- tagsByImage[imgID] = []string{tagged.Tag()}
- return
- }
- imagesSeen := make(map[digest.Digest]struct{})
- dependenciesSeen := make(map[layer.ChainID]*v1DependencyImage)
- associations := p.config.ReferenceStore.ReferencesByName(p.ref)
- for _, association := range associations {
- if tagged, isTagged = association.Ref.(reference.NamedTagged); !isTagged {
- // Ignore digest references.
- continue
- }
- imgID := image.IDFromDigest(association.ID)
- tagsByImage[imgID] = append(tagsByImage[imgID], tagged.Tag())
- if _, present := imagesSeen[association.ID]; present {
- // Skip generating image list for already-seen image
- continue
- }
- imagesSeen[association.ID] = struct{}{}
- imageListForThisTag, err := p.imageListForTag(imgID, dependenciesSeen, &referencedLayers)
- if err != nil {
- return nil, nil, nil, err
- }
- // append to main image list
- imageList = append(imageList, imageListForThisTag...)
- }
- if len(imageList) == 0 {
- return nil, nil, nil, fmt.Errorf("No images found for the requested repository / tag")
- }
- logrus.Debugf("Image list: %v", imageList)
- logrus.Debugf("Tags by image: %v", tagsByImage)
- return
- }
- func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) {
- ics, ok := p.config.ImageStore.(*imageConfigStore)
- if !ok {
- return nil, fmt.Errorf("only image store images supported for v1 push")
- }
- img, err := ics.Store.Get(imgID)
- if err != nil {
- return nil, err
- }
- topLayerID := img.RootFS.ChainID()
- pl, err := p.config.LayerStore.Get(topLayerID)
- *referencedLayers = append(*referencedLayers, pl)
- if err != nil {
- return nil, fmt.Errorf("failed to get top layer from image: %v", err)
- }
- // V1 push is deprecated, only support existing layerstore layers
- lsl, ok := pl.(*storeLayer)
- if !ok {
- return nil, fmt.Errorf("only layer store layers supported for v1 push")
- }
- l := lsl.Layer
- dependencyImages, parent := generateDependencyImages(l.Parent(), dependenciesSeen)
- topImage, err := newV1TopImage(imgID, img, l, parent)
- if err != nil {
- return nil, err
- }
- imageListForThisTag = append(dependencyImages, topImage)
- return
- }
- func generateDependencyImages(l layer.Layer, dependenciesSeen map[layer.ChainID]*v1DependencyImage) (imageListForThisTag []v1Image, parent *v1DependencyImage) {
- if l == nil {
- return nil, nil
- }
- imageListForThisTag, parent = generateDependencyImages(l.Parent(), dependenciesSeen)
- if dependenciesSeen != nil {
- if dependencyImage, present := dependenciesSeen[l.ChainID()]; present {
- // This layer is already on the list, we can ignore it
- // and all its parents.
- return imageListForThisTag, dependencyImage
- }
- }
- dependencyImage := newV1DependencyImage(l, parent)
- imageListForThisTag = append(imageListForThisTag, dependencyImage)
- if dependenciesSeen != nil {
- dependenciesSeen[l.ChainID()] = dependencyImage
- }
- return imageListForThisTag, dependencyImage
- }
- // createImageIndex returns an index of an image's layer IDs and tags.
- func createImageIndex(images []v1Image, tags map[image.ID][]string) []*registry.ImgData {
- var imageIndex []*registry.ImgData
- for _, img := range images {
- v1ID := img.V1ID()
- if topImage, isTopImage := img.(*v1TopImage); isTopImage {
- if tags, hasTags := tags[topImage.imageID]; hasTags {
- // If an image has tags you must add an entry in the image index
- // for each tag
- for _, tag := range tags {
- imageIndex = append(imageIndex, ®istry.ImgData{
- ID: v1ID,
- Tag: tag,
- })
- }
- continue
- }
- }
- // If the image does not have a tag it still needs to be sent to the
- // registry with an empty tag so that it is associated with the repository
- imageIndex = append(imageIndex, ®istry.ImgData{
- ID: v1ID,
- Tag: "",
- })
- }
- return imageIndex
- }
- // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
- // and if it is absent then it sends the image id to the channel to be pushed.
- func (p *v1Pusher) lookupImageOnEndpoint(wg *sync.WaitGroup, endpoint string, images chan v1Image, imagesToPush chan string) {
- defer wg.Done()
- for image := range images {
- v1ID := image.V1ID()
- truncID := stringid.TruncateID(image.Layer().DiffID().String())
- if err := p.session.LookupRemoteImage(v1ID, endpoint); err != nil {
- logrus.Errorf("Error in LookupRemoteImage: %s", err)
- imagesToPush <- v1ID
- progress.Update(p.config.ProgressOutput, truncID, "Waiting")
- } else {
- progress.Update(p.config.ProgressOutput, truncID, "Already exists")
- }
- }
- }
- func (p *v1Pusher) pushImageToEndpoint(ctx context.Context, endpoint string, imageList []v1Image, tags map[image.ID][]string, repo *registry.RepositoryData) error {
- workerCount := len(imageList)
- // start a maximum of 5 workers to check if images exist on the specified endpoint.
- if workerCount > 5 {
- workerCount = 5
- }
- var (
- wg = &sync.WaitGroup{}
- imageData = make(chan v1Image, workerCount*2)
- imagesToPush = make(chan string, workerCount*2)
- pushes = make(chan map[string]struct{}, 1)
- )
- for i := 0; i < workerCount; i++ {
- wg.Add(1)
- go p.lookupImageOnEndpoint(wg, endpoint, imageData, imagesToPush)
- }
- // start a go routine that consumes the images to push
- go func() {
- shouldPush := make(map[string]struct{})
- for id := range imagesToPush {
- shouldPush[id] = struct{}{}
- }
- pushes <- shouldPush
- }()
- for _, v1Image := range imageList {
- imageData <- v1Image
- }
- // close the channel to notify the workers that there will be no more images to check.
- close(imageData)
- wg.Wait()
- close(imagesToPush)
- // wait for all the images that require pushes to be collected into a consumable map.
- shouldPush := <-pushes
- // finish by pushing any images and tags to the endpoint. The order that the images are pushed
- // is very important that is why we are still iterating over the ordered list of imageIDs.
- for _, img := range imageList {
- v1ID := img.V1ID()
- if _, push := shouldPush[v1ID]; push {
- if _, err := p.pushImage(ctx, img, endpoint); err != nil {
- // FIXME: Continue on error?
- return err
- }
- }
- if topImage, isTopImage := img.(*v1TopImage); isTopImage {
- for _, tag := range tags[topImage.imageID] {
- progress.Messagef(p.config.ProgressOutput, "", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(v1ID), endpoint+"repositories/"+reference.Path(p.repoInfo.Name)+"/tags/"+tag)
- if err := p.session.PushRegistryTag(p.repoInfo.Name, v1ID, tag, endpoint); err != nil {
- return err
- }
- }
- }
- }
- return nil
- }
- // pushRepository pushes layers that do not already exist on the registry.
- func (p *v1Pusher) pushRepository(ctx context.Context) error {
- imgList, tags, referencedLayers, err := p.getImageList()
- defer func() {
- for _, l := range referencedLayers {
- l.Release()
- }
- }()
- if err != nil {
- return err
- }
- imageIndex := createImageIndex(imgList, tags)
- for _, data := range imageIndex {
- logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
- }
- // Register all the images in a repository with the registry
- // If an image is not in this list it will not be associated with the repository
- repoData, err := p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, false, nil)
- if err != nil {
- return err
- }
- // push the repository to each of the endpoints only if it does not exist.
- for _, endpoint := range repoData.Endpoints {
- if err := p.pushImageToEndpoint(ctx, endpoint, imgList, tags, repoData); err != nil {
- return err
- }
- }
- _, err = p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, true, repoData.Endpoints)
- return err
- }
- func (p *v1Pusher) pushImage(ctx context.Context, v1Image v1Image, ep string) (checksum string, err error) {
- l := v1Image.Layer()
- v1ID := v1Image.V1ID()
- truncID := stringid.TruncateID(l.DiffID().String())
- jsonRaw := v1Image.Config()
- progress.Update(p.config.ProgressOutput, truncID, "Pushing")
- // General rule is to use ID for graph accesses and compatibilityID for
- // calls to session.registry()
- imgData := ®istry.ImgData{
- ID: v1ID,
- }
- // Send the json
- if err := p.session.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
- if err == registry.ErrAlreadyExists {
- progress.Update(p.config.ProgressOutput, truncID, "Image already pushed, skipping")
- return "", nil
- }
- return "", err
- }
- arch, err := l.TarStream()
- if err != nil {
- return "", err
- }
- defer arch.Close()
- // don't care if this fails; best effort
- size, _ := l.DiffSize()
- // Send the layer
- logrus.Debugf("rendered layer for %s of [%d] size", v1ID, size)
- reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), p.config.ProgressOutput, size, truncID, "Pushing")
- defer reader.Close()
- checksum, checksumPayload, err := p.session.PushImageLayerRegistry(v1ID, reader, ep, jsonRaw)
- if err != nil {
- return "", err
- }
- imgData.Checksum = checksum
- imgData.ChecksumPayload = checksumPayload
- // Send the checksum
- if err := p.session.PushImageChecksumRegistry(imgData, ep); err != nil {
- return "", err
- }
- if err := p.v1IDService.Set(v1ID, p.repoInfo.Index.Name, l.DiffID()); err != nil {
- logrus.Warnf("Could not set v1 ID mapping: %v", err)
- }
- progress.Update(p.config.ProgressOutput, truncID, "Image successfully pushed")
- return imgData.Checksum, nil
- }
|