layer_store.go 14 KB


  1. package layer
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/distribution"
  10. "github.com/docker/distribution/digest"
  11. "github.com/docker/docker/daemon/graphdriver"
  12. "github.com/docker/docker/pkg/archive"
  13. "github.com/docker/docker/pkg/idtools"
  14. "github.com/docker/docker/pkg/stringid"
  15. "github.com/vbatts/tar-split/tar/asm"
  16. "github.com/vbatts/tar-split/tar/storage"
  17. )
  18. // maxLayerDepth represents the maximum number of
  19. // layers which can be chained together. 125 was
  20. // chosen to account for the 127 max in some
  21. // graphdrivers plus the 2 additional layers
  22. // used to create a rwlayer.
  23. const maxLayerDepth = 125
  24. type layerStore struct {
  25. store MetadataStore
  26. driver graphdriver.Driver
  27. layerMap map[ChainID]*roLayer
  28. layerL sync.Mutex
  29. mounts map[string]*mountedLayer
  30. mountL sync.Mutex
  31. }
  32. // StoreOptions are the options used to create a new Store instance
  33. type StoreOptions struct {
  34. StorePath string
  35. MetadataStorePathTemplate string
  36. GraphDriver string
  37. GraphDriverOptions []string
  38. UIDMaps []idtools.IDMap
  39. GIDMaps []idtools.IDMap
  40. }
  41. // NewStoreFromOptions creates a new Store instance
  42. func NewStoreFromOptions(options StoreOptions) (Store, error) {
  43. driver, err := graphdriver.New(
  44. options.StorePath,
  45. options.GraphDriver,
  46. options.GraphDriverOptions,
  47. options.UIDMaps,
  48. options.GIDMaps)
  49. if err != nil {
  50. return nil, fmt.Errorf("error initializing graphdriver: %v", err)
  51. }
  52. logrus.Debugf("Using graph driver %s", driver)
  53. fms, err := NewFSMetadataStore(fmt.Sprintf(options.MetadataStorePathTemplate, driver))
  54. if err != nil {
  55. return nil, err
  56. }
  57. return NewStoreFromGraphDriver(fms, driver)
  58. }
  59. // NewStoreFromGraphDriver creates a new Store instance using the provided
  60. // metadata store and graph driver. The metadata store will be used to restore
  61. // the Store.
  62. func NewStoreFromGraphDriver(store MetadataStore, driver graphdriver.Driver) (Store, error) {
  63. ls := &layerStore{
  64. store: store,
  65. driver: driver,
  66. layerMap: map[ChainID]*roLayer{},
  67. mounts: map[string]*mountedLayer{},
  68. }
  69. ids, mounts, err := store.List()
  70. if err != nil {
  71. return nil, err
  72. }
  73. for _, id := range ids {
  74. l, err := ls.loadLayer(id)
  75. if err != nil {
  76. logrus.Debugf("Failed to load layer %s: %s", id, err)
  77. continue
  78. }
  79. if l.parent != nil {
  80. l.parent.referenceCount++
  81. }
  82. }
  83. for _, mount := range mounts {
  84. if err := ls.loadMount(mount); err != nil {
  85. logrus.Debugf("Failed to load mount %s: %s", mount, err)
  86. }
  87. }
  88. return ls, nil
  89. }
  90. func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
  91. cl, ok := ls.layerMap[layer]
  92. if ok {
  93. return cl, nil
  94. }
  95. diff, err := ls.store.GetDiffID(layer)
  96. if err != nil {
  97. return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err)
  98. }
  99. size, err := ls.store.GetSize(layer)
  100. if err != nil {
  101. return nil, fmt.Errorf("failed to get size for %s: %s", layer, err)
  102. }
  103. cacheID, err := ls.store.GetCacheID(layer)
  104. if err != nil {
  105. return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err)
  106. }
  107. parent, err := ls.store.GetParent(layer)
  108. if err != nil {
  109. return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err)
  110. }
  111. descriptor, err := ls.store.GetDescriptor(layer)
  112. if err != nil {
  113. return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
  114. }
  115. cl = &roLayer{
  116. chainID: layer,
  117. diffID: diff,
  118. size: size,
  119. cacheID: cacheID,
  120. layerStore: ls,
  121. references: map[Layer]struct{}{},
  122. descriptor: descriptor,
  123. }
  124. if parent != "" {
  125. p, err := ls.loadLayer(parent)
  126. if err != nil {
  127. return nil, err
  128. }
  129. cl.parent = p
  130. }
  131. ls.layerMap[cl.chainID] = cl
  132. return cl, nil
  133. }
  134. func (ls *layerStore) loadMount(mount string) error {
  135. if _, ok := ls.mounts[mount]; ok {
  136. return nil
  137. }
  138. mountID, err := ls.store.GetMountID(mount)
  139. if err != nil {
  140. return err
  141. }
  142. initID, err := ls.store.GetInitID(mount)
  143. if err != nil {
  144. return err
  145. }
  146. parent, err := ls.store.GetMountParent(mount)
  147. if err != nil {
  148. return err
  149. }
  150. ml := &mountedLayer{
  151. name: mount,
  152. mountID: mountID,
  153. initID: initID,
  154. layerStore: ls,
  155. references: map[RWLayer]*referencedRWLayer{},
  156. }
  157. if parent != "" {
  158. p, err := ls.loadLayer(parent)
  159. if err != nil {
  160. return err
  161. }
  162. ml.parent = p
  163. p.referenceCount++
  164. }
  165. ls.mounts[ml.name] = ml
  166. return nil
  167. }
  168. func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent string, layer *roLayer) error {
  169. digester := digest.Canonical.New()
  170. tr := io.TeeReader(ts, digester.Hash())
  171. tsw, err := tx.TarSplitWriter(true)
  172. if err != nil {
  173. return err
  174. }
  175. metaPacker := storage.NewJSONPacker(tsw)
  176. defer tsw.Close()
  177. // we're passing nil here for the file putter, because the ApplyDiff will
  178. // handle the extraction of the archive
  179. rdr, err := asm.NewInputTarStream(tr, metaPacker, nil)
  180. if err != nil {
  181. return err
  182. }
  183. applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, archive.Reader(rdr))
  184. if err != nil {
  185. return err
  186. }
  187. // Discard trailing data but ensure metadata is picked up to reconstruct stream
  188. io.Copy(ioutil.Discard, rdr) // ignore error as reader may be closed
  189. layer.size = applySize
  190. layer.diffID = DiffID(digester.Digest())
  191. logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
  192. return nil
  193. }
  194. func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
  195. return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{})
  196. }
  197. func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) {
  198. // err is used to hold the error which will always trigger
  199. // cleanup of creates sources but may not be an error returned
  200. // to the caller (already exists).
  201. var err error
  202. var pid string
  203. var p *roLayer
  204. if string(parent) != "" {
  205. p = ls.get(parent)
  206. if p == nil {
  207. return nil, ErrLayerDoesNotExist
  208. }
  209. pid = p.cacheID
  210. // Release parent chain if error
  211. defer func() {
  212. if err != nil {
  213. ls.layerL.Lock()
  214. ls.releaseLayer(p)
  215. ls.layerL.Unlock()
  216. }
  217. }()
  218. if p.depth() >= maxLayerDepth {
  219. err = ErrMaxDepthExceeded
  220. return nil, err
  221. }
  222. }
  223. // Create new roLayer
  224. layer := &roLayer{
  225. parent: p,
  226. cacheID: stringid.GenerateRandomID(),
  227. referenceCount: 1,
  228. layerStore: ls,
  229. references: map[Layer]struct{}{},
  230. descriptor: descriptor,
  231. }
  232. if err = ls.driver.Create(layer.cacheID, pid, "", nil); err != nil {
  233. return nil, err
  234. }
  235. tx, err := ls.store.StartTransaction()
  236. if err != nil {
  237. return nil, err
  238. }
  239. defer func() {
  240. if err != nil {
  241. logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err)
  242. if err := ls.driver.Remove(layer.cacheID); err != nil {
  243. logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
  244. }
  245. if err := tx.Cancel(); err != nil {
  246. logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
  247. }
  248. }
  249. }()
  250. if err = ls.applyTar(tx, ts, pid, layer); err != nil {
  251. return nil, err
  252. }
  253. if layer.parent == nil {
  254. layer.chainID = ChainID(layer.diffID)
  255. } else {
  256. layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
  257. }
  258. if err = storeLayer(tx, layer); err != nil {
  259. return nil, err
  260. }
  261. ls.layerL.Lock()
  262. defer ls.layerL.Unlock()
  263. if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
  264. // Set error for cleanup, but do not return the error
  265. err = errors.New("layer already exists")
  266. return existingLayer.getReference(), nil
  267. }
  268. if err = tx.Commit(layer.chainID); err != nil {
  269. return nil, err
  270. }
  271. ls.layerMap[layer.chainID] = layer
  272. return layer.getReference(), nil
  273. }
  274. func (ls *layerStore) getWithoutLock(layer ChainID) *roLayer {
  275. l, ok := ls.layerMap[layer]
  276. if !ok {
  277. return nil
  278. }
  279. l.referenceCount++
  280. return l
  281. }
  282. func (ls *layerStore) get(l ChainID) *roLayer {
  283. ls.layerL.Lock()
  284. defer ls.layerL.Unlock()
  285. return ls.getWithoutLock(l)
  286. }
  287. func (ls *layerStore) Get(l ChainID) (Layer, error) {
  288. ls.layerL.Lock()
  289. defer ls.layerL.Unlock()
  290. layer := ls.getWithoutLock(l)
  291. if layer == nil {
  292. return nil, ErrLayerDoesNotExist
  293. }
  294. return layer.getReference(), nil
  295. }
  296. func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error {
  297. err := ls.driver.Remove(layer.cacheID)
  298. if err != nil {
  299. return err
  300. }
  301. err = ls.store.Remove(layer.chainID)
  302. if err != nil {
  303. return err
  304. }
  305. metadata.DiffID = layer.diffID
  306. metadata.ChainID = layer.chainID
  307. metadata.Size, err = layer.Size()
  308. if err != nil {
  309. return err
  310. }
  311. metadata.DiffSize = layer.size
  312. return nil
  313. }
  314. func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
  315. depth := 0
  316. removed := []Metadata{}
  317. for {
  318. if l.referenceCount == 0 {
  319. panic("layer not retained")
  320. }
  321. l.referenceCount--
  322. if l.referenceCount != 0 {
  323. return removed, nil
  324. }
  325. if len(removed) == 0 && depth > 0 {
  326. panic("cannot remove layer with child")
  327. }
  328. if l.hasReferences() {
  329. panic("cannot delete referenced layer")
  330. }
  331. var metadata Metadata
  332. if err := ls.deleteLayer(l, &metadata); err != nil {
  333. return nil, err
  334. }
  335. delete(ls.layerMap, l.chainID)
  336. removed = append(removed, metadata)
  337. if l.parent == nil {
  338. return removed, nil
  339. }
  340. depth++
  341. l = l.parent
  342. }
  343. }
  344. func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
  345. ls.layerL.Lock()
  346. defer ls.layerL.Unlock()
  347. layer, ok := ls.layerMap[l.ChainID()]
  348. if !ok {
  349. return []Metadata{}, nil
  350. }
  351. if !layer.hasReference(l) {
  352. return nil, ErrLayerNotRetained
  353. }
  354. layer.deleteReference(l)
  355. return ls.releaseLayer(layer)
  356. }
  357. func (ls *layerStore) CreateRWLayer(name string, parent ChainID, mountLabel string, initFunc MountInit, storageOpt map[string]string) (RWLayer, error) {
  358. ls.mountL.Lock()
  359. defer ls.mountL.Unlock()
  360. m, ok := ls.mounts[name]
  361. if ok {
  362. return nil, ErrMountNameConflict
  363. }
  364. var err error
  365. var pid string
  366. var p *roLayer
  367. if string(parent) != "" {
  368. p = ls.get(parent)
  369. if p == nil {
  370. return nil, ErrLayerDoesNotExist
  371. }
  372. pid = p.cacheID
  373. // Release parent chain if error
  374. defer func() {
  375. if err != nil {
  376. ls.layerL.Lock()
  377. ls.releaseLayer(p)
  378. ls.layerL.Unlock()
  379. }
  380. }()
  381. }
  382. m = &mountedLayer{
  383. name: name,
  384. parent: p,
  385. mountID: ls.mountID(name),
  386. layerStore: ls,
  387. references: map[RWLayer]*referencedRWLayer{},
  388. }
  389. if initFunc != nil {
  390. pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt)
  391. if err != nil {
  392. return nil, err
  393. }
  394. m.initID = pid
  395. }
  396. if err = ls.driver.CreateReadWrite(m.mountID, pid, "", storageOpt); err != nil {
  397. return nil, err
  398. }
  399. if err = ls.saveMount(m); err != nil {
  400. return nil, err
  401. }
  402. return m.getReference(), nil
  403. }
  404. func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) {
  405. ls.mountL.Lock()
  406. defer ls.mountL.Unlock()
  407. mount, ok := ls.mounts[id]
  408. if !ok {
  409. return nil, ErrMountDoesNotExist
  410. }
  411. return mount.getReference(), nil
  412. }
  413. func (ls *layerStore) GetMountID(id string) (string, error) {
  414. ls.mountL.Lock()
  415. defer ls.mountL.Unlock()
  416. mount, ok := ls.mounts[id]
  417. if !ok {
  418. return "", ErrMountDoesNotExist
  419. }
  420. logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID)
  421. return mount.mountID, nil
  422. }
  423. func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) {
  424. ls.mountL.Lock()
  425. defer ls.mountL.Unlock()
  426. m, ok := ls.mounts[l.Name()]
  427. if !ok {
  428. return []Metadata{}, nil
  429. }
  430. if err := m.deleteReference(l); err != nil {
  431. return nil, err
  432. }
  433. if m.hasReferences() {
  434. return []Metadata{}, nil
  435. }
  436. if err := ls.driver.Remove(m.mountID); err != nil {
  437. logrus.Errorf("Error removing mounted layer %s: %s", m.name, err)
  438. m.retakeReference(l)
  439. return nil, err
  440. }
  441. if m.initID != "" {
  442. if err := ls.driver.Remove(m.initID); err != nil {
  443. logrus.Errorf("Error removing init layer %s: %s", m.name, err)
  444. m.retakeReference(l)
  445. return nil, err
  446. }
  447. }
  448. if err := ls.store.RemoveMount(m.name); err != nil {
  449. logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err)
  450. m.retakeReference(l)
  451. return nil, err
  452. }
  453. delete(ls.mounts, m.Name())
  454. ls.layerL.Lock()
  455. defer ls.layerL.Unlock()
  456. if m.parent != nil {
  457. return ls.releaseLayer(m.parent)
  458. }
  459. return []Metadata{}, nil
  460. }
  461. func (ls *layerStore) saveMount(mount *mountedLayer) error {
  462. if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
  463. return err
  464. }
  465. if mount.initID != "" {
  466. if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
  467. return err
  468. }
  469. }
  470. if mount.parent != nil {
  471. if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
  472. return err
  473. }
  474. }
  475. ls.mounts[mount.name] = mount
  476. return nil
  477. }
  478. func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) {
  479. // Use "<graph-id>-init" to maintain compatibility with graph drivers
  480. // which are expecting this layer with this special name. If all
  481. // graph drivers can be updated to not rely on knowing about this layer
  482. // then the initID should be randomly generated.
  483. initID := fmt.Sprintf("%s-init", graphID)
  484. if err := ls.driver.Create(initID, parent, mountLabel, storageOpt); err != nil {
  485. return "", err
  486. }
  487. p, err := ls.driver.Get(initID, "")
  488. if err != nil {
  489. return "", err
  490. }
  491. if err := initFunc(p); err != nil {
  492. ls.driver.Put(initID)
  493. return "", err
  494. }
  495. if err := ls.driver.Put(initID); err != nil {
  496. return "", err
  497. }
  498. return initID, nil
  499. }
  500. func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
  501. diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver)
  502. if !ok {
  503. diffDriver = &naiveDiffPathDriver{ls.driver}
  504. }
  505. defer metadata.Close()
  506. // get our relative path to the container
  507. fileGetCloser, err := diffDriver.DiffGetter(graphID)
  508. if err != nil {
  509. return err
  510. }
  511. defer fileGetCloser.Close()
  512. metaUnpacker := storage.NewJSONUnpacker(metadata)
  513. upackerCounter := &unpackSizeCounter{metaUnpacker, size}
  514. logrus.Debugf("Assembling tar data for %s", graphID)
  515. return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w)
  516. }
  517. func (ls *layerStore) Cleanup() error {
  518. return ls.driver.Cleanup()
  519. }
  520. func (ls *layerStore) DriverStatus() [][2]string {
  521. return ls.driver.Status()
  522. }
  523. func (ls *layerStore) DriverName() string {
  524. return ls.driver.String()
  525. }
  526. type naiveDiffPathDriver struct {
  527. graphdriver.Driver
  528. }
  529. type fileGetPutter struct {
  530. storage.FileGetter
  531. driver graphdriver.Driver
  532. id string
  533. }
  534. func (w *fileGetPutter) Close() error {
  535. return w.driver.Put(w.id)
  536. }
  537. func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) {
  538. p, err := n.Driver.Get(id, "")
  539. if err != nil {
  540. return nil, err
  541. }
  542. return &fileGetPutter{storage.NewPathFileGetter(p), n.Driver, id}, nil
  543. }