resumablerequestreader.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package httputils
  2. import (
  3. "fmt"
  4. "io"
  5. "net/http"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. )
  9. type resumableRequestReader struct {
  10. client *http.Client
  11. request *http.Request
  12. lastRange int64
  13. totalSize int64
  14. currentResponse *http.Response
  15. failures uint32
  16. maxFailures uint32
  17. waitDuration time.Duration
  18. }
  19. // ResumableRequestReader makes it possible to resume reading a request's body transparently
  20. // maxfail is the number of times we retry to make requests again (not resumes)
  21. // totalsize is the total length of the body; auto detect if not provided
  22. func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
  23. return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, waitDuration: 5 * time.Second}
  24. }
  25. // ResumableRequestReaderWithInitialResponse makes it possible to resume
  26. // reading the body of an already initiated request.
  27. func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
  28. return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse, waitDuration: 5 * time.Second}
  29. }
  30. func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
  31. if r.client == nil || r.request == nil {
  32. return 0, fmt.Errorf("client and request can't be nil\n")
  33. }
  34. isFreshRequest := false
  35. if r.lastRange != 0 && r.currentResponse == nil {
  36. readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
  37. r.request.Header.Set("Range", readRange)
  38. time.Sleep(r.waitDuration)
  39. }
  40. if r.currentResponse == nil {
  41. r.currentResponse, err = r.client.Do(r.request)
  42. isFreshRequest = true
  43. }
  44. if err != nil && r.failures+1 != r.maxFailures {
  45. r.cleanUpResponse()
  46. r.failures++
  47. time.Sleep(time.Duration(r.failures) * r.waitDuration)
  48. return 0, nil
  49. } else if err != nil {
  50. r.cleanUpResponse()
  51. return 0, err
  52. }
  53. if r.currentResponse.StatusCode == 416 && r.lastRange == r.totalSize && r.currentResponse.ContentLength == 0 {
  54. r.cleanUpResponse()
  55. return 0, io.EOF
  56. } else if r.currentResponse.StatusCode != 206 && r.lastRange != 0 && isFreshRequest {
  57. r.cleanUpResponse()
  58. return 0, fmt.Errorf("the server doesn't support byte ranges")
  59. }
  60. if r.totalSize == 0 {
  61. r.totalSize = r.currentResponse.ContentLength
  62. } else if r.totalSize <= 0 {
  63. r.cleanUpResponse()
  64. return 0, fmt.Errorf("failed to auto detect content length")
  65. }
  66. n, err = r.currentResponse.Body.Read(p)
  67. r.lastRange += int64(n)
  68. if err != nil {
  69. r.cleanUpResponse()
  70. }
  71. if err != nil && err != io.EOF {
  72. logrus.Infof("encountered error during pull and clearing it before resume: %s", err)
  73. err = nil
  74. }
  75. return n, err
  76. }
  77. func (r *resumableRequestReader) Close() error {
  78. r.cleanUpResponse()
  79. r.client = nil
  80. r.request = nil
  81. return nil
  82. }
  83. func (r *resumableRequestReader) cleanUpResponse() {
  84. if r.currentResponse != nil {
  85. r.currentResponse.Body.Close()
  86. r.currentResponse = nil
  87. }
  88. }