Ver Fonte

basic version of the /events endpoint

Victor Vieux há 12 anos atrás
pai
commit
b5da816487
6 ficheiros alterados com 104 adições e 31 exclusões
  1. 27 1
      api.go
  2. 8 7
      api_params.go
  3. 22 9
      commands.go
  4. 6 6
      runtime_test.go
  5. 26 8
      server.go
  6. 15 0
      utils/utils.go

+ 27 - 1
api.go

@@ -217,6 +217,31 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques
 	return nil
 	return nil
 }
 }
 
 
+func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	events := make(chan utils.JSONMessage)
+	srv.Lock()
+	srv.events[r.RemoteAddr] = events
+	srv.Unlock()
+	w.Header().Set("Content-Type", "application/json")
+	wf := utils.NewWriteFlusher(w)
+	for {
+		event := <-events
+		b, err := json.Marshal(event)
+		if err != nil {
+			continue
+		}
+		_, err = wf.Write(b)
+		if err != nil {
+			utils.Debugf("%s", err)
+			srv.Lock()
+			delete(srv.events, r.RemoteAddr)
+			srv.Unlock()
+			return err
+		}
+	}
+	return nil
+}
+
 func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if vars == nil {
 	if vars == nil {
 		return fmt.Errorf("Missing parameter")
 		return fmt.Errorf("Missing parameter")
@@ -855,8 +880,9 @@ func createRouter(srv *Server, logging bool) (*mux.Router, error) {
 	m := map[string]map[string]func(*Server, float64, http.ResponseWriter, *http.Request, map[string]string) error{
 	m := map[string]map[string]func(*Server, float64, http.ResponseWriter, *http.Request, map[string]string) error{
 		"GET": {
 		"GET": {
 			"/auth":                         getAuth,
 			"/auth":                         getAuth,
-			"/version":                      getVersion,
+			"/events":                       getEvents,
 			"/info":                         getInfo,
 			"/info":                         getInfo,
+			"/version":                      getVersion,
 			"/images/json":                  getImagesJSON,
 			"/images/json":                  getImagesJSON,
 			"/images/viz":                   getImagesViz,
 			"/images/viz":                   getImagesViz,
 			"/images/search":                getImagesSearch,
 			"/images/search":                getImagesSearch,

+ 8 - 7
api_params.go

@@ -17,13 +17,14 @@ type APIImages struct {
 }
 }
 
 
 type APIInfo struct {
 type APIInfo struct {
-	Debug       bool
-	Containers  int
-	Images      int
-	NFd         int  `json:",omitempty"`
-	NGoroutines int  `json:",omitempty"`
-	MemoryLimit bool `json:",omitempty"`
-	SwapLimit   bool `json:",omitempty"`
+	Debug           bool
+	Containers      int
+	Images          int
+	NFd             int  `json:",omitempty"`
+	NGoroutines     int  `json:",omitempty"`
+	MemoryLimit     bool `json:",omitempty"`
+	SwapLimit       bool `json:",omitempty"`
+	NEventsListener int  `json:",omitempty"`
 }
 }
 
 
 type APITop struct {
 type APITop struct {

+ 22 - 9
commands.go

@@ -78,6 +78,7 @@ func (cli *DockerCli) CmdHelp(args ...string) error {
 		{"build", "Build a container from a Dockerfile"},
 		{"build", "Build a container from a Dockerfile"},
 		{"commit", "Create a new image from a container's changes"},
 		{"commit", "Create a new image from a container's changes"},
 		{"diff", "Inspect changes on a container's filesystem"},
 		{"diff", "Inspect changes on a container's filesystem"},
+		{"events", "Get real time events from the server"},
 		{"export", "Stream the contents of a container as a tar archive"},
 		{"export", "Stream the contents of a container as a tar archive"},
 		{"history", "Show the history of an image"},
 		{"history", "Show the history of an image"},
 		{"images", "List images"},
 		{"images", "List images"},
@@ -466,6 +467,7 @@ func (cli *DockerCli) CmdInfo(args ...string) error {
 		fmt.Fprintf(cli.out, "Debug mode (client): %v\n", os.Getenv("DEBUG") != "")
 		fmt.Fprintf(cli.out, "Debug mode (client): %v\n", os.Getenv("DEBUG") != "")
 		fmt.Fprintf(cli.out, "Fds: %d\n", out.NFd)
 		fmt.Fprintf(cli.out, "Fds: %d\n", out.NFd)
 		fmt.Fprintf(cli.out, "Goroutines: %d\n", out.NGoroutines)
 		fmt.Fprintf(cli.out, "Goroutines: %d\n", out.NGoroutines)
+		fmt.Fprintf(cli.out, "EventsListeners: %d\n", out.NEventsListener)
 	}
 	}
 	if !out.MemoryLimit {
 	if !out.MemoryLimit {
 		fmt.Fprintf(cli.err, "WARNING: No memory limit support\n")
 		fmt.Fprintf(cli.err, "WARNING: No memory limit support\n")
@@ -1055,6 +1057,23 @@ func (cli *DockerCli) CmdCommit(args ...string) error {
 	return nil
 	return nil
 }
 }
 
 
+func (cli *DockerCli) CmdEvents(args ...string) error {
+	cmd := Subcmd("events", "", "Get real time events from the server")
+	if err := cmd.Parse(args); err != nil {
+		return nil
+	}
+
+	if cmd.NArg() != 0 {
+		cmd.Usage()
+		return nil
+	}
+
+	if err := cli.stream("GET", "/events", nil, cli.out); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (cli *DockerCli) CmdExport(args ...string) error {
 func (cli *DockerCli) CmdExport(args ...string) error {
 	cmd := Subcmd("export", "CONTAINER", "Export the contents of a filesystem as a tar archive")
 	cmd := Subcmd("export", "CONTAINER", "Export the contents of a filesystem as a tar archive")
 	if err := cmd.Parse(args); err != nil {
 	if err := cmd.Parse(args); err != nil {
@@ -1509,19 +1528,13 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) e
 	if resp.Header.Get("Content-Type") == "application/json" {
 	if resp.Header.Get("Content-Type") == "application/json" {
 		dec := json.NewDecoder(resp.Body)
 		dec := json.NewDecoder(resp.Body)
 		for {
 		for {
-			var m utils.JSONMessage
-			if err := dec.Decode(&m); err == io.EOF {
+			var jm utils.JSONMessage
+			if err := dec.Decode(&jm); err == io.EOF {
 				break
 				break
 			} else if err != nil {
 			} else if err != nil {
 				return err
 				return err
 			}
 			}
-			if m.Progress != "" {
-				fmt.Fprintf(out, "%s %s\r", m.Status, m.Progress)
-			} else if m.Error != "" {
-				return fmt.Errorf(m.Error)
-			} else {
-				fmt.Fprintf(out, "%s\n", m.Status)
-			}
+			jm.Display(out)
 		}
 		}
 	} else {
 	} else {
 		if _, err := io.Copy(out, resp.Body); err != nil {
 		if _, err := io.Copy(out, resp.Body); err != nil {

+ 6 - 6
runtime_test.go

@@ -17,12 +17,12 @@ import (
 )
 )
 
 
 const (
 const (
-	unitTestImageName	= "docker-test-image"
-	unitTestImageID		= "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0
-	unitTestNetworkBridge	= "testdockbr0"
-	unitTestStoreBase	= "/var/lib/docker/unit-tests"
-	testDaemonAddr		= "127.0.0.1:4270"
-	testDaemonProto		= "tcp"
+	unitTestImageName     = "docker-test-image"
+	unitTestImageID       = "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0
+	unitTestNetworkBridge = "testdockbr0"
+	unitTestStoreBase     = "/var/lib/docker/unit-tests"
+	testDaemonAddr        = "127.0.0.1:4270"
+	testDaemonProto       = "tcp"
 )
 )
 
 
 var globalRuntime *Runtime
 var globalRuntime *Runtime

+ 26 - 8
server.go

@@ -32,8 +32,9 @@ func (srv *Server) DockerVersion() APIVersion {
 func (srv *Server) ContainerKill(name string) error {
 func (srv *Server) ContainerKill(name string) error {
 	if container := srv.runtime.Get(name); container != nil {
 	if container := srv.runtime.Get(name); container != nil {
 		if err := container.Kill(); err != nil {
 		if err := container.Kill(); err != nil {
-			return fmt.Errorf("Error restarting container %s: %s", name, err)
+			return fmt.Errorf("Error killing container %s: %s", name, err)
 		}
 		}
+		srv.SendEvent("kill", name)
 	} else {
 	} else {
 		return fmt.Errorf("No such container: %s", name)
 		return fmt.Errorf("No such container: %s", name)
 	}
 	}
@@ -52,6 +53,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error {
 		if _, err := io.Copy(out, data); err != nil {
 		if _, err := io.Copy(out, data); err != nil {
 			return err
 			return err
 		}
 		}
+		srv.SendEvent("export", name)
 		return nil
 		return nil
 	}
 	}
 	return fmt.Errorf("No such container: %s", name)
 	return fmt.Errorf("No such container: %s", name)
@@ -209,13 +211,14 @@ func (srv *Server) DockerInfo() *APIInfo {
 		imgcount = len(images)
 		imgcount = len(images)
 	}
 	}
 	return &APIInfo{
 	return &APIInfo{
-		Containers:  len(srv.runtime.List()),
-		Images:      imgcount,
-		MemoryLimit: srv.runtime.capabilities.MemoryLimit,
-		SwapLimit:   srv.runtime.capabilities.SwapLimit,
-		Debug:       os.Getenv("DEBUG") != "",
-		NFd:         utils.GetTotalUsedFds(),
-		NGoroutines: runtime.NumGoroutine(),
+		Containers:      len(srv.runtime.List()),
+		Images:          imgcount,
+		MemoryLimit:     srv.runtime.capabilities.MemoryLimit,
+		SwapLimit:       srv.runtime.capabilities.SwapLimit,
+		Debug:           os.Getenv("DEBUG") != "",
+		NFd:             utils.GetTotalUsedFds(),
+		NGoroutines:     runtime.NumGoroutine(),
+		NEventsListener: len(srv.events),
 	}
 	}
 }
 }
 
 
@@ -810,6 +813,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) {
 		}
 		}
 		return "", err
 		return "", err
 	}
 	}
+	srv.SendEvent("create", container.ShortID())
 	return container.ShortID(), nil
 	return container.ShortID(), nil
 }
 }
 
 
@@ -818,6 +822,7 @@ func (srv *Server) ContainerRestart(name string, t int) error {
 		if err := container.Restart(t); err != nil {
 		if err := container.Restart(t); err != nil {
 			return fmt.Errorf("Error restarting container %s: %s", name, err)
 			return fmt.Errorf("Error restarting container %s: %s", name, err)
 		}
 		}
+		srv.SendEvent("restart", name)
 	} else {
 	} else {
 		return fmt.Errorf("No such container: %s", name)
 		return fmt.Errorf("No such container: %s", name)
 	}
 	}
@@ -837,6 +842,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error {
 		if err := srv.runtime.Destroy(container); err != nil {
 		if err := srv.runtime.Destroy(container); err != nil {
 			return fmt.Errorf("Error destroying container %s: %s", name, err)
 			return fmt.Errorf("Error destroying container %s: %s", name, err)
 		}
 		}
+		srv.SendEvent("destroy", name)
 
 
 		if removeVolume {
 		if removeVolume {
 			// Retrieve all volumes from all remaining containers
 			// Retrieve all volumes from all remaining containers
@@ -903,6 +909,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error {
 			return err
 			return err
 		}
 		}
 		*imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)})
 		*imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)})
+		srv.SendEvent("delete", utils.TruncateID(id))
 		return nil
 		return nil
 	}
 	}
 	return nil
 	return nil
@@ -946,6 +953,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro
 	}
 	}
 	if tagDeleted {
 	if tagDeleted {
 		imgs = append(imgs, APIRmi{Untagged: img.ShortID()})
 		imgs = append(imgs, APIRmi{Untagged: img.ShortID()})
+		srv.SendEvent("untagged", img.ShortID())
 	}
 	}
 	if len(srv.runtime.repositories.ByID()[img.ID]) == 0 {
 	if len(srv.runtime.repositories.ByID()[img.ID]) == 0 {
 		if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil {
 		if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil {
@@ -1018,6 +1026,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error {
 		if err := container.Start(hostConfig); err != nil {
 		if err := container.Start(hostConfig); err != nil {
 			return fmt.Errorf("Error starting container %s: %s", name, err)
 			return fmt.Errorf("Error starting container %s: %s", name, err)
 		}
 		}
+		srv.SendEvent("start", name)
 	} else {
 	} else {
 		return fmt.Errorf("No such container: %s", name)
 		return fmt.Errorf("No such container: %s", name)
 	}
 	}
@@ -1029,6 +1038,7 @@ func (srv *Server) ContainerStop(name string, t int) error {
 		if err := container.Stop(t); err != nil {
 		if err := container.Stop(t); err != nil {
 			return fmt.Errorf("Error stopping container %s: %s", name, err)
 			return fmt.Errorf("Error stopping container %s: %s", name, err)
 		}
 		}
+		srv.SendEvent("stop", name)
 	} else {
 	} else {
 		return fmt.Errorf("No such container: %s", name)
 		return fmt.Errorf("No such container: %s", name)
 	}
 	}
@@ -1162,15 +1172,23 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) (
 		enableCors:  enableCors,
 		enableCors:  enableCors,
 		pullingPool: make(map[string]struct{}),
 		pullingPool: make(map[string]struct{}),
 		pushingPool: make(map[string]struct{}),
 		pushingPool: make(map[string]struct{}),
+		events:      make(map[string]chan utils.JSONMessage),
 	}
 	}
 	runtime.srv = srv
 	runtime.srv = srv
 	return srv, nil
 	return srv, nil
 }
 }
 
 
