daemon/disk_usage: Use context aware singleflight

The singleflight function was capturing the context.Context of the first
caller that invoked the `singleflight.Do`. This could cause all
concurrent calls to be cancelled when the first request is cancelled.

singleflight calls were also moved from the ImageService to Daemon, to
avoid having to implement this logic in both graphdriver and containerd
based image services.

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
Paweł Gronowski 2022-11-29 16:46:19 +01:00
parent 85b80ce3bb
commit dec81e489f
No known key found for this signature in database
GPG key ID: B85EFCFE26DEF92A
14 changed files with 259 additions and 345 deletions

View file

@ -6,22 +6,18 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/images"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
)
// ImageService implements daemon.ImageService
type ImageService struct {
client *containerd.Client
snapshotter string
usage singleflight.Group
}
// NewService creates a new ImageService.
@ -105,53 +101,17 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer) error {
// LayerDiskUsage returns the number of bytes used by layer stores
// called from disk_usage.go
func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) {
var allLayersSize int64
snapshotter := i.client.SnapshotService(i.snapshotter)
snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
usage, err := snapshotter.Usage(ctx, info.Name)
if err != nil {
return err
}
allLayersSize += usage.Size
return nil
})
return allLayersSize, nil
})
select {
case <-ctx.Done():
return 0, ctx.Err()
case res := <-ch:
if res.Err != nil {
return 0, res.Err
}
return res.Val.(int64), nil
}
}
// ImageDiskUsage returns information about image data disk usage.
func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) {
// Get all top images with extra attributes
imgs, err := i.Images(ctx, types.ImageListOptions{
Filters: filters.NewArgs(),
SharedSize: true,
ContainerCount: true,
})
var allLayersSize int64
snapshotter := i.client.SnapshotService(i.snapshotter)
snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
usage, err := snapshotter.Usage(ctx, info.Name)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve image list")
return err
}
return imgs, nil
allLayersSize += usage.Size
return nil
})
select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-ch:
if res.Err != nil {
return nil, res.Err
}
return res.Val.([]*types.ImageSummary), nil
}
return allLayersSize, nil
}
// UpdateConfig values

View file

