dial.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. // Copyright 2015 Google LLC.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package grpc supports network connections to GRPC servers.
  5. // This package is not intended for use by end developers. Use the
  6. // google.golang.org/api/option package to configure API clients.
  7. package grpc
  8. import (
  9. "context"
  10. "errors"
  11. "log"
  12. "net"
  13. "os"
  14. "strings"
  15. "cloud.google.com/go/compute/metadata"
  16. "go.opencensus.io/plugin/ocgrpc"
  17. "golang.org/x/oauth2"
  18. "google.golang.org/api/internal"
  19. "google.golang.org/api/option"
  20. "google.golang.org/grpc"
  21. grpcgoogle "google.golang.org/grpc/credentials/google"
  22. grpcinsecure "google.golang.org/grpc/credentials/insecure"
  23. "google.golang.org/grpc/credentials/oauth"
  24. // Install grpclb, which is required for direct path.
  25. _ "google.golang.org/grpc/balancer/grpclb"
  26. )
  27. // Check env to disable DirectPath traffic.
  28. const disableDirectPath = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH"
  29. // Check env to decide if using google-c2p resolver for DirectPath traffic.
  30. const enableDirectPathXds = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"
  31. // Set at init time by dial_appengine.go. If nil, we're not on App Engine.
  32. var appengineDialerHook func(context.Context) grpc.DialOption
  33. // Set at init time by dial_socketopt.go. If nil, socketopt is not supported.
  34. var timeoutDialerOption grpc.DialOption
  35. // Dial returns a GRPC connection for use communicating with a Google cloud
  36. // service, configured with the given ClientOptions.
  37. func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
  38. o, err := processAndValidateOpts(opts)
  39. if err != nil {
  40. return nil, err
  41. }
  42. if o.GRPCConnPool != nil {
  43. return o.GRPCConnPool.Conn(), nil
  44. }
  45. // NOTE(cbro): We removed support for option.WithGRPCConnPool (GRPCConnPoolSize)
  46. // on 2020-02-12 because RoundRobin and WithBalancer are deprecated and we need to remove usages of it.
  47. //
  48. // Connection pooling is only done via DialPool.
  49. return dial(ctx, false, o)
  50. }
  51. // DialInsecure returns an insecure GRPC connection for use communicating
  52. // with fake or mock Google cloud service implementations, such as emulators.
  53. // The connection is configured with the given ClientOptions.
  54. func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
  55. o, err := processAndValidateOpts(opts)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return dial(ctx, true, o)
  60. }
  61. // DialPool returns a pool of GRPC connections for the given service.
  62. // This differs from the connection pooling implementation used by Dial, which uses a custom GRPC load balancer.
  63. // DialPool should be used instead of Dial when a pool is used by default or a different custom GRPC load balancer is needed.
  64. // The context and options are shared between each Conn in the pool.
  65. // The pool size is configured using the WithGRPCConnectionPool option.
  66. //
  67. // This API is subject to change as we further refine requirements. It will go away if gRPC stubs accept an interface instead of the concrete ClientConn type. See https://github.com/grpc/grpc-go/issues/1287.
  68. func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error) {
  69. o, err := processAndValidateOpts(opts)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if o.GRPCConnPool != nil {
  74. return o.GRPCConnPool, nil
  75. }
  76. poolSize := o.GRPCConnPoolSize
  77. if o.GRPCConn != nil {
  78. // WithGRPCConn is technically incompatible with WithGRPCConnectionPool.
  79. // Always assume pool size is 1 when a grpc.ClientConn is explicitly used.
  80. poolSize = 1
  81. }
  82. o.GRPCConnPoolSize = 0 // we don't *need* to set this to zero, but it's safe to.
  83. if poolSize == 0 || poolSize == 1 {
  84. // Fast path for common case for a connection pool with a single connection.
  85. conn, err := dial(ctx, false, o)
  86. if err != nil {
  87. return nil, err
  88. }
  89. return &singleConnPool{conn}, nil
  90. }
  91. pool := &roundRobinConnPool{}
  92. for i := 0; i < poolSize; i++ {
  93. conn, err := dial(ctx, false, o)
  94. if err != nil {
  95. defer pool.Close() // NOTE: error from Close is ignored.
  96. return nil, err
  97. }
  98. pool.conns = append(pool.conns, conn)
  99. }
  100. return pool, nil
  101. }
  102. func dial(ctx context.Context, insecure bool, o *internal.DialSettings) (*grpc.ClientConn, error) {
  103. if o.HTTPClient != nil {
  104. return nil, errors.New("unsupported HTTP client specified")
  105. }
  106. if o.GRPCConn != nil {
  107. return o.GRPCConn, nil
  108. }
  109. transportCreds, endpoint, err := internal.GetGRPCTransportConfigAndEndpoint(o)
  110. if err != nil {
  111. return nil, err
  112. }
  113. if insecure {
  114. transportCreds = grpcinsecure.NewCredentials()
  115. }
  116. // Initialize gRPC dial options with transport-level security options.
  117. grpcOpts := []grpc.DialOption{
  118. grpc.WithTransportCredentials(transportCreds),
  119. }
  120. // Authentication can only be sent when communicating over a secure connection.
  121. //
  122. // TODO: Should we be more lenient in the future and allow sending credentials
  123. // when dialing an insecure connection?
  124. if !o.NoAuth && !insecure {
  125. if o.APIKey != "" {
  126. log.Print("API keys are not supported for gRPC APIs. Remove the WithAPIKey option from your client-creating call.")
  127. }
  128. creds, err := internal.Creds(ctx, o)
  129. if err != nil {
  130. return nil, err
  131. }
  132. grpcOpts = append(grpcOpts,
  133. grpc.WithPerRPCCredentials(grpcTokenSource{
  134. TokenSource: oauth.TokenSource{creds.TokenSource},
  135. quotaProject: internal.GetQuotaProject(creds, o.QuotaProject),
  136. requestReason: o.RequestReason,
  137. }),
  138. )
  139. // Attempt Direct Path:
  140. if isDirectPathEnabled(endpoint, o) && isTokenSourceDirectPathCompatible(creds.TokenSource, o) && metadata.OnGCE() {
  141. // Overwrite all of the previously specific DialOptions, DirectPath uses its own set of credentials and certificates.
  142. grpcOpts = []grpc.DialOption{
  143. grpc.WithCredentialsBundle(grpcgoogle.NewDefaultCredentialsWithOptions(grpcgoogle.DefaultCredentialsOptions{oauth.TokenSource{creds.TokenSource}}))}
  144. if timeoutDialerOption != nil {
  145. grpcOpts = append(grpcOpts, timeoutDialerOption)
  146. }
  147. // Check if google-c2p resolver is enabled for DirectPath
  148. if isDirectPathXdsUsed(o) {
  149. // google-c2p resolver target must not have a port number
  150. if addr, _, err := net.SplitHostPort(endpoint); err == nil {
  151. endpoint = "google-c2p:///" + addr
  152. } else {
  153. endpoint = "google-c2p:///" + endpoint
  154. }
  155. } else {
  156. if !strings.HasPrefix(endpoint, "dns:///") {
  157. endpoint = "dns:///" + endpoint
  158. }
  159. grpcOpts = append(grpcOpts,
  160. // For now all DirectPath go clients will be using the following lb config, but in future
  161. // when different services need different configs, then we should change this to a
  162. // per-service config.
  163. grpc.WithDisableServiceConfig(),
  164. grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`))
  165. }
  166. // TODO(cbro): add support for system parameters (quota project, request reason) via chained interceptor.
  167. }
  168. }
  169. if appengineDialerHook != nil {
  170. // Use the Socket API on App Engine.
  171. // appengine dialer will override socketopt dialer
  172. grpcOpts = append(grpcOpts, appengineDialerHook(ctx))
  173. }
  174. // Add tracing, but before the other options, so that clients can override the
  175. // gRPC stats handler.
  176. // This assumes that gRPC options are processed in order, left to right.
  177. grpcOpts = addOCStatsHandler(grpcOpts, o)
  178. grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
  179. if o.UserAgent != "" {
  180. grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
  181. }
  182. return grpc.DialContext(ctx, endpoint, grpcOpts...)
  183. }
  184. func addOCStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
  185. if settings.TelemetryDisabled {
  186. return opts
  187. }
  188. return append(opts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
  189. }
  190. // grpcTokenSource supplies PerRPCCredentials from an oauth.TokenSource.
  191. type grpcTokenSource struct {
  192. oauth.TokenSource
  193. // Additional metadata attached as headers.
  194. quotaProject string
  195. requestReason string
  196. }
  197. // GetRequestMetadata gets the request metadata as a map from a grpcTokenSource.
  198. func (ts grpcTokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (
  199. map[string]string, error) {
  200. metadata, err := ts.TokenSource.GetRequestMetadata(ctx, uri...)
  201. if err != nil {
  202. return nil, err
  203. }
  204. // Attach system parameter
  205. if ts.quotaProject != "" {
  206. metadata["X-goog-user-project"] = ts.quotaProject
  207. }
  208. if ts.requestReason != "" {
  209. metadata["X-goog-request-reason"] = ts.requestReason
  210. }
  211. return metadata, nil
  212. }
  213. func isDirectPathEnabled(endpoint string, o *internal.DialSettings) bool {
  214. if !o.EnableDirectPath {
  215. return false
  216. }
  217. if !checkDirectPathEndPoint(endpoint) {
  218. return false
  219. }
  220. if strings.EqualFold(os.Getenv(disableDirectPath), "true") {
  221. return false
  222. }
  223. return true
  224. }
  225. func isDirectPathXdsUsed(o *internal.DialSettings) bool {
  226. // Method 1: Enable DirectPath xDS by env;
  227. if strings.EqualFold(os.Getenv(enableDirectPathXds), "true") {
  228. return true
  229. }
  230. // Method 2: Enable DirectPath xDS by option;
  231. if o.EnableDirectPathXds {
  232. return true
  233. }
  234. return false
  235. }
  236. func isTokenSourceDirectPathCompatible(ts oauth2.TokenSource, o *internal.DialSettings) bool {
  237. if ts == nil {
  238. return false
  239. }
  240. tok, err := ts.Token()
  241. if err != nil {
  242. return false
  243. }
  244. if tok == nil {
  245. return false
  246. }
  247. if o.AllowNonDefaultServiceAccount {
  248. return true
  249. }
  250. if source, _ := tok.Extra("oauth2.google.tokenSource").(string); source != "compute-metadata" {
  251. return false
  252. }
  253. if acct, _ := tok.Extra("oauth2.google.serviceAccount").(string); acct != "default" {
  254. return false
  255. }
  256. return true
  257. }
  258. func checkDirectPathEndPoint(endpoint string) bool {
  259. // Only [dns:///]host[:port] is supported, not other schemes (e.g., "tcp://" or "unix://").
  260. // Also don't try direct path if the user has chosen an alternate name resolver
  261. // (i.e., via ":///" prefix).
  262. //
  263. // TODO(cbro): once gRPC has introspectible options, check the user hasn't
  264. // provided a custom dialer in gRPC options.
  265. if strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "dns:///") {
  266. return false
  267. }
  268. if endpoint == "" {
  269. return false
  270. }
  271. return true
  272. }
  273. func processAndValidateOpts(opts []option.ClientOption) (*internal.DialSettings, error) {
  274. var o internal.DialSettings
  275. for _, opt := range opts {
  276. opt.Apply(&o)
  277. }
  278. if err := o.Validate(); err != nil {
  279. return nil, err
  280. }
  281. return &o, nil
  282. }
  283. type connPoolOption struct{ ConnPool }
  284. // WithConnPool returns a ClientOption that specifies the ConnPool
  285. // connection to use as the basis of communications.
  286. //
  287. // This is only to be used by Google client libraries internally, for example
  288. // when creating a longrunning API client that shares the same connection pool
  289. // as a service client.
  290. func WithConnPool(p ConnPool) option.ClientOption {
  291. return connPoolOption{p}
  292. }
  293. func (o connPoolOption) Apply(s *internal.DialSettings) {
  294. s.GRPCConnPool = o.ConnPool
  295. }