Merge pull request #10077 from crosbymichael/parallel-image-check

Parallel image lookup
This commit is contained in:
Michael Crosby 2015-01-13 17:02:49 -08:00
commit e90df7c8d2

View file

@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
@ -61,85 +62,145 @@ func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string
return imageList, tagsByImage, nil
}
func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error {
out = utils.NewWriteFlusher(out)
log.Debugf("Local repo: %s", localRepo)
imgList, tagsByImage, err := s.getImageList(localRepo, tag)
if err != nil {
return err
}
out.Write(sf.FormatStatus("", "Sending image list"))
var (
repoData *registry.RepositoryData
imageIndex []*registry.ImgData
)
for _, imgId := range imgList {
if tags, exists := tagsByImage[imgId]; exists {
// createImageIndex returns an index of an image's layer IDs and tags.
func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
var imageIndex []*registry.ImgData
for _, id := range images {
if tags, hasTags := tags[id]; hasTags {
// If an image has tags you must add an entry in the image index
// for each tag
for _, tag := range tags {
imageIndex = append(imageIndex, &registry.ImgData{
ID: imgId,
ID: id,
Tag: tag,
})
}
} else {
// If the image does not have a tag it still needs to be sent to the
// registry with an empty tag so that it is accociated with the repository
imageIndex = append(imageIndex, &registry.ImgData{
ID: imgId,
Tag: "",
})
continue
}
// If the image does not have a tag it still needs to be sent to the
// registry with an empty tag so that it is accociated with the repository
imageIndex = append(imageIndex, &registry.ImgData{
ID: id,
Tag: "",
})
}
return imageIndex
}
type imagePushData struct {
id string
endpoint string
tokens []string
}
// lookupImageOnEndpoint checks the specified endpoint to see if an image exists
// and if it is absent then it sends the image id to the channel to be pushed.
func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter,
images chan imagePushData, imagesToPush chan string) {
defer wg.Done()
for image := range images {
if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil {
log.Errorf("Error in LookupRemoteImage: %s", err)
imagesToPush <- image.id
continue
}
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id)))
}
}
func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error {
workerCount := len(imageIDs)
// start a maximum of 5 workers to check if images exist on the specified endpoint.
if workerCount > 5 {
workerCount = 5
}
var (
wg = &sync.WaitGroup{}
imageData = make(chan imagePushData, workerCount*2)
imagesToPush = make(chan string, workerCount*2)
pushes = make(chan map[string]struct{}, 1)
)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
}
// start a go routine that consumes the images to push
go func() {
shouldPush := make(map[string]struct{})
for id := range imagesToPush {
shouldPush[id] = struct{}{}
}
pushes <- shouldPush
}()
for _, id := range imageIDs {
imageData <- imagePushData{
id: id,
endpoint: endpoint,
tokens: repo.Tokens,
}
}
// close the channel to notify the workers that there will be no more images to check.
close(imageData)
wg.Wait()
close(imagesToPush)
// wait for all the images that require pushes to be collected into a consumable map.
shouldPush := <-pushes
// finish by pushing any images and tags to the endpoint. The order that the images are pushed
// is very important that is why we are still itterating over the ordered list of imageIDs.
for _, id := range imageIDs {
if _, push := shouldPush[id]; push {
if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
// FIXME: Continue on error?
return err
}
}
for _, tag := range tags[id] {
out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil {
return err
}
}
}
return nil
}
// pushRepository pushes layers that do not already exist on the registry.
func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
repoInfo *registry.RepositoryInfo, localRepo map[string]string,
tag string, sf *utils.StreamFormatter) error {
log.Debugf("Local repo: %s", localRepo)
out = utils.NewWriteFlusher(out)
imgList, tags, err := s.getImageList(localRepo, tag)
if err != nil {
return err
}
out.Write(sf.FormatStatus("", "Sending image list"))
imageIndex := s.createImageIndex(imgList, tags)
log.Debugf("Preparing to push %s with the following images and tags", localRepo)
for _, data := range imageIndex {
log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
}
// Register all the images in a repository with the registry
// If an image is not in this list it will not be associated with the repository
repoData, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
if err != nil {
return err
}
nTag := 1
if tag == "" {
nTag = len(localRepo)
}
for _, ep := range repoData.Endpoints {
out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
for _, imgId := range imgList {
if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err != nil {
log.Errorf("Error in LookupRemoteImage: %s", err)
if _, err := s.pushImage(r, out, imgId, ep, repoData.Tokens, sf); err != nil {
// FIXME: Continue on error?
return err
}
} else {
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
}
for _, tag := range tagsByImage[imgId] {
out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+repoInfo.RemoteName+"/tags/"+tag))
if err := r.PushRegistryTag(repoInfo.RemoteName, imgId, tag, ep, repoData.Tokens); err != nil {
return err
}
}
out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
// push the repository to each of the endpoints only if it does not exist.
for _, endpoint := range repoData.Endpoints {
if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
return err
}
}
if _, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints); err != nil {
return err
}
return nil
_, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
return err
}
func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {