123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- package overlay
- //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
- import (
- "fmt"
- "net"
- "sync"
- "github.com/Sirupsen/logrus"
- "github.com/docker/libnetwork/datastore"
- "github.com/docker/libnetwork/discoverapi"
- "github.com/docker/libnetwork/driverapi"
- "github.com/docker/libnetwork/idm"
- "github.com/docker/libnetwork/netlabel"
- "github.com/docker/libnetwork/osl"
- "github.com/docker/libnetwork/types"
- "github.com/hashicorp/serf/serf"
- )
- const (
- networkType = "overlay"
- vethPrefix = "veth"
- vethLen = 7
- vxlanIDStart = 256
- vxlanIDEnd = (1 << 24) - 1
- vxlanPort = 4789
- vxlanEncap = 50
- secureOption = "encrypted"
- )
- var initVxlanIdm = make(chan (bool), 1)
- type driver struct {
- eventCh chan serf.Event
- notifyCh chan ovNotify
- exitCh chan chan struct{}
- bindAddress string
- advertiseAddress string
- neighIP string
- config map[string]interface{}
- peerDb peerNetworkMap
- secMap *encrMap
- serfInstance *serf.Serf
- networks networkTable
- store datastore.DataStore
- localStore datastore.DataStore
- vxlanIdm *idm.Idm
- once sync.Once
- joinOnce sync.Once
- localJoinOnce sync.Once
- keys []*key
- sync.Mutex
- }
- // Init registers a new instance of overlay driver
- func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
- c := driverapi.Capability{
- DataScope: datastore.GlobalScope,
- ConnectivityScope: datastore.GlobalScope,
- }
- d := &driver{
- networks: networkTable{},
- peerDb: peerNetworkMap{
- mp: map[string]*peerMap{},
- },
- secMap: &encrMap{nodes: map[string][]*spi{}},
- config: config,
- }
- if data, ok := config[netlabel.GlobalKVClient]; ok {
- var err error
- dsc, ok := data.(discoverapi.DatastoreConfigData)
- if !ok {
- return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
- }
- d.store, err = datastore.NewDataStoreFromConfig(dsc)
- if err != nil {
- return types.InternalErrorf("failed to initialize data store: %v", err)
- }
- }
- if data, ok := config[netlabel.LocalKVClient]; ok {
- var err error
- dsc, ok := data.(discoverapi.DatastoreConfigData)
- if !ok {
- return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
- }
- d.localStore, err = datastore.NewDataStoreFromConfig(dsc)
- if err != nil {
- return types.InternalErrorf("failed to initialize local data store: %v", err)
- }
- }
- if err := d.restoreEndpoints(); err != nil {
- logrus.Warnf("Failure during overlay endpoints restore: %v", err)
- }
- // If an error happened when the network join the sandbox during the endpoints restore
- // we should reset it now along with the once variable, so that subsequent endpoint joins
- // outside of the restore path can potentially fix the network join and succeed.
- for nid, n := range d.networks {
- if n.initErr != nil {
- logrus.Infof("resetting init error and once variable for network %s after unsuccessful endpoint restore: %v", nid, n.initErr)
- n.initErr = nil
- n.once = &sync.Once{}
- }
- }
- return dc.RegisterDriver(networkType, d, c)
- }
- // Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox
- func (d *driver) restoreEndpoints() error {
- if d.localStore == nil {
- logrus.Warn("Cannot restore overlay endpoints because local datastore is missing")
- return nil
- }
- kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{})
- if err != nil && err != datastore.ErrKeyNotFound {
- return fmt.Errorf("failed to read overlay endpoint from store: %v", err)
- }
- if err == datastore.ErrKeyNotFound {
- return nil
- }
- for _, kvo := range kvol {
- ep := kvo.(*endpoint)
- n := d.network(ep.nid)
- if n == nil {
- logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7])
- logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7])
- if err := d.deleteEndpointFromStore(ep); err != nil {
- logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
- }
- continue
- }
- n.addEndpoint(ep)
- s := n.getSubnetforIP(ep.addr)
- if s == nil {
- return fmt.Errorf("could not find subnet for endpoint %s", ep.id)
- }
- if err := n.joinSandbox(true); err != nil {
- return fmt.Errorf("restore network sandbox failed: %v", err)
- }
- if err := n.joinSubnetSandbox(s, true); err != nil {
- return fmt.Errorf("restore subnet sandbox failed for %q: %v", s.subnetIP.String(), err)
- }
- Ifaces := make(map[string][]osl.IfaceOption)
- vethIfaceOption := make([]osl.IfaceOption, 1)
- vethIfaceOption = append(vethIfaceOption, n.sbox.InterfaceOptions().Master(s.brName))
- Ifaces[fmt.Sprintf("%s+%s", "veth", "veth")] = vethIfaceOption
- err := n.sbox.Restore(Ifaces, nil, nil, nil)
- if err != nil {
- return fmt.Errorf("failed to restore overlay sandbox: %v", err)
- }
- n.incEndpointCount()
- d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
- }
- return nil
- }
- // Fini cleans up the driver resources
- func Fini(drv driverapi.Driver) {
- d := drv.(*driver)
- if d.exitCh != nil {
- waitCh := make(chan struct{})
- d.exitCh <- waitCh
- <-waitCh
- }
- }
- func (d *driver) configure() error {
- if d.store == nil {
- return nil
- }
- if d.vxlanIdm == nil {
- return d.initializeVxlanIdm()
- }
- return nil
- }
- func (d *driver) initializeVxlanIdm() error {
- var err error
- initVxlanIdm <- true
- defer func() { <-initVxlanIdm }()
- if d.vxlanIdm != nil {
- return nil
- }
- d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
- if err != nil {
- return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
- }
- return nil
- }
- func (d *driver) Type() string {
- return networkType
- }
- func (d *driver) IsBuiltIn() bool {
- return true
- }
- func validateSelf(node string) error {
- advIP := net.ParseIP(node)
- if advIP == nil {
- return fmt.Errorf("invalid self address (%s)", node)
- }
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- return fmt.Errorf("Unable to get interface addresses %v", err)
- }
- for _, addr := range addrs {
- ip, _, err := net.ParseCIDR(addr.String())
- if err == nil && ip.Equal(advIP) {
- return nil
- }
- }
- return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
- }
- func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
- if self && !d.isSerfAlive() {
- d.Lock()
- d.advertiseAddress = advertiseAddress
- d.bindAddress = bindAddress
- d.Unlock()
- // If containers are already running on this network update the
- // advertiseaddress in the peerDB
- d.localJoinOnce.Do(func() {
- d.peerDBUpdateSelf()
- })
- // If there is no cluster store there is no need to start serf.
- if d.store != nil {
- if err := validateSelf(advertiseAddress); err != nil {
- logrus.Warnf("%s", err.Error())
- }
- err := d.serfInit()
- if err != nil {
- logrus.Errorf("initializing serf instance failed: %v", err)
- d.Lock()
- d.advertiseAddress = ""
- d.bindAddress = ""
- d.Unlock()
- return
- }
- }
- }
- d.Lock()
- if !self {
- d.neighIP = advertiseAddress
- }
- neighIP := d.neighIP
- d.Unlock()
- if d.serfInstance != nil && neighIP != "" {
- var err error
- d.joinOnce.Do(func() {
- err = d.serfJoin(neighIP)
- if err == nil {
- d.pushLocalDb()
- }
- })
- if err != nil {
- logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
- d.Lock()
- d.joinOnce = sync.Once{}
- d.Unlock()
- return
- }
- }
- }
- func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
- n := d.network(nid)
- if n == nil {
- logrus.Debugf("Error pushing local endpoint event for network %s", nid)
- return
- }
- ep := n.endpoint(eid)
- if ep == nil {
- logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
- return
- }
- if !d.isSerfAlive() {
- return
- }
- d.notifyCh <- ovNotify{
- action: "join",
- nw: n,
- ep: ep,
- }
- }
- // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
- func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
- var err error
- switch dType {
- case discoverapi.NodeDiscovery:
- nodeData, ok := data.(discoverapi.NodeDiscoveryData)
- if !ok || nodeData.Address == "" {
- return fmt.Errorf("invalid discovery data")
- }
- d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self)
- case discoverapi.DatastoreConfig:
- if d.store != nil {
- return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
- }
- dsc, ok := data.(discoverapi.DatastoreConfigData)
- if !ok {
- return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
- }
- d.store, err = datastore.NewDataStoreFromConfig(dsc)
- if err != nil {
- return types.InternalErrorf("failed to initialize data store: %v", err)
- }
- case discoverapi.EncryptionKeysConfig:
- encrData, ok := data.(discoverapi.DriverEncryptionConfig)
- if !ok {
- return fmt.Errorf("invalid encryption key notification data")
- }
- keys := make([]*key, 0, len(encrData.Keys))
- for i := 0; i < len(encrData.Keys); i++ {
- k := &key{
- value: encrData.Keys[i],
- tag: uint32(encrData.Tags[i]),
- }
- keys = append(keys, k)
- }
- if err := d.setKeys(keys); err != nil {
- logrus.Warn(err)
- }
- case discoverapi.EncryptionKeysUpdate:
- var newKey, delKey, priKey *key
- encrData, ok := data.(discoverapi.DriverEncryptionUpdate)
- if !ok {
- return fmt.Errorf("invalid encryption key notification data")
- }
- if encrData.Key != nil {
- newKey = &key{
- value: encrData.Key,
- tag: uint32(encrData.Tag),
- }
- }
- if encrData.Primary != nil {
- priKey = &key{
- value: encrData.Primary,
- tag: uint32(encrData.PrimaryTag),
- }
- }
- if encrData.Prune != nil {
- delKey = &key{
- value: encrData.Prune,
- tag: uint32(encrData.PruneTag),
- }
- }
- if err := d.updateKeys(newKey, priKey, delKey); err != nil {
- logrus.Warn(err)
- }
- default:
- }
- return nil
- }
- // DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
- func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
- return nil
- }
|