migration.go

package layer // import "github.com/docker/docker/layer"

import (
	"compress/gzip"
	"context"
	"errors"
	"io"
	"os"

	"github.com/containerd/log"
	"github.com/opencontainers/go-digest"
	"github.com/vbatts/tar-split/tar/asm"
	"github.com/vbatts/tar-split/tar/storage"
)
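
// ChecksumForGraphID computes the DiffID and uncompressed size of the layer
// stored under the given graph driver ID. It first tries to reassemble the
// layer's tar stream from the existing tar-split file at oldTarDataPath,
// hard-linking that file to newTarDataPath on success; if any step fails,
// the deferred fallback re-diffs the layer through the graph driver via
// checksumForGraphIDNoTarsplit.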
func (ls *layerStore) ChecksumForGraphID(id, parent, oldTarDataPath, newTarDataPath string) (diffID DiffID, size int64, err error) {
	// On any failure below, fall back to re-diffing the layer through the
	// graph driver instead of relying on the old tar-split data.
	defer func() {
		if err != nil {
			diffID, size, err = ls.checksumForGraphIDNoTarsplit(id, parent, newTarDataPath)
		}
	}()

	if oldTarDataPath == "" {
		err = errors.New("no tar-split file")
		return
	}

	tarDataFile, err := os.Open(oldTarDataPath)
	if err != nil {
		return
	}
	defer tarDataFile.Close()
	uncompressed, err := gzip.NewReader(tarDataFile)
	if err != nil {
		return
	}

	dgst := digest.Canonical.Digester()
	err = ls.assembleTarTo(id, uncompressed, &size, dgst.Hash())
	if err != nil {
		return
	}

	diffID = DiffID(dgst.Digest())

	// Reuse the existing tar-split data by hard-linking it to the new path.
	err = os.RemoveAll(newTarDataPath)
	if err != nil {
		return
	}
	err = os.Link(oldTarDataPath, newTarDataPath)

	return
}
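
// checksumForGraphIDNoTarsplit regenerates the layer diff from the graph
// driver, writes fresh gzip-compressed tar-split metadata to newTarDataPath,
// and returns the digest and size of the resulting tar stream.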
func (ls *layerStore) checksumForGraphIDNoTarsplit(id, parent, newTarDataPath string) (diffID DiffID, size int64, err error) {
	rawarchive, err := ls.driver.Diff(id, parent)
	if err != nil {
		return
	}
	defer rawarchive.Close()

	f, err := os.Create(newTarDataPath)
	if err != nil {
		return
	}
	defer f.Close()
	mfz := gzip.NewWriter(f)
	defer mfz.Close()
	metaPacker := storage.NewJSONPacker(mfz)

	packerCounter := &packSizeCounter{metaPacker, &size}

	// Digest the tar stream while its tar-split metadata is written out,
	// accumulating the size as entries are packed.
	archive, err := asm.NewInputTarStream(rawarchive, packerCounter, nil)
	if err != nil {
		return
	}
	dgst, err := digest.FromReader(archive)
	if err != nil {
		return
	}
	diffID = DiffID(dgst)
	return
}
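
// RegisterByGraphID registers an existing graph driver layer with the layer
// store under its computed chain ID, copying the tar-split data from
// tarDataFile into the store's metadata transaction. If a layer with the
// same chain ID is already registered, a reference to the existing layer is
// returned instead.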
func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, diffID DiffID, tarDataFile string, size int64) (Layer, error) {
	// err is used to hold the error which will always trigger
	// cleanup of created sources but may not be an error returned
	// to the caller (already exists).
	var err error
	var p *roLayer
	if string(parent) != "" {
		ls.layerL.Lock()
		p = ls.get(parent)
		ls.layerL.Unlock()
		if p == nil {
			return nil, ErrLayerDoesNotExist
		}

		// Release parent chain if error
		defer func() {
			if err != nil {
				ls.layerL.Lock()
				ls.releaseLayer(p)
				ls.layerL.Unlock()
			}
		}()
	}

	// Create new roLayer
	layer := &roLayer{
		parent:         p,
		cacheID:        graphID,
		referenceCount: 1,
		layerStore:     ls,
		references:     map[Layer]struct{}{},
		diffID:         diffID,
		size:           size,
		chainID:        createChainIDFromParent(parent, diffID),
	}

	ls.layerL.Lock()
	defer ls.layerL.Unlock()

	if existingLayer := ls.get(layer.chainID); existingLayer != nil {
		// Set error for cleanup, but do not return
		err = errors.New("layer already exists")
		return existingLayer.getReference(), nil
	}

	tx, err := ls.store.StartTransaction()
	if err != nil {
		return nil, err
	}

	defer func() {
		if err != nil {
			log.G(context.TODO()).Debugf("Cleaning up transaction after failed migration for %s: %v", graphID, err)
			if err := tx.Cancel(); err != nil {
				log.G(context.TODO()).Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
			}
		}
	}()

	tsw, err := tx.TarSplitWriter(false)
	if err != nil {
		return nil, err
	}
	defer tsw.Close()

	// Copy the pre-computed tar-split data into the transaction.
	tdf, err := os.Open(tarDataFile)
	if err != nil {
		return nil, err
	}
	defer tdf.Close()
	_, err = io.Copy(tsw, tdf)
	if err != nil {
		return nil, err
	}

	if err = storeLayer(tx, layer); err != nil {
		return nil, err
	}

	if err = tx.Commit(layer.chainID); err != nil {
		return nil, err
	}

	ls.layerMap[layer.chainID] = layer

	return layer.getReference(), nil
}
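
// unpackSizeCounter wraps a storage.Unpacker and accumulates the size of
// each returned entry into *size.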
type unpackSizeCounter struct {
	unpacker storage.Unpacker
	size     *int64
}
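
// Next returns the next entry from the wrapped unpacker, adding its size to
// the running total on success.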
func (u *unpackSizeCounter) Next() (*storage.Entry, error) {
	e, err := u.unpacker.Next()
	if err == nil && u.size != nil {
		*u.size += e.Size
	}
	return e, err
}
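
// packSizeCounter wraps a storage.Packer and accumulates the size of each
// added entry into *size.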
type packSizeCounter struct {
	packer storage.Packer
	size   *int64
}
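
// AddEntry forwards the entry to the wrapped packer, adding its size to the
// running total on success.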
func (p *packSizeCounter) AddEntry(e storage.Entry) (int, error) {
	n, err := p.packer.AddEntry(e)
	if err == nil && p.size != nil {
		*p.size += e.Size
	}
	return n, err
}