Browse Source

daemon,volume: share disk usage computations

Signed-off-by: Roman Volosatovs <roman.volosatovs@docker.com>
Roman Volosatovs 4 years ago
parent
commit
135cec5d4d

+ 6 - 4
daemon/daemon.go

@@ -65,6 +65,7 @@ import (
 	"github.com/sirupsen/logrus"
 	"go.etcd.io/bbolt"
 	"golang.org/x/sync/semaphore"
+	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
 )
@@ -117,10 +118,11 @@ type Daemon struct {
 	seccompProfile     []byte
 	seccompProfilePath string
 
-	diskUsageRunning int32
-	pruneRunning     int32
-	hosts            map[string]bool // hosts stores the addresses the daemon is listening on
-	startupDone      chan struct{}
+	usage singleflight.Group
+
+	pruneRunning int32
+	hosts        map[string]bool // hosts stores the addresses the daemon is listening on
+	startupDone  chan struct{}
 
 	attachmentStore       network.AttachmentStore
 	attachableNetworkLock *locker.Locker

+ 29 - 26
daemon/disk_usage.go

@@ -3,36 +3,47 @@ package daemon // import "github.com/docker/docker/daemon"
 import (
 	"context"
 	"fmt"
-	"sync/atomic"
 
 	"github.com/docker/docker/api/server/router/system"
 	"github.com/docker/docker/api/types"
-	"github.com/docker/docker/api/types/filters"
 	"golang.org/x/sync/errgroup"
 )
 
-// SystemDiskUsage returns information about the daemon data disk usage
-func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsageOptions) (*types.DiskUsage, error) {
-	if !atomic.CompareAndSwapInt32(&daemon.diskUsageRunning, 0, 1) {
-		return nil, fmt.Errorf("a disk usage operation is already running")
+// ContainerDiskUsage returns information about container data disk usage.
+func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) {
+	ch := daemon.usage.DoChan("ContainerDiskUsage", func() (interface{}, error) {
+		// Retrieve container list
+		containers, err := daemon.Containers(&types.ContainerListOptions{
+			Size: true,
+			All:  true,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to retrieve container list: %v", err)
+		}
+		return containers, nil
+	})
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case res := <-ch:
+		if res.Err != nil {
+			return nil, res.Err
+		}
+		return res.Val.([]*types.Container), nil
 	}
-	defer atomic.StoreInt32(&daemon.diskUsageRunning, 0)
+}
 
+// SystemDiskUsage returns information about the daemon data disk usage.
+// Callers must not mutate contents of the returned fields.
+func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsageOptions) (*types.DiskUsage, error) {
 	eg, ctx := errgroup.WithContext(ctx)
 
 	var containers []*types.Container
 	if opts.Containers {
 		eg.Go(func() error {
 			var err error
-			// Retrieve container list
-			containers, err = daemon.Containers(&types.ContainerListOptions{
-				Size: true,
-				All:  true,
-			})
-			if err != nil {
-				return fmt.Errorf("failed to retrieve container list: %v", err)
-			}
-			return nil
+			containers, err = daemon.ContainerDiskUsage(ctx)
+			return err
 		})
 	}
 
@@ -43,16 +54,8 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
 	if opts.Images {
 		eg.Go(func() error {
 			var err error
-			// Get all top images with extra attributes
-			images, err = daemon.imageService.Images(ctx, types.ImageListOptions{
-				Filters:        filters.NewArgs(),
-				SharedSize:     true,
-				ContainerCount: true,
-			})
-			if err != nil {
-				return fmt.Errorf("failed to retrieve image list: %v", err)
-			}
-			return nil
+			images, err = daemon.imageService.ImageDiskUsage(ctx)
+			return err
 		})
 		eg.Go(func() error {
 			var err error

+ 56 - 15
daemon/images/service.go

@@ -2,10 +2,13 @@ package images // import "github.com/docker/docker/daemon/images"
 
 import (
 	"context"
+	"fmt"
 	"os"
 
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/leases"
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/container"
 	daemonevents "github.com/docker/docker/daemon/events"
 	"github.com/docker/docker/distribution"
@@ -19,6 +22,7 @@ import (
 	digest "github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/singleflight"
 )
 
 type containerStore interface {
@@ -86,6 +90,7 @@ type ImageService struct {
 	leases                    leases.Manager
 	content                   content.Store
 	contentNamespace          string
+	usage                     singleflight.Group
 }
 
 // DistributionServices provides daemon image storage services
@@ -195,25 +200,36 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer, containerOS string) e
 // LayerDiskUsage returns the number of bytes used by layer stores
 // called from disk_usage.go
 func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
-	var allLayersSize int64
-	layerRefs := i.getLayerRefs()
-	allLayers := i.layerStore.Map()
-	for _, l := range allLayers {
-		select {
-		case <-ctx.Done():
-			return allLayersSize, ctx.Err()
-		default:
-			size, err := l.DiffSize()
-			if err == nil {
-				if _, ok := layerRefs[l.ChainID()]; ok {
-					allLayersSize += size
+	ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) {
+		var allLayersSize int64
+		layerRefs := i.getLayerRefs()
+		allLayers := i.layerStore.Map()
+		for _, l := range allLayers {
+			select {
+			case <-ctx.Done():
+				return allLayersSize, ctx.Err()
+			default:
+				size, err := l.DiffSize()
+				if err == nil {
+					if _, ok := layerRefs[l.ChainID()]; ok {
+						allLayersSize += size
+					}
+				} else {
+					logrus.Warnf("failed to get diff size for layer %v", l.ChainID())
 				}
-			} else {
-				logrus.Warnf("failed to get diff size for layer %v", l.ChainID())
 			}
 		}
+		return allLayersSize, nil
+	})
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	case res := <-ch:
+		if res.Err != nil {
+			return 0, res.Err
+		}
+		return res.Val.(int64), nil
 	}
-	return allLayersSize, nil
 }
 
 func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
@@ -237,6 +253,31 @@ func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
 	return layerRefs
 }
 
+// ImageDiskUsage returns information about image data disk usage.
+func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
+	ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) {
+		// Get all top images with extra attributes
+		images, err := i.Images(ctx, types.ImageListOptions{
+			Filters:        filters.NewArgs(),
+			SharedSize:     true,
+			ContainerCount: true,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to retrieve image list: %v", err)
+		}
+		return images, nil
+	})
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case res := <-ch:
+		if res.Err != nil {
+			return nil, res.Err
+		}
+		return res.Val.([]*types.ImageSummary), nil
+	}
+}
+
 // UpdateConfig values
 //
 // called from reload.go

+ 212 - 0
vendor/golang.org/x/sync/singleflight/singleflight.go

@@ -0,0 +1,212 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package singleflight provides a duplicate function call suppression
+// mechanism.
+package singleflight // import "golang.org/x/sync/singleflight"
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"runtime"
+	"runtime/debug"
+	"sync"
+)
+
+// errGoexit indicates that runtime.Goexit was called in
+// the user-supplied function.
+var errGoexit = errors.New("runtime.Goexit was called")
+
+// A panicError is an arbitrary value recovered from a panic,
+// together with the stack trace captured during execution of the given function.
+type panicError struct {
+	value interface{}
+	stack []byte
+}
+
+// Error implements error interface.
+func (p *panicError) Error() string {
+	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
+}
+
+func newPanicError(v interface{}) error {
+	stack := debug.Stack()
+
+	// The first line of the stack trace is of the form "goroutine N [status]:"
+	// but by the time the panic reaches Do the goroutine may no longer exist
+	// and its status will have changed. Trim out the misleading line.
+	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
+		stack = stack[line+1:]
+	}
+	return &panicError{value: v, stack: stack}
+}
+
+// call is an in-flight or completed singleflight.Do call
+type call struct {
+	wg sync.WaitGroup
+
+	// These fields are written once before the WaitGroup is done
+	// and are only read after the WaitGroup is done.
+	val interface{}
+	err error
+
+	// forgotten indicates whether Forget was called with this call's key
+	// while the call was still in flight.
+	forgotten bool
+
+	// These fields are read and written with the singleflight
+	// mutex held before the WaitGroup is done, and are read but
+	// not written after the WaitGroup is done.
+	dups  int
+	chans []chan<- Result
+}
+
+// Group represents a class of work and forms a namespace in
+// which units of work can be executed with duplicate suppression.
+type Group struct {
+	mu sync.Mutex       // protects m
+	m  map[string]*call // lazily initialized
+}
+
+// Result holds the results of Do, so they can be passed
+// on a channel.
+type Result struct {
+	Val    interface{}
+	Err    error
+	Shared bool
+}
+
+// Do executes and returns the results of the given function, making
+// sure that only one execution is in-flight for a given key at a
+// time. If a duplicate comes in, the duplicate caller waits for the
+// original to complete and receives the same results.
+// The return value shared indicates whether v was given to multiple callers.
+func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
+	g.mu.Lock()
+	if g.m == nil {
+		g.m = make(map[string]*call)
+	}
+	if c, ok := g.m[key]; ok {
+		c.dups++
+		g.mu.Unlock()
+		c.wg.Wait()
+
+		if e, ok := c.err.(*panicError); ok {
+			panic(e)
+		} else if c.err == errGoexit {
+			runtime.Goexit()
+		}
+		return c.val, c.err, true
+	}
+	c := new(call)
+	c.wg.Add(1)
+	g.m[key] = c
+	g.mu.Unlock()
+
+	g.doCall(c, key, fn)
+	return c.val, c.err, c.dups > 0
+}
+
+// DoChan is like Do but returns a channel that will receive the
+// results when they are ready.
+//
+// The returned channel will not be closed.
+func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
+	ch := make(chan Result, 1)
+	g.mu.Lock()
+	if g.m == nil {
+		g.m = make(map[string]*call)
+	}
+	if c, ok := g.m[key]; ok {
+		c.dups++
+		c.chans = append(c.chans, ch)
+		g.mu.Unlock()
+		return ch
+	}
+	c := &call{chans: []chan<- Result{ch}}
+	c.wg.Add(1)
+	g.m[key] = c
+	g.mu.Unlock()
+
+	go g.doCall(c, key, fn)
+
+	return ch
+}
+
+// doCall handles the single call for a key.
+func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
+	normalReturn := false
+	recovered := false
+
+	// Use a double defer to distinguish a panic from runtime.Goexit;
+	// for more details see https://golang.org/cl/134395
+	defer func() {
+		// the given function invoked runtime.Goexit
+		if !normalReturn && !recovered {
+			c.err = errGoexit
+		}
+
+		c.wg.Done()
+		g.mu.Lock()
+		defer g.mu.Unlock()
+		if !c.forgotten {
+			delete(g.m, key)
+		}
+
+		if e, ok := c.err.(*panicError); ok {
+			// To prevent the waiting channels from being blocked forever,
+			// we need to ensure that this panic cannot be recovered.
+			if len(c.chans) > 0 {
+				go panic(e)
+				select {} // Keep this goroutine around so that it will appear in the crash dump.
+			} else {
+				panic(e)
+			}
+		} else if c.err == errGoexit {
+			// Already in the process of goexit, no need to call again
+		} else {
+			// Normal return
+			for _, ch := range c.chans {
+				ch <- Result{c.val, c.err, c.dups > 0}
+			}
+		}
+	}()
+
+	func() {
+		defer func() {
+			if !normalReturn {
+				// Ideally, we would wait to take a stack trace until we've determined
+				// whether this is a panic or a runtime.Goexit.
+				//
+				// Unfortunately, the only way we can distinguish the two is to see
+				// whether the recover stopped the goroutine from terminating, and by
+				// the time we know that, the part of the stack trace relevant to the
+				// panic has been discarded.
+				if r := recover(); r != nil {
+					c.err = newPanicError(r)
+				}
+			}
+		}()
+
+		c.val, c.err = fn()
+		normalReturn = true
+	}()
+
+	if !normalReturn {
+		recovered = true
+	}
+}
+
+// Forget tells the singleflight to forget about a key.  Future calls
+// to Do for this key will call the function rather than waiting for
+// an earlier call to complete.
+func (g *Group) Forget(key string) {
+	g.mu.Lock()
+	if c, ok := g.m[key]; ok {
+		c.forgotten = true
+	}
+	delete(g.m, key)
+	g.mu.Unlock()
+}

