Merge pull request #7452 from shykes/7370-on-7427

Cleanup: move image management and logging out of deprecated Server
This commit is contained in:
Solomon Hykes 2014-08-06 18:47:45 -07:00
commit 7e12911f62
33 changed files with 1170 additions and 1100 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/dockerversion" "github.com/docker/docker/dockerversion"
"github.com/docker/docker/engine" "github.com/docker/docker/engine"
"github.com/docker/docker/events"
"github.com/docker/docker/pkg/parsers/kernel" "github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/registry" "github.com/docker/docker/registry"
"github.com/docker/docker/server" "github.com/docker/docker/server"
@ -20,6 +21,9 @@ func Register(eng *engine.Engine) error {
if err := remote(eng); err != nil { if err := remote(eng); err != nil {
return err return err
} }
if err := events.New().Install(eng); err != nil {
return err
}
if err := eng.Register("version", dockerVersion); err != nil { if err := eng.Register("version", dockerVersion); err != nil {
return err return err
} }

View file

@ -168,6 +168,13 @@ func (container *Container) WriteHostConfig() error {
return ioutil.WriteFile(pth, data, 0666) return ioutil.WriteFile(pth, data, 0666)
} }
func (container *Container) LogEvent(action string) {
d := container.daemon
if err := d.eng.Job("log", action, container.ID, d.Repositories().ImageName(container.Image)).Run(); err != nil {
utils.Errorf("Error running container: %s", err)
}
}
func (container *Container) getResourcePath(path string) (string, error) { func (container *Container) getResourcePath(path string) (string, error) {
cleanPath := filepath.Join("/", path) cleanPath := filepath.Join("/", path)
return symlink.FollowSymlinkInScope(filepath.Join(container.basefs, cleanPath), container.basefs) return symlink.FollowSymlinkInScope(filepath.Join(container.basefs, cleanPath), container.basefs)
@ -508,7 +515,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
container.stdin, container.stdinPipe = io.Pipe() container.stdin, container.stdinPipe = io.Pipe()
} }
if container.daemon != nil && container.daemon.srv != nil { if container.daemon != nil && container.daemon.srv != nil {
container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image)) container.LogEvent("die")
} }
if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() { if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
// FIXME: here is race condition between two RUN instructions in Dockerfile // FIXME: here is race condition between two RUN instructions in Dockerfile

View file

@ -40,7 +40,7 @@ func (daemon *Daemon) ContainerCreate(job *engine.Job) engine.Status {
if !container.Config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled { if !container.Config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled {
job.Errorf("IPv4 forwarding is disabled.\n") job.Errorf("IPv4 forwarding is disabled.\n")
} }
job.Eng.Job("log", "create", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("create")
// FIXME: this is necessary because daemon.Create might return a nil container // FIXME: this is necessary because daemon.Create might return a nil container
// with a non-nil error. This should not happen! Once it's fixed we // with a non-nil error. This should not happen! Once it's fixed we
// can remove this workaround. // can remove this workaround.

View file

@ -107,6 +107,7 @@ type Daemon struct {
func (daemon *Daemon) Install(eng *engine.Engine) error { func (daemon *Daemon) Install(eng *engine.Engine) error {
// FIXME: rename "delete" to "rm" for consistency with the CLI command // FIXME: rename "delete" to "rm" for consistency with the CLI command
// FIXME: rename ContainerDestroy to ContainerRm for consistency with the CLI command // FIXME: rename ContainerDestroy to ContainerRm for consistency with the CLI command
// FIXME: remove ImageDelete's dependency on Daemon, then move to graph/
for name, method := range map[string]engine.Handler{ for name, method := range map[string]engine.Handler{
"attach": daemon.ContainerAttach, "attach": daemon.ContainerAttach,
"commit": daemon.ContainerCommit, "commit": daemon.ContainerCommit,
@ -127,6 +128,7 @@ func (daemon *Daemon) Install(eng *engine.Engine) error {
"top": daemon.ContainerTop, "top": daemon.ContainerTop,
"unpause": daemon.ContainerUnpause, "unpause": daemon.ContainerUnpause,
"wait": daemon.ContainerWait, "wait": daemon.ContainerWait,
"image_delete": daemon.ImageDelete, // FIXME: see above
} { } {
if err := eng.Register(name, method); err != nil { if err := eng.Register(name, method); err != nil {
return err return err

View file

@ -70,7 +70,7 @@ func (daemon *Daemon) ContainerDestroy(job *engine.Job) engine.Status {
if err := daemon.Destroy(container); err != nil { if err := daemon.Destroy(container); err != nil {
return job.Errorf("Cannot destroy container %s: %s", name, err) return job.Errorf("Cannot destroy container %s: %s", name, err)
} }
job.Eng.Job("log", "destroy", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("destroy")
if removeVolume { if removeVolume {
var ( var (

View file

@ -23,7 +23,7 @@ func (daemon *Daemon) ContainerExport(job *engine.Job) engine.Status {
return job.Errorf("%s: %s", name, err) return job.Errorf("%s: %s", name, err)
} }
// FIXME: factor job-specific LogEvent to engine.Job.Run() // FIXME: factor job-specific LogEvent to engine.Job.Run()
job.Eng.Job("log", "export", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("export")
return engine.StatusOK return engine.StatusOK
} }
return job.Errorf("No such container: %s", name) return job.Errorf("No such container: %s", name)

156
daemon/image_delete.go Normal file
View file

@ -0,0 +1,156 @@
package daemon
import (
"fmt"
"strings"
"github.com/docker/docker/engine"
"github.com/docker/docker/graph"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/utils"
)
func (daemon *Daemon) ImageDelete(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 {
return job.Errorf("Usage: %s IMAGE", job.Name)
}
imgs := engine.NewTable("", 0)
if err := daemon.DeleteImage(job.Eng, job.Args[0], imgs, true, job.GetenvBool("force"), job.GetenvBool("noprune")); err != nil {
return job.Error(err)
}
if len(imgs.Data) == 0 {
return job.Errorf("Conflict, %s wasn't deleted", job.Args[0])
}
if _, err := imgs.WriteListTo(job.Stdout); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
// FIXME: make this private and use the job instead
func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine.Table, first, force, noprune bool) error {
var (
repoName, tag string
tags = []string{}
tagDeleted bool
)
// FIXME: please respect DRY and centralize repo+tag parsing in a single central place! -- shykes
repoName, tag = parsers.ParseRepositoryTag(name)
if tag == "" {
tag = graph.DEFAULTTAG
}
img, err := daemon.Repositories().LookupImage(name)
if err != nil {
if r, _ := daemon.Repositories().Get(repoName); r != nil {
return fmt.Errorf("No such image: %s:%s", repoName, tag)
}
return fmt.Errorf("No such image: %s", name)
}
if strings.Contains(img.ID, name) {
repoName = ""
tag = ""
}
byParents, err := daemon.Graph().ByParent()
if err != nil {
return err
}
//If delete by id, see if the id belong only to one repository
if repoName == "" {
for _, repoAndTag := range daemon.Repositories().ByID()[img.ID] {
parsedRepo, parsedTag := parsers.ParseRepositoryTag(repoAndTag)
if repoName == "" || repoName == parsedRepo {
repoName = parsedRepo
if parsedTag != "" {
tags = append(tags, parsedTag)
}
} else if repoName != parsedRepo && !force {
// the id belongs to multiple repos, like base:latest and user:test,
// in that case return conflict
return fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories, use -f to force", name)
}
}
} else {
tags = append(tags, tag)
}
if !first && len(tags) > 0 {
return nil
}
//Untag the current image
for _, tag := range tags {
tagDeleted, err = daemon.Repositories().Delete(repoName, tag)
if err != nil {
return err
}
if tagDeleted {
out := &engine.Env{}
out.Set("Untagged", repoName+":"+tag)
imgs.Add(out)
eng.Job("log", "untag", img.ID, "").Run()
}
}
tags = daemon.Repositories().ByID()[img.ID]
if (len(tags) <= 1 && repoName == "") || len(tags) == 0 {
if len(byParents[img.ID]) == 0 {
if err := daemon.canDeleteImage(img.ID, force, tagDeleted); err != nil {
return err
}
if err := daemon.Repositories().DeleteAll(img.ID); err != nil {
return err
}
if err := daemon.Graph().Delete(img.ID); err != nil {
return err
}
out := &engine.Env{}
out.Set("Deleted", img.ID)
imgs.Add(out)
eng.Job("log", "delete", img.ID, "").Run()
if img.Parent != "" && !noprune {
err := daemon.DeleteImage(eng, img.Parent, imgs, false, force, noprune)
if first {
return err
}
}
}
}
return nil
}
func (daemon *Daemon) canDeleteImage(imgID string, force, untagged bool) error {
var message string
if untagged {
message = " (docker untagged the image)"
}
for _, container := range daemon.List() {
parent, err := daemon.Repositories().LookupImage(container.Image)
if err != nil {
return err
}
if err := parent.WalkHistory(func(p *image.Image) error {
if imgID == p.ID {
if container.State.IsRunning() {
if force {
return fmt.Errorf("Conflict, cannot force delete %s because the running container %s is using it%s, stop it and retry", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
}
return fmt.Errorf("Conflict, cannot delete %s because the running container %s is using it%s, stop it and use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
} else if !force {
return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it%s, use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
}
}
return nil
}); err != nil {
return err
}
}
return nil
}

View file

@ -44,7 +44,7 @@ func (daemon *Daemon) ContainerKill(job *engine.Job) engine.Status {
if err := container.Kill(); err != nil { if err := container.Kill(); err != nil {
return job.Errorf("Cannot kill container %s: %s", name, err) return job.Errorf("Cannot kill container %s: %s", name, err)
} }
job.Eng.Job("log", "kill", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("kill")
} else { } else {
// Otherwise, just send the requested signal // Otherwise, just send the requested signal
if err := container.KillSig(int(sig)); err != nil { if err := container.KillSig(int(sig)); err != nil {

View file

@ -16,7 +16,7 @@ func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status {
if err := container.Pause(); err != nil { if err := container.Pause(); err != nil {
return job.Errorf("Cannot pause container %s: %s", name, err) return job.Errorf("Cannot pause container %s: %s", name, err)
} }
job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("pause")
return engine.StatusOK return engine.StatusOK
} }
@ -32,6 +32,6 @@ func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status {
if err := container.Unpause(); err != nil { if err := container.Unpause(); err != nil {
return job.Errorf("Cannot unpause container %s: %s", name, err) return job.Errorf("Cannot unpause container %s: %s", name, err)
} }
job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("unpause")
return engine.StatusOK return engine.StatusOK
} }

View file

@ -19,7 +19,7 @@ func (daemon *Daemon) ContainerRestart(job *engine.Job) engine.Status {
if err := container.Restart(int(t)); err != nil { if err := container.Restart(int(t)); err != nil {
return job.Errorf("Cannot restart container %s: %s\n", name, err) return job.Errorf("Cannot restart container %s: %s\n", name, err)
} }
job.Eng.Job("log", "restart", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("restart")
} else { } else {
return job.Errorf("No such container: %s\n", name) return job.Errorf("No such container: %s\n", name)
} }

View file

@ -1,10 +1,5 @@
package daemon package daemon
import (
"github.com/docker/docker/utils"
)
type Server interface { type Server interface {
LogEvent(action, id, from string) *utils.JSONMessage
IsRunning() bool // returns true if the server is currently in operation IsRunning() bool // returns true if the server is currently in operation
} }

View file

@ -36,8 +36,7 @@ func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status {
if err := container.Start(); err != nil { if err := container.Start(); err != nil {
return job.Errorf("Cannot start container %s: %s", name, err) return job.Errorf("Cannot start container %s: %s", name, err)
} }
job.Eng.Job("log", "start", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("start")
return engine.StatusOK return engine.StatusOK
} }

View file

@ -22,7 +22,7 @@ func (daemon *Daemon) ContainerStop(job *engine.Job) engine.Status {
if err := container.Stop(int(t)); err != nil { if err := container.Stop(int(t)); err != nil {
return job.Errorf("Cannot stop container %s: %s\n", name, err) return job.Errorf("Cannot stop container %s: %s\n", name, err)
} }
job.Eng.Job("log", "stop", container.ID, daemon.Repositories().ImageName(container.Image)).Run() container.LogEvent("stop")
} else { } else {
return job.Errorf("No such container: %s\n", name) return job.Errorf("No such container: %s\n", name)
} }

176
events/events.go Normal file
View file

@ -0,0 +1,176 @@
package events
import (
"encoding/json"
"sync"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
const eventsLimit = 64
type listener chan<- *utils.JSONMessage
type Events struct {
mu sync.RWMutex
events []*utils.JSONMessage
subscribers []listener
}
func New() *Events {
return &Events{
events: make([]*utils.JSONMessage, 0, eventsLimit),
}
}
// Install installs events public api in docker engine
func (e *Events) Install(eng *engine.Engine) error {
// Here you should describe public interface
jobs := map[string]engine.Handler{
"events": e.Get,
"log": e.Log,
"subscribers_count": e.SubscribersCount,
}
for name, job := range jobs {
if err := eng.Register(name, job); err != nil {
return err
}
}
return nil
}
func (e *Events) Get(job *engine.Job) engine.Status {
var (
since = job.GetenvInt64("since")
until = job.GetenvInt64("until")
timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
)
// If no until, disable timeout
if until == 0 {
timeout.Stop()
}
listener := make(chan *utils.JSONMessage)
e.subscribe(listener)
defer e.unsubscribe(listener)
job.Stdout.Write(nil)
// Resend every event in the [since, until] time interval.
if since != 0 {
if err := e.writeCurrent(job, since, until); err != nil {
return job.Error(err)
}
}
for {
select {
case event, ok := <-listener:
if !ok {
return engine.StatusOK
}
if err := writeEvent(job, event); err != nil {
return job.Error(err)
}
case <-timeout.C:
return engine.StatusOK
}
}
}
func (e *Events) Log(job *engine.Job) engine.Status {
if len(job.Args) != 3 {
return job.Errorf("usage: %s ACTION ID FROM", job.Name)
}
// not waiting for receivers
go e.log(job.Args[0], job.Args[1], job.Args[2])
return engine.StatusOK
}
func (e *Events) SubscribersCount(job *engine.Job) engine.Status {
ret := &engine.Env{}
ret.SetInt("count", e.subscribersCount())
ret.WriteTo(job.Stdout)
return engine.StatusOK
}
func writeEvent(job *engine.Job, event *utils.JSONMessage) error {
// When sending an event JSON serialization errors are ignored, but all
// other errors lead to the eviction of the listener.
if b, err := json.Marshal(event); err == nil {
if _, err = job.Stdout.Write(b); err != nil {
return err
}
}
return nil
}
func (e *Events) writeCurrent(job *engine.Job, since, until int64) error {
e.mu.RLock()
for _, event := range e.events {
if event.Time >= since && (event.Time <= until || until == 0) {
if err := writeEvent(job, event); err != nil {
e.mu.RUnlock()
return err
}
}
}
e.mu.RUnlock()
return nil
}
func (e *Events) subscribersCount() int {
e.mu.RLock()
c := len(e.subscribers)
e.mu.RUnlock()
return c
}
func (e *Events) log(action, id, from string) {
e.mu.Lock()
now := time.Now().UTC().Unix()
jm := &utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
for _, s := range e.subscribers {
// We give each subscriber a 100ms time window to receive the event,
// after which we move to the next.
select {
case s <- jm:
case <-time.After(100 * time.Millisecond):
}
}
e.mu.Unlock()
}
func (e *Events) subscribe(l listener) {
e.mu.Lock()
e.subscribers = append(e.subscribers, l)
e.mu.Unlock()
}
// unsubscribe closes and removes the specified listener from the list of
// previously registed ones.
// It returns a boolean value indicating if the listener was successfully
// found, closed and unregistered.
func (e *Events) unsubscribe(l listener) bool {
e.mu.Lock()
for i, subscriber := range e.subscribers {
if subscriber == l {
close(l)
e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...)
e.mu.Unlock()
return true
}
}
e.mu.Unlock()
return false
}

154
events/events_test.go Normal file
View file

@ -0,0 +1,154 @@
package events
import (
"bytes"
"encoding/json"
"fmt"
"io"
"testing"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
func TestEventsPublish(t *testing.T) {
e := New()
l1 := make(chan *utils.JSONMessage)
l2 := make(chan *utils.JSONMessage)
e.subscribe(l1)
e.subscribe(l2)
count := e.subscribersCount()
if count != 2 {
t.Fatalf("Must be 2 subscribers, got %d", count)
}
go e.log("test", "cont", "image")
select {
case msg := <-l1:
if len(e.events) != 1 {
t.Fatalf("Must be only one event, got %d", len(e.events))
}
if msg.Status != "test" {
t.Fatalf("Status should be test, got %s", msg.Status)
}
if msg.ID != "cont" {
t.Fatalf("ID should be cont, got %s", msg.ID)
}
if msg.From != "image" {
t.Fatalf("From should be image, got %s", msg.From)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for broadcasted message")
}
select {
case msg := <-l2:
if len(e.events) != 1 {
t.Fatalf("Must be only one event, got %d", len(e.events))
}
if msg.Status != "test" {
t.Fatalf("Status should be test, got %s", msg.Status)
}
if msg.ID != "cont" {
t.Fatalf("ID should be cont, got %s", msg.ID)
}
if msg.From != "image" {
t.Fatalf("From should be image, got %s", msg.From)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for broadcasted message")
}
}
func TestEventsPublishTimeout(t *testing.T) {
e := New()
l := make(chan *utils.JSONMessage)
e.subscribe(l)
c := make(chan struct{})
go func() {
e.log("test", "cont", "image")
close(c)
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Timeout publishing message")
}
}
func TestLogEvents(t *testing.T) {
e := New()
eng := engine.New()
if err := e.Install(eng); err != nil {
t.Fatal(err)
}
for i := 0; i < eventsLimit+16; i++ {
action := fmt.Sprintf("action_%d", i)
id := fmt.Sprintf("cont_%d", i)
from := fmt.Sprintf("image_%d", i)
job := eng.Job("log", action, id, from)
if err := job.Run(); err != nil {
t.Fatal(err)
}
}
time.Sleep(50 * time.Millisecond)
if len(e.events) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
}
job := eng.Job("events")
job.SetenvInt64("since", 1)
job.SetenvInt64("until", time.Now().Unix())
buf := bytes.NewBuffer(nil)
job.Stdout.Add(buf)
if err := job.Run(); err != nil {
t.Fatal(err)
}
buf = bytes.NewBuffer(buf.Bytes())
dec := json.NewDecoder(buf)
var msgs []utils.JSONMessage
for {
var jm utils.JSONMessage
if err := dec.Decode(&jm); err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
msgs = append(msgs, jm)
}
if len(msgs) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs))
}
first := msgs[0]
if first.Status != "action_16" {
t.Fatalf("First action is %s, must be action_15", first.Status)
}
last := msgs[len(msgs)-1]
if last.Status != "action_79" {
t.Fatalf("First action is %s, must be action_79", first.Status)
}
}
func TestEventsCountJob(t *testing.T) {
e := New()
eng := engine.New()
if err := e.Install(eng); err != nil {
t.Fatal(err)
}
l1 := make(chan *utils.JSONMessage)
l2 := make(chan *utils.JSONMessage)
e.subscribe(l1)
e.subscribe(l2)
job := eng.Job("subscribers_count")
env, _ := job.Stdout.AddEnv()
if err := job.Run(); err != nil {
t.Fatal(err)
}
count := env.GetInt("count")
if count != 2 {
t.Fatalf("There must be 2 subscribers, got %d", count)
}
}

147
graph/export.go Normal file
View file

@ -0,0 +1,147 @@
package graph
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path"
"github.com/docker/docker/archive"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/utils"
)
// CmdImageExport exports all images with the given tag. All versions
// containing the same tag are exported. The resulting output is an
// uncompressed tar ball.
// name is the set of tags to export.
// out is the writer where the images are written to.
func (s *TagStore) CmdImageExport(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("Usage: %s IMAGE\n", job.Name)
}
name := job.Args[0]
// get image json
tempdir, err := ioutil.TempDir("", "docker-export-")
if err != nil {
return job.Error(err)
}
defer os.RemoveAll(tempdir)
utils.Debugf("Serializing %s", name)
rootRepoMap := map[string]Repository{}
rootRepo, err := s.Get(name)
if err != nil {
return job.Error(err)
}
if rootRepo != nil {
// this is a base repo name, like 'busybox'
for _, id := range rootRepo {
if err := s.exportImage(job.Eng, id, tempdir); err != nil {
return job.Error(err)
}
}
rootRepoMap[name] = rootRepo
} else {
img, err := s.LookupImage(name)
if err != nil {
return job.Error(err)
}
if img != nil {
// This is a named image like 'busybox:latest'
repoName, repoTag := parsers.ParseRepositoryTag(name)
if err := s.exportImage(job.Eng, img.ID, tempdir); err != nil {
return job.Error(err)
}
// check this length, because a lookup of a truncated has will not have a tag
// and will not need to be added to this map
if len(repoTag) > 0 {
rootRepoMap[repoName] = Repository{repoTag: img.ID}
}
} else {
// this must be an ID that didn't get looked up just right?
if err := s.exportImage(job.Eng, name, tempdir); err != nil {
return job.Error(err)
}
}
}
// write repositories, if there is something to write
if len(rootRepoMap) > 0 {
rootRepoJson, _ := json.Marshal(rootRepoMap)
if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.FileMode(0644)); err != nil {
return job.Error(err)
}
} else {
utils.Debugf("There were no repositories to write")
}
fs, err := archive.Tar(tempdir, archive.Uncompressed)
if err != nil {
return job.Error(err)
}
defer fs.Close()
if _, err := io.Copy(job.Stdout, fs); err != nil {
return job.Error(err)
}
utils.Debugf("End Serializing %s", name)
return engine.StatusOK
}
// FIXME: this should be a top-level function, not a class method
func (s *TagStore) exportImage(eng *engine.Engine, name, tempdir string) error {
for n := name; n != ""; {
// temporary directory
tmpImageDir := path.Join(tempdir, n)
if err := os.Mkdir(tmpImageDir, os.FileMode(0755)); err != nil {
if os.IsExist(err) {
return nil
}
return err
}
var version = "1.0"
var versionBuf = []byte(version)
if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.FileMode(0644)); err != nil {
return err
}
// serialize json
json, err := os.Create(path.Join(tmpImageDir, "json"))
if err != nil {
return err
}
job := eng.Job("image_inspect", n)
job.SetenvBool("raw", true)
job.Stdout.Add(json)
if err := job.Run(); err != nil {
return err
}
// serialize filesystem
fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar"))
if err != nil {
return err
}
job = eng.Job("image_tarlayer", n)
job.Stdout.Add(fsTar)
if err := job.Run(); err != nil {
return err
}
// find parent
job = eng.Job("image_get", n)
info, _ := job.Stdout.AddEnv()
if err := job.Run(); err != nil {
return err
}
n = info.Get("Parent")
}
return nil
}

46
graph/history.go Normal file
View file

@ -0,0 +1,46 @@
package graph
import (
"strings"
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
)
func (s *TagStore) CmdHistory(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 {
return job.Errorf("Usage: %s IMAGE", job.Name)
}
name := job.Args[0]
foundImage, err := s.LookupImage(name)
if err != nil {
return job.Error(err)
}
lookupMap := make(map[string][]string)
for name, repository := range s.Repositories {
for tag, id := range repository {
// If the ID already has a reverse lookup, do not update it unless for "latest"
if _, exists := lookupMap[id]; !exists {
lookupMap[id] = []string{}
}
lookupMap[id] = append(lookupMap[id], name+":"+tag)
}
}
outs := engine.NewTable("Created", 0)
err = foundImage.WalkHistory(func(img *image.Image) error {
out := &engine.Env{}
out.Set("Id", img.ID)
out.SetInt64("Created", img.Created.Unix())
out.Set("CreatedBy", strings.Join(img.ContainerConfig.Cmd, " "))
out.SetList("Tags", lookupMap[img.ID])
out.SetInt64("Size", img.Size)
outs.Add(out)
return nil
})
if _, err := outs.WriteListTo(job.Stdout); err != nil {
return job.Error(err)
}
return engine.StatusOK
}

61
graph/import.go Normal file
View file

@ -0,0 +1,61 @@
package graph
import (
"net/http"
"net/url"
"github.com/docker/docker/archive"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
func (s *TagStore) CmdImport(job *engine.Job) engine.Status {
if n := len(job.Args); n != 2 && n != 3 {
return job.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
}
var (
src = job.Args[0]
repo = job.Args[1]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
archive archive.ArchiveReader
resp *http.Response
)
if len(job.Args) > 2 {
tag = job.Args[2]
}
if src == "-" {
archive = job.Stdin
} else {
u, err := url.Parse(src)
if err != nil {
return job.Error(err)
}
if u.Scheme == "" {
u.Scheme = "http"
u.Host = src
u.Path = ""
}
job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u))
resp, err = utils.Download(u.String())
if err != nil {
return job.Error(err)
}
progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
defer progressReader.Close()
archive = progressReader
}
img, err := s.graph.Create(archive, "", "", "Imported from "+src, "", nil, nil)
if err != nil {
return job.Error(err)
}
// Optionally register the image at REPO/TAG
if repo != "" {
if err := s.Set(repo, tag, img.ID, true); err != nil {
return job.Error(err)
}
}
job.Stdout.Write(sf.FormatStatus("", img.ID))
return engine.StatusOK
}

103
graph/list.go Normal file
View file

@ -0,0 +1,103 @@
package graph
import (
"fmt"
"log"
"path"
"strings"
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/parsers/filters"
)
func (s *TagStore) CmdImages(job *engine.Job) engine.Status {
var (
allImages map[string]*image.Image
err error
filt_tagged = true
)
imageFilters, err := filters.FromParam(job.Getenv("filters"))
if err != nil {
return job.Error(err)
}
if i, ok := imageFilters["dangling"]; ok {
for _, value := range i {
if strings.ToLower(value) == "true" {
filt_tagged = false
}
}
}
if job.GetenvBool("all") && filt_tagged {
allImages, err = s.graph.Map()
} else {
allImages, err = s.graph.Heads()
}
if err != nil {
return job.Error(err)
}
lookup := make(map[string]*engine.Env)
s.Lock()
for name, repository := range s.Repositories {
if job.Getenv("filter") != "" {
if match, _ := path.Match(job.Getenv("filter"), name); !match {
continue
}
}
for tag, id := range repository {
image, err := s.graph.Get(id)
if err != nil {
log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
continue
}
if out, exists := lookup[id]; exists {
if filt_tagged {
out.SetList("RepoTags", append(out.GetList("RepoTags"), fmt.Sprintf("%s:%s", name, tag)))
}
} else {
// get the boolean list for if only the untagged images are requested
delete(allImages, id)
if filt_tagged {
out := &engine.Env{}
out.Set("ParentId", image.Parent)
out.SetList("RepoTags", []string{fmt.Sprintf("%s:%s", name, tag)})
out.Set("Id", image.ID)
out.SetInt64("Created", image.Created.Unix())
out.SetInt64("Size", image.Size)
out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
lookup[id] = out
}
}
}
}
s.Unlock()
outs := engine.NewTable("Created", len(lookup))
for _, value := range lookup {
outs.Add(value)
}
// Display images which aren't part of a repository/tag
if job.Getenv("filter") == "" {
for _, image := range allImages {
out := &engine.Env{}
out.Set("ParentId", image.Parent)
out.SetList("RepoTags", []string{"<none>:<none>"})
out.Set("Id", image.ID)
out.SetInt64("Created", image.Created.Unix())
out.SetInt64("Size", image.Size)
out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
outs.Add(out)
}
}
outs.ReverseSort()
if _, err := outs.WriteListTo(job.Stdout); err != nil {
return job.Error(err)
}
return engine.StatusOK
}

118
graph/load.go Normal file
View file

@ -0,0 +1,118 @@
package graph
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path"
"github.com/docker/docker/archive"
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/utils"
)
// Loads a set of images into the repository. This is the complementary of ImageExport.
// The input stream is an uncompressed tar ball containing images and metadata.
func (s *TagStore) CmdLoad(job *engine.Job) engine.Status {
tmpImageDir, err := ioutil.TempDir("", "docker-import-")
if err != nil {
return job.Error(err)
}
defer os.RemoveAll(tmpImageDir)
var (
repoTarFile = path.Join(tmpImageDir, "repo.tar")
repoDir = path.Join(tmpImageDir, "repo")
)
tarFile, err := os.Create(repoTarFile)
if err != nil {
return job.Error(err)
}
if _, err := io.Copy(tarFile, job.Stdin); err != nil {
return job.Error(err)
}
tarFile.Close()
repoFile, err := os.Open(repoTarFile)
if err != nil {
return job.Error(err)
}
if err := os.Mkdir(repoDir, os.ModeDir); err != nil {
return job.Error(err)
}
if err := archive.Untar(repoFile, repoDir, nil); err != nil {
return job.Error(err)
}
dirs, err := ioutil.ReadDir(repoDir)
if err != nil {
return job.Error(err)
}
for _, d := range dirs {
if d.IsDir() {
if err := s.recursiveLoad(job.Eng, d.Name(), tmpImageDir); err != nil {
return job.Error(err)
}
}
}
repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories"))
if err == nil {
repositories := map[string]Repository{}
if err := json.Unmarshal(repositoriesJson, &repositories); err != nil {
return job.Error(err)
}
for imageName, tagMap := range repositories {
for tag, address := range tagMap {
if err := s.Set(imageName, tag, address, true); err != nil {
return job.Error(err)
}
}
}
} else if !os.IsNotExist(err) {
return job.Error(err)
}
return engine.StatusOK
}
func (s *TagStore) recursiveLoad(eng *engine.Engine, address, tmpImageDir string) error {
if err := eng.Job("image_get", address).Run(); err != nil {
utils.Debugf("Loading %s", address)
imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json"))
if err != nil {
utils.Debugf("Error reading json", err)
return err
}
layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar"))
if err != nil {
utils.Debugf("Error reading embedded tar", err)
return err
}
img, err := image.NewImgJSON(imageJson)
if err != nil {
utils.Debugf("Error unmarshalling json", err)
return err
}
if img.Parent != "" {
if !s.graph.Exists(img.Parent) {
if err := s.recursiveLoad(eng, img.Parent, tmpImageDir); err != nil {
return err
}
}
}
if err := s.graph.Register(imageJson, layer, img); err != nil {
return err
}
}
utils.Debugf("Completed processing %s", address)
return nil
}

View file

@ -1,20 +1,33 @@
package graph package graph
import ( import (
"fmt"
"io" "io"
"github.com/docker/docker/engine" "github.com/docker/docker/engine"
"github.com/docker/docker/image" "github.com/docker/docker/image"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
func (s *TagStore) Install(eng *engine.Engine) error { func (s *TagStore) Install(eng *engine.Engine) error {
eng.Register("image_set", s.CmdSet) for name, handler := range map[string]engine.Handler{
eng.Register("image_tag", s.CmdTag) "image_set": s.CmdSet,
eng.Register("image_get", s.CmdGet) "image_tag": s.CmdTag,
eng.Register("image_inspect", s.CmdLookup) "tag": s.CmdTagLegacy, // FIXME merge with "image_tag"
eng.Register("image_tarlayer", s.CmdTarLayer) "image_get": s.CmdGet,
"image_inspect": s.CmdLookup,
"image_tarlayer": s.CmdTarLayer,
"image_export": s.CmdImageExport,
"history": s.CmdHistory,
"images": s.CmdImages,
"viz": s.CmdViz,
"load": s.CmdLoad,
"import": s.CmdImport,
} {
if err := eng.Register(name, handler); err != nil {
return fmt.Errorf("Could not register %q: %v", name, err)
}
}
return nil return nil
} }
@ -65,29 +78,6 @@ func (s *TagStore) CmdSet(job *engine.Job) engine.Status {
return engine.StatusOK return engine.StatusOK
} }
// CmdTag assigns a new name and tag to an existing image. If the tag already exists,
// it is changed and the image previously referenced by the tag loses that reference.
// This may cause the old image to be garbage-collected if its reference count reaches zero.
//
// Syntax: image_tag NEWNAME OLDNAME
// Example: image_tag shykes/myapp:latest shykes/myapp:1.42.0
func (s *TagStore) CmdTag(job *engine.Job) engine.Status {
if len(job.Args) != 2 {
return job.Errorf("usage: %s NEWNAME OLDNAME", job.Name)
}
var (
newName = job.Args[0]
oldName = job.Args[1]
)
newRepo, newTag := parsers.ParseRepositoryTag(newName)
// FIXME: Set should either parse both old and new name, or neither.
// the current prototype is inconsistent.
if err := s.Set(newRepo, newTag, oldName, true); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
// CmdGet returns information about an image. // CmdGet returns information about an image.
// If the image doesn't exist, an empty object is returned, to allow // If the image doesn't exist, an empty object is returned, to allow
// checking for an image's existence. // checking for an image's existence.

44
graph/tag.go Normal file
View file

@ -0,0 +1,44 @@
package graph
import (
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/parsers"
)
// CmdTag assigns a new name and tag to an existing image. If the tag already exists,
// it is changed and the image previously referenced by the tag loses that reference.
// This may cause the old image to be garbage-collected if its reference count reaches zero.
//
// Syntax: image_tag NEWNAME OLDNAME
// Example: image_tag shykes/myapp:latest shykes/myapp:1.42.0
func (s *TagStore) CmdTag(job *engine.Job) engine.Status {
if len(job.Args) != 2 {
return job.Errorf("usage: %s NEWNAME OLDNAME", job.Name)
}
var (
newName = job.Args[0]
oldName = job.Args[1]
)
newRepo, newTag := parsers.ParseRepositoryTag(newName)
// FIXME: Set should either parse both old and new name, or neither.
// the current prototype is inconsistent.
if err := s.Set(newRepo, newTag, oldName, true); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
// FIXME: merge into CmdTag above, and merge "image_tag" and "tag" into a single job.
func (s *TagStore) CmdTagLegacy(job *engine.Job) engine.Status {
if len(job.Args) != 2 && len(job.Args) != 3 {
return job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name)
}
var tag string
if len(job.Args) == 3 {
tag = job.Args[2]
}
if err := s.Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil {
return job.Error(err)
}
return engine.StatusOK
}

38
graph/viz.go Normal file
View file

@ -0,0 +1,38 @@
package graph
import (
"strings"
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
)
func (s *TagStore) CmdViz(job *engine.Job) engine.Status {
images, _ := s.graph.Map()
if images == nil {
return engine.StatusOK
}
job.Stdout.Write([]byte("digraph docker {\n"))
var (
parentImage *image.Image
err error
)
for _, image := range images {
parentImage, err = image.GetParent()
if err != nil {
return job.Errorf("Error while getting parent image: %v", err)
}
if parentImage != nil {
job.Stdout.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n"))
} else {
job.Stdout.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n"))
}
}
for id, repos := range s.GetRepoRefs() {
job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n"))
}
job.Stdout.Write([]byte(" base [style=invisible]\n}\n"))
return engine.StatusOK
}

View file

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"os/exec" "os/exec"
"strings"
"testing" "testing"
) )
@ -92,6 +93,69 @@ func TestRemoveContainerWithStopAndKill(t *testing.T) {
logDone("rm - with --stop=true and --kill=true") logDone("rm - with --stop=true and --kill=true")
} }
func TestContainerOrphaning(t *testing.T) {
dockerfile1 := `FROM busybox:latest
ENTRYPOINT ["/bin/true"]`
img := "test-container-orphaning"
dockerfile2 := `FROM busybox:latest
ENTRYPOINT ["/bin/true"]
MAINTAINER Integration Tests`
// build first dockerfile
img1, err := buildImage(img, dockerfile1, true)
if err != nil {
t.Fatalf("Could not build image %s: %v", img, err)
}
// run container on first image
if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "run", img)); err != nil {
t.Fatalf("Could not run image %s: %v: %s", img, err, out)
}
// rebuild dockerfile with a small addition at the end
if _, err := buildImage(img, dockerfile2, true); err != nil {
t.Fatalf("Could not rebuild image %s: %v", img, err)
}
// try to remove the image, should error out.
if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "rmi", img)); err == nil {
t.Fatalf("Expected to error out removing the image, but succeeded: %s", out)
}
// check if we deleted the first image
out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "images", "-q", "--no-trunc"))
if err != nil {
t.Fatalf("%v: %s", err, out)
}
if !strings.Contains(out, img1) {
t.Fatal("Orphaned container (could not find '%s' in docker images): %s", img1, out)
}
deleteAllContainers()
logDone("rm - container orphaning")
}
func TestDeleteTagWithExistingContainers(t *testing.T) {
container := "test-delete-tag"
newtag := "busybox:newtag"
bb := "busybox:latest"
if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "tag", bb, newtag)); err != nil {
t.Fatalf("Could not tag busybox: %v: %s", err, out)
}
if out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "run", "--name", container, bb, "/bin/true")); err != nil {
t.Fatalf("Could not run busybox: %v: %s", err, out)
}
out, _, err := runCommandWithOutput(exec.Command(dockerBinary, "rmi", newtag))
if err != nil {
t.Fatalf("Could not remove tag %s: %v: %s", newtag, err, out)
}
if d := strings.Count(out, "Untagged: "); d != 1 {
t.Fatalf("Expected 1 untagged entry got %d: %q", d, out)
}
deleteAllContainers()
logDone("rm - delete tag with existing containers")
}
func createRunningContainer(t *testing.T, name string) { func createRunningContainer(t *testing.T, name string) {
cmd := exec.Command(dockerBinary, "run", "-dt", "--name", name, "busybox", "top") cmd := exec.Command(dockerBinary, "run", "-dt", "--name", name, "busybox", "top")
if _, err := runCommand(cmd); err != nil { if _, err := runCommand(cmd); err != nil {

View file

@ -13,7 +13,6 @@ import (
"github.com/docker/docker/api/client" "github.com/docker/docker/api/client"
"github.com/docker/docker/daemon" "github.com/docker/docker/daemon"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/term" "github.com/docker/docker/pkg/term"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
@ -682,70 +681,3 @@ func TestRunCidFileCleanupIfEmpty(t *testing.T) {
<-c <-c
}) })
} }
func TestContainerOrphaning(t *testing.T) {
// setup a temporary directory
tmpDir, err := ioutil.TempDir("", "project")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// setup a CLI and server
cli := client.NewDockerCli(nil, ioutil.Discard, ioutil.Discard, testDaemonProto, testDaemonAddr, nil)
defer cleanup(globalEngine, t)
srv := mkServerFromEngine(globalEngine, t)
// closure to build something
buildSomething := func(template string, image string) string {
dockerfile := path.Join(tmpDir, "Dockerfile")
replacer := strings.NewReplacer("{IMAGE}", unitTestImageID)
contents := replacer.Replace(template)
ioutil.WriteFile(dockerfile, []byte(contents), 0x777)
if err := cli.CmdBuild("-t", image, tmpDir); err != nil {
t.Fatal(err)
}
job := globalEngine.Job("image_get", image)
info, _ := job.Stdout.AddEnv()
if err := job.Run(); err != nil {
t.Fatal(err)
}
return info.Get("Id")
}
// build an image
imageName := "orphan-test"
template1 := `
from {IMAGE}
cmd ["/bin/echo", "holla"]
`
img1 := buildSomething(template1, imageName)
// create a container using the fist image
if err := cli.CmdRun(imageName); err != nil {
t.Fatal(err)
}
// build a new image that splits lineage
template2 := `
from {IMAGE}
cmd ["/bin/echo", "holla"]
expose 22
`
buildSomething(template2, imageName)
// remove the second image by name
resp := engine.NewTable("", 0)
if err := srv.DeleteImage(imageName, resp, true, false, false); err == nil {
t.Fatal("Expected error, got none")
}
// see if we deleted the first image (and orphaned the container)
for _, i := range resp.Data {
if img1 == i.Get("Deleted") {
t.Fatal("Orphaned image with container")
}
}
}

