move events to job

Docker-DCO-1.1-Signed-off-by: Victor Vieux <victor.vieux@docker.com> (github: vieux)
This commit is contained in:
Victor Vieux 2014-01-23 11:12:17 -08:00
parent 59df776469
commit 5cc6312bfc
2 changed files with 65 additions and 51 deletions

56
api.go
View file

@ -236,61 +236,15 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques
}
func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) error {
b, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("JSON error")
}
_, err = wf.Write(b)
if err != nil {
// On error, evict the listener
utils.Errorf("%s", err)
srv.Lock()
delete(srv.listeners, r.RemoteAddr)
srv.Unlock()
return err
}
return nil
}
if err := parseForm(r); err != nil {
return err
}
listener := make(chan utils.JSONMessage)
srv.Lock()
srv.listeners[r.RemoteAddr] = listener
srv.Unlock()
since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0)
if err != nil {
since = 0
}
w.Header().Set("Content-Type", "application/json")
wf := utils.NewWriteFlusher(w)
wf.Flush()
if since != 0 {
// If since, send previous events that happened after the timestamp
for _, event := range srv.GetEvents() {
if event.Time >= since {
err := sendEvent(wf, &event)
if err != nil && err.Error() == "JSON error" {
continue
}
if err != nil {
return err
}
}
}
}
for event := range listener {
err := sendEvent(wf, &event)
if err != nil && err.Error() == "JSON error" {
continue
}
if err != nil {
return err
}
}
return nil
var job = srv.Eng.Job("events", r.RemoteAddr)
job.Stdout.Add(utils.NewWriteFlusher(w))
job.Setenv("since", r.Form.Get("since"))
return job.Run()
}
func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {

View file

@ -101,6 +101,7 @@ func jobInitApi(job *engine.Job) engine.Status {
"import": srv.ImageImport,
"image_delete": srv.ImageDelete,
"inspect": srv.JobInspect,
"events": srv.Events,
} {
if err := job.Eng.Register(name, handler); err != nil {
job.Error(err)
@ -240,6 +241,65 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
}
return engine.StatusOK
}
func (srv *Server) Events(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
job.Errorf("Usage: %s FROM", job.Name)
return engine.StatusErr
}
var (
from = job.Args[0]
since = job.GetenvInt64("since")
)
sendEvent := func(event *utils.JSONMessage) error {
b, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("JSON error")
}
_, err = job.Stdout.Write(b)
if err != nil {
// On error, evict the listener
utils.Errorf("%s", err)
srv.Lock()
delete(srv.listeners, from)
srv.Unlock()
return err
}
return nil
}
listener := make(chan utils.JSONMessage)
srv.Lock()
srv.listeners[from] = listener
srv.Unlock()
job.Stdout.Write(nil) // flush
if since != 0 {
// If since, send previous events that happened after the timestamp
for _, event := range srv.GetEvents() {
if event.Time >= since {
err := sendEvent(&event)
if err != nil && err.Error() == "JSON error" {
continue
}
if err != nil {
job.Error(err)
return engine.StatusErr
}
}
}
}
for event := range listener {
err := sendEvent(&event)
if err != nil && err.Error() == "JSON error" {
continue
}
if err != nil {
job.Error(err)
return engine.StatusErr
}
}
return engine.StatusOK
}
func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
if len(job.Args) != 1 {