pool.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // Copyright 2020 Google LLC.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package grpc
  5. import (
  6. "context"
  7. "fmt"
  8. "sync/atomic"
  9. "google.golang.org/api/internal"
  10. "google.golang.org/grpc"
  11. )
// ConnPool is a pool of grpc.ClientConns.
//
// NOTE(cbro): this is a type alias whose only purpose is to export the type
// from this package. The definition itself must live in the internal package
// to avoid a circular dependency.
type ConnPool = internal.ConnPool
  14. var _ ConnPool = &roundRobinConnPool{}
  15. var _ ConnPool = &singleConnPool{}
// singleConnPool is a special case for a single connection.
//
// The *grpc.ClientConn is embedded, so its methods are promoted onto the pool
// and callers can use a singleConnPool wherever the conn's own methods are
// expected.
type singleConnPool struct {
	*grpc.ClientConn
}
  20. func (p *singleConnPool) Conn() *grpc.ClientConn { return p.ClientConn }
  21. func (p *singleConnPool) Num() int { return 1 }
// roundRobinConnPool is a ConnPool holding several connections. Conn hands
// them out in rotation by advancing idx on every call.
type roundRobinConnPool struct {
	// conns are the connections the pool rotates through.
	conns []*grpc.ClientConn

	// idx is the rotation counter; it is incremented on each Conn call.
	idx uint32 // access via sync/atomic
}
  26. func (p *roundRobinConnPool) Num() int {
  27. return len(p.conns)
  28. }
  29. func (p *roundRobinConnPool) Conn() *grpc.ClientConn {
  30. i := atomic.AddUint32(&p.idx, 1)
  31. return p.conns[i%uint32(len(p.conns))]
  32. }
  33. func (p *roundRobinConnPool) Close() error {
  34. var errs multiError
  35. for _, conn := range p.conns {
  36. if err := conn.Close(); err != nil {
  37. errs = append(errs, err)
  38. }
  39. }
  40. if len(errs) == 0 {
  41. return nil
  42. }
  43. return errs
  44. }
  45. func (p *roundRobinConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
  46. return p.Conn().Invoke(ctx, method, args, reply, opts...)
  47. }
  48. func (p *roundRobinConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  49. return p.Conn().NewStream(ctx, desc, method, opts...)
  50. }
  51. // multiError represents errors from multiple conns in the group.
  52. //
  53. // TODO: figure out how and whether this is useful to export. End users should
  54. // not be depending on the transport/grpc package directly, so there might need
  55. // to be some service-specific multi-error type.
  56. type multiError []error
  57. func (m multiError) Error() string {
  58. s, n := "", 0
  59. for _, e := range m {
  60. if e != nil {
  61. if n == 0 {
  62. s = e.Error()
  63. }
  64. n++
  65. }
  66. }
  67. switch n {
  68. case 0:
  69. return "(0 errors)"
  70. case 1:
  71. return s
  72. case 2:
  73. return s + " (and 1 other error)"
  74. }
  75. return fmt.Sprintf("%s (and %d other errors)", s, n-1)
  76. }