layer_store.go 18 KB


  1. package layer // import "github.com/docker/docker/layer"
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "github.com/docker/distribution"
  10. "github.com/docker/docker/daemon/graphdriver"
  11. "github.com/docker/docker/pkg/idtools"
  12. "github.com/docker/docker/pkg/plugingetter"
  13. "github.com/docker/docker/pkg/stringid"
  14. "github.com/moby/locker"
  15. "github.com/opencontainers/go-digest"
  16. "github.com/sirupsen/logrus"
  17. "github.com/vbatts/tar-split/tar/asm"
  18. "github.com/vbatts/tar-split/tar/storage"
  19. )
  20. // maxLayerDepth represents the maximum number of
  21. // layers which can be chained together. 125 was
  22. // chosen to account for the 127 max in some
  23. // graphdrivers plus the 2 additional layers
  24. // used to create a rwlayer.
  25. const maxLayerDepth = 125
  26. type layerStore struct {
  27. store *fileMetadataStore
  28. driver graphdriver.Driver
  29. useTarSplit bool
  30. layerMap map[ChainID]*roLayer
  31. layerL sync.Mutex
  32. mounts map[string]*mountedLayer
  33. mountL sync.Mutex
  34. // protect *RWLayer() methods from operating on the same name/id
  35. locker *locker.Locker
  36. }
  37. // StoreOptions are the options used to create a new Store instance
  38. type StoreOptions struct {
  39. Root string
  40. MetadataStorePathTemplate string
  41. GraphDriver string
  42. GraphDriverOptions []string
  43. IDMapping idtools.IdentityMapping
  44. PluginGetter plugingetter.PluginGetter
  45. ExperimentalEnabled bool
  46. }
  47. // NewStoreFromOptions creates a new Store instance
  48. func NewStoreFromOptions(options StoreOptions) (Store, error) {
  49. driver, err := graphdriver.New(options.GraphDriver, options.PluginGetter, graphdriver.Options{
  50. Root: options.Root,
  51. DriverOptions: options.GraphDriverOptions,
  52. IDMap: options.IDMapping,
  53. ExperimentalEnabled: options.ExperimentalEnabled,
  54. })
  55. if err != nil {
  56. return nil, fmt.Errorf("error initializing graphdriver: %v", err)
  57. }
  58. logrus.Debugf("Initialized graph driver %s", driver)
  59. root := fmt.Sprintf(options.MetadataStorePathTemplate, driver)
  60. return newStoreFromGraphDriver(root, driver)
  61. }
  62. // newStoreFromGraphDriver creates a new Store instance using the provided
  63. // metadata store and graph driver. The metadata store will be used to restore
  64. // the Store.
  65. func newStoreFromGraphDriver(root string, driver graphdriver.Driver) (Store, error) {
  66. caps := graphdriver.Capabilities{}
  67. if capDriver, ok := driver.(graphdriver.CapabilityDriver); ok {
  68. caps = capDriver.Capabilities()
  69. }
  70. ms, err := newFSMetadataStore(root)
  71. if err != nil {
  72. return nil, err
  73. }
  74. ls := &layerStore{
  75. store: ms,
  76. driver: driver,
  77. layerMap: map[ChainID]*roLayer{},
  78. mounts: map[string]*mountedLayer{},
  79. locker: locker.New(),
  80. useTarSplit: !caps.ReproducesExactDiffs,
  81. }
  82. ids, mounts, err := ms.List()
  83. if err != nil {
  84. return nil, err
  85. }
  86. for _, id := range ids {
  87. l, err := ls.loadLayer(id)
  88. if err != nil {
  89. logrus.Debugf("Failed to load layer %s: %s", id, err)
  90. continue
  91. }
  92. if l.parent != nil {
  93. l.parent.referenceCount++
  94. }
  95. }
  96. for _, mount := range mounts {
  97. if err := ls.loadMount(mount); err != nil {
  98. logrus.Debugf("Failed to load mount %s: %s", mount, err)
  99. }
  100. }
  101. return ls, nil
  102. }
  103. func (ls *layerStore) Driver() graphdriver.Driver {
  104. return ls.driver
  105. }
  106. func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
  107. cl, ok := ls.layerMap[layer]
  108. if ok {
  109. return cl, nil
  110. }
  111. diff, err := ls.store.GetDiffID(layer)
  112. if err != nil {
  113. return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err)
  114. }
  115. size, err := ls.store.GetSize(layer)
  116. if err != nil {
  117. return nil, fmt.Errorf("failed to get size for %s: %s", layer, err)
  118. }
  119. cacheID, err := ls.store.GetCacheID(layer)
  120. if err != nil {
  121. return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err)
  122. }
  123. parent, err := ls.store.GetParent(layer)
  124. if err != nil {
  125. return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err)
  126. }
  127. descriptor, err := ls.store.GetDescriptor(layer)
  128. if err != nil {
  129. return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
  130. }
  131. cl = &roLayer{
  132. chainID: layer,
  133. diffID: diff,
  134. size: size,
  135. cacheID: cacheID,
  136. layerStore: ls,
  137. references: map[Layer]struct{}{},
  138. descriptor: descriptor,
  139. }
  140. if parent != "" {
  141. p, err := ls.loadLayer(parent)
  142. if err != nil {
  143. return nil, err
  144. }
  145. cl.parent = p
  146. }
  147. ls.layerMap[cl.chainID] = cl
  148. return cl, nil
  149. }
  150. func (ls *layerStore) loadMount(mount string) error {
  151. ls.mountL.Lock()
  152. defer ls.mountL.Unlock()
  153. if _, ok := ls.mounts[mount]; ok {
  154. return nil
  155. }
  156. mountID, err := ls.store.GetMountID(mount)
  157. if err != nil {
  158. return err
  159. }
  160. initID, err := ls.store.GetInitID(mount)
  161. if err != nil {
  162. return err
  163. }
  164. parent, err := ls.store.GetMountParent(mount)
  165. if err != nil {
  166. return err
  167. }
  168. ml := &mountedLayer{
  169. name: mount,
  170. mountID: mountID,
  171. initID: initID,
  172. layerStore: ls,
  173. references: map[RWLayer]*referencedRWLayer{},
  174. }
  175. if parent != "" {
  176. p, err := ls.loadLayer(parent)
  177. if err != nil {
  178. return err
  179. }
  180. ml.parent = p
  181. p.referenceCount++
  182. }
  183. ls.mounts[ml.name] = ml
  184. return nil
  185. }
  186. func (ls *layerStore) applyTar(tx *fileMetadataTransaction, ts io.Reader, parent string, layer *roLayer) error {
  187. digester := digest.Canonical.Digester()
  188. tr := io.TeeReader(ts, digester.Hash())
  189. rdr := tr
  190. if ls.useTarSplit {
  191. tsw, err := tx.TarSplitWriter(true)
  192. if err != nil {
  193. return err
  194. }
  195. metaPacker := storage.NewJSONPacker(tsw)
  196. defer tsw.Close()
  197. // we're passing nil here for the file putter, because the ApplyDiff will
  198. // handle the extraction of the archive
  199. rdr, err = asm.NewInputTarStream(tr, metaPacker, nil)
  200. if err != nil {
  201. return err
  202. }
  203. }
  204. applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, rdr)
  205. // discard trailing data but ensure metadata is picked up to reconstruct stream
  206. // unconditionally call io.Copy here before checking err to ensure the resources
  207. // allocated by NewInputTarStream above are always released
  208. io.Copy(io.Discard, rdr) // ignore error as reader may be closed
  209. if err != nil {
  210. return err
  211. }
  212. layer.size = applySize
  213. layer.diffID = DiffID(digester.Digest())
  214. logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
  215. return nil
  216. }
  217. func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
  218. return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{})
  219. }
  220. func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) {
  221. // err is used to hold the error which will always trigger
  222. // cleanup of creates sources but may not be an error returned
  223. // to the caller (already exists).
  224. var err error
  225. var pid string
  226. var p *roLayer
  227. if string(parent) != "" {
  228. ls.layerL.Lock()
  229. p = ls.get(parent)
  230. ls.layerL.Unlock()
  231. if p == nil {
  232. return nil, ErrLayerDoesNotExist
  233. }
  234. pid = p.cacheID
  235. // Release parent chain if error
  236. defer func() {
  237. if err != nil {
  238. ls.layerL.Lock()
  239. ls.releaseLayer(p)
  240. ls.layerL.Unlock()
  241. }
  242. }()
  243. if p.depth() >= maxLayerDepth {
  244. err = ErrMaxDepthExceeded
  245. return nil, err
  246. }
  247. }
  248. // Create new roLayer
  249. layer := &roLayer{
  250. parent: p,
  251. cacheID: stringid.GenerateRandomID(),
  252. referenceCount: 1,
  253. layerStore: ls,
  254. references: map[Layer]struct{}{},
  255. descriptor: descriptor,
  256. }
  257. if err = ls.driver.Create(layer.cacheID, pid, nil); err != nil {
  258. return nil, err
  259. }
  260. tx, err := ls.store.StartTransaction()
  261. if err != nil {
  262. return nil, err
  263. }
  264. defer func() {
  265. if err != nil {
  266. logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err)
  267. if err := ls.driver.Remove(layer.cacheID); err != nil {
  268. logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
  269. }
  270. if err := tx.Cancel(); err != nil {
  271. logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
  272. }
  273. }
  274. }()
  275. if err = ls.applyTar(tx, ts, pid, layer); err != nil {
  276. return nil, err
  277. }
  278. if layer.parent == nil {
  279. layer.chainID = ChainID(layer.diffID)
  280. } else {
  281. layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
  282. }
  283. if err = storeLayer(tx, layer); err != nil {
  284. return nil, err
  285. }
  286. ls.layerL.Lock()
  287. defer ls.layerL.Unlock()
  288. if existingLayer := ls.get(layer.chainID); existingLayer != nil {
  289. // Set error for cleanup, but do not return the error
  290. err = errors.New("layer already exists")
  291. return existingLayer.getReference(), nil
  292. }
  293. if err = tx.Commit(layer.chainID); err != nil {
  294. return nil, err
  295. }
  296. ls.layerMap[layer.chainID] = layer
  297. return layer.getReference(), nil
  298. }
  299. func (ls *layerStore) get(layer ChainID) *roLayer {
  300. l, ok := ls.layerMap[layer]
  301. if !ok {
  302. return nil
  303. }
  304. l.referenceCount++
  305. return l
  306. }
  307. func (ls *layerStore) Get(l ChainID) (Layer, error) {
  308. ls.layerL.Lock()
  309. defer ls.layerL.Unlock()
  310. layer := ls.get(l)
  311. if layer == nil {
  312. return nil, ErrLayerDoesNotExist
  313. }
  314. return layer.getReference(), nil
  315. }
  316. func (ls *layerStore) Map() map[ChainID]Layer {
  317. ls.layerL.Lock()
  318. defer ls.layerL.Unlock()
  319. layers := map[ChainID]Layer{}
  320. for k, v := range ls.layerMap {
  321. layers[k] = v
  322. }
  323. return layers
  324. }
  325. func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error {
  326. // Rename layer digest folder first so we detect orphan layer(s)
  327. // if ls.driver.Remove fails
  328. var dir string
  329. for {
  330. dgst := digest.Digest(layer.chainID)
  331. tmpID := fmt.Sprintf("%s-%s-removing", dgst.Hex(), stringid.GenerateRandomID())
  332. dir = filepath.Join(ls.store.root, string(dgst.Algorithm()), tmpID)
  333. err := os.Rename(ls.store.getLayerDirectory(layer.chainID), dir)
  334. if os.IsExist(err) {
  335. continue
  336. }
  337. break
  338. }
  339. err := ls.driver.Remove(layer.cacheID)
  340. if err != nil {
  341. return err
  342. }
  343. err = os.RemoveAll(dir)
  344. if err != nil {
  345. return err
  346. }
  347. metadata.DiffID = layer.diffID
  348. metadata.ChainID = layer.chainID
  349. metadata.Size = layer.Size()
  350. if err != nil {
  351. return err
  352. }
  353. metadata.DiffSize = layer.size
  354. return nil
  355. }
  356. func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
  357. depth := 0
  358. removed := []Metadata{}
  359. for {
  360. if l.referenceCount == 0 {
  361. panic("layer not retained")
  362. }
  363. l.referenceCount--
  364. if l.referenceCount != 0 {
  365. return removed, nil
  366. }
  367. if len(removed) == 0 && depth > 0 {
  368. panic("cannot remove layer with child")
  369. }
  370. if l.hasReferences() {
  371. panic("cannot delete referenced layer")
  372. }
  373. // Remove layer from layer map first so it is not considered to exist
  374. // when if ls.deleteLayer fails.
  375. delete(ls.layerMap, l.chainID)
  376. var metadata Metadata
  377. if err := ls.deleteLayer(l, &metadata); err != nil {
  378. return nil, err
  379. }
  380. removed = append(removed, metadata)
  381. if l.parent == nil {
  382. return removed, nil
  383. }
  384. depth++
  385. l = l.parent
  386. }
  387. }
  388. func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
  389. ls.layerL.Lock()
  390. defer ls.layerL.Unlock()
  391. layer, ok := ls.layerMap[l.ChainID()]
  392. if !ok {
  393. return []Metadata{}, nil
  394. }
  395. if !layer.hasReference(l) {
  396. return nil, ErrLayerNotRetained
  397. }
  398. layer.deleteReference(l)
  399. return ls.releaseLayer(layer)
  400. }
  401. func (ls *layerStore) CreateRWLayer(name string, parent ChainID, opts *CreateRWLayerOpts) (_ RWLayer, err error) {
  402. var (
  403. storageOpt map[string]string
  404. initFunc MountInit
  405. mountLabel string
  406. )
  407. if opts != nil {
  408. mountLabel = opts.MountLabel
  409. storageOpt = opts.StorageOpt
  410. initFunc = opts.InitFunc
  411. }
  412. ls.locker.Lock(name)
  413. defer ls.locker.Unlock(name)
  414. ls.mountL.Lock()
  415. _, ok := ls.mounts[name]
  416. ls.mountL.Unlock()
  417. if ok {
  418. return nil, ErrMountNameConflict
  419. }
  420. var pid string
  421. var p *roLayer
  422. if string(parent) != "" {
  423. ls.layerL.Lock()
  424. p = ls.get(parent)
  425. ls.layerL.Unlock()
  426. if p == nil {
  427. return nil, ErrLayerDoesNotExist
  428. }
  429. pid = p.cacheID
  430. // Release parent chain if error
  431. defer func() {
  432. if err != nil {
  433. ls.layerL.Lock()
  434. ls.releaseLayer(p)
  435. ls.layerL.Unlock()
  436. }
  437. }()
  438. }
  439. m := &mountedLayer{
  440. name: name,
  441. parent: p,
  442. mountID: ls.mountID(name),
  443. layerStore: ls,
  444. references: map[RWLayer]*referencedRWLayer{},
  445. }
  446. if initFunc != nil {
  447. pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt)
  448. if err != nil {
  449. return
  450. }
  451. m.initID = pid
  452. }
  453. createOpts := &graphdriver.CreateOpts{
  454. StorageOpt: storageOpt,
  455. }
  456. if err = ls.driver.CreateReadWrite(m.mountID, pid, createOpts); err != nil {
  457. return
  458. }
  459. if err = ls.saveMount(m); err != nil {
  460. return
  461. }
  462. return m.getReference(), nil
  463. }
  464. func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) {
  465. ls.locker.Lock(id)
  466. defer ls.locker.Unlock(id)
  467. ls.mountL.Lock()
  468. mount := ls.mounts[id]
  469. ls.mountL.Unlock()
  470. if mount == nil {
  471. return nil, ErrMountDoesNotExist
  472. }
  473. return mount.getReference(), nil
  474. }
  475. func (ls *layerStore) GetMountID(id string) (string, error) {
  476. ls.mountL.Lock()
  477. mount := ls.mounts[id]
  478. ls.mountL.Unlock()
  479. if mount == nil {
  480. return "", ErrMountDoesNotExist
  481. }
  482. logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID)
  483. return mount.mountID, nil
  484. }
  485. func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) {
  486. name := l.Name()
  487. ls.locker.Lock(name)
  488. defer ls.locker.Unlock(name)
  489. ls.mountL.Lock()
  490. m := ls.mounts[name]
  491. ls.mountL.Unlock()
  492. if m == nil {
  493. return []Metadata{}, nil
  494. }
  495. if err := m.deleteReference(l); err != nil {
  496. return nil, err
  497. }
  498. if m.hasReferences() {
  499. return []Metadata{}, nil
  500. }
  501. if err := ls.driver.Remove(m.mountID); err != nil {
  502. logrus.Errorf("Error removing mounted layer %s: %s", m.name, err)
  503. m.retakeReference(l)
  504. return nil, err
  505. }
  506. if m.initID != "" {
  507. if err := ls.driver.Remove(m.initID); err != nil {
  508. logrus.Errorf("Error removing init layer %s: %s", m.name, err)
  509. m.retakeReference(l)
  510. return nil, err
  511. }
  512. }
  513. if err := ls.store.RemoveMount(m.name); err != nil {
  514. logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err)
  515. m.retakeReference(l)
  516. return nil, err
  517. }
  518. ls.mountL.Lock()
  519. delete(ls.mounts, name)
  520. ls.mountL.Unlock()
  521. ls.layerL.Lock()
  522. defer ls.layerL.Unlock()
  523. if m.parent != nil {
  524. return ls.releaseLayer(m.parent)
  525. }
  526. return []Metadata{}, nil
  527. }
  528. func (ls *layerStore) saveMount(mount *mountedLayer) error {
  529. if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
  530. return err
  531. }
  532. if mount.initID != "" {
  533. if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
  534. return err
  535. }
  536. }
  537. if mount.parent != nil {
  538. if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
  539. return err
  540. }
  541. }
  542. ls.mountL.Lock()
  543. ls.mounts[mount.name] = mount
  544. ls.mountL.Unlock()
  545. return nil
  546. }
  547. func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) {
  548. // Use "<graph-id>-init" to maintain compatibility with graph drivers
  549. // which are expecting this layer with this special name. If all
  550. // graph drivers can be updated to not rely on knowing about this layer
  551. // then the initID should be randomly generated.
  552. initID := fmt.Sprintf("%s-init", graphID)
  553. createOpts := &graphdriver.CreateOpts{
  554. MountLabel: mountLabel,
  555. StorageOpt: storageOpt,
  556. }
  557. if err := ls.driver.CreateReadWrite(initID, parent, createOpts); err != nil {
  558. return "", err
  559. }
  560. p, err := ls.driver.Get(initID, "")
  561. if err != nil {
  562. return "", err
  563. }
  564. if err := initFunc(p); err != nil {
  565. ls.driver.Put(initID)
  566. return "", err
  567. }
  568. if err := ls.driver.Put(initID); err != nil {
  569. return "", err
  570. }
  571. return initID, nil
  572. }
  573. func (ls *layerStore) getTarStream(rl *roLayer) (io.ReadCloser, error) {
  574. if !ls.useTarSplit {
  575. var parentCacheID string
  576. if rl.parent != nil {
  577. parentCacheID = rl.parent.cacheID
  578. }
  579. return ls.driver.Diff(rl.cacheID, parentCacheID)
  580. }
  581. r, err := ls.store.TarSplitReader(rl.chainID)
  582. if err != nil {
  583. return nil, err
  584. }
  585. pr, pw := io.Pipe()
  586. go func() {
  587. err := ls.assembleTarTo(rl.cacheID, r, nil, pw)
  588. if err != nil {
  589. pw.CloseWithError(err)
  590. } else {
  591. pw.Close()
  592. }
  593. }()
  594. return pr, nil
  595. }
  596. func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
  597. diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver)
  598. if !ok {
  599. diffDriver = &naiveDiffPathDriver{ls.driver}
  600. }
  601. defer metadata.Close()
  602. // get our relative path to the container
  603. fileGetCloser, err := diffDriver.DiffGetter(graphID)
  604. if err != nil {
  605. return err
  606. }
  607. defer fileGetCloser.Close()
  608. metaUnpacker := storage.NewJSONUnpacker(metadata)
  609. upackerCounter := &unpackSizeCounter{metaUnpacker, size}
  610. logrus.Debugf("Assembling tar data for %s", graphID)
  611. return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w)
  612. }
  613. func (ls *layerStore) Cleanup() error {
  614. orphanLayers, err := ls.store.getOrphan()
  615. if err != nil {
  616. logrus.WithError(err).Error("cannot get orphan layers")
  617. }
  618. if len(orphanLayers) > 0 {
  619. logrus.Debugf("found %v orphan layers", len(orphanLayers))
  620. }
  621. for _, orphan := range orphanLayers {
  622. logrus.WithField("cache-id", orphan.cacheID).Debugf("removing orphan layer, chain ID: %v", orphan.chainID)
  623. err = ls.driver.Remove(orphan.cacheID)
  624. if err != nil && !os.IsNotExist(err) {
  625. logrus.WithError(err).WithField("cache-id", orphan.cacheID).Error("cannot remove orphan layer")
  626. continue
  627. }
  628. err = ls.store.Remove(orphan.chainID, orphan.cacheID)
  629. if err != nil {
  630. logrus.WithError(err).WithField("chain-id", orphan.chainID).Error("cannot remove orphan layer metadata")
  631. }
  632. }
  633. return ls.driver.Cleanup()
  634. }
  635. func (ls *layerStore) DriverStatus() [][2]string {
  636. return ls.driver.Status()
  637. }
  638. func (ls *layerStore) DriverName() string {
  639. return ls.driver.String()
  640. }
  641. type naiveDiffPathDriver struct {
  642. graphdriver.Driver
  643. }
  644. type fileGetPutter struct {
  645. storage.FileGetter
  646. driver graphdriver.Driver
  647. id string
  648. }
  649. func (w *fileGetPutter) Close() error {
  650. return w.driver.Put(w.id)
  651. }
  652. func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) {
  653. p, err := n.Driver.Get(id, "")
  654. if err != nil {
  655. return nil, err
  656. }
  657. return &fileGetPutter{storage.NewPathFileGetter(p), n.Driver, id}, nil
  658. }