push_v2.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. package graph
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/distribution"
  12. "github.com/docker/distribution/digest"
  13. "github.com/docker/distribution/manifest"
  14. "github.com/docker/distribution/manifest/schema1"
  15. "github.com/docker/docker/image"
  16. "github.com/docker/docker/pkg/progressreader"
  17. "github.com/docker/docker/pkg/streamformatter"
  18. "github.com/docker/docker/pkg/stringid"
  19. "github.com/docker/docker/registry"
  20. "github.com/docker/docker/runconfig"
  21. "github.com/docker/docker/utils"
  22. "golang.org/x/net/context"
  23. )
// compressionBufSize is the size of the bufio.Writer placed in front of
// the gzip compressor during layer upload, chosen to avoid excessive
// chunking in the HTTP request body.
const compressionBufSize = 32768
// v2Pusher pushes one or more tags of a local repository to a registry
// endpoint speaking the v2 (distribution) protocol.
type v2Pusher struct {
	*TagStore
	endpoint  registry.APIEndpoint
	localRepo repository
	repoInfo  *registry.RepositoryInfo
	config    *ImagePushConfig
	sf        *streamformatter.StreamFormatter
	// repo is the remote repository handle; set up lazily in Push.
	repo distribution.Repository

	// layersPushed is the set of layers known to exist on the remote side.
	// This avoids redundant queries when pushing multiple tags that
	// involve the same layers.
	layersPushed map[digest.Digest]bool
}
  38. func (p *v2Pusher) Push() (fallback bool, err error) {
  39. p.repo, err = newV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
  40. if err != nil {
  41. logrus.Debugf("Error getting v2 registry: %v", err)
  42. return true, err
  43. }
  44. return false, p.pushV2Repository(p.config.Tag)
  45. }
  46. func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) {
  47. logrus.Debugf("Checking %q against %#v", askedTag, p.localRepo)
  48. if len(askedTag) > 0 {
  49. if _, ok := p.localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
  50. return nil, fmt.Errorf("Tag does not exist for %s", askedTag)
  51. }
  52. return []string{askedTag}, nil
  53. }
  54. var tags []string
  55. for tag := range p.localRepo {
  56. if !utils.DigestReference(tag) {
  57. tags = append(tags, tag)
  58. }
  59. }
  60. return tags, nil
  61. }
  62. func (p *v2Pusher) pushV2Repository(tag string) error {
  63. localName := p.repoInfo.LocalName
  64. if _, found := p.poolAdd("push", localName); found {
  65. return fmt.Errorf("push or pull %s is already in progress", localName)
  66. }
  67. defer p.poolRemove("push", localName)
  68. tags, err := p.getImageTags(tag)
  69. if err != nil {
  70. return fmt.Errorf("error getting tags for %s: %s", localName, err)
  71. }
  72. if len(tags) == 0 {
  73. return fmt.Errorf("no tags to push for %s", localName)
  74. }
  75. for _, tag := range tags {
  76. if err := p.pushV2Tag(tag); err != nil {
  77. return err
  78. }
  79. }
  80. return nil
  81. }
