push_v1.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. package distribution
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/distribution/reference"
  7. "github.com/docker/distribution/registry/client/transport"
  8. "github.com/docker/docker/distribution/metadata"
  9. "github.com/docker/docker/dockerversion"
  10. "github.com/docker/docker/image"
  11. "github.com/docker/docker/image/v1"
  12. "github.com/docker/docker/layer"
  13. "github.com/docker/docker/pkg/ioutils"
  14. "github.com/docker/docker/pkg/progress"
  15. "github.com/docker/docker/pkg/stringid"
  16. "github.com/docker/docker/registry"
  17. "github.com/opencontainers/go-digest"
  18. "golang.org/x/net/context"
  19. )
  20. type v1Pusher struct {
  21. v1IDService *metadata.V1IDService
  22. endpoint registry.APIEndpoint
  23. ref reference.Named
  24. repoInfo *registry.RepositoryInfo
  25. config *ImagePushConfig
  26. session *registry.Session
  27. }
  28. func (p *v1Pusher) Push(ctx context.Context) error {
  29. tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
  30. if err != nil {
  31. return err
  32. }
  33. // Adds Docker-specific headers as well as user-specified headers (metaHeaders)
  34. tr := transport.NewTransport(
  35. // TODO(tiborvass): was NoTimeout
  36. registry.NewTransport(tlsConfig),
  37. registry.DockerHeaders(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)...,
  38. )
  39. client := registry.HTTPClient(tr)
  40. v1Endpoint, err := p.endpoint.ToV1Endpoint(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)
  41. if err != nil {
  42. logrus.Debugf("Could not get v1 endpoint: %v", err)
  43. return fallbackError{err: err}
  44. }
  45. p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
  46. if err != nil {
  47. // TODO(dmcgowan): Check if should fallback
  48. return fallbackError{err: err}
  49. }
  50. if err := p.pushRepository(ctx); err != nil {
  51. // TODO(dmcgowan): Check if should fallback
  52. return err
  53. }
  54. return nil
  55. }
  56. // v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an
  57. // image being pushed to a v1 registry.
  58. type v1Image interface {
  59. Config() []byte
  60. Layer() layer.Layer
  61. V1ID() string
  62. }
  63. type v1ImageCommon struct {
  64. layer layer.Layer
  65. config []byte
  66. v1ID string
  67. }
  68. func (common *v1ImageCommon) Config() []byte {
  69. return common.config
  70. }
  71. func (common *v1ImageCommon) V1ID() string {
  72. return common.v1ID
  73. }
  74. func (common *v1ImageCommon) Layer() layer.Layer {
  75. return common.layer
  76. }
  77. // v1TopImage defines a runnable (top layer) image being pushed to a v1
  78. // registry.
  79. type v1TopImage struct {
  80. v1ImageCommon
  81. imageID image.ID
  82. }
  83. func newV1TopImage(imageID image.ID, img *image.Image, l layer.Layer, parent *v1DependencyImage) (*v1TopImage, error) {
  84. v1ID := imageID.Digest().Hex()
  85. parentV1ID := ""
  86. if parent != nil {
  87. parentV1ID = parent.V1ID()
  88. }
  89. config, err := v1.MakeV1ConfigFromConfig(img, v1ID, parentV1ID, false)
  90. if err != nil {
  91. return nil, err
  92. }
  93. return &v1TopImage{
  94. v1ImageCommon: v1ImageCommon{
  95. v1ID: v1ID,
  96. config: config,
  97. layer: l,
  98. },
  99. imageID: imageID,
  100. }, nil
  101. }
  102. // v1DependencyImage defines a dependency layer being pushed to a v1 registry.
  103. type v1DependencyImage struct {
  104. v1ImageCommon
  105. }
  106. func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) *v1DependencyImage {
  107. v1ID := digest.Digest(l.ChainID()).Hex()
  108. var config string
  109. if parent != nil {
  110. config = fmt.Sprintf(`{"id":"%s","parent":"%s"}`, v1ID, parent.V1ID())
  111. } else {
  112. config = fmt.Sprintf(`{"id":"%s"}`, v1ID)
  113. }
  114. return &v1DependencyImage{
  115. v1ImageCommon: v1ImageCommon{
  116. v1ID: v1ID,
  117. config: []byte(config),
  118. layer: l,
  119. },
  120. }
  121. }
  122. // Retrieve the all the images to be uploaded in the correct order
  123. func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) {
  124. tagsByImage = make(map[image.ID][]string)
  125. // Ignore digest references
  126. if _, isCanonical := p.ref.(reference.Canonical); isCanonical {
  127. return
  128. }
  129. tagged, isTagged := p.ref.(reference.NamedTagged)
  130. if isTagged {
  131. // Push a specific tag
  132. var imgID image.ID
  133. var dgst digest.Digest
  134. dgst, err = p.config.ReferenceStore.Get(p.ref)
  135. if err != nil {
  136. return
  137. }
  138. imgID = image.IDFromDigest(dgst)
  139. imageList, err = p.imageListForTag(imgID, nil, &referencedLayers)
  140. if err != nil {
  141. return
  142. }
  143. tagsByImage[imgID] = []string{tagged.Tag()}
  144. return
  145. }
  146. imagesSeen := make(map[digest.Digest]struct{})
  147. dependenciesSeen := make(map[layer.ChainID]*v1DependencyImage)
  148. associations := p.config.ReferenceStore.ReferencesByName(p.ref)
  149. for _, association := range associations {
  150. if tagged, isTagged = association.Ref.(reference.NamedTagged); !isTagged {
  151. // Ignore digest references.
  152. continue
  153. }
  154. imgID := image.IDFromDigest(association.ID)
  155. tagsByImage[imgID] = append(tagsByImage[imgID], tagged.Tag())
  156. if _, present := imagesSeen[association.ID]; present {
  157. // Skip generating image list for already-seen image
  158. continue
  159. }
  160. imagesSeen[association.ID] = struct{}{}
  161. imageListForThisTag, err := p.imageListForTag(imgID, dependenciesSeen, &referencedLayers)
  162. if err != nil {
  163. return nil, nil, nil, err
  164. }
  165. // append to main image list
  166. imageList = append(imageList, imageListForThisTag...)
  167. }
  168. if len(imageList) == 0 {
  169. return nil, nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  170. }
  171. logrus.Debugf("Image list: %v", imageList)
  172. logrus.Debugf("Tags by image: %v", tagsByImage)
  173. return
  174. }
  175. func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) {
  176. ics, ok := p.config.ImageStore.(*imageConfigStore)
  177. if !ok {
  178. return nil, fmt.Errorf("only image store images supported for v1 push")
  179. }
  180. img, err := ics.Store.Get(imgID)
  181. if err != nil {
  182. return nil, err
  183. }
  184. topLayerID := img.RootFS.ChainID()
  185. pl, err := p.config.LayerStore.Get(topLayerID)
  186. *referencedLayers = append(*referencedLayers, pl)
  187. if err != nil {
  188. return nil, fmt.Errorf("failed to get top layer from image: %v", err)
  189. }
  190. // V1 push is deprecated, only support existing layerstore layers
  191. lsl, ok := pl.(*storeLayer)
  192. if !ok {
  193. return nil, fmt.Errorf("only layer store layers supported for v1 push")
  194. }
  195. l := lsl.Layer
  196. dependencyImages, parent := generateDependencyImages(l.Parent(), dependenciesSeen)
  197. topImage, err := newV1TopImage(imgID, img, l, parent)
  198. if err != nil {
  199. return nil, err
  200. }
  201. imageListForThisTag = append(dependencyImages, topImage)
  202. return
  203. }
  204. func generateDependencyImages(l layer.Layer, dependenciesSeen map[layer.ChainID]*v1DependencyImage) (imageListForThisTag []v1Image, parent *v1DependencyImage) {
  205. if l == nil {
  206. return nil, nil
  207. }
  208. imageListForThisTag, parent = generateDependencyImages(l.Parent(), dependenciesSeen)
  209. if dependenciesSeen != nil {
  210. if dependencyImage, present := dependenciesSeen[l.ChainID()]; present {
  211. // This layer is already on the list, we can ignore it
  212. // and all its parents.
  213. return imageListForThisTag, dependencyImage
  214. }
  215. }
  216. dependencyImage := newV1DependencyImage(l, parent)
  217. imageListForThisTag = append(imageListForThisTag, dependencyImage)
  218. if dependenciesSeen != nil {
  219. dependenciesSeen[l.ChainID()] = dependencyImage
  220. }
  221. return imageListForThisTag, dependencyImage
  222. }
  223. // createImageIndex returns an index of an image's layer IDs and tags.
  224. func createImageIndex(images []v1Image, tags map[image.ID][]string) []*registry.ImgData {
  225. var imageIndex []*registry.ImgData
  226. for _, img := range images {
  227. v1ID := img.V1ID()
  228. if topImage, isTopImage := img.(*v1TopImage); isTopImage {
  229. if tags, hasTags := tags[topImage.imageID]; hasTags {
  230. // If an image has tags you must add an entry in the image index
  231. // for each tag
  232. for _, tag := range tags {
  233. imageIndex = append(imageIndex, &registry.ImgData{
  234. ID: v1ID,
  235. Tag: tag,
  236. })
  237. }
  238. continue
  239. }
  240. }
  241. // If the image does not have a tag it still needs to be sent to the
  242. // registry with an empty tag so that it is associated with the repository
  243. imageIndex = append(imageIndex, &registry.ImgData{
  244. ID: v1ID,
  245. Tag: "",
  246. })
  247. }
  248. return imageIndex
  249. }
  250. // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
  251. // and if it is absent then it sends the image id to the channel to be pushed.
  252. func (p *v1Pusher) lookupImageOnEndpoint(wg *sync.WaitGroup, endpoint string, images chan v1Image, imagesToPush chan string) {
  253. defer wg.Done()
  254. for image := range images {
  255. v1ID := image.V1ID()
  256. truncID := stringid.TruncateID(image.Layer().DiffID().String())
  257. if err := p.session.LookupRemoteImage(v1ID, endpoint); err != nil {
  258. logrus.Errorf("Error in LookupRemoteImage: %s", err)
  259. imagesToPush <- v1ID
  260. progress.Update(p.config.ProgressOutput, truncID, "Waiting")
  261. } else {
  262. progress.Update(p.config.ProgressOutput, truncID, "Already exists")
  263. }
  264. }
  265. }
  266. func (p *v1Pusher) pushImageToEndpoint(ctx context.Context, endpoint string, imageList []v1Image, tags map[image.ID][]string, repo *registry.RepositoryData) error {
  267. workerCount := len(imageList)
  268. // start a maximum of 5 workers to check if images exist on the specified endpoint.
  269. if workerCount > 5 {
  270. workerCount = 5
  271. }
  272. var (
  273. wg = &sync.WaitGroup{}
  274. imageData = make(chan v1Image, workerCount*2)
  275. imagesToPush = make(chan string, workerCount*2)
  276. pushes = make(chan map[string]struct{}, 1)
  277. )
  278. for i := 0; i < workerCount; i++ {
  279. wg.Add(1)
  280. go p.lookupImageOnEndpoint(wg, endpoint, imageData, imagesToPush)
  281. }
  282. // start a go routine that consumes the images to push
  283. go func() {
  284. shouldPush := make(map[string]struct{})
  285. for id := range imagesToPush {
  286. shouldPush[id] = struct{}{}
  287. }
  288. pushes <- shouldPush
  289. }()
  290. for _, v1Image := range imageList {
  291. imageData <- v1Image
  292. }
  293. // close the channel to notify the workers that there will be no more images to check.
  294. close(imageData)
  295. wg.Wait()
  296. close(imagesToPush)
  297. // wait for all the images that require pushes to be collected into a consumable map.
  298. shouldPush := <-pushes
  299. // finish by pushing any images and tags to the endpoint. The order that the images are pushed
  300. // is very important that is why we are still iterating over the ordered list of imageIDs.
  301. for _, img := range imageList {
  302. v1ID := img.V1ID()
  303. if _, push := shouldPush[v1ID]; push {
  304. if _, err := p.pushImage(ctx, img, endpoint); err != nil {
  305. // FIXME: Continue on error?
  306. return err
  307. }
  308. }
  309. if topImage, isTopImage := img.(*v1TopImage); isTopImage {
  310. for _, tag := range tags[topImage.imageID] {
  311. progress.Messagef(p.config.ProgressOutput, "", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(v1ID), endpoint+"repositories/"+reference.Path(p.repoInfo.Name)+"/tags/"+tag)
  312. if err := p.session.PushRegistryTag(p.repoInfo.Name, v1ID, tag, endpoint); err != nil {
  313. return err
  314. }
  315. }
  316. }
  317. }
  318. return nil
  319. }
  320. // pushRepository pushes layers that do not already exist on the registry.
  321. func (p *v1Pusher) pushRepository(ctx context.Context) error {
  322. imgList, tags, referencedLayers, err := p.getImageList()
  323. defer func() {
  324. for _, l := range referencedLayers {
  325. l.Release()
  326. }
  327. }()
  328. if err != nil {
  329. return err
  330. }
  331. imageIndex := createImageIndex(imgList, tags)
  332. for _, data := range imageIndex {
  333. logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
  334. }
  335. // Register all the images in a repository with the registry
  336. // If an image is not in this list it will not be associated with the repository
  337. repoData, err := p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, false, nil)
  338. if err != nil {
  339. return err
  340. }
  341. // push the repository to each of the endpoints only if it does not exist.
  342. for _, endpoint := range repoData.Endpoints {
  343. if err := p.pushImageToEndpoint(ctx, endpoint, imgList, tags, repoData); err != nil {
  344. return err
  345. }
  346. }
  347. _, err = p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, true, repoData.Endpoints)
  348. return err
  349. }
  350. func (p *v1Pusher) pushImage(ctx context.Context, v1Image v1Image, ep string) (checksum string, err error) {
  351. l := v1Image.Layer()
  352. v1ID := v1Image.V1ID()
  353. truncID := stringid.TruncateID(l.DiffID().String())
  354. jsonRaw := v1Image.Config()
  355. progress.Update(p.config.ProgressOutput, truncID, "Pushing")
  356. // General rule is to use ID for graph accesses and compatibilityID for
  357. // calls to session.registry()
  358. imgData := &registry.ImgData{
  359. ID: v1ID,
  360. }
  361. // Send the json
  362. if err := p.session.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
  363. if err == registry.ErrAlreadyExists {
  364. progress.Update(p.config.ProgressOutput, truncID, "Image already pushed, skipping")
  365. return "", nil
  366. }
  367. return "", err
  368. }
  369. arch, err := l.TarStream()
  370. if err != nil {
  371. return "", err
  372. }
  373. defer arch.Close()
  374. // don't care if this fails; best effort
  375. size, _ := l.DiffSize()
  376. // Send the layer
  377. logrus.Debugf("rendered layer for %s of [%d] size", v1ID, size)
  378. reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), p.config.ProgressOutput, size, truncID, "Pushing")
  379. defer reader.Close()
  380. checksum, checksumPayload, err := p.session.PushImageLayerRegistry(v1ID, reader, ep, jsonRaw)
  381. if err != nil {
  382. return "", err
  383. }
  384. imgData.Checksum = checksum
  385. imgData.ChecksumPayload = checksumPayload
  386. // Send the checksum
  387. if err := p.session.PushImageChecksumRegistry(imgData, ep); err != nil {
  388. return "", err
  389. }
  390. if err := p.v1IDService.Set(v1ID, p.repoInfo.Index.Name, l.DiffID()); err != nil {
  391. logrus.Warnf("Could not set v1 ID mapping: %v", err)
  392. }
  393. progress.Update(p.config.ProgressOutput, truncID, "Image successfully pushed")
  394. return imgData.Checksum, nil
  395. }