server.go 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518
  1. // DEPRECATION NOTICE. PLEASE DO NOT ADD ANYTHING TO THIS FILE.
  2. //
  3. // server/server.go is deprecated. We are working on breaking it up into smaller, cleaner
  4. // pieces which will be easier to find and test. This will help make the code less
  5. // redundant and more readable.
  6. //
  7. // Contributors, please don't add anything to server/server.go, unless it has the explicit
  8. // goal of helping the deprecation effort.
  9. //
  10. // Maintainers, please refuse patches which add code to server/server.go.
  11. //
  12. // Instead try the following files:
  13. // * For code related to local image management, try graph/
  14. // * For code related to image downloading, uploading, remote search etc, try registry/
  15. // * For code related to the docker daemon, try daemon/
  16. // * For small utilities which could potentially be useful outside of Docker, try pkg/
  17. // * For miscellaneous "util" functions which are docker-specific, try encapsulating them
  18. // inside one of the subsystems above. If you really think they should be more widely
  19. // available, are you sure you can't remove the docker dependencies and move them to
  20. // pkg? In last resort, you can add them to utils/ (but please try not to).
  21. package server
  22. import (
  23. "bytes"
  24. "encoding/json"
  25. "errors"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "log"
  30. "net"
  31. "net/http"
  32. "net/url"
  33. "os"
  34. "os/exec"
  35. gosignal "os/signal"
  36. "path"
  37. "path/filepath"
  38. "runtime"
  39. "strconv"
  40. "strings"
  41. "sync"
  42. "sync/atomic"
  43. "syscall"
  44. "time"
  45. "github.com/dotcloud/docker/archive"
  46. "github.com/dotcloud/docker/daemon"
  47. "github.com/dotcloud/docker/daemonconfig"
  48. "github.com/dotcloud/docker/dockerversion"
  49. "github.com/dotcloud/docker/engine"
  50. "github.com/dotcloud/docker/graph"
  51. "github.com/dotcloud/docker/image"
  52. "github.com/dotcloud/docker/pkg/graphdb"
  53. "github.com/dotcloud/docker/pkg/signal"
  54. "github.com/dotcloud/docker/pkg/tailfile"
  55. "github.com/dotcloud/docker/registry"
  56. "github.com/dotcloud/docker/runconfig"
  57. "github.com/dotcloud/docker/utils"
  58. "github.com/dotcloud/docker/utils/filters"
  59. )
  60. func (srv *Server) handlerWrap(h engine.Handler) engine.Handler {
  61. return func(job *engine.Job) engine.Status {
  62. if !srv.IsRunning() {
  63. return job.Errorf("Server is not running")
  64. }
  65. srv.tasks.Add(1)
  66. defer srv.tasks.Done()
  67. return h(job)
  68. }
  69. }
  70. // jobInitApi runs the remote api server `srv` as a daemon,
  71. // Only one api server can run at the same time - this is enforced by a pidfile.
  72. // The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
  73. func InitServer(job *engine.Job) engine.Status {
  74. job.Logf("Creating server")
  75. srv, err := NewServer(job.Eng, daemonconfig.ConfigFromJob(job))
  76. if err != nil {
  77. return job.Error(err)
  78. }
  79. if srv.daemon.Config().Pidfile != "" {
  80. job.Logf("Creating pidfile")
  81. if err := utils.CreatePidFile(srv.daemon.Config().Pidfile); err != nil {
  82. // FIXME: do we need fatal here instead of returning a job error?
  83. log.Fatal(err)
  84. }
  85. }
  86. job.Logf("Setting up signal traps")
  87. c := make(chan os.Signal, 1)
  88. gosignal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
  89. go func() {
  90. interruptCount := uint32(0)
  91. for sig := range c {
  92. go func(sig os.Signal) {
  93. log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
  94. switch sig {
  95. case os.Interrupt, syscall.SIGTERM:
  96. // If the user really wants to interrupt, let him do so.
  97. if atomic.LoadUint32(&interruptCount) < 3 {
  98. atomic.AddUint32(&interruptCount, 1)
  99. // Initiate the cleanup only once
  100. if atomic.LoadUint32(&interruptCount) == 1 {
  101. utils.RemovePidFile(srv.daemon.Config().Pidfile)
  102. srv.Close()
  103. } else {
  104. return
  105. }
  106. } else {
  107. log.Printf("Force shutdown of docker, interrupting cleanup\n")
  108. }
  109. case syscall.SIGQUIT:
  110. }
  111. os.Exit(128 + int(sig.(syscall.Signal)))
  112. }(sig)
  113. }
  114. }()
  115. job.Eng.Hack_SetGlobalVar("httpapi.server", srv)
  116. job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon)
  117. for name, handler := range map[string]engine.Handler{
  118. "export": srv.ContainerExport,
  119. "create": srv.ContainerCreate,
  120. "stop": srv.ContainerStop,
  121. "restart": srv.ContainerRestart,
  122. "start": srv.ContainerStart,
  123. "kill": srv.ContainerKill,
  124. "pause": srv.ContainerPause,
  125. "unpause": srv.ContainerUnpause,
  126. "wait": srv.ContainerWait,
  127. "tag": srv.ImageTag, // FIXME merge with "image_tag"
  128. "resize": srv.ContainerResize,
  129. "commit": srv.ContainerCommit,
  130. "info": srv.DockerInfo,
  131. "container_delete": srv.ContainerDestroy,
  132. "image_export": srv.ImageExport,
  133. "images": srv.Images,
  134. "history": srv.ImageHistory,
  135. "viz": srv.ImagesViz,
  136. "container_copy": srv.ContainerCopy,
  137. "attach": srv.ContainerAttach,
  138. "logs": srv.ContainerLogs,
  139. "changes": srv.ContainerChanges,
  140. "top": srv.ContainerTop,
  141. "load": srv.ImageLoad,
  142. "build": srv.Build,
  143. "pull": srv.ImagePull,
  144. "import": srv.ImageImport,
  145. "image_delete": srv.ImageDelete,
  146. "events": srv.Events,
  147. "push": srv.ImagePush,
  148. "containers": srv.Containers,
  149. } {
  150. if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil {
  151. return job.Error(err)
  152. }
  153. }
  154. // Install image-related commands from the image subsystem.
  155. // See `graph/service.go`
  156. if err := srv.daemon.Repositories().Install(job.Eng); err != nil {
  157. return job.Error(err)
  158. }
  159. // Install daemon-related commands from the daemon subsystem.
  160. // See `daemon/`
  161. if err := srv.daemon.Install(job.Eng); err != nil {
  162. return job.Error(err)
  163. }
  164. srv.SetRunning(true)
  165. return engine.StatusOK
  166. }
  167. func (srv *Server) ContainerPause(job *engine.Job) engine.Status {
  168. if len(job.Args) != 1 {
  169. return job.Errorf("Usage: %s CONTAINER", job.Name)
  170. }
  171. name := job.Args[0]
  172. container := srv.daemon.Get(name)
  173. if container == nil {
  174. return job.Errorf("No such container: %s", name)
  175. }
  176. if err := container.Pause(); err != nil {
  177. return job.Errorf("Cannot pause container %s: %s", name, err)
  178. }
  179. return engine.StatusOK
  180. }
  181. func (srv *Server) ContainerUnpause(job *engine.Job) engine.Status {
  182. if n := len(job.Args); n < 1 || n > 2 {
  183. return job.Errorf("Usage: %s CONTAINER", job.Name)
  184. }
  185. name := job.Args[0]
  186. container := srv.daemon.Get(name)
  187. if container == nil {
  188. return job.Errorf("No such container: %s", name)
  189. }
  190. if err := container.Unpause(); err != nil {
  191. return job.Errorf("Cannot unpause container %s: %s", name, err)
  192. }
  193. return engine.StatusOK
  194. }
  195. // ContainerKill send signal to the container
  196. // If no signal is given (sig 0), then Kill with SIGKILL and wait
  197. // for the container to exit.
  198. // If a signal is given, then just send it to the container and return.
  199. func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
  200. if n := len(job.Args); n < 1 || n > 2 {
  201. return job.Errorf("Usage: %s CONTAINER [SIGNAL]", job.Name)
  202. }
  203. var (
  204. name = job.Args[0]
  205. sig uint64
  206. err error
  207. )
  208. // If we have a signal, look at it. Otherwise, do nothing
  209. if len(job.Args) == 2 && job.Args[1] != "" {
  210. // Check if we passed the signal as a number:
  211. // The largest legal signal is 31, so let's parse on 5 bits
  212. sig, err = strconv.ParseUint(job.Args[1], 10, 5)
  213. if err != nil {
  214. // The signal is not a number, treat it as a string (either like "KILL" or like "SIGKILL")
  215. sig = uint64(signal.SignalMap[strings.TrimPrefix(job.Args[1], "SIG")])
  216. if sig == 0 {
  217. return job.Errorf("Invalid signal: %s", job.Args[1])
  218. }
  219. }
  220. }
  221. if container := srv.daemon.Get(name); container != nil {
  222. // If no signal is passed, or SIGKILL, perform regular Kill (SIGKILL + wait())
  223. if sig == 0 || syscall.Signal(sig) == syscall.SIGKILL {
  224. if err := container.Kill(); err != nil {
  225. return job.Errorf("Cannot kill container %s: %s", name, err)
  226. }
  227. srv.LogEvent("kill", container.ID, srv.daemon.Repositories().ImageName(container.Image))
  228. } else {
  229. // Otherwise, just send the requested signal
  230. if err := container.KillSig(int(sig)); err != nil {
  231. return job.Errorf("Cannot kill container %s: %s", name, err)
  232. }
  233. // FIXME: Add event for signals
  234. }
  235. } else {
  236. return job.Errorf("No such container: %s", name)
  237. }
  238. return engine.StatusOK
  239. }
