blob_writer.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package client
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "time"
  10. "github.com/docker/distribution"
  11. )
  12. type httpBlobUpload struct {
  13. statter distribution.BlobStatter
  14. client *http.Client
  15. uuid string
  16. startedAt time.Time
  17. location string // always the last value of the location header.
  18. offset int64
  19. closed bool
  20. }
  21. func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
  22. panic("Not implemented")
  23. }
  24. func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
  25. if resp.StatusCode == http.StatusNotFound {
  26. return distribution.ErrBlobUploadUnknown
  27. }
  28. return HandleErrorResponse(resp)
  29. }
  30. func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
  31. req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
  32. if err != nil {
  33. return 0, err
  34. }
  35. defer req.Body.Close()
  36. req.Header.Set("Content-Type", "application/octet-stream")
  37. resp, err := hbu.client.Do(req)
  38. if err != nil {
  39. return 0, err
  40. }
  41. if !SuccessStatus(resp.StatusCode) {
  42. return 0, hbu.handleErrorResponse(resp)
  43. }
  44. hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
  45. hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
  46. if err != nil {
  47. return 0, err
  48. }
  49. rng := resp.Header.Get("Range")
  50. var start, end int64
  51. if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
  52. return 0, err
  53. } else if n != 2 || end < start {
  54. return 0, fmt.Errorf("bad range format: %s", rng)
  55. }
  56. return (end - start + 1), nil
  57. }
  58. func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
  59. req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
  60. if err != nil {
  61. return 0, err
  62. }
  63. req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
  64. req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
  65. req.Header.Set("Content-Type", "application/octet-stream")
  66. resp, err := hbu.client.Do(req)
  67. if err != nil {
  68. return 0, err
  69. }
  70. if !SuccessStatus(resp.StatusCode) {
  71. return 0, hbu.handleErrorResponse(resp)
  72. }
  73. hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
  74. hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
  75. if err != nil {
  76. return 0, err
  77. }
  78. rng := resp.Header.Get("Range")
  79. var start, end int
  80. if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
  81. return 0, err
  82. } else if n != 2 || end < start {
  83. return 0, fmt.Errorf("bad range format: %s", rng)
  84. }
  85. return (end - start + 1), nil
  86. }
  87. func (hbu *httpBlobUpload) Size() int64 {
  88. return hbu.offset
  89. }
  90. func (hbu *httpBlobUpload) ID() string {
  91. return hbu.uuid
  92. }
  93. func (hbu *httpBlobUpload) StartedAt() time.Time {
  94. return hbu.startedAt
  95. }
  96. func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
  97. // TODO(dmcgowan): Check if already finished, if so just fetch
  98. req, err := http.NewRequest("PUT", hbu.location, nil)
  99. if err != nil {
  100. return distribution.Descriptor{}, err
  101. }
  102. values := req.URL.Query()
  103. values.Set("digest", desc.Digest.String())
  104. req.URL.RawQuery = values.Encode()
  105. resp, err := hbu.client.Do(req)
  106. if err != nil {
  107. return distribution.Descriptor{}, err
  108. }
  109. defer resp.Body.Close()
  110. if !SuccessStatus(resp.StatusCode) {
  111. return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
  112. }
  113. return hbu.statter.Stat(ctx, desc.Digest)
  114. }
  115. func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
  116. req, err := http.NewRequest("DELETE", hbu.location, nil)
  117. if err != nil {
  118. return err
  119. }
  120. resp, err := hbu.client.Do(req)
  121. if err != nil {
  122. return err
  123. }
  124. defer resp.Body.Close()
  125. if resp.StatusCode == http.StatusNotFound || SuccessStatus(resp.StatusCode) {
  126. return nil
  127. }
  128. return hbu.handleErrorResponse(resp)
  129. }
  130. func (hbu *httpBlobUpload) Close() error {
  131. hbu.closed = true
  132. return nil
  133. }