Remove engine mechanism

Signed-off-by: Antonio Murdaca <me@runcom.ninja>
This commit is contained in:
Antonio Murdaca 2015-04-27 23:11:29 +02:00
parent 029cbc1004
commit 531f4122bd
20 changed files with 239 additions and 2286 deletions

View file

@ -1,13 +1,14 @@
package client
import (
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
"github.com/docker/docker/api/types"
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/signal"
@ -65,12 +66,12 @@ func (cli *DockerCli) CmdStart(args ...string) error {
return err
}
env := engine.Env{}
if err := env.Decode(stream); err != nil {
var c types.ContainerJSON
if err := json.NewDecoder(stream).Decode(&c); err != nil {
return err
}
config := env.GetSubEnv("Config")
tty = config.GetBool("Tty")
tty = c.Config.Tty
if !tty {
sigc := cli.forwardAllSignals(cmd.Arg(0))
@ -82,7 +83,7 @@ func (cli *DockerCli) CmdStart(args ...string) error {
v := url.Values{}
v.Set("stream", "1")
if *openStdin && config.GetBool("OpenStdin") {
if *openStdin && c.Config.OpenStdin {
v.Set("stdin", "1")
in = cli.in
}

View file

@ -22,7 +22,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/autogen/dockerversion"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/stdcopy"
@ -42,18 +41,8 @@ func (cli *DockerCli) HTTPClient() *http.Client {
func (cli *DockerCli) encodeData(data interface{}) (*bytes.Buffer, error) {
params := bytes.NewBuffer(nil)
if data != nil {
if env, ok := data.(engine.Env); ok {
if err := env.Encode(params); err != nil {
return nil, err
}
} else {
buf, err := json.Marshal(data)
if err != nil {
return nil, err
}
if _, err := params.Write(buf); err != nil {
return nil, err
}
if err := json.NewEncoder(params).Encode(data); err != nil {
return nil, err
}
}
return params, nil

View file

@ -1,13 +1,14 @@
package client
import (
"encoding/json"
"fmt"
"runtime"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
"github.com/docker/docker/api/types"
"github.com/docker/docker/autogen/dockerversion"
"github.com/docker/docker/engine"
flag "github.com/docker/docker/pkg/mflag"
)
@ -32,28 +33,24 @@ func (cli *DockerCli) CmdVersion(args ...string) error {
}
fmt.Fprintf(cli.out, "OS/Arch (client): %s/%s\n", runtime.GOOS, runtime.GOARCH)
body, _, err := readBody(cli.call("GET", "/version", nil, nil))
stream, _, err := cli.call("GET", "/version", nil, nil)
if err != nil {
return err
}
out := engine.NewOutput()
remoteVersion, err := out.AddEnv()
if err != nil {
var v types.Version
if err := json.NewDecoder(stream).Decode(&v); err != nil {
logrus.Errorf("Error reading remote version: %s", err)
return err
}
if _, err := out.Write(body); err != nil {
logrus.Errorf("Error reading remote version: %s", err)
return err
fmt.Fprintf(cli.out, "Server version: %s\n", v.Version)
if v.ApiVersion != "" {
fmt.Fprintf(cli.out, "Server API version: %s\n", v.ApiVersion)
}
out.Close()
fmt.Fprintf(cli.out, "Server version: %s\n", remoteVersion.Get("Version"))
if apiVersion := remoteVersion.Get("ApiVersion"); apiVersion != "" {
fmt.Fprintf(cli.out, "Server API version: %s\n", apiVersion)
}
fmt.Fprintf(cli.out, "Go version (server): %s\n", remoteVersion.Get("GoVersion"))
fmt.Fprintf(cli.out, "Git commit (server): %s\n", remoteVersion.Get("GitCommit"))
fmt.Fprintf(cli.out, "OS/Arch (server): %s/%s\n", remoteVersion.Get("Os"), remoteVersion.Get("Arch"))
fmt.Fprintf(cli.out, "Go version (server): %s\n", v.GoVersion)
fmt.Fprintf(cli.out, "Git commit (server): %s\n", v.GitCommit)
fmt.Fprintf(cli.out, "OS/Arch (server): %s/%s\n", v.Os, v.Arch)
return nil
}

View file

@ -1,9 +1,6 @@
package server
import (
"runtime"
"time"
"encoding/base64"
"encoding/json"
"fmt"
@ -11,8 +8,10 @@ import (
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
"code.google.com/p/go.net/websocket"
"github.com/gorilla/mux"
@ -25,7 +24,6 @@ import (
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/daemon"
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/engine"
"github.com/docker/docker/graph"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/parsers"
@ -53,26 +51,31 @@ type ServerConfig struct {
}
type Server struct {
daemon *daemon.Daemon
cfg *ServerConfig
router *mux.Router
start chan struct{}
// TODO: delete engine
eng *engine.Engine
daemon *daemon.Daemon
cfg *ServerConfig
router *mux.Router
start chan struct{}
servers []serverCloser
}
func New(cfg *ServerConfig, eng *engine.Engine) *Server {
func New(cfg *ServerConfig) *Server {
srv := &Server{
cfg: cfg,
start: make(chan struct{}),
eng: eng,
}
r := createRouter(srv, eng)
r := createRouter(srv)
srv.router = r
return srv
}
func (s *Server) Close() {
for _, srv := range s.servers {
if err := srv.Close(); err != nil {
logrus.Error(err)
}
}
}
func (s *Server) SetDaemon(d *daemon.Daemon) {
s.daemon = d
}
@ -92,19 +95,15 @@ func (s *Server) ServeApi(protoAddrs []string) error {
if len(protoAddrParts) != 2 {
return fmt.Errorf("bad format, expected PROTO://ADDR")
}
srv, err := s.newServer(protoAddrParts[0], protoAddrParts[1])
if err != nil {
return err
}
s.servers = append(s.servers, srv)
go func(proto, addr string) {
logrus.Infof("Listening for HTTP on %s (%s)", proto, addr)
srv, err := s.newServer(proto, addr)
if err != nil {
chErrors <- err
return
}
s.eng.OnShutdown(func() {
if err := srv.Close(); err != nil {
logrus.Error(err)
}
})
if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
if err := srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
err = nil
}
chErrors <- err
@ -133,7 +132,7 @@ func (s *HttpServer) Close() error {
return s.l.Close()
}
type HttpApiFunc func(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error
type HttpApiFunc func(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error
func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) {
conn, _, err := w.(http.Hijacker).Hijack()
@ -230,16 +229,7 @@ func writeJSON(w http.ResponseWriter, code int, v interface{}) error {
return json.NewEncoder(w).Encode(v)
}
func streamJSON(out *engine.Output, w http.ResponseWriter, flush bool) {
w.Header().Set("Content-Type", "application/json")
if flush {
out.Add(utils.NewWriteFlusher(w))
} else {
out.Add(w)
}
}
func (s *Server) postAuth(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postAuth(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
var config *cliconfig.AuthConfig
err := json.NewDecoder(r.Body).Decode(&config)
r.Body.Close()
@ -255,7 +245,7 @@ func (s *Server) postAuth(eng *engine.Engine, version version.Version, w http.Re
})
}
func (s *Server) getVersion(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getVersion(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
w.Header().Set("Content-Type", "application/json")
v := &types.Version{
@ -273,7 +263,7 @@ func (s *Server) getVersion(eng *engine.Engine, version version.Version, w http.
return writeJSON(w, http.StatusOK, v)
}
func (s *Server) postContainersKill(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersKill(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -308,7 +298,7 @@ func (s *Server) postContainersKill(eng *engine.Engine, version version.Version,
return nil
}
func (s *Server) postContainersPause(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersPause(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -332,7 +322,7 @@ func (s *Server) postContainersPause(eng *engine.Engine, version version.Version
return nil
}
func (s *Server) postContainersUnpause(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersUnpause(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -356,7 +346,7 @@ func (s *Server) postContainersUnpause(eng *engine.Engine, version version.Versi
return nil
}
func (s *Server) getContainersExport(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersExport(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -364,7 +354,7 @@ func (s *Server) getContainersExport(eng *engine.Engine, version version.Version
return s.daemon.ContainerExport(vars["name"], w)
}
func (s *Server) getImagesJSON(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getImagesJSON(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -405,7 +395,7 @@ func (s *Server) getImagesJSON(eng *engine.Engine, version version.Version, w ht
return writeJSON(w, http.StatusOK, legacyImages)
}
func (s *Server) getInfo(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getInfo(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
w.Header().Set("Content-Type", "application/json")
info, err := s.daemon.SystemInfo()
@ -416,7 +406,7 @@ func (s *Server) getInfo(eng *engine.Engine, version version.Version, w http.Res
return writeJSON(w, http.StatusOK, info)
}
func (s *Server) getEvents(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getEvents(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -520,7 +510,7 @@ func (s *Server) getEvents(eng *engine.Engine, version version.Version, w http.R
}
}
func (s *Server) getImagesHistory(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getImagesHistory(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -534,7 +524,7 @@ func (s *Server) getImagesHistory(eng *engine.Engine, version version.Version, w
return writeJSON(w, http.StatusOK, history)
}
func (s *Server) getContainersChanges(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersChanges(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -553,7 +543,7 @@ func (s *Server) getContainersChanges(eng *engine.Engine, version version.Versio
return writeJSON(w, http.StatusOK, changes)
}
func (s *Server) getContainersTop(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersTop(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if version.LessThan("1.4") {
return fmt.Errorf("top was improved a lot since 1.3, Please upgrade your docker client.")
}
@ -574,7 +564,7 @@ func (s *Server) getContainersTop(eng *engine.Engine, version version.Version, w
return writeJSON(w, http.StatusOK, procList)
}
func (s *Server) getContainersJSON(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersJSON(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -603,7 +593,7 @@ func (s *Server) getContainersJSON(eng *engine.Engine, version version.Version,
return writeJSON(w, http.StatusOK, containers)
}
func (s *Server) getContainersStats(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersStats(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -614,7 +604,7 @@ func (s *Server) getContainersStats(eng *engine.Engine, version version.Version,
return s.daemon.ContainerStats(vars["name"], utils.NewWriteFlusher(w))
}
func (s *Server) getContainersLogs(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -644,7 +634,7 @@ func (s *Server) getContainersLogs(eng *engine.Engine, version version.Version,
return nil
}
func (s *Server) postImagesTag(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postImagesTag(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -662,7 +652,7 @@ func (s *Server) postImagesTag(eng *engine.Engine, version version.Version, w ht
return nil
}
func (s *Server) postCommit(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postCommit(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -708,7 +698,7 @@ func (s *Server) postCommit(eng *engine.Engine, version version.Version, w http.
}
// Creates an image from Pull or from Import
func (s *Server) postImagesCreate(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postImagesCreate(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -788,7 +778,7 @@ func (s *Server) postImagesCreate(eng *engine.Engine, version version.Version, w
return nil
}
func (s *Server) getImagesSearch(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getImagesSearch(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -818,7 +808,7 @@ func (s *Server) getImagesSearch(eng *engine.Engine, version version.Version, w
return json.NewEncoder(w).Encode(query.Results)
}
func (s *Server) postImagesPush(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postImagesPush(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -875,7 +865,7 @@ func (s *Server) postImagesPush(eng *engine.Engine, version version.Version, w h
}
func (s *Server) getImagesGet(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getImagesGet(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -907,11 +897,11 @@ func (s *Server) getImagesGet(eng *engine.Engine, version version.Version, w htt
}
func (s *Server) postImagesLoad(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postImagesLoad(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
return s.daemon.Repositories().Load(r.Body, w)
}
func (s *Server) postContainersCreate(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersCreate(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return nil
}
@ -939,7 +929,7 @@ func (s *Server) postContainersCreate(eng *engine.Engine, version version.Versio
})
}
func (s *Server) postContainersRestart(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersRestart(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -961,7 +951,7 @@ func (s *Server) postContainersRestart(eng *engine.Engine, version version.Versi
return nil
}
func (s *Server) postContainerRename(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainerRename(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -978,7 +968,7 @@ func (s *Server) postContainerRename(eng *engine.Engine, version version.Version
return nil
}
func (s *Server) deleteContainers(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) deleteContainers(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1006,7 +996,7 @@ func (s *Server) deleteContainers(eng *engine.Engine, version version.Version, w
return nil
}
func (s *Server) deleteImages(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) deleteImages(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1026,7 +1016,7 @@ func (s *Server) deleteImages(eng *engine.Engine, version version.Version, w htt
return writeJSON(w, http.StatusOK, list)
}
func (s *Server) postContainersStart(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersStart(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -1062,7 +1052,7 @@ func (s *Server) postContainersStart(eng *engine.Engine, version version.Version
return nil
}
func (s *Server) postContainersStop(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersStop(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1087,7 +1077,7 @@ func (s *Server) postContainersStop(eng *engine.Engine, version version.Version,
return nil
}
func (s *Server) postContainersWait(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersWait(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -1105,7 +1095,7 @@ func (s *Server) postContainersWait(eng *engine.Engine, version version.Version,
})
}
func (s *Server) postContainersResize(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersResize(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1130,7 +1120,7 @@ func (s *Server) postContainersResize(eng *engine.Engine, version version.Versio
return cont.Resize(height, width)
}
func (s *Server) postContainersAttach(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersAttach(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1185,7 +1175,7 @@ func (s *Server) postContainersAttach(eng *engine.Engine, version version.Versio
return nil
}
func (s *Server) wsContainersAttach(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) wsContainersAttach(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1211,7 +1201,7 @@ func (s *Server) wsContainersAttach(eng *engine.Engine, version version.Version,
return nil
}
func (s *Server) getContainersByName(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getContainersByName(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -1232,7 +1222,7 @@ func (s *Server) getContainersByName(eng *engine.Engine, version version.Version
return writeJSON(w, http.StatusOK, containerJSON)
}
func (s *Server) getExecByID(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getExecByID(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter 'id'")
}
@ -1245,7 +1235,7 @@ func (s *Server) getExecByID(eng *engine.Engine, version version.Version, w http
return writeJSON(w, http.StatusOK, eConfig)
}
func (s *Server) getImagesByName(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) getImagesByName(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -1268,7 +1258,7 @@ func (s *Server) getImagesByName(eng *engine.Engine, version version.Version, w
return writeJSON(w, http.StatusOK, imageInspect)
}
func (s *Server) postBuild(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postBuild(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if version.LessThan("1.3") {
return fmt.Errorf("Multipart upload for build is no longer supported. Please upgrade your docker client.")
}
@ -1363,7 +1353,7 @@ func (s *Server) postBuild(eng *engine.Engine, version version.Version, w http.R
return nil
}
func (s *Server) postContainersCopy(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainersCopy(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -1413,7 +1403,7 @@ func (s *Server) postContainersCopy(eng *engine.Engine, version version.Version,
return nil
}
func (s *Server) postContainerExecCreate(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainerExecCreate(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return nil
}
@ -1442,7 +1432,7 @@ func (s *Server) postContainerExecCreate(eng *engine.Engine, version version.Ver
}
// TODO(vishh): Refactor the code to avoid having to specify stream config as part of both create and start.
func (s *Server) postContainerExecStart(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainerExecStart(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return nil
}
@ -1495,7 +1485,7 @@ func (s *Server) postContainerExecStart(eng *engine.Engine, version version.Vers
return nil
}
func (s *Server) postContainerExecResize(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) postContainerExecResize(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := parseForm(r); err != nil {
return err
}
@ -1515,7 +1505,7 @@ func (s *Server) postContainerExecResize(eng *engine.Engine, version version.Ver
return s.daemon.ContainerExecResize(vars["name"], height, width)
}
func (s *Server) optionsHandler(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) optionsHandler(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
w.WriteHeader(http.StatusOK)
return nil
}
@ -1526,12 +1516,12 @@ func writeCorsHeaders(w http.ResponseWriter, r *http.Request, corsHeaders string
w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS")
}
func (s *Server) ping(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func (s *Server) ping(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
_, err := w.Write([]byte{'O', 'K'})
return err
}
func makeHttpHandler(eng *engine.Engine, logging bool, localMethod string, localRoute string, handlerFunc HttpApiFunc, corsHeaders string, dockerVersion version.Version) http.HandlerFunc {
func makeHttpHandler(logging bool, localMethod string, localRoute string, handlerFunc HttpApiFunc, corsHeaders string, dockerVersion version.Version) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// log the request
logrus.Debugf("Calling %s %s", localMethod, localRoute)
@ -1559,7 +1549,7 @@ func makeHttpHandler(eng *engine.Engine, logging bool, localMethod string, local
return
}
if err := handlerFunc(eng, version, w, r, mux.Vars(r)); err != nil {
if err := handlerFunc(version, w, r, mux.Vars(r)); err != nil {
logrus.Errorf("Handler for %s %s returned error: %s", localMethod, localRoute, err)
httpError(w, err)
}
@ -1567,7 +1557,7 @@ func makeHttpHandler(eng *engine.Engine, logging bool, localMethod string, local
}
// we keep enableCors just for legacy usage, need to be removed in the future
func createRouter(s *Server, eng *engine.Engine) *mux.Router {
func createRouter(s *Server) *mux.Router {
r := mux.NewRouter()
if os.Getenv("DEBUG") != "" {
ProfilerSetup(r, "/debug/")
@ -1644,7 +1634,7 @@ func createRouter(s *Server, eng *engine.Engine) *mux.Router {
localMethod := method
// build the handler function
f := makeHttpHandler(eng, s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))
f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))
// add the new route
if localRoute == "" {
@ -1659,23 +1649,6 @@ func createRouter(s *Server, eng *engine.Engine) *mux.Router {
return r
}
// ServeRequest processes a single http request to the docker remote api.
// FIXME: refactor this to be part of Server and not require re-creating a new
// router each time. This requires first moving ListenAndServe into Server.
func ServeRequest(eng *engine.Engine, apiversion version.Version, w http.ResponseWriter, req *http.Request) {
cfg := &ServerConfig{
EnableCors: true,
Version: string(apiversion),
}
api := New(cfg, eng)
daemon, _ := eng.HackGetGlobalVar("httpapi.daemon").(*daemon.Daemon)
api.AcceptConnections(daemon)
router := createRouter(api, eng)
// Insert APIVERSION into the request as a convenience
req.URL.Path = fmt.Sprintf("/v%s%s", apiversion, req.URL.Path)
router.ServeHTTP(w, req)
}
func allocateDaemonPort(addr string) error {
host, port, err := net.SplitHostPort(addr)
if err != nil {

View file

@ -26,7 +26,6 @@ import (
"github.com/docker/docker/daemon/logger/syslog"
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/links"
"github.com/docker/docker/nat"
@ -600,10 +599,7 @@ func (container *Container) AllocateNetwork() error {
return nil
}
var (
err error
eng = container.daemon.eng
)
var err error
networkSettings, err := bridge.Allocate(container.ID, container.Config.MacAddress, "", "")
if err != nil {
@ -650,7 +646,7 @@ func (container *Container) AllocateNetwork() error {
container.NetworkSettings.PortMapping = nil
for port := range portSpecs {
if err = container.allocatePort(eng, port, bindings); err != nil {
if err = container.allocatePort(port, bindings); err != nil {
bridge.Release(container.ID)
return err
}
@ -686,8 +682,6 @@ func (container *Container) RestoreNetwork() error {
return nil
}
eng := container.daemon.eng
// Re-allocate the interface with the same IP and MAC address.
if _, err := bridge.Allocate(container.ID, container.NetworkSettings.MacAddress, container.NetworkSettings.IPAddress, ""); err != nil {
return err
@ -695,7 +689,7 @@ func (container *Container) RestoreNetwork() error {
// Re-allocate any previously allocated ports.
for port := range container.NetworkSettings.Ports {
if err := container.allocatePort(eng, port, container.NetworkSettings.Ports); err != nil {
if err := container.allocatePort(port, container.NetworkSettings.Ports); err != nil {
return err
}
}
@ -1483,7 +1477,7 @@ func (container *Container) waitForStart() error {
return nil
}
func (container *Container) allocatePort(eng *engine.Engine, port nat.Port, bindings nat.PortMap) error {
func (container *Container) allocatePort(port nat.Port, bindings nat.PortMap) error {
binding := bindings[port]
if container.hostConfig.PublishAllPorts && len(binding) == 0 {
binding = append(binding, nat.PortBinding{})

View file

@ -27,7 +27,6 @@ import (
_ "github.com/docker/docker/daemon/graphdriver/vfs"
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/engine"
"github.com/docker/docker/graph"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/archive"
@ -38,7 +37,6 @@ import (
"github.com/docker/docker/pkg/namesgenerator"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/pkg/pidfile"
"github.com/docker/docker/pkg/resolvconf"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/sysinfo"
@ -103,7 +101,6 @@ type Daemon struct {
idIndex *truncindex.TruncIndex
sysInfo *sysinfo.SysInfo
volumes *volumes.Repository
eng *engine.Engine
config *Config
containerGraph *graphdb.Database
driver graphdriver.Driver
@ -114,14 +111,6 @@ type Daemon struct {
EventsService *events.Events
}
// Install installs daemon capabilities to eng.
func (daemon *Daemon) Install(eng *engine.Engine) error {
// FIXME: this hack is necessary for legacy integration tests to access
// the daemon object.
eng.HackSetGlobalVar("httpapi.daemon", daemon)
return nil
}
// Get looks for a container using the provided information, which could be
// one of the following inputs from the caller:
// - A full container ID, which will exact match a container in daemon's list
@ -741,16 +730,7 @@ func (daemon *Daemon) RegisterLinks(container *Container, hostConfig *runconfig.
return nil
}
// FIXME: harmonize with NewGraph()
func NewDaemon(config *Config, eng *engine.Engine, registryService *registry.Service) (*Daemon, error) {
daemon, err := NewDaemonFromDirectory(config, eng, registryService)
if err != nil {
return nil, err
}
return daemon, nil
}
func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService *registry.Service) (*Daemon, error) {
func NewDaemon(config *Config, registryService *registry.Service) (daemon *Daemon, err error) {
if config.Mtu == 0 {
config.Mtu = getDefaultNetworkMtu()
}
@ -766,19 +746,6 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
}
config.DisableNetwork = config.Bridge.Iface == disableNetworkBridge
// Claim the pidfile first, to avoid any and all unexpected race conditions.
// Some of the init doesn't need a pidfile lock - but let's not try to be smart.
if config.Pidfile != "" {
file, err := pidfile.New(config.Pidfile)
if err != nil {
return nil, err
}
eng.OnShutdown(func() {
// Always release the pidfile last, just in case
file.Remove()
})
}
// Check that the system is supported and we have sufficient privileges
if runtime.GOOS != "linux" {
return nil, fmt.Errorf("The Docker daemon is only supported on linux")
@ -826,17 +793,22 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
return nil, fmt.Errorf("error initializing graphdriver: %v", err)
}
logrus.Debugf("Using graph driver %s", driver)
// register cleanup for graph driver
eng.OnShutdown(func() {
if err := driver.Cleanup(); err != nil {
logrus.Errorf("Error during graph storage driver.Cleanup(): %v", err)
d := &Daemon{}
d.driver = driver
defer func() {
if err != nil {
if err := d.Shutdown(); err != nil {
logrus.Error(err)
}
}
})
}()
if config.EnableSelinuxSupport {
if selinuxEnabled() {
// As Docker on btrfs and SELinux are incompatible at present, error on both being enabled
if driver.String() == "btrfs" {
if d.driver.String() == "btrfs" {
return nil, fmt.Errorf("SELinux is not supported with the BTRFS graph driver")
}
logrus.Debug("SELinux enabled successfully")
@ -854,12 +826,12 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
}
// Migrate the container if it is aufs and aufs is enabled
if err = migrateIfAufs(driver, config.Root); err != nil {
if err := migrateIfAufs(d.driver, config.Root); err != nil {
return nil, err
}
logrus.Debug("Creating images graph")
g, err := graph.NewGraph(path.Join(config.Root, "graph"), driver)
g, err := graph.NewGraph(path.Join(config.Root, "graph"), d.driver)
if err != nil {
return nil, err
}
@ -897,7 +869,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
Events: eventsService,
Trust: trustService,
}
repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), tagCfg)
repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+d.driver.String()), tagCfg)
if err != nil {
return nil, fmt.Errorf("Couldn't create Tag store: %s", err)
}
@ -913,12 +885,8 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
if err != nil {
return nil, err
}
// register graph close on shutdown
eng.OnShutdown(func() {
if err := graph.Close(); err != nil {
logrus.Errorf("Error during container graph.Close(): %v", err)
}
})
d.containerGraph = graph
localCopy := path.Join(config.Root, "init", fmt.Sprintf("dockerinit-%s", dockerversion.VERSION))
sysInitPath := utils.DockerInitPath(localCopy)
@ -947,66 +915,67 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
return nil, err
}
daemon := &Daemon{
ID: trustKey.PublicKey().KeyID(),
repository: daemonRepo,
containers: &contStore{s: make(map[string]*Container)},
execCommands: newExecStore(),
graph: g,
repositories: repositories,
idIndex: truncindex.NewTruncIndex([]string{}),
sysInfo: sysInfo,
volumes: volumes,
config: config,
containerGraph: graph,
driver: driver,
sysInitPath: sysInitPath,
execDriver: ed,
eng: eng,
statsCollector: newStatsCollector(1 * time.Second),
defaultLogConfig: config.LogConfig,
RegistryService: registryService,
EventsService: eventsService,
}
d.ID = trustKey.PublicKey().KeyID()
d.repository = daemonRepo
d.containers = &contStore{s: make(map[string]*Container)}
d.execCommands = newExecStore()
d.graph = g
d.repositories = repositories
d.idIndex = truncindex.NewTruncIndex([]string{})
d.sysInfo = sysInfo
d.volumes = volumes
d.config = config
d.sysInitPath = sysInitPath
d.execDriver = ed
d.statsCollector = newStatsCollector(1 * time.Second)
d.defaultLogConfig = config.LogConfig
d.RegistryService = registryService
d.EventsService = eventsService
eng.OnShutdown(func() {
if err := daemon.shutdown(); err != nil {
logrus.Errorf("Error during daemon.shutdown(): %v", err)
}
})
if err := daemon.restore(); err != nil {
if err := d.restore(); err != nil {
return nil, err
}
// set up filesystem watch on resolv.conf for network changes
if err := daemon.setupResolvconfWatcher(); err != nil {
if err := d.setupResolvconfWatcher(); err != nil {
return nil, err
}
return daemon, nil
return d, nil
}
func (daemon *Daemon) shutdown() error {
group := sync.WaitGroup{}
logrus.Debug("starting clean shutdown of all containers...")
for _, container := range daemon.List() {
c := container
if c.IsRunning() {
logrus.Debugf("stopping %s", c.ID)
group.Add(1)
go func() {
defer group.Done()
if err := c.KillSig(15); err != nil {
logrus.Debugf("kill 15 error for %s - %s", c.ID, err)
}
c.WaitStop(-1 * time.Second)
logrus.Debugf("container stopped %s", c.ID)
}()
func (daemon *Daemon) Shutdown() error {
if daemon.containerGraph != nil {
if err := daemon.containerGraph.Close(); err != nil {
logrus.Errorf("Error during container graph.Close(): %v", err)
}
}
group.Wait()
if daemon.driver != nil {
if err := daemon.driver.Cleanup(); err != nil {
logrus.Errorf("Error during graph storage driver.Cleanup(): %v", err)
}
}
if daemon.containers != nil {
group := sync.WaitGroup{}
logrus.Debug("starting clean shutdown of all containers...")
for _, container := range daemon.List() {
c := container
if c.IsRunning() {
logrus.Debugf("stopping %s", c.ID)
group.Add(1)
go func() {
defer group.Done()
if err := c.KillSig(15); err != nil {
logrus.Debugf("kill 15 error for %s - %s", c.ID, err)
}
c.WaitStop(-1 * time.Second)
logrus.Debugf("container stopped %s", c.ID)
}()
}
}
group.Wait()
}
return nil
}
@ -1087,26 +1056,6 @@ func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan interface
return nil
}
// Nuke kills all containers then removes all content
// from the content root, including images, volumes and
// container filesystems.
// Again: this will remove your entire docker daemon!
// FIXME: this is deprecated, and only used in legacy
// tests. Please remove.
func (daemon *Daemon) Nuke() error {
var wg sync.WaitGroup
for _, container := range daemon.List() {
wg.Add(1)
go func(c *Container) {
c.Kill()
wg.Done()
}(container)
}
wg.Wait()
return os.RemoveAll(daemon.config.Root)
}
// FIXME: this is a convenience function for integration tests
// which need direct access to daemon.graph.
// Once the tests switch to using engine and jobs, this method

View file

@ -7,6 +7,7 @@ import (
"io"
"os"
"path/filepath"
"time"
"github.com/Sirupsen/logrus"
apiserver "github.com/docker/docker/api/server"
@ -14,9 +15,9 @@ import (
"github.com/docker/docker/daemon"
_ "github.com/docker/docker/daemon/execdriver/lxc"
_ "github.com/docker/docker/daemon/execdriver/native"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/homedir"
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/docker/pkg/pidfile"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/pkg/timeutils"
@ -83,14 +84,45 @@ func mainDaemon() {
logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: timeutils.RFC3339NanoFixed})
eng := engine.New()
signal.Trap(eng.Shutdown)
var pfile *pidfile.PidFile
if daemonCfg.Pidfile != "" {
pf, err := pidfile.New(daemonCfg.Pidfile)
if err != nil {
logrus.Fatalf("Error starting daemon: %v", err)
}
pfile = pf
defer func() {
if err := pfile.Remove(); err != nil {
logrus.Error(err)
}
}()
}
if err := migrateKey(); err != nil {
logrus.Fatal(err)
}
daemonCfg.TrustKeyPath = *flTrustKey
registryService := registry.NewService(registryCfg)
d, err := daemon.NewDaemon(daemonCfg, registryService)
if err != nil {
if pfile != nil {
if err := pfile.Remove(); err != nil {
logrus.Error(err)
}
}
logrus.Fatalf("Error starting daemon: %v", err)
}
logrus.Info("Daemon has completed initialization")
logrus.WithFields(logrus.Fields{
"version": dockerversion.VERSION,
"commit": dockerversion.GITCOMMIT,
"execdriver": d.ExecutionDriver().Name(),
"graphdriver": d.GraphDriver().String(),
}).Info("Docker daemon")
serverConfig := &apiserver.ServerConfig{
Logging: true,
EnableCors: daemonCfg.EnableCors,
@ -104,7 +136,7 @@ func mainDaemon() {
TlsKey: *flKey,
}
api := apiserver.New(serverConfig, eng)
api := apiserver.New(serverConfig)
// The serve API routine never exits unless an error occurs
// We need to start it as a goroutine and wait on it so
@ -119,40 +151,52 @@ func mainDaemon() {
serveAPIWait <- nil
}()
registryService := registry.NewService(registryCfg)
d, err := daemon.NewDaemon(daemonCfg, eng, registryService)
if err != nil {
eng.Shutdown()
logrus.Fatalf("Error starting daemon: %v", err)
}
if err := d.Install(eng); err != nil {
eng.Shutdown()
logrus.Fatalf("Error starting daemon: %v", err)
}
logrus.Info("Daemon has completed initialization")
logrus.WithFields(logrus.Fields{
"version": dockerversion.VERSION,
"commit": dockerversion.GITCOMMIT,
"execdriver": d.ExecutionDriver().Name(),
"graphdriver": d.GraphDriver().String(),
}).Info("Docker daemon")
signal.Trap(func() {
api.Close()
<-serveAPIWait
shutdownDaemon(d, 15)
if pfile != nil {
if err := pfile.Remove(); err != nil {
logrus.Error(err)
}
}
})
// after the daemon is done setting up we can tell the api to start
// accepting connections with specified daemon
api.AcceptConnections(d)
// Daemon is fully initialized and handling API traffic
// Wait for serve API job to complete
// Wait for serve API to complete
errAPI := <-serveAPIWait
eng.Shutdown()
shutdownDaemon(d, 15)
if errAPI != nil {
if pfile != nil {
if err := pfile.Remove(); err != nil {
logrus.Error(err)
}
}
logrus.Fatalf("Shutting down due to ServeAPI error: %v", errAPI)
}
}
// shutdownDaemon just wraps daemon.Shutdown() to handle a timeout in case
// d.Shutdown() is waiting too long to kill container or worst it's
// blocked there
func shutdownDaemon(d *daemon.Daemon, timeout time.Duration) {
ch := make(chan struct{})
go func() {
d.Shutdown()
close(ch)
}()
select {
case <-ch:
logrus.Debug("Clean shutdown succeded")
case <-time.After(timeout * time.Second):
logrus.Error("Force shutdown daemon")
}
}
// currentUserIsOwner checks whether the current user is the owner of the given
// file.
func currentUserIsOwner(f string) bool {

View file

@ -1,255 +0,0 @@
package engine
import (
"bufio"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stringid"
)
// Installer is a standard interface for objects which can "install" themselves
// on an engine by registering handlers.
// This can be used as an entrypoint for external plugins etc.
type Installer interface {
Install(*Engine) error
}
type Handler func(*Job) error
var globalHandlers map[string]Handler
func init() {
globalHandlers = make(map[string]Handler)
}
func Register(name string, handler Handler) error {
_, exists := globalHandlers[name]
if exists {
return fmt.Errorf("Can't overwrite global handler for command %s", name)
}
globalHandlers[name] = handler
return nil
}
func unregister(name string) {
delete(globalHandlers, name)
}
// The Engine is the core of Docker.
// It acts as a store for *containers*, and allows manipulation of these
// containers by executing *jobs*.
type Engine struct {
handlers map[string]Handler
catchall Handler
hack Hack // data for temporary hackery (see hack.go)
id string
Stdout io.Writer
Stderr io.Writer
Stdin io.Reader
Logging bool
tasks sync.WaitGroup
l sync.RWMutex // lock for shutdown
shutdownWait sync.WaitGroup
shutdown bool
onShutdown []func() // shutdown handlers
}
func (eng *Engine) Register(name string, handler Handler) error {
_, exists := eng.handlers[name]
if exists {
return fmt.Errorf("Can't overwrite handler for command %s", name)
}
eng.handlers[name] = handler
return nil
}
func (eng *Engine) RegisterCatchall(catchall Handler) {
eng.catchall = catchall
}
// New initializes a new engine.
func New() *Engine {
eng := &Engine{
handlers: make(map[string]Handler),
id: stringid.GenerateRandomID(),
Stdout: os.Stdout,
Stderr: os.Stderr,
Stdin: os.Stdin,
Logging: true,
}
eng.Register("commands", func(job *Job) error {
for _, name := range eng.commands() {
job.Printf("%s\n", name)
}
return nil
})
// Copy existing global handlers
for k, v := range globalHandlers {
eng.handlers[k] = v
}
return eng
}
func (eng *Engine) String() string {
return fmt.Sprintf("%s", eng.id[:8])
}
// Commands returns a list of all currently registered commands,
// sorted alphabetically.
func (eng *Engine) commands() []string {
names := make([]string, 0, len(eng.handlers))
for name := range eng.handlers {
names = append(names, name)
}
sort.Strings(names)
return names
}
// Job creates a new job which can later be executed.
// This function mimics `Command` from the standard os/exec package.
func (eng *Engine) Job(name string, args ...string) *Job {
job := &Job{
Eng: eng,
Name: name,
Args: args,
Stdin: NewInput(),
Stdout: NewOutput(),
Stderr: NewOutput(),
env: &Env{},
closeIO: true,
cancelled: make(chan struct{}),
}
if eng.Logging {
job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr))
}
// Catchall is shadowed by specific Register.
if handler, exists := eng.handlers[name]; exists {
job.handler = handler
} else if eng.catchall != nil && name != "" {
// empty job names are illegal, catchall or not.
job.handler = eng.catchall
}
return job
}
// OnShutdown registers a new callback to be called by Shutdown.
// This is typically used by services to perform cleanup.
func (eng *Engine) OnShutdown(h func()) {
eng.l.Lock()
eng.onShutdown = append(eng.onShutdown, h)
eng.shutdownWait.Add(1)
eng.l.Unlock()
}
// Shutdown permanently shuts down eng as follows:
// - It refuses all new jobs, permanently.
// - It waits for all active jobs to complete (with no timeout)
// - It calls all shutdown handlers concurrently (if any)
// - It returns when all handlers complete, or after 15 seconds,
// whichever happens first.
func (eng *Engine) Shutdown() {
eng.l.Lock()
if eng.shutdown {
eng.l.Unlock()
eng.shutdownWait.Wait()
return
}
eng.shutdown = true
eng.l.Unlock()
// We don't need to protect the rest with a lock, to allow
// for other calls to immediately fail with "shutdown" instead
// of hanging for 15 seconds.
// This requires all concurrent calls to check for shutdown, otherwise
// it might cause a race.
// Wait for all jobs to complete.
// Timeout after 5 seconds.
tasksDone := make(chan struct{})
go func() {
eng.tasks.Wait()
close(tasksDone)
}()
select {
case <-time.After(time.Second * 5):
case <-tasksDone:
}
// Call shutdown handlers, if any.
// Timeout after 10 seconds.
for _, h := range eng.onShutdown {
go func(h func()) {
h()
eng.shutdownWait.Done()
}(h)
}
done := make(chan struct{})
go func() {
eng.shutdownWait.Wait()
close(done)
}()
select {
case <-time.After(time.Second * 10):
case <-done:
}
return
}
// IsShutdown returns true if the engine is in the process
// of shutting down, or already shut down.
// Otherwise it returns false.
func (eng *Engine) IsShutdown() bool {
eng.l.RLock()
defer eng.l.RUnlock()
return eng.shutdown
}
// ParseJob creates a new job from a text description using a shell-like syntax.
//
// The following syntax is used to parse `input`:
//
// * Words are separated using standard whitespaces as separators.
// * Quotes and backslashes are not interpreted.
// * Words of the form 'KEY=[VALUE]' are added to the job environment.
// * All other words are added to the job arguments.
//
// For example:
//
// job, _ := eng.ParseJob("VERBOSE=1 echo hello TEST=true world")
//
// The resulting job will have:
// job.Args={"echo", "hello", "world"}
// job.Env={"VERBOSE":"1", "TEST":"true"}
//
func (eng *Engine) ParseJob(input string) (*Job, error) {
// FIXME: use a full-featured command parser
scanner := bufio.NewScanner(strings.NewReader(input))
scanner.Split(bufio.ScanWords)
var (
cmd []string
env Env
)
for scanner.Scan() {
word := scanner.Text()
kv := strings.SplitN(word, "=", 2)
if len(kv) == 2 {
env.Set(kv[0], kv[1])
} else {
cmd = append(cmd, word)
}
}
if len(cmd) == 0 {
return nil, fmt.Errorf("empty command: '%s'", input)
}
job := eng.Job(cmd[0], cmd[1:]...)
job.Env().Init(&env)
return job, nil
}

View file

@ -1,236 +0,0 @@
package engine
import (
"bytes"
"strings"
"testing"
"github.com/docker/docker/pkg/ioutils"
)
func TestRegister(t *testing.T) {
if err := Register("dummy1", nil); err != nil {
t.Fatal(err)
}
if err := Register("dummy1", nil); err == nil {
t.Fatalf("Expecting error, got none")
}
// Register is global so let's cleanup to avoid conflicts
defer unregister("dummy1")
eng := New()
//Should fail because global handlers are copied
//at the engine creation
if err := eng.Register("dummy1", nil); err == nil {
t.Fatalf("Expecting error, got none")
}
if err := eng.Register("dummy2", nil); err != nil {
t.Fatal(err)
}
if err := eng.Register("dummy2", nil); err == nil {
t.Fatalf("Expecting error, got none")
}
defer unregister("dummy2")
}
func TestJob(t *testing.T) {
eng := New()
job1 := eng.Job("dummy1", "--level=awesome")
if job1.handler != nil {
t.Fatalf("job1.handler should be empty")
}
h := func(j *Job) error {
j.Printf("%s\n", j.Name)
return nil
}
eng.Register("dummy2", h)
defer unregister("dummy2")
job2 := eng.Job("dummy2", "--level=awesome")
if job2.handler == nil {
t.Fatalf("job2.handler shouldn't be nil")
}
if job2.handler(job2) != nil {
t.Fatalf("handler dummy2 was not found in job2")
}
}
func TestEngineShutdown(t *testing.T) {
eng := New()
if eng.IsShutdown() {
t.Fatalf("Engine should not show as shutdown")
}
eng.Shutdown()
if !eng.IsShutdown() {
t.Fatalf("Engine should show as shutdown")
}
}
func TestEngineCommands(t *testing.T) {
eng := New()
handler := func(job *Job) error { return nil }
eng.Register("foo", handler)
eng.Register("bar", handler)
eng.Register("echo", handler)
eng.Register("die", handler)
var output bytes.Buffer
commands := eng.Job("commands")
commands.Stdout.Add(&output)
commands.Run()
expected := "bar\ncommands\ndie\necho\nfoo\n"
if result := output.String(); result != expected {
t.Fatalf("Unexpected output:\nExpected = %v\nResult = %v\n", expected, result)
}
}
func TestEngineString(t *testing.T) {
eng1 := New()
eng2 := New()
s1 := eng1.String()
s2 := eng2.String()
if eng1 == eng2 {
t.Fatalf("Different engines should have different names (%v == %v)", s1, s2)
}
}
func TestParseJob(t *testing.T) {
eng := New()
// Verify that the resulting job calls to the right place
var called bool
eng.Register("echo", func(job *Job) error {
called = true
return nil
})
input := "echo DEBUG=1 hello world VERBOSITY=42"
job, err := eng.ParseJob(input)
if err != nil {
t.Fatal(err)
}
if job.Name != "echo" {
t.Fatalf("Invalid job name: %v", job.Name)
}
if strings.Join(job.Args, ":::") != "hello:::world" {
t.Fatalf("Invalid job args: %v", job.Args)
}
if job.Env().Get("DEBUG") != "1" {
t.Fatalf("Invalid job env: %v", job.Env)
}
if job.Env().Get("VERBOSITY") != "42" {
t.Fatalf("Invalid job env: %v", job.Env)
}
if len(job.Env().Map()) != 2 {
t.Fatalf("Invalid job env: %v", job.Env)
}
if err := job.Run(); err != nil {
t.Fatal(err)
}
if !called {
t.Fatalf("Job was not called")
}
}
func TestCatchallEmptyName(t *testing.T) {
eng := New()
var called bool
eng.RegisterCatchall(func(job *Job) error {
called = true
return nil
})
err := eng.Job("").Run()
if err == nil {
t.Fatalf("Engine.Job(\"\").Run() should return an error")
}
if called {
t.Fatalf("Engine.Job(\"\").Run() should return an error")
}
}
// Ensure that a job within a job both using the same underlying standard
// output writer does not close the output of the outer job when the inner
// job's stdout is wrapped with a NopCloser. When not wrapped, it should
// close the outer job's output.
func TestNestedJobSharedOutput(t *testing.T) {
var (
outerHandler Handler
innerHandler Handler
wrapOutput bool
)
outerHandler = func(job *Job) error {
job.Stdout.Write([]byte("outer1"))
innerJob := job.Eng.Job("innerJob")
if wrapOutput {
innerJob.Stdout.Add(ioutils.NopWriteCloser(job.Stdout))
} else {
innerJob.Stdout.Add(job.Stdout)
}
if err := innerJob.Run(); err != nil {
t.Fatal(err)
}
// If wrapOutput was *false* this write will do nothing.
// FIXME (jlhawn): It should cause an error to write to
// closed output.
job.Stdout.Write([]byte(" outer2"))
return nil
}
innerHandler = func(job *Job) error {
job.Stdout.Write([]byte(" inner"))
return nil
}
eng := New()
eng.Register("outerJob", outerHandler)
eng.Register("innerJob", innerHandler)
// wrapOutput starts *false* so the expected
// output of running the outer job will be:
//
// "outer1 inner"
//
outBuf := new(bytes.Buffer)
outerJob := eng.Job("outerJob")
outerJob.Stdout.Add(outBuf)
if err := outerJob.Run(); err != nil {
t.Fatal(err)
}
expectedOutput := "outer1 inner"
if outBuf.String() != expectedOutput {
t.Fatalf("expected job output to be %q, got %q", expectedOutput, outBuf.String())
}
// Set wrapOutput to true so that the expected
// output of running the outer job will be:
//
// "outer1 inner outer2"
//
wrapOutput = true
outBuf.Reset()
outerJob = eng.Job("outerJob")
outerJob.Stdout.Add(outBuf)
if err := outerJob.Run(); err != nil {
t.Fatal(err)
}
expectedOutput = "outer1 inner outer2"
if outBuf.String() != expectedOutput {
t.Fatalf("expected job output to be %q, got %q", expectedOutput, outBuf.String())
}
}

View file

@ -1,313 +0,0 @@
package engine
import (
"bytes"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/docker/docker/pkg/ioutils"
)
type Env []string
// Get returns the last value associated with the given key. If there are no
// values associated with the key, Get returns the empty string.
func (env *Env) Get(key string) (value string) {
// not using Map() because of the extra allocations https://github.com/docker/docker/pull/7488#issuecomment-51638315
for _, kv := range *env {
if strings.Index(kv, "=") == -1 {
continue
}
parts := strings.SplitN(kv, "=", 2)
if parts[0] != key {
continue
}
if len(parts) < 2 {
value = ""
} else {
value = parts[1]
}
}
return
}
func (env *Env) Exists(key string) bool {
_, exists := env.Map()[key]
return exists
}
// Len returns the number of keys in the environment.
// Note that len(env) might be different from env.Len(),
// because the same key might be set multiple times.
func (env *Env) Len() int {
return len(env.Map())
}
func (env *Env) Init(src *Env) {
(*env) = make([]string, 0, len(*src))
for _, val := range *src {
(*env) = append((*env), val)
}
}
func (env *Env) GetBool(key string) (value bool) {
s := strings.ToLower(strings.Trim(env.Get(key), " \t"))
if s == "" || s == "0" || s == "no" || s == "false" || s == "none" {
return false
}
return true
}
func (env *Env) SetBool(key string, value bool) {
if value {
env.Set(key, "1")
} else {
env.Set(key, "0")
}
}
func (env *Env) GetTime(key string) (time.Time, error) {
t, err := time.Parse(time.RFC3339Nano, env.Get(key))
return t, err
}
func (env *Env) SetTime(key string, t time.Time) {
env.Set(key, t.Format(time.RFC3339Nano))
}
func (env *Env) GetInt(key string) int {
return int(env.GetInt64(key))
}
func (env *Env) GetInt64(key string) int64 {
s := strings.Trim(env.Get(key), " \t")
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0
}
return val
}
func (env *Env) SetInt(key string, value int) {
env.Set(key, fmt.Sprintf("%d", value))
}
func (env *Env) SetInt64(key string, value int64) {
env.Set(key, fmt.Sprintf("%d", value))
}
// Returns nil if key not found
func (env *Env) GetList(key string) []string {
sval := env.Get(key)
if sval == "" {
return nil
}
l := make([]string, 0, 1)
if err := json.Unmarshal([]byte(sval), &l); err != nil {
l = append(l, sval)
}
return l
}
func (env *Env) GetSubEnv(key string) *Env {
sval := env.Get(key)
if sval == "" {
return nil
}
buf := bytes.NewBufferString(sval)
var sub Env
if err := sub.Decode(buf); err != nil {
return nil
}
return &sub
}
func (env *Env) SetSubEnv(key string, sub *Env) error {
var buf bytes.Buffer
if err := sub.Encode(&buf); err != nil {
return err
}
env.Set(key, string(buf.Bytes()))
return nil
}
func (env *Env) GetJson(key string, iface interface{}) error {
sval := env.Get(key)
if sval == "" {
return nil
}
return json.Unmarshal([]byte(sval), iface)
}
func (env *Env) SetJson(key string, value interface{}) error {
sval, err := json.Marshal(value)
if err != nil {
return err
}
env.Set(key, string(sval))
return nil
}
func (env *Env) SetList(key string, value []string) error {
return env.SetJson(key, value)
}
func (env *Env) Set(key, value string) {
*env = append(*env, key+"="+value)
}
func NewDecoder(src io.Reader) *Decoder {
return &Decoder{
json.NewDecoder(src),
}
}
type Decoder struct {
*json.Decoder
}
func (decoder *Decoder) Decode() (*Env, error) {
m := make(map[string]interface{})
if err := decoder.Decoder.Decode(&m); err != nil {
return nil, err
}
env := &Env{}
for key, value := range m {
env.SetAuto(key, value)
}
return env, nil
}
// DecodeEnv decodes `src` as a json dictionary, and adds
// each decoded key-value pair to the environment.
//
// If `src` cannot be decoded as a json dictionary, an error
// is returned.
func (env *Env) Decode(src io.Reader) error {
m := make(map[string]interface{})
d := json.NewDecoder(src)
// We need this or we'll lose data when we decode int64 in json
d.UseNumber()
if err := d.Decode(&m); err != nil {
return err
}
for k, v := range m {
env.SetAuto(k, v)
}
return nil
}
func (env *Env) SetAuto(k string, v interface{}) {
// Issue 7941 - if the value in the incoming JSON is null then treat it
// as if they never specified the property at all.
if v == nil {
return
}
// FIXME: we fix-convert float values to int, because
// encoding/json decodes integers to float64, but cannot encode them back.
// (See https://golang.org/src/pkg/encoding/json/decode.go#L46)
if fval, ok := v.(float64); ok {
env.SetInt64(k, int64(fval))
} else if sval, ok := v.(string); ok {
env.Set(k, sval)
} else if val, err := json.Marshal(v); err == nil {
env.Set(k, string(val))
} else {
env.Set(k, fmt.Sprintf("%v", v))
}
}
func changeFloats(v interface{}) interface{} {
switch v := v.(type) {
case float64:
return int(v)
case map[string]interface{}:
for key, val := range v {
v[key] = changeFloats(val)
}
case []interface{}:
for idx, val := range v {
v[idx] = changeFloats(val)
}
}
return v
}
func (env *Env) Encode(dst io.Writer) error {
m := make(map[string]interface{})
for k, v := range env.Map() {
var val interface{}
if err := json.Unmarshal([]byte(v), &val); err == nil {
// FIXME: we fix-convert float values to int, because
// encoding/json decodes integers to float64, but cannot encode them back.
// (See https://golang.org/src/pkg/encoding/json/decode.go#L46)
m[k] = changeFloats(val)
} else {
m[k] = v
}
}
if err := json.NewEncoder(dst).Encode(&m); err != nil {
return err
}
return nil
}
func (env *Env) WriteTo(dst io.Writer) (int64, error) {
wc := ioutils.NewWriteCounter(dst)
err := env.Encode(wc)
return wc.Count, err
}
func (env *Env) Import(src interface{}) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("ImportEnv: %s", err)
}
}()
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(src); err != nil {
return err
}
if err := env.Decode(&buf); err != nil {
return err
}
return nil
}
func (env *Env) Map() map[string]string {
m := make(map[string]string)
for _, kv := range *env {
parts := strings.SplitN(kv, "=", 2)
m[parts[0]] = parts[1]
}
return m
}
// MultiMap returns a representation of env as a
// map of string arrays, keyed by string.
// This is the same structure as http headers for example,
// which allow each key to have multiple values.
func (env *Env) MultiMap() map[string][]string {
m := make(map[string][]string)
for _, kv := range *env {
parts := strings.SplitN(kv, "=", 2)
m[parts[0]] = append(m[parts[0]], parts[1])
}
return m
}
// InitMultiMap removes all values in env, then initializes
// new values from the contents of m.
func (env *Env) InitMultiMap(m map[string][]string) {
(*env) = make([]string, 0, len(m))
for k, vals := range m {
for _, v := range vals {
env.Set(k, v)
}
}
}

View file

@ -1,366 +0,0 @@
package engine
import (
"bytes"
"encoding/json"
"testing"
"time"
"github.com/docker/docker/pkg/stringutils"
)
func TestEnvLenZero(t *testing.T) {
env := &Env{}
if env.Len() != 0 {
t.Fatalf("%d", env.Len())
}
}
func TestEnvLenNotZero(t *testing.T) {
env := &Env{}
env.Set("foo", "bar")
env.Set("ga", "bu")
if env.Len() != 2 {
t.Fatalf("%d", env.Len())
}
}
func TestEnvLenDup(t *testing.T) {
env := &Env{
"foo=bar",
"foo=baz",
"a=b",
}
// len(env) != env.Len()
if env.Len() != 2 {
t.Fatalf("%d", env.Len())
}
}
func TestEnvGetDup(t *testing.T) {
env := &Env{
"foo=bar",
"foo=baz",
"foo=bif",
}
expected := "bif"
if v := env.Get("foo"); v != expected {
t.Fatalf("expect %q, got %q", expected, v)
}
}
func TestNewJob(t *testing.T) {
job := mkJob(t, "dummy", "--level=awesome")
if job.Name != "dummy" {
t.Fatalf("Wrong job name: %s", job.Name)
}
if len(job.Args) != 1 {
t.Fatalf("Wrong number of job arguments: %d", len(job.Args))
}
if job.Args[0] != "--level=awesome" {
t.Fatalf("Wrong job arguments: %s", job.Args[0])
}
}
func TestSetenv(t *testing.T) {
job := mkJob(t, "dummy")
job.Setenv("foo", "bar")
if val := job.Getenv("foo"); val != "bar" {
t.Fatalf("Getenv returns incorrect value: %s", val)
}
job.Setenv("bar", "")
if val := job.Getenv("bar"); val != "" {
t.Fatalf("Getenv returns incorrect value: %s", val)
}
if val := job.Getenv("nonexistent"); val != "" {
t.Fatalf("Getenv returns incorrect value: %s", val)
}
}
func TestDecodeEnv(t *testing.T) {
job := mkJob(t, "dummy")
type tmp struct {
Id1 int64
Id2 int64
}
body := []byte("{\"tags\":{\"Id1\":123, \"Id2\":1234567}}")
if err := job.DecodeEnv(bytes.NewBuffer(body)); err != nil {
t.Fatalf("DecodeEnv failed: %v", err)
}
mytag := tmp{}
if val := job.GetenvJson("tags", &mytag); val != nil {
t.Fatalf("GetenvJson returns incorrect value: %s", val)
}
if mytag.Id1 != 123 || mytag.Id2 != 1234567 {
t.Fatal("Get wrong values set by job.DecodeEnv")
}
}
func TestSetenvBool(t *testing.T) {
job := mkJob(t, "dummy")
job.SetenvBool("foo", true)
if val := job.GetenvBool("foo"); !val {
t.Fatalf("GetenvBool returns incorrect value: %t", val)
}
job.SetenvBool("bar", false)
if val := job.GetenvBool("bar"); val {
t.Fatalf("GetenvBool returns incorrect value: %t", val)
}
if val := job.GetenvBool("nonexistent"); val {
t.Fatalf("GetenvBool returns incorrect value: %t", val)
}
}
func TestSetenvTime(t *testing.T) {
job := mkJob(t, "dummy")
now := time.Now()
job.SetenvTime("foo", now)
if val, err := job.GetenvTime("foo"); err != nil {
t.Fatalf("GetenvTime failed to parse: %v", err)
} else {
nowStr := now.Format(time.RFC3339)
valStr := val.Format(time.RFC3339)
if nowStr != valStr {
t.Fatalf("GetenvTime returns incorrect value: %s, Expected: %s", valStr, nowStr)
}
}
job.Setenv("bar", "Obviously I'm not a date")
if val, err := job.GetenvTime("bar"); err == nil {
t.Fatalf("GetenvTime was supposed to fail, instead returned: %s", val)
}
}
func TestSetenvInt(t *testing.T) {
job := mkJob(t, "dummy")
job.SetenvInt("foo", -42)
if val := job.GetenvInt("foo"); val != -42 {
t.Fatalf("GetenvInt returns incorrect value: %d", val)
}
job.SetenvInt("bar", 42)
if val := job.GetenvInt("bar"); val != 42 {
t.Fatalf("GetenvInt returns incorrect value: %d", val)
}
if val := job.GetenvInt("nonexistent"); val != 0 {
t.Fatalf("GetenvInt returns incorrect value: %d", val)
}
}
func TestSetenvList(t *testing.T) {
job := mkJob(t, "dummy")
job.SetenvList("foo", []string{"bar"})
if val := job.GetenvList("foo"); len(val) != 1 || val[0] != "bar" {
t.Fatalf("GetenvList returns incorrect value: %v", val)
}
job.SetenvList("bar", nil)
if val := job.GetenvList("bar"); val != nil {
t.Fatalf("GetenvList returns incorrect value: %v", val)
}
if val := job.GetenvList("nonexistent"); val != nil {
t.Fatalf("GetenvList returns incorrect value: %v", val)
}
}
func TestEnviron(t *testing.T) {
job := mkJob(t, "dummy")
job.Setenv("foo", "bar")
val, exists := job.Environ()["foo"]
if !exists {
t.Fatalf("foo not found in the environ")
}
if val != "bar" {
t.Fatalf("bar not found in the environ")
}
}
func TestMultiMap(t *testing.T) {
e := &Env{}
e.Set("foo", "bar")
e.Set("bar", "baz")
e.Set("hello", "world")
m := e.MultiMap()
e2 := &Env{}
e2.Set("old_key", "something something something")
e2.InitMultiMap(m)
if v := e2.Get("old_key"); v != "" {
t.Fatalf("%#v", v)
}
if v := e2.Get("bar"); v != "baz" {
t.Fatalf("%#v", v)
}
if v := e2.Get("hello"); v != "world" {
t.Fatalf("%#v", v)
}
}
func testMap(l int) [][2]string {
res := make([][2]string, l)
for i := 0; i < l; i++ {
t := [2]string{stringutils.GenerateRandomAsciiString(5), stringutils.GenerateRandomAsciiString(20)}
res[i] = t
}
return res
}
func BenchmarkSet(b *testing.B) {
fix := testMap(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
env := &Env{}
for _, kv := range fix {
env.Set(kv[0], kv[1])
}
}
}
func BenchmarkSetJson(b *testing.B) {
fix := testMap(100)
type X struct {
f string
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
env := &Env{}
for _, kv := range fix {
if err := env.SetJson(kv[0], X{kv[1]}); err != nil {
b.Fatal(err)
}
}
}
}
func BenchmarkGet(b *testing.B) {
fix := testMap(100)
env := &Env{}
for _, kv := range fix {
env.Set(kv[0], kv[1])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, kv := range fix {
env.Get(kv[0])
}
}
}
func BenchmarkGetJson(b *testing.B) {
fix := testMap(100)
env := &Env{}
type X struct {
f string
}
for _, kv := range fix {
env.SetJson(kv[0], X{kv[1]})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, kv := range fix {
if err := env.GetJson(kv[0], &X{}); err != nil {
b.Fatal(err)
}
}
}
}
func BenchmarkEncode(b *testing.B) {
fix := testMap(100)
env := &Env{}
type X struct {
f string
}
// half a json
for i, kv := range fix {
if i%2 != 0 {
if err := env.SetJson(kv[0], X{kv[1]}); err != nil {
b.Fatal(err)
}
continue
}
env.Set(kv[0], kv[1])
}
var writer bytes.Buffer
b.ResetTimer()
for i := 0; i < b.N; i++ {
env.Encode(&writer)
writer.Reset()
}
}
func BenchmarkDecode(b *testing.B) {
fix := testMap(100)
env := &Env{}
type X struct {
f string
}
// half a json
for i, kv := range fix {
if i%2 != 0 {
if err := env.SetJson(kv[0], X{kv[1]}); err != nil {
b.Fatal(err)
}
continue
}
env.Set(kv[0], kv[1])
}
var writer bytes.Buffer
env.Encode(&writer)
denv := &Env{}
reader := bytes.NewReader(writer.Bytes())
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := denv.Decode(reader)
if err != nil {
b.Fatal(err)
}
reader.Seek(0, 0)
}
}
func TestLongNumbers(t *testing.T) {
type T struct {
TestNum int64
}
v := T{67108864}
var buf bytes.Buffer
e := &Env{}
e.SetJson("Test", v)
if err := e.Encode(&buf); err != nil {
t.Fatal(err)
}
res := make(map[string]T)
if err := json.Unmarshal(buf.Bytes(), &res); err != nil {
t.Fatal(err)
}
if res["Test"].TestNum != v.TestNum {
t.Fatalf("TestNum %d, expected %d", res["Test"].TestNum, v.TestNum)
}
}
func TestLongNumbersArray(t *testing.T) {
type T struct {
TestNum []int64
}
v := T{[]int64{67108864}}
var buf bytes.Buffer
e := &Env{}
e.SetJson("Test", v)
if err := e.Encode(&buf); err != nil {
t.Fatal(err)
}
res := make(map[string]T)
if err := json.Unmarshal(buf.Bytes(), &res); err != nil {
t.Fatal(err)
}
if res["Test"].TestNum[0] != v.TestNum[0] {
t.Fatalf("TestNum %d, expected %d", res["Test"].TestNum, v.TestNum)
}
}

View file

@ -1,21 +0,0 @@
package engine
type Hack map[string]interface{}
func (eng *Engine) HackGetGlobalVar(key string) interface{} {
if eng.hack == nil {
return nil
}
val, exists := eng.hack[key]
if !exists {
return nil
}
return val
}
func (eng *Engine) HackSetGlobalVar(key string, val interface{}) {
if eng.hack == nil {
eng.hack = make(Hack)
}
eng.hack[key] = val
}

View file

@ -1,11 +0,0 @@
package engine
import (
"testing"
)
var globalTestID string
func mkJob(t *testing.T, name string, args ...string) *Job {
return New().Job(name, args...)
}

View file

@ -1,42 +0,0 @@
package engine
import (
"net/http"
"path"
)
// ServeHTTP executes a job as specified by the http request `r`, and sends the
// result as an http response.
// This method allows an Engine instance to be passed as a standard http.Handler interface.
//
// Note that the protocol used in this method is a convenience wrapper and is not the canonical
// implementation of remote job execution. This is because HTTP/1 does not handle stream multiplexing,
// and so cannot differentiate stdout from stderr. Additionally, headers cannot be added to a response
// once data has been written to the body, which makes it inconvenient to return metadata such
// as the exit status.
//
func (eng *Engine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
jobName = path.Base(r.URL.Path)
jobArgs, exists = r.URL.Query()["a"]
)
if !exists {
jobArgs = []string{}
}
w.Header().Set("Job-Name", jobName)
for _, arg := range jobArgs {
w.Header().Add("Job-Args", arg)
}
job := eng.Job(jobName, jobArgs...)
job.Stdout.Add(w)
job.Stderr.Add(w)
// FIXME: distinguish job status from engine error in Run()
// The former should be passed as a special header, the former
// should cause a 500 status
w.WriteHeader(http.StatusOK)
// The exit status cannot be sent reliably with HTTP1, because headers
// can only be sent before the body.
// (we could possibly use http footers via chunked encoding, but I couldn't find
// how to use them in net/http)
job.Run()
}

View file

@ -1,222 +0,0 @@
package engine
import (
"bytes"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// A job is the fundamental unit of work in the docker engine.
// Everything docker can do should eventually be exposed as a job.
// For example: execute a process in a container, create a new container,
// download an archive from the internet, serve the http api, etc.
//
// The job API is designed after unix processes: a job has a name, arguments,
// environment variables, standard streams for input, output and error.
type Job struct {
Eng *Engine
Name string
Args []string
env *Env
Stdout *Output
Stderr *Output
Stdin *Input
handler Handler
end time.Time
closeIO bool
// When closed, the job has been cancelled.
// Note: not all jobs implement cancellation.
// See Job.Cancel() and Job.WaitCancelled()
cancelled chan struct{}
cancelOnce sync.Once
}
// Run executes the job and blocks until the job completes.
// If the job fails it returns an error
func (job *Job) Run() (err error) {
defer func() {
// Wait for all background tasks to complete
if job.closeIO {
if err := job.Stdout.Close(); err != nil {
logrus.Error(err)
}
if err := job.Stderr.Close(); err != nil {
logrus.Error(err)
}
if err := job.Stdin.Close(); err != nil {
logrus.Error(err)
}
}
}()
if job.Eng.IsShutdown() && !job.GetenvBool("overrideShutdown") {
return fmt.Errorf("engine is shutdown")
}
// FIXME: this is a temporary workaround to avoid Engine.Shutdown
// waiting 5 seconds for server/api.ServeApi to complete (which it never will)
// everytime the daemon is cleanly restarted.
// The permanent fix is to implement Job.Stop and Job.OnStop so that
// ServeApi can cooperate and terminate cleanly.
if job.Name != "serveapi" {
job.Eng.l.Lock()
job.Eng.tasks.Add(1)
job.Eng.l.Unlock()
defer job.Eng.tasks.Done()
}
// FIXME: make this thread-safe
// FIXME: implement wait
if !job.end.IsZero() {
return fmt.Errorf("%s: job has already completed", job.Name)
}
// Log beginning and end of the job
if job.Eng.Logging {
logrus.Infof("+job %s", job.CallString())
defer func() {
okerr := "OK"
if err != nil {
okerr = fmt.Sprintf("ERR: %s", err)
}
logrus.Infof("-job %s %s", job.CallString(), okerr)
}()
}
if job.handler == nil {
return fmt.Errorf("%s: command not found", job.Name)
}
var errorMessage = bytes.NewBuffer(nil)
job.Stderr.Add(errorMessage)
err = job.handler(job)
job.end = time.Now()
return
}
func (job *Job) CallString() string {
return fmt.Sprintf("%s(%s)", job.Name, strings.Join(job.Args, ", "))
}
func (job *Job) Env() *Env {
return job.env
}
func (job *Job) EnvExists(key string) (value bool) {
return job.env.Exists(key)
}
func (job *Job) Getenv(key string) (value string) {
return job.env.Get(key)
}
func (job *Job) GetenvBool(key string) (value bool) {
return job.env.GetBool(key)
}
func (job *Job) SetenvBool(key string, value bool) {
job.env.SetBool(key, value)
}
func (job *Job) GetenvTime(key string) (value time.Time, err error) {
return job.env.GetTime(key)
}
func (job *Job) SetenvTime(key string, value time.Time) {
job.env.SetTime(key, value)
}
func (job *Job) GetenvSubEnv(key string) *Env {
return job.env.GetSubEnv(key)
}
func (job *Job) SetenvSubEnv(key string, value *Env) error {
return job.env.SetSubEnv(key, value)
}
func (job *Job) GetenvInt64(key string) int64 {
return job.env.GetInt64(key)
}
func (job *Job) GetenvInt(key string) int {
return job.env.GetInt(key)
}
func (job *Job) SetenvInt64(key string, value int64) {
job.env.SetInt64(key, value)
}
func (job *Job) SetenvInt(key string, value int) {
job.env.SetInt(key, value)
}
// Returns nil if key not found
func (job *Job) GetenvList(key string) []string {
return job.env.GetList(key)
}
func (job *Job) GetenvJson(key string, iface interface{}) error {
return job.env.GetJson(key, iface)
}
func (job *Job) SetenvJson(key string, value interface{}) error {
return job.env.SetJson(key, value)
}
func (job *Job) SetenvList(key string, value []string) error {
return job.env.SetJson(key, value)
}
func (job *Job) Setenv(key, value string) {
job.env.Set(key, value)
}
// DecodeEnv decodes `src` as a json dictionary, and adds
// each decoded key-value pair to the environment.
//
// If `src` cannot be decoded as a json dictionary, an error
// is returned.
func (job *Job) DecodeEnv(src io.Reader) error {
return job.env.Decode(src)
}
func (job *Job) EncodeEnv(dst io.Writer) error {
return job.env.Encode(dst)
}
func (job *Job) ImportEnv(src interface{}) (err error) {
return job.env.Import(src)
}
func (job *Job) Environ() map[string]string {
return job.env.Map()
}
func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {
return fmt.Fprintf(job.Stdout, format, args...)
}
func (job *Job) Errorf(format string, args ...interface{}) (n int, err error) {
return fmt.Fprintf(job.Stderr, format, args...)
}
func (job *Job) SetCloseIO(val bool) {
job.closeIO = val
}
// When called, causes the Job.WaitCancelled channel to unblock.
func (job *Job) Cancel() {
job.cancelOnce.Do(func() {
close(job.cancelled)
})
}
// Returns a channel which is closed ("never blocks") when the job is cancelled.
func (job *Job) WaitCancelled() <-chan struct{} {
return job.cancelled
}

View file

@ -1,47 +0,0 @@
package engine
import (
"bytes"
"errors"
"fmt"
"testing"
)
func TestJobOK(t *testing.T) {
eng := New()
eng.Register("return_ok", func(job *Job) error { return nil })
err := eng.Job("return_ok").Run()
if err != nil {
t.Fatalf("Expected: err=%v\nReceived: err=%v", nil, err)
}
}
func TestJobErr(t *testing.T) {
eng := New()
eng.Register("return_err", func(job *Job) error { return errors.New("return_err") })
err := eng.Job("return_err").Run()
if err == nil {
t.Fatalf("When a job returns error, Run() should return an error")
}
}
func TestJobStdoutString(t *testing.T) {
eng := New()
// FIXME: test multiple combinations of output and status
eng.Register("say_something_in_stdout", func(job *Job) error {
job.Printf("Hello world\n")
return nil
})
job := eng.Job("say_something_in_stdout")
var outputBuffer = bytes.NewBuffer(nil)
job.Stdout.Add(outputBuffer)
if err := job.Run(); err != nil {
t.Fatal(err)
}
fmt.Println(outputBuffer)
var output = Tail(outputBuffer, 1)
if expectedOutput := "Hello world"; output != expectedOutput {
t.Fatalf("Stdout last line:\nExpected: %v\nReceived: %v", expectedOutput, output)
}
}

View file

@ -1,78 +0,0 @@
package engine
import (
"testing"
"time"
)
func TestShutdownEmpty(t *testing.T) {
eng := New()
if eng.IsShutdown() {
t.Fatalf("IsShutdown should be false")
}
eng.Shutdown()
if !eng.IsShutdown() {
t.Fatalf("IsShutdown should be true")
}
}
func TestShutdownAfterRun(t *testing.T) {
eng := New()
eng.Register("foo", func(job *Job) error {
return nil
})
if err := eng.Job("foo").Run(); err != nil {
t.Fatal(err)
}
eng.Shutdown()
if err := eng.Job("foo").Run(); err == nil {
t.Fatalf("%#v", *eng)
}
}
// An approximate and racy, but better-than-nothing test that
//
func TestShutdownDuringRun(t *testing.T) {
var (
jobDelay time.Duration = 500 * time.Millisecond
jobDelayLow time.Duration = 100 * time.Millisecond
jobDelayHigh time.Duration = 700 * time.Millisecond
)
eng := New()
var completed bool
eng.Register("foo", func(job *Job) error {
time.Sleep(jobDelay)
completed = true
return nil
})
go eng.Job("foo").Run()
time.Sleep(50 * time.Millisecond)
done := make(chan struct{})
var startShutdown time.Time
go func() {
startShutdown = time.Now()
eng.Shutdown()
close(done)
}()
time.Sleep(50 * time.Millisecond)
if err := eng.Job("foo").Run(); err == nil {
t.Fatalf("run on shutdown should fail: %#v", *eng)
}
<-done
// Verify that Shutdown() blocks for roughly 500ms, instead
// of returning almost instantly.
//
// We use >100ms to leave ample margin for race conditions between
// goroutines. It's possible (but unlikely in reasonable testing
// conditions), that this test will cause a false positive or false
// negative. But it's probably better than not having any test
// for the 99.999% of time where testing conditions are reasonable.
if d := time.Since(startShutdown); d.Nanoseconds() < jobDelayLow.Nanoseconds() {
t.Fatalf("shutdown did not block long enough: %v", d)
} else if d.Nanoseconds() > jobDelayHigh.Nanoseconds() {
t.Fatalf("shutdown blocked too long: %v", d)
}
if !completed {
t.Fatalf("job did not complete")
}
}

View file

@ -1,188 +0,0 @@
package engine
import (
"bytes"
"fmt"
"io"
"strings"
"sync"
"unicode"
)
type Output struct {
sync.Mutex
dests []io.Writer
tasks sync.WaitGroup
used bool
}
// Tail returns the n last lines of a buffer
// stripped out of trailing white spaces, if any.
//
// if n <= 0, returns an empty string
func Tail(buffer *bytes.Buffer, n int) string {
if n <= 0 {
return ""
}
s := strings.TrimRightFunc(buffer.String(), unicode.IsSpace)
i := len(s) - 1
for ; i >= 0 && n > 0; i-- {
if s[i] == '\n' {
n--
if n == 0 {
break
}
}
}
// when i == -1, return the whole string which is s[0:]
return s[i+1:]
}
// NewOutput returns a new Output object with no destinations attached.
// Writing to an empty Output will cause the written data to be discarded.
func NewOutput() *Output {
return &Output{}
}
// Return true if something was written on this output
func (o *Output) Used() bool {
o.Lock()
defer o.Unlock()
return o.used
}
// Add attaches a new destination to the Output. Any data subsequently written
// to the output will be written to the new destination in addition to all the others.
// This method is thread-safe.
func (o *Output) Add(dst io.Writer) {
o.Lock()
defer o.Unlock()
o.dests = append(o.dests, dst)
}
// Set closes and remove existing destination and then attaches a new destination to
// the Output. Any data subsequently written to the output will be written to the new
// destination in addition to all the others. This method is thread-safe.
func (o *Output) Set(dst io.Writer) {
o.Close()
o.Lock()
defer o.Unlock()
o.dests = []io.Writer{dst}
}
// AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
// and returns its reading end for consumption by the caller.
// This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
// This method is thread-safe.
func (o *Output) AddPipe() (io.Reader, error) {
r, w := io.Pipe()
o.Add(w)
return r, nil
}
// Write writes the same data to all registered destinations.
// This method is thread-safe.
func (o *Output) Write(p []byte) (n int, err error) {
o.Lock()
defer o.Unlock()
o.used = true
var firstErr error
for _, dst := range o.dests {
_, err := dst.Write(p)
if err != nil && firstErr == nil {
firstErr = err
}
}
return len(p), firstErr
}
// Close unregisters all destinations and waits for all background
// AddTail and AddString tasks to complete.
// The Close method of each destination is called if it exists.
func (o *Output) Close() error {
o.Lock()
defer o.Unlock()
var firstErr error
for _, dst := range o.dests {
if closer, ok := dst.(io.Closer); ok {
err := closer.Close()
if err != nil && firstErr == nil {
firstErr = err
}
}
}
o.tasks.Wait()
o.dests = nil
return firstErr
}
type Input struct {
src io.Reader
sync.Mutex
}
// NewInput returns a new Input object with no source attached.
// Reading to an empty Input will return io.EOF.
func NewInput() *Input {
return &Input{}
}
// Read reads from the input in a thread-safe way.
func (i *Input) Read(p []byte) (n int, err error) {
i.Mutex.Lock()
defer i.Mutex.Unlock()
if i.src == nil {
return 0, io.EOF
}
return i.src.Read(p)
}
// Closes the src
// Not thread safe on purpose
func (i *Input) Close() error {
if i.src != nil {
if closer, ok := i.src.(io.Closer); ok {
return closer.Close()
}
}
return nil
}
// Add attaches a new source to the input.
// Add can only be called once per input. Subsequent calls will
// return an error.
func (i *Input) Add(src io.Reader) error {
i.Mutex.Lock()
defer i.Mutex.Unlock()
if i.src != nil {
return fmt.Errorf("Maximum number of sources reached: 1")
}
i.src = src
return nil
}
// AddEnv starts a new goroutine which will decode all subsequent data
// as a stream of json-encoded objects, and point `dst` to the last
// decoded object.
// The result `env` can be queried using the type-neutral Env interface.
// It is not safe to query `env` until the Output is closed.
func (o *Output) AddEnv() (dst *Env, err error) {
src, err := o.AddPipe()
if err != nil {
return nil, err
}
dst = &Env{}
o.tasks.Add(1)
go func() {
defer o.tasks.Done()
decoder := NewDecoder(src)
for {
env, err := decoder.Decode()
if err != nil {
return
}
*dst = *env
}
}()
return dst, nil
}

View file

@ -1,215 +0,0 @@
package engine
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"strings"
"testing"
)
type sentinelWriteCloser struct {
calledWrite bool
calledClose bool
}
func (w *sentinelWriteCloser) Write(p []byte) (int, error) {
w.calledWrite = true
return len(p), nil
}
func (w *sentinelWriteCloser) Close() error {
w.calledClose = true
return nil
}
func TestOutputAddEnv(t *testing.T) {
input := "{\"foo\": \"bar\", \"answer_to_life_the_universe_and_everything\": 42}"
o := NewOutput()
result, err := o.AddEnv()
if err != nil {
t.Fatal(err)
}
o.Write([]byte(input))
o.Close()
if v := result.Get("foo"); v != "bar" {
t.Errorf("Expected %v, got %v", "bar", v)
}
if v := result.GetInt("answer_to_life_the_universe_and_everything"); v != 42 {
t.Errorf("Expected %v, got %v", 42, v)
}
if v := result.Get("this-value-doesnt-exist"); v != "" {
t.Errorf("Expected %v, got %v", "", v)
}
}
func TestOutputAddClose(t *testing.T) {
o := NewOutput()
var s sentinelWriteCloser
o.Add(&s)
if err := o.Close(); err != nil {
t.Fatal(err)
}
// Write data after the output is closed.
// Write should succeed, but no destination should receive it.
if _, err := o.Write([]byte("foo bar")); err != nil {
t.Fatal(err)
}
if !s.calledClose {
t.Fatal("Output.Close() didn't close the destination")
}
}
func TestOutputAddPipe(t *testing.T) {
var testInputs = []string{
"hello, world!",
"One\nTwo\nThree",
"",
"A line\nThen another nl-terminated line\n",
"A line followed by an empty line\n\n",
}
for _, input := range testInputs {
expectedOutput := input
o := NewOutput()
r, err := o.AddPipe()
if err != nil {
t.Fatal(err)
}
go func(o *Output) {
if n, err := o.Write([]byte(input)); err != nil {
t.Error(err)
} else if n != len(input) {
t.Errorf("Expected %d, got %d", len(input), n)
}
if err := o.Close(); err != nil {
t.Error(err)
}
}(o)
output, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
if string(output) != expectedOutput {
t.Errorf("Last line is not stored as return string.\nExpected: '%s'\nGot: '%s'", expectedOutput, output)
}
}
}
func TestTail(t *testing.T) {
var tests = make(map[string][]string)
tests["hello, world!"] = []string{
"",
"hello, world!",
"hello, world!",
"hello, world!",
}
tests["One\nTwo\nThree"] = []string{
"",
"Three",
"Two\nThree",
"One\nTwo\nThree",
}
tests["One\nTwo\n\n\n"] = []string{
"",
"Two",
"One\nTwo",
}
for input, outputs := range tests {
for n, expectedOutput := range outputs {
output := Tail(bytes.NewBufferString(input), n)
if output != expectedOutput {
t.Errorf("Tail n=%d returned wrong result.\nExpected: '%s'\nGot : '%s'", n, expectedOutput, output)
}
}
}
}
func lastLine(txt string) string {
scanner := bufio.NewScanner(strings.NewReader(txt))
var lastLine string
for scanner.Scan() {
lastLine = scanner.Text()
}
return lastLine
}
func TestOutputAdd(t *testing.T) {
o := NewOutput()
b := &bytes.Buffer{}
o.Add(b)
input := "hello, world!"
if n, err := o.Write([]byte(input)); err != nil {
t.Fatal(err)
} else if n != len(input) {
t.Fatalf("Expected %d, got %d", len(input), n)
}
if output := b.String(); output != input {
t.Fatalf("Received wrong data from Add.\nExpected: '%s'\nGot: '%s'", input, output)
}
}
func TestOutputWriteError(t *testing.T) {
o := NewOutput()
buf := &bytes.Buffer{}
o.Add(buf)
r, w := io.Pipe()
input := "Hello there"
expectedErr := fmt.Errorf("This is an error")
r.CloseWithError(expectedErr)
o.Add(w)
n, err := o.Write([]byte(input))
if err != expectedErr {
t.Fatalf("Output.Write() should return the first error encountered, if any")
}
if buf.String() != input {
t.Fatalf("Output.Write() should attempt write on all destinations, even after encountering an error")
}
if n != len(input) {
t.Fatalf("Output.Write() should return the size of the input if it successfully writes to at least one destination")
}
}
func TestInputAddEmpty(t *testing.T) {
i := NewInput()
var b bytes.Buffer
if err := i.Add(&b); err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(i)
if err != nil {
t.Fatal(err)
}
if len(data) > 0 {
t.Fatalf("Read from empty input should yield no data")
}
}
func TestInputAddTwo(t *testing.T) {
i := NewInput()
var b1 bytes.Buffer
// First add should succeed
if err := i.Add(&b1); err != nil {
t.Fatal(err)
}
var b2 bytes.Buffer
// Second add should fail
if err := i.Add(&b2); err == nil {
t.Fatalf("Adding a second source should return an error")
}
}
func TestInputAddNotEmpty(t *testing.T) {
i := NewInput()
b := bytes.NewBufferString("hello world\nabc")
expectedResult := b.String()
i.Add(b)
result, err := ioutil.ReadAll(i)
if err != nil {
t.Fatal(err)
}
if string(result) != expectedResult {
t.Fatalf("Expected: %v\nReceived: %v", expectedResult, result)
}
}

View file

@ -24,15 +24,15 @@ func checkPidFileAlreadyExists(path string) error {
return nil
}
func New(path string) (file *PidFile, err error) {
func New(path string) (*PidFile, error) {
if err := checkPidFileAlreadyExists(path); err != nil {
return nil, err
}
if err := ioutil.WriteFile(path, []byte(fmt.Sprintf("%d", os.Getpid())), 0644); err != nil {
return nil, err
}
file = &PidFile{path: path}
err = ioutil.WriteFile(path, []byte(fmt.Sprintf("%d", os.Getpid())), 0644)
return file, err
return &PidFile{path: path}, nil
}
func (file PidFile) Remove() error {