blob_writer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package client
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "os"
  9. "time"
  10. "github.com/docker/distribution"
  11. "github.com/docker/distribution/context"
  12. )
  13. type httpBlobUpload struct {
  14. statter distribution.BlobStatter
  15. client *http.Client
  16. uuid string
  17. startedAt time.Time
  18. location string // always the last value of the location header.
  19. offset int64
  20. closed bool
  21. }
  22. func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
  23. if resp.StatusCode == http.StatusNotFound {
  24. return distribution.ErrBlobUploadUnknown
  25. }
  26. return handleErrorResponse(resp)
  27. }
  28. func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
  29. req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
  30. if err != nil {
  31. return 0, err
  32. }
  33. defer req.Body.Close()
  34. resp, err := hbu.client.Do(req)
  35. if err != nil {
  36. return 0, err
  37. }
  38. if resp.StatusCode != http.StatusAccepted {
  39. return 0, hbu.handleErrorResponse(resp)
  40. }
  41. hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
  42. hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
  43. if err != nil {
  44. return 0, err
  45. }
  46. rng := resp.Header.Get("Range")
  47. var start, end int64
  48. if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
  49. return 0, err
  50. } else if n != 2 || end < start {
  51. return 0, fmt.Errorf("bad range format: %s", rng)
  52. }
  53. return (end - start + 1), nil
  54. }
  55. func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
  56. req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
  57. if err != nil {
  58. return 0, err
  59. }
  60. req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
  61. req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
  62. req.Header.Set("Content-Type", "application/octet-stream")
  63. resp, err := hbu.client.Do(req)
  64. if err != nil {
  65. return 0, err
  66. }
  67. if resp.StatusCode != http.StatusAccepted {
  68. return 0, hbu.handleErrorResponse(resp)
  69. }
  70. hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
  71. hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
  72. if err != nil {
  73. return 0, err
  74. }
  75. rng := resp.Header.Get("Range")
  76. var start, end int
  77. if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
  78. return 0, err
  79. } else if n != 2 || end < start {
  80. return 0, fmt.Errorf("bad range format: %s", rng)
  81. }
  82. return (end - start + 1), nil
  83. }
  84. func (hbu *httpBlobUpload) Seek(offset int64, whence int) (int64, error) {
  85. newOffset := hbu.offset
  86. switch whence {
  87. case os.SEEK_CUR:
  88. newOffset += int64(offset)
  89. case os.SEEK_END:
  90. newOffset += int64(offset)
  91. case os.SEEK_SET:
  92. newOffset = int64(offset)
  93. }
  94. hbu.offset = newOffset
  95. return hbu.offset, nil
  96. }
  97. func (hbu *httpBlobUpload) ID() string {
  98. return hbu.uuid
  99. }
  100. func (hbu *httpBlobUpload) StartedAt() time.Time {
  101. return hbu.startedAt
  102. }
  103. func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
  104. // TODO(dmcgowan): Check if already finished, if so just fetch
  105. req, err := http.NewRequest("PUT", hbu.location, nil)
  106. if err != nil {
  107. return distribution.Descriptor{}, err
  108. }
  109. values := req.URL.Query()
  110. values.Set("digest", desc.Digest.String())
  111. req.URL.RawQuery = values.Encode()
  112. resp, err := hbu.client.Do(req)
  113. if err != nil {
  114. return distribution.Descriptor{}, err
  115. }
  116. defer resp.Body.Close()
  117. if resp.StatusCode != http.StatusCreated {
  118. return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
  119. }
  120. return hbu.statter.Stat(ctx, desc.Digest)
  121. }
  122. func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
  123. req, err := http.NewRequest("DELETE", hbu.location, nil)
  124. if err != nil {
  125. return err
  126. }
  127. resp, err := hbu.client.Do(req)
  128. if err != nil {
  129. return err
  130. }
  131. defer resp.Body.Close()
  132. switch resp.StatusCode {
  133. case http.StatusNoContent, http.StatusNotFound:
  134. return nil
  135. default:
  136. return hbu.handleErrorResponse(resp)
  137. }
  138. }
  139. func (hbu *httpBlobUpload) Close() error {
  140. hbu.closed = true
  141. return nil
  142. }