Merge pull request #17278 from mavenugo/etchosts

Vendoring libnetwork & libkv with fixes
This commit is contained in:
Tibor Vass 2015-10-22 19:02:02 -04:00
commit 37da495d4c
15 changed files with 183 additions and 31 deletions

View file

@ -21,12 +21,12 @@ clone git github.com/vdemeester/shakers 3c10293ce22b900c27acad7b28656196fcc2f73b
clone git golang.org/x/net 3cffabab72adf04f8e3b01c5baf775361837b5fe https://github.com/golang/net.git
#get libnetwork packages
clone git github.com/docker/libnetwork f3c8ebf46b890d4612c5d98e792280d13abdb761
clone git github.com/docker/libnetwork bf041154d27ed34ed39722328c8f1b0144a56fe2
clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
clone git github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
clone git github.com/hashicorp/memberlist 9a1e242e454d2443df330bdd51a436d5a9058fc4
clone git github.com/hashicorp/serf 7151adcef72687bf95f451a2e0ba15cb19412bf2
clone git github.com/docker/libkv 749af6c5b3fb755bec1738cc5e0d3a6f1574d730
clone git github.com/docker/libkv c2aac5dbbaa5c872211edea7c0f32b3bd67e7410
clone git github.com/vishvananda/netns 604eaf189ee867d8c147fafc28def2394e878d25
clone git github.com/vishvananda/netlink 4b5dce31de6d42af5bb9811c6d265472199e0fec
clone git github.com/BurntSushi/toml f706d00e3de6abe700c994cdd545a1a4915af060

View file

