unbuffered.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package broadcaster
  2. import (
  3. "io"
  4. "sync"
  5. )
  6. // Unbuffered accumulates multiple io.WriteCloser by stream.
  7. type Unbuffered struct {
  8. mu sync.Mutex
  9. writers []io.WriteCloser
  10. }
  11. // Add adds new io.WriteCloser.
  12. func (w *Unbuffered) Add(writer io.WriteCloser) {
  13. w.mu.Lock()
  14. w.writers = append(w.writers, writer)
  15. w.mu.Unlock()
  16. }
  17. // Write writes bytes to all writers. Failed writers will be evicted during
  18. // this call.
  19. func (w *Unbuffered) Write(p []byte) (n int, err error) {
  20. w.mu.Lock()
  21. var evict []int
  22. for i, sw := range w.writers {
  23. if n, err := sw.Write(p); err != nil || n != len(p) {
  24. // On error, evict the writer
  25. evict = append(evict, i)
  26. }
  27. }
  28. for n, i := range evict {
  29. w.writers = append(w.writers[:i-n], w.writers[i-n+1:]...)
  30. }
  31. w.mu.Unlock()
  32. return len(p), nil
  33. }
  34. // Clean closes and removes all writers. Last non-eol-terminated part of data
  35. // will be saved.
  36. func (w *Unbuffered) Clean() error {
  37. w.mu.Lock()
  38. for _, sw := range w.writers {
  39. sw.Close()
  40. }
  41. w.writers = nil
  42. w.mu.Unlock()
  43. return nil
  44. }