@ -25,6 +25,7 @@ import (
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/builder"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/config"
@ -62,10 +63,10 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/bbolt"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"resenje.org/singleflight"
)
// Daemon holds information about the Docker daemon.
@ -105,7 +106,10 @@ type Daemon struct {
seccompProfile []byte
seccompProfilePath string
usage singleflight.Group
usageContainers singleflight.Group[struct{}, []*types.Container]
usageImages singleflight.Group[struct{}, []*types.ImageSummary]
usageVolumes singleflight.Group[struct{}, []*volume.Volume]
usageLayer singleflight.Group[struct{}, int64]
pruneRunning int32
hosts map[string]bool // hosts stores the addresses the daemon is listening on

View file

@ -6,15 +6,18 @@ import (
"github.com/docker/docker/api/server/router/system"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/volume"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// ContainerDiskUsage returns information about container data disk usage.
func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) {
ch := daemon.usage.DoChan("ContainerDiskUsage", func() (interface{}, error) {
// containerDiskUsage obtains information about container data disk usage
// and makes sure that only one calculation is performed at the same time.
func (daemon *Daemon) containerDiskUsage(ctx context.Context) ([]*types.Container, error) {
res, _, err := daemon.usageContainers.Do(ctx, struct{}{}, func(ctx context.Context) ([]*types.Container, error) {
// Retrieve container list
containers, err := daemon.Containers(context.TODO(), &types.ContainerListOptions{
containers, err := daemon.Containers(ctx, &types.ContainerListOptions{
Size: true,
All: true,
})
@ -23,15 +26,52 @@ func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Containe
}
return containers, nil
})
select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-ch:
if res.Err != nil {
return nil, res.Err
return res, err
}
// imageDiskUsage obtains information about image data disk usage from image service
// and makes sure that only one calculation is performed at the same time.
func (daemon *Daemon) imageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
imgs, _, err := daemon.usageImages.Do(ctx, struct{}{}, func(ctx context.Context) ([]*types.ImageSummary, error) {
// Get all top images with extra attributes
imgs, err := daemon.imageService.Images(ctx, types.ImageListOptions{
Filters: filters.NewArgs(),
SharedSize: true,
ContainerCount: true,
})
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve image list")
}
return res.Val.([]*types.Container), nil
}
return imgs, nil
})
return imgs, err
}
// localVolumesSize obtains information about volume disk usage from volumes service
// and makes sure that only one size calculation is performed at the same time.
func (daemon *Daemon) localVolumesSize(ctx context.Context) ([]*volume.Volume, error) {
volumes, _, err := daemon.usageVolumes.Do(ctx, struct{}{}, func(ctx context.Context) ([]*volume.Volume, error) {
volumes, err := daemon.volumes.LocalVolumesSize(ctx)
if err != nil {
return nil, err
}
return volumes, nil
})
return volumes, err
}
// layerDiskUsage obtains information about layer disk usage from image service
// and makes sure that only one size calculation is performed at the same time.
func (daemon *Daemon) layerDiskUsage(ctx context.Context) (int64, error) {
usage, _, err := daemon.usageLayer.Do(ctx, struct{}{}, func(ctx context.Context) (int64, error) {
usage, err := daemon.imageService.LayerDiskUsage(ctx)
if err != nil {
return 0, err
}
return usage, nil
})
return usage, err
}
// SystemDiskUsage returns information about the daemon data disk usage.
@ -43,7 +83,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
if opts.Containers {
eg.Go(func() error {
var err error
containers, err = daemon.ContainerDiskUsage(ctx)
containers, err = daemon.containerDiskUsage(ctx)
return err
})
}
@ -55,12 +95,12 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
if opts.Images {
eg.Go(func() error {
var err error
images, err = daemon.imageService.ImageDiskUsage(ctx)
images, err = daemon.imageDiskUsage(ctx)
return err
})
eg.Go(func() error {
var err error
layersSize, err = daemon.imageService.LayerDiskUsage(ctx)
layersSize, err = daemon.layerDiskUsage(ctx)
return err
})
}
@ -69,7 +109,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
if opts.Volumes {
eg.Go(func() error {
var err error
volumes, err = daemon.volumes.LocalVolumesSize(ctx)
volumes, err = daemon.localVolumesSize(ctx)
return err
})
}

View file

@ -35,7 +35,6 @@ type ImageService interface {
LogImageEvent(imageID, refName, action string)
LogImageEventWithAttributes(imageID, refName, action string, attributes map[string]string)
CountImages() int
ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error)
ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error)
ImportImage(ctx context.Context, src string, repository string, platform *v1.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error
TagImage(imageName, repository, tag string) (string, error)

View file

@ -78,6 +78,12 @@ func (i *ImageService) Images(ctx context.Context, opts types.ImageListOptions)
allContainers []*container.Container
)
for id, img := range selectedImages {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if beforeFilter != nil {
if img.Created.Equal(beforeFilter.Created) || img.Created.After(beforeFilter.Created) {
continue

View file

@ -6,8 +6,6 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/container"
daemonevents "github.com/docker/docker/daemon/events"
"github.com/docker/docker/distribution/metadata"
@ -19,7 +17,6 @@ import (
"github.com/docker/libtrust"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
)
type containerStore interface {
@ -85,7 +82,6 @@ type ImageService struct {
leases leases.Manager
content content.Store
contentNamespace string
usage singleflight.Group
}
// DistributionServices provides daemon image storage services
@ -191,32 +187,21 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer) error {
// LayerDiskUsage returns the number of bytes used by layer stores
// called from disk_usage.go
func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) {
var allLayersSize int64
layerRefs := i.getLayerRefs()
allLayers := i.layerStore.Map()
for _, l := range allLayers {
select {
case <-ctx.Done():
return allLayersSize, ctx.Err()
default:
size := l.DiffSize()
if _, ok := layerRefs[l.ChainID()]; ok {
allLayersSize += size
}
var allLayersSize int64
layerRefs := i.getLayerRefs()
allLayers := i.layerStore.Map()
for _, l := range allLayers {
select {
case <-ctx.Done():
return allLayersSize, ctx.Err()
default:
size := l.DiffSize()
if _, ok := layerRefs[l.ChainID()]; ok {
allLayersSize += size
}
}
return allLayersSize, nil
})
select {
case <-ctx.Done():
return 0, ctx.Err()
case res := <-ch:
if res.Err != nil {
return 0, res.Err
}
return res.Val.(int64), nil
}
return allLayersSize, nil
}
func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
@ -240,31 +225,6 @@ func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
return layerRefs
}
// ImageDiskUsage returns information about image data disk usage.
func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) {
// Get all top images with extra attributes
imgs, err := i.Images(ctx, types.ImageListOptions{
Filters: filters.NewArgs(),
SharedSize: true,
ContainerCount: true,
})
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve image list")
}
return imgs, nil
})
select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-ch:
if res.Err != nil {
return nil, res.Err
}
return res.Val.([]*types.ImageSummary), nil
}
}
// UpdateConfig values
//
// called from reload.go

View file

@ -89,6 +89,7 @@ require (
google.golang.org/genproto v0.0.0-20220706185917-7780775163c4
google.golang.org/grpc v1.48.0
gotest.tools/v3 v3.4.0
resenje.org/singleflight v0.3.0
)
require (

View file

@ -1884,6 +1884,8 @@ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
resenje.org/singleflight v0.3.0 h1:USJtsAN6HTUA827ksc+2Kcr7QZ4HBq/z/P8ugVbqKFY=
resenje.org/singleflight v0.3.0/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

View file

@ -1,205 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}

4
vendor/modules.txt vendored
View file

