grpclb.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package grpclb defines a grpclb balancer.
  19. //
  20. // To install grpclb balancer, import this package as:
  21. //
  22. // import _ "google.golang.org/grpc/balancer/grpclb"
  23. package grpclb
  24. import (
  25. "context"
  26. "errors"
  27. "fmt"
  28. "sync"
  29. "time"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/balancer"
  32. grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
  33. "google.golang.org/grpc/connectivity"
  34. "google.golang.org/grpc/credentials"
  35. "google.golang.org/grpc/grpclog"
  36. "google.golang.org/grpc/internal"
  37. "google.golang.org/grpc/internal/backoff"
  38. "google.golang.org/grpc/internal/resolver/dns"
  39. "google.golang.org/grpc/resolver"
  40. durationpb "github.com/golang/protobuf/ptypes/duration"
  41. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  42. )
  43. const (
  44. lbTokenKey = "lb-token"
  45. defaultFallbackTimeout = 10 * time.Second
  46. grpclbName = "grpclb"
  47. )
  48. var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
  49. var logger = grpclog.Component("grpclb")
  50. func convertDuration(d *durationpb.Duration) time.Duration {
  51. if d == nil {
  52. return 0
  53. }
  54. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  55. }
  56. // Client API for LoadBalancer service.
  57. // Mostly copied from generated pb.go file.
  58. // To avoid circular dependency.
  59. type loadBalancerClient struct {
  60. cc *grpc.ClientConn
  61. }
  62. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
  63. desc := &grpc.StreamDesc{
  64. StreamName: "BalanceLoad",
  65. ServerStreams: true,
  66. ClientStreams: true,
  67. }
  68. stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  69. if err != nil {
  70. return nil, err
  71. }
  72. x := &balanceLoadClientStream{stream}
  73. return x, nil
  74. }
  75. type balanceLoadClientStream struct {
  76. grpc.ClientStream
  77. }
  78. func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
  79. return x.ClientStream.SendMsg(m)
  80. }
  81. func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
  82. m := new(lbpb.LoadBalanceResponse)
  83. if err := x.ClientStream.RecvMsg(m); err != nil {
  84. return nil, err
  85. }
  86. return m, nil
  87. }
  88. func init() {
  89. balancer.Register(newLBBuilder())
  90. dns.EnableSRVLookups = true
  91. }
  92. // newLBBuilder creates a builder for grpclb.
  93. func newLBBuilder() balancer.Builder {
  94. return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
  95. }
  96. // newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
  97. // fallbackTimeout. If no response is received from the remote balancer within
  98. // fallbackTimeout, the backend addresses from the resolved address list will be
  99. // used.
  100. //
  101. // Only call this function when a non-default fallback timeout is needed.
  102. func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
  103. return &lbBuilder{
  104. fallbackTimeout: fallbackTimeout,
  105. }
  106. }
  107. type lbBuilder struct {
  108. fallbackTimeout time.Duration
  109. }
  110. func (b *lbBuilder) Name() string {
  111. return grpclbName
  112. }
  113. func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  114. // This generates a manual resolver builder with a fixed scheme. This
  115. // scheme will be used to dial to remote LB, so we can send filtered
  116. // address updates to remote LB ClientConn using this manual resolver.
  117. r := &lbManualResolver{scheme: "grpclb-internal", ccb: cc}
  118. lb := &lbBalancer{
  119. cc: newLBCacheClientConn(cc),
  120. dialTarget: opt.Target.Endpoint(),
  121. target: opt.Target.Endpoint(),
  122. opt: opt,
  123. fallbackTimeout: b.fallbackTimeout,
  124. doneCh: make(chan struct{}),
  125. manualResolver: r,
  126. subConns: make(map[resolver.Address]balancer.SubConn),
  127. scStates: make(map[balancer.SubConn]connectivity.State),
  128. picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
  129. clientStats: newRPCStats(),
  130. backoff: backoff.DefaultExponential, // TODO: make backoff configurable.
  131. }
  132. var err error
  133. if opt.CredsBundle != nil {
  134. lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
  135. if err != nil {
  136. logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
  137. }
  138. lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
  139. if err != nil {
  140. logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
  141. }
  142. }
  143. return lb
  144. }
  145. type lbBalancer struct {
  146. cc *lbCacheClientConn
  147. dialTarget string // user's dial target
  148. target string // same as dialTarget unless overridden in service config
  149. opt balancer.BuildOptions
  150. usePickFirst bool
  151. // grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
  152. // servers. If it's nil, use the TransportCredentials from BuildOptions
  153. // instead.
  154. grpclbClientConnCreds credentials.Bundle
  155. // grpclbBackendCreds is the creds bundle to be used for addresses that are
  156. // returned by grpclb server. If it's nil, don't set anything when creating
  157. // SubConns.
  158. grpclbBackendCreds credentials.Bundle
  159. fallbackTimeout time.Duration
  160. doneCh chan struct{}
  161. // manualResolver is used in the remote LB ClientConn inside grpclb. When
  162. // resolved address updates are received by grpclb, filtered updates will be
  163. // send to remote LB ClientConn through this resolver.
  164. manualResolver *lbManualResolver
  165. // The ClientConn to talk to the remote balancer.
  166. ccRemoteLB *remoteBalancerCCWrapper
  167. // backoff for calling remote balancer.
  168. backoff backoff.Strategy
  169. // Support client side load reporting. Each picker gets a reference to this,
  170. // and will update its content.
  171. clientStats *rpcStats
  172. mu sync.Mutex // guards everything following.
  173. // The full server list including drops, used to check if the newly received
  174. // serverList contains anything new. Each generate picker will also have
  175. // reference to this list to do the first layer pick.
  176. fullServerList []*lbpb.Server
  177. // Backend addresses. It's kept so the addresses are available when
  178. // switching between round_robin and pickfirst.
  179. backendAddrs []resolver.Address
  180. // All backends addresses, with metadata set to nil. This list contains all
  181. // backend addresses in the same order and with the same duplicates as in
  182. // serverlist. When generating picker, a SubConn slice with the same order
  183. // but with only READY SCs will be gerenated.
  184. backendAddrsWithoutMetadata []resolver.Address
  185. // Roundrobin functionalities.
  186. state connectivity.State
  187. subConns map[resolver.Address]balancer.SubConn // Used to new/shutdown SubConn.
  188. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
  189. picker balancer.Picker
  190. // Support fallback to resolved backend addresses if there's no response
  191. // from remote balancer within fallbackTimeout.
  192. remoteBalancerConnected bool
  193. serverListReceived bool
  194. inFallback bool
  195. // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
  196. // when resolved address updates are received, and read in the goroutine
  197. // handling fallback.
  198. resolvedBackendAddrs []resolver.Address
  199. connErr error // the last connection error
  200. }
  201. // regeneratePicker takes a snapshot of the balancer, and generates a picker from
  202. // it. The picker
  203. // - always returns ErrTransientFailure if the balancer is in TransientFailure,
  204. // - does two layer roundrobin pick otherwise.
  205. //
  206. // Caller must hold lb.mu.
  207. func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
  208. if lb.state == connectivity.TransientFailure {
  209. lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)}
  210. return
  211. }
  212. if lb.state == connectivity.Connecting {
  213. lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
  214. return
  215. }
  216. var readySCs []balancer.SubConn
  217. if lb.usePickFirst {
  218. for _, sc := range lb.subConns {
  219. readySCs = append(readySCs, sc)
  220. break
  221. }
  222. } else {
  223. for _, a := range lb.backendAddrsWithoutMetadata {
  224. if sc, ok := lb.subConns[a]; ok {
  225. if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
  226. readySCs = append(readySCs, sc)
  227. }
  228. }
  229. }
  230. }
  231. if len(readySCs) <= 0 {
  232. // If there's no ready SubConns, always re-pick. This is to avoid drops
  233. // unless at least one SubConn is ready. Otherwise we may drop more
  234. // often than want because of drops + re-picks(which become re-drops).
  235. //
  236. // This doesn't seem to be necessary after the connecting check above.
  237. // Kept for safety.
  238. lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
  239. return
  240. }
  241. if lb.inFallback {
  242. lb.picker = newRRPicker(readySCs)
  243. return
  244. }
  245. if resetDrop {
  246. lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
  247. return
  248. }
  249. prevLBPicker, ok := lb.picker.(*lbPicker)
  250. if !ok {
  251. lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
  252. return
  253. }
  254. prevLBPicker.updateReadySCs(readySCs)
  255. }
  256. // aggregateSubConnStats calculate the aggregated state of SubConns in
  257. // lb.SubConns. These SubConns are subconns in use (when switching between
  258. // fallback and grpclb). lb.scState contains states for all SubConns, including
  259. // those in cache (SubConns are cached for 10 seconds after shutdown).
  260. //
  261. // The aggregated state is:
  262. // - If at least one SubConn in Ready, the aggregated state is Ready;
  263. // - Else if at least one SubConn in Connecting or IDLE, the aggregated state is Connecting;
  264. // - It's OK to consider IDLE as Connecting. SubConns never stay in IDLE,
  265. // they start to connect immediately. But there's a race between the overall
  266. // state is reported, and when the new SubConn state arrives. And SubConns
  267. // never go back to IDLE.
  268. // - Else the aggregated state is TransientFailure.
  269. func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
  270. var numConnecting uint64
  271. for _, sc := range lb.subConns {
  272. if state, ok := lb.scStates[sc]; ok {
  273. switch state {
  274. case connectivity.Ready:
  275. return connectivity.Ready
  276. case connectivity.Connecting, connectivity.Idle:
  277. numConnecting++
  278. }
  279. }
  280. }
  281. if numConnecting > 0 {
  282. return connectivity.Connecting
  283. }
  284. return connectivity.TransientFailure
  285. }
  286. // UpdateSubConnState is unused; NewSubConn's options always specifies
  287. // updateSubConnState as the listener.
  288. func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
  289. logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
  290. }
  291. func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
  292. s := scs.ConnectivityState
  293. if logger.V(2) {
  294. logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
  295. }
  296. lb.mu.Lock()
  297. defer lb.mu.Unlock()
  298. oldS, ok := lb.scStates[sc]
  299. if !ok {
  300. if logger.V(2) {
  301. logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
  302. }
  303. return
  304. }
  305. lb.scStates[sc] = s
  306. switch s {
  307. case connectivity.Idle:
  308. sc.Connect()
  309. case connectivity.Shutdown:
  310. // When an address was removed by resolver, b called Shutdown but kept
  311. // the sc's state in scStates. Remove state for this sc here.
  312. delete(lb.scStates, sc)
  313. case connectivity.TransientFailure:
  314. lb.connErr = scs.ConnectionError
  315. }
  316. // Force regenerate picker if
  317. // - this sc became ready from not-ready
  318. // - this sc became not-ready from ready
  319. lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)
  320. // Enter fallback when the aggregated state is not Ready and the connection
  321. // to remote balancer is lost.
  322. if lb.state != connectivity.Ready {
  323. if !lb.inFallback && !lb.remoteBalancerConnected {
  324. // Enter fallback.
  325. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  326. }
  327. }
  328. }
  329. // updateStateAndPicker re-calculate the aggregated state, and regenerate picker
  330. // if overall state is changed.
  331. //
  332. // If forceRegeneratePicker is true, picker will be regenerated.
  333. func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
  334. oldAggrState := lb.state
  335. lb.state = lb.aggregateSubConnStates()
  336. // Regenerate picker when one of the following happens:
  337. // - caller wants to regenerate
  338. // - the aggregated state changed
  339. if forceRegeneratePicker || (lb.state != oldAggrState) {
  340. lb.regeneratePicker(resetDrop)
  341. }
  342. var cc balancer.ClientConn = lb.cc
  343. if lb.usePickFirst {
  344. // Bypass the caching layer that would wrap the picker.
  345. cc = lb.cc.ClientConn
  346. }
  347. cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
  348. }
  349. // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
  350. // resolved backends (backends received from resolver, not from remote balancer)
  351. // if no connection to remote balancers was successful.
  352. func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
  353. timer := time.NewTimer(fallbackTimeout)
  354. defer timer.Stop()
  355. select {
  356. case <-timer.C:
  357. case <-lb.doneCh:
  358. return
  359. }
  360. lb.mu.Lock()
  361. if lb.inFallback || lb.serverListReceived {
  362. lb.mu.Unlock()
  363. return
  364. }
  365. // Enter fallback.
  366. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  367. lb.mu.Unlock()
  368. }
  369. func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
  370. lb.mu.Lock()
  371. defer lb.mu.Unlock()
  372. // grpclb uses the user's dial target to populate the `Name` field of the
  373. // `InitialLoadBalanceRequest` message sent to the remote balancer. But when
  374. // grpclb is used a child policy in the context of RLS, we want the `Name`
  375. // field to be populated with the value received from the RLS server. To
  376. // support this use case, an optional "target_name" field has been added to
  377. // the grpclb LB policy's config. If specified, it overrides the name of
  378. // the target to be sent to the remote balancer; if not, the target to be
  379. // sent to the balancer will continue to be obtained from the target URI
  380. // passed to the gRPC client channel. Whenever that target to be sent to the
  381. // balancer is updated, we need to restart the stream to the balancer as
  382. // this target is sent in the first message on the stream.
  383. if gc != nil {
  384. target := lb.dialTarget
  385. if gc.ServiceName != "" {
  386. target = gc.ServiceName
  387. }
  388. if target != lb.target {
  389. lb.target = target
  390. if lb.ccRemoteLB != nil {
  391. lb.ccRemoteLB.cancelRemoteBalancerCall()
  392. }
  393. }
  394. }
  395. newUsePickFirst := childIsPickFirst(gc)
  396. if lb.usePickFirst == newUsePickFirst {
  397. return
  398. }
  399. if logger.V(2) {
  400. logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
  401. }
  402. lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
  403. }
  404. func (lb *lbBalancer) ResolverError(error) {
  405. // Ignore resolver errors. GRPCLB is not selected unless the resolver
  406. // works at least once.
  407. }
  408. func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
  409. if logger.V(2) {
  410. logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
  411. }
  412. gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
  413. lb.handleServiceConfig(gc)
  414. backendAddrs := ccs.ResolverState.Addresses
  415. var remoteBalancerAddrs []resolver.Address
  416. if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
  417. // Override any balancer addresses provided via
  418. // ccs.ResolverState.Addresses.
  419. remoteBalancerAddrs = sd.BalancerAddresses
  420. }
  421. if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
  422. // There should be at least one address, either grpclb server or
  423. // fallback. Empty address is not valid.
  424. return balancer.ErrBadResolverState
  425. }
  426. if len(remoteBalancerAddrs) == 0 {
  427. if lb.ccRemoteLB != nil {
  428. lb.ccRemoteLB.close()
  429. lb.ccRemoteLB = nil
  430. }
  431. } else if lb.ccRemoteLB == nil {
  432. // First time receiving resolved addresses, create a cc to remote
  433. // balancers.
  434. lb.newRemoteBalancerCCWrapper()
  435. // Start the fallback goroutine.
  436. go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
  437. }
  438. if lb.ccRemoteLB != nil {
  439. // cc to remote balancers uses lb.manualResolver. Send the updated remote
  440. // balancer addresses to it through manualResolver.
  441. lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
  442. }
  443. lb.mu.Lock()
  444. lb.resolvedBackendAddrs = backendAddrs
  445. if len(remoteBalancerAddrs) == 0 || lb.inFallback {
  446. // If there's no remote balancer address in ClientConn update, grpclb
  447. // enters fallback mode immediately.
  448. //
  449. // If a new update is received while grpclb is in fallback, update the
  450. // list of backends being used to the new fallback backends.
  451. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  452. }
  453. lb.mu.Unlock()
  454. return nil
  455. }
  456. func (lb *lbBalancer) Close() {
  457. select {
  458. case <-lb.doneCh:
  459. return
  460. default:
  461. }
  462. close(lb.doneCh)
  463. if lb.ccRemoteLB != nil {
  464. lb.ccRemoteLB.close()
  465. }
  466. lb.cc.close()
  467. }
  468. func (lb *lbBalancer) ExitIdle() {}