瀏覽代碼

bump google.golang.org/grpc v1.12.2

full diff: https://github.com/grpc/grpc-go/compare/v1.12.0...v1.12.2

- grpc/grpc-go#2074 transport/server: fix race between writing status and header
  - fix grpc/grpc-go#1972 Possible race sending headers from server while receiving message over size limit
- grpc/grpc-go#2074 transport: account for user configured small io write buffer
  - fix grpc/grpc-go#2089 Server abruptly terminates connections if write buffer is small enough

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 6 年之前
父節點
當前提交
6f572c8154

+ 1 - 1
vendor.conf

@@ -74,7 +74,7 @@ github.com/opencontainers/go-digest v1.0.0-rc1
 github.com/mistifyio/go-zfs 22c9b32c84eb0d0c6f4043b6e90fc94073de92fa
 github.com/mistifyio/go-zfs 22c9b32c84eb0d0c6f4043b6e90fc94073de92fa
 github.com/pborman/uuid v1.0
 github.com/pborman/uuid v1.0
 
 
-google.golang.org/grpc v1.12.0
+google.golang.org/grpc 7a6a684ca69eb4cae85ad0a484f2e531598c047b # v1.12.2
 
 
 # The version of runc should match the version that is used by the containerd
 # The version of runc should match the version that is used by the containerd
 # version that is used. If you need to update runc, open a pull request in
 # version that is used. If you need to update runc, open a pull request in

+ 1 - 1
vendor/google.golang.org/grpc/rpc_util.go

@@ -722,6 +722,6 @@ const (
 )
 )
 
 
 // Version is the current grpc version.
 // Version is the current grpc version.
-const Version = "1.12.0"
+const Version = "1.12.2"
 
 
 const grpcUA = "grpc-go/" + Version
 const grpcUA = "grpc-go/" + Version

+ 37 - 36
vendor/google.golang.org/grpc/transport/http2_server.go

@@ -682,12 +682,25 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
 	})
 	})
 }
 }
 
 
+func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
+	for k, vv := range md {
+		if isReservedHeader(k) {
+			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+			continue
+		}
+		for _, v := range vv {
+			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+		}
+	}
+	return headerFields
+}
+
 // WriteHeader sends the header metedata md back to the client.
 // WriteHeader sends the header metedata md back to the client.
 func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
-	if s.headerOk || s.getState() == streamDone {
+	if s.updateHeaderSent() || s.getState() == streamDone {
 		return ErrIllegalHeaderWrite
 		return ErrIllegalHeaderWrite
 	}
 	}
-	s.headerOk = true
+	s.hdrMu.Lock()
 	if md.Len() > 0 {
 	if md.Len() > 0 {
 		if s.header.Len() > 0 {
 		if s.header.Len() > 0 {
 			s.header = metadata.Join(s.header, md)
 			s.header = metadata.Join(s.header, md)
@@ -695,7 +708,12 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 			s.header = md
 			s.header = md
 		}
 		}
 	}
 	}
-	md = s.header
+	t.writeHeaderLocked(s)
+	s.hdrMu.Unlock()
+	return nil
+}
+
+func (t *http2Server) writeHeaderLocked(s *Stream) {
 	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
 	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
 	// first and create a slice of that exact size.
 	// first and create a slice of that exact size.
 	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
 	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
@@ -704,15 +722,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 	if s.sendCompress != "" {
 	if s.sendCompress != "" {
 		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
 		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
 	}
 	}
-	for k, vv := range md {
-		if isReservedHeader(k) {
-			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
-			continue
-		}
-		for _, v := range vv {
-			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
-		}
-	}
+	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
 	t.controlBuf.put(&headerFrame{
 	t.controlBuf.put(&headerFrame{
 		streamID:  s.id,
 		streamID:  s.id,
 		hf:        headerFields,
 		hf:        headerFields,
@@ -728,7 +738,6 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 		outHeader := &stats.OutHeader{}
 		outHeader := &stats.OutHeader{}
 		t.stats.HandleRPC(s.Context(), outHeader)
 		t.stats.HandleRPC(s.Context(), outHeader)
 	}
 	}
-	return nil
 }
 }
 
 
 // WriteStatus sends stream status to the client and terminates the stream.
 // WriteStatus sends stream status to the client and terminates the stream.
@@ -736,21 +745,20 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
 // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
 // OK is adopted.
 // OK is adopted.
 func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
 func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
-	if !s.headerOk && s.header.Len() > 0 {
-		if err := t.WriteHeader(s, nil); err != nil {
-			return err
-		}
-	} else {
-		if s.getState() == streamDone {
-			return nil
-		}
+	if s.getState() == streamDone {
+		return nil
 	}
 	}
+	s.hdrMu.Lock()
 	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
 	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
 	// first and create a slice of that exact size.
 	// first and create a slice of that exact size.
 	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
 	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
-	if !s.headerOk {
-		headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
-		headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
+	if !s.updateHeaderSent() {                      // No headers have been sent.
+		if len(s.header) > 0 { // Send a separate header frame.
+			t.writeHeaderLocked(s)
+		} else { // Send a trailer only response.
+			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
+			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
+		}
 	}
 	}
 	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
 	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
 	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
 	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
@@ -766,16 +774,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
 	}
 	}
 
 
 	// Attach the trailer metadata.
 	// Attach the trailer metadata.
