Browse Source

Move "pull" and "push" to graph/pull.go and graph/push.go

This is part of the ongoing effort to remove the deprecated server/
package, and generally cleanup and simplify the codebase.

Signed-off-by: Solomon Hykes <solomon@docker.com>
Solomon Hykes 11 years ago
parent
commit
6856a6b110
9 changed files with 646 additions and 621 deletions
  1. 41 0
      graph/pools_test.go
  2. 300 0
      graph/pull.go
  3. 249 0
      graph/push.go
  4. 2 0
      graph/service.go
  5. 49 0
      graph/tags.go
  6. 0 569
      server/image.go
  7. 2 6
      server/init.go
  8. 3 5
      server/server.go
  9. 0 41
      server/server_unit_test.go

+ 41 - 0
graph/pools_test.go

@@ -0,0 +1,41 @@
+package graph
+
+import "testing"
+
+func TestPools(t *testing.T) {
+	s := &TagStore{
+		pullingPool: make(map[string]chan struct{}),
+		pushingPool: make(map[string]chan struct{}),
+	}
+
+	if _, err := s.poolAdd("pull", "test1"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := s.poolAdd("pull", "test2"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := s.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
+		t.Fatalf("Expected `pull test1 is already in progress`")
+	}
+	if _, err := s.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
+		t.Fatalf("Expected `pull test1 is already in progress`")
+	}
+	if _, err := s.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
+		t.Fatalf("Expected `Unknown pool type`")
+	}
+	if err := s.poolRemove("pull", "test2"); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.poolRemove("pull", "test2"); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.poolRemove("pull", "test1"); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.poolRemove("push", "test1"); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
+		t.Fatalf("Expected `Unknown pool type`")
+	}
+}

+ 300 - 0
graph/pull.go

