123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- package graph
- import (
- "bufio"
- "compress/gzip"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "github.com/Sirupsen/logrus"
- "github.com/docker/distribution"
- "github.com/docker/distribution/digest"
- "github.com/docker/distribution/manifest"
- "github.com/docker/distribution/manifest/schema1"
- "github.com/docker/docker/image"
- "github.com/docker/docker/pkg/progressreader"
- "github.com/docker/docker/pkg/streamformatter"
- "github.com/docker/docker/pkg/stringid"
- "github.com/docker/docker/registry"
- "github.com/docker/docker/runconfig"
- "github.com/docker/docker/utils"
- "golang.org/x/net/context"
- )
- const compressionBufSize = 32768
- type v2Pusher struct {
- *TagStore
- endpoint registry.APIEndpoint
- localRepo repository
- repoInfo *registry.RepositoryInfo
- config *ImagePushConfig
- sf *streamformatter.StreamFormatter
- repo distribution.Repository
- // layersPushed is the set of layers known to exist on the remote side.
- // This avoids redundant queries when pushing multiple tags that
- // involve the same layers.
- layersPushed map[digest.Digest]bool
- }
- func (p *v2Pusher) Push() (fallback bool, err error) {
- p.repo, err = newV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
- if err != nil {
- logrus.Debugf("Error getting v2 registry: %v", err)
- return true, err
- }
- return false, p.pushV2Repository(p.config.Tag)
- }
- func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) {
- logrus.Debugf("Checking %q against %#v", askedTag, p.localRepo)
- if len(askedTag) > 0 {
- if _, ok := p.localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
- return nil, fmt.Errorf("Tag does not exist for %s", askedTag)
- }
- return []string{askedTag}, nil
- }
- var tags []string
- for tag := range p.localRepo {
- if !utils.DigestReference(tag) {
- tags = append(tags, tag)
- }
- }
- return tags, nil
- }
- func (p *v2Pusher) pushV2Repository(tag string) error {
- localName := p.repoInfo.LocalName
- if _, found := p.poolAdd("push", localName); found {
- return fmt.Errorf("push or pull %s is already in progress", localName)
- }
- defer p.poolRemove("push", localName)
- tags, err := p.getImageTags(tag)
- if err != nil {
- return fmt.Errorf("error getting tags for %s: %s", localName, err)
- }
- if len(tags) == 0 {
- return fmt.Errorf("no tags to push for %s", localName)
- }
- for _, tag := range tags {
- if err := p.pushV2Tag(tag); err != nil {
- return err
- }
- }
- return nil
- }
- func (p *v2Pusher) pushV2Tag(tag string) error {
- logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)
- layerID, exists := p.localRepo[tag]
- if !exists {
- return fmt.Errorf("tag does not exist: %s", tag)
- }
- layersSeen := make(map[string]bool)
- layer, err := p.graph.Get(layerID)
- if err != nil {
- return err
- }
- m := &schema1.Manifest{
- Versioned: manifest.Versioned{
- SchemaVersion: 1,
- },
- Name: p.repo.Name(),
- Tag: tag,
- Architecture: layer.Architecture,
- FSLayers: []schema1.FSLayer{},
- History: []schema1.History{},
- }
- var metadata runconfig.Config
- if layer != nil && layer.Config != nil {
- metadata = *layer.Config
- }
- out := p.config.OutStream
- for ; layer != nil; layer, err = p.graph.GetParent(layer) {
- if err != nil {
- return err
- }
- // break early if layer has already been seen in this image,
- // this prevents infinite loops on layers which loopback, this
- // cannot be prevented since layer IDs are not merkle hashes
- // TODO(dmcgowan): throw error if no valid use case is found
- if layersSeen[layer.ID] {
- break
- }
- // Skip the base layer on Windows. This cannot be pushed.
- if allowBaseParentImage && layer.Parent == "" {
- break
- }
- logrus.Debugf("Pushing layer: %s", layer.ID)
- if layer.Config != nil && metadata.Image != layer.ID {
- if err := runconfig.Merge(&metadata, layer.Config); err != nil {
- return err
- }
- }
- var exists bool
- dgst, err := p.graph.getLayerDigestWithLock(layer.ID)
- switch err {
- case nil:
- if p.layersPushed[dgst] {
- exists = true
- // break out of switch, it is already known that
- // the push is not needed and therefore doing a
- // stat is unnecessary
- break
- }
- _, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
- switch err {
- case nil:
- exists = true
- out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
- case distribution.ErrBlobUnknown:
- // nop
- default:
- out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
- return err
- }
- case errDigestNotSet:
- // nop
- case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
- return fmt.Errorf("error getting image checksum: %v", err)
- }
- // if digest was empty or not saved, or if blob does not exist on the remote repository,
- // then fetch it.
- if !exists {
- var pushDigest digest.Digest
- if pushDigest, err = p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {
- return err
- }
- if dgst == "" {
- // Cache new checksum
- if err := p.graph.setLayerDigestWithLock(layer.ID, pushDigest); err != nil {
- return err
- }
- }
- dgst = pushDigest
- }
- // read v1Compatibility config, generate new if needed
- jsonData, err := p.graph.generateV1CompatibilityChain(layer.ID)
- if err != nil {
- return err
- }
- m.FSLayers = append(m.FSLayers, schema1.FSLayer{BlobSum: dgst})
- m.History = append(m.History, schema1.History{V1Compatibility: string(jsonData)})
- layersSeen[layer.ID] = true
- p.layersPushed[dgst] = true
- }
- // Fix parent chain if necessary
- if err = fixHistory(m); err != nil {
- return err
- }
- logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
- signed, err := schema1.Sign(m, p.trustKey)
- if err != nil {
- return err
- }
- manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
- if err != nil {
- return err
- }
- if manifestDigest != "" {
- out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))
- }
- manSvc, err := p.repo.Manifests(context.Background())
- if err != nil {
- return err
- }
- return manSvc.Put(signed)
- }
- // fixHistory makes sure that the manifest has parent IDs that are consistent
- // with its image IDs. Because local image IDs are generated from the
- // configuration and filesystem contents, but IDs in the manifest are preserved
- // from the original pull, it's possible to have inconsistencies where parent
- // IDs don't match up with the other IDs in the manifest. This happens in the
- // case where an engine pulls images where are identical except the IDs from the
- // manifest - the local ID will be the same, and one of the v1Compatibility
- // files gets discarded.
- func fixHistory(m *schema1.Manifest) error {
- var lastID string
- for i := len(m.History) - 1; i >= 0; i-- {
- var historyEntry map[string]*json.RawMessage
- if err := json.Unmarshal([]byte(m.History[i].V1Compatibility), &historyEntry); err != nil {
- return err
- }
- idJSON, present := historyEntry["id"]
- if !present || idJSON == nil {
- return errors.New("missing id key in v1compatibility file")
- }
- var id string
- if err := json.Unmarshal(*idJSON, &id); err != nil {
- return err
- }
- parentJSON, present := historyEntry["parent"]
- if i == len(m.History)-1 {
- // The base layer must not reference a parent layer,
- // otherwise the manifest is incomplete. There is an
- // exception for Windows to handle base layers.
- if !allowBaseParentImage && present && parentJSON != nil {
- var parent string
- if err := json.Unmarshal(*parentJSON, &parent); err != nil {
- return err
- }
- if parent != "" {
- logrus.Debugf("parent id mismatch detected; fixing. parent reference: %s", parent)
- delete(historyEntry, "parent")
- fixedHistory, err := json.Marshal(historyEntry)
- if err != nil {
- return err
- }
- m.History[i].V1Compatibility = string(fixedHistory)
- }
- }
- } else {
- // For all other layers, the parent ID should equal the
- // ID of the next item in the history list. If it
- // doesn't, fix it up (but preserve all other fields,
- // possibly including fields that aren't known to this
- // engine version).
- if !present || parentJSON == nil {
- return errors.New("missing parent key in v1compatibility file")
- }
- var parent string
- if err := json.Unmarshal(*parentJSON, &parent); err != nil {
- return err
- }
- if parent != lastID {
- logrus.Debugf("parent id mismatch detected; fixing. parent reference: %s actual id: %s", parent, id)
- historyEntry["parent"] = rawJSON(lastID)
- fixedHistory, err := json.Marshal(historyEntry)
- if err != nil {
- return err
- }
- m.History[i].V1Compatibility = string(fixedHistory)
- }
- }
- lastID = id
- }
- return nil
- }
- func rawJSON(value interface{}) *json.RawMessage {
- jsonval, err := json.Marshal(value)
- if err != nil {
- return nil
- }
- return (*json.RawMessage)(&jsonval)
- }
- func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) {
- out := p.config.OutStream
- out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Preparing", nil))
- image, err := p.graph.Get(img.ID)
- if err != nil {
- return "", err
- }
- arch, err := p.graph.tarLayer(image)
- if err != nil {
- return "", err
- }
- defer arch.Close()
- // Send the layer
- layerUpload, err := bs.Create(context.Background())
- if err != nil {
- return "", err
- }
- defer layerUpload.Close()
- reader := progressreader.New(progressreader.Config{
- In: ioutil.NopCloser(arch), // we'll take care of close here.
- Out: out,
- Formatter: p.sf,
- // TODO(stevvooe): This may cause a size reporting error. Try to get
- // this from tar-split or elsewhere. The main issue here is that we
- // don't want to buffer to disk *just* to calculate the size.
- Size: img.Size,
- NewLines: false,
- ID: stringid.TruncateID(img.ID),
- Action: "Pushing",
- })
- digester := digest.Canonical.New()
- // HACK: The MultiWriter doesn't write directly to layerUpload because
- // we must make sure the ReadFrom is used, not Write. Using Write would
- // send a PATCH request for every Write call.
- pipeReader, pipeWriter := io.Pipe()
- // Use a bufio.Writer to avoid excessive chunking in HTTP request.
- bufWriter := bufio.NewWriterSize(io.MultiWriter(pipeWriter, digester.Hash()), compressionBufSize)
- compressor := gzip.NewWriter(bufWriter)
- go func() {
- _, err := io.Copy(compressor, reader)
- if err == nil {
- err = compressor.Close()
- }
- if err == nil {
- err = bufWriter.Flush()
- }
- if err != nil {
- pipeWriter.CloseWithError(err)
- } else {
- pipeWriter.Close()
- }
- }()
- out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushing", nil))
- nn, err := layerUpload.ReadFrom(pipeReader)
- pipeReader.Close()
- if err != nil {
- return "", err
- }
- dgst := digester.Digest()
- if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
- return "", err
- }
- logrus.Debugf("uploaded layer %s (%s), %d bytes", img.ID, dgst, nn)
- out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushed", nil))
- return dgst, nil
- }
|