event_delegate.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package networkdb
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net"
  6. "github.com/containerd/log"
  7. "github.com/hashicorp/memberlist"
  8. )
  9. type eventDelegate struct {
  10. nDB *NetworkDB
  11. }
  12. func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
  13. value, err := json.Marshal(&NodeAddr{addr})
  14. if err == nil {
  15. e.nDB.broadcaster.Write(makeEvent(op, NodeTable, "", "", value))
  16. } else {
  17. log.G(context.TODO()).Errorf("Error marshalling node broadcast event %s", addr.String())
  18. }
  19. }
  20. func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
  21. log.G(context.TODO()).Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
  22. e.broadcastNodeEvent(mn.Addr, opCreate)
  23. e.nDB.Lock()
  24. defer e.nDB.Unlock()
  25. // In case the node is rejoining after a failure or leave,
  26. // just add the node back to active
  27. if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved {
  28. return
  29. }
  30. // Every node has a unique ID
  31. // Check on the base of the IP address if the new node that joined is actually a new incarnation of a previous
  32. // failed or shutdown one
  33. e.nDB.purgeReincarnation(mn)
  34. e.nDB.nodes[mn.Name] = &node{Node: *mn}
  35. log.G(context.TODO()).Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
  36. }
  37. func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
  38. log.G(context.TODO()).Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
  39. e.broadcastNodeEvent(mn.Addr, opDelete)
  40. e.nDB.Lock()
  41. defer e.nDB.Unlock()
  42. n, currState, _ := e.nDB.findNode(mn.Name)
  43. if n == nil {
  44. log.G(context.TODO()).Errorf("Node %s/%s not found in the node lists", mn.Name, mn.Addr)
  45. return
  46. }
  47. // if the node was active means that did not send the leave cluster message, so it's probable that
  48. // failed. Else would be already in the left list so nothing else has to be done
  49. if currState == nodeActiveState {
  50. moved, err := e.nDB.changeNodeState(mn.Name, nodeFailedState)
  51. if err != nil {
  52. log.G(context.TODO()).WithError(err).Errorf("impossible condition, node %s/%s not present in the list", mn.Name, mn.Addr)
  53. return
  54. }
  55. if moved {
  56. log.G(context.TODO()).Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
  57. }
  58. }
  59. }
  60. func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
  61. }