host.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/docker/docker/api/types"
  12. "github.com/docker/docker/api/types/filters"
  13. "github.com/docker/docker/client"
  14. "github.com/docker/docker/pkg/stdcopy"
  15. "github.com/sirupsen/logrus"
  16. )
  17. const (
  18. defaultStackName = "integration-cli-on-swarm"
  19. defaultVolumeName = "integration-cli-on-swarm"
  20. defaultMasterImageName = "integration-cli-master"
  21. defaultWorkerImageName = "integration-cli-worker"
  22. )
  23. func main() {
  24. rc, err := xmain()
  25. if err != nil {
  26. logrus.Fatalf("fatal error: %v", err)
  27. }
  28. os.Exit(rc)
  29. }
  30. func xmain() (int, error) {
  31. // Should we use cobra maybe?
  32. replicas := flag.Int("replicas", 1, "Number of worker service replica")
  33. chunks := flag.Int("chunks", 0, "Number of test chunks executed in batch (0 == replicas)")
  34. pushWorkerImage := flag.String("push-worker-image", "", "Push the worker image to the registry. Required for distributed execution. (empty == not to push)")
  35. shuffle := flag.Bool("shuffle", false, "Shuffle the input so as to mitigate makespan nonuniformity")
  36. // flags below are rarely used
  37. randSeed := flag.Int64("rand-seed", int64(0), "Random seed used for shuffling (0 == current time)")
  38. filtersFile := flag.String("filters-file", "", "Path to optional file composed of `-check.f` filter strings")
  39. dryRun := flag.Bool("dry-run", false, "Dry run")
  40. keepExecutor := flag.Bool("keep-executor", false, "Do not auto-remove executor containers, which is used for running privileged programs on Swarm")
  41. flag.Parse()
  42. if *chunks == 0 {
  43. *chunks = *replicas
  44. }
  45. if *randSeed == int64(0) {
  46. *randSeed = time.Now().UnixNano()
  47. }
  48. cli, err := client.NewEnvClient()
  49. if err != nil {
  50. return 1, err
  51. }
  52. if hasStack(cli, defaultStackName) {
  53. logrus.Infof("Removing stack %s", defaultStackName)
  54. removeStack(cli, defaultStackName)
  55. }
  56. if hasVolume(cli, defaultVolumeName) {
  57. logrus.Infof("Removing volume %s", defaultVolumeName)
  58. removeVolume(cli, defaultVolumeName)
  59. }
  60. if err = ensureImages(cli, []string{defaultWorkerImageName, defaultMasterImageName}); err != nil {
  61. return 1, err
  62. }
  63. workerImageForStack := defaultWorkerImageName
  64. if *pushWorkerImage != "" {
  65. logrus.Infof("Pushing %s to %s", defaultWorkerImageName, *pushWorkerImage)
  66. if err = pushImage(cli, *pushWorkerImage, defaultWorkerImageName); err != nil {
  67. return 1, err
  68. }
  69. workerImageForStack = *pushWorkerImage
  70. }
  71. compose, err := createCompose("", cli, composeOptions{
  72. Replicas: *replicas,
  73. Chunks: *chunks,
  74. MasterImage: defaultMasterImageName,
  75. WorkerImage: workerImageForStack,
  76. Volume: defaultVolumeName,
  77. Shuffle: *shuffle,
  78. RandSeed: *randSeed,
  79. DryRun: *dryRun,
  80. KeepExecutor: *keepExecutor,
  81. })
  82. if err != nil {
  83. return 1, err
  84. }
  85. filters, err := filtersBytes(*filtersFile)
  86. if err != nil {
  87. return 1, err
  88. }
  89. logrus.Infof("Creating volume %s with input data", defaultVolumeName)
  90. if err = createVolumeWithData(cli,
  91. defaultVolumeName,
  92. map[string][]byte{"/input": filters},
  93. defaultMasterImageName); err != nil {
  94. return 1, err
  95. }
  96. logrus.Infof("Deploying stack %s from %s", defaultStackName, compose)
  97. defer func() {
  98. logrus.Infof("NOTE: You may want to inspect or clean up following resources:")
  99. logrus.Infof(" - Stack: %s", defaultStackName)
  100. logrus.Infof(" - Volume: %s", defaultVolumeName)
  101. logrus.Infof(" - Compose file: %s", compose)
  102. logrus.Infof(" - Master image: %s", defaultMasterImageName)
  103. logrus.Infof(" - Worker image: %s", workerImageForStack)
  104. }()
  105. if err = deployStack(cli, defaultStackName, compose); err != nil {
  106. return 1, err
  107. }
  108. logrus.Infof("The log will be displayed here after some duration."+
  109. "You can watch the live status via `docker service logs %s_worker`",
  110. defaultStackName)
  111. masterContainerID, err := waitForMasterUp(cli, defaultStackName)
  112. if err != nil {
  113. return 1, err
  114. }
  115. rc, err := waitForContainerCompletion(cli, os.Stdout, os.Stderr, masterContainerID)
  116. if err != nil {
  117. return 1, err
  118. }
  119. logrus.Infof("Exit status: %d", rc)
  120. return int(rc), nil
  121. }
  122. func ensureImages(cli *client.Client, images []string) error {
  123. for _, image := range images {
  124. _, _, err := cli.ImageInspectWithRaw(context.Background(), image)
  125. if err != nil {
  126. return fmt.Errorf("could not find image %s, please run `make build-integration-cli-on-swarm`: %v",
  127. image, err)
  128. }
  129. }
  130. return nil
  131. }
  132. func filtersBytes(optionalFiltersFile string) ([]byte, error) {
  133. var b []byte
  134. if optionalFiltersFile == "" {
  135. tests, err := enumerateTests(".")
  136. if err != nil {
  137. return b, err
  138. }
  139. b = []byte(strings.Join(tests, "\n") + "\n")
  140. } else {
  141. var err error
  142. b, err = ioutil.ReadFile(optionalFiltersFile)
  143. if err != nil {
  144. return b, err
  145. }
  146. }
  147. return b, nil
  148. }
  149. func waitForMasterUp(cli *client.Client, stackName string) (string, error) {
  150. // FIXME(AkihiroSuda): it should retry until master is up, rather than pre-sleeping
  151. time.Sleep(10 * time.Second)
  152. fil := filters.NewArgs()
  153. fil.Add("label", "com.docker.stack.namespace="+stackName)
  154. // FIXME(AkihiroSuda): we should not rely on internal service naming convention
  155. fil.Add("label", "com.docker.swarm.service.name="+stackName+"_master")
  156. masters, err := cli.ContainerList(context.Background(), types.ContainerListOptions{
  157. All: true,
  158. Filters: fil,
  159. })
  160. if err != nil {
  161. return "", err
  162. }
  163. if len(masters) == 0 {
  164. return "", fmt.Errorf("master not running in stack %s?", stackName)
  165. }
  166. return masters[0].ID, nil
  167. }
  168. func waitForContainerCompletion(cli *client.Client, stdout, stderr io.Writer, containerID string) (int64, error) {
  169. stream, err := cli.ContainerLogs(context.Background(),
  170. containerID,
  171. types.ContainerLogsOptions{
  172. ShowStdout: true,
  173. ShowStderr: true,
  174. Follow: true,
  175. })
  176. if err != nil {
  177. return 1, err
  178. }
  179. stdcopy.StdCopy(stdout, stderr, stream)
  180. stream.Close()
  181. resultC, errC := cli.ContainerWait(context.Background(), containerID, "")
  182. select {
  183. case err := <-errC:
  184. return 1, err
  185. case result := <-resultC:
  186. return result.StatusCode, nil
  187. }
  188. }