push.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. package graph
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "sync"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/distribution/digest"
  13. "github.com/docker/docker/cliconfig"
  14. "github.com/docker/docker/image"
  15. "github.com/docker/docker/pkg/progressreader"
  16. "github.com/docker/docker/pkg/streamformatter"
  17. "github.com/docker/docker/pkg/stringid"
  18. "github.com/docker/docker/registry"
  19. "github.com/docker/docker/runconfig"
  20. "github.com/docker/docker/utils"
  21. "github.com/docker/libtrust"
  22. )
// ErrV2RegistryUnavailable is returned by pushV2Repository when the V2
// endpoint of an official index cannot be resolved. Push treats it as the
// signal to fall back to the v1 push path instead of failing.
var ErrV2RegistryUnavailable = errors.New("error v2 registry unavailable")
// ImagePushConfig carries the per-request options consumed by TagStore.Push.
type ImagePushConfig struct {
	// MetaHeaders is handed to registry.HTTPRequestFactory when the registry
	// session is created.
	MetaHeaders map[string][]string
	// AuthConfig is passed to registry.NewSession.
	AuthConfig *cliconfig.AuthConfig
	// Tag restricts the push to a single tag. When empty, every tag of the
	// repository that is not a digest reference is pushed.
	Tag string
	// Json is passed to streamformatter.NewStreamFormatter; presumably it
	// selects JSON-formatted output (confirm against streamformatter).
	Json bool
	// OutStream receives the status and progress messages written during the push.
	OutStream io.Writer
}
  31. // Retrieve the all the images to be uploaded in the correct order
  32. func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  33. var (
  34. imageList []string
  35. imagesSeen = make(map[string]bool)
  36. tagsByImage = make(map[string][]string)
  37. )
  38. for tag, id := range localRepo {
  39. if requestedTag != "" && requestedTag != tag {
  40. // Include only the requested tag.
  41. continue
  42. }
  43. if utils.DigestReference(tag) {
  44. // Ignore digest references.
  45. continue
  46. }
  47. var imageListForThisTag []string
  48. tagsByImage[id] = append(tagsByImage[id], tag)
  49. for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() {
  50. if err != nil {
  51. return nil, nil, err
  52. }
  53. if imagesSeen[img.ID] {
  54. // This image is already on the list, we can ignore it and all its parents
  55. break
  56. }
  57. imagesSeen[img.ID] = true
  58. imageListForThisTag = append(imageListForThisTag, img.ID)
  59. }
  60. // reverse the image list for this tag (so the "most"-parent image is first)
  61. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  62. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  63. }
  64. // append to main image list
  65. imageList = append(imageList, imageListForThisTag...)
  66. }
  67. if len(imageList) == 0 {
  68. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  69. }
  70. logrus.Debugf("Image list: %v", imageList)
  71. logrus.Debugf("Tags by image: %v", tagsByImage)
  72. return imageList, tagsByImage, nil
  73. }
  74. func (s *TagStore) getImageTags(localRepo map[string]string, askedTag string) ([]string, error) {
  75. logrus.Debugf("Checking %s against %#v", askedTag, localRepo)
  76. if len(askedTag) > 0 {
  77. if _, ok := localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
  78. return nil, fmt.Errorf("Tag does not exist: %s", askedTag)
  79. }
  80. return []string{askedTag}, nil
  81. }
  82. var tags []string
  83. for tag := range localRepo {
  84. if !utils.DigestReference(tag) {
  85. tags = append(tags, tag)
  86. }
  87. }
  88. return tags, nil
  89. }
  90. // createImageIndex returns an index of an image's layer IDs and tags.
  91. func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
  92. var imageIndex []*registry.ImgData
  93. for _, id := range images {
  94. if tags, hasTags := tags[id]; hasTags {
  95. // If an image has tags you must add an entry in the image index
  96. // for each tag
  97. for _, tag := range tags {
  98. imageIndex = append(imageIndex, &registry.ImgData{
  99. ID: id,
  100. Tag: tag,
  101. })
  102. }
  103. continue
  104. }
  105. // If the image does not have a tag it still needs to be sent to the
  106. // registry with an empty tag so that it is accociated with the repository
  107. imageIndex = append(imageIndex, &registry.ImgData{
  108. ID: id,
  109. Tag: "",
  110. })
  111. }
  112. return imageIndex
  113. }
