@@ -115,12 +115,6 @@ type serviceInfo struct {
 	mdata interface{}
 }
 
-type serverWorkerData struct {
-	st     transport.ServerTransport
-	wg     *sync.WaitGroup
-	stream *transport.Stream
-}
-
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
 	opts serverOptions
@@ -145,7 +139,7 @@ type Server struct {
 	channelzID *channelz.Identifier
 	czData     *channelzData
 
-	serverWorkerChannel chan *serverWorkerData
+	serverWorkerChannel chan func()
 }
 
 type serverOptions struct {
@@ -177,6 +171,7 @@ type serverOptions struct {
 }
 
 var defaultServerOptions = serverOptions{
+	maxConcurrentStreams:  math.MaxUint32,
 	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
 	maxSendMessageSize:    defaultServerMaxSendMessageSize,
 	connectionTimeout:     120 * time.Second,
@@ -387,6 +382,9 @@ func MaxSendMsgSize(m int) ServerOption {
 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 // of concurrent streams to each ServerTransport.
 func MaxConcurrentStreams(n uint32) ServerOption {
+	if n == 0 {
+		n = math.MaxUint32
+	}
 	return newFuncServerOption(func(o *serverOptions) {
 		o.maxConcurrentStreams = n
 	})
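
Usage note (not part of the diff): with this hunk an explicit zero is normalized to math.MaxUint32, i.e. effectively unlimited, rather than configuring a server that could never admit a stream, and the default now carries the same value. A minimal sketch of setting the option; the address and limit are arbitrary:

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	// Cap each ServerTransport at 100 concurrently executing handlers.
	// After this change, MaxConcurrentStreams(0) means "no limit"
	// (math.MaxUint32) instead of a server that admits nothing.
	srv := grpc.NewServer(grpc.MaxConcurrentStreams(100))

	lis, err := net.Listen("tcp", "localhost:50051") // arbitrary address
	if err != nil {
		log.Fatal(err)
	}
	log.Fatal(srv.Serve(lis))
}
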
@@ -567,24 +565,19 @@ const serverWorkerResetThreshold = 1 << 16
 // [1] https://github.com/golang/go/issues/18138
 func (s *Server) serverWorker() {
 	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
-		data, ok := <-s.serverWorkerChannel
+		f, ok := <-s.serverWorkerChannel
 		if !ok {
 			return
 		}
-		s.handleSingleStream(data)
+		f()
 	}
 	go s.serverWorker()
 }
 
-func (s *Server) handleSingleStream(data *serverWorkerData) {
-	defer data.wg.Done()
-	s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-}
-
 // initServerWorkers creates worker goroutines and a channel to process incoming
 // connections to reduce the time spent overall on runtime.morestack.
 func (s *Server) initServerWorkers() {
-	s.serverWorkerChannel = make(chan *serverWorkerData)
+	s.serverWorkerChannel = make(chan func())
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 		go s.serverWorker()
 	}
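
The worker refactor above replaces the serverWorkerData envelope with a bare func(): the dispatch site now captures the transport, wait group, and stream in a closure, so the worker loop only receives and invokes. A self-contained sketch of the same pattern, with hypothetical names (worker, tasks), independent of gRPC internals:

package main

import (
	"fmt"
	"sync"
)

// resetThreshold plays the role of serverWorkerResetThreshold: after this many
// tasks a worker replaces itself so a stack grown by one large request can be
// reclaimed (see golang/go#18138, cited in the diff).
const resetThreshold = 1 << 16

func worker(tasks <-chan func()) {
	for completed := 0; completed < resetThreshold; completed++ {
		f, ok := <-tasks
		if !ok {
			return
		}
		f()
	}
	go worker(tasks) // respawn with a fresh, small stack
}

func main() {
	tasks := make(chan func()) // unbuffered, like serverWorkerChannel
	for i := 0; i < 4; i++ {
		go worker(tasks)
	}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		i := i // capture the per-iteration value
		wg.Add(1)
		tasks <- func() { defer wg.Done(); fmt.Println("task", i) }
	}
	wg.Wait()
	close(tasks)
}
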
@@ -943,21 +936,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
 	defer st.Close(errors.New("finished serving streams for the server transport"))
 	var wg sync.WaitGroup
 
+	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
+
+		streamQuota.acquire()
+		f := func() {
+			defer streamQuota.release()
+			defer wg.Done()
+			s.handleStream(st, stream, s.traceInfo(st, stream))
+		}
+
 		if s.opts.numServerWorkers > 0 {
-			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
 			select {
-			case s.serverWorkerChannel <- data:
+			case s.serverWorkerChannel <- f:
 				return
 			default:
 				// If all stream workers are busy, fallback to the default code path.
 			}
 		}
-		go func() {
-			defer wg.Done()
-			s.handleStream(st, stream, s.traceInfo(st, stream))
-		}()
+		go f()
 	}, func(ctx context.Context, method string) context.Context {
 		if !EnableTracing {
 			return ctx
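
Note the ordering this hunk establishes: streamQuota.acquire() runs synchronously inside the HandleStreams callback, so once maxConcurrentStreams handlers are in flight on a transport, the intake loop itself blocks and no further handler goroutine starts until some handler's deferred release() fires. A runnable sketch of that backpressure shape, using a buffered channel as a stand-in for the semaphore (all names here are hypothetical):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const limit = 2                     // stand-in for maxConcurrentStreams
	quota := make(chan struct{}, limit) // stand-in for the atomicSemaphore
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		quota <- struct{}{} // acquire: blocks the intake loop at the limit
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-quota }() // release when the handler returns
			fmt.Println("handling stream", i)
			time.Sleep(50 * time.Millisecond)
		}(i)
	}
	wg.Wait()
}
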
@@ -2052,3 +2050,32 @@ func validateSendCompressor(name, clientCompressors string) error {
 	}
 	return fmt.Errorf("client does not support compressor %q", name)
 }
+
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be
+// called synchronously; release may be called asynchronously.
+type atomicSemaphore struct {
+	n    int64
+	wait chan struct{}
+}
+
+func (q *atomicSemaphore) acquire() {
+	if atomic.AddInt64(&q.n, -1) < 0 {
+		// We ran out of quota. Block until a release happens.
+		<-q.wait
+	}
+}
+
+func (q *atomicSemaphore) release() {
+	// N.B. the "<= 0" check below should allow for this to work with multiple
+	// concurrent calls to acquire, but also note that with synchronous calls to
+	// acquire, as our system does, n will never be less than -1. There are
+	// fairness issues (queuing) to consider if this was to be generalized.
+	if atomic.AddInt64(&q.n, 1) <= 0 {
+		// An acquire was waiting on us. Unblock it.
+		q.wait <- struct{}{}
+	}
+}
+
+func newHandlerQuota(n uint32) *atomicSemaphore {
+	return &atomicSemaphore{n: int64(n), wait: make(chan struct{}, 1)}
+}
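
The semaphore pairs an atomic counter with a one-slot channel: an acquire that drives the counter negative parks on the channel, and a release that observes a waiter hands it exactly one token. A small demo of those semantics; the type is copied from the diff above so the snippet stands alone:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Copied from the diff so this demo compiles on its own.
type atomicSemaphore struct {
	n    int64
	wait chan struct{}
}

func (q *atomicSemaphore) acquire() {
	if atomic.AddInt64(&q.n, -1) < 0 {
		<-q.wait // out of quota; park until a release hands us a token
	}
}

func (q *atomicSemaphore) release() {
	if atomic.AddInt64(&q.n, 1) <= 0 {
		q.wait <- struct{}{} // an acquire is parked; wake it
	}
}

func main() {
	q := &atomicSemaphore{n: 1, wait: make(chan struct{}, 1)}
	q.acquire() // takes the only unit of quota

	go func() {
		time.Sleep(10 * time.Millisecond)
		q.release() // unblocks the acquire below
	}()

	start := time.Now()
	q.acquire() // parks until the goroutine's release
	fmt.Println("second acquire waited", time.Since(start).Round(time.Millisecond))
}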