server.go 57 KB


  1. package docker
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/dotcloud/docker/archive"
  7. "github.com/dotcloud/docker/auth"
  8. "github.com/dotcloud/docker/cgroups"
  9. "github.com/dotcloud/docker/engine"
  10. "github.com/dotcloud/docker/pkg/graphdb"
  11. "github.com/dotcloud/docker/registry"
  12. "github.com/dotcloud/docker/utils"
  13. "io"
  14. "io/ioutil"
  15. "log"
  16. "net/http"
  17. "net/url"
  18. "os"
  19. "os/exec"
  20. "os/signal"
  21. "path"
  22. "path/filepath"
  23. "runtime"
  24. "strconv"
  25. "strings"
  26. "sync"
  27. "syscall"
  28. "time"
  29. )
  30. func (srv *Server) Close() error {
  31. return srv.runtime.Close()
  32. }
  33. func init() {
  34. engine.Register("initapi", jobInitApi)
  35. }
  36. // jobInitApi runs the remote api server `srv` as a daemon,
  37. // Only one api server can run at the same time - this is enforced by a pidfile.
  38. // The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
  39. func jobInitApi(job *engine.Job) engine.Status {
  40. job.Logf("Creating server")
  41. // FIXME: ImportEnv deprecates ConfigFromJob
  42. srv, err := NewServer(job.Eng, ConfigFromJob(job))
  43. if err != nil {
  44. job.Error(err)
  45. return engine.StatusErr
  46. }
  47. if srv.runtime.config.Pidfile != "" {
  48. job.Logf("Creating pidfile")
  49. if err := utils.CreatePidFile(srv.runtime.config.Pidfile); err != nil {
  50. // FIXME: do we need fatal here instead of returning a job error?
  51. log.Fatal(err)
  52. }
  53. }
  54. job.Logf("Setting up signal traps")
  55. c := make(chan os.Signal, 1)
  56. signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
  57. go func() {
  58. sig := <-c
  59. log.Printf("Received signal '%v', exiting\n", sig)
  60. utils.RemovePidFile(srv.runtime.config.Pidfile)
  61. srv.Close()
  62. os.Exit(0)
  63. }()
  64. job.Eng.Hack_SetGlobalVar("httpapi.server", srv)
  65. job.Eng.Hack_SetGlobalVar("httpapi.runtime", srv.runtime)
  66. // https://github.com/dotcloud/docker/issues/2768
  67. if srv.runtime.networkManager.bridgeNetwork != nil {
  68. job.Eng.Hack_SetGlobalVar("httpapi.bridgeIP", srv.runtime.networkManager.bridgeNetwork.IP)
  69. }
  70. if err := job.Eng.Register("export", srv.ContainerExport); err != nil {
  71. job.Error(err)
  72. return engine.StatusErr
  73. }
  74. if err := job.Eng.Register("create", srv.ContainerCreate); err != nil {
  75. job.Error(err)
  76. return engine.StatusErr
  77. }
  78. if err := job.Eng.Register("stop", srv.ContainerStop); err != nil {
  79. job.Error(err)
  80. return engine.StatusErr
  81. }
  82. if err := job.Eng.Register("start", srv.ContainerStart); err != nil {
  83. job.Error(err)
  84. return engine.StatusErr
  85. }
  86. if err := job.Eng.Register("kill", srv.ContainerKill); err != nil {
  87. job.Error(err)
  88. return engine.StatusErr
  89. }
  90. if err := job.Eng.Register("serveapi", srv.ListenAndServe); err != nil {
  91. job.Error(err)
  92. return engine.StatusErr
  93. }
  94. if err := job.Eng.Register("wait", srv.ContainerWait); err != nil {
  95. job.Error(err)
  96. return engine.StatusErr
  97. }
  98. if err := job.Eng.Register("tag", srv.ImageTag); err != nil {
  99. job.Error(err)
  100. return engine.StatusErr
  101. }
  102. if err := job.Eng.Register("resize", srv.ContainerResize); err != nil {
  103. job.Error(err)
  104. return engine.StatusErr
  105. }
  106. if err := job.Eng.Register("commit", srv.ContainerCommit); err != nil {
  107. job.Error(err)
  108. return engine.StatusErr
  109. }
  110. if err := job.Eng.Register("info", srv.DockerInfo); err != nil {
  111. job.Error(err)
  112. return engine.StatusErr
  113. }
  114. return engine.StatusOK
  115. }
  116. func (srv *Server) ListenAndServe(job *engine.Job) engine.Status {
  117. protoAddrs := job.Args
  118. chErrors := make(chan error, len(protoAddrs))
  119. for _, protoAddr := range protoAddrs {
  120. protoAddrParts := strings.SplitN(protoAddr, "://", 2)
  121. switch protoAddrParts[0] {
  122. case "unix":
  123. if err := syscall.Unlink(protoAddrParts[1]); err != nil && !os.IsNotExist(err) {
  124. log.Fatal(err)
  125. }
  126. case "tcp":
  127. if !strings.HasPrefix(protoAddrParts[1], "127.0.0.1") {
  128. log.Println("/!\\ DON'T BIND ON ANOTHER IP ADDRESS THAN 127.0.0.1 IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
  129. }
  130. default:
  131. job.Errorf("Invalid protocol format.")
  132. return engine.StatusErr
  133. }
  134. go func() {
  135. // FIXME: merge Server.ListenAndServe with ListenAndServe
  136. chErrors <- ListenAndServe(protoAddrParts[0], protoAddrParts[1], srv, job.GetenvBool("Logging"))
  137. }()
  138. }
  139. for i := 0; i < len(protoAddrs); i += 1 {
  140. err := <-chErrors
  141. if err != nil {
  142. job.Error(err)
  143. return engine.StatusErr
  144. }
  145. }
  146. return engine.StatusOK
  147. }
  // simpleVersionInfo is a minimal holder for a product name and its
  // version, both kept as plain strings and exposed through Name() and
  // Version(). The original comment describes it as a simple implementation
  // of the VersionInfo interface.
  type simpleVersionInfo struct {
  	name, version string
  }

  // Name returns the stored product name.
  func (info *simpleVersionInfo) Name() string { return info.name }

  // Version returns the stored version string.
  func (info *simpleVersionInfo) Version() string { return info.version }
  163. // ContainerKill send signal to the container
  164. // If no signal is given (sig 0), then Kill with SIGKILL and wait
  165. // for the container to exit.
  166. // If a signal is given, then just send it to the container and return.
  167. func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
  168. if n := len(job.Args); n < 1 || n > 2 {
  169. job.Errorf("Usage: %s CONTAINER [SIGNAL]", job.Name)
  170. return engine.StatusErr
  171. }
  172. name := job.Args[0]
  173. var sig uint64
  174. if len(job.Args) == 2 && job.Args[1] != "" {
  175. var err error
  176. // The largest legal signal is 31, so let's parse on 5 bits
  177. sig, err = strconv.ParseUint(job.Args[1], 10, 5)
  178. if err != nil {
  179. job.Errorf("Invalid signal: %s", job.Args[1])
  180. return engine.StatusErr
  181. }
  182. }
  183. if container := srv.runtime.Get(name); container != nil {
  184. // If no signal is passed, perform regular Kill (SIGKILL + wait())
  185. if sig == 0 {
  186. if err := container.Kill(); err != nil {
  187. job.Errorf("Cannot kill container %s: %s", name, err)
  188. return engine.StatusErr
  189. }
  190. srv.LogEvent("kill", container.ID, srv.runtime.repositories.ImageName(container.Image))
  191. } else {
  192. // Otherwise, just send the requested signal
  193. if err := container.kill(int(sig)); err != nil {
  194. job.Errorf("Cannot kill container %s: %s", name, err)
  195. return engine.StatusErr
  196. }
  197. // FIXME: Add event for signals
  198. }
  199. } else {
  200. job.Errorf("No such container: %s", name)
  201. return engine.StatusErr
  202. }
  203. return engine.StatusOK
  204. }
  205. func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
  206. if len(job.Args) != 1 {
  207. job.Errorf("Usage: %s container_id", job.Name)
  208. return engine.StatusErr
  209. }
  210. name := job.Args[0]
  211. if container := srv.runtime.Get(name); container != nil {
  212. data, err := container.Export()
  213. if err != nil {
  214. job.Errorf("%s: %s", name, err)
  215. return engine.StatusErr
  216. }
  217. // Stream the entire contents of the container (basically a volatile snapshot)
  218. if _, err := io.Copy(job.Stdout, data); err != nil {
  219. job.Errorf("%s: %s", name, err)
  220. return engine.StatusErr
  221. }
  222. // FIXME: factor job-specific LogEvent to engine.Job.Run()
  223. srv.LogEvent("export", container.ID, srv.runtime.repositories.ImageName(container.Image))
  224. return engine.StatusOK
  225. }
  226. job.Errorf("No such container: %s", name)
  227. return engine.StatusErr
  228. }
  229. // ImageExport exports all images with the given tag. All versions
  230. // containing the same tag are exported. The resulting output is an
  231. // uncompressed tar ball.
  232. // name is the set of tags to export.
  233. // out is the writer where the images are written to.
  234. func (srv *Server) ImageExport(name string, out io.Writer) error {
  235. // get image json
  236. tempdir, err := ioutil.TempDir("", "docker-export-")
  237. if err != nil {
  238. return err
  239. }
  240. defer os.RemoveAll(tempdir)
  241. utils.Debugf("Serializing %s", name)
  242. rootRepo, err := srv.runtime.repositories.Get(name)
  243. if err != nil {
  244. return err
  245. }
  246. if rootRepo != nil {
  247. for _, id := range rootRepo {
  248. image, err := srv.ImageInspect(id)
  249. if err != nil {
  250. return err
  251. }
  252. if err := srv.exportImage(image, tempdir); err != nil {
  253. return err
  254. }
  255. }
  256. // write repositories
  257. rootRepoMap := map[string]Repository{}
  258. rootRepoMap[name] = rootRepo
  259. rootRepoJson, _ := json.Marshal(rootRepoMap)
  260. if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.ModeAppend); err != nil {
  261. return err
  262. }
  263. } else {
  264. image, err := srv.ImageInspect(name)
  265. if err != nil {
  266. return err
  267. }
  268. if err := srv.exportImage(image, tempdir); err != nil {
  269. return err
  270. }
  271. }
  272. fs, err := archive.Tar(tempdir, archive.Uncompressed)
  273. if err != nil {
  274. return err
  275. }
  276. if _, err := io.Copy(out, fs); err != nil {
  277. return err
  278. }
  279. return nil
  280. }
  281. func (srv *Server) exportImage(image *Image, tempdir string) error {
  282. for i := image; i != nil; {
  283. // temporary directory
  284. tmpImageDir := path.Join(tempdir, i.ID)
  285. if err := os.Mkdir(tmpImageDir, os.ModeDir); err != nil {
  286. if os.IsExist(err) {
  287. return nil
  288. }
  289. return err
  290. }
  291. var version = "1.0"
  292. var versionBuf = []byte(version)
  293. if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.ModeAppend); err != nil {
  294. return err
  295. }
  296. // serialize json
  297. b, err := json.Marshal(i)
  298. if err != nil {
  299. return err
  300. }
  301. if err := ioutil.WriteFile(path.Join(tmpImageDir, "json"), b, os.ModeAppend); err != nil {
  302. return err
  303. }
  304. // serialize filesystem
  305. fs, err := i.TarLayer()
  306. if err != nil {
  307. return err
  308. }
  309. fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar"))
  310. if err != nil {
  311. return err
  312. }
  313. if _, err = io.Copy(fsTar, fs); err != nil {
  314. return err
  315. }
  316. fsTar.Close()
  317. // find parent
  318. if i.Parent != "" {
  319. i, err = srv.ImageInspect(i.Parent)
  320. if err != nil {
  321. return err
  322. }
  323. } else {
  324. i = nil
  325. }
  326. }
  327. return nil
  328. }
  329. // Loads a set of images into the repository. This is the complementary of ImageExport.
  330. // The input stream is an uncompressed tar ball containing images and metadata.
  331. func (srv *Server) ImageLoad(in io.Reader) error {
  332. tmpImageDir, err := ioutil.TempDir("", "docker-import-")
  333. if err != nil {
  334. return err
  335. }
  336. defer os.RemoveAll(tmpImageDir)
  337. var (
  338. repoTarFile = path.Join(tmpImageDir, "repo.tar")
  339. repoDir = path.Join(tmpImageDir, "repo")
  340. )
  341. tarFile, err := os.Create(repoTarFile)
  342. if err != nil {
  343. return err
  344. }
  345. if _, err := io.Copy(tarFile, in); err != nil {
  346. return err
  347. }
  348. tarFile.Close()
  349. repoFile, err := os.Open(repoTarFile)
  350. if err != nil {
  351. return err
  352. }
  353. if err := os.Mkdir(repoDir, os.ModeDir); err != nil {
  354. return err
  355. }
  356. if err := archive.Untar(repoFile, repoDir, nil); err != nil {
  357. return err
  358. }
  359. dirs, err := ioutil.ReadDir(repoDir)
  360. if err != nil {
  361. return err
  362. }
  363. for _, d := range dirs {
  364. if d.IsDir() {
  365. if err := srv.recursiveLoad(d.Name(), tmpImageDir); err != nil {
  366. return err
  367. }
  368. }
  369. }
  370. repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories"))
  371. if err == nil {
  372. repositories := map[string]Repository{}
  373. if err := json.Unmarshal(repositoriesJson, &repositories); err != nil {
  374. return err
  375. }
  376. for imageName, tagMap := range repositories {
  377. for tag, address := range tagMap {
  378. if err := srv.runtime.repositories.Set(imageName, tag, address, true); err != nil {
  379. return err
  380. }
  381. }
  382. }
  383. } else if !os.IsNotExist(err) {
  384. return err
  385. }
  386. return nil
  387. }
  388. func (srv *Server) recursiveLoad(address, tmpImageDir string) error {
  389. if _, err := srv.ImageInspect(address); err != nil {
  390. utils.Debugf("Loading %s", address)
  391. imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json"))
  392. if err != nil {
  393. utils.Debugf("Error reading json", err)
  394. return err
  395. }
  396. layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar"))
  397. if err != nil {
  398. utils.Debugf("Error reading embedded tar", err)
  399. return err
  400. }
  401. img, err := NewImgJSON(imageJson)
  402. if err != nil {
  403. utils.Debugf("Error unmarshalling json", err)
  404. return err
  405. }
  406. if img.Parent != "" {
  407. if !srv.runtime.graph.Exists(img.Parent) {
  408. if err := srv.recursiveLoad(img.Parent, tmpImageDir); err != nil {
  409. return err
  410. }
  411. }
  412. }
  413. if err := srv.runtime.graph.Register(imageJson, layer, img); err != nil {
  414. return err
  415. }
  416. }
  417. utils.Debugf("Completed processing %s", address)
  418. return nil
  419. }
  420. func (srv *Server) ImagesSearch(term string) ([]registry.SearchResult, error) {
  421. r, err := registry.NewRegistry(nil, srv.HTTPRequestFactory(nil), auth.IndexServerAddress())
  422. if err != nil {
  423. return nil, err
  424. }
  425. results, err := r.SearchRepositories(term)
  426. if err != nil {
  427. return nil, err
  428. }
  429. return results.Results, nil
  430. }
  431. func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils.StreamFormatter) error {
  432. out = utils.NewWriteFlusher(out)
  433. img, err := srv.runtime.repositories.LookupImage(name)
  434. if err != nil {
  435. return err
  436. }
  437. file, err := utils.Download(url)
  438. if err != nil {
  439. return err
  440. }
  441. defer file.Body.Close()
  442. config, _, _, err := ParseRun([]string{img.ID, "echo", "insert", url, path}, srv.runtime.capabilities)
  443. if err != nil {
  444. return err
  445. }
  446. c, _, err := srv.runtime.Create(config, "")
  447. if err != nil {
  448. return err
  449. }
  450. if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf, false, "", "Downloading"), path); err != nil {
  451. return err
  452. }
  453. // FIXME: Handle custom repo, tag comment, author
  454. img, err = srv.runtime.Commit(c, "", "", img.Comment, img.Author, nil)
  455. if err != nil {
  456. return err
  457. }
  458. out.Write(sf.FormatStatus(img.ID, ""))
  459. return nil
  460. }
  461. func (srv *Server) ImagesViz(out io.Writer) error {
  462. images, _ := srv.runtime.graph.Map()
  463. if images == nil {
  464. return nil
  465. }
  466. out.Write([]byte("digraph docker {\n"))
  467. var (
  468. parentImage *Image
  469. err error
  470. )
  471. for _, image := range images {
  472. parentImage, err = image.GetParent()
  473. if err != nil {
  474. return fmt.Errorf("Error while getting parent image: %v", err)
  475. }
  476. if parentImage != nil {
  477. out.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n"))
  478. } else {
  479. out.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n"))
  480. }
  481. }
  482. reporefs := make(map[string][]string)
  483. for name, repository := range srv.runtime.repositories.Repositories {
  484. for tag, id := range repository {
  485. reporefs[utils.TruncateID(id)] = append(reporefs[utils.TruncateID(id)], fmt.Sprintf("%s:%s", name, tag))
  486. }
  487. }
  488. for id, repos := range reporefs {
  489. out.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n"))
  490. }
  491. out.Write([]byte(" base [style=invisible]\n}\n"))
  492. return nil
  493. }
  494. func (srv *Server) Images(all bool, filter string) ([]APIImages, error) {
  495. var (
  496. allImages map[string]*Image
  497. err error
  498. )
  499. if all {
  500. allImages, err = srv.runtime.graph.Map()
  501. } else {
  502. allImages, err = srv.runtime.graph.Heads()
  503. }
  504. if err != nil {
  505. return nil, err
  506. }
  507. lookup := make(map[string]APIImages)
  508. for name, repository := range srv.runtime.repositories.Repositories {
  509. if filter != "" {
  510. if match, _ := path.Match(filter, name); !match {
  511. continue
  512. }
  513. }
  514. for tag, id := range repository {
  515. image, err := srv.runtime.graph.Get(id)
  516. if err != nil {
  517. log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
  518. continue
  519. }
  520. if out, exists := lookup[id]; exists {
  521. out.RepoTags = append(out.RepoTags, fmt.Sprintf("%s:%s", name, tag))
  522. lookup[id] = out
  523. } else {
  524. var out APIImages
  525. delete(allImages, id)
  526. out.ParentId = image.Parent
  527. out.RepoTags = []string{fmt.Sprintf("%s:%s", name, tag)}
  528. out.ID = image.ID
  529. out.Created = image.Created.Unix()
  530. out.Size = image.Size
  531. out.VirtualSize = image.getParentsSize(0) + image.Size
  532. lookup[id] = out
  533. }
  534. }
  535. }
  536. outs := make([]APIImages, 0, len(lookup))
  537. for _, value := range lookup {
  538. outs = append(outs, value)
  539. }
  540. // Display images which aren't part of a repository/tag
  541. if filter == "" {
  542. for _, image := range allImages {
  543. var out APIImages
  544. out.ID = image.ID
  545. out.ParentId = image.Parent
  546. out.RepoTags = []string{"<none>:<none>"}
  547. out.Created = image.Created.Unix()
  548. out.Size = image.Size
  549. out.VirtualSize = image.getParentsSize(0) + image.Size
  550. outs = append(outs, out)
  551. }
  552. }
  553. sortImagesByCreationAndTag(outs)
  554. return outs, nil
  555. }
  556. func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
  557. images, _ := srv.runtime.graph.Map()
  558. var imgcount int
  559. if images == nil {
  560. imgcount = 0
  561. } else {
  562. imgcount = len(images)
  563. }
  564. lxcVersion := ""
  565. if output, err := exec.Command("lxc-version").CombinedOutput(); err == nil {
  566. outputStr := string(output)
  567. if len(strings.SplitN(outputStr, ":", 2)) == 2 {
  568. lxcVersion = strings.TrimSpace(strings.SplitN(string(output), ":", 2)[1])
  569. }
  570. }
  571. kernelVersion := "<unknown>"
  572. if kv, err := utils.GetKernelVersion(); err == nil {
  573. kernelVersion = kv.String()
  574. }
  575. // if we still have the original dockerinit binary from before we copied it locally, let's return the path to that, since that's more intuitive (the copied path is trivial to derive by hand given VERSION)
  576. initPath := utils.DockerInitPath("")
  577. if initPath == "" {
  578. // if that fails, we'll just return the path from the runtime
  579. initPath = srv.runtime.sysInitPath
  580. }
  581. v := &engine.Env{}
  582. v.SetInt("Containers", len(srv.runtime.List()))
  583. v.SetInt("Images", imgcount)
  584. v.Set("Driver", srv.runtime.driver.String())
  585. v.SetJson("DriverStatus", srv.runtime.driver.Status())
  586. v.SetBool("MemoryLimit", srv.runtime.capabilities.MemoryLimit)
  587. v.SetBool("SwapLimit", srv.runtime.capabilities.SwapLimit)
  588. v.SetBool("IPv4Forwarding", !srv.runtime.capabilities.IPv4ForwardingDisabled)
  589. v.SetBool("Debug", os.Getenv("DEBUG") != "")
  590. v.SetInt("NFd", utils.GetTotalUsedFds())
  591. v.SetInt("NGoroutines", runtime.NumGoroutine())
  592. v.Set("LXCVersion", lxcVersion)
  593. v.SetInt("NEventsListener", len(srv.events))
  594. v.Set("KernelVersion", kernelVersion)
  595. v.Set("IndexServerAddress", auth.IndexServerAddress())
  596. v.Set("InitSha1", utils.INITSHA1)
  597. v.Set("InitPath", initPath)
  598. if _, err := v.WriteTo(job.Stdout); err != nil {
  599. job.Error(err)
  600. return engine.StatusErr
  601. }
  602. return engine.StatusOK
  603. }
  604. func (srv *Server) ImageHistory(name string) ([]APIHistory, error) {
  605. image, err := srv.runtime.repositories.LookupImage(name)
  606. if err != nil {
  607. return nil, err
  608. }
  609. lookupMap := make(map[string][]string)
  610. for name, repository := range srv.runtime.repositories.Repositories {
  611. for tag, id := range repository {
  612. // If the ID already has a reverse lookup, do not update it unless for "latest"
  613. if _, exists := lookupMap[id]; !exists {
  614. lookupMap[id] = []string{}
  615. }
  616. lookupMap[id] = append(lookupMap[id], name+":"+tag)
  617. }
  618. }
  619. outs := []APIHistory{} //produce [] when empty instead of 'null'
  620. err = image.WalkHistory(func(img *Image) error {
  621. var out APIHistory
  622. out.ID = img.ID
  623. out.Created = img.Created.Unix()
  624. out.CreatedBy = strings.Join(img.ContainerConfig.Cmd, " ")
  625. out.Tags = lookupMap[img.ID]
  626. out.Size = img.Size
  627. outs = append(outs, out)
  628. return nil
  629. })
  630. return outs, nil
  631. }
  632. func (srv *Server) ContainerTop(name, psArgs string) (*APITop, error) {
  633. if container := srv.runtime.Get(name); container != nil {
  634. if !container.State.IsRunning() {
  635. return nil, fmt.Errorf("Container %s is not running", name)
  636. }
  637. pids, err := cgroups.GetPidsForContainer(container.ID)
  638. if err != nil {
  639. return nil, err
  640. }
  641. if len(psArgs) == 0 {
  642. psArgs = "-ef"
  643. }
  644. output, err := exec.Command("ps", psArgs).Output()
  645. if err != nil {
  646. return nil, fmt.Errorf("Error running ps: %s", err)
  647. }
  648. lines := strings.Split(string(output), "\n")
  649. header := strings.Fields(lines[0])
  650. procs := APITop{
  651. Titles: header,
  652. }
  653. pidIndex := -1
  654. for i, name := range header {
  655. if name == "PID" {
  656. pidIndex = i
  657. }
  658. }
  659. if pidIndex == -1 {
  660. return nil, errors.New("Couldn't find PID field in ps output")
  661. }
  662. for _, line := range lines[1:] {
  663. if len(line) == 0 {
  664. continue
  665. }
  666. fields := strings.Fields(line)
  667. p, err := strconv.Atoi(fields[pidIndex])
  668. if err != nil {
  669. return nil, fmt.Errorf("Unexpected pid '%s': %s", fields[pidIndex], err)
  670. }
  671. for _, pid := range pids {
  672. if pid == p {
  673. // Make sure number of fields equals number of header titles
  674. // merging "overhanging" fields
  675. processes := fields[:len(procs.Titles)-1]
  676. processes = append(processes, strings.Join(fields[len(procs.Titles)-1:], " "))
  677. procs.Processes = append(procs.Processes, processes)
  678. }
  679. }
  680. }
  681. return &procs, nil
  682. }
  683. return nil, fmt.Errorf("No such container: %s", name)
  684. }
  685. func (srv *Server) ContainerChanges(name string) ([]archive.Change, error) {
  686. if container := srv.runtime.Get(name); container != nil {
  687. return container.Changes()
  688. }
  689. return nil, fmt.Errorf("No such container: %s", name)
  690. }
  691. func (srv *Server) Containers(all, size bool, n int, since, before string) []APIContainers {
  692. var foundBefore bool
  693. var displayed int
  694. out := []APIContainers{}
  695. names := map[string][]string{}
  696. srv.runtime.containerGraph.Walk("/", func(p string, e *graphdb.Entity) error {
  697. names[e.ID()] = append(names[e.ID()], p)
  698. return nil
  699. }, -1)
  700. for _, container := range srv.runtime.List() {
  701. if !container.State.IsRunning() && !all && n == -1 && since == "" && before == "" {
  702. continue
  703. }
  704. if before != "" && !foundBefore {
  705. if container.ID == before || utils.TruncateID(container.ID) == before {
  706. foundBefore = true
  707. }
  708. continue
  709. }
  710. if displayed == n {
  711. break
  712. }
  713. if container.ID == since || utils.TruncateID(container.ID) == since {
  714. break
  715. }
  716. displayed++
  717. c := createAPIContainer(names[container.ID], container, size, srv.runtime)
  718. out = append(out, c)
  719. }
  720. return out
  721. }
  722. func createAPIContainer(names []string, container *Container, size bool, runtime *Runtime) APIContainers {
  723. c := APIContainers{
  724. ID: container.ID,
  725. }
  726. c.Names = names
  727. c.Image = runtime.repositories.ImageName(container.Image)
  728. c.Command = fmt.Sprintf("%s %s", container.Path, strings.Join(container.Args, " "))
  729. c.Created = container.Created.Unix()
  730. c.Status = container.State.String()
  731. c.Ports = container.NetworkSettings.PortMappingAPI()
  732. if size {
  733. c.SizeRw, c.SizeRootFs = container.GetSize()
  734. }
  735. return c
  736. }
  737. func (srv *Server) ContainerCommit(job *engine.Job) engine.Status {
  738. if len(job.Args) != 1 {
  739. job.Errorf("Not enough arguments. Usage: %s CONTAINER\n", job.Name)
  740. return engine.StatusErr
  741. }
  742. name := job.Args[0]
  743. container := srv.runtime.Get(name)
  744. if container == nil {
  745. job.Errorf("No such container: %s", name)
  746. return engine.StatusErr
  747. }
  748. var config Config
  749. if err := job.GetenvJson("config", &config); err != nil {
  750. job.Error(err)
  751. return engine.StatusErr
  752. }
  753. img, err := srv.runtime.Commit(container, job.Getenv("repo"), job.Getenv("tag"), job.Getenv("comment"), job.Getenv("author"), &config)
  754. if err != nil {
  755. job.Error(err)
  756. return engine.StatusErr
  757. }
  758. job.Printf("%s\n", img.ID)
  759. return engine.StatusOK
  760. }
  761. func (srv *Server) ImageTag(job *engine.Job) engine.Status {
  762. if len(job.Args) != 2 && len(job.Args) != 3 {
  763. job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name)
  764. return engine.StatusErr
  765. }
  766. var tag string
  767. if len(job.Args) == 3 {
  768. tag = job.Args[2]
  769. }
  770. if err := srv.runtime.repositories.Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil {
  771. job.Error(err)
  772. return engine.StatusErr
  773. }
  774. return engine.StatusOK
  775. }
  776. func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
  777. history, err := r.GetRemoteHistory(imgID, endpoint, token)
  778. if err != nil {
  779. return err
  780. }
  781. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
  782. // FIXME: Try to stream the images?
  783. // FIXME: Launch the getRemoteImage() in goroutines
  784. for i := len(history) - 1; i >= 0; i-- {
  785. id := history[i]
  786. // ensure no two downloads of the same layer happen at the same time
  787. if c, err := srv.poolAdd("pull", "layer:"+id); err != nil {
  788. utils.Errorf("Image (id: %s) pull is already running, skipping: %v", id, err)
  789. <-c
  790. }
  791. defer srv.poolRemove("pull", "layer:"+id)
  792. if !srv.runtime.graph.Exists(id) {
  793. out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
  794. imgJSON, imgSize, err := r.GetRemoteImageJSON(id, endpoint, token)
  795. if err != nil {
  796. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  797. // FIXME: Keep going in case of error?
  798. return err
  799. }
  800. img, err := NewImgJSON(imgJSON)
  801. if err != nil {
  802. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  803. return fmt.Errorf("Failed to parse json: %s", err)
  804. }
  805. // Get the layer
  806. out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling fs layer", nil))
  807. layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token)
  808. if err != nil {
  809. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  810. return err
  811. }
  812. defer layer.Close()
  813. if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), img); err != nil {
  814. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
  815. return err
  816. }
  817. }
  818. out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
  819. }
  820. return nil
  821. }
  822. func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
  823. out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
  824. repoData, err := r.GetRepositoryData(remoteName)
  825. if err != nil {
  826. return err
  827. }
  828. utils.Debugf("Retrieving the tag list")
  829. tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
  830. if err != nil {
  831. utils.Errorf("%v", err)
  832. return err
  833. }
  834. for tag, id := range tagsList {
  835. repoData.ImgList[id] = &registry.ImgData{
  836. ID: id,
  837. Tag: tag,
  838. Checksum: "",
  839. }
  840. }
  841. utils.Debugf("Registering tags")
  842. // If no tag has been specified, pull them all
  843. if askedTag == "" {
  844. for tag, id := range tagsList {
  845. repoData.ImgList[id].Tag = tag
  846. }
  847. } else {
  848. // Otherwise, check that the tag exists and use only that one
  849. id, exists := tagsList[askedTag]
  850. if !exists {
  851. return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
  852. }
  853. repoData.ImgList[id].Tag = askedTag
  854. }
  855. errors := make(chan error)
  856. for _, image := range repoData.ImgList {
  857. downloadImage := func(img *registry.ImgData) {
  858. if askedTag != "" && img.Tag != askedTag {
  859. utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
  860. if parallel {
  861. errors <- nil
  862. }
  863. return
  864. }
  865. if img.Tag == "" {
  866. utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
  867. if parallel {
  868. errors <- nil
  869. }
  870. return
  871. }
  872. // ensure no two downloads of the same image happen at the same time
  873. if c, err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
  874. if c != nil {
  875. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
  876. <-c
  877. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
  878. } else {
  879. utils.Errorf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
  880. }
  881. if parallel {
  882. errors <- nil
  883. }
  884. return
  885. }
  886. defer srv.poolRemove("pull", "img:"+img.ID)
  887. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
  888. success := false
  889. var lastErr error
  890. for _, ep := range repoData.Endpoints {
  891. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
  892. if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
  893. // Its not ideal that only the last error is returned, it would be better to concatenate the errors.
  894. // As the error is also given to the output stream the user will see the error.
  895. lastErr = err
  896. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
  897. continue
  898. }
  899. success = true
  900. break
  901. }
  902. if !success {
  903. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, %s", img.Tag, localName, lastErr), nil))
  904. if parallel {
  905. errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
  906. return
  907. }
  908. }
  909. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
  910. if parallel {
  911. errors <- nil
  912. }
  913. }
  914. if parallel {
  915. go downloadImage(image)
  916. } else {
  917. downloadImage(image)
  918. }
  919. }
  920. if parallel {
  921. var lastError error
  922. for i := 0; i < len(repoData.ImgList); i++ {
  923. if err := <-errors; err != nil {
  924. lastError = err
  925. }
  926. }
  927. if lastError != nil {
  928. return lastError
  929. }
  930. }
  931. for tag, id := range tagsList {
  932. if askedTag != "" && tag != askedTag {
  933. continue
  934. }
  935. if err := srv.runtime.repositories.Set(localName, tag, id, true); err != nil {
  936. return err
  937. }
  938. }
  939. if err := srv.runtime.repositories.Save(); err != nil {
  940. return err
  941. }
  942. return nil
  943. }
  944. func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
  945. srv.Lock()
  946. defer srv.Unlock()
  947. if c, exists := srv.pullingPool[key]; exists {
  948. return c, fmt.Errorf("pull %s is already in progress", key)
  949. }
  950. if c, exists := srv.pushingPool[key]; exists {
  951. return c, fmt.Errorf("push %s is already in progress", key)
  952. }
  953. c := make(chan struct{})
  954. switch kind {
  955. case "pull":
  956. srv.pullingPool[key] = c
  957. case "push":
  958. srv.pushingPool[key] = c
  959. default:
  960. return nil, fmt.Errorf("Unknown pool type")
  961. }
  962. return c, nil
  963. }
  964. func (srv *Server) poolRemove(kind, key string) error {
  965. srv.Lock()
  966. defer srv.Unlock()
  967. switch kind {
  968. case "pull":
  969. if c, exists := srv.pullingPool[key]; exists {
  970. close(c)
  971. delete(srv.pullingPool, key)
  972. }
  973. case "push":
  974. if c, exists := srv.pushingPool[key]; exists {
  975. close(c)
  976. delete(srv.pushingPool, key)
  977. }
  978. default:
  979. return fmt.Errorf("Unknown pool type")
  980. }
  981. return nil
  982. }
  983. func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string, parallel bool) error {
  984. out = utils.NewWriteFlusher(out)
  985. c, err := srv.poolAdd("pull", localName+":"+tag)
  986. if err != nil {
  987. if c != nil {
  988. // Another pull of the same repository is already taking place; just wait for it to finish
  989. out.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
  990. <-c
  991. return nil
  992. }
  993. return err
  994. }
  995. defer srv.poolRemove("pull", localName+":"+tag)
  996. // Resolve the Repository name from fqn to endpoint + name
  997. endpoint, remoteName, err := registry.ResolveRepositoryName(localName)
  998. if err != nil {
  999. return err
  1000. }
  1001. r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
  1002. if err != nil {
  1003. return err
  1004. }
  1005. if endpoint == auth.IndexServerAddress() {
  1006. // If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
  1007. localName = remoteName
  1008. }
  1009. if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, parallel); err != nil {
  1010. return err
  1011. }
  1012. return nil
  1013. }
  1014. // Retrieve the all the images to be uploaded in the correct order
  1015. // Note: we can't use a map as it is not ordered
  1016. func (srv *Server) getImageList(localRepo map[string]string) ([][]*registry.ImgData, error) {
  1017. imgList := map[string]*registry.ImgData{}
  1018. depGraph := utils.NewDependencyGraph()
  1019. for tag, id := range localRepo {
  1020. img, err := srv.runtime.graph.Get(id)
  1021. if err != nil {
  1022. return nil, err
  1023. }
  1024. depGraph.NewNode(img.ID)
  1025. img.WalkHistory(func(current *Image) error {
  1026. imgList[current.ID] = &registry.ImgData{
  1027. ID: current.ID,
  1028. Tag: tag,
  1029. }
  1030. parent, err := current.GetParent()
  1031. if err != nil {
  1032. return err
  1033. }
  1034. if parent == nil {
  1035. return nil
  1036. }
  1037. depGraph.NewNode(parent.ID)
  1038. depGraph.AddDependency(current.ID, parent.ID)
  1039. return nil
  1040. })
  1041. }
  1042. traversalMap, err := depGraph.GenerateTraversalMap()
  1043. if err != nil {
  1044. return nil, err
  1045. }
  1046. utils.Debugf("Traversal map: %v", traversalMap)
  1047. result := [][]*registry.ImgData{}
  1048. for _, round := range traversalMap {
  1049. dataRound := []*registry.ImgData{}
  1050. for _, imgID := range round {
  1051. dataRound = append(dataRound, imgList[imgID])
  1052. }
  1053. result = append(result, dataRound)
  1054. }
  1055. return result, nil
  1056. }
  1057. func flatten(slc [][]*registry.ImgData) []*registry.ImgData {
  1058. result := []*registry.ImgData{}
  1059. for _, x := range slc {
  1060. result = append(result, x...)
  1061. }
  1062. return result
  1063. }
  1064. func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, sf *utils.StreamFormatter) error {
  1065. out = utils.NewWriteFlusher(out)
  1066. imgList, err := srv.getImageList(localRepo)
  1067. if err != nil {
  1068. return err
  1069. }
  1070. flattenedImgList := flatten(imgList)
  1071. out.Write(sf.FormatStatus("", "Sending image list"))
  1072. var repoData *registry.RepositoryData
  1073. repoData, err = r.PushImageJSONIndex(remoteName, flattenedImgList, false, nil)
  1074. if err != nil {
  1075. return err
  1076. }
  1077. for _, ep := range repoData.Endpoints {
  1078. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, len(localRepo)))
  1079. // This section can not be parallelized (each round depends on the previous one)
  1080. for i, round := range imgList {
  1081. // FIXME: This section can be parallelized
  1082. for _, elem := range round {
  1083. var pushTags func() error
  1084. pushTags = func() error {
  1085. if i < (len(imgList) - 1) {
  1086. // Only tag the top layer in the repository
  1087. return nil
  1088. }
  1089. out.Write(sf.FormatStatus("", "Pushing tags for rev [%s] on {%s}", utils.TruncateID(elem.ID), ep+"repositories/"+remoteName+"/tags/"+elem.Tag))
  1090. if err := r.PushRegistryTag(remoteName, elem.ID, elem.Tag, ep, repoData.Tokens); err != nil {
  1091. return err
  1092. }
  1093. return nil
  1094. }
  1095. if _, exists := repoData.ImgList[elem.ID]; exists {
  1096. if err := pushTags(); err != nil {
  1097. return err
  1098. }
  1099. out.Write(sf.FormatProgress(utils.TruncateID(elem.ID), "Image already pushed, skipping", nil))
  1100. continue
  1101. } else if r.LookupRemoteImage(elem.ID, ep, repoData.Tokens) {
  1102. if err := pushTags(); err != nil {
  1103. return err
  1104. }
  1105. out.Write(sf.FormatProgress(utils.TruncateID(elem.ID), "Image already pushed, skipping", nil))
  1106. continue
  1107. }
  1108. checksum, err := srv.pushImage(r, out, remoteName, elem.ID, ep, repoData.Tokens, sf)
  1109. if err != nil {
  1110. // FIXME: Continue on error?
  1111. return err
  1112. }
  1113. elem.Checksum = checksum
  1114. if err := pushTags(); err != nil {
  1115. return err
  1116. }
  1117. }
  1118. }
  1119. }
  1120. if _, err := r.PushImageJSONIndex(remoteName, flattenedImgList, true, repoData.Endpoints); err != nil {
  1121. return err
  1122. }
  1123. return nil
  1124. }
  1125. func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  1126. out = utils.NewWriteFlusher(out)
  1127. jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgID, "json"))
  1128. if err != nil {
  1129. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  1130. }
  1131. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  1132. imgData := &registry.ImgData{
  1133. ID: imgID,
  1134. }
  1135. // Send the json
  1136. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  1137. if err == registry.ErrAlreadyExists {
  1138. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  1139. return "", nil
  1140. }
  1141. return "", err
  1142. }
  1143. layerData, err := srv.runtime.graph.TempLayerArchive(imgID, archive.Uncompressed, sf, out)
  1144. if err != nil {
  1145. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  1146. }
  1147. defer os.RemoveAll(layerData.Name())
  1148. // Send the layer
  1149. checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
  1150. if err != nil {
  1151. return "", err
  1152. }
  1153. imgData.Checksum = checksum
  1154. // Send the checksum
  1155. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  1156. return "", err
  1157. }
  1158. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  1159. return imgData.Checksum, nil
  1160. }
  1161. // FIXME: Allow to interrupt current push when new push of same image is done.
  1162. func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string) error {
  1163. if _, err := srv.poolAdd("push", localName); err != nil {
  1164. return err
  1165. }
  1166. defer srv.poolRemove("push", localName)
  1167. // Resolve the Repository name from fqn to endpoint + name
  1168. endpoint, remoteName, err := registry.ResolveRepositoryName(localName)
  1169. if err != nil {
  1170. return err
  1171. }
  1172. out = utils.NewWriteFlusher(out)
  1173. img, err := srv.runtime.graph.Get(localName)
  1174. r, err2 := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
  1175. if err2 != nil {
  1176. return err2
  1177. }
  1178. if err != nil {
  1179. reposLen := len(srv.runtime.repositories.Repositories[localName])
  1180. out.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
  1181. // If it fails, try to get the repository
  1182. if localRepo, exists := srv.runtime.repositories.Repositories[localName]; exists {
  1183. if err := srv.pushRepository(r, out, localName, remoteName, localRepo, sf); err != nil {
  1184. return err
  1185. }
  1186. return nil
  1187. }
  1188. return err
  1189. }
  1190. var token []string
  1191. out.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
  1192. if _, err := srv.pushImage(r, out, remoteName, img.ID, endpoint, token, sf); err != nil {
  1193. return err
  1194. }
  1195. return nil
  1196. }
  1197. func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Writer, sf *utils.StreamFormatter) error {
  1198. var archive io.Reader
  1199. var resp *http.Response
  1200. if src == "-" {
  1201. archive = in
  1202. } else {
  1203. u, err := url.Parse(src)
  1204. if err != nil {
  1205. return err
  1206. }
  1207. if u.Scheme == "" {
  1208. u.Scheme = "http"
  1209. u.Host = src
  1210. u.Path = ""
  1211. }
  1212. out.Write(sf.FormatStatus("", "Downloading from %s", u))
  1213. // Download with curl (pretty progress bar)
  1214. // If curl is not available, fallback to http.Get()
  1215. resp, err = utils.Download(u.String())
  1216. if err != nil {
  1217. return err
  1218. }
  1219. archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
  1220. }
  1221. img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
  1222. if err != nil {
  1223. return err
  1224. }
  1225. // Optionally register the image at REPO/TAG
  1226. if repo != "" {
  1227. if err := srv.runtime.repositories.Set(repo, tag, img.ID, true); err != nil {
  1228. return err
  1229. }
  1230. }
  1231. out.Write(sf.FormatStatus("", img.ID))
  1232. return nil
  1233. }
  1234. func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
  1235. var name string
  1236. if len(job.Args) == 1 {
  1237. name = job.Args[0]
  1238. } else if len(job.Args) > 1 {
  1239. job.Printf("Usage: %s", job.Name)
  1240. return engine.StatusErr
  1241. }
  1242. var config Config
  1243. if err := job.ExportEnv(&config); err != nil {
  1244. job.Error(err)
  1245. return engine.StatusErr
  1246. }
  1247. if config.Memory != 0 && config.Memory < 524288 {
  1248. job.Errorf("Minimum memory limit allowed is 512k")
  1249. return engine.StatusErr
  1250. }
  1251. if config.Memory > 0 && !srv.runtime.capabilities.MemoryLimit {
  1252. config.Memory = 0
  1253. }
  1254. if config.Memory > 0 && !srv.runtime.capabilities.SwapLimit {
  1255. config.MemorySwap = -1
  1256. }
  1257. container, buildWarnings, err := srv.runtime.Create(&config, name)
  1258. if err != nil {
  1259. if srv.runtime.graph.IsNotExist(err) {
  1260. _, tag := utils.ParseRepositoryTag(config.Image)
  1261. if tag == "" {
  1262. tag = DEFAULTTAG
  1263. }
  1264. job.Errorf("No such image: %s (tag: %s)", config.Image, tag)
  1265. return engine.StatusErr
  1266. }
  1267. job.Error(err)
  1268. return engine.StatusErr
  1269. }
  1270. srv.LogEvent("create", container.ID, srv.runtime.repositories.ImageName(container.Image))
  1271. // FIXME: this is necessary because runtime.Create might return a nil container
  1272. // with a non-nil error. This should not happen! Once it's fixed we
  1273. // can remove this workaround.
  1274. if container != nil {
  1275. job.Printf("%s\n", container.ID)
  1276. }
  1277. for _, warning := range buildWarnings {
  1278. job.Errorf("%s\n", warning)
  1279. }
  1280. return engine.StatusOK
  1281. }
  1282. func (srv *Server) ContainerRestart(name string, t int) error {
  1283. if container := srv.runtime.Get(name); container != nil {
  1284. if err := container.Restart(t); err != nil {
  1285. return fmt.Errorf("Cannot restart container %s: %s", name, err)
  1286. }
  1287. srv.LogEvent("restart", container.ID, srv.runtime.repositories.ImageName(container.Image))
  1288. } else {
  1289. return fmt.Errorf("No such container: %s", name)
  1290. }
  1291. return nil
  1292. }
  1293. func (srv *Server) ContainerDestroy(name string, removeVolume, removeLink bool) error {
  1294. container := srv.runtime.Get(name)
  1295. if removeLink {
  1296. if container == nil {
  1297. return fmt.Errorf("No such link: %s", name)
  1298. }
  1299. name, err := getFullName(name)
  1300. if err != nil {
  1301. return err
  1302. }
  1303. parent, n := path.Split(name)
  1304. if parent == "/" {
  1305. return fmt.Errorf("Conflict, cannot remove the default name of the container")
  1306. }
  1307. pe := srv.runtime.containerGraph.Get(parent)
  1308. if pe == nil {
  1309. return fmt.Errorf("Cannot get parent %s for name %s", parent, name)
  1310. }
  1311. parentContainer := srv.runtime.Get(pe.ID())
  1312. if parentContainer != nil && parentContainer.activeLinks != nil {
  1313. if link, exists := parentContainer.activeLinks[n]; exists {
  1314. link.Disable()
  1315. } else {
  1316. utils.Debugf("Could not find active link for %s", name)
  1317. }
  1318. }
  1319. if err := srv.runtime.containerGraph.Delete(name); err != nil {
  1320. return err
  1321. }
  1322. return nil
  1323. }
  1324. if container != nil {
  1325. if container.State.IsRunning() {
  1326. return fmt.Errorf("Impossible to remove a running container, please stop it first")
  1327. }
  1328. volumes := make(map[string]struct{})
  1329. binds := make(map[string]struct{})
  1330. for _, bind := range container.hostConfig.Binds {
  1331. splitBind := strings.Split(bind, ":")
  1332. source := splitBind[0]
  1333. binds[source] = struct{}{}
  1334. }
  1335. // Store all the deleted containers volumes
  1336. for _, volumeId := range container.Volumes {
  1337. // Skip the volumes mounted from external
  1338. if _, exists := binds[volumeId]; exists {
  1339. continue
  1340. }
  1341. volumeId = strings.TrimSuffix(volumeId, "/layer")
  1342. volumeId = filepath.Base(volumeId)
  1343. volumes[volumeId] = struct{}{}
  1344. }
  1345. if err := srv.runtime.Destroy(container); err != nil {
  1346. return fmt.Errorf("Cannot destroy container %s: %s", name, err)
  1347. }
  1348. srv.LogEvent("destroy", container.ID, srv.runtime.repositories.ImageName(container.Image))
  1349. if removeVolume {
  1350. // Retrieve all volumes from all remaining containers
  1351. usedVolumes := make(map[string]*Container)
  1352. for _, container := range srv.runtime.List() {
  1353. for _, containerVolumeId := range container.Volumes {
  1354. usedVolumes[containerVolumeId] = container
  1355. }
  1356. }
  1357. for volumeId := range volumes {
  1358. // If the requested volu
  1359. if c, exists := usedVolumes[volumeId]; exists {
  1360. log.Printf("The volume %s is used by the container %s. Impossible to remove it. Skipping.\n", volumeId, c.ID)
  1361. continue
  1362. }
  1363. if err := srv.runtime.volumes.Delete(volumeId); err != nil {
  1364. return err
  1365. }
  1366. }
  1367. }
  1368. } else {
  1369. return fmt.Errorf("No such container: %s", name)
  1370. }
  1371. return nil
  1372. }
  1373. var ErrImageReferenced = errors.New("Image referenced by a repository")
  1374. func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi, byParents map[string][]*Image) error {
  1375. // If the image is referenced by a repo, do not delete
  1376. if len(srv.runtime.repositories.ByID()[id]) != 0 {
  1377. return ErrImageReferenced
  1378. }
  1379. // If the image is not referenced but has children, go recursive
  1380. referenced := false
  1381. for _, img := range byParents[id] {
  1382. if err := srv.deleteImageAndChildren(img.ID, imgs, byParents); err != nil {
  1383. if err != ErrImageReferenced {
  1384. return err
  1385. }
  1386. referenced = true
  1387. }
  1388. }
  1389. if referenced {
  1390. return ErrImageReferenced
  1391. }
  1392. // If the image is not referenced and has no children, remove it
  1393. byParents, err := srv.runtime.graph.ByParent()
  1394. if err != nil {
  1395. return err
  1396. }
  1397. if len(byParents[id]) == 0 && srv.canDeleteImage(id) == nil {
  1398. if err := srv.runtime.repositories.DeleteAll(id); err != nil {
  1399. return err
  1400. }
  1401. err := srv.runtime.graph.Delete(id)
  1402. if err != nil {
  1403. return err
  1404. }
  1405. *imgs = append(*imgs, APIRmi{Deleted: id})
  1406. srv.LogEvent("delete", id, "")
  1407. return nil
  1408. }
  1409. return nil
  1410. }
  1411. func (srv *Server) deleteImageParents(img *Image, imgs *[]APIRmi) error {
  1412. if img.Parent != "" {
  1413. parent, err := srv.runtime.graph.Get(img.Parent)
  1414. if err != nil {
  1415. return err
  1416. }
  1417. byParents, err := srv.runtime.graph.ByParent()
  1418. if err != nil {
  1419. return err
  1420. }
  1421. // Remove all children images
  1422. if err := srv.deleteImageAndChildren(img.Parent, imgs, byParents); err != nil {
  1423. return err
  1424. }
  1425. return srv.deleteImageParents(parent, imgs)
  1426. }
  1427. return nil
  1428. }
  1429. func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, error) {
  1430. var (
  1431. imgs = []APIRmi{}
  1432. tags = []string{}
  1433. )
  1434. //If delete by id, see if the id belong only to one repository
  1435. if repoName == "" {
  1436. for _, repoAndTag := range srv.runtime.repositories.ByID()[img.ID] {
  1437. parsedRepo, parsedTag := utils.ParseRepositoryTag(repoAndTag)
  1438. if repoName == "" || repoName == parsedRepo {
  1439. repoName = parsedRepo
  1440. if parsedTag != "" {
  1441. tags = append(tags, parsedTag)
  1442. }
  1443. } else if repoName != parsedRepo {
  1444. // the id belongs to multiple repos, like base:latest and user:test,
  1445. // in that case return conflict
  1446. return nil, fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories", utils.TruncateID(img.ID))
  1447. }
  1448. }
  1449. } else {
  1450. tags = append(tags, tag)
  1451. }
  1452. //Untag the current image
  1453. for _, tag := range tags {
  1454. tagDeleted, err := srv.runtime.repositories.Delete(repoName, tag)
  1455. if err != nil {
  1456. return nil, err
  1457. }
  1458. if tagDeleted {
  1459. imgs = append(imgs, APIRmi{Untagged: img.ID})
  1460. srv.LogEvent("untag", img.ID, "")
  1461. }
  1462. }
  1463. if len(srv.runtime.repositories.ByID()[img.ID]) == 0 {
  1464. if err := srv.deleteImageAndChildren(img.ID, &imgs, nil); err != nil {
  1465. if err != ErrImageReferenced {
  1466. return imgs, err
  1467. }
  1468. } else if err := srv.deleteImageParents(img, &imgs); err != nil {
  1469. if err != ErrImageReferenced {
  1470. return imgs, err
  1471. }
  1472. }
  1473. }
  1474. return imgs, nil
  1475. }
  1476. func (srv *Server) ImageDelete(name string, autoPrune bool) ([]APIRmi, error) {
  1477. var (
  1478. repository, tag string
  1479. img, err = srv.runtime.repositories.LookupImage(name)
  1480. )
  1481. if err != nil {
  1482. return nil, fmt.Errorf("No such image: %s", name)
  1483. }
  1484. // FIXME: What does autoPrune mean ?
  1485. if !autoPrune {
  1486. if err := srv.runtime.graph.Delete(img.ID); err != nil {
  1487. return nil, fmt.Errorf("Cannot delete image %s: %s", name, err)
  1488. }
  1489. return nil, nil
  1490. }
  1491. if !strings.Contains(img.ID, name) {
  1492. repository, tag = utils.ParseRepositoryTag(name)
  1493. }
  1494. // If we have a repo and the image is not referenced anywhere else
  1495. // then just perform an untag and do not validate.
  1496. //
  1497. // i.e. only validate if we are performing an actual delete and not
  1498. // an untag op
  1499. if repository != "" && len(srv.runtime.repositories.ByID()[img.ID]) == 1 {
  1500. // Prevent deletion if image is used by a container
  1501. if err := srv.canDeleteImage(img.ID); err != nil {
  1502. return nil, err
  1503. }
  1504. }
  1505. return srv.deleteImage(img, repository, tag)
  1506. }
  1507. func (srv *Server) canDeleteImage(imgID string) error {
  1508. for _, container := range srv.runtime.List() {
  1509. parent, err := srv.runtime.repositories.LookupImage(container.Image)
  1510. if err != nil {
  1511. return err
  1512. }
  1513. if err := parent.WalkHistory(func(p *Image) error {
  1514. if imgID == p.ID {
  1515. return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it", utils.TruncateID(imgID), utils.TruncateID(container.ID))
  1516. }
  1517. return nil
  1518. }); err != nil {
  1519. return err
  1520. }
  1521. }
  1522. return nil
  1523. }
  1524. func (srv *Server) ImageGetCached(imgID string, config *Config) (*Image, error) {
  1525. // Retrieve all images
  1526. images, err := srv.runtime.graph.Map()
  1527. if err != nil {
  1528. return nil, err
  1529. }
  1530. // Store the tree in a map of map (map[parentId][childId])
  1531. imageMap := make(map[string]map[string]struct{})
  1532. for _, img := range images {
  1533. if _, exists := imageMap[img.Parent]; !exists {
  1534. imageMap[img.Parent] = make(map[string]struct{})
  1535. }
  1536. imageMap[img.Parent][img.ID] = struct{}{}
  1537. }
  1538. // Loop on the children of the given image and check the config
  1539. for elem := range imageMap[imgID] {
  1540. img, err := srv.runtime.graph.Get(elem)
  1541. if err != nil {
  1542. return nil, err
  1543. }
  1544. if CompareConfig(&img.ContainerConfig, config) {
  1545. return img, nil
  1546. }
  1547. }
  1548. return nil, nil
  1549. }
  1550. func (srv *Server) RegisterLinks(container *Container, hostConfig *HostConfig) error {
  1551. runtime := srv.runtime
  1552. if hostConfig != nil && hostConfig.Links != nil {
  1553. for _, l := range hostConfig.Links {
  1554. parts, err := parseLink(l)
  1555. if err != nil {
  1556. return err
  1557. }
  1558. child, err := srv.runtime.GetByName(parts["name"])
  1559. if err != nil {
  1560. return err
  1561. }
  1562. if child == nil {
  1563. return fmt.Errorf("Could not get container for %s", parts["name"])
  1564. }
  1565. if err := runtime.RegisterLink(container, child, parts["alias"]); err != nil {
  1566. return err
  1567. }
  1568. }
  1569. // After we load all the links into the runtime
  1570. // set them to nil on the hostconfig
  1571. hostConfig.Links = nil
  1572. if err := container.writeHostConfig(); err != nil {
  1573. return err
  1574. }
  1575. }
  1576. return nil
  1577. }
  1578. func (srv *Server) ContainerStart(job *engine.Job) engine.Status {
  1579. if len(job.Args) < 1 {
  1580. job.Errorf("Usage: %s container_id", job.Name)
  1581. return engine.StatusErr
  1582. }
  1583. name := job.Args[0]
  1584. runtime := srv.runtime
  1585. container := runtime.Get(name)
  1586. if container == nil {
  1587. job.Errorf("No such container: %s", name)
  1588. return engine.StatusErr
  1589. }
  1590. // If no environment was set, then no hostconfig was passed.
  1591. if len(job.Environ()) > 0 {
  1592. var hostConfig HostConfig
  1593. if err := job.ExportEnv(&hostConfig); err != nil {
  1594. job.Error(err)
  1595. return engine.StatusErr
  1596. }
  1597. // Validate the HostConfig binds. Make sure that:
  1598. // 1) the source of a bind mount isn't /
  1599. // The bind mount "/:/foo" isn't allowed.
  1600. // 2) Check that the source exists
  1601. // The source to be bind mounted must exist.
  1602. for _, bind := range hostConfig.Binds {
  1603. splitBind := strings.Split(bind, ":")
  1604. source := splitBind[0]
  1605. // refuse to bind mount "/" to the container
  1606. if source == "/" {
  1607. job.Errorf("Invalid bind mount '%s' : source can't be '/'", bind)
  1608. return engine.StatusErr
  1609. }
  1610. // ensure the source exists on the host
  1611. _, err := os.Stat(source)
  1612. if err != nil && os.IsNotExist(err) {
  1613. job.Errorf("Invalid bind mount '%s' : source doesn't exist", bind)
  1614. return engine.StatusErr
  1615. }
  1616. }
  1617. // Register any links from the host config before starting the container
  1618. if err := srv.RegisterLinks(container, &hostConfig); err != nil {
  1619. job.Error(err)
  1620. return engine.StatusErr
  1621. }
  1622. container.hostConfig = &hostConfig
  1623. container.ToDisk()
  1624. }
  1625. if err := container.Start(); err != nil {
  1626. job.Errorf("Cannot start container %s: %s", name, err)
  1627. return engine.StatusErr
  1628. }
  1629. srv.LogEvent("start", container.ID, runtime.repositories.ImageName(container.Image))
  1630. return engine.StatusOK
  1631. }
  1632. func (srv *Server) ContainerStop(job *engine.Job) engine.Status {
  1633. if len(job.Args) != 1 {
  1634. job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1635. return engine.StatusErr
  1636. }
  1637. name := job.Args[0]
  1638. t := job.GetenvInt("t")
  1639. if t == -1 {
  1640. t = 10
  1641. }
  1642. if container := srv.runtime.Get(name); container != nil {
  1643. if err := container.Stop(int(t)); err != nil {
  1644. job.Errorf("Cannot stop container %s: %s\n", name, err)
  1645. return engine.StatusErr
  1646. }
  1647. srv.LogEvent("stop", container.ID, srv.runtime.repositories.ImageName(container.Image))
  1648. } else {
  1649. job.Errorf("No such container: %s\n", name)
  1650. return engine.StatusErr
  1651. }
  1652. return engine.StatusOK
  1653. }
  1654. func (srv *Server) ContainerWait(job *engine.Job) engine.Status {
  1655. if len(job.Args) != 1 {
  1656. job.Errorf("Usage: %s", job.Name)
  1657. return engine.StatusErr
  1658. }
  1659. name := job.Args[0]
  1660. if container := srv.runtime.Get(name); container != nil {
  1661. status := container.Wait()
  1662. job.Printf("%d\n", status)
  1663. return engine.StatusOK
  1664. }
  1665. job.Errorf("%s: no such container: %s", job.Name, name)
  1666. return engine.StatusErr
  1667. }
  1668. func (srv *Server) ContainerResize(job *engine.Job) engine.Status {
  1669. if len(job.Args) != 3 {
  1670. job.Errorf("Not enough arguments. Usage: %s CONTAINER HEIGHT WIDTH\n", job.Name)
  1671. return engine.StatusErr
  1672. }
  1673. name := job.Args[0]
  1674. height, err := strconv.Atoi(job.Args[1])
  1675. if err != nil {
  1676. job.Error(err)
  1677. return engine.StatusErr
  1678. }
  1679. width, err := strconv.Atoi(job.Args[2])
  1680. if err != nil {
  1681. job.Error(err)
  1682. return engine.StatusErr
  1683. }
  1684. if container := srv.runtime.Get(name); container != nil {
  1685. if err := container.Resize(height, width); err != nil {
  1686. job.Error(err)
  1687. return engine.StatusErr
  1688. }
  1689. return engine.StatusOK
  1690. }
  1691. job.Errorf("No such container: %s", name)
  1692. return engine.StatusErr
  1693. }
  1694. func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, stderr bool, inStream io.ReadCloser, outStream, errStream io.Writer) error {
  1695. container := srv.runtime.Get(name)
  1696. if container == nil {
  1697. return fmt.Errorf("No such container: %s", name)
  1698. }
  1699. //logs
  1700. if logs {
  1701. cLog, err := container.ReadLog("json")
  1702. if err != nil && os.IsNotExist(err) {
  1703. // Legacy logs
  1704. utils.Errorf("Old logs format")
  1705. if stdout {
  1706. cLog, err := container.ReadLog("stdout")
  1707. if err != nil {
  1708. utils.Errorf("Error reading logs (stdout): %s", err)
  1709. } else if _, err := io.Copy(outStream, cLog); err != nil {
  1710. utils.Errorf("Error streaming logs (stdout): %s", err)
  1711. }
  1712. }
  1713. if stderr {
  1714. cLog, err := container.ReadLog("stderr")
  1715. if err != nil {
  1716. utils.Errorf("Error reading logs (stderr): %s", err)
  1717. } else if _, err := io.Copy(errStream, cLog); err != nil {
  1718. utils.Errorf("Error streaming logs (stderr): %s", err)
  1719. }
  1720. }
  1721. } else if err != nil {
  1722. utils.Errorf("Error reading logs (json): %s", err)
  1723. } else {
  1724. dec := json.NewDecoder(cLog)
  1725. for {
  1726. l := &utils.JSONLog{}
  1727. if err := dec.Decode(l); err == io.EOF {
  1728. break
  1729. } else if err != nil {
  1730. utils.Errorf("Error streaming logs: %s", err)
  1731. break
  1732. }
  1733. if l.Stream == "stdout" && stdout {
  1734. fmt.Fprintf(outStream, "%s", l.Log)
  1735. }
  1736. if l.Stream == "stderr" && stderr {
  1737. fmt.Fprintf(errStream, "%s", l.Log)
  1738. }
  1739. }
  1740. }
  1741. }
  1742. //stream
  1743. if stream {
  1744. if container.State.IsGhost() {
  1745. return fmt.Errorf("Impossible to attach to a ghost container")
  1746. }
  1747. var (
  1748. cStdin io.ReadCloser
  1749. cStdout, cStderr io.Writer
  1750. cStdinCloser io.Closer
  1751. )
  1752. if stdin {
  1753. r, w := io.Pipe()
  1754. go func() {
  1755. defer w.Close()
  1756. defer utils.Debugf("Closing buffered stdin pipe")
  1757. io.Copy(w, inStream)
  1758. }()
  1759. cStdin = r
  1760. cStdinCloser = inStream
  1761. }
  1762. if stdout {
  1763. cStdout = outStream
  1764. }
  1765. if stderr {
  1766. cStderr = errStream
  1767. }
  1768. <-container.Attach(cStdin, cStdinCloser, cStdout, cStderr)
  1769. // If we are in stdinonce mode, wait for the process to end
  1770. // otherwise, simply return
  1771. if container.Config.StdinOnce && !container.Config.Tty {
  1772. container.Wait()
  1773. }
  1774. }
  1775. return nil
  1776. }
  1777. func (srv *Server) ContainerInspect(name string) (*Container, error) {
  1778. if container := srv.runtime.Get(name); container != nil {
  1779. return container, nil
  1780. }
  1781. return nil, fmt.Errorf("No such container: %s", name)
  1782. }
  1783. func (srv *Server) ImageInspect(name string) (*Image, error) {
  1784. if image, err := srv.runtime.repositories.LookupImage(name); err == nil && image != nil {
  1785. return image, nil
  1786. }
  1787. return nil, fmt.Errorf("No such image: %s", name)
  1788. }
  1789. func (srv *Server) ContainerCopy(name string, resource string, out io.Writer) error {
  1790. if container := srv.runtime.Get(name); container != nil {
  1791. data, err := container.Copy(resource)
  1792. if err != nil {
  1793. return err
  1794. }
  1795. if _, err := io.Copy(out, data); err != nil {
  1796. return err
  1797. }
  1798. return nil
  1799. }
  1800. return fmt.Errorf("No such container: %s", name)
  1801. }
  1802. func NewServer(eng *engine.Engine, config *DaemonConfig) (*Server, error) {
  1803. runtime, err := NewRuntime(config)
  1804. if err != nil {
  1805. return nil, err
  1806. }
  1807. srv := &Server{
  1808. Eng: eng,
  1809. runtime: runtime,
  1810. pullingPool: make(map[string]chan struct{}),
  1811. pushingPool: make(map[string]chan struct{}),
  1812. events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
  1813. listeners: make(map[string]chan utils.JSONMessage),
  1814. }
  1815. runtime.srv = srv
  1816. return srv, nil
  1817. }
  1818. func (srv *Server) HTTPRequestFactory(metaHeaders map[string][]string) *utils.HTTPRequestFactory {
  1819. srv.Lock()
  1820. defer srv.Unlock()
  1821. v := dockerVersion()
  1822. httpVersion := make([]utils.VersionInfo, 0, 4)
  1823. httpVersion = append(httpVersion, &simpleVersionInfo{"docker", v.Get("Version")})
  1824. httpVersion = append(httpVersion, &simpleVersionInfo{"go", v.Get("GoVersion")})
  1825. httpVersion = append(httpVersion, &simpleVersionInfo{"git-commit", v.Get("GitCommit")})
  1826. httpVersion = append(httpVersion, &simpleVersionInfo{"kernel", v.Get("KernelVersion")})
  1827. httpVersion = append(httpVersion, &simpleVersionInfo{"os", v.Get("Os")})
  1828. httpVersion = append(httpVersion, &simpleVersionInfo{"arch", v.Get("Arch")})
  1829. ud := utils.NewHTTPUserAgentDecorator(httpVersion...)
  1830. md := &utils.HTTPMetaHeadersDecorator{
  1831. Headers: metaHeaders,
  1832. }
  1833. factory := utils.NewHTTPRequestFactory(ud, md)
  1834. return factory
  1835. }
  1836. func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
  1837. now := time.Now().UTC().Unix()
  1838. jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
  1839. srv.AddEvent(jm)
  1840. for _, c := range srv.listeners {
  1841. select { // non blocking channel
  1842. case c <- jm:
  1843. default:
  1844. }
  1845. }
  1846. return &jm
  1847. }
  1848. func (srv *Server) AddEvent(jm utils.JSONMessage) {
  1849. srv.Lock()
  1850. defer srv.Unlock()
  1851. srv.events = append(srv.events, jm)
  1852. }
  1853. func (srv *Server) GetEvents() []utils.JSONMessage {
  1854. srv.RLock()
  1855. defer srv.RUnlock()
  1856. return srv.events
  1857. }
  1858. type Server struct {
  1859. sync.RWMutex
  1860. runtime *Runtime
  1861. pullingPool map[string]chan struct{}
  1862. pushingPool map[string]chan struct{}
  1863. events []utils.JSONMessage
  1864. listeners map[string]chan utils.JSONMessage
  1865. Eng *engine.Engine
  1866. }