Merge pull request #44520 from vvoland/disk-usage-singleflight
daemon/disk_usage: Use context aware singleflight
This commit is contained in:
commit
1907027b7b
14 changed files with 259 additions and 345 deletions
|
@ -6,22 +6,18 @@ import (
|
|||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/snapshots"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/container"
|
||||
"github.com/docker/docker/daemon/images"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/docker/docker/image"
|
||||
"github.com/docker/docker/layer"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
// ImageService implements daemon.ImageService
|
||||
type ImageService struct {
|
||||
client *containerd.Client
|
||||
snapshotter string
|
||||
usage singleflight.Group
|
||||
}
|
||||
|
||||
// NewService creates a new ImageService.
|
||||
|
@ -105,53 +101,17 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer) error {
|
|||
// LayerDiskUsage returns the number of bytes used by layer stores
|
||||
// called from disk_usage.go
|
||||
func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
|
||||
ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) {
|
||||
var allLayersSize int64
|
||||
snapshotter := i.client.SnapshotService(i.snapshotter)
|
||||
snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
|
||||
usage, err := snapshotter.Usage(ctx, info.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
allLayersSize += usage.Size
|
||||
return nil
|
||||
})
|
||||
return allLayersSize, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return 0, res.Err
|
||||
}
|
||||
return res.Val.(int64), nil
|
||||
}
|
||||
}
|
||||
|
||||
// ImageDiskUsage returns information about image data disk usage.
|
||||
func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
|
||||
ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) {
|
||||
// Get all top images with extra attributes
|
||||
imgs, err := i.Images(ctx, types.ImageListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
SharedSize: true,
|
||||
ContainerCount: true,
|
||||
})
|
||||
var allLayersSize int64
|
||||
snapshotter := i.client.SnapshotService(i.snapshotter)
|
||||
snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
|
||||
usage, err := snapshotter.Usage(ctx, info.Name)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to retrieve image list")
|
||||
return err
|
||||
}
|
||||
return imgs, nil
|
||||
allLayersSize += usage.Size
|
||||
return nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
return res.Val.([]*types.ImageSummary), nil
|
||||
}
|
||||
return allLayersSize, nil
|
||||
}
|
||||
|
||||
// UpdateConfig values
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/docker/docker/api/types"
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/docker/docker/api/types/volume"
|
||||
"github.com/docker/docker/builder"
|
||||
"github.com/docker/docker/container"
|
||||
"github.com/docker/docker/daemon/config"
|
||||
|
@ -62,10 +63,10 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"resenje.org/singleflight"
|
||||
)
|
||||
|
||||
// Daemon holds information about the Docker daemon.
|
||||
|
@ -105,7 +106,10 @@ type Daemon struct {
|
|||
seccompProfile []byte
|
||||
seccompProfilePath string
|
||||
|
||||
usage singleflight.Group
|
||||
usageContainers singleflight.Group[struct{}, []*types.Container]
|
||||
usageImages singleflight.Group[struct{}, []*types.ImageSummary]
|
||||
usageVolumes singleflight.Group[struct{}, []*volume.Volume]
|
||||
usageLayer singleflight.Group[struct{}, int64]
|
||||
|
||||
pruneRunning int32
|
||||
hosts map[string]bool // hosts stores the addresses the daemon is listening on
|
||||
|
|
|
@ -6,15 +6,18 @@ import (
|
|||
|
||||
"github.com/docker/docker/api/server/router/system"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/volume"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ContainerDiskUsage returns information about container data disk usage.
|
||||
func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) {
|
||||
ch := daemon.usage.DoChan("ContainerDiskUsage", func() (interface{}, error) {
|
||||
// containerDiskUsage obtains information about container data disk usage
|
||||
// and makes sure that only one calculation is performed at the same time.
|
||||
func (daemon *Daemon) containerDiskUsage(ctx context.Context) ([]*types.Container, error) {
|
||||
res, _, err := daemon.usageContainers.Do(ctx, struct{}{}, func(ctx context.Context) ([]*types.Container, error) {
|
||||
// Retrieve container list
|
||||
containers, err := daemon.Containers(context.TODO(), &types.ContainerListOptions{
|
||||
containers, err := daemon.Containers(ctx, &types.ContainerListOptions{
|
||||
Size: true,
|
||||
All: true,
|
||||
})
|
||||
|
@ -23,15 +26,52 @@ func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Containe
|
|||
}
|
||||
return containers, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
return res, err
|
||||
}
|
||||
|
||||
// imageDiskUsage obtains information about image data disk usage from image service
|
||||
// and makes sure that only one calculation is performed at the same time.
|
||||
func (daemon *Daemon) imageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
|
||||
imgs, _, err := daemon.usageImages.Do(ctx, struct{}{}, func(ctx context.Context) ([]*types.ImageSummary, error) {
|
||||
// Get all top images with extra attributes
|
||||
imgs, err := daemon.imageService.Images(ctx, types.ImageListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
SharedSize: true,
|
||||
ContainerCount: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to retrieve image list")
|
||||
}
|
||||
return res.Val.([]*types.Container), nil
|
||||
}
|
||||
return imgs, nil
|
||||
})
|
||||
|
||||
return imgs, err
|
||||
}
|
||||
|
||||
// localVolumesSize obtains information about volume disk usage from volumes service
|
||||
// and makes sure that only one size calculation is performed at the same time.
|
||||
func (daemon *Daemon) localVolumesSize(ctx context.Context) ([]*volume.Volume, error) {
|
||||
volumes, _, err := daemon.usageVolumes.Do(ctx, struct{}{}, func(ctx context.Context) ([]*volume.Volume, error) {
|
||||
volumes, err := daemon.volumes.LocalVolumesSize(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return volumes, nil
|
||||
})
|
||||
return volumes, err
|
||||
}
|
||||
|
||||
// layerDiskUsage obtains information about layer disk usage from image service
|
||||
// and makes sure that only one size calculation is performed at the same time.
|
||||
func (daemon *Daemon) layerDiskUsage(ctx context.Context) (int64, error) {
|
||||
usage, _, err := daemon.usageLayer.Do(ctx, struct{}{}, func(ctx context.Context) (int64, error) {
|
||||
usage, err := daemon.imageService.LayerDiskUsage(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return usage, nil
|
||||
})
|
||||
return usage, err
|
||||
}
|
||||
|
||||
// SystemDiskUsage returns information about the daemon data disk usage.
|
||||
|
@ -43,7 +83,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
|
|||
if opts.Containers {
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
containers, err = daemon.ContainerDiskUsage(ctx)
|
||||
containers, err = daemon.containerDiskUsage(ctx)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
@ -55,12 +95,12 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
|
|||
if opts.Images {
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
images, err = daemon.imageService.ImageDiskUsage(ctx)
|
||||
images, err = daemon.imageDiskUsage(ctx)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
layersSize, err = daemon.imageService.LayerDiskUsage(ctx)
|
||||
layersSize, err = daemon.layerDiskUsage(ctx)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
@ -69,7 +109,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage
|
|||
if opts.Volumes {
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
volumes, err = daemon.volumes.LocalVolumesSize(ctx)
|
||||
volumes, err = daemon.localVolumesSize(ctx)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ type ImageService interface {
|
|||
LogImageEvent(imageID, refName, action string)
|
||||
LogImageEventWithAttributes(imageID, refName, action string, attributes map[string]string)
|
||||
CountImages() int
|
||||
ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error)
|
||||
ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error)
|
||||
ImportImage(ctx context.Context, src string, repository string, platform *v1.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error
|
||||
TagImage(imageName, repository, tag string) (string, error)
|
||||
|
|
|
@ -78,6 +78,12 @@ func (i *ImageService) Images(ctx context.Context, opts types.ImageListOptions)
|
|||
allContainers []*container.Container
|
||||
)
|
||||
for id, img := range selectedImages {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if beforeFilter != nil {
|
||||
if img.Created.Equal(beforeFilter.Created) || img.Created.After(beforeFilter.Created) {
|
||||
continue
|
||||
|
|
|
@ -6,8 +6,6 @@ import (
|
|||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/container"
|
||||
daemonevents "github.com/docker/docker/daemon/events"
|
||||
"github.com/docker/docker/distribution/metadata"
|
||||
|
@ -19,7 +17,6 @@ import (
|
|||
"github.com/docker/libtrust"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type containerStore interface {
|
||||
|
@ -85,7 +82,6 @@ type ImageService struct {
|
|||
leases leases.Manager
|
||||
content content.Store
|
||||
contentNamespace string
|
||||
usage singleflight.Group
|
||||
}
|
||||
|
||||
// DistributionServices provides daemon image storage services
|
||||
|
@ -191,32 +187,21 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer) error {
|
|||
// LayerDiskUsage returns the number of bytes used by layer stores
|
||||
// called from disk_usage.go
|
||||
func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
|
||||
ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) {
|
||||
var allLayersSize int64
|
||||
layerRefs := i.getLayerRefs()
|
||||
allLayers := i.layerStore.Map()
|
||||
for _, l := range allLayers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return allLayersSize, ctx.Err()
|
||||
default:
|
||||
size := l.DiffSize()
|
||||
if _, ok := layerRefs[l.ChainID()]; ok {
|
||||
allLayersSize += size
|
||||
}
|
||||
var allLayersSize int64
|
||||
layerRefs := i.getLayerRefs()
|
||||
allLayers := i.layerStore.Map()
|
||||
for _, l := range allLayers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return allLayersSize, ctx.Err()
|
||||
default:
|
||||
size := l.DiffSize()
|
||||
if _, ok := layerRefs[l.ChainID()]; ok {
|
||||
allLayersSize += size
|
||||
}
|
||||
}
|
||||
return allLayersSize, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return 0, res.Err
|
||||
}
|
||||
return res.Val.(int64), nil
|
||||
}
|
||||
return allLayersSize, nil
|
||||
}
|
||||
|
||||
func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
|
||||
|
@ -240,31 +225,6 @@ func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
|
|||
return layerRefs
|
||||
}
|
||||
|
||||
// ImageDiskUsage returns information about image data disk usage.
|
||||
func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
|
||||
ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) {
|
||||
// Get all top images with extra attributes
|
||||
imgs, err := i.Images(ctx, types.ImageListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
SharedSize: true,
|
||||
ContainerCount: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to retrieve image list")
|
||||
}
|
||||
return imgs, nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
return res.Val.([]*types.ImageSummary), nil
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig values
|
||||
//
|
||||
// called from reload.go
|
||||
|
|
|
@ -89,6 +89,7 @@ require (
|
|||
google.golang.org/genproto v0.0.0-20220706185917-7780775163c4
|
||||
google.golang.org/grpc v1.50.1
|
||||
gotest.tools/v3 v3.4.0
|
||||
resenje.org/singleflight v0.3.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
|
@ -1888,6 +1888,8 @@ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
|
|||
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
|
||||
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
|
||||
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
|
||||
resenje.org/singleflight v0.3.0 h1:USJtsAN6HTUA827ksc+2Kcr7QZ4HBq/z/P8ugVbqKFY=
|
||||
resenje.org/singleflight v0.3.0/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||
|
|
205
vendor/golang.org/x/sync/singleflight/singleflight.go
generated
vendored
205
vendor/golang.org/x/sync/singleflight/singleflight.go
generated
vendored
|
@ -1,205 +0,0 @@
|
|||
// Copyright 2013 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package singleflight provides a duplicate function call suppression
|
||||
// mechanism.
|
||||
package singleflight // import "golang.org/x/sync/singleflight"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// errGoexit indicates the runtime.Goexit was called in
|
||||
// the user given function.
|
||||
var errGoexit = errors.New("runtime.Goexit was called")
|
||||
|
||||
// A panicError is an arbitrary value recovered from a panic
|
||||
// with the stack trace during the execution of given function.
|
||||
type panicError struct {
|
||||
value interface{}
|
||||
stack []byte
|
||||
}
|
||||
|
||||
// Error implements error interface.
|
||||
func (p *panicError) Error() string {
|
||||
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
|
||||
}
|
||||
|
||||
func newPanicError(v interface{}) error {
|
||||
stack := debug.Stack()
|
||||
|
||||
// The first line of the stack trace is of the form "goroutine N [status]:"
|
||||
// but by the time the panic reaches Do the goroutine may no longer exist
|
||||
// and its status will have changed. Trim out the misleading line.
|
||||
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
|
||||
stack = stack[line+1:]
|
||||
}
|
||||
return &panicError{value: v, stack: stack}
|
||||
}
|
||||
|
||||
// call is an in-flight or completed singleflight.Do call
|
||||
type call struct {
|
||||
wg sync.WaitGroup
|
||||
|
||||
// These fields are written once before the WaitGroup is done
|
||||
// and are only read after the WaitGroup is done.
|
||||
val interface{}
|
||||
err error
|
||||
|
||||
// These fields are read and written with the singleflight
|
||||
// mutex held before the WaitGroup is done, and are read but
|
||||
// not written after the WaitGroup is done.
|
||||
dups int
|
||||
chans []chan<- Result
|
||||
}
|
||||
|
||||
// Group represents a class of work and forms a namespace in
|
||||
// which units of work can be executed with duplicate suppression.
|
||||
type Group struct {
|
||||
mu sync.Mutex // protects m
|
||||
m map[string]*call // lazily initialized
|
||||
}
|
||||
|
||||
// Result holds the results of Do, so they can be passed
|
||||
// on a channel.
|
||||
type Result struct {
|
||||
Val interface{}
|
||||
Err error
|
||||
Shared bool
|
||||
}
|
||||
|
||||
// Do executes and returns the results of the given function, making
|
||||
// sure that only one execution is in-flight for a given key at a
|
||||
// time. If a duplicate comes in, the duplicate caller waits for the
|
||||
// original to complete and receives the same results.
|
||||
// The return value shared indicates whether v was given to multiple callers.
|
||||
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[string]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.dups++
|
||||
g.mu.Unlock()
|
||||
c.wg.Wait()
|
||||
|
||||
if e, ok := c.err.(*panicError); ok {
|
||||
panic(e)
|
||||
} else if c.err == errGoexit {
|
||||
runtime.Goexit()
|
||||
}
|
||||
return c.val, c.err, true
|
||||
}
|
||||
c := new(call)
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
g.doCall(c, key, fn)
|
||||
return c.val, c.err, c.dups > 0
|
||||
}
|
||||
|
||||
// DoChan is like Do but returns a channel that will receive the
|
||||
// results when they are ready.
|
||||
//
|
||||
// The returned channel will not be closed.
|
||||
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
|
||||
ch := make(chan Result, 1)
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[string]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.dups++
|
||||
c.chans = append(c.chans, ch)
|
||||
g.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
c := &call{chans: []chan<- Result{ch}}
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
go g.doCall(c, key, fn)
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// doCall handles the single call for a key.
|
||||
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
|
||||
normalReturn := false
|
||||
recovered := false
|
||||
|
||||
// use double-defer to distinguish panic from runtime.Goexit,
|
||||
// more details see https://golang.org/cl/134395
|
||||
defer func() {
|
||||
// the given function invoked runtime.Goexit
|
||||
if !normalReturn && !recovered {
|
||||
c.err = errGoexit
|
||||
}
|
||||
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
c.wg.Done()
|
||||
if g.m[key] == c {
|
||||
delete(g.m, key)
|
||||
}
|
||||
|
||||
if e, ok := c.err.(*panicError); ok {
|
||||
// In order to prevent the waiting channels from being blocked forever,
|
||||
// needs to ensure that this panic cannot be recovered.
|
||||
if len(c.chans) > 0 {
|
||||
go panic(e)
|
||||
select {} // Keep this goroutine around so that it will appear in the crash dump.
|
||||
} else {
|
||||
panic(e)
|
||||
}
|
||||
} else if c.err == errGoexit {
|
||||
// Already in the process of goexit, no need to call again
|
||||
} else {
|
||||
// Normal return
|
||||
for _, ch := range c.chans {
|
||||
ch <- Result{c.val, c.err, c.dups > 0}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
if !normalReturn {
|
||||
// Ideally, we would wait to take a stack trace until we've determined
|
||||
// whether this is a panic or a runtime.Goexit.
|
||||
//
|
||||
// Unfortunately, the only way we can distinguish the two is to see
|
||||
// whether the recover stopped the goroutine from terminating, and by
|
||||
// the time we know that, the part of the stack trace relevant to the
|
||||
// panic has been discarded.
|
||||
if r := recover(); r != nil {
|
||||
c.err = newPanicError(r)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
c.val, c.err = fn()
|
||||
normalReturn = true
|
||||
}()
|
||||
|
||||
if !normalReturn {
|
||||
recovered = true
|
||||
}
|
||||
}
|
||||
|
||||
// Forget tells the singleflight to forget about a key. Future calls
|
||||
// to Do for this key will call the function rather than waiting for
|
||||
// an earlier call to complete.
|
||||
func (g *Group) Forget(key string) {
|
||||
g.mu.Lock()
|
||||
delete(g.m, key)
|
||||
g.mu.Unlock()
|
||||
}
|
4
vendor/modules.txt
vendored
4
vendor/modules.txt
vendored
|
@ -961,7 +961,6 @@ golang.org/x/oauth2/jwt
|
|||
## explicit
|
||||
golang.org/x/sync/errgroup
|
||||
golang.org/x/sync/semaphore
|
||||
golang.org/x/sync/singleflight
|
||||
golang.org/x/sync/syncmap
|
||||
# golang.org/x/sys v0.2.0
|
||||
## explicit; go 1.17
|
||||
|
@ -1153,3 +1152,6 @@ k8s.io/klog/v2/internal/clock
|
|||
k8s.io/klog/v2/internal/dbg
|
||||
k8s.io/klog/v2/internal/serialize
|
||||
k8s.io/klog/v2/internal/severity
|
||||
# resenje.org/singleflight v0.3.0
|
||||
## explicit; go 1.18
|
||||
resenje.org/singleflight
|
||||
|
|
27
vendor/resenje.org/singleflight/LICENSE
generated
vendored
Normal file
27
vendor/resenje.org/singleflight/LICENSE
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
Copyright (c) 2019, Janoš Guljaš <janos@resenje.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name of this project nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
12
vendor/resenje.org/singleflight/README.md
generated
vendored
Normal file
12
vendor/resenje.org/singleflight/README.md
generated
vendored
Normal file
|
@ -0,0 +1,12 @@
|
|||
# Singleflight
|
||||
|
||||
[](https://godoc.org/resenje.org/singleflight)
|
||||
[](https://github.com/janos/singleflight/actions?query=workflow%3AGo)
|
||||
|
||||
Package singleflight provides a duplicate function call suppression
|
||||
mechanism similar to golang.org/x/sync/singleflight but with support
|
||||
for context cancelation.
|
||||
|
||||
## Installation
|
||||
|
||||
Run `go get resenje.org/singleflight` from command line.
|
119
vendor/resenje.org/singleflight/singleflight.go
generated
vendored
Normal file
119
vendor/resenje.org/singleflight/singleflight.go
generated
vendored
Normal file
|
@ -0,0 +1,119 @@
|
|||
// Copyright (c) 2019, Janoš Guljaš <janos@resenje.org>
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package singleflight provides a duplicate function call suppression
|
||||
// mechanism similar to golang.org/x/sync/singleflight with support
|
||||
// for context cancelation.
|
||||
package singleflight
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Group represents a class of work and forms a namespace in
|
||||
// which units of work can be executed with duplicate suppression.
|
||||
type Group[K comparable, V any] struct {
|
||||
calls map[K]*call[V] // lazily initialized
|
||||
mu sync.Mutex // protects calls
|
||||
}
|
||||
|
||||
// Do executes and returns the results of the given function, making sure that
|
||||
// only one execution is in-flight for a given key at a time. If a duplicate
|
||||
// comes in, the duplicate caller waits for the original to complete and
|
||||
// receives the same results.
|
||||
//
|
||||
// The context passed to the fn function is a new context which is canceled when
|
||||
// contexts from all callers are canceled, so that no caller is expecting the
|
||||
// result. If there are multiple callers, context passed to one caller does not
|
||||
// effect the execution and returned values of others.
|
||||
//
|
||||
// The return value shared indicates whether v was given to multiple callers.
|
||||
func (g *Group[K, V]) Do(ctx context.Context, key K, fn func(ctx context.Context) (V, error)) (v V, shared bool, err error) {
|
||||
g.mu.Lock()
|
||||
if g.calls == nil {
|
||||
g.calls = make(map[K]*call[V])
|
||||
}
|
||||
|
||||
if c, ok := g.calls[key]; ok {
|
||||
c.shared = true
|
||||
c.counter++
|
||||
g.mu.Unlock()
|
||||
|
||||
return g.wait(ctx, key, c)
|
||||
}
|
||||
|
||||
callCtx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
c := &call[V]{
|
||||
done: make(chan struct{}),
|
||||
cancel: cancel,
|
||||
counter: 1,
|
||||
}
|
||||
g.calls[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
c.val, c.err = fn(callCtx)
|
||||
close(c.done)
|
||||
}()
|
||||
|
||||
return g.wait(ctx, key, c)
|
||||
}
|
||||
|
||||
// wait for function passed to Do to finish or context to be done.
|
||||
func (g *Group[K, V]) wait(ctx context.Context, key K, c *call[V]) (v V, shared bool, err error) {
|
||||
select {
|
||||
case <-c.done:
|
||||
v = c.val
|
||||
err = c.err
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
}
|
||||
g.mu.Lock()
|
||||
c.counter--
|
||||
if c.counter == 0 {
|
||||
c.cancel()
|
||||
}
|
||||
if !c.forgotten {
|
||||
delete(g.calls, key)
|
||||
}
|
||||
g.mu.Unlock()
|
||||
return v, c.shared, err
|
||||
}
|
||||
|
||||
// Forget tells the singleflight to forget about a key. Future calls
|
||||
// to Do for this key will call the function rather than waiting for
|
||||
// an earlier call to complete.
|
||||
func (g *Group[K, V]) Forget(key K) {
|
||||
g.mu.Lock()
|
||||
if c, ok := g.calls[key]; ok {
|
||||
c.forgotten = true
|
||||
}
|
||||
delete(g.calls, key)
|
||||
g.mu.Unlock()
|
||||
}
|
||||
|
||||
// call stores information about as single function call passed to Do function.
|
||||
type call[V any] struct {
|
||||
// val and err hold the state about results of the function call.
|
||||
val V
|
||||
err error
|
||||
|
||||
// done channel signals that the function call is done.
|
||||
done chan struct{}
|
||||
|
||||
// forgotten indicates whether Forget was called with this call's key
|
||||
// while the call was still in flight.
|
||||
forgotten bool
|
||||
|
||||
// shared indicates if results val and err are passed to multiple callers.
|
||||
shared bool
|
||||
|
||||
// Number of callers that are waiting for the result.
|
||||
counter int
|
||||
// Cancel function for the context passed to the executing function.
|
||||
cancel context.CancelFunc
|
||||
}
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/docker/docker/volume/service/opts"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type ds interface {
|
||||
|
@ -38,7 +37,6 @@ type VolumesService struct {
|
|||
ds ds
|
||||
pruneRunning int32
|
||||
eventLogger VolumeEventLogger
|
||||
usage singleflight.Group
|
||||
}
|
||||
|
||||
// NewVolumeService creates a new volume service
|
||||
|
@ -192,25 +190,14 @@ var acceptedListFilters = map[string]bool{
|
|||
// volumes with mount options are not really local even if they are using the
|
||||
// local driver.
|
||||
func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*volumetypes.Volume, error) {
|
||||
ch := s.usage.DoChan("LocalVolumesSize", func() (interface{}, error) {
|
||||
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
|
||||
dv, ok := v.(volume.DetailedVolume)
|
||||
return ok && len(dv.Options()) == 0
|
||||
})))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
return res.Val.([]*volumetypes.Volume), nil
|
||||
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
|
||||
dv, ok := v.(volume.DetailedVolume)
|
||||
return ok && len(dv.Options()) == 0
|
||||
})))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
|
||||
}
|
||||
|
||||
// Prune removes (local) volumes which match the past in filter arguments.
|
||||
|
|
Loading…
Add table
Reference in a new issue