layer_store.go 15 KB


  1. package layer
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/distribution"
  10. "github.com/docker/distribution/digest"
  11. "github.com/docker/docker/daemon/graphdriver"
  12. "github.com/docker/docker/pkg/idtools"
  13. "github.com/docker/docker/pkg/plugingetter"
  14. "github.com/docker/docker/pkg/stringid"
  15. "github.com/vbatts/tar-split/tar/asm"
  16. "github.com/vbatts/tar-split/tar/storage"
  17. )
  18. // maxLayerDepth represents the maximum number of
  19. // layers which can be chained together. 125 was
  20. // chosen to account for the 127 max in some
  21. // graphdrivers plus the 2 additional layers
  22. // used to create a rwlayer.
  23. const maxLayerDepth = 125
  24. type layerStore struct {
  25. store MetadataStore
  26. driver graphdriver.Driver
  27. layerMap map[ChainID]*roLayer
  28. layerL sync.Mutex
  29. mounts map[string]*mountedLayer
  30. mountL sync.Mutex
  31. }
  32. // StoreOptions are the options used to create a new Store instance
  33. type StoreOptions struct {
  34. StorePath string
  35. MetadataStorePathTemplate string
  36. GraphDriver string
  37. GraphDriverOptions []string
  38. UIDMaps []idtools.IDMap
  39. GIDMaps []idtools.IDMap
  40. PluginGetter plugingetter.PluginGetter
  41. }
  42. // NewStoreFromOptions creates a new Store instance
  43. func NewStoreFromOptions(options StoreOptions) (Store, error) {
  44. driver, err := graphdriver.New(
  45. options.StorePath,
  46. options.GraphDriver,
  47. options.GraphDriverOptions,
  48. options.UIDMaps,
  49. options.GIDMaps,
  50. options.PluginGetter)
  51. if err != nil {
  52. return nil, fmt.Errorf("error initializing graphdriver: %v", err)
  53. }
  54. logrus.Debugf("Using graph driver %s", driver)
  55. fms, err := NewFSMetadataStore(fmt.Sprintf(options.MetadataStorePathTemplate, driver))
  56. if err != nil {
  57. return nil, err
  58. }
  59. return NewStoreFromGraphDriver(fms, driver)
  60. }
  61. // NewStoreFromGraphDriver creates a new Store instance using the provided
  62. // metadata store and graph driver. The metadata store will be used to restore
  63. // the Store.
  64. func NewStoreFromGraphDriver(store MetadataStore, driver graphdriver.Driver) (Store, error) {
  65. ls := &layerStore{
  66. store: store,
  67. driver: driver,
  68. layerMap: map[ChainID]*roLayer{},
  69. mounts: map[string]*mountedLayer{},
  70. }
  71. ids, mounts, err := store.List()
  72. if err != nil {
  73. return nil, err
  74. }
  75. for _, id := range ids {
  76. l, err := ls.loadLayer(id)
  77. if err != nil {
  78. logrus.Debugf("Failed to load layer %s: %s", id, err)
  79. continue
  80. }
  81. if l.parent != nil {
  82. l.parent.referenceCount++
  83. }
  84. }
  85. for _, mount := range mounts {
  86. if err := ls.loadMount(mount); err != nil {
  87. logrus.Debugf("Failed to load mount %s: %s", mount, err)
  88. }
  89. }
  90. return ls, nil
  91. }
  92. func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
  93. cl, ok := ls.layerMap[layer]
  94. if ok {
  95. return cl, nil
  96. }
  97. diff, err := ls.store.GetDiffID(layer)
  98. if err != nil {
  99. return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err)
  100. }
  101. size, err := ls.store.GetSize(layer)
  102. if err != nil {
  103. return nil, fmt.Errorf("failed to get size for %s: %s", layer, err)
  104. }
  105. cacheID, err := ls.store.GetCacheID(layer)
  106. if err != nil {
  107. return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err)
  108. }
  109. parent, err := ls.store.GetParent(layer)
  110. if err != nil {
  111. return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err)
  112. }
  113. descriptor, err := ls.store.GetDescriptor(layer)
  114. if err != nil {
  115. return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
  116. }
  117. cl = &roLayer{
  118. chainID: layer,
  119. diffID: diff,
  120. size: size,
  121. cacheID: cacheID,
  122. layerStore: ls,
  123. references: map[Layer]struct{}{},
  124. descriptor: descriptor,
  125. }
  126. if parent != "" {
  127. p, err := ls.loadLayer(parent)
  128. if err != nil {
  129. return nil, err
  130. }
  131. cl.parent = p
  132. }
  133. ls.layerMap[cl.chainID] = cl
  134. return cl, nil
  135. }
  136. func (ls *layerStore) loadMount(mount string) error {
  137. if _, ok := ls.mounts[mount]; ok {
  138. return nil
  139. }
  140. mountID, err := ls.store.GetMountID(mount)
  141. if err != nil {
  142. return err
  143. }
  144. initID, err := ls.store.GetInitID(mount)
  145. if err != nil {
  146. return err
  147. }
  148. parent, err := ls.store.GetMountParent(mount)
  149. if err != nil {
  150. return err
  151. }
  152. ml := &mountedLayer{
  153. name: mount,
  154. mountID: mountID,
  155. initID: initID,
  156. layerStore: ls,
  157. references: map[RWLayer]*referencedRWLayer{},
  158. }
  159. if parent != "" {
  160. p, err := ls.loadLayer(parent)
  161. if err != nil {
  162. return err
  163. }
  164. ml.parent = p
  165. p.referenceCount++
  166. }
  167. ls.mounts[ml.name] = ml
  168. return nil
  169. }
  170. func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent string, layer *roLayer) error {
  171. digester := digest.Canonical.New()
  172. tr := io.TeeReader(ts, digester.Hash())
  173. tsw, err := tx.TarSplitWriter(true)
  174. if err != nil {
  175. return err
  176. }
  177. metaPacker := storage.NewJSONPacker(tsw)
  178. defer tsw.Close()
  179. // we're passing nil here for the file putter, because the ApplyDiff will
  180. // handle the extraction of the archive
  181. rdr, err := asm.NewInputTarStream(tr, metaPacker, nil)
  182. if err != nil {
  183. return err
  184. }
  185. applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, rdr)
  186. if err != nil {
  187. return err
  188. }
  189. // Discard trailing data but ensure metadata is picked up to reconstruct stream
  190. io.Copy(ioutil.Discard, rdr) // ignore error as reader may be closed
  191. layer.size = applySize
  192. layer.diffID = DiffID(digester.Digest())
  193. logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
  194. return nil
  195. }
  196. func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
  197. return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{})
  198. }
  199. func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) {
  200. // err is used to hold the error which will always trigger
  201. // cleanup of creates sources but may not be an error returned
  202. // to the caller (already exists).
  203. var err error
  204. var pid string
  205. var p *roLayer
  206. if string(parent) != "" {
  207. p = ls.get(parent)
  208. if p == nil {
  209. return nil, ErrLayerDoesNotExist
  210. }
  211. pid = p.cacheID
  212. // Release parent chain if error
  213. defer func() {
  214. if err != nil {
  215. ls.layerL.Lock()
  216. ls.releaseLayer(p)
  217. ls.layerL.Unlock()
  218. }
  219. }()
  220. if p.depth() >= maxLayerDepth {
  221. err = ErrMaxDepthExceeded
  222. return nil, err
  223. }
  224. }
  225. // Create new roLayer
  226. layer := &roLayer{
  227. parent: p,
  228. cacheID: stringid.GenerateRandomID(),
  229. referenceCount: 1,
  230. layerStore: ls,
  231. references: map[Layer]struct{}{},
  232. descriptor: descriptor,
  233. }
  234. if err = ls.driver.Create(layer.cacheID, pid, nil); err != nil {
  235. return nil, err
  236. }
  237. tx, err := ls.store.StartTransaction()
  238. if err != nil {
  239. return nil, err
  240. }
  241. defer func() {
  242. if err != nil {
  243. logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err)
  244. if err := ls.driver.Remove(layer.cacheID); err != nil {
  245. logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
  246. }
  247. if err := tx.Cancel(); err != nil {
  248. logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
  249. }
  250. }
  251. }()
  252. if err = ls.applyTar(tx, ts, pid, layer); err != nil {
  253. return nil, err
  254. }
  255. if layer.parent == nil {
  256. layer.chainID = ChainID(layer.diffID)
  257. } else {
  258. layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
  259. }
  260. if err = storeLayer(tx, layer); err != nil {
  261. return nil, err
  262. }
  263. ls.layerL.Lock()
  264. defer ls.layerL.Unlock()
  265. if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
  266. // Set error for cleanup, but do not return the error
  267. err = errors.New("layer already exists")
  268. return existingLayer.getReference(), nil
  269. }
  270. if err = tx.Commit(layer.chainID); err != nil {
  271. return nil, err
  272. }
  273. ls.layerMap[layer.chainID] = layer
  274. return layer.getReference(), nil
  275. }
  276. func (ls *layerStore) getWithoutLock(layer ChainID) *roLayer {
  277. l, ok := ls.layerMap[layer]
  278. if !ok {
  279. return nil
  280. }
  281. l.referenceCount++
  282. return l
  283. }
  284. func (ls *layerStore) get(l ChainID) *roLayer {
  285. ls.layerL.Lock()
  286. defer ls.layerL.Unlock()
  287. return ls.getWithoutLock(l)
  288. }
  289. func (ls *layerStore) Get(l ChainID) (Layer, error) {
  290. ls.layerL.Lock()
  291. defer ls.layerL.Unlock()
  292. layer := ls.getWithoutLock(l)
  293. if layer == nil {
  294. return nil, ErrLayerDoesNotExist
  295. }
  296. return layer.getReference(), nil
  297. }
  298. func (ls *layerStore) Map() map[ChainID]Layer {
  299. ls.layerL.Lock()
  300. defer ls.layerL.Unlock()
  301. layers := map[ChainID]Layer{}
  302. for k, v := range ls.layerMap {
  303. layers[k] = v
  304. }
  305. return layers
  306. }
  307. func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error {
  308. err := ls.driver.Remove(layer.cacheID)
  309. if err != nil {
  310. return err
  311. }
  312. err = ls.store.Remove(layer.chainID)
  313. if err != nil {
  314. return err
  315. }
  316. metadata.DiffID = layer.diffID
  317. metadata.ChainID = layer.chainID
  318. metadata.Size, err = layer.Size()
  319. if err != nil {
  320. return err
  321. }
  322. metadata.DiffSize = layer.size
  323. return nil
  324. }
  325. func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
  326. depth := 0
  327. removed := []Metadata{}
  328. for {
  329. if l.referenceCount == 0 {
  330. panic("layer not retained")
  331. }
  332. l.referenceCount--
  333. if l.referenceCount != 0 {
  334. return removed, nil
  335. }
  336. if len(removed) == 0 && depth > 0 {
  337. panic("cannot remove layer with child")
  338. }
  339. if l.hasReferences() {
  340. panic("cannot delete referenced layer")
  341. }
  342. var metadata Metadata
  343. if err := ls.deleteLayer(l, &metadata); err != nil {
  344. return nil, err
  345. }
  346. delete(ls.layerMap, l.chainID)
  347. removed = append(removed, metadata)
  348. if l.parent == nil {
  349. return removed, nil
  350. }
  351. depth++
  352. l = l.parent
  353. }
  354. }
  355. func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
  356. ls.layerL.Lock()
  357. defer ls.layerL.Unlock()
  358. layer, ok := ls.layerMap[l.ChainID()]
  359. if !ok {
  360. return []Metadata{}, nil
  361. }
  362. if !layer.hasReference(l) {
  363. return nil, ErrLayerNotRetained
  364. }
  365. layer.deleteReference(l)
  366. return ls.releaseLayer(layer)
  367. }
  368. func (ls *layerStore) CreateRWLayer(name string, parent ChainID, opts *CreateRWLayerOpts) (RWLayer, error) {
  369. var (
  370. storageOpt map[string]string
  371. initFunc MountInit
  372. mountLabel string
  373. )
  374. if opts != nil {
  375. mountLabel = opts.MountLabel
  376. storageOpt = opts.StorageOpt
  377. initFunc = opts.InitFunc
  378. }
  379. ls.mountL.Lock()
  380. defer ls.mountL.Unlock()
  381. m, ok := ls.mounts[name]
  382. if ok {
  383. return nil, ErrMountNameConflict
  384. }
  385. var err error
  386. var pid string
  387. var p *roLayer
  388. if string(parent) != "" {
  389. p = ls.get(parent)
  390. if p == nil {
  391. return nil, ErrLayerDoesNotExist
  392. }
  393. pid = p.cacheID
  394. // Release parent chain if error
  395. defer func() {
  396. if err != nil {
  397. ls.layerL.Lock()
  398. ls.releaseLayer(p)
  399. ls.layerL.Unlock()
  400. }
  401. }()
  402. }
  403. m = &mountedLayer{
  404. name: name,
  405. parent: p,
  406. mountID: ls.mountID(name),
  407. layerStore: ls,
  408. references: map[RWLayer]*referencedRWLayer{},
  409. }
  410. if initFunc != nil {
  411. pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt)
  412. if err != nil {
  413. return nil, err
  414. }
  415. m.initID = pid
  416. }
  417. createOpts := &graphdriver.CreateOpts{
  418. StorageOpt: storageOpt,
  419. }
  420. if err = ls.driver.CreateReadWrite(m.mountID, pid, createOpts); err != nil {
  421. return nil, err
  422. }
  423. if err = ls.saveMount(m); err != nil {
  424. return nil, err
  425. }
  426. return m.getReference(), nil
  427. }
  428. func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) {
  429. ls.mountL.Lock()
  430. defer ls.mountL.Unlock()
  431. mount, ok := ls.mounts[id]
  432. if !ok {
  433. return nil, ErrMountDoesNotExist
  434. }
  435. return mount.getReference(), nil
  436. }
  437. func (ls *layerStore) GetMountID(id string) (string, error) {
  438. ls.mountL.Lock()
  439. defer ls.mountL.Unlock()
  440. mount, ok := ls.mounts[id]
  441. if !ok {
  442. return "", ErrMountDoesNotExist
  443. }
  444. logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID)
  445. return mount.mountID, nil
  446. }
  447. func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) {
  448. ls.mountL.Lock()
  449. defer ls.mountL.Unlock()
  450. m, ok := ls.mounts[l.Name()]
  451. if !ok {
  452. return []Metadata{}, nil
  453. }
  454. if err := m.deleteReference(l); err != nil {
  455. return nil, err
  456. }
  457. if m.hasReferences() {
  458. return []Metadata{}, nil
  459. }
  460. if err := ls.driver.Remove(m.mountID); err != nil {
  461. logrus.Errorf("Error removing mounted layer %s: %s", m.name, err)
  462. m.retakeReference(l)
  463. return nil, err
  464. }
  465. if m.initID != "" {
  466. if err := ls.driver.Remove(m.initID); err != nil {
  467. logrus.Errorf("Error removing init layer %s: %s", m.name, err)
  468. m.retakeReference(l)
  469. return nil, err
  470. }
  471. }
  472. if err := ls.store.RemoveMount(m.name); err != nil {
  473. logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err)
  474. m.retakeReference(l)
  475. return nil, err
  476. }
  477. delete(ls.mounts, m.Name())
  478. ls.layerL.Lock()
  479. defer ls.layerL.Unlock()
  480. if m.parent != nil {
  481. return ls.releaseLayer(m.parent)
  482. }
  483. return []Metadata{}, nil
  484. }
  485. func (ls *layerStore) saveMount(mount *mountedLayer) error {
  486. if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
  487. return err
  488. }
  489. if mount.initID != "" {
  490. if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
  491. return err
  492. }
  493. }
  494. if mount.parent != nil {
  495. if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
  496. return err
  497. }
  498. }
  499. ls.mounts[mount.name] = mount
  500. return nil
  501. }
  502. func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) {
  503. // Use "<graph-id>-init" to maintain compatibility with graph drivers
  504. // which are expecting this layer with this special name. If all
  505. // graph drivers can be updated to not rely on knowing about this layer
  506. // then the initID should be randomly generated.
  507. initID := fmt.Sprintf("%s-init", graphID)
  508. createOpts := &graphdriver.CreateOpts{
  509. MountLabel: mountLabel,
  510. StorageOpt: storageOpt,
  511. }
  512. if err := ls.driver.CreateReadWrite(initID, parent, createOpts); err != nil {
  513. return "", err
  514. }
  515. p, err := ls.driver.Get(initID, "")
  516. if err != nil {
  517. return "", err
  518. }
  519. if err := initFunc(p); err != nil {
  520. ls.driver.Put(initID)
  521. return "", err
  522. }
  523. if err := ls.driver.Put(initID); err != nil {
  524. return "", err
  525. }
  526. return initID, nil
  527. }
  528. func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
  529. diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver)
  530. if !ok {
  531. diffDriver = &naiveDiffPathDriver{ls.driver}
  532. }
  533. defer metadata.Close()
  534. // get our relative path to the container
  535. fileGetCloser, err := diffDriver.DiffGetter(graphID)
  536. if err != nil {
  537. return err
  538. }
  539. defer fileGetCloser.Close()
  540. metaUnpacker := storage.NewJSONUnpacker(metadata)
  541. upackerCounter := &unpackSizeCounter{metaUnpacker, size}
  542. logrus.Debugf("Assembling tar data for %s", graphID)
  543. return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w)
  544. }
  545. func (ls *layerStore) Cleanup() error {
  546. return ls.driver.Cleanup()
  547. }
  548. func (ls *layerStore) DriverStatus() [][2]string {
  549. return ls.driver.Status()
  550. }
  551. func (ls *layerStore) DriverName() string {
  552. return ls.driver.String()
  553. }
  554. type naiveDiffPathDriver struct {
  555. graphdriver.Driver
  556. }
  557. type fileGetPutter struct {
  558. storage.FileGetter
  559. driver graphdriver.Driver
  560. id string
  561. }
  562. func (w *fileGetPutter) Close() error {
  563. return w.driver.Put(w.id)
  564. }
  565. func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) {
  566. p, err := n.Driver.Get(id, "")
  567. if err != nil {
  568. return nil, err
  569. }
  570. return &fileGetPutter{storage.NewPathFileGetter(p), n.Driver, id}, nil
  571. }