@@ -0,0 +1,300 @@
+package graph
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/docker/docker/engine"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/registry"
+	"github.com/docker/docker/utils"
+)
+
+func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
+	if n := len(job.Args); n != 1 && n != 2 {
+		return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
+	}
+	var (
+		localName   = job.Args[0]
+		tag         string
+		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
+		authConfig  = &registry.AuthConfig{}
+		metaHeaders map[string][]string
+	)
+	if len(job.Args) > 1 {
+		tag = job.Args[1]
+	}
+
+	job.GetenvJson("authConfig", authConfig)
+	job.GetenvJson("metaHeaders", &metaHeaders)
+
+	c, err := s.poolAdd("pull", localName+":"+tag)
+	if err != nil {
+		if c != nil {
+			// Another pull of the same repository is already taking place; just wait for it to finish
+			job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
+			<-c
+			return engine.StatusOK
+		}
+		return job.Error(err)
+	}
+	defer s.poolRemove("pull", localName+":"+tag)
+
+	// Resolve the Repository name from fqn to endpoint + name
+	hostname, remoteName, err := registry.ResolveRepositoryName(localName)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	r, err := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	if endpoint == registry.IndexServerAddress() {
+		// If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
+		localName = remoteName
+	}
+
+	if err = s.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
+		return job.Error(err)
+	}
+
+	return engine.StatusOK
+}
+
+func (s *TagStore) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
+	out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
+
+	repoData, err := r.GetRepositoryData(remoteName)
+	if err != nil {
+		if strings.Contains(err.Error(), "HTTP code: 404") {
+			return fmt.Errorf("Error: image %s not found", remoteName)
+		} else {
+			// Unexpected HTTP error
+			return err
+		}
+	}
+
+	utils.Debugf("Retrieving the tag list")
+	tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
+	if err != nil {
+		utils.Errorf("%v", err)
+		return err
+	}
+
+	for tag, id := range tagsList {
+		repoData.ImgList[id] = &registry.ImgData{
+			ID:       id,
+			Tag:      tag,
+			Checksum: "",
+		}
+	}
+
+	utils.Debugf("Registering tags")
+	// If no tag has been specified, pull them all
+	if askedTag == "" {
+		for tag, id := range tagsList {
+			repoData.ImgList[id].Tag = tag
+		}
+	} else {
+		// Otherwise, check that the tag exists and use only that one
+		id, exists := tagsList[askedTag]
+		if !exists {
+			return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
+		}
+		repoData.ImgList[id].Tag = askedTag
+	}
+
+	errors := make(chan error)
+	for _, image := range repoData.ImgList {
+		downloadImage := func(img *registry.ImgData) {
+			if askedTag != "" && img.Tag != askedTag {
+				utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
+				if parallel {
+					errors <- nil
+				}
+				return
+			}
+
+			if img.Tag == "" {
+				utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
+				if parallel {
+					errors <- nil
+				}
+				return
+			}
+
+			// ensure no two downloads of the same image happen at the same time
+			if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
+				if c != nil {
+					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
+					<-c
+					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
+				} else {
+					utils.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
+				}
+				if parallel {
+					errors <- nil
+				}
+				return
+			}
+			defer s.poolRemove("pull", "img:"+img.ID)
+
+			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
+			success := false
+			var lastErr error
+			for _, ep := range repoData.Endpoints {
+				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
+				if err := s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
+					// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
+					// As the error is also given to the output stream the user will see the error.
+					lastErr = err
+					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
+					continue
+				}
+				success = true
+				break
+			}
+			if !success {
+				err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
+				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
+				if parallel {
+					errors <- err
+					return
+				}
+			}
+			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
+
+			if parallel {
+				errors <- nil
+			}
+		}
+
+		if parallel {
+			go downloadImage(image)
+		} else {
+			downloadImage(image)
+		}
+	}
+	if parallel {
+		var lastError error
+		for i := 0; i < len(repoData.ImgList); i++ {
+			if err := <-errors; err != nil {
+				lastError = err
+			}
+		}
+		if lastError != nil {
+			return lastError
+		}
+
+	}
+	for tag, id := range tagsList {
+		if askedTag != "" && tag != askedTag {
+			continue
+		}
+		if err := s.Set(localName, tag, id, true); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *TagStore) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
+	history, err := r.GetRemoteHistory(imgID, endpoint, token)
+	if err != nil {
+		return err
+	}
+	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
+	// FIXME: Try to stream the images?
+	// FIXME: Launch the getRemoteImage() in goroutines
+
+	for i := len(history) - 1; i >= 0; i-- {
+		id := history[i]
+
+		// ensure no two downloads of the same layer happen at the same time
+		if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
+			utils.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
+			<-c
+		}
+		defer s.poolRemove("pull", "layer:"+id)
+
+		if !s.graph.Exists(id) {
+			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
+			var (
+				imgJSON []byte
+				imgSize int
+				err     error
+				img     *image.Image
+			)
+			retries := 5
+			for j := 1; j <= retries; j++ {
+				imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
+				if err != nil && j == retries {
+					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
+					return err
+				} else if err != nil {
+					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+					continue
+				}
+				img, err = image.NewImgJSON(imgJSON)
+				if err != nil && j == retries {
+					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
+					return fmt.Errorf("Failed to parse json: %s", err)
+				} else if err != nil {
+					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+					continue
+				} else {
+					break
+				}
+			}
+
+			for j := 1; j <= retries; j++ {
+				// Get the layer
+				status := "Pulling fs layer"
+				if j > 1 {
+					status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
+				}
+				out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
+				layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
+				if uerr, ok := err.(*url.Error); ok {
+					err = uerr.Err
+				}
+				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
+					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+					continue
+				} else if err != nil {
+					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
+					return err
+				}
+				defer layer.Close()
+
+				err = s.graph.Register(imgJSON,
+					utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"),
+					img)
+				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
+					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+					continue
+				} else if err != nil {
+					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
+					return err
+				} else {
+					break
+				}
+			}
+		}
+		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
+
+	}
+	return nil
+}

+ 249 - 0
graph/push.go

