remotes.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package remotes
  2. import (
  3. "fmt"
  4. "math"
  5. "math/rand"
  6. "sort"
  7. "sync"
  8. "github.com/docker/swarmkit/api"
  9. )
  // errRemotesUnavailable is what Select returns when there is no candidate
// left to pick: either no remotes are known at all, or every known remote
// matched one of the caller's exclusions.
var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")

// DefaultObservationWeight is the weight to use for a positive observation.
// It is intended to balance well when the same remote is observed repeatedly,
// and NewRemotes seeds each initial peer with one observation of this weight.
const DefaultObservationWeight = 10
  14. // Remotes keeps track of remote addresses by weight, informed by
  15. // observations.
  16. type Remotes interface {
  17. // Weight returns the remotes with their current weights.
  18. Weights() map[api.Peer]int
  19. // Select a remote from the set of available remotes with optionally
  20. // excluding ID or address.
  21. Select(...string) (api.Peer, error)
  22. // Observe records an experience with a particular remote. A positive weight
  23. // indicates a good experience and a negative weight a bad experience.
  24. //
  25. // The observation will be used to calculate a moving weight, which is
  26. // implementation dependent. This method will be called such that repeated
  27. // observations of the same master in each session request are favored.
  28. Observe(peer api.Peer, weight int)
  29. // ObserveIfExists records an experience with a particular remote if when a
  30. // remote exists.
  31. ObserveIfExists(peer api.Peer, weight int)
  32. // Remove the remote from the list completely.
  33. Remove(addrs ...api.Peer)
  34. }
  35. // NewRemotes returns a Remotes instance with the provided set of addresses.
  36. // Entries provided are heavily weighted initially.
  37. func NewRemotes(peers ...api.Peer) Remotes {
  38. mwr := &remotesWeightedRandom{
  39. remotes: make(map[api.Peer]int),
  40. }
  41. for _, peer := range peers {
  42. mwr.Observe(peer, DefaultObservationWeight)
  43. }
  44. return mwr
  45. }
  46. type remotesWeightedRandom struct {
  47. remotes map[api.Peer]int
  48. mu sync.Mutex
  49. // workspace to avoid reallocation. these get lazily allocated when
  50. // selecting values.
  51. cdf []float64
  52. peers []api.Peer
  53. }
  54. func (mwr *remotesWeightedRandom) Weights() map[api.Peer]int {
  55. mwr.mu.Lock()
  56. defer mwr.mu.Unlock()
  57. ms := make(map[api.Peer]int, len(mwr.remotes))
  58. for addr, weight := range mwr.remotes {
  59. ms[addr] = weight
  60. }
  61. return ms
  62. }
  63. func (mwr *remotesWeightedRandom) Select(excludes ...string) (api.Peer, error) {
  64. mwr.mu.Lock()
  65. defer mwr.mu.Unlock()
  66. // NOTE(stevvooe): We then use a weighted random selection algorithm
  67. // (http://stackoverflow.com/questions/4463561/weighted-random-selection-from-array)
  68. // to choose the master to connect to.
  69. //
  70. // It is possible that this is insufficient. The following may inform a
  71. // better solution:
  72. // https://github.com/LK4D4/sample
  73. //
  74. // The first link applies exponential distribution weight choice reservoir
  75. // sampling. This may be relevant if we view the master selection as a
  76. // distributed reservoir sampling problem.
  77. // bias to zero-weighted remotes have same probability. otherwise, we
  78. // always select first entry when all are zero.
  79. const bias = 0.001
  80. // clear out workspace
  81. mwr.cdf = mwr.cdf[:0]
  82. mwr.peers = mwr.peers[:0]
  83. cum := 0.0
  84. // calculate CDF over weights
  85. Loop:
  86. for peer, weight := range mwr.remotes {
  87. for _, exclude := range excludes {
  88. if peer.NodeID == exclude || peer.Addr == exclude {
  89. // if this peer is excluded, ignore it by continuing the loop to label Loop
  90. continue Loop
  91. }
  92. }
  93. if weight < 0 {
  94. // treat these as zero, to keep there selection unlikely.
  95. weight = 0
  96. }
  97. cum += float64(weight) + bias
  98. mwr.cdf = append(mwr.cdf, cum)
  99. mwr.peers = append(mwr.peers, peer)
  100. }
  101. if len(mwr.peers) == 0 {
  102. return api.Peer{}, errRemotesUnavailable
  103. }
  104. r := mwr.cdf[len(mwr.cdf)-1] * rand.Float64()
  105. i := sort.SearchFloat64s(mwr.cdf, r)
  106. return mwr.peers[i], nil
  107. }
  108. func (mwr *remotesWeightedRandom) Observe(peer api.Peer, weight int) {
  109. mwr.mu.Lock()
  110. defer mwr.mu.Unlock()
  111. mwr.observe(peer, float64(weight))
  112. }
  113. func (mwr *remotesWeightedRandom) ObserveIfExists(peer api.Peer, weight int) {
  114. mwr.mu.Lock()
  115. defer mwr.mu.Unlock()
  116. if _, ok := mwr.remotes[peer]; !ok {
  117. return
  118. }
  119. mwr.observe(peer, float64(weight))
  120. }
  121. func (mwr *remotesWeightedRandom) Remove(addrs ...api.Peer) {
  122. mwr.mu.Lock()
  123. defer mwr.mu.Unlock()
  124. for _, addr := range addrs {
  125. delete(mwr.remotes, addr)
  126. }
  127. }
  const (
	// remoteWeightSmoothingFactor is the factor α used for basic exponential
	// smoothing of observed weights: the new weight is
	// α*observation + (1-α)*previous, so it controls how much of the new
	// observation versus the old value is kept. For details see
	// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
	remoteWeightSmoothingFactor = 0.5

	// remoteWeightMax bounds the magnitude of weights: values passed through
	// clip are limited to the range [-remoteWeightMax, remoteWeightMax].
	remoteWeightMax = 1 << 8 // 256
)
  137. func clip(x float64) float64 {
  138. if math.IsNaN(x) {
  139. // treat garbage as such
  140. // acts like a no-op for us.
  141. return 0
  142. }
  143. return math.Max(math.Min(remoteWeightMax, x), -remoteWeightMax)
  144. }
  145. func (mwr *remotesWeightedRandom) observe(peer api.Peer, weight float64) {
  146. // While we have a decent, ad-hoc approach here to weight subsequent
  147. // observations, we may want to look into applying forward decay:
  148. //
  149. // http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
  150. //
  151. // We need to get better data from behavior in a cluster.
  152. // makes the math easier to read below
  153. var (
  154. w0 = float64(mwr.remotes[peer])
  155. w1 = clip(weight)
  156. )
  157. const α = remoteWeightSmoothingFactor
  158. // Multiply the new value to current value, and appy smoothing against the old
  159. // value.
  160. wn := clip(α*w1 + (1-α)*w0)
  161. mwr.remotes[peer] = int(math.Ceil(wn))
  162. }