Browse Source

Merge pull request #19841 from aaronlehmann/compress-panic

Fix panic on network timeout during push
Brian Goff 9 years ago
parent
commit
0c09dda80f
2 changed files with 15 additions and 4 deletions
  1. 10 2
      distribution/push.go
  2. 5 2
      distribution/push_v2.go

+ 10 - 2
distribution/push.go

@@ -170,7 +170,14 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
 // argument so that it can be used with httpBlobWriter's ReadFrom method.
 // argument so that it can be used with httpBlobWriter's ReadFrom method.
 // Using httpBlobWriter's Write method would send a PATCH request for every
 // Using httpBlobWriter's Write method would send a PATCH request for every
 // Write call.
 // Write call.
-func compress(in io.Reader) io.ReadCloser {
+//
+// The second return value is a channel that gets closed when the goroutine
+// is finished. This allows the caller to make sure the goroutine finishes
+// before it releases any resources connected with the reader that was
+// passed in.
+func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
+	compressionDone := make(chan struct{})
+
 	pipeReader, pipeWriter := io.Pipe()
 	pipeReader, pipeWriter := io.Pipe()
 	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
 	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
 	bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
 	bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
@@ -189,7 +196,8 @@ func compress(in io.Reader) io.ReadCloser {
 		} else {
 		} else {
 			pipeWriter.Close()
 			pipeWriter.Close()
 		}
 		}
+		close(compressionDone)
 	}()
 	}()
 
 
-	return pipeReader
+	return pipeReader, compressionDone
 }
 }

+ 5 - 2
distribution/push_v2.go

@@ -345,8 +345,11 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 	size, _ := pd.layer.DiffSize()
 	size, _ := pd.layer.DiffSize()
 
 
 	reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
 	reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
-	defer reader.Close()
-	compressedReader := compress(reader)
+	compressedReader, compressionDone := compress(reader)
+	defer func() {
+		reader.Close()
+		<-compressionDone
+	}()
 
 
 	digester := digest.Canonical.New()
 	digester := digest.Canonical.New()
 	tee := io.TeeReader(compressedReader, digester.Hash())
 	tee := io.TeeReader(compressedReader, digester.Hash())