// pushV2Tag pushes the image chain referenced by tag: for each ancestor
// layer it uploads the blob unless it is already known to exist on the
// remote, builds a schema1 manifest from the chain, signs it with the
// daemon's trust key, and puts it to the registry's manifest service.
func (p *v2Pusher) pushV2Tag(tag string) error {
	logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)

	layerID, exists := p.localRepo[tag]
	if !exists {
		return fmt.Errorf("tag does not exist: %s", tag)
	}

	// Tracks layer IDs already emitted into the manifest in this push,
	// to guard against cycles in the parent chain (see loop comment).
	layersSeen := make(map[string]bool)

	layer, err := p.graph.Get(layerID)
	if err != nil {
		return err
	}

	m := &schema1.Manifest{
		Versioned: manifest.Versioned{
			SchemaVersion: 1,
		},
		Name:         p.repo.Name(),
		Tag:          tag,
		Architecture: layer.Architecture,
		FSLayers:     []schema1.FSLayer{},
		History:      []schema1.History{},
	}

	var metadata runconfig.Config
	if layer != nil && layer.Config != nil {
		metadata = *layer.Config
	}

	out := p.config.OutStream

	// Walk from the tagged layer up through its parents; note the loop's
	// post statement reassigns err, which is checked at the top of the
	// next iteration.
	for ; layer != nil; layer, err = p.graph.GetParent(layer) {
		if err != nil {
			return err
		}

		// break early if layer has already been seen in this image,
		// this prevents infinite loops on layers which loopback, this
		// cannot be prevented since layer IDs are not merkle hashes
		// TODO(dmcgowan): throw error if no valid use case is found
		if layersSeen[layer.ID] {
			break
		}

		// Skip the base layer on Windows. This cannot be pushed.
		if allowBaseParentImage && layer.Parent == "" {
			break
		}

		logrus.Debugf("Pushing layer: %s", layer.ID)

		if layer.Config != nil && metadata.Image != layer.ID {
			if err := runconfig.Merge(&metadata, layer.Config); err != nil {
				return err
			}
		}

		var exists bool
		dgst, err := p.graph.getLayerDigestWithLock(layer.ID)
		switch err {
		case nil:
			if p.layersPushed[dgst] {
				exists = true
				// break out of switch, it is already known that
				// the push is not needed and therefore doing a
				// stat is unnecessary
				break
			}
			// Ask the remote whether it already has this blob.
			_, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
			switch err {
			case nil:
				exists = true
				out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
			case distribution.ErrBlobUnknown:
				// nop
			default:
				out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
				return err
			}
		case errDigestNotSet:
			// nop
		case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
			return fmt.Errorf("error getting image checksum: %v", err)
		}

		// if digest was empty or not saved, or if blob does not exist on the remote repository,
		// then fetch it.
		if !exists {
			var pushDigest digest.Digest
			if pushDigest, err = p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {
				return err
			}
			if dgst == "" {
				// Cache new checksum
				if err := p.graph.setLayerDigestWithLock(layer.ID, pushDigest); err != nil {
					return err
				}
			}
			dgst = pushDigest
		}

		// read v1Compatibility config, generate new if needed
		jsonData, err := p.graph.generateV1CompatibilityChain(layer.ID)
		if err != nil {
			return err
		}

		m.FSLayers = append(m.FSLayers, schema1.FSLayer{BlobSum: dgst})
		m.History = append(m.History, schema1.History{V1Compatibility: string(jsonData)})

		layersSeen[layer.ID] = true
		// Remember the blob for subsequent tags in this push session.
		p.layersPushed[dgst] = true
	}

	// Fix parent chain if necessary
	if err = fixHistory(m); err != nil {
		return err
	}

	logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
	signed, err := schema1.Sign(m, p.trustKey)
	if err != nil {
		return err
	}

	manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
	if err != nil {
		return err
	}
	if manifestDigest != "" {
		out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))
	}

	manSvc, err := p.repo.Manifests(context.Background())
	if err != nil {
		return err
	}
	return manSvc.Put(signed)
}
// fixHistory makes sure that the manifest has parent IDs that are consistent
// with its image IDs. Because local image IDs are generated from the
// configuration and filesystem contents, but IDs in the manifest are preserved
// from the original pull, it's possible to have inconsistencies where parent
// IDs don't match up with the other IDs in the manifest. This happens in the
// case where an engine pulls images where are identical except the IDs from the
// manifest - the local ID will be the same, and one of the v1Compatibility
// files gets discarded.
func fixHistory(m *schema1.Manifest) error {
	// Iterate base-layer-first (History is stored newest-first), carrying
	// the previously seen ID so each entry's "parent" can be checked
	// against the actual ID of the layer below it.
	var lastID string

	for i := len(m.History) - 1; i >= 0; i-- {
		// Decode into a map of RawMessage so fields unknown to this
		// engine version survive a re-marshal untouched.
		var historyEntry map[string]*json.RawMessage
		if err := json.Unmarshal([]byte(m.History[i].V1Compatibility), &historyEntry); err != nil {
			return err
		}

		idJSON, present := historyEntry["id"]
		if !present || idJSON == nil {
			return errors.New("missing id key in v1compatibility file")
		}
		var id string
		if err := json.Unmarshal(*idJSON, &id); err != nil {
			return err
		}

		parentJSON, present := historyEntry["parent"]

		if i == len(m.History)-1 {
			// The base layer must not reference a parent layer,
			// otherwise the manifest is incomplete. There is an
			// exception for Windows to handle base layers.
			if !allowBaseParentImage && present && parentJSON != nil {
				var parent string
				if err := json.Unmarshal(*parentJSON, &parent); err != nil {
					return err
				}
				if parent != "" {
					logrus.Debugf("parent id mismatch detected; fixing. parent reference: %s", parent)
					delete(historyEntry, "parent")
					fixedHistory, err := json.Marshal(historyEntry)
					if err != nil {
						return err
					}
					m.History[i].V1Compatibility = string(fixedHistory)
				}
			}
		} else {
			// For all other layers, the parent ID should equal the
			// ID of the next item in the history list. If it
			// doesn't, fix it up (but preserve all other fields,
			// possibly including fields that aren't known to this
			// engine version).
			if !present || parentJSON == nil {
				return errors.New("missing parent key in v1compatibility file")
			}
			var parent string
			if err := json.Unmarshal(*parentJSON, &parent); err != nil {
				return err
			}
			if parent != lastID {
				logrus.Debugf("parent id mismatch detected; fixing. parent reference: %s actual id: %s", parent, id)
				historyEntry["parent"] = rawJSON(lastID)
				fixedHistory, err := json.Marshal(historyEntry)
				if err != nil {
					return err
				}
				m.History[i].V1Compatibility = string(fixedHistory)
			}
		}
		lastID = id
	}

	return nil
}
  273. func rawJSON(value interface{}) *json.RawMessage {
  274. jsonval, err := json.Marshal(value)
  275. if err != nil {
  276. return nil
  277. }
  278. return (*json.RawMessage)(&jsonval)
  279. }
