123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- package libnetwork
- //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
- import (
- "fmt"
- "net"
- "os"
- "github.com/Sirupsen/logrus"
- "github.com/docker/go-events"
- "github.com/docker/libnetwork/datastore"
- "github.com/docker/libnetwork/discoverapi"
- "github.com/docker/libnetwork/driverapi"
- "github.com/docker/libnetwork/networkdb"
- "github.com/gogo/protobuf/proto"
- )
- type agent struct {
- networkDB *networkdb.NetworkDB
- bindAddr string
- epTblCancel func()
- driverCancelFuncs map[string][]func()
- }
- func getBindAddr(ifaceName string) (string, error) {
- iface, err := net.InterfaceByName(ifaceName)
- if err != nil {
- return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
- }
- addrs, err := iface.Addrs()
- if err != nil {
- return "", fmt.Errorf("failed to get interface addresses: %v", err)
- }
- for _, a := range addrs {
- addr, ok := a.(*net.IPNet)
- if !ok {
- continue
- }
- addrIP := addr.IP
- if addrIP.IsLinkLocalUnicast() {
- continue
- }
- return addrIP.String(), nil
- }
- return "", fmt.Errorf("failed to get bind address")
- }
- func resolveAddr(addrOrInterface string) (string, error) {
- // Try and see if this is a valid IP address
- if net.ParseIP(addrOrInterface) != nil {
- return addrOrInterface, nil
- }
- // If not a valid IP address, it should be a valid interface
- return getBindAddr(addrOrInterface)
- }
- func (c *controller) agentInit(bindAddrOrInterface string) error {
- if !c.cfg.Daemon.IsAgent {
- return nil
- }
- bindAddr, err := resolveAddr(bindAddrOrInterface)
- if err != nil {
- return err
- }
- hostname, _ := os.Hostname()
- nDB, err := networkdb.New(&networkdb.Config{
- BindAddr: bindAddr,
- NodeName: hostname,
- })
- if err != nil {
- return err
- }
- ch, cancel := nDB.Watch("endpoint_table", "", "")
- c.agent = &agent{
- networkDB: nDB,
- bindAddr: bindAddr,
- epTblCancel: cancel,
- driverCancelFuncs: make(map[string][]func()),
- }
- go c.handleTableEvents(ch, c.handleEpTableEvent)
- return nil
- }
- func (c *controller) agentJoin(remotes []string) error {
- if c.agent == nil {
- return nil
- }
- return c.agent.networkDB.Join(remotes)
- }
- func (c *controller) agentDriverNotify(d driverapi.Driver) {
- if c.agent == nil {
- return
- }
- d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
- Address: c.agent.bindAddr,
- Self: true,
- })
- }
- func (c *controller) agentClose() {
- if c.agent == nil {
- return
- }
- for _, cancelFuncs := range c.agent.driverCancelFuncs {
- for _, cancel := range cancelFuncs {
- cancel()
- }
- }
- c.agent.epTblCancel()
- c.agent.networkDB.Close()
- }
- func (n *network) isClusterEligible() bool {
- if n.driverScope() != datastore.GlobalScope {
- return false
- }
- c := n.getController()
- if c.agent == nil {
- return false
- }
- return true
- }
- func (n *network) joinCluster() error {
- if !n.isClusterEligible() {
- return nil
- }
- c := n.getController()
- return c.agent.networkDB.JoinNetwork(n.ID())
- }
- func (n *network) leaveCluster() error {
- if !n.isClusterEligible() {
- return nil
- }
- c := n.getController()
- return c.agent.networkDB.LeaveNetwork(n.ID())
- }
- func (ep *endpoint) addToCluster() error {
- n := ep.getNetwork()
- if !n.isClusterEligible() {
- return nil
- }
- c := n.getController()
- if !ep.isAnonymous() && ep.Iface().Address() != nil {
- if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
- return err
- }
- buf, err := proto.Marshal(&EndpointRecord{
- Name: ep.Name(),
- ServiceName: ep.svcName,
- ServiceID: ep.svcID,
- EndpointIP: ep.Iface().Address().IP.String(),
- })
- if err != nil {
- return err
- }
- if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
- return err
- }
- }
- for _, te := range ep.joinInfo.driverTableEntries {
- if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
- return err
- }
- }
- return nil
- }
- func (ep *endpoint) deleteFromCluster() error {
- n := ep.getNetwork()
- if !n.isClusterEligible() {
- return nil
- }
- c := n.getController()
- if !ep.isAnonymous() {
- if ep.Iface().Address() != nil {
- if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
- return err
- }
- }
- if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
- return err
- }
- }
- if ep.joinInfo == nil {
- return nil
- }
- for _, te := range ep.joinInfo.driverTableEntries {
- if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
- return err
- }
- }
- return nil
- }
- func (n *network) addDriverWatches() {
- if !n.isClusterEligible() {
- return
- }
- c := n.getController()
- for _, tableName := range n.driverTables {
- ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
- c.Lock()
- c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
- c.Unlock()
- go c.handleTableEvents(ch, n.handleDriverTableEvent)
- d, err := n.driver(false)
- if err != nil {
- logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
- return
- }
- c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
- d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
- return false
- })
- }
- }
- func (n *network) cancelDriverWatches() {
- if !n.isClusterEligible() {
- return
- }
- c := n.getController()
- c.Lock()
- cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
- delete(c.agent.driverCancelFuncs, n.ID())
- c.Unlock()
- for _, cancel := range cancelFuncs {
- cancel()
- }
- }
- func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
- for {
- select {
- case ev, ok := <-ch:
- if !ok {
- return
- }
- fn(ev)
- }
- }
- }
- func (n *network) handleDriverTableEvent(ev events.Event) {
- d, err := n.driver(false)
- if err != nil {
- logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
- return
- }
- var (
- etype driverapi.EventType
- tname string
- key string
- value []byte
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Create
- case networkdb.DeleteEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Delete
- case networkdb.UpdateEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Delete
- }
- d.EventNotify(etype, n.ID(), tname, key, value)
- }
- func (c *controller) handleEpTableEvent(ev events.Event) {
- var (
- nid string
- eid string
- value []byte
- isAdd bool
- epRec EndpointRecord
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- isAdd = true
- case networkdb.DeleteEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- case networkdb.UpdateEvent:
- logrus.Errorf("Unexpected update service table event = %#v", event)
- }
- nw, err := c.NetworkByID(nid)
- if err != nil {
- logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
- return
- }
- n := nw.(*network)
- err = proto.Unmarshal(value, &epRec)
- if err != nil {
- logrus.Errorf("Failed to unmarshal service table value: %v", err)
- return
- }
- name := epRec.Name
- svcName := epRec.ServiceName
- svcID := epRec.ServiceID
- ip := net.ParseIP(epRec.EndpointIP)
- if name == "" || ip == nil {
- logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
- return
- }
- if isAdd {
- if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
- logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
- return
- }
- n.addSvcRecords(name, ip, nil, true)
- } else {
- if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
- logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
- return
- }
- n.deleteSvcRecords(name, ip, nil, true)
- }
- }
|