Merge pull request #42715 from rvolosatovs/shared_disk_usage
Share disk usage computation results between concurrent invocations
This commit is contained in:
commit
b88acf7a7a
5 changed files with 331 additions and 63 deletions
|
@ -19,11 +19,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/pkg/fileutils"
|
||||
"go.etcd.io/bbolt"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/pkg/dialer"
|
||||
|
@ -38,18 +33,14 @@ import (
|
|||
"github.com/docker/docker/daemon/discovery"
|
||||
"github.com/docker/docker/daemon/events"
|
||||
"github.com/docker/docker/daemon/exec"
|
||||
_ "github.com/docker/docker/daemon/graphdriver/register" // register graph drivers
|
||||
"github.com/docker/docker/daemon/images"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/network"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/moby/buildkit/util/resolver"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
// register graph drivers
|
||||
_ "github.com/docker/docker/daemon/graphdriver/register"
|
||||
"github.com/docker/docker/daemon/stats"
|
||||
dmetadata "github.com/docker/docker/distribution/metadata"
|
||||
"github.com/docker/docker/dockerversion"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/docker/docker/image"
|
||||
"github.com/docker/docker/layer"
|
||||
"github.com/docker/docker/libcontainerd"
|
||||
|
@ -57,6 +48,7 @@ import (
|
|||
"github.com/docker/docker/libnetwork"
|
||||
"github.com/docker/docker/libnetwork/cluster"
|
||||
nwconfig "github.com/docker/docker/libnetwork/config"
|
||||
"github.com/docker/docker/pkg/fileutils"
|
||||
"github.com/docker/docker/pkg/idtools"
|
||||
"github.com/docker/docker/pkg/plugingetter"
|
||||
"github.com/docker/docker/pkg/system"
|
||||
|
@ -67,9 +59,15 @@ import (
|
|||
"github.com/docker/docker/registry"
|
||||
"github.com/docker/docker/runconfig"
|
||||
volumesservice "github.com/docker/docker/volume/service"
|
||||
"github.com/moby/buildkit/util/resolver"
|
||||
"github.com/moby/locker"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
)
|
||||
|
||||
// ContainersNamespace is the name of the namespace used for users containers
|
||||
|
@ -120,10 +118,11 @@ type Daemon struct {
|
|||
seccompProfile []byte
|
||||
seccompProfilePath string
|
||||
|
||||
diskUsageRunning int32
|
||||
pruneRunning int32
|
||||
hosts map[string]bool // hosts stores the addresses the daemon is listening on
|
||||
startupDone chan struct{}
|
||||
usage singleflight.Group
|
||||
|
||||
pruneRunning int32
|
||||
hosts map[string]bool // hosts stores the addresses the daemon is listening on
|
||||
startupDone chan struct{}
|
||||
|
||||
attachmentStore network.AttachmentStore
|
||||
attachableNetworkLock *locker.Locker
|
||||
|
|
|
@ -3,36 +3,47 @@ package daemon // import "github.com/docker/docker/daemon"
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/docker/docker/api/server/router/system"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// SystemDiskUsage returns information about the daemon data disk usage
|
||||
func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsageOptions) (*types.DiskUsage, error) {
|
||||
if !atomic.CompareAndSwapInt32(&daemon.diskUsageRunning, 0, 1) {
|
||||
return nil, fmt.Errorf("a disk usage operation is already running")
|
||||
// ContainerDiskUsage returns information about container data disk usage.
|
||||
func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) {
|
||||
ch := daemon.usage.DoChan("ContainerDiskUsage", func() (interface{}, error) {
|
||||
// Retrieve container list
|
||||
containers, err := daemon.Containers(&types.ContainerListOptions{
|
||||
Size: true,
|
||||
All: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve container list: %v", err)
|
||||
}
|
||||
return containers, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
return res.Val.([]*types.Container), nil
|
||||
}
|
||||
defer atomic.StoreInt32(&daemon.diskUsageRunning, 0)
|
||||
}
|
||||
|
||||
// SystemDiskUsage returns information about the daemon data disk usage.
|
||||
// Callers must not mutate contents of the returned fields.
|
||||
func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsageOptions) (*types.DiskUsage, error) {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
var containers []*types.Container
|
||||
if opts.Containers {
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
// Retrieve container list
|
||||
containers, err = daemon.Containers(&types.ContainerListOptions{
|
||||
Size: true,
|
||||
All: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve container list: %v", err)
|
||||
}
|
||||
return nil
|
||||
containers, err = daemon.ContainerDiskUsage(ctx)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -43,16 +54,8 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
|
|||
if opts.Images {
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
// Get all top images with extra attributes
|
||||
images, err = daemon.imageService.Images(ctx, types.ImageListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
SharedSize: true,
|
||||
ContainerCount: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve image list: %v", err)
|
||||
}
|
||||
return nil
|
||||
images, err = daemon.imageService.ImageDiskUsage(ctx)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
|
|
|
@ -2,10 +2,13 @@ package images // import "github.com/docker/docker/daemon/images"
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/container"
|
||||
daemonevents "github.com/docker/docker/daemon/events"
|
||||
"github.com/docker/docker/distribution"
|
||||
|
@ -19,6 +22,7 @@ import (
|
|||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type containerStore interface {
|
||||
|
@ -86,6 +90,7 @@ type ImageService struct {
|
|||
leases leases.Manager
|
||||
content content.Store
|
||||
contentNamespace string
|
||||
usage singleflight.Group
|
||||
}
|
||||
|
||||
// DistributionServices provides daemon image storage services
|
||||
|
@ -195,25 +200,36 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer, containerOS string) e
|
|||
// LayerDiskUsage returns the number of bytes used by layer stores
|
||||
// called from disk_usage.go
|
||||
func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
|
||||
var allLayersSize int64
|
||||
layerRefs := i.getLayerRefs()
|
||||
allLayers := i.layerStore.Map()
|
||||
for _, l := range allLayers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return allLayersSize, ctx.Err()
|
||||
default:
|
||||
size, err := l.DiffSize()
|
||||
if err == nil {
|
||||
if _, ok := layerRefs[l.ChainID()]; ok {
|
||||
allLayersSize += size
|
||||
ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) {
|
||||
var allLayersSize int64
|
||||
layerRefs := i.getLayerRefs()
|
||||
allLayers := i.layerStore.Map()
|
||||
for _, l := range allLayers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return allLayersSize, ctx.Err()
|
||||
default:
|
||||
size, err := l.DiffSize()
|
||||
if err == nil {
|
||||
if _, ok := layerRefs[l.ChainID()]; ok {
|
||||
allLayersSize += size
|
||||
}
|
||||
} else {
|
||||
logrus.Warnf("failed to get diff size for layer %v", l.ChainID())
|
||||
}
|
||||
} else {
|
||||
logrus.Warnf("failed to get diff size for layer %v", l.ChainID())
|
||||
}
|
||||
}
|
||||
return allLayersSize, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return 0, res.Err
|
||||
}
|
||||
return res.Val.(int64), nil
|
||||
}
|
||||
return allLayersSize, nil
|
||||
}
|
||||
|
||||
func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
|
||||
|
@ -237,6 +253,31 @@ func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
|
|||
return layerRefs
|
||||
}
|
||||
|
||||
// ImageDiskUsage returns information about image data disk usage.
|
||||
func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
|
||||
ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) {
|
||||
// Get all top images with extra attributes
|
||||
images, err := i.Images(ctx, types.ImageListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
SharedSize: true,
|
||||
ContainerCount: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve image list: %v", err)
|
||||
}
|
||||
return images, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
return res.Val.([]*types.ImageSummary), nil
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig values
|
||||
//
|
||||
// called from reload.go
|
||||
|
|
212
vendor/golang.org/x/sync/singleflight/singleflight.go
generated
vendored
Normal file
212
vendor/golang.org/x/sync/singleflight/singleflight.go
generated
vendored
Normal file
|
@ -0,0 +1,212 @@
|
|||
// Copyright 2013 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package singleflight provides a duplicate function call suppression
|
||||
// mechanism.
|
||||
package singleflight // import "golang.org/x/sync/singleflight"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// errGoexit indicates the runtime.Goexit was called in
|
||||
// the user given function.
|
||||
var errGoexit = errors.New("runtime.Goexit was called")
|
||||
|
||||
// A panicError is an arbitrary value recovered from a panic
|
||||
// with the stack trace during the execution of given function.
|
||||
type panicError struct {
|
||||
value interface{}
|
||||
stack []byte
|
||||
}
|
||||
|
||||
// Error implements error interface.
|
||||
func (p *panicError) Error() string {
|
||||
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
|
||||
}
|
||||
|
||||
func newPanicError(v interface{}) error {
|
||||
stack := debug.Stack()
|
||||
|
||||
// The first line of the stack trace is of the form "goroutine N [status]:"
|
||||
// but by the time the panic reaches Do the goroutine may no longer exist
|
||||
// and its status will have changed. Trim out the misleading line.
|
||||
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
|
||||
stack = stack[line+1:]
|
||||
}
|
||||
return &panicError{value: v, stack: stack}
|
||||
}
|
||||
|
||||
// call is an in-flight or completed singleflight.Do call
|
||||
type call struct {
|
||||
wg sync.WaitGroup
|
||||
|
||||
// These fields are written once before the WaitGroup is done
|
||||
// and are only read after the WaitGroup is done.
|
||||
val interface{}
|
||||
err error
|
||||
|
||||
// forgotten indicates whether Forget was called with this call's key
|
||||
// while the call was still in flight.
|
||||
forgotten bool
|
||||
|
||||
// These fields are read and written with the singleflight
|
||||
// mutex held before the WaitGroup is done, and are read but
|
||||
// not written after the WaitGroup is done.
|
||||
dups int
|
||||
chans []chan<- Result
|
||||
}
|
||||
|
||||
// Group represents a class of work and forms a namespace in
|
||||
// which units of work can be executed with duplicate suppression.
|
||||
type Group struct {
|
||||
mu sync.Mutex // protects m
|
||||
m map[string]*call // lazily initialized
|
||||
}
|
||||
|
||||
// Result holds the results of Do, so they can be passed
|
||||
// on a channel.
|
||||
type Result struct {
|
||||
Val interface{}
|
||||
Err error
|
||||
Shared bool
|
||||
}
|
||||
|
||||
// Do executes and returns the results of the given function, making
|
||||
// sure that only one execution is in-flight for a given key at a
|
||||
// time. If a duplicate comes in, the duplicate caller waits for the
|
||||
// original to complete and receives the same results.
|
||||
// The return value shared indicates whether v was given to multiple callers.
|
||||
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[string]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.dups++
|
||||
g.mu.Unlock()
|
||||
c.wg.Wait()
|
||||
|
||||
if e, ok := c.err.(*panicError); ok {
|
||||
panic(e)
|
||||
} else if c.err == errGoexit {
|
||||
runtime.Goexit()
|
||||
}
|
||||
return c.val, c.err, true
|
||||
}
|
||||
c := new(call)
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
g.doCall(c, key, fn)
|
||||
return c.val, c.err, c.dups > 0
|
||||
}
|
||||
|
||||
// DoChan is like Do but returns a channel that will receive the
|
||||
// results when they are ready.
|
||||
//
|
||||
// The returned channel will not be closed.
|
||||
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
|
||||
ch := make(chan Result, 1)
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[string]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.dups++
|
||||
c.chans = append(c.chans, ch)
|
||||
g.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
c := &call{chans: []chan<- Result{ch}}
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
go g.doCall(c, key, fn)
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// doCall handles the single call for a key.
|
||||
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
|
||||
normalReturn := false
|
||||
recovered := false
|
||||
|
||||
// use double-defer to distinguish panic from runtime.Goexit,
|
||||
// more details see https://golang.org/cl/134395
|
||||
defer func() {
|
||||
// the given function invoked runtime.Goexit
|
||||
if !normalReturn && !recovered {
|
||||
c.err = errGoexit
|
||||
}
|
||||
|
||||
c.wg.Done()
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
if !c.forgotten {
|
||||
delete(g.m, key)
|
||||
}
|
||||
|
||||
if e, ok := c.err.(*panicError); ok {
|
||||
// In order to prevent the waiting channels from being blocked forever,
|
||||
// needs to ensure that this panic cannot be recovered.
|
||||
if len(c.chans) > 0 {
|
||||
go panic(e)
|
||||
select {} // Keep this goroutine around so that it will appear in the crash dump.
|
||||
} else {
|
||||
panic(e)
|
||||
}
|
||||
} else if c.err == errGoexit {
|
||||
// Already in the process of goexit, no need to call again
|
||||
} else {
|
||||
// Normal return
|
||||
for _, ch := range c.chans {
|
||||
ch <- Result{c.val, c.err, c.dups > 0}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
if !normalReturn {
|
||||
// Ideally, we would wait to take a stack trace until we've determined
|
||||
// whether this is a panic or a runtime.Goexit.
|
||||
//
|
||||
// Unfortunately, the only way we can distinguish the two is to see
|
||||
// whether the recover stopped the goroutine from terminating, and by
|
||||
// the time we know that, the part of the stack trace relevant to the
|
||||
// panic has been discarded.
|
||||
if r := recover(); r != nil {
|
||||
c.err = newPanicError(r)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
c.val, c.err = fn()
|
||||
normalReturn = true
|
||||
}()
|
||||
|
||||
if !normalReturn {
|
||||
recovered = true
|
||||
}
|
||||
}
|
||||
|
||||
// Forget tells the singleflight to forget about a key. Future calls
|
||||
// to Do for this key will call the function rather than waiting for
|
||||
// an earlier call to complete.
|
||||
func (g *Group) Forget(key string) {
|
||||
g.mu.Lock()
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.forgotten = true
|
||||
}
|
||||
delete(g.m, key)
|
||||
g.mu.Unlock()
|
||||
}
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/docker/docker/volume/service/opts"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type ds interface {
|
||||
|
@ -36,6 +37,7 @@ type VolumesService struct {
|
|||
ds ds
|
||||
pruneRunning int32
|
||||
eventLogger VolumeEventLogger
|
||||
usage singleflight.Group
|
||||
}
|
||||
|
||||
// NewVolumeService creates a new volume service
|
||||
|
@ -182,14 +184,25 @@ var acceptedListFilters = map[string]bool{
|
|||
// volumes with mount options are not really local even if they are using the
|
||||
// local driver.
|
||||
func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*types.Volume, error) {
|
||||
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
|
||||
dv, ok := v.(volume.DetailedVolume)
|
||||
return ok && len(dv.Options()) == 0
|
||||
})))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
ch := s.usage.DoChan("LocalVolumesSize", func() (interface{}, error) {
|
||||
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
|
||||
dv, ok := v.(volume.DetailedVolume)
|
||||
return ok && len(dv.Options()) == 0
|
||||
})))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
return res.Val.([]*types.Volume), nil
|
||||
}
|
||||
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
|
||||
}
|
||||
|
||||
// Prune removes (local) volumes which match the past in filter arguments.
|
||||
|
|
Loading…
Reference in a new issue