resumablerequestreader.go 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package resumable // import "github.com/docker/docker/registry/resumable"
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "time"
  8. "github.com/containerd/log"
  9. )
  10. type requestReader struct {
  11. client *http.Client
  12. request *http.Request
  13. lastRange int64
  14. totalSize int64
  15. currentResponse *http.Response
  16. failures uint32
  17. maxFailures uint32
  18. waitDuration time.Duration
  19. }
  20. // NewRequestReader makes it possible to resume reading a request's body transparently
  21. // maxfail is the number of times we retry to make requests again (not resumes)
  22. // totalsize is the total length of the body; auto detect if not provided
  23. func NewRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
  24. return &requestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, waitDuration: 5 * time.Second}
  25. }
  26. // NewRequestReaderWithInitialResponse makes it possible to resume
  27. // reading the body of an already initiated request.
  28. func NewRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
  29. return &requestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse, waitDuration: 5 * time.Second}
  30. }
  31. func (r *requestReader) Read(p []byte) (n int, err error) {
  32. if r.client == nil || r.request == nil {
  33. return 0, fmt.Errorf("client and request can't be nil")
  34. }
  35. isFreshRequest := false
  36. if r.lastRange != 0 && r.currentResponse == nil {
  37. readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
  38. r.request.Header.Set("Range", readRange)
  39. time.Sleep(r.waitDuration)
  40. }
  41. if r.currentResponse == nil {
  42. r.currentResponse, err = r.client.Do(r.request)
  43. isFreshRequest = true
  44. }
  45. if err != nil && r.failures+1 != r.maxFailures {
  46. r.cleanUpResponse()
  47. r.failures++
  48. time.Sleep(time.Duration(r.failures) * r.waitDuration)
  49. return 0, nil
  50. } else if err != nil {
  51. r.cleanUpResponse()
  52. return 0, err
  53. }
  54. if r.currentResponse.StatusCode == http.StatusRequestedRangeNotSatisfiable && r.lastRange == r.totalSize && r.currentResponse.ContentLength == 0 {
  55. r.cleanUpResponse()
  56. return 0, io.EOF
  57. } else if r.currentResponse.StatusCode != http.StatusPartialContent && r.lastRange != 0 && isFreshRequest {
  58. r.cleanUpResponse()
  59. return 0, fmt.Errorf("the server doesn't support byte ranges")
  60. }
  61. if r.totalSize == 0 {
  62. r.totalSize = r.currentResponse.ContentLength
  63. } else if r.totalSize <= 0 {
  64. r.cleanUpResponse()
  65. return 0, fmt.Errorf("failed to auto detect content length")
  66. }
  67. n, err = r.currentResponse.Body.Read(p)
  68. r.lastRange += int64(n)
  69. if err != nil {
  70. r.cleanUpResponse()
  71. }
  72. if err != nil && err != io.EOF {
  73. log.G(context.TODO()).Infof("encountered error during pull and clearing it before resume: %s", err)
  74. err = nil
  75. }
  76. return n, err
  77. }
  78. func (r *requestReader) Close() error {
  79. r.cleanUpResponse()
  80. r.client = nil
  81. r.request = nil
  82. return nil
  83. }
  84. func (r *requestReader) cleanUpResponse() {
  85. if r.currentResponse != nil {
  86. r.currentResponse.Body.Close()
  87. r.currentResponse = nil
  88. }
  89. }