layer_store.go 16 KB


  1. package layer
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/distribution"
  10. "github.com/docker/docker/daemon/graphdriver"
  11. "github.com/docker/docker/pkg/idtools"
  12. "github.com/docker/docker/pkg/plugingetter"
  13. "github.com/docker/docker/pkg/stringid"
  14. "github.com/opencontainers/go-digest"
  15. "github.com/vbatts/tar-split/tar/asm"
  16. "github.com/vbatts/tar-split/tar/storage"
  17. )
  18. // maxLayerDepth represents the maximum number of
  19. // layers which can be chained together. 125 was
  20. // chosen to account for the 127 max in some
  21. // graphdrivers plus the 2 additional layers
  22. // used to create a rwlayer.
  23. const maxLayerDepth = 125
  24. type layerStore struct {
  25. store MetadataStore
  26. driver graphdriver.Driver
  27. layerMap map[ChainID]*roLayer
  28. layerL sync.Mutex
  29. mounts map[string]*mountedLayer
  30. mountL sync.Mutex
  31. useTarSplit bool
  32. }
  33. // StoreOptions are the options used to create a new Store instance
  34. type StoreOptions struct {
  35. StorePath string
  36. MetadataStorePathTemplate string
  37. GraphDriver string
  38. GraphDriverOptions []string
  39. UIDMaps []idtools.IDMap
  40. GIDMaps []idtools.IDMap
  41. PluginGetter plugingetter.PluginGetter
  42. ExperimentalEnabled bool
  43. }
  44. // NewStoreFromOptions creates a new Store instance
  45. func NewStoreFromOptions(options StoreOptions) (Store, error) {
  46. driver, err := graphdriver.New(options.GraphDriver, options.PluginGetter, graphdriver.Options{
  47. Root: options.StorePath,
  48. DriverOptions: options.GraphDriverOptions,
  49. UIDMaps: options.UIDMaps,
  50. GIDMaps: options.GIDMaps,
  51. ExperimentalEnabled: options.ExperimentalEnabled,
  52. })
  53. if err != nil {
  54. return nil, fmt.Errorf("error initializing graphdriver: %v", err)
  55. }
  56. logrus.Debugf("Using graph driver %s", driver)
  57. fms, err := NewFSMetadataStore(fmt.Sprintf(options.MetadataStorePathTemplate, driver))
  58. if err != nil {
  59. return nil, err
  60. }
  61. return NewStoreFromGraphDriver(fms, driver)
  62. }
  63. // NewStoreFromGraphDriver creates a new Store instance using the provided
  64. // metadata store and graph driver. The metadata store will be used to restore
  65. // the Store.
  66. func NewStoreFromGraphDriver(store MetadataStore, driver graphdriver.Driver) (Store, error) {
  67. caps := graphdriver.Capabilities{}
  68. if capDriver, ok := driver.(graphdriver.CapabilityDriver); ok {
  69. caps = capDriver.Capabilities()
  70. }
  71. ls := &layerStore{
  72. store: store,
  73. driver: driver,
  74. layerMap: map[ChainID]*roLayer{},
  75. mounts: map[string]*mountedLayer{},
  76. useTarSplit: !caps.ReproducesExactDiffs,
  77. }
  78. ids, mounts, err := store.List()
  79. if err != nil {
  80. return nil, err
  81. }
  82. for _, id := range ids {
  83. l, err := ls.loadLayer(id)
  84. if err != nil {
  85. logrus.Debugf("Failed to load layer %s: %s", id, err)
  86. continue
  87. }
  88. if l.parent != nil {
  89. l.parent.referenceCount++
  90. }
  91. }
  92. for _, mount := range mounts {
  93. if err := ls.loadMount(mount); err != nil {
  94. logrus.Debugf("Failed to load mount %s: %s", mount, err)
  95. }
  96. }
  97. return ls, nil
  98. }
  99. func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
  100. cl, ok := ls.layerMap[layer]
  101. if ok {
  102. return cl, nil
  103. }
  104. diff, err := ls.store.GetDiffID(layer)
  105. if err != nil {
  106. return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err)
  107. }
  108. size, err := ls.store.GetSize(layer)
  109. if err != nil {
  110. return nil, fmt.Errorf("failed to get size for %s: %s", layer, err)
  111. }
  112. cacheID, err := ls.store.GetCacheID(layer)
  113. if err != nil {
  114. return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err)
  115. }
  116. parent, err := ls.store.GetParent(layer)
  117. if err != nil {
  118. return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err)
  119. }
  120. descriptor, err := ls.store.GetDescriptor(layer)
  121. if err != nil {
  122. return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
  123. }
  124. cl = &roLayer{
  125. chainID: layer,
  126. diffID: diff,
  127. size: size,
  128. cacheID: cacheID,
  129. layerStore: ls,
  130. references: map[Layer]struct{}{},
  131. descriptor: descriptor,
  132. }
  133. if parent != "" {
  134. p, err := ls.loadLayer(parent)
  135. if err != nil {
  136. return nil, err
  137. }
  138. cl.parent = p
  139. }
  140. ls.layerMap[cl.chainID] = cl
  141. return cl, nil
  142. }
  143. func (ls *layerStore) loadMount(mount string) error {
  144. if _, ok := ls.mounts[mount]; ok {
  145. return nil
  146. }
  147. mountID, err := ls.store.GetMountID(mount)
  148. if err != nil {
  149. return err
  150. }
  151. initID, err := ls.store.GetInitID(mount)
  152. if err != nil {
  153. return err
  154. }
  155. parent, err := ls.store.GetMountParent(mount)
  156. if err != nil {
  157. return err
  158. }
  159. ml := &mountedLayer{
  160. name: mount,
  161. mountID: mountID,
  162. initID: initID,
  163. layerStore: ls,
  164. references: map[RWLayer]*referencedRWLayer{},
  165. }
  166. if parent != "" {
  167. p, err := ls.loadLayer(parent)
  168. if err != nil {
  169. return err
  170. }
  171. ml.parent = p
  172. p.referenceCount++
  173. }
  174. ls.mounts[ml.name] = ml
  175. return nil
  176. }
  177. func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent string, layer *roLayer) error {
  178. digester := digest.Canonical.Digester()
  179. tr := io.TeeReader(ts, digester.Hash())
  180. rdr := tr
  181. if ls.useTarSplit {
  182. tsw, err := tx.TarSplitWriter(true)
  183. if err != nil {
  184. return err
  185. }
  186. metaPacker := storage.NewJSONPacker(tsw)
  187. defer tsw.Close()
  188. // we're passing nil here for the file putter, because the ApplyDiff will
  189. // handle the extraction of the archive
  190. rdr, err = asm.NewInputTarStream(tr, metaPacker, nil)
  191. if err != nil {
  192. return err
  193. }
  194. }
  195. applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, rdr)
  196. if err != nil {
  197. return err
  198. }
  199. // Discard trailing data but ensure metadata is picked up to reconstruct stream
  200. io.Copy(ioutil.Discard, rdr) // ignore error as reader may be closed
  201. layer.size = applySize
  202. layer.diffID = DiffID(digester.Digest())
  203. logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
  204. return nil
  205. }
  206. func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
  207. return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{})
  208. }
  209. func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) {
  210. // err is used to hold the error which will always trigger
  211. // cleanup of creates sources but may not be an error returned
  212. // to the caller (already exists).
  213. var err error
  214. var pid string
  215. var p *roLayer
  216. if string(parent) != "" {
  217. p = ls.get(parent)
  218. if p == nil {
  219. return nil, ErrLayerDoesNotExist
  220. }
  221. pid = p.cacheID
  222. // Release parent chain if error
  223. defer func() {
  224. if err != nil {
  225. ls.layerL.Lock()
  226. ls.releaseLayer(p)
  227. ls.layerL.Unlock()
  228. }
  229. }()
  230. if p.depth() >= maxLayerDepth {
  231. err = ErrMaxDepthExceeded
  232. return nil, err
  233. }
  234. }
  235. // Create new roLayer
  236. layer := &roLayer{
  237. parent: p,
  238. cacheID: stringid.GenerateRandomID(),
  239. referenceCount: 1,
  240. layerStore: ls,
  241. references: map[Layer]struct{}{},
  242. descriptor: descriptor,
  243. }
  244. if err = ls.driver.Create(layer.cacheID, pid, nil); err != nil {
  245. return nil, err
  246. }
  247. tx, err := ls.store.StartTransaction()
  248. if err != nil {
  249. return nil, err
  250. }
  251. defer func() {
  252. if err != nil {
  253. logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err)
  254. if err := ls.driver.Remove(layer.cacheID); err != nil {
  255. logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
  256. }
  257. if err := tx.Cancel(); err != nil {
  258. logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
  259. }
  260. }
  261. }()
  262. if err = ls.applyTar(tx, ts, pid, layer); err != nil {
  263. return nil, err
  264. }
  265. if layer.parent == nil {
  266. layer.chainID = ChainID(layer.diffID)
  267. } else {
  268. layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
  269. }
  270. if err = storeLayer(tx, layer); err != nil {
  271. return nil, err
  272. }
  273. ls.layerL.Lock()
  274. defer ls.layerL.Unlock()
  275. if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
  276. // Set error for cleanup, but do not return the error
  277. err = errors.New("layer already exists")
  278. return existingLayer.getReference(), nil
  279. }
  280. if err = tx.Commit(layer.chainID); err != nil {
  281. return nil, err
  282. }
  283. ls.layerMap[layer.chainID] = layer
  284. return layer.getReference(), nil
  285. }
  286. func (ls *layerStore) getWithoutLock(layer ChainID) *roLayer {
  287. l, ok := ls.layerMap[layer]
  288. if !ok {
  289. return nil
  290. }
  291. l.referenceCount++
  292. return l
  293. }
  294. func (ls *layerStore) get(l ChainID) *roLayer {
  295. ls.layerL.Lock()
  296. defer ls.layerL.Unlock()
  297. return ls.getWithoutLock(l)
  298. }
  299. func (ls *layerStore) Get(l ChainID) (Layer, error) {
  300. ls.layerL.Lock()
  301. defer ls.layerL.Unlock()
  302. layer := ls.getWithoutLock(l)
  303. if layer == nil {
  304. return nil, ErrLayerDoesNotExist
  305. }
  306. return layer.getReference(), nil
  307. }
  308. func (ls *layerStore) Map() map[ChainID]Layer {
  309. ls.layerL.Lock()
  310. defer ls.layerL.Unlock()
  311. layers := map[ChainID]Layer{}
  312. for k, v := range ls.layerMap {
  313. layers[k] = v
  314. }
  315. return layers
  316. }
  317. func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error {
  318. err := ls.driver.Remove(layer.cacheID)
  319. if err != nil {
  320. return err
  321. }
  322. err = ls.store.Remove(layer.chainID)
  323. if err != nil {
  324. return err
  325. }
  326. metadata.DiffID = layer.diffID
  327. metadata.ChainID = layer.chainID
  328. metadata.Size, err = layer.Size()
  329. if err != nil {
  330. return err
  331. }
  332. metadata.DiffSize = layer.size
  333. return nil
  334. }
  335. func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
  336. depth := 0
  337. removed := []Metadata{}
  338. for {
  339. if l.referenceCount == 0 {
  340. panic("layer not retained")
  341. }
  342. l.referenceCount--
  343. if l.referenceCount != 0 {
  344. return removed, nil
  345. }
  346. if len(removed) == 0 && depth > 0 {
  347. panic("cannot remove layer with child")
  348. }
  349. if l.hasReferences() {
  350. panic("cannot delete referenced layer")
  351. }
  352. var metadata Metadata
  353. if err := ls.deleteLayer(l, &metadata); err != nil {
  354. return nil, err
  355. }
  356. delete(ls.layerMap, l.chainID)
  357. removed = append(removed, metadata)
  358. if l.parent == nil {
  359. return removed, nil
  360. }
  361. depth++
  362. l = l.parent
  363. }
  364. }
  365. func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
  366. ls.layerL.Lock()
  367. defer ls.layerL.Unlock()
  368. layer, ok := ls.layerMap[l.ChainID()]
  369. if !ok {
  370. return []Metadata{}, nil
  371. }
  372. if !layer.hasReference(l) {
  373. return nil, ErrLayerNotRetained
  374. }
  375. layer.deleteReference(l)
  376. return ls.releaseLayer(layer)
  377. }
  378. func (ls *layerStore) CreateRWLayer(name string, parent ChainID, opts *CreateRWLayerOpts) (RWLayer, error) {
  379. var (
  380. storageOpt map[string]string
  381. initFunc MountInit
  382. mountLabel string
  383. )
  384. if opts != nil {
  385. mountLabel = opts.MountLabel
  386. storageOpt = opts.StorageOpt
  387. initFunc = opts.InitFunc
  388. }
  389. ls.mountL.Lock()
  390. defer ls.mountL.Unlock()
  391. m, ok := ls.mounts[name]
  392. if ok {
  393. return nil, ErrMountNameConflict
  394. }
  395. var err error
  396. var pid string
  397. var p *roLayer
  398. if string(parent) != "" {
  399. p = ls.get(parent)
  400. if p == nil {
  401. return nil, ErrLayerDoesNotExist
  402. }
  403. pid = p.cacheID
  404. // Release parent chain if error
  405. defer func() {
  406. if err != nil {
  407. ls.layerL.Lock()
  408. ls.releaseLayer(p)
  409. ls.layerL.Unlock()
  410. }
  411. }()
  412. }
  413. m = &mountedLayer{
  414. name: name,
  415. parent: p,
  416. mountID: ls.mountID(name),
  417. layerStore: ls,
  418. references: map[RWLayer]*referencedRWLayer{},
  419. }
  420. if initFunc != nil {
  421. pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt)
  422. if err != nil {
  423. return nil, err
  424. }
  425. m.initID = pid
  426. }
  427. createOpts := &graphdriver.CreateOpts{
  428. StorageOpt: storageOpt,
  429. }
  430. if err = ls.driver.CreateReadWrite(m.mountID, pid, createOpts); err != nil {
  431. return nil, err
  432. }
  433. if err = ls.saveMount(m); err != nil {
  434. return nil, err
  435. }
  436. return m.getReference(), nil
  437. }
  438. func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) {
  439. ls.mountL.Lock()
  440. defer ls.mountL.Unlock()
  441. mount, ok := ls.mounts[id]
  442. if !ok {
  443. return nil, ErrMountDoesNotExist
  444. }
  445. return mount.getReference(), nil
  446. }
  447. func (ls *layerStore) GetMountID(id string) (string, error) {
  448. ls.mountL.Lock()
  449. defer ls.mountL.Unlock()
  450. mount, ok := ls.mounts[id]
  451. if !ok {
  452. return "", ErrMountDoesNotExist
  453. }
  454. logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID)
  455. return mount.mountID, nil
  456. }
  457. func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) {
  458. ls.mountL.Lock()
  459. defer ls.mountL.Unlock()
  460. m, ok := ls.mounts[l.Name()]
  461. if !ok {
  462. return []Metadata{}, nil
  463. }
  464. if err := m.deleteReference(l); err != nil {
  465. return nil, err
  466. }
  467. if m.hasReferences() {
  468. return []Metadata{}, nil
  469. }
  470. if err := ls.driver.Remove(m.mountID); err != nil {
  471. logrus.Errorf("Error removing mounted layer %s: %s", m.name, err)
  472. m.retakeReference(l)
  473. return nil, err
  474. }
  475. if m.initID != "" {
  476. if err := ls.driver.Remove(m.initID); err != nil {
  477. logrus.Errorf("Error removing init layer %s: %s", m.name, err)
  478. m.retakeReference(l)
  479. return nil, err
  480. }
  481. }
  482. if err := ls.store.RemoveMount(m.name); err != nil {
  483. logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err)
  484. m.retakeReference(l)
  485. return nil, err
  486. }
  487. delete(ls.mounts, m.Name())
  488. ls.layerL.Lock()
  489. defer ls.layerL.Unlock()
  490. if m.parent != nil {
  491. return ls.releaseLayer(m.parent)
  492. }
  493. return []Metadata{}, nil
  494. }
  495. func (ls *layerStore) saveMount(mount *mountedLayer) error {
  496. if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
  497. return err
  498. }
  499. if mount.initID != "" {
  500. if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
  501. return err
  502. }
  503. }
  504. if mount.parent != nil {
  505. if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
  506. return err
  507. }
  508. }
  509. ls.mounts[mount.name] = mount
  510. return nil
  511. }
  512. func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) {
  513. // Use "<graph-id>-init" to maintain compatibility with graph drivers
  514. // which are expecting this layer with this special name. If all
  515. // graph drivers can be updated to not rely on knowing about this layer
  516. // then the initID should be randomly generated.
  517. initID := fmt.Sprintf("%s-init", graphID)
  518. createOpts := &graphdriver.CreateOpts{
  519. MountLabel: mountLabel,
  520. StorageOpt: storageOpt,
  521. }
  522. if err := ls.driver.CreateReadWrite(initID, parent, createOpts); err != nil {
  523. return "", err
  524. }
  525. p, err := ls.driver.Get(initID, "")
  526. if err != nil {
  527. return "", err
  528. }
  529. if err := initFunc(p); err != nil {
  530. ls.driver.Put(initID)
  531. return "", err
  532. }
  533. if err := ls.driver.Put(initID); err != nil {
  534. return "", err
  535. }
  536. return initID, nil
  537. }
  538. func (ls *layerStore) getTarStream(rl *roLayer) (io.ReadCloser, error) {
  539. if !ls.useTarSplit {
  540. var parentCacheID string
  541. if rl.parent != nil {
  542. parentCacheID = rl.parent.cacheID
  543. }
  544. return ls.driver.Diff(rl.cacheID, parentCacheID)
  545. }
  546. r, err := ls.store.TarSplitReader(rl.chainID)
  547. if err != nil {
  548. return nil, err
  549. }
  550. pr, pw := io.Pipe()
  551. go func() {
  552. err := ls.assembleTarTo(rl.cacheID, r, nil, pw)
  553. if err != nil {
  554. pw.CloseWithError(err)
  555. } else {
  556. pw.Close()
  557. }
  558. }()
  559. return pr, nil
  560. }
  561. func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
  562. diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver)
  563. if !ok {
  564. diffDriver = &naiveDiffPathDriver{ls.driver}
  565. }
  566. defer metadata.Close()
  567. // get our relative path to the container
  568. fileGetCloser, err := diffDriver.DiffGetter(graphID)
  569. if err != nil {
  570. return err
  571. }
  572. defer fileGetCloser.Close()
  573. metaUnpacker := storage.NewJSONUnpacker(metadata)
  574. upackerCounter := &unpackSizeCounter{metaUnpacker, size}
  575. logrus.Debugf("Assembling tar data for %s", graphID)
  576. return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w)
  577. }
  578. func (ls *layerStore) Cleanup() error {
  579. return ls.driver.Cleanup()
  580. }
  581. func (ls *layerStore) DriverStatus() [][2]string {
  582. return ls.driver.Status()
  583. }
  584. func (ls *layerStore) DriverName() string {
  585. return ls.driver.String()
  586. }
  587. type naiveDiffPathDriver struct {
  588. graphdriver.Driver
  589. }
  590. type fileGetPutter struct {
  591. storage.FileGetter
  592. driver graphdriver.Driver
  593. id string
  594. }
  595. func (w *fileGetPutter) Close() error {
  596. return w.driver.Put(w.id)
  597. }
  598. func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) {
  599. p, err := n.Driver.Get(id, "")
  600. if err != nil {
  601. return nil, err
  602. }
  603. return &fileGetPutter{storage.NewPathFileGetter(p), n.Driver, id}, nil
  604. }