// migration.go
  1. package layer // import "github.com/docker/docker/layer"
  2. import (
  3. "compress/gzip"
  4. "errors"
  5. "io"
  6. "os"
  7. digest "github.com/opencontainers/go-digest"
  8. "github.com/sirupsen/logrus"
  9. "github.com/vbatts/tar-split/tar/asm"
  10. "github.com/vbatts/tar-split/tar/storage"
  11. )
  12. func (ls *layerStore) ChecksumForGraphID(id, parent, oldTarDataPath, newTarDataPath string) (diffID DiffID, size int64, err error) {
  13. defer func() {
  14. if err != nil {
  15. diffID, size, err = ls.checksumForGraphIDNoTarsplit(id, parent, newTarDataPath)
  16. }
  17. }()
  18. if oldTarDataPath == "" {
  19. err = errors.New("no tar-split file")
  20. return
  21. }
  22. tarDataFile, err := os.Open(oldTarDataPath)
  23. if err != nil {
  24. return
  25. }
  26. defer tarDataFile.Close()
  27. uncompressed, err := gzip.NewReader(tarDataFile)
  28. if err != nil {
  29. return
  30. }
  31. dgst := digest.Canonical.Digester()
  32. err = ls.assembleTarTo(id, uncompressed, &size, dgst.Hash())
  33. if err != nil {
  34. return
  35. }
  36. diffID = DiffID(dgst.Digest())
  37. err = os.RemoveAll(newTarDataPath)
  38. if err != nil {
  39. return
  40. }
  41. err = os.Link(oldTarDataPath, newTarDataPath)
  42. return
  43. }
  44. func (ls *layerStore) checksumForGraphIDNoTarsplit(id, parent, newTarDataPath string) (diffID DiffID, size int64, err error) {
  45. rawarchive, err := ls.driver.Diff(id, parent)
  46. if err != nil {
  47. return
  48. }
  49. defer rawarchive.Close()
  50. f, err := os.Create(newTarDataPath)
  51. if err != nil {
  52. return
  53. }
  54. defer f.Close()
  55. mfz := gzip.NewWriter(f)
  56. defer mfz.Close()
  57. metaPacker := storage.NewJSONPacker(mfz)
  58. packerCounter := &packSizeCounter{metaPacker, &size}
  59. archive, err := asm.NewInputTarStream(rawarchive, packerCounter, nil)
  60. if err != nil {
  61. return
  62. }
  63. dgst, err := digest.FromReader(archive)
  64. if err != nil {
  65. return
  66. }
  67. diffID = DiffID(dgst)
  68. return
  69. }
// RegisterByGraphID registers an existing graphdriver layer (graphID) in this
// layer store, persisting its metadata and its tar-split data (read from
// tarDataFile) within a single store transaction. It returns a reference to
// the newly registered layer, or — if a layer with the same chain ID is
// already present — a reference to that existing layer with a nil error.
func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, diffID DiffID, tarDataFile string, size int64) (Layer, error) {
	// err is used to hold the error which will always trigger
	// cleanup of creates sources but may not be an error returned
	// to the caller (already exists).
	var err error
	var p *roLayer
	if string(parent) != "" {
		p = ls.get(parent)
		if p == nil {
			return nil, ErrLayerDoesNotExist
		}

		// Release parent chain if error
		defer func() {
			if err != nil {
				ls.layerL.Lock()
				ls.releaseLayer(p)
				ls.layerL.Unlock()
			}
		}()
	}

	// Create new roLayer
	layer := &roLayer{
		parent:         p,
		cacheID:        graphID,
		referenceCount: 1,
		layerStore:     ls,
		references:     map[Layer]struct{}{},
		diffID:         diffID,
		size:           size,
		chainID:        createChainIDFromParent(parent, diffID),
	}

	ls.layerL.Lock()
	defer ls.layerL.Unlock()

	if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
		// Set error for cleanup, but do not return
		// (the non-nil err makes the deferred block above drop the parent
		// reference acquired for this duplicate registration, while the
		// caller still receives the existing layer and a nil error).
		err = errors.New("layer already exists")
		return existingLayer.getReference(), nil
	}

	tx, err := ls.store.StartTransaction()
	if err != nil {
		return nil, err
	}

	// Cancel the transaction on any failure past this point; Commit below
	// clears err on success so this becomes a no-op.
	defer func() {
		if err != nil {
			logrus.Debugf("Cleaning up transaction after failed migration for %s: %v", graphID, err)
			if err := tx.Cancel(); err != nil {
				logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
			}
		}
	}()

	tsw, err := tx.TarSplitWriter(false)
	if err != nil {
		return nil, err
	}
	defer tsw.Close()

	// Copy the migrated tar-split data into the transaction's writer.
	tdf, err := os.Open(tarDataFile)
	if err != nil {
		return nil, err
	}
	defer tdf.Close()
	_, err = io.Copy(tsw, tdf)
	if err != nil {
		return nil, err
	}

	if err = storeLayer(tx, layer); err != nil {
		return nil, err
	}

	if err = tx.Commit(layer.chainID); err != nil {
		return nil, err
	}

	// Publish the layer in the in-memory map while still holding layerL.
	ls.layerMap[layer.chainID] = layer

	return layer.getReference(), nil
}
  143. type unpackSizeCounter struct {
  144. unpacker storage.Unpacker
  145. size *int64
  146. }
  147. func (u *unpackSizeCounter) Next() (*storage.Entry, error) {
  148. e, err := u.unpacker.Next()
  149. if err == nil && u.size != nil {
  150. *u.size += e.Size
  151. }
  152. return e, err
  153. }
  154. type packSizeCounter struct {
  155. packer storage.Packer
  156. size *int64
  157. }
  158. func (p *packSizeCounter) AddEntry(e storage.Entry) (int, error) {
  159. n, err := p.packer.AddEntry(e)
  160. if err == nil && p.size != nil {
  161. *p.size += e.Size
  162. }
  163. return n, err
  164. }