123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659 |
- package layer
- import (
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "sync"
- "github.com/Sirupsen/logrus"
- "github.com/docker/distribution"
- "github.com/docker/distribution/digest"
- "github.com/docker/docker/daemon/graphdriver"
- "github.com/docker/docker/pkg/archive"
- "github.com/docker/docker/pkg/idtools"
- "github.com/docker/docker/pkg/stringid"
- "github.com/vbatts/tar-split/tar/asm"
- "github.com/vbatts/tar-split/tar/storage"
- )
// maxLayerDepth is the maximum number of layers which can be chained
// together. 125 was chosen so that, within the 127-layer limit of some
// graphdrivers, there is still room for the 2 additional layers used to
// create a rwlayer.
const maxLayerDepth = 125
// layerStore tracks read-only layers and named read-write mounts, combining
// a MetadataStore with a graph driver.
type layerStore struct {
	// store holds the metadata for layers and mounts.
	store MetadataStore

	// driver is the graph driver on which layers are created and removed.
	driver graphdriver.Driver

	// layerMap indexes the known read-only layers by chain ID.
	layerMap map[ChainID]*roLayer

	// layerL is held by the methods of this type while they look up,
	// retain or release entries of layerMap.
	layerL sync.Mutex

	// mounts indexes the known mounted layers by mount name.
	mounts map[string]*mountedLayer

	// mountL is held by the methods of this type while they look up,
	// create or release entries of mounts.
	mountL sync.Mutex
}
// StoreOptions are the options used to create a new Store instance
type StoreOptions struct {
	// StorePath is passed to graphdriver.New as its first argument.
	StorePath string

	// MetadataStorePathTemplate is a fmt format string; it is formatted with
	// the selected graph driver as its only argument to produce the path
	// given to NewFSMetadataStore.
	MetadataStorePathTemplate string

	// GraphDriver is the graph driver name passed to graphdriver.New.
	GraphDriver string

	// GraphDriverOptions are the driver options passed to graphdriver.New.
	GraphDriverOptions []string

	// UIDMaps are the user ID mappings passed to graphdriver.New.
	UIDMaps []idtools.IDMap

	// GIDMaps are the group ID mappings passed to graphdriver.New.
	GIDMaps []idtools.IDMap
}
- // NewStoreFromOptions creates a new Store instance
- func NewStoreFromOptions(options StoreOptions) (Store, error) {
- driver, err := graphdriver.New(
- options.StorePath,
- options.GraphDriver,
- options.GraphDriverOptions,
- options.UIDMaps,
- options.GIDMaps)
- if err != nil {
- return nil, fmt.Errorf("error initializing graphdriver: %v", err)
- }
- logrus.Debugf("Using graph driver %s", driver)
- fms, err := NewFSMetadataStore(fmt.Sprintf(options.MetadataStorePathTemplate, driver))
- if err != nil {
- return nil, err
- }
- return NewStoreFromGraphDriver(fms, driver)
- }
- // NewStoreFromGraphDriver creates a new Store instance using the provided
- // metadata store and graph driver. The metadata store will be used to restore
- // the Store.
- func NewStoreFromGraphDriver(store MetadataStore, driver graphdriver.Driver) (Store, error) {
- ls := &layerStore{
- store: store,
- driver: driver,
- layerMap: map[ChainID]*roLayer{},
- mounts: map[string]*mountedLayer{},
- }
- ids, mounts, err := store.List()
- if err != nil {
- return nil, err
- }
- for _, id := range ids {
- l, err := ls.loadLayer(id)
- if err != nil {
- logrus.Debugf("Failed to load layer %s: %s", id, err)
- continue
- }
- if l.parent != nil {
- l.parent.referenceCount++
- }
- }
- for _, mount := range mounts {
- if err := ls.loadMount(mount); err != nil {
- logrus.Debugf("Failed to load mount %s: %s", mount, err)
- }
- }
- return ls, nil
- }
- func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
- cl, ok := ls.layerMap[layer]
- if ok {
- return cl, nil
- }
- diff, err := ls.store.GetDiffID(layer)
- if err != nil {
- return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err)
- }
- size, err := ls.store.GetSize(layer)
- if err != nil {
- return nil, fmt.Errorf("failed to get size for %s: %s", layer, err)
- }
- cacheID, err := ls.store.GetCacheID(layer)
- if err != nil {
- return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err)
- }
- parent, err := ls.store.GetParent(layer)
- if err != nil {
- return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err)
- }
- descriptor, err := ls.store.GetDescriptor(layer)
- if err != nil {
- return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
- }
- cl = &roLayer{
- chainID: layer,
- diffID: diff,
- size: size,
- cacheID: cacheID,
- layerStore: ls,
- references: map[Layer]struct{}{},
- descriptor: descriptor,
- }
- if parent != "" {
- p, err := ls.loadLayer(parent)
- if err != nil {
- return nil, err
- }
- cl.parent = p
- }
- ls.layerMap[cl.chainID] = cl
- return cl, nil
- }
- func (ls *layerStore) loadMount(mount string) error {
- if _, ok := ls.mounts[mount]; ok {
- return nil
- }
- mountID, err := ls.store.GetMountID(mount)
- if err != nil {
- return err
- }
- initID, err := ls.store.GetInitID(mount)
- if err != nil {
- return err
- }
- parent, err := ls.store.GetMountParent(mount)
- if err != nil {
- return err
- }
- ml := &mountedLayer{
- name: mount,
- mountID: mountID,
- initID: initID,
- layerStore: ls,
- references: map[RWLayer]*referencedRWLayer{},
- }
- if parent != "" {
- p, err := ls.loadLayer(parent)
- if err != nil {
- return err
- }
- ml.parent = p
- p.referenceCount++
- }
- ls.mounts[ml.name] = ml
- return nil
- }
- func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent string, layer *roLayer) error {
- digester := digest.Canonical.New()
- tr := io.TeeReader(ts, digester.Hash())
- tsw, err := tx.TarSplitWriter(true)
- if err != nil {
- return err
- }
- metaPacker := storage.NewJSONPacker(tsw)
- defer tsw.Close()
- // we're passing nil here for the file putter, because the ApplyDiff will
- // handle the extraction of the archive
- rdr, err := asm.NewInputTarStream(tr, metaPacker, nil)
- if err != nil {
- return err
- }
- applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, archive.Reader(rdr))
- if err != nil {
- return err
- }
- // Discard trailing data but ensure metadata is picked up to reconstruct stream
- io.Copy(ioutil.Discard, rdr) // ignore error as reader may be closed
- layer.size = applySize
- layer.diffID = DiffID(digester.Digest())
- logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
- return nil
- }
// Register registers the layer read from the tar stream ts on top of parent.
// It delegates to registerWithDescriptor with an empty descriptor.
func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
	return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{})
}
- func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) {
- // err is used to hold the error which will always trigger
- // cleanup of creates sources but may not be an error returned
- // to the caller (already exists).
- var err error
- var pid string
- var p *roLayer
- if string(parent) != "" {
- p = ls.get(parent)
- if p == nil {
- return nil, ErrLayerDoesNotExist
- }
- pid = p.cacheID
- // Release parent chain if error
- defer func() {
- if err != nil {
- ls.layerL.Lock()
- ls.releaseLayer(p)
- ls.layerL.Unlock()
- }
- }()
- if p.depth() >= maxLayerDepth {
- err = ErrMaxDepthExceeded
- return nil, err
- }
- }
- // Create new roLayer
- layer := &roLayer{
- parent: p,
- cacheID: stringid.GenerateRandomID(),
- referenceCount: 1,
- layerStore: ls,
- references: map[Layer]struct{}{},
- descriptor: descriptor,
- }
- if err = ls.driver.Create(layer.cacheID, pid, "", nil); err != nil {
- return nil, err
- }
- tx, err := ls.store.StartTransaction()
- if err != nil {
- return nil, err
- }
- defer func() {
- if err != nil {
- logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err)
- if err := ls.driver.Remove(layer.cacheID); err != nil {
- logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
- }
- if err := tx.Cancel(); err != nil {
- logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
- }
- }
- }()
- if err = ls.applyTar(tx, ts, pid, layer); err != nil {
- return nil, err
- }
- if layer.parent == nil {
- layer.chainID = ChainID(layer.diffID)
- } else {
- layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
- }
- if err = storeLayer(tx, layer); err != nil {
- return nil, err
- }
- ls.layerL.Lock()
- defer ls.layerL.Unlock()
- if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
- // Set error for cleanup, but do not return the error
- err = errors.New("layer already exists")
- return existingLayer.getReference(), nil
- }
- if err = tx.Commit(layer.chainID); err != nil {
- return nil, err
- }
- ls.layerMap[layer.chainID] = layer
- return layer.getReference(), nil
- }
- func (ls *layerStore) getWithoutLock(layer ChainID) *roLayer {
- l, ok := ls.layerMap[layer]
- if !ok {
- return nil
- }
- l.referenceCount++
- return l
- }
// get is getWithoutLock performed while holding ls.layerL: it returns the
// layer with the given chain ID with its reference count incremented, or nil
// when no such layer is known.
func (ls *layerStore) get(l ChainID) *roLayer {
	ls.layerL.Lock()
	defer ls.layerL.Unlock()
	return ls.getWithoutLock(l)
}
- func (ls *layerStore) Get(l ChainID) (Layer, error) {
- ls.layerL.Lock()
- defer ls.layerL.Unlock()
- layer := ls.getWithoutLock(l)
- if layer == nil {
- return nil, ErrLayerDoesNotExist
- }
- return layer.getReference(), nil
- }
- func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error {
- err := ls.driver.Remove(layer.cacheID)
- if err != nil {
- return err
- }
- err = ls.store.Remove(layer.chainID)
- if err != nil {
- return err
- }
- metadata.DiffID = layer.diffID
- metadata.ChainID = layer.chainID
- metadata.Size, err = layer.Size()
- if err != nil {
- return err
- }
- metadata.DiffSize = layer.size
- return nil
- }
- func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
- depth := 0
- removed := []Metadata{}
- for {
- if l.referenceCount == 0 {
- panic("layer not retained")
- }
- l.referenceCount--
- if l.referenceCount != 0 {
- return removed, nil
- }
- if len(removed) == 0 && depth > 0 {
- panic("cannot remove layer with child")
- }
- if l.hasReferences() {
- panic("cannot delete referenced layer")
- }
- var metadata Metadata
- if err := ls.deleteLayer(l, &metadata); err != nil {
- return nil, err
- }
- delete(ls.layerMap, l.chainID)
- removed = append(removed, metadata)
- if l.parent == nil {
- return removed, nil
- }
- depth++
- l = l.parent
- }
- }
- func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
- ls.layerL.Lock()
- defer ls.layerL.Unlock()
- layer, ok := ls.layerMap[l.ChainID()]
- if !ok {
- return []Metadata{}, nil
- }
- if !layer.hasReference(l) {
- return nil, ErrLayerNotRetained
- }
- layer.deleteReference(l)
- return ls.releaseLayer(layer)
- }
- func (ls *layerStore) CreateRWLayer(name string, parent ChainID, mountLabel string, initFunc MountInit, storageOpt map[string]string) (RWLayer, error) {
- ls.mountL.Lock()
- defer ls.mountL.Unlock()
- m, ok := ls.mounts[name]
- if ok {
- return nil, ErrMountNameConflict
- }
- var err error
- var pid string
- var p *roLayer
- if string(parent) != "" {
- p = ls.get(parent)
- if p == nil {
- return nil, ErrLayerDoesNotExist
- }
- pid = p.cacheID
- // Release parent chain if error
- defer func() {
- if err != nil {
- ls.layerL.Lock()
- ls.releaseLayer(p)
- ls.layerL.Unlock()
- }
- }()
- }
- m = &mountedLayer{
- name: name,
- parent: p,
- mountID: ls.mountID(name),
- layerStore: ls,
- references: map[RWLayer]*referencedRWLayer{},
- }
- if initFunc != nil {
- pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt)
- if err != nil {
- return nil, err
- }
- m.initID = pid
- }
- if err = ls.driver.CreateReadWrite(m.mountID, pid, "", storageOpt); err != nil {
- return nil, err
- }
- if err = ls.saveMount(m); err != nil {
- return nil, err
- }
- return m.getReference(), nil
- }
- func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) {
- ls.mountL.Lock()
- defer ls.mountL.Unlock()
- mount, ok := ls.mounts[id]
- if !ok {
- return nil, ErrMountDoesNotExist
- }
- return mount.getReference(), nil
- }
- func (ls *layerStore) GetMountID(id string) (string, error) {
- ls.mountL.Lock()
- defer ls.mountL.Unlock()
- mount, ok := ls.mounts[id]
- if !ok {
- return "", ErrMountDoesNotExist
- }
- logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID)
- return mount.mountID, nil
- }
- func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) {
- ls.mountL.Lock()
- defer ls.mountL.Unlock()
- m, ok := ls.mounts[l.Name()]
- if !ok {
- return []Metadata{}, nil
- }
- if err := m.deleteReference(l); err != nil {
- return nil, err
- }
- if m.hasReferences() {
- return []Metadata{}, nil
- }
- if err := ls.driver.Remove(m.mountID); err != nil {
- logrus.Errorf("Error removing mounted layer %s: %s", m.name, err)
- m.retakeReference(l)
- return nil, err
- }
- if m.initID != "" {
- if err := ls.driver.Remove(m.initID); err != nil {
- logrus.Errorf("Error removing init layer %s: %s", m.name, err)
- m.retakeReference(l)
- return nil, err
- }
- }
- if err := ls.store.RemoveMount(m.name); err != nil {
- logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err)
- m.retakeReference(l)
- return nil, err
- }
- delete(ls.mounts, m.Name())
- ls.layerL.Lock()
- defer ls.layerL.Unlock()
- if m.parent != nil {
- return ls.releaseLayer(m.parent)
- }
- return []Metadata{}, nil
- }
- func (ls *layerStore) saveMount(mount *mountedLayer) error {
- if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
- return err
- }
- if mount.initID != "" {
- if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
- return err
- }
- }
- if mount.parent != nil {
- if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
- return err
- }
- }
- ls.mounts[mount.name] = mount
- return nil
- }
- func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) {
- // Use "<graph-id>-init" to maintain compatibility with graph drivers
- // which are expecting this layer with this special name. If all
- // graph drivers can be updated to not rely on knowing about this layer
- // then the initID should be randomly generated.
- initID := fmt.Sprintf("%s-init", graphID)
- if err := ls.driver.Create(initID, parent, mountLabel, storageOpt); err != nil {
- return "", err
- }
- p, err := ls.driver.Get(initID, "")
- if err != nil {
- return "", err
- }
- if err := initFunc(p); err != nil {
- ls.driver.Put(initID)
- return "", err
- }
- if err := ls.driver.Put(initID); err != nil {
- return "", err
- }
- return initID, nil
- }
- func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
- diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver)
- if !ok {
- diffDriver = &naiveDiffPathDriver{ls.driver}
- }
- defer metadata.Close()
- // get our relative path to the container
- fileGetCloser, err := diffDriver.DiffGetter(graphID)
- if err != nil {
- return err
- }
- defer fileGetCloser.Close()
- metaUnpacker := storage.NewJSONUnpacker(metadata)
- upackerCounter := &unpackSizeCounter{metaUnpacker, size}
- logrus.Debugf("Assembling tar data for %s", graphID)
- return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w)
- }
// Cleanup calls Cleanup on the underlying graph driver and returns its
// result.
func (ls *layerStore) Cleanup() error {
	return ls.driver.Cleanup()
}
// DriverStatus returns the status pairs reported by the underlying graph
// driver.
func (ls *layerStore) DriverStatus() [][2]string {
	return ls.driver.Status()
}
// DriverName returns the string form of the underlying graph driver.
func (ls *layerStore) DriverName() string {
	return ls.driver.String()
}
// naiveDiffPathDriver wraps a graph driver and adds a DiffGetter method
// built on the driver's Get and Put. assembleTarTo uses it when the driver
// does not implement graphdriver.DiffGetterDriver itself.
type naiveDiffPathDriver struct {
	graphdriver.Driver
}
// fileGetPutter is a storage.FileGetter that, when closed, puts the layer id
// back to the graph driver it was obtained from.
type fileGetPutter struct {
	storage.FileGetter

	// driver is the graph driver that Close calls Put on.
	driver graphdriver.Driver

	// id is the layer ID passed to driver.Put by Close.
	id string
}
// Close puts the layer w.id back to the graph driver and returns the
// driver's result.
func (w *fileGetPutter) Close() error {
	return w.driver.Put(w.id)
}
- func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) {
- p, err := n.Driver.Get(id, "")
- if err != nil {
- return nil, err
- }
- return &fileGetPutter{storage.NewPathFileGetter(p), n.Driver, id}, nil
- }
|