// Events streams daemon events to the job's stdout as JSON messages.
// Env vars:
//   since - unix timestamp; if non-zero, replay buffered events from then on.
//   until - unix timestamp; if non-zero, stop streaming at that time.
// The stream runs until `until` elapses or the publisher closes the listener.
func (srv *Server) Events(job *engine.Job) engine.Status {
	if len(job.Args) != 0 {
		return job.Errorf("Usage: %s", job.Name)
	}
	var (
		since   = job.GetenvInt64("since")
		until   = job.GetenvInt64("until")
		timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
	)
	// If no until, disable timeout
	if until == 0 {
		timeout.Stop()
	}
	listener := make(chan utils.JSONMessage)
	srv.eventPublisher.Subscribe(listener)
	defer srv.eventPublisher.Unsubscribe(listener)
	// When sending an event JSON serialization errors are ignored, but all
	// other errors lead to the eviction of the listener.
	sendEvent := func(event *utils.JSONMessage) error {
		if b, err := json.Marshal(event); err == nil {
			if _, err = job.Stdout.Write(b); err != nil {
				return err
			}
		}
		return nil
	}
	// Empty write up front — presumably to flush headers so the client sees
	// the stream open immediately; confirm against the HTTP API layer.
	job.Stdout.Write(nil)
	// Resend every event in the [since, until] time interval.
	if since != 0 {
		for _, event := range srv.GetEvents() {
			if event.Time >= since && (event.Time <= until || until == 0) {
				if err := sendEvent(&event); err != nil {
					return job.Error(err)
				}
			}
		}
	}
	// Live phase: forward new events until the listener closes or the
	// `until` deadline fires.
	for {
		select {
		case event, ok := <-listener:
			if !ok {
				return engine.StatusOK
			}
			if err := sendEvent(&event); err != nil {
				return job.Error(err)
			}
		case <-timeout.C:
			return engine.StatusOK
		}
	}
}
  291. func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
  292. if len(job.Args) != 1 {
  293. return job.Errorf("Usage: %s container_id", job.Name)
  294. }
  295. name := job.Args[0]
  296. if container := srv.daemon.Get(name); container != nil {
  297. data, err := container.Export()
  298. if err != nil {
  299. return job.Errorf("%s: %s", name, err)
  300. }
  301. defer data.Close()
  302. // Stream the entire contents of the container (basically a volatile snapshot)
  303. if _, err := io.Copy(job.Stdout, data); err != nil {
  304. return job.Errorf("%s: %s", name, err)
  305. }
  306. // FIXME: factor job-specific LogEvent to engine.Job.Run()
  307. srv.LogEvent("export", container.ID, srv.daemon.Repositories().ImageName(container.Image))
  308. return engine.StatusOK
  309. }
  310. return job.Errorf("No such container: %s", name)
  311. }
  312. // ImageExport exports all images with the given tag. All versions
  313. // containing the same tag are exported. The resulting output is an
  314. // uncompressed tar ball.
  315. // name is the set of tags to export.
  316. // out is the writer where the images are written to.
  317. func (srv *Server) ImageExport(job *engine.Job) engine.Status {
  318. if len(job.Args) != 1 {
  319. return job.Errorf("Usage: %s IMAGE\n", job.Name)
  320. }
  321. name := job.Args[0]
  322. // get image json
  323. tempdir, err := ioutil.TempDir("", "docker-export-")
  324. if err != nil {
  325. return job.Error(err)
  326. }
  327. defer os.RemoveAll(tempdir)
  328. utils.Debugf("Serializing %s", name)
  329. rootRepoMap := map[string]graph.Repository{}
  330. rootRepo, err := srv.daemon.Repositories().Get(name)
  331. if err != nil {
  332. return job.Error(err)
  333. }
  334. if rootRepo != nil {
  335. // this is a base repo name, like 'busybox'
  336. for _, id := range rootRepo {
  337. if err := srv.exportImage(job.Eng, id, tempdir); err != nil {
  338. return job.Error(err)
  339. }
  340. }
  341. rootRepoMap[name] = rootRepo
  342. } else {
  343. img, err := srv.daemon.Repositories().LookupImage(name)
  344. if err != nil {
  345. return job.Error(err)
  346. }
  347. if img != nil {
  348. // This is a named image like 'busybox:latest'
  349. repoName, repoTag := utils.ParseRepositoryTag(name)
  350. if err := srv.exportImage(job.Eng, img.ID, tempdir); err != nil {
  351. return job.Error(err)
  352. }
  353. // check this length, because a lookup of a truncated has will not have a tag
  354. // and will not need to be added to this map
  355. if len(repoTag) > 0 {
  356. rootRepoMap[repoName] = graph.Repository{repoTag: img.ID}
  357. }
  358. } else {
  359. // this must be an ID that didn't get looked up just right?
  360. if err := srv.exportImage(job.Eng, name, tempdir); err != nil {
  361. return job.Error(err)
  362. }
  363. }
  364. }
  365. // write repositories, if there is something to write
  366. if len(rootRepoMap) > 0 {
  367. rootRepoJson, _ := json.Marshal(rootRepoMap)
  368. if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.FileMode(0644)); err != nil {
  369. return job.Error(err)
  370. }
  371. } else {
  372. utils.Debugf("There were no repositories to write")
  373. }
  374. fs, err := archive.Tar(tempdir, archive.Uncompressed)
  375. if err != nil {
  376. return job.Error(err)
  377. }
  378. defer fs.Close()
  379. if _, err := io.Copy(job.Stdout, fs); err != nil {
  380. return job.Error(err)
  381. }
  382. utils.Debugf("End Serializing %s", name)
  383. return engine.StatusOK
  384. }
  385. func (srv *Server) exportImage(eng *engine.Engine, name, tempdir string) error {
  386. for n := name; n != ""; {
  387. // temporary directory
  388. tmpImageDir := path.Join(tempdir, n)
  389. if err := os.Mkdir(tmpImageDir, os.FileMode(0755)); err != nil {
  390. if os.IsExist(err) {
  391. return nil
  392. }
  393. return err
  394. }
  395. var version = "1.0"
  396. var versionBuf = []byte(version)
  397. if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.FileMode(0644)); err != nil {
  398. return err
  399. }
  400. // serialize json
  401. json, err := os.Create(path.Join(tmpImageDir, "json"))
  402. if err != nil {
  403. return err
  404. }
  405. job := eng.Job("image_inspect", n)
  406. job.SetenvBool("raw", true)
  407. job.Stdout.Add(json)
  408. if err := job.Run(); err != nil {
  409. return err
  410. }
  411. // serialize filesystem
  412. fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar"))
  413. if err != nil {
  414. return err
  415. }
  416. job = eng.Job("image_tarlayer", n)
  417. job.Stdout.Add(fsTar)
  418. if err := job.Run(); err != nil {
  419. return err
  420. }
  421. // find parent
  422. job = eng.Job("image_get", n)
  423. info, _ := job.Stdout.AddEnv()
  424. if err := job.Run(); err != nil {
  425. return err
  426. }
  427. n = info.Get("Parent")
  428. }
  429. return nil
  430. }
  431. func (srv *Server) Build(job *engine.Job) engine.Status {
  432. if len(job.Args) != 0 {
  433. return job.Errorf("Usage: %s\n", job.Name)
  434. }
  435. var (
  436. remoteURL = job.Getenv("remote")
  437. repoName = job.Getenv("t")
  438. suppressOutput = job.GetenvBool("q")
  439. noCache = job.GetenvBool("nocache")
  440. rm = job.GetenvBool("rm")
  441. forceRm = job.GetenvBool("forcerm")
  442. authConfig = &registry.AuthConfig{}
  443. configFile = &registry.ConfigFile{}
  444. tag string
  445. context io.ReadCloser
  446. )
  447. job.GetenvJson("authConfig", authConfig)
  448. job.GetenvJson("configFile", configFile)
  449. repoName, tag = utils.ParseRepositoryTag(repoName)
  450. if remoteURL == "" {
  451. context = ioutil.NopCloser(job.Stdin)
  452. } else if utils.IsGIT(remoteURL) {
  453. if !strings.HasPrefix(remoteURL, "git://") {
  454. remoteURL = "https://" + remoteURL
  455. }
  456. root, err := ioutil.TempDir("", "docker-build-git")
  457. if err != nil {
  458. return job.Error(err)
  459. }
  460. defer os.RemoveAll(root)
  461. if output, err := exec.Command("git", "clone", "--recursive", remoteURL, root).CombinedOutput(); err != nil {
  462. return job.Errorf("Error trying to use git: %s (%s)", err, output)
  463. }
  464. c, err := archive.Tar(root, archive.Uncompressed)
  465. if err != nil {
  466. return job.Error(err)
  467. }
  468. context = c
  469. } else if utils.IsURL(remoteURL) {
  470. f, err := utils.Download(remoteURL)
  471. if err != nil {
  472. return job.Error(err)
  473. }
  474. defer f.Body.Close()
  475. dockerFile, err := ioutil.ReadAll(f.Body)
  476. if err != nil {
  477. return job.Error(err)
  478. }
  479. c, err := archive.Generate("Dockerfile", string(dockerFile))
  480. if err != nil {
  481. return job.Error(err)
  482. }
  483. context = c
  484. }
  485. defer context.Close()
  486. sf := utils.NewStreamFormatter(job.GetenvBool("json"))
  487. b := NewBuildFile(srv,
  488. &utils.StdoutFormater{
  489. Writer: job.Stdout,
  490. StreamFormatter: sf,
  491. },
  492. &utils.StderrFormater{
  493. Writer: job.Stdout,
  494. StreamFormatter: sf,
  495. },
  496. !suppressOutput, !noCache, rm, forceRm, job.Stdout, sf, authConfig, configFile)
  497. id, err := b.Build(context)
  498. if err != nil {
  499. return job.Error(err)
  500. }
  501. if repoName != "" {
  502. srv.daemon.Repositories().Set(repoName, tag, id, false)
  503. }
  504. return engine.StatusOK
  505. }
  506. // Loads a set of images into the repository. This is the complementary of ImageExport.
  507. // The input stream is an uncompressed tar ball containing images and metadata.
  508. func (srv *Server) ImageLoad(job *engine.Job) engine.Status {
  509. tmpImageDir, err := ioutil.TempDir("", "docker-import-")
  510. if err != nil {
  511. return job.Error(err)
  512. }
  513. defer os.RemoveAll(tmpImageDir)
  514. var (
  515. repoTarFile = path.Join(tmpImageDir, "repo.tar")
  516. repoDir = path.Join(tmpImageDir, "repo")
  517. )
  518. tarFile, err := os.Create(repoTarFile)
  519. if err != nil {
  520. return job.Error(err)
  521. }
  522. if _, err := io.Copy(tarFile, job.Stdin); err != nil {
  523. return job.Error(err)
  524. }
  525. tarFile.Close()
  526. repoFile, err := os.Open(repoTarFile)
  527. if err != nil {
  528. return job.Error(err)
  529. }
  530. if err := os.Mkdir(repoDir, os.ModeDir); err != nil {
  531. return job.Error(err)
  532. }
  533. if err := archive.Untar(repoFile, repoDir, nil); err != nil {
  534. return job.Error(err)
  535. }
  536. dirs, err := ioutil.ReadDir(repoDir)
  537. if err != nil {
  538. return job.Error(err)
  539. }
  540. for _, d := range dirs {
  541. if d.IsDir() {
  542. if err := srv.recursiveLoad(job.Eng, d.Name(), tmpImageDir); err != nil {
  543. return job.Error(err)
  544. }
  545. }
  546. }
  547. repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories"))
  548. if err == nil {
  549. repositories := map[string]graph.Repository{}
  550. if err := json.Unmarshal(repositoriesJson, &repositories); err != nil {
  551. return job.Error(err)
  552. }
  553. for imageName, tagMap := range repositories {
  554. for tag, address := range tagMap {
  555. if err := srv.daemon.Repositories().Set(imageName, tag, address, true); err != nil {
  556. return job.Error(err)
  557. }
  558. }
  559. }
  560. } else if !os.IsNotExist(err) {
  561. return job.Error(err)
  562. }
  563. return engine.StatusOK
  564. }
  565. func (srv *Server) recursiveLoad(eng *engine.Engine, address, tmpImageDir string) error {
  566. if err := eng.Job("image_get", address).Run(); err != nil {
  567. utils.Debugf("Loading %s", address)
  568. imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json"))
  569. if err != nil {
  570. utils.Debugf("Error reading json", err)
  571. return err
  572. }
  573. layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar"))
  574. if err != nil {
  575. utils.Debugf("Error reading embedded tar", err)
  576. return err
  577. }
  578. img, err := image.NewImgJSON(imageJson)
  579. if err != nil {
  580. utils.Debugf("Error unmarshalling json", err)
  581. return err
  582. }
  583. if img.Parent != "" {
  584. if !srv.daemon.Graph().Exists(img.Parent) {
  585. if err := srv.recursiveLoad(eng, img.Parent, tmpImageDir); err != nil {
  586. return err
  587. }
  588. }
  589. }
  590. if err := srv.daemon.Graph().Register(imageJson, layer, img); err != nil {
  591. return err
  592. }
  593. }
  594. utils.Debugf("Completed processing %s", address)
  595. return nil
  596. }
  597. func (srv *Server) ImagesViz(job *engine.Job) engine.Status {
  598. images, _ := srv.daemon.Graph().Map()
  599. if images == nil {
  600. return engine.StatusOK
  601. }
  602. job.Stdout.Write([]byte("digraph docker {\n"))
  603. var (
  604. parentImage *image.Image
  605. err error
  606. )
  607. for _, image := range images {
  608. parentImage, err = image.GetParent()
  609. if err != nil {
  610. return job.Errorf("Error while getting parent image: %v", err)
  611. }
  612. if parentImage != nil {
  613. job.Stdout.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n"))
  614. } else {
  615. job.Stdout.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n"))
  616. }
  617. }
  618. for id, repos := range srv.daemon.Repositories().GetRepoRefs() {
  619. job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n"))
  620. }
  621. job.Stdout.Write([]byte(" base [style=invisible]\n}\n"))
  622. return engine.StatusOK
  623. }
  624. func (srv *Server) Images(job *engine.Job) engine.Status {
  625. var (
  626. allImages map[string]*image.Image
  627. err error
  628. filt_tagged = true
  629. )
  630. imageFilters, err := filters.FromParam(job.Getenv("filters"))
  631. if err != nil {
  632. return job.Error(err)
  633. }
  634. if i, ok := imageFilters["dangling"]; ok {
  635. for _, value := range i {
  636. if strings.ToLower(value) == "true" {
  637. filt_tagged = false
  638. }
  639. }
  640. }
  641. if job.GetenvBool("all") && filt_tagged {
  642. allImages, err = srv.daemon.Graph().Map()
  643. } else {
  644. allImages, err = srv.daemon.Graph().Heads()
  645. }
  646. if err != nil {
  647. return job.Error(err)
  648. }
  649. lookup := make(map[string]*engine.Env)
  650. srv.daemon.Repositories().Lock()
  651. for name, repository := range srv.daemon.Repositories().Repositories {
  652. if job.Getenv("filter") != "" {
  653. if match, _ := path.Match(job.Getenv("filter"), name); !match {
  654. continue
  655. }
  656. }
  657. for tag, id := range repository {
  658. image, err := srv.daemon.Graph().Get(id)
  659. if err != nil {
  660. log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
  661. continue
  662. }
  663. if out, exists := lookup[id]; exists {
  664. if filt_tagged {
  665. out.SetList("RepoTags", append(out.GetList("RepoTags"), fmt.Sprintf("%s:%s", name, tag)))
  666. }
  667. } else {
  668. // get the boolean list for if only the untagged images are requested
  669. delete(allImages, id)
  670. if filt_tagged {
  671. out := &engine.Env{}
  672. out.Set("ParentId", image.Parent)
  673. out.SetList("RepoTags", []string{fmt.Sprintf("%s:%s", name, tag)})
  674. out.Set("Id", image.ID)
  675. out.SetInt64("Created", image.Created.Unix())
  676. out.SetInt64("Size", image.Size)
  677. out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
  678. lookup[id] = out
  679. }
  680. }
  681. }
  682. }
  683. srv.daemon.Repositories().Unlock()
  684. outs := engine.NewTable("Created", len(lookup))
  685. for _, value := range lookup {
  686. outs.Add(value)
  687. }
  688. // Display images which aren't part of a repository/tag
  689. if job.Getenv("filter") == "" {
  690. for _, image := range allImages {
  691. out := &engine.Env{}
  692. out.Set("ParentId", image.Parent)
  693. out.SetList("RepoTags", []string{"<none>:<none>"})
  694. out.Set("Id", image.ID)
  695. out.SetInt64("Created", image.Created.Unix())
  696. out.SetInt64("Size", image.Size)
  697. out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
  698. outs.Add(out)
  699. }
  700. }
  701. outs.ReverseSort()
  702. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  703. return job.Error(err)
  704. }
  705. return engine.StatusOK
  706. }
  707. func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
  708. images, _ := srv.daemon.Graph().Map()
  709. var imgcount int
  710. if images == nil {
  711. imgcount = 0
  712. } else {
  713. imgcount = len(images)
  714. }
  715. kernelVersion := "<unknown>"
  716. if kv, err := utils.GetKernelVersion(); err == nil {
  717. kernelVersion = kv.String()
  718. }
  719. // if we still have the original dockerinit binary from before we copied it locally, let's return the path to that, since that's more intuitive (the copied path is trivial to derive by hand given VERSION)
  720. initPath := utils.DockerInitPath("")
  721. if initPath == "" {
  722. // if that fails, we'll just return the path from the daemon
  723. initPath = srv.daemon.SystemInitPath()
  724. }
  725. v := &engine.Env{}
  726. v.SetInt("Containers", len(srv.daemon.List()))
  727. v.SetInt("Images", imgcount)
  728. v.Set("Driver", srv.daemon.GraphDriver().String())
  729. v.SetJson("DriverStatus", srv.daemon.GraphDriver().Status())
  730. v.SetBool("MemoryLimit", srv.daemon.SystemConfig().MemoryLimit)
  731. v.SetBool("SwapLimit", srv.daemon.SystemConfig().SwapLimit)
  732. v.SetBool("IPv4Forwarding", !srv.daemon.SystemConfig().IPv4ForwardingDisabled)
  733. v.SetBool("Debug", os.Getenv("DEBUG") != "")
  734. v.SetInt("NFd", utils.GetTotalUsedFds())
  735. v.SetInt("NGoroutines", runtime.NumGoroutine())
  736. v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name())
  737. v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount())
  738. v.Set("KernelVersion", kernelVersion)
  739. v.Set("IndexServerAddress", registry.IndexServerAddress())
  740. v.Set("InitSha1", dockerversion.INITSHA1)
  741. v.Set("InitPath", initPath)
  742. v.SetList("Sockets", srv.daemon.Sockets)
  743. if _, err := v.WriteTo(job.Stdout); err != nil {
  744. return job.Error(err)
  745. }
  746. return engine.StatusOK
  747. }
  748. func (srv *Server) ImageHistory(job *engine.Job) engine.Status {
  749. if n := len(job.Args); n != 1 {
  750. return job.Errorf("Usage: %s IMAGE", job.Name)
  751. }
  752. name := job.Args[0]
  753. foundImage, err := srv.daemon.Repositories().LookupImage(name)
  754. if err != nil {
  755. return job.Error(err)
  756. }
  757. lookupMap := make(map[string][]string)
  758. for name, repository := range srv.daemon.Repositories().Repositories {
  759. for tag, id := range repository {
  760. // If the ID already has a reverse lookup, do not update it unless for "latest"
  761. if _, exists := lookupMap[id]; !exists {
  762. lookupMap[id] = []string{}
  763. }
  764. lookupMap[id] = append(lookupMap[id], name+":"+tag)
  765. }
  766. }
  767. outs := engine.NewTable("Created", 0)
  768. err = foundImage.WalkHistory(func(img *image.Image) error {
  769. out := &engine.Env{}
  770. out.Set("Id", img.ID)
  771. out.SetInt64("Created", img.Created.Unix())
  772. out.Set("CreatedBy", strings.Join(img.ContainerConfig.Cmd, " "))
  773. out.SetList("Tags", lookupMap[img.ID])
  774. out.SetInt64("Size", img.Size)
  775. outs.Add(out)
  776. return nil
  777. })
  778. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  779. return job.Error(err)
  780. }
  781. return engine.StatusOK
  782. }
  783. func (srv *Server) ContainerTop(job *engine.Job) engine.Status {
  784. if len(job.Args) != 1 && len(job.Args) != 2 {
  785. return job.Errorf("Not enough arguments. Usage: %s CONTAINER [PS_ARGS]\n", job.Name)
  786. }
  787. var (
  788. name = job.Args[0]
  789. psArgs = "-ef"
  790. )
  791. if len(job.Args) == 2 && job.Args[1] != "" {
  792. psArgs = job.Args[1]
  793. }
  794. if container := srv.daemon.Get(name); container != nil {
  795. if !container.State.IsRunning() {
  796. return job.Errorf("Container %s is not running", name)
  797. }
  798. pids, err := srv.daemon.ExecutionDriver().GetPidsForContainer(container.ID)
  799. if err != nil {
  800. return job.Error(err)
  801. }
  802. output, err := exec.Command("ps", psArgs).Output()
  803. if err != nil {
  804. return job.Errorf("Error running ps: %s", err)
  805. }
  806. lines := strings.Split(string(output), "\n")
  807. header := strings.Fields(lines[0])
  808. out := &engine.Env{}
  809. out.SetList("Titles", header)
  810. pidIndex := -1
  811. for i, name := range header {
  812. if name == "PID" {
  813. pidIndex = i
  814. }
  815. }
  816. if pidIndex == -1 {
  817. return job.Errorf("Couldn't find PID field in ps output")
  818. }
  819. processes := [][]string{}
  820. for _, line := range lines[1:] {
  821. if len(line) == 0 {
  822. continue
  823. }
  824. fields := strings.Fields(line)
  825. p, err := strconv.Atoi(fields[pidIndex])
  826. if err != nil {
  827. return job.Errorf("Unexpected pid '%s': %s", fields[pidIndex], err)
  828. }
  829. for _, pid := range pids {
  830. if pid == p {
  831. // Make sure number of fields equals number of header titles
  832. // merging "overhanging" fields
  833. process := fields[:len(header)-1]
  834. process = append(process, strings.Join(fields[len(header)-1:], " "))
  835. processes = append(processes, process)
  836. }
  837. }
  838. }
  839. out.SetJson("Processes", processes)
  840. out.WriteTo(job.Stdout)
  841. return engine.StatusOK
  842. }
  843. return job.Errorf("No such container: %s", name)
  844. }
  845. func (srv *Server) ContainerChanges(job *engine.Job) engine.Status {
  846. if n := len(job.Args); n != 1 {
  847. return job.Errorf("Usage: %s CONTAINER", job.Name)
  848. }
  849. name := job.Args[0]
  850. if container := srv.daemon.Get(name); container != nil {
  851. outs := engine.NewTable("", 0)
  852. changes, err := container.Changes()
  853. if err != nil {
  854. return job.Error(err)
  855. }
  856. for _, change := range changes {
  857. out := &engine.Env{}
  858. if err := out.Import(change); err != nil {
  859. return job.Error(err)
  860. }
  861. outs.Add(out)
  862. }
  863. if _, err := outs.WriteListTo(job.Stdout); err != nil {
  864. return job.Error(err)
  865. }
  866. } else {
  867. return job.Errorf("No such container: %s", name)
  868. }
  869. return engine.StatusOK
  870. }
