broker.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // Package connectionbroker is a layer on top of remotes that returns
  2. // a gRPC connection to a manager. The connection may be a local connection
  3. // using a local socket such as a UNIX socket.
  4. package connectionbroker
  5. import (
  6. "sync"
  7. "github.com/docker/swarmkit/api"
  8. "github.com/docker/swarmkit/remotes"
  9. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  10. "google.golang.org/grpc"
  11. )
  12. // Broker is a simple connection broker. It can either return a fresh
  13. // connection to a remote manager selected with weighted randomization, or a
  14. // local gRPC connection to the local manager.
  15. type Broker struct {
  16. mu sync.Mutex
  17. remotes remotes.Remotes
  18. localConn *grpc.ClientConn
  19. }
  20. // New creates a new connection broker.
  21. func New(remotes remotes.Remotes) *Broker {
  22. return &Broker{
  23. remotes: remotes,
  24. }
  25. }
  26. // SetLocalConn changes the local gRPC connection used by the connection broker.
  27. func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
  28. b.mu.Lock()
  29. defer b.mu.Unlock()
  30. b.localConn = localConn
  31. }
  32. // Select a manager from the set of available managers, and return a connection.
  33. func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
  34. b.mu.Lock()
  35. localConn := b.localConn
  36. b.mu.Unlock()
  37. if localConn != nil {
  38. return &Conn{
  39. ClientConn: localConn,
  40. isLocal: true,
  41. }, nil
  42. }
  43. return b.SelectRemote(dialOpts...)
  44. }
  45. // SelectRemote chooses a manager from the remotes, and returns a TCP
  46. // connection.
  47. func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
  48. peer, err := b.remotes.Select()
  49. if err != nil {
  50. return nil, err
  51. }
  52. dialOpts = append(dialOpts,
  53. grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
  54. grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor))
  55. cc, err := grpc.Dial(peer.Addr, dialOpts...)
  56. if err != nil {
  57. b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
  58. return nil, err
  59. }
  60. return &Conn{
  61. ClientConn: cc,
  62. remotes: b.remotes,
  63. peer: peer,
  64. }, nil
  65. }
  66. // Remotes returns the remotes interface used by the broker, so the caller
  67. // can make observations or see weights directly.
  68. func (b *Broker) Remotes() remotes.Remotes {
  69. return b.remotes
  70. }
  71. // Conn is a wrapper around a gRPC client connection.
  72. type Conn struct {
  73. *grpc.ClientConn
  74. isLocal bool
  75. remotes remotes.Remotes
  76. peer api.Peer
  77. }
  78. // Close closes the client connection if it is a remote connection. It also
  79. // records a positive experience with the remote peer if success is true,
  80. // otherwise it records a negative experience. If a local connection is in use,
  81. // Close is a noop.
  82. func (c *Conn) Close(success bool) error {
  83. if c.isLocal {
  84. return nil
  85. }
  86. if success {
  87. c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
  88. } else {
  89. c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
  90. }
  91. return c.ClientConn.Close()
  92. }