Merge pull request #10977 from robertabbott/10959-progressreader

Moves progressreader from utils to its own package
This commit is contained in:
Alexander Morozov 2015-03-16 12:45:33 -07:00
commit 8ae20d8eba
9 changed files with 168 additions and 67 deletions

View file

@ -40,6 +40,7 @@ import (
"github.com/docker/docker/pkg/networkfs/resolvconf"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/symlink"
@ -232,7 +233,14 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
// FIXME: ProgressReader shouldn't be this annoying to use
if context != nil {
sf := utils.NewStreamFormatter(false)
body = utils.ProgressReader(context, 0, cli.out, sf, true, "", "Sending build context to Docker daemon")
body = progressreader.New(progressreader.Config{
In: context,
Out: cli.out,
Formatter: sf,
NewLines: true,
ID: "",
Action: "Sending build context to Docker daemon",
})
}
// Send the build context
v := &url.Values{}

View file

@ -28,6 +28,7 @@ import (
"github.com/docker/docker/pkg/common"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/symlink"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/pkg/tarsum"
@ -268,7 +269,15 @@ func calcCopyInfo(b *Builder, cmdName string, cInfos *[]*copyInfo, origPath stri
}
// Download and dump result to tmp file
if _, err := io.Copy(tmpFile, utils.ProgressReader(resp.Body, int(resp.ContentLength), b.OutOld, b.StreamFormatter, true, "", "Downloading")); err != nil {
if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
In: resp.Body,
Out: b.OutOld,
Formatter: b.StreamFormatter,
Size: int(resp.ContentLength),
NewLines: true,
ID: "",
Action: "Downloading",
})); err != nil {
tmpFile.Close()
return err
}

View file

@ -18,6 +18,7 @@ import (
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/common"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/truncindex"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
@ -210,9 +211,17 @@ func (graph *Graph) TempLayerArchive(id string, sf *utils.StreamFormatter, outpu
if err != nil {
return nil, err
}
progress := utils.ProgressReader(a, 0, output, sf, false, common.TruncateID(id), "Buffering to disk")
defer progress.Close()
return archive.NewTempArchive(progress, tmp)
progressReader := progressreader.New(progressreader.Config{
In: a,
Out: output,
Formatter: sf,
Size: 0,
NewLines: false,
ID: common.TruncateID(id),
Action: "Buffering to disk",
})
defer progressReader.Close()
return archive.NewTempArchive(progressReader, tmp)
}
// Mktemp creates a temporary sub-directory inside the graph's filesystem.

View file

@ -9,6 +9,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
)
@ -48,7 +49,15 @@ func (s *TagStore) CmdImport(job *engine.Job) engine.Status {
if err != nil {
return job.Error(err)
}
progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
progressReader := progressreader.New(progressreader.Config{
In: resp.Body,
Out: job.Stdout,
Formatter: sf,
Size: int(resp.ContentLength),
NewLines: true,
ID: "",
Action: "Importing",
})
defer progressReader.Close()
archive = progressReader
}

View file

@ -14,6 +14,7 @@ import (
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/common"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
@ -337,7 +338,15 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
defer layer.Close()
err = s.graph.Register(img,
utils.ProgressReader(layer, imgSize, out, sf, false, common.TruncateID(id), "Downloading"))
progressreader.New(progressreader.Config{
In: layer,
Out: out,
Formatter: sf,
Size: imgSize,
NewLines: false,
ID: common.TruncateID(id),
Action: "Downloading",
}))
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
@ -496,7 +505,15 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
return fmt.Errorf("unable to wrap image blob reader with TarSum: %s", err)
}
if _, err := io.Copy(tmpFile, utils.ProgressReader(ioutil.NopCloser(tarSumReader), int(l), out, sf, false, common.TruncateID(img.ID), "Downloading")); err != nil {
if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
In: ioutil.NopCloser(tarSumReader),
Out: out,
Formatter: sf,
Size: int(l),
NewLines: false,
ID: common.TruncateID(img.ID),
Action: "Downloading",
})); err != nil {
return fmt.Errorf("unable to copy v2 image blob data: %s", err)
}
@ -548,7 +565,14 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
d.tmpFile.Seek(0, 0)
if d.tmpFile != nil {
err = s.graph.Register(d.img,
utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, common.TruncateID(d.img.ID), "Extracting"))
progressreader.New(progressreader.Config{
In: d.tmpFile,
Out: out,
Formatter: sf,
Size: int(d.length),
ID: common.TruncateID(d.img.ID),
Action: "Extracting",
}))
if err != nil {
return false, err
}