+ 20 - 7
volume/service/service.go

@@ -17,6 +17,7 @@ import (
 	"github.com/docker/docker/volume/service/opts"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/singleflight"
 )
 
 type ds interface {
@@ -36,6 +37,7 @@ type VolumesService struct {
 	ds           ds
 	pruneRunning int32
 	eventLogger  VolumeEventLogger
+	usage        singleflight.Group
 }
 
 // NewVolumeService creates a new volume service
@@ -182,14 +184,25 @@ var acceptedListFilters = map[string]bool{
 // volumes with mount options are not really local even if they are using the
 // local driver.
 func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*types.Volume, error) {
-	ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
-		dv, ok := v.(volume.DetailedVolume)
-		return ok && len(dv.Options()) == 0
-	})))
-	if err != nil {
-		return nil, err
+	ch := s.usage.DoChan("LocalVolumesSize", func() (interface{}, error) {
+		ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
+			dv, ok := v.(volume.DetailedVolume)
+			return ok && len(dv.Options()) == 0
+		})))
+		if err != nil {
+			return nil, err
+		}
+		return s.volumesToAPI(ctx, ls, calcSize(true)), nil
+	})
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case res := <-ch:
+		if res.Err != nil {
+			return nil, res.Err
+		}
+		return res.Val.([]*types.Volume), nil
 	}
-	return s.volumesToAPI(ctx, ls, calcSize(true)), nil
 }
 
 // Prune removes (local) volumes which match the passed-in filter arguments.