View file

@ -297,56 +297,3 @@ func TestImagesFilter(t *testing.T) {
t.Fatal("incorrect number of matches returned") t.Fatal("incorrect number of matches returned")
} }
} }
// Regression test for being able to untag an image with an existing
// container
func TestDeleteTagWithExistingContainers(t *testing.T) {
eng := NewTestEngine(t)
defer nuke(mkDaemonFromEngine(eng, t))
srv := mkServerFromEngine(eng, t)
// Tag the image
if err := eng.Job("tag", unitTestImageID, "utest", "tag1").Run(); err != nil {
t.Fatal(err)
}
// Create a container from the image
config, _, _, err := runconfig.Parse([]string{unitTestImageID, "echo test"}, nil)
if err != nil {
t.Fatal(err)
}
id := createNamedTestContainer(eng, config, t, "testingtags")
if id == "" {
t.Fatal("No id returned")
}
job := srv.Eng.Job("containers")
job.SetenvBool("all", true)
outs, err := job.Stdout.AddListTable()
if err != nil {
t.Fatal(err)
}
if err := job.Run(); err != nil {
t.Fatal(err)
}
if len(outs.Data) != 1 {
t.Fatalf("Expected 1 container got %d", len(outs.Data))
}
// Try to remove the tag
imgs := engine.NewTable("", 0)
if err := srv.DeleteImage("utest:tag1", imgs, true, false, false); err != nil {
t.Fatal(err)
}
if len(imgs.Data) != 1 {
t.Fatalf("Should only have deleted one untag %d", len(imgs.Data))
}
if untag := imgs.Data[0].Get("Untagged"); untag != "utest:tag1" {
t.Fatalf("Expected %s got %s", unitTestImageID, untag)
}
}

