migration.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package layer // import "github.com/docker/docker/layer"
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "errors"
  6. "io"
  7. "os"
  8. "github.com/containerd/log"
  9. "github.com/opencontainers/go-digest"
  10. "github.com/vbatts/tar-split/tar/asm"
  11. "github.com/vbatts/tar-split/tar/storage"
  12. )
  13. func (ls *layerStore) ChecksumForGraphID(id, parent, newTarDataPath string) (diffID DiffID, size int64, err error) {
  14. rawarchive, err := ls.driver.Diff(id, parent)
  15. if err != nil {
  16. return
  17. }
  18. defer rawarchive.Close()
  19. f, err := os.Create(newTarDataPath)
  20. if err != nil {
  21. return
  22. }
  23. defer f.Close()
  24. mfz := gzip.NewWriter(f)
  25. defer mfz.Close()
  26. metaPacker := storage.NewJSONPacker(mfz)
  27. packerCounter := &packSizeCounter{metaPacker, &size}
  28. archive, err := asm.NewInputTarStream(rawarchive, packerCounter, nil)
  29. if err != nil {
  30. return
  31. }
  32. dgst, err := digest.FromReader(archive)
  33. if err != nil {
  34. return
  35. }
  36. diffID = DiffID(dgst)
  37. return
  38. }
  39. func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, diffID DiffID, tarDataFile string, size int64) (Layer, error) {
  40. // err is used to hold the error which will always trigger
  41. // cleanup of creates sources but may not be an error returned
  42. // to the caller (already exists).
  43. var err error
  44. var p *roLayer
  45. if string(parent) != "" {
  46. ls.layerL.Lock()
  47. p = ls.get(parent)
  48. ls.layerL.Unlock()
  49. if p == nil {
  50. return nil, ErrLayerDoesNotExist
  51. }
  52. // Release parent chain if error
  53. defer func() {
  54. if err != nil {
  55. ls.layerL.Lock()
  56. ls.releaseLayer(p)
  57. ls.layerL.Unlock()
  58. }
  59. }()
  60. }
  61. // Create new roLayer
  62. layer := &roLayer{
  63. parent: p,
  64. cacheID: graphID,
  65. referenceCount: 1,
  66. layerStore: ls,
  67. references: map[Layer]struct{}{},
  68. diffID: diffID,
  69. size: size,
  70. chainID: createChainIDFromParent(parent, diffID),
  71. }
  72. ls.layerL.Lock()
  73. defer ls.layerL.Unlock()
  74. if existingLayer := ls.get(layer.chainID); existingLayer != nil {
  75. // Set error for cleanup, but do not return
  76. err = errors.New("layer already exists")
  77. return existingLayer.getReference(), nil
  78. }
  79. tx, err := ls.store.StartTransaction()
  80. if err != nil {
  81. return nil, err
  82. }
  83. defer func() {
  84. if err != nil {
  85. log.G(context.TODO()).Debugf("Cleaning up transaction after failed migration for %s: %v", graphID, err)
  86. if err := tx.Cancel(); err != nil {
  87. log.G(context.TODO()).Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
  88. }
  89. }
  90. }()
  91. tsw, err := tx.TarSplitWriter(false)
  92. if err != nil {
  93. return nil, err
  94. }
  95. defer tsw.Close()
  96. tdf, err := os.Open(tarDataFile)
  97. if err != nil {
  98. return nil, err
  99. }
  100. defer tdf.Close()
  101. _, err = io.Copy(tsw, tdf)
  102. if err != nil {
  103. return nil, err
  104. }
  105. if err = storeLayer(tx, layer); err != nil {
  106. return nil, err
  107. }
  108. if err = tx.Commit(layer.chainID); err != nil {
  109. return nil, err
  110. }
  111. ls.layerMap[layer.chainID] = layer
  112. return layer.getReference(), nil
  113. }
  114. type unpackSizeCounter struct {
  115. unpacker storage.Unpacker
  116. size *int64
  117. }
  118. func (u *unpackSizeCounter) Next() (*storage.Entry, error) {
  119. e, err := u.unpacker.Next()
  120. if err == nil && u.size != nil {
  121. *u.size += e.Size
  122. }
  123. return e, err
  124. }
  125. type packSizeCounter struct {
  126. packer storage.Packer
  127. size *int64
  128. }
  129. func (p *packSizeCounter) AddEntry(e storage.Entry) (int, error) {
  130. n, err := p.packer.AddEntry(e)
  131. if err == nil && p.size != nil {
  132. *p.size += e.Size
  133. }
  134. return n, err
  135. }