diff --git a/daemon/containerd/service.go b/daemon/containerd/service.go index 04960bcb55..ffd56569a6 100644 --- a/daemon/containerd/service.go +++ b/daemon/containerd/service.go @@ -6,22 +6,18 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/snapshots" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/filters" "github.com/docker/docker/container" "github.com/docker/docker/daemon/images" "github.com/docker/docker/errdefs" "github.com/docker/docker/image" "github.com/docker/docker/layer" "github.com/pkg/errors" - "golang.org/x/sync/singleflight" ) // ImageService implements daemon.ImageService type ImageService struct { client *containerd.Client snapshotter string - usage singleflight.Group } // NewService creates a new ImageService. @@ -105,53 +101,17 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer) error { // LayerDiskUsage returns the number of bytes used by layer stores // called from disk_usage.go func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) { - ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) { - var allLayersSize int64 - snapshotter := i.client.SnapshotService(i.snapshotter) - snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error { - usage, err := snapshotter.Usage(ctx, info.Name) - if err != nil { - return err - } - allLayersSize += usage.Size - return nil - }) - return allLayersSize, nil - }) - select { - case <-ctx.Done(): - return 0, ctx.Err() - case res := <-ch: - if res.Err != nil { - return 0, res.Err - } - return res.Val.(int64), nil - } -} - -// ImageDiskUsage returns information about image data disk usage. -func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) { - ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) { - // Get all top images with extra attributes - imgs, err := i.Images(ctx, types.ImageListOptions{ - Filters: filters.NewArgs(), - SharedSize: true, - ContainerCount: true, - }) + var allLayersSize int64 + snapshotter := i.client.SnapshotService(i.snapshotter) + snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error { + usage, err := snapshotter.Usage(ctx, info.Name) if err != nil { - return nil, errors.Wrap(err, "failed to retrieve image list") + return err } - return imgs, nil + allLayersSize += usage.Size + return nil }) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-ch: - if res.Err != nil { - return nil, res.Err - } - return res.Val.([]*types.ImageSummary), nil - } + return allLayersSize, nil } // UpdateConfig values diff --git a/daemon/daemon.go b/daemon/daemon.go index 8c7a4b51b8..d61f7ff078 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -25,6 +25,7 @@ import ( "github.com/docker/docker/api/types" containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/volume" "github.com/docker/docker/builder" "github.com/docker/docker/container" "github.com/docker/docker/daemon/config" @@ -62,10 +63,10 @@ import ( "github.com/sirupsen/logrus" "go.etcd.io/bbolt" "golang.org/x/sync/semaphore" - "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials/insecure" + "resenje.org/singleflight" ) // Daemon holds information about the Docker daemon. @@ -105,7 +106,10 @@ type Daemon struct { seccompProfile []byte seccompProfilePath string - usage singleflight.Group + usageContainers singleflight.Group[struct{}, []*types.Container] + usageImages singleflight.Group[struct{}, []*types.ImageSummary] + usageVolumes singleflight.Group[struct{}, []*volume.Volume] + usageLayer singleflight.Group[struct{}, int64] pruneRunning int32 hosts map[string]bool // hosts stores the addresses the daemon is listening on diff --git a/daemon/disk_usage.go b/daemon/disk_usage.go index 4b7ca33961..b4c2b277d0 100644 --- a/daemon/disk_usage.go +++ b/daemon/disk_usage.go @@ -6,15 +6,18 @@ import ( "github.com/docker/docker/api/server/router/system" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/volume" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) -// ContainerDiskUsage returns information about container data disk usage. -func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) { - ch := daemon.usage.DoChan("ContainerDiskUsage", func() (interface{}, error) { +// containerDiskUsage obtains information about container data disk usage +// and makes sure that only one calculation is performed at the same time. +func (daemon *Daemon) containerDiskUsage(ctx context.Context) ([]*types.Container, error) { + res, _, err := daemon.usageContainers.Do(ctx, struct{}{}, func(ctx context.Context) ([]*types.Container, error) { // Retrieve container list - containers, err := daemon.Containers(context.TODO(), &types.ContainerListOptions{ + containers, err := daemon.Containers(ctx, &types.ContainerListOptions{ Size: true, All: true, }) @@ -23,15 +26,52 @@ func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Containe } return containers, nil }) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-ch: - if res.Err != nil { - return nil, res.Err + return res, err +} + +// imageDiskUsage obtains information about image data disk usage from image service +// and makes sure that only one calculation is performed at the same time. +func (daemon *Daemon) imageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) { + imgs, _, err := daemon.usageImages.Do(ctx, struct{}{}, func(ctx context.Context) ([]*types.ImageSummary, error) { + // Get all top images with extra attributes + imgs, err := daemon.imageService.Images(ctx, types.ImageListOptions{ + Filters: filters.NewArgs(), + SharedSize: true, + ContainerCount: true, + }) + if err != nil { + return nil, errors.Wrap(err, "failed to retrieve image list") } - return res.Val.([]*types.Container), nil - } + return imgs, nil + }) + + return imgs, err +} + +// localVolumesSize obtains information about volume disk usage from volumes service +// and makes sure that only one size calculation is performed at the same time. +func (daemon *Daemon) localVolumesSize(ctx context.Context) ([]*volume.Volume, error) { + volumes, _, err := daemon.usageVolumes.Do(ctx, struct{}{}, func(ctx context.Context) ([]*volume.Volume, error) { + volumes, err := daemon.volumes.LocalVolumesSize(ctx) + if err != nil { + return nil, err + } + return volumes, nil + }) + return volumes, err +} + +// layerDiskUsage obtains information about layer disk usage from image service +// and makes sure that only one size calculation is performed at the same time. +func (daemon *Daemon) layerDiskUsage(ctx context.Context) (int64, error) { + usage, _, err := daemon.usageLayer.Do(ctx, struct{}{}, func(ctx context.Context) (int64, error) { + usage, err := daemon.imageService.LayerDiskUsage(ctx) + if err != nil { + return 0, err + } + return usage, nil + }) + return usage, err } // SystemDiskUsage returns information about the daemon data disk usage. @@ -43,7 +83,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage if opts.Containers { eg.Go(func() error { var err error - containers, err = daemon.ContainerDiskUsage(ctx) + containers, err = daemon.containerDiskUsage(ctx) return err }) } @@ -55,12 +95,12 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage if opts.Images { eg.Go(func() error { var err error - images, err = daemon.imageService.ImageDiskUsage(ctx) + images, err = daemon.imageDiskUsage(ctx) return err }) eg.Go(func() error { var err error - layersSize, err = daemon.imageService.LayerDiskUsage(ctx) + layersSize, err = daemon.layerDiskUsage(ctx) return err }) } @@ -69,7 +109,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage if opts.Volumes { eg.Go(func() error { var err error - volumes, err = daemon.volumes.LocalVolumesSize(ctx) + volumes, err = daemon.localVolumesSize(ctx) return err }) } diff --git a/daemon/image_service.go b/daemon/image_service.go index 78fec7f960..3af584bdba 100644 --- a/daemon/image_service.go +++ b/daemon/image_service.go @@ -35,7 +35,6 @@ type ImageService interface { LogImageEvent(imageID, refName, action string) LogImageEventWithAttributes(imageID, refName, action string, attributes map[string]string) CountImages() int - ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error) ImportImage(ctx context.Context, src string, repository string, platform *v1.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error TagImage(imageName, repository, tag string) (string, error) diff --git a/daemon/images/image_list.go b/daemon/images/image_list.go index f8dd2eb3b1..b116f8b11a 100644 --- a/daemon/images/image_list.go +++ b/daemon/images/image_list.go @@ -78,6 +78,12 @@ func (i *ImageService) Images(ctx context.Context, opts types.ImageListOptions) allContainers []*container.Container ) for id, img := range selectedImages { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if beforeFilter != nil { if img.Created.Equal(beforeFilter.Created) || img.Created.After(beforeFilter.Created) { continue diff --git a/daemon/images/service.go b/daemon/images/service.go index 804492d8b8..c2ebc75e1b 100644 --- a/daemon/images/service.go +++ b/daemon/images/service.go @@ -6,8 +6,6 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/leases" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/filters" "github.com/docker/docker/container" daemonevents "github.com/docker/docker/daemon/events" "github.com/docker/docker/distribution/metadata" @@ -19,7 +17,6 @@ import ( "github.com/docker/libtrust" "github.com/opencontainers/go-digest" "github.com/pkg/errors" - "golang.org/x/sync/singleflight" ) type containerStore interface { @@ -85,7 +82,6 @@ type ImageService struct { leases leases.Manager content content.Store contentNamespace string - usage singleflight.Group } // DistributionServices provides daemon image storage services @@ -191,32 +187,21 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer) error { // LayerDiskUsage returns the number of bytes used by layer stores // called from disk_usage.go func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) { - ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) { - var allLayersSize int64 - layerRefs := i.getLayerRefs() - allLayers := i.layerStore.Map() - for _, l := range allLayers { - select { - case <-ctx.Done(): - return allLayersSize, ctx.Err() - default: - size := l.DiffSize() - if _, ok := layerRefs[l.ChainID()]; ok { - allLayersSize += size - } + var allLayersSize int64 + layerRefs := i.getLayerRefs() + allLayers := i.layerStore.Map() + for _, l := range allLayers { + select { + case <-ctx.Done(): + return allLayersSize, ctx.Err() + default: + size := l.DiffSize() + if _, ok := layerRefs[l.ChainID()]; ok { + allLayersSize += size } } - return allLayersSize, nil - }) - select { - case <-ctx.Done(): - return 0, ctx.Err() - case res := <-ch: - if res.Err != nil { - return 0, res.Err - } - return res.Val.(int64), nil } + return allLayersSize, nil } func (i *ImageService) getLayerRefs() map[layer.ChainID]int { @@ -240,31 +225,6 @@ func (i *ImageService) getLayerRefs() map[layer.ChainID]int { return layerRefs } -// ImageDiskUsage returns information about image data disk usage. -func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) { - ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) { - // Get all top images with extra attributes - imgs, err := i.Images(ctx, types.ImageListOptions{ - Filters: filters.NewArgs(), - SharedSize: true, - ContainerCount: true, - }) - if err != nil { - return nil, errors.Wrap(err, "failed to retrieve image list") - } - return imgs, nil - }) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-ch: - if res.Err != nil { - return nil, res.Err - } - return res.Val.([]*types.ImageSummary), nil - } -} - // UpdateConfig values // // called from reload.go diff --git a/vendor.mod b/vendor.mod index d5d24ca8a8..ba1e34eef0 100644 --- a/vendor.mod +++ b/vendor.mod @@ -89,6 +89,7 @@ require ( google.golang.org/genproto v0.0.0-20220706185917-7780775163c4 google.golang.org/grpc v1.50.1 gotest.tools/v3 v3.4.0 + resenje.org/singleflight v0.3.0 ) require ( diff --git a/vendor.sum b/vendor.sum index 54db09e93f..fde4304d8f 100644 --- a/vendor.sum +++ b/vendor.sum @@ -1888,6 +1888,8 @@ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= +resenje.org/singleflight v0.3.0 h1:USJtsAN6HTUA827ksc+2Kcr7QZ4HBq/z/P8ugVbqKFY= +resenje.org/singleflight v0.3.0/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go deleted file mode 100644 index 8473fb7922..0000000000 --- a/vendor/golang.org/x/sync/singleflight/singleflight.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight // import "golang.org/x/sync/singleflight" - -import ( - "bytes" - "errors" - "fmt" - "runtime" - "runtime/debug" - "sync" -) - -// errGoexit indicates the runtime.Goexit was called in -// the user given function. -var errGoexit = errors.New("runtime.Goexit was called") - -// A panicError is an arbitrary value recovered from a panic -// with the stack trace during the execution of given function. -type panicError struct { - value interface{} - stack []byte -} - -// Error implements error interface. -func (p *panicError) Error() string { - return fmt.Sprintf("%v\n\n%s", p.value, p.stack) -} - -func newPanicError(v interface{}) error { - stack := debug.Stack() - - // The first line of the stack trace is of the form "goroutine N [status]:" - // but by the time the panic reaches Do the goroutine may no longer exist - // and its status will have changed. Trim out the misleading line. - if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { - stack = stack[line+1:] - } - return &panicError{value: v, stack: stack} -} - -// call is an in-flight or completed singleflight.Do call -type call struct { - wg sync.WaitGroup - - // These fields are written once before the WaitGroup is done - // and are only read after the WaitGroup is done. - val interface{} - err error - - // These fields are read and written with the singleflight - // mutex held before the WaitGroup is done, and are read but - // not written after the WaitGroup is done. - dups int - chans []chan<- Result -} - -// Group represents a class of work and forms a namespace in -// which units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Result holds the results of Do, so they can be passed -// on a channel. -type Result struct { - Val interface{} - Err error - Shared bool -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -// The return value shared indicates whether v was given to multiple callers. -func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - g.mu.Unlock() - c.wg.Wait() - - if e, ok := c.err.(*panicError); ok { - panic(e) - } else if c.err == errGoexit { - runtime.Goexit() - } - return c.val, c.err, true - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - g.doCall(c, key, fn) - return c.val, c.err, c.dups > 0 -} - -// DoChan is like Do but returns a channel that will receive the -// results when they are ready. -// -// The returned channel will not be closed. -func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { - ch := make(chan Result, 1) - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - c.chans = append(c.chans, ch) - g.mu.Unlock() - return ch - } - c := &call{chans: []chan<- Result{ch}} - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - go g.doCall(c, key, fn) - - return ch -} - -// doCall handles the single call for a key. -func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - normalReturn := false - recovered := false - - // use double-defer to distinguish panic from runtime.Goexit, - // more details see https://golang.org/cl/134395 - defer func() { - // the given function invoked runtime.Goexit - if !normalReturn && !recovered { - c.err = errGoexit - } - - g.mu.Lock() - defer g.mu.Unlock() - c.wg.Done() - if g.m[key] == c { - delete(g.m, key) - } - - if e, ok := c.err.(*panicError); ok { - // In order to prevent the waiting channels from being blocked forever, - // needs to ensure that this panic cannot be recovered. - if len(c.chans) > 0 { - go panic(e) - select {} // Keep this goroutine around so that it will appear in the crash dump. - } else { - panic(e) - } - } else if c.err == errGoexit { - // Already in the process of goexit, no need to call again - } else { - // Normal return - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} - } - } - }() - - func() { - defer func() { - if !normalReturn { - // Ideally, we would wait to take a stack trace until we've determined - // whether this is a panic or a runtime.Goexit. - // - // Unfortunately, the only way we can distinguish the two is to see - // whether the recover stopped the goroutine from terminating, and by - // the time we know that, the part of the stack trace relevant to the - // panic has been discarded. - if r := recover(); r != nil { - c.err = newPanicError(r) - } - } - }() - - c.val, c.err = fn() - normalReturn = true - }() - - if !normalReturn { - recovered = true - } -} - -// Forget tells the singleflight to forget about a key. Future calls -// to Do for this key will call the function rather than waiting for -// an earlier call to complete. -func (g *Group) Forget(key string) { - g.mu.Lock() - delete(g.m, key) - g.mu.Unlock() -} diff --git a/vendor/modules.txt b/vendor/modules.txt index fd20aba5e7..0e5832ab9d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -961,7 +961,6 @@ golang.org/x/oauth2/jwt ## explicit golang.org/x/sync/errgroup golang.org/x/sync/semaphore -golang.org/x/sync/singleflight golang.org/x/sync/syncmap # golang.org/x/sys v0.2.0 ## explicit; go 1.17 @@ -1153,3 +1152,6 @@ k8s.io/klog/v2/internal/clock k8s.io/klog/v2/internal/dbg k8s.io/klog/v2/internal/serialize k8s.io/klog/v2/internal/severity +# resenje.org/singleflight v0.3.0 +## explicit; go 1.18 +resenje.org/singleflight diff --git a/vendor/resenje.org/singleflight/LICENSE b/vendor/resenje.org/singleflight/LICENSE new file mode 100644 index 0000000000..9e291b5824 --- /dev/null +++ b/vendor/resenje.org/singleflight/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2019, Janoš Guljaš +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of this project nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/vendor/resenje.org/singleflight/README.md b/vendor/resenje.org/singleflight/README.md new file mode 100644 index 0000000000..39c86f65d7 --- /dev/null +++ b/vendor/resenje.org/singleflight/README.md @@ -0,0 +1,12 @@ +# Singleflight + +[![GoDoc](https://godoc.org/resenje.org/singleflight?status.svg)](https://godoc.org/resenje.org/singleflight) +[![Go](https://github.com/janos/singleflight/workflows/Go/badge.svg)](https://github.com/janos/singleflight/actions?query=workflow%3AGo) + +Package singleflight provides a duplicate function call suppression +mechanism similar to golang.org/x/sync/singleflight but with support +for context cancelation. + +## Installation + +Run `go get resenje.org/singleflight` from command line. \ No newline at end of file diff --git a/vendor/resenje.org/singleflight/singleflight.go b/vendor/resenje.org/singleflight/singleflight.go new file mode 100644 index 0000000000..36b049f56d --- /dev/null +++ b/vendor/resenje.org/singleflight/singleflight.go @@ -0,0 +1,119 @@ +// Copyright (c) 2019, Janoš Guljaš +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism similar to golang.org/x/sync/singleflight with support +// for context cancelation. +package singleflight + +import ( + "context" + "sync" +) + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group[K comparable, V any] struct { + calls map[K]*call[V] // lazily initialized + mu sync.Mutex // protects calls +} + +// Do executes and returns the results of the given function, making sure that +// only one execution is in-flight for a given key at a time. If a duplicate +// comes in, the duplicate caller waits for the original to complete and +// receives the same results. +// +// The context passed to the fn function is a new context which is canceled when +// contexts from all callers are canceled, so that no caller is expecting the +// result. If there are multiple callers, context passed to one caller does not +// effect the execution and returned values of others. +// +// The return value shared indicates whether v was given to multiple callers. +func (g *Group[K, V]) Do(ctx context.Context, key K, fn func(ctx context.Context) (V, error)) (v V, shared bool, err error) { + g.mu.Lock() + if g.calls == nil { + g.calls = make(map[K]*call[V]) + } + + if c, ok := g.calls[key]; ok { + c.shared = true + c.counter++ + g.mu.Unlock() + + return g.wait(ctx, key, c) + } + + callCtx, cancel := context.WithCancel(context.Background()) + + c := &call[V]{ + done: make(chan struct{}), + cancel: cancel, + counter: 1, + } + g.calls[key] = c + g.mu.Unlock() + + go func() { + c.val, c.err = fn(callCtx) + close(c.done) + }() + + return g.wait(ctx, key, c) +} + +// wait for function passed to Do to finish or context to be done. +func (g *Group[K, V]) wait(ctx context.Context, key K, c *call[V]) (v V, shared bool, err error) { + select { + case <-c.done: + v = c.val + err = c.err + case <-ctx.Done(): + err = ctx.Err() + } + g.mu.Lock() + c.counter-- + if c.counter == 0 { + c.cancel() + } + if !c.forgotten { + delete(g.calls, key) + } + g.mu.Unlock() + return v, c.shared, err +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group[K, V]) Forget(key K) { + g.mu.Lock() + if c, ok := g.calls[key]; ok { + c.forgotten = true + } + delete(g.calls, key) + g.mu.Unlock() +} + +// call stores information about as single function call passed to Do function. +type call[V any] struct { + // val and err hold the state about results of the function call. + val V + err error + + // done channel signals that the function call is done. + done chan struct{} + + // forgotten indicates whether Forget was called with this call's key + // while the call was still in flight. + forgotten bool + + // shared indicates if results val and err are passed to multiple callers. + shared bool + + // Number of callers that are waiting for the result. + counter int + // Cancel function for the context passed to the executing function. + cancel context.CancelFunc +} diff --git a/volume/service/service.go b/volume/service/service.go index 6432aff474..7030b2a32b 100644 --- a/volume/service/service.go +++ b/volume/service/service.go @@ -18,7 +18,6 @@ import ( "github.com/docker/docker/volume/service/opts" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "golang.org/x/sync/singleflight" ) type ds interface { @@ -38,7 +37,6 @@ type VolumesService struct { ds ds pruneRunning int32 eventLogger VolumeEventLogger - usage singleflight.Group } // NewVolumeService creates a new volume service @@ -192,25 +190,14 @@ var acceptedListFilters = map[string]bool{ // volumes with mount options are not really local even if they are using the // local driver. func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*volumetypes.Volume, error) { - ch := s.usage.DoChan("LocalVolumesSize", func() (interface{}, error) { - ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool { - dv, ok := v.(volume.DetailedVolume) - return ok && len(dv.Options()) == 0 - }))) - if err != nil { - return nil, err - } - return s.volumesToAPI(ctx, ls, calcSize(true)), nil - }) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-ch: - if res.Err != nil { - return nil, res.Err - } - return res.Val.([]*volumetypes.Volume), nil + ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool { + dv, ok := v.(volume.DetailedVolume) + return ok && len(dv.Options()) == 0 + }))) + if err != nil { + return nil, err } + return s.volumesToAPI(ctx, ls, calcSize(true)), nil } // Prune removes (local) volumes which match the past in filter arguments.