Merge pull request #6118 from LK4D4/fix_race_conditions

Fix race conditions
This commit is contained in:
Michael Crosby 2014-05-30 10:43:56 -07:00
commit 9e58a77e26
7 changed files with 132 additions and 71 deletions

View file

@ -1,7 +1,6 @@
package daemon
import (
"container/list"
"fmt"
"io"
"io/ioutil"
@ -48,10 +47,43 @@ var (
validContainerNamePattern = regexp.MustCompile(`^/?` + validContainerNameChars + `+$`)
)
type contStore struct {
s map[string]*Container
sync.Mutex
}
func (c *contStore) Add(id string, cont *Container) {
c.Lock()
c.s[id] = cont
c.Unlock()
}
func (c *contStore) Get(id string) *Container {
c.Lock()
res := c.s[id]
c.Unlock()
return res
}
func (c *contStore) Delete(id string) {
c.Lock()
delete(c.s, id)
c.Unlock()
}
func (c *contStore) List() []*Container {
containers := new(History)
for _, cont := range c.s {
containers.Add(cont)
}
containers.Sort()
return *containers
}
type Daemon struct {
repository string
sysInitPath string
containers *list.List
containers *contStore
graph *graph.Graph
repositories *graph.TagStore
idIndex *utils.TruncIndex
@ -87,22 +119,7 @@ func remountPrivate(mountPoint string) error {
// List returns an array of all containers registered in the daemon.
func (daemon *Daemon) List() []*Container {
containers := new(History)
for e := daemon.containers.Front(); e != nil; e = e.Next() {
containers.Add(e.Value.(*Container))
}
containers.Sort()
return *containers
}
func (daemon *Daemon) getContainerElement(id string) *list.Element {
for e := daemon.containers.Front(); e != nil; e = e.Next() {
container := e.Value.(*Container)
if container.ID == id {
return e
}
}
return nil
return daemon.containers.List()
}
// Get looks for a container by the specified ID or name, and returns it.
@ -117,11 +134,7 @@ func (daemon *Daemon) Get(name string) *Container {
return nil
}
e := daemon.getContainerElement(id)
if e == nil {
return nil
}
return e.Value.(*Container)
return daemon.containers.Get(id)
}
// Exists returns a true if a container of the specified ID or name exists,
@ -177,7 +190,7 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool) err
container.stdinPipe = utils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
}
// done
daemon.containers.PushBack(container)
daemon.containers.Add(container.ID, container)
// don't update the Suffixarray if we're starting up
// we'll waste time if we update it for every container
@ -279,7 +292,7 @@ func (daemon *Daemon) Destroy(container *Container) error {
return fmt.Errorf("The given container is <nil>")
}
element := daemon.getContainerElement(container.ID)
element := daemon.containers.Get(container.ID)
if element == nil {
return fmt.Errorf("Container %v not found - maybe it was already destroyed?", container.ID)
}
@ -290,7 +303,7 @@ func (daemon *Daemon) Destroy(container *Container) error {
// Deregister the container before removing its directory, to avoid race conditions
daemon.idIndex.Delete(container.ID)
daemon.containers.Remove(element)
daemon.containers.Delete(container.ID)
if _, err := daemon.containerGraph.Purge(container.ID); err != nil {
utils.Debugf("Unable to remove container from link graph: %s", err)
@ -677,11 +690,11 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) {
if entity == nil {
return nil, fmt.Errorf("Could not find entity for %s", name)
}
e := daemon.getContainerElement(entity.ID())
e := daemon.containers.Get(entity.ID())
if e == nil {
return nil, fmt.Errorf("Could not find container for entity id %s", entity.ID())
}
return e.Value.(*Container), nil
return e, nil
}
func (daemon *Daemon) Children(name string) (map[string]*Container, error) {
@ -860,7 +873,7 @@ func NewDaemonFromDirectory(config *daemonconfig.Config, eng *engine.Engine) (*D
daemon := &Daemon{
repository: daemonRepo,
containers: list.New(),
containers: &contStore{s: make(map[string]*Container)},
graph: g,
repositories: repositories,
idIndex: utils.NewTruncIndex([]string{}),

View file

@ -47,9 +47,11 @@ func (d *driver) createContainer(c *execdriver.Command) (*libcontainer.Container
return nil, err
}
cmds := make(map[string]*exec.Cmd)
d.Lock()
for k, v := range d.activeContainers {
cmds[k] = v.cmd
}
d.Unlock()
if err := configuration.ParseConfiguration(container, cmds, c.Config["native"]); err != nil {
return nil, err
}
@ -86,7 +88,9 @@ func (d *driver) createNetwork(container *libcontainer.Container, c *execdriver.
}
if c.Network.ContainerID != "" {
d.Lock()
active := d.activeContainers[c.Network.ContainerID]
d.Unlock()
if active == nil || active.cmd.Process == nil {
return fmt.Errorf("%s is not a valid running container to join", c.Network.ContainerID)
}

View file

@ -8,6 +8,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"github.com/dotcloud/docker/daemon/execdriver"
@ -62,6 +63,7 @@ type driver struct {
root string
initPath string
activeContainers map[string]*activeContainer
sync.Mutex
}
func NewDriver(root, initPath string) (*driver, error) {
@ -87,10 +89,12 @@ func (d *driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, startCallba
if err != nil {
return -1, err
}
d.Lock()
d.activeContainers[c.ID] = &activeContainer{
container: container,
cmd: &c.Cmd,
}
d.Unlock()
var (
dataPath = filepath.Join(d.root, c.ID)
@ -186,7 +190,9 @@ func (d *driver) Name() string {
}
func (d *driver) GetPidsForContainer(id string) ([]int, error) {
d.Lock()
active := d.activeContainers[id]
d.Unlock()
if active == nil {
return nil, fmt.Errorf("active container for %s does not exist", id)
@ -212,7 +218,9 @@ func (d *driver) createContainerRoot(id string) error {
}
func (d *driver) removeContainerRoot(id string) error {
d.Lock()
delete(d.activeContainers, id)
d.Unlock()
return os.RemoveAll(filepath.Join(d.root, id))
}

View file

@ -6,6 +6,7 @@ import (
"log"
"net"
"strings"
"sync"
"github.com/dotcloud/docker/daemon/networkdriver"
"github.com/dotcloud/docker/daemon/networkdriver/ipallocator"
@ -28,6 +29,24 @@ type networkInterface struct {
PortMappings []net.Addr // there are mappings to the host interfaces
}
type ifaces struct {
c map[string]*networkInterface
sync.Mutex
}
func (i *ifaces) Set(key string, n *networkInterface) {
i.Lock()
i.c[key] = n
i.Unlock()
}
func (i *ifaces) Get(key string) *networkInterface {
i.Lock()
res := i.c[key]
i.Unlock()
return res
}
var (
addrs = []string{
// Here we don't follow the convention of using the 1st IP of the range for the gateway.
@ -53,7 +72,7 @@ var (
bridgeNetwork *net.IPNet
defaultBindingIP = net.ParseIP("0.0.0.0")
currentInterfaces = make(map[string]*networkInterface)
currentInterfaces = ifaces{c: make(map[string]*networkInterface)}
)
func InitDriver(job *engine.Job) engine.Status {
@ -321,9 +340,9 @@ func Allocate(job *engine.Job) engine.Status {
size, _ := bridgeNetwork.Mask.Size()
out.SetInt("IPPrefixLen", size)
currentInterfaces[id] = &networkInterface{
currentInterfaces.Set(id, &networkInterface{
IP: *ip,
}
})
out.WriteTo(job.Stdout)
@ -334,7 +353,7 @@ func Allocate(job *engine.Job) engine.Status {
func Release(job *engine.Job) engine.Status {
var (
id = job.Args[0]
containerInterface = currentInterfaces[id]
containerInterface = currentInterfaces.Get(id)
ip net.IP
port int
proto string
@ -383,7 +402,7 @@ func AllocatePort(job *engine.Job) engine.Status {
origHostPort = job.GetenvInt("HostPort")
containerPort = job.GetenvInt("ContainerPort")
proto = job.Getenv("Proto")
network = currentInterfaces[id]
network = currentInterfaces.Get(id)
)
if hostIP != "" {

View file

@ -3,13 +3,15 @@ package graph
import (
"encoding/json"
"fmt"
"github.com/dotcloud/docker/image"
"github.com/dotcloud/docker/utils"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"github.com/dotcloud/docker/image"
"github.com/dotcloud/docker/utils"
)
const DEFAULTTAG = "latest"
@ -18,6 +20,7 @@ type TagStore struct {
path string
graph *Graph
Repositories map[string]Repository
sync.Mutex
}
type Repository map[string]string
@ -33,8 +36,8 @@ func NewTagStore(path string, graph *Graph) (*TagStore, error) {
Repositories: make(map[string]Repository),
}
// Load the json file if it exists, otherwise create it.
if err := store.Reload(); os.IsNotExist(err) {
if err := store.Save(); err != nil {
if err := store.reload(); os.IsNotExist(err) {
if err := store.save(); err != nil {
return nil, err
}
} else if err != nil {
@ -43,7 +46,7 @@ func NewTagStore(path string, graph *Graph) (*TagStore, error) {
return store, nil
}
func (store *TagStore) Save() error {
func (store *TagStore) save() error {
// Store the json ball
jsonData, err := json.Marshal(store)
if err != nil {
@ -55,7 +58,7 @@ func (store *TagStore) Save() error {
return nil
}
func (store *TagStore) Reload() error {
func (store *TagStore) reload() error {
jsonData, err := ioutil.ReadFile(store.path)
if err != nil {
return err
@ -74,6 +77,8 @@ func (store *TagStore) LookupImage(name string) (*image.Image, error) {
tag = DEFAULTTAG
}
img, err := store.GetImage(repos, tag)
store.Lock()
defer store.Unlock()
if err != nil {
return nil, err
} else if img == nil {
@ -87,6 +92,8 @@ func (store *TagStore) LookupImage(name string) (*image.Image, error) {
// Return a reverse-lookup table of all the names which refer to each image
// Eg. {"43b5f19b10584": {"base:latest", "base:v1"}}
func (store *TagStore) ByID() map[string][]string {
store.Lock()
defer store.Unlock()
byID := make(map[string][]string)
for repoName, repository := range store.Repositories {
for tag, id := range repository {
@ -130,8 +137,10 @@ func (store *TagStore) DeleteAll(id string) error {
}
func (store *TagStore) Delete(repoName, tag string) (bool, error) {
store.Lock()
defer store.Unlock()
deleted := false
if err := store.Reload(); err != nil {
if err := store.reload(); err != nil {
return false, err
}
if r, exists := store.Repositories[repoName]; exists {
@ -150,13 +159,15 @@ func (store *TagStore) Delete(repoName, tag string) (bool, error) {
deleted = true
}
} else {
fmt.Errorf("No such repository: %s", repoName)
return false, fmt.Errorf("No such repository: %s", repoName)
}
return deleted, store.Save()
return deleted, store.save()
}
func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
img, err := store.LookupImage(imageName)
store.Lock()
defer store.Unlock()
if err != nil {
return err
}
@ -169,7 +180,7 @@ func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
if err := validateTagName(tag); err != nil {
return err
}
if err := store.Reload(); err != nil {
if err := store.reload(); err != nil {
return err
}
var repo Repository
@ -183,11 +194,13 @@ func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
store.Repositories[repoName] = repo
}
repo[tag] = img.ID
return store.Save()
return store.save()
}
func (store *TagStore) Get(repoName string) (Repository, error) {
if err := store.Reload(); err != nil {
store.Lock()
defer store.Unlock()
if err := store.reload(); err != nil {
return nil, err
}
if r, exists := store.Repositories[repoName]; exists {
@ -198,6 +211,8 @@ func (store *TagStore) Get(repoName string) (Repository, error) {
func (store *TagStore) GetImage(repoName, tagOrID string) (*image.Image, error) {
repo, err := store.Get(repoName)
store.Lock()
defer store.Unlock()
if err != nil {
return nil, err
} else if repo == nil {
@ -215,6 +230,20 @@ func (store *TagStore) GetImage(repoName, tagOrID string) (*image.Image, error)
return nil, nil
}
func (store *TagStore) GetRepoRefs() map[string][]string {
store.Lock()
reporefs := make(map[string][]string)
for name, repository := range store.Repositories {
for tag, id := range repository {
shortID := utils.TruncateID(id)
reporefs[shortID] = append(reporefs[shortID], fmt.Sprintf("%s:%s", name, tag))
}
}
store.Unlock()
return reporefs
}
// Validate the name of a repository
func validateRepoName(name string) error {
if name == "" {

View file

@ -7,6 +7,7 @@ import (
"fmt"
"math/rand"
"net"
"sync/atomic"
"syscall"
"unsafe"
)
@ -22,7 +23,7 @@ const (
SIOC_BRADDIF = 0x89a2
)
var nextSeqNr int
var nextSeqNr uint32
type ifreqHwaddr struct {
IfrnName [16]byte
@ -42,11 +43,6 @@ func nativeEndian() binary.ByteOrder {
return binary.LittleEndian
}
func getSeq() int {
nextSeqNr = nextSeqNr + 1
return nextSeqNr
}
func getIpFamily(ip net.IP) int {
if len(ip) <= net.IPv4len {
return syscall.AF_INET
@ -266,7 +262,7 @@ func newNetlinkRequest(proto, flags int) *NetlinkRequest {
Len: uint32(syscall.NLMSG_HDRLEN),
Type: uint16(proto),
Flags: syscall.NLM_F_REQUEST | uint16(flags),
Seq: uint32(getSeq()),
Seq: atomic.AddUint32(&nextSeqNr, 1),
},
}
}

View file

@ -39,6 +39,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@ -87,17 +88,17 @@ func InitServer(job *engine.Job) engine.Status {
c := make(chan os.Signal, 1)
gosignal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
interruptCount := 0
interruptCount := uint32(0)
for sig := range c {
go func() {
go func(sig os.Signal) {
log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
switch sig {
case os.Interrupt, syscall.SIGTERM:
// If the user really wants to interrupt, let him do so.
if interruptCount < 3 {
interruptCount++
if atomic.LoadUint32(&interruptCount) < 3 {
atomic.AddUint32(&interruptCount, 1)
// Initiate the cleanup only once
if interruptCount == 1 {
if atomic.LoadUint32(&interruptCount) == 1 {
utils.RemovePidFile(srv.daemon.Config().Pidfile)
srv.Close()
} else {
@ -109,7 +110,7 @@ func InitServer(job *engine.Job) engine.Status {
case syscall.SIGQUIT:
}
os.Exit(128 + int(sig.(syscall.Signal)))
}()
}(sig)
}
}()
job.Eng.Hack_SetGlobalVar("httpapi.server", srv)
@ -684,15 +685,7 @@ func (srv *Server) ImagesViz(job *engine.Job) engine.Status {
}
}
reporefs := make(map[string][]string)
for name, repository := range srv.daemon.Repositories().Repositories {
for tag, id := range repository {
reporefs[utils.TruncateID(id)] = append(reporefs[utils.TruncateID(id)], fmt.Sprintf("%s:%s", name, tag))
}
}
for id, repos := range reporefs {
for id, repos := range srv.daemon.Repositories().GetRepoRefs() {
job.Stdout.Write([]byte(" \"" + id + "\" [label=\"" + id + "\\n" + strings.Join(repos, "\\n") + "\",shape=box,fillcolor=\"paleturquoise\",style=\"filled,rounded\"];\n"))
}
job.Stdout.Write([]byte(" base [style=invisible]\n}\n"))
@ -713,6 +706,7 @@ func (srv *Server) Images(job *engine.Job) engine.Status {
return job.Error(err)
}
lookup := make(map[string]*engine.Env)
srv.daemon.Repositories().Lock()
for name, repository := range srv.daemon.Repositories().Repositories {
if job.Getenv("filter") != "" {
if match, _ := path.Match(job.Getenv("filter"), name); !match {
@ -742,6 +736,7 @@ func (srv *Server) Images(job *engine.Job) engine.Status {
}
}
srv.daemon.Repositories().Unlock()
outs := engine.NewTable("Created", len(lookup))
for _, value := range lookup {
@ -1303,9 +1298,6 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
return err
}
}
if err := srv.daemon.Repositories().Save(); err != nil {
return err
}
return nil
}