rpc_util.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "encoding/binary"
  36. "fmt"
  37. "io"
  38. "math"
  39. "math/rand"
  40. "os"
  41. "time"
  42. "github.com/golang/protobuf/proto"
  43. "golang.org/x/net/context"
  44. "google.golang.org/grpc/codes"
  45. "google.golang.org/grpc/metadata"
  46. "google.golang.org/grpc/transport"
  47. )
  48. // Codec defines the interface gRPC uses to encode and decode messages.
  49. type Codec interface {
  50. // Marshal returns the wire format of v.
  51. Marshal(v interface{}) ([]byte, error)
  52. // Unmarshal parses the wire format into v.
  53. Unmarshal(data []byte, v interface{}) error
  54. // String returns the name of the Codec implementation. The returned
  55. // string will be used as part of content type in transmission.
  56. String() string
  57. }
  58. // protoCodec is a Codec implemetation with protobuf. It is the default codec for gRPC.
  59. type protoCodec struct{}
  60. func (protoCodec) Marshal(v interface{}) ([]byte, error) {
  61. return proto.Marshal(v.(proto.Message))
  62. }
  63. func (protoCodec) Unmarshal(data []byte, v interface{}) error {
  64. return proto.Unmarshal(data, v.(proto.Message))
  65. }
  66. func (protoCodec) String() string {
  67. return "proto"
  68. }
  69. // CallOption configures a Call before it starts or extracts information from
  70. // a Call after it completes.
  71. type CallOption interface {
  72. // before is called before the call is sent to any server. If before
  73. // returns a non-nil error, the RPC fails with that error.
  74. before(*callInfo) error
  75. // after is called after the call has completed. after cannot return an
  76. // error, so any failures should be reported via output parameters.
  77. after(*callInfo)
  78. }
  79. type beforeCall func(c *callInfo) error
  80. func (o beforeCall) before(c *callInfo) error { return o(c) }
  81. func (o beforeCall) after(c *callInfo) {}
  82. type afterCall func(c *callInfo)
  83. func (o afterCall) before(c *callInfo) error { return nil }
  84. func (o afterCall) after(c *callInfo) { o(c) }
  85. // Header returns a CallOptions that retrieves the header metadata
  86. // for a unary RPC.
  87. func Header(md *metadata.MD) CallOption {
  88. return afterCall(func(c *callInfo) {
  89. *md = c.headerMD
  90. })
  91. }
  92. // Trailer returns a CallOptions that retrieves the trailer metadata
  93. // for a unary RPC.
  94. func Trailer(md *metadata.MD) CallOption {
  95. return afterCall(func(c *callInfo) {
  96. *md = c.trailerMD
  97. })
  98. }
  99. // The format of the payload: compressed or not?
  100. type payloadFormat uint8
  101. const (
  102. compressionNone payloadFormat = iota // no compression
  103. compressionFlate
  104. // More formats
  105. )
  106. // parser reads complelete gRPC messages from the underlying reader.
  107. type parser struct {
  108. s io.Reader
  109. }
  110. // recvMsg is to read a complete gRPC message from the stream. It is blocking if
  111. // the message has not been complete yet. It returns the message and its type,
  112. // EOF is returned with nil msg and 0 pf if the entire stream is done. Other
  113. // non-nil error is returned if something is wrong on reading.
  114. func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
  115. // The header of a gRPC message. Find more detail
  116. // at http://www.grpc.io/docs/guides/wire.html.
  117. var buf [5]byte
  118. if _, err := io.ReadFull(p.s, buf[:]); err != nil {
  119. return 0, nil, err
  120. }
  121. pf = payloadFormat(buf[0])
  122. length := binary.BigEndian.Uint32(buf[1:])
  123. if length == 0 {
  124. return pf, nil, nil
  125. }
  126. msg = make([]byte, int(length))
  127. if _, err := io.ReadFull(p.s, msg); err != nil {
  128. if err == io.EOF {
  129. err = io.ErrUnexpectedEOF
  130. }
  131. return 0, nil, err
  132. }
  133. return pf, msg, nil
  134. }
  135. // encode serializes msg and prepends the message header. If msg is nil, it
  136. // generates the message header of 0 message length.
  137. func encode(c Codec, msg interface{}, pf payloadFormat) ([]byte, error) {
  138. var b []byte
  139. var length uint
  140. if msg != nil {
  141. var err error
  142. // TODO(zhaoq): optimize to reduce memory alloc and copying.
  143. b, err = c.Marshal(msg)
  144. if err != nil {
  145. return nil, err
  146. }
  147. length = uint(len(b))
  148. }
  149. if length > math.MaxUint32 {
  150. return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length)
  151. }
  152. const (
  153. payloadLen = 1
  154. sizeLen = 4
  155. )
  156. var buf = make([]byte, payloadLen+sizeLen+len(b))
  157. // Write payload format
  158. buf[0] = byte(pf)
  159. // Write length of b into buf
  160. binary.BigEndian.PutUint32(buf[1:], uint32(length))
  161. // Copy encoded msg to buf
  162. copy(buf[5:], b)
  163. return buf, nil
  164. }
  165. func recv(p *parser, c Codec, m interface{}) error {
  166. pf, d, err := p.recvMsg()
  167. if err != nil {
  168. return err
  169. }
  170. switch pf {
  171. case compressionNone:
  172. if err := c.Unmarshal(d, m); err != nil {
  173. if rErr, ok := err.(rpcError); ok {
  174. return rErr
  175. } else {
  176. return Errorf(codes.Internal, "grpc: %v", err)
  177. }
  178. }
  179. default:
  180. return Errorf(codes.Internal, "gprc: compression is not supported yet.")
  181. }
  182. return nil
  183. }
  184. // rpcError defines the status from an RPC.
  185. type rpcError struct {
  186. code codes.Code
  187. desc string
  188. }
  189. func (e rpcError) Error() string {
  190. return fmt.Sprintf("rpc error: code = %d desc = %q", e.code, e.desc)
  191. }
  192. // Code returns the error code for err if it was produced by the rpc system.
  193. // Otherwise, it returns codes.Unknown.
  194. func Code(err error) codes.Code {
  195. if err == nil {
  196. return codes.OK
  197. }
  198. if e, ok := err.(rpcError); ok {
  199. return e.code
  200. }
  201. return codes.Unknown
  202. }
  203. // ErrorDesc returns the error description of err if it was produced by the rpc system.
  204. // Otherwise, it returns err.Error() or empty string when err is nil.
  205. func ErrorDesc(err error) string {
  206. if err == nil {
  207. return ""
  208. }
  209. if e, ok := err.(rpcError); ok {
  210. return e.desc
  211. }
  212. return err.Error()
  213. }
  214. // Errorf returns an error containing an error code and a description;
  215. // Errorf returns nil if c is OK.
  216. func Errorf(c codes.Code, format string, a ...interface{}) error {
  217. if c == codes.OK {
  218. return nil
  219. }
  220. return rpcError{
  221. code: c,
  222. desc: fmt.Sprintf(format, a...),
  223. }
  224. }
  225. // toRPCErr converts an error into a rpcError.
  226. func toRPCErr(err error) error {
  227. switch e := err.(type) {
  228. case rpcError:
  229. return err
  230. case transport.StreamError:
  231. return rpcError{
  232. code: e.Code,
  233. desc: e.Desc,
  234. }
  235. case transport.ConnectionError:
  236. return rpcError{
  237. code: codes.Internal,
  238. desc: e.Desc,
  239. }
  240. }
  241. return Errorf(codes.Unknown, "%v", err)
  242. }
  243. // convertCode converts a standard Go error into its canonical code. Note that
  244. // this is only used to translate the error returned by the server applications.
  245. func convertCode(err error) codes.Code {
  246. switch err {
  247. case nil:
  248. return codes.OK
  249. case io.EOF:
  250. return codes.OutOfRange
  251. case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
  252. return codes.FailedPrecondition
  253. case os.ErrInvalid:
  254. return codes.InvalidArgument
  255. case context.Canceled:
  256. return codes.Canceled
  257. case context.DeadlineExceeded:
  258. return codes.DeadlineExceeded
  259. }
  260. switch {
  261. case os.IsExist(err):
  262. return codes.AlreadyExists
  263. case os.IsNotExist(err):
  264. return codes.NotFound
  265. case os.IsPermission(err):
  266. return codes.PermissionDenied
  267. }
  268. return codes.Unknown
  269. }
  270. const (
  271. // how long to wait after the first failure before retrying
  272. baseDelay = 1.0 * time.Second
  273. // upper bound of backoff delay
  274. maxDelay = 120 * time.Second
  275. // backoff increases by this factor on each retry
  276. backoffFactor = 1.6
  277. // backoff is randomized downwards by this factor
  278. backoffJitter = 0.2
  279. )
  280. func backoff(retries int) (t time.Duration) {
  281. if retries == 0 {
  282. return baseDelay
  283. }
  284. backoff, max := float64(baseDelay), float64(maxDelay)
  285. for backoff < max && retries > 0 {
  286. backoff *= backoffFactor
  287. retries--
  288. }
  289. if backoff > max {
  290. backoff = max
  291. }
  292. // Randomize backoff delays so that if a cluster of requests start at
  293. // the same time, they won't operate in lockstep.
  294. backoff *= 1 + backoffJitter*(rand.Float64()*2-1)
  295. if backoff < 0 {
  296. return 0
  297. }
  298. return time.Duration(backoff)
  299. }