split in 3 files
This commit is contained in:
parent
377817db1b
commit
597e0e69b4
7 changed files with 260 additions and 211 deletions
|
@ -202,7 +202,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
|
|||
// FIXME: ProgressReader shouldn't be this annoying to use
|
||||
if context != nil {
|
||||
sf := utils.NewStreamFormatter(false)
|
||||
body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("", "Uploading context", "%v bytes%0.0s%0.0s"), sf, true)
|
||||
body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf, true, "", "Uploading context")
|
||||
}
|
||||
// Upload the build context
|
||||
v := &url.Values{}
|
||||
|
|
2
graph.go
2
graph.go
|
@ -205,7 +205,7 @@ func (graph *Graph) TempLayerArchive(id string, compression archive.Compression,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf.FormatProgress("", "Buffering to disk", "%v/%v (%v)"), sf, true), tmp)
|
||||
return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf, true, "", "Buffering to disk"), tmp)
|
||||
}
|
||||
|
||||
// Mktemp creates a temporary sub-directory inside the graph's filesystem.
|
||||
|
|
34
server.go
34
server.go
|
@ -451,7 +451,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils.
|
|||
return err
|
||||
}
|
||||
|
||||
if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("", "Downloading", "%8v/%v (%v)"), sf, false), path); err != nil {
|
||||
if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf, false, "", "Downloading"), path); err != nil {
|
||||
return err
|
||||
}
|
||||
// FIXME: Handle custom repo, tag comment, author
|
||||
|
@ -761,7 +761,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling", "dependent layers"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
|
||||
// FIXME: Try to stream the images?
|
||||
// FIXME: Launch the getRemoteImage() in goroutines
|
||||
|
||||
|
@ -776,33 +776,33 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
|
|||
defer srv.poolRemove("pull", "layer:"+id)
|
||||
|
||||
if !srv.runtime.graph.Exists(id) {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "metadata"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
|
||||
imgJSON, imgSize, err := r.GetRemoteImageJSON(id, endpoint, token)
|
||||
if err != nil {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
|
||||
// FIXME: Keep going in case of error?
|
||||
return err
|
||||
}
|
||||
img, err := NewImgJSON(imgJSON)
|
||||
if err != nil {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
|
||||
return fmt.Errorf("Failed to parse json: %s", err)
|
||||
}
|
||||
|
||||
// Get the layer
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "fs layer"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling fs layer", nil))
|
||||
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token)
|
||||
if err != nil {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
|
||||
return err
|
||||
}
|
||||
defer layer.Close()
|
||||
if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf, false), img); err != nil {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "downloading dependent layers"))
|
||||
if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), img); err != nil {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
|
||||
return err
|
||||
}
|
||||
}
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download", "complete"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
|
||||
|
||||
}
|
||||
return nil
|
||||
|
@ -875,29 +875,29 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
|
|||
}
|
||||
defer srv.poolRemove("pull", "img:"+img.ID)
|
||||
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s", img.Tag, localName)))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
|
||||
success := false
|
||||
var lastErr error
|
||||
for _, ep := range repoData.Endpoints {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s", img.Tag, localName, ep)))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
|
||||
if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
|
||||
// Its not ideal that only the last error is returned, it would be better to concatenate the errors.
|
||||
// As the error is also given to the output stream the user will see the error.
|
||||
lastErr = err
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err)))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
break
|
||||
}
|
||||
if !success {
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, %s", img.Tag, localName, lastErr)))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, %s", img.Tag, localName, lastErr), nil))
|
||||
if parallel {
|
||||
errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
|
||||
return
|
||||
}
|
||||
}
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download", "complete"))
|
||||
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
|
||||
|
||||
if parallel {
|
||||
errors <- nil
|
||||
|
@ -1171,7 +1171,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
|
|||
defer os.RemoveAll(layerData.Name())
|
||||
|
||||
// Send the layer
|
||||
checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf, false), ep, token, jsonRaw)
|
||||
checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, "", "Pushing"), ep, token, jsonRaw)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -1251,7 +1251,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("", "Importing", "%8v/%v (%v)"), sf, true)
|
||||
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
|
||||
}
|
||||
img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
|
||||
if err != nil {
|
||||
|
|
118
utils/jsonmessage.go
Normal file
118
utils/jsonmessage.go
Normal file
|
@ -0,0 +1,118 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JSONError struct {
|
||||
Code int `json:"code,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
func (e *JSONError) Error() string {
|
||||
return e.Message
|
||||
}
|
||||
|
||||
type JSONProgress struct {
|
||||
Current int `json:"current,omitempty"`
|
||||
Total int `json:"total,omitempty"`
|
||||
}
|
||||
|
||||
func (p *JSONProgress) String() string {
|
||||
if p.Current == 0 && p.Total == 0 {
|
||||
return ""
|
||||
}
|
||||
current := HumanSize(int64(p.Current))
|
||||
if p.Total == 0 {
|
||||
return fmt.Sprintf("%8v/?", current)
|
||||
}
|
||||
total := HumanSize(int64(p.Total))
|
||||
percentage := float64(p.Current) / float64(p.Total) * 100
|
||||
return fmt.Sprintf("%8v/%v (%.0f%%)", current, total, percentage)
|
||||
}
|
||||
|
||||
type JSONMessage struct {
|
||||
Status string `json:"status,omitempty"`
|
||||
Progress *JSONProgress `json:"progressDetail,omitempty"`
|
||||
ProgressMessage string `json:"progress,omitempty"` //deprecated
|
||||
ID string `json:"id,omitempty"`
|
||||
From string `json:"from,omitempty"`
|
||||
Time int64 `json:"time,omitempty"`
|
||||
Error *JSONError `json:"errorDetail,omitempty"`
|
||||
ErrorMessage string `json:"error,omitempty"` //deprecated
|
||||
}
|
||||
|
||||
func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
|
||||
if jm.Error != nil {
|
||||
if jm.Error.Code == 401 {
|
||||
return fmt.Errorf("Authentication is required.")
|
||||
}
|
||||
return jm.Error
|
||||
}
|
||||
endl := ""
|
||||
if isTerminal {
|
||||
// <ESC>[2K = erase entire current line
|
||||
fmt.Fprintf(out, "%c[2K\r", 27)
|
||||
endl = "\r"
|
||||
}
|
||||
if jm.Time != 0 {
|
||||
fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
|
||||
}
|
||||
if jm.ID != "" {
|
||||
fmt.Fprintf(out, "%s: ", jm.ID)
|
||||
}
|
||||
if jm.From != "" {
|
||||
fmt.Fprintf(out, "(from %s) ", jm.From)
|
||||
}
|
||||
if jm.Progress != nil {
|
||||
fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
|
||||
} else if jm.ProgressMessage != "" { //deprecated
|
||||
fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl)
|
||||
} else {
|
||||
fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error {
|
||||
dec := json.NewDecoder(in)
|
||||
ids := make(map[string]int)
|
||||
diff := 0
|
||||
for {
|
||||
jm := JSONMessage{}
|
||||
if err := dec.Decode(&jm); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
if (jm.Progress != nil || jm.ProgressMessage != "") && jm.ID != "" {
|
||||
line, ok := ids[jm.ID]
|
||||
if !ok {
|
||||
line = len(ids)
|
||||
ids[jm.ID] = line
|
||||
fmt.Fprintf(out, "\n")
|
||||
diff = 0
|
||||
} else {
|
||||
diff = len(ids) - line
|
||||
}
|
||||
if isTerminal {
|
||||
// <ESC>[{diff}A = move cursor up diff rows
|
||||
fmt.Fprintf(out, "%c[%dA", 27, diff)
|
||||
}
|
||||
}
|
||||
err := jm.Display(out, isTerminal)
|
||||
if jm.ID != "" {
|
||||
if isTerminal {
|
||||
// <ESC>[{diff}B = move cursor down diff rows
|
||||
fmt.Fprintf(out, "%c[%dB", 27, diff)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
55
utils/progressreader.go
Normal file
55
utils/progressreader.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// Reader with progress bar
|
||||
type progressReader struct {
|
||||
reader io.ReadCloser // Stream to read from
|
||||
output io.Writer // Where to send progress bar to
|
||||
progress JSONProgress
|
||||
// readTotal int // Expected stream length (bytes)
|
||||
// readProgress int // How much has been read so far (bytes)
|
||||
lastUpdate int // How many bytes read at least update
|
||||
ID string
|
||||
action string
|
||||
// template string // Template to print. Default "%v/%v (%v)"
|
||||
sf *StreamFormatter
|
||||
newLine bool
|
||||
}
|
||||
|
||||
func (r *progressReader) Read(p []byte) (n int, err error) {
|
||||
read, err := io.ReadCloser(r.reader).Read(p)
|
||||
r.progress.Current += read
|
||||
updateEvery := 1024 * 512 //512kB
|
||||
if r.progress.Total > 0 {
|
||||
// Update progress for every 1% read if 1% < 512kB
|
||||
if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery {
|
||||
updateEvery = increment
|
||||
}
|
||||
}
|
||||
if r.progress.Current-r.lastUpdate > updateEvery || err != nil {
|
||||
r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
|
||||
r.lastUpdate = r.progress.Current
|
||||
}
|
||||
// Send newline when complete
|
||||
if r.newLine && err != nil {
|
||||
r.output.Write(r.sf.FormatStatus("", ""))
|
||||
}
|
||||
return read, err
|
||||
}
|
||||
func (r *progressReader) Close() error {
|
||||
return io.ReadCloser(r.reader).Close()
|
||||
}
|
||||
func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader {
|
||||
return &progressReader{
|
||||
reader: r,
|
||||
output: NewWriteFlusher(output),
|
||||
ID: ID,
|
||||
action: action,
|
||||
progress: JSONProgress{Total: size},
|
||||
sf: sf,
|
||||
newLine: newline,
|
||||
}
|
||||
}
|
68
utils/streamformatter.go
Normal file
68
utils/streamformatter.go
Normal file
|
@ -0,0 +1,68 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type StreamFormatter struct {
|
||||
json bool
|
||||
used bool
|
||||
}
|
||||
|
||||
func NewStreamFormatter(json bool) *StreamFormatter {
|
||||
return &StreamFormatter{json, false}
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
|
||||
sf.used = true
|
||||
str := fmt.Sprintf(format, a...)
|
||||
if sf.json {
|
||||
b, err := json.Marshal(&JSONMessage{ID: id, Status: str})
|
||||
if err != nil {
|
||||
return sf.FormatError(err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
return []byte(str + "\r\n")
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) FormatError(err error) []byte {
|
||||
sf.used = true
|
||||
if sf.json {
|
||||
jsonError, ok := err.(*JSONError)
|
||||
if !ok {
|
||||
jsonError = &JSONError{Message: err.Error()}
|
||||
}
|
||||
if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
|
||||
return b
|
||||
}
|
||||
return []byte("{\"error\":\"format error\"}")
|
||||
}
|
||||
return []byte("Error: " + err.Error() + "\r\n")
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte {
|
||||
if progress == nil {
|
||||
progress = &JSONProgress{}
|
||||
}
|
||||
sf.used = true
|
||||
if sf.json {
|
||||
|
||||
b, err := json.Marshal(&JSONMessage{
|
||||
Status: action,
|
||||
ProgressMessage: progress.String(),
|
||||
Progress: progress,
|
||||
ID: id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return b
|
||||
}
|
||||
return []byte(action + " " + progress.String() + "\r")
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) Used() bool {
|
||||
return sf.used
|
||||
}
|
192
utils/utils.go
192
utils/utils.go
|
@ -94,56 +94,6 @@ func Errorf(format string, a ...interface{}) {
|
|||
logf("error", format, a...)
|
||||
}
|
||||
|
||||
// Reader with progress bar
|
||||
type progressReader struct {
|
||||
reader io.ReadCloser // Stream to read from
|
||||
output io.Writer // Where to send progress bar to
|
||||
readTotal int // Expected stream length (bytes)
|
||||
readProgress int // How much has been read so far (bytes)
|
||||
lastUpdate int // How many bytes read at least update
|
||||
template string // Template to print. Default "%v/%v (%v)"
|
||||
sf *StreamFormatter
|
||||
newLine bool
|
||||
}
|
||||
|
||||
func (r *progressReader) Read(p []byte) (n int, err error) {
|
||||
read, err := io.ReadCloser(r.reader).Read(p)
|
||||
r.readProgress += read
|
||||
updateEvery := 1024 * 512 //512kB
|
||||
if r.readTotal > 0 {
|
||||
// Update progress for every 1% read if 1% < 512kB
|
||||
if increment := int(0.01 * float64(r.readTotal)); increment < updateEvery {
|
||||
updateEvery = increment
|
||||
}
|
||||
}
|
||||
if r.readProgress-r.lastUpdate > updateEvery || err != nil {
|
||||
if r.readTotal > 0 {
|
||||
fmt.Fprintf(r.output, r.template, HumanSize(int64(r.readProgress)), HumanSize(int64(r.readTotal)), fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100))
|
||||
} else {
|
||||
fmt.Fprintf(r.output, r.template, r.readProgress, "?", "n/a")
|
||||
}
|
||||
r.lastUpdate = r.readProgress
|
||||
}
|
||||
// Send newline when complete
|
||||
if r.newLine && err != nil {
|
||||
r.output.Write(r.sf.FormatStatus("", ""))
|
||||
}
|
||||
return read, err
|
||||
}
|
||||
func (r *progressReader) Close() error {
|
||||
return io.ReadCloser(r.reader).Close()
|
||||
}
|
||||
func ProgressReader(r io.ReadCloser, size int, output io.Writer, tpl []byte, sf *StreamFormatter, newline bool) *progressReader {
|
||||
return &progressReader{
|
||||
reader: r,
|
||||
output: NewWriteFlusher(output),
|
||||
readTotal: size,
|
||||
template: string(tpl),
|
||||
sf: sf,
|
||||
newLine: newline,
|
||||
}
|
||||
}
|
||||
|
||||
// HumanDuration returns a human-readable approximation of a duration
|
||||
// (eg. "About a minute", "4 hours ago", etc.)
|
||||
func HumanDuration(d time.Duration) string {
|
||||
|
@ -754,25 +704,6 @@ func NewWriteFlusher(w io.Writer) *WriteFlusher {
|
|||
return &WriteFlusher{w: w, flusher: flusher}
|
||||
}
|
||||
|
||||
type JSONError struct {
|
||||
Code int `json:"code,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
type JSONMessage struct {
|
||||
Status string `json:"status,omitempty"`
|
||||
Progress string `json:"progress,omitempty"`
|
||||
ErrorMessage string `json:"error,omitempty"` //deprecated
|
||||
ID string `json:"id,omitempty"`
|
||||
From string `json:"from,omitempty"`
|
||||
Time int64 `json:"time,omitempty"`
|
||||
Error *JSONError `json:"errorDetail,omitempty"`
|
||||
}
|
||||
|
||||
func (e *JSONError) Error() string {
|
||||
return e.Message
|
||||
}
|
||||
|
||||
func NewHTTPRequestError(msg string, res *http.Response) error {
|
||||
return &JSONError{
|
||||
Message: msg,
|
||||
|
@ -780,129 +711,6 @@ func NewHTTPRequestError(msg string, res *http.Response) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
|
||||
if jm.Error != nil {
|
||||
if jm.Error.Code == 401 {
|
||||
return fmt.Errorf("Authentication is required.")
|
||||
}
|
||||
return jm.Error
|
||||
}
|
||||
endl := ""
|
||||
if isTerminal {
|
||||
// <ESC>[2K = erase entire current line
|
||||
fmt.Fprintf(out, "%c[2K\r", 27)
|
||||
endl = "\r"
|
||||
}
|
||||
if jm.Time != 0 {
|
||||
fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
|
||||
}
|
||||
if jm.ID != "" {
|
||||
fmt.Fprintf(out, "%s: ", jm.ID)
|
||||
}
|
||||
if jm.From != "" {
|
||||
fmt.Fprintf(out, "(from %s) ", jm.From)
|
||||
}
|
||||
if jm.Progress != "" {
|
||||
fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress, endl)
|
||||
} else {
|
||||
fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error {
|
||||
dec := json.NewDecoder(in)
|
||||
ids := make(map[string]int)
|
||||
diff := 0
|
||||
for {
|
||||
jm := JSONMessage{}
|
||||
if err := dec.Decode(&jm); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
if jm.Progress != "" && jm.ID != "" {
|
||||
line, ok := ids[jm.ID]
|
||||
if !ok {
|
||||
line = len(ids)
|
||||
ids[jm.ID] = line
|
||||
fmt.Fprintf(out, "\n")
|
||||
diff = 0
|
||||
} else {
|
||||
diff = len(ids) - line
|
||||
}
|
||||
if isTerminal {
|
||||
// <ESC>[{diff}A = move cursor up diff rows
|
||||
fmt.Fprintf(out, "%c[%dA", 27, diff)
|
||||
}
|
||||
}
|
||||
err := jm.Display(out, isTerminal)
|
||||
if jm.ID != "" {
|
||||
if isTerminal {
|
||||
// <ESC>[{diff}B = move cursor down diff rows
|
||||
fmt.Fprintf(out, "%c[%dB", 27, diff)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type StreamFormatter struct {
|
||||
json bool
|
||||
used bool
|
||||
}
|
||||
|
||||
func NewStreamFormatter(json bool) *StreamFormatter {
|
||||
return &StreamFormatter{json, false}
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
|
||||
sf.used = true
|
||||
str := fmt.Sprintf(format, a...)
|
||||
if sf.json {
|
||||
b, err := json.Marshal(&JSONMessage{ID: id, Status: str})
|
||||
if err != nil {
|
||||
return sf.FormatError(err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
return []byte(str + "\r\n")
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) FormatError(err error) []byte {
|
||||
sf.used = true
|
||||
if sf.json {
|
||||
jsonError, ok := err.(*JSONError)
|
||||
if !ok {
|
||||
jsonError = &JSONError{Message: err.Error()}
|
||||
}
|
||||
if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
|
||||
return b
|
||||
}
|
||||
return []byte("{\"error\":\"format error\"}")
|
||||
}
|
||||
return []byte("Error: " + err.Error() + "\r\n")
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) FormatProgress(id, action, progress string) []byte {
|
||||
sf.used = true
|
||||
if sf.json {
|
||||
b, err := json.Marshal(&JSONMessage{Status: action, Progress: progress, ID: id})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return b
|
||||
}
|
||||
return []byte(action + " " + progress + "\r")
|
||||
}
|
||||
|
||||
func (sf *StreamFormatter) Used() bool {
|
||||
return sf.used
|
||||
}
|
||||
|
||||
func IsURL(str string) bool {
|
||||
return strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue