server.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473
  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/dotcloud/docker/api"
  6. "github.com/dotcloud/docker/archive"
  7. "github.com/dotcloud/docker/daemonconfig"
  8. "github.com/dotcloud/docker/dockerversion"
  9. "github.com/dotcloud/docker/engine"
  10. "github.com/dotcloud/docker/graph"
  11. "github.com/dotcloud/docker/image"
  12. "github.com/dotcloud/docker/pkg/graphdb"
  13. "github.com/dotcloud/docker/pkg/signal"
  14. "github.com/dotcloud/docker/registry"
  15. "github.com/dotcloud/docker/runconfig"
  16. "github.com/dotcloud/docker/runtime"
  17. "github.com/dotcloud/docker/utils"
  18. "io"
  19. "io/ioutil"
  20. "log"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. gosignal "os/signal"
  26. "path"
  27. "path/filepath"
  28. goruntime "runtime"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "syscall"
  33. "time"
  34. )
  35. // jobInitApi runs the remote api server `srv` as a daemon,
  36. // Only one api server can run at the same time - this is enforced by a pidfile.
  37. // The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
  38. func InitServer(job *engine.Job) engine.Status {
  39. job.Logf("Creating server")
  40. srv, err := NewServer(job.Eng, daemonconfig.ConfigFromJob(job))
  41. if err != nil {
  42. return job.Error(err)
  43. }
  44. if srv.runtime.Config().Pidfile != "" {
  45. job.Logf("Creating pidfile")
  46. if err := utils.CreatePidFile(srv.runtime.Config().Pidfile); err != nil {
  47. // FIXME: do we need fatal here instead of returning a job error?
  48. log.Fatal(err)
  49. }
  50. }
  51. job.Logf("Setting up signal traps")
  52. c := make(chan os.Signal, 1)
  53. gosignal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
  54. go func() {
  55. sig := <-c
  56. log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
  57. utils.RemovePidFile(srv.runtime.Config().Pidfile)
  58. srv.Close()
  59. os.Exit(0)
  60. }()
  61. job.Eng.Hack_SetGlobalVar("httpapi.server", srv)
  62. job.Eng.Hack_SetGlobalVar("httpapi.runtime", srv.runtime)
  63. for name, handler := range map[string]engine.Handler{
  64. "export": srv.ContainerExport,
  65. "create": srv.ContainerCreate,
  66. "stop": srv.ContainerStop,
  67. "restart": srv.ContainerRestart,
  68. "start": srv.ContainerStart,
  69. "kill": srv.ContainerKill,
  70. "wait": srv.ContainerWait,
  71. "tag": srv.ImageTag,
  72. "resize": srv.ContainerResize,
  73. "commit": srv.ContainerCommit,
  74. "info": srv.DockerInfo,
  75. "container_delete": srv.ContainerDestroy,
  76. "image_export": srv.ImageExport,
  77. "images": srv.Images,
  78. "history": srv.ImageHistory,
  79. "viz": srv.ImagesViz,
  80. "container_copy": srv.ContainerCopy,
  81. "insert": srv.ImageInsert,
  82. "attach": srv.ContainerAttach,
  83. "search": srv.ImagesSearch,
  84. "changes": srv.ContainerChanges,
  85. "top": srv.ContainerTop,
  86. "version": srv.DockerVersion,
  87. "load": srv.ImageLoad,
  88. "build": srv.Build,
  89. "pull": srv.ImagePull,
  90. "import": srv.ImageImport,
  91. "image_delete": srv.ImageDelete,
  92. "inspect": srv.JobInspect,
  93. "events": srv.Events,
  94. "push": srv.ImagePush,
  95. "containers": srv.Containers,
  96. "auth": srv.Auth,
  97. } {
  98. if err := job.Eng.Register(name, handler); err != nil {
  99. return job.Error(err)
  100. }
  101. }
  102. return engine.StatusOK
  103. }
  104. // simpleVersionInfo is a simple implementation of
  105. // the interface VersionInfo, which is used
  106. // to provide version information for some product,
  107. // component, etc. It stores the product name and the version
  108. // in string and returns them on calls to Name() and Version().
  109. type simpleVersionInfo struct {
  110. name string
  111. version string
  112. }
  113. func (v *simpleVersionInfo) Name() string {
  114. return v.name
  115. }
  116. func (v *simpleVersionInfo) Version() string {
  117. return v.version
  118. }
  119. // ContainerKill send signal to the container
  120. // If no signal is given (sig 0), then Kill with SIGKILL and wait
  121. // for the container to exit.
  122. // If a signal is given, then just send it to the container and return.
  123. func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
  124. if n := len(job.Args); n < 1 || n > 2 {
  125. return job.Errorf("Usage: %s CONTAINER [SIGNAL]", job.Name)
  126. }
  127. var (
  128. name = job.Args[0]
  129. sig uint64
  130. err error
  131. )
  132. // If we have a signal, look at it. Otherwise, do nothing
  133. if len(job.Args) == 2 && job.Args[1] != "" {
  134. // Check if we passed the signal as a number:
  135. // The largest legal signal is 31, so let's parse on 5 bits
  136. sig, err = strconv.ParseUint(job.Args[1], 10, 5)
  137. if err != nil {
  138. // The signal is not a number, treat it as a string (either like "KILL" or like "SIGKILL")
  139. sig = uint64(signal.SignalMap[strings.TrimPrefix(job.Args[1], "SIG")])
  140. if sig == 0 {
  141. return job.Errorf("Invalid signal: %s", job.Args[1])
  142. }
  143. }
  144. }
  145. if container := srv.runtime.Get(name); container != nil {
  146. // If no signal is passed, or SIGKILL, perform regular Kill (SIGKILL + wait())
  147. if sig == 0 || syscall.Signal(sig) == syscall.SIGKILL {
  148. if err := container.Kill(); err != nil {
  149. return job.Errorf("Cannot kill container %s: %s", name, err)
  150. }
  151. srv.LogEvent("kill", container.ID, srv.runtime.Repositories().ImageName(container.Image))
  152. } else {
  153. // Otherwise, just send the requested signal
  154. if err := container.KillSig(int(sig)); err != nil {
  155. return job.Errorf("Cannot kill container %s: %s", name, err)
  156. }
  157. // FIXME: Add event for signals
  158. }
  159. } else {
  160. return job.Errorf("No such container: %s", name)
  161. }
  162. return engine.StatusOK
  163. }
  164. func (srv *Server) Auth(job *engine.Job) engine.Status {
  165. var (
  166. err error
  167. authConfig = &registry.AuthConfig{}
  168. )
  169. job.GetenvJson("authConfig", authConfig)
  170. // TODO: this is only done here because auth and registry need to be merged into one pkg
  171. if addr := authConfig.ServerAddress; addr != "" && addr != registry.IndexServerAddress() {
  172. addr, err = registry.ExpandAndVerifyRegistryUrl(addr)
  173. if err != nil {
  174. return job.Error(err)
  175. }
  176. authConfig.ServerAddress = addr
  177. }
  178. status, err := registry.Login(authConfig, srv.HTTPRequestFactory(nil))
  179. if err != nil {
  180. return job.Error(err)
  181. }
  182. job.Printf("%s\n", status)
  183. return engine.StatusOK
  184. }
  185. func (srv *Server) Events(job *engine.Job) engine.Status {
  186. if len(job.Args) != 1 {
  187. return job.Errorf("Usage: %s FROM", job.Name)
  188. }
  189. var (
  190. from = job.Args[0]
  191. since = job.GetenvInt64("since")
  192. )
  193. sendEvent := func(event *utils.JSONMessage) error {
  194. b, err := json.Marshal(event)
  195. if err != nil {
  196. return fmt.Errorf("JSON error")
  197. }
  198. _, err = job.Stdout.Write(b)
  199. if err != nil {
  200. // On error, evict the listener
  201. utils.Errorf("%s", err)
  202. srv.Lock()
  203. delete(srv.listeners, from)
  204. srv.Unlock()
  205. return err
  206. }
  207. return nil
  208. }
  209. listener := make(chan utils.JSONMessage)
  210. srv.Lock()
  211. if old, ok := srv.listeners[from]; ok {
  212. delete(srv.listeners, from)
  213. close(old)
  214. }
  215. srv.listeners[from] = listener
  216. srv.Unlock()
  217. job.Stdout.Write(nil) // flush
  218. if since != 0 {
  219. // If since, send previous events that happened after the timestamp
  220. for _, event := range srv.GetEvents() {
  221. if event.Time >= since {
  222. err := sendEvent(&event)
  223. if err != nil && err.Error() == "JSON error" {
  224. continue
  225. }
  226. if err != nil {
  227. job.Error(err)
  228. return engine.StatusErr
  229. }
  230. }
  231. }
  232. }
  233. for event := range listener {
  234. err := sendEvent(&event)
  235. if err != nil && err.Error() == "JSON error" {
  236. continue
  237. }
  238. if err != nil {
  239. return job.Error(err)
  240. }
  241. }
  242. return engine.StatusOK
  243. }
  244. func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
  245. if len(job.Args) != 1 {
  246. return job.Errorf("Usage: %s container_id", job.Name)
  247. }
  248. name := job.Args[0]
  249. if container := srv.runtime.Get(name); container != nil {
  250. data, err := container.Export()
  251. if err != nil {
  252. return job.Errorf("%s: %s", name, err)
  253. }
  254. defer data.Close()
  255. // Stream the entire contents of the container (basically a volatile snapshot)
  256. if _, err := io.Copy(job.Stdout, data); err != nil {
  257. return job.Errorf("%s: %s", name, err)
  258. }
  259. // FIXME: factor job-specific LogEvent to engine.Job.Run()
  260. srv.LogEvent("export", container.ID, srv.runtime.Repositories().ImageName(container.Image))
  261. return engine.StatusOK
  262. }
  263. return job.Errorf("No such container: %s", name)
  264. }
  265. // ImageExport exports all images with the given tag. All versions
  266. // containing the same tag are exported. The resulting output is an
  267. // uncompressed tar ball.
  268. // name is the set of tags to export.
  269. // out is the writer where the images are written to.
  270. func (srv *Server) ImageExport(job *engine.Job) engine.Status {
  271. if len(job.Args) != 1 {
  272. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  273. }
  274. name := job.Args[0]
  275. // get image json
  276. tempdir, err := ioutil.TempDir("", "docker-export-")
  277. if err != nil {
  278. return job.Error(err)
  279. }
  280. defer os.RemoveAll(tempdir)
  281. utils.Debugf("Serializing %s", name)
  282. rootRepo, err := srv.runtime.Repositories().Get(name)
  283. if err != nil {
  284. return job.Error(err)
  285. }
  286. if rootRepo != nil {
  287. for _, id := range rootRepo {
  288. image, err := srv.ImageInspect(id)
  289. if err != nil {
  290. return job.Error(err)
  291. }
  292. if err := srv.exportImage(image, tempdir); err != nil {
  293. return job.Error(err)
  294. }
  295. }
  296. // write repositories
  297. rootRepoMap := map[string]graph.Repository{}
  298. rootRepoMap[name] = rootRepo
  299. rootRepoJson, _ := json.Marshal(rootRepoMap)
  300. if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.ModeAppend); err != nil {
  301. return job.Error(err)
  302. }
  303. } else {
  304. image, err := srv.ImageInspect(name)
  305. if err != nil {
  306. return job.Error(err)
  307. }
  308. if err := srv.exportImage(image, tempdir); err != nil {
  309. return job.Error(err)
  310. }
  311. }
  312. fs, err := archive.Tar(tempdir, archive.Uncompressed)
  313. if err != nil {
  314. return job.Error(err)
  315. }
  316. defer fs.Close()
  317. if _, err := io.Copy(job.Stdout, fs); err != nil {
  318. return job.Error(err)
  319. }
  320. return engine.StatusOK
  321. }
  322. func (srv *Server) exportImage(img *image.Image, tempdir string) error {
  323. for i := img; i != nil; {
  324. // temporary directory
  325. tmpImageDir := path.Join(tempdir, i.ID)
  326. if err := os.Mkdir(tmpImageDir, os.ModeDir); err != nil {
  327. if os.IsExist(err) {
  328. return nil
  329. }
  330. return err
  331. }
  332. var version = "1.0"
  333. var versionBuf = []byte(version)
  334. if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.ModeAppend); err != nil {
  335. return err
  336. }
  337. // serialize json
  338. b, err := json.Marshal(i)
  339. if err != nil {
  340. return err
  341. }
  342. if err := ioutil.WriteFile(path.Join(tmpImageDir, "json"), b, os.ModeAppend); err != nil {
  343. return err
  344. }
  345. // serialize filesystem
  346. fs, err := i.TarLayer()
  347. if err != nil {
  348. return err
  349. }
  350. defer fs.Close()
  351. fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar"))
  352. if err != nil {
  353. return err
  354. }
  355. if written, err := io.Copy(fsTar, fs); err != nil {
  356. return err
  357. } else {
  358. utils.Debugf("rendered layer for %s of [%d] size", i.ID, written)
  359. }
  360. if err = fsTar.Close(); err != nil {
  361. return err
  362. }
  363. // find parent
  364. if i.Parent != "" {
  365. i, err = srv.ImageInspect(i.Parent)
  366. if err != nil {
  367. return err
  368. }
  369. } else {
  370. i = nil
  371. }
  372. }
  373. return nil
  374. }
  375. func (srv *Server) Build(job *engine.Job) engine.Status {
  376. if len(job.Args) != 0 {
  377. return job.Errorf("Usage: %s\n", job.Name)
  378. }
  379. var (
  380. remoteURL = job.Getenv("remote")
  381. repoName = job.Getenv("t")
  382. suppressOutput = job.GetenvBool("q")
  383. noCache = job.GetenvBool("nocache")
  384. rm = job.GetenvBool("rm")
  385. authConfig = &registry.AuthConfig{}
  386. configFile = &registry.ConfigFile{}
  387. tag string
  388. context io.ReadCloser
  389. )
  390. job.GetenvJson("authConfig", authConfig)
  391. job.GetenvJson("configFile", configFile)
  392. repoName, tag = utils.ParseRepositoryTag(repoName)
  393. if remoteURL == "" {
  394. context = ioutil.NopCloser(job.Stdin)
  395. } else if utils.IsGIT(remoteURL) {
  396. if !strings.HasPrefix(remoteURL, "git://") {
  397. remoteURL = "https://" + remoteURL
  398. }
  399. root, err := ioutil.TempDir("", "docker-build-git")
  400. if err != nil {
  401. return job.Error(err)
  402. }
  403. defer os.RemoveAll(root)
  404. if output, err := exec.Command("git", "clone", "--recursive", remoteURL, root).CombinedOutput(); err != nil {
  405. return job.Errorf("Error trying to use git: %s (%s)", err, output)
  406. }
  407. c, err := archive.Tar(root, archive.Uncompressed)
  408. if err != nil {
  409. return job.Error(err)
  410. }
  411. context = c
  412. } else if utils.IsURL(remoteURL) {
  413. f, err := utils.Download(remoteURL)
  414. if err != nil {
  415. return job.Error(err)
  416. }
  417. defer f.Body.Close()
  418. dockerFile, err := ioutil.ReadAll(f.Body)
  419. if err != nil {
  420. return job.Error(err)
  421. }
  422. c, err := archive.Generate("Dockerfile", string(dockerFile))
  423. if err != nil {
  424. return job.Error(err)
  425. }
  426. context = c
  427. }
  428. defer context.Close()
  429. sf := utils.NewStreamFormatter(job.GetenvBool("json"))
  430. b := NewBuildFile(srv,
  431. &utils.StdoutFormater{
  432. Writer: job.Stdout,
  433. StreamFormatter: sf,
  434. },
  435. &utils.StderrFormater{
  436. Writer: job.Stdout,
  437. StreamFormatter: sf,
  438. },
  439. !suppressOutput, !noCache, rm, job.Stdout, sf, authConfig, configFile)
  440. id, err := b.Build(context)
  441. if err != nil {
  442. return job.Error(err)
  443. }
  444. if repoName != "" {
  445. srv.runtime.Repositories().Set(repoName, tag, id, false)
  446. }
  447. return engine.StatusOK
  448. }
  449. // Loads a set of images into the repository. This is the complementary of ImageExport.
  450. // The input stream is an uncompressed tar ball containing images and metadata.
  451. func (srv *Server) ImageLoad(job *engine.Job) engine.Status {
  452. tmpImageDir, err := ioutil.TempDir("", "docker-import-")
  453. if err != nil {
  454. return job.Error(err)
  455. }
  456. defer os.RemoveAll(tmpImageDir)
  457. var (
  458. repoTarFile = path.Join(tmpImageDir, "repo.tar")
  459. repoDir = path.Join(tmpImageDir, "repo")
  460. )
  461. tarFile, err := os.Create(repoTarFile)
  462. if err != nil {
  463. return job.Error(err)
  464. }
  465. if _, err := io.Copy(tarFile, job.Stdin); err != nil {
  466. return job.Error(err)
  467. }
  468. tarFile.Close()
  469. repoFile, err := os.Open(repoTarFile)
  470. if err != nil {
  471. return job.Error(err)
  472. }
  473. if err := os.Mkdir(repoDir, os.ModeDir); err != nil {
  474. return job.Error(err)
  475. }
  476. if err := archive.Untar(repoFile, repoDir, nil); err != nil {
  477. return job.Error(err)
  478. }
  479. dirs, err := ioutil.ReadDir(repoDir)
  480. if err != nil {
  481. return job.Error(err)
  482. }
  483. for _, d := range dirs {
  484. if d.IsDir() {
  485. if err := srv.recursiveLoad(d.Name(), tmpImageDir); err != nil {
  486. return job.Error(err)
  487. }
  488. }
  489. }
  490. repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories"))
  491. if err == nil {
  492. repositories := map[string]graph.Repository{}
  493. if err := json.Unmarshal(repositoriesJson, &repositories); err != nil {
  494. return job.Error(err)
  495. }
  496. for imageName, tagMap := range repositories {
  497. for tag, address := range tagMap {
  498. if err := srv.runtime.Repositories().Set(imageName, tag, address, true); err != nil {
  499. return job.Error(err)
  500. }
  501. }
  502. }
  503. } else if !os.IsNotExist(err) {
  504. return job.Error(err)
  505. }
  506. return engine.StatusOK
  507. }
  508. func (srv *Server) recursiveLoad(address, tmpImageDir string) error {
  509. if _, err := srv.ImageInspect(address); err != nil {
  510. utils.Debugf("Loading %s", address)
  511. imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json"))
  512. if err != nil {
  513. utils.Debugf("Error reading json", err)
  514. return err
  515. }
  516. layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar"))
  517. if err != nil {
  518. utils.Debugf("Error reading embedded tar", err)
  519. return err
  520. }
  521. img, err := image.NewImgJSON(imageJson)
  522. if err != nil {
  523. utils.Debugf("Error unmarshalling json", err)
  524. return err
  525. }
  526. if img.Parent != "" {
  527. if !srv.runtime.Graph().Exists(img.Parent) {
  528. if err := srv.recursiveLoad(img.Parent, tmpImageDir); err != nil {
  529. return err
  530. }
  531. }
  532. }
  533. if err := srv.runtime.Graph().Register(imageJson, layer, img); err != nil {
  534. return err
  535. }
  536. }
  537. utils.Debugf("Completed processing %s", address)
  538. return nil
  539. }
  540. func (srv *Server) ImagesSearch(job *engine.Job) engine.Status {
  541. if n := len(job.Args); n != 1 {
  542. return job.Errorf("Usage: %s TERM", job.Name)
  543. }
  544. var (
  545. term = job.Args[0]
  546. metaHeaders = map[string][]string{}
  547. authConfig = &registry.AuthConfig{}
  548. )
  549. job.GetenvJson("authConfig", authConfig)
  550. job.GetenvJson("metaHeaders", metaHeaders)
  551. r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), registry.IndexServerAddress())
  552. if err != nil {
  553. return job.Error(err)
  554. }
  555. results, err := r.SearchRepositories(term)
  556. if err != nil {
  557. return job.Error(err)
  558. }
  559. outs := engine.NewTable("star_count", 0)
  560. for _, result := range results.Results {
  561. out := &engine.Env{}
  562. out.Import(result)
  563. outs.Add(out)
  564. }
  565. outs.ReverseSort()
  566. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  567. return job.Error(err)
  568. }
  569. return engine.StatusOK
  570. }
  571. func (srv *Server) ImageInsert(job *engine.Job) engine.Status {
  572. if len(job.Args) != 3 {
  573. return job.Errorf("Usage: %s IMAGE URL PATH\n", job.Name)
  574. }
  575. var (
  576. name = job.Args[0]
  577. url = job.Args[1]
  578. path = job.Args[2]
  579. )
  580. sf := utils.NewStreamFormatter(job.GetenvBool("json"))
  581. out := utils.NewWriteFlusher(job.Stdout)
  582. img, err := srv.runtime.Repositories().LookupImage(name)
  583. if err != nil {
  584. return job.Error(err)
  585. }
  586. file, err := utils.Download(url)
  587. if err != nil {
  588. return job.Error(err)
  589. }
  590. defer file.Body.Close()
  591. config, _, _, err := runconfig.Parse([]string{img.ID, "echo", "insert", url, path}, srv.runtime.SystemConfig())
  592. if err != nil {
  593. return job.Error(err)
  594. }
  595. c, _, err := srv.runtime.Create(config, "")
  596. if err != nil {
  597. return job.Error(err)
  598. }
  599. if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf, false, utils.TruncateID(img.ID), "Downloading"), path); err != nil {
  600. return job.Error(err)
  601. }
  602. // FIXME: Handle custom repo, tag comment, author
  603. img, err = srv.runtime.Commit(c, "", "", img.Comment, img.Author, nil)
  604. if err != nil {
  605. out.Write(sf.FormatError(err))
  606. return engine.StatusErr
  607. }
  608. out.Write(sf.FormatStatus("", img.ID))
  609. return engine.StatusOK
  610. }
  611. func (srv *Server) ImagesViz(job *engine.Job) engine.Status {
  612. images, _ := srv.runtime.Graph().Map()
  613. if images == nil {
  614. return engine.StatusOK
  615. }
  616. job.Stdout.Write([]byte("digraph docker {\n"))
  617. var (
  618. parentImage *image.Image
  619. err error
  620. )
  621. for _, image := range images {
  622. parentImage, err = image.GetParent()
  623. if err != nil {
  624. return job.Errorf("Error while getting parent image: %v", err)
  625. }
  626. if parentImage != nil {
  627. job.Stdout.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n"))
  628. } else {
  629. job.Stdout.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n"))
  630. }
  631. }
  632. reporefs := make(map[string][]string)
  633. for name, repository := range srv.runtime.Repositories().Repositories {
  634. for tag, id := range repository {
  635. reporefs[utils.TruncateID(id)] = append(reporefs[utils.TruncateID(id)], fmt.Sprintf("%s:%s", name, tag))
  636. }
  637. }
  638. for id, repos := range reporefs {
  639. job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n"))
  640. }
  641. job.Stdout.Write([]byte(" base [style=invisible]\n}\n"))
  642. return engine.StatusOK
  643. }
  644. func (srv *Server) Images(job *engine.Job) engine.Status {
  645. var (
  646. allImages map[string]*image.Image
  647. err error
  648. )
  649. if job.GetenvBool("all") {
  650. allImages, err = srv.runtime.Graph().Map()
  651. } else {
  652. allImages, err = srv.runtime.Graph().Heads()
  653. }
  654. if err != nil {
  655. return job.Error(err)
  656. }
  657. lookup := make(map[string]*engine.Env)
  658. for name, repository := range srv.runtime.Repositories().Repositories {
  659. if job.Getenv("filter") != "" {
  660. if match, _ := path.Match(job.Getenv("filter"), name); !match {
  661. continue
  662. }
  663. }
  664. for tag, id := range repository {
  665. image, err := srv.runtime.Graph().Get(id)
  666. if err != nil {
  667. log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
  668. continue
  669. }
  670. if out, exists := lookup[id]; exists {
  671. out.SetList("RepoTags", append(out.GetList("RepoTags"), fmt.Sprintf("%s:%s", name, tag)))
  672. } else {
  673. out := &engine.Env{}
  674. delete(allImages, id)
  675. out.Set("ParentId", image.Parent)
  676. out.SetList("RepoTags", []string{fmt.Sprintf("%s:%s", name, tag)})
  677. out.Set("Id", image.ID)
  678. out.SetInt64("Created", image.Created.Unix())
  679. out.SetInt64("Size", image.Size)
  680. out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
  681. lookup[id] = out
  682. }
  683. }
  684. }
  685. outs := engine.NewTable("Created", len(lookup))
  686. for _, value := range lookup {
  687. outs.Add(value)
  688. }
  689. // Display images which aren't part of a repository/tag
  690. if job.Getenv("filter") == "" {
  691. for _, image := range allImages {
  692. out := &engine.Env{}
  693. out.Set("ParentId", image.Parent)
  694. out.SetList("RepoTags", []string{"<none>:<none>"})
  695. out.Set("Id", image.ID)
  696. out.SetInt64("Created", image.Created.Unix())
  697. out.SetInt64("Size", image.Size)
  698. out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
  699. outs.Add(out)
  700. }
  701. }
  702. outs.ReverseSort()
  703. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  704. return job.Error(err)
  705. }
  706. return engine.StatusOK
  707. }
  708. func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
  709. images, _ := srv.runtime.Graph().Map()
  710. var imgcount int
  711. if images == nil {
  712. imgcount = 0
  713. } else {
  714. imgcount = len(images)
  715. }
  716. kernelVersion := "<unknown>"
  717. if kv, err := utils.GetKernelVersion(); err == nil {
  718. kernelVersion = kv.String()
  719. }
  720. // if we still have the original dockerinit binary from before we copied it locally, let's return the path to that, since that's more intuitive (the copied path is trivial to derive by hand given VERSION)
  721. initPath := utils.DockerInitPath("")
  722. if initPath == "" {
  723. // if that fails, we'll just return the path from the runtime
  724. initPath = srv.runtime.SystemInitPath()
  725. }
  726. v := &engine.Env{}
  727. v.SetInt("Containers", len(srv.runtime.List()))
  728. v.SetInt("Images", imgcount)
  729. v.Set("Driver", srv.runtime.GraphDriver().String())
  730. v.SetJson("DriverStatus", srv.runtime.GraphDriver().Status())
  731. v.SetBool("MemoryLimit", srv.runtime.SystemConfig().MemoryLimit)
  732. v.SetBool("SwapLimit", srv.runtime.SystemConfig().SwapLimit)
  733. v.SetBool("IPv4Forwarding", !srv.runtime.SystemConfig().IPv4ForwardingDisabled)
  734. v.SetBool("Debug", os.Getenv("DEBUG") != "")
  735. v.SetInt("NFd", utils.GetTotalUsedFds())
  736. v.SetInt("NGoroutines", goruntime.NumGoroutine())
  737. v.Set("ExecutionDriver", srv.runtime.ExecutionDriver().Name())
  738. v.SetInt("NEventsListener", len(srv.listeners))
  739. v.Set("KernelVersion", kernelVersion)
  740. v.Set("IndexServerAddress", registry.IndexServerAddress())
  741. v.Set("InitSha1", dockerversion.INITSHA1)
  742. v.Set("InitPath", initPath)
  743. if _, err := v.WriteTo(job.Stdout); err != nil {
  744. return job.Error(err)
  745. }
  746. return engine.StatusOK
  747. }
  748. func (srv *Server) DockerVersion(job *engine.Job) engine.Status {
  749. v := &engine.Env{}
  750. v.Set("Version", dockerversion.VERSION)
  751. v.Set("ApiVersion", api.APIVERSION)
  752. v.Set("GitCommit", dockerversion.GITCOMMIT)
  753. v.Set("GoVersion", goruntime.Version())
  754. v.Set("Os", goruntime.GOOS)
  755. v.Set("Arch", goruntime.GOARCH)
  756. if kernelVersion, err := utils.GetKernelVersion(); err == nil {
  757. v.Set("KernelVersion", kernelVersion.String())
  758. }
  759. if _, err := v.WriteTo(job.Stdout); err != nil {
  760. return job.Error(err)
  761. }
  762. return engine.StatusOK
  763. }
  764. func (srv *Server) ImageHistory(job *engine.Job) engine.Status {
  765. if n := len(job.Args); n != 1 {
  766. return job.Errorf("Usage: %s IMAGE", job.Name)
  767. }
  768. name := job.Args[0]
  769. foundImage, err := srv.runtime.Repositories().LookupImage(name)
  770. if err != nil {
  771. return job.Error(err)
  772. }
  773. lookupMap := make(map[string][]string)
  774. for name, repository := range srv.runtime.Repositories().Repositories {
  775. for tag, id := range repository {
  776. // If the ID already has a reverse lookup, do not update it unless for "latest"
  777. if _, exists := lookupMap[id]; !exists {
  778. lookupMap[id] = []string{}
  779. }
  780. lookupMap[id] = append(lookupMap[id], name+":"+tag)
  781. }
  782. }
  783. outs := engine.NewTable("Created", 0)
  784. err = foundImage.WalkHistory(func(img *image.Image) error {
  785. out := &engine.Env{}
  786. out.Set("Id", img.ID)
  787. out.SetInt64("Created", img.Created.Unix())
  788. out.Set("CreatedBy", strings.Join(img.ContainerConfig.Cmd, " "))
  789. out.SetList("Tags", lookupMap[img.ID])
  790. out.SetInt64("Size", img.Size)
  791. outs.Add(out)
  792. return nil
  793. })
  794. outs.ReverseSort()
  795. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  796. return job.Error(err)
  797. }
  798. return engine.StatusOK
  799. }
  800. func (srv *Server) ContainerTop(job *engine.Job) engine.Status {
  801. if len(job.Args) != 1 && len(job.Args) != 2 {
  802. return job.Errorf("Not enough arguments. Usage: %s CONTAINER [PS_ARGS]\n", job.Name)
  803. }
  804. var (
  805. name = job.Args[0]
  806. psArgs = "-ef"
  807. )
  808. if len(job.Args) == 2 && job.Args[1] != "" {
  809. psArgs = job.Args[1]
  810. }
  811. if container := srv.runtime.Get(name); container != nil {
  812. if !container.State.IsRunning() {
  813. return job.Errorf("Container %s is not running", name)
  814. }
  815. pids, err := srv.runtime.ExecutionDriver().GetPidsForContainer(container.ID)
  816. if err != nil {
  817. return job.Error(err)
  818. }
  819. output, err := exec.Command("ps", psArgs).Output()
  820. if err != nil {
  821. return job.Errorf("Error running ps: %s", err)
  822. }
  823. lines := strings.Split(string(output), "\n")
  824. header := strings.Fields(lines[0])
  825. out := &engine.Env{}
  826. out.SetList("Titles", header)
  827. pidIndex := -1
  828. for i, name := range header {
  829. if name == "PID" {
  830. pidIndex = i
  831. }
  832. }
  833. if pidIndex == -1 {
  834. return job.Errorf("Couldn't find PID field in ps output")
  835. }
  836. processes := [][]string{}
  837. for _, line := range lines[1:] {
  838. if len(line) == 0 {
  839. continue
  840. }
  841. fields := strings.Fields(line)
  842. p, err := strconv.Atoi(fields[pidIndex])
  843. if err != nil {
  844. return job.Errorf("Unexpected pid '%s': %s", fields[pidIndex], err)
  845. }
  846. for _, pid := range pids {
  847. if pid == p {
  848. // Make sure number of fields equals number of header titles
  849. // merging "overhanging" fields
  850. process := fields[:len(header)-1]
  851. process = append(process, strings.Join(fields[len(header)-1:], " "))
  852. processes = append(processes, process)
  853. }
  854. }
  855. }
  856. out.SetJson("Processes", processes)
  857. out.WriteTo(job.Stdout)
  858. return engine.StatusOK
  859. }
  860. return job.Errorf("No such container: %s", name)
  861. }
  862. func (srv *Server) ContainerChanges(job *engine.Job) engine.Status {
  863. if n := len(job.Args); n != 1 {
  864. return job.Errorf("Usage: %s CONTAINER", job.Name)
  865. }
  866. name := job.Args[0]
  867. if container := srv.runtime.Get(name); container != nil {
  868. outs := engine.NewTable("", 0)
  869. changes, err := container.Changes()
  870. if err != nil {
  871. return job.Error(err)
  872. }
  873. for _, change := range changes {
  874. out := &engine.Env{}
  875. if err := out.Import(change); err != nil {
  876. return job.Error(err)
  877. }
  878. outs.Add(out)
  879. }
  880. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  881. return job.Error(err)
  882. }
  883. } else {
  884. return job.Errorf("No such container: %s", name)
  885. }
  886. return engine.StatusOK
  887. }
  888. func (srv *Server) Containers(job *engine.Job) engine.Status {
  889. var (
  890. foundBefore bool
  891. displayed int
  892. all = job.GetenvBool("all")
  893. since = job.Getenv("since")
  894. before = job.Getenv("before")
  895. n = job.GetenvInt("limit")
  896. size = job.GetenvBool("size")
  897. )
  898. outs := engine.NewTable("Created", 0)
  899. names := map[string][]string{}
  900. srv.runtime.ContainerGraph().Walk("/", func(p string, e *graphdb.Entity) error {
  901. names[e.ID()] = append(names[e.ID()], p)
  902. return nil
  903. }, -1)
  904. var beforeCont, sinceCont *runtime.Container
  905. if before != "" {
  906. beforeCont = srv.runtime.Get(before)
  907. if beforeCont == nil {
  908. return job.Error(fmt.Errorf("Could not find container with name or id %s", before))
  909. }
  910. }
  911. if since != "" {
  912. sinceCont = srv.runtime.Get(since)
  913. if sinceCont == nil {
  914. return job.Error(fmt.Errorf("Could not find container with name or id %s", since))
  915. }
  916. }
  917. for _, container := range srv.runtime.List() {
  918. if !container.State.IsRunning() && !all && n <= 0 && since == "" && before == "" {
  919. continue
  920. }
  921. if before != "" && !foundBefore {
  922. if container.ID == beforeCont.ID {
  923. foundBefore = true
  924. }
  925. continue
  926. }
  927. if n > 0 && displayed == n {
  928. break
  929. }
  930. if since != "" {
  931. if container.ID == sinceCont.ID {
  932. break
  933. }
  934. }
  935. displayed++
  936. out := &engine.Env{}
  937. out.Set("Id", container.ID)
  938. out.SetList("Names", names[container.ID])
  939. out.Set("Image", srv.runtime.Repositories().ImageName(container.Image))
  940. if len(container.Args) > 0 {
  941. out.Set("Command", fmt.Sprintf("\"%s %s\"", container.Path, container.ArgsAsString()))
  942. } else {
  943. out.Set("Command", fmt.Sprintf("\"%s\"", container.Path))
  944. }
  945. out.SetInt64("Created", container.Created.Unix())
  946. out.Set("Status", container.State.String())
  947. str, err := container.NetworkSettings.PortMappingAPI().ToListString()
  948. if err != nil {
  949. return job.Error(err)
  950. }
  951. out.Set("Ports", str)
  952. if size {
  953. sizeRw, sizeRootFs := container.GetSize()
  954. out.SetInt64("SizeRw", sizeRw)
  955. out.SetInt64("SizeRootFs", sizeRootFs)
  956. }
  957. outs.Add(out)
  958. }
  959. outs.ReverseSort()
  960. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  961. return job.Error(err)
  962. }
  963. return engine.StatusOK
  964. }
  965. func (srv *Server) ContainerCommit(job *engine.Job) engine.Status {
  966. if len(job.Args) != 1 {
  967. return job.Errorf("Not enough arguments. Usage: %s CONTAINER\n", job.Name)
  968. }
  969. name := job.Args[0]
  970. container := srv.runtime.Get(name)
  971. if container == nil {
  972. return job.Errorf("No such container: %s", name)
  973. }
  974. var config = container.Config
  975. var newConfig runconfig.Config
  976. if err := job.GetenvJson("config", &newConfig); err != nil {
  977. return job.Error(err)
  978. }
  979. if err := runconfig.Merge(&newConfig, config); err != nil {
  980. return job.Error(err)
  981. }
  982. img, err := srv.runtime.Commit(container, job.Getenv("repo"), job.Getenv("tag"), job.Getenv("comment"), job.Getenv("author"), &newConfig)
  983. if err != nil {
  984. return job.Error(err)
  985. }
  986. job.Printf("%s\n", img.ID)
  987. return engine.StatusOK
  988. }
  989. func (srv *Server) ImageTag(job *engine.Job) engine.Status {
  990. if len(job.Args) != 2 && len(job.Args) != 3 {
  991. return job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name)
  992. }
  993. var tag string
  994. if len(job.Args) == 3 {
  995. tag = job.Args[2]
  996. }
  997. if err := srv.runtime.Repositories().Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil {
  998. return job.Error(err)
  999. }
  1000. return engine.StatusOK
  1001. }
  1002. func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
  1003. history, err := r.GetRemoteHistory(imgID, endpoint, token)
  1004. if err != nil {
  1005. return err
  1006. }
  1007. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
  1008. // FIXME: Try to stream the images?
  1009. // FIXME: Launch the getRemoteImage() in goroutines
  1010. for i := len(history) - 1; i >= 0; i-- {
  1011. id := history[i]
  1012. // ensure no two downloads of the same layer happen at the same time
  1013. if c, err := srv.poolAdd("pull", "layer:"+id); err != nil {
  1014. utils.Errorf("Image (id: %s) pull is already running, skipping: %v", id, err)
  1015. <-c
  1016. }
  1017. defer srv.poolRemove("pull", "layer:"+id)
  1018. if !srv.runtime.Graph().Exists(id) {
  1019. out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
  1020. var (
  1021. imgJSON []byte
  1022. imgSize int
  1023. err error
  1024. img *image.Image
  1025. )
  1026. retries := 5
  1027. for j := 1; j <= retries; j++ {
  1028. imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
  1029. if err != nil && j == retries {
  1030. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  1031. return err
  1032. } else if err != nil {
  1033. time.Sleep(time.Duration(j) * 500 * time.Millisecond)
  1034. continue
  1035. }
  1036. img, err = image.NewImgJSON(imgJSON)
  1037. if err != nil && j == retries {
  1038. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  1039. return fmt.Errorf("Failed to parse json: %s", err)
  1040. } else if err != nil {
  1041. time.Sleep(time.Duration(j) * 500 * time.Millisecond)
  1042. continue
  1043. } else {
  1044. break
  1045. }
  1046. }
  1047. // Get the layer
  1048. out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling fs layer", nil))
  1049. layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token)
  1050. if err != nil {
  1051. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  1052. return err
  1053. }
  1054. defer layer.Close()
  1055. if err := srv.runtime.Graph().Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), img); err != nil {
  1056. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
  1057. return err
  1058. }
  1059. }
  1060. out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
  1061. }
  1062. return nil
  1063. }
  1064. func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
  1065. out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
  1066. repoData, err := r.GetRepositoryData(remoteName)
  1067. if err != nil {
  1068. return err
  1069. }
  1070. utils.Debugf("Retrieving the tag list")
  1071. tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
  1072. if err != nil {
  1073. utils.Errorf("%v", err)
  1074. return err
  1075. }
  1076. for tag, id := range tagsList {
  1077. repoData.ImgList[id] = &registry.ImgData{
  1078. ID: id,
  1079. Tag: tag,
  1080. Checksum: "",
  1081. }
  1082. }
  1083. utils.Debugf("Registering tags")
  1084. // If no tag has been specified, pull them all
  1085. if askedTag == "" {
  1086. for tag, id := range tagsList {
  1087. repoData.ImgList[id].Tag = tag
  1088. }
  1089. } else {
  1090. // Otherwise, check that the tag exists and use only that one
  1091. id, exists := tagsList[askedTag]
  1092. if !exists {
  1093. return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
  1094. }
  1095. repoData.ImgList[id].Tag = askedTag
  1096. }
  1097. errors := make(chan error)
  1098. for _, image := range repoData.ImgList {
  1099. downloadImage := func(img *registry.ImgData) {
  1100. if askedTag != "" && img.Tag != askedTag {
  1101. utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
  1102. if parallel {
  1103. errors <- nil
  1104. }
  1105. return
  1106. }
  1107. if img.Tag == "" {
  1108. utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
  1109. if parallel {
  1110. errors <- nil
  1111. }
  1112. return
  1113. }
  1114. // ensure no two downloads of the same image happen at the same time
  1115. if c, err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
  1116. if c != nil {
  1117. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
  1118. <-c
  1119. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
  1120. } else {
  1121. utils.Errorf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
  1122. }
  1123. if parallel {
  1124. errors <- nil
  1125. }
  1126. return
  1127. }
  1128. defer srv.poolRemove("pull", "img:"+img.ID)
  1129. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
  1130. success := false
  1131. var lastErr error
  1132. for _, ep := range repoData.Endpoints {
  1133. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
  1134. if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
  1135. // Its not ideal that only the last error is returned, it would be better to concatenate the errors.
  1136. // As the error is also given to the output stream the user will see the error.
  1137. lastErr = err
  1138. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
  1139. continue
  1140. }
  1141. success = true
  1142. break
  1143. }
  1144. if !success {
  1145. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, %s", img.Tag, localName, lastErr), nil))
  1146. if parallel {
  1147. errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
  1148. return
  1149. }
  1150. }
  1151. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
  1152. if parallel {
  1153. errors <- nil
  1154. }
  1155. }
  1156. if parallel {
  1157. go downloadImage(image)
  1158. } else {
  1159. downloadImage(image)
  1160. }
  1161. }
  1162. if parallel {
  1163. var lastError error
  1164. for i := 0; i < len(repoData.ImgList); i++ {
  1165. if err := <-errors; err != nil {
  1166. lastError = err
  1167. }
  1168. }
  1169. if lastError != nil {
  1170. return lastError
  1171. }
  1172. }
  1173. for tag, id := range tagsList {
  1174. if askedTag != "" && tag != askedTag {
  1175. continue
  1176. }
  1177. if err := srv.runtime.Repositories().Set(localName, tag, id, true); err != nil {
  1178. return err
  1179. }
  1180. }
  1181. if err := srv.runtime.Repositories().Save(); err != nil {
  1182. return err
  1183. }
  1184. return nil
  1185. }
  1186. func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
  1187. srv.Lock()
  1188. defer srv.Unlock()
  1189. if c, exists := srv.pullingPool[key]; exists {
  1190. return c, fmt.Errorf("pull %s is already in progress", key)
  1191. }
  1192. if c, exists := srv.pushingPool[key]; exists {
  1193. return c, fmt.Errorf("push %s is already in progress", key)
  1194. }
  1195. c := make(chan struct{})
  1196. switch kind {
  1197. case "pull":
  1198. srv.pullingPool[key] = c
  1199. case "push":
  1200. srv.pushingPool[key] = c
  1201. default:
  1202. return nil, fmt.Errorf("Unknown pool type")
  1203. }
  1204. return c, nil
  1205. }
  1206. func (srv *Server) poolRemove(kind, key string) error {
  1207. srv.Lock()
  1208. defer srv.Unlock()
  1209. switch kind {
  1210. case "pull":
  1211. if c, exists := srv.pullingPool[key]; exists {
  1212. close(c)
  1213. delete(srv.pullingPool, key)
  1214. }
  1215. case "push":
  1216. if c, exists := srv.pushingPool[key]; exists {
  1217. close(c)
  1218. delete(srv.pushingPool, key)
  1219. }
  1220. default:
  1221. return fmt.Errorf("Unknown pool type")
  1222. }
  1223. return nil
  1224. }
  1225. func (srv *Server) ImagePull(job *engine.Job) engine.Status {
  1226. if n := len(job.Args); n != 1 && n != 2 {
  1227. return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
  1228. }
  1229. var (
  1230. localName = job.Args[0]
  1231. tag string
  1232. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  1233. authConfig = &registry.AuthConfig{}
  1234. metaHeaders map[string][]string
  1235. )
  1236. if len(job.Args) > 1 {
  1237. tag = job.Args[1]
  1238. }
  1239. job.GetenvJson("authConfig", authConfig)
  1240. job.GetenvJson("metaHeaders", metaHeaders)
  1241. c, err := srv.poolAdd("pull", localName+":"+tag)
  1242. if err != nil {
  1243. if c != nil {
  1244. // Another pull of the same repository is already taking place; just wait for it to finish
  1245. job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
  1246. <-c
  1247. return engine.StatusOK
  1248. }
  1249. return job.Error(err)
  1250. }
  1251. defer srv.poolRemove("pull", localName+":"+tag)
  1252. // Resolve the Repository name from fqn to endpoint + name
  1253. hostname, remoteName, err := registry.ResolveRepositoryName(localName)
  1254. if err != nil {
  1255. return job.Error(err)
  1256. }
  1257. endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
  1258. if err != nil {
  1259. return job.Error(err)
  1260. }
  1261. r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
  1262. if err != nil {
  1263. return job.Error(err)
  1264. }
  1265. if endpoint == registry.IndexServerAddress() {
  1266. // If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
  1267. localName = remoteName
  1268. }
  1269. if err = srv.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
  1270. return job.Error(err)
  1271. }
  1272. return engine.StatusOK
  1273. }
  1274. // Retrieve the all the images to be uploaded in the correct order
  1275. func (srv *Server) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  1276. var (
  1277. imageList []string
  1278. imagesSeen map[string]bool = make(map[string]bool)
  1279. tagsByImage map[string][]string = make(map[string][]string)
  1280. )
  1281. for tag, id := range localRepo {
  1282. if requestedTag != "" && requestedTag != tag {
  1283. continue
  1284. }
  1285. var imageListForThisTag []string
  1286. tagsByImage[id] = append(tagsByImage[id], tag)
  1287. for img, err := srv.runtime.Graph().Get(id); img != nil; img, err = img.GetParent() {
  1288. if err != nil {
  1289. return nil, nil, err
  1290. }
  1291. if imagesSeen[img.ID] {
  1292. // This image is already on the list, we can ignore it and all its parents
  1293. break
  1294. }
  1295. imagesSeen[img.ID] = true
  1296. imageListForThisTag = append(imageListForThisTag, img.ID)
  1297. }
  1298. // reverse the image list for this tag (so the "most"-parent image is first)
  1299. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  1300. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  1301. }
  1302. // append to main image list
  1303. imageList = append(imageList, imageListForThisTag...)
  1304. }
  1305. if len(imageList) == 0 {
  1306. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  1307. }
  1308. utils.Debugf("Image list: %v", imageList)
  1309. utils.Debugf("Tags by image: %v", tagsByImage)
  1310. return imageList, tagsByImage, nil
  1311. }
  1312. func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error {
  1313. out = utils.NewWriteFlusher(out)
  1314. utils.Debugf("Local repo: %s", localRepo)
  1315. imgList, tagsByImage, err := srv.getImageList(localRepo, tag)
  1316. if err != nil {
  1317. return err
  1318. }
  1319. out.Write(sf.FormatStatus("", "Sending image list"))
  1320. var (
  1321. repoData *registry.RepositoryData
  1322. imageIndex []*registry.ImgData
  1323. )
  1324. for _, imgId := range imgList {
  1325. if tags, exists := tagsByImage[imgId]; exists {
  1326. // If an image has tags you must add an entry in the image index
  1327. // for each tag
  1328. for _, tag := range tags {
  1329. imageIndex = append(imageIndex, &registry.ImgData{
  1330. ID: imgId,
  1331. Tag: tag,
  1332. })
  1333. }
  1334. } else {
  1335. // If the image does not have a tag it still needs to be sent to the
  1336. // registry with an empty tag so that it is accociated with the repository
  1337. imageIndex = append(imageIndex, &registry.ImgData{
  1338. ID: imgId,
  1339. Tag: "",
  1340. })
  1341. }
  1342. }
  1343. utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo)
  1344. for _, data := range imageIndex {
  1345. utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag)
  1346. }
  1347. // Register all the images in a repository with the registry
  1348. // If an image is not in this list it will not be associated with the repository
  1349. repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil)
  1350. if err != nil {
  1351. return err
  1352. }
  1353. nTag := 1
  1354. if tag == "" {
  1355. nTag = len(localRepo)
  1356. }
  1357. for _, ep := range repoData.Endpoints {
  1358. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag))
  1359. for _, imgId := range imgList {
  1360. if r.LookupRemoteImage(imgId, ep, repoData.Tokens) {
  1361. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
  1362. } else {
  1363. if _, err := srv.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil {
  1364. // FIXME: Continue on error?
  1365. return err
  1366. }
  1367. }
  1368. for _, tag := range tagsByImage[imgId] {
  1369. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag))
  1370. if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil {
  1371. return err
  1372. }
  1373. }
  1374. }
  1375. }
  1376. if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil {
  1377. return err
  1378. }
  1379. return nil
  1380. }
  1381. func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  1382. out = utils.NewWriteFlusher(out)
  1383. jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.Graph().Root, imgID, "json"))
  1384. if err != nil {
  1385. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  1386. }
  1387. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  1388. imgData := &registry.ImgData{
  1389. ID: imgID,
  1390. }
  1391. // Send the json
  1392. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  1393. if err == registry.ErrAlreadyExists {
  1394. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  1395. return "", nil
  1396. }
  1397. return "", err
  1398. }
  1399. layerData, err := srv.runtime.Graph().TempLayerArchive(imgID, archive.Uncompressed, sf, out)
  1400. if err != nil {
  1401. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  1402. }
  1403. defer os.RemoveAll(layerData.Name())
  1404. // Send the layer
  1405. utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  1406. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
  1407. if err != nil {
  1408. return "", err
  1409. }
  1410. imgData.Checksum = checksum
  1411. imgData.ChecksumPayload = checksumPayload
  1412. // Send the checksum
  1413. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  1414. return "", err
  1415. }
  1416. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  1417. return imgData.Checksum, nil
  1418. }
  1419. // FIXME: Allow to interrupt current push when new push of same image is done.
  1420. func (srv *Server) ImagePush(job *engine.Job) engine.Status {
  1421. if n := len(job.Args); n != 1 {
  1422. return job.Errorf("Usage: %s IMAGE", job.Name)
  1423. }
  1424. var (
  1425. localName = job.Args[0]
  1426. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  1427. authConfig = &registry.AuthConfig{}
  1428. metaHeaders map[string][]string
  1429. )
  1430. tag := job.Getenv("tag")
  1431. job.GetenvJson("authConfig", authConfig)
  1432. job.GetenvJson("metaHeaders", metaHeaders)
  1433. if _, err := srv.poolAdd("push", localName); err != nil {
  1434. return job.Error(err)
  1435. }
  1436. defer srv.poolRemove("push", localName)
  1437. // Resolve the Repository name from fqn to endpoint + name
  1438. hostname, remoteName, err := registry.ResolveRepositoryName(localName)
  1439. if err != nil {
  1440. return job.Error(err)
  1441. }
  1442. endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
  1443. if err != nil {
  1444. return job.Error(err)
  1445. }
  1446. img, err := srv.runtime.Graph().Get(localName)
  1447. r, err2 := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
  1448. if err2 != nil {
  1449. return job.Error(err2)
  1450. }
  1451. if err != nil {
  1452. reposLen := 1
  1453. if tag == "" {
  1454. reposLen = len(srv.runtime.Repositories().Repositories[localName])
  1455. }
  1456. job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
  1457. // If it fails, try to get the repository
  1458. if localRepo, exists := srv.runtime.Repositories().Repositories[localName]; exists {
  1459. if err := srv.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil {
  1460. return job.Error(err)
  1461. }
  1462. return engine.StatusOK
  1463. }
  1464. return job.Error(err)
  1465. }
  1466. var token []string
  1467. job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
  1468. if _, err := srv.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil {
  1469. return job.Error(err)
  1470. }
  1471. return engine.StatusOK
  1472. }
  1473. func (srv *Server) ImageImport(job *engine.Job) engine.Status {
  1474. if n := len(job.Args); n != 2 && n != 3 {
  1475. return job.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
  1476. }
  1477. var (
  1478. src = job.Args[0]
  1479. repo = job.Args[1]
  1480. tag string
  1481. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  1482. archive archive.ArchiveReader
  1483. resp *http.Response
  1484. )
  1485. if len(job.Args) > 2 {
  1486. tag = job.Args[2]
  1487. }
  1488. if src == "-" {
  1489. archive = job.Stdin
  1490. } else {
  1491. u, err := url.Parse(src)
  1492. if err != nil {
  1493. return job.Error(err)
  1494. }
  1495. if u.Scheme == "" {
  1496. u.Scheme = "http"
  1497. u.Host = src
  1498. u.Path = ""
  1499. }
  1500. job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u))
  1501. // Download with curl (pretty progress bar)
  1502. // If curl is not available, fallback to http.Get()
  1503. resp, err = utils.Download(u.String())
  1504. if err != nil {
  1505. return job.Error(err)
  1506. }
  1507. progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
  1508. defer progressReader.Close()
  1509. archive = progressReader
  1510. }
  1511. img, err := srv.runtime.Graph().Create(archive, "", "", "Imported from "+src, "", nil, nil)
  1512. if err != nil {
  1513. return job.Error(err)
  1514. }
  1515. // Optionally register the image at REPO/TAG
  1516. if repo != "" {
  1517. if err := srv.runtime.Repositories().Set(repo, tag, img.ID, true); err != nil {
  1518. return job.Error(err)
  1519. }
  1520. }
  1521. job.Stdout.Write(sf.FormatStatus("", img.ID))
  1522. return engine.StatusOK
  1523. }
  1524. func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
  1525. var name string
  1526. if len(job.Args) == 1 {
  1527. name = job.Args[0]
  1528. } else if len(job.Args) > 1 {
  1529. return job.Errorf("Usage: %s", job.Name)
  1530. }
  1531. config := runconfig.ContainerConfigFromJob(job)
  1532. if config.Memory != 0 && config.Memory < 524288 {
  1533. return job.Errorf("Minimum memory limit allowed is 512k")
  1534. }
  1535. if config.Memory > 0 && !srv.runtime.SystemConfig().MemoryLimit {
  1536. job.Errorf("Your kernel does not support memory limit capabilities. Limitation discarded.\n")
  1537. config.Memory = 0
  1538. }
  1539. if config.Memory > 0 && !srv.runtime.SystemConfig().SwapLimit {
  1540. job.Errorf("Your kernel does not support swap limit capabilities. Limitation discarded.\n")
  1541. config.MemorySwap = -1
  1542. }
  1543. resolvConf, err := utils.GetResolvConf()
  1544. if err != nil {
  1545. return job.Error(err)
  1546. }
  1547. if !config.NetworkDisabled && len(config.Dns) == 0 && len(srv.runtime.Config().Dns) == 0 && utils.CheckLocalDns(resolvConf) {
  1548. job.Errorf("Local (127.0.0.1) DNS resolver found in resolv.conf and containers can't use it. Using default external servers : %v\n", runtime.DefaultDns)
  1549. config.Dns = runtime.DefaultDns
  1550. }
  1551. container, buildWarnings, err := srv.runtime.Create(config, name)
  1552. if err != nil {
  1553. if srv.runtime.Graph().IsNotExist(err) {
  1554. _, tag := utils.ParseRepositoryTag(config.Image)
  1555. if tag == "" {
  1556. tag = graph.DEFAULTTAG
  1557. }
  1558. return job.Errorf("No such image: %s (tag: %s)", config.Image, tag)
  1559. }
  1560. return job.Error(err)
  1561. }
  1562. if !container.Config.NetworkDisabled && srv.runtime.SystemConfig().IPv4ForwardingDisabled {
  1563. job.Errorf("IPv4 forwarding is disabled.\n")
  1564. }
  1565. srv.LogEvent("create", container.ID, srv.runtime.Repositories().ImageName(container.Image))
  1566. // FIXME: this is necessary because runtime.Create might return a nil container
  1567. // with a non-nil error. This should not happen! Once it's fixed we
  1568. // can remove this workaround.
  1569. if container != nil {
  1570. job.Printf("%s\n", container.ID)
  1571. }
  1572. for _, warning := range buildWarnings {
  1573. job.Errorf("%s\n", warning)
  1574. }
  1575. return engine.StatusOK
  1576. }
  1577. func (srv *Server) ContainerRestart(job *engine.Job) engine.Status {
  1578. if len(job.Args) != 1 {
  1579. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1580. }
  1581. var (
  1582. name = job.Args[0]
  1583. t = 10
  1584. )
  1585. if job.EnvExists("t") {
  1586. t = job.GetenvInt("t")
  1587. }
  1588. if container := srv.runtime.Get(name); container != nil {
  1589. if err := container.Restart(int(t)); err != nil {
  1590. return job.Errorf("Cannot restart container %s: %s\n", name, err)
  1591. }
  1592. srv.LogEvent("restart", container.ID, srv.runtime.Repositories().ImageName(container.Image))
  1593. } else {
  1594. return job.Errorf("No such container: %s\n", name)
  1595. }
  1596. return engine.StatusOK
  1597. }
  1598. func (srv *Server) ContainerDestroy(job *engine.Job) engine.Status {
  1599. if len(job.Args) != 1 {
  1600. return job.Errorf("Not enough arguments. Usage: %s CONTAINER\n", job.Name)
  1601. }
  1602. name := job.Args[0]
  1603. removeVolume := job.GetenvBool("removeVolume")
  1604. removeLink := job.GetenvBool("removeLink")
  1605. forceRemove := job.GetenvBool("forceRemove")
  1606. container := srv.runtime.Get(name)
  1607. if removeLink {
  1608. if container == nil {
  1609. return job.Errorf("No such link: %s", name)
  1610. }
  1611. name, err := runtime.GetFullContainerName(name)
  1612. if err != nil {
  1613. job.Error(err)
  1614. }
  1615. parent, n := path.Split(name)
  1616. if parent == "/" {
  1617. return job.Errorf("Conflict, cannot remove the default name of the container")
  1618. }
  1619. pe := srv.runtime.ContainerGraph().Get(parent)
  1620. if pe == nil {
  1621. return job.Errorf("Cannot get parent %s for name %s", parent, name)
  1622. }
  1623. parentContainer := srv.runtime.Get(pe.ID())
  1624. if parentContainer != nil {
  1625. parentContainer.DisableLink(n)
  1626. }
  1627. if err := srv.runtime.ContainerGraph().Delete(name); err != nil {
  1628. return job.Error(err)
  1629. }
  1630. return engine.StatusOK
  1631. }
  1632. if container != nil {
  1633. if container.State.IsRunning() {
  1634. if forceRemove {
  1635. if err := container.Stop(5); err != nil {
  1636. return job.Errorf("Could not stop running container, cannot remove - %v", err)
  1637. }
  1638. } else {
  1639. return job.Errorf("Impossible to remove a running container, please stop it first or use -f")
  1640. }
  1641. }
  1642. if err := srv.runtime.Destroy(container); err != nil {
  1643. return job.Errorf("Cannot destroy container %s: %s", name, err)
  1644. }
  1645. srv.LogEvent("destroy", container.ID, srv.runtime.Repositories().ImageName(container.Image))
  1646. if removeVolume {
  1647. var (
  1648. volumes = make(map[string]struct{})
  1649. binds = make(map[string]struct{})
  1650. usedVolumes = make(map[string]*runtime.Container)
  1651. )
  1652. // the volume id is always the base of the path
  1653. getVolumeId := func(p string) string {
  1654. return filepath.Base(strings.TrimSuffix(p, "/layer"))
  1655. }
  1656. // populate bind map so that they can be skipped and not removed
  1657. for _, bind := range container.HostConfig().Binds {
  1658. source := strings.Split(bind, ":")[0]
  1659. // TODO: refactor all volume stuff, all of it
  1660. // this is very important that we eval the link
  1661. // or comparing the keys to container.Volumes will not work
  1662. p, err := filepath.EvalSymlinks(source)
  1663. if err != nil {
  1664. return job.Error(err)
  1665. }
  1666. source = p
  1667. binds[source] = struct{}{}
  1668. }
  1669. // Store all the deleted containers volumes
  1670. for _, volumeId := range container.Volumes {
  1671. // Skip the volumes mounted from external
  1672. // bind mounts here will will be evaluated for a symlink
  1673. if _, exists := binds[volumeId]; exists {
  1674. continue
  1675. }
  1676. volumeId = getVolumeId(volumeId)
  1677. volumes[volumeId] = struct{}{}
  1678. }
  1679. // Retrieve all volumes from all remaining containers
  1680. for _, container := range srv.runtime.List() {
  1681. for _, containerVolumeId := range container.Volumes {
  1682. containerVolumeId = getVolumeId(containerVolumeId)
  1683. usedVolumes[containerVolumeId] = container
  1684. }
  1685. }
  1686. for volumeId := range volumes {
  1687. // If the requested volu
  1688. if c, exists := usedVolumes[volumeId]; exists {
  1689. log.Printf("The volume %s is used by the container %s. Impossible to remove it. Skipping.\n", volumeId, c.ID)
  1690. continue
  1691. }
  1692. if err := srv.runtime.Volumes().Delete(volumeId); err != nil {
  1693. return job.Errorf("Error calling volumes.Delete(%q): %v", volumeId, err)
  1694. }
  1695. }
  1696. }
  1697. } else {
  1698. return job.Errorf("No such container: %s", name)
  1699. }
  1700. return engine.StatusOK
  1701. }
  1702. func (srv *Server) DeleteImage(name string, imgs *engine.Table, first, force, noprune bool) error {
  1703. var (
  1704. repoName, tag string
  1705. tags = []string{}
  1706. )
  1707. repoName, tag = utils.ParseRepositoryTag(name)
  1708. if tag == "" {
  1709. tag = graph.DEFAULTTAG
  1710. }
  1711. img, err := srv.runtime.Repositories().LookupImage(name)
  1712. if err != nil {
  1713. if r, _ := srv.runtime.Repositories().Get(repoName); r != nil {
  1714. return fmt.Errorf("No such image: %s:%s", repoName, tag)
  1715. }
  1716. return fmt.Errorf("No such image: %s", name)
  1717. }
  1718. if strings.Contains(img.ID, name) {
  1719. repoName = ""
  1720. tag = ""
  1721. }
  1722. byParents, err := srv.runtime.Graph().ByParent()
  1723. if err != nil {
  1724. return err
  1725. }
  1726. //If delete by id, see if the id belong only to one repository
  1727. if repoName == "" {
  1728. for _, repoAndTag := range srv.runtime.Repositories().ByID()[img.ID] {
  1729. parsedRepo, parsedTag := utils.ParseRepositoryTag(repoAndTag)
  1730. if repoName == "" || repoName == parsedRepo {
  1731. repoName = parsedRepo
  1732. if parsedTag != "" {
  1733. tags = append(tags, parsedTag)
  1734. }
  1735. } else if repoName != parsedRepo && !force {
  1736. // the id belongs to multiple repos, like base:latest and user:test,
  1737. // in that case return conflict
  1738. return fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories, use -f to force", name)
  1739. }
  1740. }
  1741. } else {
  1742. tags = append(tags, tag)
  1743. }
  1744. if !first && len(tags) > 0 {
  1745. return nil
  1746. }
  1747. //Untag the current image
  1748. for _, tag := range tags {
  1749. tagDeleted, err := srv.runtime.Repositories().Delete(repoName, tag)
  1750. if err != nil {
  1751. return err
  1752. }
  1753. if tagDeleted {
  1754. out := &engine.Env{}
  1755. out.Set("Untagged", repoName+":"+tag)
  1756. imgs.Add(out)
  1757. srv.LogEvent("untag", img.ID, "")
  1758. }
  1759. }
  1760. tags = srv.runtime.Repositories().ByID()[img.ID]
  1761. if (len(tags) <= 1 && repoName == "") || len(tags) == 0 {
  1762. if len(byParents[img.ID]) == 0 {
  1763. if err := srv.canDeleteImage(img.ID); err != nil {
  1764. return err
  1765. }
  1766. if err := srv.runtime.Repositories().DeleteAll(img.ID); err != nil {
  1767. return err
  1768. }
  1769. if err := srv.runtime.Graph().Delete(img.ID); err != nil {
  1770. return err
  1771. }
  1772. out := &engine.Env{}
  1773. out.Set("Deleted", img.ID)
  1774. imgs.Add(out)
  1775. srv.LogEvent("delete", img.ID, "")
  1776. if img.Parent != "" && !noprune {
  1777. err := srv.DeleteImage(img.Parent, imgs, false, force, noprune)
  1778. if first {
  1779. return err
  1780. }
  1781. }
  1782. }
  1783. }
  1784. return nil
  1785. }
  1786. func (srv *Server) ImageDelete(job *engine.Job) engine.Status {
  1787. if n := len(job.Args); n != 1 {
  1788. return job.Errorf("Usage: %s IMAGE", job.Name)
  1789. }
  1790. imgs := engine.NewTable("", 0)
  1791. if err := srv.DeleteImage(job.Args[0], imgs, true, job.GetenvBool("force"), job.GetenvBool("noprune")); err != nil {
  1792. return job.Error(err)
  1793. }
  1794. if len(imgs.Data) == 0 {
  1795. return job.Errorf("Conflict, %s wasn't deleted", job.Args[0])
  1796. }
  1797. if _, err := imgs.WriteListTo(job.Stdout); err != nil {
  1798. return job.Error(err)
  1799. }
  1800. return engine.StatusOK
  1801. }
  1802. func (srv *Server) canDeleteImage(imgID string) error {
  1803. for _, container := range srv.runtime.List() {
  1804. parent, err := srv.runtime.Repositories().LookupImage(container.Image)
  1805. if err != nil {
  1806. return err
  1807. }
  1808. if err := parent.WalkHistory(func(p *image.Image) error {
  1809. if imgID == p.ID {
  1810. return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it", utils.TruncateID(imgID), utils.TruncateID(container.ID))
  1811. }
  1812. return nil
  1813. }); err != nil {
  1814. return err
  1815. }
  1816. }
  1817. return nil
  1818. }
  1819. func (srv *Server) ImageGetCached(imgID string, config *runconfig.Config) (*image.Image, error) {
  1820. // Retrieve all images
  1821. images, err := srv.runtime.Graph().Map()
  1822. if err != nil {
  1823. return nil, err
  1824. }
  1825. // Store the tree in a map of map (map[parentId][childId])
  1826. imageMap := make(map[string]map[string]struct{})
  1827. for _, img := range images {
  1828. if _, exists := imageMap[img.Parent]; !exists {
  1829. imageMap[img.Parent] = make(map[string]struct{})
  1830. }
  1831. imageMap[img.Parent][img.ID] = struct{}{}
  1832. }
  1833. // Loop on the children of the given image and check the config
  1834. var match *image.Image
  1835. for elem := range imageMap[imgID] {
  1836. img, err := srv.runtime.Graph().Get(elem)
  1837. if err != nil {
  1838. return nil, err
  1839. }
  1840. if runconfig.Compare(&img.ContainerConfig, config) {
  1841. if match == nil || match.Created.Before(img.Created) {
  1842. match = img
  1843. }
  1844. }
  1845. }
  1846. return match, nil
  1847. }
  1848. func (srv *Server) RegisterLinks(container *runtime.Container, hostConfig *runconfig.HostConfig) error {
  1849. runtime := srv.runtime
  1850. if hostConfig != nil && hostConfig.Links != nil {
  1851. for _, l := range hostConfig.Links {
  1852. parts, err := utils.PartParser("name:alias", l)
  1853. if err != nil {
  1854. return err
  1855. }
  1856. child, err := srv.runtime.GetByName(parts["name"])
  1857. if err != nil {
  1858. return err
  1859. }
  1860. if child == nil {
  1861. return fmt.Errorf("Could not get container for %s", parts["name"])
  1862. }
  1863. if err := runtime.RegisterLink(container, child, parts["alias"]); err != nil {
  1864. return err
  1865. }
  1866. }
  1867. // After we load all the links into the runtime
  1868. // set them to nil on the hostconfig
  1869. hostConfig.Links = nil
  1870. if err := container.WriteHostConfig(); err != nil {
  1871. return err
  1872. }
  1873. }
  1874. return nil
  1875. }
  1876. func (srv *Server) ContainerStart(job *engine.Job) engine.Status {
  1877. if len(job.Args) < 1 {
  1878. return job.Errorf("Usage: %s container_id", job.Name)
  1879. }
  1880. var (
  1881. name = job.Args[0]
  1882. runtime = srv.runtime
  1883. container = runtime.Get(name)
  1884. )
  1885. if container == nil {
  1886. return job.Errorf("No such container: %s", name)
  1887. }
  1888. // If no environment was set, then no hostconfig was passed.
  1889. if len(job.Environ()) > 0 {
  1890. hostConfig := runconfig.ContainerHostConfigFromJob(job)
  1891. // Validate the HostConfig binds. Make sure that:
  1892. // 1) the source of a bind mount isn't /
  1893. // The bind mount "/:/foo" isn't allowed.
  1894. // 2) Check that the source exists
  1895. // The source to be bind mounted must exist.
  1896. for _, bind := range hostConfig.Binds {
  1897. splitBind := strings.Split(bind, ":")
  1898. source := splitBind[0]
  1899. // refuse to bind mount "/" to the container
  1900. if source == "/" {
  1901. return job.Errorf("Invalid bind mount '%s' : source can't be '/'", bind)
  1902. }
  1903. // ensure the source exists on the host
  1904. _, err := os.Stat(source)
  1905. if err != nil && os.IsNotExist(err) {
  1906. err = os.MkdirAll(source, 0755)
  1907. if err != nil {
  1908. return job.Errorf("Could not create local directory '%s' for bind mount: %s!", source, err.Error())
  1909. }
  1910. }
  1911. }
  1912. // Register any links from the host config before starting the container
  1913. if err := srv.RegisterLinks(container, hostConfig); err != nil {
  1914. return job.Error(err)
  1915. }
  1916. container.SetHostConfig(hostConfig)
  1917. container.ToDisk()
  1918. }
  1919. if err := container.Start(); err != nil {
  1920. return job.Errorf("Cannot start container %s: %s", name, err)
  1921. }
  1922. srv.LogEvent("start", container.ID, runtime.Repositories().ImageName(container.Image))
  1923. return engine.StatusOK
  1924. }
  1925. func (srv *Server) ContainerStop(job *engine.Job) engine.Status {
  1926. if len(job.Args) != 1 {
  1927. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1928. }
  1929. var (
  1930. name = job.Args[0]
  1931. t = 10
  1932. )
  1933. if job.EnvExists("t") {
  1934. t = job.GetenvInt("t")
  1935. }
  1936. if container := srv.runtime.Get(name); container != nil {
  1937. if err := container.Stop(int(t)); err != nil {
  1938. return job.Errorf("Cannot stop container %s: %s\n", name, err)
  1939. }
  1940. srv.LogEvent("stop", container.ID, srv.runtime.Repositories().ImageName(container.Image))
  1941. } else {
  1942. return job.Errorf("No such container: %s\n", name)
  1943. }
  1944. return engine.StatusOK
  1945. }
  1946. func (srv *Server) ContainerWait(job *engine.Job) engine.Status {
  1947. if len(job.Args) != 1 {
  1948. return job.Errorf("Usage: %s", job.Name)
  1949. }
  1950. name := job.Args[0]
  1951. if container := srv.runtime.Get(name); container != nil {
  1952. status := container.Wait()
  1953. job.Printf("%d\n", status)
  1954. return engine.StatusOK
  1955. }
  1956. return job.Errorf("%s: no such container: %s", job.Name, name)
  1957. }
  1958. func (srv *Server) ContainerResize(job *engine.Job) engine.Status {
  1959. if len(job.Args) != 3 {
  1960. return job.Errorf("Not enough arguments. Usage: %s CONTAINER HEIGHT WIDTH\n", job.Name)
  1961. }
  1962. name := job.Args[0]
  1963. height, err := strconv.Atoi(job.Args[1])
  1964. if err != nil {
  1965. return job.Error(err)
  1966. }
  1967. width, err := strconv.Atoi(job.Args[2])
  1968. if err != nil {
  1969. return job.Error(err)
  1970. }
  1971. if container := srv.runtime.Get(name); container != nil {
  1972. if err := container.Resize(height, width); err != nil {
  1973. return job.Error(err)
  1974. }
  1975. return engine.StatusOK
  1976. }
  1977. return job.Errorf("No such container: %s", name)
  1978. }
  1979. func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
  1980. if len(job.Args) != 1 {
  1981. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1982. }
  1983. var (
  1984. name = job.Args[0]
  1985. logs = job.GetenvBool("logs")
  1986. stream = job.GetenvBool("stream")
  1987. stdin = job.GetenvBool("stdin")
  1988. stdout = job.GetenvBool("stdout")
  1989. stderr = job.GetenvBool("stderr")
  1990. )
  1991. container := srv.runtime.Get(name)
  1992. if container == nil {
  1993. return job.Errorf("No such container: %s", name)
  1994. }
  1995. //logs
  1996. if logs {
  1997. cLog, err := container.ReadLog("json")
  1998. if err != nil && os.IsNotExist(err) {
  1999. // Legacy logs
  2000. utils.Debugf("Old logs format")
  2001. if stdout {
  2002. cLog, err := container.ReadLog("stdout")
  2003. if err != nil {
  2004. utils.Errorf("Error reading logs (stdout): %s", err)
  2005. } else if _, err := io.Copy(job.Stdout, cLog); err != nil {
  2006. utils.Errorf("Error streaming logs (stdout): %s", err)
  2007. }
  2008. }
  2009. if stderr {
  2010. cLog, err := container.ReadLog("stderr")
  2011. if err != nil {
  2012. utils.Errorf("Error reading logs (stderr): %s", err)
  2013. } else if _, err := io.Copy(job.Stderr, cLog); err != nil {
  2014. utils.Errorf("Error streaming logs (stderr): %s", err)
  2015. }
  2016. }
  2017. } else if err != nil {
  2018. utils.Errorf("Error reading logs (json): %s", err)
  2019. } else {
  2020. dec := json.NewDecoder(cLog)
  2021. for {
  2022. l := &utils.JSONLog{}
  2023. if err := dec.Decode(l); err == io.EOF {
  2024. break
  2025. } else if err != nil {
  2026. utils.Errorf("Error streaming logs: %s", err)
  2027. break
  2028. }
  2029. if l.Stream == "stdout" && stdout {
  2030. fmt.Fprintf(job.Stdout, "%s", l.Log)
  2031. }
  2032. if l.Stream == "stderr" && stderr {
  2033. fmt.Fprintf(job.Stderr, "%s", l.Log)
  2034. }
  2035. }
  2036. }
  2037. }
  2038. //stream
  2039. if stream {
  2040. if container.State.IsGhost() {
  2041. return job.Errorf("Impossible to attach to a ghost container")
  2042. }
  2043. var (
  2044. cStdin io.ReadCloser
  2045. cStdout, cStderr io.Writer
  2046. cStdinCloser io.Closer
  2047. )
  2048. if stdin {
  2049. r, w := io.Pipe()
  2050. go func() {
  2051. defer w.Close()
  2052. defer utils.Debugf("Closing buffered stdin pipe")
  2053. io.Copy(w, job.Stdin)
  2054. }()
  2055. cStdin = r
  2056. cStdinCloser = job.Stdin
  2057. }
  2058. if stdout {
  2059. cStdout = job.Stdout
  2060. }
  2061. if stderr {
  2062. cStderr = job.Stderr
  2063. }
  2064. <-container.Attach(cStdin, cStdinCloser, cStdout, cStderr)
  2065. // If we are in stdinonce mode, wait for the process to end
  2066. // otherwise, simply return
  2067. if container.Config.StdinOnce && !container.Config.Tty {
  2068. container.Wait()
  2069. }
  2070. }
  2071. return engine.StatusOK
  2072. }
  2073. func (srv *Server) ContainerInspect(name string) (*runtime.Container, error) {
  2074. if container := srv.runtime.Get(name); container != nil {
  2075. return container, nil
  2076. }
  2077. return nil, fmt.Errorf("No such container: %s", name)
  2078. }
  2079. func (srv *Server) ImageInspect(name string) (*image.Image, error) {
  2080. if image, err := srv.runtime.Repositories().LookupImage(name); err == nil && image != nil {
  2081. return image, nil
  2082. }
  2083. return nil, fmt.Errorf("No such image: %s", name)
  2084. }
  2085. func (srv *Server) JobInspect(job *engine.Job) engine.Status {
  2086. // TODO: deprecate KIND/conflict
  2087. if n := len(job.Args); n != 2 {
  2088. return job.Errorf("Usage: %s CONTAINER|IMAGE KIND", job.Name)
  2089. }
  2090. var (
  2091. name = job.Args[0]
  2092. kind = job.Args[1]
  2093. object interface{}
  2094. conflict = job.GetenvBool("conflict") //should the job detect conflict between containers and images
  2095. image, errImage = srv.ImageInspect(name)
  2096. container, errContainer = srv.ContainerInspect(name)
  2097. )
  2098. if conflict && image != nil && container != nil {
  2099. return job.Errorf("Conflict between containers and images")
  2100. }
  2101. switch kind {
  2102. case "image":
  2103. if errImage != nil {
  2104. return job.Error(errImage)
  2105. }
  2106. object = image
  2107. case "container":
  2108. if errContainer != nil {
  2109. return job.Error(errContainer)
  2110. }
  2111. object = &struct {
  2112. *runtime.Container
  2113. HostConfig *runconfig.HostConfig
  2114. }{container, container.HostConfig()}
  2115. default:
  2116. return job.Errorf("Unknown kind: %s", kind)
  2117. }
  2118. b, err := json.Marshal(object)
  2119. if err != nil {
  2120. return job.Error(err)
  2121. }
  2122. job.Stdout.Write(b)
  2123. return engine.StatusOK
  2124. }
  2125. func (srv *Server) ContainerCopy(job *engine.Job) engine.Status {
  2126. if len(job.Args) != 2 {
  2127. return job.Errorf("Usage: %s CONTAINER RESOURCE\n", job.Name)
  2128. }
  2129. var (
  2130. name = job.Args[0]
  2131. resource = job.Args[1]
  2132. )
  2133. if container := srv.runtime.Get(name); container != nil {
  2134. data, err := container.Copy(resource)
  2135. if err != nil {
  2136. return job.Error(err)
  2137. }
  2138. defer data.Close()
  2139. if _, err := io.Copy(job.Stdout, data); err != nil {
  2140. return job.Error(err)
  2141. }
  2142. return engine.StatusOK
  2143. }
  2144. return job.Errorf("No such container: %s", name)
  2145. }
  2146. func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) {
  2147. runtime, err := runtime.NewRuntime(config, eng)
  2148. if err != nil {
  2149. return nil, err
  2150. }
  2151. srv := &Server{
  2152. Eng: eng,
  2153. runtime: runtime,
  2154. pullingPool: make(map[string]chan struct{}),
  2155. pushingPool: make(map[string]chan struct{}),
  2156. events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
  2157. listeners: make(map[string]chan utils.JSONMessage),
  2158. running: true,
  2159. }
  2160. runtime.SetServer(srv)
  2161. return srv, nil
  2162. }
  2163. func (srv *Server) HTTPRequestFactory(metaHeaders map[string][]string) *utils.HTTPRequestFactory {
  2164. httpVersion := make([]utils.VersionInfo, 0, 4)
  2165. httpVersion = append(httpVersion, &simpleVersionInfo{"docker", dockerversion.VERSION})
  2166. httpVersion = append(httpVersion, &simpleVersionInfo{"go", goruntime.Version()})
  2167. httpVersion = append(httpVersion, &simpleVersionInfo{"git-commit", dockerversion.GITCOMMIT})
  2168. if kernelVersion, err := utils.GetKernelVersion(); err == nil {
  2169. httpVersion = append(httpVersion, &simpleVersionInfo{"kernel", kernelVersion.String()})
  2170. }
  2171. httpVersion = append(httpVersion, &simpleVersionInfo{"os", goruntime.GOOS})
  2172. httpVersion = append(httpVersion, &simpleVersionInfo{"arch", goruntime.GOARCH})
  2173. ud := utils.NewHTTPUserAgentDecorator(httpVersion...)
  2174. md := &utils.HTTPMetaHeadersDecorator{
  2175. Headers: metaHeaders,
  2176. }
  2177. factory := utils.NewHTTPRequestFactory(ud, md)
  2178. return factory
  2179. }
  2180. func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
  2181. now := time.Now().UTC().Unix()
  2182. jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
  2183. srv.AddEvent(jm)
  2184. for _, c := range srv.listeners {
  2185. select { // non blocking channel
  2186. case c <- jm:
  2187. default:
  2188. }
  2189. }
  2190. return &jm
  2191. }
  2192. func (srv *Server) AddEvent(jm utils.JSONMessage) {
  2193. srv.Lock()
  2194. defer srv.Unlock()
  2195. srv.events = append(srv.events, jm)
  2196. }
  2197. func (srv *Server) GetEvents() []utils.JSONMessage {
  2198. srv.RLock()
  2199. defer srv.RUnlock()
  2200. return srv.events
  2201. }
  2202. func (srv *Server) SetRunning(status bool) {
  2203. srv.Lock()
  2204. defer srv.Unlock()
  2205. srv.running = status
  2206. }
  2207. func (srv *Server) IsRunning() bool {
  2208. srv.RLock()
  2209. defer srv.RUnlock()
  2210. return srv.running
  2211. }
  2212. func (srv *Server) Close() error {
  2213. if srv == nil {
  2214. return nil
  2215. }
  2216. srv.SetRunning(false)
  2217. if srv.runtime == nil {
  2218. return nil
  2219. }
  2220. return srv.runtime.Close()
  2221. }
// Server holds the daemon-side state used by the job handlers in this file:
// the container runtime, the recorded events and their listeners, and the
// running flag.
type Server struct {
	// The embedded lock is taken by AddEvent/GetEvents (events) and by
	// SetRunning/IsRunning (running).
	sync.RWMutex
	// runtime is created in NewServer and closed by Close.
	runtime *runtime.Runtime
	// NOTE(review): presumably tracks in-flight pulls, keyed by name — the
	// code using it is outside this section; confirm.
	pullingPool map[string]chan struct{}
	// NOTE(review): presumably tracks in-flight pushes, keyed by name — the
	// code using it is outside this section; confirm.
	pushingPool map[string]chan struct{}
	// events is the event history appended to by AddEvent.
	events []utils.JSONMessage
	// listeners receive every logged event through a non-blocking send in
	// LogEvent.
	listeners map[string]chan utils.JSONMessage
	// Eng is the engine passed to NewServer.
	Eng *engine.Engine
	// running is set to true by NewServer and to false by Close.
	running bool
}