Vendoring Libnetwork caf22bd9a6a53dfe91b0266274155bc69235e8ed
* fixes https://github.com/docker/docker/issues/23622
* fixes a memory leak issue with bulk sync
* fixes external DNS resolution issue after live restore

Signed-off-by: Madhu Venugopal <madhu@docker.com>
parent 312749435b
commit 7f2f6ed0d6

9 changed files with 120 additions and 60 deletions
@@ -65,7 +65,7 @@ clone git github.com/RackSec/srslog 259aed10dfa74ea2961eddd1d9847619f6e98837
 clone git github.com/imdario/mergo 0.2.1
 
 #get libnetwork packages
-clone git github.com/docker/libnetwork 96d45528599c32354230480a1ebc0657cd4d077f
+clone git github.com/docker/libnetwork caf22bd9a6a53dfe91b0266274155bc69235e8ed
 clone git github.com/docker/go-events 39718a26497694185f8fb58a7d6f31947f3dc42d
 clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
@@ -810,12 +810,13 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s
 	// Create sandbox and process options first. Key generation depends on an option
 	if sb == nil {
 		sb = &sandbox{
-			id:          stringid.GenerateRandomID(),
-			containerID: containerID,
-			endpoints:   epHeap{},
-			epPriority:  map[string]int{},
-			config:      containerConfig{},
-			controller:  c,
+			id:                 stringid.GenerateRandomID(),
+			containerID:        containerID,
+			endpoints:          epHeap{},
+			epPriority:         map[string]int{},
+			populatedEndpoints: map[string]struct{}{},
+			config:             containerConfig{},
+			controller:         c,
 		}
 	}
 	sBox = sb
@@ -588,13 +588,26 @@ func (d *driver) createNetwork(config *networkConfiguration) error {
 	defer osl.InitOSContext()()
 
 	networkList := d.getNetworks()
-	for _, nw := range networkList {
+	for i, nw := range networkList {
 		nw.Lock()
 		nwConfig := nw.config
 		nw.Unlock()
 		if err := nwConfig.Conflicts(config); err != nil {
-			return types.ForbiddenErrorf("cannot create network %s (%s): conflicts with network %s (%s): %s",
-				config.ID, config.BridgeName, nwConfig.ID, nwConfig.BridgeName, err.Error())
+			if config.DefaultBridge {
+				// We encountered and identified a stale default network.
+				// We must delete it as libnetwork is the source of truth.
+				// The default network being created must be the only one.
+				// This can happen only from docker 1.12 onward.
+				logrus.Infof("Removing stale default bridge network %s (%s)", nwConfig.ID, nwConfig.BridgeName)
+				if err := d.DeleteNetwork(nwConfig.ID); err != nil {
+					logrus.Warnf("Failed to remove stale default network: %s (%s): %v. Will remove from store.", nwConfig.ID, nwConfig.BridgeName, err)
+					d.storeDelete(nwConfig)
+				}
+				networkList = append(networkList[:i], networkList[i+1:]...)
+			} else {
+				return types.ForbiddenErrorf("cannot create network %s (%s): conflicts with network %s (%s): %s",
+					config.ID, config.BridgeName, nwConfig.ID, nwConfig.BridgeName, err.Error())
+			}
 		}
 	}
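The stale-default-bridge branch above removes an element from networkList while ranging over it, which stays correct here only because at most one conflicting default network can exist. For comparison, a sketch of the general filter idiom Go code uses to drop entries while iterating (names below are illustrative, not from the commit):

package main

import "fmt"

func main() {
	networks := []string{"stale-default", "user-net-a", "user-net-b"}

	// Reuse the backing array: kept shares storage with networks,
	// so no extra allocation is needed for the surviving entries.
	kept := networks[:0]
	for _, n := range networks {
		if n != "stale-default" { // stand-in for the Conflicts check
			kept = append(kept, n)
		}
	}
	fmt.Println(kept) // [user-net-a user-net-b]
}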
@@ -762,12 +775,6 @@ func (d *driver) DeleteNetwork(nid string) error {
 		return err
 	}
 
-	// Cannot remove network if endpoints are still present
-	if len(n.endpoints) != 0 {
-		err = ActiveEndpointsError(n.id)
-		return err
-	}
-
 	// We only delete the bridge when it's not the default bridge. This keeps the behavior backward compatible.
 	if !config.DefaultBridge {
 		if err := d.nlh.LinkDel(n.bridge.Link); err != nil {
@@ -330,11 +330,15 @@ func (nDB *NetworkDB) bulkSyncTables() {
 	// successfully completed bulk sync in this iteration.
 	updatedNetworks := make([]string, 0, len(networks))
 	for _, nid := range networks {
+		var found bool
 		for _, completedNid := range completed {
 			if nid == completedNid {
-				continue
+				found = true
+				break
 			}
 		}
-		updatedNetworks = append(updatedNetworks, nid)
+
+		if !found {
+			updatedNetworks = append(updatedNetworks, nid)
+		}
 	}
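The found flag fixes a subtle bug in the old loop: continue applied to the inner range over completed, so the unconditional append still ran and already-synced networks were bulk-synced again, defeating the dedup. A sketch of the equivalent labeled-continue idiom (illustrative names, not the commit's code):

package main

import "fmt"

func main() {
	networks := []string{"n1", "n2", "n3"}
	completed := []string{"n2"}

	var updated []string
outer:
	for _, nid := range networks {
		for _, done := range completed {
			if nid == done {
				continue outer // skip networks already synced
			}
		}
		updated = append(updated, nid)
	}
	fmt.Println(updated) // [n1 n3]
}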
@@ -449,8 +453,9 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
 	// Wait on a response only if it is unsolicited.
 	if unsolicited {
 		startTime := time.Now()
+		t := time.NewTimer(30 * time.Second)
 		select {
-		case <-time.After(30 * time.Second):
+		case <-t.C:
 			logrus.Errorf("Bulk sync to node %s timed out", node)
 		case <-ch:
 			nDB.Lock()
@@ -459,6 +464,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
 
 			logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
 		}
+		t.Stop()
 	}
 
 	return nil
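These two hunks are the bulk-sync memory leak named in the commit message: each call to time.After allocates a timer that is not reclaimed until the full 30 seconds elapse, even when the ack arrives immediately, so frequent bulk syncs pile up live timers. A runnable sketch of the pattern (names are illustrative):

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitBulkSyncAck mirrors the change above: a stopped timer is
// released as soon as we return, whereas a time.After channel pins
// its timer for the whole 30s even if the ack wins the select.
func waitBulkSyncAck(ack <-chan struct{}) error {
	t := time.NewTimer(30 * time.Second)
	defer t.Stop() // frees the timer immediately on early return
	select {
	case <-t.C:
		return errors.New("bulk sync timed out")
	case <-ack:
		return nil
	}
}

func main() {
	ack := make(chan struct{})
	close(ack)
	fmt.Println(waitBulkSyncAck(ack)) // <nil>, and no timer left behind
}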
@@ -68,23 +68,24 @@ func (sb *sandbox) processOptions(options ...SandboxOption) {
 type epHeap []*endpoint
 
 type sandbox struct {
-	id            string
-	containerID   string
-	config        containerConfig
-	extDNS        []string
-	osSbox        osl.Sandbox
-	controller    *controller
-	resolver      Resolver
-	resolverOnce  sync.Once
-	refCnt        int
-	endpoints     epHeap
-	epPriority    map[string]int
-	joinLeaveDone chan struct{}
-	dbIndex       uint64
-	dbExists      bool
-	isStub        bool
-	inDelete      bool
-	ingress       bool
+	id                 string
+	containerID        string
+	config             containerConfig
+	extDNS             []string
+	osSbox             osl.Sandbox
+	controller         *controller
+	resolver           Resolver
+	resolverOnce       sync.Once
+	refCnt             int
+	endpoints          epHeap
+	epPriority         map[string]int
+	populatedEndpoints map[string]struct{}
+	joinLeaveDone      chan struct{}
+	dbIndex            uint64
+	dbExists           bool
+	isStub             bool
+	inDelete           bool
+	ingress            bool
 	sync.Mutex
 }
@@ -728,7 +729,7 @@ func (sb *sandbox) restoreOslSandbox() error {
 			}
 		}
 		if ep.needResolver() {
-			sb.startResolver()
+			sb.startResolver(true)
 		}
 	}
@@ -761,7 +762,7 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
 	ep.Unlock()
 
 	if ep.needResolver() {
-		sb.startResolver()
+		sb.startResolver(false)
 	}
 
 	if i != nil && i.srcName != "" {
@@ -798,6 +799,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
 		}
 	}
 
+	// Make sure to add the endpoint to the populated endpoint set
+	// before populating loadbalancers.
+	sb.Lock()
+	sb.populatedEndpoints[ep.ID()] = struct{}{}
+	sb.Unlock()
+
 	// Populate load balancer only after updating all the other
 	// information including gateway and other routes so that
 	// loadbalancers are populated all the network state is in
@@ -830,6 +837,7 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
 		releaseOSSboxResources(osSbox, ep)
 	}
 
+	delete(sb.populatedEndpoints, ep.ID())
 	sb.Lock()
 	if len(sb.endpoints) == 0 {
 		// sb.endpoints should never be empty and this is an unexpected error condition
@@ -879,6 +887,13 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
 	return nil
 }
 
+func (sb *sandbox) isEndpointPopulated(ep *endpoint) bool {
+	sb.Lock()
+	_, ok := sb.populatedEndpoints[ep.ID()]
+	sb.Unlock()
+	return ok
+}
+
 // joinLeaveStart waits to ensure there are no joins or leaves in progress and
 // marks this join/leave in progress without race
 func (sb *sandbox) joinLeaveStart() {
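isEndpointPopulated rounds out the new populatedEndpoints bookkeeping: a set implemented as map[string]struct{}, always accessed under the sandbox mutex because sandboxes are touched from multiple goroutines. A self-contained sketch of that idiom (types are illustrative):

package main

import (
	"fmt"
	"sync"
)

// endpointSet mimics the populatedEndpoints pattern: struct{} values
// cost no memory, and every access goes through the lock.
type endpointSet struct {
	mu  sync.Mutex
	set map[string]struct{}
}

func newEndpointSet() *endpointSet {
	return &endpointSet{set: map[string]struct{}{}}
}

func (s *endpointSet) add(id string)    { s.mu.Lock(); s.set[id] = struct{}{}; s.mu.Unlock() }
func (s *endpointSet) remove(id string) { s.mu.Lock(); delete(s.set, id); s.mu.Unlock() }
func (s *endpointSet) has(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.set[id]
	return ok
}

func main() {
	s := newEndpointSet()
	s.add("ep1")
	fmt.Println(s.has("ep1"), s.has("ep2")) // true false
}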
@@ -21,7 +21,7 @@ const (
 	filePerm = 0644
 )
 
-func (sb *sandbox) startResolver() {
+func (sb *sandbox) startResolver(restore bool) {
 	sb.resolverOnce.Do(func() {
 		var err error
 		sb.resolver = NewResolver(sb)
@@ -31,10 +31,16 @@ func (sb *sandbox) startResolver(restore bool) {
 			}
 		}()
 
-		err = sb.rebuildDNS()
-		if err != nil {
-			log.Errorf("Updating resolv.conf failed for container %s, %q", sb.ContainerID(), err)
-			return
+		// In the case of live restore the container is already running
+		// with the right resolv.conf contents created earlier. Just
+		// update the external DNS servers from the restored sandbox
+		// for the embedded server to use.
+		if !restore {
+			err = sb.rebuildDNS()
+			if err != nil {
+				log.Errorf("Updating resolv.conf failed for container %s, %q", sb.ContainerID(), err)
+				return
+			}
 		}
 		sb.resolver.SetExtServers(sb.extDNS)
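The restore flag threads the live-restore distinction into resolver startup: resolverOnce still guarantees a single start per sandbox, and the flag only decides whether resolv.conf is rewritten, since after a live restore the file inside the running container is already correct and only the embedded server needs the external DNS list. A sketch of the control flow (names are illustrative):

package main

import (
	"fmt"
	"sync"
)

// sandboxStub stands in for the real sandbox: whichever caller fires
// first (restore path or normal path) decides whether resolv.conf is
// rebuilt; later calls are no-ops thanks to the Once.
type sandboxStub struct {
	once   sync.Once
	extDNS []string
}

func (sb *sandboxStub) startResolver(restore bool) {
	sb.once.Do(func() {
		if !restore {
			fmt.Println("rebuilding resolv.conf") // stands in for sb.rebuildDNS()
		}
		fmt.Println("ext DNS servers:", sb.extDNS) // stands in for SetExtServers
	})
}

func main() {
	sb := &sandboxStub{extDNS: []string{"10.0.0.2"}}
	sb.startResolver(true)  // live-restore path: keep the existing file
	sb.startResolver(false) // ignored: the Once already fired
}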
@@ -8,7 +8,7 @@ import (
 
 // Stub implementations for DNS related functions
 
-func (sb *sandbox) startResolver() {
+func (sb *sandbox) startResolver(bool) {
 }
 
 func (sb *sandbox) setupResolutionFiles() error {
@@ -27,6 +27,7 @@ type sbState struct {
 	dbExists   bool
 	Eps        []epState
 	EpPriority map[string]int
+	ExtDNS     []string
 }
 
 func (sbs *sbState) Key() []string {
@@ -113,6 +114,10 @@ func (sbs *sbState) CopyTo(o datastore.KVObject) error {
 		dstSbs.Eps = append(dstSbs.Eps, eps)
 	}
 
+	for _, dns := range sbs.ExtDNS {
+		dstSbs.ExtDNS = append(dstSbs.ExtDNS, dns)
+	}
+
 	return nil
 }
@@ -126,6 +131,7 @@ func (sb *sandbox) storeUpdate() error {
 		ID:         sb.id,
 		Cid:        sb.containerID,
 		EpPriority: sb.epPriority,
+		ExtDNS:     sb.extDNS,
 	}
 
 retry:
@@ -191,13 +197,15 @@ func (c *controller) sandboxCleanup(activeSandboxes map[string]interface{}) {
 		sbs := kvo.(*sbState)
 
 		sb := &sandbox{
-			id:          sbs.ID,
-			controller:  sbs.c,
-			containerID: sbs.Cid,
-			endpoints:   epHeap{},
-			dbIndex:     sbs.dbIndex,
-			isStub:      true,
-			dbExists:    true,
+			id:                 sbs.ID,
+			controller:         sbs.c,
+			containerID:        sbs.Cid,
+			endpoints:          epHeap{},
+			populatedEndpoints: map[string]struct{}{},
+			dbIndex:            sbs.dbIndex,
+			isStub:             true,
+			dbExists:           true,
+			extDNS:             sbs.ExtDNS,
 		}
 
 		msg := " for cleanup"
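The ExtDNS additions in this file are the persistence half of the live-restore DNS fix: the external DNS servers now travel with the stored sandbox state, so sandboxCleanup can hand them back to the restored sandbox. An abridged sketch of the round trip (the real sbState has more fields, and the store encoding may differ):

package main

import (
	"encoding/json"
	"fmt"
)

// Any field missing from the serialized form is lost across a daemon
// restart, which is exactly how external DNS servers were being
// forgotten during live restore before ExtDNS was added.
type sbState struct {
	ID     string   `json:"id"`
	Cid    string   `json:"cid"`
	ExtDNS []string `json:"extDNS"`
}

func main() {
	saved, _ := json.Marshal(sbState{ID: "sb1", Cid: "c1", ExtDNS: []string{"8.8.8.8"}})

	var restored sbState
	_ = json.Unmarshal(saved, &restored)
	fmt.Println(restored.ExtDNS) // [8.8.8.8] survives the round trip
}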
@@ -184,14 +184,20 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
 func (n *network) connectedLoadbalancers() []*loadBalancer {
 	c := n.getController()
 
+	serviceBindings := make([]*service, 0, len(c.serviceBindings))
 	c.Lock()
-	defer c.Unlock()
+	for _, s := range c.serviceBindings {
+		serviceBindings = append(serviceBindings, s)
+	}
+	c.Unlock()
 
 	var lbs []*loadBalancer
-	for _, s := range c.serviceBindings {
+	for _, s := range serviceBindings {
 		s.Lock()
 		if lb, ok := s.loadBalancers[n.ID()]; ok {
 			lbs = append(lbs, lb)
 		}
 		s.Unlock()
 	}
 
 	return lbs
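Replacing defer c.Unlock() with an explicit snapshot shortens the controller lock's scope: the per-service locks taken in the second loop are no longer acquired while the controller lock is held, so two locks are never held at once. A sketch of the copy-then-iterate pattern (types are illustrative):

package main

import (
	"fmt"
	"sync"
)

type service struct{ name string }

var (
	mu       sync.Mutex
	bindings = map[string]*service{"b1": {"svc-a"}, "b2": {"svc-b"}}
)

// snapshotBindings holds the lock only long enough to copy the map
// values; callers then iterate the copy without any outer lock held.
func snapshotBindings() []*service {
	mu.Lock()
	snap := make([]*service, 0, len(bindings))
	for _, s := range bindings {
		snap = append(snap, s)
	}
	mu.Unlock()
	return snap
}

func main() {
	for _, s := range snapshotBindings() {
		fmt.Println(s.name) // lock-free iteration over the copy
	}
}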
@@ -229,12 +235,14 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 			continue
 		}
 
+		lb.service.Lock()
 		addService := true
 		for _, ip := range lb.backEnds {
 			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
 				eIP, gwIP, addService)
 			addService = false
 		}
+		lb.service.Unlock()
 	}
 }
@@ -245,6 +253,10 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 	n.WalkEndpoints(func(e Endpoint) bool {
 		ep := e.(*endpoint)
 		if sb, ok := ep.getSandbox(); ok {
+			if !sb.isEndpointPopulated(ep) {
+				return false
+			}
+
 			var gwIP net.IP
 			if ep := sb.getGatewayEndpoint(); ep != nil {
 				gwIP = ep.Iface().Address().IP
@@ -264,6 +276,10 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por
 	n.WalkEndpoints(func(e Endpoint) bool {
 		ep := e.(*endpoint)
 		if sb, ok := ep.getSandbox(); ok {
+			if !sb.isEndpointPopulated(ep) {
+				return false
+			}
+
 			var gwIP net.IP
 			if ep := sb.getGatewayEndpoint(); ep != nil {
 				gwIP = ep.Iface().Address().IP
@@ -356,15 +372,13 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 	}
 
 	if err := i.DelDestination(s, d); err != nil {
-		logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
-		return
+		logrus.Infof("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
 	}
 
 	if rmService {
 		s.SchedName = ipvs.RoundRobin
 		if err := i.DelService(s); err != nil {
-			logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
-			return
+			logrus.Errorf("Failed to delete service for vip %s fwmark %d: %v", vip, fwMark, err)
 		}
 
 		var iPorts []*PortConfig
@@ -372,13 +386,11 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 		iPorts = ingressPorts
 		if err := programIngress(gwIP, iPorts, true); err != nil {
 			logrus.Errorf("Failed to delete ingress: %v", err)
-			return
 		}
 	}
 
 	if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
 		logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
-		return
 	}
 }
@@ -454,12 +466,17 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
 		rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d",
 			addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
 		if err := iptables.RawCombinedOutput(rule...); err != nil {
-			return fmt.Errorf("setting up rule failed, %v: %v", rule, err)
+			errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
+			if !isDelete {
+				return fmt.Errorf("%s", errStr)
+			}
+
+			logrus.Infof("%s", errStr)
 		}
 	}
 
 	if err := plumbProxy(iPort, isDelete); err != nil {
-		return fmt.Errorf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
+		logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
 	}
 }
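This hunk, together with the dropped returns in rmLBBackend above, moves deletion onto a best-effort path: a failed iptables add still aborts, but a failed delete during teardown is only logged so the remaining cleanup can proceed. A sketch of that policy (runRule is a hypothetical stand-in for the iptables call):

package main

import (
	"errors"
	"fmt"
	"log"
)

func runRule() error { return errors.New("iptables: no such rule") }

// applyIngressRule fails hard when adding (the rule must exist for the
// service to work) but tolerates failures when deleting, since a
// half-gone rule should not block the rest of the cleanup.
func applyIngressRule(isDelete bool) error {
	if err := runRule(); err != nil {
		if !isDelete {
			return fmt.Errorf("setting up rule failed: %v", err)
		}
		log.Printf("ignoring delete failure: %v", err)
	}
	return nil
}

func main() {
	fmt.Println(applyIngressRule(true)) // <nil>: delete errors are tolerated
}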