Explorar o código

Merge pull request #3739 from vieux/events_jobs

move events to job
Victor Vieux %!s(int64=11) %!d(string=hai) anos
pai
achega
4a708f9f5b
Modificáronse 2 ficheiros con 65 adicións e 51 borrados
  1. 5 51
      api.go
  2. 60 0
      server.go

+ 5 - 51
api.go

@@ -236,61 +236,15 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques
 }
 
 func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
-	sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) error {
-		b, err := json.Marshal(event)
-		if err != nil {
-			return fmt.Errorf("JSON error")
-		}
-		_, err = wf.Write(b)
-		if err != nil {
-			// On error, evict the listener
-			utils.Errorf("%s", err)
-			srv.Lock()
-			delete(srv.listeners, r.RemoteAddr)
-			srv.Unlock()
-			return err
-		}
-		return nil
-	}
-
 	if err := parseForm(r); err != nil {
 		return err
 	}
-	listener := make(chan utils.JSONMessage)
-	srv.Lock()
-	srv.listeners[r.RemoteAddr] = listener
-	srv.Unlock()
-	since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0)
-	if err != nil {
-		since = 0
-	}
+
 	w.Header().Set("Content-Type", "application/json")
-	wf := utils.NewWriteFlusher(w)
-	wf.Flush()
-	if since != 0 {
-		// If since, send previous events that happened after the timestamp
-		for _, event := range srv.GetEvents() {
-			if event.Time >= since {
-				err := sendEvent(wf, &event)
-				if err != nil && err.Error() == "JSON error" {
-					continue
-				}
-				if err != nil {
-					return err
-				}
-			}
-		}
-	}
-	for event := range listener {
-		err := sendEvent(wf, &event)
-		if err != nil && err.Error() == "JSON error" {
-			continue
-		}
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	var job = srv.Eng.Job("events", r.RemoteAddr)
+	job.Stdout.Add(utils.NewWriteFlusher(w))
+	job.Setenv("since", r.Form.Get("since"))
+	return job.Run()
 }
 
 func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {

+ 60 - 0
server.go

@@ -101,6 +101,7 @@ func jobInitApi(job *engine.Job) engine.Status {
 		"import":           srv.ImageImport,
 		"image_delete":     srv.ImageDelete,
 		"inspect":          srv.JobInspect,
+		"events":           srv.Events,
 	} {
 		if err := job.Eng.Register(name, handler); err != nil {
 			job.Error(err)
@@ -240,6 +241,65 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
 	}
 	return engine.StatusOK
 }
+func (srv *Server) Events(job *engine.Job) engine.Status {
+	if len(job.Args) != 1 {
+		job.Errorf("Usage: %s FROM", job.Name)
+		return engine.StatusErr
+	}
+
+	var (
+		from  = job.Args[0]
+		since = job.GetenvInt64("since")
+	)
+	sendEvent := func(event *utils.JSONMessage) error {
+		b, err := json.Marshal(event)
+		if err != nil {
+			return fmt.Errorf("JSON error")
+		}
+		_, err = job.Stdout.Write(b)
+		if err != nil {
+			// On error, evict the listener
+			utils.Errorf("%s", err)
+			srv.Lock()
+			delete(srv.listeners, from)
+			srv.Unlock()
+			return err
+		}
+		return nil
+	}
+
+	listener := make(chan utils.JSONMessage)
+	srv.Lock()
+	srv.listeners[from] = listener
+	srv.Unlock()
+	job.Stdout.Write(nil) // flush
+	if since != 0 {
+		// If since, send previous events that happened after the timestamp
+		for _, event := range srv.GetEvents() {
+			if event.Time >= since {
+				err := sendEvent(&event)
+				if err != nil && err.Error() == "JSON error" {
+					continue
+				}
+				if err != nil {
+					job.Error(err)
+					return engine.StatusErr
+				}
+			}
+		}
+	}
+	for event := range listener {
+		err := sendEvent(&event)
+		if err != nil && err.Error() == "JSON error" {
+			continue
+		}
+		if err != nil {
+			job.Error(err)
+			return engine.StatusErr
+		}
+	}
+	return engine.StatusOK
+}
 
 func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
 	if len(job.Args) != 1 {