@@ -44,6 +44,9 @@ type agent struct {
 	sync.Mutex
 }
 
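+// libnetworkEPTable is the NetworkDB table through which libnetwork publishes its endpoint records.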
+const libnetworkEPTable = "endpoint_table"
+
 func getBindAddr(ifaceName string) (string, error) {
 	iface, err := net.InterfaceByName(ifaceName)
 	if err != nil {
@@ -285,7 +288,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
 		return err
 	}
 
-	ch, cancel := nDB.Watch("endpoint_table", "", "")
+	ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
 	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
 
 	c.Lock()
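Both Watch calls return a go-events channel together with a cancel function. As an illustration only, a loop draining such a channel might look like the sketch below; the event types come from the networkdb package, and the handler bodies are placeholders:

	go func() {
		for ev := range ch.C {
			switch ev.(type) {
			case networkdb.CreateEvent:
				// a peer published a new endpoint record
			case networkdb.UpdateEvent:
				// a peer updated an existing endpoint record
			case networkdb.DeleteEvent:
				// a peer withdrew an endpoint record
			}
		}
	}()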
@@ -385,6 +388,117 @@ func (c *controller) agentClose() {
 	agent.networkDB.Close()
 }
 
+// Task has the backend container details.
+type Task struct {
+	Name       string
+	EndpointID string
+	EndpointIP string
+	Info       map[string]string
+}
+
+// ServiceInfo has service-specific details along with the list of backend tasks.
+type ServiceInfo struct {
+	VIP          string
+	LocalLBIndex int
+	Tasks        []Task
+	Ports        []string
+}
+
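+// epRecord pairs libnetwork's endpoint record with the driver-provided
+// metadata and the load-balancer index of its service.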
+type epRecord struct {
+	ep      EndpointRecord
+	info    map[string]string
+	lbIndex int
+}
+
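+// Services returns the services running on this network, keyed by service
+// name, built by merging libnetwork's endpoint records with any endpoint
+// metadata decoded from the driver's own tables.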
+func (n *network) Services() map[string]ServiceInfo {
+	eps := make(map[string]epRecord)
+
+	if !n.isClusterEligible() {
+		return nil
+	}
+	agent := n.getController().getAgent()
+	if agent == nil {
+		return nil
+	}
+
+	// Walk through libnetworkEPTable and fetch the driver-agnostic endpoint info.
+	entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
+	for eid, value := range entries {
+		var epRec EndpointRecord
+		nid := n.ID()
+		if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
+			logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
+			continue
+		}
+		i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
+		eps[eid] = epRecord{
+			ep:      epRec,
+			lbIndex: i,
+		}
+	}
+
+	// Walk through the driver's tables, have the driver decode the entries,
+	// and return the tuple {ep ID, info}, where info is a map that conveys
+	// relevant details about the endpoint.
+	d, err := n.driver(true)
+	if err != nil {
+		logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
+		return nil
+	}
+	for _, table := range n.driverTables {
+		if table.objType != driverapi.EndpointObject {
+			continue
+		}
+		entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
+		for key, value := range entries {
+			epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
+			if ep, ok := eps[epID]; !ok {
+				logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
+			} else {
+				ep.info = info
+				eps[epID] = ep
+			}
+		}
+	}
+
+	// Group the endpoints into a map keyed by the service name.
+	sinfo := make(map[string]ServiceInfo)
+	for ep, epr := range eps {
+		var (
+			s  ServiceInfo
+			ok bool
+		)
+		if s, ok = sinfo[epr.ep.ServiceName]; !ok {
+			s = ServiceInfo{
+				VIP:          epr.ep.VirtualIP,
+				LocalLBIndex: epr.lbIndex,
+			}
+		}
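+		// Format the ingress ports only once, from the first endpoint of the service.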
+		ports := []string{}
+		if s.Ports == nil {
+			for _, port := range epr.ep.IngressPorts {
+				p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
+				ports = append(ports, p)
+			}
+			s.Ports = ports
+		}
+		s.Tasks = append(s.Tasks, Task{
+			Name:       epr.ep.Name,
+			EndpointID: ep,
+			EndpointIP: epr.ep.EndpointIP,
+			Info:       epr.info,
+		})
+		sinfo[epr.ep.ServiceName] = s
+	}
+	return sinfo
+}
+
 func (n *network) isClusterEligible() bool {
 	if n.driverScope() != datastore.GlobalScope {
 		return false
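To make the new surface concrete, here is a minimal sketch of how a caller might consume Services(). It assumes the method is also exposed on the public Network interface (not shown in this diff), and the network name is hypothetical:

	package main

	import (
		"fmt"
		"log"

		"github.com/docker/libnetwork"
	)

	func main() {
		// Assumes a controller configured for agent/cluster mode; options elided.
		controller, err := libnetwork.New()
		if err != nil {
			log.Fatal(err)
		}
		nw, err := controller.NetworkByName("ingress") // hypothetical network name
		if err != nil {
			log.Fatal(err)
		}
		for name, svc := range nw.Services() {
			fmt.Printf("service %s vip=%s lb=%d ports=%v\n", name, svc.VIP, svc.LocalLBIndex, svc.Ports)
			for _, t := range svc.Tasks {
				fmt.Printf("  task %s endpoint=%s ip=%s\n", t.Name, t.EndpointID, t.EndpointIP)
			}
		}
	}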
@@ -508,7 +622,7 @@ func (ep *endpoint) addServiceInfoToCluster() error {
 	}
 
 	if agent != nil {
-		if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
+		if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
 			return err
 		}
 	}
@@ -541,7 +655,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
 	}
 
 	if agent != nil {
-		if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
+		if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
 			return err
 		}
 	}
@@ -559,8 +673,8 @@ func (n *network) addDriverWatches() {
 	if agent == nil {
 		return
 	}
-	for _, tableName := range n.driverTables {
-		ch, cancel := agent.networkDB.Watch(tableName, n.ID(), "")
+	for _, table := range n.driverTables {
+		ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
 		agent.Lock()
 		agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
 		agent.Unlock()
@@ -571,9 +685,9 @@
 			return
 		}
 
-		agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
+		agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
 			if nid == n.ID() {
-				d.EventNotify(driverapi.Create, nid, tableName, key, value)
+				d.EventNotify(driverapi.Create, nid, table.name, key, value)
 			}
 
 			return false
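The two hunks above switch n.driverTables from a slice of bare table names to a slice of descriptors carrying a table name and an object type. The descriptor's definition lives outside this diff; judging only from its use here, its shape would be roughly the following (a sketch with an assumed type name, the real definition may differ):

	type driverTable struct {
		name    string
		objType driverapi.ObjectType
	}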