+func (srv *Server) SendEvent(action, id string) {
+	for _, c := range srv.events {
+		c <- utils.JSONMessage{Status: action, ID: id}
+	}
+}
+
 type Server struct {
 type Server struct {
 	sync.Mutex
 	sync.Mutex
 	runtime     *Runtime
 	runtime     *Runtime
 	enableCors  bool
 	enableCors  bool
 	pullingPool map[string]struct{}
 	pullingPool map[string]struct{}
 	pushingPool map[string]struct{}
 	pushingPool map[string]struct{}
+	events      map[string]chan utils.JSONMessage
 }
 }

+ 15 - 0
utils/utils.go

@@ -611,8 +611,23 @@ type JSONMessage struct {
 	Status   string `json:"status,omitempty"`
 	Status   string `json:"status,omitempty"`
 	Progress string `json:"progress,omitempty"`
 	Progress string `json:"progress,omitempty"`
 	Error    string `json:"error,omitempty"`
 	Error    string `json:"error,omitempty"`
+	ID	 string `json:"id,omitempty"`
 }
 }
 
 
+func (jm *JSONMessage) Display(out io.Writer) (error) {
+	if jm.Progress != "" {
+		fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress)
+	} else if jm.Error != "" {
+		return fmt.Errorf(jm.Error)
+	} else if jm.ID != "" {
+		fmt.Fprintf(out, "%s: %s\n", jm.ID, jm.Status)
+	} else {
+		fmt.Fprintf(out, "%s\n", jm.Status)
+	}
+	return nil
+}
+
+
 type StreamFormatter struct {
 type StreamFormatter struct {
 	json bool
 	json bool
 	used bool
 	used bool