// Containers writes the container listing ("docker ps") to job.Stdout as
// a table.
//
// Environment knobs: "all" includes stopped containers, "limit" caps the
// number of rows, "since"/"before" bound the listing by container
// name/id, and "size" adds SizeRw/SizeRootFs columns.
func (srv *Server) Containers(job *engine.Job) engine.Status {
	var (
		foundBefore bool
		displayed   int
		all         = job.GetenvBool("all")
		since       = job.Getenv("since")
		before      = job.Getenv("before")
		n           = job.GetenvInt("limit")
		size        = job.GetenvBool("size")
	)
	outs := engine.NewTable("Created", 0)
	// Collect every name (path in the link graph) pointing at each
	// container id.
	names := map[string][]string{}
	srv.daemon.ContainerGraph().Walk("/", func(p string, e *graphdb.Entity) error {
		names[e.ID()] = append(names[e.ID()], p)
		return nil
	}, -1)
	// Resolve the "before"/"since" boundary containers up front so a bad
	// reference fails fast.
	var beforeCont, sinceCont *daemon.Container
	if before != "" {
		beforeCont = srv.daemon.Get(before)
		if beforeCont == nil {
			return job.Error(fmt.Errorf("Could not find container with name or id %s", before))
		}
	}
	if since != "" {
		sinceCont = srv.daemon.Get(since)
		if sinceCont == nil {
			return job.Error(fmt.Errorf("Could not find container with name or id %s", since))
		}
	}
	// Sentinel used to stop the iteration early without reporting an
	// actual error.
	errLast := errors.New("last container")
	// writeCont appends one table row for container, honoring the filter
	// state above; it returns errLast once the listing is complete.
	writeCont := func(container *daemon.Container) error {
		container.Lock()
		defer container.Unlock()
		// Skip stopped containers unless the caller asked for them,
		// either explicitly ("all") or implicitly (limit/since/before).
		if !container.State.IsRunning() && !all && n <= 0 && since == "" && before == "" {
			return nil
		}
		// Skip everything up to and including the "before" container.
		if before != "" && !foundBefore {
			if container.ID == beforeCont.ID {
				foundBefore = true
			}
			return nil
		}
		// Stop once "limit" rows have been emitted.
		if n > 0 && displayed == n {
			return errLast
		}
		// Stop once the "since" container is reached.
		if since != "" {
			if container.ID == sinceCont.ID {
				return errLast
			}
		}
		displayed++
		out := &engine.Env{}
		out.Set("Id", container.ID)
		out.SetList("Names", names[container.ID])
		out.Set("Image", srv.daemon.Repositories().ImageName(container.Image))
		if len(container.Args) > 0 {
			// Quote arguments containing spaces so the rendered command
			// line stays unambiguous.
			args := []string{}
			for _, arg := range container.Args {
				if strings.Contains(arg, " ") {
					args = append(args, fmt.Sprintf("'%s'", arg))
				} else {
					args = append(args, arg)
				}
			}
			argsAsString := strings.Join(args, " ")
			out.Set("Command", fmt.Sprintf("\"%s %s\"", container.Path, argsAsString))
		} else {
			out.Set("Command", fmt.Sprintf("\"%s\"", container.Path))
		}
		out.SetInt64("Created", container.Created.Unix())
		out.Set("Status", container.State.String())
		str, err := container.NetworkSettings.PortMappingAPI().ToListString()
		if err != nil {
			return err
		}
		out.Set("Ports", str)
		if size {
			sizeRw, sizeRootFs := container.GetSize()
			out.SetInt64("SizeRw", sizeRw)
			out.SetInt64("SizeRootFs", sizeRootFs)
		}
		outs.Add(out)
		return nil
	}
	for _, container := range srv.daemon.List() {
		if err := writeCont(container); err != nil {
			// errLast just means "done"; anything else is a real error.
			if err != errLast {
				return job.Error(err)
			}
			break
		}
	}
	outs.ReverseSort()
	if _, err := outs.WriteListTo(job.Stdout); err != nil {
		return job.Error(err)
	}
	return engine.StatusOK
}
  969. func (srv *Server) ContainerCommit(job *engine.Job) engine.Status {
  970. if len(job.Args) != 1 {
  971. return job.Errorf("Not enough arguments. Usage: %s CONTAINER\n", job.Name)
  972. }
  973. name := job.Args[0]
  974. container := srv.daemon.Get(name)
  975. if container == nil {
  976. return job.Errorf("No such container: %s", name)
  977. }
  978. var (
  979. config = container.Config
  980. newConfig runconfig.Config
  981. )
  982. if err := job.GetenvJson("config", &newConfig); err != nil {
  983. return job.Error(err)
  984. }
  985. if err := runconfig.Merge(&newConfig, config); err != nil {
  986. return job.Error(err)
  987. }
  988. img, err := srv.daemon.Commit(container, job.Getenv("repo"), job.Getenv("tag"), job.Getenv("comment"), job.Getenv("author"), job.GetenvBool("pause"), &newConfig)
  989. if err != nil {
  990. return job.Error(err)
  991. }
  992. job.Printf("%s\n", img.ID)
  993. return engine.StatusOK
  994. }
  995. func (srv *Server) ImageTag(job *engine.Job) engine.Status {
  996. if len(job.Args) != 2 && len(job.Args) != 3 {
  997. return job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name)
  998. }
  999. var tag string
  1000. if len(job.Args) == 3 {
  1001. tag = job.Args[2]
  1002. }
  1003. if err := srv.daemon.Repositories().Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil {
  1004. return job.Error(err)
  1005. }
  1006. return engine.StatusOK
  1007. }
// pullImage downloads image imgID and all of its ancestors from the given
// registry endpoint, registering each missing layer in the local graph.
// Progress is streamed to out via sf.
//
// NOTE(review): the defers for poolRemove and layer.Close below sit
// inside loops, so they only fire when pullImage returns — pool entries
// and layer readers for every ancestor are held open until the whole
// chain finishes. Consider extracting the per-layer body into a helper
// so each defer runs per iteration.
func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
	history, err := r.GetRemoteHistory(imgID, endpoint, token)
	if err != nil {
		return err
	}
	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
	// FIXME: Try to stream the images?
	// FIXME: Launch the getRemoteImage() in goroutines
	// Walk the history from the oldest ancestor down to imgID itself so
	// parents are registered before their children.
	for i := len(history) - 1; i >= 0; i-- {
		id := history[i]
		// ensure no two downloads of the same layer happen at the same time
		if c, err := srv.poolAdd("pull", "layer:"+id); err != nil {
			utils.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
			<-c
		}
		defer srv.poolRemove("pull", "layer:"+id)
		if !srv.daemon.Graph().Exists(id) {
			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
			var (
				imgJSON []byte
				imgSize int
				err     error
				img     *image.Image
			)
			// Fetch and parse the image json, retrying failures with a
			// linear backoff (j * 500ms); the final attempt's error is
			// returned to the caller.
			retries := 5
			for j := 1; j <= retries; j++ {
				imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
				if err != nil && j == retries {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
					return err
				} else if err != nil {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				}
				img, err = image.NewImgJSON(imgJSON)
				if err != nil && j == retries {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
					return fmt.Errorf("Failed to parse json: %s", err)
				} else if err != nil {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				} else {
					break
				}
			}
			// Fetch and register the layer itself; only network
			// timeouts are retried, any other error aborts the pull.
			for j := 1; j <= retries; j++ {
				// Get the layer
				status := "Pulling fs layer"
				if j > 1 {
					status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
				}
				out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
				layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
				// Unwrap url.Error so the timeout check below can see
				// the underlying net.Error.
				if uerr, ok := err.(*url.Error); ok {
					err = uerr.Err
				}
				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				} else if err != nil {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
					return err
				}
				defer layer.Close()
				err = srv.daemon.Graph().Register(imgJSON,
					utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"),
					img)
				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				} else if err != nil {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
					return err
				} else {
					break
				}
			}
		}
		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
	}
	return nil
}
// pullRepository pulls every tag (or just askedTag) of remoteName from
// the registry and records the results under localName in the local tag
// store. When parallel is true each image is downloaded in its own
// goroutine and completion is collected over the errors channel.
//
// NOTE(review): in sequential mode (parallel == false) a failed image
// download is written to the output stream but never returned to the
// caller — only parallel mode propagates the error. Confirm whether this
// asymmetry is intentional.
func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
	out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
	repoData, err := r.GetRepositoryData(remoteName)
	if err != nil {
		return err
	}
	utils.Debugf("Retrieving the tag list")
	tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
	if err != nil {
		utils.Errorf("%v", err)
		return err
	}
	// Ensure every tagged image has an entry in the image list.
	for tag, id := range tagsList {
		repoData.ImgList[id] = &registry.ImgData{
			ID:       id,
			Tag:      tag,
			Checksum: "",
		}
	}
	utils.Debugf("Registering tags")
	// If no tag has been specified, pull them all
	if askedTag == "" {
		for tag, id := range tagsList {
			repoData.ImgList[id].Tag = tag
		}
	} else {
		// Otherwise, check that the tag exists and use only that one
		id, exists := tagsList[askedTag]
		if !exists {
			return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
		}
		repoData.ImgList[id].Tag = askedTag
	}
	errors := make(chan error)
	for _, image := range repoData.ImgList {
		// downloadImage fetches one image, trying each endpoint in
		// order. In parallel mode it sends exactly one value on the
		// errors channel on every exit path so the collector below can
		// count one result per image.
		downloadImage := func(img *registry.ImgData) {
			if askedTag != "" && img.Tag != askedTag {
				utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
				if parallel {
					errors <- nil
				}
				return
			}
			if img.Tag == "" {
				utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
				if parallel {
					errors <- nil
				}
				return
			}
			// ensure no two downloads of the same image happen at the same time
			if c, err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
				if c != nil {
					// Another client is pulling this image: wait for it
					// to finish instead of downloading again.
					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
					<-c
					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
				} else {
					utils.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
				}
				if parallel {
					errors <- nil
				}
				return
			}
			defer srv.poolRemove("pull", "img:"+img.ID)
			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
			success := false
			var lastErr error
			for _, ep := range repoData.Endpoints {
				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
				if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
					// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
					// As the error is also given to the output stream the user will see the error.
					lastErr = err
					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
					continue
				}
				success = true
				break
			}
			if !success {
				err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
				if parallel {
					errors <- err
					return
				}
			}
			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
			if parallel {
				errors <- nil
			}
		}
		if parallel {
			go downloadImage(image)
		} else {
			downloadImage(image)
		}
	}
	if parallel {
		// Drain one result per image; only the last failure is kept.
		var lastError error
		for i := 0; i < len(repoData.ImgList); i++ {
			if err := <-errors; err != nil {
				lastError = err
			}
		}
		if lastError != nil {
			return lastError
		}
	}
	// Record the pulled tags in the local tag store.
	for tag, id := range tagsList {
		if askedTag != "" && tag != askedTag {
			continue
		}
		if err := srv.daemon.Repositories().Set(localName, tag, id, true); err != nil {
			return err
		}
	}
	return nil
}
  1210. func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
  1211. srv.Lock()
  1212. defer srv.Unlock()
  1213. if c, exists := srv.pullingPool[key]; exists {
  1214. return c, fmt.Errorf("pull %s is already in progress", key)
  1215. }
  1216. if c, exists := srv.pushingPool[key]; exists {
  1217. return c, fmt.Errorf("push %s is already in progress", key)
  1218. }
  1219. c := make(chan struct{})
  1220. switch kind {
  1221. case "pull":
  1222. srv.pullingPool[key] = c
  1223. case "push":
  1224. srv.pushingPool[key] = c
  1225. default:
  1226. return nil, fmt.Errorf("Unknown pool type")
  1227. }
  1228. return c, nil
  1229. }
  1230. func (srv *Server) poolRemove(kind, key string) error {
  1231. srv.Lock()
  1232. defer srv.Unlock()
  1233. switch kind {
  1234. case "pull":
  1235. if c, exists := srv.pullingPool[key]; exists {
  1236. close(c)
  1237. delete(srv.pullingPool, key)
  1238. }
  1239. case "push":
  1240. if c, exists := srv.pushingPool[key]; exists {
  1241. close(c)
  1242. delete(srv.pushingPool, key)
  1243. }
  1244. default:
  1245. return fmt.Errorf("Unknown pool type")
  1246. }
  1247. return nil
  1248. }
  1249. func (srv *Server) ImagePull(job *engine.Job) engine.Status {
  1250. if n := len(job.Args); n != 1 && n != 2 {
  1251. return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
  1252. }
  1253. var (
  1254. localName = job.Args[0]
  1255. tag string
  1256. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  1257. authConfig = &registry.AuthConfig{}
  1258. metaHeaders map[string][]string
  1259. )
  1260. if len(job.Args) > 1 {
  1261. tag = job.Args[1]
  1262. }
  1263. job.GetenvJson("authConfig", authConfig)
  1264. job.GetenvJson("metaHeaders", &metaHeaders)
  1265. c, err := srv.poolAdd("pull", localName+":"+tag)
  1266. if err != nil {
  1267. if c != nil {
  1268. // Another pull of the same repository is already taking place; just wait for it to finish
  1269. job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
  1270. <-c
  1271. return engine.StatusOK
  1272. }
  1273. return job.Error(err)
  1274. }
  1275. defer srv.poolRemove("pull", localName+":"+tag)
  1276. // Resolve the Repository name from fqn to endpoint + name
  1277. hostname, remoteName, err := registry.ResolveRepositoryName(localName)
  1278. if err != nil {
  1279. return job.Error(err)
  1280. }
  1281. endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
  1282. if err != nil {
  1283. return job.Error(err)
  1284. }
  1285. r, err := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
  1286. if err != nil {
  1287. return job.Error(err)
  1288. }
  1289. if endpoint == registry.IndexServerAddress() {
  1290. // If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
  1291. localName = remoteName
  1292. }
  1293. if err = srv.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
  1294. return job.Error(err)
  1295. }
  1296. return engine.StatusOK
  1297. }
  1298. // Retrieve the all the images to be uploaded in the correct order
  1299. func (srv *Server) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  1300. var (
  1301. imageList []string
  1302. imagesSeen map[string]bool = make(map[string]bool)
  1303. tagsByImage map[string][]string = make(map[string][]string)
  1304. )
  1305. for tag, id := range localRepo {
  1306. if requestedTag != "" && requestedTag != tag {
  1307. continue
  1308. }
  1309. var imageListForThisTag []string
  1310. tagsByImage[id] = append(tagsByImage[id], tag)
  1311. for img, err := srv.daemon.Graph().Get(id); img != nil; img, err = img.GetParent() {
  1312. if err != nil {
  1313. return nil, nil, err
  1314. }
  1315. if imagesSeen[img.ID] {
  1316. // This image is already on the list, we can ignore it and all its parents
  1317. break
  1318. }
  1319. imagesSeen[img.ID] = true
  1320. imageListForThisTag = append(imageListForThisTag, img.ID)
  1321. }
  1322. // reverse the image list for this tag (so the "most"-parent image is first)
  1323. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  1324. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  1325. }
  1326. // append to main image list
  1327. imageList = append(imageList, imageListForThisTag...)
  1328. }
  1329. if len(imageList) == 0 {
  1330. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  1331. }
  1332. utils.Debugf("Image list: %v", imageList)
  1333. utils.Debugf("Tags by image: %v", tagsByImage)
  1334. return imageList, tagsByImage, nil
  1335. }
// pushRepository pushes the images behind localRepo (optionally limited
// to a single tag) to every endpoint the registry returns for
// remoteName. Protocol order matters: register the image index, upload
// each missing image, push its tags, then validate the index.
func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error {
	out = utils.NewWriteFlusher(out)
	utils.Debugf("Local repo: %s", localRepo)
	imgList, tagsByImage, err := srv.getImageList(localRepo, tag)
	if err != nil {
		return err
	}
	out.Write(sf.FormatStatus("", "Sending image list"))
	var (
		repoData   *registry.RepositoryData
		imageIndex []*registry.ImgData
	)
	for _, imgId := range imgList {
		if tags, exists := tagsByImage[imgId]; exists {
			// If an image has tags you must add an entry in the image index
			// for each tag
			for _, tag := range tags {
				imageIndex = append(imageIndex, &registry.ImgData{
					ID:  imgId,
					Tag: tag,
				})
			}
		} else {
			// If the image does not have a tag it still needs to be sent to the
			// registry with an empty tag so that it is accociated with the repository
			imageIndex = append(imageIndex, &registry.ImgData{
				ID:  imgId,
				Tag: "",
			})
		}
	}
	utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo)
	for _, data := range imageIndex {
		utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag)
	}
	// Register all the images in a repository with the registry
	// If an image is not in this list it will not be associated with the repository
	repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil)
	if err != nil {
		return err
	}
	// nTag is only used for the status message below.
	nTag := 1
	if tag == "" {
		nTag = len(localRepo)
	}
	for _, ep := range repoData.Endpoints {
		out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag))
		for _, imgId := range imgList {
			// Skip images the endpoint already has; otherwise upload
			// json + layer + checksum via pushImage.
			if r.LookupRemoteImage(imgId, ep, repoData.Tokens) {
				out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
			} else {
				if _, err := srv.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil {
					// FIXME: Continue on error?
					return err
				}
			}
			// Push every tag pointing at this image.
			for _, tag := range tagsByImage[imgId] {
				out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag))
				if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil {
					return err
				}
			}
		}
	}
	// Validate the index now that everything has been uploaded.
	if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil {
		return err
	}
	return nil
}
  1405. func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  1406. out = utils.NewWriteFlusher(out)
  1407. jsonRaw, err := ioutil.ReadFile(path.Join(srv.daemon.Graph().Root, imgID, "json"))
  1408. if err != nil {
  1409. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  1410. }
  1411. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  1412. imgData := &registry.ImgData{
  1413. ID: imgID,
  1414. }
  1415. // Send the json
  1416. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  1417. if err == registry.ErrAlreadyExists {
  1418. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  1419. return "", nil
  1420. }
  1421. return "", err
  1422. }
  1423. layerData, err := srv.daemon.Graph().TempLayerArchive(imgID, archive.Uncompressed, sf, out)
  1424. if err != nil {
  1425. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  1426. }
  1427. defer os.RemoveAll(layerData.Name())
  1428. // Send the layer
  1429. utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  1430. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
  1431. if err != nil {
  1432. return "", err
  1433. }
  1434. imgData.Checksum = checksum
  1435. imgData.ChecksumPayload = checksumPayload
  1436. // Send the checksum
  1437. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  1438. return "", err
  1439. }
  1440. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  1441. return imgData.Checksum, nil
  1442. }
// FIXME: Allow to interrupt current push when new push of same image is done.

// ImagePush pushes an image or a whole repository to a remote registry.
// Usage: IMAGE. Env: "tag", "json", "authConfig", "metaHeaders". Only one
// push per local name may be in flight at a time (enforced via poolAdd).
func (srv *Server) ImagePush(job *engine.Job) engine.Status {
	if n := len(job.Args); n != 1 {
		return job.Errorf("Usage: %s IMAGE", job.Name)
	}
	var (
		localName   = job.Args[0]
		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
		authConfig  = &registry.AuthConfig{}
		metaHeaders map[string][]string
	)
	tag := job.Getenv("tag")
	job.GetenvJson("authConfig", authConfig)
	job.GetenvJson("metaHeaders", &metaHeaders)
	if _, err := srv.poolAdd("push", localName); err != nil {
		return job.Error(err)
	}
	defer srv.poolRemove("push", localName)
	// Resolve the Repository name from fqn to endpoint + name
	hostname, remoteName, err := registry.ResolveRepositoryName(localName)
	if err != nil {
		return job.Error(err)
	}
	endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
	if err != nil {
		return job.Error(err)
	}
	// NOTE(review): the Graph().Get error is checked only further down —
	// apparently a failed single-image lookup means localName names a
	// repository instead; confirm before relying on this.
	img, err := srv.daemon.Graph().Get(localName)
	r, err2 := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
	if err2 != nil {
		return job.Error(err2)
	}
	if err != nil {
		reposLen := 1
		if tag == "" {
			reposLen = len(srv.daemon.Repositories().Repositories[localName])
		}
		job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
		// If it fails, try to get the repository
		if localRepo, exists := srv.daemon.Repositories().Repositories[localName]; exists {
			if err := srv.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil {
				return job.Error(err)
			}
			return engine.StatusOK
		}
		return job.Error(err)
	}
	var token []string
	job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
	if _, err := srv.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil {
		return job.Error(err)
	}
	return engine.StatusOK
}
// ImageImport creates a new image from a tarball. Usage: SRC REPO [TAG],
// where SRC is either "-" (read the archive from the job's stdin) or a URL
// to download it from. On success the new image ID is written to
// job.Stdout, and the image is optionally tagged as REPO[:TAG].
func (srv *Server) ImageImport(job *engine.Job) engine.Status {
	if n := len(job.Args); n != 2 && n != 3 {
		return job.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
	}
	var (
		src     = job.Args[0]
		repo    = job.Args[1]
		tag     string
		sf      = utils.NewStreamFormatter(job.GetenvBool("json"))
		archive archive.ArchiveReader
		resp    *http.Response
	)
	if len(job.Args) > 2 {
		tag = job.Args[2]
	}
	if src == "-" {
		archive = job.Stdin
	} else {
		u, err := url.Parse(src)
		if err != nil {
			return job.Error(err)
		}
		// A bare host/path ("example.com/img.tar") parses with an empty
		// scheme; treat it as an http URL.
		if u.Scheme == "" {
			u.Scheme = "http"
			u.Host = src
			u.Path = ""
		}
		job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u))
		// Download with curl (pretty progress bar)
		// If curl is not available, fallback to http.Get()
		resp, err = utils.Download(u.String())
		if err != nil {
			return job.Error(err)
		}
		// Wrap the body so download progress is streamed to the client.
		progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
		defer progressReader.Close()
		archive = progressReader
	}
	img, err := srv.daemon.Graph().Create(archive, "", "", "Imported from "+src, "", nil, nil)
	if err != nil {
		return job.Error(err)
	}
	// Optionally register the image at REPO/TAG
	if repo != "" {
		if err := srv.daemon.Repositories().Set(repo, tag, img.ID, true); err != nil {
			return job.Error(err)
		}
	}
	job.Stdout.Write(sf.FormatStatus("", img.ID))
	return engine.StatusOK
}
  1548. func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
  1549. var name string
  1550. if len(job.Args) == 1 {
  1551. name = job.Args[0]
  1552. } else if len(job.Args) > 1 {
  1553. return job.Errorf("Usage: %s", job.Name)
  1554. }
  1555. config := runconfig.ContainerConfigFromJob(job)
  1556. if config.Memory != 0 && config.Memory < 524288 {
  1557. return job.Errorf("Minimum memory limit allowed is 512k")
  1558. }
  1559. if config.Memory > 0 && !srv.daemon.SystemConfig().MemoryLimit {
  1560. job.Errorf("Your kernel does not support memory limit capabilities. Limitation discarded.\n")
  1561. config.Memory = 0
  1562. }
  1563. if config.Memory > 0 && !srv.daemon.SystemConfig().SwapLimit {
  1564. job.Errorf("Your kernel does not support swap limit capabilities. Limitation discarded.\n")
  1565. config.MemorySwap = -1
  1566. }
  1567. container, buildWarnings, err := srv.daemon.Create(config, name)
  1568. if err != nil {
  1569. if srv.daemon.Graph().IsNotExist(err) {
  1570. _, tag := utils.ParseRepositoryTag(config.Image)
  1571. if tag == "" {
  1572. tag = graph.DEFAULTTAG
  1573. }
  1574. return job.Errorf("No such image: %s (tag: %s)", config.Image, tag)
  1575. }
  1576. return job.Error(err)
  1577. }
  1578. if !container.Config.NetworkDisabled && srv.daemon.SystemConfig().IPv4ForwardingDisabled {
  1579. job.Errorf("IPv4 forwarding is disabled.\n")
  1580. }
  1581. srv.LogEvent("create", container.ID, srv.daemon.Repositories().ImageName(container.Image))
  1582. // FIXME: this is necessary because daemon.Create might return a nil container
  1583. // with a non-nil error. This should not happen! Once it's fixed we
  1584. // can remove this workaround.
  1585. if container != nil {
  1586. job.Printf("%s\n", container.ID)
  1587. }
  1588. for _, warning := range buildWarnings {
  1589. job.Errorf("%s\n", warning)
  1590. }
  1591. return engine.StatusOK
  1592. }
  1593. func (srv *Server) ContainerRestart(job *engine.Job) engine.Status {
  1594. if len(job.Args) != 1 {
  1595. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1596. }
  1597. var (
  1598. name = job.Args[0]
  1599. t = 10
  1600. )
  1601. if job.EnvExists("t") {
  1602. t = job.GetenvInt("t")
  1603. }
  1604. if container := srv.daemon.Get(name); container != nil {
  1605. if err := container.Restart(int(t)); err != nil {
  1606. return job.Errorf("Cannot restart container %s: %s\n", name, err)
  1607. }
  1608. srv.LogEvent("restart", container.ID, srv.daemon.Repositories().ImageName(container.Image))
  1609. } else {
  1610. return job.Errorf("No such container: %s\n", name)
  1611. }
  1612. return engine.StatusOK
  1613. }
  1614. func (srv *Server) ContainerDestroy(job *engine.Job) engine.Status {
  1615. if len(job.Args) != 1 {
  1616. return job.Errorf("Not enough arguments. Usage: %s CONTAINER\n", job.Name)
  1617. }
  1618. name := job.Args[0]
  1619. removeVolume := job.GetenvBool("removeVolume")
  1620. removeLink := job.GetenvBool("removeLink")
  1621. forceRemove := job.GetenvBool("forceRemove")
  1622. container := srv.daemon.Get(name)
  1623. if removeLink {
  1624. if container == nil {
  1625. return job.Errorf("No such link: %s", name)
  1626. }
  1627. name, err := daemon.GetFullContainerName(name)
  1628. if err != nil {
  1629. job.Error(err)
  1630. }
  1631. parent, n := path.Split(name)
  1632. if parent == "/" {
  1633. return job.Errorf("Conflict, cannot remove the default name of the container")
  1634. }
  1635. pe := srv.daemon.ContainerGraph().Get(parent)
  1636. if pe == nil {
  1637. return job.Errorf("Cannot get parent %s for name %s", parent, name)
  1638. }
  1639. parentContainer := srv.daemon.Get(pe.ID())
  1640. if parentContainer != nil {
  1641. parentContainer.DisableLink(n)
  1642. }
  1643. if err := srv.daemon.ContainerGraph().Delete(name); err != nil {
  1644. return job.Error(err)
  1645. }
  1646. return engine.StatusOK
  1647. }
  1648. if container != nil {
  1649. if container.State.IsRunning() {
  1650. if forceRemove {
  1651. if err := container.Stop(5); err != nil {
  1652. return job.Errorf("Could not stop running container, cannot remove - %v", err)
  1653. }
  1654. } else {
  1655. return job.Errorf("Impossible to remove a running container, please stop it first or use -f")
  1656. }
  1657. }
  1658. if err := srv.daemon.Destroy(container); err != nil {
  1659. return job.Errorf("Cannot destroy container %s: %s", name, err)
  1660. }
  1661. srv.LogEvent("destroy", container.ID, srv.daemon.Repositories().ImageName(container.Image))
  1662. if removeVolume {
  1663. var (
  1664. volumes = make(map[string]struct{})
  1665. binds = make(map[string]struct{})
  1666. usedVolumes = make(map[string]*daemon.Container)
  1667. )
  1668. // the volume id is always the base of the path
  1669. getVolumeId := func(p string) string {
  1670. return filepath.Base(strings.TrimSuffix(p, "/layer"))
  1671. }
  1672. // populate bind map so that they can be skipped and not removed
  1673. for _, bind := range container.HostConfig().Binds {
  1674. source := strings.Split(bind, ":")[0]
  1675. // TODO: refactor all volume stuff, all of it
  1676. // it is very important that we eval the link or comparing the keys to container.Volumes will not work
  1677. //
  1678. // eval symlink can fail, ref #5244 if we receive an is not exist error we can ignore it
  1679. p, err := filepath.EvalSymlinks(source)
  1680. if err != nil && !os.IsNotExist(err) {
  1681. return job.Error(err)
  1682. }
  1683. if p != "" {
  1684. source = p
  1685. }
  1686. binds[source] = struct{}{}
  1687. }
  1688. // Store all the deleted containers volumes
  1689. for _, volumeId := range container.Volumes {
  1690. // Skip the volumes mounted from external
  1691. // bind mounts here will will be evaluated for a symlink
  1692. if _, exists := binds[volumeId]; exists {
  1693. continue
  1694. }
  1695. volumeId = getVolumeId(volumeId)
  1696. volumes[volumeId] = struct{}{}
  1697. }
  1698. // Retrieve all volumes from all remaining containers
  1699. for _, container := range srv.daemon.List() {
  1700. for _, containerVolumeId := range container.Volumes {
  1701. containerVolumeId = getVolumeId(containerVolumeId)
  1702. usedVolumes[containerVolumeId] = container
  1703. }
  1704. }
  1705. for volumeId := range volumes {
  1706. // If the requested volu
  1707. if c, exists := usedVolumes[volumeId]; exists {
  1708. log.Printf("The volume %s is used by the container %s. Impossible to remove it. Skipping.\n", volumeId, c.ID)
  1709. continue
  1710. }
  1711. if err := srv.daemon.Volumes().Delete(volumeId); err != nil {
  1712. return job.Errorf("Error calling volumes.Delete(%q): %v", volumeId, err)
  1713. }
  1714. }
  1715. }
  1716. } else {
  1717. return job.Errorf("No such container: %s", name)
  1718. }
  1719. return engine.StatusOK
  1720. }