-	for k, vv := range s.trailer {
-		// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
-		if isReservedHeader(k) {
-			continue
-		}
-		for _, v := range vv {
-			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
-		}
-	}
-	trailer := &headerFrame{
+	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
+	trailingHeader := &headerFrame{
 		streamID:  s.id,
 		streamID:  s.id,
 		hf:        headerFields,
 		hf:        headerFields,
 		endStream: true,
 		endStream: true,
@@ -783,7 +783,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
 			atomic.StoreUint32(&t.resetPingStrikes, 1)
 			atomic.StoreUint32(&t.resetPingStrikes, 1)
 		},
 		},
 	}
 	}
-	t.closeStream(s, false, 0, trailer, true)
+	s.hdrMu.Unlock()
+	t.closeStream(s, false, 0, trailingHeader, true)
 	if t.stats != nil {
 	if t.stats != nil {
 		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
 		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
 	}
 	}
@@ -793,7 +794,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
 // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
 // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
 // is returns if it fails (e.g., framing error, transport error).
 // is returns if it fails (e.g., framing error, transport error).
 func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
 func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
-	if !s.headerOk { // Headers haven't been written yet.
+	if !s.isHeaderSent() { // Headers haven't been written yet.
 		if err := t.WriteHeader(s, nil); err != nil {
 		if err := t.WriteHeader(s, nil); err != nil {
 			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
 			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
 			return streamErrorf(codes.Internal, "transport: %v", err)
 			return streamErrorf(codes.Internal, "transport: %v", err)

+ 8 - 4
vendor/google.golang.org/grpc/transport/http_util.go

@@ -531,10 +531,14 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
 	if w.err != nil {
 	if w.err != nil {
 		return 0, w.err
 		return 0, w.err
 	}
 	}
-	n = copy(w.buf[w.offset:], b)
-	w.offset += n
-	if w.offset >= w.batchSize {
-		err = w.Flush()
+	for len(b) > 0 {
+		nn := copy(w.buf[w.offset:], b)
+		b = b[nn:]
+		w.offset += nn
+		n += nn
+		if w.offset >= w.batchSize {
+			err = w.Flush()
+		}
 	}
 	}
 	return n, err
 	return n, err
 }
 }

+ 31 - 6
vendor/google.golang.org/grpc/transport/transport.go

@@ -185,13 +185,20 @@ type Stream struct {
 
 
 	headerChan chan struct{} // closed to indicate the end of header metadata.
 	headerChan chan struct{} // closed to indicate the end of header metadata.
 	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
 	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
-	header     metadata.MD   // the received header metadata.
-	trailer    metadata.MD   // the key-value map of trailer metadata.
 
 
-	headerOk bool // becomes true from the first header is about to send
-	state    streamState
+	// hdrMu protects header and trailer metadata on the server-side.
+	hdrMu   sync.Mutex
+	header  metadata.MD // the received header metadata.
+	trailer metadata.MD // the key-value map of trailer metadata.
 
 
-	status *status.Status // the status error received from the server
+	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
+	headerSent uint32
+
+	state streamState
+
+	// On client-side it is the status error received from the server.
+	// On server-side it is unused.
+	status *status.Status
 
 
 	bytesReceived uint32 // indicates whether any bytes have been received on this stream
 	bytesReceived uint32 // indicates whether any bytes have been received on this stream
 	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
 	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
@@ -201,6 +208,17 @@ type Stream struct {
 	contentSubtype string
 	contentSubtype string
 }
 }
 
 
+// isHeaderSent is only valid on the server-side.
+func (s *Stream) isHeaderSent() bool {
+	return atomic.LoadUint32(&s.headerSent) == 1
+}
+
+// updateHeaderSent updates headerSent and returns true
+// if it was alreay set. It is valid only on server-side.
+func (s *Stream) updateHeaderSent() bool {
+	return atomic.SwapUint32(&s.headerSent, 1) == 1
+}
+
 func (s *Stream) swapState(st streamState) streamState {
 func (s *Stream) swapState(st streamState) streamState {
 	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
 	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
 }
 }
@@ -313,10 +331,12 @@ func (s *Stream) SetHeader(md metadata.MD) error {
 	if md.Len() == 0 {
 	if md.Len() == 0 {
 		return nil
 		return nil
 	}
 	}
-	if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) {
+	if s.isHeaderSent() || s.getState() == streamDone {
 		return ErrIllegalHeaderWrite
 		return ErrIllegalHeaderWrite
 	}
 	}
+	s.hdrMu.Lock()
 	s.header = metadata.Join(s.header, md)
 	s.header = metadata.Join(s.header, md)
+	s.hdrMu.Unlock()
 	return nil
 	return nil
 }
 }
 
 
@@ -335,7 +355,12 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
 	if md.Len() == 0 {
 	if md.Len() == 0 {
 		return nil
 		return nil
 	}
 	}
+	if s.getState() == streamDone {
+		return ErrIllegalHeaderWrite
+	}
+	s.hdrMu.Lock()
 	s.trailer = metadata.Join(s.trailer, md)
 	s.trailer = metadata.Join(s.trailer, md)
+	s.hdrMu.Unlock()
 	return nil
 	return nil
 }
 }