push_v2.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. package distribution
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "runtime"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/distribution"
  10. "github.com/docker/distribution/digest"
  11. "github.com/docker/distribution/manifest/schema1"
  12. "github.com/docker/distribution/manifest/schema2"
  13. distreference "github.com/docker/distribution/reference"
  14. "github.com/docker/distribution/registry/client"
  15. "github.com/docker/docker/distribution/metadata"
  16. "github.com/docker/docker/distribution/xfer"
  17. "github.com/docker/docker/image"
  18. "github.com/docker/docker/layer"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/pkg/progress"
  21. "github.com/docker/docker/pkg/stringid"
  22. "github.com/docker/docker/reference"
  23. "github.com/docker/docker/registry"
  24. "golang.org/x/net/context"
  25. )
  26. // PushResult contains the tag, manifest digest, and manifest size from the
  27. // push. It's used to signal this information to the trust code in the client
  28. // so it can sign the manifest if necessary.
  29. type PushResult struct {
  30. Tag string
  31. Digest digest.Digest
  32. Size int
  33. }
  34. type v2Pusher struct {
  35. v2MetadataService *metadata.V2MetadataService
  36. ref reference.Named
  37. endpoint registry.APIEndpoint
  38. repoInfo *registry.RepositoryInfo
  39. config *ImagePushConfig
  40. repo distribution.Repository
  41. // pushState is state built by the Upload functions.
  42. pushState pushState
  43. }
  44. type pushState struct {
  45. sync.Mutex
  46. // remoteLayers is the set of layers known to exist on the remote side.
  47. // This avoids redundant queries when pushing multiple tags that
  48. // involve the same layers. It is also used to fill in digest and size
  49. // information when building the manifest.
  50. remoteLayers map[layer.DiffID]distribution.Descriptor
  51. // confirmedV2 is set to true if we confirm we're talking to a v2
  52. // registry. This is used to limit fallbacks to the v1 protocol.
  53. confirmedV2 bool
  54. }
  55. func (p *v2Pusher) Push(ctx context.Context) (err error) {
  56. p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
  57. p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
  58. if err != nil {
  59. logrus.Debugf("Error getting v2 registry: %v", err)
  60. return err
  61. }
  62. if err = p.pushV2Repository(ctx); err != nil {
  63. if continueOnError(err) {
  64. return fallbackError{
  65. err: err,
  66. confirmedV2: p.pushState.confirmedV2,
  67. transportOK: true,
  68. }
  69. }
  70. }
  71. return err
  72. }
  73. func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
  74. if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
  75. imageID, err := p.config.ReferenceStore.Get(p.ref)
  76. if err != nil {
  77. return fmt.Errorf("tag does not exist: %s", p.ref.String())
  78. }
  79. return p.pushV2Tag(ctx, namedTagged, imageID)
  80. }
  81. if !reference.IsNameOnly(p.ref) {
  82. return errors.New("cannot push a digest reference")
  83. }
  84. // Pull all tags
  85. pushed := 0
  86. for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
  87. if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
  88. pushed++
  89. if err := p.pushV2Tag(ctx, namedTagged, association.ImageID); err != nil {
  90. return err
  91. }
  92. }
  93. }
  94. if pushed == 0 {
  95. return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
  96. }
  97. return nil
  98. }
  99. func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, imageID image.ID) error {
  100. logrus.Debugf("Pushing repository: %s", ref.String())
  101. img, err := p.config.ImageStore.Get(imageID)
  102. if err != nil {
  103. return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
  104. }
  105. var l layer.Layer
  106. topLayerID := img.RootFS.ChainID()
  107. if topLayerID == "" {
  108. l = layer.EmptyLayer
  109. } else {
  110. l, err = p.config.LayerStore.Get(topLayerID)
  111. if err != nil {
  112. return fmt.Errorf("failed to get top layer from image: %v", err)
  113. }
  114. defer layer.ReleaseAndLog(p.config.LayerStore, l)
  115. }
  116. var descriptors []xfer.UploadDescriptor
  117. descriptorTemplate := v2PushDescriptor{
  118. v2MetadataService: p.v2MetadataService,
  119. repoInfo: p.repoInfo,
  120. ref: p.ref,
  121. repo: p.repo,
  122. pushState: &p.pushState,
  123. }
  124. // Loop bounds condition is to avoid pushing the base layer on Windows.
  125. for i := 0; i < len(img.RootFS.DiffIDs); i++ {
  126. descriptor := descriptorTemplate
  127. descriptor.layer = l
  128. descriptors = append(descriptors, &descriptor)
  129. l = l.Parent()
  130. }
  131. if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
  132. return err
  133. }
  134. // Try schema2 first
  135. builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
  136. manifest, err := manifestFromBuilder(ctx, builder, descriptors)
  137. if err != nil {
  138. return err
  139. }
  140. manSvc, err := p.repo.Manifests(ctx)
  141. if err != nil {
  142. return err
  143. }
  144. putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
  145. if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
  146. if runtime.GOOS == "windows" {
  147. logrus.Warnf("failed to upload schema2 manifest: %v", err)
  148. return err
  149. }
  150. logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
  151. manifestRef, err := distreference.WithTag(p.repo.Named(), ref.Tag())
  152. if err != nil {
  153. return err
  154. }
  155. builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
  156. manifest, err = manifestFromBuilder(ctx, builder, descriptors)
  157. if err != nil {
  158. return err
  159. }
  160. if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
  161. return err
  162. }
  163. }
  164. var canonicalManifest []byte
  165. switch v := manifest.(type) {
  166. case *schema1.SignedManifest:
  167. canonicalManifest = v.Canonical
  168. case *schema2.DeserializedManifest:
  169. _, canonicalManifest, err = v.Payload()
  170. if err != nil {
  171. return err
  172. }
  173. }
  174. manifestDigest := digest.FromBytes(canonicalManifest)
  175. progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
  176. if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, imageID); err != nil {
  177. return err
  178. }
  179. // Signal digest to the trust client so it can sign the
  180. // push, if appropriate.
  181. progress.Aux(p.config.ProgressOutput, PushResult{Tag: ref.Tag(), Digest: manifestDigest, Size: len(canonicalManifest)})
  182. return nil
  183. }
  184. func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
  185. // descriptors is in reverse order; iterate backwards to get references
  186. // appended in the right order.
  187. for i := len(descriptors) - 1; i >= 0; i-- {
  188. if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
  189. return nil, err
  190. }
  191. }
  192. return builder.Build(ctx)
  193. }
  194. type v2PushDescriptor struct {
  195. layer layer.Layer
  196. v2MetadataService *metadata.V2MetadataService
  197. repoInfo reference.Named
  198. ref reference.Named
  199. repo distribution.Repository
  200. pushState *pushState
  201. remoteDescriptor distribution.Descriptor
  202. }
  203. func (pd *v2PushDescriptor) Key() string {
  204. return "v2push:" + pd.ref.FullName() + " " + pd.layer.DiffID().String()
  205. }
  206. func (pd *v2PushDescriptor) ID() string {
  207. return stringid.TruncateID(pd.layer.DiffID().String())
  208. }
  209. func (pd *v2PushDescriptor) DiffID() layer.DiffID {
  210. return pd.layer.DiffID()
  211. }
  212. func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
  213. if fs, ok := pd.layer.(distribution.Describable); ok {
  214. if d := fs.Descriptor(); len(d.URLs) > 0 {
  215. progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
  216. return d, nil
  217. }
  218. }
  219. diffID := pd.DiffID()
  220. pd.pushState.Lock()
  221. if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
  222. // it is already known that the push is not needed and
  223. // therefore doing a stat is unnecessary
  224. pd.pushState.Unlock()
  225. progress.Update(progressOutput, pd.ID(), "Layer already exists")
  226. return descriptor, nil
  227. }
  228. pd.pushState.Unlock()
  229. // Do we have any metadata associated with this layer's DiffID?
  230. v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
  231. if err == nil {
  232. descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
  233. if err != nil {
  234. progress.Update(progressOutput, pd.ID(), "Image push failed")
  235. return distribution.Descriptor{}, retryOnError(err)
  236. }
  237. if exists {
  238. progress.Update(progressOutput, pd.ID(), "Layer already exists")
  239. pd.pushState.Lock()
  240. pd.pushState.remoteLayers[diffID] = descriptor
  241. pd.pushState.Unlock()
  242. return descriptor, nil
  243. }
  244. }
  245. logrus.Debugf("Pushing layer: %s", diffID)
  246. // if digest was empty or not saved, or if blob does not exist on the remote repository,
  247. // then push the blob.
  248. bs := pd.repo.Blobs(ctx)
  249. var layerUpload distribution.BlobWriter
  250. mountAttemptsRemaining := 3
  251. // Attempt to find another repository in the same registry to mount the layer
  252. // from to avoid an unnecessary upload.
  253. // Note: metadata is stored from oldest to newest, so we iterate through this
  254. // slice in reverse to maximize our chances of the blob still existing in the
  255. // remote repository.
  256. for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
  257. mountFrom := v2Metadata[i]
  258. sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository)
  259. if err != nil {
  260. continue
  261. }
  262. if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
  263. // don't mount blobs from another registry
  264. continue
  265. }
  266. namedRef, err := reference.WithName(mountFrom.SourceRepository)
  267. if err != nil {
  268. continue
  269. }
  270. // TODO (brianbland): We need to construct a reference where the Name is
  271. // only the full remote name, so clean this up when distribution has a
  272. // richer reference package
  273. remoteRef, err := distreference.WithName(namedRef.RemoteName())
  274. if err != nil {
  275. continue
  276. }
  277. canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
  278. if err != nil {
  279. continue
  280. }
  281. logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName())
  282. layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
  283. switch err := err.(type) {
  284. case distribution.ErrBlobMounted:
  285. progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
  286. err.Descriptor.MediaType = schema2.MediaTypeLayer
  287. pd.pushState.Lock()
  288. pd.pushState.confirmedV2 = true
  289. pd.pushState.remoteLayers[diffID] = err.Descriptor
  290. pd.pushState.Unlock()
  291. // Cache mapping from this layer's DiffID to the blobsum
  292. if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
  293. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  294. }
  295. return err.Descriptor, nil
  296. case nil:
  297. // blob upload session created successfully, so begin the upload
  298. mountAttemptsRemaining = 0
  299. default:
  300. // unable to mount layer from this repository, so this source mapping is no longer valid
  301. logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
  302. pd.v2MetadataService.Remove(mountFrom)
  303. mountAttemptsRemaining--
  304. }
  305. }
  306. if layerUpload == nil {
  307. layerUpload, err = bs.Create(ctx)
  308. if err != nil {
  309. return distribution.Descriptor{}, retryOnError(err)
  310. }
  311. }
  312. defer layerUpload.Close()
  313. arch, err := pd.layer.TarStream()
  314. if err != nil {
  315. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  316. }
  317. // don't care if this fails; best effort
  318. size, _ := pd.layer.DiffSize()
  319. reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
  320. compressedReader, compressionDone := compress(reader)
  321. defer func() {
  322. reader.Close()
  323. <-compressionDone
  324. }()
  325. digester := digest.Canonical.New()
  326. tee := io.TeeReader(compressedReader, digester.Hash())
  327. nn, err := layerUpload.ReadFrom(tee)
  328. compressedReader.Close()
  329. if err != nil {
  330. return distribution.Descriptor{}, retryOnError(err)
  331. }
  332. pushDigest := digester.Digest()
  333. if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
  334. return distribution.Descriptor{}, retryOnError(err)
  335. }
  336. logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
  337. progress.Update(progressOutput, pd.ID(), "Pushed")
  338. // Cache mapping from this layer's DiffID to the blobsum
  339. if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
  340. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  341. }
  342. pd.pushState.Lock()
  343. // If Commit succeeded, that's an indication that the remote registry
  344. // speaks the v2 protocol.
  345. pd.pushState.confirmedV2 = true
  346. descriptor := distribution.Descriptor{
  347. Digest: pushDigest,
  348. MediaType: schema2.MediaTypeLayer,
  349. Size: nn,
  350. }
  351. pd.pushState.remoteLayers[diffID] = descriptor
  352. pd.pushState.Unlock()
  353. return descriptor, nil
  354. }
  355. func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
  356. pd.remoteDescriptor = descriptor
  357. }
  358. func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
  359. return pd.remoteDescriptor
  360. }
  361. // layerAlreadyExists checks if the registry already know about any of the
  362. // metadata passed in the "metadata" slice. If it finds one that the registry
  363. // knows about, it returns the known digest and "true".
  364. func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
  365. for _, meta := range metadata {
  366. // Only check blobsums that are known to this repository or have an unknown source
  367. if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() {
  368. continue
  369. }
  370. descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest)
  371. switch err {
  372. case nil:
  373. descriptor.MediaType = schema2.MediaTypeLayer
  374. return descriptor, true, nil
  375. case distribution.ErrBlobUnknown:
  376. // nop
  377. default:
  378. return distribution.Descriptor{}, false, err
  379. }
  380. }
  381. return distribution.Descriptor{}, false, nil
  382. }