@@ -0,0 +1,249 @@
+package graph
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+
+	"github.com/docker/docker/archive"
+	"github.com/docker/docker/engine"
+	"github.com/docker/docker/registry"
+	"github.com/docker/docker/utils"
+)
+
+// Retrieve the all the images to be uploaded in the correct order
+func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
+	var (
+		imageList   []string
+		imagesSeen  map[string]bool     = make(map[string]bool)
+		tagsByImage map[string][]string = make(map[string][]string)
+	)
+
+	for tag, id := range localRepo {
+		if requestedTag != "" && requestedTag != tag {
+			continue
+		}
+		var imageListForThisTag []string
+
+		tagsByImage[id] = append(tagsByImage[id], tag)
+
+		for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() {
+			if err != nil {
+				return nil, nil, err
+			}
+
+			if imagesSeen[img.ID] {
+				// This image is already on the list, we can ignore it and all its parents
+				break
+			}
+
+			imagesSeen[img.ID] = true
+			imageListForThisTag = append(imageListForThisTag, img.ID)
+		}
+
+		// reverse the image list for this tag (so the "most"-parent image is first)
+		for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
+			imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
+		}
+
+		// append to main image list
+		imageList = append(imageList, imageListForThisTag...)
+	}
+	if len(imageList) == 0 {
+		return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
+	}
+	utils.Debugf("Image list: %v", imageList)
+	utils.Debugf("Tags by image: %v", tagsByImage)
+
+	return imageList, tagsByImage, nil
+}
+
+func (s *TagStore) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error {
+	out = utils.NewWriteFlusher(out)
+	utils.Debugf("Local repo: %s", localRepo)
+	imgList, tagsByImage, err := s.getImageList(localRepo, tag)
+	if err != nil {
+		return err
+	}
+
+	out.Write(sf.FormatStatus("", "Sending image list"))
+
+	var (
+		repoData   *registry.RepositoryData
+		imageIndex []*registry.ImgData
+	)
+
+	for _, imgId := range imgList {
+		if tags, exists := tagsByImage[imgId]; exists {
+			// If an image has tags you must add an entry in the image index
+			// for each tag
+			for _, tag := range tags {
+				imageIndex = append(imageIndex, &registry.ImgData{
+					ID:  imgId,
+					Tag: tag,
+				})
+			}
+		} else {
+			// If the image does not have a tag it still needs to be sent to the
+			// registry with an empty tag so that it is accociated with the repository
+			imageIndex = append(imageIndex, &registry.ImgData{
+				ID:  imgId,
+				Tag: "",
+			})
+
+		}
+	}
+
+	utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo)
+	for _, data := range imageIndex {
+		utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag)
+	}
+
+	// Register all the images in a repository with the registry
+	// If an image is not in this list it will not be associated with the repository
+	repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil)
+	if err != nil {
+		return err
+	}
+
+	nTag := 1
+	if tag == "" {
+		nTag = len(localRepo)
+	}
+	for _, ep := range repoData.Endpoints {
+		out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag))
+
+		for _, imgId := range imgList {
+			if r.LookupRemoteImage(imgId, ep, repoData.Tokens) {
+				out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
+			} else {
+				if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil {
+					// FIXME: Continue on error?
+					return err
+				}
+			}
+
+			for _, tag := range tagsByImage[imgId] {
+				out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag))
+
+				if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *TagStore) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
+	out = utils.NewWriteFlusher(out)
+	jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json"))
+	if err != nil {
+		return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
+	}
+	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
+
+	imgData := &registry.ImgData{
+		ID: imgID,
+	}
+
+	// Send the json
+	if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
+		if err == registry.ErrAlreadyExists {
+			out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
+			return "", nil
+		}
+		return "", err
+	}
+
+	layerData, err := s.graph.TempLayerArchive(imgID, archive.Uncompressed, sf, out)
+	if err != nil {
+		return "", fmt.Errorf("Failed to generate layer archive: %s", err)
+	}
+	defer os.RemoveAll(layerData.Name())
+
+	// Send the layer
+	utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
+
+	checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
+	if err != nil {
+		return "", err
+	}
+	imgData.Checksum = checksum
+	imgData.ChecksumPayload = checksumPayload
+	// Send the checksum
+	if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
+		return "", err
+	}
+
+	out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
+	return imgData.Checksum, nil
+}
+
+// FIXME: Allow to interrupt current push when new push of same image is done.
+func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
+	if n := len(job.Args); n != 1 {
+		return job.Errorf("Usage: %s IMAGE", job.Name)
+	}
+	var (
+		localName   = job.Args[0]
+		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
+		authConfig  = &registry.AuthConfig{}
+		metaHeaders map[string][]string
+	)
+
+	tag := job.Getenv("tag")
+	job.GetenvJson("authConfig", authConfig)
+	job.GetenvJson("metaHeaders", &metaHeaders)
+	if _, err := s.poolAdd("push", localName); err != nil {
+		return job.Error(err)
+	}
+	defer s.poolRemove("push", localName)
+
+	// Resolve the Repository name from fqn to endpoint + name
+	hostname, remoteName, err := registry.ResolveRepositoryName(localName)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	img, err := s.graph.Get(localName)
+	r, err2 := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
+	if err2 != nil {
+		return job.Error(err2)
+	}
+
+	if err != nil {
+		reposLen := 1
+		if tag == "" {
+			reposLen = len(s.Repositories[localName])
+		}
+		job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
+		// If it fails, try to get the repository
+		if localRepo, exists := s.Repositories[localName]; exists {
+			if err := s.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil {
+				return job.Error(err)
+			}
+			return engine.StatusOK
+		}
+		return job.Error(err)
+	}
+
+	var token []string
+	job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
+	if _, err := s.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil {
+		return job.Error(err)
+	}
+	return engine.StatusOK
+}