// pushV2Image uploads img's filesystem layer to bs: the layer tar is
// gzip-compressed while streaming, its digest is computed over the
// compressed bytes, and the upload is committed against that digest.
// It returns the digest of the pushed (compressed) blob.
func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) {
	out := p.config.OutStream
	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Preparing", nil))

	image, err := p.graph.Get(img.ID)
	if err != nil {
		return "", err
	}
	arch, err := p.graph.tarLayer(image)
	if err != nil {
		return "", err
	}
	defer arch.Close()

	// Send the layer
	layerUpload, err := bs.Create(context.Background())
	if err != nil {
		return "", err
	}
	defer layerUpload.Close()

	// Wrap the tar stream so progress is reported as it is consumed.
	reader := progressreader.New(progressreader.Config{
		In:        ioutil.NopCloser(arch), // we'll take care of close here.
		Out:       out,
		Formatter: p.sf,

		// TODO(stevvooe): This may cause a size reporting error. Try to get
		// this from tar-split or elsewhere. The main issue here is that we
		// don't want to buffer to disk *just* to calculate the size.
		Size: img.Size,

		NewLines: false,
		ID:       stringid.TruncateID(img.ID),
		Action:   "Pushing",
	})

	digester := digest.Canonical.New()
	// HACK: The MultiWriter doesn't write directly to layerUpload because
	// we must make sure the ReadFrom is used, not Write. Using Write would
	// send a PATCH request for every Write call.
	pipeReader, pipeWriter := io.Pipe()
	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
	bufWriter := bufio.NewWriterSize(io.MultiWriter(pipeWriter, digester.Hash()), compressionBufSize)
	compressor := gzip.NewWriter(bufWriter)

	// Compress in a separate goroutine; errors (including from Close and
	// Flush, which emit trailing gzip/buffered bytes) are propagated to
	// the reading side through CloseWithError.
	go func() {
		_, err := io.Copy(compressor, reader)
		if err == nil {
			err = compressor.Close()
		}
		if err == nil {
			err = bufWriter.Flush()
		}
		if err != nil {
			pipeWriter.CloseWithError(err)
		} else {
			pipeWriter.Close()
		}
	}()

	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushing", nil))
	nn, err := layerUpload.ReadFrom(pipeReader)
	pipeReader.Close()
	if err != nil {
		return "", err
	}

	dgst := digester.Digest()
	if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
		return "", err
	}

	logrus.Debugf("uploaded layer %s (%s), %d bytes", img.ID, dgst, nn)
	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushed", nil))

	return dgst, nil
}