util.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package raft
  2. import (
  3. "time"
  4. "golang.org/x/net/context"
  5. "github.com/docker/swarmkit/api"
  6. "github.com/docker/swarmkit/manager/state"
  7. "github.com/docker/swarmkit/manager/state/store"
  8. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials"
  11. )
  12. // dial returns a grpc client connection
  13. func dial(addr string, protocol string, creds credentials.TransportCredentials, timeout time.Duration) (*grpc.ClientConn, error) {
  14. grpcOptions := []grpc.DialOption{
  15. grpc.WithBackoffMaxDelay(2 * time.Second),
  16. grpc.WithTransportCredentials(creds),
  17. grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
  18. grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
  19. }
  20. if timeout != 0 {
  21. grpcOptions = append(grpcOptions, grpc.WithTimeout(timeout))
  22. }
  23. return grpc.Dial(addr, grpcOptions...)
  24. }
  25. // Register registers the node raft server
  26. func Register(server *grpc.Server, node *Node) {
  27. api.RegisterRaftServer(server, node)
  28. api.RegisterRaftMembershipServer(server, node)
  29. }
  30. // WaitForLeader waits until node observe some leader in cluster. It returns
  31. // error if ctx was cancelled before leader appeared.
  32. func WaitForLeader(ctx context.Context, n *Node) error {
  33. _, err := n.Leader()
  34. if err == nil {
  35. return nil
  36. }
  37. ticker := time.NewTicker(50 * time.Millisecond)
  38. defer ticker.Stop()
  39. for err != nil {
  40. select {
  41. case <-ticker.C:
  42. case <-ctx.Done():
  43. return ctx.Err()
  44. }
  45. _, err = n.Leader()
  46. }
  47. return nil
  48. }
  49. // WaitForCluster waits until node observes that the cluster wide config is
  50. // committed to raft. This ensures that we can see and serve informations
  51. // related to the cluster.
  52. func WaitForCluster(ctx context.Context, n *Node) (cluster *api.Cluster, err error) {
  53. watch, cancel := state.Watch(n.MemoryStore().WatchQueue(), api.EventCreateCluster{})
  54. defer cancel()
  55. var clusters []*api.Cluster
  56. n.MemoryStore().View(func(readTx store.ReadTx) {
  57. clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
  58. })
  59. if err != nil {
  60. return nil, err
  61. }
  62. if len(clusters) == 1 {
  63. cluster = clusters[0]
  64. } else {
  65. select {
  66. case e := <-watch:
  67. cluster = e.(api.EventCreateCluster).Cluster
  68. case <-ctx.Done():
  69. return nil, ctx.Err()
  70. }
  71. }
  72. return cluster, nil
  73. }