123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698 |
- package graph
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "github.com/Sirupsen/logrus"
- "github.com/docker/distribution"
- "github.com/docker/distribution/digest"
- "github.com/docker/distribution/manifest/schema1"
- "github.com/docker/docker/image"
- "github.com/docker/docker/pkg/broadcaster"
- "github.com/docker/docker/pkg/progressreader"
- "github.com/docker/docker/pkg/streamformatter"
- "github.com/docker/docker/pkg/stringid"
- "github.com/docker/docker/registry"
- "github.com/docker/docker/utils"
- "golang.org/x/net/context"
- )
// v2Puller carries the state needed to pull a repository from a v2 registry
// endpoint. It embeds *TagStore.
type v2Puller struct {
	*TagStore
	// endpoint is handed to newV2Repository when the repository client is built.
	endpoint registry.APIEndpoint
	// config supplies MetaHeaders, AuthConfig and the OutStream that receives
	// pull output.
	config *ImagePullConfig
	// sf formats the status and progress messages emitted during a pull.
	sf *streamformatter.StreamFormatter
	// repoInfo describes the repository being pulled; its LocalName is used
	// for tagging and for pool keys.
	repoInfo *registry.RepositoryInfo
	// repo is the repository client; it is assigned at the start of Pull.
	repo distribution.Repository
	// sessionID is a random ID generated in Pull and passed to the graph's
	// Retain and Release calls.
	sessionID string
}
- func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
- // TODO(tiborvass): was ReceiveTimeout
- p.repo, err = newV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
- if err != nil {
- logrus.Warnf("Error getting v2 registry: %v", err)
- return true, err
- }
- p.sessionID = stringid.GenerateRandomID()
- if err := p.pullV2Repository(tag); err != nil {
- if registry.ContinueOnError(err) {
- logrus.Debugf("Error trying v2 registry: %v", err)
- return true, err
- }
- return false, err
- }
- return false, nil
- }
- func (p *v2Puller) pullV2Repository(tag string) (err error) {
- var tags []string
- taggedName := p.repoInfo.LocalName
- if len(tag) > 0 {
- tags = []string{tag}
- taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
- } else {
- var err error
- manSvc, err := p.repo.Manifests(context.Background())
- if err != nil {
- return err
- }
- tags, err = manSvc.Tags()
- if err != nil {
- return err
- }
- }
- poolKey := "v2:" + taggedName
- broadcaster, found := p.poolAdd("pull", poolKey)
- broadcaster.Add(p.config.OutStream)
- if found {
- // Another pull of the same repository is already taking place; just wait for it to finish
- return broadcaster.Wait()
- }
- // This must use a closure so it captures the value of err when the
- // function returns, not when the 'defer' is evaluated.
- defer func() {
- p.poolRemoveWithError("pull", poolKey, err)
- }()
- var layersDownloaded bool
- for _, tag := range tags {
- // pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
- // TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
- pulledNew, err := p.pullV2Tag(broadcaster, tag, taggedName)
- if err != nil {
- return err
- }
- layersDownloaded = layersDownloaded || pulledNew
- }
- writeStatus(taggedName, broadcaster, p.sf, layersDownloaded)
- return nil
- }
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
	// img is the descriptor of the image this layer belongs to.
	img contentAddressableDescriptor
	// imgIndex is the position of img in the slice of descriptors built from
	// the manifest.
	imgIndex int
	// tmpFile is the temporary file the layer blob is written to.
	tmpFile *os.File
	// digest is the blob digest taken from the manifest's FSLayers entry.
	digest digest.Digest
	// layer is set by download on success; pullV2Tag treats a nil layer as
	// "another pull is handling this layer".
	layer distribution.ReadSeekCloser
	// size is the blob size reported by the blob store's Stat call.
	size int64
	// err receives the outcome of the download (nil on success).
	err chan error
	// poolKey is the key under which this layer download is registered in
	// the "pull" pool.
	poolKey string
	// broadcaster receives the progress output for this layer.
	broadcaster *broadcaster.Buffered
}
// contentAddressableDescriptor is used to pass image data from a manifest to the
// graph.
type contentAddressableDescriptor struct {
	// id is the ID the image is stored under in the graph. It starts as the
	// hex form of strongID and may be replaced during ID reuse.
	id string
	// parent is the graph ID of the parent image; empty for the base layer.
	parent string
	// strongID is the digest computed from config by image.StrongID.
	strongID digest.Digest
	// compatibilityID is the ID field found in the v1Compatibility JSON.
	compatibilityID string
	// config is the image configuration produced by image.MakeImageConfig.
	config []byte
	// v1Compatibility is the raw v1Compatibility JSON from the manifest history.
	v1Compatibility []byte
}
- func newContentAddressableImage(v1Compatibility []byte, blobSum digest.Digest, parent digest.Digest) (contentAddressableDescriptor, error) {
- img := contentAddressableDescriptor{
- v1Compatibility: v1Compatibility,
- }
- var err error
- img.config, err = image.MakeImageConfig(v1Compatibility, blobSum, parent)
- if err != nil {
- return img, err
- }
- img.strongID, err = image.StrongID(img.config)
- if err != nil {
- return img, err
- }
- unmarshalledConfig, err := image.NewImgJSON(v1Compatibility)
- if err != nil {
- return img, err
- }
- img.compatibilityID = unmarshalledConfig.ID
- img.id = img.strongID.Hex()
- return img, nil
- }
- // ID returns the actual ID to be used for the downloaded image. This may be
- // a computed ID.
- func (img contentAddressableDescriptor) ID() string {
- return img.id
- }
- // Parent returns the parent ID to be used for the image. This may be a
- // computed ID.
- func (img contentAddressableDescriptor) Parent() string {
- return img.parent
- }
- // MarshalConfig renders the image structure into JSON.
- func (img contentAddressableDescriptor) MarshalConfig() ([]byte, error) {
- return img.config, nil
- }
// errVerification is an error value whose message is "verification failed".
type errVerification struct{}