+ 2 - 0
graph/service.go

@@ -23,6 +23,8 @@ func (s *TagStore) Install(eng *engine.Engine) error {
 		"viz":            s.CmdViz,
 		"load":           s.CmdLoad,
 		"import":         s.CmdImport,
+		"pull":           s.CmdPull,
+		"push":           s.CmdPush,
 	} {
 		if err := eng.Register(name, handler); err != nil {
 			return fmt.Errorf("Could not register %q: %v", name, err)

+ 49 - 0
graph/tags.go

@@ -22,6 +22,10 @@ type TagStore struct {
 	graph        *Graph
 	Repositories map[string]Repository
 	sync.Mutex
+	// FIXME: move push/pull-related fields
+	// to a helper type
+	pullingPool map[string]chan struct{}
+	pushingPool map[string]chan struct{}
 }
 
 type Repository map[string]string
@@ -35,6 +39,8 @@ func NewTagStore(path string, graph *Graph) (*TagStore, error) {
 		path:         abspath,
 		graph:        graph,
 		Repositories: make(map[string]Repository),
+		pullingPool:  make(map[string]chan struct{}),
+		pushingPool:  make(map[string]chan struct{}),
 	}
 	// Load the json file if it exists, otherwise create it.
 	if err := store.reload(); os.IsNotExist(err) {
@@ -263,3 +269,46 @@ func validateTagName(name string) error {
 	}
 	return nil
 }
+
+func (s *TagStore) poolAdd(kind, key string) (chan struct{}, error) {
+	s.Lock()
+	defer s.Unlock()
+
+	if c, exists := s.pullingPool[key]; exists {
+		return c, fmt.Errorf("pull %s is already in progress", key)
+	}
+	if c, exists := s.pushingPool[key]; exists {
+		return c, fmt.Errorf("push %s is already in progress", key)
+	}
+
+	c := make(chan struct{})
+	switch kind {
+	case "pull":
+		s.pullingPool[key] = c
+	case "push":
+		s.pushingPool[key] = c
+	default:
+		return nil, fmt.Errorf("Unknown pool type")
+	}
+	return c, nil
+}
+
+func (s *TagStore) poolRemove(kind, key string) error {
+	s.Lock()
+	defer s.Unlock()
+	switch kind {
+	case "pull":
+		if c, exists := s.pullingPool[key]; exists {
+			close(c)
+			delete(s.pullingPool, key)
+		}
+	case "push":
+		if c, exists := s.pushingPool[key]; exists {
+			close(c)
+			delete(s.pushingPool, key)
+		}
+	default:
+		return fmt.Errorf("Unknown pool type")
+	}
+	return nil
+}

+ 0 - 569
server/image.go

@@ -5,21 +5,15 @@
 package server
 
 import (
-	"fmt"
 	"io"
 	"io/ioutil"
-	"net"
-	"net/url"
 	"os"
 	"os/exec"
-	"path"
 	"strings"
-	"time"
 
 	"github.com/docker/docker/archive"
 	"github.com/docker/docker/builder"
 	"github.com/docker/docker/engine"
-	"github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/parsers"
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/utils"
@@ -104,566 +98,3 @@ func (srv *Server) Build(job *engine.Job) engine.Status {
 	}
 	return engine.StatusOK
 }
-
-func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
-	history, err := r.GetRemoteHistory(imgID, endpoint, token)
-	if err != nil {
-		return err
-	}
-	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
-	// FIXME: Try to stream the images?
-	// FIXME: Launch the getRemoteImage() in goroutines
-
-	for i := len(history) - 1; i >= 0; i-- {
-		id := history[i]
-
-		// ensure no two downloads of the same layer happen at the same time
-		if c, err := srv.poolAdd("pull", "layer:"+id); err != nil {
-			utils.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
-			<-c
-		}
-		defer srv.poolRemove("pull", "layer:"+id)
-
-		if !srv.daemon.Graph().Exists(id) {
-			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
-			var (
-				imgJSON []byte
-				imgSize int
-				err     error
-				img     *image.Image
-			)
-			retries := 5
-			for j := 1; j <= retries; j++ {
-				imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
-				if err != nil && j == retries {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
-					return err
-				} else if err != nil {
-					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
-					continue
-				}
-				img, err = image.NewImgJSON(imgJSON)
-				if err != nil && j == retries {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
-					return fmt.Errorf("Failed to parse json: %s", err)
-				} else if err != nil {
-					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
-					continue
-				} else {
-					break
-				}
-			}
-
-			for j := 1; j <= retries; j++ {
-				// Get the layer
-				status := "Pulling fs layer"
-				if j > 1 {
-					status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
-				}
-				out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
-				layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
-				if uerr, ok := err.(*url.Error); ok {
-					err = uerr.Err
-				}
-				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
-					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
-					continue
-				} else if err != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
-					return err
-				}
-				defer layer.Close()
-
-				err = srv.daemon.Graph().Register(imgJSON,
-					utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"),
-					img)
-				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
-					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
-					continue
-				} else if err != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
-					return err
-				} else {
-					break
-				}
-			}
-		}
-		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
-
-	}
-	return nil
-}
-
-func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
-	out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
-
-	repoData, err := r.GetRepositoryData(remoteName)
-	if err != nil {
-		if strings.Contains(err.Error(), "HTTP code: 404") {
-			return fmt.Errorf("Error: image %s not found", remoteName)
-		} else {
-			// Unexpected HTTP error
-			return err
-		}
-	}
-
-	utils.Debugf("Retrieving the tag list")
-	tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
-	if err != nil {
-		utils.Errorf("%v", err)
-		return err
-	}
-
-	for tag, id := range tagsList {
-		repoData.ImgList[id] = &registry.ImgData{
-			ID:       id,
-			Tag:      tag,
-			Checksum: "",
-		}
-	}
-
-	utils.Debugf("Registering tags")
-	// If no tag has been specified, pull them all
-	if askedTag == "" {
-		for tag, id := range tagsList {
-			repoData.ImgList[id].Tag = tag
-		}
-	} else {
-		// Otherwise, check that the tag exists and use only that one
-		id, exists := tagsList[askedTag]
-		if !exists {
-			return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
-		}
-		repoData.ImgList[id].Tag = askedTag
-	}
-
-	errors := make(chan error)
-	for _, image := range repoData.ImgList {
-		downloadImage := func(img *registry.ImgData) {
-			if askedTag != "" && img.Tag != askedTag {
-				utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
-				if parallel {
-					errors <- nil
-				}
-				return
-			}
-
-			if img.Tag == "" {
-				utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
-				if parallel {
-					errors <- nil
-				}
-				return
-			}
-
-			// ensure no two downloads of the same image happen at the same time
-			if c, err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
-				if c != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
-					<-c
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
-				} else {
-					utils.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
-				}
-				if parallel {
-					errors <- nil
-				}
-				return
-			}
-			defer srv.poolRemove("pull", "img:"+img.ID)
-
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
-			success := false
-			var lastErr error
-			for _, ep := range repoData.Endpoints {
-				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
-				if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
-					// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
-					// As the error is also given to the output stream the user will see the error.
-					lastErr = err
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
-					continue
-				}
-				success = true
-				break
-			}
-			if !success {
-				err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
-				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
-				if parallel {
-					errors <- err
-					return
-				}
-			}
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
-
-			if parallel {
-				errors <- nil
-			}
-		}
-
-		if parallel {
-			go downloadImage(image)
-		} else {
-			downloadImage(image)
-		}
-	}
-	if parallel {
-		var lastError error
-		for i := 0; i < len(repoData.ImgList); i++ {
-			if err := <-errors; err != nil {
-				lastError = err
-			}
-		}
-		if lastError != nil {
-			return lastError
-		}
-
-	}
-	for tag, id := range tagsList {
-		if askedTag != "" && tag != askedTag {
-			continue
-		}
-		if err := srv.daemon.Repositories().Set(localName, tag, id, true); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (srv *Server) ImagePull(job *engine.Job) engine.Status {
-	if n := len(job.Args); n != 1 && n != 2 {
-		return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
-	}
-	var (
-		localName   = job.Args[0]
-		tag         string
-		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
-		authConfig  = &registry.AuthConfig{}
-		metaHeaders map[string][]string
-	)
-	if len(job.Args) > 1 {
-		tag = job.Args[1]
-	}
-
-	job.GetenvJson("authConfig", authConfig)
-	job.GetenvJson("metaHeaders", &metaHeaders)
-
-	c, err := srv.poolAdd("pull", localName+":"+tag)
-	if err != nil {
-		if c != nil {
-			// Another pull of the same repository is already taking place; just wait for it to finish
-			job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
-			<-c
-			return engine.StatusOK
-		}
-		return job.Error(err)
-	}
-	defer srv.poolRemove("pull", localName+":"+tag)
-
-	// Resolve the Repository name from fqn to endpoint + name
-	hostname, remoteName, err := registry.ResolveRepositoryName(localName)
-	if err != nil {
-		return job.Error(err)
-	}
-
-	endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
-	if err != nil {
-		return job.Error(err)
-	}
-
-	r, err := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
-	if err != nil {
-		return job.Error(err)
-	}
-
-	if endpoint == registry.IndexServerAddress() {
-		// If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
-		localName = remoteName
-	}
-
-	if err = srv.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
-		return job.Error(err)
-	}
-
-	return engine.StatusOK
-}
-
-// Retrieve the all the images to be uploaded in the correct order
-func (srv *Server) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
-	var (
-		imageList   []string
-		imagesSeen  map[string]bool     = make(map[string]bool)
-		tagsByImage map[string][]string = make(map[string][]string)
-	)
-
-	for tag, id := range localRepo {
-		if requestedTag != "" && requestedTag != tag {
-			continue
-		}
-		var imageListForThisTag []string
-
-		tagsByImage[id] = append(tagsByImage[id], tag)
-
-		for img, err := srv.daemon.Graph().Get(id); img != nil; img, err = img.GetParent() {
-			if err != nil {
-				return nil, nil, err
-			}
-
-			if imagesSeen[img.ID] {
-				// This image is already on the list, we can ignore it and all its parents
-				break
-			}
-
-			imagesSeen[img.ID] = true
-			imageListForThisTag = append(imageListForThisTag, img.ID)
-		}
-
-		// reverse the image list for this tag (so the "most"-parent image is first)
-		for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
-			imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
-		}
-
-		// append to main image list
-		imageList = append(imageList, imageListForThisTag...)
-	}
-	if len(imageList) == 0 {
-		return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
-	}
-	utils.Debugf("Image list: %v", imageList)
-	utils.Debugf("Tags by image: %v", tagsByImage)
-
-	return imageList, tagsByImage, nil
-}
-
-func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error {
-	out = utils.NewWriteFlusher(out)
-	utils.Debugf("Local repo: %s", localRepo)
-	imgList, tagsByImage, err := srv.getImageList(localRepo, tag)
-	if err != nil {
-		return err
-	}
-
-	out.Write(sf.FormatStatus("", "Sending image list"))
-
-	var (
-		repoData   *registry.RepositoryData
-		imageIndex []*registry.ImgData
-	)
-
-	for _, imgId := range imgList {
-		if tags, exists := tagsByImage[imgId]; exists {
-			// If an image has tags you must add an entry in the image index
-			// for each tag
-			for _, tag := range tags {
-				imageIndex = append(imageIndex, &registry.ImgData{
-					ID:  imgId,
-					Tag: tag,
-				})
-			}
-		} else {
-			// If the image does not have a tag it still needs to be sent to the
-			// registry with an empty tag so that it is accociated with the repository
-			imageIndex = append(imageIndex, &registry.ImgData{
-				ID:  imgId,
-				Tag: "",
-			})
-
-		}
-	}
-
-	utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo)
-	for _, data := range imageIndex {
-		utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag)
-	}
-
-	// Register all the images in a repository with the registry
-	// If an image is not in this list it will not be associated with the repository
-	repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil)
-	if err != nil {
-		return err
-	}
-
-	nTag := 1
-	if tag == "" {
-		nTag = len(localRepo)
-	}
-	for _, ep := range repoData.Endpoints {
-		out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag))
-
-		for _, imgId := range imgList {
-			if r.LookupRemoteImage(imgId, ep, repoData.Tokens) {
-				out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
-			} else {
-				if _, err := srv.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil {
-					// FIXME: Continue on error?
-					return err
-				}
-			}
-
-			for _, tag := range tagsByImage[imgId] {
-				out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag))
-
-				if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil {
-					return err
-				}
-			}
-		}
-	}
-
-	if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
-	out = utils.NewWriteFlusher(out)
-	jsonRaw, err := ioutil.ReadFile(path.Join(srv.daemon.Graph().Root, imgID, "json"))
-	if err != nil {
-		return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
-	}
-	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
-
-	imgData := &registry.ImgData{
-		ID: imgID,
-	}
-
-	// Send the json
-	if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
-		if err == registry.ErrAlreadyExists {
-			out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
-			return "", nil
-		}
-		return "", err
-	}
-
-	layerData, err := srv.daemon.Graph().TempLayerArchive(imgID, archive.Uncompressed, sf, out)
-	if err != nil {
-		return "", fmt.Errorf("Failed to generate layer archive: %s", err)
-	}
-	defer os.RemoveAll(layerData.Name())
-
-	// Send the layer
-	utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
-
-	checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
-	if err != nil {
-		return "", err
-	}
-	imgData.Checksum = checksum
-	imgData.ChecksumPayload = checksumPayload
-	// Send the checksum
-	if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
-		return "", err
-	}
-
-	out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
-	return imgData.Checksum, nil
-}
-
-// FIXME: Allow to interrupt current push when new push of same image is done.
-func (srv *Server) ImagePush(job *engine.Job) engine.Status {
-	if n := len(job.Args); n != 1 {
-		return job.Errorf("Usage: %s IMAGE", job.Name)
-	}
-	var (
-		localName   = job.Args[0]
-		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
-		authConfig  = &registry.AuthConfig{}
-		metaHeaders map[string][]string
-	)
-
-	tag := job.Getenv("tag")
-	job.GetenvJson("authConfig", authConfig)
-	job.GetenvJson("metaHeaders", &metaHeaders)
-	if _, err := srv.poolAdd("push", localName); err != nil {
-		return job.Error(err)
-	}
-	defer srv.poolRemove("push", localName)
-
-	// Resolve the Repository name from fqn to endpoint + name
-	hostname, remoteName, err := registry.ResolveRepositoryName(localName)
-	if err != nil {
-		return job.Error(err)
-	}
-
-	endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
-	if err != nil {
-		return job.Error(err)
-	}
-
-	img, err := srv.daemon.Graph().Get(localName)
-	r, err2 := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
-	if err2 != nil {
-		return job.Error(err2)
-	}
-
-	if err != nil {
-		reposLen := 1
-		if tag == "" {
-			reposLen = len(srv.daemon.Repositories().Repositories[localName])
-		}
-		job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
-		// If it fails, try to get the repository
-		if localRepo, exists := srv.daemon.Repositories().Repositories[localName]; exists {
-			if err := srv.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil {
-				return job.Error(err)
-			}
-			return engine.StatusOK
-		}
-		return job.Error(err)
-	}
-
-	var token []string
-	job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
-	if _, err := srv.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil {
-		return job.Error(err)
-	}
-	return engine.StatusOK
-}
-
-func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
-	srv.Lock()
-	defer srv.Unlock()
-
-	if c, exists := srv.pullingPool[key]; exists {
-		return c, fmt.Errorf("pull %s is already in progress", key)
-	}
-	if c, exists := srv.pushingPool[key]; exists {
-		return c, fmt.Errorf("push %s is already in progress", key)
-	}
-
-	c := make(chan struct{})
-	switch kind {
-	case "pull":
-		srv.pullingPool[key] = c
-	case "push":
-		srv.pushingPool[key] = c
-	default:
-		return nil, fmt.Errorf("Unknown pool type")
-	}
-	return c, nil
-}
-
-func (srv *Server) poolRemove(kind, key string) error {
-	srv.Lock()
-	defer srv.Unlock()
-	switch kind {
-	case "pull":
-		if c, exists := srv.pullingPool[key]; exists {
-			close(c)
-			delete(srv.pullingPool, key)
-		}
-	case "push":
-		if c, exists := srv.pushingPool[key]; exists {
-			close(c)
-			delete(srv.pushingPool, key)
-		}
-	default:
-		return fmt.Errorf("Unknown pool type")
-	}
-	return nil
-}

