/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package grpclb defines a grpclb balancer.
//
// To install the grpclb balancer, import this package as:
//
//	import _ "google.golang.org/grpc/balancer/grpclb"
package grpclb

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/resolver/dns"
	"google.golang.org/grpc/resolver"

	durationpb "github.com/golang/protobuf/ptypes/duration"
	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)

const (
	lbTokenKey             = "lb-token"
	defaultFallbackTimeout = 10 * time.Second
	grpclbName             = "grpclb"
)

var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
var logger = grpclog.Component("grpclb")

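// convertDuration converts a proto duration into a time.Duration. A nil
// duration converts to 0. For example, &durationpb.Duration{Seconds: 1,
// Nanos: 5e8} converts to 1500 * time.Millisecond.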
func convertDuration(d *durationpb.Duration) time.Duration {
	if d == nil {
		return 0
	}
	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}

// loadBalancerClient is a client for the LoadBalancer service. It is mostly
// copied from the generated pb.go file in order to avoid a circular
// dependency on the generated code.
type loadBalancerClient struct {
	cc *grpc.ClientConn
}

func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
	desc := &grpc.StreamDesc{
		StreamName:    "BalanceLoad",
		ServerStreams: true,
		ClientStreams: true,
	}
	stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
	if err != nil {
		return nil, err
	}
	x := &balanceLoadClientStream{stream}
	return x, nil
}

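// balanceLoadClientStream implements the client side of the bidirectional
// BalanceLoad stream. In a typical exchange (a sketch of the grpc.lb.v1
// protocol, not enforced by this type), the client first Sends an initial
// LoadBalanceRequest naming the target, then Recvs LoadBalanceResponse
// messages carrying server lists.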
type balanceLoadClientStream struct {
	grpc.ClientStream
}

func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
	m := new(lbpb.LoadBalanceResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

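// init registers the grpclb builder so that a blank import of this package
// makes the policy available, and enables SRV lookups in the DNS resolver,
// which grpclb relies on to discover remote balancer addresses.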
func init() {
	balancer.Register(newLBBuilder())
	dns.EnableSRVLookups = true
}

// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
	return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}

// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout. If no response is received from the remote balancer within
// fallbackTimeout, the backend addresses from the resolved address list will
// be used.
//
// Only call this function when a non-default fallback timeout is needed.
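//
// For example (a sketch; registering this builder replaces the default
// registration performed in init, since both use the name "grpclb"):
//
//	balancer.Register(newLBBuilderWithFallbackTimeout(20 * time.Second))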
func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
	return &lbBuilder{
		fallbackTimeout: fallbackTimeout,
	}
}

type lbBuilder struct {
	fallbackTimeout time.Duration
}

func (b *lbBuilder) Name() string {
	return grpclbName
}

func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	// This generates a manual resolver builder with a fixed scheme. This
	// scheme will be used to dial to the remote LB, so we can send filtered
	// address updates to the remote LB ClientConn using this manual resolver.
	r := &lbManualResolver{scheme: "grpclb-internal", ccb: cc}

	lb := &lbBalancer{
		cc:              newLBCacheClientConn(cc),
		dialTarget:      opt.Target.Endpoint(),
		target:          opt.Target.Endpoint(),
		opt:             opt,
		fallbackTimeout: b.fallbackTimeout,
		doneCh:          make(chan struct{}),
		manualResolver:  r,
		subConns:        make(map[resolver.Address]balancer.SubConn),
		scStates:        make(map[balancer.SubConn]connectivity.State),
		picker:          &errPicker{err: balancer.ErrNoSubConnAvailable},
		clientStats:     newRPCStats(),
		backoff:         backoff.DefaultExponential, // TODO: make backoff configurable.
	}

	var err error
	if opt.CredsBundle != nil {
		lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
		if err != nil {
			logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
		}
		lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
		if err != nil {
			logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
		}
	}
	return lb
}

type lbBalancer struct {
	cc         *lbCacheClientConn
	dialTarget string // user's dial target
	target     string // same as dialTarget unless overridden in service config

	opt balancer.BuildOptions

	usePickFirst bool

	// grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
	// servers. If it's nil, use the TransportCredentials from BuildOptions
	// instead.
	grpclbClientConnCreds credentials.Bundle
	// grpclbBackendCreds is the creds bundle to be used for addresses that are
	// returned by the grpclb server. If it's nil, don't set anything when
	// creating SubConns.
	grpclbBackendCreds credentials.Bundle

	fallbackTimeout time.Duration
	doneCh          chan struct{}

	// manualResolver is used in the remote LB ClientConn inside grpclb. When
	// resolved address updates are received by grpclb, filtered updates will be
	// sent to the remote LB ClientConn through this resolver.
	manualResolver *lbManualResolver
	// The ClientConn to talk to the remote balancer.
	ccRemoteLB *remoteBalancerCCWrapper
	// backoff for calling the remote balancer.
	backoff backoff.Strategy

	// Support client side load reporting. Each picker gets a reference to this,
	// and will update its content.
	clientStats *rpcStats

	mu sync.Mutex // guards everything following.
	// The full server list including drops, used to check if the newly received
	// serverList contains anything new. Each generated picker will also have a
	// reference to this list to do the first layer pick.
	fullServerList []*lbpb.Server
	// Backend addresses. It's kept so the addresses are available when
	// switching between round_robin and pickfirst.
	backendAddrs []resolver.Address
	// All backend addresses, with metadata set to nil. This list contains all
	// backend addresses in the same order and with the same duplicates as in
	// serverlist. When generating the picker, a SubConn slice with the same
	// order but with only READY SCs will be generated.
	backendAddrsWithoutMetadata []resolver.Address
	// Roundrobin functionalities.
	state    connectivity.State
	subConns map[resolver.Address]balancer.SubConn   // Used to new/shutdown SubConn.
	scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
	picker   balancer.Picker
	// Support fallback to resolved backend addresses if there's no response
	// from the remote balancer within fallbackTimeout.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool
	// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
	// when resolved address updates are received, and read in the goroutine
	// handling fallback.
	resolvedBackendAddrs []resolver.Address
	connErr              error // the last connection error
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker
//   - always returns ErrTransientFailure if the balancer is in TransientFailure,
//   - does a two-layer round-robin pick otherwise.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
	if lb.state == connectivity.TransientFailure {
		lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)}
		return
	}

	if lb.state == connectivity.Connecting {
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}

	var readySCs []balancer.SubConn
	if lb.usePickFirst {
		for _, sc := range lb.subConns {
			readySCs = append(readySCs, sc)
			break
		}
	} else {
		for _, a := range lb.backendAddrsWithoutMetadata {
			if sc, ok := lb.subConns[a]; ok {
				if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
					readySCs = append(readySCs, sc)
				}
			}
		}
	}
	if len(readySCs) == 0 {
		// If there are no ready SubConns, always re-pick. This is to avoid
		// drops unless at least one SubConn is ready. Otherwise we may drop
		// more often than we want because of drops + re-picks (which become
		// re-drops).
		//
		// This doesn't seem to be necessary after the connecting check above.
		// Kept for safety.
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}
	if lb.inFallback {
		lb.picker = newRRPicker(readySCs)
		return
	}
	if resetDrop {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker, ok := lb.picker.(*lbPicker)
	if !ok {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker.updateReadySCs(readySCs)
}

// aggregateSubConnStates calculates the aggregated state of SubConns in
// lb.subConns. These SubConns are the subconns in use (when switching between
// fallback and grpclb). lb.scStates contains states for all SubConns,
// including those in cache (SubConns are cached for 10 seconds after
// shutdown).
//
// The aggregated state is:
//   - If at least one SubConn is Ready, the aggregated state is Ready;
//   - Else if at least one SubConn is Connecting or Idle, the aggregated
//     state is Connecting;
//   - It's OK to consider Idle as Connecting. SubConns never stay in Idle,
//     they start to connect immediately. But there's a race between when the
//     overall state is reported and when the new SubConn state arrives. And
//     SubConns never go back to Idle.
//   - Else the aggregated state is TransientFailure.
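//
// For example, with one SubConn in Connecting and two in TransientFailure,
// the aggregate is Connecting; only when every SubConn in lb.subConns is in
// TransientFailure does the aggregate become TransientFailure.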
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
	var numConnecting uint64
	for _, sc := range lb.subConns {
		if state, ok := lb.scStates[sc]; ok {
			switch state {
			case connectivity.Ready:
				return connectivity.Ready
			case connectivity.Connecting, connectivity.Idle:
				numConnecting++
			}
		}
	}
	if numConnecting > 0 {
		return connectivity.Connecting
	}
	return connectivity.TransientFailure
}

// UpdateSubConnState is unused; NewSubConn's options always specify
// updateSubConnState as the listener.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
}

func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	s := scs.ConnectivityState
	if logger.V(2) {
		logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()

	oldS, ok := lb.scStates[sc]
	if !ok {
		if logger.V(2) {
			logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
		}
		return
	}
	lb.scStates[sc] = s
	switch s {
	case connectivity.Idle:
		sc.Connect()
	case connectivity.Shutdown:
		// When an address is removed by the resolver, the balancer calls
		// Shutdown but keeps the sc's state in scStates. Remove the state for
		// this sc here.
		delete(lb.scStates, sc)
	case connectivity.TransientFailure:
		lb.connErr = scs.ConnectionError
	}
	// Force regenerate picker if
	//   - this sc became ready from not-ready
	//   - this sc became not-ready from ready
	lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)

	// Enter fallback when the aggregated state is not Ready and the connection
	// to the remote balancer is lost.
	if lb.state != connectivity.Ready {
		if !lb.inFallback && !lb.remoteBalancerConnected {
			// Enter fallback.
			lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
		}
	}
}

// updateStateAndPicker recalculates the aggregated state, and regenerates the
// picker if the overall state changes.
//
// If forceRegeneratePicker is true, the picker will be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
	oldAggrState := lb.state
	lb.state = lb.aggregateSubConnStates()
	// Regenerate picker when one of the following happens:
	//   - caller wants to regenerate
	//   - the aggregated state changed
	if forceRegeneratePicker || (lb.state != oldAggrState) {
		lb.regeneratePicker(resetDrop)
	}
	var cc balancer.ClientConn = lb.cc
	if lb.usePickFirst {
		// Bypass the caching layer that would wrap the picker.
		cc = lb.cc.ClientConn
	}

	cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to using
// the resolved backends (backends received from the resolver, not from the
// remote balancer) if no connection to a remote balancer has succeeded.
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
	timer := time.NewTimer(fallbackTimeout)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-lb.doneCh:
		return
	}
	lb.mu.Lock()
	if lb.inFallback || lb.serverListReceived {
		lb.mu.Unlock()
		return
	}
	// Enter fallback.
	lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	lb.mu.Unlock()
}

func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	// grpclb uses the user's dial target to populate the `Name` field of the
	// `InitialLoadBalanceRequest` message sent to the remote balancer. But when
	// grpclb is used as a child policy in the context of RLS, we want the
	// `Name` field to be populated with the value received from the RLS server.
	// To support this use case, an optional "target_name" field has been added
	// to the grpclb LB policy's config. If specified, it overrides the name of
	// the target to be sent to the remote balancer; if not, the target to be
	// sent to the balancer will continue to be obtained from the target URI
	// passed to the gRPC client channel. Whenever that target to be sent to
	// the balancer is updated, we need to restart the stream to the balancer,
	// as this target is sent in the first message on the stream.
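	//
	// For illustration, a service config along these lines selects grpclb
	// with pick_first as the child policy and an overridden target name (a
	// sketch; the exact JSON shape is defined by the grpclb config parser):
	//
	//	{"loadBalancingConfig": [{"grpclb": {
	//	  "childPolicy": [{"pick_first": {}}],
	//	  "serviceName": "lb-target.example.com"}}]}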
	if gc != nil {
		target := lb.dialTarget
		if gc.ServiceName != "" {
			target = gc.ServiceName
		}
		if target != lb.target {
			lb.target = target
			if lb.ccRemoteLB != nil {
				lb.ccRemoteLB.cancelRemoteBalancerCall()
			}
		}
	}

	newUsePickFirst := childIsPickFirst(gc)
	if lb.usePickFirst == newUsePickFirst {
		return
	}
	if logger.V(2) {
		logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
	}
	lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}

func (lb *lbBalancer) ResolverError(error) {
	// Ignore resolver errors. GRPCLB is not selected unless the resolver
	// works at least once.
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
	if logger.V(2) {
		logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
	}
	gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
	lb.handleServiceConfig(gc)

	backendAddrs := ccs.ResolverState.Addresses

	var remoteBalancerAddrs []resolver.Address
	if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
		// Override any balancer addresses provided via
		// ccs.ResolverState.Addresses.
		remoteBalancerAddrs = sd.BalancerAddresses
	}

	if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
		// There should be at least one address, either a grpclb server or a
		// fallback backend. An empty address list is not valid.
		return balancer.ErrBadResolverState
	}

	if len(remoteBalancerAddrs) == 0 {
		if lb.ccRemoteLB != nil {
			lb.ccRemoteLB.close()
			lb.ccRemoteLB = nil
		}
	} else if lb.ccRemoteLB == nil {
		// First time receiving resolved addresses, create a cc to the remote
		// balancers.
		lb.newRemoteBalancerCCWrapper()
		// Start the fallback goroutine.
		go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
	}

	if lb.ccRemoteLB != nil {
		// The cc to remote balancers uses lb.manualResolver. Send the updated
		// remote balancer addresses to it through manualResolver.
		lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
	}

	lb.mu.Lock()
	lb.resolvedBackendAddrs = backendAddrs
	if len(remoteBalancerAddrs) == 0 || lb.inFallback {
		// If there's no remote balancer address in the ClientConn update,
		// grpclb enters fallback mode immediately.
		//
		// If a new update is received while grpclb is in fallback, update the
		// list of backends being used to the new fallback backends.
		lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	}
	lb.mu.Unlock()
	return nil
}

func (lb *lbBalancer) Close() {
	select {
	case <-lb.doneCh:
		return
	default:
	}
	close(lb.doneCh)
	if lb.ccRemoteLB != nil {
		lb.ccRemoteLB.close()
	}
	lb.cc.close()
}

func (lb *lbBalancer) ExitIdle() {}