|
@@ -6,6 +6,7 @@ import (
|
|
"runtime"
|
|
"runtime"
|
|
"sort"
|
|
"sort"
|
|
"strings"
|
|
"strings"
|
|
|
|
+ "sync"
|
|
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/Sirupsen/logrus"
|
|
apierrors "github.com/docker/docker/api/errors"
|
|
apierrors "github.com/docker/docker/api/errors"
|
|
@@ -99,15 +100,40 @@ func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
|
|
return daemon.netController.Networks()
|
|
return daemon.netController.Networks()
|
|
}
|
|
}
|
|
|
|
|
|
-func isIngressNetwork(name string) bool {
|
|
|
|
- return name == "ingress"
|
|
|
|
|
|
+type ingressJob struct {
|
|
|
|
+ create *clustertypes.NetworkCreateRequest
|
|
|
|
+ ip net.IP
|
|
}
|
|
}
|
|
|
|
|
|
-var ingressChan = make(chan struct{}, 1)
|
|
|
|
|
|
+var (
|
|
|
|
+ ingressWorkerOnce sync.Once
|
|
|
|
+ ingressJobsChannel chan *ingressJob
|
|
|
|
+ ingressID string
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+func (daemon *Daemon) startIngressWorker() {
|
|
|
|
+ ingressJobsChannel = make(chan *ingressJob, 100)
|
|
|
|
+ go func() {
|
|
|
|
+ for {
|
|
|
|
+ select {
|
|
|
|
+ case r := <-ingressJobsChannel:
|
|
|
|
+ if r.create != nil {
|
|
|
|
+ daemon.setupIngress(r.create, r.ip, ingressID)
|
|
|
|
+ ingressID = r.create.ID
|
|
|
|
+ } else {
|
|
|
|
+ daemon.releaseIngress(ingressID)
|
|
|
|
+ ingressID = ""
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+}
|
|
|
|
|
|
-func ingressWait() func() {
|
|
|
|
- ingressChan <- struct{}{}
|
|
|
|
- return func() { <-ingressChan }
|
|
|
|
|
|
+// enqueueIngressJob adds a ingress add/rm request to the worker queue.
|
|
|
|
+// It guarantees the worker is started.
|
|
|
|
+func (daemon *Daemon) enqueueIngressJob(job *ingressJob) {
|
|
|
|
+ ingressWorkerOnce.Do(daemon.startIngressWorker)
|
|
|
|
+ ingressJobsChannel <- job
|
|
}
|
|
}
|
|
|
|
|
|
// SetupIngress setups ingress networking.
|
|
// SetupIngress setups ingress networking.
|
|
@@ -116,72 +142,93 @@ func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nod
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
+ daemon.enqueueIngressJob(&ingressJob{&create, ip})
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
|
|
- go func() {
|
|
|
|
- controller := daemon.netController
|
|
|
|
- controller.AgentInitWait()
|
|
|
|
|
|
+// ReleaseIngress releases the ingress networking.
|
|
|
|
+func (daemon *Daemon) ReleaseIngress() error {
|
|
|
|
+ daemon.enqueueIngressJob(&ingressJob{nil, nil})
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
|
|
- if n, err := daemon.GetNetworkByName(create.Name); err == nil && n != nil && n.ID() != create.ID {
|
|
|
|
- if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
|
|
|
|
- logrus.Errorf("Failed to delete stale ingress sandbox: %v", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) {
|
|
|
|
+ controller := daemon.netController
|
|
|
|
+ controller.AgentInitWait()
|
|
|
|
|
|
- // Cleanup any stale endpoints that might be left over during previous iterations
|
|
|
|
- epList := n.Endpoints()
|
|
|
|
- for _, ep := range epList {
|
|
|
|
- if err := ep.Delete(true); err != nil {
|
|
|
|
- logrus.Errorf("Failed to delete endpoint %s (%s): %v", ep.Name(), ep.ID(), err)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ if staleID != "" && staleID != create.ID {
|
|
|
|
+ daemon.releaseIngress(staleID)
|
|
|
|
+ }
|
|
|
|
|
|
- if err := n.Delete(); err != nil {
|
|
|
|
- logrus.Errorf("Failed to delete stale ingress network %s: %v", n.ID(), err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ if _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true); err != nil {
|
|
|
|
+ // If it is any other error other than already
|
|
|
|
+ // exists error log error and return.
|
|
|
|
+ if _, ok := err.(libnetwork.NetworkNameError); !ok {
|
|
|
|
+ logrus.Errorf("Failed creating ingress network: %v", err)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
|
|
+ // Otherwise continue down the call to create or recreate sandbox.
|
|
|
|
+ }
|
|
|
|
|
|
- if _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true); err != nil {
|
|
|
|
- // If it is any other error other than already
|
|
|
|
- // exists error log error and return.
|
|
|
|
- if _, ok := err.(libnetwork.NetworkNameError); !ok {
|
|
|
|
- logrus.Errorf("Failed creating ingress network: %v", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ n, err := daemon.GetNetworkByID(create.ID)
|
|
|
|
+ if err != nil {
|
|
|
|
+ logrus.Errorf("Failed getting ingress network by id after creating: %v", err)
|
|
|
|
+ }
|
|
|
|
|
|
- // Otherwise continue down the call to create or recreate sandbox.
|
|
|
|
|
|
+ sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress())
|
|
|
|
+ if err != nil {
|
|
|
|
+ if _, ok := err.(networktypes.ForbiddenError); !ok {
|
|
|
|
+ logrus.Errorf("Failed creating ingress sandbox: %v", err)
|
|
}
|
|
}
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- n, err := daemon.GetNetworkByID(create.ID)
|
|
|
|
- if err != nil {
|
|
|
|
- logrus.Errorf("Failed getting ingress network by id after creating: %v", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil))
|
|
|
|
+ if err != nil {
|
|
|
|
+ logrus.Errorf("Failed creating ingress endpoint: %v", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress())
|
|
|
|
- if err != nil {
|
|
|
|
- if _, ok := err.(networktypes.ForbiddenError); !ok {
|
|
|
|
- logrus.Errorf("Failed creating ingress sandbox: %v", err)
|
|
|
|
- }
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ if err := ep.Join(sb, nil); err != nil {
|
|
|
|
+ logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil))
|
|
|
|
- if err != nil {
|
|
|
|
- logrus.Errorf("Failed creating ingress endpoint: %v", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ if err := sb.EnableService(); err != nil {
|
|
|
|
+ logrus.Errorf("Failed enabling service for ingress sandbox")
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
- if err := ep.Join(sb, nil); err != nil {
|
|
|
|
- logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
|
|
|
|
- }
|
|
|
|
|
|
+func (daemon *Daemon) releaseIngress(id string) {
|
|
|
|
+ controller := daemon.netController
|
|
|
|
|
|
- if err := sb.EnableService(); err != nil {
|
|
|
|
- logrus.WithError(err).Error("Failed enabling service for ingress sandbox")
|
|
|
|
|
|
+ if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
|
|
|
|
+ logrus.Errorf("Failed to delete ingress sandbox: %v", err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if id == "" {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ n, err := controller.NetworkByID(id)
|
|
|
|
+ if err != nil {
|
|
|
|
+ logrus.Errorf("failed to retrieve ingress network %s: %v", id, err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, ep := range n.Endpoints() {
|
|
|
|
+ if err := ep.Delete(true); err != nil {
|
|
|
|
+ logrus.Errorf("Failed to delete endpoint %s (%s): %v", ep.Name(), ep.ID(), err)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
- }()
|
|
|
|
|
|
+ }
|
|
|
|
|
|
- return nil
|
|
|
|
|
|
+ if err := n.Delete(); err != nil {
|
|
|
|
+ logrus.Errorf("Failed to delete ingress network %s: %v", n.ID(), err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return
|
|
}
|
|
}
|
|
|
|
|
|
// SetNetworkBootstrapKeys sets the bootstrap keys.
|
|
// SetNetworkBootstrapKeys sets the bootstrap keys.
|
|
@@ -228,13 +275,6 @@ func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.N
|
|
}
|
|
}
|
|
|
|
|
|
func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) {
|
|
func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) {
|
|
- // If there is a pending ingress network creation wait here
|
|
|
|
- // since ingress network creation can happen via node download
|
|
|
|
- // from manager or task download.
|
|
|
|
- if isIngressNetwork(create.Name) {
|
|
|
|
- defer ingressWait()()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
if runconfig.IsPreDefinedNetwork(create.Name) && !agent {
|
|
if runconfig.IsPreDefinedNetwork(create.Name) && !agent {
|
|
err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name)
|
|
err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name)
|
|
return nil, apierrors.NewRequestForbiddenError(err)
|
|
return nil, apierrors.NewRequestForbiddenError(err)
|
|
@@ -267,6 +307,7 @@ func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string
|
|
libnetwork.NetworkOptionDriverOpts(create.Options),
|
|
libnetwork.NetworkOptionDriverOpts(create.Options),
|
|
libnetwork.NetworkOptionLabels(create.Labels),
|
|
libnetwork.NetworkOptionLabels(create.Labels),
|
|
libnetwork.NetworkOptionAttachable(create.Attachable),
|
|
libnetwork.NetworkOptionAttachable(create.Attachable),
|
|
|
|
+ libnetwork.NetworkOptionIngress(create.Ingress),
|
|
}
|
|
}
|
|
|
|
|
|
if create.IPAM != nil {
|
|
if create.IPAM != nil {
|
|
@@ -286,10 +327,6 @@ func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string
|
|
nwOptions = append(nwOptions, libnetwork.NetworkOptionPersist(false))
|
|
nwOptions = append(nwOptions, libnetwork.NetworkOptionPersist(false))
|
|
}
|
|
}
|
|
|
|
|
|
- if isIngressNetwork(create.Name) {
|
|
|
|
- nwOptions = append(nwOptions, libnetwork.NetworkOptionIngress())
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
n, err := c.NewNetwork(driver, create.Name, id, nwOptions...)
|
|
n, err := c.NewNetwork(driver, create.Name, id, nwOptions...)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|