// imagePushData is the unit of work consumed by lookupImageOnEndpoint: one
// image to look up on one registry endpoint.
type imagePushData struct {
	// id is the image ID to look up.
	id string
	// endpoint is the registry endpoint queried for the image.
	endpoint string
	// tokens is forwarded to LookupRemoteImage; pushImageToEndpoint fills it
	// from the repository data's Tokens.
	tokens []string
}
  119. // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
  120. // and if it is absent then it sends the image id to the channel to be pushed.
  121. func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *streamformatter.StreamFormatter,
  122. images chan imagePushData, imagesToPush chan string) {
  123. defer wg.Done()
  124. for image := range images {
  125. if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil {
  126. logrus.Errorf("Error in LookupRemoteImage: %s", err)
  127. imagesToPush <- image.id
  128. continue
  129. }
  130. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", stringid.TruncateID(image.id)))
  131. }
  132. }
  133. func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
  134. tags map[string][]string, repo *registry.RepositoryData, sf *streamformatter.StreamFormatter, r *registry.Session) error {
  135. workerCount := len(imageIDs)
  136. // start a maximum of 5 workers to check if images exist on the specified endpoint.
  137. if workerCount > 5 {
  138. workerCount = 5
  139. }
  140. var (
  141. wg = &sync.WaitGroup{}
  142. imageData = make(chan imagePushData, workerCount*2)
  143. imagesToPush = make(chan string, workerCount*2)
  144. pushes = make(chan map[string]struct{}, 1)
  145. )
  146. for i := 0; i < workerCount; i++ {
  147. wg.Add(1)
  148. go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
  149. }
  150. // start a go routine that consumes the images to push
  151. go func() {
  152. shouldPush := make(map[string]struct{})
  153. for id := range imagesToPush {
  154. shouldPush[id] = struct{}{}
  155. }
  156. pushes <- shouldPush
  157. }()
  158. for _, id := range imageIDs {
  159. imageData <- imagePushData{
  160. id: id,
  161. endpoint: endpoint,
  162. tokens: repo.Tokens,
  163. }
  164. }
  165. // close the channel to notify the workers that there will be no more images to check.
  166. close(imageData)
  167. wg.Wait()
  168. close(imagesToPush)
  169. // wait for all the images that require pushes to be collected into a consumable map.
  170. shouldPush := <-pushes
  171. // finish by pushing any images and tags to the endpoint. The order that the images are pushed
  172. // is very important that is why we are still iterating over the ordered list of imageIDs.
  173. for _, id := range imageIDs {
  174. if _, push := shouldPush[id]; push {
  175. if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
  176. // FIXME: Continue on error?
  177. return err
  178. }
  179. }
  180. for _, tag := range tags[id] {
  181. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
  182. if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil {
  183. return err
  184. }
  185. }
  186. }
  187. return nil
  188. }
  189. // pushRepository pushes layers that do not already exist on the registry.
  190. func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
  191. repoInfo *registry.RepositoryInfo, localRepo map[string]string,
  192. tag string, sf *streamformatter.StreamFormatter) error {
  193. logrus.Debugf("Local repo: %s", localRepo)
  194. out = utils.NewWriteFlusher(out)
  195. imgList, tags, err := s.getImageList(localRepo, tag)
  196. if err != nil {
  197. return err
  198. }
  199. out.Write(sf.FormatStatus("", "Sending image list"))
  200. imageIndex := s.createImageIndex(imgList, tags)
  201. logrus.Debugf("Preparing to push %s with the following images and tags", localRepo)
  202. for _, data := range imageIndex {
  203. logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
  204. }
  205. // Register all the images in a repository with the registry
  206. // If an image is not in this list it will not be associated with the repository
  207. repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
  208. if err != nil {
  209. return err
  210. }
  211. nTag := 1
  212. if tag == "" {
  213. nTag = len(localRepo)
  214. }
  215. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
  216. // push the repository to each of the endpoints only if it does not exist.
  217. for _, endpoint := range repoData.Endpoints {
  218. if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
  219. return err
  220. }
  221. }
  222. _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
  223. return err
  224. }
  225. func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *streamformatter.StreamFormatter) (checksum string, err error) {
  226. out = utils.NewWriteFlusher(out)
  227. jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json"))
  228. if err != nil {
  229. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  230. }
  231. out.Write(sf.FormatProgress(stringid.TruncateID(imgID), "Pushing", nil))
  232. imgData := &registry.ImgData{
  233. ID: imgID,
  234. }
  235. // Send the json
  236. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  237. if err == registry.ErrAlreadyExists {
  238. out.Write(sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  239. return "", nil
  240. }
  241. return "", err
  242. }
  243. layerData, err := s.graph.TempLayerArchive(imgID, sf, out)
  244. if err != nil {
  245. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  246. }
  247. defer os.RemoveAll(layerData.Name())
  248. // Send the layer
  249. logrus.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  250. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID,
  251. progressreader.New(progressreader.Config{
  252. In: layerData,
  253. Out: out,
  254. Formatter: sf,
  255. Size: int(layerData.Size),
  256. NewLines: false,
  257. ID: stringid.TruncateID(imgData.ID),
  258. Action: "Pushing",
  259. }), ep, token, jsonRaw)
  260. if err != nil {
  261. return "", err
  262. }
  263. imgData.Checksum = checksum
  264. imgData.ChecksumPayload = checksumPayload
  265. // Send the checksum
  266. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  267. return "", err
  268. }
  269. out.Write(sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image successfully pushed", nil))
  270. return imgData.Checksum, nil
  271. }
  272. func (s *TagStore) pushV2Repository(r *registry.Session, localRepo Repository, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter) error {
  273. endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
  274. if err != nil {
  275. if repoInfo.Index.Official {
  276. logrus.Debugf("Unable to push to V2 registry, falling back to v1: %s", err)
  277. return ErrV2RegistryUnavailable
  278. }
  279. return fmt.Errorf("error getting registry endpoint: %s", err)
  280. }
  281. tags, err := s.getImageTags(localRepo, tag)
  282. if err != nil {
  283. return err
  284. }
  285. if len(tags) == 0 {
  286. return fmt.Errorf("No tags to push for %s", repoInfo.LocalName)
  287. }
  288. auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, false)
  289. if err != nil {
  290. return fmt.Errorf("error getting authorization: %s", err)
  291. }
  292. for _, tag := range tags {
  293. logrus.Debugf("Pushing repository: %s:%s", repoInfo.CanonicalName, tag)
  294. layerId, exists := localRepo[tag]
  295. if !exists {
  296. return fmt.Errorf("tag does not exist: %s", tag)
  297. }
  298. layer, err := s.graph.Get(layerId)
  299. if err != nil {
  300. return err
  301. }
  302. m := &registry.ManifestData{
  303. SchemaVersion: 1,
  304. Name: repoInfo.RemoteName,
  305. Tag: tag,
  306. Architecture: layer.Architecture,
  307. }
  308. var metadata runconfig.Config
  309. if layer.Config != nil {
  310. metadata = *layer.Config
  311. }
  312. layersSeen := make(map[string]bool)
  313. layers := []*image.Image{layer}
  314. for ; layer != nil; layer, err = layer.GetParent() {
  315. if err != nil {
  316. return err
  317. }
  318. if layersSeen[layer.ID] {
  319. break
  320. }
  321. layers = append(layers, layer)
  322. layersSeen[layer.ID] = true
  323. }
  324. m.FSLayers = make([]*registry.FSLayer, len(layers))
  325. m.History = make([]*registry.ManifestHistory, len(layers))
  326. // Schema version 1 requires layer ordering from top to root
  327. for i, layer := range layers {
  328. logrus.Debugf("Pushing layer: %s", layer.ID)
  329. if layer.Config != nil && metadata.Image != layer.ID {
  330. err = runconfig.Merge(&metadata, layer.Config)
  331. if err != nil {
  332. return err
  333. }
  334. }
  335. jsonData, err := layer.RawJson()
  336. if err != nil {
  337. return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
  338. }
  339. checksum, err := layer.GetCheckSum(s.graph.ImageRoot(layer.ID))
  340. if err != nil {
  341. return fmt.Errorf("error getting image checksum: %s", err)
  342. }
  343. var exists bool
  344. if len(checksum) > 0 {
  345. dgst, err := digest.ParseDigest(checksum)
  346. if err != nil {
  347. return fmt.Errorf("Invalid checksum %s: %s", checksum, err)
  348. }
  349. // Call mount blob
  350. exists, err = r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, dgst, auth)
  351. if err != nil {
  352. out.Write(sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
  353. return err
  354. }
  355. }
  356. if !exists {
  357. if cs, err := s.pushV2Image(r, layer, endpoint, repoInfo.RemoteName, sf, out, auth); err != nil {
  358. return err
  359. } else if cs != checksum {
  360. // Cache new checksum
  361. if err := layer.SaveCheckSum(s.graph.ImageRoot(layer.ID), cs); err != nil {
  362. return err
  363. }
  364. checksum = cs
  365. }
  366. } else {
  367. out.Write(sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
  368. }
  369. m.FSLayers[i] = &registry.FSLayer{BlobSum: checksum}
  370. m.History[i] = &registry.ManifestHistory{V1Compatibility: string(jsonData)}
  371. }
  372. if err := checkValidManifest(m); err != nil {
  373. return fmt.Errorf("invalid manifest: %s", err)
  374. }
  375. logrus.Debugf("Pushing %s:%s to v2 repository", repoInfo.LocalName, tag)
  376. mBytes, err := json.MarshalIndent(m, "", " ")
  377. if err != nil {
  378. return err
  379. }
  380. js, err := libtrust.NewJSONSignature(mBytes)
  381. if err != nil {
  382. return err
  383. }
  384. if err = js.Sign(s.trustKey); err != nil {
  385. return err
  386. }
  387. signedBody, err := js.PrettySignature("signatures")
  388. if err != nil {
  389. return err
  390. }
  391. logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", repoInfo.LocalName, tag, s.trustKey.KeyID())
  392. // push the manifest
  393. digest, err := r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, signedBody, mBytes, auth)
  394. if err != nil {
  395. return err
  396. }
  397. out.Write(sf.FormatStatus("", "Digest: %s", digest))
  398. }
  399. return nil
  400. }
  401. // PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk
  402. func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName string, sf *streamformatter.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) (string, error) {
  403. out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Buffering to Disk", nil))
  404. image, err := s.graph.Get(img.ID)
  405. if err != nil {
  406. return "", err
  407. }
  408. arch, err := image.TarLayer()
  409. if err != nil {
  410. return "", err
  411. }
  412. defer arch.Close()
  413. tf, err := s.graph.newTempFile()
  414. if err != nil {
  415. return "", err
  416. }
  417. defer func() {
  418. tf.Close()
  419. os.Remove(tf.Name())
  420. }()
  421. size, dgst, err := bufferToFile(tf, arch)
  422. // Send the layer
  423. logrus.Debugf("rendered layer for %s of [%d] size", img.ID, size)
  424. if err := r.PutV2ImageBlob(endpoint, imageName, dgst,
  425. progressreader.New(progressreader.Config{
  426. In: tf,
  427. Out: out,
  428. Formatter: sf,
  429. Size: int(size),
  430. NewLines: false,
  431. ID: stringid.TruncateID(img.ID),
  432. Action: "Pushing",
  433. }), auth); err != nil {
  434. out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Image push failed", nil))
  435. return "", err
  436. }
  437. out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Image successfully pushed", nil))
  438. return dgst.String(), nil
  439. }
  440. // FIXME: Allow to interrupt current push when new push of same image is done.
  441. func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {
  442. var (
  443. sf = streamformatter.NewStreamFormatter(imagePushConfig.Json)
  444. )
  445. // Resolve the Repository name from fqn to RepositoryInfo
  446. repoInfo, err := s.registryService.ResolveRepository(localName)
  447. if err != nil {
  448. return err
  449. }
  450. if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
  451. return err
  452. }
  453. defer s.poolRemove("push", repoInfo.LocalName)
  454. endpoint, err := repoInfo.GetEndpoint()
  455. if err != nil {
  456. return err
  457. }
  458. r, err := registry.NewSession(imagePushConfig.AuthConfig, registry.HTTPRequestFactory(imagePushConfig.MetaHeaders), endpoint, false)
  459. if err != nil {
  460. return err
  461. }
  462. reposLen := 1
  463. if imagePushConfig.Tag == "" {
  464. reposLen = len(s.Repositories[repoInfo.LocalName])
  465. }
  466. imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
  467. // If it fails, try to get the repository
  468. localRepo, exists := s.Repositories[repoInfo.LocalName]
  469. if !exists {
  470. return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)
  471. }
  472. if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
  473. err := s.pushV2Repository(r, localRepo, imagePushConfig.OutStream, repoInfo, imagePushConfig.Tag, sf)
  474. if err == nil {
  475. s.eventsService.Log("push", repoInfo.LocalName, "")
  476. return nil
  477. }
  478. if err != ErrV2RegistryUnavailable {
  479. return fmt.Errorf("Error pushing to registry: %s", err)
  480. }
  481. }
  482. if err := s.pushRepository(r, imagePushConfig.OutStream, repoInfo, localRepo, imagePushConfig.Tag, sf); err != nil {
  483. return err
  484. }
  485. s.eventsService.Log("push", repoInfo.LocalName, "")
  486. return nil
  487. }