Browse Source

libnetwork: replace ad-hoc semaphore implementation

...for limiting concurrent external DNS requests with
"golang.org/x/sync/semaphore".Weighted. Replace the ad-hoc rate limiter
for when the concurrency limit is hit (which contains a data-race bug)
with "golang.org/x/time/rate".Sometimes.

Immediately retrying with the next server if the concurrency limit has
been hit just further compounds the problem. Wait on the semaphore and
refuse the query if it could not be acquired in a reasonable amount of
time.

Signed-off-by: Cory Snider <csnider@mirantis.com>
Cory Snider 2 years ago
parent
commit
25b51cad3d

+ 20 - 37
libnetwork/resolver.go

@@ -1,6 +1,7 @@
 package libnetwork
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"net"
@@ -11,6 +12,8 @@ import (
 	"github.com/docker/docker/libnetwork/types"
 	"github.com/miekg/dns"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/semaphore"
+	"golang.org/x/time/rate"
 )
 
 // Resolver represents the embedded DNS server in Docker. It operates
@@ -85,12 +88,12 @@ type resolver struct {
 	tcpServer     *dns.Server
 	tcpListen     *net.TCPListener
 	err           error
-	count         int32
-	tStamp        time.Time
-	queryLock     sync.Mutex
 	listenAddress string
 	proxyDNS      bool
 	startCh       chan struct{}
+
+	fwdSem      *semaphore.Weighted // Limit the number of concurrent external DNS requests in-flight
+	logInverval rate.Sometimes      // Rate-limit logging about hitting the fwdSem limit
 }
 
 // NewResolver creates a new instance of the Resolver
@@ -101,6 +104,8 @@ func NewResolver(address string, proxyDNS bool, backend DNSBackend) Resolver {
 		listenAddress: address,
 		err:           fmt.Errorf("setup not done yet"),
 		startCh:       make(chan struct{}, 1),
+		fwdSem:        semaphore.NewWeighted(maxConcurrent),
+		logInverval:   rate.Sometimes{Interval: logInterval},
 	}
 }
 
@@ -179,9 +184,7 @@ func (r *resolver) Stop() {
 	r.conn = nil
 	r.tcpServer = nil
 	r.err = fmt.Errorf("setup not done yet")
-	r.tStamp = time.Time{}
-	r.count = 0
-	r.queryLock = sync.Mutex{}
+	r.fwdSem = semaphore.NewWeighted(maxConcurrent)
 }
 
 func (r *resolver) SetExtServers(extDNS []extDNSEntry) {
@@ -467,16 +470,19 @@ func (r *resolver) forwardExtDNS(proto string, query *dns.Msg) *dns.Msg {
 		}
 
 		// limits the number of outstanding concurrent queries.
-		if !r.forwardQueryStart() {
-			old := r.tStamp
-			r.tStamp = time.Now()
-			if r.tStamp.Sub(old) > logInterval {
+		ctx, cancel := context.WithTimeout(context.Background(), extIOTimeout)
+		err := r.fwdSem.Acquire(ctx, 1)
+		cancel()
+		if err != nil {
+			r.logInverval.Do(func() {
 				logrus.Errorf("[resolver] more than %v concurrent queries", maxConcurrent)
-			}
-			continue
+			})
+			return new(dns.Msg).SetRcode(query, dns.RcodeRefused)
 		}
-		resp := r.exchange(proto, extDNS, query)
-		r.forwardQueryEnd()
+		resp := func() *dns.Msg {
+			defer r.fwdSem.Release(1)
+			return r.exchange(proto, extDNS, query)
+		}()
 		if resp == nil {
 			continue
 		}
@@ -573,26 +579,3 @@ func statusString(responseCode int) string {
 	}
 	return "UNKNOWN"
 }
-
-func (r *resolver) forwardQueryStart() bool {
-	r.queryLock.Lock()
-	defer r.queryLock.Unlock()
-
-	if r.count == maxConcurrent {
-		return false
-	}
-	r.count++
-
-	return true
-}
-
-func (r *resolver) forwardQueryEnd() {
-	r.queryLock.Lock()
-	defer r.queryLock.Unlock()
-
-	if r.count == 0 {
-		logrus.Error("[resolver] invalid concurrent query count")
-	} else {
-		r.count--
-	}
-}

+ 1 - 1
vendor.mod

@@ -91,7 +91,7 @@ require (
 	golang.org/x/sync v0.1.0
 	golang.org/x/sys v0.5.0
 	golang.org/x/text v0.7.0
-	golang.org/x/time v0.1.0
+	golang.org/x/time v0.3.0
 	google.golang.org/genproto v0.0.0-20220706185917-7780775163c4
 	google.golang.org/grpc v1.50.1
 	gotest.tools/v3 v3.4.0

+ 2 - 2
vendor.sum

@@ -1422,8 +1422,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb
 golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
-golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
+golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

+ 8 - 12
vendor/golang.org/x/time/rate/rate.go

@@ -83,7 +83,7 @@ func (lim *Limiter) Burst() int {
 // TokensAt returns the number of tokens available at time t.
 func (lim *Limiter) TokensAt(t time.Time) float64 {
 	lim.mu.Lock()
-	_, _, tokens := lim.advance(t) // does not mutute lim
+	_, tokens := lim.advance(t) // does not mutate lim
 	lim.mu.Unlock()
 	return tokens
 }
@@ -183,7 +183,7 @@ func (r *Reservation) CancelAt(t time.Time) {
 		return
 	}
 	// advance time to now
-	t, _, tokens := r.lim.advance(t)
+	t, tokens := r.lim.advance(t)
 	// calculate new number of tokens
 	tokens += restoreTokens
 	if burst := float64(r.lim.burst); tokens > burst {
@@ -304,7 +304,7 @@ func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
 	lim.mu.Lock()
 	defer lim.mu.Unlock()
 
-	t, _, tokens := lim.advance(t)
+	t, tokens := lim.advance(t)
 
 	lim.last = t
 	lim.tokens = tokens
@@ -321,7 +321,7 @@ func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) {
 	lim.mu.Lock()
 	defer lim.mu.Unlock()
 
-	t, _, tokens := lim.advance(t)
+	t, tokens := lim.advance(t)
 
 	lim.last = t
 	lim.tokens = tokens
@@ -356,7 +356,7 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration)
 		}
 	}
 
-	t, last, tokens := lim.advance(t)
+	t, tokens := lim.advance(t)
 
 	// Calculate the remaining number of tokens resulting from the request.
 	tokens -= float64(n)
@@ -379,15 +379,11 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration)
 	if ok {
 		r.tokens = n
 		r.timeToAct = t.Add(waitDuration)
-	}
 