+ 2 - 6
server/init.go

@@ -33,8 +33,6 @@ func InitServer(job *engine.Job) engine.Status {
 
 	for name, handler := range map[string]engine.Handler{
 		"build": srv.Build,
-		"pull":  srv.ImagePull,
-		"push":  srv.ImagePush,
 	} {
 		if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil {
 			return job.Error(err)
@@ -59,10 +57,8 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
 		return nil, err
 	}
 	srv := &Server{
-		Eng:         eng,
-		daemon:      daemon,
-		pullingPool: make(map[string]chan struct{}),
-		pushingPool: make(map[string]chan struct{}),
+		Eng:    eng,
+		daemon: daemon,
 	}
 	return srv, nil
 }

+ 3 - 5
server/server.go

@@ -30,9 +30,7 @@ import (
 
 type Server struct {
 	sync.RWMutex
-	daemon      *daemon.Daemon
-	pullingPool map[string]chan struct{}
-	pushingPool map[string]chan struct{}
-	Eng         *engine.Engine
-	tasks       sync.WaitGroup
+	daemon *daemon.Daemon
+	Eng    *engine.Engine
+	tasks  sync.WaitGroup
 }

+ 0 - 41
server/server_unit_test.go

@@ -1,41 +0,0 @@
-package server
-
-import "testing"
-
-func TestPools(t *testing.T) {
-	srv := &Server{
-		pullingPool: make(map[string]chan struct{}),
-		pushingPool: make(map[string]chan struct{}),
-	}
-
-	if _, err := srv.poolAdd("pull", "test1"); err != nil {
-		t.Fatal(err)
-	}
-	if _, err := srv.poolAdd("pull", "test2"); err != nil {
-		t.Fatal(err)
-	}
-	if _, err := srv.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
-		t.Fatalf("Expected `pull test1 is already in progress`")
-	}
-	if _, err := srv.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
-		t.Fatalf("Expected `pull test1 is already in progress`")
-	}
-	if _, err := srv.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
-		t.Fatalf("Expected `Unknown pool type`")
-	}
-	if err := srv.poolRemove("pull", "test2"); err != nil {
-		t.Fatal(err)
-	}
-	if err := srv.poolRemove("pull", "test2"); err != nil {
-		t.Fatal(err)
-	}
-	if err := srv.poolRemove("pull", "test1"); err != nil {
-		t.Fatal(err)
-	}
-	if err := srv.poolRemove("push", "test1"); err != nil {
-		t.Fatal(err)
-	}
-	if err := srv.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
-		t.Fatalf("Expected `Unknown pool type`")
-	}
-}