channel.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. /*
  2. Copyright The containerd Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ttrpc
  14. import (
  15. "bufio"
  16. "encoding/binary"
  17. "fmt"
  18. "io"
  19. "net"
  20. "sync"
  21. "google.golang.org/grpc/codes"
  22. "google.golang.org/grpc/status"
  23. )
  24. const (
  25. messageHeaderLength = 10
  26. messageLengthMax = 4 << 20
  27. )
  28. type messageType uint8
  29. const (
  30. messageTypeRequest messageType = 0x1
  31. messageTypeResponse messageType = 0x2
  32. messageTypeData messageType = 0x3
  33. )
  34. func (mt messageType) String() string {
  35. switch mt {
  36. case messageTypeRequest:
  37. return "request"
  38. case messageTypeResponse:
  39. return "response"
  40. case messageTypeData:
  41. return "data"
  42. default:
  43. return "unknown"
  44. }
  45. }
  46. const (
  47. flagRemoteClosed uint8 = 0x1
  48. flagRemoteOpen uint8 = 0x2
  49. flagNoData uint8 = 0x4
  50. )
  51. // messageHeader represents the fixed-length message header of 10 bytes sent
  52. // with every request.
  53. type messageHeader struct {
  54. Length uint32 // length excluding this header. b[:4]
  55. StreamID uint32 // identifies which request stream message is a part of. b[4:8]
  56. Type messageType // message type b[8]
  57. Flags uint8 // type specific flags b[9]
  58. }
  59. func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) {
  60. _, err := io.ReadFull(r, p[:messageHeaderLength])
  61. if err != nil {
  62. return messageHeader{}, err
  63. }
  64. return messageHeader{
  65. Length: binary.BigEndian.Uint32(p[:4]),
  66. StreamID: binary.BigEndian.Uint32(p[4:8]),
  67. Type: messageType(p[8]),
  68. Flags: p[9],
  69. }, nil
  70. }
  71. func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error {
  72. binary.BigEndian.PutUint32(p[:4], mh.Length)
  73. binary.BigEndian.PutUint32(p[4:8], mh.StreamID)
  74. p[8] = byte(mh.Type)
  75. p[9] = mh.Flags
  76. _, err := w.Write(p[:])
  77. return err
  78. }
  79. var buffers sync.Pool
  80. type channel struct {
  81. conn net.Conn
  82. bw *bufio.Writer
  83. br *bufio.Reader
  84. hrbuf [messageHeaderLength]byte // avoid alloc when reading header
  85. hwbuf [messageHeaderLength]byte
  86. }
  87. func newChannel(conn net.Conn) *channel {
  88. return &channel{
  89. conn: conn,
  90. bw: bufio.NewWriter(conn),
  91. br: bufio.NewReader(conn),
  92. }
  93. }
  94. // recv a message from the channel. The returned buffer contains the message.
  95. //
  96. // If a valid grpc status is returned, the message header
  97. // returned will be valid and caller should send that along to
  98. // the correct consumer. The bytes on the underlying channel
  99. // will be discarded.
  100. func (ch *channel) recv() (messageHeader, []byte, error) {
  101. mh, err := readMessageHeader(ch.hrbuf[:], ch.br)
  102. if err != nil {
  103. return messageHeader{}, nil, err
  104. }
  105. if mh.Length > uint32(messageLengthMax) {
  106. if _, err := ch.br.Discard(int(mh.Length)); err != nil {
  107. return mh, nil, fmt.Errorf("failed to discard after receiving oversized message: %w", err)
  108. }
  109. return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax)
  110. }
  111. var p []byte
  112. if mh.Length > 0 {
  113. p = ch.getmbuf(int(mh.Length))
  114. if _, err := io.ReadFull(ch.br, p); err != nil {
  115. return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err)
  116. }
  117. }
  118. return mh, p, nil
  119. }
  120. func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
  121. // TODO: Error on send rather than on recv
  122. //if len(p) > messageLengthMax {
  123. // return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
  124. //}
  125. if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
  126. return err
  127. }
  128. if len(p) > 0 {
  129. _, err := ch.bw.Write(p)
  130. if err != nil {
  131. return err
  132. }
  133. }
  134. return ch.bw.Flush()
  135. }
  136. func (ch *channel) getmbuf(size int) []byte {
  137. // we can't use the standard New method on pool because we want to allocate
  138. // based on size.
  139. b, ok := buffers.Get().(*[]byte)
  140. if !ok || cap(*b) < size {
  141. // TODO(stevvooe): It may be better to allocate these in fixed length
  142. // buckets to reduce fragmentation but its not clear that would help
  143. // with performance. An ilogb approach or similar would work well.
  144. bb := make([]byte, size)
  145. b = &bb
  146. } else {
  147. *b = (*b)[:size]
  148. }
  149. return *b
  150. }
  151. func (ch *channel) putmbuf(p []byte) {
  152. buffers.Put(&p)
  153. }