progress.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package utils // import "github.com/docker/docker/distribution/utils"
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "os"
  7. "syscall"
  8. "github.com/containerd/log"
  9. "github.com/docker/docker/pkg/progress"
  10. "github.com/docker/docker/pkg/streamformatter"
  11. )
  12. // WriteDistributionProgress is a helper for writing progress from chan to JSON
  13. // stream with an optional cancel function.
  14. func WriteDistributionProgress(cancelFunc func(), outStream io.Writer, progressChan <-chan progress.Progress) {
  15. progressOutput := streamformatter.NewJSONProgressOutput(outStream, false)
  16. operationCancelled := false
  17. for prog := range progressChan {
  18. if err := progressOutput.WriteProgress(prog); err != nil && !operationCancelled {
  19. // don't log broken pipe errors as this is the normal case when a client aborts
  20. if isBrokenPipe(err) {
  21. log.G(context.TODO()).Info("Pull session cancelled")
  22. } else {
  23. log.G(context.TODO()).Errorf("error writing progress to client: %v", err)
  24. }
  25. cancelFunc()
  26. operationCancelled = true
  27. // Don't return, because we need to continue draining
  28. // progressChan until it's closed to avoid a deadlock.
  29. }
  30. }
  31. }
  32. func isBrokenPipe(e error) bool {
  33. if netErr, ok := e.(*net.OpError); ok {
  34. e = netErr.Err
  35. if sysErr, ok := netErr.Err.(*os.SyscallError); ok {
  36. e = sysErr.Err
  37. }
  38. }
  39. return e == syscall.EPIPE
  40. }