server_help.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package zk
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "math/rand"
  7. "os"
  8. "path/filepath"
  9. "time"
  10. )
  11. type TestServer struct {
  12. Port int
  13. Path string
  14. Srv *Server
  15. }
  16. type TestCluster struct {
  17. Path string
  18. Servers []TestServer
  19. }
  20. func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) {
  21. tmpPath, err := ioutil.TempDir("", "gozk")
  22. if err != nil {
  23. return nil, err
  24. }
  25. success := false
  26. startPort := int(rand.Int31n(6000) + 10000)
  27. cluster := &TestCluster{Path: tmpPath}
  28. defer func() {
  29. if !success {
  30. cluster.Stop()
  31. }
  32. }()
  33. for serverN := 0; serverN < size; serverN++ {
  34. srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN))
  35. if err := os.Mkdir(srvPath, 0700); err != nil {
  36. return nil, err
  37. }
  38. port := startPort + serverN*3
  39. cfg := ServerConfig{
  40. ClientPort: port,
  41. DataDir: srvPath,
  42. }
  43. for i := 0; i < size; i++ {
  44. cfg.Servers = append(cfg.Servers, ServerConfigServer{
  45. ID: i + 1,
  46. Host: "127.0.0.1",
  47. PeerPort: startPort + i*3 + 1,
  48. LeaderElectionPort: startPort + i*3 + 2,
  49. })
  50. }
  51. cfgPath := filepath.Join(srvPath, "zoo.cfg")
  52. fi, err := os.Create(cfgPath)
  53. if err != nil {
  54. return nil, err
  55. }
  56. err = cfg.Marshall(fi)
  57. fi.Close()
  58. if err != nil {
  59. return nil, err
  60. }
  61. fi, err = os.Create(filepath.Join(srvPath, "myid"))
  62. if err != nil {
  63. return nil, err
  64. }
  65. _, err = fmt.Fprintf(fi, "%d\n", serverN+1)
  66. fi.Close()
  67. if err != nil {
  68. return nil, err
  69. }
  70. srv := &Server{
  71. ConfigPath: cfgPath,
  72. Stdout: stdout,
  73. Stderr: stderr,
  74. }
  75. if err := srv.Start(); err != nil {
  76. return nil, err
  77. }
  78. cluster.Servers = append(cluster.Servers, TestServer{
  79. Path: srvPath,
  80. Port: cfg.ClientPort,
  81. Srv: srv,
  82. })
  83. }
  84. success = true
  85. time.Sleep(time.Second) // Give the server time to become active. Should probably actually attempt to connect to verify.
  86. return cluster, nil
  87. }
  88. func (ts *TestCluster) Connect(idx int) (*Conn, error) {
  89. zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", ts.Servers[idx].Port)}, time.Second*15)
  90. return zk, err
  91. }
  92. func (ts *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
  93. return ts.ConnectAllTimeout(time.Second * 15)
  94. }
  95. func (ts *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
  96. hosts := make([]string, len(ts.Servers))
  97. for i, srv := range ts.Servers {
  98. hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
  99. }
  100. zk, ch, err := Connect(hosts, sessionTimeout)
  101. return zk, ch, err
  102. }
  103. func (ts *TestCluster) Stop() error {
  104. for _, srv := range ts.Servers {
  105. srv.Srv.Stop()
  106. }
  107. defer os.RemoveAll(ts.Path)
  108. return nil
  109. }