Browse Source

Merge pull request #2945 from dotcloud/refactor_stream

Refactor stream
Michael Crosby 11 years ago
parent
commit
e1414a4c39
8 changed files with 296 additions and 211 deletions
  1. 1 1
      commands.go
  2. 1 1
      graph.go
  3. 17 17
      server.go
  4. 125 0
      utils/jsonmessage.go
  5. 24 0
      utils/jsonmessage_test.go
  6. 56 0
      utils/progressreader.go
  7. 72 0
      utils/streamformatter.go
  8. 0 192
      utils/utils.go

+ 1 - 1
commands.go

@@ -206,7 +206,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
 	// FIXME: ProgressReader shouldn't be this annoying to use
 	if context != nil {
 		sf := utils.NewStreamFormatter(false)
-		body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("", "Uploading context", "%v bytes%0.0s%0.0s"), sf, true)
+		body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf, true, "", "Uploading context")
 	}
 	// Upload the build context
 	v := &url.Values{}

+ 1 - 1
graph.go

@@ -205,7 +205,7 @@ func (graph *Graph) TempLayerArchive(id string, compression archive.Compression,
 	if err != nil {
 		return nil, err
 	}
-	return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf.FormatProgress("", "Buffering to disk", "%v/%v (%v)"), sf, true), tmp)
+	return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf, true, "", "Buffering to disk"), tmp)
 }
 
 // Mktemp creates a temporary sub-directory inside the graph's filesystem.

+ 17 - 17
server.go

@@ -459,7 +459,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils.
 		return err
 	}
 
-	if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("", "Downloading", "%8v/%v (%v)"), sf, false), path); err != nil {
+	if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf, false, "", "Downloading"), path); err != nil {
 		return err
 	}
 	// FIXME: Handle custom repo, tag comment, author
@@ -769,7 +769,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
 	if err != nil {
 		return err
 	}
-	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling", "dependent layers"))
+	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
 	// FIXME: Try to stream the images?
 	// FIXME: Launch the getRemoteImage() in goroutines
 
@@ -784,33 +784,33 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
 		defer srv.poolRemove("pull", "layer:"+id)
 
 		if !srv.runtime.graph.Exists(id) {
-			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "metadata"))
+			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
 			imgJSON, imgSize, err := r.GetRemoteImageJSON(id, endpoint, token)
 			if err != nil {
-				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
+				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
 				// FIXME: Keep going in case of error?
 				return err
 			}
 			img, err := NewImgJSON(imgJSON)
 			if err != nil {
-				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
+				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
 				return fmt.Errorf("Failed to parse json: %s", err)
 			}
 
 			// Get the layer
-			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "fs layer"))
+			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling fs layer", nil))
 			layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token)
 			if err != nil {
-				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
+				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
 				return err
 			}
 			defer layer.Close()
-			if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf, false), img); err != nil {
-				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "downloading dependent layers"))
+			if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), img); err != nil {
+				out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
 				return err
 			}
 		}
-		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download", "complete"))
+		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
 
 	}
 	return nil
@@ -883,29 +883,29 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
 			}
 			defer srv.poolRemove("pull", "img:"+img.ID)
 
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s", img.Tag, localName)))
+			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
 			success := false
 			var lastErr error
 			for _, ep := range repoData.Endpoints {
-				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s", img.Tag, localName, ep)))
+				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
 				if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
 					// Its not ideal that only the last error  is returned, it would be better to concatenate the errors.
 					// As the error is also given to the output stream the user will see the error.
 					lastErr = err
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err)))
+					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
 					continue
 				}
 				success = true
 				break
 			}
 			if !success {
-				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, %s", img.Tag, localName, lastErr)))
+				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, %s", img.Tag, localName, lastErr), nil))
 				if parallel {
 					errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
 					return
 				}
 			}
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download", "complete"))
+			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
 
 			if parallel {
 				errors <- nil
@@ -1172,7 +1172,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
 	defer os.RemoveAll(layerData.Name())
 
 	// Send the layer
-	checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf, false), ep, token, jsonRaw)
+	checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, "", "Pushing"), ep, token, jsonRaw)
 	if err != nil {
 		return "", err
 	}
@@ -1252,7 +1252,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
 		if err != nil {
 			return err
 		}
