Browse Source

vendor: hashicorp/memberlist, google/btree (dep) update

Upstream update fixes the issue where left node would be marked as
failed, which caused `TestNetworkDBIslands` to occasionally fail.

Signed-off-by: Roman Volosatovs <roman.volosatovs@docker.com>
Roman Volosatovs 4 years ago
parent
commit
cdd04a94bc

+ 2 - 1
vendor.conf

@@ -49,7 +49,7 @@ github.com/docker/go-events                         e31b211e4f1cd09aa76fe4ac2445
 github.com/armon/go-radix                           e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 github.com/armon/go-metrics                         eb0af217e5e9747e41dd5303755356b62d28e3ec
 github.com/hashicorp/go-msgpack                     71c2886f5a673a35f909803f38ece5810165097b
-github.com/hashicorp/memberlist                     3d8438da9589e7b608a83ffac1ef8211486bcb7c
+github.com/hashicorp/memberlist                     619135cdd9e5dda8c12f8ceef39bdade4f5899b6 # v0.2.4
 github.com/sean-/seed                               e2103e2c35297fb7e17febb81e49b312087a2372
 github.com/hashicorp/errwrap                        8a6fb523712970c966eefc6b39ed2c5e74880354 # v1.0.0
 github.com/hashicorp/go-sockaddr                    c7188e74f6acae5a989bdc959aa779f8b9f42faf # v1.0.2
@@ -59,6 +59,7 @@ github.com/docker/libkv                             458977154600b9f23984d9f4b82e
 github.com/vishvananda/netns                        db3c7e526aae966c4ccfa6c8189b693d6ac5d202
 github.com/vishvananda/netlink                      f049be6f391489d3f374498fe0c8df8449258372 # v1.1.0
 github.com/moby/ipvs                                4566ccea0e08d68e9614c3e7a64a23b850c4bb35 # v1.0.1
+github.com/google/btree                             479b5e81b0a93ec038d201b0b33d17db599531d3 # v1.0.1
 
 github.com/samuel/go-zookeeper                      d0e0d8e11f318e000a8cc434616d69e329edc374
 github.com/deckarep/golang-set                      ef32fa3046d9f249d399f98ebaf9be944430fd1d

+ 202 - 0
vendor/github.com/google/btree/LICENSE

@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

+ 12 - 0
vendor/github.com/google/btree/README.md