@ -961,7 +961,6 @@ golang.org/x/oauth2/jwt
## explicit
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
golang.org/x/sync/singleflight
golang.org/x/sync/syncmap
# golang.org/x/sys v0.2.0
## explicit; go 1.17
@ -1153,3 +1152,6 @@ k8s.io/klog/v2/internal/clock
k8s.io/klog/v2/internal/dbg
k8s.io/klog/v2/internal/serialize
k8s.io/klog/v2/internal/severity
# resenje.org/singleflight v0.3.0
## explicit; go 1.18
resenje.org/singleflight

27
vendor/resenje.org/singleflight/LICENSE generated vendored Normal file
View file

@ -0,0 +1,27 @@
Copyright (c) 2019, Janoš Guljaš <janos@resenje.org>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of this project nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

12
vendor/resenje.org/singleflight/README.md generated vendored Normal file
View file

@ -0,0 +1,12 @@
# Singleflight
[![GoDoc](https://godoc.org/resenje.org/singleflight?status.svg)](https://godoc.org/resenje.org/singleflight)
[![Go](https://github.com/janos/singleflight/workflows/Go/badge.svg)](https://github.com/janos/singleflight/actions?query=workflow%3AGo)
Package singleflight provides a duplicate function call suppression
mechanism similar to golang.org/x/sync/singleflight but with support
for context cancelation.
## Installation
Run `go get resenje.org/singleflight` from command line.

119
vendor/resenje.org/singleflight/singleflight.go generated vendored Normal file
View file

@ -0,0 +1,119 @@
// Copyright (c) 2019, Janoš Guljaš <janos@resenje.org>
// All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism similar to golang.org/x/sync/singleflight with support
// for context cancelation.
package singleflight
import (
"context"
"sync"
)
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group[K comparable, V any] struct {
calls map[K]*call[V] // lazily initialized
mu sync.Mutex // protects calls
}
// Do executes and returns the results of the given function, making sure that
// only one execution is in-flight for a given key at a time. If a duplicate
// comes in, the duplicate caller waits for the original to complete and
// receives the same results.
//
// The context passed to the fn function is a new context which is canceled when
// contexts from all callers are canceled, so that no caller is expecting the
// result. If there are multiple callers, context passed to one caller does not
// effect the execution and returned values of others.
//
// The return value shared indicates whether v was given to multiple callers.
func (g *Group[K, V]) Do(ctx context.Context, key K, fn func(ctx context.Context) (V, error)) (v V, shared bool, err error) {
g.mu.Lock()
if g.calls == nil {
g.calls = make(map[K]*call[V])
}
if c, ok := g.calls[key]; ok {
c.shared = true
c.counter++
g.mu.Unlock()
return g.wait(ctx, key, c)
}
callCtx, cancel := context.WithCancel(context.Background())
c := &call[V]{
done: make(chan struct{}),
cancel: cancel,
counter: 1,
}
g.calls[key] = c
g.mu.Unlock()
go func() {
c.val, c.err = fn(callCtx)
close(c.done)
}()
return g.wait(ctx, key, c)
}
// wait for function passed to Do to finish or context to be done.
func (g *Group[K, V]) wait(ctx context.Context, key K, c *call[V]) (v V, shared bool, err error) {
select {
case <-c.done:
v = c.val
err = c.err
case <-ctx.Done():
err = ctx.Err()
}
g.mu.Lock()
c.counter--
if c.counter == 0 {
c.cancel()
}
if !c.forgotten {
delete(g.calls, key)
}
g.mu.Unlock()
return v, c.shared, err
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group[K, V]) Forget(key K) {
g.mu.Lock()
if c, ok := g.calls[key]; ok {
c.forgotten = true
}
delete(g.calls, key)
g.mu.Unlock()
}
// call stores information about as single function call passed to Do function.
type call[V any] struct {
// val and err hold the state about results of the function call.
val V
err error
// done channel signals that the function call is done.
done chan struct{}
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
forgotten bool
// shared indicates if results val and err are passed to multiple callers.
shared bool
// Number of callers that are waiting for the result.
counter int
// Cancel function for the context passed to the executing function.
cancel context.CancelFunc
}

View file

@ -18,7 +18,6 @@ import (
"github.com/docker/docker/volume/service/opts"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/singleflight"
)
type ds interface {
@ -38,7 +37,6 @@ type VolumesService struct {
ds ds
pruneRunning int32
eventLogger VolumeEventLogger
usage singleflight.Group
}
// NewVolumeService creates a new volume service
@ -192,25 +190,14 @@ var acceptedListFilters = map[string]bool{
// volumes with mount options are not really local even if they are using the
// local driver.
func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*volumetypes.Volume, error) {
ch := s.usage.DoChan("LocalVolumesSize", func() (interface{}, error) {
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
return ok && len(dv.Options()) == 0
})))
if err != nil {
return nil, err
}
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
})
select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-ch:
if res.Err != nil {
return nil, res.Err
}
return res.Val.([]*volumetypes.Volume), nil
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
return ok && len(dv.Options()) == 0
})))
if err != nil {
return nil, err
}
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
}
// Prune removes (local) volumes which match the past in filter arguments.