123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package remotes
- import (
- "fmt"
- "math"
- "math/rand"
- "sort"
- "sync"
- "github.com/docker/swarmkit/api"
- )
// errRemotesUnavailable is the error Select returns when there is no remote
// left to choose from, either because none are tracked or because every
// tracked remote was excluded by the caller.
var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")

// DefaultObservationWeight is the weight to report for a positive
// observation. It is chosen so that repeated observations balance well.
const DefaultObservationWeight = 10
- // Remotes keeps track of remote addresses by weight, informed by
- // observations.
- type Remotes interface {
- // Weight returns the remotes with their current weights.
- Weights() map[api.Peer]int
- // Select a remote from the set of available remotes with optionally
- // excluding ID or address.
- Select(...string) (api.Peer, error)
- // Observe records an experience with a particular remote. A positive weight
- // indicates a good experience and a negative weight a bad experience.
- //
- // The observation will be used to calculate a moving weight, which is
- // implementation dependent. This method will be called such that repeated
- // observations of the same master in each session request are favored.
- Observe(peer api.Peer, weight int)
- // ObserveIfExists records an experience with a particular remote if when a
- // remote exists.
- ObserveIfExists(peer api.Peer, weight int)
- // Remove the remote from the list completely.
- Remove(addrs ...api.Peer)
- }
- // NewRemotes returns a Remotes instance with the provided set of addresses.
- // Entries provided are heavily weighted initially.
- func NewRemotes(peers ...api.Peer) Remotes {
- mwr := &remotesWeightedRandom{
- remotes: make(map[api.Peer]int),
- }
- for _, peer := range peers {
- mwr.Observe(peer, DefaultObservationWeight)
- }
- return mwr
- }
- type remotesWeightedRandom struct {
- remotes map[api.Peer]int
- mu sync.Mutex
- // workspace to avoid reallocation. these get lazily allocated when
- // selecting values.
- cdf []float64
- peers []api.Peer
- }
- func (mwr *remotesWeightedRandom) Weights() map[api.Peer]int {
- mwr.mu.Lock()
- defer mwr.mu.Unlock()
- ms := make(map[api.Peer]int, len(mwr.remotes))
- for addr, weight := range mwr.remotes {
- ms[addr] = weight
- }
- return ms
- }
- func (mwr *remotesWeightedRandom) Select(excludes ...string) (api.Peer, error) {
- mwr.mu.Lock()
- defer mwr.mu.Unlock()
- // NOTE(stevvooe): We then use a weighted random selection algorithm
- // (http://stackoverflow.com/questions/4463561/weighted-random-selection-from-array)
- // to choose the master to connect to.
- //
- // It is possible that this is insufficient. The following may inform a
- // better solution:
- // https://github.com/LK4D4/sample
- //
- // The first link applies exponential distribution weight choice reservoir
- // sampling. This may be relevant if we view the master selection as a
- // distributed reservoir sampling problem.
- // bias to zero-weighted remotes have same probability. otherwise, we
- // always select first entry when all are zero.
- const bias = 0.001
- // clear out workspace
- mwr.cdf = mwr.cdf[:0]
- mwr.peers = mwr.peers[:0]
- cum := 0.0
- // calculate CDF over weights
- Loop:
- for peer, weight := range mwr.remotes {
- for _, exclude := range excludes {
- if peer.NodeID == exclude || peer.Addr == exclude {
- // if this peer is excluded, ignore it by continuing the loop to label Loop
- continue Loop
- }
- }
- if weight < 0 {
- // treat these as zero, to keep there selection unlikely.
- weight = 0
- }
- cum += float64(weight) + bias
- mwr.cdf = append(mwr.cdf, cum)
- mwr.peers = append(mwr.peers, peer)
- }
- if len(mwr.peers) == 0 {
- return api.Peer{}, errRemotesUnavailable
- }
- r := mwr.cdf[len(mwr.cdf)-1] * rand.Float64()
- i := sort.SearchFloat64s(mwr.cdf, r)
- return mwr.peers[i], nil
- }
- func (mwr *remotesWeightedRandom) Observe(peer api.Peer, weight int) {
- mwr.mu.Lock()
- defer mwr.mu.Unlock()
- mwr.observe(peer, float64(weight))
- }
- func (mwr *remotesWeightedRandom) ObserveIfExists(peer api.Peer, weight int) {
- mwr.mu.Lock()
- defer mwr.mu.Unlock()
- if _, ok := mwr.remotes[peer]; !ok {
- return
- }
- mwr.observe(peer, float64(weight))
- }
- func (mwr *remotesWeightedRandom) Remove(addrs ...api.Peer) {
- mwr.mu.Lock()
- defer mwr.mu.Unlock()
- for _, addr := range addrs {
- delete(mwr.remotes, addr)
- }
- }
const (
	// remoteWeightSmoothingFactor is the exponential smoothing factor. It
	// controls how much of the new observation versus the old value goes
	// into the updated weight. See
	// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
	// for details.
	remoteWeightSmoothingFactor = 0.5

	// remoteWeightMax is the largest magnitude clip lets through.
	remoteWeightMax = 1 << 8
)

// clip limits x to the range [-remoteWeightMax, remoteWeightMax]. NaN is
// treated as garbage and mapped to 0, which makes it act like a no-op
// observation.
func clip(x float64) float64 {
	switch {
	case math.IsNaN(x):
		return 0
	case x > remoteWeightMax:
		return remoteWeightMax
	case x < -remoteWeightMax:
		return -remoteWeightMax
	default:
		return x
	}
}
- func (mwr *remotesWeightedRandom) observe(peer api.Peer, weight float64) {
- // While we have a decent, ad-hoc approach here to weight subsequent
- // observations, we may want to look into applying forward decay:
- //
- // http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
- //
- // We need to get better data from behavior in a cluster.
- // makes the math easier to read below
- var (
- w0 = float64(mwr.remotes[peer])
- w1 = clip(weight)
- )
- const α = remoteWeightSmoothingFactor
- // Multiply the new value to current value, and appy smoothing against the old
- // value.
- wn := clip(α*w1 + (1-α)*w0)
- mwr.remotes[peer] = int(math.Ceil(wn))
- }
|