Merge pull request #1180 from dotcloud/1167_events_endpoint-feature

*Api: Add the /events endpoint
*Client: Add the docker events command
This commit is contained in:
Victor Vieux 2013-07-24 04:53:36 -07:00
commit ebe17f57ff
11 changed files with 266 additions and 27 deletions

61
api.go
View file

@ -217,6 +217,64 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques
return nil
}
func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) (error) {
b, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("JSON error")
}
_, err = wf.Write(b)
if err != nil {
// On error, evict the listener
utils.Debugf("%s", err)
srv.Lock()
delete(srv.listeners, r.RemoteAddr)
srv.Unlock()
return err
}
return nil
}
if err := parseForm(r); err != nil {
return err
}
listener := make(chan utils.JSONMessage)
srv.Lock()
srv.listeners[r.RemoteAddr] = listener
srv.Unlock()
since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0)
if err != nil {
since = 0
}
w.Header().Set("Content-Type", "application/json")
wf := utils.NewWriteFlusher(w)
if since != 0 {
// If since, send previous events that happened after the timestamp
for _, event := range srv.events {
if event.Time >= since {
err := sendEvent(wf, &event)
if err != nil && err.Error() == "JSON error" {
continue
}
if err != nil {
return err
}
}
}
}
for {
event := <-listener
err := sendEvent(wf, &event)
if err != nil && err.Error() == "JSON error" {
continue
}
if err != nil {
return err
}
}
return nil
}
func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
@ -855,8 +913,9 @@ func createRouter(srv *Server, logging bool) (*mux.Router, error) {
m := map[string]map[string]func(*Server, float64, http.ResponseWriter, *http.Request, map[string]string) error{
"GET": {
"/auth": getAuth,
"/version": getVersion,
"/events": getEvents,
"/info": getInfo,
"/version": getVersion,
"/images/json": getImagesJSON,
"/images/viz": getImagesViz,
"/images/search": getImagesSearch,

View file

@ -17,13 +17,14 @@ type APIImages struct {
}
type APIInfo struct {
Debug bool
Containers int
Images int
NFd int `json:",omitempty"`
NGoroutines int `json:",omitempty"`
MemoryLimit bool `json:",omitempty"`
SwapLimit bool `json:",omitempty"`
Debug bool
Containers int
Images int
NFd int `json:",omitempty"`
NGoroutines int `json:",omitempty"`
MemoryLimit bool `json:",omitempty"`
SwapLimit bool `json:",omitempty"`
NEventsListener int `json:",omitempty"`
}
type APITop struct {

View file

@ -89,6 +89,44 @@ func TestGetInfo(t *testing.T) {
}
}
func TestGetEvents(t *testing.T) {
runtime := mkRuntime(t)
srv := &Server{
runtime: runtime,
events: make([]utils.JSONMessage, 0, 64),
listeners: make(map[string]chan utils.JSONMessage),
}
srv.LogEvent("fakeaction", "fakeid")
srv.LogEvent("fakeaction2", "fakeid")
req, err := http.NewRequest("GET", "/events?since=1", nil)
if err != nil {
t.Fatal(err)
}
r := httptest.NewRecorder()
setTimeout(t, "", 500*time.Millisecond, func() {
if err := getEvents(srv, APIVERSION, r, req, nil); err != nil {
t.Fatal(err)
}
})
dec := json.NewDecoder(r.Body)
for i := 0; i < 2; i++ {
var jm utils.JSONMessage
if err := dec.Decode(&jm); err == io.EOF {
break
} else if err != nil {
t.Fatal(err)
}
if jm != srv.events[i] {
t.Fatalf("Event received it different than expected")
}
}
}
func TestGetImagesJSON(t *testing.T) {
runtime := mkRuntime(t)
defer nuke(runtime)

View file

@ -78,6 +78,7 @@ func (cli *DockerCli) CmdHelp(args ...string) error {
{"build", "Build a container from a Dockerfile"},
{"commit", "Create a new image from a container's changes"},
{"diff", "Inspect changes on a container's filesystem"},
{"events", "Get real time events from the server"},
{"export", "Stream the contents of a container as a tar archive"},
{"history", "Show the history of an image"},
{"images", "List images"},
@ -466,6 +467,7 @@ func (cli *DockerCli) CmdInfo(args ...string) error {
fmt.Fprintf(cli.out, "Debug mode (client): %v\n", os.Getenv("DEBUG") != "")
fmt.Fprintf(cli.out, "Fds: %d\n", out.NFd)
fmt.Fprintf(cli.out, "Goroutines: %d\n", out.NGoroutines)
fmt.Fprintf(cli.out, "EventsListeners: %d\n", out.NEventsListener)
}
if !out.MemoryLimit {
fmt.Fprintf(cli.err, "WARNING: No memory limit support\n")
@ -1055,6 +1057,29 @@ func (cli *DockerCli) CmdCommit(args ...string) error {
return nil
}
func (cli *DockerCli) CmdEvents(args ...string) error {
cmd := Subcmd("events", "[OPTIONS]", "Get real time events from the server")
since := cmd.String("since", "", "Show events previously created (used for polling).")
if err := cmd.Parse(args); err != nil {
return nil
}
if cmd.NArg() != 0 {
cmd.Usage()
return nil
}
v := url.Values{}
if *since != "" {
v.Set("since", *since)
}
if err := cli.stream("GET", "/events?"+v.Encode(), nil, cli.out); err != nil {
return err
}
return nil
}
func (cli *DockerCli) CmdExport(args ...string) error {
cmd := Subcmd("export", "CONTAINER", "Export the contents of a filesystem as a tar archive")
if err := cmd.Parse(args); err != nil {
@ -1509,19 +1534,13 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) e
if resp.Header.Get("Content-Type") == "application/json" {
dec := json.NewDecoder(resp.Body)
for {
var m utils.JSONMessage
if err := dec.Decode(&m); err == io.EOF {
var jm utils.JSONMessage
if err := dec.Decode(&jm); err == io.EOF {
break
} else if err != nil {
return err
}
if m.Progress != "" {
fmt.Fprintf(out, "%s %s\r", m.Status, m.Progress)
} else if m.Error != "" {
return fmt.Errorf(m.Error)
} else {
fmt.Fprintf(out, "%s\n", m.Status)
}
jm.Display(out)
}
} else {
if _, err := io.Copy(out, resp.Body); err != nil {

View file

@ -38,7 +38,7 @@ func setTimeout(t *testing.T, msg string, d time.Duration, f func()) {
f()
c <- false
}()
if <-c {
if <-c && msg != "" {
t.Fatal(msg)
}
}

View file

@ -805,7 +805,9 @@ func (container *Container) monitor() {
}
}
utils.Debugf("Process finished")
if container.runtime != nil && container.runtime.srv != nil {
container.runtime.srv.LogEvent("die", container.ShortID())
}
exitCode := -1
if container.cmd != nil {
exitCode = container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()

View file

@ -44,6 +44,10 @@ What's new
**New!** List the processes running inside a container.
.. http:get:: /events:
**New!** Monitor docker's events via streaming or via polling
Builder (/build):
- Simplify the upload of the build context

View file

@ -1059,6 +1059,36 @@ Create a new image from a container's changes
:statuscode 500: server error
Monitor Docker's events
***********************
.. http:get:: /events
Get events from docker, either in real time via streaming, or via polling (using `since`)
**Example request**:
.. sourcecode:: http
POST /events?since=1374067924
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{"status":"create","id":"dfdf82bd3881","time":1374067924}
{"status":"start","id":"dfdf82bd3881","time":1374067924}
{"status":"stop","id":"dfdf82bd3881","time":1374067966}
{"status":"destroy","id":"dfdf82bd3881","time":1374067970}
:query since: timestamp used for polling
:statuscode 200: no error
:statuscode 500: server error
3. Going further
================

View file

@ -19,6 +19,7 @@ import (
"runtime"
"strings"
"sync"
"time"
)
func (srv *Server) DockerVersion() APIVersion {
@ -32,8 +33,9 @@ func (srv *Server) DockerVersion() APIVersion {
func (srv *Server) ContainerKill(name string) error {
if container := srv.runtime.Get(name); container != nil {
if err := container.Kill(); err != nil {
return fmt.Errorf("Error restarting container %s: %s", name, err)
return fmt.Errorf("Error killing container %s: %s", name, err)
}
srv.LogEvent("kill", name)
} else {
return fmt.Errorf("No such container: %s", name)
}
@ -52,6 +54,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error {
if _, err := io.Copy(out, data); err != nil {
return err
}
srv.LogEvent("export", name)
return nil
}
return fmt.Errorf("No such container: %s", name)
@ -209,13 +212,14 @@ func (srv *Server) DockerInfo() *APIInfo {
imgcount = len(images)
}
return &APIInfo{
Containers: len(srv.runtime.List()),
Images: imgcount,
MemoryLimit: srv.runtime.capabilities.MemoryLimit,
SwapLimit: srv.runtime.capabilities.SwapLimit,
Debug: os.Getenv("DEBUG") != "",
NFd: utils.GetTotalUsedFds(),
NGoroutines: runtime.NumGoroutine(),
Containers: len(srv.runtime.List()),
Images: imgcount,
MemoryLimit: srv.runtime.capabilities.MemoryLimit,
SwapLimit: srv.runtime.capabilities.SwapLimit,
Debug: os.Getenv("DEBUG") != "",
NFd: utils.GetTotalUsedFds(),
NGoroutines: runtime.NumGoroutine(),
NEventsListener: len(srv.events),
}
}
@ -810,6 +814,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) {
}
return "", err
}
srv.LogEvent("create", container.ShortID())
return container.ShortID(), nil
}
@ -818,6 +823,7 @@ func (srv *Server) ContainerRestart(name string, t int) error {
if err := container.Restart(t); err != nil {
return fmt.Errorf("Error restarting container %s: %s", name, err)
}
srv.LogEvent("restart", name)
} else {
return fmt.Errorf("No such container: %s", name)
}
@ -837,6 +843,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error {
if err := srv.runtime.Destroy(container); err != nil {
return fmt.Errorf("Error destroying container %s: %s", name, err)
}
srv.LogEvent("destroy", name)
if removeVolume {
// Retrieve all volumes from all remaining containers
@ -903,6 +910,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error {
return err
}
*imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)})
srv.LogEvent("delete", utils.TruncateID(id))
return nil
}
return nil
@ -946,6 +954,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro
}
if tagDeleted {
imgs = append(imgs, APIRmi{Untagged: img.ShortID()})
srv.LogEvent("untag", img.ShortID())
}
if len(srv.runtime.repositories.ByID()[img.ID]) == 0 {
if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil {
@ -1018,6 +1027,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error {
if err := container.Start(hostConfig); err != nil {
return fmt.Errorf("Error starting container %s: %s", name, err)
}
srv.LogEvent("start", name)
} else {
return fmt.Errorf("No such container: %s", name)
}
@ -1029,6 +1039,7 @@ func (srv *Server) ContainerStop(name string, t int) error {
if err := container.Stop(t); err != nil {
return fmt.Errorf("Error stopping container %s: %s", name, err)
}
srv.LogEvent("stop", name)
} else {
return fmt.Errorf("No such container: %s", name)
}
@ -1162,15 +1173,31 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) (
enableCors: enableCors,
pullingPool: make(map[string]struct{}),
pushingPool: make(map[string]struct{}),
events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
listeners: make(map[string]chan utils.JSONMessage),
}
runtime.srv = srv
return srv, nil
}
func (srv *Server) LogEvent(action, id string) {
now := time.Now().Unix()
jm := utils.JSONMessage{Status: action, ID: id, Time: now}
srv.events = append(srv.events, jm)
for _, c := range srv.listeners {
select { // non blocking channel
case c <- jm:
default:
}
}
}
type Server struct {
sync.Mutex
runtime *Runtime
enableCors bool
pullingPool map[string]struct{}
pushingPool map[string]struct{}
events []utils.JSONMessage
listeners map[string]chan utils.JSONMessage
}

View file

@ -1,7 +1,9 @@
package docker
import (
"github.com/dotcloud/docker/utils"
"testing"
"time"
)
func TestContainerTagImageDelete(t *testing.T) {
@ -163,3 +165,41 @@ func TestRunWithTooLowMemoryLimit(t *testing.T) {
}
}
func TestLogEvent(t *testing.T) {
runtime := mkRuntime(t)
srv := &Server{
runtime: runtime,
events: make([]utils.JSONMessage, 0, 64),
listeners: make(map[string]chan utils.JSONMessage),
}
srv.LogEvent("fakeaction", "fakeid")
listener := make(chan utils.JSONMessage)
srv.Lock()
srv.listeners["test"] = listener
srv.Unlock()
srv.LogEvent("fakeaction2", "fakeid")
if len(srv.events) != 2 {
t.Fatalf("Expected 2 events, found %d", len(srv.events))
}
go func() {
time.Sleep(200 * time.Millisecond)
srv.LogEvent("fakeaction3", "fakeid")
time.Sleep(200 * time.Millisecond)
srv.LogEvent("fakeaction4", "fakeid")
}()
setTimeout(t, "Listening for events timed out", 2*time.Second, func() {
for i := 2; i < 4; i++ {
event := <-listener
if event != srv.events[i] {
t.Fatalf("Event received it different than expected")
}
}
})
}

View file

@ -611,8 +611,27 @@ type JSONMessage struct {
Status string `json:"status,omitempty"`
Progress string `json:"progress,omitempty"`
Error string `json:"error,omitempty"`
ID string `json:"id,omitempty"`
Time int64 `json:"time,omitempty"`
}
func (jm *JSONMessage) Display(out io.Writer) (error) {
if jm.Time != 0 {
fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
}
if jm.Progress != "" {
fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress)
} else if jm.Error != "" {
return fmt.Errorf(jm.Error)
} else if jm.ID != "" {
fmt.Fprintf(out, "%s: %s\n", jm.ID, jm.Status)
} else {
fmt.Fprintf(out, "%s\n", jm.Status)
}
return nil
}
type StreamFormatter struct {
json bool
used bool