-	// Update state
-	if ok {
+		// Update state
 		lim.last = t
 		lim.tokens = tokens
 		lim.lastEvent = r.timeToAct
-	} else {
-		lim.last = last
 	}
 
 	return r
@@ -396,7 +392,7 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration)
 // advance calculates and returns an updated state for lim resulting from the passage of time.
 // lim is not changed.
 // advance requires that lim.mu is held.
-func (lim *Limiter) advance(t time.Time) (newT time.Time, newLast time.Time, newTokens float64) {
+func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
 	last := lim.last
 	if t.Before(last) {
 		last = t
@@ -409,7 +405,7 @@ func (lim *Limiter) advance(t time.Time) (newT time.Time, newLast time.Time, new
 	if burst := float64(lim.burst); tokens > burst {
 		tokens = burst
 	}
-	return t, last, tokens
+	return t, tokens
 }
 
 // durationFromTokens is a unit conversion function from the number of tokens to the duration

+ 67 - 0
vendor/golang.org/x/time/rate/sometimes.go

@@ -0,0 +1,67 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rate
+
+import (
+	"sync"
+	"time"
+)
+
+// Sometimes will perform an action occasionally.  The First, Every, and
+// Interval fields govern the behavior of Do, which performs the action.
+// A zero Sometimes value will perform an action exactly once.
+//
+// # Example: logging with rate limiting
+//
+//	var sometimes = rate.Sometimes{First: 3, Interval: 10*time.Second}
+//	func Spammy() {
+//	        sometimes.Do(func() { log.Info("here I am!") })
+//	}
+type Sometimes struct {
+	First    int           // if non-zero, the first N calls to Do will run f.
+	Every    int           // if non-zero, every Nth call to Do will run f.
+	Interval time.Duration // if non-zero and Interval has elapsed since f's last run, Do will run f.
+
+	mu    sync.Mutex
+	count int       // number of Do calls
+	last  time.Time // last time f was run
+}
+
+// Do runs the function f as allowed by First, Every, and Interval.
+//
+// The model is a union (not intersection) of filters.  The first call to Do
+// always runs f.  Subsequent calls to Do run f if allowed by First or Every or
+// Interval.
+//
+// A non-zero First:N causes the first N Do(f) calls to run f.
+//
+// A non-zero Every:M causes every Mth Do(f) call, starting with the first, to
+// run f.
+//
+// A non-zero Interval causes Do(f) to run f if Interval has elapsed since
+// Do last ran f.
+//
+// Specifying multiple filters produces the union of these execution streams.
+// For example, specifying both First:N and Every:M causes the first N Do(f)
+// calls and every Mth Do(f) call, starting with the first, to run f.  See
+// Examples for more.
+//
+// If Do is called multiple times simultaneously, the calls will block and run
+// serially.  Therefore, Do is intended for lightweight operations.
+//
+// Because a call to Do may block until f returns, if f causes Do to be called,
+// it will deadlock.
+func (s *Sometimes) Do(f func()) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.count == 0 ||
+		(s.First > 0 && s.count < s.First) ||
+		(s.Every > 0 && s.count%s.Every == 0) ||
+		(s.Interval > 0 && time.Since(s.last) >= s.Interval) {
+		f()
+		s.last = time.Now()
+	}
+	s.count++
+}

+ 1 - 1
vendor/modules.txt

@@ -1086,7 +1086,7 @@ golang.org/x/text/secure/bidirule
 golang.org/x/text/transform
 golang.org/x/text/unicode/bidi
 golang.org/x/text/unicode/norm
-# golang.org/x/time v0.1.0
+# golang.org/x/time v0.3.0
 ## explicit
 golang.org/x/time/rate
 # google.golang.org/api v0.93.0