@@ -0,0 +1,12 @@
+# BTree implementation for Go
+
+![Travis CI Build Status](https://api.travis-ci.org/google/btree.svg?branch=master)
+
+This package provides an in-memory B-Tree implementation for Go, useful as
+an ordered, mutable data structure.
+
+The API is based off of the wonderful
+http://godoc.org/github.com/petar/GoLLRB/llrb, and is meant to allow btree to
+act as a drop-in replacement for gollrb trees.
+
+See http://godoc.org/github.com/google/btree for documentation.

+ 890 - 0
vendor/github.com/google/btree/btree.go

@@ -0,0 +1,890 @@
+// Copyright 2014 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package btree implements in-memory B-Trees of arbitrary degree.
+//
+// btree implements an in-memory B-Tree for use as an ordered data structure.
+// It is not meant for persistent storage solutions.
+//
+// It has a flatter structure than an equivalent red-black or other binary tree,
+// which in some cases yields better memory usage and/or performance.
+// See some discussion on the matter here:
+//   http://google-opensource.blogspot.com/2013/01/c-containers-that-save-memory-and-time.html
+// Note, though, that this project is in no way related to the C++ B-Tree
+// implementation written about there.
+//
+// Within this tree, each node contains a slice of items and a (possibly nil)
+// slice of children.  For basic numeric values or raw structs, this can cause
+// efficiency differences when compared to equivalent C++ template code that
+// stores values in arrays within the node:
+//   * Due to the overhead of storing values as interfaces (each
+//     value needs to be stored as the value itself, then 2 words for the
+//     interface pointing to that value and its type), resulting in higher
+//     memory use.
+//   * Since interfaces can point to values anywhere in memory, values are
+//     most likely not stored in contiguous blocks, resulting in a higher
+//     number of cache misses.
+// These issues don't tend to matter, though, when working with strings or other
+// heap-allocated structures, since C++-equivalent structures also must store
+// pointers and also distribute their values across the heap.
+//
+// This implementation is designed to be a drop-in replacement to gollrb.LLRB
+// trees, (http://github.com/petar/gollrb), an excellent and probably the most
+// widely used ordered tree implementation in the Go ecosystem currently.
+// Its functions, therefore, exactly mirror those of
+// llrb.LLRB where possible.  Unlike gollrb, though, we currently don't
+// support storing multiple equivalent values.
+package btree
+
+import (
+	"fmt"
+	"io"
+	"sort"
+	"strings"
+	"sync"
+)
+
+// Item represents a single object in the tree.
+type Item interface {
+	// Less tests whether the current item is less than the given argument.
+	//
+	// This must provide a strict weak ordering.
+	// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
+	// hold one of either a or b in the tree).
+	Less(than Item) bool
+}
+
+const (
+	// DefaultFreeListSize is the capacity of the per-tree node free list
+	// that New allocates.
+	DefaultFreeListSize = 32
+)
+
+var (
+	// nilItems and nilChildren are preallocated all-nil slices used by the
+	// truncate methods to bulk-clear slice tails via copy, releasing the
+	// old references for garbage collection.
+	nilItems    = make(items, 16)
+	nilChildren = make(children, 16)
+)
+
+// FreeList represents a free list of btree nodes. By default each
+// BTree has its own FreeList, but multiple BTrees can share the same
+// FreeList.
+// Two Btrees using the same freelist are safe for concurrent write access.
+type FreeList struct {
+	mu       sync.Mutex // guards freelist
+	freelist []*node
+}
+
+// NewFreeList creates a new free list.
+// size is the maximum size of the returned free list.
+func NewFreeList(size int) *FreeList {
+	return &FreeList{freelist: make([]*node, 0, size)}
+}
+
+// newNode pops a recycled node off the free list, or allocates a fresh
+// one when the list is empty.
+func (f *FreeList) newNode() (n *node) {
+	f.mu.Lock()
+	index := len(f.freelist) - 1
+	if index < 0 {
+		f.mu.Unlock()
+		return new(node)
+	}
+	n = f.freelist[index]
+	f.freelist[index] = nil // drop the backing-array reference so the GC can reclaim it
+	f.freelist = f.freelist[:index]
+	f.mu.Unlock()
+	return
+}
+
+// freeNode adds the given node to the list, returning true if it was added
+// and false if it was discarded (the list was already at capacity).
+func (f *FreeList) freeNode(n *node) (out bool) {
+	f.mu.Lock()
+	if len(f.freelist) < cap(f.freelist) {
+		f.freelist = append(f.freelist, n)
+		out = true
+	}
+	f.mu.Unlock()
+	return
+}
+
+// ItemIterator allows callers of Ascend* to iterate in-order over portions of
+// the tree.  When this function returns false, iteration will stop and the
+// associated Ascend* function will immediately return.
+type ItemIterator func(i Item) bool
+
+// New creates a new B-Tree with the given degree.
+//
+// New(2), for example, will create a 2-3-4 tree (each node contains 1-3 items
+// and 2-4 children).
+//
+// New panics if degree <= 1 (see NewWithFreeList).
+func New(degree int) *BTree {
+	return NewWithFreeList(degree, NewFreeList(DefaultFreeListSize))
+}
+
+// NewWithFreeList creates a new B-Tree that uses the given node free list.
+// It panics if degree <= 1, since such a degree cannot satisfy the B-Tree
+// node invariants.
+func NewWithFreeList(degree int, f *FreeList) *BTree {
+	if degree <= 1 {
+		panic("bad degree")
+	}
+	return &BTree{
+		degree: degree,
+		cow:    &copyOnWriteContext{freelist: f},
+	}
+}
+
+// items stores items in a node.
+type items []Item
+
+// insertAt inserts a value into the given index, pushing all subsequent values
+// forward.
+func (s *items) insertAt(index int, item Item) {
+	*s = append(*s, nil) // grow by one; the new slot is shifted into place below
+	if index < len(*s) {
+		copy((*s)[index+1:], (*s)[index:])
+	}
+	(*s)[index] = item
+}
+
+// removeAt removes a value at a given index, pulling all subsequent values
+// back.
+func (s *items) removeAt(index int) Item {
+	item := (*s)[index]
+	copy((*s)[index:], (*s)[index+1:])
+	(*s)[len(*s)-1] = nil // clear the duplicated tail slot for the GC
+	*s = (*s)[:len(*s)-1]
+	return item
+}
+
+// pop removes and returns the last element in the list.
+func (s *items) pop() (out Item) {
+	index := len(*s) - 1
+	out = (*s)[index]
+	(*s)[index] = nil // clear the vacated slot for the GC
+	*s = (*s)[:index]
+	return
+}
+
+// truncate truncates this instance at index so that it contains only the
+// first index items. index must be less than or equal to length.
+func (s *items) truncate(index int) {
+	var toClear items
+	*s, toClear = (*s)[:index], (*s)[index:]
+	// Zero the truncated tail in chunks by copying from the preallocated
+	// nilItems slice, so the GC can collect the removed items.
+	for len(toClear) > 0 {
+		toClear = toClear[copy(toClear, nilItems):]
+	}
+}
+
+// find returns the index where the given item should be inserted into this
+// list.  'found' is true if the item already exists in the list at the given
+// index.
+func (s items) find(item Item) (index int, found bool) {
+	i := sort.Search(len(s), func(i int) bool {
+		return item.Less(s[i])
+	})
+	// sort.Search returns the first index whose element is greater than
+	// item; if the preceding element is not less than item either, the two
+	// are equivalent under Less.
+	if i > 0 && !s[i-1].Less(item) {
+		return i - 1, true
+	}
+	return i, false
+}
+
+// children stores child nodes in a node.
+type children []*node
+
+// insertAt inserts a value into the given index, pushing all subsequent values
+// forward.
+func (s *children) insertAt(index int, n *node) {
+	*s = append(*s, nil) // grow by one; the new slot is shifted into place below
+	if index < len(*s) {
+		copy((*s)[index+1:], (*s)[index:])
+	}
+	(*s)[index] = n
+}
+
+// removeAt removes a value at a given index, pulling all subsequent values
+// back.
+func (s *children) removeAt(index int) *node {
+	n := (*s)[index]
+	copy((*s)[index:], (*s)[index+1:])
+	(*s)[len(*s)-1] = nil // clear the duplicated tail slot for the GC
+	*s = (*s)[:len(*s)-1]
+	return n
+}
+
+// pop removes and returns the last element in the list.
+func (s *children) pop() (out *node) {
+	index := len(*s) - 1
+	out = (*s)[index]
+	(*s)[index] = nil // clear the vacated slot for the GC
+	*s = (*s)[:index]
+	return
+}
+
+// truncate truncates this instance at index so that it contains only the
+// first index children. index must be less than or equal to length.
+func (s *children) truncate(index int) {
+	var toClear children
+	*s, toClear = (*s)[:index], (*s)[index:]
+	// Zero the truncated tail in chunks by copying from the preallocated
+	// nilChildren slice, so the GC can collect the removed nodes.
+	for len(toClear) > 0 {
+		toClear = toClear[copy(toClear, nilChildren):]
+	}
+}
+
+// node is an internal node in a tree.
+//
+// It must at all times maintain the invariant that either
+//   * len(children) == 0, len(items) unconstrained
+//   * len(children) == len(items) + 1
+type node struct {
+	items    items
+	children children
+	cow      *copyOnWriteContext // owning write context; see copyOnWriteContext
+}
+
+// mutableFor returns a version of n that the given context is allowed to
+// mutate: n itself when it is already owned by cow, otherwise a copy
+// obtained via cow.newNode() with items and children duplicated.
+func (n *node) mutableFor(cow *copyOnWriteContext) *node {
+	if n.cow == cow {
+		return n
+	}
+	out := cow.newNode()
+	// Reuse the recycled node's backing array for items when large enough.
+	if cap(out.items) >= len(n.items) {
+		out.items = out.items[:len(n.items)]
+	} else {
+		out.items = make(items, len(n.items), cap(n.items))
+	}
+	copy(out.items, n.items)
+	// Copy children
+	if cap(out.children) >= len(n.children) {
+		out.children = out.children[:len(n.children)]
+	} else {
+		out.children = make(children, len(n.children), cap(n.children))
+	}
+	copy(out.children, n.children)
+	return out
+}
+
+// mutableChild returns child i as a node owned by n's write context,
+// replacing the stored child with the mutable copy if one was made.
+func (n *node) mutableChild(i int) *node {
+	c := n.children[i].mutableFor(n.cow)
+	n.children[i] = c
+	return c
+}
+
+// split splits the given node at the given index.  The current node shrinks,
+// and this function returns the item that existed at that index and a new node
+// containing all items/children after it.
+func (n *node) split(i int) (Item, *node) {
+	item := n.items[i]
+	next := n.cow.newNode()
+	next.items = append(next.items, n.items[i+1:]...)
+	n.items.truncate(i)
+	if len(n.children) > 0 {
+		// Interior node: the children to the right of index i move with
+		// the right half of the items.
+		next.children = append(next.children, n.children[i+1:]...)
+		n.children.truncate(i + 1)
+	}
+	return item, next
+}
+
+// maybeSplitChild checks if a child should be split, and if so splits it.
+// Returns whether or not a split occurred.
+func (n *node) maybeSplitChild(i, maxItems int) bool {
+	if len(n.children[i].items) < maxItems {
+		return false
+	}
+	first := n.mutableChild(i)
+	item, second := first.split(maxItems / 2)
+	// The median item is promoted into this node; the new right half
+	// becomes child i+1.
+	n.items.insertAt(i, item)
+	n.children.insertAt(i+1, second)
+	return true
+}
+
+// insert inserts an item into the subtree rooted at this node, making sure
+// no nodes in the subtree exceed maxItems items.  Should an equivalent item
+// be found/replaced by insert, it will be returned.
+func (n *node) insert(item Item, maxItems int) Item {
+	i, found := n.items.find(item)
+	if found {
+		// An equivalent item already lives here: replace it in place and
+		// hand the old one back.
+		out := n.items[i]
+		n.items[i] = item
+		return out
+	}
+	if len(n.children) == 0 {
+		// Leaf node: insert directly.
+		n.items.insertAt(i, item)
+		return nil
+	}
+	if n.maybeSplitChild(i, maxItems) {
+		// The split promoted an item into this node at index i; decide
+		// which side of it the new item belongs on.
+		inTree := n.items[i]
+		switch {
+		case item.Less(inTree):
+			// no change, we want first split node
+		case inTree.Less(item):
+			i++ // we want second split node
+		default:
+			// The promoted item is equivalent to the new one: replace it.
+			out := n.items[i]
+			n.items[i] = item
+			return out
+		}
+	}
+	return n.mutableChild(i).insert(item, maxItems)
+}
+
+// get finds the given key in the subtree and returns it, or nil if no
+// equivalent item is present.
+func (n *node) get(key Item) Item {
+	i, found := n.items.find(key)
+	if found {
+		return n.items[i]
+	} else if len(n.children) > 0 {
+		return n.children[i].get(key)
+	}
+	return nil
+}
+
+// min returns the first item in the subtree, or nil if the subtree is empty.
+func min(n *node) Item {
+	if n == nil {
+		return nil
+	}
+	// Descend along the leftmost edge to the leftmost leaf.
+	for len(n.children) > 0 {
+		n = n.children[0]
+	}
+	if len(n.items) == 0 {
+		return nil
+	}
+	return n.items[0]
+}
+
+// max returns the last item in the subtree, or nil if the subtree is empty.
+func max(n *node) Item {
+	if n == nil {
+		return nil
+	}
+	// Descend along the rightmost edge to the rightmost leaf.
+	for len(n.children) > 0 {
+		n = n.children[len(n.children)-1]
+	}
+	if len(n.items) == 0 {
+		return nil
+	}
+	return n.items[len(n.items)-1]
+}
+
+// toRemove details what item to remove in a node.remove call.
+type toRemove int
+
+const (
+	removeItem toRemove = iota // removes the given item
+	removeMin                  // removes smallest item in the subtree
+	removeMax                  // removes largest item in the subtree
+)
+
+// remove removes an item from the subtree rooted at this node, returning
+// the removed item (or nil if typ is removeItem and no equivalent item is
+// found).  minItems is the minimum item count a child must keep.
+func (n *node) remove(item Item, minItems int, typ toRemove) Item {
+	var i int
+	var found bool
+	switch typ {
+	case removeMax:
+		if len(n.children) == 0 {
+			return n.items.pop()
+		}
+		i = len(n.items) // descend into the rightmost child
+	case removeMin:
+		if len(n.children) == 0 {
+			return n.items.removeAt(0)
+		}
+		i = 0 // descend into the leftmost child
+	case removeItem:
+		i, found = n.items.find(item)
+		if len(n.children) == 0 {
+			if found {
+				return n.items.removeAt(i)
+			}
+			return nil
+		}
+	default:
+		panic("invalid type")
+	}
+	// If we get to here, we have children.
+	if len(n.children[i].items) <= minItems {
+		// Child i is at minimum size; grow it first so removal can't
+		// violate the B-Tree invariants, then retry.
+		return n.growChildAndRemove(i, item, minItems, typ)
+	}
+	child := n.mutableChild(i)
+	// Either we had enough items to begin with, or we've done some
+	// merging/stealing, because we've got enough now and we're ready to return
+	// stuff.
+	if found {
+		// The item exists at index 'i', and the child we've selected can give us a
+		// predecessor, since if we've gotten here it's got > minItems items in it.
+		out := n.items[i]
+		// We use our special-case 'remove' call with typ=maxItem to pull the
+		// predecessor of item i (the rightmost leaf of our immediate left child)
+		// and set it into where we pulled the item from.
+		n.items[i] = child.remove(nil, minItems, removeMax)
+		return out
+	}
+	// Final recursive call.  Once we're here, we know that the item isn't in this
+	// node and that the child is big enough to remove from.
+	return child.remove(item, minItems, typ)
+}
+
+// growChildAndRemove grows child 'i' to make sure it's possible to remove an
+// item from it while keeping it at minItems, then calls remove to actually
+// remove it.
+//
+// Most documentation says we have to do two sets of special casing:
+//   1) item is in this node
+//   2) item is in child
+// In both cases, we need to handle the two subcases:
+//   A) node has enough values that it can spare one
+//   B) node doesn't have enough values
+// For the latter, we have to check:
+//   a) left sibling has node to spare
+//   b) right sibling has node to spare
+//   c) we must merge
+// To simplify our code here, we handle cases #1 and #2 the same:
+// If a node doesn't have enough items, we make sure it does (using a,b,c).
+// We then simply redo our remove call, and the second time (regardless of
+// whether we're in case 1 or 2), we'll have enough items and can guarantee
+// that we hit case A.
+func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove) Item {
+	if i > 0 && len(n.children[i-1].items) > minItems {
+		// Steal from left child
+		child := n.mutableChild(i)
+		stealFrom := n.mutableChild(i - 1)
+		stolenItem := stealFrom.items.pop()
+		// Rotate right: separator moves down into child, stolen item moves up.
+		child.items.insertAt(0, n.items[i-1])
+		n.items[i-1] = stolenItem
+		if len(stealFrom.children) > 0 {
+			child.children.insertAt(0, stealFrom.children.pop())
+		}
+	} else if i < len(n.items) && len(n.children[i+1].items) > minItems {
+		// steal from right child
+		child := n.mutableChild(i)
+		stealFrom := n.mutableChild(i + 1)
+		stolenItem := stealFrom.items.removeAt(0)
+		// Rotate left: separator moves down into child, stolen item moves up.
+		child.items = append(child.items, n.items[i])
+		n.items[i] = stolenItem
+		if len(stealFrom.children) > 0 {
+			child.children = append(child.children, stealFrom.children.removeAt(0))
+		}
+	} else {
+		if i >= len(n.items) {
+			i-- // no right sibling; merge the left sibling with child i instead
+		}
+		child := n.mutableChild(i)
+		// merge with right child
+		mergeItem := n.items.removeAt(i)
+		mergeChild := n.children.removeAt(i + 1)
+		child.items = append(child.items, mergeItem)
+		child.items = append(child.items, mergeChild.items...)
+		child.children = append(child.children, mergeChild.children...)
+		n.cow.freeNode(mergeChild)
+	}
+	return n.remove(item, minItems, typ)
+}
+
+// direction selects the traversal order for node.iterate.
+type direction int
+
+const (
+	descend = direction(-1)
+	ascend  = direction(+1)
+)
+
+// iterate provides a simple method for iterating over elements in the tree.
+//
+// When ascending, the 'start' should be less than 'stop' and when descending,
+// the 'start' should be greater than 'stop'. Setting 'includeStart' to true
+// will force the iterator to include the first item when it equals 'start',
+// thus creating a "greaterOrEqual" or "lessThanEqual" rather than just a
+// "greaterThan" or "lessThan" queries.
+//
+// The returned bools are (hit, ok): 'hit' records whether any item has been
+// visited so far (threaded through recursive calls), and ok==false means the
+// iteration was stopped early (by reaching 'stop' or by iter returning false).
+func (n *node) iterate(dir direction, start, stop Item, includeStart bool, hit bool, iter ItemIterator) (bool, bool) {
+	var ok, found bool
+	var index int
+	switch dir {
+	case ascend:
+		if start != nil {
+			index, _ = n.items.find(start)
+		}
+		for i := index; i < len(n.items); i++ {
+			if len(n.children) > 0 {
+				if hit, ok = n.children[i].iterate(dir, start, stop, includeStart, hit, iter); !ok {
+					return hit, false
+				}
+			}
+			if !includeStart && !hit && start != nil && !start.Less(n.items[i]) {
+				// Skip the item equal to 'start' for exclusive ranges.
+				hit = true
+				continue
+			}
+			hit = true
+			if stop != nil && !n.items[i].Less(stop) {
+				return hit, false
+			}
+			if !iter(n.items[i]) {
+				return hit, false
+			}
+		}
+		if len(n.children) > 0 {
+			if hit, ok = n.children[len(n.children)-1].iterate(dir, start, stop, includeStart, hit, iter); !ok {
+				return hit, false
+			}
+		}
+	case descend:
+		if start != nil {
+			index, found = n.items.find(start)
+			if !found {
+				// 'start' is not in this node: begin just below its
+				// insertion point.
+				index = index - 1
+			}
+		} else {
+			index = len(n.items) - 1
+		}
+		for i := index; i >= 0; i-- {
+			if start != nil && !n.items[i].Less(start) {
+				if !includeStart || hit || start.Less(n.items[i]) {
+					continue
+				}
+			}
+			if len(n.children) > 0 {
+				if hit, ok = n.children[i+1].iterate(dir, start, stop, includeStart, hit, iter); !ok {
+					return hit, false
+				}
+			}
+			if stop != nil && !stop.Less(n.items[i]) {
+				return hit, false // reached 'stop'; halt iteration
+			}
+			hit = true
+			if !iter(n.items[i]) {
+				return hit, false
+			}
+		}
+		if len(n.children) > 0 {
+			if hit, ok = n.children[0].iterate(dir, start, stop, includeStart, hit, iter); !ok {
+				return hit, false
+			}
+		}
+	}
+	return hit, true
+}
+
+// Used for testing/debugging purposes.
+// print writes an indented textual dump of the subtree rooted at n to w,
+// one line per node, indented two spaces per level.
+func (n *node) print(w io.Writer, level int) {
+	fmt.Fprintf(w, "%sNODE:%v\n", strings.Repeat("  ", level), n.items)
+	for _, c := range n.children {
+		c.print(w, level+1)
+	}
+}
+
+// BTree is an implementation of a B-Tree.
+//
+// BTree stores Item instances in an ordered structure, allowing easy insertion,
+// removal, and iteration.
+//
+// Write operations are not safe for concurrent mutation by multiple
+// goroutines, but Read operations are.
+type BTree struct {
+	degree int   // branching degree, as passed to New (see New's doc for item/children bounds)
+	length int   // number of items currently stored in the tree
+	root   *node // root node; nil for an empty tree
+	cow    *copyOnWriteContext
+}
+
+// copyOnWriteContext pointers determine node ownership... a tree with a write
+// context equivalent to a node's write context is allowed to modify that node.
+// A tree whose write context does not match a node's is not allowed to modify
+// it, and must create a new, writable copy (IE: it's a Clone).
+//
+// When doing any write operation, we maintain the invariant that the current
+// node's context is equal to the context of the tree that requested the write.
+// We do this by, before we descend into any node, creating a copy with the
+// correct context if the contexts don't match.
+//
+// Since the node we're currently visiting on any write has the requesting
+// tree's context, that node is modifiable in place.  Children of that node may
+// not share context, but before we descend into them, we'll make a mutable
+// copy.
+type copyOnWriteContext struct {
+	freelist *FreeList
+}
+
+// Clone clones the btree, lazily.  Clone should not be called concurrently,
+// but the original tree (t) and the new tree (t2) can be used concurrently
+// once the Clone call completes.
+//
+// The internal tree structure of b is marked read-only and shared between t and
+// t2.  Writes to both t and t2 use copy-on-write logic, creating new nodes
+// whenever one of b's original nodes would have been modified.  Read operations
+// should have no performance degredation.  Write operations for both t and t2
+// will initially experience minor slow-downs caused by additional allocs and
+// copies due to the aforementioned copy-on-write logic, but should converge to
+// the original performance characteristics of the original tree.
+func (t *BTree) Clone() (t2 *BTree) {
+	// Create two entirely new copy-on-write contexts.
+	// This operation effectively creates three trees:
+	//   the original, shared nodes (old b.cow)
+	//   the new b.cow nodes
+	//   the new out.cow nodes
+	cow1, cow2 := *t.cow, *t.cow
+	out := *t
+	t.cow = &cow1
+	out.cow = &cow2
+	return &out
+}
+
+// maxItems returns the max number of items to allow per node.
+func (t *BTree) maxItems() int {
+	return t.degree*2 - 1
+}
+
+// minItems returns the min number of items to allow per node (ignored for the
+// root node).
+func (t *BTree) minItems() int {
+	return t.degree - 1
+}
+
+func (c *copyOnWriteContext) newNode() (n *node) {
+	n = c.freelist.newNode()
+	n.cow = c
+	return
+}
+
+type freeType int
+
+const (
+	ftFreelistFull freeType = iota // node was freed (available for GC, not stored in freelist)
+	ftStored                       // node was stored in the freelist for later use
+	ftNotOwned                     // node was ignored by COW, since it's owned by another one
+)
+
+// freeNode frees a node within a given COW context, if it's owned by that
+// context.  It returns what happened to the node (see freeType const
+// documentation).
+func (c *copyOnWriteContext) freeNode(n *node) freeType {
+	if n.cow == c {
+		// clear to allow GC
+		n.items.truncate(0)
+		n.children.truncate(0)
+		n.cow = nil
+		if c.freelist.freeNode(n) {
+			return ftStored
+		} else {
+			return ftFreelistFull
+		}
+	} else {
+		return ftNotOwned
+	}
+}
+
+// ReplaceOrInsert adds the given item to the tree.  If an item in the tree
+// already equals the given one, it is removed from the tree and returned.
+// Otherwise, nil is returned.
+//
+// nil cannot be added to the tree (will panic).
+func (t *BTree) ReplaceOrInsert(item Item) Item {
+	if item == nil {
+		panic("nil item being added to BTree")
+	}
+	if t.root == nil {
+		t.root = t.cow.newNode()
+		t.root.items = append(t.root.items, item)
+		t.length++
+		return nil
+	} else {
+		t.root = t.root.mutableFor(t.cow)
+		if len(t.root.items) >= t.maxItems() {
+			item2, second := t.root.split(t.maxItems() / 2)
+			oldroot := t.root
+			t.root = t.cow.newNode()
+			t.root.items = append(t.root.items, item2)
+			t.root.children = append(t.root.children, oldroot, second)
+		}
+	}
+	out := t.root.insert(item, t.maxItems())
+	if out == nil {
+		t.length++
+	}
+	return out
+}
+
+// Delete removes an item equal to the passed in item from the tree, returning
+// it.  If no such item exists, returns nil.
+func (t *BTree) Delete(item Item) Item {
+	return t.deleteItem(item, removeItem)
+}
+
+// DeleteMin removes the smallest item in the tree and returns it.
+// If no such item exists, returns nil.
+func (t *BTree) DeleteMin() Item {
+	return t.deleteItem(nil, removeMin)
+}
+
+// DeleteMax removes the largest item in the tree and returns it.
+// If no such item exists, returns nil.
+func (t *BTree) DeleteMax() Item {
+	return t.deleteItem(nil, removeMax)
+}
+
+func (t *BTree) deleteItem(item Item, typ toRemove) Item {
+	if t.root == nil || len(t.root.items) == 0 {
+		return nil
+	}
+	t.root = t.root.mutableFor(t.cow)
+	out := t.root.remove(item, t.minItems(), typ)
+	if len(t.root.items) == 0 && len(t.root.children) > 0 {
+		oldroot := t.root
+		t.root = t.root.children[0]
+		t.cow.freeNode(oldroot)
+	}
+	if out != nil {
+		t.length--
+	}
+	return out
+}
+
+// AscendRange calls the iterator for every value in the tree within the range
+// [greaterOrEqual, lessThan), until iterator returns false.
+func (t *BTree) AscendRange(greaterOrEqual, lessThan Item, iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(ascend, greaterOrEqual, lessThan, true, false, iterator)
+}
+
+// AscendLessThan calls the iterator for every value in the tree within the range
+// [first, pivot), until iterator returns false.
+func (t *BTree) AscendLessThan(pivot Item, iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(ascend, nil, pivot, false, false, iterator)
+}
+
+// AscendGreaterOrEqual calls the iterator for every value in the tree within
+// the range [pivot, last], until iterator returns false.
+func (t *BTree) AscendGreaterOrEqual(pivot Item, iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(ascend, pivot, nil, true, false, iterator)
+}
+
+// Ascend calls the iterator for every value in the tree within the range
+// [first, last], until iterator returns false.
+func (t *BTree) Ascend(iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(ascend, nil, nil, false, false, iterator)
+}
+
+// DescendRange calls the iterator for every value in the tree within the range
+// [lessOrEqual, greaterThan), until iterator returns false.
+func (t *BTree) DescendRange(lessOrEqual, greaterThan Item, iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(descend, lessOrEqual, greaterThan, true, false, iterator)
+}
+
+// DescendLessOrEqual calls the iterator for every value in the tree within the range
+// [pivot, first], until iterator returns false.
+func (t *BTree) DescendLessOrEqual(pivot Item, iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(descend, pivot, nil, true, false, iterator)
+}
+
+// DescendGreaterThan calls the iterator for every value in the tree within
+// the range [last, pivot), until iterator returns false.
+func (t *BTree) DescendGreaterThan(pivot Item, iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(descend, nil, pivot, false, false, iterator)
+}
+
+// Descend calls the iterator for every value in the tree within the range
+// [last, first], until iterator returns false.
+func (t *BTree) Descend(iterator ItemIterator) {
+	if t.root == nil {
+		return
+	}
+	t.root.iterate(descend, nil, nil, false, false, iterator)
+}
+
+// Get looks for the key item in the tree, returning it.  It returns nil if
+// unable to find that item.
+func (t *BTree) Get(key Item) Item {
+	if t.root == nil {
+		return nil
+	}
+	return t.root.get(key)
+}
+
+// Min returns the smallest item in the tree, or nil if the tree is empty.
+func (t *BTree) Min() Item {
+	return min(t.root)
+}
+
+// Max returns the largest item in the tree, or nil if the tree is empty.
+func (t *BTree) Max() Item {
+	return max(t.root)
+}
+
+// Has returns true if the given key is in the tree.
+func (t *BTree) Has(key Item) bool {
+	return t.Get(key) != nil
+}
+
+// Len returns the number of items currently in the tree.
+func (t *BTree) Len() int {
+	return t.length
+}
+
+// Clear removes all items from the btree.  If addNodesToFreelist is true,
+// t's nodes are added to its freelist as part of this call, until the freelist
+// is full.  Otherwise, the root node is simply dereferenced and the subtree
+// left to Go's normal GC processes.
+//
+// This can be much faster
+// than calling Delete on all elements, because that requires finding/removing
+// each element in the tree and updating the tree accordingly.  It also is
+// somewhat faster than creating a new tree to replace the old one, because
+// nodes from the old tree are reclaimed into the freelist for use by the new
+// one, instead of being lost to the garbage collector.
+//
+// This call takes:
+//   O(1): when addNodesToFreelist is false, this is a single operation.
+//   O(1): when the freelist is already full, it breaks out immediately
+//   O(freelist size):  when the freelist is empty and the nodes are all owned
+//       by this tree, nodes are added to the freelist until full.
+//   O(tree size):  when all nodes are owned by another tree, all nodes are
+//       iterated over looking for nodes to add to the freelist, and due to
+//       ownership, none are.
+func (t *BTree) Clear(addNodesToFreelist bool) {
+	if t.root != nil && addNodesToFreelist {
+		t.root.reset(t.cow)
+	}
+	t.root, t.length = nil, 0
+}
+
+// reset returns a subtree to the freelist.  It breaks out immediately if the
+// freelist is full, since the only benefit of iterating is to fill that
+// freelist up.  Returns true if parent reset call should continue.
+func (n *node) reset(c *copyOnWriteContext) bool {
+	for _, child := range n.children {
+		if !child.reset(c) {
+			return false
+		}
+	}
+	return c.freeNode(n) != ftFreelistFull
+}
+
+// Int implements the Item interface for integers.
+type Int int
+
+// Less returns true if int(a) < int(b).
+func (a Int) Less(b Item) bool {
+	return a < b.(Int)
+}

