resolver_conn_wrapper.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "strings"
  21. "sync"
  22. "google.golang.org/grpc/balancer"
  23. "google.golang.org/grpc/credentials"
  24. "google.golang.org/grpc/internal/channelz"
  25. "google.golang.org/grpc/internal/grpcsync"
  26. "google.golang.org/grpc/internal/pretty"
  27. "google.golang.org/grpc/resolver"
  28. "google.golang.org/grpc/serviceconfig"
  29. )
  30. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  31. // It implements resolver.ClientConn interface.
  32. type ccResolverWrapper struct {
  33. cc *ClientConn
  34. resolverMu sync.Mutex
  35. resolver resolver.Resolver
  36. done *grpcsync.Event
  37. curState resolver.State
  38. incomingMu sync.Mutex // Synchronizes all the incoming calls.
  39. }
  40. // newCCResolverWrapper uses the resolver.Builder to build a Resolver and
  41. // returns a ccResolverWrapper object which wraps the newly built resolver.
  42. func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  43. ccr := &ccResolverWrapper{
  44. cc: cc,
  45. done: grpcsync.NewEvent(),
  46. }
  47. var credsClone credentials.TransportCredentials
  48. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  49. credsClone = creds.Clone()
  50. }
  51. rbo := resolver.BuildOptions{
  52. DisableServiceConfig: cc.dopts.disableServiceConfig,
  53. DialCreds: credsClone,
  54. CredsBundle: cc.dopts.copts.CredsBundle,
  55. Dialer: cc.dopts.copts.Dialer,
  56. }
  57. var err error
  58. // We need to hold the lock here while we assign to the ccr.resolver field
  59. // to guard against a data race caused by the following code path,
  60. // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
  61. // accessing ccr.resolver which is being assigned here.
  62. ccr.resolverMu.Lock()
  63. defer ccr.resolverMu.Unlock()
  64. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
  65. if err != nil {
  66. return nil, err
  67. }
  68. return ccr, nil
  69. }
  70. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  71. ccr.resolverMu.Lock()
  72. if !ccr.done.HasFired() {
  73. ccr.resolver.ResolveNow(o)
  74. }
  75. ccr.resolverMu.Unlock()
  76. }
  77. func (ccr *ccResolverWrapper) close() {
  78. ccr.resolverMu.Lock()
  79. ccr.resolver.Close()
  80. ccr.done.Fire()
  81. ccr.resolverMu.Unlock()
  82. }
  83. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
  84. ccr.incomingMu.Lock()
  85. defer ccr.incomingMu.Unlock()
  86. if ccr.done.HasFired() {
  87. return nil
  88. }
  89. ccr.addChannelzTraceEvent(s)
  90. ccr.curState = s
  91. if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
  92. return balancer.ErrBadResolverState
  93. }
  94. return nil
  95. }
  96. func (ccr *ccResolverWrapper) ReportError(err error) {
  97. ccr.incomingMu.Lock()
  98. defer ccr.incomingMu.Unlock()
  99. if ccr.done.HasFired() {
  100. return
  101. }
  102. channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
  103. ccr.cc.updateResolverState(resolver.State{}, err)
  104. }
  105. // NewAddress is called by the resolver implementation to send addresses to gRPC.
  106. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  107. ccr.incomingMu.Lock()
  108. defer ccr.incomingMu.Unlock()
  109. if ccr.done.HasFired() {
  110. return
  111. }
  112. ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
  113. ccr.curState.Addresses = addrs
  114. ccr.cc.updateResolverState(ccr.curState, nil)
  115. }
  116. // NewServiceConfig is called by the resolver implementation to send service
  117. // configs to gRPC.
  118. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  119. ccr.incomingMu.Lock()
  120. defer ccr.incomingMu.Unlock()
  121. if ccr.done.HasFired() {
  122. return
  123. }
  124. channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc)
  125. if ccr.cc.dopts.disableServiceConfig {
  126. channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
  127. return
  128. }
  129. scpr := parseServiceConfig(sc)
  130. if scpr.Err != nil {
  131. channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
  132. return
  133. }
  134. ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
  135. ccr.curState.ServiceConfig = scpr
  136. ccr.cc.updateResolverState(ccr.curState, nil)
  137. }
  138. func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
  139. return parseServiceConfig(scJSON)
  140. }
  141. func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
  142. var updates []string
  143. var oldSC, newSC *ServiceConfig
  144. var oldOK, newOK bool
  145. if ccr.curState.ServiceConfig != nil {
  146. oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
  147. }
  148. if s.ServiceConfig != nil {
  149. newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
  150. }
  151. if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
  152. updates = append(updates, "service config updated")
  153. }
  154. if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
  155. updates = append(updates, "resolver returned an empty address list")
  156. } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
  157. updates = append(updates, "resolver returned new addresses")
  158. }
  159. channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
  160. }