|
@@ -377,17 +377,24 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
|
|
|
|
|
// If content has already been written, the bytes
|
|
// If content has already been written, the bytes
|
|
// cannot be written and the caller must reset
|
|
// cannot be written and the caller must reset
|
|
- if status.Offset > 0 {
|
|
|
|
- status.Offset = 0
|
|
|
|
- status.UpdatedAt = time.Now()
|
|
|
|
- pw.tracker.SetStatus(pw.ref, status)
|
|
|
|
- return 0, content.ErrReset
|
|
|
|
- }
|
|
|
|
|
|
+ status.Offset = 0
|
|
|
|
+ status.UpdatedAt = time.Now()
|
|
|
|
+ pw.tracker.SetStatus(pw.ref, status)
|
|
|
|
+ return 0, content.ErrReset
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
n, err = pw.pipe.Write(p)
|
|
n, err = pw.pipe.Write(p)
|
|
|
|
+ if errors.Is(err, io.ErrClosedPipe) {
|
|
|
|
+ // if the pipe is closed, we might have the original error on the error
|
|
|
|
+ // channel - so we should try and get it
|
|
|
|
+ select {
|
|
|
|
+ case err2 := <-pw.errC:
|
|
|
|
+ err = err2
|
|
|
|
+ default:
|
|
|
|
+ }
|
|
|
|
+ }
|
|
status.Offset += int64(n)
|
|
status.Offset += int64(n)
|
|
status.UpdatedAt = time.Now()
|
|
status.UpdatedAt = time.Now()
|
|
pw.tracker.SetStatus(pw.ref, status)
|
|
pw.tracker.SetStatus(pw.ref, status)
|
|
@@ -428,7 +435,7 @@ func (pw *pushWriter) Digest() digest.Digest {
|
|
|
|
|
|
func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
|
func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
|
// Check whether read has already thrown an error
|
|
// Check whether read has already thrown an error
|
|
- if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe {
|
|
|
|
|
|
+ if _, err := pw.pipe.Write([]byte{}); err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
|
return fmt.Errorf("pipe error before commit: %w", err)
|
|
return fmt.Errorf("pipe error before commit: %w", err)
|
|
}
|
|
}
|
|
|
|
|
|
@@ -439,9 +446,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
|
|
var resp *http.Response
|
|
var resp *http.Response
|
|
select {
|
|
select {
|
|
case err := <-pw.errC:
|
|
case err := <-pw.errC:
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
|
|
+ return err
|
|
case resp = <-pw.respC:
|
|
case resp = <-pw.respC:
|
|
defer resp.Body.Close()
|
|
defer resp.Body.Close()
|
|
case p, ok := <-pw.pipeC:
|
|
case p, ok := <-pw.pipeC:
|
|
@@ -453,18 +458,17 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
|
|
}
|
|
}
|
|
pw.pipe.CloseWithError(content.ErrReset)
|
|
pw.pipe.CloseWithError(content.ErrReset)
|
|
pw.pipe = p
|
|
pw.pipe = p
|
|
|
|
+
|
|
|
|
+ // If content has already been written, the bytes
|
|
|
|
+ // cannot be written again and the caller must reset
|
|
status, err := pw.tracker.GetStatus(pw.ref)
|
|
status, err := pw.tracker.GetStatus(pw.ref)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
- // If content has already been written, the bytes
|
|
|
|
- // cannot be written again and the caller must reset
|
|
|
|
- if status.Offset > 0 {
|
|
|
|
- status.Offset = 0
|
|
|
|
- status.UpdatedAt = time.Now()
|
|
|
|
- pw.tracker.SetStatus(pw.ref, status)
|
|
|
|
- return content.ErrReset
|
|
|
|
- }
|
|
|
|
|
|
+ status.Offset = 0
|
|
|
|
+ status.UpdatedAt = time.Now()
|
|
|
|
+ pw.tracker.SetStatus(pw.ref, status)
|
|
|
|
+ return content.ErrReset
|
|
}
|
|
}
|
|
|
|
|
|
// 201 is specified return status, some registries return
|
|
// 201 is specified return status, some registries return
|