+ 17 - 0
vendor/github.com/google/btree/go.mod

@@ -0,0 +1,17 @@
+// Copyright 2014 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module github.com/google/btree
+
+go 1.12

+ 2 - 4
vendor/github.com/hashicorp/memberlist/README.md

@@ -1,4 +1,4 @@
-# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist)
+# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) [![CircleCI](https://circleci.com/gh/hashicorp/memberlist.svg?style=svg)](https://circleci.com/gh/hashicorp/memberlist)
 
 memberlist is a [Go](http://www.golang.org) library that manages cluster
 membership and member failure detection using a gossip based protocol.
@@ -23,8 +23,6 @@ Please check your installation with:
 go version
 ```
 
-Run `make deps` to fetch dependencies before building
-
 ## Usage
 
 Memberlist is surprisingly simple to use. An example is shown below:
@@ -65,7 +63,7 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c
 
 ## Protocol
 
-memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf). However, we extend the protocol in a number of ways:
+memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, we extend the protocol in a number of ways:
 
 * Several extensions are made to increase propagation speed and
 convergence rate.

+ 3 - 3
vendor/github.com/hashicorp/memberlist/alive_delegate.go

@@ -7,8 +7,8 @@ package memberlist
 // a node out and prevent it from being considered a peer
 // using application specific logic.
 type AliveDelegate interface {
-	// NotifyMerge is invoked when a merge could take place.
-	// Provides a list of the nodes known by the peer. If
-	// the return value is non-nil, the merge is canceled.
+	// NotifyAlive is invoked when a message about a live
+	// node is received from the network.  Returning a non-nil
+	// error prevents the node from being considered a peer.
 	NotifyAlive(peer *Node) error
 }

+ 5 - 0
vendor/github.com/hashicorp/memberlist/broadcast.go

@@ -29,6 +29,11 @@ func (b *memberlistBroadcast) Invalidates(other Broadcast) bool {
 	return b.node == mb.node
 }
 
+// memberlist.NamedBroadcast optional interface
+func (b *memberlistBroadcast) Name() string {
+	return b.node
+}
+
 func (b *memberlistBroadcast) Message() []byte {
 	return b.msg
 }

+ 66 - 0
vendor/github.com/hashicorp/memberlist/config.go

@@ -1,10 +1,15 @@
 package memberlist
 
 import (
+	"fmt"
 	"io"
 	"log"
+	"net"
 	"os"
+	"strings"
 	"time"
+
+	multierror "github.com/hashicorp/go-multierror"
 )
 
 type Config struct {
@@ -116,6 +121,10 @@ type Config struct {
 	// indirect UDP pings.
 	DisableTcpPings bool
 
+	// DisableTcpPingsForNode is like DisableTcpPings, but lets you control
+	// whether to perform TCP pings on a node-by-node basis.
+	DisableTcpPingsForNode func(nodeName string) bool
+
 	// AwarenessMaxMultiplier will increase the probe interval if the node
 	// becomes aware that it might be degraded and not meeting the soft real
 	// time requirements to reliably probe other nodes.
@@ -215,6 +224,44 @@ type Config struct {
 	// This is a legacy name for backward compatibility but should really be
 	// called PacketBufferSize now that we have generalized the transport.
 	UDPBufferSize int
+
+	// DeadNodeReclaimTime controls the time before a dead node's name can be
+	// reclaimed by one with a different address or port. By default, this is 0,
+	// meaning nodes cannot be reclaimed this way.
+	DeadNodeReclaimTime time.Duration
+
+	// RequireNodeNames controls if the name of a node is required when sending
+	// a message to that node.
+	RequireNodeNames bool
+	// CIDRsAllowed If nil, allow any connection (default), otherwise specify all networks
+	// allowed to connect (you must specify IPv6/IPv4 separately)
+	// Using [] will block all connections.
+	CIDRsAllowed []net.IPNet
+}
+
+// ParseCIDRs return a possible empty list of all Network that have been parsed
+// In case of error, it returns succesfully parsed CIDRs and the last error found
+func ParseCIDRs(v []string) ([]net.IPNet, error) {
+	nets := make([]net.IPNet, 0)
+	if v == nil {
+		return nets, nil
+	}
+	var errs error
+	hasErrors := false
+	for _, p := range v {
+		_, net, err := net.ParseCIDR(strings.TrimSpace(p))
+		if err != nil {
+			err = fmt.Errorf("invalid cidr: %s", p)
+			errs = multierror.Append(errs, err)
+			hasErrors = true
+		} else {
+			nets = append(nets, *net)
+		}
+	}
+	if !hasErrors {
+		errs = nil
+	}
+	return nets, errs
 }
 
 // DefaultLANConfig returns a sane set of configurations for Memberlist.
@@ -258,6 +305,7 @@ func DefaultLANConfig() *Config {
 
 		HandoffQueueDepth: 1024,
 		UDPBufferSize:     1400,
+		CIDRsAllowed:      nil, // same as allow all
 	}
 }
 
@@ -277,6 +325,24 @@ func DefaultWANConfig() *Config {
 	return conf
 }
 
+// IPMustBeChecked return true if IPAllowed must be called
+func (c *Config) IPMustBeChecked() bool {
+	return len(c.CIDRsAllowed) > 0
+}
+
+// IPAllowed return an error if access to memberlist is denied
+func (c *Config) IPAllowed(ip net.IP) error {
+	if !c.IPMustBeChecked() {
+		return nil
+	}
+	for _, n := range c.CIDRsAllowed {
+		if n.Contains(ip) {
+			return nil
+		}
+	}
+	return fmt.Errorf("%s is not allowed", ip)
+}
+
 // DefaultLocalConfig works like DefaultConfig, however it returns a configuration
 // that is optimized for a local loopback environments. The default configuration is
 // still very conservative and errs on the side of caution.

+ 6 - 3
vendor/github.com/hashicorp/memberlist/event_delegate.go

@@ -49,13 +49,16 @@ type NodeEvent struct {
 }
 
 func (c *ChannelEventDelegate) NotifyJoin(n *Node) {
-	c.Ch <- NodeEvent{NodeJoin, n}
+	node := *n
+	c.Ch <- NodeEvent{NodeJoin, &node}
 }
 
 func (c *ChannelEventDelegate) NotifyLeave(n *Node) {
-	c.Ch <- NodeEvent{NodeLeave, n}
+	node := *n
+	c.Ch <- NodeEvent{NodeLeave, &node}
 }
 
 func (c *ChannelEventDelegate) NotifyUpdate(n *Node) {
-	c.Ch <- NodeEvent{NodeUpdate, n}
+	node := *n
+	c.Ch <- NodeEvent{NodeUpdate, &node}
 }

+ 18 - 0
vendor/github.com/hashicorp/memberlist/go.mod

@@ -0,0 +1,18 @@
+module github.com/hashicorp/memberlist
+
+go 1.12
+
+require (
+	github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
+	github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
+	github.com/hashicorp/go-msgpack v0.5.3
+	github.com/hashicorp/go-multierror v1.0.0
+	github.com/hashicorp/go-sockaddr v1.0.0
+	github.com/miekg/dns v1.1.26
+	github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529
+	github.com/stretchr/testify v1.2.2
+)

+ 8 - 0
vendor/github.com/hashicorp/memberlist/logging.go

@@ -13,6 +13,14 @@ func LogAddress(addr net.Addr) string {
 	return fmt.Sprintf("from=%s", addr.String())
 }
 
+func LogStringAddress(addr string) string {
+	if addr == "" {
+		return "from=<unknown address>"
+	}
+
+	return fmt.Sprintf("from=%s", addr)
+}
+
 func LogConn(conn net.Conn) string {
 	if conn == nil {
 		return LogAddress(nil)

+ 150 - 48
vendor/github.com/hashicorp/memberlist/memberlist.go

@@ -15,6 +15,8 @@ multiple routes.
 package memberlist
 
 import (
+	"container/list"
+	"errors"
 	"fmt"
 	"log"
 	"net"
@@ -30,10 +32,17 @@ import (
 	"github.com/miekg/dns"
 )
 
+var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided")
+
 type Memberlist struct {
 	sequenceNum uint32 // Local sequence number
 	incarnation uint32 // Local incarnation number
 	numNodes    uint32 // Number of known nodes (estimate)
+	pushPullReq uint32 // Number of push/pull requests
+
+	advertiseLock sync.RWMutex
+	advertiseAddr net.IP
+	advertisePort uint16
 
 	config         *Config
 	shutdown       int32 // Used as an atomic boolean value
@@ -44,13 +53,17 @@ type Memberlist struct {
 	shutdownLock sync.Mutex // Serializes calls to Shutdown
 	leaveLock    sync.Mutex // Serializes calls to Leave
 
-	transport Transport
-	handoff   chan msgHandoff
+	transport NodeAwareTransport
+
+	handoffCh            chan struct{}
+	highPriorityMsgQueue *list.List
+	lowPriorityMsgQueue  *list.List
+	msgQueueLock         sync.Mutex
 
 	nodeLock   sync.RWMutex
 	nodes      []*nodeState          // Known nodes
-	nodeMap    map[string]*nodeState // Maps Addr.String() -> NodeState
-	nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
+	nodeMap    map[string]*nodeState // Maps Node.Name -> NodeState
+	nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer
 	awareness  *awareness
 
 	tickerLock sync.Mutex
@@ -66,6 +79,15 @@ type Memberlist struct {
 	logger *log.Logger
 }
 
+// BuildVsnArray creates the array of Vsn
+func (conf *Config) BuildVsnArray() []uint8 {
+	return []uint8{
+		ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion,
+		conf.DelegateProtocolMin, conf.DelegateProtocolMax,
+		conf.DelegateProtocolVersion,
+	}
+}
+
 // newMemberlist creates the network listeners.
 // Does not schedule execution of background maintenance.
 func newMemberlist(conf *Config) (*Memberlist, error) {
@@ -159,22 +181,38 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
 		transport = nt
 	}
 
+	nodeAwareTransport, ok := transport.(NodeAwareTransport)
+	if !ok {
+		logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired")
+		nodeAwareTransport = &shimNodeAwareTransport{transport}
+	}
+
 	m := &Memberlist{
-		config:         conf,
-		shutdownCh:     make(chan struct{}),
-		leaveBroadcast: make(chan struct{}, 1),
-		transport:      transport,
-		handoff:        make(chan msgHandoff, conf.HandoffQueueDepth),
-		nodeMap:        make(map[string]*nodeState),
-		nodeTimers:     make(map[string]*suspicion),
-		awareness:      newAwareness(conf.AwarenessMaxMultiplier),
-		ackHandlers:    make(map[uint32]*ackHandler),
-		broadcasts:     &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
-		logger:         logger,
+		config:               conf,
+		shutdownCh:           make(chan struct{}),
+		leaveBroadcast:       make(chan struct{}, 1),
+		transport:            nodeAwareTransport,
+		handoffCh:            make(chan struct{}, 1),
+		highPriorityMsgQueue: list.New(),
+		lowPriorityMsgQueue:  list.New(),
+		nodeMap:              make(map[string]*nodeState),
+		nodeTimers:           make(map[string]*suspicion),
+		awareness:            newAwareness(conf.AwarenessMaxMultiplier),
+		ackHandlers:          make(map[uint32]*ackHandler),
+		broadcasts:           &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
+		logger:               logger,
 	}
 	m.broadcasts.NumNodes = func() int {
 		return m.estNumNodes()
 	}
+
+	// Get the final advertise address from the transport, which may need
+	// to see which address we bound to. We'll refresh this each time we
+	// send out an alive message.
+	if _, _, err := m.refreshAdvertise(); err != nil {
+		return nil, err
+	}
+
 	go m.streamListen()
 	go m.packetListen()
 	go m.packetHandler()
@@ -222,7 +260,8 @@ func (m *Memberlist) Join(existing []string) (int, error) {
 
 		for _, addr := range addrs {
 			hp := joinHostPort(addr.ip.String(), addr.port)
-			if err := m.pushPullNode(hp, true); err != nil {
+			a := Address{Addr: hp, Name: addr.nodeName}
+			if err := m.pushPullNode(a, true); err != nil {
 				err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
 				errs = multierror.Append(errs, err)
 				m.logger.Printf("[DEBUG] memberlist: %v", err)
@@ -240,8 +279,9 @@ func (m *Memberlist) Join(existing []string) (int, error) {
 
 // ipPort holds information about a node we want to try to join.
 type ipPort struct {
-	ip   net.IP
-	port uint16
+	ip       net.IP
+	port     uint16
+	nodeName string // optional
 }
 
 // tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
@@ -250,7 +290,7 @@ type ipPort struct {
 // Consul's. By doing the TCP lookup directly, we get the best chance for the
 // largest list of hosts to join. Since joins are relatively rare events, it's ok
 // to do this rather expensive operation.
-func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) {
+func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) {
 	// Don't attempt any TCP lookups against non-fully qualified domain
 	// names, since those will likely come from the resolv.conf file.
 	if !strings.Contains(host, ".") {
@@ -292,9 +332,9 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err
 		for _, r := range in.Answer {
 			switch rr := r.(type) {
 			case (*dns.A):
-				ips = append(ips, ipPort{rr.A, defaultPort})
+				ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName})
 			case (*dns.AAAA):
-				ips = append(ips, ipPort{rr.AAAA, defaultPort})
+				ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName})
 			case (*dns.CNAME):
 				m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
 			}
@@ -308,6 +348,16 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err
 // resolveAddr is used to resolve the address into an address,
 // port, and error. If no port is given, use the default
 func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
+	// First peel off any leading node name. This is optional.
+	nodeName := ""
+	if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 {
+		if slashIdx == 0 {
+			return nil, fmt.Errorf("empty node name provided")
+		}
+		nodeName = hostStr[0:slashIdx]
+		hostStr = hostStr[slashIdx+1:]
+	}
+
 	// This captures the supplied port, or the default one.
 	hostStr = ensurePort(hostStr, m.config.BindPort)
 	host, sport, err := net.SplitHostPort(hostStr)
@@ -324,13 +374,15 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
 	// will make sure the host part is in good shape for parsing, even for
 	// IPv6 addresses.
 	if ip := net.ParseIP(host); ip != nil {
-		return []ipPort{ipPort{ip, port}}, nil
+		return []ipPort{
+			ipPort{ip: ip, port: port, nodeName: nodeName},
+		}, nil
 	}
 
 	// First try TCP so we have the best chance for the largest list of
 	// hosts to join. If this fails it's not fatal since this isn't a standard
 	// way to query DNS, and we have a fallback below.
-	ips, err := m.tcpLookupIP(host, port)
+	ips, err := m.tcpLookupIP(host, port, nodeName)
 	if err != nil {
 		m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
 	}
@@ -347,7 +399,7 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
 	}
 	ips = make([]ipPort, 0, len(ans))
 	for _, ip := range ans {
-		ips = append(ips, ipPort{ip, port})
+		ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName})
 	}
 	return ips, nil
 }
@@ -358,10 +410,9 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
 func (m *Memberlist) setAlive() error {
 	// Get the final advertise address from the transport, which may need
 	// to see which address we bound to.
-	addr, port, err := m.transport.FinalAdvertiseAddr(
-		m.config.AdvertiseAddr, m.config.AdvertisePort)
+	addr, port, err := m.refreshAdvertise()
 	if err != nil {
-		return fmt.Errorf("Failed to get final advertise address: %v", err)
+		return err
 	}
 
 	// Check if this is a public address without encryption
@@ -394,16 +445,36 @@ func (m *Memberlist) setAlive() error {
 		Addr:        addr,
 		Port:        uint16(port),
 		Meta:        meta,
-		Vsn: []uint8{
-			ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
-			m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
-			m.config.DelegateProtocolVersion,
-		},
+		Vsn:         m.config.BuildVsnArray(),
 	}
 	m.aliveNode(&a, nil, true)
+
 	return nil
 }
 
+func (m *Memberlist) getAdvertise() (net.IP, uint16) {
+	m.advertiseLock.RLock()
+	defer m.advertiseLock.RUnlock()
+	return m.advertiseAddr, m.advertisePort
+}
+
+func (m *Memberlist) setAdvertise(addr net.IP, port int) {
+	m.advertiseLock.Lock()
+	defer m.advertiseLock.Unlock()
+	m.advertiseAddr = addr
+	m.advertisePort = uint16(port)
+}
+
+func (m *Memberlist) refreshAdvertise() (net.IP, int, error) {
+	addr, port, err := m.transport.FinalAdvertiseAddr(
+		m.config.AdvertiseAddr, m.config.AdvertisePort)
+	if err != nil {
+		return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err)
+	}
+	m.setAdvertise(addr, port)
+	return addr, port, nil
+}
+
 // LocalNode is used to return the local Node
 func (m *Memberlist) LocalNode() *Node {
 	m.nodeLock.RLock()
@@ -439,11 +510,7 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
 		Addr:        state.Addr,
 		Port:        state.Port,
 		Meta:        meta,
-		Vsn: []uint8{
-			ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
-			m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
-			m.config.DelegateProtocolVersion,
-		},
+		Vsn:         m.config.BuildVsnArray(),
 	}
 	notifyCh := make(chan struct{})
 	m.aliveNode(&a, notifyCh, true)
@@ -463,24 +530,29 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
 	return nil
 }
 
-// SendTo is deprecated in favor of SendBestEffort, which requires a node to
-// target.
+// Deprecated: SendTo is deprecated in favor of SendBestEffort, which requires a node to
+// target. If you don't have a node then use SendToAddress.
 func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
+	a := Address{Addr: to.String(), Name: ""}
+	return m.SendToAddress(a, msg)
+}
+
+func (m *Memberlist) SendToAddress(a Address, msg []byte) error {
 	// Encode as a user message
 	buf := make([]byte, 1, len(msg)+1)
 	buf[0] = byte(userMsg)
 	buf = append(buf, msg...)
 
 	// Send the message
-	return m.rawSendMsgPacket(to.String(), nil, buf)
+	return m.rawSendMsgPacket(a, nil, buf)
 }
 
-// SendToUDP is deprecated in favor of SendBestEffort.
+// Deprecated: SendToUDP is deprecated in favor of SendBestEffort.
 func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
 	return m.SendBestEffort(to, msg)
 }
 
-// SendToTCP is deprecated in favor of SendReliable.
+// Deprecated: SendToTCP is deprecated in favor of SendReliable.
 func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
 	return m.SendReliable(to, msg)
 }
@@ -496,7 +568,8 @@ func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
 	buf = append(buf, msg...)
 
 	// Send the message
-	return m.rawSendMsgPacket(to.Address(), to, buf)
+	a := Address{Addr: to.Address(), Name: to.Name}
+	return m.rawSendMsgPacket(a, to, buf)
 }
 
 // SendReliable uses the reliable stream-oriented interface of the transport to
@@ -504,7 +577,7 @@ func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
 // mechanism). Delivery is guaranteed if no error is returned, and there is no
 // limit on the size of the message.
 func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
-	return m.sendUserMsg(to.Address(), msg)
+	return m.sendUserMsg(to.FullAddress(), msg)
 }
 
 // Members returns a list of all known live nodes. The node structures
@@ -516,7 +589,7 @@ func (m *Memberlist) Members() []*Node {
 
 	nodes := make([]*Node, 0, len(m.nodes))
 	for _, n := range m.nodes {
-		if n.State != stateDead {
+		if !n.DeadOrLeft() {
 			nodes = append(nodes, &n.Node)
 		}
 	}
@@ -533,7 +606,7 @@ func (m *Memberlist) NumMembers() (alive int) {
 	defer m.nodeLock.RUnlock()
 
 	for _, n := range m.nodes {
-		if n.State != stateDead {
+		if !n.DeadOrLeft() {
 			alive++
 		}
 	}
@@ -570,9 +643,14 @@ func (m *Memberlist) Leave(timeout time.Duration) error {
 			return nil
 		}
 
+		// This dead message is special, because Node and From are the
+		// same. This helps other nodes figure out that a node left
+		// intentionally. When Node equals From, other nodes know for
+		// sure this node is gone.
 		d := dead{
 			Incarnation: state.Incarnation,
 			Node:        state.Name,
+			From:        state.Name,
 		}
 		m.deadNode(&d)
 
@@ -598,7 +676,7 @@ func (m *Memberlist) anyAlive() bool {
 	m.nodeLock.RLock()
 	defer m.nodeLock.RUnlock()
 	for _, n := range m.nodes {
-		if n.State != stateDead && n.Name != m.config.Name {
+		if !n.DeadOrLeft() && n.Name != m.config.Name {
 			return true
 		}
 	}
@@ -621,7 +699,7 @@ func (m *Memberlist) ProtocolVersion() uint8 {
 	return m.config.ProtocolVersion
 }
 
-// Shutdown will stop any background maintanence of network activity
+// Shutdown will stop any background maintenance of network activity
 // for this memberlist, causing it to appear "dead". A leave message
 // will not be broadcasted prior, so the cluster being left will have
 // to detect this node's shutdown using probing. If you wish to more
@@ -657,3 +735,27 @@ func (m *Memberlist) hasShutdown() bool {
 func (m *Memberlist) hasLeft() bool {
 	return atomic.LoadInt32(&m.leave) == 1
 }
+
+func (m *Memberlist) getNodeState(addr string) NodeStateType {
+	m.nodeLock.RLock()
+	defer m.nodeLock.RUnlock()
+
+	n := m.nodeMap[addr]
+	return n.State
+}
+
+func (m *Memberlist) getNodeStateChange(addr string) time.Time {
+	m.nodeLock.RLock()
+	defer m.nodeLock.RUnlock()
+
+	n := m.nodeMap[addr]
+	return n.StateChange
+}
+
+func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
+	m.nodeLock.Lock()
+	defer m.nodeLock.Unlock()
+
+	n := m.nodeMap[addr]
+	f(n)
+}

+ 87 - 13
vendor/github.com/hashicorp/memberlist/mock_transport.go

@@ -1,7 +1,9 @@
 package memberlist
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"net"
 	"strconv"
 	"time"
@@ -10,26 +12,33 @@ import (
 // MockNetwork is used as a factory that produces MockTransport instances which
 // are uniquely addressed and wired up to talk to each other.
 type MockNetwork struct {
-	transports map[string]*MockTransport
-	port       int
+	transportsByAddr map[string]*MockTransport
+	transportsByName map[string]*MockTransport
+	port             int
 }
 
 // NewTransport returns a new MockTransport with a unique address, wired up to
 // talk to the other transports in the MockNetwork.
-func (n *MockNetwork) NewTransport() *MockTransport {
+func (n *MockNetwork) NewTransport(name string) *MockTransport {
 	n.port += 1
 	addr := fmt.Sprintf("127.0.0.1:%d", n.port)
 	transport := &MockTransport{
 		net:      n,
-		addr:     &MockAddress{addr},
+		addr:     &MockAddress{addr, name},
 		packetCh: make(chan *Packet),
 		streamCh: make(chan net.Conn),
 	}
 
-	if n.transports == nil {
-		n.transports = make(map[string]*MockTransport)
+	if n.transportsByAddr == nil {
+		n.transportsByAddr = make(map[string]*MockTransport)
 	}
-	n.transports[addr] = transport
+	n.transportsByAddr[addr] = transport
+
+	if n.transportsByName == nil {
+		n.transportsByName = make(map[string]*MockTransport)
+	}
+	n.transportsByName[name] = transport
+
 	return transport
 }
 
@@ -37,6 +46,7 @@ func (n *MockNetwork) NewTransport() *MockTransport {
 // address scheme.
 type MockAddress struct {
 	addr string
+	name string
 }
 
 // See net.Addr.
@@ -57,6 +67,8 @@ type MockTransport struct {
 	streamCh chan net.Conn
 }
 
+var _ NodeAwareTransport = (*MockTransport)(nil)
+
 // See Transport.
 func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) {
 	host, portStr, err := net.SplitHostPort(t.addr.String())
@@ -79,9 +91,15 @@ func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) {
 
 // See Transport.
 func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) {
-	dest, ok := t.net.transports[addr]
-	if !ok {
-		return time.Time{}, fmt.Errorf("No route to %q", addr)
+	a := Address{Addr: addr, Name: ""}
+	return t.WriteToAddress(b, a)
+}
+
+// See NodeAwareTransport.
+func (t *MockTransport) WriteToAddress(b []byte, a Address) (time.Time, error) {
+	dest, err := t.getPeer(a)
+	if err != nil {
+		return time.Time{}, err
 	}
 
 	now := time.Now()
@@ -98,11 +116,45 @@ func (t *MockTransport) PacketCh() <-chan *Packet {
 	return t.packetCh
 }
 
+// See NodeAwareTransport.
+func (t *MockTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error {
+	if shouldClose {
+		defer conn.Close()
+	}
+
+	// Copy everything from the stream into packet buffer.
+	var buf bytes.Buffer
+	if _, err := io.Copy(&buf, conn); err != nil {
+		return fmt.Errorf("failed to read packet: %v", err)
+	}
+
+	// Check the length - it needs to have at least one byte to be a proper
+	// message. This is checked elsewhere for writes coming in directly from
+	// the UDP socket.
+	if n := buf.Len(); n < 1 {
+		return fmt.Errorf("packet too short (%d bytes) %s", n, LogAddress(addr))
+	}
+
+	// Inject the packet.
+	t.packetCh <- &Packet{
+		Buf:       buf.Bytes(),
+		From:      addr,
+		Timestamp: now,
+	}
+	return nil
+}
+
 // See Transport.
 func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
-	dest, ok := t.net.transports[addr]
-	if !ok {
-		return nil, fmt.Errorf("No route to %q", addr)
+	a := Address{Addr: addr, Name: ""}
+	return t.DialAddressTimeout(a, timeout)
+}
+
+// See NodeAwareTransport.
+func (t *MockTransport) DialAddressTimeout(a Address, timeout time.Duration) (net.Conn, error) {
+	dest, err := t.getPeer(a)
+	if err != nil {
+		return nil, err
 	}
 
 	p1, p2 := net.Pipe()
@@ -115,7 +167,29 @@ func (t *MockTransport) StreamCh() <-chan net.Conn {
 	return t.streamCh
 }
 
+// See NodeAwareTransport.
+func (t *MockTransport) IngestStream(conn net.Conn) error {
+	t.streamCh <- conn
+	return nil
+}
+
 // See Transport.
 func (t *MockTransport) Shutdown() error {
 	return nil
 }
+
+func (t *MockTransport) getPeer(a Address) (*MockTransport, error) {
+	var (
+		dest *MockTransport
+		ok   bool
+	)
+	if a.Name != "" {
+		dest, ok = t.net.transportsByName[a.Name]
+	} else {
+		dest, ok = t.net.transportsByAddr[a.Addr]
+	}
+	if !ok {
+		return nil, fmt.Errorf("No route to %s", a)
+	}
+	return dest, nil
+}

+ 224 - 63
vendor/github.com/hashicorp/memberlist/net.go

@@ -8,9 +8,10 @@ import (
 	"hash/crc32"
 	"io"
 	"net"
+	"sync/atomic"
 	"time"
 
-	"github.com/armon/go-metrics"
+	metrics "github.com/armon/go-metrics"
 	"github.com/hashicorp/go-msgpack/codec"
 )
 
@@ -71,7 +72,8 @@ const (
 	compoundOverhead       = 2   // Assumed overhead per entry in compoundHeader
 	userMsgOverhead        = 1
 	blockingWarning        = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
-	maxPushStateBytes      = 10 * 1024 * 1024
+	maxPushStateBytes      = 20 * 1024 * 1024
+	maxPushPullRequests    = 128 // Maximum number of concurrent push/pull requests
 )
 
 // ping request sent directly to node
@@ -82,15 +84,28 @@ type ping struct {
 	// the intended recipient. This is to protect against an agent
 	// restart with a new name.
 	Node string
+
+	SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply
+	SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply
+	SourceNode string `codec:",omitempty"` // Source name, used for a direct reply
 }
 
-// indirect ping sent to an indirect ndoe
+// indirect ping sent to an indirect node
 type indirectPingReq struct {
 	SeqNo  uint32
 	Target []byte
 	Port   uint16
-	Node   string
-	Nack   bool // true if we'd like a nack back
+
+	// Node is sent so the target can verify they are
+	// the intended recipient. This is to protect against an agent
+	// restart with a new name.
+	Node string
+
+	Nack bool // true if we'd like a nack back
+
+	SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply
+	SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply
+	SourceNode string `codec:",omitempty"` // Source name, used for a direct reply
 }
 
 // ack response is sent for a ping
@@ -161,7 +176,7 @@ type pushNodeState struct {
 	Port        uint16
 	Meta        []byte
 	Incarnation uint32
-	State       nodeStateType
+	State       NodeStateType
 	Vsn         []uint8 // Protocol versions
 }
 
@@ -205,9 +220,9 @@ func (m *Memberlist) streamListen() {
 
 // handleConn handles a single incoming stream connection from the transport.
 func (m *Memberlist) handleConn(conn net.Conn) {
+	defer conn.Close()
 	m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))
 
-	defer conn.Close()
 	metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
 
 	conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
@@ -238,6 +253,16 @@ func (m *Memberlist) handleConn(conn net.Conn) {
 			m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
 		}
 	case pushPullMsg:
+		// Increment counter of pending push/pulls
+		numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
+		defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))
+
+		// Check if we have too many open push/pull requests
+		if numConcurrent >= maxPushPullRequests {
+			m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
+			return
+		}
+
 		join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
 		if err != nil {
 			m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
@@ -330,6 +355,10 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
 }
 
 func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
+	if len(buf) < 1 {
+		m.logger.Printf("[ERR] memberlist: missing message type byte %s", LogAddress(from))
+		return
+	}
 	// Decode the message type
 	msgType := messageType(buf[0])
 	buf = buf[1:]
@@ -357,10 +386,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
 	case deadMsg:
 		fallthrough
 	case userMsg:
+		// Determine the message queue, prioritize alive
+		queue := m.lowPriorityMsgQueue
+		if msgType == aliveMsg {
+			queue = m.highPriorityMsgQueue
+		}
+
+		// Check for overflow and append if not full
+		m.msgQueueLock.Lock()
+		if queue.Len() >= m.config.HandoffQueueDepth {
+			m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
+		} else {
+			queue.PushBack(msgHandoff{msgType, buf, from})
+		}
+		m.msgQueueLock.Unlock()
+
+		// Notify of pending message
 		select {
-		case m.handoff <- msgHandoff{msgType, buf, from}:
+		case m.handoffCh <- struct{}{}:
 		default:
-			m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
 		}
 
 	default:
@@ -368,28 +412,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
 	}
 }
 
+// getNextMessage returns the next message to process in priority order, using LIFO
+func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
+	m.msgQueueLock.Lock()
+	defer m.msgQueueLock.Unlock()
+
+	if el := m.highPriorityMsgQueue.Back(); el != nil {
+		m.highPriorityMsgQueue.Remove(el)
+		msg := el.Value.(msgHandoff)
+		return msg, true
+	} else if el := m.lowPriorityMsgQueue.Back(); el != nil {
+		m.lowPriorityMsgQueue.Remove(el)
+		msg := el.Value.(msgHandoff)
+		return msg, true
+	}
+	return msgHandoff{}, false
+}
+
 // packetHandler is a long running goroutine that processes messages received
 // over the packet interface, but is decoupled from the listener to avoid
 // blocking the listener which may cause ping/ack messages to be delayed.
 func (m *Memberlist) packetHandler() {
 	for {
 		select {
-		case msg := <-m.handoff:
-			msgType := msg.msgType
-			buf := msg.buf
-			from := msg.from
-
-			switch msgType {
-			case suspectMsg:
-				m.handleSuspect(buf, from)
-			case aliveMsg:
-				m.handleAlive(buf, from)
-			case deadMsg:
-				m.handleDead(buf, from)
-			case userMsg:
-				m.handleUser(buf, from)
-			default:
-				m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
+		case <-m.handoffCh:
+			for {
+				msg, ok := m.getNextMessage()
+				if !ok {
+					break
+				}
+				msgType := msg.msgType
+				buf := msg.buf
+				from := msg.from
+
+				switch msgType {
+				case suspectMsg:
+					m.handleSuspect(buf, from)
+				case aliveMsg:
+					m.handleAlive(buf, from)
+				case deadMsg:
+					m.handleDead(buf, from)
+				case userMsg:
+					m.handleUser(buf, from)
+				default:
+					m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
+				}
 			}
 
 		case <-m.shutdownCh:
@@ -433,7 +500,19 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
 	if m.config.Ping != nil {
 		ack.Payload = m.config.Ping.AckPayload()
 	}
-	if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
+
+	addr := ""
+	if len(p.SourceAddr) > 0 && p.SourcePort > 0 {
+		addr = joinHostPort(net.IP(p.SourceAddr).String(), p.SourcePort)
+	} else {
+		addr = from.String()
+	}
+
+	a := Address{
+		Addr: addr,
+		Name: p.SourceNode,
+	}
+	if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil {
 		m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
 	}
 }
@@ -453,7 +532,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
 
 	// Send a ping to the correct host.
 	localSeqNo := m.nextSeqNo()
-	ping := ping{SeqNo: localSeqNo, Node: ind.Node}
+	selfAddr, selfPort := m.getAdvertise()
+	ping := ping{
+		SeqNo: localSeqNo,
+		Node:  ind.Node,
+		// The outbound message is addressed FROM us.
+		SourceAddr: selfAddr,
+		SourcePort: selfPort,
+		SourceNode: m.config.Name,
+	}
+
+	// Forward the ack back to the requestor. If the request encodes an origin
+	// use that otherwise assume that the other end of the UDP socket is
+	// usable.
+	indAddr := ""
+	if len(ind.SourceAddr) > 0 && ind.SourcePort > 0 {
+		indAddr = joinHostPort(net.IP(ind.SourceAddr).String(), ind.SourcePort)
+	} else {
+		indAddr = from.String()
+	}
 
 	// Setup a response handler to relay the ack
 	cancelCh := make(chan struct{})
@@ -461,18 +558,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
 		// Try to prevent the nack if we've caught it in time.
 		close(cancelCh)
 
-		// Forward the ack back to the requestor.
 		ack := ackResp{ind.SeqNo, nil}
-		if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
-			m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
+		a := Address{
+			Addr: indAddr,
+			Name: ind.SourceNode,
+		}
+		if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil {
+			m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogStringAddress(indAddr))
 		}
 	}
 	m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
 
 	// Send the ping.
 	addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
-	if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
-		m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
+	a := Address{
+		Addr: addr,
+		Name: ind.Node,
+	}
+	if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil {
+		m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s %s", err, LogStringAddress(indAddr))
 	}
 
 	// Setup a timer to fire off a nack if no ack is seen in time.
@@ -483,8 +587,12 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
 				return
 			case <-time.After(m.config.ProbeTimeout):
 				nack := nackResp{ind.SeqNo}
-				if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil {
-					m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
+				a := Address{
+					Addr: indAddr,
+					Name: ind.SourceNode,
+				}
+				if err := m.encodeAndSendMsg(a, nackRespMsg, &nack); err != nil {
+					m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogStringAddress(indAddr))
 				}
 			}
 		}()
@@ -518,12 +626,47 @@ func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
 	m.suspectNode(&sus)
 }
 
+// ensureCanConnect parses the IP from a remote address and
+// returns an error if that client must not connect
+func (m *Memberlist) ensureCanConnect(from net.Addr) error {
+	if !m.config.IPMustBeChecked() {
+		return nil
+	}
+	source := from.String()
+	if source == "pipe" {
+		return nil
+	}
+	host, _, err := net.SplitHostPort(source)
+	if err != nil {
+		return err
+	}
+
+	ip := net.ParseIP(host)
+	if ip == nil {
+		return fmt.Errorf("Cannot parse IP from %s", host)
+	}
+	return m.config.IPAllowed(ip)
+}
+
 func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
+	if err := m.ensureCanConnect(from); err != nil {
+		m.logger.Printf("[DEBUG] memberlist: Blocked alive message: %s %s", err, LogAddress(from))
+		return
+	}
 	var live alive
 	if err := decode(buf, &live); err != nil {
 		m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s %s", err, LogAddress(from))
 		return
 	}
+	if m.config.IPMustBeChecked() {
+		innerIP := net.IP(live.Addr)
+		if innerIP != nil {
+			if err := m.config.IPAllowed(innerIP); err != nil {
+				m.logger.Printf("[DEBUG] memberlist: Blocked alive.Addr=%s message from: %s %s", innerIP.String(), err, LogAddress(from))
+				return
+			}
+		}
+	}
 
 	// For proto versions < 2, there is no port provided. Mask old
 	// behavior by using the configured port
@@ -565,12 +708,12 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.
 }
 
 // encodeAndSendMsg is used to combine the encoding and sending steps
-func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error {
+func (m *Memberlist) encodeAndSendMsg(a Address, msgType messageType, msg interface{}) error {
 	out, err := encode(msgType, msg)
 	if err != nil {
 		return err
 	}
-	if err := m.sendMsg(addr, out.Bytes()); err != nil {
+	if err := m.sendMsg(a, out.Bytes()); err != nil {
 		return err
 	}
 	return nil
@@ -578,7 +721,7 @@ func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg inte
 
 // sendMsg is used to send a message via packet to another host. It will
 // opportunistically create a compoundMsg and piggy back other broadcasts.
-func (m *Memberlist) sendMsg(addr string, msg []byte) error {
+func (m *Memberlist) sendMsg(a Address, msg []byte) error {
 	// Check if we can piggy back any messages
 	bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
 	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
@@ -588,7 +731,7 @@ func (m *Memberlist) sendMsg(addr string, msg []byte) error {
 
 	// Fast path if nothing to piggypack
 	if len(extra) == 0 {
-		return m.rawSendMsgPacket(addr, nil, msg)
+		return m.rawSendMsgPacket(a, nil, msg)
 	}
 
 	// Join all the messages
@@ -600,12 +743,16 @@ func (m *Memberlist) sendMsg(addr string, msg []byte) error {
 	compound := makeCompoundMessage(msgs)
 
 	// Send the message
-	return m.rawSendMsgPacket(addr, nil, compound.Bytes())
+	return m.rawSendMsgPacket(a, nil, compound.Bytes())
 }
 
 // rawSendMsgPacket is used to send message via packet to another host without
 // modification, other than compression or encryption if enabled.
-func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error {
+func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error {
+	if a.Name == "" && m.config.RequireNodeNames {
+		return errNodeNamesAreRequired
+	}
+
 	// Check if we have compression enabled
 	if m.config.EnableCompression {
 		buf, err := compressPayload(msg)
@@ -619,11 +766,12 @@ func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error
 		}
 	}
 
-	// Try to look up the destination node
+	// Try to look up the destination node. Note this will only work if the
+	// bare ip address is used as the node name, which is not guaranteed.
 	if node == nil {
-		toAddr, _, err := net.SplitHostPort(addr)
+		toAddr, _, err := net.SplitHostPort(a.Addr)
 		if err != nil {
-			m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err)
+			m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", a.Addr, err)
 			return err
 		}
 		m.nodeLock.RLock()
@@ -658,14 +806,14 @@ func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error
 	}
 
 	metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
-	_, err := m.transport.WriteTo(msg, addr)
+	_, err := m.transport.WriteToAddress(msg, a)
 	return err
 }
 
 // rawSendMsgStream is used to stream a message to another host without
 // modification, other than applying compression and encryption if enabled.
 func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
-	// Check if compresion is enabled
+	// Check if compression is enabled
 	if m.config.EnableCompression {
 		compBuf, err := compressPayload(sendBuf)
 		if err != nil {
@@ -698,8 +846,12 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
 }
 
 // sendUserMsg is used to stream a user message to another host.
-func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error {
-	conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
+func (m *Memberlist) sendUserMsg(a Address, sendBuf []byte) error {
+	if a.Name == "" && m.config.RequireNodeNames {
+		return errNodeNamesAreRequired
+	}
+
+	conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout)
 	if err != nil {
 		return err
 	}
@@ -724,14 +876,18 @@ func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error {
 
 // sendAndReceiveState is used to initiate a push/pull over a stream with a
 // remote host.
-func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) {
+func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, []byte, error) {
+	if a.Name == "" && m.config.RequireNodeNames {
+		return nil, nil, errNodeNamesAreRequired
+	}
+
 	// Attempt to connect
-	conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
+	conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout)
 	if err != nil {
 		return nil, nil, err
 	}
 	defer conn.Close()
-	m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s", conn.RemoteAddr())
+	m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr())
 	metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1)
 
 	// Send our state
@@ -996,16 +1152,17 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us
 		nodes := make([]*Node, len(remoteNodes))
 		for idx, n := range remoteNodes {
 			nodes[idx] = &Node{
-				Name: n.Name,
-				Addr: n.Addr,
-				Port: n.Port,
-				Meta: n.Meta,
-				PMin: n.Vsn[0],
-				PMax: n.Vsn[1],
-				PCur: n.Vsn[2],
-				DMin: n.Vsn[3],
-				DMax: n.Vsn[4],
-				DCur: n.Vsn[5],
+				Name:  n.Name,
+				Addr:  n.Addr,
+				Port:  n.Port,
+				Meta:  n.Meta,
+				State: n.State,
+				PMin:  n.Vsn[0],
+				PMax:  n.Vsn[1],
+				PCur:  n.Vsn[2],
+				DMin:  n.Vsn[3],
+				DMax:  n.Vsn[4],
+				DCur:  n.Vsn[5],
 			}
 		}
 		if err := m.config.Merge.NotifyMerge(nodes); err != nil {
@@ -1058,8 +1215,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
 // a ping, and waits for an ack. All of this is done as a series of blocking
 // operations, given the deadline. The bool return parameter is true if we
 // we able to round trip a ping to the other node.
-func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
-	conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now()))
+func (m *Memberlist) sendPingAndWaitForAck(a Address, ping ping, deadline time.Time) (bool, error) {
+	if a.Name == "" && m.config.RequireNodeNames {
+		return false, errNodeNamesAreRequired
+	}
+
+	conn, err := m.transport.DialAddressTimeout(a, deadline.Sub(time.Now()))
 	if err != nil {
 		// If the node is actually dead we expect this to fail, so we
 		// shouldn't spam the logs with it. After this point, errors
@@ -1094,7 +1255,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time
 	}
 
 	if ack.SeqNo != ping.SeqNo {
-		return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
+		return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
 	}
 
 	return true, nil

+ 77 - 0
vendor/github.com/hashicorp/memberlist/net_transport.go

@@ -1,7 +1,9 @@
 package memberlist
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"sync"
@@ -48,6 +50,8 @@ type NetTransport struct {
 	shutdown     int32
 }
 
+var _ NodeAwareTransport = (*NetTransport)(nil)
+
 // NewNetTransport returns a net transport with the given configuration. On
 // success all the network listeners will be created and listening.
 func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
@@ -170,6 +174,14 @@ func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, err
 
 // See Transport.
 func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) {
+	a := Address{Addr: addr, Name: ""}
+	return t.WriteToAddress(b, a)
+}
+
+// See NodeAwareTransport.
+func (t *NetTransport) WriteToAddress(b []byte, a Address) (time.Time, error) {
+	addr := a.Addr
+
 	udpAddr, err := net.ResolveUDPAddr("udp", addr)
 	if err != nil {
 		return time.Time{}, err
@@ -188,8 +200,44 @@ func (t *NetTransport) PacketCh() <-chan *Packet {
 	return t.packetCh
 }
 
+// See IngestionAwareTransport.
+func (t *NetTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error {
+	if shouldClose {
+		defer conn.Close()
+	}
+
+	// Copy everything from the stream into packet buffer.
+	var buf bytes.Buffer
+	if _, err := io.Copy(&buf, conn); err != nil {
+		return fmt.Errorf("failed to read packet: %v", err)
+	}
+
+	// Check the length - it needs to have at least one byte to be a proper
+	// message. This is checked elsewhere for writes coming in directly from
+	// the UDP socket.
+	if n := buf.Len(); n < 1 {
+		return fmt.Errorf("packet too short (%d bytes) %s", n, LogAddress(addr))
+	}
+
+	// Inject the packet.
+	t.packetCh <- &Packet{
+		Buf:       buf.Bytes(),
+		From:      addr,
+		Timestamp: now,
+	}
+	return nil
+}
+
 // See Transport.
 func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
+	a := Address{Addr: addr, Name: ""}
+	return t.DialAddressTimeout(a, timeout)
+}
+
+// See NodeAwareTransport.
+func (t *NetTransport) DialAddressTimeout(a Address, timeout time.Duration) (net.Conn, error) {
+	addr := a.Addr
+
 	dialer := net.Dialer{Timeout: timeout}
 	return dialer.Dial("tcp", addr)
 }
@@ -199,6 +247,12 @@ func (t *NetTransport) StreamCh() <-chan net.Conn {
 	return t.streamCh
 }
 
+// See IngestionAwareTransport.
+func (t *NetTransport) IngestStream(conn net.Conn) error {
+	t.streamCh <- conn
+	return nil
+}
+
 // See Transport.
 func (t *NetTransport) Shutdown() error {
 	// This will avoid log spam about errors when we shut down.
@@ -221,6 +275,16 @@ func (t *NetTransport) Shutdown() error {
 // and hands them off to the stream channel.
 func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
 	defer t.wg.Done()
+
+	// baseDelay is the initial delay after an AcceptTCP() error before attempting again
+	const baseDelay = 5 * time.Millisecond
+
+	// maxDelay is the maximum delay after an AcceptTCP() error before attempting again.
+	// In the case that tcpListen() is error-looping, it will delay the shutdown check.
+	// Therefore, changes to maxDelay may have an effect on the latency of shutdown.
+	const maxDelay = 1 * time.Second
+
+	var loopDelay time.Duration
 	for {
 		conn, err := tcpLn.AcceptTCP()
 		if err != nil {
@@ -228,9 +292,22 @@ func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
 				break
 			}
 
+			if loopDelay == 0 {
+				loopDelay = baseDelay
+			} else {
+				loopDelay *= 2
+			}
+
+			if loopDelay > maxDelay {
+				loopDelay = maxDelay
+			}
+
 			t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
+			time.Sleep(loopDelay)
 			continue
 		}
+		// No error, reset loop delay
+		loopDelay = 0
 
 		t.streamCh <- conn
 	}

+ 335 - 80
vendor/github.com/hashicorp/memberlist/queue.go

@@ -1,8 +1,10 @@
 package memberlist
 
 import (
-	"sort"
+	"math"
 	"sync"
+
+	"github.com/google/btree"
 )
 
 // TransmitLimitedQueue is used to queue messages to broadcast to
@@ -19,15 +21,93 @@ type TransmitLimitedQueue struct {
 	// number of retransmissions attempted.
 	RetransmitMult int
 
-	sync.Mutex
-	bcQueue limitedBroadcasts
+	mu    sync.Mutex
+	tq    *btree.BTree // stores *limitedBroadcast as btree.Item
+	tm    map[string]*limitedBroadcast
+	idGen int64
 }
 
 type limitedBroadcast struct {
-	transmits int // Number of transmissions attempted.
+	transmits int   // btree-key[0]: Number of transmissions attempted.
+	msgLen    int64 // btree-key[1]: copied from len(b.Message())
+	id        int64 // btree-key[2]: unique incrementing id stamped at submission time
 	b         Broadcast
+
+	name string // set if Broadcast is a NamedBroadcast
+}
+
+// Less tests whether the current item is less than the given argument.
+//
+// This must provide a strict weak ordering.
+// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
+// hold one of either a or b in the tree).
+//
+// default ordering is
+// - [transmits=0, ..., transmits=inf]
+// - [transmits=0:len=999, ..., transmits=0:len=2, ...]
+// - [transmits=0:len=999,id=999, ..., transmits=0:len=999:id=1, ...]
+func (b *limitedBroadcast) Less(than btree.Item) bool {
+	o := than.(*limitedBroadcast)
+	if b.transmits < o.transmits {
+		return true
+	} else if b.transmits > o.transmits {
+		return false
+	}
+	if b.msgLen > o.msgLen {
+		return true
+	} else if b.msgLen < o.msgLen {
+		return false
+	}
+	return b.id > o.id
+}
+
+// for testing; emits in transmit order if reverse=false
+func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	out := make([]*limitedBroadcast, 0, q.lenLocked())
+	q.walkReadOnlyLocked(reverse, func(cur *limitedBroadcast) bool {
+		out = append(out, cur)
+		return true
+	})
+
+	return out
+}
+
+// walkReadOnlyLocked calls f for each item in the queue traversing it in
+// natural order (by Less) when reverse=false and the opposite when true. You
+// must hold the mutex.
+//
+// This method panics if you attempt to mutate the item during traversal.  The
+// underlying btree should also not be mutated during traversal.
+func (q *TransmitLimitedQueue) walkReadOnlyLocked(reverse bool, f func(*limitedBroadcast) bool) {
+	if q.lenLocked() == 0 {
+		return
+	}
+
+	iter := func(item btree.Item) bool {
+		cur := item.(*limitedBroadcast)
+
+		prevTransmits := cur.transmits
+		prevMsgLen := cur.msgLen
+		prevID := cur.id
+
+		keepGoing := f(cur)
+
+		if prevTransmits != cur.transmits || prevMsgLen != cur.msgLen || prevID != cur.id {
+			panic("edited queue while walking read only")
+		}
+
+		return keepGoing
+	}
+
+	if reverse {
+		q.tq.Descend(iter) // end with transmit 0
+	} else {
+		q.tq.Ascend(iter) // start with transmit 0
+	}
 }
-type limitedBroadcasts []*limitedBroadcast
 
 // Broadcast is something that can be broadcasted via gossip to
 // the memberlist cluster.
@@ -45,123 +125,298 @@ type Broadcast interface {
 	Finished()
 }
 
+// NamedBroadcast is an optional extension of the Broadcast interface that
+// gives each message a unique string name, which is used to optimize
+// invalidation of outdated messages by name.
+//
+// You should ensure that Invalidates() checks the same uniqueness as the
+// example below:
+//
+// func (b *foo) Invalidates(other Broadcast) bool {
+// 	nb, ok := other.(NamedBroadcast)
+// 	if !ok {
+// 		return false
+// 	}
+// 	return b.Name() == nb.Name()
+// }
+//
+// Invalidates() isn't currently used for NamedBroadcasts, but that may change
+// in the future.
+type NamedBroadcast interface {
+	Broadcast
+	// The unique identity of this broadcast message.
+	Name() string
+}
+
+// UniqueBroadcast is an optional interface that indicates that each message is
+// intrinsically unique and there is no need to scan the broadcast queue for
+// duplicates.
+//
+// You should ensure that Invalidates() always returns false if implementing
+// this interface. Invalidates() isn't currently used for UniqueBroadcasts, but
+// that may change in the future.
+type UniqueBroadcast interface {
+	Broadcast
+	// UniqueBroadcast is just a marker method for this interface.
+	UniqueBroadcast()
+}
+
 // QueueBroadcast is used to enqueue a broadcast
 func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
-	q.Lock()
-	defer q.Unlock()
-
-	// Check if this message invalidates another
-	n := len(q.bcQueue)
-	for i := 0; i < n; i++ {
-		if b.Invalidates(q.bcQueue[i].b) {
-			q.bcQueue[i].b.Finished()
-			copy(q.bcQueue[i:], q.bcQueue[i+1:])
-			q.bcQueue[n-1] = nil
-			q.bcQueue = q.bcQueue[:n-1]
-			n--
+	q.queueBroadcast(b, 0)
+}
+
+// lazyInit initializes internal data structures the first time they are
+// needed.  You must already hold the mutex.
+func (q *TransmitLimitedQueue) lazyInit() {
+	if q.tq == nil {
+		q.tq = btree.New(32)
+	}
+	if q.tm == nil {
+		q.tm = make(map[string]*limitedBroadcast)
+	}
+}
+
+// queueBroadcast is like QueueBroadcast but you can use a nonzero value for
+// the initial transmit tier assigned to the message. This is meant to be used
+// for unit testing.
+func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	q.lazyInit()
+
+	if q.idGen == math.MaxInt64 {
+		// it's super duper unlikely to wrap around within the retransmit limit
+		q.idGen = 1
+	} else {
+		q.idGen++
+	}
+	id := q.idGen
+
+	lb := &limitedBroadcast{
+		transmits: initialTransmits,
+		msgLen:    int64(len(b.Message())),
+		id:        id,
+		b:         b,
+	}
+	unique := false
+	if nb, ok := b.(NamedBroadcast); ok {
+		lb.name = nb.Name()
+	} else if _, ok := b.(UniqueBroadcast); ok {
+		unique = true
+	}
+
+	// Check if this message invalidates another.
+	if lb.name != "" {
+		if old, ok := q.tm[lb.name]; ok {
+			old.b.Finished()
+			q.deleteItem(old)
+		}
+	} else if !unique {
+		// Slow path, hopefully nothing hot hits this.
+		var remove []*limitedBroadcast
+		q.tq.Ascend(func(item btree.Item) bool {
+			cur := item.(*limitedBroadcast)
+
+			// Special Broadcasts can only invalidate each other.
+			switch cur.b.(type) {
+			case NamedBroadcast:
+				// noop
+			case UniqueBroadcast:
+				// noop
+			default:
+				if b.Invalidates(cur.b) {
+					cur.b.Finished()
+					remove = append(remove, cur)
+				}
+			}
+			return true
+		})
+		for _, cur := range remove {
+			q.deleteItem(cur)
 		}
 	}
 
-	// Append to the queue
-	q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b})
+	// Append to the relevant queue.
+	q.addItem(lb)
+}
+
+// deleteItem removes the given item from the overall datastructure. You
+// must already hold the mutex.
+func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) {
+	_ = q.tq.Delete(cur)
+	if cur.name != "" {
+		delete(q.tm, cur.name)
+	}
+
+	if q.tq.Len() == 0 {
+		// At idle there's no reason to let the id generator keep going
+		// indefinitely.
+		q.idGen = 0
+	}
+}
+
+// addItem adds the given item into the overall datastructure. You must already
+// hold the mutex.
+func (q *TransmitLimitedQueue) addItem(cur *limitedBroadcast) {
+	_ = q.tq.ReplaceOrInsert(cur)
+	if cur.name != "" {
+		q.tm[cur.name] = cur
+	}
+}
+
+// getTransmitRange returns a pair of min/max values for transmit values
+// represented by the current queue contents. Both values represent actual
+// transmit values on the interval [0, len). You must already hold the mutex.
+func (q *TransmitLimitedQueue) getTransmitRange() (minTransmit, maxTransmit int) {
+	if q.lenLocked() == 0 {
+		return 0, 0
+	}
+	minItem, maxItem := q.tq.Min(), q.tq.Max()
+	if minItem == nil || maxItem == nil {
+		return 0, 0
+	}
+
+	min := minItem.(*limitedBroadcast).transmits
+	max := maxItem.(*limitedBroadcast).transmits
+
+	return min, max
 }
 
 // GetBroadcasts is used to get a number of broadcasts, up to a byte limit
 // and applying a per-message overhead as provided.
 func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
-	q.Lock()
-	defer q.Unlock()
+	q.mu.Lock()
+	defer q.mu.Unlock()
 
 	// Fast path the default case
-	if len(q.bcQueue) == 0 {
+	if q.lenLocked() == 0 {
 		return nil
 	}
 
 	transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes())
-	bytesUsed := 0
-	var toSend [][]byte
-
-	for i := len(q.bcQueue) - 1; i >= 0; i-- {
-		// Check if this is within our limits
-		b := q.bcQueue[i]
-		msg := b.b.Message()
-		if bytesUsed+overhead+len(msg) > limit {
+
+	var (
+		bytesUsed int
+		toSend    [][]byte
+		reinsert  []*limitedBroadcast
+	)
+
+	// Visit fresher items first, but only look at stuff that will fit.
+	// We'll go tier by tier, grabbing the largest items first.
+	minTr, maxTr := q.getTransmitRange()
+	for transmits := minTr; transmits <= maxTr; /*do not advance automatically*/ {
+		free := int64(limit - bytesUsed - overhead)
+		if free <= 0 {
+			break // bail out early
+		}
+
+		// Search for the least element on a given tier (by transmit count) as
+		// defined in the limitedBroadcast.Less function that will fit into our
+		// remaining space.
+		greaterOrEqual := &limitedBroadcast{
+			transmits: transmits,
+			msgLen:    free,
+			id:        math.MaxInt64,
+		}
+		lessThan := &limitedBroadcast{
+			transmits: transmits + 1,
+			msgLen:    math.MaxInt64,
+			id:        math.MaxInt64,
+		}
+		var keep *limitedBroadcast
+		q.tq.AscendRange(greaterOrEqual, lessThan, func(item btree.Item) bool {
+			cur := item.(*limitedBroadcast)
+			// Check if this is within our limits
+			if int64(len(cur.b.Message())) > free {
+				// If this happens it's a bug in the datastructure or
+				// surrounding use doing something like having len(Message())
+				// change over time. There's enough going on here that it's
+				// probably sane to just skip it and move on for now.
+				return true
+			}
+			keep = cur
+			return false
+		})
+		if keep == nil {
+			// No more items of an appropriate size in the tier.
+			transmits++
 			continue
 		}
 
+		msg := keep.b.Message()
+
 		// Add to slice to send
 		bytesUsed += overhead + len(msg)
 		toSend = append(toSend, msg)
 
 		// Check if we should stop transmission
-		b.transmits++
-		if b.transmits >= transmitLimit {
-			b.b.Finished()
-			n := len(q.bcQueue)
-			q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil
-			q.bcQueue = q.bcQueue[:n-1]
+		q.deleteItem(keep)
+		if keep.transmits+1 >= transmitLimit {
+			keep.b.Finished()
+		} else {
+			// We need to bump this item down to another transmit tier, but
+			// because it would be in the same direction that we're walking the
+			// tiers, we will have to delay the reinsertion until we are
+			// finished our search. Otherwise we'll possibly re-add the message
+			// when we ascend to the next tier.
+			keep.transmits++
+			reinsert = append(reinsert, keep)
 		}
 	}
 
-	// If we are sending anything, we need to re-sort to deal
-	// with adjusted transmit counts
-	if len(toSend) > 0 {
-		q.bcQueue.Sort()
+	for _, cur := range reinsert {
+		q.addItem(cur)
 	}
+
 	return toSend
 }
 
 // NumQueued returns the number of queued messages
 func (q *TransmitLimitedQueue) NumQueued() int {
-	q.Lock()
-	defer q.Unlock()
-	return len(q.bcQueue)
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	return q.lenLocked()
 }
 
-// Reset clears all the queued messages
-func (q *TransmitLimitedQueue) Reset() {
-	q.Lock()
-	defer q.Unlock()
-	for _, b := range q.bcQueue {
-		b.b.Finished()
+// lenLocked returns the length of the overall queue datastructure. You must
+// hold the mutex.
+func (q *TransmitLimitedQueue) lenLocked() int {
+	if q.tq == nil {
+		return 0
 	}
-	q.bcQueue = nil
+	return q.tq.Len()
+}
+
+// Reset clears all the queued messages. Should only be used for tests.
+func (q *TransmitLimitedQueue) Reset() {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	q.walkReadOnlyLocked(false, func(cur *limitedBroadcast) bool {
+		cur.b.Finished()
+		return true
+	})
+
+	q.tq = nil
+	q.tm = nil
+	q.idGen = 0
 }
 
 // Prune will retain the maxRetain latest messages, and the rest
 // will be discarded. This can be used to prevent unbounded queue sizes
 func (q *TransmitLimitedQueue) Prune(maxRetain int) {
-	q.Lock()
-	defer q.Unlock()
+	q.mu.Lock()
+	defer q.mu.Unlock()
 
 	// Do nothing if queue size is less than the limit
-	n := len(q.bcQueue)
-	if n < maxRetain {
-		return
-	}
-
-	// Invalidate the messages we will be removing
-	for i := 0; i < n-maxRetain; i++ {
-		q.bcQueue[i].b.Finished()
+	for q.tq.Len() > maxRetain {
+		item := q.tq.Max()
+		if item == nil {
+			break
+		}
+		cur := item.(*limitedBroadcast)
+		cur.b.Finished()
+		q.deleteItem(cur)
 	}
-
-	// Move the messages, and retain only the last maxRetain
-	copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:])
-	q.bcQueue = q.bcQueue[:maxRetain]
-}
-
-func (b limitedBroadcasts) Len() int {
-	return len(b)
-}
-
-func (b limitedBroadcasts) Less(i, j int) bool {
-	return b[i].transmits < b[j].transmits
-}
-
-func (b limitedBroadcasts) Swap(i, j int) {
-	b[i], b[j] = b[j], b[i]
-}
-
-func (b limitedBroadcasts) Sort() {
-	sort.Sort(sort.Reverse(b))
 }

+ 4 - 1
vendor/github.com/hashicorp/memberlist/security.go

@@ -106,7 +106,10 @@ func encryptPayload(vsn encryptionVersion, key []byte, msg []byte, data []byte,
 	dst.WriteByte(byte(vsn))
 
 	// Add a random nonce
-	io.CopyN(dst, rand.Reader, nonceSize)
+	_, err = io.CopyN(dst, rand.Reader, nonceSize)
+	if err != nil {
+		return err
+	}
 	afterNonce := dst.Len()
 
 	// Ensure we are correctly padded (only version 0)

+ 226 - 79
vendor/github.com/hashicorp/memberlist/state.go

@@ -6,32 +6,35 @@ import (
 	"math"
 	"math/rand"
 	"net"
+	"strings"
 	"sync/atomic"
 	"time"
 
-	"github.com/armon/go-metrics"
+	metrics "github.com/armon/go-metrics"
 )
 
-type nodeStateType int
+type NodeStateType int
 
 const (
-	stateAlive nodeStateType = iota
-	stateSuspect
-	stateDead
+	StateAlive NodeStateType = iota
+	StateSuspect
+	StateDead
+	StateLeft
 )
 
 // Node represents a node in the cluster.
 type Node struct {
-	Name string
-	Addr net.IP
-	Port uint16
-	Meta []byte // Metadata from the delegate for this node.
-	PMin uint8  // Minimum protocol version this understands
-	PMax uint8  // Maximum protocol version this understands
-	PCur uint8  // Current version node is speaking
-	DMin uint8  // Min protocol version for the delegate to understand
-	DMax uint8  // Max protocol version for the delegate to understand
-	DCur uint8  // Current version delegate is speaking
+	Name  string
+	Addr  net.IP
+	Port  uint16
+	Meta  []byte        // Metadata from the delegate for this node.
+	State NodeStateType // State of the node.
+	PMin  uint8         // Minimum protocol version this understands
+	PMax  uint8         // Maximum protocol version this understands
+	PCur  uint8         // Current version node is speaking
+	DMin  uint8         // Min protocol version for the delegate to understand
+	DMax  uint8         // Max protocol version for the delegate to understand
+	DCur  uint8         // Current version delegate is speaking
 }
 
 // Address returns the host:port form of a node's address, suitable for use
@@ -40,6 +43,15 @@ func (n *Node) Address() string {
 	return joinHostPort(n.Addr.String(), n.Port)
 }
 
+// FullAddress returns the node name and host:port form of a node's address,
+// suitable for use with a transport.
+func (n *Node) FullAddress() Address {
+	return Address{
+		Addr: joinHostPort(n.Addr.String(), n.Port),
+		Name: n.Name,
+	}
+}
+
 // String returns the node name
 func (n *Node) String() string {
 	return n.Name
@@ -49,7 +61,7 @@ func (n *Node) String() string {
 type nodeState struct {
 	Node
 	Incarnation uint32        // Last known incarnation number
-	State       nodeStateType // Current state
+	State       NodeStateType // Current state
 	StateChange time.Time     // Time last state change happened
 }
 
@@ -59,6 +71,16 @@ func (n *nodeState) Address() string {
 	return n.Node.Address()
 }
 
+// FullAddress returns the node name and host:port form of a node's address,
+// suitable for use with a transport.
+func (n *nodeState) FullAddress() Address {
+	return n.Node.FullAddress()
+}
+
+func (n *nodeState) DeadOrLeft() bool {
+	return n.State == StateDead || n.State == StateLeft
+}
+
 // ackHandler is used to register handlers for incoming acks and nacks.
 type ackHandler struct {
 	ackFn  func([]byte, time.Time)
@@ -217,7 +239,7 @@ START:
 	node = *m.nodes[m.probeIndex]
 	if node.Name == m.config.Name {
 		skip = true
-	} else if node.State == stateDead {
+	} else if node.DeadOrLeft() {
 		skip = true
 	}
 
@@ -233,6 +255,30 @@ START:
 	m.probeNode(&node)
 }
 
+// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests)
+func (m *Memberlist) probeNodeByAddr(addr string) {
+	m.nodeLock.RLock()
+	n := m.nodeMap[addr]
+	m.nodeLock.RUnlock()
+
+	m.probeNode(n)
+}
+
+// failedRemote checks the error and decides if it indicates a failure on the
+// other end.
+func failedRemote(err error) bool {
+	switch t := err.(type) {
+	case *net.OpError:
+		if strings.HasPrefix(t.Net, "tcp") {
+			switch t.Op {
+			case "dial", "read", "write":
+				return true
+			}
+		}
+	}
+	return false
+}
+
 // probeNode handles a single round of failure checking on a node.
 func (m *Memberlist) probeNode(node *nodeState) {
 	defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
@@ -246,7 +292,14 @@ func (m *Memberlist) probeNode(node *nodeState) {
 	}
 
 	// Prepare a ping message and setup an ack handler.
-	ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
+	selfAddr, selfPort := m.getAdvertise()
+	ping := ping{
+		SeqNo:      m.nextSeqNo(),
+		Node:       node.Name,
+		SourceAddr: selfAddr,
+		SourcePort: selfPort,
+		SourceNode: m.config.Name,
+	}
 	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
 	nackCh := make(chan struct{}, m.config.IndirectChecks+1)
 	m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
@@ -263,10 +316,20 @@ func (m *Memberlist) probeNode(node *nodeState) {
 	// soon as possible.
 	deadline := sent.Add(probeInterval)
 	addr := node.Address()
-	if node.State == stateAlive {
-		if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
+
+	// Arrange for our self-awareness to get updated.
+	var awarenessDelta int
+	defer func() {
+		m.awareness.ApplyDelta(awarenessDelta)
+	}()
+	if node.State == StateAlive {
+		if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
 			m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
-			return
+			if failedRemote(err) {
+				goto HANDLE_REMOTE_FAILURE
+			} else {
+				return
+			}
 		}
 	} else {
 		var msgs [][]byte
@@ -285,9 +348,13 @@ func (m *Memberlist) probeNode(node *nodeState) {
 		}
 
 		compound := makeCompoundMessage(msgs)
-		if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
+		if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
 			m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
-			return
+			if failedRemote(err) {
+				goto HANDLE_REMOTE_FAILURE
+			} else {
+				return
+			}
 		}
 	}
 
@@ -296,10 +363,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
 	// which will improve our health until we get to the failure scenarios
 	// at the end of this function, which will alter this delta variable
 	// accordingly.
-	awarenessDelta := -1
-	defer func() {
-		m.awareness.ApplyDelta(awarenessDelta)
-	}()
+	awarenessDelta = -1
 
 	// Wait for response or round-trip-time.
 	select {
@@ -324,21 +388,31 @@ func (m *Memberlist) probeNode(node *nodeState) {
 		// probe interval it will give the TCP fallback more time, which
 		// is more active in dealing with lost packets, and it gives more
 		// time to wait for indirect acks/nacks.
-		m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
+		m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name)
 	}
 
+HANDLE_REMOTE_FAILURE:
 	// Get some random live nodes.
 	m.nodeLock.RLock()
 	kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
 		return n.Name == m.config.Name ||
 			n.Name == node.Name ||
-			n.State != stateAlive
+			n.State != StateAlive
 	})
 	m.nodeLock.RUnlock()
 
 	// Attempt an indirect ping.
 	expectedNacks := 0
-	ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
+	selfAddr, selfPort = m.getAdvertise()
+	ind := indirectPingReq{
+		SeqNo:      ping.SeqNo,
+		Target:     node.Addr,
+		Port:       node.Port,
+		Node:       node.Name,
+		SourceAddr: selfAddr,
+		SourcePort: selfPort,
+		SourceNode: m.config.Name,
+	}
 	for _, peer := range kNodes {
 		// We only expect nack to be sent from peers who understand
 		// version 4 of the protocol.
@@ -346,7 +420,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
 			expectedNacks++
 		}
 
-		if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
+		if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
 			m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
 		}
 	}
@@ -362,10 +436,13 @@ func (m *Memberlist) probeNode(node *nodeState) {
 	// which protocol version we are speaking. That's why we've included a
 	// config option to turn this off if desired.
 	fallbackCh := make(chan bool, 1)
-	if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
+
+	disableTcpPings := m.config.DisableTcpPings ||
+		(m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name))
+	if (!disableTcpPings) && (node.PMax >= 3) {
 		go func() {
 			defer close(fallbackCh)
-			didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
+			didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline)
 			if err != nil {
 				m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
 			} else {
@@ -422,12 +499,21 @@ func (m *Memberlist) probeNode(node *nodeState) {
 // Ping initiates a ping to the node with the specified name.
 func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
 	// Prepare a ping message and setup an ack handler.
-	ping := ping{SeqNo: m.nextSeqNo(), Node: node}
+	selfAddr, selfPort := m.getAdvertise()
+	ping := ping{
+		SeqNo:      m.nextSeqNo(),
+		Node:       node,
+		SourceAddr: selfAddr,
+		SourcePort: selfPort,
+		SourceNode: m.config.Name,
+	}
 	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
 	m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
 
+	a := Address{Addr: addr.String(), Name: node}
+
 	// Send a ping to the node.
-	if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
+	if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil {
 		return 0, err
 	}
 
@@ -488,10 +574,10 @@ func (m *Memberlist) gossip() {
 		}
 
 		switch n.State {
-		case stateAlive, stateSuspect:
+		case StateAlive, StateSuspect:
 			return false
 
-		case stateDead:
+		case StateDead:
 			return time.Since(n.StateChange) > m.config.GossipToTheDeadTime
 
 		default:
@@ -516,13 +602,13 @@ func (m *Memberlist) gossip() {
 		addr := node.Address()
 		if len(msgs) == 1 {
 			// Send single message as is
-			if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
+			if err := m.rawSendMsgPacket(node.FullAddress(), &node, msgs[0]); err != nil {
 				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
 			}
 		} else {
 			// Otherwise create and send a compound message
 			compound := makeCompoundMessage(msgs)
-			if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
+			if err := m.rawSendMsgPacket(node.FullAddress(), &node, compound.Bytes()); err != nil {
 				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
 			}
 		}
@@ -538,7 +624,7 @@ func (m *Memberlist) pushPull() {
 	m.nodeLock.RLock()
 	nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
 		return n.Name == m.config.Name ||
-			n.State != stateAlive
+			n.State != StateAlive
 	})
 	m.nodeLock.RUnlock()
 
@@ -549,17 +635,17 @@ func (m *Memberlist) pushPull() {
 	node := nodes[0]
 
 	// Attempt a push pull
-	if err := m.pushPullNode(node.Address(), false); err != nil {
+	if err := m.pushPullNode(node.FullAddress(), false); err != nil {
 		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
 	}
 }
 
 // pushPullNode does a complete state exchange with a specific node.
-func (m *Memberlist) pushPullNode(addr string, join bool) error {
+func (m *Memberlist) pushPullNode(a Address, join bool) error {
 	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
 
 	// Attempt to send and receive with the node
-	remote, userState, err := m.sendAndReceiveState(addr, join)
+	remote, userState, err := m.sendAndReceiveState(a, join)
 	if err != nil {
 		return err
 	}
@@ -596,7 +682,7 @@ func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
 
 	for _, rn := range remote {
 		// If the node isn't alive, then skip it
-		if rn.State != stateAlive {
+		if rn.State != StateAlive {
 			continue
 		}
 
@@ -625,7 +711,7 @@ func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
 
 	for _, n := range m.nodes {
 		// Ignore non-alive nodes
-		if n.State != stateAlive {
+		if n.State != StateAlive {
 			continue
 		}
 
@@ -841,11 +927,26 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 		return
 	}
 
+	if len(a.Vsn) >= 3 {
+		pMin := a.Vsn[0]
+		pMax := a.Vsn[1]
+		pCur := a.Vsn[2]
+		if pMin == 0 || pMax == 0 || pMin > pMax {
+			m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax)
+			return
+		}
+	}
+
 	// Invoke the Alive delegate if any. This can be used to filter out
 	// alive messages based on custom logic. For example, using a cluster name.
 	// Using a merge delegate is not enough, as it is possible for passive
 	// cluster merging to still occur.
 	if m.config.Alive != nil {
+		if len(a.Vsn) < 6 {
+			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present",
+				a.Node, net.IP(a.Addr), a.Port)
+			return
+		}
 		node := &Node{
 			Name: a.Node,
 			Addr: a.Addr,
@@ -867,7 +968,13 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 
 	// Check if we've never seen this node before, and if not, then
 	// store this node in our node map.
+	var updatesNode bool
 	if !ok {
+		errCon := m.config.IPAllowed(a.Addr)
+		if errCon != nil {
+			m.logger.Printf("[WARN] memberlist: Rejected node %s (%v): %s", a.Node, net.IP(a.Addr), errCon)
+			return
+		}
 		state = &nodeState{
 			Node: Node{
 				Name: a.Node,
@@ -875,7 +982,15 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 				Port: a.Port,
 				Meta: a.Meta,
 			},
-			State: stateDead,
+			State: StateDead,
+		}
+		if len(a.Vsn) > 5 {
+			state.PMin = a.Vsn[0]
+			state.PMax = a.Vsn[1]
+			state.PCur = a.Vsn[2]
+			state.DMin = a.Vsn[3]
+			state.DMax = a.Vsn[4]
+			state.DCur = a.Vsn[5]
 		}
 
 		// Add to map
@@ -894,29 +1009,45 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 
 		// Update numNodes after we've added a new node
 		atomic.AddUint32(&m.numNodes, 1)
-	}
-
-	// Check if this address is different than the existing node
-	if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
-		m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
-			state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
-
-		// Inform the conflict delegate if provided
-		if m.config.Conflict != nil {
-			other := Node{
-				Name: a.Node,
-				Addr: a.Addr,
-				Port: a.Port,
-				Meta: a.Meta,
+	} else {
+		// Check if this address is different than the existing node unless the old node is dead.
+		if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
+			errCon := m.config.IPAllowed(a.Addr)
+			if errCon != nil {
+				m.logger.Printf("[WARN] memberlist: Rejected IP update from %v to %v for node %s: %s", a.Node, state.Addr, net.IP(a.Addr), errCon)
+				return
+			}
+			// If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died.
+			canReclaim := (m.config.DeadNodeReclaimTime > 0 &&
+				time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)
+
+			// Allow the address to be updated if a dead node is being replaced.
+			if state.State == StateLeft || (state.State == StateDead && canReclaim) {
+				m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d",
+					state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
+				updatesNode = true
+			} else {
+				m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v",
+					state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State)
+
+				// Inform the conflict delegate if provided
+				if m.config.Conflict != nil {
+					other := Node{
+						Name: a.Node,
+						Addr: a.Addr,
+						Port: a.Port,
+						Meta: a.Meta,
+					}
+					m.config.Conflict.NotifyConflict(&state.Node, &other)
+				}
+				return
 			}
-			m.config.Conflict.NotifyConflict(&state.Node, &other)
 		}
-		return
 	}
 
 	// Bail if the incarnation number is older, and this is not about us
 	isLocalNode := state.Name == m.config.Name
-	if a.Incarnation <= state.Incarnation && !isLocalNode {
+	if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode {
 		return
 	}
 
@@ -956,9 +1087,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 			bytes.Equal(a.Vsn, versions) {
 			return
 		}
-
 		m.refute(state, a.Incarnation)
-		m.logger.Printf("[WARN] memberlist: Refuting an alive message")
+		m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions)
 	} else {
 		m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
 
@@ -975,8 +1105,10 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 		// Update the state and incarnation number
 		state.Incarnation = a.Incarnation
 		state.Meta = a.Meta
-		if state.State != stateAlive {
-			state.State = stateAlive
+		state.Addr = a.Addr
+		state.Port = a.Port
+		if state.State != StateAlive {
+			state.State = StateAlive
 			state.StateChange = time.Now()
 		}
 	}
@@ -986,8 +1118,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
 
 	// Notify the delegate of any relevant updates
 	if m.config.Events != nil {
-		if oldState == stateDead {
-			// if Dead -> Alive, notify of join
+		if oldState == StateDead || oldState == StateLeft {
+			// if Dead/Left -> Alive, notify of join
 			m.config.Events.NotifyJoin(&state.Node)
 
 		} else if !bytes.Equal(oldMeta, state.Meta) {
@@ -1026,7 +1158,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
 	}
 
 	// Ignore non-alive nodes
-	if state.State != stateAlive {
+	if state.State != StateAlive {
 		return
 	}
 
@@ -1044,7 +1176,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
 
 	// Update the state
 	state.Incarnation = s.Incarnation
-	state.State = stateSuspect
+	state.State = StateSuspect
 	changeTime := time.Now()
 	state.StateChange = changeTime
 
@@ -1066,9 +1198,14 @@ func (m *Memberlist) suspectNode(s *suspect) {
 	min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
 	max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
 	fn := func(numConfirmations int) {
+		var d *dead
+
 		m.nodeLock.Lock()
 		state, ok := m.nodeMap[s.Node]
-		timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
+		timeout := ok && state.State == StateSuspect && state.StateChange == changeTime
+		if timeout {
+			d = &dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
+		}
 		m.nodeLock.Unlock()
 
 		if timeout {
@@ -1078,8 +1215,8 @@ func (m *Memberlist) suspectNode(s *suspect) {
 
 			m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
 				state.Name, numConfirmations)
-			d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
-			m.deadNode(&d)
+
+			m.deadNode(d)
 		}
 	}
 	m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
@@ -1106,7 +1243,7 @@ func (m *Memberlist) deadNode(d *dead) {
 	delete(m.nodeTimers, d.Node)
 
 	// Ignore if node is already dead
-	if state.State == stateDead {
+	if state.DeadOrLeft() {
 		return
 	}
 
@@ -1130,7 +1267,14 @@ func (m *Memberlist) deadNode(d *dead) {
 
 	// Update the state
 	state.Incarnation = d.Incarnation
-	state.State = stateDead
+
+	// If the dead message was sent by the node itself, mark it as left
+	// instead of dead.
+	if d.Node == d.From {
+		state.State = StateLeft
+	} else {
+		state.State = StateDead
+	}
 	state.StateChange = time.Now()
 
 	// Notify of death
@@ -1144,7 +1288,7 @@ func (m *Memberlist) deadNode(d *dead) {
 func (m *Memberlist) mergeState(remote []pushNodeState) {
 	for _, r := range remote {
 		switch r.State {
-		case stateAlive:
+		case StateAlive:
 			a := alive{
 				Incarnation: r.Incarnation,
 				Node:        r.Name,
@@ -1155,11 +1299,14 @@ func (m *Memberlist) mergeState(remote []pushNodeState) {
 			}
 			m.aliveNode(&a, nil, false)
 
-		case stateDead:
+		case StateLeft:
+			d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
+			m.deadNode(&d)
+		case StateDead:
 			// If the remote node believes a node is dead, we prefer to
 			// suspect that node instead of declaring it dead instantly
 			fallthrough
-		case stateSuspect:
+		case StateSuspect:
 			s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
 			m.suspectNode(&s)
 		}

+ 48 - 0
vendor/github.com/hashicorp/memberlist/transport.go

@@ -1,6 +1,7 @@
 package memberlist
 
 import (
+	"fmt"
 	"net"
 	"time"
 )
@@ -63,3 +64,50 @@ type Transport interface {
 	// transport a chance to clean up any listeners.
 	Shutdown() error
 }
+
+type Address struct {
+	// Addr is a network address as a string, similar to Dial. This usually is
+	// in the form of "host:port". This is required.
+	Addr string
+
+	// Name is the name of the node being addressed. This is optional but
+	// transports may require it.
+	Name string
+}
+
+func (a *Address) String() string {
+	if a.Name != "" {
+		return fmt.Sprintf("%s (%s)", a.Name, a.Addr)
+	}
+	return a.Addr
+}
+
+// IngestionAwareTransport is not used.
+//
+// Deprecated: IngestionAwareTransport is not used and may be removed in a future
+// version. Define the interface locally instead of referencing this exported
+// interface.
+type IngestionAwareTransport interface {
+	IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
+	IngestStream(conn net.Conn) error
+}
+
+type NodeAwareTransport interface {
+	Transport
+	WriteToAddress(b []byte, addr Address) (time.Time, error)
+	DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error)
+}
+
+type shimNodeAwareTransport struct {
+	Transport
+}
+
+var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil)
+
+func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) {
+	return t.WriteTo(b, addr.Addr)
+}
+
+func (t *shimNodeAwareTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) {
+	return t.DialTimeout(addr.Addr, timeout)
+}

+ 16 - 17
vendor/github.com/hashicorp/memberlist/util.go

@@ -78,10 +78,9 @@ func retransmitLimit(retransmitMult, n int) int {
 // shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle
 func shuffleNodes(nodes []*nodeState) {
 	n := len(nodes)
-	for i := n - 1; i > 0; i-- {
-		j := rand.Intn(i + 1)
+	rand.Shuffle(n, func(i, j int) {
 		nodes[i], nodes[j] = nodes[j], nodes[i]
-	}
+	})
 }
 
 // pushPushScale is used to scale the time interval at which push/pull
@@ -103,7 +102,7 @@ func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int {
 	numDead := 0
 	n := len(nodes)
 	for i := 0; i < n-numDead; i++ {
-		if nodes[i].State != stateDead {
+		if nodes[i].State != StateDead {
 			continue
 		}
 
@@ -120,35 +119,35 @@ func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int {
 	return n - numDead
 }
 
-// kRandomNodes is used to select up to k random nodes, excluding any nodes where
-// the filter function returns true. It is possible that less than k nodes are
+// kRandomNodes is used to select up to k random Nodes, excluding any nodes where
+// the exclude function returns true. It is possible that less than k nodes are
 // returned.
-func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
+func kRandomNodes(k int, nodes []*nodeState, exclude func(*nodeState) bool) []Node {
 	n := len(nodes)
-	kNodes := make([]*nodeState, 0, k)
+	kNodes := make([]Node, 0, k)
 OUTER:
 	// Probe up to 3*n times, with large n this is not necessary
 	// since k << n, but with small n we want search to be
 	// exhaustive
 	for i := 0; i < 3*n && len(kNodes) < k; i++ {
-		// Get random node
+		// Get random nodeState
 		idx := randomOffset(n)
-		node := nodes[idx]
+		state := nodes[idx]
 
 		// Give the filter a shot at it.
-		if filterFn != nil && filterFn(node) {
+		if exclude != nil && exclude(state) {
 			continue OUTER
 		}
 
 		// Check if we have this node already
 		for j := 0; j < len(kNodes); j++ {
-			if node == kNodes[j] {
+			if state.Node.Name == kNodes[j].Name {
 				continue OUTER
 			}
 		}
 
 		// Append the node
-		kNodes = append(kNodes, node)
+		kNodes = append(kNodes, state.Node)
 	}
 	return kNodes
 }
@@ -186,18 +185,18 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
 		err = fmt.Errorf("missing compound length byte")
 		return
 	}
-	numParts := uint8(buf[0])
+	numParts := int(buf[0])
 	buf = buf[1:]
 
 	// Check we have enough bytes
-	if len(buf) < int(numParts*2) {
+	if len(buf) < numParts*2 {
 		err = fmt.Errorf("truncated len slice")
 		return
 	}
 
 	// Decode the lengths
 	lengths := make([]uint16, numParts)
-	for i := 0; i < int(numParts); i++ {
+	for i := 0; i < numParts; i++ {
 		lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
 	}
 	buf = buf[numParts*2:]
@@ -205,7 +204,7 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
 	// Split each message
 	for idx, msgLen := range lengths {
 		if len(buf) < int(msgLen) {
-			trunc = int(numParts) - idx
+			trunc = numParts - idx
 			return
 		}