diff --git a/builtins/builtins.go b/builtins/builtins.go index 9379c67c88..75f9d4ce34 100644 --- a/builtins/builtins.go +++ b/builtins/builtins.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/dockerversion" "github.com/docker/docker/engine" + "github.com/docker/docker/events" "github.com/docker/docker/pkg/parsers/kernel" "github.com/docker/docker/registry" "github.com/docker/docker/server" @@ -20,6 +21,9 @@ func Register(eng *engine.Engine) error { if err := remote(eng); err != nil { return err } + if err := events.New().Install(eng); err != nil { + return err + } if err := eng.Register("version", dockerVersion); err != nil { return err } diff --git a/daemon/container.go b/daemon/container.go index d62eb6d2fa..9517e28469 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -168,6 +168,13 @@ func (container *Container) WriteHostConfig() error { return ioutil.WriteFile(pth, data, 0666) } +func (container *Container) LogEvent(action string) { + d := container.daemon + if err := d.eng.Job("log", action, container.ID, d.Repositories().ImageName(container.Image)).Run(); err != nil { + utils.Errorf("Error running container: %s", err) + } +} + func (container *Container) getResourcePath(path string) (string, error) { cleanPath := filepath.Join("/", path) return symlink.FollowSymlinkInScope(filepath.Join(container.basefs, cleanPath), container.basefs) @@ -508,7 +515,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error { container.stdin, container.stdinPipe = io.Pipe() } if container.daemon != nil && container.daemon.srv != nil { - container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image)) + container.LogEvent("die") } if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() { // FIXME: here is race condition between two RUN instructions in Dockerfile diff --git a/daemon/create.go b/daemon/create.go index 8d008f8ade..c3aa9ee58e 100644 --- a/daemon/create.go +++ b/daemon/create.go @@ -40,7 +40,7 @@ func (daemon *Daemon) ContainerCreate(job *engine.Job) engine.Status { if !container.Config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled { job.Errorf("IPv4 forwarding is disabled.\n") } - job.Eng.Job("log", "create", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("create") // FIXME: this is necessary because daemon.Create might return a nil container // with a non-nil error. This should not happen! Once it's fixed we // can remove this workaround. diff --git a/daemon/daemon.go b/daemon/daemon.go index 0d275492dc..41ccf721a6 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -107,6 +107,7 @@ type Daemon struct { func (daemon *Daemon) Install(eng *engine.Engine) error { // FIXME: rename "delete" to "rm" for consistency with the CLI command // FIXME: rename ContainerDestroy to ContainerRm for consistency with the CLI command + // FIXME: remove ImageDelete's dependency on Daemon, then move to graph/ for name, method := range map[string]engine.Handler{ "attach": daemon.ContainerAttach, "commit": daemon.ContainerCommit, @@ -127,6 +128,7 @@ func (daemon *Daemon) Install(eng *engine.Engine) error { "top": daemon.ContainerTop, "unpause": daemon.ContainerUnpause, "wait": daemon.ContainerWait, + "image_delete": daemon.ImageDelete, // FIXME: see above } { if err := eng.Register(name, method); err != nil { return err diff --git a/daemon/delete.go b/daemon/delete.go index 9c92be3fb1..ad4df69860 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -70,7 +70,7 @@ func (daemon *Daemon) ContainerDestroy(job *engine.Job) engine.Status { if err := daemon.Destroy(container); err != nil { return job.Errorf("Cannot destroy container %s: %s", name, err) } - job.Eng.Job("log", "destroy", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("destroy") if removeVolume { var ( diff --git a/daemon/export.go b/daemon/export.go index 204e862d5c..bc0f14a3bb 100644 --- a/daemon/export.go +++ b/daemon/export.go @@ -23,7 +23,7 @@ func (daemon *Daemon) ContainerExport(job *engine.Job) engine.Status { return job.Errorf("%s: %s", name, err) } // FIXME: factor job-specific LogEvent to engine.Job.Run() - job.Eng.Job("log", "export", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("export") return engine.StatusOK } return job.Errorf("No such container: %s", name) diff --git a/daemon/image_delete.go b/daemon/image_delete.go new file mode 100644 index 0000000000..77e8f85907 --- /dev/null +++ b/daemon/image_delete.go @@ -0,0 +1,156 @@ +package daemon + +import ( + "fmt" + "strings" + + "github.com/docker/docker/engine" + "github.com/docker/docker/graph" + "github.com/docker/docker/image" + "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/utils" +) + +func (daemon *Daemon) ImageDelete(job *engine.Job) engine.Status { + if n := len(job.Args); n != 1 { + return job.Errorf("Usage: %s IMAGE", job.Name) + } + imgs := engine.NewTable("", 0) + if err := daemon.DeleteImage(job.Eng, job.Args[0], imgs, true, job.GetenvBool("force"), job.GetenvBool("noprune")); err != nil { + return job.Error(err) + } + if len(imgs.Data) == 0 { + return job.Errorf("Conflict, %s wasn't deleted", job.Args[0]) + } + if _, err := imgs.WriteListTo(job.Stdout); err != nil { + return job.Error(err) + } + return engine.StatusOK +} + +// FIXME: make this private and use the job instead +func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine.Table, first, force, noprune bool) error { + var ( + repoName, tag string + tags = []string{} + tagDeleted bool + ) + + // FIXME: please respect DRY and centralize repo+tag parsing in a single central place! -- shykes + repoName, tag = parsers.ParseRepositoryTag(name) + if tag == "" { + tag = graph.DEFAULTTAG + } + + img, err := daemon.Repositories().LookupImage(name) + if err != nil { + if r, _ := daemon.Repositories().Get(repoName); r != nil { + return fmt.Errorf("No such image: %s:%s", repoName, tag) + } + return fmt.Errorf("No such image: %s", name) + } + + if strings.Contains(img.ID, name) { + repoName = "" + tag = "" + } + + byParents, err := daemon.Graph().ByParent() + if err != nil { + return err + } + + //If delete by id, see if the id belong only to one repository + if repoName == "" { + for _, repoAndTag := range daemon.Repositories().ByID()[img.ID] { + parsedRepo, parsedTag := parsers.ParseRepositoryTag(repoAndTag) + if repoName == "" || repoName == parsedRepo { + repoName = parsedRepo + if parsedTag != "" { + tags = append(tags, parsedTag) + } + } else if repoName != parsedRepo && !force { + // the id belongs to multiple repos, like base:latest and user:test, + // in that case return conflict + return fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories, use -f to force", name) + } + } + } else { + tags = append(tags, tag) + } + + if !first && len(tags) > 0 { + return nil + } + + //Untag the current image + for _, tag := range tags { + tagDeleted, err = daemon.Repositories().Delete(repoName, tag) + if err != nil { + return err + } + if tagDeleted { + out := &engine.Env{} + out.Set("Untagged", repoName+":"+tag) + imgs.Add(out) + eng.Job("log", "untag", img.ID, "").Run() + } + } + tags = daemon.Repositories().ByID()[img.ID] + if (len(tags) <= 1 && repoName == "") || len(tags) == 0 { + if len(byParents[img.ID]) == 0 { + if err := daemon.canDeleteImage(img.ID, force, tagDeleted); err != nil { + return err + } + if err := daemon.Repositories().DeleteAll(img.ID); err != nil { + return err + } + if err := daemon.Graph().Delete(img.ID); err != nil { + return err + } + out := &engine.Env{} + out.Set("Deleted", img.ID) + imgs.Add(out) + eng.Job("log", "delete", img.ID, "").Run() + if img.Parent != "" && !noprune { + err := daemon.DeleteImage(eng, img.Parent, imgs, false, force, noprune) + if first { + return err + } + + } + + } + } + return nil +} + +func (daemon *Daemon) canDeleteImage(imgID string, force, untagged bool) error { + var message string + if untagged { + message = " (docker untagged the image)" + } + for _, container := range daemon.List() { + parent, err := daemon.Repositories().LookupImage(container.Image) + if err != nil { + return err + } + + if err := parent.WalkHistory(func(p *image.Image) error { + if imgID == p.ID { + if container.State.IsRunning() { + if force { + return fmt.Errorf("Conflict, cannot force delete %s because the running container %s is using it%s, stop it and retry", utils.TruncateID(imgID), utils.TruncateID(container.ID), message) + } + return fmt.Errorf("Conflict, cannot delete %s because the running container %s is using it%s, stop it and use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message) + } else if !force { + return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it%s, use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message) + } + } + return nil + }); err != nil { + return err + } + } + return nil +} diff --git a/daemon/kill.go b/daemon/kill.go index f883495cef..f5f5897c88 100644 --- a/daemon/kill.go +++ b/daemon/kill.go @@ -44,7 +44,7 @@ func (daemon *Daemon) ContainerKill(job *engine.Job) engine.Status { if err := container.Kill(); err != nil { return job.Errorf("Cannot kill container %s: %s", name, err) } - job.Eng.Job("log", "kill", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("kill") } else { // Otherwise, just send the requested signal if err := container.KillSig(int(sig)); err != nil { diff --git a/daemon/pause.go b/daemon/pause.go index 72e5cee020..0e4323d9a8 100644 --- a/daemon/pause.go +++ b/daemon/pause.go @@ -16,7 +16,7 @@ func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status { if err := container.Pause(); err != nil { return job.Errorf("Cannot pause container %s: %s", name, err) } - job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("pause") return engine.StatusOK } @@ -32,6 +32,6 @@ func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status { if err := container.Unpause(); err != nil { return job.Errorf("Cannot unpause container %s: %s", name, err) } - job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("unpause") return engine.StatusOK } diff --git a/daemon/restart.go b/daemon/restart.go index c0ae949a88..bcc057156d 100644 --- a/daemon/restart.go +++ b/daemon/restart.go @@ -19,7 +19,7 @@ func (daemon *Daemon) ContainerRestart(job *engine.Job) engine.Status { if err := container.Restart(int(t)); err != nil { return job.Errorf("Cannot restart container %s: %s\n", name, err) } - job.Eng.Job("log", "restart", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("restart") } else { return job.Errorf("No such container: %s\n", name) } diff --git a/daemon/server.go b/daemon/server.go index 1e06eda896..12fb0f57c8 100644 --- a/daemon/server.go +++ b/daemon/server.go @@ -1,10 +1,5 @@ package daemon -import ( - "github.com/docker/docker/utils" -) - type Server interface { - LogEvent(action, id, from string) *utils.JSONMessage IsRunning() bool // returns true if the server is currently in operation } diff --git a/daemon/start.go b/daemon/start.go index 0e64a4e916..cb6e9cb21f 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -36,8 +36,7 @@ func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status { if err := container.Start(); err != nil { return job.Errorf("Cannot start container %s: %s", name, err) } - job.Eng.Job("log", "start", container.ID, daemon.Repositories().ImageName(container.Image)).Run() - + container.LogEvent("start") return engine.StatusOK } diff --git a/daemon/stop.go b/daemon/stop.go index 5ce2e1726e..f1851291fb 100644 --- a/daemon/stop.go +++ b/daemon/stop.go @@ -22,7 +22,7 @@ func (daemon *Daemon) ContainerStop(job *engine.Job) engine.Status { if err := container.Stop(int(t)); err != nil { return job.Errorf("Cannot stop container %s: %s\n", name, err) } - job.Eng.Job("log", "stop", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("stop") } else { return job.Errorf("No such container: %s\n", name) } diff --git a/events/events.go b/events/events.go new file mode 100644 index 0000000000..57a82cada0 --- /dev/null +++ b/events/events.go @@ -0,0 +1,176 @@ +package events + +import ( + "encoding/json" + "sync" + "time" + + "github.com/docker/docker/engine" + "github.com/docker/docker/utils" +) + +const eventsLimit = 64 + +type listener chan<- *utils.JSONMessage + +type Events struct { + mu sync.RWMutex + events []*utils.JSONMessage + subscribers []listener +} + +func New() *Events { + return &Events{ + events: make([]*utils.JSONMessage, 0, eventsLimit), + } +} + +// Install installs events public api in docker engine +func (e *Events) Install(eng *engine.Engine) error { + // Here you should describe public interface + jobs := map[string]engine.Handler{ + "events": e.Get, + "log": e.Log, + "subscribers_count": e.SubscribersCount, + } + for name, job := range jobs { + if err := eng.Register(name, job); err != nil { + return err + } + } + return nil +} + +func (e *Events) Get(job *engine.Job) engine.Status { + var ( + since = job.GetenvInt64("since") + until = job.GetenvInt64("until") + timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) + ) + + // If no until, disable timeout + if until == 0 { + timeout.Stop() + } + + listener := make(chan *utils.JSONMessage) + e.subscribe(listener) + defer e.unsubscribe(listener) + + job.Stdout.Write(nil) + + // Resend every event in the [since, until] time interval. + if since != 0 { + if err := e.writeCurrent(job, since, until); err != nil { + return job.Error(err) + } + } + + for { + select { + case event, ok := <-listener: + if !ok { + return engine.StatusOK + } + if err := writeEvent(job, event); err != nil { + return job.Error(err) + } + case <-timeout.C: + return engine.StatusOK + } + } +} + +func (e *Events) Log(job *engine.Job) engine.Status { + if len(job.Args) != 3 { + return job.Errorf("usage: %s ACTION ID FROM", job.Name) + } + // not waiting for receivers + go e.log(job.Args[0], job.Args[1], job.Args[2]) + return engine.StatusOK +} + +func (e *Events) SubscribersCount(job *engine.Job) engine.Status { + ret := &engine.Env{} + ret.SetInt("count", e.subscribersCount()) + ret.WriteTo(job.Stdout) + return engine.StatusOK +} + +func writeEvent(job *engine.Job, event *utils.JSONMessage) error { + // When sending an event JSON serialization errors are ignored, but all + // other errors lead to the eviction of the listener. + if b, err := json.Marshal(event); err == nil { + if _, err = job.Stdout.Write(b); err != nil { + return err + } + } + return nil +} + +func (e *Events) writeCurrent(job *engine.Job, since, until int64) error { + e.mu.RLock() + for _, event := range e.events { + if event.Time >= since && (event.Time <= until || until == 0) { + if err := writeEvent(job, event); err != nil { + e.mu.RUnlock() + return err + } + } + } + e.mu.RUnlock() + return nil +} + +func (e *Events) subscribersCount() int { + e.mu.RLock() + c := len(e.subscribers) + e.mu.RUnlock() + return c +} + +func (e *Events) log(action, id, from string) { + e.mu.Lock() + now := time.Now().UTC().Unix() + jm := &utils.JSONMessage{Status: action, ID: id, From: from, Time: now} + if len(e.events) == cap(e.events) { + // discard oldest event + copy(e.events, e.events[1:]) + e.events[len(e.events)-1] = jm + } else { + e.events = append(e.events, jm) + } + for _, s := range e.subscribers { + // We give each subscriber a 100ms time window to receive the event, + // after which we move to the next. + select { + case s <- jm: + case <-time.After(100 * time.Millisecond): + } + } + e.mu.Unlock() +} + +func (e *Events) subscribe(l listener) { + e.mu.Lock() + e.subscribers = append(e.subscribers, l) + e.mu.Unlock() +} + +// unsubscribe closes and removes the specified listener from the list of +// previously registed ones. +// It returns a boolean value indicating if the listener was successfully +// found, closed and unregistered. +func (e *Events) unsubscribe(l listener) bool { + e.mu.Lock() + for i, subscriber := range e.subscribers { + if subscriber == l { + close(l) + e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...) + e.mu.Unlock() + return true + } + } + e.mu.Unlock() + return false +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 0000000000..d4fc664baa --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,154 @@ +package events + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "testing" + "time" + + "github.com/docker/docker/engine" + "github.com/docker/docker/utils" +) + +func TestEventsPublish(t *testing.T) { + e := New() + l1 := make(chan *utils.JSONMessage) + l2 := make(chan *utils.JSONMessage) + e.subscribe(l1) + e.subscribe(l2) + count := e.subscribersCount() + if count != 2 { + t.Fatalf("Must be 2 subscribers, got %d", count) + } + go e.log("test", "cont", "image") + select { + case msg := <-l1: + if len(e.events) != 1 { + t.Fatalf("Must be only one event, got %d", len(e.events)) + } + if msg.Status != "test" { + t.Fatalf("Status should be test, got %s", msg.Status) + } + if msg.ID != "cont" { + t.Fatalf("ID should be cont, got %s", msg.ID) + } + if msg.From != "image" { + t.Fatalf("From should be image, got %s", msg.From) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for broadcasted message") + } + select { + case msg := <-l2: + if len(e.events) != 1 { + t.Fatalf("Must be only one event, got %d", len(e.events)) + } + if msg.Status != "test" { + t.Fatalf("Status should be test, got %s", msg.Status) + } + if msg.ID != "cont" { + t.Fatalf("ID should be cont, got %s", msg.ID) + } + if msg.From != "image" { + t.Fatalf("From should be image, got %s", msg.From) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for broadcasted message") + } +} + +func TestEventsPublishTimeout(t *testing.T) { + e := New() + l := make(chan *utils.JSONMessage) + e.subscribe(l) + + c := make(chan struct{}) + go func() { + e.log("test", "cont", "image") + close(c) + }() + + select { + case <-c: + case <-time.After(time.Second): + t.Fatal("Timeout publishing message") + } +} + +func TestLogEvents(t *testing.T) { + e := New() + eng := engine.New() + if err := e.Install(eng); err != nil { + t.Fatal(err) + } + + for i := 0; i < eventsLimit+16; i++ { + action := fmt.Sprintf("action_%d", i) + id := fmt.Sprintf("cont_%d", i) + from := fmt.Sprintf("image_%d", i) + job := eng.Job("log", action, id, from) + if err := job.Run(); err != nil { + t.Fatal(err) + } + } + time.Sleep(50 * time.Millisecond) + if len(e.events) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events)) + } + + job := eng.Job("events") + job.SetenvInt64("since", 1) + job.SetenvInt64("until", time.Now().Unix()) + buf := bytes.NewBuffer(nil) + job.Stdout.Add(buf) + if err := job.Run(); err != nil { + t.Fatal(err) + } + buf = bytes.NewBuffer(buf.Bytes()) + dec := json.NewDecoder(buf) + var msgs []utils.JSONMessage + for { + var jm utils.JSONMessage + if err := dec.Decode(&jm); err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + msgs = append(msgs, jm) + } + if len(msgs) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs)) + } + first := msgs[0] + if first.Status != "action_16" { + t.Fatalf("First action is %s, must be action_15", first.Status) + } + last := msgs[len(msgs)-1] + if last.Status != "action_79" { + t.Fatalf("First action is %s, must be action_79", first.Status) + } +} + +func TestEventsCountJob(t *testing.T) { + e := New() + eng := engine.New() + if err := e.Install(eng); err != nil { + t.Fatal(err) + } + l1 := make(chan *utils.JSONMessage) + l2 := make(chan *utils.JSONMessage) + e.subscribe(l1) + e.subscribe(l2) + job := eng.Job("subscribers_count") + env, _ := job.Stdout.AddEnv() + if err := job.Run(); err != nil { + t.Fatal(err) + } + count := env.GetInt("count") + if count != 2 { + t.Fatalf("There must be 2 subscribers, got %d", count) + } +} diff --git a/graph/export.go b/graph/export.go new file mode 100644 index 0000000000..81b78ca50d --- /dev/null +++ b/graph/export.go @@ -0,0 +1,147 @@ +package graph + +import ( + "encoding/json" + "io" + "io/ioutil" + "os" + "path" + + "github.com/docker/docker/archive" + "github.com/docker/docker/engine" + "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/utils" +) + +// CmdImageExport exports all images with the given tag. All versions +// containing the same tag are exported. The resulting output is an +// uncompressed tar ball. +// name is the set of tags to export. +// out is the writer where the images are written to. +func (s *TagStore) CmdImageExport(job *engine.Job) engine.Status { + if len(job.Args) != 1 { + return job.Errorf("Usage: %s IMAGE\n", job.Name) + } + name := job.Args[0] + // get image json + tempdir, err := ioutil.TempDir("", "docker-export-") + if err != nil { + return job.Error(err) + } + defer os.RemoveAll(tempdir) + + utils.Debugf("Serializing %s", name) + + rootRepoMap := map[string]Repository{} + rootRepo, err := s.Get(name) + if err != nil { + return job.Error(err) + } + if rootRepo != nil { + // this is a base repo name, like 'busybox' + + for _, id := range rootRepo { + if err := s.exportImage(job.Eng, id, tempdir); err != nil { + return job.Error(err) + } + } + rootRepoMap[name] = rootRepo + } else { + img, err := s.LookupImage(name) + if err != nil { + return job.Error(err) + } + if img != nil { + // This is a named image like 'busybox:latest' + repoName, repoTag := parsers.ParseRepositoryTag(name) + if err := s.exportImage(job.Eng, img.ID, tempdir); err != nil { + return job.Error(err) + } + // check this length, because a lookup of a truncated has will not have a tag + // and will not need to be added to this map + if len(repoTag) > 0 { + rootRepoMap[repoName] = Repository{repoTag: img.ID} + } + } else { + // this must be an ID that didn't get looked up just right? + if err := s.exportImage(job.Eng, name, tempdir); err != nil { + return job.Error(err) + } + } + } + // write repositories, if there is something to write + if len(rootRepoMap) > 0 { + rootRepoJson, _ := json.Marshal(rootRepoMap) + + if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.FileMode(0644)); err != nil { + return job.Error(err) + } + } else { + utils.Debugf("There were no repositories to write") + } + + fs, err := archive.Tar(tempdir, archive.Uncompressed) + if err != nil { + return job.Error(err) + } + defer fs.Close() + + if _, err := io.Copy(job.Stdout, fs); err != nil { + return job.Error(err) + } + utils.Debugf("End Serializing %s", name) + return engine.StatusOK +} + +// FIXME: this should be a top-level function, not a class method +func (s *TagStore) exportImage(eng *engine.Engine, name, tempdir string) error { + for n := name; n != ""; { + // temporary directory + tmpImageDir := path.Join(tempdir, n) + if err := os.Mkdir(tmpImageDir, os.FileMode(0755)); err != nil { + if os.IsExist(err) { + return nil + } + return err + } + + var version = "1.0" + var versionBuf = []byte(version) + + if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.FileMode(0644)); err != nil { + return err + } + + // serialize json + json, err := os.Create(path.Join(tmpImageDir, "json")) + if err != nil { + return err + } + job := eng.Job("image_inspect", n) + job.SetenvBool("raw", true) + job.Stdout.Add(json) + if err := job.Run(); err != nil { + return err + } + + // serialize filesystem + fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar")) + if err != nil { + return err + } + job = eng.Job("image_tarlayer", n) + job.Stdout.Add(fsTar) + if err := job.Run(); err != nil { + return err + } + + // find parent + job = eng.Job("image_get", n) + info, _ := job.Stdout.AddEnv() + if err := job.Run(); err != nil { + return err + } + n = info.Get("Parent") + } + return nil +} diff --git a/graph/history.go b/graph/history.go new file mode 100644 index 0000000000..2030c4c789 --- /dev/null +++ b/graph/history.go @@ -0,0 +1,46 @@ +package graph + +import ( + "strings" + + "github.com/docker/docker/engine" + "github.com/docker/docker/image" +) + +func (s *TagStore) CmdHistory(job *engine.Job) engine.Status { + if n := len(job.Args); n != 1 { + return job.Errorf("Usage: %s IMAGE", job.Name) + } + name := job.Args[0] + foundImage, err := s.LookupImage(name) + if err != nil { + return job.Error(err) + } + + lookupMap := make(map[string][]string) + for name, repository := range s.Repositories { + for tag, id := range repository { + // If the ID already has a reverse lookup, do not update it unless for "latest" + if _, exists := lookupMap[id]; !exists { + lookupMap[id] = []string{} + } + lookupMap[id] = append(lookupMap[id], name+":"+tag) + } + } + + outs := engine.NewTable("Created", 0) + err = foundImage.WalkHistory(func(img *image.Image) error { + out := &engine.Env{} + out.Set("Id", img.ID) + out.SetInt64("Created", img.Created.Unix()) + out.Set("CreatedBy", strings.Join(img.ContainerConfig.Cmd, " ")) + out.SetList("Tags", lookupMap[img.ID]) + out.SetInt64("Size", img.Size) + outs.Add(out) + return nil + }) + if _, err := outs.WriteListTo(job.Stdout); err != nil { + return job.Error(err) + } + return engine.StatusOK +} diff --git a/graph/import.go b/graph/import.go new file mode 100644 index 0000000000..049742af45 --- /dev/null +++ b/graph/import.go @@ -0,0 +1,61 @@ +package graph + +import ( + "net/http" + "net/url" + + "github.com/docker/docker/archive" + "github.com/docker/docker/engine" + "github.com/docker/docker/utils" +) + +func (s *TagStore) CmdImport(job *engine.Job) engine.Status { + if n := len(job.Args); n != 2 && n != 3 { + return job.Errorf("Usage: %s SRC REPO [TAG]", job.Name) + } + var ( + src = job.Args[0] + repo = job.Args[1] + tag string + sf = utils.NewStreamFormatter(job.GetenvBool("json")) + archive archive.ArchiveReader + resp *http.Response + ) + if len(job.Args) > 2 { + tag = job.Args[2] + } + + if src == "-" { + archive = job.Stdin + } else { + u, err := url.Parse(src) + if err != nil { + return job.Error(err) + } + if u.Scheme == "" { + u.Scheme = "http" + u.Host = src + u.Path = "" + } + job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u)) + resp, err = utils.Download(u.String()) + if err != nil { + return job.Error(err) + } + progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing") + defer progressReader.Close() + archive = progressReader + } + img, err := s.graph.Create(archive, "", "", "Imported from "+src, "", nil, nil) + if err != nil { + return job.Error(err) + } + // Optionally register the image at REPO/TAG + if repo != "" { + if err := s.Set(repo, tag, img.ID, true); err != nil { + return job.Error(err) + } + } + job.Stdout.Write(sf.FormatStatus("", img.ID)) + return engine.StatusOK +} diff --git a/graph/list.go b/graph/list.go new file mode 100644 index 0000000000..0e0e97e447 --- /dev/null +++ b/graph/list.go @@ -0,0 +1,103 @@ +package graph + +import ( + "fmt" + "log" + "path" + "strings" + + "github.com/docker/docker/engine" + "github.com/docker/docker/image" + "github.com/docker/docker/pkg/parsers/filters" +) + +func (s *TagStore) CmdImages(job *engine.Job) engine.Status { + var ( + allImages map[string]*image.Image + err error + filt_tagged = true + ) + + imageFilters, err := filters.FromParam(job.Getenv("filters")) + if err != nil { + return job.Error(err) + } + if i, ok := imageFilters["dangling"]; ok { + for _, value := range i { + if strings.ToLower(value) == "true" { + filt_tagged = false + } + } + } + + if job.GetenvBool("all") && filt_tagged { + allImages, err = s.graph.Map() + } else { + allImages, err = s.graph.Heads() + } + if err != nil { + return job.Error(err) + } + lookup := make(map[string]*engine.Env) + s.Lock() + for name, repository := range s.Repositories { + if job.Getenv("filter") != "" { + if match, _ := path.Match(job.Getenv("filter"), name); !match { + continue + } + } + for tag, id := range repository { + image, err := s.graph.Get(id) + if err != nil { + log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err) + continue + } + + if out, exists := lookup[id]; exists { + if filt_tagged { + out.SetList("RepoTags", append(out.GetList("RepoTags"), fmt.Sprintf("%s:%s", name, tag))) + } + } else { + // get the boolean list for if only the untagged images are requested + delete(allImages, id) + if filt_tagged { + out := &engine.Env{} + out.Set("ParentId", image.Parent) + out.SetList("RepoTags", []string{fmt.Sprintf("%s:%s", name, tag)}) + out.Set("Id", image.ID) + out.SetInt64("Created", image.Created.Unix()) + out.SetInt64("Size", image.Size) + out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size) + lookup[id] = out + } + } + + } + } + s.Unlock() + + outs := engine.NewTable("Created", len(lookup)) + for _, value := range lookup { + outs.Add(value) + } + + // Display images which aren't part of a repository/tag + if job.Getenv("filter") == "" { + for _, image := range allImages { + out := &engine.Env{} + out.Set("ParentId", image.Parent) + out.SetList("RepoTags", []string{":"}) + out.Set("Id", image.ID) + out.SetInt64("Created", image.Created.Unix()) + out.SetInt64("Size", image.Size) + out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size) + outs.Add(out) + } + } + + outs.ReverseSort() + if _, err := outs.WriteListTo(job.Stdout); err != nil { + return job.Error(err) + } + return engine.StatusOK +} diff --git a/graph/load.go b/graph/load.go new file mode 100644 index 0000000000..5d84b82c28 --- /dev/null +++ b/graph/load.go @@ -0,0 +1,118 @@ +package graph + +import ( + "encoding/json" + "io" + "io/ioutil" + "os" + "path" + + "github.com/docker/docker/archive" + "github.com/docker/docker/engine" + "github.com/docker/docker/image" + "github.com/docker/docker/utils" +) + +// Loads a set of images into the repository. This is the complementary of ImageExport. +// The input stream is an uncompressed tar ball containing images and metadata. +func (s *TagStore) CmdLoad(job *engine.Job) engine.Status { + tmpImageDir, err := ioutil.TempDir("", "docker-import-") + if err != nil { + return job.Error(err) + } + defer os.RemoveAll(tmpImageDir) + + var ( + repoTarFile = path.Join(tmpImageDir, "repo.tar") + repoDir = path.Join(tmpImageDir, "repo") + ) + + tarFile, err := os.Create(repoTarFile) + if err != nil { + return job.Error(err) + } + if _, err := io.Copy(tarFile, job.Stdin); err != nil { + return job.Error(err) + } + tarFile.Close() + + repoFile, err := os.Open(repoTarFile) + if err != nil { + return job.Error(err) + } + if err := os.Mkdir(repoDir, os.ModeDir); err != nil { + return job.Error(err) + } + if err := archive.Untar(repoFile, repoDir, nil); err != nil { + return job.Error(err) + } + + dirs, err := ioutil.ReadDir(repoDir) + if err != nil { + return job.Error(err) + } + + for _, d := range dirs { + if d.IsDir() { + if err := s.recursiveLoad(job.Eng, d.Name(), tmpImageDir); err != nil { + return job.Error(err) + } + } + } + + repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories")) + if err == nil { + repositories := map[string]Repository{} + if err := json.Unmarshal(repositoriesJson, &repositories); err != nil { + return job.Error(err) + } + + for imageName, tagMap := range repositories { + for tag, address := range tagMap { + if err := s.Set(imageName, tag, address, true); err != nil { + return job.Error(err) + } + } + } + } else if !os.IsNotExist(err) { + return job.Error(err) + } + + return engine.StatusOK +} + +func (s *TagStore) recursiveLoad(eng *engine.Engine, address, tmpImageDir string) error { + if err := eng.Job("image_get", address).Run(); err != nil { + utils.Debugf("Loading %s", address) + + imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json")) + if err != nil { + utils.Debugf("Error reading json", err) + return err + } + + layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar")) + if err != nil { + utils.Debugf("Error reading embedded tar", err) + return err + } + img, err := image.NewImgJSON(imageJson) + if err != nil { + utils.Debugf("Error unmarshalling json", err) + return err + } + if img.Parent != "" { + if !s.graph.Exists(img.Parent) { + if err := s.recursiveLoad(eng, img.Parent, tmpImageDir); err != nil { + return err + } + } + } + if err := s.graph.Register(imageJson, layer, img); err != nil { + return err + } + } + utils.Debugf("Completed processing %s", address) + + return nil +} diff --git a/graph/service.go b/graph/service.go index 75c2b5c846..74d6226d0a 100644 --- a/graph/service.go +++ b/graph/service.go @@ -1,20 +1,33 @@ package graph import ( + "fmt" "io" "github.com/docker/docker/engine" "github.com/docker/docker/image" - "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/utils" ) func (s *TagStore) Install(eng *engine.Engine) error { - eng.Register("image_set", s.CmdSet) - eng.Register("image_tag", s.CmdTag) - eng.Register("image_get", s.CmdGet) - eng.Register("image_inspect", s.CmdLookup) - eng.Register("image_tarlayer", s.CmdTarLayer) + for name, handler := range map[string]engine.Handler{ + "image_set": s.CmdSet, + "image_tag": s.CmdTag, + "tag": s.CmdTagLegacy, // FIXME merge with "image_tag" + "image_get": s.CmdGet, + "image_inspect": s.CmdLookup, + "image_tarlayer": s.CmdTarLayer, + "image_export": s.CmdImageExport, + "history": s.CmdHistory, + "images": s.CmdImages, + "viz": s.CmdViz, + "load": s.CmdLoad, + "import": s.CmdImport, + } { + if err := eng.Register(name, handler); err != nil { + return fmt.Errorf("Could not register %q: %v", name, err) + } + } return nil } @@ -65,29 +78,6 @@ func (s *TagStore) CmdSet(job *engine.Job) engine.Status { return engine.StatusOK } -// CmdTag assigns a new name and tag to an existing image. If the tag already exists, -// it is changed and the image previously referenced by the tag loses that reference. -// This may cause the old image to be garbage-collected if its reference count reaches zero. -// -// Syntax: image_tag NEWNAME OLDNAME -// Example: image_tag shykes/myapp:latest shykes/myapp:1.42.0 -func (s *TagStore) CmdTag(job *engine.Job) engine.Status { - if len(job.Args) != 2 { - return job.Errorf("usage: %s NEWNAME OLDNAME", job.Name) - } - var ( - newName = job.Args[0] - oldName = job.Args[1] - ) - newRepo, newTag := parsers.ParseRepositoryTag(newName) - // FIXME: Set should either parse both old and new name, or neither. - // the current prototype is inconsistent. - if err := s.Set(newRepo, newTag, oldName, true); err != nil { - return job.Error(err) - } - return engine.StatusOK -} - // CmdGet returns information about an image. // If the image doesn't exist, an empty object is returned, to allow // checking for an image's existence. diff --git a/graph/tag.go b/graph/tag.go new file mode 100644 index 0000000000..3d89422f9d --- /dev/null +++ b/graph/tag.go @@ -0,0 +1,44 @@ +package graph + +import ( + "github.com/docker/docker/engine" + "github.com/docker/docker/pkg/parsers" +) + +// CmdTag assigns a new name and tag to an existing image. If the tag already exists, +// it is changed and the image previously referenced by the tag loses that reference. +// This may cause the old image to be garbage-collected if its reference count reaches zero. +// +// Syntax: image_tag NEWNAME OLDNAME +// Example: image_tag shykes/myapp:latest shykes/myapp:1.42.0 +func (s *TagStore) CmdTag(job *engine.Job) engine.Status { + if len(job.Args) != 2 { + return job.Errorf("usage: %s NEWNAME OLDNAME", job.Name) + } + var ( + newName = job.Args[0] + oldName = job.Args[1] + ) + newRepo, newTag := parsers.ParseRepositoryTag(newName) + // FIXME: Set should either parse both old and new name, or neither. + // the current prototype is inconsistent. + if err := s.Set(newRepo, newTag, oldName, true); err != nil { + return job.Error(err) + } + return engine.StatusOK +} + +// FIXME: merge into CmdTag above, and merge "image_tag" and "tag" into a single job. +func (s *TagStore) CmdTagLegacy(job *engine.Job) engine.Status { + if len(job.Args) != 2 && len(job.Args) != 3 { + return job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name) + } + var tag string + if len(job.Args) == 3 { + tag = job.Args[2] + } + if err := s.Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil { + return job.Error(err) + } + return engine.StatusOK +} diff --git a/graph/viz.go b/graph/viz.go new file mode 100644 index 0000000000..924c22b6a2 --- /dev/null +++ b/graph/viz.go @@ -0,0 +1,38 @@ +package graph + +import ( + "strings" + + "github.com/docker/docker/engine" + "github.com/docker/docker/image" +) + +func (s *TagStore) CmdViz(job *engine.Job) engine.Status { + images, _ := s.graph.Map() + if images == nil { + return engine.StatusOK + } + job.Stdout.Write([]byte("digraph docker {\n")) + + var ( + parentImage *image.Image + err error + ) + for _, image := range images { + parentImage, err = image.GetParent() + if err != nil { + return job.Errorf("Error while getting parent image: %v", err) + } + if parentImage != nil { + job.Stdout.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n")) + } else { + job.Stdout.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n")) + } + } + + for id, repos := range s.GetRepoRefs() { + job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n")) + } + job.Stdout.Write([]byte(" base [style=invisible]\n}\n")) + return engine.StatusOK +} diff --git a/integration-cli/docker_cli_rm_test.go b/integration-cli/docker_cli_rm_test.go index b61f1c9a93..08bc2a7224 100644 --- a/integration-cli/docker_cli_rm_test.go +++ b/integration-cli/docker_cli_rm_test.go @@ -3,6 +3,7 @@ package main import ( "os" "os/exec" + "strings" "testing" ) @@ -92,6 +93,69 @@ func TestRemoveContainerWithStopAndKill(t *testing.T) { logDone("rm - with --stop=true and --kill=true") } +func TestContainerOrphaning(t *testing.T) { + dockerfile1 := `FROM busybox:latest + ENTRYPOINT ["/bin/true"]` + img := "test-container-orphaning" + dockerfile2 := `FROM busybox:latest + ENTRYPOINT ["/bin/true"] + MAINTAINER Integration Tests` + + // build first dockerfile + img1, err := buildImage(img, dockerfile1, true) + if err != nil { + t.Fatalf("Could not build image %s: %v", img, err) + } + // run container on first image + if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "run", img)); err != nil { + t.Fatalf("Could not run image %s: %v: %s", img, err, out) + } + // rebuild dockerfile with a small addition at the end + if _, err := buildImage(img, dockerfile2, true); err != nil { + t.Fatalf("Could not rebuild image %s: %v", img, err) + } + // try to remove the image, should error out. + if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "rmi", img)); err == nil { + t.Fatalf("Expected to error out removing the image, but succeeded: %s", out) + } + // check if we deleted the first image + out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "images", "-q", "--no-trunc")) + if err != nil { + t.Fatalf("%v: %s", err, out) + } + if !strings.Contains(out, img1) { + t.Fatal("Orphaned container (could not find '%s' in docker images): %s", img1, out) + } + + deleteAllContainers() + + logDone("rm - container orphaning") +} + +func TestDeleteTagWithExistingContainers(t *testing.T) { + container := "test-delete-tag" + newtag := "busybox:newtag" + bb := "busybox:latest" + if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "tag", bb, newtag)); err != nil { + t.Fatalf("Could not tag busybox: %v: %s", err, out) + } + if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "run", "--name", container, bb, "/bin/true")); err != nil { + t.Fatalf("Could not run busybox: %v: %s", err, out) + } + out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "rmi", newtag)) + if err != nil { + t.Fatalf("Could not remove tag %s: %v: %s", newtag, err, out) + } + if d := strings.Count(out, "Untagged: "); d != 1 { + t.Fatalf("Expected 1 untagged entry got %d: %q", d, out) + } + + deleteAllContainers() + + logDone("rm - delete tag with existing containers") + +} + func createRunningContainer(t *testing.T, name string) { cmd := exec.Command(dockerBinary, "run", "-dt", "--name", name, "busybox", "top") if _, err := runCommand(cmd); err != nil { diff --git a/integration/commands_test.go b/integration/commands_test.go index 1e9f18eaf8..44e77e1c7e 100644 --- a/integration/commands_test.go +++ b/integration/commands_test.go @@ -13,7 +13,6 @@ import ( "github.com/docker/docker/api/client" "github.com/docker/docker/daemon" - "github.com/docker/docker/engine" "github.com/docker/docker/pkg/term" "github.com/docker/docker/utils" ) @@ -682,70 +681,3 @@ func TestRunCidFileCleanupIfEmpty(t *testing.T) { <-c }) } - -func TestContainerOrphaning(t *testing.T) { - - // setup a temporary directory - tmpDir, err := ioutil.TempDir("", "project") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmpDir) - - // setup a CLI and server - cli := client.NewDockerCli(nil, ioutil.Discard, ioutil.Discard, testDaemonProto, testDaemonAddr, nil) - defer cleanup(globalEngine, t) - srv := mkServerFromEngine(globalEngine, t) - - // closure to build something - buildSomething := func(template string, image string) string { - dockerfile := path.Join(tmpDir, "Dockerfile") - replacer := strings.NewReplacer("{IMAGE}", unitTestImageID) - contents := replacer.Replace(template) - ioutil.WriteFile(dockerfile, []byte(contents), 0x777) - if err := cli.CmdBuild("-t", image, tmpDir); err != nil { - t.Fatal(err) - } - job := globalEngine.Job("image_get", image) - info, _ := job.Stdout.AddEnv() - if err := job.Run(); err != nil { - t.Fatal(err) - } - return info.Get("Id") - } - - // build an image - imageName := "orphan-test" - template1 := ` - from {IMAGE} - cmd ["/bin/echo", "holla"] - ` - img1 := buildSomething(template1, imageName) - - // create a container using the fist image - if err := cli.CmdRun(imageName); err != nil { - t.Fatal(err) - } - - // build a new image that splits lineage - template2 := ` - from {IMAGE} - cmd ["/bin/echo", "holla"] - expose 22 - ` - buildSomething(template2, imageName) - - // remove the second image by name - resp := engine.NewTable("", 0) - if err := srv.DeleteImage(imageName, resp, true, false, false); err == nil { - t.Fatal("Expected error, got none") - } - - // see if we deleted the first image (and orphaned the container) - for _, i := range resp.Data { - if img1 == i.Get("Deleted") { - t.Fatal("Orphaned image with container") - } - } - -} diff --git a/integration/server_test.go b/integration/server_test.go index 7b9f5ade01..241ca76bfe 100644 --- a/integration/server_test.go +++ b/integration/server_test.go @@ -297,56 +297,3 @@ func TestImagesFilter(t *testing.T) { t.Fatal("incorrect number of matches returned") } } - -// Regression test for being able to untag an image with an existing -// container -func TestDeleteTagWithExistingContainers(t *testing.T) { - eng := NewTestEngine(t) - defer nuke(mkDaemonFromEngine(eng, t)) - - srv := mkServerFromEngine(eng, t) - - // Tag the image - if err := eng.Job("tag", unitTestImageID, "utest", "tag1").Run(); err != nil { - t.Fatal(err) - } - - // Create a container from the image - config, _, _, err := runconfig.Parse([]string{unitTestImageID, "echo test"}, nil) - if err != nil { - t.Fatal(err) - } - - id := createNamedTestContainer(eng, config, t, "testingtags") - if id == "" { - t.Fatal("No id returned") - } - - job := srv.Eng.Job("containers") - job.SetenvBool("all", true) - outs, err := job.Stdout.AddListTable() - if err != nil { - t.Fatal(err) - } - if err := job.Run(); err != nil { - t.Fatal(err) - } - - if len(outs.Data) != 1 { - t.Fatalf("Expected 1 container got %d", len(outs.Data)) - } - - // Try to remove the tag - imgs := engine.NewTable("", 0) - if err := srv.DeleteImage("utest:tag1", imgs, true, false, false); err != nil { - t.Fatal(err) - } - - if len(imgs.Data) != 1 { - t.Fatalf("Should only have deleted one untag %d", len(imgs.Data)) - } - - if untag := imgs.Data[0].Get("Untagged"); untag != "utest:tag1" { - t.Fatalf("Expected %s got %s", unitTestImageID, untag) - } -} diff --git a/server/events.go b/server/events.go deleted file mode 100644 index 214dd69e04..0000000000 --- a/server/events.go +++ /dev/null @@ -1,108 +0,0 @@ -// DEPRECATION NOTICE. PLEASE DO NOT ADD ANYTHING TO THIS FILE. -// -// For additional commments see server/server.go -// -package server - -import ( - "encoding/json" - "time" - - "github.com/docker/docker/engine" - "github.com/docker/docker/utils" -) - -func (srv *Server) Events(job *engine.Job) engine.Status { - if len(job.Args) != 0 { - return job.Errorf("Usage: %s", job.Name) - } - - var ( - since = job.GetenvInt64("since") - until = job.GetenvInt64("until") - timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) - ) - - // If no until, disable timeout - if until == 0 { - timeout.Stop() - } - - listener := make(chan utils.JSONMessage) - srv.eventPublisher.Subscribe(listener) - defer srv.eventPublisher.Unsubscribe(listener) - - // When sending an event JSON serialization errors are ignored, but all - // other errors lead to the eviction of the listener. - sendEvent := func(event *utils.JSONMessage) error { - if b, err := json.Marshal(event); err == nil { - if _, err = job.Stdout.Write(b); err != nil { - return err - } - } - return nil - } - - job.Stdout.Write(nil) - - // Resend every event in the [since, until] time interval. - if since != 0 { - for _, event := range srv.GetEvents() { - if event.Time >= since && (event.Time <= until || until == 0) { - if err := sendEvent(&event); err != nil { - return job.Error(err) - } - } - } - } - - for { - select { - case event, ok := <-listener: - if !ok { - return engine.StatusOK - } - if err := sendEvent(&event); err != nil { - return job.Error(err) - } - case <-timeout.C: - return engine.StatusOK - } - } -} - -// FIXME: this is a shim to allow breaking up other parts of Server without -// dragging the sphagetti dependency along. -func (srv *Server) Log(job *engine.Job) engine.Status { - if len(job.Args) != 3 { - return job.Errorf("usage: %s ACTION ID FROM", job.Name) - } - srv.LogEvent(job.Args[0], job.Args[1], job.Args[2]) - return engine.StatusOK -} - -func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage { - now := time.Now().UTC().Unix() - jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now} - srv.AddEvent(jm) - srv.eventPublisher.Publish(jm) - return &jm -} - -func (srv *Server) AddEvent(jm utils.JSONMessage) { - srv.Lock() - if len(srv.events) == cap(srv.events) { - // discard oldest event - copy(srv.events, srv.events[1:]) - srv.events[len(srv.events)-1] = jm - } else { - srv.events = append(srv.events, jm) - } - srv.Unlock() -} - -func (srv *Server) GetEvents() []utils.JSONMessage { - srv.RLock() - defer srv.RUnlock() - return srv.events -} diff --git a/server/image.go b/server/image.go index acc8794985..5cde8dd63e 100644 --- a/server/image.go +++ b/server/image.go @@ -5,13 +5,10 @@ package server import ( - "encoding/json" "fmt" "io" "io/ioutil" - "log" "net" - "net/http" "net/url" "os" "os/exec" @@ -22,146 +19,12 @@ import ( "github.com/docker/docker/archive" "github.com/docker/docker/builder" "github.com/docker/docker/engine" - "github.com/docker/docker/graph" "github.com/docker/docker/image" "github.com/docker/docker/pkg/parsers" - "github.com/docker/docker/pkg/parsers/filters" "github.com/docker/docker/registry" "github.com/docker/docker/utils" ) -// ImageExport exports all images with the given tag. All versions -// containing the same tag are exported. The resulting output is an -// uncompressed tar ball. -// name is the set of tags to export. -// out is the writer where the images are written to. -func (srv *Server) ImageExport(job *engine.Job) engine.Status { - if len(job.Args) != 1 { - return job.Errorf("Usage: %s IMAGE\n", job.Name) - } - name := job.Args[0] - // get image json - tempdir, err := ioutil.TempDir("", "docker-export-") - if err != nil { - return job.Error(err) - } - defer os.RemoveAll(tempdir) - - utils.Debugf("Serializing %s", name) - - rootRepoMap := map[string]graph.Repository{} - rootRepo, err := srv.daemon.Repositories().Get(name) - if err != nil { - return job.Error(err) - } - if rootRepo != nil { - // this is a base repo name, like 'busybox' - - for _, id := range rootRepo { - if err := srv.exportImage(job.Eng, id, tempdir); err != nil { - return job.Error(err) - } - } - rootRepoMap[name] = rootRepo - } else { - img, err := srv.daemon.Repositories().LookupImage(name) - if err != nil { - return job.Error(err) - } - if img != nil { - // This is a named image like 'busybox:latest' - repoName, repoTag := parsers.ParseRepositoryTag(name) - if err := srv.exportImage(job.Eng, img.ID, tempdir); err != nil { - return job.Error(err) - } - // check this length, because a lookup of a truncated has will not have a tag - // and will not need to be added to this map - if len(repoTag) > 0 { - rootRepoMap[repoName] = graph.Repository{repoTag: img.ID} - } - } else { - // this must be an ID that didn't get looked up just right? - if err := srv.exportImage(job.Eng, name, tempdir); err != nil { - return job.Error(err) - } - } - } - // write repositories, if there is something to write - if len(rootRepoMap) > 0 { - rootRepoJson, _ := json.Marshal(rootRepoMap) - - if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.FileMode(0644)); err != nil { - return job.Error(err) - } - } else { - utils.Debugf("There were no repositories to write") - } - - fs, err := archive.Tar(tempdir, archive.Uncompressed) - if err != nil { - return job.Error(err) - } - defer fs.Close() - - if _, err := io.Copy(job.Stdout, fs); err != nil { - return job.Error(err) - } - utils.Debugf("End Serializing %s", name) - return engine.StatusOK -} - -func (srv *Server) exportImage(eng *engine.Engine, name, tempdir string) error { - for n := name; n != ""; { - // temporary directory - tmpImageDir := path.Join(tempdir, n) - if err := os.Mkdir(tmpImageDir, os.FileMode(0755)); err != nil { - if os.IsExist(err) { - return nil - } - return err - } - - var version = "1.0" - var versionBuf = []byte(version) - - if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.FileMode(0644)); err != nil { - return err - } - - // serialize json - json, err := os.Create(path.Join(tmpImageDir, "json")) - if err != nil { - return err - } - job := eng.Job("image_inspect", n) - job.SetenvBool("raw", true) - job.Stdout.Add(json) - if err := job.Run(); err != nil { - return err - } - - // serialize filesystem - fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar")) - if err != nil { - return err - } - job = eng.Job("image_tarlayer", n) - job.Stdout.Add(fsTar) - if err := job.Run(); err != nil { - return err - } - - // find parent - job = eng.Job("image_get", n) - info, _ := job.Stdout.AddEnv() - if err := job.Run(); err != nil { - return err - } - n = info.Get("Parent") - } - return nil -} - func (srv *Server) Build(job *engine.Job) engine.Status { if len(job.Args) != 0 { return job.Errorf("Usage: %s\n", job.Name) @@ -242,282 +105,6 @@ func (srv *Server) Build(job *engine.Job) engine.Status { return engine.StatusOK } -// Loads a set of images into the repository. This is the complementary of ImageExport. -// The input stream is an uncompressed tar ball containing images and metadata. -func (srv *Server) ImageLoad(job *engine.Job) engine.Status { - tmpImageDir, err := ioutil.TempDir("", "docker-import-") - if err != nil { - return job.Error(err) - } - defer os.RemoveAll(tmpImageDir) - - var ( - repoTarFile = path.Join(tmpImageDir, "repo.tar") - repoDir = path.Join(tmpImageDir, "repo") - ) - - tarFile, err := os.Create(repoTarFile) - if err != nil { - return job.Error(err) - } - if _, err := io.Copy(tarFile, job.Stdin); err != nil { - return job.Error(err) - } - tarFile.Close() - - repoFile, err := os.Open(repoTarFile) - if err != nil { - return job.Error(err) - } - if err := os.Mkdir(repoDir, os.ModeDir); err != nil { - return job.Error(err) - } - if err := archive.Untar(repoFile, repoDir, nil); err != nil { - return job.Error(err) - } - - dirs, err := ioutil.ReadDir(repoDir) - if err != nil { - return job.Error(err) - } - - for _, d := range dirs { - if d.IsDir() { - if err := srv.recursiveLoad(job.Eng, d.Name(), tmpImageDir); err != nil { - return job.Error(err) - } - } - } - - repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories")) - if err == nil { - repositories := map[string]graph.Repository{} - if err := json.Unmarshal(repositoriesJson, &repositories); err != nil { - return job.Error(err) - } - - for imageName, tagMap := range repositories { - for tag, address := range tagMap { - if err := srv.daemon.Repositories().Set(imageName, tag, address, true); err != nil { - return job.Error(err) - } - } - } - } else if !os.IsNotExist(err) { - return job.Error(err) - } - - return engine.StatusOK -} - -func (srv *Server) recursiveLoad(eng *engine.Engine, address, tmpImageDir string) error { - if err := eng.Job("image_get", address).Run(); err != nil { - utils.Debugf("Loading %s", address) - - imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json")) - if err != nil { - utils.Debugf("Error reading json", err) - return err - } - - layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar")) - if err != nil { - utils.Debugf("Error reading embedded tar", err) - return err - } - img, err := image.NewImgJSON(imageJson) - if err != nil { - utils.Debugf("Error unmarshalling json", err) - return err - } - if img.Parent != "" { - if !srv.daemon.Graph().Exists(img.Parent) { - if err := srv.recursiveLoad(eng, img.Parent, tmpImageDir); err != nil { - return err - } - } - } - if err := srv.daemon.Graph().Register(imageJson, layer, img); err != nil { - return err - } - } - utils.Debugf("Completed processing %s", address) - - return nil -} - -func (srv *Server) ImagesViz(job *engine.Job) engine.Status { - images, _ := srv.daemon.Graph().Map() - if images == nil { - return engine.StatusOK - } - job.Stdout.Write([]byte("digraph docker {\n")) - - var ( - parentImage *image.Image - err error - ) - for _, image := range images { - parentImage, err = image.GetParent() - if err != nil { - return job.Errorf("Error while getting parent image: %v", err) - } - if parentImage != nil { - job.Stdout.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n")) - } else { - job.Stdout.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n")) - } - } - - for id, repos := range srv.daemon.Repositories().GetRepoRefs() { - job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n")) - } - job.Stdout.Write([]byte(" base [style=invisible]\n}\n")) - return engine.StatusOK -} - -func (srv *Server) Images(job *engine.Job) engine.Status { - var ( - allImages map[string]*image.Image - err error - filt_tagged = true - ) - - imageFilters, err := filters.FromParam(job.Getenv("filters")) - if err != nil { - return job.Error(err) - } - if i, ok := imageFilters["dangling"]; ok { - for _, value := range i { - if strings.ToLower(value) == "true" { - filt_tagged = false - } - } - } - - if job.GetenvBool("all") && filt_tagged { - allImages, err = srv.daemon.Graph().Map() - } else { - allImages, err = srv.daemon.Graph().Heads() - } - if err != nil { - return job.Error(err) - } - lookup := make(map[string]*engine.Env) - srv.daemon.Repositories().Lock() - for name, repository := range srv.daemon.Repositories().Repositories { - if job.Getenv("filter") != "" { - if match, _ := path.Match(job.Getenv("filter"), name); !match { - continue - } - } - for tag, id := range repository { - image, err := srv.daemon.Graph().Get(id) - if err != nil { - log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err) - continue - } - - if out, exists := lookup[id]; exists { - if filt_tagged { - out.SetList("RepoTags", append(out.GetList("RepoTags"), fmt.Sprintf("%s:%s", name, tag))) - } - } else { - // get the boolean list for if only the untagged images are requested - delete(allImages, id) - if filt_tagged { - out := &engine.Env{} - out.Set("ParentId", image.Parent) - out.SetList("RepoTags", []string{fmt.Sprintf("%s:%s", name, tag)}) - out.Set("Id", image.ID) - out.SetInt64("Created", image.Created.Unix()) - out.SetInt64("Size", image.Size) - out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size) - lookup[id] = out - } - } - - } - } - srv.daemon.Repositories().Unlock() - - outs := engine.NewTable("Created", len(lookup)) - for _, value := range lookup { - outs.Add(value) - } - - // Display images which aren't part of a repository/tag - if job.Getenv("filter") == "" { - for _, image := range allImages { - out := &engine.Env{} - out.Set("ParentId", image.Parent) - out.SetList("RepoTags", []string{":"}) - out.Set("Id", image.ID) - out.SetInt64("Created", image.Created.Unix()) - out.SetInt64("Size", image.Size) - out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size) - outs.Add(out) - } - } - - outs.ReverseSort() - if _, err := outs.WriteListTo(job.Stdout); err != nil { - return job.Error(err) - } - return engine.StatusOK -} - -func (srv *Server) ImageHistory(job *engine.Job) engine.Status { - if n := len(job.Args); n != 1 { - return job.Errorf("Usage: %s IMAGE", job.Name) - } - name := job.Args[0] - foundImage, err := srv.daemon.Repositories().LookupImage(name) - if err != nil { - return job.Error(err) - } - - lookupMap := make(map[string][]string) - for name, repository := range srv.daemon.Repositories().Repositories { - for tag, id := range repository { - // If the ID already has a reverse lookup, do not update it unless for "latest" - if _, exists := lookupMap[id]; !exists { - lookupMap[id] = []string{} - } - lookupMap[id] = append(lookupMap[id], name+":"+tag) - } - } - - outs := engine.NewTable("Created", 0) - err = foundImage.WalkHistory(func(img *image.Image) error { - out := &engine.Env{} - out.Set("Id", img.ID) - out.SetInt64("Created", img.Created.Unix()) - out.Set("CreatedBy", strings.Join(img.ContainerConfig.Cmd, " ")) - out.SetList("Tags", lookupMap[img.ID]) - out.SetInt64("Size", img.Size) - outs.Add(out) - return nil - }) - if _, err := outs.WriteListTo(job.Stdout); err != nil { - return job.Error(err) - } - return engine.StatusOK -} -func (srv *Server) ImageTag(job *engine.Job) engine.Status { - if len(job.Args) != 2 && len(job.Args) != 3 { - return job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name) - } - var tag string - if len(job.Args) == 3 { - tag = job.Args[2] - } - if err := srv.daemon.Repositories().Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil { - return job.Error(err) - } - return engine.StatusOK -} - func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error { history, err := r.GetRemoteHistory(imgID, endpoint, token) if err != nil { @@ -1038,198 +625,6 @@ func (srv *Server) ImagePush(job *engine.Job) engine.Status { return engine.StatusOK } -func (srv *Server) ImageImport(job *engine.Job) engine.Status { - if n := len(job.Args); n != 2 && n != 3 { - return job.Errorf("Usage: %s SRC REPO [TAG]", job.Name) - } - var ( - src = job.Args[0] - repo = job.Args[1] - tag string - sf = utils.NewStreamFormatter(job.GetenvBool("json")) - archive archive.ArchiveReader - resp *http.Response - ) - if len(job.Args) > 2 { - tag = job.Args[2] - } - - if src == "-" { - archive = job.Stdin - } else { - u, err := url.Parse(src) - if err != nil { - return job.Error(err) - } - if u.Scheme == "" { - u.Scheme = "http" - u.Host = src - u.Path = "" - } - job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u)) - resp, err = utils.Download(u.String()) - if err != nil { - return job.Error(err) - } - progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing") - defer progressReader.Close() - archive = progressReader - } - img, err := srv.daemon.Graph().Create(archive, "", "", "Imported from "+src, "", nil, nil) - if err != nil { - return job.Error(err) - } - // Optionally register the image at REPO/TAG - if repo != "" { - if err := srv.daemon.Repositories().Set(repo, tag, img.ID, true); err != nil { - return job.Error(err) - } - } - job.Stdout.Write(sf.FormatStatus("", img.ID)) - return engine.StatusOK -} -func (srv *Server) DeleteImage(name string, imgs *engine.Table, first, force, noprune bool) error { - var ( - repoName, tag string - tags = []string{} - tagDeleted bool - ) - - repoName, tag = parsers.ParseRepositoryTag(name) - if tag == "" { - tag = graph.DEFAULTTAG - } - - img, err := srv.daemon.Repositories().LookupImage(name) - if err != nil { - if r, _ := srv.daemon.Repositories().Get(repoName); r != nil { - return fmt.Errorf("No such image: %s:%s", repoName, tag) - } - return fmt.Errorf("No such image: %s", name) - } - - if strings.Contains(img.ID, name) { - repoName = "" - tag = "" - } - - byParents, err := srv.daemon.Graph().ByParent() - if err != nil { - return err - } - - //If delete by id, see if the id belong only to one repository - if repoName == "" { - for _, repoAndTag := range srv.daemon.Repositories().ByID()[img.ID] { - parsedRepo, parsedTag := parsers.ParseRepositoryTag(repoAndTag) - if repoName == "" || repoName == parsedRepo { - repoName = parsedRepo - if parsedTag != "" { - tags = append(tags, parsedTag) - } - } else if repoName != parsedRepo && !force { - // the id belongs to multiple repos, like base:latest and user:test, - // in that case return conflict - return fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories, use -f to force", name) - } - } - } else { - tags = append(tags, tag) - } - - if !first && len(tags) > 0 { - return nil - } - - //Untag the current image - for _, tag := range tags { - tagDeleted, err = srv.daemon.Repositories().Delete(repoName, tag) - if err != nil { - return err - } - if tagDeleted { - out := &engine.Env{} - out.Set("Untagged", repoName+":"+tag) - imgs.Add(out) - srv.LogEvent("untag", img.ID, "") - } - } - tags = srv.daemon.Repositories().ByID()[img.ID] - if (len(tags) <= 1 && repoName == "") || len(tags) == 0 { - if len(byParents[img.ID]) == 0 { - if err := srv.canDeleteImage(img.ID, force, tagDeleted); err != nil { - return err - } - if err := srv.daemon.Repositories().DeleteAll(img.ID); err != nil { - return err - } - if err := srv.daemon.Graph().Delete(img.ID); err != nil { - return err - } - out := &engine.Env{} - out.Set("Deleted", img.ID) - imgs.Add(out) - srv.LogEvent("delete", img.ID, "") - if img.Parent != "" && !noprune { - err := srv.DeleteImage(img.Parent, imgs, false, force, noprune) - if first { - return err - } - - } - - } - } - return nil -} - -func (srv *Server) ImageDelete(job *engine.Job) engine.Status { - if n := len(job.Args); n != 1 { - return job.Errorf("Usage: %s IMAGE", job.Name) - } - imgs := engine.NewTable("", 0) - if err := srv.DeleteImage(job.Args[0], imgs, true, job.GetenvBool("force"), job.GetenvBool("noprune")); err != nil { - return job.Error(err) - } - if len(imgs.Data) == 0 { - return job.Errorf("Conflict, %s wasn't deleted", job.Args[0]) - } - if _, err := imgs.WriteListTo(job.Stdout); err != nil { - return job.Error(err) - } - return engine.StatusOK -} - -func (srv *Server) canDeleteImage(imgID string, force, untagged bool) error { - var message string - if untagged { - message = " (docker untagged the image)" - } - for _, container := range srv.daemon.List() { - parent, err := srv.daemon.Repositories().LookupImage(container.Image) - if err != nil { - return err - } - - if err := parent.WalkHistory(func(p *image.Image) error { - if imgID == p.ID { - if container.State.IsRunning() { - if force { - return fmt.Errorf("Conflict, cannot force delete %s because the running container %s is using it%s, stop it and retry", utils.TruncateID(imgID), utils.TruncateID(container.ID), message) - } - return fmt.Errorf("Conflict, cannot delete %s because the running container %s is using it%s, stop it and use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message) - } else if !force { - return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it%s, use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message) - } - } - return nil - }); err != nil { - return err - } - } - return nil -} - func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) { srv.Lock() defer srv.Unlock() diff --git a/server/init.go b/server/init.go index cbe5340f21..1814e6f534 100644 --- a/server/init.go +++ b/server/init.go @@ -86,20 +86,10 @@ func InitServer(job *engine.Job) engine.Status { job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon) for name, handler := range map[string]engine.Handler{ - "tag": srv.ImageTag, // FIXME merge with "image_tag" - "info": srv.DockerInfo, - "image_export": srv.ImageExport, - "images": srv.Images, - "history": srv.ImageHistory, - "viz": srv.ImagesViz, - "log": srv.Log, - "load": srv.ImageLoad, - "build": srv.Build, - "pull": srv.ImagePull, - "import": srv.ImageImport, - "image_delete": srv.ImageDelete, - "events": srv.Events, - "push": srv.ImagePush, + "info": srv.DockerInfo, + "build": srv.Build, + "pull": srv.ImagePull, + "push": srv.ImagePush, } { if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil { return job.Error(err) @@ -125,12 +115,10 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) return nil, err } srv := &Server{ - Eng: eng, - daemon: daemon, - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), - events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events - eventPublisher: utils.NewJSONMessagePublisher(), + Eng: eng, + daemon: daemon, + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), } daemon.SetServer(srv) return srv, nil diff --git a/server/server.go b/server/server.go index afa4f65f3c..3c8f0708a9 100644 --- a/server/server.go +++ b/server/server.go @@ -67,6 +67,11 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status { initPath = srv.daemon.SystemInitPath() } + cjob := job.Eng.Job("subscribers_count") + env, _ := cjob.Stdout.AddEnv() + if err := cjob.Run(); err != nil { + return job.Error(err) + } v := &engine.Env{} v.SetInt("Containers", len(srv.daemon.List())) v.SetInt("Images", imgcount) @@ -79,7 +84,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status { v.SetInt("NFd", utils.GetTotalUsedFds()) v.SetInt("NGoroutines", runtime.NumGoroutine()) v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name()) - v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount()) + v.SetInt("NEventsListener", env.GetInt("count")) v.Set("KernelVersion", kernelVersion) v.Set("OperatingSystem", operatingSystem) v.Set("IndexServerAddress", registry.IndexServerAddress()) @@ -128,12 +133,10 @@ func (srv *Server) Close() error { type Server struct { sync.RWMutex - daemon *daemon.Daemon - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} - events []utils.JSONMessage - eventPublisher *utils.JSONMessagePublisher - Eng *engine.Engine - running bool - tasks sync.WaitGroup + daemon *daemon.Daemon + pullingPool map[string]chan struct{} + pushingPool map[string]chan struct{} + Eng *engine.Engine + running bool + tasks sync.WaitGroup } diff --git a/server/server_unit_test.go b/server/server_unit_test.go index 91ca709f4f..16f06c145e 100644 --- a/server/server_unit_test.go +++ b/server/server_unit_test.go @@ -1,11 +1,6 @@ package server -import ( - "testing" - "time" - - "github.com/docker/docker/utils" -) +import "testing" func TestPools(t *testing.T) { srv := &Server{ @@ -44,55 +39,3 @@ func TestPools(t *testing.T) { t.Fatalf("Expected `Unknown pool type`") } } - -func TestLogEvent(t *testing.T) { - srv := &Server{ - events: make([]utils.JSONMessage, 0, 64), - eventPublisher: utils.NewJSONMessagePublisher(), - } - - srv.LogEvent("fakeaction", "fakeid", "fakeimage") - - listener := make(chan utils.JSONMessage) - srv.eventPublisher.Subscribe(listener) - - srv.LogEvent("fakeaction2", "fakeid", "fakeimage") - - numEvents := len(srv.GetEvents()) - if numEvents != 2 { - t.Fatalf("Expected 2 events, found %d", numEvents) - } - go func() { - time.Sleep(200 * time.Millisecond) - srv.LogEvent("fakeaction3", "fakeid", "fakeimage") - time.Sleep(200 * time.Millisecond) - srv.LogEvent("fakeaction4", "fakeid", "fakeimage") - }() - - setTimeout(t, "Listening for events timed out", 2*time.Second, func() { - for i := 2; i < 4; i++ { - event := <-listener - if event != srv.GetEvents()[i] { - t.Fatalf("Event received it different than expected") - } - } - }) -} - -// FIXME: this is duplicated from integration/commands_test.go -func setTimeout(t *testing.T, msg string, d time.Duration, f func()) { - c := make(chan bool) - - // Make sure we are not too long - go func() { - time.Sleep(d) - c <- true - }() - go func() { - f() - c <- false - }() - if <-c && msg != "" { - t.Fatal(msg) - } -} diff --git a/utils/jsonmessagepublisher.go b/utils/jsonmessagepublisher.go deleted file mode 100644 index 659e6c8304..0000000000 --- a/utils/jsonmessagepublisher.go +++ /dev/null @@ -1,61 +0,0 @@ -package utils - -import ( - "sync" - "time" -) - -func NewJSONMessagePublisher() *JSONMessagePublisher { - return &JSONMessagePublisher{} -} - -type JSONMessageListener chan<- JSONMessage - -type JSONMessagePublisher struct { - m sync.RWMutex - subscribers []JSONMessageListener -} - -func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) { - p.m.Lock() - p.subscribers = append(p.subscribers, l) - p.m.Unlock() -} - -func (p *JSONMessagePublisher) SubscribersCount() int { - p.m.RLock() - count := len(p.subscribers) - p.m.RUnlock() - return count -} - -// Unsubscribe closes and removes the specified listener from the list of -// previously registed ones. -// It returns a boolean value indicating if the listener was successfully -// found, closed and unregistered. -func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool { - p.m.Lock() - defer p.m.Unlock() - - for i, subscriber := range p.subscribers { - if subscriber == l { - close(l) - p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) - return true - } - } - return false -} - -func (p *JSONMessagePublisher) Publish(m JSONMessage) { - p.m.RLock() - for _, subscriber := range p.subscribers { - // We give each subscriber a 100ms time window to receive the event, - // after which we move to the next. - select { - case subscriber <- m: - case <-time.After(100 * time.Millisecond): - } - } - p.m.RUnlock() -} diff --git a/utils/jsonmessagepublisher_test.go b/utils/jsonmessagepublisher_test.go deleted file mode 100644 index 2e1a820ca3..0000000000 --- a/utils/jsonmessagepublisher_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package utils - -import ( - "testing" - "time" -) - -func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) { - if q.SubscribersCount() != expected { - t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount()) - } -} - -func TestJSONMessagePublisherSubscription(t *testing.T) { - q := NewJSONMessagePublisher() - l1 := make(chan JSONMessage) - l2 := make(chan JSONMessage) - - assertSubscribersCount(t, q, 0) - q.Subscribe(l1) - assertSubscribersCount(t, q, 1) - q.Subscribe(l2) - assertSubscribersCount(t, q, 2) - - q.Unsubscribe(l1) - q.Unsubscribe(l2) - assertSubscribersCount(t, q, 0) -} - -func TestJSONMessagePublisherPublish(t *testing.T) { - q := NewJSONMessagePublisher() - l1 := make(chan JSONMessage) - l2 := make(chan JSONMessage) - - go func() { - for { - select { - case <-l1: - close(l1) - l1 = nil - case <-l2: - close(l2) - l2 = nil - case <-time.After(1 * time.Second): - q.Unsubscribe(l1) - q.Unsubscribe(l2) - t.Fatal("Timeout waiting for broadcasted message") - } - } - }() - - q.Subscribe(l1) - q.Subscribe(l2) - q.Publish(JSONMessage{}) -} - -func TestJSONMessagePublishTimeout(t *testing.T) { - q := NewJSONMessagePublisher() - l := make(chan JSONMessage) - q.Subscribe(l) - - c := make(chan struct{}) - go func() { - q.Publish(JSONMessage{}) - close(c) - }() - - select { - case <-c: - case <-time.After(time.Second): - t.Fatal("Timeout publishing message") - } -}