|
@@ -38,13 +38,13 @@ import (
|
|
|
"google.golang.org/grpc/grpclog"
|
|
|
"google.golang.org/grpc/internal/backoff"
|
|
|
"google.golang.org/grpc/internal/channelz"
|
|
|
- "google.golang.org/grpc/internal/envconfig"
|
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
|
"google.golang.org/grpc/internal/transport"
|
|
|
"google.golang.org/grpc/keepalive"
|
|
|
"google.golang.org/grpc/resolver"
|
|
|
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
|
|
|
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
|
|
|
+ "google.golang.org/grpc/serviceconfig"
|
|
|
"google.golang.org/grpc/status"
|
|
|
)
|
|
|
|
|
@@ -137,6 +137,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|
|
opt.apply(&cc.dopts)
|
|
|
}
|
|
|
|
|
|
+ chainUnaryClientInterceptors(cc)
|
|
|
+ chainStreamClientInterceptors(cc)
|
|
|
+
|
|
|
defer func() {
|
|
|
if err != nil {
|
|
|
cc.Close()
|
|
@@ -290,6 +293,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|
|
CredsBundle: cc.dopts.copts.CredsBundle,
|
|
|
Dialer: cc.dopts.copts.Dialer,
|
|
|
ChannelzParentID: cc.channelzID,
|
|
|
+ Target: cc.parsedTarget,
|
|
|
}
|
|
|
|
|
|
// Build the resolver.
|
|
@@ -327,6 +331,68 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|
|
return cc, nil
|
|
|
}
|
|
|
|
|
|
+// chainUnaryClientInterceptors chains all unary client interceptors into one.
|
|
|
+func chainUnaryClientInterceptors(cc *ClientConn) {
|
|
|
+ interceptors := cc.dopts.chainUnaryInts
|
|
|
+ // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
|
|
|
+ // be executed before any other chained interceptors.
|
|
|
+ if cc.dopts.unaryInt != nil {
|
|
|
+ interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
|
|
|
+ }
|
|
|
+ var chainedInt UnaryClientInterceptor
|
|
|
+ if len(interceptors) == 0 {
|
|
|
+ chainedInt = nil
|
|
|
+ } else if len(interceptors) == 1 {
|
|
|
+ chainedInt = interceptors[0]
|
|
|
+ } else {
|
|
|
+ chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
|
|
|
+ return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ cc.dopts.unaryInt = chainedInt
|
|
|
+}
|
|
|
+
|
|
|
+// getChainUnaryInvoker recursively generate the chained unary invoker.
|
|
|
+func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
|
|
|
+ if curr == len(interceptors)-1 {
|
|
|
+ return finalInvoker
|
|
|
+ }
|
|
|
+ return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
|
|
|
+ return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// chainStreamClientInterceptors chains all stream client interceptors into one.
|
|
|
+func chainStreamClientInterceptors(cc *ClientConn) {
|
|
|
+ interceptors := cc.dopts.chainStreamInts
|
|
|
+ // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
|
|
|
+ // be executed before any other chained interceptors.
|
|
|
+ if cc.dopts.streamInt != nil {
|
|
|
+ interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
|
|
|
+ }
|
|
|
+ var chainedInt StreamClientInterceptor
|
|
|
+ if len(interceptors) == 0 {
|
|
|
+ chainedInt = nil
|
|
|
+ } else if len(interceptors) == 1 {
|
|
|
+ chainedInt = interceptors[0]
|
|
|
+ } else {
|
|
|
+ chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
|
|
|
+ return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ cc.dopts.streamInt = chainedInt
|
|
|
+}
|
|
|
+
|
|
|
+// getChainStreamer recursively generate the chained client stream constructor.
|
|
|
+func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
|
|
|
+ if curr == len(interceptors)-1 {
|
|
|
+ return finalStreamer
|
|
|
+ }
|
|
|
+ return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
|
|
|
+ return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// connectivityStateManager keeps the connectivity.State of ClientConn.
|
|
|
// This struct will eventually be exported so the balancers can access it.
|
|
|
type connectivityStateManager struct {
|
|
@@ -466,24 +532,6 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// gRPC should resort to default service config when:
|
|
|
-// * resolver service config is disabled
|
|
|
-// * or, resolver does not return a service config or returns an invalid one.
|
|
|
-func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
|
|
|
- if cc.dopts.disableServiceConfig {
|
|
|
- return true
|
|
|
- }
|
|
|
- // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
|
|
|
- // Right now, we assume that empty service config string means resolver does not return a config.
|
|
|
- if sc == "" {
|
|
|
- return true
|
|
|
- }
|
|
|
- // TODO: the logic below is temporary. Once we finish the logic to validate service config
|
|
|
- // in resolver, we will replace the logic below.
|
|
|
- _, err := parseServiceConfig(sc)
|
|
|
- return err != nil
|
|
|
-}
|
|
|
-
|
|
|
func (cc *ClientConn) updateResolverState(s resolver.State) error {
|
|
|
cc.mu.Lock()
|
|
|
defer cc.mu.Unlock()
|
|
@@ -494,54 +542,47 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
|
|
|
+ if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
|
|
|
if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
|
|
|
cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
|
|
|
}
|
|
|
- } else {
|
|
|
- // TODO: the parsing logic below will be moved inside resolver.
|
|
|
- sc, err := parseServiceConfig(s.ServiceConfig)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
|
|
|
- cc.applyServiceConfig(sc)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // update the service config that will be sent to balancer.
|
|
|
- if cc.sc != nil {
|
|
|
- s.ServiceConfig = cc.sc.rawJSONString
|
|
|
+ } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
|
|
|
+ cc.applyServiceConfig(sc)
|
|
|
}
|
|
|
|
|
|
+ var balCfg serviceconfig.LoadBalancingConfig
|
|
|
if cc.dopts.balancerBuilder == nil {
|
|
|
// Only look at balancer types and switch balancer if balancer dial
|
|
|
// option is not set.
|
|
|
- var isGRPCLB bool
|
|
|
- for _, a := range s.Addresses {
|
|
|
- if a.Type == resolver.GRPCLB {
|
|
|
- isGRPCLB = true
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
var newBalancerName string
|
|
|
- // TODO: use new loadBalancerConfig field with appropriate priority.
|
|
|
- if isGRPCLB {
|
|
|
- newBalancerName = grpclbName
|
|
|
- } else if cc.sc != nil && cc.sc.LB != nil {
|
|
|
- newBalancerName = *cc.sc.LB
|
|
|
+ if cc.sc != nil && cc.sc.lbConfig != nil {
|
|
|
+ newBalancerName = cc.sc.lbConfig.name
|
|
|
+ balCfg = cc.sc.lbConfig.cfg
|
|
|
} else {
|
|
|
- newBalancerName = PickFirstBalancerName
|
|
|
+ var isGRPCLB bool
|
|
|
+ for _, a := range s.Addresses {
|
|
|
+ if a.Type == resolver.GRPCLB {
|
|
|
+ isGRPCLB = true
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if isGRPCLB {
|
|
|
+ newBalancerName = grpclbName
|
|
|
+ } else if cc.sc != nil && cc.sc.LB != nil {
|
|
|
+ newBalancerName = *cc.sc.LB
|
|
|
+ } else {
|
|
|
+ newBalancerName = PickFirstBalancerName
|
|
|
+ }
|
|
|
}
|
|
|
cc.switchBalancer(newBalancerName)
|
|
|
} else if cc.balancerWrapper == nil {
|
|
|
// Balancer dial option was set, and this is the first time handling
|
|
|
// resolved addresses. Build a balancer with dopts.balancerBuilder.
|
|
|
+ cc.curBalancerName = cc.dopts.balancerBuilder.Name()
|
|
|
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
|
|
|
}
|
|
|
|
|
|
- cc.balancerWrapper.updateResolverState(s)
|
|
|
- cc.firstResolveEvent.Fire()
|
|
|
+ cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -554,7 +595,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
|
|
|
//
|
|
|
// Caller must hold cc.mu.
|
|
|
func (cc *ClientConn) switchBalancer(name string) {
|
|
|
- if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
|
|
|
+ if strings.EqualFold(cc.curBalancerName, name) {
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -693,6 +734,8 @@ func (ac *addrConn) connect() error {
|
|
|
ac.mu.Unlock()
|
|
|
return nil
|
|
|
}
|
|
|
+ // Update connectivity state within the lock to prevent subsequent or
|
|
|
+ // concurrent calls from resetting the transport more than once.
|
|
|
ac.updateConnectivityState(connectivity.Connecting)
|
|
|
ac.mu.Unlock()
|
|
|
|
|
@@ -703,7 +746,16 @@ func (ac *addrConn) connect() error {
|
|
|
|
|
|
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
|
|
|
//
|
|
|
-// It checks whether current connected address of ac is in the new addrs list.
|
|
|
+// If ac is Connecting, it returns false. The caller should tear down the ac and
|
|
|
+// create a new one. Note that the backoff will be reset when this happens.
|
|
|
+//
|
|
|
+// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
|
|
|
+// addresses will be picked up by retry in the next iteration after backoff.
|
|
|
+//
|
|
|
+// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
|
|
|
+//
|
|
|
+// If ac is Ready, it checks whether current connected address of ac is in the
|
|
|
+// new addrs list.
|
|
|
// - If true, it updates ac.addrs and returns true. The ac will keep using
|
|
|
// the existing connection.
|
|
|
// - If false, it does nothing and returns false.
|
|
@@ -711,17 +763,18 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
|
|
|
ac.mu.Lock()
|
|
|
defer ac.mu.Unlock()
|
|
|
grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
|
|
|
- if ac.state == connectivity.Shutdown {
|
|
|
+ if ac.state == connectivity.Shutdown ||
|
|
|
+ ac.state == connectivity.TransientFailure ||
|
|
|
+ ac.state == connectivity.Idle {
|
|
|
ac.addrs = addrs
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
- // Unless we're busy reconnecting already, let's reconnect from the top of
|
|
|
- // the list.
|
|
|
- if ac.state != connectivity.Ready {
|
|
|
+ if ac.state == connectivity.Connecting {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
+ // ac.state is Ready, try to find the connected address.
|
|
|
var curAddrFound bool
|
|
|
for _, a := range addrs {
|
|
|
if reflect.DeepEqual(ac.curAddr, a) {
|
|
@@ -970,6 +1023,9 @@ func (ac *addrConn) resetTransport() {
|
|
|
// The spec doesn't mention what should be done for multiple addresses.
|
|
|
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
|
|
|
connectDeadline := time.Now().Add(dialDuration)
|
|
|
+
|
|
|
+ ac.updateConnectivityState(connectivity.Connecting)
|
|
|
+ ac.transport = nil
|
|
|
ac.mu.Unlock()
|
|
|
|
|
|
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
|
|
@@ -1004,55 +1060,32 @@ func (ac *addrConn) resetTransport() {
|
|
|
|
|
|
ac.mu.Lock()
|
|
|
if ac.state == connectivity.Shutdown {
|
|
|
- newTr.Close()
|
|
|
ac.mu.Unlock()
|
|
|
+ newTr.Close()
|
|
|
return
|
|
|
}
|
|
|
ac.curAddr = addr
|
|
|
ac.transport = newTr
|
|
|
ac.backoffIdx = 0
|
|
|
|
|
|
- healthCheckConfig := ac.cc.healthCheckConfig()
|
|
|
- // LB channel health checking is only enabled when all the four requirements below are met:
|
|
|
- // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
|
|
|
- // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
|
|
|
- // 3. a service config with non-empty healthCheckConfig field is provided,
|
|
|
- // 4. the current load balancer allows it.
|
|
|
hctx, hcancel := context.WithCancel(ac.ctx)
|
|
|
- healthcheckManagingState := false
|
|
|
- if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
|
|
|
- if ac.cc.dopts.healthCheckFunc == nil {
|
|
|
- // TODO: add a link to the health check doc in the error message.
|
|
|
- grpclog.Error("the client side LB channel health check function has not been set.")
|
|
|
- } else {
|
|
|
- // TODO(deklerk) refactor to just return transport
|
|
|
- go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
|
|
|
- healthcheckManagingState = true
|
|
|
- }
|
|
|
- }
|
|
|
- if !healthcheckManagingState {
|
|
|
- ac.updateConnectivityState(connectivity.Ready)
|
|
|
- }
|
|
|
+ ac.startHealthCheck(hctx)
|
|
|
ac.mu.Unlock()
|
|
|
|
|
|
// Block until the created transport is down. And when this happens,
|
|
|
// we restart from the top of the addr list.
|
|
|
<-reconnect.Done()
|
|
|
hcancel()
|
|
|
-
|
|
|
- // Need to reconnect after a READY, the addrConn enters
|
|
|
- // TRANSIENT_FAILURE.
|
|
|
+ // restart connecting - the top of the loop will set state to
|
|
|
+ // CONNECTING. This is against the current connectivity semantics doc,
|
|
|
+ // however it allows for graceful behavior for RPCs not yet dispatched
|
|
|
+ // - unfortunate timing would otherwise lead to the RPC failing even
|
|
|
+ // though the TRANSIENT_FAILURE state (called for by the doc) would be
|
|
|
+ // instantaneous.
|
|
|
//
|
|
|
- // This will set addrConn to TRANSIENT_FAILURE for a very short period
|
|
|
- // of time, and turns CONNECTING. It seems reasonable to skip this, but
|
|
|
- // READY-CONNECTING is not a valid transition.
|
|
|
- ac.mu.Lock()
|
|
|
- if ac.state == connectivity.Shutdown {
|
|
|
- ac.mu.Unlock()
|
|
|
- return
|
|
|
- }
|
|
|
- ac.updateConnectivityState(connectivity.TransientFailure)
|
|
|
- ac.mu.Unlock()
|
|
|
+ // Ideally we should transition to Idle here and block until there is
|
|
|
+ // RPC activity that leads to the balancer requesting a reconnect of
|
|
|
+ // the associated SubConn.
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1066,8 +1099,6 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
|
|
|
ac.mu.Unlock()
|
|
|
return nil, resolver.Address{}, nil, errConnClosing
|
|
|
}
|
|
|
- ac.updateConnectivityState(connectivity.Connecting)
|
|
|
- ac.transport = nil
|
|
|
|
|
|
ac.cc.mu.RLock()
|
|
|
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
|
@@ -1111,14 +1142,35 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
|
|
|
Authority: ac.cc.authority,
|
|
|
}
|
|
|
|
|
|
+ once := sync.Once{}
|
|
|
onGoAway := func(r transport.GoAwayReason) {
|
|
|
ac.mu.Lock()
|
|
|
ac.adjustParams(r)
|
|
|
+ once.Do(func() {
|
|
|
+ if ac.state == connectivity.Ready {
|
|
|
+ // Prevent this SubConn from being used for new RPCs by setting its
|
|
|
+ // state to Connecting.
|
|
|
+ //
|
|
|
+ // TODO: this should be Idle when grpc-go properly supports it.
|
|
|
+ ac.updateConnectivityState(connectivity.Connecting)
|
|
|
+ }
|
|
|
+ })
|
|
|
ac.mu.Unlock()
|
|
|
reconnect.Fire()
|
|
|
}
|
|
|
|
|
|
onClose := func() {
|
|
|
+ ac.mu.Lock()
|
|
|
+ once.Do(func() {
|
|
|
+ if ac.state == connectivity.Ready {
|
|
|
+ // Prevent this SubConn from being used for new RPCs by setting its
|
|
|
+ // state to Connecting.
|
|
|
+ //
|
|
|
+ // TODO: this should be Idle when grpc-go properly supports it.
|
|
|
+ ac.updateConnectivityState(connectivity.Connecting)
|
|
|
+ }
|
|
|
+ })
|
|
|
+ ac.mu.Unlock()
|
|
|
close(onCloseCalled)
|
|
|
reconnect.Fire()
|
|
|
}
|
|
@@ -1140,60 +1192,99 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
|
|
|
return nil, nil, err
|
|
|
}
|
|
|
|
|
|
- if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
|
|
|
- select {
|
|
|
- case <-time.After(connectDeadline.Sub(time.Now())):
|
|
|
- // We didn't get the preface in time.
|
|
|
- newTr.Close()
|
|
|
- grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
|
|
|
- return nil, nil, errors.New("timed out waiting for server handshake")
|
|
|
- case <-prefaceReceived:
|
|
|
- // We got the preface - huzzah! things are good.
|
|
|
- case <-onCloseCalled:
|
|
|
- // The transport has already closed - noop.
|
|
|
- return nil, nil, errors.New("connection closed")
|
|
|
- // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
|
|
|
- }
|
|
|
+ select {
|
|
|
+ case <-time.After(connectDeadline.Sub(time.Now())):
|
|
|
+ // We didn't get the preface in time.
|
|
|
+ newTr.Close()
|
|
|
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
|
|
|
+ return nil, nil, errors.New("timed out waiting for server handshake")
|
|
|
+ case <-prefaceReceived:
|
|
|
+ // We got the preface - huzzah! things are good.
|
|
|
+ case <-onCloseCalled:
|
|
|
+ // The transport has already closed - noop.
|
|
|
+ return nil, nil, errors.New("connection closed")
|
|
|
+ // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
|
|
|
}
|
|
|
return newTr, reconnect, nil
|
|
|
}
|
|
|
|
|
|
-func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
|
|
|
- // Set up the health check helper functions
|
|
|
- newStream := func() (interface{}, error) {
|
|
|
- return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
|
|
|
+// startHealthCheck starts the health checking stream (RPC) to watch the health
|
|
|
+// stats of this connection if health checking is requested and configured.
|
|
|
+//
|
|
|
+// LB channel health checking is enabled when all requirements below are met:
|
|
|
+// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
|
|
|
+// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
|
|
|
+// 3. a service config with non-empty healthCheckConfig field is provided
|
|
|
+// 4. the load balancer requests it
|
|
|
+//
|
|
|
+// It sets addrConn to READY if the health checking stream is not started.
|
|
|
+//
|
|
|
+// Caller must hold ac.mu.
|
|
|
+func (ac *addrConn) startHealthCheck(ctx context.Context) {
|
|
|
+ var healthcheckManagingState bool
|
|
|
+ defer func() {
|
|
|
+ if !healthcheckManagingState {
|
|
|
+ ac.updateConnectivityState(connectivity.Ready)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ if ac.cc.dopts.disableHealthCheck {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ healthCheckConfig := ac.cc.healthCheckConfig()
|
|
|
+ if healthCheckConfig == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !ac.scopts.HealthCheckEnabled {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ healthCheckFunc := ac.cc.dopts.healthCheckFunc
|
|
|
+ if healthCheckFunc == nil {
|
|
|
+ // The health package is not imported to set health check function.
|
|
|
+ //
|
|
|
+ // TODO: add a link to the health check doc in the error message.
|
|
|
+ grpclog.Error("Health check is requested but health check function is not set.")
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ healthcheckManagingState = true
|
|
|
+
|
|
|
+ // Set up the health check helper functions.
|
|
|
+ currentTr := ac.transport
|
|
|
+ newStream := func(method string) (interface{}, error) {
|
|
|
+ ac.mu.Lock()
|
|
|
+ if ac.transport != currentTr {
|
|
|
+ ac.mu.Unlock()
|
|
|
+ return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
|
|
|
+ }
|
|
|
+ ac.mu.Unlock()
|
|
|
+ return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
|
|
|
}
|
|
|
- firstReady := true
|
|
|
- reportHealth := func(ok bool) {
|
|
|
+ setConnectivityState := func(s connectivity.State) {
|
|
|
ac.mu.Lock()
|
|
|
defer ac.mu.Unlock()
|
|
|
- if ac.transport != newTr {
|
|
|
+ if ac.transport != currentTr {
|
|
|
return
|
|
|
}
|
|
|
- if ok {
|
|
|
- if firstReady {
|
|
|
- firstReady = false
|
|
|
- ac.curAddr = addr
|
|
|
- }
|
|
|
- ac.updateConnectivityState(connectivity.Ready)
|
|
|
- } else {
|
|
|
- ac.updateConnectivityState(connectivity.TransientFailure)
|
|
|
- }
|
|
|
+ ac.updateConnectivityState(s)
|
|
|
}
|
|
|
- err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
|
|
|
- if err != nil {
|
|
|
- if status.Code(err) == codes.Unimplemented {
|
|
|
- if channelz.IsOn() {
|
|
|
- channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
|
|
- Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
|
|
|
- Severity: channelz.CtError,
|
|
|
- })
|
|
|
+ // Start the health checking stream.
|
|
|
+ go func() {
|
|
|
+ err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
|
|
|
+ if err != nil {
|
|
|
+ if status.Code(err) == codes.Unimplemented {
|
|
|
+ if channelz.IsOn() {
|
|
|
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
|
|
+ Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
|
|
|
+ Severity: channelz.CtError,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
|
|
|
+ } else {
|
|
|
+ grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
|
|
|
}
|
|
|
- grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
|
|
|
- } else {
|
|
|
- grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
|
|
|
}
|
|
|
- }
|
|
|
+ }()
|
|
|
}
|
|
|
|
|
|
func (ac *addrConn) resetConnectBackoff() {
|