Use controller methods for handling the encyrption keys from agent
instead of the Provider interface methods. Signed-off-by: Santhosh Manohar <santhosh@docker.com>
This commit is contained in:
parent
b85caa0cfd
commit
c4d5bbad7a
3 changed files with 106 additions and 80 deletions
|
@ -31,7 +31,6 @@ type agent struct {
|
|||
bindAddr string
|
||||
epTblCancel func()
|
||||
driverCancelFuncs map[string][]func()
|
||||
keys []*types.EncryptionKey
|
||||
}
|
||||
|
||||
func getBindAddr(ifaceName string) (string, error) {
|
||||
|
@ -72,18 +71,18 @@ func resolveAddr(addrOrInterface string) (string, error) {
|
|||
return getBindAddr(addrOrInterface)
|
||||
}
|
||||
|
||||
func (c *controller) agentHandleKeys(keys []*types.EncryptionKey) error {
|
||||
func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
|
||||
// Find the new key and add it to the key ring
|
||||
a := c.agent
|
||||
for _, key := range keys {
|
||||
same := false
|
||||
for _, aKey := range a.keys {
|
||||
if same = aKey.LamportTime == key.LamportTime; same {
|
||||
for _, cKey := range c.keys {
|
||||
if same = cKey.LamportTime == key.LamportTime; same {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !same {
|
||||
a.keys = append(a.keys, key)
|
||||
c.keys = append(c.keys, key)
|
||||
if key.Subsystem == "networking:gossip" {
|
||||
a.networkDB.SetKey(key.Key)
|
||||
}
|
||||
|
@ -93,24 +92,24 @@ func (c *controller) agentHandleKeys(keys []*types.EncryptionKey) error {
|
|||
// Find the deleted key. If the deleted key was the primary key,
|
||||
// a new primary key should be set before removing if from keyring.
|
||||
deleted := []byte{}
|
||||
for i, aKey := range a.keys {
|
||||
for i, cKey := range c.keys {
|
||||
same := false
|
||||
for _, key := range keys {
|
||||
if same = key.LamportTime == aKey.LamportTime; same {
|
||||
if same = key.LamportTime == cKey.LamportTime; same {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !same {
|
||||
if aKey.Subsystem == "networking:gossip" {
|
||||
deleted = aKey.Key
|
||||
if cKey.Subsystem == "networking:gossip" {
|
||||
deleted = cKey.Key
|
||||
}
|
||||
a.keys = append(a.keys[:i], a.keys[i+1:]...)
|
||||
c.keys = append(c.keys[:i], c.keys[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(ByTime(a.keys))
|
||||
for _, key := range a.keys {
|
||||
sort.Sort(ByTime(c.keys))
|
||||
for _, key := range c.keys {
|
||||
if key.Subsystem == "networking:gossip" {
|
||||
a.networkDB.SetPrimaryKey(key.Key)
|
||||
break
|
||||
|
@ -122,16 +121,60 @@ func (c *controller) agentHandleKeys(keys []*types.EncryptionKey) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) agentInit(bindAddrOrInterface string, keys []*types.EncryptionKey) error {
|
||||
func (c *controller) agentSetup() error {
|
||||
clusterProvider := c.cfg.Daemon.ClusterProvider
|
||||
|
||||
bindAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())
|
||||
remote := clusterProvider.GetRemoteAddress()
|
||||
remoteAddr, _, _ := net.SplitHostPort(remote)
|
||||
|
||||
// Determine the BindAddress from RemoteAddress or through best-effort routing
|
||||
if !isValidClusteringIP(bindAddr) {
|
||||
if !isValidClusteringIP(remoteAddr) {
|
||||
remote = "8.8.8.8:53"
|
||||
}
|
||||
conn, err := net.Dial("udp", remote)
|
||||
if err == nil {
|
||||
bindHostPort := conn.LocalAddr().String()
|
||||
bindAddr, _, _ = net.SplitHostPort(bindHostPort)
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if bindAddr != "" && c.agent == nil {
|
||||
if err := c.agentInit(bindAddr); err != nil {
|
||||
logrus.Errorf("Error in agentInit : %v", err)
|
||||
} else {
|
||||
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
|
||||
if capability.DataScope == datastore.GlobalScope {
|
||||
c.agentDriverNotify(driver)
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
if c.agent != nil {
|
||||
close(c.agentInitDone)
|
||||
}
|
||||
}
|
||||
}
|
||||
if remoteAddr != "" {
|
||||
if err := c.agentJoin(remoteAddr); err != nil {
|
||||
logrus.Errorf("Error in agentJoin : %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) agentInit(bindAddrOrInterface string) error {
|
||||
if !c.isAgent() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// sort the keys by lamport time
|
||||
sort.Sort(ByTime(keys))
|
||||
sort.Sort(ByTime(c.keys))
|
||||
|
||||
gossipkey := [][]byte{}
|
||||
for _, key := range keys {
|
||||
for _, key := range c.keys {
|
||||
if key.Subsystem == "networking:gossip" {
|
||||
gossipkey = append(gossipkey, key.Key)
|
||||
}
|
||||
|
@ -160,7 +203,6 @@ func (c *controller) agentInit(bindAddrOrInterface string, keys []*types.Encrypt
|
|||
bindAddr: bindAddr,
|
||||
epTblCancel: cancel,
|
||||
driverCancelFuncs: make(map[string][]func()),
|
||||
keys: keys,
|
||||
}
|
||||
|
||||
go c.handleTableEvents(ch, c.handleEpTableEvent)
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package cluster
|
||||
|
||||
import "github.com/docker/libnetwork/types"
|
||||
|
||||
// Provider provides clustering config details
|
||||
type Provider interface {
|
||||
IsManager() bool
|
||||
|
@ -9,6 +7,4 @@ type Provider interface {
|
|||
GetListenAddress() string
|
||||
GetRemoteAddress() string
|
||||
ListenClusterEvents() <-chan struct{}
|
||||
GetNetworkKeys() []*types.EncryptionKey
|
||||
SetNetworkKeys([]*types.EncryptionKey)
|
||||
}
|
||||
|
|
|
@ -117,6 +117,9 @@ type NetworkController interface {
|
|||
|
||||
// Wait for agent initialization complete in libnetwork controller
|
||||
AgentInitWait()
|
||||
|
||||
// SetKeys configures the encryption key for gossip and overlay data path
|
||||
SetKeys(keys []*types.EncryptionKey) error
|
||||
}
|
||||
|
||||
// NetworkWalker is a client provided function which will be used to walk the Networks.
|
||||
|
@ -130,23 +133,25 @@ type SandboxWalker func(sb Sandbox) bool
|
|||
type sandboxTable map[string]*sandbox
|
||||
|
||||
type controller struct {
|
||||
id string
|
||||
drvRegistry *drvregistry.DrvRegistry
|
||||
sandboxes sandboxTable
|
||||
cfg *config.Config
|
||||
stores []datastore.DataStore
|
||||
discovery hostdiscovery.HostDiscovery
|
||||
extKeyListener net.Listener
|
||||
watchCh chan *endpoint
|
||||
unWatchCh chan *endpoint
|
||||
svcRecords map[string]svcInfo
|
||||
nmap map[string]*netWatch
|
||||
serviceBindings map[string]*service
|
||||
defOsSbox osl.Sandbox
|
||||
ingressSandbox *sandbox
|
||||
sboxOnce sync.Once
|
||||
agent *agent
|
||||
agentInitDone chan struct{}
|
||||
id string
|
||||
drvRegistry *drvregistry.DrvRegistry
|
||||
sandboxes sandboxTable
|
||||
cfg *config.Config
|
||||
stores []datastore.DataStore
|
||||
discovery hostdiscovery.HostDiscovery
|
||||
extKeyListener net.Listener
|
||||
watchCh chan *endpoint
|
||||
unWatchCh chan *endpoint
|
||||
svcRecords map[string]svcInfo
|
||||
nmap map[string]*netWatch
|
||||
serviceBindings map[string]*service
|
||||
defOsSbox osl.Sandbox
|
||||
ingressSandbox *sandbox
|
||||
sboxOnce sync.Once
|
||||
agent *agent
|
||||
agentInitDone chan struct{}
|
||||
keys []*types.EncryptionKey
|
||||
clusterConfigAvailable bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -220,55 +225,38 @@ func isValidClusteringIP(addr string) bool {
|
|||
return addr != "" && !net.ParseIP(addr).IsLoopback() && !net.ParseIP(addr).IsUnspecified()
|
||||
}
|
||||
|
||||
// libnetwork side of agent depends on the keys. On the first receipt of
|
||||
// keys setup the agent. For subsequent key set handle the key change
|
||||
func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
|
||||
if len(c.keys) == 0 {
|
||||
c.keys = keys
|
||||
if c.agent != nil {
|
||||
return (fmt.Errorf("libnetwork agent setup without keys"))
|
||||
}
|
||||
if c.clusterConfigAvailable {
|
||||
return c.agentSetup()
|
||||
}
|
||||
log.Debugf("received encryption keys before cluster config")
|
||||
return nil
|
||||
}
|
||||
if c.agent == nil {
|
||||
c.keys = keys
|
||||
return nil
|
||||
}
|
||||
return c.handleKeyChange(keys)
|
||||
}
|
||||
|
||||
func (c *controller) clusterAgentInit() {
|
||||
clusterProvider := c.cfg.Daemon.ClusterProvider
|
||||
for {
|
||||
select {
|
||||
case <-clusterProvider.ListenClusterEvents():
|
||||
c.clusterConfigAvailable = true
|
||||
if !c.isDistributedControl() {
|
||||
keys := clusterProvider.GetNetworkKeys()
|
||||
// If the agent is already setup this could be a key change notificaiton
|
||||
if c.agent != nil {
|
||||
c.agentHandleKeys(keys)
|
||||
}
|
||||
|
||||
bindAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())
|
||||
remote := clusterProvider.GetRemoteAddress()
|
||||
remoteAddr, _, _ := net.SplitHostPort(remote)
|
||||
|
||||
// Determine the BindAddress from RemoteAddress or through best-effort routing
|
||||
if !isValidClusteringIP(bindAddr) {
|
||||
if !isValidClusteringIP(remoteAddr) {
|
||||
remote = "8.8.8.8:53"
|
||||
}
|
||||
conn, err := net.Dial("udp", remote)
|
||||
if err == nil {
|
||||
bindHostPort := conn.LocalAddr().String()
|
||||
bindAddr, _, _ = net.SplitHostPort(bindHostPort)
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if bindAddr != "" && len(keys) > 0 && c.agent == nil {
|
||||
if err := c.agentInit(bindAddr, keys); err != nil {
|
||||
log.Errorf("Error in agentInit : %v", err)
|
||||
} else {
|
||||
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
|
||||
if capability.DataScope == datastore.GlobalScope {
|
||||
c.agentDriverNotify(driver)
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
if c.agent != nil {
|
||||
close(c.agentInitDone)
|
||||
}
|
||||
}
|
||||
}
|
||||
if remoteAddr != "" {
|
||||
if err := c.agentJoin(remoteAddr); err != nil {
|
||||
log.Errorf("Error in agentJoin : %v", err)
|
||||
}
|
||||
// agent initialization needs encyrption keys and bind/remote IP which
|
||||
// comes from the daemon cluster events
|
||||
if len(c.keys) > 0 {
|
||||
c.agentSetup()
|
||||
}
|
||||
} else {
|
||||
c.agentInitDone = make(chan struct{})
|
||||
|
|
Loading…
Reference in a new issue