浏览代码

Merge pull request #1282 from sanimej/lateread

For cached connections ignore late replies after read timeout
Alessandro Boch 9 年之前
父节点
当前提交
23830083ea
共有 1 个文件被更改,包括 92 次插入6 次删除
  1. 92 6
      libnetwork/resolver.go

+ 92 - 6
libnetwork/resolver.go

@@ -62,6 +62,21 @@ type extDNSEntry struct {
 	extOnce sync.Once
 	extOnce sync.Once
 }
 }
 
 
+type sboxQuery struct {
+	sboxID string
+	dnsID  uint16
+}
+
+type clientConnGC struct {
+	toDelete bool
+	client   clientConn
+}
+
+var (
+	queryGCMutex sync.Mutex
+	queryGC      map[sboxQuery]*clientConnGC
+)
+
 // resolver implements the Resolver interface
 // resolver implements the Resolver interface
 type resolver struct {
 type resolver struct {
 	sb         *sandbox
 	sb         *sandbox
@@ -79,6 +94,21 @@ type resolver struct {
 
 
 func init() {
 func init() {
 	rand.Seed(time.Now().Unix())
 	rand.Seed(time.Now().Unix())
+	queryGC = make(map[sboxQuery]*clientConnGC)
+	go func() {
+		ticker := time.NewTicker(1 * time.Minute)
+		for range ticker.C {
+			queryGCMutex.Lock()
+			for query, conn := range queryGC {
+				if !conn.toDelete {
+					conn.toDelete = true
+					continue
+				}
+				delete(queryGC, query)
+			}
+			queryGCMutex.Unlock()
+		}
+	}()
 }
 }
 
 
 // NewResolver creates a new instance of the Resolver
 // NewResolver creates a new instance of the Resolver
@@ -370,6 +400,7 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
 		writer = w
 		writer = w
 	} else {
 	} else {
 		queryID := query.Id
 		queryID := query.Id
+	extQueryLoop:
 		for i := 0; i < maxExtDNS; i++ {
 		for i := 0; i < maxExtDNS; i++ {
 			extDNS := &r.extDNSList[i]
 			extDNS := &r.extDNSList[i]
 			if extDNS.ipStr == "" {
 			if extDNS.ipStr == "" {
@@ -435,14 +466,26 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
 				log.Debugf("Send to DNS server failed, %s", err)
 				log.Debugf("Send to DNS server failed, %s", err)
 				continue
 				continue
 			}
 			}
+			for {
+				// If a reply comes after a read timeout it will remain in the socket buffer
+				// and will be read after sending next query. To ignore such stale replies
+				// save the query context in a GC queue when read timesout. On the next reply
+				// if the context is present in the GC queue its a old reply. Ignore it and
+				// read again
+				resp, err = co.ReadMsg()
+				if err != nil {
+					if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
+						r.addQueryToGC(w, query)
+					}
+					r.forwardQueryEnd(w, query)
+					log.Debugf("Read from DNS server failed, %s", err)
+					continue extQueryLoop
+				}
 
 
-			resp, err = co.ReadMsg()
-			if err != nil {
-				r.forwardQueryEnd(w, query)
-				log.Debugf("Read from DNS server failed, %s", err)
-				continue
+				if !r.checkRespInGC(w, resp) {
+					break
+				}
 			}
 			}
-
 			// Retrieves the context for the forwarded query and returns the client connection
 			// Retrieves the context for the forwarded query and returns the client connection
 			// to send the reply to
 			// to send the reply to
 			writer = r.forwardQueryEnd(w, resp)
 			writer = r.forwardQueryEnd(w, resp)
@@ -501,6 +544,49 @@ func (r *resolver) forwardQueryStart(w dns.ResponseWriter, msg *dns.Msg, queryID
 	return true
 	return true
 }
 }
 
 
+func (r *resolver) addQueryToGC(w dns.ResponseWriter, msg *dns.Msg) {
+	if w.LocalAddr().Network() != "udp" {
+		return
+	}
+
+	r.queryLock.Lock()
+	cc, ok := r.client[msg.Id]
+	r.queryLock.Unlock()
+	if !ok {
+		return
+	}
+
+	query := sboxQuery{
+		sboxID: r.sb.ID(),
+		dnsID:  msg.Id,
+	}
+	clientGC := &clientConnGC{
+		client: cc,
+	}
+	queryGCMutex.Lock()
+	queryGC[query] = clientGC
+	queryGCMutex.Unlock()
+}
+
+func (r *resolver) checkRespInGC(w dns.ResponseWriter, msg *dns.Msg) bool {
+	if w.LocalAddr().Network() != "udp" {
+		return false
+	}
+
+	query := sboxQuery{
+		sboxID: r.sb.ID(),
+		dnsID:  msg.Id,
+	}
+
+	queryGCMutex.Lock()
+	defer queryGCMutex.Unlock()
+	if _, ok := queryGC[query]; ok {
+		delete(queryGC, query)
+		return true
+	}
+	return false
+}
+
 func (r *resolver) forwardQueryEnd(w dns.ResponseWriter, msg *dns.Msg) dns.ResponseWriter {
 func (r *resolver) forwardQueryEnd(w dns.ResponseWriter, msg *dns.Msg) dns.ResponseWriter {
 	var (
 	var (
 		cc clientConn
 		cc clientConn