resumablerequestreader.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package httputils
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "net/http"
  7. "time"
  8. )
  9. type resumableRequestReader struct {
  10. client *http.Client
  11. request *http.Request
  12. lastRange int64
  13. totalSize int64
  14. currentResponse *http.Response
  15. failures uint32
  16. maxFailures uint32
  17. }
  18. // ResumableRequestReader makes it possible to resume reading a request's body transparently
  19. // maxfail is the number of times we retry to make requests again (not resumes)
  20. // totalsize is the total length of the body; auto detect if not provided
  21. func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
  22. return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize}
  23. }
  24. func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
  25. return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse}
  26. }
  27. func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
  28. if r.client == nil || r.request == nil {
  29. return 0, fmt.Errorf("client and request can't be nil\n")
  30. }
  31. isFreshRequest := false
  32. if r.lastRange != 0 && r.currentResponse == nil {
  33. readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
  34. r.request.Header.Set("Range", readRange)
  35. time.Sleep(5 * time.Second)
  36. }
  37. if r.currentResponse == nil {
  38. r.currentResponse, err = r.client.Do(r.request)
  39. isFreshRequest = true
  40. }
  41. if err != nil && r.failures+1 != r.maxFailures {
  42. r.cleanUpResponse()
  43. r.failures += 1
  44. time.Sleep(5 * time.Duration(r.failures) * time.Second)
  45. return 0, nil
  46. } else if err != nil {
  47. r.cleanUpResponse()
  48. return 0, err
  49. }
  50. if r.currentResponse.StatusCode == 416 && r.lastRange == r.totalSize && r.currentResponse.ContentLength == 0 {
  51. r.cleanUpResponse()
  52. return 0, io.EOF
  53. } else if r.currentResponse.StatusCode != 206 && r.lastRange != 0 && isFreshRequest {
  54. r.cleanUpResponse()
  55. return 0, fmt.Errorf("the server doesn't support byte ranges")
  56. }
  57. if r.totalSize == 0 {
  58. r.totalSize = r.currentResponse.ContentLength
  59. } else if r.totalSize <= 0 {
  60. r.cleanUpResponse()
  61. return 0, fmt.Errorf("failed to auto detect content length")
  62. }
  63. n, err = r.currentResponse.Body.Read(p)
  64. r.lastRange += int64(n)
  65. if err != nil {
  66. r.cleanUpResponse()
  67. }
  68. if err != nil && err != io.EOF {
  69. log.Printf("encountered error during pull and clearing it before resume: %s", err)
  70. err = nil
  71. }
  72. return n, err
  73. }
  74. func (r *resumableRequestReader) Close() error {
  75. r.cleanUpResponse()
  76. r.client = nil
  77. r.request = nil
  78. return nil
  79. }
  80. func (r *resumableRequestReader) cleanUpResponse() {
  81. if r.currentResponse != nil {
  82. r.currentResponse.Body.Close()
  83. r.currentResponse = nil
  84. }
  85. }