// Error implements the error interface.
func (errVerification) Error() string {
	const msg = "verification failed"
	return msg
}
- func (p *v2Puller) download(di *downloadInfo) {
- logrus.Debugf("pulling blob %q to %s", di.digest, di.img.id)
- blobs := p.repo.Blobs(context.Background())
- desc, err := blobs.Stat(context.Background(), di.digest)
- if err != nil {
- logrus.Debugf("Error statting layer: %v", err)
- di.err <- err
- return
- }
- di.size = desc.Size
- layerDownload, err := blobs.Open(context.Background(), di.digest)
- if err != nil {
- logrus.Debugf("Error fetching layer: %v", err)
- di.err <- err
- return
- }
- defer layerDownload.Close()
- verifier, err := digest.NewDigestVerifier(di.digest)
- if err != nil {
- di.err <- err
- return
- }
- reader := progressreader.New(progressreader.Config{
- In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
- Out: di.broadcaster,
- Formatter: p.sf,
- Size: di.size,
- NewLines: false,
- ID: stringid.TruncateID(di.img.id),
- Action: "Downloading",
- })
- io.Copy(di.tmpFile, reader)
- di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.id), "Verifying Checksum", nil))
- if !verifier.Verified() {
- err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
- logrus.Error(err)
- di.err <- err
- return
- }
- di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.id), "Download complete", nil))
- logrus.Debugf("Downloaded %s to tempfile %s", di.img.id, di.tmpFile.Name())
- di.layer = layerDownload
- di.err <- nil
- }
- func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (tagUpdated bool, err error) {
- logrus.Debugf("Pulling tag from V2 registry: %q", tag)
- manSvc, err := p.repo.Manifests(context.Background())
- if err != nil {
- return false, err
- }
- unverifiedManifest, err := manSvc.GetByTag(tag)
- if err != nil {
- return false, err
- }
- if unverifiedManifest == nil {
- return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
- }
- var verifiedManifest *schema1.Manifest
- verifiedManifest, err = verifyManifest(unverifiedManifest, tag)
- if err != nil {
- return false, err
- }
- // remove duplicate layers and check parent chain validity
- err = fixManifestLayers(verifiedManifest)
- if err != nil {
- return false, err
- }
- imgs, err := p.getImageInfos(verifiedManifest)
- if err != nil {
- return false, err
- }
- out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
- var downloads []*downloadInfo
- var layerIDs []string
- defer func() {
- p.graph.Release(p.sessionID, layerIDs...)
- for _, d := range downloads {
- p.poolRemoveWithError("pull", d.poolKey, err)
- if d.tmpFile != nil {
- d.tmpFile.Close()
- if err := os.RemoveAll(d.tmpFile.Name()); err != nil {
- logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name())
- }
- }
- }
- }()
- for i := len(verifiedManifest.FSLayers) - 1; i >= 0; i-- {
- img := imgs[i]
- p.graph.Retain(p.sessionID, img.id)
- layerIDs = append(layerIDs, img.id)
- p.graph.imageMutex.Lock(img.id)
- // Check if exists
- if p.graph.Exists(img.id) {
- if err := p.validateImageInGraph(img.id, imgs, i); err != nil {
- p.graph.imageMutex.Unlock(img.id)
- return false, fmt.Errorf("image validation failed: %v", err)
- }
- logrus.Debugf("Image already exists: %s", img.id)
- p.graph.imageMutex.Unlock(img.id)
- continue
- }
- p.graph.imageMutex.Unlock(img.id)
- out.Write(p.sf.FormatProgress(stringid.TruncateID(img.id), "Pulling fs layer", nil))
- d := &downloadInfo{
- img: img,
- imgIndex: i,
- poolKey: "v2layer:" + img.id,
- digest: verifiedManifest.FSLayers[i].BlobSum,
- // TODO: seems like this chan buffer solved hanging problem in go1.5,
- // this can indicate some deeper problem that somehow we never take
- // error from channel in loop below
- err: make(chan error, 1),
- }
- tmpFile, err := ioutil.TempFile("", "GetImageBlob")
- if err != nil {
- return false, err
- }
- d.tmpFile = tmpFile
- downloads = append(downloads, d)
- broadcaster, found := p.poolAdd("pull", d.poolKey)
- broadcaster.Add(out)
- d.broadcaster = broadcaster
- if found {
- d.err <- nil
- } else {
- go p.download(d)
- }
- }
- for _, d := range downloads {
- if err := <-d.err; err != nil {
- return false, err
- }
- if d.layer == nil {
- // Wait for a different pull to download and extract
- // this layer.
- err = d.broadcaster.Wait()
- if err != nil {
- return false, err
- }
- continue
- }
- d.tmpFile.Seek(0, 0)
- err := func() error {
- reader := progressreader.New(progressreader.Config{
- In: d.tmpFile,
- Out: d.broadcaster,
- Formatter: p.sf,
- Size: d.size,
- NewLines: false,
- ID: stringid.TruncateID(d.img.id),
- Action: "Extracting",
- })
- p.graph.imagesMutex.Lock()
- defer p.graph.imagesMutex.Unlock()
- p.graph.imageMutex.Lock(d.img.id)
- defer p.graph.imageMutex.Unlock(d.img.id)
- // Must recheck the data on disk if any exists.
- // This protects against races where something
- // else is written to the graph under this ID
- // after attemptIDReuse.
- if p.graph.Exists(d.img.id) {
- if err := p.validateImageInGraph(d.img.id, imgs, d.imgIndex); err != nil {
- return fmt.Errorf("image validation failed: %v", err)
- }
- }
- if err := p.graph.register(d.img, reader); err != nil {
- return err
- }
- if err := p.graph.setLayerDigest(d.img.id, d.digest); err != nil {
- return err
- }
- if err := p.graph.setV1CompatibilityConfig(d.img.id, d.img.v1Compatibility); err != nil {
- return err
- }
- return nil
- }()
- if err != nil {
- return false, err
- }
- d.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.id), "Pull complete", nil))
- d.broadcaster.Close()
- tagUpdated = true
- }
- manifestDigest, _, err := digestFromManifest(unverifiedManifest, p.repoInfo.LocalName)
- if err != nil {
- return false, err
- }
- // Check for new tag if no layers downloaded
- if !tagUpdated {
- repo, err := p.get(p.repoInfo.LocalName)
- if err != nil {
- return false, err
- }
- if repo != nil {
- if _, exists := repo[tag]; !exists {
- tagUpdated = true
- }
- } else {
- tagUpdated = true
- }
- }
- firstID := layerIDs[len(layerIDs)-1]
- if utils.DigestReference(tag) {
- // TODO(stevvooe): Ideally, we should always set the digest so we can
- // use the digest whether we pull by it or not. Unfortunately, the tag
- // store treats the digest as a separate tag, meaning there may be an
- // untagged digest image that would seem to be dangling by a user.
- if err = p.setDigest(p.repoInfo.LocalName, tag, firstID); err != nil {
- return false, err
- }
- } else {
- // only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
- if err = p.Tag(p.repoInfo.LocalName, tag, firstID, true); err != nil {
- return false, err
- }
- }
- if manifestDigest != "" {
- out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
- }
- return tagUpdated, nil
- }
- func verifyManifest(signedManifest *schema1.SignedManifest, tag string) (m *schema1.Manifest, err error) {
- // If pull by digest, then verify the manifest digest. NOTE: It is
- // important to do this first, before any other content validation. If the
- // digest cannot be verified, don't even bother with those other things.
- if manifestDigest, err := digest.ParseDigest(tag); err == nil {
- verifier, err := digest.NewDigestVerifier(manifestDigest)
- if err != nil {
- return nil, err
- }
- payload, err := signedManifest.Payload()
- if err != nil {
- // If this failed, the signatures section was corrupted
- // or missing. Treat the entire manifest as the payload.
- payload = signedManifest.Raw
- }
- if _, err := verifier.Write(payload); err != nil {
- return nil, err
- }
- if !verifier.Verified() {
- err := fmt.Errorf("image verification failed for digest %s", manifestDigest)
- logrus.Error(err)
- return nil, err
- }
- var verifiedManifest schema1.Manifest
- if err = json.Unmarshal(payload, &verifiedManifest); err != nil {
- return nil, err
- }
- m = &verifiedManifest
- } else {
- m = &signedManifest.Manifest
- }
- if m.SchemaVersion != 1 {
- return nil, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
- }
- if len(m.FSLayers) != len(m.History) {
- return nil, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
- }
- if len(m.FSLayers) == 0 {
- return nil, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
- }
- return m, nil
- }
- // fixManifestLayers removes repeated layers from the manifest and checks the
- // correctness of the parent chain.
- func fixManifestLayers(m *schema1.Manifest) error {
- images := make([]*image.Image, len(m.FSLayers))
- for i := range m.FSLayers {
- img, err := image.NewImgJSON([]byte(m.History[i].V1Compatibility))
- if err != nil {
- return err
- }
- images[i] = img
- if err := image.ValidateID(img.ID); err != nil {
- return err
- }
- }
- if images[len(images)-1].Parent != "" {
- return errors.New("Invalid parent ID in the base layer of the image.")
- }
- // check general duplicates to error instead of a deadlock
- idmap := make(map[string]struct{})
- var lastID string
- for _, img := range images {
- // skip IDs that appear after each other, we handle those later
- if _, exists := idmap[img.ID]; img.ID != lastID && exists {
- return fmt.Errorf("ID %+v appears multiple times in manifest", img.ID)
- }
- lastID = img.ID
- idmap[lastID] = struct{}{}
- }
- // backwards loop so that we keep the remaining indexes after removing items
- for i := len(images) - 2; i >= 0; i-- {
- if images[i].ID == images[i+1].ID { // repeated ID. remove and continue
- m.FSLayers = append(m.FSLayers[:i], m.FSLayers[i+1:]...)
- m.History = append(m.History[:i], m.History[i+1:]...)
- } else if images[i].Parent != images[i+1].ID {
- return fmt.Errorf("Invalid parent ID. Expected %v, got %v.", images[i+1].ID, images[i].Parent)
- }
- }
- return nil
- }
- // getImageInfos returns an imageinfo struct for every image in the manifest.
- // These objects contain both calculated strongIDs and compatibilityIDs found
- // in v1Compatibility object.
- func (p *v2Puller) getImageInfos(m *schema1.Manifest) ([]contentAddressableDescriptor, error) {
- imgs := make([]contentAddressableDescriptor, len(m.FSLayers))
- var parent digest.Digest
- for i := len(imgs) - 1; i >= 0; i-- {
- var err error
- imgs[i], err = newContentAddressableImage([]byte(m.History[i].V1Compatibility), m.FSLayers[i].BlobSum, parent)
- if err != nil {
- return nil, err
- }
- parent = imgs[i].strongID
- }
- p.attemptIDReuse(imgs)
- return imgs, nil
- }
- // attemptIDReuse does a best attempt to match verified compatibilityIDs
- // already in the graph with the computed strongIDs so we can keep using them.
- // This process will never fail but may just return the strongIDs if none of
- // the compatibilityIDs exists or can be verified. If the strongIDs themselves
- // fail verification, we deterministically generate alternate IDs to use until
- // we find one that's available or already exists with the correct data.
- func (p *v2Puller) attemptIDReuse(imgs []contentAddressableDescriptor) {
- // This function needs to be protected with a global lock, because it
- // locks multiple IDs at once, and there's no good way to make sure
- // the locking happens a deterministic order.
- p.graph.imagesMutex.Lock()
- defer p.graph.imagesMutex.Unlock()
- idMap := make(map[string]struct{})
- for _, img := range imgs {
- idMap[img.id] = struct{}{}
- idMap[img.compatibilityID] = struct{}{}
- if p.graph.Exists(img.compatibilityID) {
- if _, err := p.graph.generateV1CompatibilityChain(img.compatibilityID); err != nil {
- logrus.Debugf("Migration v1Compatibility generation error: %v", err)
- return
- }
- }
- }
- for id := range idMap {
- p.graph.imageMutex.Lock(id)
- defer p.graph.imageMutex.Unlock(id)
- }
- // continueReuse controls whether the function will try to find
- // existing layers on disk under the old v1 IDs, to avoid repulling
- // them. The hashes are checked to ensure these layers are okay to
- // use. continueReuse starts out as true, but is set to false if
- // the code encounters something that doesn't match the expected hash.
- continueReuse := true
- for i := len(imgs) - 1; i >= 0; i-- {
- if p.graph.Exists(imgs[i].id) {
- // Found an image in the graph under the strongID. Validate the
- // image before using it.
- if err := p.validateImageInGraph(imgs[i].id, imgs, i); err != nil {
- continueReuse = false
- logrus.Debugf("not using existing strongID: %v", err)
- // The strong ID existed in the graph but didn't
- // validate successfully. We can't use the strong ID
- // because it didn't validate successfully. Treat the
- // graph like a hash table with probing... compute
- // SHA256(id) until we find an ID that either doesn't
- // already exist in the graph, or has existing content
- // that validates successfully.
- for {
- if err := p.tryNextID(imgs, i, idMap); err != nil {
- logrus.Debug(err.Error())
- } else {
- break
- }
- }
- }
- continue
- }
- if continueReuse {
- compatibilityID := imgs[i].compatibilityID
- if err := p.validateImageInGraph(compatibilityID, imgs, i); err != nil {
- logrus.Debugf("stopping ID reuse: %v", err)
- continueReuse = false
- } else {
- // The compatibility ID exists in the graph and was
- // validated. Use it.
- imgs[i].id = compatibilityID
- }
- }
- }
- // fix up the parents of the images
- for i := 0; i < len(imgs); i++ {
- if i == len(imgs)-1 { // Base layer
- imgs[i].parent = ""
- } else {
- imgs[i].parent = imgs[i+1].id
- }
- }
- }
- // validateImageInGraph checks that an image in the graph has the expected
- // strongID. id is the entry in the graph to check, imgs is the slice of
- // images being processed (for access to the parent), and i is the index
- // into this slice which the graph entry should be checked against.
- func (p *v2Puller) validateImageInGraph(id string, imgs []contentAddressableDescriptor, i int) error {
- img, err := p.graph.Get(id)
- if err != nil {
- return fmt.Errorf("missing: %v", err)
- }
- layerID, err := p.graph.getLayerDigest(id)
- if err != nil {
- return fmt.Errorf("digest: %v", err)
- }
- var parentID digest.Digest
- if i != len(imgs)-1 {
- if img.Parent != imgs[i+1].id { // comparing that graph points to validated ID
- return fmt.Errorf("parent: %v %v", img.Parent, imgs[i+1].id)
- }
- parentID = imgs[i+1].strongID
- } else if img.Parent != "" {
- return fmt.Errorf("unexpected parent: %v", img.Parent)
- }
- v1Config, err := p.graph.getV1CompatibilityConfig(img.ID)
- if err != nil {
- return fmt.Errorf("v1Compatibility: %v %v", img.ID, err)
- }
- json, err := image.MakeImageConfig(v1Config, layerID, parentID)
- if err != nil {
- return fmt.Errorf("make config: %v", err)
- }
- if dgst, err := image.StrongID(json); err == nil && dgst == imgs[i].strongID {
- logrus.Debugf("Validated %v as %v", dgst, id)
- } else {
- return fmt.Errorf("digest mismatch: %v %v, error: %v", dgst, imgs[i].strongID, err)
- }
- // All clear
- return nil
- }
- func (p *v2Puller) tryNextID(imgs []contentAddressableDescriptor, i int, idMap map[string]struct{}) error {
- nextID, _ := digest.FromBytes([]byte(imgs[i].id))
- imgs[i].id = nextID.Hex()
- if _, exists := idMap[imgs[i].id]; !exists {
- p.graph.imageMutex.Lock(imgs[i].id)
- defer p.graph.imageMutex.Unlock(imgs[i].id)
- }
- if p.graph.Exists(imgs[i].id) {
- if err := p.validateImageInGraph(imgs[i].id, imgs, i); err != nil {
- return fmt.Errorf("not using existing strongID permutation %s: %v", imgs[i].id, err)
- }
- }
- return nil
- }
|