瀏覽代碼

Merge pull request #32574 from mlaventure/add-ctx-to-df-prune

Add ctx to df and prune
Kenfe-Mickaël Laventure 8 年之前
父節點
當前提交
d4bf6ad716

+ 1 - 1
api/server/router/container/backend.go

@@ -65,7 +65,7 @@ type attachBackend interface {
 
 // systemBackend includes functions to implement to provide system wide containers functionality
 type systemBackend interface {
-	ContainersPrune(pruneFilters filters.Args) (*types.ContainersPruneReport, error)
+	ContainersPrune(ctx context.Context, pruneFilters filters.Args) (*types.ContainersPruneReport, error)
 }
 
 // Backend is all the methods that need to be implemented to provide container specific functionality.

+ 1 - 1
api/server/router/container/container.go

@@ -68,7 +68,7 @@ func (r *containerRouter) initRoutes() {
 		router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),
 		router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),
 		router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),
-		router.NewPostRoute("/containers/prune", r.postContainersPrune),
+		router.NewPostRoute("/containers/prune", r.postContainersPrune, router.WithCancel),
 		// PUT
 		router.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),
 		// DELETE

+ 1 - 1
api/server/router/container/container_routes.go

@@ -565,7 +565,7 @@ func (s *containerRouter) postContainersPrune(ctx context.Context, w http.Respon
 		return err
 	}
 
-	pruneReport, err := s.backend.ContainersPrune(pruneFilters)
+	pruneReport, err := s.backend.ContainersPrune(ctx, pruneFilters)
 	if err != nil {
 		return err
 	}

+ 1 - 1
api/server/router/image/backend.go

