push.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. package graph
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "sync"
  9. "github.com/Sirupsen/logrus"
  10. "github.com/docker/distribution/digest"
  11. "github.com/docker/docker/cliconfig"
  12. "github.com/docker/docker/image"
  13. "github.com/docker/docker/pkg/ioutils"
  14. "github.com/docker/docker/pkg/progressreader"
  15. "github.com/docker/docker/pkg/streamformatter"
  16. "github.com/docker/docker/pkg/stringid"
  17. "github.com/docker/docker/pkg/transport"
  18. "github.com/docker/docker/registry"
  19. "github.com/docker/docker/runconfig"
  20. "github.com/docker/docker/utils"
  21. "github.com/docker/libtrust"
  22. )
  23. var ErrV2RegistryUnavailable = errors.New("error v2 registry unavailable")
  24. type ImagePushConfig struct {
  25. MetaHeaders map[string][]string
  26. AuthConfig *cliconfig.AuthConfig
  27. Tag string
  28. OutStream io.Writer
  29. }
  30. // Retrieve the all the images to be uploaded in the correct order
  31. func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  32. var (
  33. imageList []string
  34. imagesSeen = make(map[string]bool)
  35. tagsByImage = make(map[string][]string)
  36. )
  37. for tag, id := range localRepo {
  38. if requestedTag != "" && requestedTag != tag {
  39. // Include only the requested tag.
  40. continue
  41. }
  42. if utils.DigestReference(tag) {
  43. // Ignore digest references.
  44. continue
  45. }
  46. var imageListForThisTag []string
  47. tagsByImage[id] = append(tagsByImage[id], tag)
  48. for img, err := s.graph.Get(id); img != nil; img, err = s.graph.GetParent(img) {
  49. if err != nil {
  50. return nil, nil, err
  51. }
  52. if imagesSeen[img.ID] {
  53. // This image is already on the list, we can ignore it and all its parents
  54. break
  55. }
  56. imagesSeen[img.ID] = true
  57. imageListForThisTag = append(imageListForThisTag, img.ID)
  58. }
  59. // reverse the image list for this tag (so the "most"-parent image is first)
  60. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  61. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  62. }
  63. // append to main image list
  64. imageList = append(imageList, imageListForThisTag...)
  65. }
  66. if len(imageList) == 0 {
  67. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  68. }
  69. logrus.Debugf("Image list: %v", imageList)
  70. logrus.Debugf("Tags by image: %v", tagsByImage)
  71. return imageList, tagsByImage, nil
  72. }
  73. func (s *TagStore) getImageTags(localRepo map[string]string, askedTag string) ([]string, error) {
  74. logrus.Debugf("Checking %s against %#v", askedTag, localRepo)
  75. if len(askedTag) > 0 {
  76. if _, ok := localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
  77. return nil, fmt.Errorf("Tag does not exist: %s", askedTag)
  78. }
  79. return []string{askedTag}, nil
  80. }
  81. var tags []string
  82. for tag := range localRepo {
  83. if !utils.DigestReference(tag) {
  84. tags = append(tags, tag)
  85. }
  86. }
  87. return tags, nil
  88. }
  89. // createImageIndex returns an index of an image's layer IDs and tags.
  90. func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
  91. var imageIndex []*registry.ImgData
  92. for _, id := range images {
  93. if tags, hasTags := tags[id]; hasTags {
  94. // If an image has tags you must add an entry in the image index
  95. // for each tag
  96. for _, tag := range tags {
  97. imageIndex = append(imageIndex, &registry.ImgData{
  98. ID: id,
  99. Tag: tag,
  100. })
  101. }
  102. continue
  103. }
  104. // If the image does not have a tag it still needs to be sent to the
  105. // registry with an empty tag so that it is accociated with the repository
  106. imageIndex = append(imageIndex, &registry.ImgData{
  107. ID: id,
  108. Tag: "",
  109. })
  110. }
  111. return imageIndex
  112. }
  113. type imagePushData struct {
  114. id string
  115. endpoint string
  116. tokens []string
  117. }
  118. // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
  119. // and if it is absent then it sends the image id to the channel to be pushed.
  120. func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *streamformatter.StreamFormatter,
  121. images chan imagePushData, imagesToPush chan string) {
  122. defer wg.Done()
  123. for image := range images {
  124. if err := r.LookupRemoteImage(image.id, image.endpoint); err != nil {
  125. logrus.Errorf("Error in LookupRemoteImage: %s", err)
  126. imagesToPush <- image.id
  127. continue
  128. }
  129. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", stringid.TruncateID(image.id)))
  130. }
  131. }
  132. func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
  133. tags map[string][]string, repo *registry.RepositoryData, sf *streamformatter.StreamFormatter, r *registry.Session) error {
  134. workerCount := len(imageIDs)
  135. // start a maximum of 5 workers to check if images exist on the specified endpoint.
  136. if workerCount > 5 {
  137. workerCount = 5
  138. }
  139. var (
  140. wg = &sync.WaitGroup{}
  141. imageData = make(chan imagePushData, workerCount*2)
  142. imagesToPush = make(chan string, workerCount*2)
  143. pushes = make(chan map[string]struct{}, 1)
  144. )
  145. for i := 0; i < workerCount; i++ {
  146. wg.Add(1)
  147. go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
  148. }
  149. // start a go routine that consumes the images to push
  150. go func() {
  151. shouldPush := make(map[string]struct{})
  152. for id := range imagesToPush {
  153. shouldPush[id] = struct{}{}
  154. }
  155. pushes <- shouldPush
  156. }()
  157. for _, id := range imageIDs {
  158. imageData <- imagePushData{
  159. id: id,
  160. endpoint: endpoint,
  161. tokens: repo.Tokens,
  162. }
  163. }
  164. // close the channel to notify the workers that there will be no more images to check.
  165. close(imageData)
  166. wg.Wait()
  167. close(imagesToPush)
  168. // wait for all the images that require pushes to be collected into a consumable map.
  169. shouldPush := <-pushes
  170. // finish by pushing any images and tags to the endpoint. The order that the images are pushed
  171. // is very important that is why we are still iterating over the ordered list of imageIDs.
  172. for _, id := range imageIDs {
  173. if _, push := shouldPush[id]; push {
  174. if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
  175. // FIXME: Continue on error?
  176. return err
  177. }
  178. }
  179. for _, tag := range tags[id] {
  180. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
  181. if err := r.PushRegistryTag(remoteName, id, tag, endpoint); err != nil {
  182. return err
  183. }
  184. }
  185. }
  186. return nil
  187. }
  188. // pushRepository pushes layers that do not already exist on the registry.
  189. func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
  190. repoInfo *registry.RepositoryInfo, localRepo map[string]string,
  191. tag string, sf *streamformatter.StreamFormatter) error {
  192. logrus.Debugf("Local repo: %s", localRepo)
  193. out = ioutils.NewWriteFlusher(out)
  194. imgList, tags, err := s.getImageList(localRepo, tag)
  195. if err != nil {
  196. return err
  197. }
  198. out.Write(sf.FormatStatus("", "Sending image list"))
  199. imageIndex := s.createImageIndex(imgList, tags)
  200. logrus.Debugf("Preparing to push %s with the following images and tags", localRepo)
  201. for _, data := range imageIndex {
  202. logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
  203. }
  204. // Register all the images in a repository with the registry
  205. // If an image is not in this list it will not be associated with the repository
  206. repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
  207. if err != nil {
  208. return err
  209. }
  210. nTag := 1
  211. if tag == "" {
  212. nTag = len(localRepo)
  213. }
  214. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
  215. // push the repository to each of the endpoints only if it does not exist.
  216. for _, endpoint := range repoData.Endpoints {
  217. if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
  218. return err
  219. }
  220. }
  221. _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
  222. return err
  223. }
  224. func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *streamformatter.StreamFormatter) (checksum string, err error) {
  225. out = ioutils.NewWriteFlusher(out)
  226. jsonRaw, err := s.graph.RawJSON(imgID)
  227. if err != nil {
  228. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  229. }
  230. out.Write(sf.FormatProgress(stringid.TruncateID(imgID), "Pushing", nil))
  231. imgData := &registry.ImgData{
  232. ID: imgID,
  233. }
  234. // Send the json
  235. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
  236. if err == registry.ErrAlreadyExists {
  237. out.Write(sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  238. return "", nil
  239. }
  240. return "", err
  241. }
  242. layerData, err := s.graph.TempLayerArchive(imgID, sf, out)
  243. if err != nil {
  244. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  245. }
  246. defer os.RemoveAll(layerData.Name())
  247. // Send the layer
  248. logrus.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  249. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID,
  250. progressreader.New(progressreader.Config{
  251. In: layerData,
  252. Out: out,
  253. Formatter: sf,
  254. Size: int(layerData.Size),
  255. NewLines: false,
  256. ID: stringid.TruncateID(imgData.ID),
  257. Action: "Pushing",
  258. }), ep, jsonRaw)
  259. if err != nil {
  260. return "", err
  261. }
  262. imgData.Checksum = checksum
  263. imgData.ChecksumPayload = checksumPayload
  264. // Send the checksum
  265. if err := r.PushImageChecksumRegistry(imgData, ep); err != nil {
  266. return "", err
  267. }
  268. out.Write(sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image successfully pushed", nil))
  269. return imgData.Checksum, nil
  270. }
  271. func (s *TagStore) pushV2Repository(r *registry.Session, localRepo Repository, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter) error {
  272. endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
  273. if err != nil {
  274. if repoInfo.Index.Official {
  275. logrus.Debugf("Unable to push to V2 registry, falling back to v1: %s", err)
  276. return ErrV2RegistryUnavailable
  277. }
  278. return fmt.Errorf("error getting registry endpoint: %s", err)
  279. }
  280. tags, err := s.getImageTags(localRepo, tag)
  281. if err != nil {
  282. return err
  283. }
  284. if len(tags) == 0 {
  285. return fmt.Errorf("No tags to push for %s", repoInfo.LocalName)
  286. }
  287. auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, false)
  288. if err != nil {
  289. return fmt.Errorf("error getting authorization: %s", err)
  290. }
  291. if !auth.CanAuthorizeV2() {
  292. return ErrV2RegistryUnavailable
  293. }
  294. for _, tag := range tags {
  295. logrus.Debugf("Pushing repository: %s:%s", repoInfo.CanonicalName, tag)
  296. layerId, exists := localRepo[tag]
  297. if !exists {
  298. return fmt.Errorf("tag does not exist: %s", tag)
  299. }
  300. layer, err := s.graph.Get(layerId)
  301. if err != nil {
  302. return err
  303. }
  304. m := &registry.ManifestData{
  305. SchemaVersion: 1,
  306. Name: repoInfo.RemoteName,
  307. Tag: tag,
  308. Architecture: layer.Architecture,
  309. }
  310. var metadata runconfig.Config
  311. if layer.Config != nil {
  312. metadata = *layer.Config
  313. }
  314. layersSeen := make(map[string]bool)
  315. layers := []*image.Image{}
  316. for ; layer != nil; layer, err = s.graph.GetParent(layer) {
  317. if err != nil {
  318. return err
  319. }
  320. if layersSeen[layer.ID] {
  321. break
  322. }
  323. layers = append(layers, layer)
  324. layersSeen[layer.ID] = true
  325. }
  326. m.FSLayers = make([]*registry.FSLayer, len(layers))
  327. m.History = make([]*registry.ManifestHistory, len(layers))
  328. // Schema version 1 requires layer ordering from top to root
  329. for i, layer := range layers {
  330. logrus.Debugf("Pushing layer: %s", layer.ID)
  331. if layer.Config != nil && metadata.Image != layer.ID {
  332. if err := runconfig.Merge(&metadata, layer.Config); err != nil {
  333. return err
  334. }
  335. }
  336. jsonData, err := s.graph.RawJSON(layer.ID)
  337. if err != nil {
  338. return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
  339. }
  340. var exists bool
  341. dgst, err := s.graph.GetDigest(layer.ID)
  342. if err != nil {
  343. if err != ErrDigestNotSet {
  344. return fmt.Errorf("error getting image checksum: %s", err)
  345. }
  346. } else {
  347. // Call mount blob
  348. exists, err = r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, dgst, auth)
  349. if err != nil {
  350. out.Write(sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
  351. return err
  352. }
  353. }
  354. if !exists {
  355. if pushDigest, err := s.pushV2Image(r, layer, endpoint, repoInfo.RemoteName, sf, out, auth); err != nil {
  356. return err
  357. } else if pushDigest != dgst {
  358. // Cache new checksum
  359. if err := s.graph.SetDigest(layer.ID, pushDigest); err != nil {
  360. return err
  361. }
  362. dgst = pushDigest
  363. }
  364. } else {
  365. out.Write(sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
  366. }
  367. m.FSLayers[i] = &registry.FSLayer{BlobSum: dgst.String()}
  368. m.History[i] = &registry.ManifestHistory{V1Compatibility: string(jsonData)}
  369. }
  370. if err := validateManifest(m); err != nil {
  371. return fmt.Errorf("invalid manifest: %s", err)
  372. }
  373. logrus.Debugf("Pushing %s:%s to v2 repository", repoInfo.LocalName, tag)
  374. mBytes, err := json.MarshalIndent(m, "", " ")
  375. if err != nil {
  376. return err
  377. }
  378. js, err := libtrust.NewJSONSignature(mBytes)
  379. if err != nil {
  380. return err
  381. }
  382. if err = js.Sign(s.trustKey); err != nil {
  383. return err
  384. }
  385. signedBody, err := js.PrettySignature("signatures")
  386. if err != nil {
  387. return err
  388. }
  389. logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", repoInfo.LocalName, tag, s.trustKey.KeyID())
  390. // push the manifest
  391. digest, err := r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, signedBody, mBytes, auth)
  392. if err != nil {
  393. return err
  394. }
  395. out.Write(sf.FormatStatus("", "Digest: %s", digest))
  396. }
  397. return nil
  398. }
  399. // PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk
  400. func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName string, sf *streamformatter.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) (digest.Digest, error) {
  401. out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Buffering to Disk", nil))
  402. image, err := s.graph.Get(img.ID)
  403. if err != nil {
  404. return "", err
  405. }
  406. arch, err := s.graph.TarLayer(image)
  407. if err != nil {
  408. return "", err
  409. }
  410. defer arch.Close()
  411. tf, err := s.graph.newTempFile()
  412. if err != nil {
  413. return "", err
  414. }
  415. defer func() {
  416. tf.Close()
  417. os.Remove(tf.Name())
  418. }()
  419. size, dgst, err := bufferToFile(tf, arch)
  420. // Send the layer
  421. logrus.Debugf("rendered layer for %s of [%d] size", img.ID, size)
  422. if err := r.PutV2ImageBlob(endpoint, imageName, dgst,
  423. progressreader.New(progressreader.Config{
  424. In: tf,
  425. Out: out,
  426. Formatter: sf,
  427. Size: int(size),
  428. NewLines: false,
  429. ID: stringid.TruncateID(img.ID),
  430. Action: "Pushing",
  431. }), auth); err != nil {
  432. out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Image push failed", nil))
  433. return "", err
  434. }
  435. out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Image successfully pushed", nil))
  436. return dgst, nil
  437. }
  438. // FIXME: Allow to interrupt current push when new push of same image is done.
  439. func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {
  440. var (
  441. sf = streamformatter.NewJSONStreamFormatter()
  442. )
  443. // Resolve the Repository name from fqn to RepositoryInfo
  444. repoInfo, err := s.registryService.ResolveRepository(localName)
  445. if err != nil {
  446. return err
  447. }
  448. if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
  449. return err
  450. }
  451. defer s.poolRemove("push", repoInfo.LocalName)
  452. endpoint, err := repoInfo.GetEndpoint(imagePushConfig.MetaHeaders)
  453. if err != nil {
  454. return err
  455. }
  456. // TODO(tiborvass): reuse client from endpoint?
  457. // Adds Docker-specific headers as well as user-specified headers (metaHeaders)
  458. tr := transport.NewTransport(
  459. registry.NewTransport(registry.NoTimeout, endpoint.IsSecure),
  460. registry.DockerHeaders(imagePushConfig.MetaHeaders)...,
  461. )
  462. client := registry.HTTPClient(tr)
  463. r, err := registry.NewSession(client, imagePushConfig.AuthConfig, endpoint)
  464. if err != nil {
  465. return err
  466. }
  467. reposLen := 1
  468. if imagePushConfig.Tag == "" {
  469. reposLen = len(s.Repositories[repoInfo.LocalName])
  470. }
  471. imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
  472. // If it fails, try to get the repository
  473. localRepo, exists := s.Repositories[repoInfo.LocalName]
  474. if !exists {
  475. return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)
  476. }
  477. if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
  478. err := s.pushV2Repository(r, localRepo, imagePushConfig.OutStream, repoInfo, imagePushConfig.Tag, sf)
  479. if err == nil {
  480. s.eventsService.Log("push", repoInfo.LocalName, "")
  481. return nil
  482. }
  483. if err != ErrV2RegistryUnavailable {
  484. return fmt.Errorf("Error pushing to registry: %s", err)
  485. }
  486. logrus.Debug("V2 registry is unavailable, falling back on V1")
  487. }
  488. if err := s.pushRepository(r, imagePushConfig.OutStream, repoInfo, localRepo, imagePushConfig.Tag, sf); err != nil {
  489. return err
  490. }
  491. s.eventsService.Log("push", repoInfo.LocalName, "")
  492. return nil
  493. }