utils.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package docker
  2. import (
  3. "bytes"
  4. "container/list"
  5. "io"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "sync"
  10. )
  11. func Trunc(s string, maxlen int) string {
  12. if len(s) <= maxlen {
  13. return s
  14. }
  15. return s[:maxlen]
  16. }
  17. // Figure out the absolute path of our own binary
  18. func SelfPath() string {
  19. path, err := exec.LookPath(os.Args[0])
  20. if err != nil {
  21. panic(err)
  22. }
  23. path, err = filepath.Abs(path)
  24. if err != nil {
  25. panic(err)
  26. }
  27. return path
  28. }
  29. type nopWriteCloser struct {
  30. io.Writer
  31. }
  32. func (w *nopWriteCloser) Close() error { return nil }
  33. func NopWriteCloser(w io.Writer) io.WriteCloser {
  34. return &nopWriteCloser{w}
  35. }
  36. type bufReader struct {
  37. buf *bytes.Buffer
  38. reader io.Reader
  39. err error
  40. l sync.Mutex
  41. wait sync.Cond
  42. }
  43. func newBufReader(r io.Reader) *bufReader {
  44. reader := &bufReader{
  45. buf: &bytes.Buffer{},
  46. reader: r,
  47. }
  48. reader.wait.L = &reader.l
  49. go reader.drain()
  50. return reader
  51. }
  52. func (r *bufReader) drain() {
  53. buf := make([]byte, 1024)
  54. for {
  55. n, err := r.reader.Read(buf)
  56. if err != nil {
  57. r.err = err
  58. } else {
  59. r.buf.Write(buf[0:n])
  60. }
  61. r.l.Lock()
  62. r.wait.Signal()
  63. r.l.Unlock()
  64. if err != nil {
  65. break
  66. }
  67. }
  68. }
  69. func (r *bufReader) Read(p []byte) (n int, err error) {
  70. for {
  71. n, err = r.buf.Read(p)
  72. if n > 0 {
  73. return n, err
  74. }
  75. if r.err != nil {
  76. return 0, r.err
  77. }
  78. r.l.Lock()
  79. r.wait.Wait()
  80. r.l.Unlock()
  81. }
  82. return
  83. }
  84. func (r *bufReader) Close() error {
  85. closer, ok := r.reader.(io.ReadCloser)
  86. if !ok {
  87. return nil
  88. }
  89. return closer.Close()
  90. }
  91. type writeBroadcaster struct {
  92. writers *list.List
  93. }
  94. func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) {
  95. w.writers.PushBack(writer)
  96. }
  97. func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) {
  98. for e := w.writers.Front(); e != nil; e = e.Next() {
  99. v := e.Value.(io.Writer)
  100. if v == writer {
  101. w.writers.Remove(e)
  102. return
  103. }
  104. }
  105. }
  106. func (w *writeBroadcaster) Write(p []byte) (n int, err error) {
  107. failed := []*list.Element{}
  108. for e := w.writers.Front(); e != nil; e = e.Next() {
  109. writer := e.Value.(io.Writer)
  110. if n, err := writer.Write(p); err != nil || n != len(p) {
  111. // On error, evict the writer
  112. failed = append(failed, e)
  113. }
  114. }
  115. // We cannot remove while iterating, so it has to be done in
  116. // a separate step
  117. for _, e := range failed {
  118. w.writers.Remove(e)
  119. }
  120. return len(p), nil
  121. }
  122. func (w *writeBroadcaster) Close() error {
  123. for e := w.writers.Front(); e != nil; e = e.Next() {
  124. writer := e.Value.(io.WriteCloser)
  125. writer.Close()
  126. }
  127. return nil
  128. }
  129. func newWriteBroadcaster() *writeBroadcaster {
  130. return &writeBroadcaster{list.New()}
  131. }