@@ -30,7 +30,7 @@ type imageBackend interface {
 	Images(imageFilters filters.Args, all bool, withExtraAttrs bool) ([]*types.ImageSummary, error)
 	LookupImage(name string) (*types.ImageInspect, error)
 	TagImage(imageName, repository, tag string) error
-	ImagesPrune(pruneFilters filters.Args) (*types.ImagesPruneReport, error)
+	ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error)
 }
 
 type importExportBackend interface {

+ 1 - 1
api/server/router/image/image.go

@@ -43,7 +43,7 @@ func (r *imageRouter) initRoutes() {
 		router.NewPostRoute("/images/create", r.postImagesCreate, router.WithCancel),
 		router.NewPostRoute("/images/{name:.*}/push", r.postImagesPush, router.WithCancel),
 		router.NewPostRoute("/images/{name:.*}/tag", r.postImagesTag),
-		router.NewPostRoute("/images/prune", r.postImagesPrune),
+		router.NewPostRoute("/images/prune", r.postImagesPrune, router.WithCancel),
 		// DELETE
 		router.NewDeleteRoute("/images/{name:.*}", r.deleteImages),
 	}

+ 1 - 1
api/server/router/image/image_routes.go

@@ -336,7 +336,7 @@ func (s *imageRouter) postImagesPrune(ctx context.Context, w http.ResponseWriter
 		return err
 	}
 
-	pruneReport, err := s.backend.ImagesPrune(pruneFilters)
+	pruneReport, err := s.backend.ImagesPrune(ctx, pruneFilters)
 	if err != nil {
 		return err
 	}

+ 3 - 1
api/server/router/network/backend.go

@@ -1,6 +1,8 @@
 package network
 
 import (
+	"golang.org/x/net/context"
+
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/network"
@@ -16,5 +18,5 @@ type Backend interface {
 	ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
 	DisconnectContainerFromNetwork(containerName string, networkName string, force bool) error
 	DeleteNetwork(name string) error
-	NetworksPrune(pruneFilters filters.Args) (*types.NetworksPruneReport, error)
+	NetworksPrune(ctx context.Context, pruneFilters filters.Args) (*types.NetworksPruneReport, error)
 }

+ 1 - 1
api/server/router/network/network.go

@@ -37,7 +37,7 @@ func (r *networkRouter) initRoutes() {
 		router.NewPostRoute("/networks/create", r.postNetworkCreate),
 		router.NewPostRoute("/networks/{id:.*}/connect", r.postNetworkConnect),
 		router.NewPostRoute("/networks/{id:.*}/disconnect", r.postNetworkDisconnect),
-		router.NewPostRoute("/networks/prune", r.postNetworksPrune),
+		router.NewPostRoute("/networks/prune", r.postNetworksPrune, router.WithCancel),
 		// DELETE
 		router.NewDeleteRoute("/networks/{id:.*}", r.deleteNetwork),
 	}

+ 1 - 1
api/server/router/network/network_routes.go

@@ -455,7 +455,7 @@ func (n *networkRouter) postNetworksPrune(ctx context.Context, w http.ResponseWr
 		return err
 	}
 
-	pruneReport, err := n.backend.NetworksPrune(pruneFilters)
+	pruneReport, err := n.backend.NetworksPrune(ctx, pruneFilters)
 	if err != nil {
 		return err
 	}

+ 1 - 1
api/server/router/system/backend.go

@@ -14,7 +14,7 @@ import (
 type Backend interface {
 	SystemInfo() (*types.Info, error)
 	SystemVersion() types.Version
-	SystemDiskUsage() (*types.DiskUsage, error)
+	SystemDiskUsage(ctx context.Context) (*types.DiskUsage, error)
 	SubscribeToEvents(since, until time.Time, ef filters.Args) ([]events.Message, chan interface{})
 	UnsubscribeFromEvents(chan interface{})
 	AuthenticateToRegistry(ctx context.Context, authConfig *types.AuthConfig) (string, string, error)

+ 1 - 1
api/server/router/system/system.go

@@ -26,7 +26,7 @@ func NewRouter(b Backend, c *cluster.Cluster) router.Router {
 		router.NewGetRoute("/events", r.getEvents, router.WithCancel),
 		router.NewGetRoute("/info", r.getInfo),
 		router.NewGetRoute("/version", r.getVersion),
-		router.NewGetRoute("/system/df", r.getDiskUsage),
+		router.NewGetRoute("/system/df", r.getDiskUsage, router.WithCancel),
 		router.NewPostRoute("/auth", r.postAuth),
 	}
 

+ 1 - 1
api/server/router/system/system_routes.go

@@ -71,7 +71,7 @@ func (s *systemRouter) getVersion(ctx context.Context, w http.ResponseWriter, r
 }
 
 func (s *systemRouter) getDiskUsage(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
-	du, err := s.backend.SystemDiskUsage()
+	du, err := s.backend.SystemDiskUsage(ctx)
 	if err != nil {
 		return err
 	}

+ 3 - 1
api/server/router/volume/backend.go

@@ -1,6 +1,8 @@
 package volume
 
 import (
+	"golang.org/x/net/context"
+
 	// TODO return types need to be refactored into pkg
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/filters"
@@ -13,5 +15,5 @@ type Backend interface {
 	VolumeInspect(name string) (*types.Volume, error)
 	VolumeCreate(name, driverName string, opts, labels map[string]string) (*types.Volume, error)
 	VolumeRm(name string, force bool) error
-	VolumesPrune(pruneFilters filters.Args) (*types.VolumesPruneReport, error)
+	VolumesPrune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error)
 }

+ 1 - 1
api/server/router/volume/volume.go

@@ -29,7 +29,7 @@ func (r *volumeRouter) initRoutes() {
 		router.NewGetRoute("/volumes/{name:.*}", r.getVolumeByName),
 		// POST
 		router.NewPostRoute("/volumes/create", r.postVolumesCreate),
-		router.NewPostRoute("/volumes/prune", r.postVolumesPrune),
+		router.NewPostRoute("/volumes/prune", r.postVolumesPrune, router.WithCancel),
 		// DELETE
 		router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes),
 	}

+ 1 - 1
api/server/router/volume/volume_routes.go

@@ -77,7 +77,7 @@ func (v *volumeRouter) postVolumesPrune(ctx context.Context, w http.ResponseWrit
 		return err
 	}
 
-	pruneReport, err := v.backend.VolumesPrune(pruneFilters)
+	pruneReport, err := v.backend.VolumesPrune(ctx, pruneFilters)
 	if err != nil {
 		return err
 	}

+ 3 - 0
daemon/daemon.go

@@ -111,6 +111,9 @@ type Daemon struct {
 
 	seccompProfile     []byte
 	seccompProfilePath string
+
+	diskUsageRunning int32
+	pruneRunning     int32
 }
 
 // HasExperimental returns whether the experimental features of the daemon are enabled or not

+ 35 - 18
daemon/disk_usage.go

@@ -2,6 +2,9 @@ package daemon
 
 import (
 	"fmt"
+	"sync/atomic"
+
+	"golang.org/x/net/context"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/types"
@@ -34,7 +37,12 @@ func (daemon *Daemon) getLayerRefs() map[layer.ChainID]int {
 }
 
 // SystemDiskUsage returns information about the daemon data disk usage
-func (daemon *Daemon) SystemDiskUsage() (*types.DiskUsage, error) {
+func (daemon *Daemon) SystemDiskUsage(ctx context.Context) (*types.DiskUsage, error) {
+	if !atomic.CompareAndSwapInt32(&daemon.diskUsageRunning, 0, 1) {
+		return nil, fmt.Errorf("a disk usage operation is already running")
+	}
+	defer atomic.StoreInt32(&daemon.diskUsageRunning, 0)
+
 	// Retrieve container list
 	allContainers, err := daemon.Containers(&types.ContainerListOptions{
 		Size: true,
@@ -53,17 +61,22 @@ func (daemon *Daemon) SystemDiskUsage() (*types.DiskUsage, error) {
 	// Get all local volumes
 	allVolumes := []*types.Volume{}
 	getLocalVols := func(v volume.Volume) error {
-		name := v.Name()
-		refs := daemon.volumes.Refs(v)
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			name := v.Name()
+			refs := daemon.volumes.Refs(v)
 
-		tv := volumeToAPIType(v)
-		sz, err := directory.Size(v.Path())
-		if err != nil {
-			logrus.Warnf("failed to determine size of volume %v", name)
-			sz = -1
+			tv := volumeToAPIType(v)
+			sz, err := directory.Size(v.Path())
+			if err != nil {
+				logrus.Warnf("failed to determine size of volume %v", name)
+				sz = -1
+			}
+			tv.UsageData = &types.VolumeUsageData{Size: sz, RefCount: int64(len(refs))}
+			allVolumes = append(allVolumes, tv)
 		}
-		tv.UsageData = &types.VolumeUsageData{Size: sz, RefCount: int64(len(refs))}
-		allVolumes = append(allVolumes, tv)
 
 		return nil
 	}
@@ -78,17 +91,21 @@ func (daemon *Daemon) SystemDiskUsage() (*types.DiskUsage, error) {
 	allLayers := daemon.layerStore.Map()
 	var allLayersSize int64
 	for _, l := range allLayers {
-		size, err := l.DiffSize()
-		if err == nil {
-			if _, ok := layerRefs[l.ChainID()]; ok {
-				allLayersSize += size
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			size, err := l.DiffSize()
+			if err == nil {
+				if _, ok := layerRefs[l.ChainID()]; ok {
+					allLayersSize += size
+				} else {
+					logrus.Warnf("found leaked image layer %v", l.ChainID())
+				}
 			} else {
-				logrus.Warnf("found leaked image layer %v", l.ChainID())
+				logrus.Warnf("failed to get diff size for layer %v", l.ChainID())
 			}
-		} else {
-			logrus.Warnf("failed to get diff size for layer %v", l.ChainID())
 		}
-
 	}
 
 	return &types.DiskUsage{

+ 125 - 40
daemon/prune.go

@@ -3,6 +3,7 @@ package daemon
 import (
 	"fmt"
 	"regexp"
+	"sync/atomic"
 	"time"
 
 	"github.com/Sirupsen/logrus"
@@ -17,10 +18,22 @@ import (
 	"github.com/docker/docker/volume"
 	"github.com/docker/libnetwork"
 	digest "github.com/opencontainers/go-digest"
+	"golang.org/x/net/context"
+)
+
+var (
+	// ErrPruneRunning is returned when a prune request is received while
+	// one is in progress
+	ErrPruneRunning = fmt.Errorf("a prune operation is already running")
 )
 
 // ContainersPrune removes unused containers
-func (daemon *Daemon) ContainersPrune(pruneFilters filters.Args) (*types.ContainersPruneReport, error) {
+func (daemon *Daemon) ContainersPrune(ctx context.Context, pruneFilters filters.Args) (*types.ContainersPruneReport, error) {
+	if !atomic.CompareAndSwapInt32(&daemon.pruneRunning, 0, 1) {
+		return nil, ErrPruneRunning
+	}
+	defer atomic.StoreInt32(&daemon.pruneRunning, 0)
+
 	rep := &types.ContainersPruneReport{}
 
 	until, err := getUntilFromPruneFilters(pruneFilters)
@@ -30,6 +43,13 @@ func (daemon *Daemon) ContainersPrune(pruneFilters filters.Args) (*types.Contain
 
 	allContainers := daemon.List()
 	for _, c := range allContainers {
+		select {
+		case <-ctx.Done():
+			logrus.Warnf("ContainersPrune operation cancelled: %#v", *rep)
+			return rep, ctx.Err()
+		default:
+		}
+
 		if !c.IsRunning() {
 			if !until.IsZero() && c.Created.After(until) {
 				continue
@@ -55,10 +75,22 @@ func (daemon *Daemon) ContainersPrune(pruneFilters filters.Args) (*types.Contain
 }
 
 // VolumesPrune removes unused local volumes
-func (daemon *Daemon) VolumesPrune(pruneFilters filters.Args) (*types.VolumesPruneReport, error) {
+func (daemon *Daemon) VolumesPrune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error) {
+	if !atomic.CompareAndSwapInt32(&daemon.pruneRunning, 0, 1) {
+		return nil, ErrPruneRunning
+	}
+	defer atomic.StoreInt32(&daemon.pruneRunning, 0)
+
 	rep := &types.VolumesPruneReport{}
 
 	pruneVols := func(v volume.Volume) error {
+		select {
+		case <-ctx.Done():
+			logrus.Warnf("VolumesPrune operation cancelled: %#v", *rep)
+			return ctx.Err()
+		default:
+		}
+
 		name := v.Name()
 		refs := daemon.volumes.Refs(v)
 
@@ -91,7 +123,12 @@ func (daemon *Daemon) VolumesPrune(pruneFilters filters.Args) (*types.VolumesPru
 }
 
 // ImagesPrune removes unused images
-func (daemon *Daemon) ImagesPrune(pruneFilters filters.Args) (*types.ImagesPruneReport, error) {
+func (daemon *Daemon) ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error) {
+	if !atomic.CompareAndSwapInt32(&daemon.pruneRunning, 0, 1) {
+		return nil, ErrPruneRunning
+	}
+	defer atomic.StoreInt32(&daemon.pruneRunning, 0)
+
 	rep := &types.ImagesPruneReport{}
 
 	danglingOnly := true
@@ -117,27 +154,47 @@ func (daemon *Daemon) ImagesPrune(pruneFilters filters.Args) (*types.ImagesPrune
 	allContainers := daemon.List()
 	imageRefs := map[string]bool{}
 	for _, c := range allContainers {
-		imageRefs[c.ID] = true
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			imageRefs[c.ID] = true
+		}
 	}
 
 	// Filter intermediary images and get their unique size
 	allLayers := daemon.layerStore.Map()
 	topImages := map[image.ID]*image.Image{}
 	for id, img := range allImages {
-		dgst := digest.Digest(id)
-		if len(daemon.referenceStore.References(dgst)) == 0 && len(daemon.imageStore.Children(id)) != 0 {
-			continue
-		}
-		if !until.IsZero() && img.Created.After(until) {
-			continue
-		}
-		if !matchLabels(pruneFilters, img.Config.Labels) {
-			continue
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			dgst := digest.Digest(id)
+			if len(daemon.referenceStore.References(dgst)) == 0 && len(daemon.imageStore.Children(id)) != 0 {
+				continue
+			}
+			if !until.IsZero() && img.Created.After(until) {
+				continue
+			}
+			if !matchLabels(pruneFilters, img.Config.Labels) {
+				continue
+			}
+			topImages[id] = img
 		}
-		topImages[id] = img
 	}
 
+	canceled := false
+deleteImagesLoop:
 	for id := range topImages {
+		select {
+		case <-ctx.Done():
+			// we still want to calculate freed size and return the data
+			canceled = true
+			break deleteImagesLoop
+		default:
+		}
+
 		dgst := digest.Digest(id)
 		hex := dgst.Hex()
 		if _, ok := imageRefs[hex]; ok {
@@ -198,17 +255,27 @@ func (daemon *Daemon) ImagesPrune(pruneFilters filters.Args) (*types.ImagesPrune
 		}
 	}
 
+	if canceled {
+		logrus.Warnf("ImagesPrune operation cancelled: %#v", *rep)
+		return nil, ctx.Err()
+	}
+
 	return rep, nil
 }
 
 // localNetworksPrune removes unused local networks
-func (daemon *Daemon) localNetworksPrune(pruneFilters filters.Args) *types.NetworksPruneReport {
+func (daemon *Daemon) localNetworksPrune(ctx context.Context, pruneFilters filters.Args) *types.NetworksPruneReport {
 	rep := &types.NetworksPruneReport{}
 
 	until, _ := getUntilFromPruneFilters(pruneFilters)
 
 	// When the function returns true, the walk will stop.
 	l := func(nw libnetwork.Network) bool {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+		}
 		if !until.IsZero() && nw.Info().Created().After(until) {
 			return false
 		}
@@ -234,7 +301,7 @@ func (daemon *Daemon) localNetworksPrune(pruneFilters filters.Args) *types.Netwo
 }
 
 // clusterNetworksPrune removes unused cluster networks
-func (daemon *Daemon) clusterNetworksPrune(pruneFilters filters.Args) (*types.NetworksPruneReport, error) {
+func (daemon *Daemon) clusterNetworksPrune(ctx context.Context, pruneFilters filters.Args) (*types.NetworksPruneReport, error) {
 	rep := &types.NetworksPruneReport{}
 
 	until, _ := getUntilFromPruneFilters(pruneFilters)
@@ -251,46 +318,64 @@ func (daemon *Daemon) clusterNetworksPrune(pruneFilters filters.Args) (*types.Ne
 	}
 	networkIsInUse := regexp.MustCompile(`network ([[:alnum:]]+) is in use`)
 	for _, nw := range networks {
-		if nw.Ingress {
-			// Routing-mesh network removal has to be explicitly invoked by user
-			continue
-		}
-		if !until.IsZero() && nw.Created.After(until) {
-			continue
-		}
-		if !matchLabels(pruneFilters, nw.Labels) {
-			continue
-		}
-		// https://github.com/docker/docker/issues/24186
-		// `docker network inspect` unfortunately displays ONLY those containers that are local to that node.
-		// So we try to remove it anyway and check the error
-		err = cluster.RemoveNetwork(nw.ID)
-		if err != nil {
-			// we can safely ignore the "network .. is in use" error
-			match := networkIsInUse.FindStringSubmatch(err.Error())
-			if len(match) != 2 || match[1] != nw.ID {
-				logrus.Warnf("could not remove cluster network %s: %v", nw.Name, err)
+		select {
+		case <-ctx.Done():
+			return rep, ctx.Err()
+		default:
+			if nw.Ingress {
+				// Routing-mesh network removal has to be explicitly invoked by user
+				continue
 			}
-			continue
+			if !until.IsZero() && nw.Created.After(until) {
+				continue
+			}
+			if !matchLabels(pruneFilters, nw.Labels) {
+				continue
+			}
+			// https://github.com/docker/docker/issues/24186
+			// `docker network inspect` unfortunately displays ONLY those containers that are local to that node.
+			// So we try to remove it anyway and check the error
+			err = cluster.RemoveNetwork(nw.ID)
+			if err != nil {
+				// we can safely ignore the "network .. is in use" error
+				match := networkIsInUse.FindStringSubmatch(err.Error())
+				if len(match) != 2 || match[1] != nw.ID {
+					logrus.Warnf("could not remove cluster network %s: %v", nw.Name, err)
+				}
+				continue
+			}
+			rep.NetworksDeleted = append(rep.NetworksDeleted, nw.Name)
 		}
-		rep.NetworksDeleted = append(rep.NetworksDeleted, nw.Name)
 	}
 	return rep, nil
 }
 
 // NetworksPrune removes unused networks
-func (daemon *Daemon) NetworksPrune(pruneFilters filters.Args) (*types.NetworksPruneReport, error) {
+func (daemon *Daemon) NetworksPrune(ctx context.Context, pruneFilters filters.Args) (*types.NetworksPruneReport, error) {
+	if !atomic.CompareAndSwapInt32(&daemon.pruneRunning, 0, 1) {
+		return nil, ErrPruneRunning
+	}
+	defer atomic.StoreInt32(&daemon.pruneRunning, 0)
+
 	if _, err := getUntilFromPruneFilters(pruneFilters); err != nil {
 		return nil, err
 	}
 
 	rep := &types.NetworksPruneReport{}
-	if clusterRep, err := daemon.clusterNetworksPrune(pruneFilters); err == nil {
+	if clusterRep, err := daemon.clusterNetworksPrune(ctx, pruneFilters); err == nil {
 		rep.NetworksDeleted = append(rep.NetworksDeleted, clusterRep.NetworksDeleted...)
 	}
 
-	localRep := daemon.localNetworksPrune(pruneFilters)
+	localRep := daemon.localNetworksPrune(ctx, pruneFilters)
 	rep.NetworksDeleted = append(rep.NetworksDeleted, localRep.NetworksDeleted...)
+
+	select {
+	case <-ctx.Done():
+		logrus.Warnf("NetworksPrune operation cancelled: %#v", *rep)
+		return nil, ctx.Err()
+	default:
+	}
+
 	return rep, nil
 }