1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- package raft
- import (
- "time"
- "golang.org/x/net/context"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/manager/state"
- "github.com/docker/swarmkit/manager/state/store"
- grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- )
- // dial returns a grpc client connection
- func dial(addr string, protocol string, creds credentials.TransportCredentials, timeout time.Duration) (*grpc.ClientConn, error) {
- grpcOptions := []grpc.DialOption{
- grpc.WithBackoffMaxDelay(2 * time.Second),
- grpc.WithTransportCredentials(creds),
- grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
- grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
- }
- if timeout != 0 {
- grpcOptions = append(grpcOptions, grpc.WithTimeout(timeout))
- }
- return grpc.Dial(addr, grpcOptions...)
- }
- // Register registers the node raft server
- func Register(server *grpc.Server, node *Node) {
- api.RegisterRaftServer(server, node)
- api.RegisterRaftMembershipServer(server, node)
- }
- // WaitForLeader waits until node observe some leader in cluster. It returns
- // error if ctx was cancelled before leader appeared.
- func WaitForLeader(ctx context.Context, n *Node) error {
- _, err := n.Leader()
- if err == nil {
- return nil
- }
- ticker := time.NewTicker(50 * time.Millisecond)
- defer ticker.Stop()
- for err != nil {
- select {
- case <-ticker.C:
- case <-ctx.Done():
- return ctx.Err()
- }
- _, err = n.Leader()
- }
- return nil
- }
- // WaitForCluster waits until node observes that the cluster wide config is
- // committed to raft. This ensures that we can see and serve informations
- // related to the cluster.
- func WaitForCluster(ctx context.Context, n *Node) (cluster *api.Cluster, err error) {
- watch, cancel := state.Watch(n.MemoryStore().WatchQueue(), api.EventCreateCluster{})
- defer cancel()
- var clusters []*api.Cluster
- n.MemoryStore().View(func(readTx store.ReadTx) {
- clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
- })
- if err != nil {
- return nil, err
- }
- if len(clusters) == 1 {
- cluster = clusters[0]
- } else {
- select {
- case e := <-watch:
- cluster = e.(api.EventCreateCluster).Cluster
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- return cluster, nil
- }
|