-		archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("", "Importing", "%8v/%v (%v)"), sf, true)
+		archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
 	}
 	img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
 	if err != nil {

+ 125 - 0
utils/jsonmessage.go

@@ -0,0 +1,125 @@
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"strings"
+	"time"
+)
+
+type JSONError struct {
+	Code    int    `json:"code,omitempty"`
+	Message string `json:"message,omitempty"`
+}
+
+func (e *JSONError) Error() string {
+	return e.Message
+}
+
+type JSONProgress struct {
+	Current int   `json:"current,omitempty"`
+	Total   int   `json:"total,omitempty"`
+	Start   int64 `json:"start,omitempty"`
+}
+
+func (p *JSONProgress) String() string {
+	if p.Current == 0 && p.Total == 0 {
+		return ""
+	}
+	current := HumanSize(int64(p.Current))
+	if p.Total == 0 {
+		return fmt.Sprintf("%8v/?", current)
+	}
+	total := HumanSize(int64(p.Total))
+	percentage := int(float64(p.Current)/float64(p.Total)*100) / 2
+
+	fromStart := time.Now().UTC().Sub(time.Unix(int64(p.Start), 0))
+	perEntry := fromStart / time.Duration(p.Current)
+	left := time.Duration(p.Total-p.Current) * perEntry
+	left = (left / time.Second) * time.Second
+	return fmt.Sprintf("[%s>%s] %8v/%v %s", strings.Repeat("=", percentage), strings.Repeat(" ", 50-percentage), current, total, left.String())
+}
+
+type JSONMessage struct {
+	Status          string        `json:"status,omitempty"`
+	Progress        *JSONProgress `json:"progressDetail,omitempty"`
+	ProgressMessage string        `json:"progress,omitempty"` //deprecated
+	ID              string        `json:"id,omitempty"`
+	From            string        `json:"from,omitempty"`
+	Time            int64         `json:"time,omitempty"`
+	Error           *JSONError    `json:"errorDetail,omitempty"`
+	ErrorMessage    string        `json:"error,omitempty"` //deprecated
+}
+
+func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
+	if jm.Error != nil {
+		if jm.Error.Code == 401 {
+			return fmt.Errorf("Authentication is required.")
+		}
+		return jm.Error
+	}
+	endl := ""
+	if isTerminal {
+		// <ESC>[2K = erase entire current line
+		fmt.Fprintf(out, "%c[2K\r", 27)
+		endl = "\r"
+	}
+	if jm.Time != 0 {
+		fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
+	}
+	if jm.ID != "" {
+		fmt.Fprintf(out, "%s: ", jm.ID)
+	}
+	if jm.From != "" {
+		fmt.Fprintf(out, "(from %s) ", jm.From)
+	}
+	if jm.Progress != nil {
+		fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
+	} else if jm.ProgressMessage != "" { //deprecated
+		fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl)
+	} else {
+		fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
+	}
+	return nil
+}
+
+func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error {
+	dec := json.NewDecoder(in)
+	ids := make(map[string]int)
+	diff := 0
+	for {
+		jm := JSONMessage{}
+		if err := dec.Decode(&jm); err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+		if (jm.Progress != nil || jm.ProgressMessage != "") && jm.ID != "" {
+			line, ok := ids[jm.ID]
+			if !ok {
+				line = len(ids)
+				ids[jm.ID] = line
+				fmt.Fprintf(out, "\n")
+				diff = 0
+			} else {
+				diff = len(ids) - line
+			}
+			if isTerminal {
+				// <ESC>[{diff}A = move cursor up diff rows
+				fmt.Fprintf(out, "%c[%dA", 27, diff)
+			}
+		}
+		err := jm.Display(out, isTerminal)
+		if jm.ID != "" {
+			if isTerminal {
+				// <ESC>[{diff}B = move cursor down diff rows
+				fmt.Fprintf(out, "%c[%dB", 27, diff)
+			}
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 24 - 0
utils/jsonmessage_test.go

@@ -0,0 +1,24 @@
+package utils
+
+import (
+	"testing"
+)
+
+func TestError(t *testing.T) {
+	je := JSONError{404, "Not found"}
+	if je.Error() != "Not found" {
+		t.Fatalf("Expected 'Not found' got '%s'", je.Error())
+	}
+}
+
+func TestProgress(t *testing.T) {
+	jp := JSONProgress{0, 0, 0}
+	if jp.String() != "" {
+		t.Fatalf("Expected empty string, got '%s'", jp.String())
+	}
+
+	jp2 := JSONProgress{1, 0, 0}
+	if jp2.String() != "     1 B/?" {
+		t.Fatalf("Expected '     1/?', got '%s'", jp2.String())
+	}
+}

+ 56 - 0
utils/progressreader.go

@@ -0,0 +1,56 @@
+package utils
+
+import (
+	"io"
+	"time"
+)
+
+// Reader with progress bar
+type progressReader struct {
+	reader   io.ReadCloser // Stream to read from
+	output   io.Writer     // Where to send progress bar to
+	progress JSONProgress
+	//	readTotal    int    // Expected stream length (bytes)
+	//	readProgress int    // How much has been read so far (bytes)
+	lastUpdate int // How many bytes read at least update
+	ID         string
+	action     string
+	//	template   string // Template to print. Default "%v/%v (%v)"
+	sf      *StreamFormatter
+	newLine bool
+}
+
+func (r *progressReader) Read(p []byte) (n int, err error) {
+	read, err := io.ReadCloser(r.reader).Read(p)
+	r.progress.Current += read
+	updateEvery := 1024 * 512 //512kB
+	if r.progress.Total > 0 {
+		// Update progress for every 1% read if 1% < 512kB
+		if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery {
+			updateEvery = increment
+		}
+	}
+	if r.progress.Current-r.lastUpdate > updateEvery || err != nil {
+		r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
+		r.lastUpdate = r.progress.Current
+	}
+	// Send newline when complete
+	if r.newLine && err != nil {
+		r.output.Write(r.sf.FormatStatus("", ""))
+	}
+	return read, err
+}
+func (r *progressReader) Close() error {
+	return io.ReadCloser(r.reader).Close()
+}
+func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader {
+	return &progressReader{
+		reader:   r,
+		output:   NewWriteFlusher(output),
+		ID:       ID,
+		action:   action,
+		progress: JSONProgress{Total: size, Start: time.Now().UTC().Unix()},
+		sf:       sf,
+		newLine:  newline,
+	}
+}

+ 72 - 0
utils/streamformatter.go

@@ -0,0 +1,72 @@
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+type StreamFormatter struct {
+	json bool
+	used bool
+}
+
+func NewStreamFormatter(json bool) *StreamFormatter {
+	return &StreamFormatter{json, false}
+}
+
+func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
+	sf.used = true
+	str := fmt.Sprintf(format, a...)
+	if sf.json {
+		b, err := json.Marshal(&JSONMessage{ID: id, Status: str})
+		if err != nil {
+			return sf.FormatError(err)
+		}
+		return b
+	}
+	return []byte(str + "\r\n")
+}
+
+func (sf *StreamFormatter) FormatError(err error) []byte {
+	sf.used = true
+	if sf.json {
+		jsonError, ok := err.(*JSONError)
+		if !ok {
+			jsonError = &JSONError{Message: err.Error()}
+		}
+		if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
+			return b
+		}
+		return []byte("{\"error\":\"format error\"}")
+	}
+	return []byte("Error: " + err.Error() + "\r\n")
+}
+
+func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte {
+	if progress == nil {
+		progress = &JSONProgress{}
+	}
+	sf.used = true
+	if sf.json {
+
+		b, err := json.Marshal(&JSONMessage{
+			Status:          action,
+			ProgressMessage: progress.String(),
+			Progress:        progress,
+			ID:              id,
+		})
+		if err != nil {
+			return nil
+		}
+		return b
+	}
+	endl := "\r"
+	if progress.String() == "" {
+		endl += "\n"
+	}
+	return []byte(action + " " + progress.String() + endl)
+}
+
+func (sf *StreamFormatter) Used() bool {
+	return sf.used
+}

+ 0 - 192
utils/utils.go

@@ -94,56 +94,6 @@ func Errorf(format string, a ...interface{}) {
 	logf("error", format, a...)
 }
 
-// Reader with progress bar
-type progressReader struct {
-	reader       io.ReadCloser // Stream to read from
-	output       io.Writer     // Where to send progress bar to
-	readTotal    int           // Expected stream length (bytes)
-	readProgress int           // How much has been read so far (bytes)
-	lastUpdate   int           // How many bytes read at least update
-	template     string        // Template to print. Default "%v/%v (%v)"
-	sf           *StreamFormatter
-	newLine      bool
-}
-
-func (r *progressReader) Read(p []byte) (n int, err error) {
-	read, err := io.ReadCloser(r.reader).Read(p)
-	r.readProgress += read
-	updateEvery := 1024 * 512 //512kB
-	if r.readTotal > 0 {
-		// Update progress for every 1% read if 1% < 512kB
-		if increment := int(0.01 * float64(r.readTotal)); increment < updateEvery {
-			updateEvery = increment
-		}
-	}
-	if r.readProgress-r.lastUpdate > updateEvery || err != nil {
-		if r.readTotal > 0 {
-			fmt.Fprintf(r.output, r.template, HumanSize(int64(r.readProgress)), HumanSize(int64(r.readTotal)), fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100))
-		} else {
-			fmt.Fprintf(r.output, r.template, r.readProgress, "?", "n/a")
-		}
-		r.lastUpdate = r.readProgress
-	}
-	// Send newline when complete
-	if r.newLine && err != nil {
-		r.output.Write(r.sf.FormatStatus("", ""))
-	}
-	return read, err
-}
-func (r *progressReader) Close() error {
-	return io.ReadCloser(r.reader).Close()
-}
-func ProgressReader(r io.ReadCloser, size int, output io.Writer, tpl []byte, sf *StreamFormatter, newline bool) *progressReader {
-	return &progressReader{
-		reader:    r,
-		output:    NewWriteFlusher(output),
-		readTotal: size,
-		template:  string(tpl),
-		sf:        sf,
-		newLine:   newline,
-	}
-}
-
 // HumanDuration returns a human-readable approximation of a duration
 // (eg. "About a minute", "4 hours ago", etc.)
 func HumanDuration(d time.Duration) string {
@@ -754,25 +704,6 @@ func NewWriteFlusher(w io.Writer) *WriteFlusher {
 	return &WriteFlusher{w: w, flusher: flusher}
 }
 
-type JSONError struct {
-	Code    int    `json:"code,omitempty"`
-	Message string `json:"message,omitempty"`
-}
-
-type JSONMessage struct {
-	Status       string     `json:"status,omitempty"`
-	Progress     string     `json:"progress,omitempty"`
-	ErrorMessage string     `json:"error,omitempty"` //deprecated
-	ID           string     `json:"id,omitempty"`
-	From         string     `json:"from,omitempty"`
-	Time         int64      `json:"time,omitempty"`
-	Error        *JSONError `json:"errorDetail,omitempty"`
-}
-
-func (e *JSONError) Error() string {
-	return e.Message
-}
-
 func NewHTTPRequestError(msg string, res *http.Response) error {
 	return &JSONError{
 		Message: msg,
@@ -780,129 +711,6 @@ func NewHTTPRequestError(msg string, res *http.Response) error {
 	}
 }
 
-func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
-	if jm.Error != nil {
-		if jm.Error.Code == 401 {
-			return fmt.Errorf("Authentication is required.")
-		}
-		return jm.Error
-	}
-	endl := ""
-	if isTerminal {
-		// <ESC>[2K = erase entire current line
-		fmt.Fprintf(out, "%c[2K\r", 27)
-		endl = "\r"
-	}
-	if jm.Time != 0 {
-		fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
-	}
-	if jm.ID != "" {
-		fmt.Fprintf(out, "%s: ", jm.ID)
-	}
-	if jm.From != "" {
-		fmt.Fprintf(out, "(from %s) ", jm.From)
-	}
-	if jm.Progress != "" {
-		fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress, endl)
-	} else {
-		fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
-	}
-	return nil
-}
-
-func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error {
-	dec := json.NewDecoder(in)
-	ids := make(map[string]int)
-	diff := 0
-	for {
-		jm := JSONMessage{}
-		if err := dec.Decode(&jm); err == io.EOF {
-			break
-		} else if err != nil {
-			return err
-		}
-		if jm.Progress != "" && jm.ID != "" {
-			line, ok := ids[jm.ID]
-			if !ok {
-				line = len(ids)
-				ids[jm.ID] = line
-				fmt.Fprintf(out, "\n")
-				diff = 0
-			} else {
-				diff = len(ids) - line
-			}
-			if isTerminal {
-				// <ESC>[{diff}A = move cursor up diff rows
-				fmt.Fprintf(out, "%c[%dA", 27, diff)
-			}
-		}
-		err := jm.Display(out, isTerminal)
-		if jm.ID != "" {
-			if isTerminal {
-				// <ESC>[{diff}B = move cursor down diff rows
-				fmt.Fprintf(out, "%c[%dB", 27, diff)
-			}
-		}
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-type StreamFormatter struct {
-	json bool
-	used bool
-}
-
-func NewStreamFormatter(json bool) *StreamFormatter {
-	return &StreamFormatter{json, false}
-}
-
-func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
-	sf.used = true
-	str := fmt.Sprintf(format, a...)
-	if sf.json {
-		b, err := json.Marshal(&JSONMessage{ID: id, Status: str})
-		if err != nil {
-			return sf.FormatError(err)
-		}
-		return b
-	}
-	return []byte(str + "\r\n")
-}
-
-func (sf *StreamFormatter) FormatError(err error) []byte {
-	sf.used = true
-	if sf.json {
-		jsonError, ok := err.(*JSONError)
-		if !ok {
-			jsonError = &JSONError{Message: err.Error()}
-		}
-		if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
-			return b
-		}
-		return []byte("{\"error\":\"format error\"}")
-	}
-	return []byte("Error: " + err.Error() + "\r\n")
-}
-
-func (sf *StreamFormatter) FormatProgress(id, action, progress string) []byte {
-	sf.used = true
-	if sf.json {
-		b, err := json.Marshal(&JSONMessage{Status: action, Progress: progress, ID: id})
-		if err != nil {
-			return nil
-		}
-		return b
-	}
-	return []byte(action + " " + progress + "\r")
-}
-
-func (sf *StreamFormatter) Used() bool {
-	return sf.used
-}
-
 func IsURL(str string) bool {
 	return strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://")
 }