View file

@ -1,108 +0,0 @@
// DEPRECATION NOTICE. PLEASE DO NOT ADD ANYTHING TO THIS FILE.
//
// For additional commments see server/server.go
//
package server
import (
"encoding/json"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
func (srv *Server) Events(job *engine.Job) engine.Status {
if len(job.Args) != 0 {
return job.Errorf("Usage: %s", job.Name)
}
var (
since = job.GetenvInt64("since")
until = job.GetenvInt64("until")
timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
)
// If no until, disable timeout
if until == 0 {
timeout.Stop()
}
listener := make(chan utils.JSONMessage)
srv.eventPublisher.Subscribe(listener)
defer srv.eventPublisher.Unsubscribe(listener)
// When sending an event JSON serialization errors are ignored, but all
// other errors lead to the eviction of the listener.
sendEvent := func(event *utils.JSONMessage) error {
if b, err := json.Marshal(event); err == nil {
if _, err = job.Stdout.Write(b); err != nil {
return err
}
}
return nil
}
job.Stdout.Write(nil)
// Resend every event in the [since, until] time interval.
if since != 0 {
for _, event := range srv.GetEvents() {
if event.Time >= since && (event.Time <= until || until == 0) {
if err := sendEvent(&event); err != nil {
return job.Error(err)
}
}
}
}
for {
select {
case event, ok := <-listener:
if !ok {
return engine.StatusOK
}
if err := sendEvent(&event); err != nil {
return job.Error(err)
}
case <-timeout.C:
return engine.StatusOK
}
}
}
// FIXME: this is a shim to allow breaking up other parts of Server without
// dragging the sphagetti dependency along.
func (srv *Server) Log(job *engine.Job) engine.Status {
if len(job.Args) != 3 {
return job.Errorf("usage: %s ACTION ID FROM", job.Name)
}
srv.LogEvent(job.Args[0], job.Args[1], job.Args[2])
return engine.StatusOK
}
func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
now := time.Now().UTC().Unix()
jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
srv.AddEvent(jm)
srv.eventPublisher.Publish(jm)
return &jm
}
func (srv *Server) AddEvent(jm utils.JSONMessage) {
srv.Lock()
if len(srv.events) == cap(srv.events) {
// discard oldest event
copy(srv.events, srv.events[1:])
srv.events[len(srv.events)-1] = jm
} else {
srv.events = append(srv.events, jm)
}
srv.Unlock()
}
func (srv *Server) GetEvents() []utils.JSONMessage {
srv.RLock()
defer srv.RUnlock()
return srv.events
}

