bytespipe_test.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package ioutils // import "github.com/docker/docker/pkg/ioutils"
  2. import (
  3. "crypto/sha256"
  4. "encoding/hex"
  5. "math/rand"
  6. "testing"
  7. "time"
  8. )
  9. func TestBytesPipeRead(t *testing.T) {
  10. buf := NewBytesPipe()
  11. _, _ = buf.Write([]byte("12"))
  12. _, _ = buf.Write([]byte("34"))
  13. _, _ = buf.Write([]byte("56"))
  14. _, _ = buf.Write([]byte("78"))
  15. _, _ = buf.Write([]byte("90"))
  16. rd := make([]byte, 4)
  17. n, err := buf.Read(rd)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. if n != 4 {
  22. t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
  23. }
  24. if string(rd) != "1234" {
  25. t.Fatalf("Read %s, but must be %s", rd, "1234")
  26. }
  27. n, err = buf.Read(rd)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. if n != 4 {
  32. t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
  33. }
  34. if string(rd) != "5678" {
  35. t.Fatalf("Read %s, but must be %s", rd, "5679")
  36. }
  37. n, err = buf.Read(rd)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. if n != 2 {
  42. t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 2)
  43. }
  44. if string(rd[:n]) != "90" {
  45. t.Fatalf("Read %s, but must be %s", rd, "90")
  46. }
  47. }
  48. func TestBytesPipeWrite(t *testing.T) {
  49. buf := NewBytesPipe()
  50. _, _ = buf.Write([]byte("12"))
  51. _, _ = buf.Write([]byte("34"))
  52. _, _ = buf.Write([]byte("56"))
  53. _, _ = buf.Write([]byte("78"))
  54. _, _ = buf.Write([]byte("90"))
  55. if buf.buf[0].String() != "1234567890" {
  56. t.Fatalf("Buffer %q, must be %q", buf.buf[0].String(), "1234567890")
  57. }
  58. }
  59. // Regression test for #41941.
  60. func TestBytesPipeDeadlock(t *testing.T) {
  61. bp := NewBytesPipe()
  62. bp.buf = []*fixedBuffer{getBuffer(blockThreshold)}
  63. rd := make(chan error)
  64. go func() {
  65. n, err := bp.Read(make([]byte, 1))
  66. t.Logf("Read n=%d, err=%v", n, err)
  67. if n != 1 {
  68. t.Errorf("short read: got %d, want 1", n)
  69. }
  70. rd <- err
  71. }()
  72. wr := make(chan error)
  73. go func() {
  74. const writeLen int = blockThreshold + 1
  75. time.Sleep(time.Millisecond)
  76. n, err := bp.Write(make([]byte, writeLen))
  77. t.Logf("Write n=%d, err=%v", n, err)
  78. if n != writeLen {
  79. t.Errorf("short write: got %d, want %d", n, writeLen)
  80. }
  81. wr <- err
  82. }()
  83. timer := time.NewTimer(time.Second)
  84. defer timer.Stop()
  85. select {
  86. case <-timer.C:
  87. t.Fatal("deadlock! Neither Read() nor Write() returned.")
  88. case rerr := <-rd:
  89. if rerr != nil {
  90. t.Fatal(rerr)
  91. }
  92. select {
  93. case <-timer.C:
  94. t.Fatal("deadlock! Write() did not return.")
  95. case werr := <-wr:
  96. if werr != nil {
  97. t.Fatal(werr)
  98. }
  99. }
  100. case werr := <-wr:
  101. if werr != nil {
  102. t.Fatal(werr)
  103. }
  104. select {
  105. case <-timer.C:
  106. t.Fatal("deadlock! Read() did not return.")
  107. case rerr := <-rd:
  108. if rerr != nil {
  109. t.Fatal(rerr)
  110. }
  111. }
  112. }
  113. }
  114. // Write and read in different speeds/chunk sizes and check valid data is read.
  115. func TestBytesPipeWriteRandomChunks(t *testing.T) {
  116. tests := []struct{ iterations, writesPerLoop, readsPerLoop int }{
  117. {iterations: 100, writesPerLoop: 10, readsPerLoop: 1},
  118. {iterations: 1000, writesPerLoop: 10, readsPerLoop: 5},
  119. {iterations: 1000, writesPerLoop: 100},
  120. {iterations: 1000, writesPerLoop: 5, readsPerLoop: 6},
  121. {iterations: 10000, writesPerLoop: 50, readsPerLoop: 25},
  122. }
  123. testMessage := []byte("this is a random string for testing")
  124. // random slice sizes to read and write
  125. writeChunks := []int{25, 35, 15, 20}
  126. readChunks := []int{5, 45, 20, 25}
  127. for _, tc := range tests {
  128. // first pass: write directly to hash
  129. hash := sha256.New()
  130. for i := 0; i < tc.iterations*tc.writesPerLoop; i++ {
  131. if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
  132. t.Fatal(err)
  133. }
  134. }
  135. expected := hex.EncodeToString(hash.Sum(nil))
  136. // write/read through buffer
  137. buf := NewBytesPipe()
  138. hash.Reset()
  139. done := make(chan struct{})
  140. go func() {
  141. // random delay before read starts
  142. <-time.After(time.Duration(rand.Intn(10)) * time.Millisecond)
  143. for i := 0; ; i++ {
  144. p := make([]byte, readChunks[(tc.iterations*tc.readsPerLoop+i)%len(readChunks)])
  145. n, _ := buf.Read(p)
  146. if n == 0 {
  147. break
  148. }
  149. hash.Write(p[:n])
  150. }
  151. close(done)
  152. }()
  153. for i := 0; i < tc.iterations; i++ {
  154. for w := 0; w < tc.writesPerLoop; w++ {
  155. buf.Write(testMessage[:writeChunks[(i*tc.writesPerLoop+w)%len(writeChunks)]])
  156. }
  157. }
  158. _ = buf.Close()
  159. <-done
  160. actual := hex.EncodeToString(hash.Sum(nil))
  161. if expected != actual {
  162. t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
  163. }
  164. }
  165. }
  166. func BenchmarkBytesPipeWrite(b *testing.B) {
  167. b.ReportAllocs()
  168. testData := []byte("pretty short line, because why not?")
  169. for i := 0; i < b.N; i++ {
  170. readBuf := make([]byte, 1024)
  171. buf := NewBytesPipe()
  172. go func() {
  173. var err error
  174. for err == nil {
  175. _, err = buf.Read(readBuf)
  176. }
  177. }()
  178. for j := 0; j < 1000; j++ {
  179. _, _ = buf.Write(testData)
  180. }
  181. _ = buf.Close()
  182. }
  183. }
  184. func BenchmarkBytesPipeRead(b *testing.B) {
  185. b.ReportAllocs()
  186. rd := make([]byte, 512)
  187. for i := 0; i < b.N; i++ {
  188. b.StopTimer()
  189. buf := NewBytesPipe()
  190. for j := 0; j < 500; j++ {
  191. _, _ = buf.Write(make([]byte, 1024))
  192. }
  193. b.StartTimer()
  194. for j := 0; j < 1000; j++ {
  195. if n, _ := buf.Read(rd); n != 512 {
  196. b.Fatalf("Wrong number of bytes: %d", n)
  197. }
  198. }
  199. }
  200. }