// DeleteImage untags and, when no references remain, deletes the image
// named by name, recursively pruning unreferenced parents unless noprune
// is set. Untagged/Deleted results are appended to imgs. first marks the
// top-level call (recursive parent calls pass false); force permits
// deleting an ID that is tagged in multiple repositories.
func (srv *Server) DeleteImage(name string, imgs *engine.Table, first, force, noprune bool) error {
	var (
		repoName, tag string
		tags          = []string{}
		tagDeleted    bool
	)
	repoName, tag = utils.ParseRepositoryTag(name)
	if tag == "" {
		tag = graph.DEFAULTTAG
	}
	img, err := srv.daemon.Repositories().LookupImage(name)
	if err != nil {
		if r, _ := srv.daemon.Repositories().Get(repoName); r != nil {
			return fmt.Errorf("No such image: %s:%s", repoName, tag)
		}
		return fmt.Errorf("No such image: %s", name)
	}
	// If name appears inside the resolved image ID it was presumably an
	// ID (prefix) rather than repo:tag — clear repo/tag and delete by ID.
	if strings.Contains(img.ID, name) {
		repoName = ""
		tag = ""
	}
	byParents, err := srv.daemon.Graph().ByParent()
	if err != nil {
		return err
	}
	//If delete by id, see if the id belong only to one repository
	if repoName == "" {
		for _, repoAndTag := range srv.daemon.Repositories().ByID()[img.ID] {
			parsedRepo, parsedTag := utils.ParseRepositoryTag(repoAndTag)
			if repoName == "" || repoName == parsedRepo {
				repoName = parsedRepo
				if parsedTag != "" {
					tags = append(tags, parsedTag)
				}
			} else if repoName != parsedRepo && !force {
				// the id belongs to multiple repos, like base:latest and user:test,
				// in that case return conflict
				return fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories, use -f to force", name)
			}
		}
	} else {
		tags = append(tags, tag)
	}
	// On a recursive (parent-pruning) call, never touch an image that is
	// still tagged somewhere.
	if !first && len(tags) > 0 {
		return nil
	}
	//Untag the current image
	for _, tag := range tags {
		tagDeleted, err = srv.daemon.Repositories().Delete(repoName, tag)
		if err != nil {
			return err
		}
		if tagDeleted {
			out := &engine.Env{}
			out.Set("Untagged", repoName+":"+tag)
			imgs.Add(out)
			srv.LogEvent("untag", img.ID, "")
		}
	}
	tags = srv.daemon.Repositories().ByID()[img.ID]
	// Physically delete the image only when no tags remain and no child
	// images depend on it.
	if (len(tags) <= 1 && repoName == "") || len(tags) == 0 {
		if len(byParents[img.ID]) == 0 {
			if err := srv.canDeleteImage(img.ID, force, tagDeleted); err != nil {
				return err
			}
			if err := srv.daemon.Repositories().DeleteAll(img.ID); err != nil {
				return err
			}
			if err := srv.daemon.Graph().Delete(img.ID); err != nil {
				return err
			}
			out := &engine.Env{}
			out.Set("Deleted", img.ID)
			imgs.Add(out)
			srv.LogEvent("delete", img.ID, "")
			// Recurse up the parent chain; only the top-level caller
			// surfaces pruning errors.
			if img.Parent != "" && !noprune {
				err := srv.DeleteImage(img.Parent, imgs, false, force, noprune)
				if first {
					return err
				}
			}
		}
	}
	return nil
}
  1806. func (srv *Server) ImageDelete(job *engine.Job) engine.Status {
  1807. if n := len(job.Args); n != 1 {
  1808. return job.Errorf("Usage: %s IMAGE", job.Name)
  1809. }
  1810. imgs := engine.NewTable("", 0)
  1811. if err := srv.DeleteImage(job.Args[0], imgs, true, job.GetenvBool("force"), job.GetenvBool("noprune")); err != nil {
  1812. return job.Error(err)
  1813. }
  1814. if len(imgs.Data) == 0 {
  1815. return job.Errorf("Conflict, %s wasn't deleted", job.Args[0])
  1816. }
  1817. if _, err := imgs.WriteListTo(job.Stdout); err != nil {
  1818. return job.Error(err)
  1819. }
  1820. return engine.StatusOK
  1821. }
  1822. func (srv *Server) canDeleteImage(imgID string, force, untagged bool) error {
  1823. var message string
  1824. if untagged {
  1825. message = " (docker untagged the image)"
  1826. }
  1827. for _, container := range srv.daemon.List() {
  1828. parent, err := srv.daemon.Repositories().LookupImage(container.Image)
  1829. if err != nil {
  1830. return err
  1831. }
  1832. if err := parent.WalkHistory(func(p *image.Image) error {
  1833. if imgID == p.ID {
  1834. if container.State.IsRunning() {
  1835. if force {
  1836. return fmt.Errorf("Conflict, cannot force delete %s because the running container %s is using it%s, stop it and retry", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
  1837. }
  1838. return fmt.Errorf("Conflict, cannot delete %s because the running container %s is using it%s, stop it and use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
  1839. } else if !force {
  1840. return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it%s, use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
  1841. }
  1842. }
  1843. return nil
  1844. }); err != nil {
  1845. return err
  1846. }
  1847. }
  1848. return nil
  1849. }
  1850. func (srv *Server) ImageGetCached(imgID string, config *runconfig.Config) (*image.Image, error) {
  1851. // Retrieve all images
  1852. images, err := srv.daemon.Graph().Map()
  1853. if err != nil {
  1854. return nil, err
  1855. }
  1856. // Store the tree in a map of map (map[parentId][childId])
  1857. imageMap := make(map[string]map[string]struct{})
  1858. for _, img := range images {
  1859. if _, exists := imageMap[img.Parent]; !exists {
  1860. imageMap[img.Parent] = make(map[string]struct{})
  1861. }
  1862. imageMap[img.Parent][img.ID] = struct{}{}
  1863. }
  1864. // Loop on the children of the given image and check the config
  1865. var match *image.Image
  1866. for elem := range imageMap[imgID] {
  1867. img, err := srv.daemon.Graph().Get(elem)
  1868. if err != nil {
  1869. return nil, err
  1870. }
  1871. if runconfig.Compare(&img.ContainerConfig, config) {
  1872. if match == nil || match.Created.Before(img.Created) {
  1873. match = img
  1874. }
  1875. }
  1876. }
  1877. return match, nil
  1878. }
  1879. func (srv *Server) setHostConfig(container *daemon.Container, hostConfig *runconfig.HostConfig) error {
  1880. // Validate the HostConfig binds. Make sure that:
  1881. // the source exists
  1882. for _, bind := range hostConfig.Binds {
  1883. splitBind := strings.Split(bind, ":")
  1884. source := splitBind[0]
  1885. // ensure the source exists on the host
  1886. _, err := os.Stat(source)
  1887. if err != nil && os.IsNotExist(err) {
  1888. err = os.MkdirAll(source, 0755)
  1889. if err != nil {
  1890. return fmt.Errorf("Could not create local directory '%s' for bind mount: %s!", source, err.Error())
  1891. }
  1892. }
  1893. }
  1894. // Register any links from the host config before starting the container
  1895. if err := srv.daemon.RegisterLinks(container, hostConfig); err != nil {
  1896. return err
  1897. }
  1898. container.SetHostConfig(hostConfig)
  1899. container.ToDisk()
  1900. return nil
  1901. }
  1902. func (srv *Server) ContainerStart(job *engine.Job) engine.Status {
  1903. if len(job.Args) < 1 {
  1904. return job.Errorf("Usage: %s container_id", job.Name)
  1905. }
  1906. var (
  1907. name = job.Args[0]
  1908. daemon = srv.daemon
  1909. container = daemon.Get(name)
  1910. )
  1911. if container == nil {
  1912. return job.Errorf("No such container: %s", name)
  1913. }
  1914. if container.State.IsRunning() {
  1915. return job.Errorf("Container already started")
  1916. }
  1917. // If no environment was set, then no hostconfig was passed.
  1918. if len(job.Environ()) > 0 {
  1919. hostConfig := runconfig.ContainerHostConfigFromJob(job)
  1920. if err := srv.setHostConfig(container, hostConfig); err != nil {
  1921. return job.Error(err)
  1922. }
  1923. }
  1924. if err := container.Start(); err != nil {
  1925. return job.Errorf("Cannot start container %s: %s", name, err)
  1926. }
  1927. srv.LogEvent("start", container.ID, daemon.Repositories().ImageName(container.Image))
  1928. return engine.StatusOK
  1929. }
  1930. func (srv *Server) ContainerStop(job *engine.Job) engine.Status {
  1931. if len(job.Args) != 1 {
  1932. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1933. }
  1934. var (
  1935. name = job.Args[0]
  1936. t = 10
  1937. )
  1938. if job.EnvExists("t") {
  1939. t = job.GetenvInt("t")
  1940. }
  1941. if container := srv.daemon.Get(name); container != nil {
  1942. if !container.State.IsRunning() {
  1943. return job.Errorf("Container already stopped")
  1944. }
  1945. if err := container.Stop(int(t)); err != nil {
  1946. return job.Errorf("Cannot stop container %s: %s\n", name, err)
  1947. }
  1948. srv.LogEvent("stop", container.ID, srv.daemon.Repositories().ImageName(container.Image))
  1949. } else {
  1950. return job.Errorf("No such container: %s\n", name)
  1951. }
  1952. return engine.StatusOK
  1953. }
  1954. func (srv *Server) ContainerWait(job *engine.Job) engine.Status {
  1955. if len(job.Args) != 1 {
  1956. return job.Errorf("Usage: %s", job.Name)
  1957. }
  1958. name := job.Args[0]
  1959. if container := srv.daemon.Get(name); container != nil {
  1960. status, _ := container.State.WaitStop(-1 * time.Second)
  1961. job.Printf("%d\n", status)
  1962. return engine.StatusOK
  1963. }
  1964. return job.Errorf("%s: no such container: %s", job.Name, name)
  1965. }
  1966. func (srv *Server) ContainerResize(job *engine.Job) engine.Status {
  1967. if len(job.Args) != 3 {
  1968. return job.Errorf("Not enough arguments. Usage: %s CONTAINER HEIGHT WIDTH\n", job.Name)
  1969. }
  1970. name := job.Args[0]
  1971. height, err := strconv.Atoi(job.Args[1])
  1972. if err != nil {
  1973. return job.Error(err)
  1974. }
  1975. width, err := strconv.Atoi(job.Args[2])
  1976. if err != nil {
  1977. return job.Error(err)
  1978. }
  1979. if container := srv.daemon.Get(name); container != nil {
  1980. if err := container.Resize(height, width); err != nil {
  1981. return job.Error(err)
  1982. }
  1983. return engine.StatusOK
  1984. }
  1985. return job.Errorf("No such container: %s", name)
  1986. }
  1987. func (srv *Server) ContainerLogs(job *engine.Job) engine.Status {
  1988. if len(job.Args) != 1 {
  1989. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  1990. }
  1991. var (
  1992. name = job.Args[0]
  1993. stdout = job.GetenvBool("stdout")
  1994. stderr = job.GetenvBool("stderr")
  1995. tail = job.Getenv("tail")
  1996. follow = job.GetenvBool("follow")
  1997. times = job.GetenvBool("timestamps")
  1998. lines = -1
  1999. format string
  2000. )
  2001. if !(stdout || stderr) {
  2002. return job.Errorf("You must choose at least one stream")
  2003. }
  2004. if times {
  2005. format = time.StampMilli
  2006. }
  2007. if tail == "" {
  2008. tail = "all"
  2009. }
  2010. container := srv.daemon.Get(name)
  2011. if container == nil {
  2012. return job.Errorf("No such container: %s", name)
  2013. }
  2014. cLog, err := container.ReadLog("json")
  2015. if err != nil && os.IsNotExist(err) {
  2016. // Legacy logs
  2017. utils.Debugf("Old logs format")
  2018. if stdout {
  2019. cLog, err := container.ReadLog("stdout")
  2020. if err != nil {
  2021. utils.Errorf("Error reading logs (stdout): %s", err)
  2022. } else if _, err := io.Copy(job.Stdout, cLog); err != nil {
  2023. utils.Errorf("Error streaming logs (stdout): %s", err)
  2024. }
  2025. }
  2026. if stderr {
  2027. cLog, err := container.ReadLog("stderr")
  2028. if err != nil {
  2029. utils.Errorf("Error reading logs (stderr): %s", err)
  2030. } else if _, err := io.Copy(job.Stderr, cLog); err != nil {
  2031. utils.Errorf("Error streaming logs (stderr): %s", err)
  2032. }
  2033. }
  2034. } else if err != nil {
  2035. utils.Errorf("Error reading logs (json): %s", err)
  2036. } else {
  2037. if tail != "all" {
  2038. var err error
  2039. lines, err = strconv.Atoi(tail)
  2040. if err != nil {
  2041. utils.Errorf("Failed to parse tail %s, error: %v, show all logs", err)
  2042. lines = -1
  2043. }
  2044. }
  2045. if lines != 0 {
  2046. if lines > 0 {
  2047. f := cLog.(*os.File)
  2048. ls, err := tailfile.TailFile(f, lines)
  2049. if err != nil {
  2050. return job.Error(err)
  2051. }
  2052. tmp := bytes.NewBuffer([]byte{})
  2053. for _, l := range ls {
  2054. fmt.Fprintf(tmp, "%s\n", l)
  2055. }
  2056. cLog = tmp
  2057. }
  2058. dec := json.NewDecoder(cLog)
  2059. for {
  2060. l := &utils.JSONLog{}
  2061. if err := dec.Decode(l); err == io.EOF {
  2062. break
  2063. } else if err != nil {
  2064. utils.Errorf("Error streaming logs: %s", err)
  2065. break
  2066. }
  2067. logLine := l.Log
  2068. if times {
  2069. logLine = fmt.Sprintf("[%s] %s", l.Created.Format(format), logLine)
  2070. }
  2071. if l.Stream == "stdout" && stdout {
  2072. fmt.Fprintf(job.Stdout, "%s", logLine)
  2073. }
  2074. if l.Stream == "stderr" && stderr {
  2075. fmt.Fprintf(job.Stderr, "%s", logLine)
  2076. }
  2077. }
  2078. }
  2079. }
  2080. if follow {
  2081. errors := make(chan error, 2)
  2082. if stdout {
  2083. stdoutPipe := container.StdoutLogPipe()
  2084. go func() {
  2085. errors <- utils.WriteLog(stdoutPipe, job.Stdout, format)
  2086. }()
  2087. }
  2088. if stderr {
  2089. stderrPipe := container.StderrLogPipe()
  2090. go func() {
  2091. errors <- utils.WriteLog(stderrPipe, job.Stderr, format)
  2092. }()
  2093. }
  2094. err := <-errors
  2095. if err != nil {
  2096. utils.Errorf("%s", err)
  2097. }
  2098. }
  2099. return engine.StatusOK
  2100. }
// ContainerAttach attaches the job's stdio streams to a container.
// Usage: CONTAINER. Env booleans: "logs" (replay existing logs first),
// "stream" (live attach), and "stdin"/"stdout"/"stderr" (which streams
// to wire up).
func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
	if len(job.Args) != 1 {
		return job.Errorf("Usage: %s CONTAINER\n", job.Name)
	}
	var (
		name   = job.Args[0]
		logs   = job.GetenvBool("logs")
		stream = job.GetenvBool("stream")
		stdin  = job.GetenvBool("stdin")
		stdout = job.GetenvBool("stdout")
		stderr = job.GetenvBool("stderr")
	)
	container := srv.daemon.Get(name)
	if container == nil {
		return job.Errorf("No such container: %s", name)
	}
	//logs
	if logs {
		cLog, err := container.ReadLog("json")
		if err != nil && os.IsNotExist(err) {
			// Legacy logs: separate stdout/stderr files, copied verbatim.
			utils.Debugf("Old logs format")
			if stdout {
				cLog, err := container.ReadLog("stdout")
				if err != nil {
					utils.Errorf("Error reading logs (stdout): %s", err)
				} else if _, err := io.Copy(job.Stdout, cLog); err != nil {
					utils.Errorf("Error streaming logs (stdout): %s", err)
				}
			}
			if stderr {
				cLog, err := container.ReadLog("stderr")
				if err != nil {
					utils.Errorf("Error reading logs (stderr): %s", err)
				} else if _, err := io.Copy(job.Stderr, cLog); err != nil {
					utils.Errorf("Error streaming logs (stderr): %s", err)
				}
			}
		} else if err != nil {
			utils.Errorf("Error reading logs (json): %s", err)
		} else {
			// JSON log format: decode each record and route it to the
			// requested stream(s).
			dec := json.NewDecoder(cLog)
			for {
				l := &utils.JSONLog{}
				if err := dec.Decode(l); err == io.EOF {
					break
				} else if err != nil {
					utils.Errorf("Error streaming logs: %s", err)
					break
				}
				if l.Stream == "stdout" && stdout {
					fmt.Fprintf(job.Stdout, "%s", l.Log)
				}
				if l.Stream == "stderr" && stderr {
					fmt.Fprintf(job.Stderr, "%s", l.Log)
				}
			}
		}
	}
	//stream
	if stream {
		var (
			cStdin           io.ReadCloser
			cStdout, cStderr io.Writer
			cStdinCloser     io.Closer
		)
		if stdin {
			// Bridge the job's stdin into the container through a pipe so
			// the copy goroutine can be shut down independently of the job
			// stream.
			r, w := io.Pipe()
			go func() {
				defer w.Close()
				defer utils.Debugf("Closing buffered stdin pipe")
				io.Copy(w, job.Stdin)
			}()
			cStdin = r
			cStdinCloser = job.Stdin
		}
		if stdout {
			cStdout = job.Stdout
		}
		if stderr {
			cStderr = job.Stderr
		}
		// Block until the attach completes.
		<-srv.daemon.Attach(container, cStdin, cStdinCloser, cStdout, cStderr)
		// If we are in stdinonce mode, wait for the process to end
		// otherwise, simply return
		if container.Config.StdinOnce && !container.Config.Tty {
			container.State.WaitStop(-1 * time.Second)
		}
	}
	return engine.StatusOK
}
  2192. func (srv *Server) ContainerCopy(job *engine.Job) engine.Status {
  2193. if len(job.Args) != 2 {
  2194. return job.Errorf("Usage: %s CONTAINER RESOURCE\n", job.Name)
  2195. }
  2196. var (
  2197. name = job.Args[0]
  2198. resource = job.Args[1]
  2199. )
  2200. if container := srv.daemon.Get(name); container != nil {
  2201. data, err := container.Copy(resource)
  2202. if err != nil {
  2203. return job.Error(err)
  2204. }
  2205. defer data.Close()
  2206. if _, err := io.Copy(job.Stdout, data); err != nil {
  2207. return job.Error(err)
  2208. }
  2209. return engine.StatusOK
  2210. }
  2211. return job.Errorf("No such container: %s", name)
  2212. }
  2213. func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) {
  2214. daemon, err := daemon.NewDaemon(config, eng)
  2215. if err != nil {
  2216. return nil, err
  2217. }
  2218. srv := &Server{
  2219. Eng: eng,
  2220. daemon: daemon,
  2221. pullingPool: make(map[string]chan struct{}),
  2222. pushingPool: make(map[string]chan struct{}),
  2223. events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
  2224. eventPublisher: utils.NewJSONMessagePublisher(),
  2225. }
  2226. daemon.SetServer(srv)
  2227. return srv, nil
  2228. }
  2229. func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
  2230. now := time.Now().UTC().Unix()
  2231. jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
  2232. srv.AddEvent(jm)
  2233. srv.eventPublisher.Publish(jm)
  2234. return &jm
  2235. }
  2236. func (srv *Server) AddEvent(jm utils.JSONMessage) {
  2237. srv.Lock()
  2238. defer srv.Unlock()
  2239. srv.events = append(srv.events, jm)
  2240. }
  2241. func (srv *Server) GetEvents() []utils.JSONMessage {
  2242. srv.RLock()
  2243. defer srv.RUnlock()
  2244. return srv.events
  2245. }
  2246. func (srv *Server) SetRunning(status bool) {
  2247. srv.Lock()
  2248. defer srv.Unlock()
  2249. srv.running = status
  2250. }
  2251. func (srv *Server) IsRunning() bool {
  2252. srv.RLock()
  2253. defer srv.RUnlock()
  2254. return srv.running
  2255. }
  2256. func (srv *Server) Close() error {
  2257. if srv == nil {
  2258. return nil
  2259. }
  2260. srv.SetRunning(false)
  2261. done := make(chan struct{})
  2262. go func() {
  2263. srv.tasks.Wait()
  2264. close(done)
  2265. }()
  2266. select {
  2267. // Waiting server jobs for 15 seconds, shutdown immediately after that time
  2268. case <-time.After(time.Second * 15):
  2269. case <-done:
  2270. }
  2271. if srv.daemon == nil {
  2272. return nil
  2273. }
  2274. return srv.daemon.Close()
  2275. }
// Server glues the engine job API to the daemon and tracks server-wide
// state: in-flight pulls/pushes, the in-memory event history, and the
// running flag. The embedded RWMutex guards the mutable fields.
type Server struct {
	sync.RWMutex
	daemon         *daemon.Daemon
	pullingPool    map[string]chan struct{} // image names with a pull in flight
	pushingPool    map[string]chan struct{} // image names with a push in flight
	events         []utils.JSONMessage      // recorded events (appended by AddEvent)
	eventPublisher *utils.JSONMessagePublisher
	Eng            *engine.Engine
	running        bool
	tasks          sync.WaitGroup // outstanding server jobs; waited on in Close
}