Remove jobs from registry.Service

This makes `registry.Service` a first class type and does not use jobs
to interact with this type.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-03-31 16:21:37 -07:00
parent 7dd79dcc7b
commit 03d3d79b2b
13 changed files with 71 additions and 252 deletions

View file

@ -49,6 +49,7 @@ func (cli *DockerCli) CmdSearch(args ...string) error {
if _, err := outs.ReadListFrom(rawBody); err != nil {
return err
}
outs.ReverseSort()
w := tabwriter.NewWriter(cli.out, 10, 1, 3, ' ', 0)
fmt.Fprintf(w, "NAME\tDESCRIPTION\tSTARS\tOFFICIAL\tAUTOMATED\n")
for _, out := range outs.Data {

View file

@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
@ -21,6 +20,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
"github.com/docker/docker/api/types"
"github.com/docker/docker/daemon"
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/parsers"
@ -169,29 +169,25 @@ func getBoolParam(value string) (bool, error) {
return ret, nil
}
func getDaemon(eng *engine.Engine) *daemon.Daemon {
return eng.HackGetGlobalVar("httpapi.daemon").(*daemon.Daemon)
}
func postAuth(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
var (
authConfig, err = ioutil.ReadAll(r.Body)
job = eng.Job("auth")
stdoutBuffer = bytes.NewBuffer(nil)
)
var config *registry.AuthConfig
err := json.NewDecoder(r.Body).Decode(&config)
r.Body.Close()
if err != nil {
return err
}
job.Setenv("authConfig", string(authConfig))
job.Stdout.Add(stdoutBuffer)
if err = job.Run(); err != nil {
d := getDaemon(eng)
status, err := d.RegistryService.Auth(config)
if err != nil {
return err
}
if status := engine.Tail(stdoutBuffer, 1); status != "" {
var env engine.Env
env.Set("Status", status)
return writeJSON(w, http.StatusOK, &types.AuthResponse{
Status: status,
})
}
w.WriteHeader(http.StatusNoContent)
return nil
return writeJSON(w, http.StatusOK, &types.AuthResponse{
Status: status,
})
}
func getVersion(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -601,31 +597,30 @@ func getImagesSearch(eng *engine.Engine, version version.Version, w http.Respons
return err
}
var (
config *registry.AuthConfig
authEncoded = r.Header.Get("X-Registry-Auth")
authConfig = &registry.AuthConfig{}
metaHeaders = map[string][]string{}
headers = map[string][]string{}
)
if authEncoded != "" {
authJson := base64.NewDecoder(base64.URLEncoding, strings.NewReader(authEncoded))
if err := json.NewDecoder(authJson).Decode(authConfig); err != nil {
if err := json.NewDecoder(authJson).Decode(&config); err != nil {
// for a search it is not an error if no auth was given
// to increase compatibility with the existing api it is defaulting to be empty
authConfig = &registry.AuthConfig{}
config = &registry.AuthConfig{}
}
}
for k, v := range r.Header {
if strings.HasPrefix(k, "X-Meta-") {
metaHeaders[k] = v
headers[k] = v
}
}
var job = eng.Job("search", r.Form.Get("term"))
job.SetenvJson("metaHeaders", metaHeaders)
job.SetenvJson("authConfig", authConfig)
streamJSON(job, w, false)
return job.Run()
d := getDaemon(eng)
query, err := d.RegistryService.Search(r.Form.Get("term"), config, headers)
if err != nil {
return err
}
return json.NewEncoder(w).Encode(query.Results)
}
func postImagesPush(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {

View file

@ -34,7 +34,6 @@ import (
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/pkg/urlutil"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
)
@ -439,7 +438,7 @@ func (b *Builder) pullImage(name string) (*imagepkg.Image, error) {
pullRegistryAuth := b.AuthConfig
if len(b.AuthConfigFile.Configs) > 0 {
// The request came with a full auth config file, we prefer to use that
repoInfo, err := registry.ResolveRepositoryInfo(job, remote)
repoInfo, err := b.Daemon.RegistryService.ResolveRepository(remote)
if err != nil {
return nil, err
}

View file

@ -40,6 +40,7 @@ import (
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/sysinfo"
"github.com/docker/docker/pkg/truncindex"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/trust"
"github.com/docker/docker/utils"
@ -107,6 +108,7 @@ type Daemon struct {
trustStore *trust.TrustStore
statsCollector *statsCollector
defaultLogConfig runconfig.LogConfig
RegistryService *registry.Service
}
// Install installs daemon capabilities to eng.
@ -793,15 +795,15 @@ func (daemon *Daemon) RegisterLinks(container *Container, hostConfig *runconfig.
}
// FIXME: harmonize with NewGraph()
func NewDaemon(config *Config, eng *engine.Engine) (*Daemon, error) {
daemon, err := NewDaemonFromDirectory(config, eng)
func NewDaemon(config *Config, eng *engine.Engine, registryService *registry.Service) (*Daemon, error) {
daemon, err := NewDaemonFromDirectory(config, eng, registryService)
if err != nil {
return nil, err
}
return daemon, nil
}
func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error) {
func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService *registry.Service) (*Daemon, error) {
if config.Mtu == 0 {
config.Mtu = getDefaultNetworkMtu()
}
@ -931,7 +933,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error)
}
logrus.Debug("Creating repository list")
repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey)
repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService)
if err != nil {
return nil, fmt.Errorf("Couldn't create Tag store: %s", err)
}
@ -1022,6 +1024,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error)
trustStore: t,
statsCollector: newStatsCollector(1 * time.Second),
defaultLogConfig: config.LogConfig,
RegistryService: registryService,
}
eng.OnShutdown(func() {

View file

@ -56,15 +56,6 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error {
if err := cjob.Run(); err != nil {
return err
}
registryJob := job.Eng.Job("registry_config")
registryEnv, _ := registryJob.Stdout.AddEnv()
if err := registryJob.Run(); err != nil {
return err
}
registryConfig := registry.ServiceConfig{}
if err := registryEnv.GetJson("config", &registryConfig); err != nil {
return err
}
v := &engine.Env{}
v.SetJson("ID", daemon.ID)
v.SetInt("Containers", len(daemon.List()))
@ -83,7 +74,7 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error {
v.Set("KernelVersion", kernelVersion)
v.Set("OperatingSystem", operatingSystem)
v.Set("IndexServerAddress", registry.IndexServerAddress())
v.SetJson("RegistryConfig", registryConfig)
v.SetJson("RegistryConfig", daemon.RegistryService.Config)
v.Set("InitSha1", dockerversion.INITSHA1)
v.Set("InitPath", initPath)
v.SetInt("NCPU", runtime.NumCPU())

View file

@ -98,17 +98,13 @@ func mainDaemon() {
logrus.Fatal(err)
}
// load registry service
if err := registry.NewService(registryCfg).Install(eng); err != nil {
logrus.Fatal(err)
}
registryService := registry.NewService(registryCfg)
// load the daemon in the background so we can immediately start
// the http api so that connections don't fail while the daemon
// is booting
daemonInitWait := make(chan error)
go func() {
d, err := daemon.NewDaemon(daemonCfg, eng)
d, err := daemon.NewDaemon(daemonCfg, eng, registryService)
if err != nil {
daemonInitWait <- err
return

View file

@ -35,7 +35,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
)
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := registry.ResolveRepositoryInfo(job, localName)
repoInfo, err := s.registryService.ResolveRepository(localName)
if err != nil {
return err
}

View file

@ -498,7 +498,7 @@ func (s *TagStore) CmdPush(job *engine.Job) error {
)
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := registry.ResolveRepositoryInfo(job, localName)
repoInfo, err := s.registryService.ResolveRepository(localName)
if err != nil {
return err
}

View file

@ -36,8 +36,9 @@ type TagStore struct {
sync.Mutex
// FIXME: move push/pull-related fields
// to a helper type
pullingPool map[string]chan struct{}
pushingPool map[string]chan struct{}
pullingPool map[string]chan struct{}
pushingPool map[string]chan struct{}
registryService *registry.Service
}
type Repository map[string]string
@ -60,19 +61,20 @@ func (r Repository) Contains(u Repository) bool {
return true
}
func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey) (*TagStore, error) {
func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service) (*TagStore, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}
store := &TagStore{
path: abspath,
graph: graph,
trustKey: key,
Repositories: make(map[string]Repository),
pullingPool: make(map[string]chan struct{}),
pushingPool: make(map[string]chan struct{}),
path: abspath,
graph: graph,
trustKey: key,
Repositories: make(map[string]Repository),
pullingPool: make(map[string]chan struct{}),
pushingPool: make(map[string]chan struct{}),
registryService: registryService,
}
// Load the json file if it exists, otherwise create it.
if err := store.reload(); os.IsNotExist(err) {

View file

@ -59,7 +59,7 @@ func mkTestTagStore(root string, t *testing.T) *TagStore {
if err != nil {
t.Fatal(err)
}
store, err := NewTagStore(path.Join(root, "tags"), graph, nil)
store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil)
if err != nil {
t.Fatal(err)
}

View file

@ -177,10 +177,6 @@ func newTestEngine(t Fataler, autorestart bool, root string) *engine.Engine {
if err := builtins.Register(eng); err != nil {
t.Fatal(err)
}
// load registry service
if err := registry.NewService(nil).Install(eng); err != nil {
t.Fatal(err)
}
// (This is manually copied and modified from main() until we have a more generic plugin system)
cfg := &daemon.Config{
@ -193,7 +189,7 @@ func newTestEngine(t Fataler, autorestart bool, root string) *engine.Engine {
TrustKeyPath: filepath.Join(root, "key.json"),
LogConfig: runconfig.LogConfig{Type: "json-file"},
}
d, err := daemon.NewDaemon(cfg, eng)
d, err := daemon.NewDaemon(cfg, eng, registry.NewService(nil))
if err != nil {
t.Fatal(err)
}

View file

@ -230,7 +230,6 @@ func Login(authConfig *AuthConfig, registryEndpoint *Endpoint, factory *requestd
if registryEndpoint.Version == APIVersion2 {
return loginV2(authConfig, registryEndpoint, factory)
}
return loginV1(authConfig, registryEndpoint, factory)
}

View file

@ -1,20 +1,5 @@
package registry
import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
)
// Service exposes registry capabilities in the standard Engine
// interface. Once installed, it extends the engine with the
// following calls:
//
// 'auth': Authenticate against the public registry
// 'search': Search for images on the public registry
// 'pull': Download images from any registry (TODO)
// 'push': Upload images to any registry (TODO)
type Service struct {
Config *ServiceConfig
}
@ -27,201 +12,53 @@ func NewService(options *Options) *Service {
}
}
// Install installs registry capabilities to eng.
func (s *Service) Install(eng *engine.Engine) error {
eng.Register("auth", s.Auth)
eng.Register("search", s.Search)
eng.Register("resolve_repository", s.ResolveRepository)
eng.Register("resolve_index", s.ResolveIndex)
eng.Register("registry_config", s.GetRegistryConfig)
return nil
}
// Auth contacts the public registry with the provided credentials,
// and returns OK if authentication was sucessful.
// It can be used to verify the validity of a client's credentials.
func (s *Service) Auth(job *engine.Job) error {
var (
authConfig = new(AuthConfig)
endpoint *Endpoint
index *IndexInfo
status string
err error
)
job.GetenvJson("authConfig", authConfig)
func (s *Service) Auth(authConfig *AuthConfig) (string, error) {
addr := authConfig.ServerAddress
if addr == "" {
// Use the official registry address if not specified.
addr = IndexServerAddress()
}
if index, err = ResolveIndexInfo(job, addr); err != nil {
return err
index, err := s.ResolveIndex(addr)
if err != nil {
return "", err
}
if endpoint, err = NewEndpoint(index); err != nil {
logrus.Errorf("unable to get new registry endpoint: %s", err)
return err
endpoint, err := NewEndpoint(index)
if err != nil {
return "", err
}
authConfig.ServerAddress = endpoint.String()
if status, err = Login(authConfig, endpoint, HTTPRequestFactory(nil)); err != nil {
logrus.Errorf("unable to login against registry endpoint %s: %s", endpoint, err)
return err
}
logrus.Infof("successful registry login for endpoint %s: %s", endpoint, status)
job.Printf("%s\n", status)
return nil
return Login(authConfig, endpoint, HTTPRequestFactory(nil))
}
// Search queries the public registry for images matching the specified
// search terms, and returns the results.
//
// Argument syntax: search TERM
//
// Option environment:
// 'authConfig': json-encoded credentials to authenticate against the registry.
// The search extends to images only accessible via the credentials.
//
// 'metaHeaders': extra HTTP headers to include in the request to the registry.
// The headers should be passed as a json-encoded dictionary.
//
// Output:
// Results are sent as a collection of structured messages (using engine.Table).
// Each result is sent as a separate message.
// Results are ordered by number of stars on the public registry.
func (s *Service) Search(job *engine.Job) error {
if n := len(job.Args); n != 1 {
return fmt.Errorf("Usage: %s TERM", job.Name)
}
var (
term = job.Args[0]
metaHeaders = map[string][]string{}
authConfig = &AuthConfig{}
)
job.GetenvJson("authConfig", authConfig)
job.GetenvJson("metaHeaders", metaHeaders)
repoInfo, err := ResolveRepositoryInfo(job, term)
func (s *Service) Search(term string, authConfig *AuthConfig, headers map[string][]string) (*SearchResults, error) {
repoInfo, err := s.ResolveRepository(term)
if err != nil {
return err
return nil, err
}
// *TODO: Search multiple indexes.
endpoint, err := repoInfo.GetEndpoint()
if err != nil {
return err
return nil, err
}
r, err := NewSession(authConfig, HTTPRequestFactory(metaHeaders), endpoint, true)
r, err := NewSession(authConfig, HTTPRequestFactory(headers), endpoint, true)
if err != nil {
return err
return nil, err
}
results, err := r.SearchRepositories(repoInfo.GetSearchTerm())
if err != nil {
return err
}
outs := engine.NewTable("star_count", 0)
for _, result := range results.Results {
out := &engine.Env{}
out.Import(result)
outs.Add(out)
}
outs.ReverseSort()
if _, err := outs.WriteListTo(job.Stdout); err != nil {
return err
}
return nil
return r.SearchRepositories(repoInfo.GetSearchTerm())
}
// ResolveRepository splits a repository name into its components
// and configuration of the associated registry.
func (s *Service) ResolveRepository(job *engine.Job) error {
var (
reposName = job.Args[0]
)
repoInfo, err := s.Config.NewRepositoryInfo(reposName)
if err != nil {
return err
}
out := engine.Env{}
err = out.SetJson("repository", repoInfo)
if err != nil {
return err
}
out.WriteTo(job.Stdout)
return nil
}
// Convenience wrapper for calling resolve_repository Job from a running job.
func ResolveRepositoryInfo(jobContext *engine.Job, reposName string) (*RepositoryInfo, error) {
job := jobContext.Eng.Job("resolve_repository", reposName)
env, err := job.Stdout.AddEnv()
if err != nil {
return nil, err
}
if err := job.Run(); err != nil {
return nil, err
}
info := RepositoryInfo{}
if err := env.GetJson("repository", &info); err != nil {
return nil, err
}
return &info, nil
func (s *Service) ResolveRepository(name string) (*RepositoryInfo, error) {
return s.Config.NewRepositoryInfo(name)
}
// ResolveIndex takes indexName and returns index info
func (s *Service) ResolveIndex(job *engine.Job) error {
var (
indexName = job.Args[0]
)
index, err := s.Config.NewIndexInfo(indexName)
if err != nil {
return err
}
out := engine.Env{}
err = out.SetJson("index", index)
if err != nil {
return err
}
out.WriteTo(job.Stdout)
return nil
}
// Convenience wrapper for calling resolve_index Job from a running job.
func ResolveIndexInfo(jobContext *engine.Job, indexName string) (*IndexInfo, error) {
job := jobContext.Eng.Job("resolve_index", indexName)
env, err := job.Stdout.AddEnv()
if err != nil {
return nil, err
}
if err := job.Run(); err != nil {
return nil, err
}
info := IndexInfo{}
if err := env.GetJson("index", &info); err != nil {
return nil, err
}
return &info, nil
}
// GetRegistryConfig returns current registry configuration.
func (s *Service) GetRegistryConfig(job *engine.Job) error {
out := engine.Env{}
err := out.SetJson("config", s.Config)
if err != nil {
return err
}
out.WriteTo(job.Stdout)
return nil
func (s *Service) ResolveIndex(name string) (*IndexInfo, error) {
return s.Config.NewIndexInfo(name)
}