diff --git a/vendor.conf b/vendor.conf index 0eddd2741a..25186592a7 100644 --- a/vendor.conf +++ b/vendor.conf @@ -49,7 +49,7 @@ github.com/docker/go-events e31b211e4f1cd09aa76fe4ac2445 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b -github.com/hashicorp/memberlist 3d8438da9589e7b608a83ffac1ef8211486bcb7c +github.com/hashicorp/memberlist 619135cdd9e5dda8c12f8ceef39bdade4f5899b6 # v0.2.4 github.com/sean-/seed e2103e2c35297fb7e17febb81e49b312087a2372 github.com/hashicorp/errwrap 8a6fb523712970c966eefc6b39ed2c5e74880354 # v1.0.0 github.com/hashicorp/go-sockaddr c7188e74f6acae5a989bdc959aa779f8b9f42faf # v1.0.2 @@ -59,6 +59,7 @@ github.com/docker/libkv 458977154600b9f23984d9f4b82e github.com/vishvananda/netns db3c7e526aae966c4ccfa6c8189b693d6ac5d202 github.com/vishvananda/netlink f049be6f391489d3f374498fe0c8df8449258372 # v1.1.0 github.com/moby/ipvs 4566ccea0e08d68e9614c3e7a64a23b850c4bb35 # v1.0.1 +github.com/google/btree 479b5e81b0a93ec038d201b0b33d17db599531d3 # v1.0.1 github.com/samuel/go-zookeeper d0e0d8e11f318e000a8cc434616d69e329edc374 github.com/deckarep/golang-set ef32fa3046d9f249d399f98ebaf9be944430fd1d diff --git a/vendor/github.com/google/btree/LICENSE b/vendor/github.com/google/btree/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/vendor/github.com/google/btree/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/google/btree/README.md b/vendor/github.com/google/btree/README.md new file mode 100644 index 0000000000..6062a4dacd --- /dev/null +++ b/vendor/github.com/google/btree/README.md @@ -0,0 +1,12 @@ +# BTree implementation for Go + +![Travis CI Build Status](https://api.travis-ci.org/google/btree.svg?branch=master) + +This package provides an in-memory B-Tree implementation for Go, useful as +an ordered, mutable data structure. + +The API is based off of the wonderful +http://godoc.org/github.com/petar/GoLLRB/llrb, and is meant to allow btree to +act as a drop-in replacement for gollrb trees. + +See http://godoc.org/github.com/google/btree for documentation. diff --git a/vendor/github.com/google/btree/btree.go b/vendor/github.com/google/btree/btree.go new file mode 100644 index 0000000000..b83acdbc6d --- /dev/null +++ b/vendor/github.com/google/btree/btree.go @@ -0,0 +1,890 @@ +// Copyright 2014 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package btree implements in-memory B-Trees of arbitrary degree. +// +// btree implements an in-memory B-Tree for use as an ordered data structure. +// It is not meant for persistent storage solutions. +// +// It has a flatter structure than an equivalent red-black or other binary tree, +// which in some cases yields better memory usage and/or performance. +// See some discussion on the matter here: +// http://google-opensource.blogspot.com/2013/01/c-containers-that-save-memory-and-time.html +// Note, though, that this project is in no way related to the C++ B-Tree +// implementation written about there. +// +// Within this tree, each node contains a slice of items and a (possibly nil) +// slice of children. For basic numeric values or raw structs, this can cause +// efficiency differences when compared to equivalent C++ template code that +// stores values in arrays within the node: +// * Due to the overhead of storing values as interfaces (each +// value needs to be stored as the value itself, then 2 words for the +// interface pointing to that value and its type), resulting in higher +// memory use. +// * Since interfaces can point to values anywhere in memory, values are +// most likely not stored in contiguous blocks, resulting in a higher +// number of cache misses. +// These issues don't tend to matter, though, when working with strings or other +// heap-allocated structures, since C++-equivalent structures also must store +// pointers and also distribute their values across the heap. +// +// This implementation is designed to be a drop-in replacement to gollrb.LLRB +// trees, (http://github.com/petar/gollrb), an excellent and probably the most +// widely used ordered tree implementation in the Go ecosystem currently. +// Its functions, therefore, exactly mirror those of +// llrb.LLRB where possible. Unlike gollrb, though, we currently don't +// support storing multiple equivalent values. +package btree + +import ( + "fmt" + "io" + "sort" + "strings" + "sync" +) + +// Item represents a single object in the tree. +type Item interface { + // Less tests whether the current item is less than the given argument. + // + // This must provide a strict weak ordering. + // If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only + // hold one of either a or b in the tree). + Less(than Item) bool +} + +const ( + DefaultFreeListSize = 32 +) + +var ( + nilItems = make(items, 16) + nilChildren = make(children, 16) +) + +// FreeList represents a free list of btree nodes. By default each +// BTree has its own FreeList, but multiple BTrees can share the same +// FreeList. +// Two Btrees using the same freelist are safe for concurrent write access. +type FreeList struct { + mu sync.Mutex + freelist []*node +} + +// NewFreeList creates a new free list. +// size is the maximum size of the returned free list. +func NewFreeList(size int) *FreeList { + return &FreeList{freelist: make([]*node, 0, size)} +} + +func (f *FreeList) newNode() (n *node) { + f.mu.Lock() + index := len(f.freelist) - 1 + if index < 0 { + f.mu.Unlock() + return new(node) + } + n = f.freelist[index] + f.freelist[index] = nil + f.freelist = f.freelist[:index] + f.mu.Unlock() + return +} + +// freeNode adds the given node to the list, returning true if it was added +// and false if it was discarded. +func (f *FreeList) freeNode(n *node) (out bool) { + f.mu.Lock() + if len(f.freelist) < cap(f.freelist) { + f.freelist = append(f.freelist, n) + out = true + } + f.mu.Unlock() + return +} + +// ItemIterator allows callers of Ascend* to iterate in-order over portions of +// the tree. When this function returns false, iteration will stop and the +// associated Ascend* function will immediately return. +type ItemIterator func(i Item) bool + +// New creates a new B-Tree with the given degree. +// +// New(2), for example, will create a 2-3-4 tree (each node contains 1-3 items +// and 2-4 children). +func New(degree int) *BTree { + return NewWithFreeList(degree, NewFreeList(DefaultFreeListSize)) +} + +// NewWithFreeList creates a new B-Tree that uses the given node free list. +func NewWithFreeList(degree int, f *FreeList) *BTree { + if degree <= 1 { + panic("bad degree") + } + return &BTree{ + degree: degree, + cow: ©OnWriteContext{freelist: f}, + } +} + +// items stores items in a node. +type items []Item + +// insertAt inserts a value into the given index, pushing all subsequent values +// forward. +func (s *items) insertAt(index int, item Item) { + *s = append(*s, nil) + if index < len(*s) { + copy((*s)[index+1:], (*s)[index:]) + } + (*s)[index] = item +} + +// removeAt removes a value at a given index, pulling all subsequent values +// back. +func (s *items) removeAt(index int) Item { + item := (*s)[index] + copy((*s)[index:], (*s)[index+1:]) + (*s)[len(*s)-1] = nil + *s = (*s)[:len(*s)-1] + return item +} + +// pop removes and returns the last element in the list. +func (s *items) pop() (out Item) { + index := len(*s) - 1 + out = (*s)[index] + (*s)[index] = nil + *s = (*s)[:index] + return +} + +// truncate truncates this instance at index so that it contains only the +// first index items. index must be less than or equal to length. +func (s *items) truncate(index int) { + var toClear items + *s, toClear = (*s)[:index], (*s)[index:] + for len(toClear) > 0 { + toClear = toClear[copy(toClear, nilItems):] + } +} + +// find returns the index where the given item should be inserted into this +// list. 'found' is true if the item already exists in the list at the given +// index. +func (s items) find(item Item) (index int, found bool) { + i := sort.Search(len(s), func(i int) bool { + return item.Less(s[i]) + }) + if i > 0 && !s[i-1].Less(item) { + return i - 1, true + } + return i, false +} + +// children stores child nodes in a node. +type children []*node + +// insertAt inserts a value into the given index, pushing all subsequent values +// forward. +func (s *children) insertAt(index int, n *node) { + *s = append(*s, nil) + if index < len(*s) { + copy((*s)[index+1:], (*s)[index:]) + } + (*s)[index] = n +} + +// removeAt removes a value at a given index, pulling all subsequent values +// back. +func (s *children) removeAt(index int) *node { + n := (*s)[index] + copy((*s)[index:], (*s)[index+1:]) + (*s)[len(*s)-1] = nil + *s = (*s)[:len(*s)-1] + return n +} + +// pop removes and returns the last element in the list. +func (s *children) pop() (out *node) { + index := len(*s) - 1 + out = (*s)[index] + (*s)[index] = nil + *s = (*s)[:index] + return +} + +// truncate truncates this instance at index so that it contains only the +// first index children. index must be less than or equal to length. +func (s *children) truncate(index int) { + var toClear children + *s, toClear = (*s)[:index], (*s)[index:] + for len(toClear) > 0 { + toClear = toClear[copy(toClear, nilChildren):] + } +} + +// node is an internal node in a tree. +// +// It must at all times maintain the invariant that either +// * len(children) == 0, len(items) unconstrained +// * len(children) == len(items) + 1 +type node struct { + items items + children children + cow *copyOnWriteContext +} + +func (n *node) mutableFor(cow *copyOnWriteContext) *node { + if n.cow == cow { + return n + } + out := cow.newNode() + if cap(out.items) >= len(n.items) { + out.items = out.items[:len(n.items)] + } else { + out.items = make(items, len(n.items), cap(n.items)) + } + copy(out.items, n.items) + // Copy children + if cap(out.children) >= len(n.children) { + out.children = out.children[:len(n.children)] + } else { + out.children = make(children, len(n.children), cap(n.children)) + } + copy(out.children, n.children) + return out +} + +func (n *node) mutableChild(i int) *node { + c := n.children[i].mutableFor(n.cow) + n.children[i] = c + return c +} + +// split splits the given node at the given index. The current node shrinks, +// and this function returns the item that existed at that index and a new node +// containing all items/children after it. +func (n *node) split(i int) (Item, *node) { + item := n.items[i] + next := n.cow.newNode() + next.items = append(next.items, n.items[i+1:]...) + n.items.truncate(i) + if len(n.children) > 0 { + next.children = append(next.children, n.children[i+1:]...) + n.children.truncate(i + 1) + } + return item, next +} + +// maybeSplitChild checks if a child should be split, and if so splits it. +// Returns whether or not a split occurred. +func (n *node) maybeSplitChild(i, maxItems int) bool { + if len(n.children[i].items) < maxItems { + return false + } + first := n.mutableChild(i) + item, second := first.split(maxItems / 2) + n.items.insertAt(i, item) + n.children.insertAt(i+1, second) + return true +} + +// insert inserts an item into the subtree rooted at this node, making sure +// no nodes in the subtree exceed maxItems items. Should an equivalent item be +// be found/replaced by insert, it will be returned. +func (n *node) insert(item Item, maxItems int) Item { + i, found := n.items.find(item) + if found { + out := n.items[i] + n.items[i] = item + return out + } + if len(n.children) == 0 { + n.items.insertAt(i, item) + return nil + } + if n.maybeSplitChild(i, maxItems) { + inTree := n.items[i] + switch { + case item.Less(inTree): + // no change, we want first split node + case inTree.Less(item): + i++ // we want second split node + default: + out := n.items[i] + n.items[i] = item + return out + } + } + return n.mutableChild(i).insert(item, maxItems) +} + +// get finds the given key in the subtree and returns it. +func (n *node) get(key Item) Item { + i, found := n.items.find(key) + if found { + return n.items[i] + } else if len(n.children) > 0 { + return n.children[i].get(key) + } + return nil +} + +// min returns the first item in the subtree. +func min(n *node) Item { + if n == nil { + return nil + } + for len(n.children) > 0 { + n = n.children[0] + } + if len(n.items) == 0 { + return nil + } + return n.items[0] +} + +// max returns the last item in the subtree. +func max(n *node) Item { + if n == nil { + return nil + } + for len(n.children) > 0 { + n = n.children[len(n.children)-1] + } + if len(n.items) == 0 { + return nil + } + return n.items[len(n.items)-1] +} + +// toRemove details what item to remove in a node.remove call. +type toRemove int + +const ( + removeItem toRemove = iota // removes the given item + removeMin // removes smallest item in the subtree + removeMax // removes largest item in the subtree +) + +// remove removes an item from the subtree rooted at this node. +func (n *node) remove(item Item, minItems int, typ toRemove) Item { + var i int + var found bool + switch typ { + case removeMax: + if len(n.children) == 0 { + return n.items.pop() + } + i = len(n.items) + case removeMin: + if len(n.children) == 0 { + return n.items.removeAt(0) + } + i = 0 + case removeItem: + i, found = n.items.find(item) + if len(n.children) == 0 { + if found { + return n.items.removeAt(i) + } + return nil + } + default: + panic("invalid type") + } + // If we get to here, we have children. + if len(n.children[i].items) <= minItems { + return n.growChildAndRemove(i, item, minItems, typ) + } + child := n.mutableChild(i) + // Either we had enough items to begin with, or we've done some + // merging/stealing, because we've got enough now and we're ready to return + // stuff. + if found { + // The item exists at index 'i', and the child we've selected can give us a + // predecessor, since if we've gotten here it's got > minItems items in it. + out := n.items[i] + // We use our special-case 'remove' call with typ=maxItem to pull the + // predecessor of item i (the rightmost leaf of our immediate left child) + // and set it into where we pulled the item from. + n.items[i] = child.remove(nil, minItems, removeMax) + return out + } + // Final recursive call. Once we're here, we know that the item isn't in this + // node and that the child is big enough to remove from. + return child.remove(item, minItems, typ) +} + +// growChildAndRemove grows child 'i' to make sure it's possible to remove an +// item from it while keeping it at minItems, then calls remove to actually +// remove it. +// +// Most documentation says we have to do two sets of special casing: +// 1) item is in this node +// 2) item is in child +// In both cases, we need to handle the two subcases: +// A) node has enough values that it can spare one +// B) node doesn't have enough values +// For the latter, we have to check: +// a) left sibling has node to spare +// b) right sibling has node to spare +// c) we must merge +// To simplify our code here, we handle cases #1 and #2 the same: +// If a node doesn't have enough items, we make sure it does (using a,b,c). +// We then simply redo our remove call, and the second time (regardless of +// whether we're in case 1 or 2), we'll have enough items and can guarantee +// that we hit case A. +func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove) Item { + if i > 0 && len(n.children[i-1].items) > minItems { + // Steal from left child + child := n.mutableChild(i) + stealFrom := n.mutableChild(i - 1) + stolenItem := stealFrom.items.pop() + child.items.insertAt(0, n.items[i-1]) + n.items[i-1] = stolenItem + if len(stealFrom.children) > 0 { + child.children.insertAt(0, stealFrom.children.pop()) + } + } else if i < len(n.items) && len(n.children[i+1].items) > minItems { + // steal from right child + child := n.mutableChild(i) + stealFrom := n.mutableChild(i + 1) + stolenItem := stealFrom.items.removeAt(0) + child.items = append(child.items, n.items[i]) + n.items[i] = stolenItem + if len(stealFrom.children) > 0 { + child.children = append(child.children, stealFrom.children.removeAt(0)) + } + } else { + if i >= len(n.items) { + i-- + } + child := n.mutableChild(i) + // merge with right child + mergeItem := n.items.removeAt(i) + mergeChild := n.children.removeAt(i + 1) + child.items = append(child.items, mergeItem) + child.items = append(child.items, mergeChild.items...) + child.children = append(child.children, mergeChild.children...) + n.cow.freeNode(mergeChild) + } + return n.remove(item, minItems, typ) +} + +type direction int + +const ( + descend = direction(-1) + ascend = direction(+1) +) + +// iterate provides a simple method for iterating over elements in the tree. +// +// When ascending, the 'start' should be less than 'stop' and when descending, +// the 'start' should be greater than 'stop'. Setting 'includeStart' to true +// will force the iterator to include the first item when it equals 'start', +// thus creating a "greaterOrEqual" or "lessThanEqual" rather than just a +// "greaterThan" or "lessThan" queries. +func (n *node) iterate(dir direction, start, stop Item, includeStart bool, hit bool, iter ItemIterator) (bool, bool) { + var ok, found bool + var index int + switch dir { + case ascend: + if start != nil { + index, _ = n.items.find(start) + } + for i := index; i < len(n.items); i++ { + if len(n.children) > 0 { + if hit, ok = n.children[i].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + if !includeStart && !hit && start != nil && !start.Less(n.items[i]) { + hit = true + continue + } + hit = true + if stop != nil && !n.items[i].Less(stop) { + return hit, false + } + if !iter(n.items[i]) { + return hit, false + } + } + if len(n.children) > 0 { + if hit, ok = n.children[len(n.children)-1].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + case descend: + if start != nil { + index, found = n.items.find(start) + if !found { + index = index - 1 + } + } else { + index = len(n.items) - 1 + } + for i := index; i >= 0; i-- { + if start != nil && !n.items[i].Less(start) { + if !includeStart || hit || start.Less(n.items[i]) { + continue + } + } + if len(n.children) > 0 { + if hit, ok = n.children[i+1].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + if stop != nil && !stop.Less(n.items[i]) { + return hit, false // continue + } + hit = true + if !iter(n.items[i]) { + return hit, false + } + } + if len(n.children) > 0 { + if hit, ok = n.children[0].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + } + return hit, true +} + +// Used for testing/debugging purposes. +func (n *node) print(w io.Writer, level int) { + fmt.Fprintf(w, "%sNODE:%v\n", strings.Repeat(" ", level), n.items) + for _, c := range n.children { + c.print(w, level+1) + } +} + +// BTree is an implementation of a B-Tree. +// +// BTree stores Item instances in an ordered structure, allowing easy insertion, +// removal, and iteration. +// +// Write operations are not safe for concurrent mutation by multiple +// goroutines, but Read operations are. +type BTree struct { + degree int + length int + root *node + cow *copyOnWriteContext +} + +// copyOnWriteContext pointers determine node ownership... a tree with a write +// context equivalent to a node's write context is allowed to modify that node. +// A tree whose write context does not match a node's is not allowed to modify +// it, and must create a new, writable copy (IE: it's a Clone). +// +// When doing any write operation, we maintain the invariant that the current +// node's context is equal to the context of the tree that requested the write. +// We do this by, before we descend into any node, creating a copy with the +// correct context if the contexts don't match. +// +// Since the node we're currently visiting on any write has the requesting +// tree's context, that node is modifiable in place. Children of that node may +// not share context, but before we descend into them, we'll make a mutable +// copy. +type copyOnWriteContext struct { + freelist *FreeList +} + +// Clone clones the btree, lazily. Clone should not be called concurrently, +// but the original tree (t) and the new tree (t2) can be used concurrently +// once the Clone call completes. +// +// The internal tree structure of b is marked read-only and shared between t and +// t2. Writes to both t and t2 use copy-on-write logic, creating new nodes +// whenever one of b's original nodes would have been modified. Read operations +// should have no performance degredation. Write operations for both t and t2 +// will initially experience minor slow-downs caused by additional allocs and +// copies due to the aforementioned copy-on-write logic, but should converge to +// the original performance characteristics of the original tree. +func (t *BTree) Clone() (t2 *BTree) { + // Create two entirely new copy-on-write contexts. + // This operation effectively creates three trees: + // the original, shared nodes (old b.cow) + // the new b.cow nodes + // the new out.cow nodes + cow1, cow2 := *t.cow, *t.cow + out := *t + t.cow = &cow1 + out.cow = &cow2 + return &out +} + +// maxItems returns the max number of items to allow per node. +func (t *BTree) maxItems() int { + return t.degree*2 - 1 +} + +// minItems returns the min number of items to allow per node (ignored for the +// root node). +func (t *BTree) minItems() int { + return t.degree - 1 +} + +func (c *copyOnWriteContext) newNode() (n *node) { + n = c.freelist.newNode() + n.cow = c + return +} + +type freeType int + +const ( + ftFreelistFull freeType = iota // node was freed (available for GC, not stored in freelist) + ftStored // node was stored in the freelist for later use + ftNotOwned // node was ignored by COW, since it's owned by another one +) + +// freeNode frees a node within a given COW context, if it's owned by that +// context. It returns what happened to the node (see freeType const +// documentation). +func (c *copyOnWriteContext) freeNode(n *node) freeType { + if n.cow == c { + // clear to allow GC + n.items.truncate(0) + n.children.truncate(0) + n.cow = nil + if c.freelist.freeNode(n) { + return ftStored + } else { + return ftFreelistFull + } + } else { + return ftNotOwned + } +} + +// ReplaceOrInsert adds the given item to the tree. If an item in the tree +// already equals the given one, it is removed from the tree and returned. +// Otherwise, nil is returned. +// +// nil cannot be added to the tree (will panic). +func (t *BTree) ReplaceOrInsert(item Item) Item { + if item == nil { + panic("nil item being added to BTree") + } + if t.root == nil { + t.root = t.cow.newNode() + t.root.items = append(t.root.items, item) + t.length++ + return nil + } else { + t.root = t.root.mutableFor(t.cow) + if len(t.root.items) >= t.maxItems() { + item2, second := t.root.split(t.maxItems() / 2) + oldroot := t.root + t.root = t.cow.newNode() + t.root.items = append(t.root.items, item2) + t.root.children = append(t.root.children, oldroot, second) + } + } + out := t.root.insert(item, t.maxItems()) + if out == nil { + t.length++ + } + return out +} + +// Delete removes an item equal to the passed in item from the tree, returning +// it. If no such item exists, returns nil. +func (t *BTree) Delete(item Item) Item { + return t.deleteItem(item, removeItem) +} + +// DeleteMin removes the smallest item in the tree and returns it. +// If no such item exists, returns nil. +func (t *BTree) DeleteMin() Item { + return t.deleteItem(nil, removeMin) +} + +// DeleteMax removes the largest item in the tree and returns it. +// If no such item exists, returns nil. +func (t *BTree) DeleteMax() Item { + return t.deleteItem(nil, removeMax) +} + +func (t *BTree) deleteItem(item Item, typ toRemove) Item { + if t.root == nil || len(t.root.items) == 0 { + return nil + } + t.root = t.root.mutableFor(t.cow) + out := t.root.remove(item, t.minItems(), typ) + if len(t.root.items) == 0 && len(t.root.children) > 0 { + oldroot := t.root + t.root = t.root.children[0] + t.cow.freeNode(oldroot) + } + if out != nil { + t.length-- + } + return out +} + +// AscendRange calls the iterator for every value in the tree within the range +// [greaterOrEqual, lessThan), until iterator returns false. +func (t *BTree) AscendRange(greaterOrEqual, lessThan Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, greaterOrEqual, lessThan, true, false, iterator) +} + +// AscendLessThan calls the iterator for every value in the tree within the range +// [first, pivot), until iterator returns false. +func (t *BTree) AscendLessThan(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, nil, pivot, false, false, iterator) +} + +// AscendGreaterOrEqual calls the iterator for every value in the tree within +// the range [pivot, last], until iterator returns false. +func (t *BTree) AscendGreaterOrEqual(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, pivot, nil, true, false, iterator) +} + +// Ascend calls the iterator for every value in the tree within the range +// [first, last], until iterator returns false. +func (t *BTree) Ascend(iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(ascend, nil, nil, false, false, iterator) +} + +// DescendRange calls the iterator for every value in the tree within the range +// [lessOrEqual, greaterThan), until iterator returns false. +func (t *BTree) DescendRange(lessOrEqual, greaterThan Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, lessOrEqual, greaterThan, true, false, iterator) +} + +// DescendLessOrEqual calls the iterator for every value in the tree within the range +// [pivot, first], until iterator returns false. +func (t *BTree) DescendLessOrEqual(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, pivot, nil, true, false, iterator) +} + +// DescendGreaterThan calls the iterator for every value in the tree within +// the range [last, pivot), until iterator returns false. +func (t *BTree) DescendGreaterThan(pivot Item, iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, nil, pivot, false, false, iterator) +} + +// Descend calls the iterator for every value in the tree within the range +// [last, first], until iterator returns false. +func (t *BTree) Descend(iterator ItemIterator) { + if t.root == nil { + return + } + t.root.iterate(descend, nil, nil, false, false, iterator) +} + +// Get looks for the key item in the tree, returning it. It returns nil if +// unable to find that item. +func (t *BTree) Get(key Item) Item { + if t.root == nil { + return nil + } + return t.root.get(key) +} + +// Min returns the smallest item in the tree, or nil if the tree is empty. +func (t *BTree) Min() Item { + return min(t.root) +} + +// Max returns the largest item in the tree, or nil if the tree is empty. +func (t *BTree) Max() Item { + return max(t.root) +} + +// Has returns true if the given key is in the tree. +func (t *BTree) Has(key Item) bool { + return t.Get(key) != nil +} + +// Len returns the number of items currently in the tree. +func (t *BTree) Len() int { + return t.length +} + +// Clear removes all items from the btree. If addNodesToFreelist is true, +// t's nodes are added to its freelist as part of this call, until the freelist +// is full. Otherwise, the root node is simply dereferenced and the subtree +// left to Go's normal GC processes. +// +// This can be much faster +// than calling Delete on all elements, because that requires finding/removing +// each element in the tree and updating the tree accordingly. It also is +// somewhat faster than creating a new tree to replace the old one, because +// nodes from the old tree are reclaimed into the freelist for use by the new +// one, instead of being lost to the garbage collector. +// +// This call takes: +// O(1): when addNodesToFreelist is false, this is a single operation. +// O(1): when the freelist is already full, it breaks out immediately +// O(freelist size): when the freelist is empty and the nodes are all owned +// by this tree, nodes are added to the freelist until full. +// O(tree size): when all nodes are owned by another tree, all nodes are +// iterated over looking for nodes to add to the freelist, and due to +// ownership, none are. +func (t *BTree) Clear(addNodesToFreelist bool) { + if t.root != nil && addNodesToFreelist { + t.root.reset(t.cow) + } + t.root, t.length = nil, 0 +} + +// reset returns a subtree to the freelist. It breaks out immediately if the +// freelist is full, since the only benefit of iterating is to fill that +// freelist up. Returns true if parent reset call should continue. +func (n *node) reset(c *copyOnWriteContext) bool { + for _, child := range n.children { + if !child.reset(c) { + return false + } + } + return c.freeNode(n) != ftFreelistFull +} + +// Int implements the Item interface for integers. +type Int int + +// Less returns true if int(a) < int(b). +func (a Int) Less(b Item) bool { + return a < b.(Int) +} diff --git a/vendor/github.com/google/btree/go.mod b/vendor/github.com/google/btree/go.mod new file mode 100644 index 0000000000..fe4d5ca17b --- /dev/null +++ b/vendor/github.com/google/btree/go.mod @@ -0,0 +1,17 @@ +// Copyright 2014 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module github.com/google/btree + +go 1.12 diff --git a/vendor/github.com/hashicorp/memberlist/README.md b/vendor/github.com/hashicorp/memberlist/README.md index 0adc075e81..6a2caa30e0 100644 --- a/vendor/github.com/hashicorp/memberlist/README.md +++ b/vendor/github.com/hashicorp/memberlist/README.md @@ -1,4 +1,4 @@ -# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) +# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) [![CircleCI](https://circleci.com/gh/hashicorp/memberlist.svg?style=svg)](https://circleci.com/gh/hashicorp/memberlist) memberlist is a [Go](http://www.golang.org) library that manages cluster membership and member failure detection using a gossip based protocol. @@ -23,8 +23,6 @@ Please check your installation with: go version ``` -Run `make deps` to fetch dependencies before building - ## Usage Memberlist is surprisingly simple to use. An example is shown below: @@ -65,7 +63,7 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c ## Protocol -memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf). However, we extend the protocol in a number of ways: +memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, we extend the protocol in a number of ways: * Several extensions are made to increase propagation speed and convergence rate. diff --git a/vendor/github.com/hashicorp/memberlist/alive_delegate.go b/vendor/github.com/hashicorp/memberlist/alive_delegate.go index 51a0ba9054..615f4a90a5 100644 --- a/vendor/github.com/hashicorp/memberlist/alive_delegate.go +++ b/vendor/github.com/hashicorp/memberlist/alive_delegate.go @@ -7,8 +7,8 @@ package memberlist // a node out and prevent it from being considered a peer // using application specific logic. type AliveDelegate interface { - // NotifyMerge is invoked when a merge could take place. - // Provides a list of the nodes known by the peer. If - // the return value is non-nil, the merge is canceled. + // NotifyAlive is invoked when a message about a live + // node is received from the network. Returning a non-nil + // error prevents the node from being considered a peer. NotifyAlive(peer *Node) error } diff --git a/vendor/github.com/hashicorp/memberlist/broadcast.go b/vendor/github.com/hashicorp/memberlist/broadcast.go index f7e85a119c..d07d41bb69 100644 --- a/vendor/github.com/hashicorp/memberlist/broadcast.go +++ b/vendor/github.com/hashicorp/memberlist/broadcast.go @@ -29,6 +29,11 @@ func (b *memberlistBroadcast) Invalidates(other Broadcast) bool { return b.node == mb.node } +// memberlist.NamedBroadcast optional interface +func (b *memberlistBroadcast) Name() string { + return b.node +} + func (b *memberlistBroadcast) Message() []byte { return b.msg } diff --git a/vendor/github.com/hashicorp/memberlist/config.go b/vendor/github.com/hashicorp/memberlist/config.go index c85b1657a2..31099e75f4 100644 --- a/vendor/github.com/hashicorp/memberlist/config.go +++ b/vendor/github.com/hashicorp/memberlist/config.go @@ -1,10 +1,15 @@ package memberlist import ( + "fmt" "io" "log" + "net" "os" + "strings" "time" + + multierror "github.com/hashicorp/go-multierror" ) type Config struct { @@ -116,6 +121,10 @@ type Config struct { // indirect UDP pings. DisableTcpPings bool + // DisableTcpPingsForNode is like DisableTcpPings, but lets you control + // whether to perform TCP pings on a node-by-node basis. + DisableTcpPingsForNode func(nodeName string) bool + // AwarenessMaxMultiplier will increase the probe interval if the node // becomes aware that it might be degraded and not meeting the soft real // time requirements to reliably probe other nodes. @@ -215,6 +224,44 @@ type Config struct { // This is a legacy name for backward compatibility but should really be // called PacketBufferSize now that we have generalized the transport. UDPBufferSize int + + // DeadNodeReclaimTime controls the time before a dead node's name can be + // reclaimed by one with a different address or port. By default, this is 0, + // meaning nodes cannot be reclaimed this way. + DeadNodeReclaimTime time.Duration + + // RequireNodeNames controls if the name of a node is required when sending + // a message to that node. + RequireNodeNames bool + // CIDRsAllowed If nil, allow any connection (default), otherwise specify all networks + // allowed to connect (you must specify IPv6/IPv4 separately) + // Using [] will block all connections. + CIDRsAllowed []net.IPNet +} + +// ParseCIDRs return a possible empty list of all Network that have been parsed +// In case of error, it returns succesfully parsed CIDRs and the last error found +func ParseCIDRs(v []string) ([]net.IPNet, error) { + nets := make([]net.IPNet, 0) + if v == nil { + return nets, nil + } + var errs error + hasErrors := false + for _, p := range v { + _, net, err := net.ParseCIDR(strings.TrimSpace(p)) + if err != nil { + err = fmt.Errorf("invalid cidr: %s", p) + errs = multierror.Append(errs, err) + hasErrors = true + } else { + nets = append(nets, *net) + } + } + if !hasErrors { + errs = nil + } + return nets, errs } // DefaultLANConfig returns a sane set of configurations for Memberlist. @@ -258,6 +305,7 @@ func DefaultLANConfig() *Config { HandoffQueueDepth: 1024, UDPBufferSize: 1400, + CIDRsAllowed: nil, // same as allow all } } @@ -277,6 +325,24 @@ func DefaultWANConfig() *Config { return conf } +// IPMustBeChecked return true if IPAllowed must be called +func (c *Config) IPMustBeChecked() bool { + return len(c.CIDRsAllowed) > 0 +} + +// IPAllowed return an error if access to memberlist is denied +func (c *Config) IPAllowed(ip net.IP) error { + if !c.IPMustBeChecked() { + return nil + } + for _, n := range c.CIDRsAllowed { + if n.Contains(ip) { + return nil + } + } + return fmt.Errorf("%s is not allowed", ip) +} + // DefaultLocalConfig works like DefaultConfig, however it returns a configuration // that is optimized for a local loopback environments. The default configuration is // still very conservative and errs on the side of caution. diff --git a/vendor/github.com/hashicorp/memberlist/event_delegate.go b/vendor/github.com/hashicorp/memberlist/event_delegate.go index 35e2a56fdd..352f98b43e 100644 --- a/vendor/github.com/hashicorp/memberlist/event_delegate.go +++ b/vendor/github.com/hashicorp/memberlist/event_delegate.go @@ -49,13 +49,16 @@ type NodeEvent struct { } func (c *ChannelEventDelegate) NotifyJoin(n *Node) { - c.Ch <- NodeEvent{NodeJoin, n} + node := *n + c.Ch <- NodeEvent{NodeJoin, &node} } func (c *ChannelEventDelegate) NotifyLeave(n *Node) { - c.Ch <- NodeEvent{NodeLeave, n} + node := *n + c.Ch <- NodeEvent{NodeLeave, &node} } func (c *ChannelEventDelegate) NotifyUpdate(n *Node) { - c.Ch <- NodeEvent{NodeUpdate, n} + node := *n + c.Ch <- NodeEvent{NodeUpdate, &node} } diff --git a/vendor/github.com/hashicorp/memberlist/go.mod b/vendor/github.com/hashicorp/memberlist/go.mod new file mode 100644 index 0000000000..1b83a4f285 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/go.mod @@ -0,0 +1,18 @@ +module github.com/hashicorp/memberlist + +go 1.12 + +require ( + github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c + github.com/hashicorp/go-immutable-radix v1.0.0 // indirect + github.com/hashicorp/go-msgpack v0.5.3 + github.com/hashicorp/go-multierror v1.0.0 + github.com/hashicorp/go-sockaddr v1.0.0 + github.com/miekg/dns v1.1.26 + github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 + github.com/stretchr/testify v1.2.2 +) diff --git a/vendor/github.com/hashicorp/memberlist/logging.go b/vendor/github.com/hashicorp/memberlist/logging.go index f31acfb2fa..2ca2bab4e3 100644 --- a/vendor/github.com/hashicorp/memberlist/logging.go +++ b/vendor/github.com/hashicorp/memberlist/logging.go @@ -13,6 +13,14 @@ func LogAddress(addr net.Addr) string { return fmt.Sprintf("from=%s", addr.String()) } +func LogStringAddress(addr string) string { + if addr == "" { + return "from=" + } + + return fmt.Sprintf("from=%s", addr) +} + func LogConn(conn net.Conn) string { if conn == nil { return LogAddress(nil) diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index e9084f9fd4..7ee0400919 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -15,6 +15,8 @@ multiple routes. package memberlist import ( + "container/list" + "errors" "fmt" "log" "net" @@ -30,10 +32,17 @@ import ( "github.com/miekg/dns" ) +var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided") + type Memberlist struct { sequenceNum uint32 // Local sequence number incarnation uint32 // Local incarnation number numNodes uint32 // Number of known nodes (estimate) + pushPullReq uint32 // Number of push/pull requests + + advertiseLock sync.RWMutex + advertiseAddr net.IP + advertisePort uint16 config *Config shutdown int32 // Used as an atomic boolean value @@ -44,13 +53,17 @@ type Memberlist struct { shutdownLock sync.Mutex // Serializes calls to Shutdown leaveLock sync.Mutex // Serializes calls to Leave - transport Transport - handoff chan msgHandoff + transport NodeAwareTransport + + handoffCh chan struct{} + highPriorityMsgQueue *list.List + lowPriorityMsgQueue *list.List + msgQueueLock sync.Mutex nodeLock sync.RWMutex nodes []*nodeState // Known nodes - nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState - nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer + nodeMap map[string]*nodeState // Maps Node.Name -> NodeState + nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer awareness *awareness tickerLock sync.Mutex @@ -66,6 +79,15 @@ type Memberlist struct { logger *log.Logger } +// BuildVsnArray creates the array of Vsn +func (conf *Config) BuildVsnArray() []uint8 { + return []uint8{ + ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion, + conf.DelegateProtocolMin, conf.DelegateProtocolMax, + conf.DelegateProtocolVersion, + } +} + // newMemberlist creates the network listeners. // Does not schedule execution of background maintenance. func newMemberlist(conf *Config) (*Memberlist, error) { @@ -159,22 +181,38 @@ func newMemberlist(conf *Config) (*Memberlist, error) { transport = nt } + nodeAwareTransport, ok := transport.(NodeAwareTransport) + if !ok { + logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired") + nodeAwareTransport = &shimNodeAwareTransport{transport} + } + m := &Memberlist{ - config: conf, - shutdownCh: make(chan struct{}), - leaveBroadcast: make(chan struct{}, 1), - transport: transport, - handoff: make(chan msgHandoff, conf.HandoffQueueDepth), - nodeMap: make(map[string]*nodeState), - nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier), - ackHandlers: make(map[uint32]*ackHandler), - broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, - logger: logger, + config: conf, + shutdownCh: make(chan struct{}), + leaveBroadcast: make(chan struct{}, 1), + transport: nodeAwareTransport, + handoffCh: make(chan struct{}, 1), + highPriorityMsgQueue: list.New(), + lowPriorityMsgQueue: list.New(), + nodeMap: make(map[string]*nodeState), + nodeTimers: make(map[string]*suspicion), + awareness: newAwareness(conf.AwarenessMaxMultiplier), + ackHandlers: make(map[uint32]*ackHandler), + broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, + logger: logger, } m.broadcasts.NumNodes = func() int { return m.estNumNodes() } + + // Get the final advertise address from the transport, which may need + // to see which address we bound to. We'll refresh this each time we + // send out an alive message. + if _, _, err := m.refreshAdvertise(); err != nil { + return nil, err + } + go m.streamListen() go m.packetListen() go m.packetHandler() @@ -222,7 +260,8 @@ func (m *Memberlist) Join(existing []string) (int, error) { for _, addr := range addrs { hp := joinHostPort(addr.ip.String(), addr.port) - if err := m.pushPullNode(hp, true); err != nil { + a := Address{Addr: hp, Name: addr.nodeName} + if err := m.pushPullNode(a, true); err != nil { err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) errs = multierror.Append(errs, err) m.logger.Printf("[DEBUG] memberlist: %v", err) @@ -240,8 +279,9 @@ func (m *Memberlist) Join(existing []string) (int, error) { // ipPort holds information about a node we want to try to join. type ipPort struct { - ip net.IP - port uint16 + ip net.IP + port uint16 + nodeName string // optional } // tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host. @@ -250,7 +290,7 @@ type ipPort struct { // Consul's. By doing the TCP lookup directly, we get the best chance for the // largest list of hosts to join. Since joins are relatively rare events, it's ok // to do this rather expensive operation. -func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) { +func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) { // Don't attempt any TCP lookups against non-fully qualified domain // names, since those will likely come from the resolv.conf file. if !strings.Contains(host, ".") { @@ -292,9 +332,9 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err for _, r := range in.Answer { switch rr := r.(type) { case (*dns.A): - ips = append(ips, ipPort{rr.A, defaultPort}) + ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName}) case (*dns.AAAA): - ips = append(ips, ipPort{rr.AAAA, defaultPort}) + ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName}) case (*dns.CNAME): m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host) } @@ -308,6 +348,16 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err // resolveAddr is used to resolve the address into an address, // port, and error. If no port is given, use the default func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { + // First peel off any leading node name. This is optional. + nodeName := "" + if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 { + if slashIdx == 0 { + return nil, fmt.Errorf("empty node name provided") + } + nodeName = hostStr[0:slashIdx] + hostStr = hostStr[slashIdx+1:] + } + // This captures the supplied port, or the default one. hostStr = ensurePort(hostStr, m.config.BindPort) host, sport, err := net.SplitHostPort(hostStr) @@ -324,13 +374,15 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { // will make sure the host part is in good shape for parsing, even for // IPv6 addresses. if ip := net.ParseIP(host); ip != nil { - return []ipPort{ipPort{ip, port}}, nil + return []ipPort{ + ipPort{ip: ip, port: port, nodeName: nodeName}, + }, nil } // First try TCP so we have the best chance for the largest list of // hosts to join. If this fails it's not fatal since this isn't a standard // way to query DNS, and we have a fallback below. - ips, err := m.tcpLookupIP(host, port) + ips, err := m.tcpLookupIP(host, port, nodeName) if err != nil { m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err) } @@ -347,7 +399,7 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { } ips = make([]ipPort, 0, len(ans)) for _, ip := range ans { - ips = append(ips, ipPort{ip, port}) + ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName}) } return ips, nil } @@ -358,10 +410,9 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { func (m *Memberlist) setAlive() error { // Get the final advertise address from the transport, which may need // to see which address we bound to. - addr, port, err := m.transport.FinalAdvertiseAddr( - m.config.AdvertiseAddr, m.config.AdvertisePort) + addr, port, err := m.refreshAdvertise() if err != nil { - return fmt.Errorf("Failed to get final advertise address: %v", err) + return err } // Check if this is a public address without encryption @@ -394,16 +445,36 @@ func (m *Memberlist) setAlive() error { Addr: addr, Port: uint16(port), Meta: meta, - Vsn: []uint8{ - ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, - m.config.DelegateProtocolMin, m.config.DelegateProtocolMax, - m.config.DelegateProtocolVersion, - }, + Vsn: m.config.BuildVsnArray(), } m.aliveNode(&a, nil, true) + return nil } +func (m *Memberlist) getAdvertise() (net.IP, uint16) { + m.advertiseLock.RLock() + defer m.advertiseLock.RUnlock() + return m.advertiseAddr, m.advertisePort +} + +func (m *Memberlist) setAdvertise(addr net.IP, port int) { + m.advertiseLock.Lock() + defer m.advertiseLock.Unlock() + m.advertiseAddr = addr + m.advertisePort = uint16(port) +} + +func (m *Memberlist) refreshAdvertise() (net.IP, int, error) { + addr, port, err := m.transport.FinalAdvertiseAddr( + m.config.AdvertiseAddr, m.config.AdvertisePort) + if err != nil { + return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err) + } + m.setAdvertise(addr, port) + return addr, port, nil +} + // LocalNode is used to return the local Node func (m *Memberlist) LocalNode() *Node { m.nodeLock.RLock() @@ -439,11 +510,7 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error { Addr: state.Addr, Port: state.Port, Meta: meta, - Vsn: []uint8{ - ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, - m.config.DelegateProtocolMin, m.config.DelegateProtocolMax, - m.config.DelegateProtocolVersion, - }, + Vsn: m.config.BuildVsnArray(), } notifyCh := make(chan struct{}) m.aliveNode(&a, notifyCh, true) @@ -463,24 +530,29 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error { return nil } -// SendTo is deprecated in favor of SendBestEffort, which requires a node to -// target. +// Deprecated: SendTo is deprecated in favor of SendBestEffort, which requires a node to +// target. If you don't have a node then use SendToAddress. func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { + a := Address{Addr: to.String(), Name: ""} + return m.SendToAddress(a, msg) +} + +func (m *Memberlist) SendToAddress(a Address, msg []byte) error { // Encode as a user message buf := make([]byte, 1, len(msg)+1) buf[0] = byte(userMsg) buf = append(buf, msg...) // Send the message - return m.rawSendMsgPacket(to.String(), nil, buf) + return m.rawSendMsgPacket(a, nil, buf) } -// SendToUDP is deprecated in favor of SendBestEffort. +// Deprecated: SendToUDP is deprecated in favor of SendBestEffort. func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { return m.SendBestEffort(to, msg) } -// SendToTCP is deprecated in favor of SendReliable. +// Deprecated: SendToTCP is deprecated in favor of SendReliable. func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { return m.SendReliable(to, msg) } @@ -496,7 +568,8 @@ func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { buf = append(buf, msg...) // Send the message - return m.rawSendMsgPacket(to.Address(), to, buf) + a := Address{Addr: to.Address(), Name: to.Name} + return m.rawSendMsgPacket(a, to, buf) } // SendReliable uses the reliable stream-oriented interface of the transport to @@ -504,7 +577,7 @@ func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { // mechanism). Delivery is guaranteed if no error is returned, and there is no // limit on the size of the message. func (m *Memberlist) SendReliable(to *Node, msg []byte) error { - return m.sendUserMsg(to.Address(), msg) + return m.sendUserMsg(to.FullAddress(), msg) } // Members returns a list of all known live nodes. The node structures @@ -516,7 +589,7 @@ func (m *Memberlist) Members() []*Node { nodes := make([]*Node, 0, len(m.nodes)) for _, n := range m.nodes { - if n.State != stateDead { + if !n.DeadOrLeft() { nodes = append(nodes, &n.Node) } } @@ -533,7 +606,7 @@ func (m *Memberlist) NumMembers() (alive int) { defer m.nodeLock.RUnlock() for _, n := range m.nodes { - if n.State != stateDead { + if !n.DeadOrLeft() { alive++ } } @@ -570,9 +643,14 @@ func (m *Memberlist) Leave(timeout time.Duration) error { return nil } + // This dead message is special, because Node and From are the + // same. This helps other nodes figure out that a node left + // intentionally. When Node equals From, other nodes know for + // sure this node is gone. d := dead{ Incarnation: state.Incarnation, Node: state.Name, + From: state.Name, } m.deadNode(&d) @@ -598,7 +676,7 @@ func (m *Memberlist) anyAlive() bool { m.nodeLock.RLock() defer m.nodeLock.RUnlock() for _, n := range m.nodes { - if n.State != stateDead && n.Name != m.config.Name { + if !n.DeadOrLeft() && n.Name != m.config.Name { return true } } @@ -621,7 +699,7 @@ func (m *Memberlist) ProtocolVersion() uint8 { return m.config.ProtocolVersion } -// Shutdown will stop any background maintanence of network activity +// Shutdown will stop any background maintenance of network activity // for this memberlist, causing it to appear "dead". A leave message // will not be broadcasted prior, so the cluster being left will have // to detect this node's shutdown using probing. If you wish to more @@ -657,3 +735,27 @@ func (m *Memberlist) hasShutdown() bool { func (m *Memberlist) hasLeft() bool { return atomic.LoadInt32(&m.leave) == 1 } + +func (m *Memberlist) getNodeState(addr string) NodeStateType { + m.nodeLock.RLock() + defer m.nodeLock.RUnlock() + + n := m.nodeMap[addr] + return n.State +} + +func (m *Memberlist) getNodeStateChange(addr string) time.Time { + m.nodeLock.RLock() + defer m.nodeLock.RUnlock() + + n := m.nodeMap[addr] + return n.StateChange +} + +func (m *Memberlist) changeNode(addr string, f func(*nodeState)) { + m.nodeLock.Lock() + defer m.nodeLock.Unlock() + + n := m.nodeMap[addr] + f(n) +} diff --git a/vendor/github.com/hashicorp/memberlist/mock_transport.go b/vendor/github.com/hashicorp/memberlist/mock_transport.go index b8bafa8026..0a7d30a277 100644 --- a/vendor/github.com/hashicorp/memberlist/mock_transport.go +++ b/vendor/github.com/hashicorp/memberlist/mock_transport.go @@ -1,7 +1,9 @@ package memberlist import ( + "bytes" "fmt" + "io" "net" "strconv" "time" @@ -10,26 +12,33 @@ import ( // MockNetwork is used as a factory that produces MockTransport instances which // are uniquely addressed and wired up to talk to each other. type MockNetwork struct { - transports map[string]*MockTransport - port int + transportsByAddr map[string]*MockTransport + transportsByName map[string]*MockTransport + port int } // NewTransport returns a new MockTransport with a unique address, wired up to // talk to the other transports in the MockNetwork. -func (n *MockNetwork) NewTransport() *MockTransport { +func (n *MockNetwork) NewTransport(name string) *MockTransport { n.port += 1 addr := fmt.Sprintf("127.0.0.1:%d", n.port) transport := &MockTransport{ net: n, - addr: &MockAddress{addr}, + addr: &MockAddress{addr, name}, packetCh: make(chan *Packet), streamCh: make(chan net.Conn), } - if n.transports == nil { - n.transports = make(map[string]*MockTransport) + if n.transportsByAddr == nil { + n.transportsByAddr = make(map[string]*MockTransport) } - n.transports[addr] = transport + n.transportsByAddr[addr] = transport + + if n.transportsByName == nil { + n.transportsByName = make(map[string]*MockTransport) + } + n.transportsByName[name] = transport + return transport } @@ -37,6 +46,7 @@ func (n *MockNetwork) NewTransport() *MockTransport { // address scheme. type MockAddress struct { addr string + name string } // See net.Addr. @@ -57,6 +67,8 @@ type MockTransport struct { streamCh chan net.Conn } +var _ NodeAwareTransport = (*MockTransport)(nil) + // See Transport. func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) { host, portStr, err := net.SplitHostPort(t.addr.String()) @@ -79,9 +91,15 @@ func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) { // See Transport. func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) { - dest, ok := t.net.transports[addr] - if !ok { - return time.Time{}, fmt.Errorf("No route to %q", addr) + a := Address{Addr: addr, Name: ""} + return t.WriteToAddress(b, a) +} + +// See NodeAwareTransport. +func (t *MockTransport) WriteToAddress(b []byte, a Address) (time.Time, error) { + dest, err := t.getPeer(a) + if err != nil { + return time.Time{}, err } now := time.Now() @@ -98,11 +116,45 @@ func (t *MockTransport) PacketCh() <-chan *Packet { return t.packetCh } +// See NodeAwareTransport. +func (t *MockTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error { + if shouldClose { + defer conn.Close() + } + + // Copy everything from the stream into packet buffer. + var buf bytes.Buffer + if _, err := io.Copy(&buf, conn); err != nil { + return fmt.Errorf("failed to read packet: %v", err) + } + + // Check the length - it needs to have at least one byte to be a proper + // message. This is checked elsewhere for writes coming in directly from + // the UDP socket. + if n := buf.Len(); n < 1 { + return fmt.Errorf("packet too short (%d bytes) %s", n, LogAddress(addr)) + } + + // Inject the packet. + t.packetCh <- &Packet{ + Buf: buf.Bytes(), + From: addr, + Timestamp: now, + } + return nil +} + // See Transport. func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { - dest, ok := t.net.transports[addr] - if !ok { - return nil, fmt.Errorf("No route to %q", addr) + a := Address{Addr: addr, Name: ""} + return t.DialAddressTimeout(a, timeout) +} + +// See NodeAwareTransport. +func (t *MockTransport) DialAddressTimeout(a Address, timeout time.Duration) (net.Conn, error) { + dest, err := t.getPeer(a) + if err != nil { + return nil, err } p1, p2 := net.Pipe() @@ -115,7 +167,29 @@ func (t *MockTransport) StreamCh() <-chan net.Conn { return t.streamCh } +// See NodeAwareTransport. +func (t *MockTransport) IngestStream(conn net.Conn) error { + t.streamCh <- conn + return nil +} + // See Transport. func (t *MockTransport) Shutdown() error { return nil } + +func (t *MockTransport) getPeer(a Address) (*MockTransport, error) { + var ( + dest *MockTransport + ok bool + ) + if a.Name != "" { + dest, ok = t.net.transportsByName[a.Name] + } else { + dest, ok = t.net.transportsByAddr[a.Addr] + } + if !ok { + return nil, fmt.Errorf("No route to %s", a) + } + return dest, nil +} diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index a4330c4d20..bac73bd89f 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -8,9 +8,10 @@ import ( "hash/crc32" "io" "net" + "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" ) @@ -71,7 +72,8 @@ const ( compoundOverhead = 2 // Assumed overhead per entry in compoundHeader userMsgOverhead = 1 blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process - maxPushStateBytes = 10 * 1024 * 1024 + maxPushStateBytes = 20 * 1024 * 1024 + maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests ) // ping request sent directly to node @@ -82,15 +84,28 @@ type ping struct { // the intended recipient. This is to protect again an agent // restart with a new name. Node string + + SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply + SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply + SourceNode string `codec:",omitempty"` // Source name, used for a direct reply } -// indirect ping sent to an indirect ndoe +// indirect ping sent to an indirect node type indirectPingReq struct { SeqNo uint32 Target []byte Port uint16 - Node string - Nack bool // true if we'd like a nack back + + // Node is sent so the target can verify they are + // the intended recipient. This is to protect against an agent + // restart with a new name. + Node string + + Nack bool // true if we'd like a nack back + + SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply + SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply + SourceNode string `codec:",omitempty"` // Source name, used for a direct reply } // ack response is sent for a ping @@ -161,7 +176,7 @@ type pushNodeState struct { Port uint16 Meta []byte Incarnation uint32 - State nodeStateType + State NodeStateType Vsn []uint8 // Protocol versions } @@ -205,9 +220,9 @@ func (m *Memberlist) streamListen() { // handleConn handles a single incoming stream connection from the transport. func (m *Memberlist) handleConn(conn net.Conn) { + defer conn.Close() m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn)) - defer conn.Close() metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) @@ -238,6 +253,16 @@ func (m *Memberlist) handleConn(conn net.Conn) { m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn)) } case pushPullMsg: + // Increment counter of pending push/pulls + numConcurrent := atomic.AddUint32(&m.pushPullReq, 1) + defer atomic.AddUint32(&m.pushPullReq, ^uint32(0)) + + // Check if we have too many open push/pull requests + if numConcurrent >= maxPushPullRequests { + m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests") + return + } + join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn)) @@ -330,6 +355,10 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time } func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) { + if len(buf) < 1 { + m.logger.Printf("[ERR] memberlist: missing message type byte %s", LogAddress(from)) + return + } // Decode the message type msgType := messageType(buf[0]) buf = buf[1:] @@ -357,10 +386,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim case deadMsg: fallthrough case userMsg: - select { - case m.handoff <- msgHandoff{msgType, buf, from}: - default: + // Determine the message queue, prioritize alive + queue := m.lowPriorityMsgQueue + if msgType == aliveMsg { + queue = m.highPriorityMsgQueue + } + + // Check for overflow and append if not full + m.msgQueueLock.Lock() + if queue.Len() >= m.config.HandoffQueueDepth { m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) + } else { + queue.PushBack(msgHandoff{msgType, buf, from}) + } + m.msgQueueLock.Unlock() + + // Notify of pending message + select { + case m.handoffCh <- struct{}{}: + default: } default: @@ -368,28 +412,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim } } +// getNextMessage returns the next message to process in priority order, using LIFO +func (m *Memberlist) getNextMessage() (msgHandoff, bool) { + m.msgQueueLock.Lock() + defer m.msgQueueLock.Unlock() + + if el := m.highPriorityMsgQueue.Back(); el != nil { + m.highPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } else if el := m.lowPriorityMsgQueue.Back(); el != nil { + m.lowPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } + return msgHandoff{}, false +} + // packetHandler is a long running goroutine that processes messages received // over the packet interface, but is decoupled from the listener to avoid // blocking the listener which may cause ping/ack messages to be delayed. func (m *Memberlist) packetHandler() { for { select { - case msg := <-m.handoff: - msgType := msg.msgType - buf := msg.buf - from := msg.from + case <-m.handoffCh: + for { + msg, ok := m.getNextMessage() + if !ok { + break + } + msgType := msg.msgType + buf := msg.buf + from := msg.from - switch msgType { - case suspectMsg: - m.handleSuspect(buf, from) - case aliveMsg: - m.handleAlive(buf, from) - case deadMsg: - m.handleDead(buf, from) - case userMsg: - m.handleUser(buf, from) - default: - m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + switch msgType { + case suspectMsg: + m.handleSuspect(buf, from) + case aliveMsg: + m.handleAlive(buf, from) + case deadMsg: + m.handleDead(buf, from) + case userMsg: + m.handleUser(buf, from) + default: + m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + } } case <-m.shutdownCh: @@ -433,7 +500,19 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) { if m.config.Ping != nil { ack.Payload = m.config.Ping.AckPayload() } - if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil { + + addr := "" + if len(p.SourceAddr) > 0 && p.SourcePort > 0 { + addr = joinHostPort(net.IP(p.SourceAddr).String(), p.SourcePort) + } else { + addr = from.String() + } + + a := Address{ + Addr: addr, + Name: p.SourceNode, + } + if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from)) } } @@ -453,7 +532,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { // Send a ping to the correct host. localSeqNo := m.nextSeqNo() - ping := ping{SeqNo: localSeqNo, Node: ind.Node} + selfAddr, selfPort := m.getAdvertise() + ping := ping{ + SeqNo: localSeqNo, + Node: ind.Node, + // The outbound message is addressed FROM us. + SourceAddr: selfAddr, + SourcePort: selfPort, + SourceNode: m.config.Name, + } + + // Forward the ack back to the requestor. If the request encodes an origin + // use that otherwise assume that the other end of the UDP socket is + // usable. + indAddr := "" + if len(ind.SourceAddr) > 0 && ind.SourcePort > 0 { + indAddr = joinHostPort(net.IP(ind.SourceAddr).String(), ind.SourcePort) + } else { + indAddr = from.String() + } // Setup a response handler to relay the ack cancelCh := make(chan struct{}) @@ -461,18 +558,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { // Try to prevent the nack if we've caught it in time. close(cancelCh) - // Forward the ack back to the requestor. ack := ackResp{ind.SeqNo, nil} - if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from)) + a := Address{ + Addr: indAddr, + Name: ind.SourceNode, + } + if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogStringAddress(indAddr)) } } m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout) // Send the ping. addr := joinHostPort(net.IP(ind.Target).String(), ind.Port) - if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from)) + a := Address{ + Addr: addr, + Name: ind.Node, + } + if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s %s", err, LogStringAddress(indAddr)) } // Setup a timer to fire off a nack if no ack is seen in time. @@ -483,8 +587,12 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) { return case <-time.After(m.config.ProbeTimeout): nack := nackResp{ind.SeqNo} - if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from)) + a := Address{ + Addr: indAddr, + Name: ind.SourceNode, + } + if err := m.encodeAndSendMsg(a, nackRespMsg, &nack); err != nil { + m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogStringAddress(indAddr)) } } }() @@ -518,12 +626,47 @@ func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) { m.suspectNode(&sus) } +// ensureCanConnect return the IP from a RemoteAddress +// return error if this client must not connect +func (m *Memberlist) ensureCanConnect(from net.Addr) error { + if !m.config.IPMustBeChecked() { + return nil + } + source := from.String() + if source == "pipe" { + return nil + } + host, _, err := net.SplitHostPort(source) + if err != nil { + return err + } + + ip := net.ParseIP(host) + if ip == nil { + return fmt.Errorf("Cannot parse IP from %s", host) + } + return m.config.IPAllowed(ip) +} + func (m *Memberlist) handleAlive(buf []byte, from net.Addr) { + if err := m.ensureCanConnect(from); err != nil { + m.logger.Printf("[DEBUG] memberlist: Blocked alive message: %s %s", err, LogAddress(from)) + return + } var live alive if err := decode(buf, &live); err != nil { m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s %s", err, LogAddress(from)) return } + if m.config.IPMustBeChecked() { + innerIP := net.IP(live.Addr) + if innerIP != nil { + if err := m.config.IPAllowed(innerIP); err != nil { + m.logger.Printf("[DEBUG] memberlist: Blocked alive.Addr=%s message from: %s %s", innerIP.String(), err, LogAddress(from)) + return + } + } + } // For proto versions < 2, there is no port provided. Mask old // behavior by using the configured port @@ -565,12 +708,12 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time. } // encodeAndSendMsg is used to combine the encoding and sending steps -func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error { +func (m *Memberlist) encodeAndSendMsg(a Address, msgType messageType, msg interface{}) error { out, err := encode(msgType, msg) if err != nil { return err } - if err := m.sendMsg(addr, out.Bytes()); err != nil { + if err := m.sendMsg(a, out.Bytes()); err != nil { return err } return nil @@ -578,7 +721,7 @@ func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg inte // sendMsg is used to send a message via packet to another host. It will // opportunistically create a compoundMsg and piggy back other broadcasts. -func (m *Memberlist) sendMsg(addr string, msg []byte) error { +func (m *Memberlist) sendMsg(a Address, msg []byte) error { // Check if we can piggy back any messages bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { @@ -588,7 +731,7 @@ func (m *Memberlist) sendMsg(addr string, msg []byte) error { // Fast path if nothing to piggypack if len(extra) == 0 { - return m.rawSendMsgPacket(addr, nil, msg) + return m.rawSendMsgPacket(a, nil, msg) } // Join all the messages @@ -600,12 +743,16 @@ func (m *Memberlist) sendMsg(addr string, msg []byte) error { compound := makeCompoundMessage(msgs) // Send the message - return m.rawSendMsgPacket(addr, nil, compound.Bytes()) + return m.rawSendMsgPacket(a, nil, compound.Bytes()) } // rawSendMsgPacket is used to send message via packet to another host without // modification, other than compression or encryption if enabled. -func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error { +func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error { + if a.Name == "" && m.config.RequireNodeNames { + return errNodeNamesAreRequired + } + // Check if we have compression enabled if m.config.EnableCompression { buf, err := compressPayload(msg) @@ -619,11 +766,12 @@ func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error } } - // Try to look up the destination node + // Try to look up the destination node. Note this will only work if the + // bare ip address is used as the node name, which is not guaranteed. if node == nil { - toAddr, _, err := net.SplitHostPort(addr) + toAddr, _, err := net.SplitHostPort(a.Addr) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err) + m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", a.Addr, err) return err } m.nodeLock.RLock() @@ -658,14 +806,14 @@ func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error } metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) - _, err := m.transport.WriteTo(msg, addr) + _, err := m.transport.WriteToAddress(msg, a) return err } // rawSendMsgStream is used to stream a message to another host without // modification, other than applying compression and encryption if enabled. func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error { - // Check if compresion is enabled + // Check if compression is enabled if m.config.EnableCompression { compBuf, err := compressPayload(sendBuf) if err != nil { @@ -698,8 +846,12 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error { } // sendUserMsg is used to stream a user message to another host. -func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error { - conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) +func (m *Memberlist) sendUserMsg(a Address, sendBuf []byte) error { + if a.Name == "" && m.config.RequireNodeNames { + return errNodeNamesAreRequired + } + + conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout) if err != nil { return err } @@ -724,14 +876,18 @@ func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error { // sendAndReceiveState is used to initiate a push/pull over a stream with a // remote host. -func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) { +func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, []byte, error) { + if a.Name == "" && m.config.RequireNodeNames { + return nil, nil, errNodeNamesAreRequired + } + // Attempt to connect - conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) + conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout) if err != nil { return nil, nil, err } defer conn.Close() - m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s", conn.RemoteAddr()) + m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr()) metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1) // Send our state @@ -996,16 +1152,17 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us nodes := make([]*Node, len(remoteNodes)) for idx, n := range remoteNodes { nodes[idx] = &Node{ - Name: n.Name, - Addr: n.Addr, - Port: n.Port, - Meta: n.Meta, - PMin: n.Vsn[0], - PMax: n.Vsn[1], - PCur: n.Vsn[2], - DMin: n.Vsn[3], - DMax: n.Vsn[4], - DCur: n.Vsn[5], + Name: n.Name, + Addr: n.Addr, + Port: n.Port, + Meta: n.Meta, + State: n.State, + PMin: n.Vsn[0], + PMax: n.Vsn[1], + PCur: n.Vsn[2], + DMin: n.Vsn[3], + DMax: n.Vsn[4], + DCur: n.Vsn[5], } } if err := m.config.Merge.NotifyMerge(nodes); err != nil { @@ -1058,8 +1215,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { // a ping, and waits for an ack. All of this is done as a series of blocking // operations, given the deadline. The bool return parameter is true if we // we able to round trip a ping to the other node. -func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) { - conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now())) +func (m *Memberlist) sendPingAndWaitForAck(a Address, ping ping, deadline time.Time) (bool, error) { + if a.Name == "" && m.config.RequireNodeNames { + return false, errNodeNamesAreRequired + } + + conn, err := m.transport.DialAddressTimeout(a, deadline.Sub(time.Now())) if err != nil { // If the node is actually dead we expect this to fail, so we // shouldn't spam the logs with it. After this point, errors @@ -1094,7 +1255,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time } if ack.SeqNo != ping.SeqNo { - return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn)) + return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo) } return true, nil diff --git a/vendor/github.com/hashicorp/memberlist/net_transport.go b/vendor/github.com/hashicorp/memberlist/net_transport.go index e7b88b01f6..0583011729 100644 --- a/vendor/github.com/hashicorp/memberlist/net_transport.go +++ b/vendor/github.com/hashicorp/memberlist/net_transport.go @@ -1,7 +1,9 @@ package memberlist import ( + "bytes" "fmt" + "io" "log" "net" "sync" @@ -48,6 +50,8 @@ type NetTransport struct { shutdown int32 } +var _ NodeAwareTransport = (*NetTransport)(nil) + // NewNetTransport returns a net transport with the given configuration. On // success all the network listeners will be created and listening. func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { @@ -170,6 +174,14 @@ func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, err // See Transport. func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) { + a := Address{Addr: addr, Name: ""} + return t.WriteToAddress(b, a) +} + +// See NodeAwareTransport. +func (t *NetTransport) WriteToAddress(b []byte, a Address) (time.Time, error) { + addr := a.Addr + udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return time.Time{}, err @@ -188,8 +200,44 @@ func (t *NetTransport) PacketCh() <-chan *Packet { return t.packetCh } +// See IngestionAwareTransport. +func (t *NetTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error { + if shouldClose { + defer conn.Close() + } + + // Copy everything from the stream into packet buffer. + var buf bytes.Buffer + if _, err := io.Copy(&buf, conn); err != nil { + return fmt.Errorf("failed to read packet: %v", err) + } + + // Check the length - it needs to have at least one byte to be a proper + // message. This is checked elsewhere for writes coming in directly from + // the UDP socket. + if n := buf.Len(); n < 1 { + return fmt.Errorf("packet too short (%d bytes) %s", n, LogAddress(addr)) + } + + // Inject the packet. + t.packetCh <- &Packet{ + Buf: buf.Bytes(), + From: addr, + Timestamp: now, + } + return nil +} + // See Transport. func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + a := Address{Addr: addr, Name: ""} + return t.DialAddressTimeout(a, timeout) +} + +// See NodeAwareTransport. +func (t *NetTransport) DialAddressTimeout(a Address, timeout time.Duration) (net.Conn, error) { + addr := a.Addr + dialer := net.Dialer{Timeout: timeout} return dialer.Dial("tcp", addr) } @@ -199,6 +247,12 @@ func (t *NetTransport) StreamCh() <-chan net.Conn { return t.streamCh } +// See IngestionAwareTransport. +func (t *NetTransport) IngestStream(conn net.Conn) error { + t.streamCh <- conn + return nil +} + // See Transport. func (t *NetTransport) Shutdown() error { // This will avoid log spam about errors when we shut down. @@ -221,6 +275,16 @@ func (t *NetTransport) Shutdown() error { // and hands them off to the stream channel. func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) { defer t.wg.Done() + + // baseDelay is the initial delay after an AcceptTCP() error before attempting again + const baseDelay = 5 * time.Millisecond + + // maxDelay is the maximum delay after an AcceptTCP() error before attempting again. + // In the case that tcpListen() is error-looping, it will delay the shutdown check. + // Therefore, changes to maxDelay may have an effect on the latency of shutdown. + const maxDelay = 1 * time.Second + + var loopDelay time.Duration for { conn, err := tcpLn.AcceptTCP() if err != nil { @@ -228,9 +292,22 @@ func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) { break } + if loopDelay == 0 { + loopDelay = baseDelay + } else { + loopDelay *= 2 + } + + if loopDelay > maxDelay { + loopDelay = maxDelay + } + t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err) + time.Sleep(loopDelay) continue } + // No error, reset loop delay + loopDelay = 0 t.streamCh <- conn } diff --git a/vendor/github.com/hashicorp/memberlist/queue.go b/vendor/github.com/hashicorp/memberlist/queue.go index 994b90ff10..c970176e18 100644 --- a/vendor/github.com/hashicorp/memberlist/queue.go +++ b/vendor/github.com/hashicorp/memberlist/queue.go @@ -1,8 +1,10 @@ package memberlist import ( - "sort" + "math" "sync" + + "github.com/google/btree" ) // TransmitLimitedQueue is used to queue messages to broadcast to @@ -19,15 +21,93 @@ type TransmitLimitedQueue struct { // number of retransmissions attempted. RetransmitMult int - sync.Mutex - bcQueue limitedBroadcasts + mu sync.Mutex + tq *btree.BTree // stores *limitedBroadcast as btree.Item + tm map[string]*limitedBroadcast + idGen int64 } type limitedBroadcast struct { - transmits int // Number of transmissions attempted. + transmits int // btree-key[0]: Number of transmissions attempted. + msgLen int64 // btree-key[1]: copied from len(b.Message()) + id int64 // btree-key[2]: unique incrementing id stamped at submission time b Broadcast + + name string // set if Broadcast is a NamedBroadcast +} + +// Less tests whether the current item is less than the given argument. +// +// This must provide a strict weak ordering. +// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only +// hold one of either a or b in the tree). +// +// default ordering is +// - [transmits=0, ..., transmits=inf] +// - [transmits=0:len=999, ..., transmits=0:len=2, ...] +// - [transmits=0:len=999,id=999, ..., transmits=0:len=999:id=1, ...] +func (b *limitedBroadcast) Less(than btree.Item) bool { + o := than.(*limitedBroadcast) + if b.transmits < o.transmits { + return true + } else if b.transmits > o.transmits { + return false + } + if b.msgLen > o.msgLen { + return true + } else if b.msgLen < o.msgLen { + return false + } + return b.id > o.id +} + +// for testing; emits in transmit order if reverse=false +func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast { + q.mu.Lock() + defer q.mu.Unlock() + + out := make([]*limitedBroadcast, 0, q.lenLocked()) + q.walkReadOnlyLocked(reverse, func(cur *limitedBroadcast) bool { + out = append(out, cur) + return true + }) + + return out +} + +// walkReadOnlyLocked calls f for each item in the queue traversing it in +// natural order (by Less) when reverse=false and the opposite when true. You +// must hold the mutex. +// +// This method panics if you attempt to mutate the item during traversal. The +// underlying btree should also not be mutated during traversal. +func (q *TransmitLimitedQueue) walkReadOnlyLocked(reverse bool, f func(*limitedBroadcast) bool) { + if q.lenLocked() == 0 { + return + } + + iter := func(item btree.Item) bool { + cur := item.(*limitedBroadcast) + + prevTransmits := cur.transmits + prevMsgLen := cur.msgLen + prevID := cur.id + + keepGoing := f(cur) + + if prevTransmits != cur.transmits || prevMsgLen != cur.msgLen || prevID != cur.id { + panic("edited queue while walking read only") + } + + return keepGoing + } + + if reverse { + q.tq.Descend(iter) // end with transmit 0 + } else { + q.tq.Ascend(iter) // start with transmit 0 + } } -type limitedBroadcasts []*limitedBroadcast // Broadcast is something that can be broadcasted via gossip to // the memberlist cluster. @@ -45,123 +125,298 @@ type Broadcast interface { Finished() } +// NamedBroadcast is an optional extension of the Broadcast interface that +// gives each message a unique string name, and that is used to optimize +// +// You shoud ensure that Invalidates() checks the same uniqueness as the +// example below: +// +// func (b *foo) Invalidates(other Broadcast) bool { +// nb, ok := other.(NamedBroadcast) +// if !ok { +// return false +// } +// return b.Name() == nb.Name() +// } +// +// Invalidates() isn't currently used for NamedBroadcasts, but that may change +// in the future. +type NamedBroadcast interface { + Broadcast + // The unique identity of this broadcast message. + Name() string +} + +// UniqueBroadcast is an optional interface that indicates that each message is +// intrinsically unique and there is no need to scan the broadcast queue for +// duplicates. +// +// You should ensure that Invalidates() always returns false if implementing +// this interface. Invalidates() isn't currently used for UniqueBroadcasts, but +// that may change in the future. +type UniqueBroadcast interface { + Broadcast + // UniqueBroadcast is just a marker method for this interface. + UniqueBroadcast() +} + // QueueBroadcast is used to enqueue a broadcast func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { - q.Lock() - defer q.Unlock() + q.queueBroadcast(b, 0) +} - // Check if this message invalidates another - n := len(q.bcQueue) - for i := 0; i < n; i++ { - if b.Invalidates(q.bcQueue[i].b) { - q.bcQueue[i].b.Finished() - copy(q.bcQueue[i:], q.bcQueue[i+1:]) - q.bcQueue[n-1] = nil - q.bcQueue = q.bcQueue[:n-1] - n-- +// lazyInit initializes internal data structures the first time they are +// needed. You must already hold the mutex. +func (q *TransmitLimitedQueue) lazyInit() { + if q.tq == nil { + q.tq = btree.New(32) + } + if q.tm == nil { + q.tm = make(map[string]*limitedBroadcast) + } +} + +// queueBroadcast is like QueueBroadcast but you can use a nonzero value for +// the initial transmit tier assigned to the message. This is meant to be used +// for unit testing. +func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int) { + q.mu.Lock() + defer q.mu.Unlock() + + q.lazyInit() + + if q.idGen == math.MaxInt64 { + // it's super duper unlikely to wrap around within the retransmit limit + q.idGen = 1 + } else { + q.idGen++ + } + id := q.idGen + + lb := &limitedBroadcast{ + transmits: initialTransmits, + msgLen: int64(len(b.Message())), + id: id, + b: b, + } + unique := false + if nb, ok := b.(NamedBroadcast); ok { + lb.name = nb.Name() + } else if _, ok := b.(UniqueBroadcast); ok { + unique = true + } + + // Check if this message invalidates another. + if lb.name != "" { + if old, ok := q.tm[lb.name]; ok { + old.b.Finished() + q.deleteItem(old) + } + } else if !unique { + // Slow path, hopefully nothing hot hits this. + var remove []*limitedBroadcast + q.tq.Ascend(func(item btree.Item) bool { + cur := item.(*limitedBroadcast) + + // Special Broadcasts can only invalidate each other. + switch cur.b.(type) { + case NamedBroadcast: + // noop + case UniqueBroadcast: + // noop + default: + if b.Invalidates(cur.b) { + cur.b.Finished() + remove = append(remove, cur) + } + } + return true + }) + for _, cur := range remove { + q.deleteItem(cur) } } - // Append to the queue - q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b}) + // Append to the relevant queue. + q.addItem(lb) +} + +// deleteItem removes the given item from the overall datastructure. You +// must already hold the mutex. +func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) { + _ = q.tq.Delete(cur) + if cur.name != "" { + delete(q.tm, cur.name) + } + + if q.tq.Len() == 0 { + // At idle there's no reason to let the id generator keep going + // indefinitely. + q.idGen = 0 + } +} + +// addItem adds the given item into the overall datastructure. You must already +// hold the mutex. +func (q *TransmitLimitedQueue) addItem(cur *limitedBroadcast) { + _ = q.tq.ReplaceOrInsert(cur) + if cur.name != "" { + q.tm[cur.name] = cur + } +} + +// getTransmitRange returns a pair of min/max values for transmit values +// represented by the current queue contents. Both values represent actual +// transmit values on the interval [0, len). You must already hold the mutex. +func (q *TransmitLimitedQueue) getTransmitRange() (minTransmit, maxTransmit int) { + if q.lenLocked() == 0 { + return 0, 0 + } + minItem, maxItem := q.tq.Min(), q.tq.Max() + if minItem == nil || maxItem == nil { + return 0, 0 + } + + min := minItem.(*limitedBroadcast).transmits + max := maxItem.(*limitedBroadcast).transmits + + return min, max } // GetBroadcasts is used to get a number of broadcasts, up to a byte limit // and applying a per-message overhead as provided. func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() // Fast path the default case - if len(q.bcQueue) == 0 { + if q.lenLocked() == 0 { return nil } transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes()) - bytesUsed := 0 - var toSend [][]byte - for i := len(q.bcQueue) - 1; i >= 0; i-- { - // Check if this is within our limits - b := q.bcQueue[i] - msg := b.b.Message() - if bytesUsed+overhead+len(msg) > limit { + var ( + bytesUsed int + toSend [][]byte + reinsert []*limitedBroadcast + ) + + // Visit fresher items first, but only look at stuff that will fit. + // We'll go tier by tier, grabbing the largest items first. + minTr, maxTr := q.getTransmitRange() + for transmits := minTr; transmits <= maxTr; /*do not advance automatically*/ { + free := int64(limit - bytesUsed - overhead) + if free <= 0 { + break // bail out early + } + + // Search for the least element on a given tier (by transmit count) as + // defined in the limitedBroadcast.Less function that will fit into our + // remaining space. + greaterOrEqual := &limitedBroadcast{ + transmits: transmits, + msgLen: free, + id: math.MaxInt64, + } + lessThan := &limitedBroadcast{ + transmits: transmits + 1, + msgLen: math.MaxInt64, + id: math.MaxInt64, + } + var keep *limitedBroadcast + q.tq.AscendRange(greaterOrEqual, lessThan, func(item btree.Item) bool { + cur := item.(*limitedBroadcast) + // Check if this is within our limits + if int64(len(cur.b.Message())) > free { + // If this happens it's a bug in the datastructure or + // surrounding use doing something like having len(Message()) + // change over time. There's enough going on here that it's + // probably sane to just skip it and move on for now. + return true + } + keep = cur + return false + }) + if keep == nil { + // No more items of an appropriate size in the tier. + transmits++ continue } + msg := keep.b.Message() + // Add to slice to send bytesUsed += overhead + len(msg) toSend = append(toSend, msg) // Check if we should stop transmission - b.transmits++ - if b.transmits >= transmitLimit { - b.b.Finished() - n := len(q.bcQueue) - q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil - q.bcQueue = q.bcQueue[:n-1] + q.deleteItem(keep) + if keep.transmits+1 >= transmitLimit { + keep.b.Finished() + } else { + // We need to bump this item down to another transmit tier, but + // because it would be in the same direction that we're walking the + // tiers, we will have to delay the reinsertion until we are + // finished our search. Otherwise we'll possibly re-add the message + // when we ascend to the next tier. + keep.transmits++ + reinsert = append(reinsert, keep) } } - // If we are sending anything, we need to re-sort to deal - // with adjusted transmit counts - if len(toSend) > 0 { - q.bcQueue.Sort() + for _, cur := range reinsert { + q.addItem(cur) } + return toSend } // NumQueued returns the number of queued messages func (q *TransmitLimitedQueue) NumQueued() int { - q.Lock() - defer q.Unlock() - return len(q.bcQueue) + q.mu.Lock() + defer q.mu.Unlock() + return q.lenLocked() } -// Reset clears all the queued messages -func (q *TransmitLimitedQueue) Reset() { - q.Lock() - defer q.Unlock() - for _, b := range q.bcQueue { - b.b.Finished() +// lenLocked returns the length of the overall queue datastructure. You must +// hold the mutex. +func (q *TransmitLimitedQueue) lenLocked() int { + if q.tq == nil { + return 0 } - q.bcQueue = nil + return q.tq.Len() +} + +// Reset clears all the queued messages. Should only be used for tests. +func (q *TransmitLimitedQueue) Reset() { + q.mu.Lock() + defer q.mu.Unlock() + + q.walkReadOnlyLocked(false, func(cur *limitedBroadcast) bool { + cur.b.Finished() + return true + }) + + q.tq = nil + q.tm = nil + q.idGen = 0 } // Prune will retain the maxRetain latest messages, and the rest // will be discarded. This can be used to prevent unbounded queue sizes func (q *TransmitLimitedQueue) Prune(maxRetain int) { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() // Do nothing if queue size is less than the limit - n := len(q.bcQueue) - if n < maxRetain { - return + for q.tq.Len() > maxRetain { + item := q.tq.Max() + if item == nil { + break + } + cur := item.(*limitedBroadcast) + cur.b.Finished() + q.deleteItem(cur) } - - // Invalidate the messages we will be removing - for i := 0; i < n-maxRetain; i++ { - q.bcQueue[i].b.Finished() - } - - // Move the messages, and retain only the last maxRetain - copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:]) - q.bcQueue = q.bcQueue[:maxRetain] -} - -func (b limitedBroadcasts) Len() int { - return len(b) -} - -func (b limitedBroadcasts) Less(i, j int) bool { - return b[i].transmits < b[j].transmits -} - -func (b limitedBroadcasts) Swap(i, j int) { - b[i], b[j] = b[j], b[i] -} - -func (b limitedBroadcasts) Sort() { - sort.Sort(sort.Reverse(b)) } diff --git a/vendor/github.com/hashicorp/memberlist/security.go b/vendor/github.com/hashicorp/memberlist/security.go index d90114eb0c..4cb4da36f0 100644 --- a/vendor/github.com/hashicorp/memberlist/security.go +++ b/vendor/github.com/hashicorp/memberlist/security.go @@ -106,7 +106,10 @@ func encryptPayload(vsn encryptionVersion, key []byte, msg []byte, data []byte, dst.WriteByte(byte(vsn)) // Add a random nonce - io.CopyN(dst, rand.Reader, nonceSize) + _, err = io.CopyN(dst, rand.Reader, nonceSize) + if err != nil { + return err + } afterNonce := dst.Len() // Ensure we are correctly padded (only version 0) diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go index f51692de0a..5e4f7fdd70 100644 --- a/vendor/github.com/hashicorp/memberlist/state.go +++ b/vendor/github.com/hashicorp/memberlist/state.go @@ -6,32 +6,35 @@ import ( "math" "math/rand" "net" + "strings" "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" ) -type nodeStateType int +type NodeStateType int const ( - stateAlive nodeStateType = iota - stateSuspect - stateDead + StateAlive NodeStateType = iota + StateSuspect + StateDead + StateLeft ) // Node represents a node in the cluster. type Node struct { - Name string - Addr net.IP - Port uint16 - Meta []byte // Metadata from the delegate for this node. - PMin uint8 // Minimum protocol version this understands - PMax uint8 // Maximum protocol version this understands - PCur uint8 // Current version node is speaking - DMin uint8 // Min protocol version for the delegate to understand - DMax uint8 // Max protocol version for the delegate to understand - DCur uint8 // Current version delegate is speaking + Name string + Addr net.IP + Port uint16 + Meta []byte // Metadata from the delegate for this node. + State NodeStateType // State of the node. + PMin uint8 // Minimum protocol version this understands + PMax uint8 // Maximum protocol version this understands + PCur uint8 // Current version node is speaking + DMin uint8 // Min protocol version for the delegate to understand + DMax uint8 // Max protocol version for the delegate to understand + DCur uint8 // Current version delegate is speaking } // Address returns the host:port form of a node's address, suitable for use @@ -40,6 +43,15 @@ func (n *Node) Address() string { return joinHostPort(n.Addr.String(), n.Port) } +// FullAddress returns the node name and host:port form of a node's address, +// suitable for use with a transport. +func (n *Node) FullAddress() Address { + return Address{ + Addr: joinHostPort(n.Addr.String(), n.Port), + Name: n.Name, + } +} + // String returns the node name func (n *Node) String() string { return n.Name @@ -49,7 +61,7 @@ func (n *Node) String() string { type nodeState struct { Node Incarnation uint32 // Last known incarnation number - State nodeStateType // Current state + State NodeStateType // Current state StateChange time.Time // Time last state change happened } @@ -59,6 +71,16 @@ func (n *nodeState) Address() string { return n.Node.Address() } +// FullAddress returns the node name and host:port form of a node's address, +// suitable for use with a transport. +func (n *nodeState) FullAddress() Address { + return n.Node.FullAddress() +} + +func (n *nodeState) DeadOrLeft() bool { + return n.State == StateDead || n.State == StateLeft +} + // ackHandler is used to register handlers for incoming acks and nacks. type ackHandler struct { ackFn func([]byte, time.Time) @@ -217,7 +239,7 @@ START: node = *m.nodes[m.probeIndex] if node.Name == m.config.Name { skip = true - } else if node.State == stateDead { + } else if node.DeadOrLeft() { skip = true } @@ -233,6 +255,30 @@ START: m.probeNode(&node) } +// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests) +func (m *Memberlist) probeNodeByAddr(addr string) { + m.nodeLock.RLock() + n := m.nodeMap[addr] + m.nodeLock.RUnlock() + + m.probeNode(n) +} + +// failedRemote checks the error and decides if it indicates a failure on the +// other end. +func failedRemote(err error) bool { + switch t := err.(type) { + case *net.OpError: + if strings.HasPrefix(t.Net, "tcp") { + switch t.Op { + case "dial", "read", "write": + return true + } + } + } + return false +} + // probeNode handles a single round of failure checking on a node. func (m *Memberlist) probeNode(node *nodeState) { defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) @@ -246,7 +292,14 @@ func (m *Memberlist) probeNode(node *nodeState) { } // Prepare a ping message and setup an ack handler. - ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name} + selfAddr, selfPort := m.getAdvertise() + ping := ping{ + SeqNo: m.nextSeqNo(), + Node: node.Name, + SourceAddr: selfAddr, + SourcePort: selfPort, + SourceNode: m.config.Name, + } ackCh := make(chan ackMessage, m.config.IndirectChecks+1) nackCh := make(chan struct{}, m.config.IndirectChecks+1) m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval) @@ -263,10 +316,20 @@ func (m *Memberlist) probeNode(node *nodeState) { // soon as possible. deadline := sent.Add(probeInterval) addr := node.Address() - if node.State == stateAlive { - if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { + + // Arrange for our self-awareness to get updated. + var awarenessDelta int + defer func() { + m.awareness.ApplyDelta(awarenessDelta) + }() + if node.State == StateAlive { + if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) - return + if failedRemote(err) { + goto HANDLE_REMOTE_FAILURE + } else { + return + } } } else { var msgs [][]byte @@ -285,9 +348,13 @@ func (m *Memberlist) probeNode(node *nodeState) { } compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { + if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err) - return + if failedRemote(err) { + goto HANDLE_REMOTE_FAILURE + } else { + return + } } } @@ -296,10 +363,7 @@ func (m *Memberlist) probeNode(node *nodeState) { // which will improve our health until we get to the failure scenarios // at the end of this function, which will alter this delta variable // accordingly. - awarenessDelta := -1 - defer func() { - m.awareness.ApplyDelta(awarenessDelta) - }() + awarenessDelta = -1 // Wait for response or round-trip-time. select { @@ -324,21 +388,31 @@ func (m *Memberlist) probeNode(node *nodeState) { // probe interval it will give the TCP fallback more time, which // is more active in dealing with lost packets, and it gives more // time to wait for indirect acks/nacks. - m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name) + m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name) } +HANDLE_REMOTE_FAILURE: // Get some random live nodes. m.nodeLock.RLock() kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool { return n.Name == m.config.Name || n.Name == node.Name || - n.State != stateAlive + n.State != StateAlive }) m.nodeLock.RUnlock() // Attempt an indirect ping. expectedNacks := 0 - ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name} + selfAddr, selfPort = m.getAdvertise() + ind := indirectPingReq{ + SeqNo: ping.SeqNo, + Target: node.Addr, + Port: node.Port, + Node: node.Name, + SourceAddr: selfAddr, + SourcePort: selfPort, + SourceNode: m.config.Name, + } for _, peer := range kNodes { // We only expect nack to be sent from peers who understand // version 4 of the protocol. @@ -346,7 +420,7 @@ func (m *Memberlist) probeNode(node *nodeState) { expectedNacks++ } - if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil { + if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) } } @@ -362,10 +436,13 @@ func (m *Memberlist) probeNode(node *nodeState) { // which protocol version we are speaking. That's why we've included a // config option to turn this off if desired. fallbackCh := make(chan bool, 1) - if (!m.config.DisableTcpPings) && (node.PMax >= 3) { + + disableTcpPings := m.config.DisableTcpPings || + (m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name)) + if (!disableTcpPings) && (node.PMax >= 3) { go func() { defer close(fallbackCh) - didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline) + didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline) if err != nil { m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err) } else { @@ -422,12 +499,21 @@ func (m *Memberlist) probeNode(node *nodeState) { // Ping initiates a ping to the node with the specified name. func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { // Prepare a ping message and setup an ack handler. - ping := ping{SeqNo: m.nextSeqNo(), Node: node} + selfAddr, selfPort := m.getAdvertise() + ping := ping{ + SeqNo: m.nextSeqNo(), + Node: node, + SourceAddr: selfAddr, + SourcePort: selfPort, + SourceNode: m.config.Name, + } ackCh := make(chan ackMessage, m.config.IndirectChecks+1) m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval) + a := Address{Addr: addr.String(), Name: node} + // Send a ping to the node. - if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil { + if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil { return 0, err } @@ -488,10 +574,10 @@ func (m *Memberlist) gossip() { } switch n.State { - case stateAlive, stateSuspect: + case StateAlive, StateSuspect: return false - case stateDead: + case StateDead: return time.Since(n.StateChange) > m.config.GossipToTheDeadTime default: @@ -516,13 +602,13 @@ func (m *Memberlist) gossip() { addr := node.Address() if len(msgs) == 1 { // Send single message as is - if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil { + if err := m.rawSendMsgPacket(node.FullAddress(), &node, msgs[0]); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } else { // Otherwise create and send a compound message compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { + if err := m.rawSendMsgPacket(node.FullAddress(), &node, compound.Bytes()); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } @@ -538,7 +624,7 @@ func (m *Memberlist) pushPull() { m.nodeLock.RLock() nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool { return n.Name == m.config.Name || - n.State != stateAlive + n.State != StateAlive }) m.nodeLock.RUnlock() @@ -549,17 +635,17 @@ func (m *Memberlist) pushPull() { node := nodes[0] // Attempt a push pull - if err := m.pushPullNode(node.Address(), false); err != nil { + if err := m.pushPullNode(node.FullAddress(), false); err != nil { m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err) } } // pushPullNode does a complete state exchange with a specific node. -func (m *Memberlist) pushPullNode(addr string, join bool) error { +func (m *Memberlist) pushPullNode(a Address, join bool) error { defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) // Attempt to send and receive with the node - remote, userState, err := m.sendAndReceiveState(addr, join) + remote, userState, err := m.sendAndReceiveState(a, join) if err != nil { return err } @@ -596,7 +682,7 @@ func (m *Memberlist) verifyProtocol(remote []pushNodeState) error { for _, rn := range remote { // If the node isn't alive, then skip it - if rn.State != stateAlive { + if rn.State != StateAlive { continue } @@ -625,7 +711,7 @@ func (m *Memberlist) verifyProtocol(remote []pushNodeState) error { for _, n := range m.nodes { // Ignore non-alive nodes - if n.State != stateAlive { + if n.State != StateAlive { continue } @@ -841,11 +927,26 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { return } + if len(a.Vsn) >= 3 { + pMin := a.Vsn[0] + pMax := a.Vsn[1] + pCur := a.Vsn[2] + if pMin == 0 || pMax == 0 || pMin > pMax { + m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax) + return + } + } + // Invoke the Alive delegate if any. This can be used to filter out // alive messages based on custom logic. For example, using a cluster name. // Using a merge delegate is not enough, as it is possible for passive // cluster merging to still occur. if m.config.Alive != nil { + if len(a.Vsn) < 6 { + m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present", + a.Node, net.IP(a.Addr), a.Port) + return + } node := &Node{ Name: a.Node, Addr: a.Addr, @@ -867,7 +968,13 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { // Check if we've never seen this node before, and if not, then // store this node in our node map. + var updatesNode bool if !ok { + errCon := m.config.IPAllowed(a.Addr) + if errCon != nil { + m.logger.Printf("[WARN] memberlist: Rejected node %s (%v): %s", a.Node, net.IP(a.Addr), errCon) + return + } state = &nodeState{ Node: Node{ Name: a.Node, @@ -875,7 +982,15 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { Port: a.Port, Meta: a.Meta, }, - State: stateDead, + State: StateDead, + } + if len(a.Vsn) > 5 { + state.PMin = a.Vsn[0] + state.PMax = a.Vsn[1] + state.PCur = a.Vsn[2] + state.DMin = a.Vsn[3] + state.DMax = a.Vsn[4] + state.DCur = a.Vsn[5] } // Add to map @@ -894,29 +1009,45 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { // Update numNodes after we've added a new node atomic.AddUint32(&m.numNodes, 1) - } - - // Check if this address is different than the existing node - if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port { - m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d", - state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port) - - // Inform the conflict delegate if provided - if m.config.Conflict != nil { - other := Node{ - Name: a.Node, - Addr: a.Addr, - Port: a.Port, - Meta: a.Meta, + } else { + // Check if this address is different than the existing node unless the old node is dead. + if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port { + errCon := m.config.IPAllowed(a.Addr) + if errCon != nil { + m.logger.Printf("[WARN] memberlist: Rejected IP update from %v to %v for node %s: %s", a.Node, state.Addr, net.IP(a.Addr), errCon) + return + } + // If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died. + canReclaim := (m.config.DeadNodeReclaimTime > 0 && + time.Since(state.StateChange) > m.config.DeadNodeReclaimTime) + + // Allow the address to be updated if a dead node is being replaced. + if state.State == StateLeft || (state.State == StateDead && canReclaim) { + m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d", + state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port) + updatesNode = true + } else { + m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v", + state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State) + + // Inform the conflict delegate if provided + if m.config.Conflict != nil { + other := Node{ + Name: a.Node, + Addr: a.Addr, + Port: a.Port, + Meta: a.Meta, + } + m.config.Conflict.NotifyConflict(&state.Node, &other) + } + return } - m.config.Conflict.NotifyConflict(&state.Node, &other) } - return } // Bail if the incarnation number is older, and this is not about us isLocalNode := state.Name == m.config.Name - if a.Incarnation <= state.Incarnation && !isLocalNode { + if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode { return } @@ -956,9 +1087,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { bytes.Equal(a.Vsn, versions) { return } - m.refute(state, a.Incarnation) - m.logger.Printf("[WARN] memberlist: Refuting an alive message") + m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions) } else { m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) @@ -975,8 +1105,10 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { // Update the state and incarnation number state.Incarnation = a.Incarnation state.Meta = a.Meta - if state.State != stateAlive { - state.State = stateAlive + state.Addr = a.Addr + state.Port = a.Port + if state.State != StateAlive { + state.State = StateAlive state.StateChange = time.Now() } } @@ -986,8 +1118,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { // Notify the delegate of any relevant updates if m.config.Events != nil { - if oldState == stateDead { - // if Dead -> Alive, notify of join + if oldState == StateDead || oldState == StateLeft { + // if Dead/Left -> Alive, notify of join m.config.Events.NotifyJoin(&state.Node) } else if !bytes.Equal(oldMeta, state.Meta) { @@ -1026,7 +1158,7 @@ func (m *Memberlist) suspectNode(s *suspect) { } // Ignore non-alive nodes - if state.State != stateAlive { + if state.State != StateAlive { return } @@ -1044,7 +1176,7 @@ func (m *Memberlist) suspectNode(s *suspect) { // Update the state state.Incarnation = s.Incarnation - state.State = stateSuspect + state.State = StateSuspect changeTime := time.Now() state.StateChange = changeTime @@ -1066,9 +1198,14 @@ func (m *Memberlist) suspectNode(s *suspect) { min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval) max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min fn := func(numConfirmations int) { + var d *dead + m.nodeLock.Lock() state, ok := m.nodeMap[s.Node] - timeout := ok && state.State == stateSuspect && state.StateChange == changeTime + timeout := ok && state.State == StateSuspect && state.StateChange == changeTime + if timeout { + d = &dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name} + } m.nodeLock.Unlock() if timeout { @@ -1078,8 +1215,8 @@ func (m *Memberlist) suspectNode(s *suspect) { m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)", state.Name, numConfirmations) - d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name} - m.deadNode(&d) + + m.deadNode(d) } } m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn) @@ -1106,7 +1243,7 @@ func (m *Memberlist) deadNode(d *dead) { delete(m.nodeTimers, d.Node) // Ignore if node is already dead - if state.State == stateDead { + if state.DeadOrLeft() { return } @@ -1130,7 +1267,14 @@ func (m *Memberlist) deadNode(d *dead) { // Update the state state.Incarnation = d.Incarnation - state.State = stateDead + + // If the dead message was send by the node itself, mark it is left + // instead of dead. + if d.Node == d.From { + state.State = StateLeft + } else { + state.State = StateDead + } state.StateChange = time.Now() // Notify of death @@ -1144,7 +1288,7 @@ func (m *Memberlist) deadNode(d *dead) { func (m *Memberlist) mergeState(remote []pushNodeState) { for _, r := range remote { switch r.State { - case stateAlive: + case StateAlive: a := alive{ Incarnation: r.Incarnation, Node: r.Name, @@ -1155,11 +1299,14 @@ func (m *Memberlist) mergeState(remote []pushNodeState) { } m.aliveNode(&a, nil, false) - case stateDead: + case StateLeft: + d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name} + m.deadNode(&d) + case StateDead: // If the remote node believes a node is dead, we prefer to // suspect that node instead of declaring it dead instantly fallthrough - case stateSuspect: + case StateSuspect: s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name} m.suspectNode(&s) } diff --git a/vendor/github.com/hashicorp/memberlist/transport.go b/vendor/github.com/hashicorp/memberlist/transport.go index 6ce55ea47f..b23b83914b 100644 --- a/vendor/github.com/hashicorp/memberlist/transport.go +++ b/vendor/github.com/hashicorp/memberlist/transport.go @@ -1,6 +1,7 @@ package memberlist import ( + "fmt" "net" "time" ) @@ -63,3 +64,50 @@ type Transport interface { // transport a chance to clean up any listeners. Shutdown() error } + +type Address struct { + // Addr is a network address as a string, similar to Dial. This usually is + // in the form of "host:port". This is required. + Addr string + + // Name is the name of the node being addressed. This is optional but + // transports may require it. + Name string +} + +func (a *Address) String() string { + if a.Name != "" { + return fmt.Sprintf("%s (%s)", a.Name, a.Addr) + } + return a.Addr +} + +// IngestionAwareTransport is not used. +// +// Deprecated: IngestionAwareTransport is not used and may be removed in a future +// version. Define the interface locally instead of referencing this exported +// interface. +type IngestionAwareTransport interface { + IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error + IngestStream(conn net.Conn) error +} + +type NodeAwareTransport interface { + Transport + WriteToAddress(b []byte, addr Address) (time.Time, error) + DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) +} + +type shimNodeAwareTransport struct { + Transport +} + +var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil) + +func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) { + return t.WriteTo(b, addr.Addr) +} + +func (t *shimNodeAwareTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) { + return t.DialTimeout(addr.Addr, timeout) +} diff --git a/vendor/github.com/hashicorp/memberlist/util.go b/vendor/github.com/hashicorp/memberlist/util.go index e2381a6986..16a7d36d0b 100644 --- a/vendor/github.com/hashicorp/memberlist/util.go +++ b/vendor/github.com/hashicorp/memberlist/util.go @@ -78,10 +78,9 @@ func retransmitLimit(retransmitMult, n int) int { // shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle func shuffleNodes(nodes []*nodeState) { n := len(nodes) - for i := n - 1; i > 0; i-- { - j := rand.Intn(i + 1) + rand.Shuffle(n, func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] - } + }) } // pushPushScale is used to scale the time interval at which push/pull @@ -103,7 +102,7 @@ func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int { numDead := 0 n := len(nodes) for i := 0; i < n-numDead; i++ { - if nodes[i].State != stateDead { + if nodes[i].State != StateDead { continue } @@ -120,35 +119,35 @@ func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int { return n - numDead } -// kRandomNodes is used to select up to k random nodes, excluding any nodes where -// the filter function returns true. It is possible that less than k nodes are +// kRandomNodes is used to select up to k random Nodes, excluding any nodes where +// the exclude function returns true. It is possible that less than k nodes are // returned. -func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState { +func kRandomNodes(k int, nodes []*nodeState, exclude func(*nodeState) bool) []Node { n := len(nodes) - kNodes := make([]*nodeState, 0, k) + kNodes := make([]Node, 0, k) OUTER: // Probe up to 3*n times, with large n this is not necessary // since k << n, but with small n we want search to be // exhaustive for i := 0; i < 3*n && len(kNodes) < k; i++ { - // Get random node + // Get random nodeState idx := randomOffset(n) - node := nodes[idx] + state := nodes[idx] // Give the filter a shot at it. - if filterFn != nil && filterFn(node) { + if exclude != nil && exclude(state) { continue OUTER } // Check if we have this node already for j := 0; j < len(kNodes); j++ { - if node == kNodes[j] { + if state.Node.Name == kNodes[j].Name { continue OUTER } } // Append the node - kNodes = append(kNodes, node) + kNodes = append(kNodes, state.Node) } return kNodes } @@ -186,18 +185,18 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) { err = fmt.Errorf("missing compound length byte") return } - numParts := uint8(buf[0]) + numParts := int(buf[0]) buf = buf[1:] // Check we have enough bytes - if len(buf) < int(numParts*2) { + if len(buf) < numParts*2 { err = fmt.Errorf("truncated len slice") return } // Decode the lengths lengths := make([]uint16, numParts) - for i := 0; i < int(numParts); i++ { + for i := 0; i < numParts; i++ { lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2]) } buf = buf[numParts*2:] @@ -205,7 +204,7 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) { // Split each message for idx, msgLen := range lengths { if len(buf) < int(msgLen) { - trunc = int(numParts) - idx + trunc = numParts - idx return }