123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- // Package connectionbroker is a layer on top of remotes that returns
- // a gRPC connection to a manager. The connection may be a local connection
- // using a local socket such as a UNIX socket.
- package connectionbroker
- import (
- "sync"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/remotes"
- grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
- "google.golang.org/grpc"
- )
- // Broker is a simple connection broker. It can either return a fresh
- // connection to a remote manager selected with weighted randomization, or a
- // local gRPC connection to the local manager.
- type Broker struct {
- mu sync.Mutex
- remotes remotes.Remotes
- localConn *grpc.ClientConn
- }
- // New creates a new connection broker.
- func New(remotes remotes.Remotes) *Broker {
- return &Broker{
- remotes: remotes,
- }
- }
- // SetLocalConn changes the local gRPC connection used by the connection broker.
- func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
- b.mu.Lock()
- defer b.mu.Unlock()
- b.localConn = localConn
- }
- // Select a manager from the set of available managers, and return a connection.
- func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
- b.mu.Lock()
- localConn := b.localConn
- b.mu.Unlock()
- if localConn != nil {
- return &Conn{
- ClientConn: localConn,
- isLocal: true,
- }, nil
- }
- return b.SelectRemote(dialOpts...)
- }
- // SelectRemote chooses a manager from the remotes, and returns a TCP
- // connection.
- func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
- peer, err := b.remotes.Select()
- if err != nil {
- return nil, err
- }
- dialOpts = append(dialOpts,
- grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
- grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor))
- cc, err := grpc.Dial(peer.Addr, dialOpts...)
- if err != nil {
- b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
- return nil, err
- }
- return &Conn{
- ClientConn: cc,
- remotes: b.remotes,
- peer: peer,
- }, nil
- }
- // Remotes returns the remotes interface used by the broker, so the caller
- // can make observations or see weights directly.
- func (b *Broker) Remotes() remotes.Remotes {
- return b.remotes
- }
- // Conn is a wrapper around a gRPC client connection.
- type Conn struct {
- *grpc.ClientConn
- isLocal bool
- remotes remotes.Remotes
- peer api.Peer
- }
- // Close closes the client connection if it is a remote connection. It also
- // records a positive experience with the remote peer if success is true,
- // otherwise it records a negative experience. If a local connection is in use,
- // Close is a noop.
- func (c *Conn) Close(success bool) error {
- if c.isLocal {
- return nil
- }
- if success {
- c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
- } else {
- c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
- }
- return c.ClientConn.Close()
- }
|