View file

@ -5,13 +5,10 @@
package server package server
import ( import (
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"net" "net"
"net/http"
"net/url" "net/url"
"os" "os"
"os/exec" "os/exec"
@ -22,146 +19,12 @@ import (
"github.com/docker/docker/archive" "github.com/docker/docker/archive"
"github.com/docker/docker/builder" "github.com/docker/docker/builder"
"github.com/docker/docker/engine" "github.com/docker/docker/engine"
"github.com/docker/docker/graph"
"github.com/docker/docker/image" "github.com/docker/docker/image"
"github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/registry" "github.com/docker/docker/registry"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
// ImageExport exports all images with the given tag. All versions
// containing the same tag are exported. The resulting output is an
// uncompressed tar ball.
// name is the set of tags to export.
// out is the writer where the images are written to.
func (srv *Server) ImageExport(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("Usage: %s IMAGE\n", job.Name)
}
name := job.Args[0]
// get image json
tempdir, err := ioutil.TempDir("", "docker-export-")
if err != nil {
return job.Error(err)
}
defer os.RemoveAll(tempdir)
utils.Debugf("Serializing %s", name)
rootRepoMap := map[string]graph.Repository{}
rootRepo, err := srv.daemon.Repositories().Get(name)
if err != nil {
return job.Error(err)
}
if rootRepo != nil {
// this is a base repo name, like 'busybox'
for _, id := range rootRepo {
if err := srv.exportImage(job.Eng, id, tempdir); err != nil {
return job.Error(err)
}
}
rootRepoMap[name] = rootRepo
} else {
img, err := srv.daemon.Repositories().LookupImage(name)
if err != nil {
return job.Error(err)
}
if img != nil {
// This is a named image like 'busybox:latest'
repoName, repoTag := parsers.ParseRepositoryTag(name)
if err := srv.exportImage(job.Eng, img.ID, tempdir); err != nil {
return job.Error(err)
}
// check this length, because a lookup of a truncated has will not have a tag
// and will not need to be added to this map
if len(repoTag) > 0 {
rootRepoMap[repoName] = graph.Repository{repoTag: img.ID}
}
} else {
// this must be an ID that didn't get looked up just right?
if err := srv.exportImage(job.Eng, name, tempdir); err != nil {
return job.Error(err)
}
}
}
// write repositories, if there is something to write
if len(rootRepoMap) > 0 {
rootRepoJson, _ := json.Marshal(rootRepoMap)
if err := ioutil.WriteFile(path.Join(tempdir, "repositories"), rootRepoJson, os.FileMode(0644)); err != nil {
return job.Error(err)
}
} else {
utils.Debugf("There were no repositories to write")
}
fs, err := archive.Tar(tempdir, archive.Uncompressed)
if err != nil {
return job.Error(err)
}
defer fs.Close()
if _, err := io.Copy(job.Stdout, fs); err != nil {
return job.Error(err)
}
utils.Debugf("End Serializing %s", name)
return engine.StatusOK
}
func (srv *Server) exportImage(eng *engine.Engine, name, tempdir string) error {
for n := name; n != ""; {
// temporary directory
tmpImageDir := path.Join(tempdir, n)
if err := os.Mkdir(tmpImageDir, os.FileMode(0755)); err != nil {
if os.IsExist(err) {
return nil
}
return err
}
var version = "1.0"
var versionBuf = []byte(version)
if err := ioutil.WriteFile(path.Join(tmpImageDir, "VERSION"), versionBuf, os.FileMode(0644)); err != nil {
return err
}
// serialize json
json, err := os.Create(path.Join(tmpImageDir, "json"))
if err != nil {
return err
}
job := eng.Job("image_inspect", n)
job.SetenvBool("raw", true)
job.Stdout.Add(json)
if err := job.Run(); err != nil {
return err
}
// serialize filesystem
fsTar, err := os.Create(path.Join(tmpImageDir, "layer.tar"))
if err != nil {
return err
}
job = eng.Job("image_tarlayer", n)
job.Stdout.Add(fsTar)
if err := job.Run(); err != nil {
return err
}
// find parent
job = eng.Job("image_get", n)
info, _ := job.Stdout.AddEnv()
if err := job.Run(); err != nil {
return err
}
n = info.Get("Parent")
}
return nil
}
func (srv *Server) Build(job *engine.Job) engine.Status { func (srv *Server) Build(job *engine.Job) engine.Status {
if len(job.Args) != 0 { if len(job.Args) != 0 {
return job.Errorf("Usage: %s\n", job.Name) return job.Errorf("Usage: %s\n", job.Name)
@ -242,282 +105,6 @@ func (srv *Server) Build(job *engine.Job) engine.Status {
return engine.StatusOK return engine.StatusOK
} }
// Loads a set of images into the repository. This is the complementary of ImageExport.
// The input stream is an uncompressed tar ball containing images and metadata.
func (srv *Server) ImageLoad(job *engine.Job) engine.Status {
tmpImageDir, err := ioutil.TempDir("", "docker-import-")
if err != nil {
return job.Error(err)
}
defer os.RemoveAll(tmpImageDir)
var (
repoTarFile = path.Join(tmpImageDir, "repo.tar")
repoDir = path.Join(tmpImageDir, "repo")
)
tarFile, err := os.Create(repoTarFile)
if err != nil {
return job.Error(err)
}
if _, err := io.Copy(tarFile, job.Stdin); err != nil {
return job.Error(err)
}
tarFile.Close()
repoFile, err := os.Open(repoTarFile)
if err != nil {
return job.Error(err)
}
if err := os.Mkdir(repoDir, os.ModeDir); err != nil {
return job.Error(err)
}
if err := archive.Untar(repoFile, repoDir, nil); err != nil {
return job.Error(err)
}
dirs, err := ioutil.ReadDir(repoDir)
if err != nil {
return job.Error(err)
}
for _, d := range dirs {
if d.IsDir() {
if err := srv.recursiveLoad(job.Eng, d.Name(), tmpImageDir); err != nil {
return job.Error(err)
}
}
}
repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories"))
if err == nil {
repositories := map[string]graph.Repository{}
if err := json.Unmarshal(repositoriesJson, &repositories); err != nil {
return job.Error(err)
}
for imageName, tagMap := range repositories {
for tag, address := range tagMap {
if err := srv.daemon.Repositories().Set(imageName, tag, address, true); err != nil {
return job.Error(err)
}
}
}
} else if !os.IsNotExist(err) {
return job.Error(err)
}
return engine.StatusOK
}
func (srv *Server) recursiveLoad(eng *engine.Engine, address, tmpImageDir string) error {
if err := eng.Job("image_get", address).Run(); err != nil {
utils.Debugf("Loading %s", address)
imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json"))
if err != nil {
utils.Debugf("Error reading json", err)
return err
}
layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar"))
if err != nil {
utils.Debugf("Error reading embedded tar", err)
return err
}
img, err := image.NewImgJSON(imageJson)
if err != nil {
utils.Debugf("Error unmarshalling json", err)
return err
}
if img.Parent != "" {
if !srv.daemon.Graph().Exists(img.Parent) {
if err := srv.recursiveLoad(eng, img.Parent, tmpImageDir); err != nil {
return err
}
}
}
if err := srv.daemon.Graph().Register(imageJson, layer, img); err != nil {
return err
}
}
utils.Debugf("Completed processing %s", address)
return nil
}
func (srv *Server) ImagesViz(job *engine.Job) engine.Status {
images, _ := srv.daemon.Graph().Map()
if images == nil {
return engine.StatusOK
}
job.Stdout.Write([]byte("digraph docker {\n"))
var (
parentImage *image.Image
err error
)
for _, image := range images {
parentImage, err = image.GetParent()
if err != nil {
return job.Errorf("Error while getting parent image: %v", err)
}
if parentImage != nil {
job.Stdout.Write([]byte(" \"" + parentImage.ID + "\" -> \"" + image.ID + "\"\n"))
} else {
job.Stdout.Write([]byte(" base -> \"" + image.ID + "\" [style=invis]\n"))
}
}
for id, repos := range srv.daemon.Repositories().GetRepoRefs() {
job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n"))
}
job.Stdout.Write([]byte(" base [style=invisible]\n}\n"))
return engine.StatusOK
}
func (srv *Server) Images(job *engine.Job) engine.Status {
var (
allImages map[string]*image.Image
err error
filt_tagged = true
)
imageFilters, err := filters.FromParam(job.Getenv("filters"))
if err != nil {
return job.Error(err)
}
if i, ok := imageFilters["dangling"]; ok {
for _, value := range i {
if strings.ToLower(value) == "true" {
filt_tagged = false
}
}
}
if job.GetenvBool("all") && filt_tagged {
allImages, err = srv.daemon.Graph().Map()
} else {
allImages, err = srv.daemon.Graph().Heads()
}
if err != nil {
return job.Error(err)
}
lookup := make(map[string]*engine.Env)
srv.daemon.Repositories().Lock()
for name, repository := range srv.daemon.Repositories().Repositories {
if job.Getenv("filter") != "" {
if match, _ := path.Match(job.Getenv("filter"), name); !match {
continue
}
}
for tag, id := range repository {
image, err := srv.daemon.Graph().Get(id)
if err != nil {
log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
continue
}
if out, exists := lookup[id]; exists {
if filt_tagged {
out.SetList("RepoTags", append(out.GetList("RepoTags"), fmt.Sprintf("%s:%s", name, tag)))
}
} else {
// get the boolean list for if only the untagged images are requested
delete(allImages, id)
if filt_tagged {
out := &engine.Env{}
out.Set("ParentId", image.Parent)
out.SetList("RepoTags", []string{fmt.Sprintf("%s:%s", name, tag)})
out.Set("Id", image.ID)
out.SetInt64("Created", image.Created.Unix())
out.SetInt64("Size", image.Size)
out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
lookup[id] = out
}
}
}
}
srv.daemon.Repositories().Unlock()
outs := engine.NewTable("Created", len(lookup))
for _, value := range lookup {
outs.Add(value)
}
// Display images which aren't part of a repository/tag
if job.Getenv("filter") == "" {
for _, image := range allImages {
out := &engine.Env{}
out.Set("ParentId", image.Parent)
out.SetList("RepoTags", []string{"<none>:<none>"})
out.Set("Id", image.ID)
out.SetInt64("Created", image.Created.Unix())
out.SetInt64("Size", image.Size)
out.SetInt64("VirtualSize", image.GetParentsSize(0)+image.Size)
outs.Add(out)
}
}
outs.ReverseSort()
if _, err := outs.WriteListTo(job.Stdout); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
func (srv *Server) ImageHistory(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 {
return job.Errorf("Usage: %s IMAGE", job.Name)
}
name := job.Args[0]
foundImage, err := srv.daemon.Repositories().LookupImage(name)
if err != nil {
return job.Error(err)
}
lookupMap := make(map[string][]string)
for name, repository := range srv.daemon.Repositories().Repositories {
for tag, id := range repository {
// If the ID already has a reverse lookup, do not update it unless for "latest"
if _, exists := lookupMap[id]; !exists {
lookupMap[id] = []string{}
}
lookupMap[id] = append(lookupMap[id], name+":"+tag)
}
}
outs := engine.NewTable("Created", 0)
err = foundImage.WalkHistory(func(img *image.Image) error {
out := &engine.Env{}
out.Set("Id", img.ID)
out.SetInt64("Created", img.Created.Unix())
out.Set("CreatedBy", strings.Join(img.ContainerConfig.Cmd, " "))
out.SetList("Tags", lookupMap[img.ID])
out.SetInt64("Size", img.Size)
outs.Add(out)
return nil
})
if _, err := outs.WriteListTo(job.Stdout); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
func (srv *Server) ImageTag(job *engine.Job) engine.Status {
if len(job.Args) != 2 && len(job.Args) != 3 {
return job.Errorf("Usage: %s IMAGE REPOSITORY [TAG]\n", job.Name)
}
var tag string
if len(job.Args) == 3 {
tag = job.Args[2]
}
if err := srv.daemon.Repositories().Set(job.Args[1], tag, job.Args[0], job.GetenvBool("force")); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error { func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
history, err := r.GetRemoteHistory(imgID, endpoint, token) history, err := r.GetRemoteHistory(imgID, endpoint, token)
if err != nil { if err != nil {
@ -1038,198 +625,6 @@ func (srv *Server) ImagePush(job *engine.Job) engine.Status {
return engine.StatusOK return engine.StatusOK
} }
func (srv *Server) ImageImport(job *engine.Job) engine.Status {
if n := len(job.Args); n != 2 && n != 3 {
return job.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
}
var (
src = job.Args[0]
repo = job.Args[1]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
archive archive.ArchiveReader
resp *http.Response
)
if len(job.Args) > 2 {
tag = job.Args[2]
}
if src == "-" {
archive = job.Stdin
} else {
u, err := url.Parse(src)
if err != nil {
return job.Error(err)
}
if u.Scheme == "" {
u.Scheme = "http"
u.Host = src
u.Path = ""
}
job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u))
resp, err = utils.Download(u.String())
if err != nil {
return job.Error(err)
}
progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
defer progressReader.Close()
archive = progressReader
}
img, err := srv.daemon.Graph().Create(archive, "", "", "Imported from "+src, "", nil, nil)
if err != nil {
return job.Error(err)
}
// Optionally register the image at REPO/TAG
if repo != "" {
if err := srv.daemon.Repositories().Set(repo, tag, img.ID, true); err != nil {
return job.Error(err)
}
}
job.Stdout.Write(sf.FormatStatus("", img.ID))
return engine.StatusOK
}
func (srv *Server) DeleteImage(name string, imgs *engine.Table, first, force, noprune bool) error {
var (
repoName, tag string
tags = []string{}
tagDeleted bool
)
repoName, tag = parsers.ParseRepositoryTag(name)
if tag == "" {
tag = graph.DEFAULTTAG
}
img, err := srv.daemon.Repositories().LookupImage(name)
if err != nil {
if r, _ := srv.daemon.Repositories().Get(repoName); r != nil {
return fmt.Errorf("No such image: %s:%s", repoName, tag)
}
return fmt.Errorf("No such image: %s", name)
}
if strings.Contains(img.ID, name) {
repoName = ""
tag = ""
}
byParents, err := srv.daemon.Graph().ByParent()
if err != nil {
return err
}
//If delete by id, see if the id belong only to one repository
if repoName == "" {
for _, repoAndTag := range srv.daemon.Repositories().ByID()[img.ID] {
parsedRepo, parsedTag := parsers.ParseRepositoryTag(repoAndTag)
if repoName == "" || repoName == parsedRepo {
repoName = parsedRepo
if parsedTag != "" {
tags = append(tags, parsedTag)
}
} else if repoName != parsedRepo && !force {
// the id belongs to multiple repos, like base:latest and user:test,
// in that case return conflict
return fmt.Errorf("Conflict, cannot delete image %s because it is tagged in multiple repositories, use -f to force", name)
}
}
} else {
tags = append(tags, tag)
}
if !first && len(tags) > 0 {
return nil
}
//Untag the current image
for _, tag := range tags {
tagDeleted, err = srv.daemon.Repositories().Delete(repoName, tag)
if err != nil {
return err
}
if tagDeleted {
out := &engine.Env{}
out.Set("Untagged", repoName+":"+tag)
imgs.Add(out)
srv.LogEvent("untag", img.ID, "")
}
}
tags = srv.daemon.Repositories().ByID()[img.ID]
if (len(tags) <= 1 && repoName == "") || len(tags) == 0 {
if len(byParents[img.ID]) == 0 {
if err := srv.canDeleteImage(img.ID, force, tagDeleted); err != nil {
return err
}
if err := srv.daemon.Repositories().DeleteAll(img.ID); err != nil {
return err
}
if err := srv.daemon.Graph().Delete(img.ID); err != nil {
return err
}
out := &engine.Env{}
out.Set("Deleted", img.ID)
imgs.Add(out)
srv.LogEvent("delete", img.ID, "")
if img.Parent != "" && !noprune {
err := srv.DeleteImage(img.Parent, imgs, false, force, noprune)
if first {
return err
}
}
}
}
return nil
}
func (srv *Server) ImageDelete(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 {
return job.Errorf("Usage: %s IMAGE", job.Name)
}
imgs := engine.NewTable("", 0)
if err := srv.DeleteImage(job.Args[0], imgs, true, job.GetenvBool("force"), job.GetenvBool("noprune")); err != nil {
return job.Error(err)
}
if len(imgs.Data) == 0 {
return job.Errorf("Conflict, %s wasn't deleted", job.Args[0])
}
if _, err := imgs.WriteListTo(job.Stdout); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
func (srv *Server) canDeleteImage(imgID string, force, untagged bool) error {
var message string
if untagged {
message = " (docker untagged the image)"
}
for _, container := range srv.daemon.List() {
parent, err := srv.daemon.Repositories().LookupImage(container.Image)
if err != nil {
return err
}
if err := parent.WalkHistory(func(p *image.Image) error {
if imgID == p.ID {
if container.State.IsRunning() {
if force {
return fmt.Errorf("Conflict, cannot force delete %s because the running container %s is using it%s, stop it and retry", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
}
return fmt.Errorf("Conflict, cannot delete %s because the running container %s is using it%s, stop it and use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
} else if !force {
return fmt.Errorf("Conflict, cannot delete %s because the container %s is using it%s, use -f to force", utils.TruncateID(imgID), utils.TruncateID(container.ID), message)
}
}
return nil
}); err != nil {
return err
}
}
return nil
}
func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) { func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
srv.Lock() srv.Lock()
defer srv.Unlock() defer srv.Unlock()

View file

@ -86,20 +86,10 @@ func InitServer(job *engine.Job) engine.Status {
job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon) job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon)
for name, handler := range map[string]engine.Handler{ for name, handler := range map[string]engine.Handler{
"tag": srv.ImageTag, // FIXME merge with "image_tag" "info": srv.DockerInfo,
"info": srv.DockerInfo, "build": srv.Build,
"image_export": srv.ImageExport, "pull": srv.ImagePull,
"images": srv.Images, "push": srv.ImagePush,
"history": srv.ImageHistory,
"viz": srv.ImagesViz,
"log": srv.Log,
"load": srv.ImageLoad,
"build": srv.Build,
"pull": srv.ImagePull,
"import": srv.ImageImport,
"image_delete": srv.ImageDelete,
"events": srv.Events,
"push": srv.ImagePush,
} { } {
if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil { if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil {
return job.Error(err) return job.Error(err)
@ -125,12 +115,10 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
return nil, err return nil, err
} }
srv := &Server{ srv := &Server{
Eng: eng, Eng: eng,
daemon: daemon, daemon: daemon,
pullingPool: make(map[string]chan struct{}), pullingPool: make(map[string]chan struct{}),
pushingPool: make(map[string]chan struct{}), pushingPool: make(map[string]chan struct{}),
events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
eventPublisher: utils.NewJSONMessagePublisher(),
} }
daemon.SetServer(srv) daemon.SetServer(srv)
return srv, nil return srv, nil

View file

@ -67,6 +67,11 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
initPath = srv.daemon.SystemInitPath() initPath = srv.daemon.SystemInitPath()
} }
cjob := job.Eng.Job("subscribers_count")
env, _ := cjob.Stdout.AddEnv()
if err := cjob.Run(); err != nil {
return job.Error(err)
}
v := &engine.Env{} v := &engine.Env{}
v.SetInt("Containers", len(srv.daemon.List())) v.SetInt("Containers", len(srv.daemon.List()))
v.SetInt("Images", imgcount) v.SetInt("Images", imgcount)
@ -79,7 +84,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
v.SetInt("NFd", utils.GetTotalUsedFds()) v.SetInt("NFd", utils.GetTotalUsedFds())
v.SetInt("NGoroutines", runtime.NumGoroutine()) v.SetInt("NGoroutines", runtime.NumGoroutine())
v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name()) v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name())
v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount()) v.SetInt("NEventsListener", env.GetInt("count"))
v.Set("KernelVersion", kernelVersion) v.Set("KernelVersion", kernelVersion)
v.Set("OperatingSystem", operatingSystem) v.Set("OperatingSystem", operatingSystem)
v.Set("IndexServerAddress", registry.IndexServerAddress()) v.Set("IndexServerAddress", registry.IndexServerAddress())
@ -128,12 +133,10 @@ func (srv *Server) Close() error {
type Server struct { type Server struct {
sync.RWMutex sync.RWMutex
daemon *daemon.Daemon daemon *daemon.Daemon
pullingPool map[string]chan struct{} pullingPool map[string]chan struct{}
pushingPool map[string]chan struct{} pushingPool map[string]chan struct{}
events []utils.JSONMessage Eng *engine.Engine
eventPublisher *utils.JSONMessagePublisher running bool
Eng *engine.Engine tasks sync.WaitGroup
running bool
tasks sync.WaitGroup
} }

View file

@ -1,11 +1,6 @@
package server package server
import ( import "testing"
"testing"
"time"
"github.com/docker/docker/utils"
)
func TestPools(t *testing.T) { func TestPools(t *testing.T) {
srv := &Server{ srv := &Server{
@ -44,55 +39,3 @@ func TestPools(t *testing.T) {
t.Fatalf("Expected `Unknown pool type`") t.Fatalf("Expected `Unknown pool type`")
} }
} }
func TestLogEvent(t *testing.T) {
srv := &Server{
events: make([]utils.JSONMessage, 0, 64),
eventPublisher: utils.NewJSONMessagePublisher(),
}
srv.LogEvent("fakeaction", "fakeid", "fakeimage")
listener := make(chan utils.JSONMessage)
srv.eventPublisher.Subscribe(listener)
srv.LogEvent("fakeaction2", "fakeid", "fakeimage")
numEvents := len(srv.GetEvents())
if numEvents != 2 {
t.Fatalf("Expected 2 events, found %d", numEvents)
}
go func() {
time.Sleep(200 * time.Millisecond)
srv.LogEvent("fakeaction3", "fakeid", "fakeimage")
time.Sleep(200 * time.Millisecond)
srv.LogEvent("fakeaction4", "fakeid", "fakeimage")
}()
setTimeout(t, "Listening for events timed out", 2*time.Second, func() {
for i := 2; i < 4; i++ {
event := <-listener
if event != srv.GetEvents()[i] {
t.Fatalf("Event received it different than expected")
}
}
})
}
// FIXME: this is duplicated from integration/commands_test.go
func setTimeout(t *testing.T, msg string, d time.Duration, f func()) {
c := make(chan bool)
// Make sure we are not too long
go func() {
time.Sleep(d)
c <- true
}()
go func() {
f()
c <- false
}()
if <-c && msg != "" {
t.Fatal(msg)
}
}

View file

@ -1,61 +0,0 @@
package utils
import (
"sync"
"time"
)
func NewJSONMessagePublisher() *JSONMessagePublisher {
return &JSONMessagePublisher{}
}
type JSONMessageListener chan<- JSONMessage
type JSONMessagePublisher struct {
m sync.RWMutex
subscribers []JSONMessageListener
}
func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
p.m.Lock()
p.subscribers = append(p.subscribers, l)
p.m.Unlock()
}
func (p *JSONMessagePublisher) SubscribersCount() int {
p.m.RLock()
count := len(p.subscribers)
p.m.RUnlock()
return count
}
// Unsubscribe closes and removes the specified listener from the list of
// previously registed ones.
// It returns a boolean value indicating if the listener was successfully
// found, closed and unregistered.
func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
p.m.Lock()
defer p.m.Unlock()
for i, subscriber := range p.subscribers {
if subscriber == l {
close(l)
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
return true
}
}
return false
}
func (p *JSONMessagePublisher) Publish(m JSONMessage) {
p.m.RLock()
for _, subscriber := range p.subscribers {
// We give each subscriber a 100ms time window to receive the event,
// after which we move to the next.
select {
case subscriber <- m:
case <-time.After(100 * time.Millisecond):
}
}
p.m.RUnlock()
}

View file

@ -1,73 +0,0 @@
package utils
import (
"testing"
"time"
)
func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) {
if q.SubscribersCount() != expected {
t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount())
}
}
func TestJSONMessagePublisherSubscription(t *testing.T) {
q := NewJSONMessagePublisher()
l1 := make(chan JSONMessage)
l2 := make(chan JSONMessage)
assertSubscribersCount(t, q, 0)
q.Subscribe(l1)
assertSubscribersCount(t, q, 1)
q.Subscribe(l2)
assertSubscribersCount(t, q, 2)
q.Unsubscribe(l1)
q.Unsubscribe(l2)
assertSubscribersCount(t, q, 0)
}
func TestJSONMessagePublisherPublish(t *testing.T) {
q := NewJSONMessagePublisher()
l1 := make(chan JSONMessage)
l2 := make(chan JSONMessage)
go func() {
for {
select {
case <-l1:
close(l1)
l1 = nil
case <-l2:
close(l2)
l2 = nil
case <-time.After(1 * time.Second):
q.Unsubscribe(l1)
q.Unsubscribe(l2)
t.Fatal("Timeout waiting for broadcasted message")
}
}
}()
q.Subscribe(l1)
q.Subscribe(l2)
q.Publish(JSONMessage{})
}
func TestJSONMessagePublishTimeout(t *testing.T) {
q := NewJSONMessagePublisher()
l := make(chan JSONMessage)
q.Subscribe(l)
c := make(chan struct{})
go func() {
q.Publish(JSONMessage{})
close(c)
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Timeout publishing message")
}
}