host.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/api/types"
  13. "github.com/docker/docker/api/types/filters"
  14. "github.com/docker/docker/client"
  15. "github.com/docker/docker/pkg/stdcopy"
  16. )
  17. const (
  18. defaultStackName = "integration-cli-on-swarm"
  19. defaultVolumeName = "integration-cli-on-swarm"
  20. defaultMasterImageName = "integration-cli-master"
  21. defaultWorkerImageName = "integration-cli-worker"
  22. )
  23. func main() {
  24. rc, err := xmain()
  25. if err != nil {
  26. logrus.Fatalf("fatal error: %v", err)
  27. }
  28. os.Exit(rc)
  29. }
  30. func xmain() (int, error) {
  31. // Should we use cobra maybe?
  32. replicas := flag.Int("replicas", 1, "Number of worker service replica")
  33. chunks := flag.Int("chunks", 0, "Number of test chunks executed in batch (0 == replicas)")
  34. pushWorkerImage := flag.String("push-worker-image", "", "Push the worker image to the registry. Required for distribuetd execution. (empty == not to push)")
  35. shuffle := flag.Bool("shuffle", false, "Shuffle the input so as to mitigate makespan nonuniformity")
  36. // flags below are rarely used
  37. randSeed := flag.Int64("rand-seed", int64(0), "Random seed used for shuffling (0 == curent time)")
  38. filtersFile := flag.String("filters-file", "", "Path to optional file composed of `-check.f` filter strings")
  39. dryRun := flag.Bool("dry-run", false, "Dry run")
  40. flag.Parse()
  41. if *chunks == 0 {
  42. *chunks = *replicas
  43. }
  44. if *randSeed == int64(0) {
  45. *randSeed = time.Now().UnixNano()
  46. }
  47. cli, err := client.NewEnvClient()
  48. if err != nil {
  49. return 1, err
  50. }
  51. if hasStack(cli, defaultStackName) {
  52. logrus.Infof("Removing stack %s", defaultStackName)
  53. removeStack(cli, defaultStackName)
  54. }
  55. if hasVolume(cli, defaultVolumeName) {
  56. logrus.Infof("Removing volume %s", defaultVolumeName)
  57. removeVolume(cli, defaultVolumeName)
  58. }
  59. if err = ensureImages(cli, []string{defaultWorkerImageName, defaultMasterImageName}); err != nil {
  60. return 1, err
  61. }
  62. workerImageForStack := defaultWorkerImageName
  63. if *pushWorkerImage != "" {
  64. logrus.Infof("Pushing %s to %s", defaultWorkerImageName, *pushWorkerImage)
  65. if err = pushImage(cli, *pushWorkerImage, defaultWorkerImageName); err != nil {
  66. return 1, err
  67. }
  68. workerImageForStack = *pushWorkerImage
  69. }
  70. compose, err := createCompose("", cli, composeOptions{
  71. Replicas: *replicas,
  72. Chunks: *chunks,
  73. MasterImage: defaultMasterImageName,
  74. WorkerImage: workerImageForStack,
  75. Volume: defaultVolumeName,
  76. Shuffle: *shuffle,
  77. RandSeed: *randSeed,
  78. DryRun: *dryRun,
  79. })
  80. if err != nil {
  81. return 1, err
  82. }
  83. filters, err := filtersBytes(*filtersFile)
  84. if err != nil {
  85. return 1, err
  86. }
  87. logrus.Infof("Creating volume %s with input data", defaultVolumeName)
  88. if err = createVolumeWithData(cli,
  89. defaultVolumeName,
  90. map[string][]byte{"/input": filters},
  91. defaultMasterImageName); err != nil {
  92. return 1, err
  93. }
  94. logrus.Infof("Deploying stack %s from %s", defaultStackName, compose)
  95. defer func() {
  96. logrus.Infof("NOTE: You may want to inspect or clean up following resources:")
  97. logrus.Infof(" - Stack: %s", defaultStackName)
  98. logrus.Infof(" - Volume: %s", defaultVolumeName)
  99. logrus.Infof(" - Compose file: %s", compose)
  100. logrus.Infof(" - Master image: %s", defaultMasterImageName)
  101. logrus.Infof(" - Worker image: %s", workerImageForStack)
  102. }()
  103. if err = deployStack(cli, defaultStackName, compose); err != nil {
  104. return 1, err
  105. }
  106. logrus.Infof("The log will be displayed here after some duration."+
  107. "You can watch the live status via `docker service logs %s_worker`",
  108. defaultStackName)
  109. masterContainerID, err := waitForMasterUp(cli, defaultStackName)
  110. if err != nil {
  111. return 1, err
  112. }
  113. rc, err := waitForContainerCompletion(cli, os.Stdout, os.Stderr, masterContainerID)
  114. if err != nil {
  115. return 1, err
  116. }
  117. logrus.Infof("Exit status: %d", rc)
  118. return int(rc), nil
  119. }
  120. func ensureImages(cli *client.Client, images []string) error {
  121. for _, image := range images {
  122. _, _, err := cli.ImageInspectWithRaw(context.Background(), image)
  123. if err != nil {
  124. return fmt.Errorf("could not find image %s, please run `make build-integration-cli-on-swarm`: %v",
  125. image, err)
  126. }
  127. }
  128. return nil
  129. }
  130. func filtersBytes(optionalFiltersFile string) ([]byte, error) {
  131. var b []byte
  132. if optionalFiltersFile == "" {
  133. tests, err := enumerateTests(".")
  134. if err != nil {
  135. return b, err
  136. }
  137. b = []byte(strings.Join(tests, "\n") + "\n")
  138. } else {
  139. var err error
  140. b, err = ioutil.ReadFile(optionalFiltersFile)
  141. if err != nil {
  142. return b, err
  143. }
  144. }
  145. return b, nil
  146. }
  147. func waitForMasterUp(cli *client.Client, stackName string) (string, error) {
  148. // FIXME(AkihiroSuda): it should retry until master is up, rather than pre-sleeping
  149. time.Sleep(10 * time.Second)
  150. fil := filters.NewArgs()
  151. fil.Add("label", "com.docker.stack.namespace="+stackName)
  152. // FIXME(AkihiroSuda): we should not rely on internal service naming convention
  153. fil.Add("label", "com.docker.swarm.service.name="+stackName+"_master")
  154. masters, err := cli.ContainerList(context.Background(), types.ContainerListOptions{
  155. All: true,
  156. Filters: fil,
  157. })
  158. if err != nil {
  159. return "", err
  160. }
  161. if len(masters) == 0 {
  162. return "", fmt.Errorf("master not running in stack %s?", stackName)
  163. }
  164. return masters[0].ID, nil
  165. }
  166. func waitForContainerCompletion(cli *client.Client, stdout, stderr io.Writer, containerID string) (int64, error) {
  167. stream, err := cli.ContainerLogs(context.Background(),
  168. containerID,
  169. types.ContainerLogsOptions{
  170. ShowStdout: true,
  171. ShowStderr: true,
  172. Follow: true,
  173. })
  174. if err != nil {
  175. return 1, err
  176. }
  177. stdcopy.StdCopy(stdout, stderr, stream)
  178. stream.Close()
  179. return cli.ContainerWait(context.Background(), containerID)
  180. }