layer_store.go 15 KB


  1. package layer
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/distribution"
  10. "github.com/docker/docker/daemon/graphdriver"
  11. "github.com/docker/docker/pkg/idtools"
  12. "github.com/docker/docker/pkg/plugingetter"
  13. "github.com/docker/docker/pkg/stringid"
  14. "github.com/opencontainers/go-digest"
  15. "github.com/vbatts/tar-split/tar/asm"
  16. "github.com/vbatts/tar-split/tar/storage"
  17. )
  18. // maxLayerDepth represents the maximum number of
  19. // layers which can be chained together. 125 was
  20. // chosen to account for the 127 max in some
  21. // graphdrivers plus the 2 additional layers
  22. // used to create a rwlayer.
  23. const maxLayerDepth = 125
  24. type layerStore struct {
  25. store MetadataStore
  26. driver graphdriver.Driver
  27. layerMap map[ChainID]*roLayer
  28. layerL sync.Mutex
  29. mounts map[string]*mountedLayer
  30. mountL sync.Mutex
  31. }
  32. // StoreOptions are the options used to create a new Store instance
  33. type StoreOptions struct {
  34. StorePath string
  35. MetadataStorePathTemplate string
  36. GraphDriver string
  37. GraphDriverOptions []string
  38. UIDMaps []idtools.IDMap
  39. GIDMaps []idtools.IDMap
  40. PluginGetter plugingetter.PluginGetter
  41. ExperimentalEnabled bool
  42. }
  43. // NewStoreFromOptions creates a new Store instance
  44. func NewStoreFromOptions(options StoreOptions) (Store, error) {
  45. driver, err := graphdriver.New(options.GraphDriver, options.PluginGetter, graphdriver.Options{
  46. Root: options.StorePath,
  47. DriverOptions: options.GraphDriverOptions,
  48. UIDMaps: options.UIDMaps,
  49. GIDMaps: options.GIDMaps,
  50. ExperimentalEnabled: options.ExperimentalEnabled,
  51. })
  52. if err != nil {
  53. return nil, fmt.Errorf("error initializing graphdriver: %v", err)
  54. }
  55. logrus.Debugf("Using graph driver %s", driver)
  56. fms, err := NewFSMetadataStore(fmt.Sprintf(options.MetadataStorePathTemplate, driver))
  57. if err != nil {
  58. return nil, err
  59. }
  60. return NewStoreFromGraphDriver(fms, driver)
  61. }
  62. // NewStoreFromGraphDriver creates a new Store instance using the provided
  63. // metadata store and graph driver. The metadata store will be used to restore
  64. // the Store.
  65. func NewStoreFromGraphDriver(store MetadataStore, driver graphdriver.Driver) (Store, error) {
  66. ls := &layerStore{
  67. store: store,
  68. driver: driver,
  69. layerMap: map[ChainID]*roLayer{},
  70. mounts: map[string]*mountedLayer{},
  71. }
  72. ids, mounts, err := store.List()
  73. if err != nil {
  74. return nil, err
  75. }
  76. for _, id := range ids {
  77. l, err := ls.loadLayer(id)
  78. if err != nil {
  79. logrus.Debugf("Failed to load layer %s: %s", id, err)
  80. continue
  81. }
  82. if l.parent != nil {
  83. l.parent.referenceCount++
  84. }
  85. }
  86. for _, mount := range mounts {
  87. if err := ls.loadMount(mount); err != nil {
  88. logrus.Debugf("Failed to load mount %s: %s", mount, err)
  89. }
  90. }
  91. return ls, nil
  92. }
  93. func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
  94. cl, ok := ls.layerMap[layer]
  95. if ok {
  96. return cl, nil
  97. }
  98. diff, err := ls.store.GetDiffID(layer)
  99. if err != nil {
  100. return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err)
  101. }
  102. size, err := ls.store.GetSize(layer)
  103. if err != nil {
  104. return nil, fmt.Errorf("failed to get size for %s: %s", layer, err)
  105. }
  106. cacheID, err := ls.store.GetCacheID(layer)
  107. if err != nil {
  108. return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err)
  109. }
  110. parent, err := ls.store.GetParent(layer)
  111. if err != nil {
  112. return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err)
  113. }
  114. descriptor, err := ls.store.GetDescriptor(layer)
  115. if err != nil {
  116. return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
  117. }
  118. cl = &roLayer{
  119. chainID: layer,
  120. diffID: diff,
  121. size: size,
  122. cacheID: cacheID,
  123. layerStore: ls,
  124. references: map[Layer]struct{}{},
  125. descriptor: descriptor,
  126. }
  127. if parent != "" {
  128. p, err := ls.loadLayer(parent)
  129. if err != nil {
  130. return nil, err
  131. }
  132. cl.parent = p
  133. }
  134. ls.layerMap[cl.chainID] = cl
  135. return cl, nil
  136. }
  137. func (ls *layerStore) loadMount(mount string) error {
  138. if _, ok := ls.mounts[mount]; ok {
  139. return nil
  140. }
  141. mountID, err := ls.store.GetMountID(mount)
  142. if err != nil {
  143. return err
  144. }
  145. initID, err := ls.store.GetInitID(mount)
  146. if err != nil {
  147. return err
  148. }
  149. parent, err := ls.store.GetMountParent(mount)
  150. if err != nil {
  151. return err
  152. }
  153. ml := &mountedLayer{
  154. name: mount,
  155. mountID: mountID,
  156. initID: initID,
  157. layerStore: ls,
  158. references: map[RWLayer]*referencedRWLayer{},
  159. }
  160. if parent != "" {
  161. p, err := ls.loadLayer(parent)
  162. if err != nil {
  163. return err
  164. }
  165. ml.parent = p
  166. p.referenceCount++
  167. }
  168. ls.mounts[ml.name] = ml
  169. return nil
  170. }
  171. func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent string, layer *roLayer) error {
  172. digester := digest.Canonical.Digester()
  173. tr := io.TeeReader(ts, digester.Hash())
  174. tsw, err := tx.TarSplitWriter(true)
  175. if err != nil {
  176. return err
  177. }
  178. metaPacker := storage.NewJSONPacker(tsw)
  179. defer tsw.Close()
  180. // we're passing nil here for the file putter, because the ApplyDiff will
  181. // handle the extraction of the archive
  182. rdr, err := asm.NewInputTarStream(tr, metaPacker, nil)
  183. if err != nil {
  184. return err
  185. }
  186. applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, rdr)
  187. if err != nil {
  188. return err
  189. }
  190. // Discard trailing data but ensure metadata is picked up to reconstruct stream
  191. io.Copy(ioutil.Discard, rdr) // ignore error as reader may be closed
  192. layer.size = applySize
  193. layer.diffID = DiffID(digester.Digest())
  194. logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
  195. return nil
  196. }
  197. func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
  198. return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{})
  199. }
  200. func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) {
  201. // err is used to hold the error which will always trigger
  202. // cleanup of creates sources but may not be an error returned
  203. // to the caller (already exists).
  204. var err error
  205. var pid string
  206. var p *roLayer
  207. if string(parent) != "" {
  208. p = ls.get(parent)
  209. if p == nil {
  210. return nil, ErrLayerDoesNotExist
  211. }
  212. pid = p.cacheID
  213. // Release parent chain if error
  214. defer func() {
  215. if err != nil {
  216. ls.layerL.Lock()
  217. ls.releaseLayer(p)
  218. ls.layerL.Unlock()
  219. }
  220. }()
  221. if p.depth() >= maxLayerDepth {
  222. err = ErrMaxDepthExceeded
  223. return nil, err
  224. }
  225. }
  226. // Create new roLayer
  227. layer := &roLayer{
  228. parent: p,
  229. cacheID: stringid.GenerateRandomID(),
  230. referenceCount: 1,
  231. layerStore: ls,
  232. references: map[Layer]struct{}{},
  233. descriptor: descriptor,
  234. }
  235. if err = ls.driver.Create(layer.cacheID, pid, nil); err != nil {
  236. return nil, err
  237. }
  238. tx, err := ls.store.StartTransaction()
  239. if err != nil {
  240. return nil, err
  241. }
  242. defer func() {
  243. if err != nil {
  244. logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err)
  245. if err := ls.driver.Remove(layer.cacheID); err != nil {
  246. logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
  247. }
  248. if err := tx.Cancel(); err != nil {
  249. logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
  250. }
  251. }
  252. }()
  253. if err = ls.applyTar(tx, ts, pid, layer); err != nil {
  254. return nil, err
  255. }
  256. if layer.parent == nil {
  257. layer.chainID = ChainID(layer.diffID)
  258. } else {
  259. layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
  260. }
  261. if err = storeLayer(tx, layer); err != nil {
  262. return nil, err
  263. }
  264. ls.layerL.Lock()
  265. defer ls.layerL.Unlock()
  266. if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
  267. // Set error for cleanup, but do not return the error
  268. err = errors.New("layer already exists")
  269. return existingLayer.getReference(), nil
  270. }
  271. if err = tx.Commit(layer.chainID); err != nil {
  272. return nil, err
  273. }
  274. ls.layerMap[layer.chainID] = layer
  275. return layer.getReference(), nil
  276. }
  277. func (ls *layerStore) getWithoutLock(layer ChainID) *roLayer {
  278. l, ok := ls.layerMap[layer]
  279. if !ok {
  280. return nil
  281. }
  282. l.referenceCount++
  283. return l
  284. }
  285. func (ls *layerStore) get(l ChainID) *roLayer {
  286. ls.layerL.Lock()
  287. defer ls.layerL.Unlock()
  288. return ls.getWithoutLock(l)
  289. }
  290. func (ls *layerStore) Get(l ChainID) (Layer, error) {
  291. ls.layerL.Lock()
  292. defer ls.layerL.Unlock()
  293. layer := ls.getWithoutLock(l)
  294. if layer == nil {
  295. return nil, ErrLayerDoesNotExist
  296. }
  297. return layer.getReference(), nil
  298. }
  299. func (ls *layerStore) Map() map[ChainID]Layer {
  300. ls.layerL.Lock()
  301. defer ls.layerL.Unlock()
  302. layers := map[ChainID]Layer{}
  303. for k, v := range ls.layerMap {
  304. layers[k] = v
  305. }
  306. return layers
  307. }
  308. func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error {
  309. err := ls.driver.Remove(layer.cacheID)
  310. if err != nil {
  311. return err
  312. }
  313. err = ls.store.Remove(layer.chainID)
  314. if err != nil {
  315. return err
  316. }
  317. metadata.DiffID = layer.diffID
  318. metadata.ChainID = layer.chainID
  319. metadata.Size, err = layer.Size()
  320. if err != nil {
  321. return err
  322. }
  323. metadata.DiffSize = layer.size
  324. return nil
  325. }
  326. func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
  327. depth := 0
  328. removed := []Metadata{}
  329. for {
  330. if l.referenceCount == 0 {
  331. panic("layer not retained")
  332. }
  333. l.referenceCount--
  334. if l.referenceCount != 0 {
  335. return removed, nil
  336. }
  337. if len(removed) == 0 && depth > 0 {
  338. panic("cannot remove layer with child")
  339. }
  340. if l.hasReferences() {
  341. panic("cannot delete referenced layer")
  342. }
  343. var metadata Metadata
  344. if err := ls.deleteLayer(l, &metadata); err != nil {
  345. return nil, err
  346. }
  347. delete(ls.layerMap, l.chainID)
  348. removed = append(removed, metadata)
  349. if l.parent == nil {
  350. return removed, nil
  351. }
  352. depth++
  353. l = l.parent
  354. }
  355. }
  356. func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
  357. ls.layerL.Lock()
  358. defer ls.layerL.Unlock()
  359. layer, ok := ls.layerMap[l.ChainID()]
  360. if !ok {
  361. return []Metadata{}, nil
  362. }
  363. if !layer.hasReference(l) {
  364. return nil, ErrLayerNotRetained
  365. }
  366. layer.deleteReference(l)
  367. return ls.releaseLayer(layer)
  368. }
  369. func (ls *layerStore) CreateRWLayer(name string, parent ChainID, opts *CreateRWLayerOpts) (RWLayer, error) {
  370. var (
  371. storageOpt map[string]string
  372. initFunc MountInit
  373. mountLabel string
  374. )
  375. if opts != nil {
  376. mountLabel = opts.MountLabel
  377. storageOpt = opts.StorageOpt
  378. initFunc = opts.InitFunc
  379. }
  380. ls.mountL.Lock()
  381. defer ls.mountL.Unlock()
  382. m, ok := ls.mounts[name]
  383. if ok {
  384. return nil, ErrMountNameConflict
  385. }
  386. var err error
  387. var pid string
  388. var p *roLayer
  389. if string(parent) != "" {
  390. p = ls.get(parent)
  391. if p == nil {
  392. return nil, ErrLayerDoesNotExist
  393. }
  394. pid = p.cacheID
  395. // Release parent chain if error
  396. defer func() {
  397. if err != nil {
  398. ls.layerL.Lock()
  399. ls.releaseLayer(p)
  400. ls.layerL.Unlock()
  401. }
  402. }()
  403. }
  404. m = &mountedLayer{
  405. name: name,
  406. parent: p,
  407. mountID: ls.mountID(name),
  408. layerStore: ls,
  409. references: map[RWLayer]*referencedRWLayer{},
  410. }
  411. if initFunc != nil {
  412. pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt)
  413. if err != nil {
  414. return nil, err
  415. }
  416. m.initID = pid
  417. }
  418. createOpts := &graphdriver.CreateOpts{
  419. StorageOpt: storageOpt,
  420. }
  421. if err = ls.driver.CreateReadWrite(m.mountID, pid, createOpts); err != nil {
  422. return nil, err
  423. }
  424. if err = ls.saveMount(m); err != nil {
  425. return nil, err
  426. }
  427. return m.getReference(), nil
  428. }
  429. func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) {
  430. ls.mountL.Lock()
  431. defer ls.mountL.Unlock()
  432. mount, ok := ls.mounts[id]
  433. if !ok {
  434. return nil, ErrMountDoesNotExist
  435. }
  436. return mount.getReference(), nil
  437. }
  438. func (ls *layerStore) GetMountID(id string) (string, error) {
  439. ls.mountL.Lock()
  440. defer ls.mountL.Unlock()
  441. mount, ok := ls.mounts[id]
  442. if !ok {
  443. return "", ErrMountDoesNotExist
  444. }
  445. logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID)
  446. return mount.mountID, nil
  447. }
  448. func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) {
  449. ls.mountL.Lock()
  450. defer ls.mountL.Unlock()
  451. m, ok := ls.mounts[l.Name()]
  452. if !ok {
  453. return []Metadata{}, nil
  454. }
  455. if err := m.deleteReference(l); err != nil {
  456. return nil, err
  457. }
  458. if m.hasReferences() {
  459. return []Metadata{}, nil
  460. }
  461. if err := ls.driver.Remove(m.mountID); err != nil {
  462. logrus.Errorf("Error removing mounted layer %s: %s", m.name, err)
  463. m.retakeReference(l)
  464. return nil, err
  465. }
  466. if m.initID != "" {
  467. if err := ls.driver.Remove(m.initID); err != nil {
  468. logrus.Errorf("Error removing init layer %s: %s", m.name, err)
  469. m.retakeReference(l)
  470. return nil, err
  471. }
  472. }
  473. if err := ls.store.RemoveMount(m.name); err != nil {
  474. logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err)
  475. m.retakeReference(l)
  476. return nil, err
  477. }
  478. delete(ls.mounts, m.Name())
  479. ls.layerL.Lock()
  480. defer ls.layerL.Unlock()
  481. if m.parent != nil {
  482. return ls.releaseLayer(m.parent)
  483. }
  484. return []Metadata{}, nil
  485. }
  486. func (ls *layerStore) saveMount(mount *mountedLayer) error {
  487. if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
  488. return err
  489. }
  490. if mount.initID != "" {
  491. if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
  492. return err
  493. }
  494. }
  495. if mount.parent != nil {
  496. if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
  497. return err
  498. }
  499. }
  500. ls.mounts[mount.name] = mount
  501. return nil
  502. }
  503. func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) {
  504. // Use "<graph-id>-init" to maintain compatibility with graph drivers
  505. // which are expecting this layer with this special name. If all
  506. // graph drivers can be updated to not rely on knowing about this layer
  507. // then the initID should be randomly generated.
  508. initID := fmt.Sprintf("%s-init", graphID)
  509. createOpts := &graphdriver.CreateOpts{
  510. MountLabel: mountLabel,
  511. StorageOpt: storageOpt,
  512. }
  513. if err := ls.driver.CreateReadWrite(initID, parent, createOpts); err != nil {
  514. return "", err
  515. }
  516. p, err := ls.driver.Get(initID, "")
  517. if err != nil {
  518. return "", err
  519. }
  520. if err := initFunc(p); err != nil {
  521. ls.driver.Put(initID)
  522. return "", err
  523. }
  524. if err := ls.driver.Put(initID); err != nil {
  525. return "", err
  526. }
  527. return initID, nil
  528. }
  529. func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
  530. diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver)
  531. if !ok {
  532. diffDriver = &naiveDiffPathDriver{ls.driver}
  533. }
  534. defer metadata.Close()
  535. // get our relative path to the container
  536. fileGetCloser, err := diffDriver.DiffGetter(graphID)
  537. if err != nil {
  538. return err
  539. }
  540. defer fileGetCloser.Close()
  541. metaUnpacker := storage.NewJSONUnpacker(metadata)
  542. upackerCounter := &unpackSizeCounter{metaUnpacker, size}
  543. logrus.Debugf("Assembling tar data for %s", graphID)
  544. return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w)
  545. }
  546. func (ls *layerStore) Cleanup() error {
  547. return ls.driver.Cleanup()
  548. }
  549. func (ls *layerStore) DriverStatus() [][2]string {
  550. return ls.driver.Status()
  551. }
  552. func (ls *layerStore) DriverName() string {
  553. return ls.driver.String()
  554. }
  555. type naiveDiffPathDriver struct {
  556. graphdriver.Driver
  557. }
  558. type fileGetPutter struct {
  559. storage.FileGetter
  560. driver graphdriver.Driver
  561. id string
  562. }
  563. func (w *fileGetPutter) Close() error {
  564. return w.driver.Put(w.id)
  565. }
  566. func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) {
  567. p, err := n.Driver.Get(id, "")
  568. if err != nil {
  569. return nil, err
  570. }
  571. return &fileGetPutter{storage.NewPathFileGetter(p), n.Driver, id}, nil
  572. }