Integration with Docker Discovery

* integrated hostdiscovery package with the new Docker Discovery
* Integrated hostdiscovery package with libnetwork core
* removed libnetwork_discovery tag
* Introduced driver apis for discovery events
* moved overlay driver to make use of the discovery events
* Using Docker Discovery service.
* Changed integration-tests to make use of the new discovery

Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Madhu Venugopal 2015-09-18 12:54:08 -07:00
parent 04aa94fa6e
commit 0066225da5
25 changed files with 477 additions and 260 deletions

View file

@ -3,17 +3,21 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/codegangsta/cli"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/reexec"
@ -37,6 +41,8 @@ const (
DefaultUnixSocket = "/var/run/dnet.sock"
cfgFileEnv = "LIBNETWORK_CFG"
defaultCfgFile = "/etc/default/libnetwork.toml"
defaultHeartbeat = time.Duration(10) * time.Second
ttlFactor = 2
)
var epConn *dnetConnection
@ -91,9 +97,66 @@ func processConfig(cfg *config.Config) []config.Option {
if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" {
options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address))
}
dOptions, err := startDiscovery(&cfg.Cluster)
if err != nil {
logrus.Infof("Skipping discovery : %s", err.Error())
} else {
options = append(options, dOptions...)
}
return options
}
func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) {
if cfg == nil {
return nil, fmt.Errorf("discovery requires a valid configuration")
}
hb := time.Duration(cfg.Heartbeat) * time.Second
if hb == 0 {
hb = defaultHeartbeat
}
logrus.Infof("discovery : %s $s", cfg.Discovery, hb.String())
d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb)
if err != nil {
return nil, err
}
if cfg.Address == "" {
iface, err := net.InterfaceByName("eth0")
if err != nil {
return nil, err
}
addrs, err := iface.Addrs()
if err != nil || len(addrs) == 0 {
return nil, err
}
ip, _, _ := net.ParseCIDR(addrs[0].String())
cfg.Address = ip.String()
}
if ip := net.ParseIP(cfg.Address); ip == nil {
return nil, errors.New("address config should be either ipv4 or ipv6 address")
}
if err := d.Register(cfg.Address + ":0"); err != nil {
return nil, err
}
options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)}
go func() {
for {
select {
case <-time.After(hb):
if err := d.Register(cfg.Address + ":0"); err != nil {
logrus.Warn(err)
}
}
}
}()
return options, nil
}
func dnetApp(stdout, stderr io.Writer) error {
app := cli.NewApp()

View file

@ -3,8 +3,9 @@ title = "LibNetwork Configuration file"
[daemon]
debug = false
[cluster]
discovery = "token://22aa23948f4f6b31230687689636959e"
discovery = "consul://localhost:8500"
Address = "1.1.1.1"
Heartbeat = 20
[datastore]
embedded = false
[datastore.client]

View file

@ -5,6 +5,7 @@ import (
"github.com/BurntSushi/toml"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/netlabel"
)
@ -27,8 +28,9 @@ type DaemonCfg struct {
// ClusterCfg represents cluster configuration
type ClusterCfg struct {
Discovery string
Watcher discovery.Watcher
Address string
Discovery string
Heartbeat uint64
}
@ -108,6 +110,20 @@ func OptionKVProviderURL(url string) Option {
}
}
// OptionDiscoveryWatcher function returns an option setter for discovery watcher
func OptionDiscoveryWatcher(watcher discovery.Watcher) Option {
return func(c *Config) {
c.Cluster.Watcher = watcher
}
}
// OptionDiscoveryAddress function returns an option setter for self discovery address
func OptionDiscoveryAddress(address string) Option {
return func(c *Config) {
c.Cluster.Address = address
}
}
// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {

View file

@ -47,9 +47,11 @@ import (
"container/heap"
"fmt"
"net"
"strings"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/libnetwork/config"
@ -126,6 +128,7 @@ type controller struct {
sandboxes sandboxTable
cfg *config.Config
globalStore, localStore datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
sync.Mutex
}
@ -157,7 +160,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err)
}
if err := c.initDiscovery(); err != nil {
if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil {
// Failing to initalize discovery is a bad situation to be in.
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Discovery : %v", err)
@ -185,19 +188,57 @@ func (c *controller) validateHostDiscoveryConfig() bool {
return true
}
func (c *controller) initDiscovery() error {
func (c *controller) initDiscovery(watcher discovery.Watcher) error {
if c.cfg == nil {
return fmt.Errorf("discovery initialization requires a valid configuration")
}
hostDiscovery := hostdiscovery.NewHostDiscovery()
return hostDiscovery.StartDiscovery(&c.cfg.Cluster, c.hostJoinCallback, c.hostLeaveCallback)
c.discovery = hostdiscovery.NewHostDiscovery(watcher)
return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback)
}
func (c *controller) hostJoinCallback(hosts []net.IP) {
func (c *controller) hostJoinCallback(nodes []net.IP) {
c.processNodeDiscovery(nodes, true)
}
func (c *controller) hostLeaveCallback(hosts []net.IP) {
func (c *controller) hostLeaveCallback(nodes []net.IP) {
c.processNodeDiscovery(nodes, false)
}
func (c *controller) processNodeDiscovery(nodes []net.IP, add bool) {
c.Lock()
drivers := []*driverData{}
for _, d := range c.drivers {
drivers = append(drivers, d)
}
c.Unlock()
for _, d := range drivers {
c.pushNodeDiscovery(d, nodes, add)
}
}
func (c *controller) pushNodeDiscovery(d *driverData, nodes []net.IP, add bool) {
var self net.IP
if c.cfg != nil {
addr := strings.Split(c.cfg.Cluster.Address, ":")
self = net.ParseIP(addr[0])
}
if d == nil || d.capability.DataScope != datastore.GlobalScope || nodes == nil {
return
}
for _, node := range nodes {
nodeData := driverapi.NodeDiscoveryData{Address: node.String(), Self: node.Equal(self)}
var err error
if add {
err = d.driver.DiscoverNew(driverapi.NodeDiscovery, nodeData)
} else {
err = d.driver.DiscoverDelete(driverapi.NodeDiscovery, nodeData)
}
if err != nil {
log.Debugf("discovery notification error : %v", err)
}
}
}
func (c *controller) Config() config.Config {
@ -219,9 +260,15 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
c.Unlock()
return driverapi.ErrActiveRegistration(networkType)
}
c.drivers[networkType] = &driverData{driver, capability}
dData := &driverData{driver, capability}
c.drivers[networkType] = dData
hd := c.discovery
c.Unlock()
if hd != nil {
c.pushNodeDiscovery(dData, hd.Fetch(), true)
}
return nil
}
@ -487,6 +534,16 @@ func (c *controller) loadDriver(networkType string) (*driverData, error) {
return dd, nil
}
func (c *controller) getDriver(networkType string) (*driverData, error) {
c.Lock()
defer c.Unlock()
dd, ok := c.drivers[networkType]
if !ok {
return nil, types.NotFoundErrorf("driver %s not found", networkType)
}
return dd, nil
}
func (c *controller) Stop() {
if c.localStore != nil {
c.localStore.KVStore().Close()

View file

@ -204,3 +204,65 @@ If the proxy is asked to remove an endpoint from a sandbox, the remote process s
where `NetworkID` and `EndpointID` have meanings as above. The success response is empty:
{}
### DiscoverNew Notification
libnetwork listens to inbuilt docker discovery notifications and passes it along to the interested drivers.
When the proxy receives a DiscoverNew notification, the remote process shall receive a POST to the URL `/NetworkDriver.DiscoverNew` of the form
{
"DiscoveryType": int,
"DiscoveryData": {
...
}
}
`DiscoveryType` represents the discovery type. Each Discovery Type is represented by a number.
`DiscoveryData` carries discovery data the structure of which is determined by the DiscoveryType
The response indicating success is empty:
`{}`
* Node Discovery
Node Discovery is represented by a `DiscoveryType` value of `1` and the corresponding `DiscoveryData` will carry Node discovery data.
{
"DiscoveryType": int,
"DiscoveryData": {
"Address" : string
"self" : bool
}
}
### DiscoverDelete Notification
When the proxy receives a DiscoverDelete notification, the remote process shall receive a POST to the URL `/NetworkDriver.DiscoverDelete` of the form
{
"DiscoveryType": int,
"DiscoveryData": {
...
}
}
`DiscoveryType` represents the discovery type. Each Discovery Type is represented by a number.
`DiscoveryData` carries discovery data the structure of which is determined by the DiscoveryType
The response indicating success is empty:
`{}`
* Node Discovery
Similar to the DiscoverNew call, Node Discovery is represented by a `DiscoveryType` value of `1` and the corresponding `DiscoveryData` will carry Node discovery data to be delted.
{
"DiscoveryType": int,
"DiscoveryData": {
"Address" : string
"self" : bool
}
}

View file

@ -40,6 +40,12 @@ type Driver interface {
// Leave method is invoked when a Sandbox detaches from an endpoint.
Leave(nid, eid string) error
// DiscoverNew is a notification for a new discovery event, Example:a new node joining a cluster
DiscoverNew(dType DiscoveryType, data interface{}) error
// DiscoverDelete is a notification for a discovery delete event, Example:a node leaving a cluster
DiscoverDelete(dType DiscoveryType, data interface{}) error
// Type returns the the type of this driver, the network type this driver manages
Type() string
}
@ -106,3 +112,17 @@ type DriverCallback interface {
type Capability struct {
DataScope datastore.DataScope
}
// DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on
type DiscoveryType int
const (
// NodeDiscovery represents Node join/leave events provided by discovery
NodeDiscovery = iota + 1
)
// NodeDiscoveryData represents the structure backing the node discovery data json string
type NodeDiscoveryData struct {
Address string
Self bool
}

View file

@ -1375,6 +1375,16 @@ func (d *driver) Type() string {
return networkType
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}
func parseEndpointOptions(epOptions map[string]interface{}) (*endpointConfiguration, error) {
if epOptions == nil {
return nil, nil

View file

@ -65,3 +65,13 @@ func (d *driver) Leave(nid, eid string) error {
func (d *driver) Type() string {
return networkType
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}

View file

@ -65,3 +65,13 @@ func (d *driver) Leave(nid, eid string) error {
func (d *driver) Type() string {
return networkType
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}

View file

@ -2,6 +2,7 @@ package overlay
import (
"fmt"
"net"
"github.com/docker/libnetwork/driverapi"
"github.com/vishvananda/netlink"
@ -73,7 +74,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
}
d.peerDbAdd(nid, eid, ep.addr.IP, ep.mac,
d.serfInstance.LocalMember().Addr, true)
net.ParseIP(d.bindAddress), true)
d.notifyCh <- ovNotify{
action: "join",
nid: nid,

View file

@ -156,23 +156,8 @@ func (n *network) initSandbox() error {
return fmt.Errorf("could not create bridge inside the network sandbox: %v", err)
}
vxlanName, err := createVxlan(n.vxlanID())
if err != nil {
return err
}
if err := sbox.AddInterface(vxlanName, "vxlan",
sbox.InterfaceOptions().Master("bridge1")); err != nil {
return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v",
err)
}
n.vxlanName = vxlanName
n.setSandbox(sbox)
n.driver.peerDbUpdateSandbox(n.id)
var nlSock *nl.NetlinkSocket
sbox.InvokeFunc(func() {
nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
@ -182,7 +167,27 @@ func (n *network) initSandbox() error {
})
go n.watchMiss(nlSock)
return n.initVxlan()
}
func (n *network) initVxlan() error {
var vxlanName string
n.Lock()
sbox := n.sbox
n.Unlock()
vxlanName, err := createVxlan(n.vxlanID())
if err != nil {
return err
}
if err = sbox.AddInterface(vxlanName, "vxlan",
sbox.InterfaceOptions().Master("bridge1")); err != nil {
return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v", err)
}
n.vxlanName = vxlanName
n.driver.peerDbUpdateSandbox(n.id)
return nil
}

View file

@ -35,46 +35,12 @@ func (l *logWriter) Write(p []byte) (int, error) {
return len(p), nil
}
func getBindAddr(ifaceName string) (string, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {
return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
}
addrs, err := iface.Addrs()
if err != nil {
return "", fmt.Errorf("failed to get interface addresses: %v", err)
}
for _, a := range addrs {
addr, ok := a.(*net.IPNet)
if !ok {
continue
}
addrIP := addr.IP
if addrIP.IsLinkLocalUnicast() {
continue
}
return addrIP.String(), nil
}
return "", fmt.Errorf("failed to get bind address")
}
func (d *driver) serfInit() error {
var err error
config := serf.DefaultConfig()
config.Init()
if d.ifaceName != "" {
bindAddr, err := getBindAddr(d.ifaceName)
if err != nil {
return fmt.Errorf("getBindAddr error: %v", err)
}
config.MemberlistConfig.BindAddr = bindAddr
}
config.MemberlistConfig.BindAddr = d.bindAddress
d.eventCh = make(chan serf.Event, 4)
config.EventCh = d.eventCh
@ -93,13 +59,6 @@ func (d *driver) serfInit() error {
}
}()
if d.neighIP != "" {
if _, err = s.Join([]string{d.neighIP}, false); err != nil {
return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
d.neighIP, err)
}
}
d.serfInstance = s
d.notifyCh = make(chan ovNotify)
@ -109,6 +68,17 @@ func (d *driver) serfInit() error {
return nil
}
func (d *driver) serfJoin() error {
if d.neighIP == "" {
return fmt.Errorf("no neighbor to join")
}
if _, err := d.serfInstance.Join([]string{d.neighIP}, false); err != nil {
return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
d.neighIP, err)
}
return nil
}
func (d *driver) notifyEvent(event ovNotify) {
n := d.network(event.nid)
ep := n.endpoint(event.eid)
@ -246,3 +216,13 @@ func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify,
}
}
}
func (d *driver) isSerfAlive() bool {
d.Lock()
serfInstance := d.serfInstance
d.Unlock()
if serfInstance == nil || serfInstance.State() != serf.SerfAlive {
return false
}
return true
}

View file

@ -6,6 +6,7 @@ import (
"net"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
@ -29,7 +30,7 @@ type driver struct {
eventCh chan serf.Event
notifyCh chan ovNotify
exitCh chan chan struct{}
ifaceName string
bindAddress string
neighIP string
config map[string]interface{}
peerDb peerNetworkMap
@ -38,7 +39,8 @@ type driver struct {
store datastore.DataStore
ipAllocator *idm.Idm
vxlanIdm *idm.Idm
sync.Once
once sync.Once
joinOnce sync.Once
sync.Mutex
}
@ -107,15 +109,7 @@ func (d *driver) configure() error {
return nil
}
d.Do(func() {
if ifaceName, ok := d.config[netlabel.OverlayBindInterface]; ok {
d.ifaceName = ifaceName.(string)
}
if neighIP, ok := d.config[netlabel.OverlayNeighborIP]; ok {
d.neighIP = neighIP.(string)
}
d.once.Do(func() {
provider, provOk := d.config[netlabel.KVProvider]
provURL, urlOk := d.config[netlabel.KVProviderURL]
@ -148,12 +142,6 @@ func (d *driver) configure() error {
err = fmt.Errorf("failed to initalize ipam id manager: %v", err)
return
}
err = d.serfInit()
if err != nil {
err = fmt.Errorf("initializing serf instance failed: %v", err)
}
})
return err
@ -162,3 +150,50 @@ func (d *driver) configure() error {
func (d *driver) Type() string {
return networkType
}
func (d *driver) nodeJoin(node string, self bool) {
if self && node != "" && !d.isSerfAlive() {
d.Lock()
d.bindAddress = node
d.Unlock()
err := d.serfInit()
if err != nil {
logrus.Errorf("initializing serf instance failed: %v", err)
return
}
}
if d.serfInstance != nil && !self && node != "" {
var err error
d.joinOnce.Do(func() {
d.Lock()
d.neighIP = node
d.Unlock()
err = d.serfJoin()
})
if err != nil {
logrus.Errorf("joining serf neighbor %s failed: %v", node, err)
d.Lock()
d.joinOnce = sync.Once{}
d.Unlock()
return
}
}
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
if dType == driverapi.NodeDiscovery {
nodeData, ok := data.(driverapi.NodeDiscoveryData)
if !ok {
return fmt.Errorf("invalid discovery data")
}
d.nodeJoin(nodeData.Address, nodeData.Self)
}
return nil
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}

View file

@ -1,11 +1,11 @@
package overlay
import (
"net"
"testing"
"time"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
_ "github.com/docker/libnetwork/testutils"
)
@ -17,16 +17,28 @@ type driverTester struct {
const testNetworkType = "overlay"
func setupDriver(t *testing.T) *driverTester {
opt := make(map[string]interface{})
opt[netlabel.OverlayBindInterface] = "eth0"
dt := &driverTester{t: t}
if err := Init(dt, opt); err != nil {
if err := Init(dt, nil); err != nil {
t.Fatal(err)
}
if err := dt.d.configure(); err != nil {
t.Fatal(err)
}
iface, err := net.InterfaceByName("eth0")
if err != nil {
t.Fatal(err)
}
addrs, err := iface.Addrs()
if err != nil || len(addrs) == 0 {
t.Fatal(err)
}
data := driverapi.NodeDiscoveryData{
Address: addrs[0].String(),
Self: true,
}
dt.d.DiscoverNew(driverapi.NodeDiscovery, data)
return dt
}

View file

@ -4,7 +4,11 @@ with a remote driver.
*/
package api
import "net"
import (
"net"
"github.com/docker/libnetwork/driverapi"
)
// Response is the basic response structure used in all responses.
type Response struct {
@ -143,3 +147,14 @@ type LeaveRequest struct {
type LeaveResponse struct {
Response
}
// DiscoveryNotification represents a discovery notification
type DiscoveryNotification struct {
DiscoveryType driverapi.DiscoveryType
DiscoveryData interface{}
}
// DiscoveryResponse is used by libnetwork to log any plugin error processing the discovery notifications
type DiscoveryResponse struct {
Response
}

View file

@ -247,6 +247,30 @@ func (d *driver) Type() string {
return d.networkType
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
if dType != driverapi.NodeDiscovery {
return fmt.Errorf("Unknown discovery type : %v", dType)
}
notif := &api.DiscoveryNotification{
DiscoveryType: dType,
DiscoveryData: data,
}
return d.call("DiscoverNew", notif, &api.DiscoveryResponse{})
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
if dType != driverapi.NodeDiscovery {
return fmt.Errorf("Unknown discovery type : %v", dType)
}
notif := &api.DiscoveryNotification{
DiscoveryType: dType,
DiscoveryData: data,
}
return d.call("DiscoverDelete", notif, &api.DiscoveryResponse{})
}
func parseStaticRoutes(r api.JoinResponse) ([]*types.StaticRoute, error) {
var routes = make([]*types.StaticRoute, len(r.StaticRoutes))
for i, inRoute := range r.StaticRoutes {

View file

@ -335,6 +335,12 @@ func TestRemoteDriver(t *testing.T) {
},
}
})
handle(t, mux, "DiscoverNew", func(msg map[string]interface{}) interface{} {
return map[string]string{}
})
handle(t, mux, "DiscoverDelete", func(msg map[string]interface{}) interface{} {
return map[string]interface{}{}
})
p, err := plugins.Get(plugin, driverapi.NetworkPluginEndpointType)
if err != nil {
@ -382,6 +388,16 @@ func TestRemoteDriver(t *testing.T) {
if err = d.DeleteNetwork(netID); err != nil {
t.Fatal(err)
}
data := driverapi.NodeDiscoveryData{
Address: "192.168.1.1",
}
if err = d.DiscoverNew(driverapi.NodeDiscovery, data); err != nil {
t.Fatal(err)
}
if err = d.DiscoverDelete(driverapi.NodeDiscovery, data); err != nil {
t.Fatal(err)
}
}
type failEndpoint struct {

View file

@ -52,3 +52,13 @@ func (d *driver) Leave(nid, eid string) error {
func (d *driver) Type() string {
return networkType
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}

View file

@ -1,73 +1,48 @@
// +build libnetwork_discovery
package hostdiscovery
import (
"errors"
"fmt"
"net"
"sync"
"time"
log "github.com/Sirupsen/logrus"
mapset "github.com/deckarep/golang-set"
"github.com/docker/libnetwork/config"
"github.com/docker/swarm/discovery"
// Anonymous import will be removed after we upgrade to latest swarm
_ "github.com/docker/swarm/discovery/file"
// Anonymous import will be removed after we upgrade to latest swarm
_ "github.com/docker/swarm/discovery/kv"
// Anonymous import will be removed after we upgrade to latest swarm
_ "github.com/docker/swarm/discovery/nodes"
// Anonymous import will be removed after we upgrade to latest swarm
_ "github.com/docker/swarm/discovery/token"
"github.com/docker/docker/pkg/discovery"
// Including KV
_ "github.com/docker/docker/pkg/discovery/kv"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/libnetwork/types"
)
const defaultHeartbeat = time.Duration(10) * time.Second
const TTLFactor = 3
type hostDiscovery struct {
discovery discovery.Discovery
nodes mapset.Set
stopChan chan struct{}
watcher discovery.Watcher
nodes mapset.Set
stopChan chan struct{}
sync.Mutex
}
// NewHostDiscovery function creates a host discovery object
func NewHostDiscovery() HostDiscovery {
return &hostDiscovery{nodes: mapset.NewSet(), stopChan: make(chan struct{})}
func init() {
consul.Register()
etcd.Register()
zookeeper.Register()
}
func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
if cfg == nil {
return fmt.Errorf("discovery requires a valid configuration")
}
hb := time.Duration(cfg.Heartbeat) * time.Second
if hb == 0 {
hb = defaultHeartbeat
}
d, err := discovery.New(cfg.Discovery, hb, TTLFactor*hb)
if err != nil {
return err
}
if ip := net.ParseIP(cfg.Address); ip == nil {
return errors.New("address config should be either ipv4 or ipv6 address")
}
if err := d.Register(cfg.Address + ":0"); err != nil {
return err
}
// NewHostDiscovery function creates a host discovery object
func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
}
func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error {
h.Lock()
h.discovery = d
d := h.watcher
h.Unlock()
if d == nil {
return types.BadRequestErrorf("invalid discovery watcher")
}
discoveryCh, errCh := d.Watch(h.stopChan)
go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback)
go h.sustainHeartbeat(d, hb, cfg)
return nil
}
@ -77,7 +52,9 @@ func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-ch
case entries := <-ch:
h.processCallback(entries, joinCallback, leaveCallback)
case err := <-errCh:
log.Errorf("discovery error: %v", err)
if err != nil {
log.Errorf("discovery error: %v", err)
}
case <-h.stopChan:
return
}
@ -87,26 +64,13 @@ func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-ch
func (h *hostDiscovery) StopDiscovery() error {
h.Lock()
stopChan := h.stopChan
h.discovery = nil
h.watcher = nil
h.Unlock()
close(stopChan)
return nil
}
func (h *hostDiscovery) sustainHeartbeat(d discovery.Discovery, hb time.Duration, config *config.ClusterCfg) {
for {
select {
case <-h.stopChan:
return
case <-time.After(hb):
if err := d.Register(config.Address + ":0"); err != nil {
log.Warn(err)
}
}
}
}
func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) {
updated := hosts(entries)
h.Lock()
@ -135,14 +99,14 @@ func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []ne
return
}
func (h *hostDiscovery) Fetch() ([]net.IP, error) {
func (h *hostDiscovery) Fetch() []net.IP {
h.Lock()
defer h.Unlock()
ips := []net.IP{}
for _, ipstr := range h.nodes.ToSlice() {
ips = append(ips, net.ParseIP(ipstr.(string)))
}
return ips, nil
return ips
}
func hosts(entries discovery.Entries) mapset.Set {

View file

@ -1,10 +1,6 @@
package hostdiscovery
import (
"net"
"github.com/docker/libnetwork/config"
)
import "net"
// JoinCallback provides a callback event for new node joining the cluster
type JoinCallback func(entries []net.IP)
@ -14,10 +10,10 @@ type LeaveCallback func(entries []net.IP)
// HostDiscovery primary interface
type HostDiscovery interface {
// StartDiscovery initiates the discovery process and provides appropriate callbacks
StartDiscovery(*config.ClusterCfg, JoinCallback, LeaveCallback) error
//Watch Node join and leave cluster events
Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error
// StopDiscovery stops the discovery perocess
StopDiscovery() error
// Fetch returns a list of host IPs that are currently discovered
Fetch() ([]net.IP, error)
Fetch() []net.IP
}

View file

@ -1,28 +0,0 @@
// +build !libnetwork_discovery
package hostdiscovery
import (
"net"
"github.com/docker/libnetwork/config"
)
type hostDiscovery struct{}
// NewHostDiscovery function creates a host discovery object
func NewHostDiscovery() HostDiscovery {
return &hostDiscovery{}
}
func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
return nil
}
func (h *hostDiscovery) StopDiscovery() error {
return nil
}
func (h *hostDiscovery) Fetch() ([]net.IP, error) {
return []net.IP{}, nil
}

View file

@ -1,80 +1,15 @@
// +build libnetwork_discovery
package hostdiscovery
import (
"net"
"testing"
"time"
mapset "github.com/deckarep/golang-set"
_ "github.com/docker/libnetwork/testutils"
"github.com/docker/libnetwork/config"
"github.com/docker/swarm/discovery"
"github.com/docker/docker/pkg/discovery"
)
func TestDiscovery(t *testing.T) {
_, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second)
if err != nil {
t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com")
}
hd := NewHostDiscovery()
config, err := config.ParseConfig("libnetwork.toml")
if err != nil {
t.Fatal(err)
}
err = hd.StartDiscovery(&config.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {})
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Duration(config.Cluster.Heartbeat*2) * time.Second)
hosts, err := hd.Fetch()
if err != nil {
t.Fatal(err)
}
found := false
for _, ip := range hosts {
if ip.Equal(net.ParseIP(config.Cluster.Address)) {
found = true
}
}
if !found {
t.Fatalf("Expecting hosts. But none discovered ")
}
err = hd.StopDiscovery()
if err != nil {
t.Fatal(err)
}
}
func TestBadDiscovery(t *testing.T) {
_, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second)
if err != nil {
t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com")
}
hd := NewHostDiscovery()
cfg := &config.Config{}
cfg.Cluster.Discovery = ""
err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {})
if err == nil {
t.Fatal("Invalid discovery configuration must fail")
}
cfg, err = config.ParseConfig("libnetwork.toml")
if err != nil {
t.Fatal(err)
}
cfg.Cluster.Address = "invalid"
err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {})
if err == nil {
t.Fatal("Invalid discovery address configuration must fail")
}
}
func TestDiff(t *testing.T) {
existing := mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
addedIP := "3.3.3.3"

View file

@ -1,6 +1,6 @@
title = "LibNetwork Configuration file"
[cluster]
discovery = "token://08469efb104bce980931ed24c8eb03a2"
Address = "1.1.1.1"
discovery = "consul://localhost:8500"
Address = "6.5.5.5"
Heartbeat = 3

View file

@ -70,6 +70,9 @@ title = "LibNetwork Configuration file"
[daemon]
debug = false
labels = [${labels}]
[cluster]
discovery = "consul://${bridge_ip}:8500"
Heartbeat = 10
[globalstore]
embedded = false
[globalstore.client]

View file

@ -81,9 +81,9 @@ unset cmap[dnet-3-multi]
## Setup
start_dnet 1 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1
cmap[dnet-1-overlay]=dnet-1-overlay
start_dnet 2 overlay $(docker inspect --format '{{.NetworkSettings.IPAddress}}' dnet-1-overlay) 1>>${INTEGRATION_ROOT}/test.log 2>&1
start_dnet 2 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1
cmap[dnet-2-overlay]=dnet-2-overlay
start_dnet 3 overlay $(docker inspect --format '{{.NetworkSettings.IPAddress}}' dnet-2-overlay) 1>>${INTEGRATION_ROOT}/test.log 2>&1
start_dnet 3 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1
cmap[dnet-3-overlay]=dnet-3-overlay
## Run the test cases