@ -794,6 +794,26 @@ func (s *DockerDaemonSuite) TestDaemonBridgeFixedCidr(c *check.C) {
}
}
func (s *DockerDaemonSuite) TestDaemonBridgeFixedCidrFixedCIDREqualBridgeNetwork(c *check.C) {
d := s.d
bridgeName := "external-bridge"
bridgeIP := "172.27.42.1/16"
out, err := createInterface(c, "bridge", bridgeName, bridgeIP)
c.Assert(err, check.IsNil, check.Commentf(out))
defer deleteInterface(c, bridgeName)
err = d.StartWithBusybox("--bridge", bridgeName, "--fixed-cidr", bridgeIP)
c.Assert(err, check.IsNil)
defer s.d.Restart()
out, err = d.Cmd("run", "-d", "busybox", "top")
c.Assert(err, check.IsNil, check.Commentf(out))
cid1 := strings.TrimSpace(out)
defer d.Cmd("stop", cid1)
}
func (s *DockerDaemonSuite) TestDaemonDefaultGatewayIPv4Implicit(c *check.C) {
defaultNetworkBridge := "docker0"
deleteInterface(c, defaultNetworkBridge)

View file

@ -330,6 +330,9 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error)
}
val = bucket.Get([]byte(key))
if val == nil {
return store.ErrKeyNotFound
}
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
if dbIndex != previous.LastIndex {
return store.ErrKeyModified

View file

@ -467,6 +467,13 @@ func (s *Consul) AtomicDelete(key string, previous *store.KVPair) (bool, error)
}
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
// Extra Get operation to check on the key
_, err := s.Get(key)
if err != nil && err == store.ErrKeyNotFound {
return false, err
}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err
} else if !work {

View file

@ -368,6 +368,10 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
_, err := s.client.Delete(context.Background(), s.normalize(key), delOpts)
if err != nil {
if etcdError, ok := err.(etcd.Error); ok {
// Key Not Found
if etcdError.Code == etcd.ErrorCodeKeyNotFound {
return false, store.ErrKeyNotFound
}
// Compare failed
if etcdError.Code == etcd.ErrorCodeTestFailed {
return false, store.ErrKeyModified

View file

@ -347,9 +347,15 @@ func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, erro
err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
if err != nil {
// Key not found
if err == zk.ErrNoNode {
return false, store.ErrKeyNotFound
}
// Compare failed
if err == zk.ErrBadVersion {
return false, store.ErrKeyModified
}
// General store error
return false, err
}
return true, nil

View file

@ -218,7 +218,14 @@ func (c *controller) initDiscovery(watcher discovery.Watcher) error {
}
c.discovery = hostdiscovery.NewHostDiscovery(watcher)
return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback)
return c.discovery.Watch(c.activeCallback, c.hostJoinCallback, c.hostLeaveCallback)
}
func (c *controller) activeCallback() {
ds := c.getStore(datastore.GlobalScope)
if ds != nil && !ds.Active() {
ds.RestartWatch()
}
}
func (c *controller) hostJoinCallback(nodes []net.IP) {

View file

@ -34,6 +34,10 @@ type DataStore interface {
Watchable() bool
// Watch for changes on a KVObject
Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
// RestartWatch retriggers stopped Watches
RestartWatch()
// Active returns if the store is active
Active() bool
// List returns of a list of KVObjects belonging to the parent
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
@ -53,9 +57,11 @@ var (
)
type datastore struct {
scope string
store store.Store
cache *cache
scope string
store store.Store
cache *cache
watchCh chan struct{}
active bool
sync.Mutex
}
@ -204,7 +210,7 @@ func newClient(scope string, kv string, addr string, config *store.Config, cache
return nil, err
}
ds := &datastore{scope: scope, store: store}
ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{})}
if cached {
ds.cache = newCache(ds)
}
@ -239,6 +245,10 @@ func (ds *datastore) Scope() string {
return ds.scope
}
func (ds *datastore) Active() bool {
return ds.active
}
func (ds *datastore) Watchable() bool {
return ds.scope != LocalScope
}
@ -259,6 +269,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
kvoCh := make(chan KVObject)
go func() {
retry_watch:
var err error
// Make sure to get a new instance of watch channel
ds.Lock()
watchCh := ds.watchCh
ds.Unlock()
loop:
for {
select {
case <-stopCh:
@ -269,12 +288,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
// for the watch can exit resulting in a nil value in
// channel.
if kvPair == nil {
close(sCh)
return
ds.Lock()
ds.active = false
ds.Unlock()
break loop
}
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
if err = dstO.SetValue(kvPair.Value); err != nil {
log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
break
}
@ -283,11 +305,31 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
kvoCh <- dstO
}
}
// Wait on watch channel for a re-trigger when datastore becomes active
<-watchCh
kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
if err != nil {
log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
}
goto retry_watch
}()
return kvoCh, nil
}
func (ds *datastore) RestartWatch() {
ds.Lock()
defer ds.Unlock()
ds.active = true
watchCh := ds.watchCh
ds.watchCh = make(chan struct{})
close(watchCh)
}
func (ds *datastore) KVStore() store.Store {
return ds.store
}

View file

@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"regexp"
"sync"
)
// Record Structure for a single host record
@ -21,14 +22,47 @@ func (r Record) WriteTo(w io.Writer) (int64, error) {
return int64(n), err
}
// Default hosts config records slice
var defaultContent = []Record{
{Hosts: "localhost", IP: "127.0.0.1"},
{Hosts: "localhost ip6-localhost ip6-loopback", IP: "::1"},
{Hosts: "ip6-localnet", IP: "fe00::0"},
{Hosts: "ip6-mcastprefix", IP: "ff00::0"},
{Hosts: "ip6-allnodes", IP: "ff02::1"},
{Hosts: "ip6-allrouters", IP: "ff02::2"},
var (
// Default hosts config records slice
defaultContent = []Record{
{Hosts: "localhost", IP: "127.0.0.1"},
{Hosts: "localhost ip6-localhost ip6-loopback", IP: "::1"},
{Hosts: "ip6-localnet", IP: "fe00::0"},
{Hosts: "ip6-mcastprefix", IP: "ff00::0"},
{Hosts: "ip6-allnodes", IP: "ff02::1"},
{Hosts: "ip6-allrouters", IP: "ff02::2"},
}
// A cache of path level locks for synchronizing /etc/hosts
// updates on a file level
pathMap = make(map[string]*sync.Mutex)
// A package level mutex to synchronize the cache itself
pathMutex sync.Mutex
)
func pathLock(path string) func() {
pathMutex.Lock()
defer pathMutex.Unlock()
pl, ok := pathMap[path]
if !ok {
pl = &sync.Mutex{}
pathMap[path] = pl
}
pl.Lock()
return func() {
pl.Unlock()
}
}
// Drop drops the path string from the path cache
func Drop(path string) {
pathMutex.Lock()
defer pathMutex.Unlock()
delete(pathMap, path)
}
// Build function
@ -36,6 +70,8 @@ var defaultContent = []Record{
// IP, hostname, and domainname set main record leave empty for no master record
// extraContent is an array of extra host records.
func Build(path, IP, hostname, domainname string, extraContent []Record) error {
defer pathLock(path)()
content := bytes.NewBuffer(nil)
if IP != "" {
//set main record
@ -68,6 +104,8 @@ func Build(path, IP, hostname, domainname string, extraContent []Record) error {
// Add adds an arbitrary number of Records to an already existing /etc/hosts file
func Add(path string, recs []Record) error {
defer pathLock(path)()
if len(recs) == 0 {
return nil
}
@ -95,6 +133,8 @@ func Add(path string, recs []Record) error {
// Delete deletes an arbitrary number of Records already existing in /etc/hosts file
func Delete(path string, recs []Record) error {
defer pathLock(path)()
if len(recs) == 0 {
return nil
}
@ -118,6 +158,8 @@ func Delete(path string, recs []Record) error {
// IP is new IP address
// hostname is hostname to search for to replace IP
func Update(path, IP, hostname string) error {
defer pathLock(path)()
old, err := ioutil.ReadFile(path)
if err != nil {
return err

View file

@ -34,7 +34,7 @@ func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
}
func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error {
func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
h.Lock()
d := h.watcher
h.Unlock()
@ -42,15 +42,16 @@ func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCall
return types.BadRequestErrorf("invalid discovery watcher")
}
discoveryCh, errCh := d.Watch(h.stopChan)
go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback)
go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
return nil
}
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, joinCallback JoinCallback, leaveCallback LeaveCallback) {
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
for {
select {
case entries := <-ch:
h.processCallback(entries, joinCallback, leaveCallback)
h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
case err := <-errCh:
if err != nil {
log.Errorf("discovery error: %v", err)
@ -71,7 +72,8 @@ func (h *hostDiscovery) StopDiscovery() error {
return nil
}
func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) {
func (h *hostDiscovery) processCallback(entries discovery.Entries,
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
updated := hosts(entries)
h.Lock()
existing := h.nodes
@ -79,6 +81,7 @@ func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback
h.nodes = updated
h.Unlock()
activeCallback()
if len(added) > 0 {
joinCallback(added)
}

View file

@ -5,13 +5,16 @@ import "net"
// JoinCallback provides a callback event for new node joining the cluster
type JoinCallback func(entries []net.IP)
// ActiveCallback provides a callback event for active discovery event
type ActiveCallback func()
// LeaveCallback provides a callback event for node leaving the cluster
type LeaveCallback func(entries []net.IP)
// HostDiscovery primary interface
type HostDiscovery interface {
//Watch Node join and leave cluster events
Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error
Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error
// StopDiscovery stops the discovery perocess
StopDiscovery() error
// Fetch returns a list of host IPs that are currently discovered

View file

@ -250,11 +250,6 @@ func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
ones, bits := pool.Mask.Size()
numAddresses := uint64(1 << uint(bits-ones))
if ipVer == v4 {
// Do not let broadcast address be reserved
numAddresses--
}
// Allow /64 subnet
if ipVer == v6 && numAddresses == 0 {
numAddresses--
@ -270,6 +265,11 @@ func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
// Do the same for IPv6 so that bridge ip starts with XXXX...::1
h.Set(0)
// Do not let broadcast address be reserved
if ipVer == v4 {
h.Set(numAddresses - 1)
}
a.Lock()
a.addresses[key] = h
a.Unlock()

View file

@ -78,7 +78,11 @@ func (a *allocator) ReleasePool(poolID string) error {
// RequestAddress requests an address from the address pool
func (a *allocator) RequestAddress(poolID string, address net.IP, options map[string]string) (*net.IPNet, map[string]string, error) {
var prefAddress string
var (
prefAddress string
retAddress *net.IPNet
err error
)
if address != nil {
prefAddress = address.String()
}
@ -87,7 +91,9 @@ func (a *allocator) RequestAddress(poolID string, address net.IP, options map[st
if err := a.call("RequestAddress", req, res); err != nil {
return nil, nil, err
}
retAddress, err := types.ParseCIDR(res.Address)
if res.Address != "" {
retAddress, err = types.ParseCIDR(res.Address)
}
return retAddress, res.Data, err
}

View file

@ -182,6 +182,10 @@ func (sb *sandbox) Delete() error {
}
}
// Container is going away. Path cache in etchosts is most
// likely not required any more. Drop it.
etchosts.Drop(sb.config.hostsPath)
if sb.osSbox != nil {
sb.osSbox.Destroy()
}

View file

@ -308,6 +308,11 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
c.Lock()
nw.localEps[ep.ID()] = ep
// If we had learned that from the kv store remove it
// from remote ep list now that we know that this is
// indeed a local endpoint
delete(nw.remoteEps, ep.ID())
c.Unlock()
return
}