View file

@ -16,6 +16,7 @@ import (
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/common"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
@ -258,7 +259,16 @@ func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep strin
// Send the layer
log.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, common.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID,
progressreader.New(progressreader.Config{
In: layerData,
Out: out,
Formatter: sf,
Size: int(layerData.Size),
NewLines: false,
ID: common.TruncateID(imgData.ID),
Action: "Pushing",
}), ep, token, jsonRaw)
if err != nil {
return "", err
}
@ -459,7 +469,16 @@ func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *
// Send the layer
log.Debugf("rendered layer for %s of [%d] size", img.ID, size)
if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1], utils.ProgressReader(tf, int(size), out, sf, false, common.TruncateID(img.ID), "Pushing"), auth); err != nil {
if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1],
progressreader.New(progressreader.Config{
In: tf,
Out: out,
Formatter: sf,
Size: int(size),
NewLines: false,
ID: common.TruncateID(img.ID),
Action: "Pushing",
}), auth); err != nil {
out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image push failed", nil))
return "", err
}

View file

@ -0,0 +1,69 @@
package progressreader
import (
"io"
)
type StreamFormatter interface {
FormatProg(string, string, interface{}) []byte
FormatStatus(string, string, ...interface{}) []byte
FormatError(error) []byte
}
type PR_JSONProgress interface {
GetCurrent() int
GetTotal() int
}
type JSONProg struct {
Current int
Total int
}
func (j *JSONProg) GetCurrent() int {
return j.Current
}
func (j *JSONProg) GetTotal() int {
return j.Total
}
// Reader with progress bar
type Config struct {
In io.ReadCloser // Stream to read from
Out io.Writer // Where to send progress bar to
Formatter StreamFormatter
Size int
Current int
LastUpdate int
NewLines bool
ID string
Action string
}
func New(newReader Config) *Config {
return &newReader
}
func (config *Config) Read(p []byte) (n int, err error) {
read, err := config.In.Read(p)
config.Current += read
updateEvery := 1024 * 512 //512kB
if config.Size > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int(0.01 * float64(config.Size)); increment < updateEvery {
updateEvery = increment
}
}
if config.Current-config.LastUpdate > updateEvery || err != nil {
config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size}))
config.LastUpdate = config.Current
}
// Send newline when complete
if config.NewLines && err != nil && read == 0 {
config.Out.Write(config.Formatter.FormatStatus("", ""))
}
return read, err
}
func (config *Config) Close() error {
config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size}))
return config.In.Close()
}

View file

@ -1,55 +0,0 @@
package utils
import (
"io"
"time"
)
// Reader with progress bar
type progressReader struct {
reader io.ReadCloser // Stream to read from
output io.Writer // Where to send progress bar to
progress JSONProgress
lastUpdate int // How many bytes read at least update
ID string
action string
sf *StreamFormatter
newLine bool
}
func (r *progressReader) Read(p []byte) (n int, err error) {
read, err := r.reader.Read(p)
r.progress.Current += read
updateEvery := 1024 * 512 //512kB
if r.progress.Total > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery {
updateEvery = increment
}
}
if r.progress.Current-r.lastUpdate > updateEvery || err != nil {
r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
r.lastUpdate = r.progress.Current
}
// Send newline when complete
if r.newLine && err != nil && read == 0 {
r.output.Write(r.sf.FormatStatus("", ""))
}
return read, err
}
func (r *progressReader) Close() error {
r.progress.Current = r.progress.Total
r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
return r.reader.Close()
}
func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader {
return &progressReader{
reader: r,
output: NewWriteFlusher(output),
ID: ID,
action: action,
progress: JSONProgress{Total: size, Start: time.Now().UTC().Unix()},
sf: sf,
newLine: newline,
}
}

View file

@ -3,6 +3,7 @@ package utils
import (
"encoding/json"
"fmt"
"github.com/docker/docker/pkg/progressreader"
"io"
)
@ -54,7 +55,15 @@ func (sf *StreamFormatter) FormatError(err error) []byte {
}
return []byte("Error: " + err.Error() + streamNewline)
}
func (sf *StreamFormatter) FormatProg(id, action string, p interface{}) []byte {
switch progress := p.(type) {
case *JSONProgress:
return sf.FormatProgress(id, action, progress)
case progressreader.PR_JSONProgress:
return sf.FormatProgress(id, action, &JSONProgress{Current: progress.GetCurrent(), Total: progress.GetTotal()})
}
return nil
}
func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte {
if progress == nil {
progress = &JSONProgress{}