[1.13.x] Cherry-pick SwarmKit update to 0af40501a9cc98cd3e9425d2e4246dd3eff5526e

This fix cherry-pick SwarmKit update to 0af40501a9cc98cd3e9425d2e4246dd3eff5526e
for branch 1.13.x. The following has been added:
- [docker/swarmkit#1909]: raft: Disable address change detection
- [docker/swarmkit#1910]: Fix issue in service update of published ports in host mode

This fix fixes #30199 in docker.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
This commit is contained in:
Yong Tang 2017-01-30 15:22:48 -08:00
parent 2527cfcc49
commit 1e0179589d
5 changed files with 147 additions and 12 deletions

View file

@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 335561b66a44bf214224afe879e4368204e7fa45
github.com/docker/swarmkit 0af40501a9cc98cd3e9425d2e4246dd3eff5526e
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -371,12 +371,15 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
s := v.Service.Copy()
if nc.nwkAllocator.IsServiceAllocated(s) {
break
}
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
break
if nc.nwkAllocator.PortsAllocatedInHostPublishMode(s) {
break
}
updatePortsInHostPublishMode(s)
} else {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
break
}
}
if _, err := a.store.Batch(func(batch *store.Batch) error {
@ -641,6 +644,36 @@ func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch,
return nil
}
// This function prepares the service object for being updated when the change regards
// the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
// to the current ingress mode runtime state ports plus the newly configured publish mode ports,
// so that the service allocation invoked on this new service object will trigger the deallocation
// of any old publish mode port and allocation of any new one.
func updatePortsInHostPublishMode(s *api.Service) {
if s.Endpoint != nil {
var portConfigs []*api.PortConfig
for _, portConfig := range s.Endpoint.Ports {
if portConfig.PublishMode == api.PublishModeIngress {
portConfigs = append(portConfigs, portConfig)
}
}
s.Endpoint.Ports = portConfigs
}
if s.Spec.Endpoint != nil {
if s.Endpoint == nil {
s.Endpoint = &api.Endpoint{}
}
for _, portConfig := range s.Spec.Endpoint.Ports {
if portConfig.PublishMode == api.PublishModeIngress {
continue
}
s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
}
s.Endpoint.Spec = s.Spec.Endpoint.Copy()
}
}
func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
nc := a.netCtx

View file

@ -283,6 +283,12 @@ func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool {
return true
}
// PortsAllocatedInHostPublishMode returns if the passed service has its published ports in
// host (non ingress) mode allocated
func (na *NetworkAllocator) PortsAllocatedInHostPublishMode(s *api.Service) bool {
return na.portAllocator.portsAllocatedInHostPublishMode(s)
}
// IsServiceAllocated returns if the passed service has its network resources allocated or not.
func (na *NetworkAllocator) IsServiceAllocated(s *api.Service) bool {
// If endpoint mode is VIP and allocator does not have the

View file

@ -38,6 +38,71 @@ type portSpace struct {
dynamicPortSpace *idm.Idm
}
type allocatedPorts map[api.PortConfig]map[uint32]*api.PortConfig
// addState add the state of an allocated port to the collection.
// `allocatedPorts` is a map of portKey:publishedPort:portState.
// In case the value of the portKey is missing, the map
// publishedPort:portState is created automatically
func (ps allocatedPorts) addState(p *api.PortConfig) {
portKey := getPortConfigKey(p)
if _, ok := ps[portKey]; !ok {
ps[portKey] = make(map[uint32]*api.PortConfig)
}
ps[portKey][p.PublishedPort] = p
}
// delState delete the state of an allocated port from the collection.
// `allocatedPorts` is a map of portKey:publishedPort:portState.
//
// If publishedPort is non-zero, then it is user defined. We will try to
// remove the portState from `allocatedPorts` directly and return
// the portState (or nil if no portState exists)
//
// If publishedPort is zero, then it is dynamically allocated. We will try
// to remove the portState from `allocatedPorts`, as long as there is
// a portState associated with a non-zero publishedPort.
// Note multiple dynamically allocated ports might exists. In this case,
// we will remove only at a time so both allocated ports are tracked.
//
// Note because of the potential co-existence of user-defined and dynamically
// allocated ports, delState has to be called for user-defined port first.
// dynamically allocated ports should be removed later.
func (ps allocatedPorts) delState(p *api.PortConfig) *api.PortConfig {
portKey := getPortConfigKey(p)
portStateMap, ok := ps[portKey]
// If name, port, protocol values don't match then we
// are not allocated.
if !ok {
return nil
}
if p.PublishedPort != 0 {
// If SwarmPort was user defined but the port state
// SwarmPort doesn't match we are not allocated.
v := portStateMap[p.PublishedPort]
// Delete state from allocatedPorts
delete(portStateMap, p.PublishedPort)
return v
}
// If PublishedPort == 0 and we don't have non-zero port
// then we are not allocated
for publishedPort, v := range portStateMap {
if publishedPort != 0 {
// Delete state from allocatedPorts
delete(portStateMap, publishedPort)
return v
}
}
return nil
}
func newPortAllocator() (*portAllocator, error) {
portSpaces := make(map[api.PortConfig_Protocol]*portSpace)
for _, protocol := range []api.PortConfig_Protocol{api.ProtocolTCP, api.ProtocolUDP} {
@ -191,6 +256,33 @@ func (pa *portAllocator) serviceDeallocatePorts(s *api.Service) {
s.Endpoint.Ports = nil
}
func (pa *portAllocator) portsAllocatedInHostPublishMode(s *api.Service) bool {
if s.Endpoint == nil && s.Spec.Endpoint == nil {
return true
}
portStates := allocatedPorts{}
if s.Endpoint != nil {
for _, portState := range s.Endpoint.Ports {
if portState.PublishMode == api.PublishModeHost {
portStates.addState(portState)
}
}
}
if s.Spec.Endpoint != nil {
for _, portConfig := range s.Spec.Endpoint.Ports {
if portConfig.PublishMode == api.PublishModeHost {
if portStates.delState(portConfig) == nil {
return false
}
}
}
}
return true
}
func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
// If service has no user-defined endpoint and allocated endpoint,
// we assume it is allocated and return true.

View file

@ -1419,11 +1419,15 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.
officialHost, officialPort, _ := net.SplitHostPort(conn.Addr)
if officialHost != lastSeenHost {
reconnectAddr := net.JoinHostPort(lastSeenHost, officialPort)
log.G(ctx).Warningf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr)
if err := n.handleAddressChange(ctx, conn, reconnectAddr); err != nil {
log.G(ctx).WithError(err).Error("failed to hande address change")
}
return
log.G(ctx).Debugf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr)
// TODO(aaronl): Address changes are temporarily disabled.
// See https://github.com/docker/docker/issues/30455.
// This should be reenabled in the future with additional
// safeguards (perhaps storing multiple addresses per node).
//if err := n.handleAddressChange(ctx, conn, reconnectAddr); err != nil {
// log.G(ctx).WithError(err).Error("failed to hande address change")
//}
//return
}
}