Bläddra i källkod

Merge pull request #5707 from vieux/fix_event_removal

fix event removal
Michael Crosby 11 år sedan
förälder
incheckning
5b525feaed
1 ändrade filer med 15 tillägg och 11 borttagningar
  1. 15 11
      server/server.go

+ 15 - 11
server/server.go

@@ -198,6 +198,15 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
 	return engine.StatusOK
 }
 
+func (srv *Server) EvictListener(from string) {
+	srv.Lock()
+	if old, ok := srv.listeners[from]; ok {
+		delete(srv.listeners, from)
+		close(old)
+	}
+	srv.Unlock()
+}
+
 func (srv *Server) Events(job *engine.Job) engine.Status {
 	if len(job.Args) != 1 {
 		return job.Errorf("Usage: %s FROM", job.Name)
@@ -215,15 +224,7 @@ func (srv *Server) Events(job *engine.Job) engine.Status {
 			return fmt.Errorf("JSON error")
 		}
 		_, err = job.Stdout.Write(b)
-		if err != nil {
-			// On error, evict the listener
-			utils.Errorf("%s", err)
-			srv.Lock()
-			delete(srv.listeners, from)
-			srv.Unlock()
-			return err
-		}
-		return nil
+		return err
 	}
 
 	listener := make(chan utils.JSONMessage)
@@ -244,8 +245,9 @@ func (srv *Server) Events(job *engine.Job) engine.Status {
 					continue
 				}
 				if err != nil {
-					job.Error(err)
-					return engine.StatusErr
+					// On error, evict the listener
+					srv.EvictListener(from)
+					return job.Error(err)
 				}
 			}
 		}
@@ -263,6 +265,8 @@ func (srv *Server) Events(job *engine.Job) engine.Status {
 				continue
 			}
 			if err != nil {
+				// On error, evict the listener
+				srv.EvictListener(from)
 				return job.Error(err)
 			}
 		case <-timeout.C: