Vendor swarmkit @27fbaef4ceed6
Adds a new task runtime to support network attachmemt for plain old docker containers so that they can participate multi-host networks and get access to services. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
parent
426a0af075
commit
39143ef15d
59 changed files with 3284 additions and 1371 deletions
|
@ -72,7 +72,7 @@ clone git github.com/imdario/mergo 0.2.1
|
|||
|
||||
#get libnetwork packages
|
||||
clone git github.com/docker/libnetwork 82fb373e3eaa4e9bbb5b5ac148b0a3a71f80fca6
|
||||
clone git github.com/docker/go-events afb2b9f2c23f33ada1a22b03651775fdc65a5089
|
||||
clone git github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
|
||||
clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
|
||||
clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
|
||||
clone git github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
|
||||
|
@ -85,7 +85,7 @@ clone git github.com/vishvananda/netlink e73bad418fd727ed3a02830b1af1ad0283a1de6
|
|||
clone git github.com/BurntSushi/toml f706d00e3de6abe700c994cdd545a1a4915af060
|
||||
clone git github.com/samuel/go-zookeeper d0e0d8e11f318e000a8cc434616d69e329edc374
|
||||
clone git github.com/deckarep/golang-set ef32fa3046d9f249d399f98ebaf9be944430fd1d
|
||||
clone git github.com/coreos/etcd 06e2338108fdc694349aed923f4a7e45cf0cec1f
|
||||
clone git github.com/coreos/etcd 3a49cbb769ebd8d1dd25abb1e83386e9883a5707
|
||||
clone git github.com/ugorji/go f1f1a805ed361a0e078bb537e4ea78cd37dcf065
|
||||
clone git github.com/hashicorp/consul v0.5.2
|
||||
clone git github.com/boltdb/bolt v1.2.1
|
||||
|
@ -144,11 +144,11 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
|
|||
clone git github.com/docker/containerd 8508d2bec90b96403143a1104cdcbd56f6aeb361
|
||||
|
||||
# cluster
|
||||
clone git github.com/docker/swarmkit 8a761950fb4d9251c335dc6149a8a02756cb3b10
|
||||
clone git github.com/docker/swarmkit 27fbaef4ceed648bb575969ccc9083a6e104a719
|
||||
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
|
||||
clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
|
||||
clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b
|
||||
clone git github.com/google/certificate-transparency 025a5cab06f6a819c455d9fdc9e2a1b6d0982284
|
||||
clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
|
||||
clone git github.com/google/certificate-transparency 0f6e3d1d1ba4d03fdaab7cd716f36255c2e48341
|
||||
clone git golang.org/x/crypto 3fbbcd23f1cb824e69491a5930cfeff09b12f4d2 https://github.com/golang/crypto.git
|
||||
clone git golang.org/x/time a4bde12657593d5e90d0533a3e4fd95e635124cb https://github.com/golang/time.git
|
||||
clone git github.com/mreiferson/go-httpclient 63fe23f7434723dc904c901043af07931f293c47
|
||||
|
|
|
@ -69,3 +69,7 @@ JSON dictionary:
|
|||
or
|
||||
|
||||
{"driver":"postgres","data_source":"postgres://user:password@host/db"}
|
||||
|
||||
or
|
||||
|
||||
{"driver":"mysql","data_source":"user:password@tcp(hostname:3306)/db?parseTime=true"}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/asn1"
|
||||
"encoding/json"
|
||||
|
@ -59,26 +60,35 @@ type AuthRemote struct {
|
|||
AuthKeyName string `json:"auth_key"`
|
||||
}
|
||||
|
||||
// CAConstraint specifies various CA constraints on the signed certificate.
|
||||
// CAConstraint would verify against (and override) the CA
|
||||
// extensions in the given CSR.
|
||||
type CAConstraint struct {
|
||||
IsCA bool `json:"is_ca"`
|
||||
MaxPathLen int `json:"max_path_len"`
|
||||
MaxPathLenZero bool `json:"max_path_len_zero"`
|
||||
}
|
||||
|
||||
// A SigningProfile stores information that the CA needs to store
|
||||
// signature policy.
|
||||
type SigningProfile struct {
|
||||
Usage []string `json:"usages"`
|
||||
IssuerURL []string `json:"issuer_urls"`
|
||||
OCSP string `json:"ocsp_url"`
|
||||
CRL string `json:"crl_url"`
|
||||
CA bool `json:"is_ca"`
|
||||
OCSPNoCheck bool `json:"ocsp_no_check"`
|
||||
ExpiryString string `json:"expiry"`
|
||||
BackdateString string `json:"backdate"`
|
||||
AuthKeyName string `json:"auth_key"`
|
||||
RemoteName string `json:"remote"`
|
||||
NotBefore time.Time `json:"not_before"`
|
||||
NotAfter time.Time `json:"not_after"`
|
||||
NameWhitelistString string `json:"name_whitelist"`
|
||||
AuthRemote AuthRemote `json:"auth_remote"`
|
||||
CTLogServers []string `json:"ct_log_servers"`
|
||||
AllowedExtensions []OID `json:"allowed_extensions"`
|
||||
CertStore string `json:"cert_store"`
|
||||
Usage []string `json:"usages"`
|
||||
IssuerURL []string `json:"issuer_urls"`
|
||||
OCSP string `json:"ocsp_url"`
|
||||
CRL string `json:"crl_url"`
|
||||
CAConstraint CAConstraint `json:"ca_constraint"`
|
||||
OCSPNoCheck bool `json:"ocsp_no_check"`
|
||||
ExpiryString string `json:"expiry"`
|
||||
BackdateString string `json:"backdate"`
|
||||
AuthKeyName string `json:"auth_key"`
|
||||
RemoteName string `json:"remote"`
|
||||
NotBefore time.Time `json:"not_before"`
|
||||
NotAfter time.Time `json:"not_after"`
|
||||
NameWhitelistString string `json:"name_whitelist"`
|
||||
AuthRemote AuthRemote `json:"auth_remote"`
|
||||
CTLogServers []string `json:"ct_log_servers"`
|
||||
AllowedExtensions []OID `json:"allowed_extensions"`
|
||||
CertStore string `json:"cert_store"`
|
||||
|
||||
Policies []CertificatePolicy
|
||||
Expiry time.Duration
|
||||
|
@ -86,6 +96,8 @@ type SigningProfile struct {
|
|||
Provider auth.Provider
|
||||
RemoteProvider auth.Provider
|
||||
RemoteServer string
|
||||
RemoteCAs *x509.CertPool
|
||||
ClientCert *tls.Certificate
|
||||
CSRWhitelist *CSRWhitelist
|
||||
NameWhitelist *regexp.Regexp
|
||||
ExtensionWhitelist map[string]bool
|
||||
|
@ -303,6 +315,44 @@ func (p *Signing) OverrideRemotes(remote string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetClientCertKeyPairFromFile updates the properties to set client certificates for mutual
|
||||
// authenticated TLS remote requests
|
||||
func (p *Signing) SetClientCertKeyPairFromFile(certFile string, keyFile string) error {
|
||||
if certFile != "" && keyFile != "" {
|
||||
cert, err := helpers.LoadClientCertificate(certFile, keyFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, profile := range p.Profiles {
|
||||
profile.ClientCert = cert
|
||||
}
|
||||
p.Default.ClientCert = cert
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetRemoteCAsFromFile reads root CAs from file and updates the properties to set remote CAs for TLS
|
||||
// remote requests
|
||||
func (p *Signing) SetRemoteCAsFromFile(caFile string) error {
|
||||
if caFile != "" {
|
||||
remoteCAs, err := helpers.LoadPEMCertPool(caFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.SetRemoteCAs(remoteCAs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetRemoteCAs updates the properties to set remote CAs for TLS
|
||||
// remote requests
|
||||
func (p *Signing) SetRemoteCAs(remoteCAs *x509.CertPool) {
|
||||
for _, profile := range p.Profiles {
|
||||
profile.RemoteCAs = remoteCAs
|
||||
}
|
||||
p.Default.RemoteCAs = remoteCAs
|
||||
}
|
||||
|
||||
// NeedsRemoteSigner returns true if one of the profiles has a remote set
|
||||
func (p *Signing) NeedsRemoteSigner() bool {
|
||||
for _, profile := range p.Profiles {
|
||||
|
@ -360,6 +410,11 @@ func (p *SigningProfile) validProfile(isDefault bool) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
if p.AuthRemote.RemoteName == "" && p.AuthRemote.AuthKeyName != "" {
|
||||
log.Debugf("invalid auth remote profile: no remote signer specified")
|
||||
return false
|
||||
}
|
||||
|
||||
if p.RemoteName != "" {
|
||||
log.Debugf("validate remote profile")
|
||||
|
||||
|
@ -375,6 +430,7 @@ func (p *SigningProfile) validProfile(isDefault bool) bool {
|
|||
|
||||
if p.AuthRemote.RemoteName != "" {
|
||||
log.Debugf("invalid remote profile: auth remote is also specified")
|
||||
return false
|
||||
}
|
||||
} else if p.AuthRemote.RemoteName != "" {
|
||||
log.Debugf("validate auth remote profile")
|
||||
|
@ -409,6 +465,43 @@ func (p *SigningProfile) validProfile(isDefault bool) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// This checks if the SigningProfile object contains configurations that are only effective with a local signer
|
||||
// which has access to CA private key.
|
||||
func (p *SigningProfile) hasLocalConfig() bool {
|
||||
if p.Usage != nil ||
|
||||
p.IssuerURL != nil ||
|
||||
p.OCSP != "" ||
|
||||
p.ExpiryString != "" ||
|
||||
p.BackdateString != "" ||
|
||||
p.CAConstraint.IsCA != false ||
|
||||
!p.NotBefore.IsZero() ||
|
||||
!p.NotAfter.IsZero() ||
|
||||
p.NameWhitelistString != "" ||
|
||||
len(p.CTLogServers) != 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// warnSkippedSettings prints a log warning message about skipped settings
|
||||
// in a SigningProfile, usually due to remote signer.
|
||||
func (p *Signing) warnSkippedSettings() {
|
||||
const warningMessage = `The configuration value by "usages", "issuer_urls", "ocsp_url", "crl_url", "ca_constraint", "expiry", "backdate", "not_before", "not_after", "cert_store" and "ct_log_servers" are skipped`
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if (p.Default.RemoteName != "" || p.Default.AuthRemote.RemoteName != "") && p.Default.hasLocalConfig() {
|
||||
log.Warning("default profile points to a remote signer: ", warningMessage)
|
||||
}
|
||||
|
||||
for name, profile := range p.Profiles {
|
||||
if (profile.RemoteName != "" || profile.AuthRemote.RemoteName != "") && profile.hasLocalConfig() {
|
||||
log.Warningf("Profiles[%s] points to a remote signer: %s", name, warningMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Signing codifies the signature configuration policy for a CA.
|
||||
type Signing struct {
|
||||
Profiles map[string]*SigningProfile `json:"profiles"`
|
||||
|
@ -450,6 +543,9 @@ func (p *Signing) Valid() bool {
|
|||
return false
|
||||
}
|
||||
}
|
||||
|
||||
p.warnSkippedSettings()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -149,6 +149,8 @@ const (
|
|||
|
||||
// UnknownProfile indicates that the profile does not exist.
|
||||
UnknownProfile // 54XX
|
||||
|
||||
UnmatchedWhitelist // 55xx
|
||||
)
|
||||
|
||||
// The following are API client related errors, and should be
|
||||
|
@ -313,6 +315,8 @@ func New(category Category, reason Reason) *Error {
|
|||
msg = "Policy violation request"
|
||||
case UnknownProfile:
|
||||
msg = "Unknown policy profile"
|
||||
case UnmatchedWhitelist:
|
||||
msg = "Request does not match policy whitelist"
|
||||
default:
|
||||
panic(fmt.Sprintf("Unsupported CFSSL error reason %d under category PolicyError.",
|
||||
reason))
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rsa"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/asn1"
|
||||
"encoding/pem"
|
||||
|
@ -311,11 +312,23 @@ func ParseOneCertificateFromPEM(certsPEM []byte) ([]*x509.Certificate, []byte, e
|
|||
|
||||
// LoadPEMCertPool loads a pool of PEM certificates from file.
|
||||
func LoadPEMCertPool(certsFile string) (*x509.CertPool, error) {
|
||||
if certsFile == "" {
|
||||
return nil, nil
|
||||
}
|
||||
pemCerts, err := ioutil.ReadFile(certsFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return PEMToCertPool(pemCerts)
|
||||
}
|
||||
|
||||
// PEMToCertPool concerts PEM certificates to a CertPool.
|
||||
func PEMToCertPool(pemCerts []byte) (*x509.CertPool, error) {
|
||||
if len(pemCerts) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
certPool := x509.NewCertPool()
|
||||
if !certPool.AppendCertsFromPEM(pemCerts) {
|
||||
return nil, errors.New("failed to load cert pool")
|
||||
|
@ -477,3 +490,29 @@ func SignerAlgo(priv crypto.Signer) x509.SignatureAlgorithm {
|
|||
return x509.UnknownSignatureAlgorithm
|
||||
}
|
||||
}
|
||||
|
||||
// LoadClientCertificate load key/certificate from pem files
|
||||
func LoadClientCertificate(certFile string, keyFile string) (*tls.Certificate, error) {
|
||||
if certFile != "" && keyFile != "" {
|
||||
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
||||
if err != nil {
|
||||
log.Critical("Unable to read client certificate from file: %s or key from file: %s", certFile, keyFile)
|
||||
return nil, err
|
||||
}
|
||||
log.Debug("Client certificate loaded ")
|
||||
return &cert, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// CreateTLSConfig creates a tls.Config object from certs and roots
|
||||
func CreateTLSConfig(remoteCAs *x509.CertPool, cert *tls.Certificate) *tls.Config {
|
||||
var certs []tls.Certificate
|
||||
if cert != nil {
|
||||
certs = []tls.Certificate{*cert}
|
||||
}
|
||||
return &tls.Config{
|
||||
Certificates: certs,
|
||||
RootCAs: remoteCAs,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,13 +48,16 @@ func New(req *csr.CertificateRequest) (cert, csrPEM, key []byte, err error) {
|
|||
if req.CA.Expiry != "" {
|
||||
policy.Default.ExpiryString = req.CA.Expiry
|
||||
policy.Default.Expiry, err = time.ParseDuration(req.CA.Expiry)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
signer.MaxPathLen = req.CA.PathLength
|
||||
policy.Default.CAConstraint.MaxPathLen = req.CA.PathLength
|
||||
if req.CA.PathLength != 0 && req.CA.PathLenZero == true {
|
||||
log.Infof("ignore invalid 'pathlenzero' value")
|
||||
} else {
|
||||
signer.MaxPathLenZero = req.CA.PathLenZero
|
||||
policy.Default.CAConstraint.MaxPathLenZero = req.CA.PathLenZero
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,12 +75,11 @@ func New(req *csr.CertificateRequest) (cert, csrPEM, key []byte, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
s, err := local.NewSigner(priv, nil, signer.DefaultSigAlgo(priv), nil)
|
||||
s, err := local.NewSigner(priv, nil, signer.DefaultSigAlgo(priv), policy)
|
||||
if err != nil {
|
||||
log.Errorf("failed to create signer: %v", err)
|
||||
return
|
||||
}
|
||||
s.SetPolicy(policy)
|
||||
|
||||
signReq := signer.SignRequest{Hosts: req.Hosts, Request: string(csrPEM)}
|
||||
cert, err = s.Sign(signReq)
|
||||
|
@ -143,11 +145,11 @@ func NewFromSigner(req *csr.CertificateRequest, priv crypto.Signer) (cert, csrPE
|
|||
}
|
||||
}
|
||||
|
||||
signer.MaxPathLen = req.CA.PathLength
|
||||
policy.Default.CAConstraint.MaxPathLen = req.CA.PathLength
|
||||
if req.CA.PathLength != 0 && req.CA.PathLenZero == true {
|
||||
log.Infof("ignore invalid 'pathlenzero' value")
|
||||
} else {
|
||||
signer.MaxPathLenZero = req.CA.PathLenZero
|
||||
policy.Default.CAConstraint.MaxPathLenZero = req.CA.PathLenZero
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,12 +158,11 @@ func NewFromSigner(req *csr.CertificateRequest, priv crypto.Signer) (cert, csrPE
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
s, err := local.NewSigner(priv, nil, signer.DefaultSigAlgo(priv), nil)
|
||||
s, err := local.NewSigner(priv, nil, signer.DefaultSigAlgo(priv), policy)
|
||||
if err != nil {
|
||||
log.Errorf("failed to create signer: %v", err)
|
||||
return
|
||||
}
|
||||
s.SetPolicy(policy)
|
||||
|
||||
signReq := signer.SignRequest{Request: string(csrPEM)}
|
||||
cert, err = s.Sign(signReq)
|
||||
|
@ -217,7 +218,7 @@ var CAPolicy = func() *config.Signing {
|
|||
Usage: []string{"cert sign", "crl sign"},
|
||||
ExpiryString: "43800h",
|
||||
Expiry: 5 * helpers.OneYear,
|
||||
CA: true,
|
||||
CAConstraint: config.CAConstraint{IsCA: true},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
@ -63,13 +62,6 @@ func SetLogger(logger SyslogWriter) {
|
|||
syslogWriter = logger
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Only define loglevel flag once.
|
||||
if flag.Lookup("loglevel") == nil {
|
||||
flag.IntVar(&Level, "loglevel", LevelInfo, "Log level (0 = DEBUG, 5 = FATAL)")
|
||||
}
|
||||
}
|
||||
|
||||
func print(l int, msg string) {
|
||||
if l >= Level {
|
||||
if syslogWriter != nil {
|
||||
|
|
|
@ -115,9 +115,6 @@ func (s *Signer) sign(template *x509.Certificate, profile *config.SigningProfile
|
|||
template.EmailAddresses = nil
|
||||
s.ca = template
|
||||
initRoot = true
|
||||
} else if template.IsCA {
|
||||
template.DNSNames = nil
|
||||
template.EmailAddresses = nil
|
||||
}
|
||||
|
||||
derBytes, err := x509.CreateCertificate(rand.Reader, template, s.ca, template.PublicKey, s.priv)
|
||||
|
@ -250,18 +247,21 @@ func (s *Signer) Sign(req signer.SignRequest) (cert []byte, err error) {
|
|||
}
|
||||
|
||||
if safeTemplate.IsCA {
|
||||
if !profile.CA {
|
||||
return nil, cferr.New(cferr.CertificateError, cferr.InvalidRequest)
|
||||
if !profile.CAConstraint.IsCA {
|
||||
log.Error("local signer policy disallows issuing CA certificate")
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.InvalidRequest)
|
||||
}
|
||||
|
||||
if s.ca != nil && s.ca.MaxPathLen > 0 {
|
||||
if safeTemplate.MaxPathLen >= s.ca.MaxPathLen {
|
||||
log.Error("local signer certificate disallows CA MaxPathLen extending")
|
||||
// do not sign a cert with pathlen > current
|
||||
return nil, cferr.New(cferr.CertificateError, cferr.InvalidRequest)
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.InvalidRequest)
|
||||
}
|
||||
} else if s.ca != nil && s.ca.MaxPathLen == 0 && s.ca.MaxPathLenZero {
|
||||
log.Error("local signer certificate disallows issuing CA certificate")
|
||||
// signer has pathlen of 0, do not sign more intermediate CAs
|
||||
return nil, cferr.New(cferr.CertificateError, cferr.InvalidRequest)
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.InvalidRequest)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,17 +272,17 @@ func (s *Signer) Sign(req signer.SignRequest) (cert []byte, err error) {
|
|||
if profile.NameWhitelist != nil {
|
||||
if safeTemplate.Subject.CommonName != "" {
|
||||
if profile.NameWhitelist.Find([]byte(safeTemplate.Subject.CommonName)) == nil {
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.InvalidPolicy)
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.UnmatchedWhitelist)
|
||||
}
|
||||
}
|
||||
for _, name := range safeTemplate.DNSNames {
|
||||
if profile.NameWhitelist.Find([]byte(name)) == nil {
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.InvalidPolicy)
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.UnmatchedWhitelist)
|
||||
}
|
||||
}
|
||||
for _, name := range safeTemplate.EmailAddresses {
|
||||
if profile.NameWhitelist.Find([]byte(name)) == nil {
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.InvalidPolicy)
|
||||
return nil, cferr.New(cferr.PolicyError, cferr.UnmatchedWhitelist)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -352,7 +352,7 @@ func (s *Signer) Sign(req signer.SignRequest) (cert []byte, err error) {
|
|||
|
||||
for _, server := range profile.CTLogServers {
|
||||
log.Infof("submitting poisoned precertificate to %s", server)
|
||||
var ctclient = client.New(server)
|
||||
var ctclient = client.New(server, nil)
|
||||
var resp *ct.SignedCertificateTimestamp
|
||||
resp, err = ctclient.AddPreChain(prechain)
|
||||
if err != nil {
|
||||
|
|
|
@ -23,12 +23,6 @@ import (
|
|||
"github.com/cloudflare/cfssl/info"
|
||||
)
|
||||
|
||||
// MaxPathLen is the default path length for a new CA certificate.
|
||||
var MaxPathLen = 2
|
||||
|
||||
// MaxPathLenZero indicates whether a new CA certificate has pathlen=0
|
||||
var MaxPathLenZero = false
|
||||
|
||||
// Subject contains the information that should be used to override the
|
||||
// subject information when signing a certificate.
|
||||
type Subject struct {
|
||||
|
@ -294,7 +288,15 @@ func FillTemplate(template *x509.Certificate, defaultProfile, profile *config.Si
|
|||
template.KeyUsage = ku
|
||||
template.ExtKeyUsage = eku
|
||||
template.BasicConstraintsValid = true
|
||||
template.IsCA = profile.CA
|
||||
template.IsCA = profile.CAConstraint.IsCA
|
||||
if template.IsCA {
|
||||
template.MaxPathLen = profile.CAConstraint.MaxPathLen
|
||||
if template.MaxPathLen == 0 {
|
||||
template.MaxPathLenZero = profile.CAConstraint.MaxPathLenZero
|
||||
}
|
||||
template.DNSNames = nil
|
||||
template.EmailAddresses = nil
|
||||
}
|
||||
template.SubjectKeyId = ski
|
||||
|
||||
if ocspURL != "" {
|
||||
|
|
|
@ -96,3 +96,26 @@ func Exist(name string) bool {
|
|||
_, err := os.Stat(name)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// ZeroToEnd zeros a file starting from SEEK_CUR to its SEEK_END. May temporarily
|
||||
// shorten the length of the file.
|
||||
func ZeroToEnd(f *os.File) error {
|
||||
// TODO: support FALLOC_FL_ZERO_RANGE
|
||||
off, err := f.Seek(0, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lenf, lerr := f.Seek(0, os.SEEK_END)
|
||||
if lerr != nil {
|
||||
return lerr
|
||||
}
|
||||
if err = f.Truncate(off); err != nil {
|
||||
return err
|
||||
}
|
||||
// make sure blocks remain allocated
|
||||
if err = Preallocate(f, lenf, true); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Seek(off, os.SEEK_SET)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -232,7 +232,7 @@ func (l *raftLog) term(i uint64) (uint64, error) {
|
|||
if err == nil {
|
||||
return t, nil
|
||||
}
|
||||
if err == ErrCompacted {
|
||||
if err == ErrCompacted || err == ErrUnavailable {
|
||||
return 0, err
|
||||
}
|
||||
panic(err) // TODO(bdarnell)
|
||||
|
@ -339,7 +339,7 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
|
|||
return ErrCompacted
|
||||
}
|
||||
|
||||
length := l.lastIndex() - fi + 1
|
||||
length := l.lastIndex() + 1 - fi
|
||||
if lo < fi || hi > fi+length {
|
||||
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
|
||||
}
|
||||
|
|
12
vendor/src/github.com/coreos/etcd/raft/node.go
vendored
12
vendor/src/github.com/coreos/etcd/raft/node.go
vendored
|
@ -144,6 +144,9 @@ type Node interface {
|
|||
// to match MemoryStorage.Compact.
|
||||
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
|
||||
|
||||
// TransferLeadership attempts to transfer leadership to the given transferee.
|
||||
TransferLeadership(ctx context.Context, lead, transferee uint64)
|
||||
|
||||
// ReadIndex request a read state. The read state will be set in the ready.
|
||||
// Read state has a read index. Once the application advances further than the read
|
||||
// index, any linearizable read requests issued before the read request can be
|
||||
|
@ -485,6 +488,15 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
|
||||
select {
|
||||
// manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
|
||||
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
|
||||
case <-n.done:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
|
||||
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
|
||||
}
|
||||
|
|
12
vendor/src/github.com/coreos/etcd/raft/raft.go
vendored
12
vendor/src/github.com/coreos/etcd/raft/raft.go
vendored
|
@ -590,11 +590,6 @@ func (r *raft) Step(m pb.Message) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
if m.Type == pb.MsgTransferLeader {
|
||||
if r.state != StateLeader {
|
||||
r.logger.Debugf("%x [term %d state %v] ignoring MsgTransferLeader to %x", r.id, r.Term, r.state, m.From)
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case m.Term == 0:
|
||||
|
@ -874,6 +869,13 @@ func stepFollower(r *raft, m pb.Message) {
|
|||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||
}
|
||||
case pb.MsgTransferLeader:
|
||||
if r.lead == None {
|
||||
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
|
||||
return
|
||||
}
|
||||
m.To = r.lead
|
||||
r.send(m)
|
||||
case pb.MsgTimeoutNow:
|
||||
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
|
||||
r.campaign(campaignTransfer)
|
||||
|
|
|
@ -130,6 +130,9 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
|
|||
if i < offset {
|
||||
return 0, ErrCompacted
|
||||
}
|
||||
if int(i-offset) >= len(ms.ents) {
|
||||
return 0, ErrUnavailable
|
||||
}
|
||||
return ms.ents[i-offset].Term, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ func max(a, b uint64) uint64 {
|
|||
|
||||
func IsLocalMsg(msgt pb.MessageType) bool {
|
||||
return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable ||
|
||||
msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum || msgt == pb.MsgTransferLeader
|
||||
msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum
|
||||
}
|
||||
|
||||
func IsResponseMsg(msgt pb.MessageType) bool {
|
||||
|
|
30
vendor/src/github.com/coreos/etcd/wal/wal.go
vendored
30
vendor/src/github.com/coreos/etcd/wal/wal.go
vendored
|
@ -131,22 +131,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// rename of directory with locked files doesn't work on windows; close
|
||||
// the WAL to release the locks so the directory can be renamed
|
||||
w.Close()
|
||||
if err := os.Rename(tmpdirpath, dirpath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// reopen and relock
|
||||
newWAL, oerr := Open(dirpath, walpb.Snapshot{})
|
||||
if oerr != nil {
|
||||
return nil, oerr
|
||||
}
|
||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||
newWAL.Close()
|
||||
return nil, err
|
||||
}
|
||||
return newWAL, nil
|
||||
return w.renameWal(tmpdirpath)
|
||||
}
|
||||
|
||||
// Open opens the WAL at the given snap.
|
||||
|
@ -301,6 +286,18 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||
state.Reset()
|
||||
return nil, state, nil, err
|
||||
}
|
||||
// decodeRecord() will return io.EOF if it detects a zero record,
|
||||
// but this zero record may be followed by non-zero records from
|
||||
// a torn write. Overwriting some of these non-zero records, but
|
||||
// not all, will cause CRC errors on WAL open. Since the records
|
||||
// were never fully synced to disk in the first place, it's safe
|
||||
// to zero them out to avoid any CRC errors from new writes.
|
||||
if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
|
||||
return nil, state, nil, err
|
||||
}
|
||||
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
|
||||
return nil, state, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = nil
|
||||
|
@ -319,7 +316,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||
|
||||
if w.tail() != nil {
|
||||
// create encoder (chain crc with the decoder), enable appending
|
||||
_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
|
||||
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
||||
}
|
||||
w.decoder = nil
|
||||
|
|
38
vendor/src/github.com/coreos/etcd/wal/wal_unix.go
vendored
Normal file
38
vendor/src/github.com/coreos/etcd/wal/wal_unix.go
vendored
Normal file
|
@ -0,0 +1,38 @@
|
|||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows
|
||||
|
||||
package wal
|
||||
|
||||
import "os"
|
||||
|
||||
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||
// On non-Windows platforms, hold the lock while renaming. Releasing
|
||||
// the lock and trying to reacquire it quickly can be flaky because
|
||||
// it's possible the process will fork to spawn a process while this is
|
||||
// happening. The fds are set up as close-on-exec by the Go runtime,
|
||||
// but there is a window between the fork and the exec where another
|
||||
// process holds the lock.
|
||||
|
||||
if err := os.RemoveAll(w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
|
||||
return w, nil
|
||||
}
|
41
vendor/src/github.com/coreos/etcd/wal/wal_windows.go
vendored
Normal file
41
vendor/src/github.com/coreos/etcd/wal/wal_windows.go
vendored
Normal file
|
@ -0,0 +1,41 @@
|
|||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
)
|
||||
|
||||
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||
// rename of directory with locked files doesn't work on
|
||||
// windows; close the WAL to release the locks so the directory
|
||||
// can be renamed
|
||||
w.Close()
|
||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// reopen and relock
|
||||
newWAL, oerr := Open(w.dir, walpb.Snapshot{})
|
||||
if oerr != nil {
|
||||
return nil, oerr
|
||||
}
|
||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||
newWAL.Close()
|
||||
return nil, err
|
||||
}
|
||||
return newWAL, nil
|
||||
}
|
|
@ -7,7 +7,7 @@ import (
|
|||
)
|
||||
|
||||
// NewTCPSocket creates a TCP socket listener with the specified address and
|
||||
// the specified tls configuration. If TLSConfig is set, will encapsulate the
|
||||
// and the specified tls configuration. If TLSConfig is set, will encapsulate the
|
||||
// TCP listener inside a TLS one.
|
||||
func NewTCPSocket(addr string, tlsConfig *tls.Config) (net.Listener, error) {
|
||||
l, err := net.Listen("tcp", addr)
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
package events
|
||||
|
||||
import "github.com/Sirupsen/logrus"
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
|
||||
// component is to dispatch events to configured endpoints. Reliability can be
|
||||
|
@ -10,7 +15,10 @@ type Broadcaster struct {
|
|||
events chan Event
|
||||
adds chan configureRequest
|
||||
removes chan configureRequest
|
||||
closed chan chan struct{}
|
||||
|
||||
shutdown chan struct{}
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// NewBroadcaster appends one or more sinks to the list of sinks. The
|
||||
|
@ -19,11 +27,12 @@ type Broadcaster struct {
|
|||
// its own. Use of EventQueue and RetryingSink should be used here.
|
||||
func NewBroadcaster(sinks ...Sink) *Broadcaster {
|
||||
b := Broadcaster{
|
||||
sinks: sinks,
|
||||
events: make(chan Event),
|
||||
adds: make(chan configureRequest),
|
||||
removes: make(chan configureRequest),
|
||||
closed: make(chan chan struct{}),
|
||||
sinks: sinks,
|
||||
events: make(chan Event),
|
||||
adds: make(chan configureRequest),
|
||||
removes: make(chan configureRequest),
|
||||
shutdown: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the broadcaster
|
||||
|
@ -82,24 +91,19 @@ func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
|
|||
// Close the broadcaster, ensuring that all messages are flushed to the
|
||||
// underlying sink before returning.
|
||||
func (b *Broadcaster) Close() error {
|
||||
select {
|
||||
case <-b.closed:
|
||||
// already closed
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
// do a little chan handoff dance to synchronize closing
|
||||
closed := make(chan struct{})
|
||||
b.closed <- closed
|
||||
close(b.closed)
|
||||
<-closed
|
||||
return nil
|
||||
}
|
||||
b.once.Do(func() {
|
||||
close(b.shutdown)
|
||||
})
|
||||
|
||||
<-b.closed
|
||||
return nil
|
||||
}
|
||||
|
||||
// run is the main broadcast loop, started when the broadcaster is created.
|
||||
// Under normal conditions, it waits for events on the event channel. After
|
||||
// Close is called, this goroutine will exit.
|
||||
func (b *Broadcaster) run() {
|
||||
defer close(b.closed)
|
||||
remove := func(target Sink) {
|
||||
for i, sink := range b.sinks {
|
||||
if sink == target {
|
||||
|
@ -143,7 +147,7 @@ func (b *Broadcaster) run() {
|
|||
case request := <-b.removes:
|
||||
remove(request.sink)
|
||||
request.response <- nil
|
||||
case closing := <-b.closed:
|
||||
case <-b.shutdown:
|
||||
// close all the underlying sinks
|
||||
for _, sink := range b.sinks {
|
||||
if err := sink.Close(); err != nil && err != ErrSinkClosed {
|
||||
|
@ -151,8 +155,24 @@ func (b *Broadcaster) run() {
|
|||
Errorf("broadcaster: closing sink failed")
|
||||
}
|
||||
}
|
||||
closing <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b Broadcaster) String() string {
|
||||
// Serialize copy of this broadcaster without the sync.Once, to avoid
|
||||
// a data race.
|
||||
|
||||
b2 := map[string]interface{}{
|
||||
"sinks": b.sinks,
|
||||
"events": b.events,
|
||||
"adds": b.adds,
|
||||
"removes": b.removes,
|
||||
|
||||
"shutdown": b.shutdown,
|
||||
"closed": b.closed,
|
||||
}
|
||||
|
||||
return fmt.Sprint(b2)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,10 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Channel provides a sink that can be listened on. The writer and channel
|
||||
// listener must operate in separate goroutines.
|
||||
//
|
||||
|
@ -8,6 +13,7 @@ type Channel struct {
|
|||
C chan Event
|
||||
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// NewChannel returns a channel. If buffer is zero, the channel is
|
||||
|
@ -37,11 +43,19 @@ func (ch *Channel) Write(event Event) error {
|
|||
|
||||
// Close the channel sink.
|
||||
func (ch *Channel) Close() error {
|
||||
select {
|
||||
case <-ch.closed:
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
ch.once.Do(func() {
|
||||
close(ch.closed)
|
||||
return nil
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ch Channel) String() string {
|
||||
// Serialize a copy of the Channel that doesn't contain the sync.Once,
|
||||
// to avoid a data race.
|
||||
ch2 := map[string]interface{}{
|
||||
"C": ch.C,
|
||||
"closed": ch.closed,
|
||||
}
|
||||
return fmt.Sprint(ch2)
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ func (f *Filter) Write(event Event) error {
|
|||
func (f *Filter) Close() error {
|
||||
// TODO(stevvooe): Not all sinks should have Close.
|
||||
if f.closed {
|
||||
return ErrSinkClosed
|
||||
return nil
|
||||
}
|
||||
|
||||
f.closed = true
|
||||
|
|
|
@ -52,7 +52,7 @@ func (eq *Queue) Close() error {
|
|||
defer eq.mu.Unlock()
|
||||
|
||||
if eq.closed {
|
||||
return ErrSinkClosed
|
||||
return nil
|
||||
}
|
||||
|
||||
// set closed flag
|
||||
|
|
21
vendor/src/github.com/docker/go-events/retry.go
vendored
21
vendor/src/github.com/docker/go-events/retry.go
vendored
|
@ -1,6 +1,7 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -18,6 +19,7 @@ type RetryingSink struct {
|
|||
sink Sink
|
||||
strategy RetryStrategy
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// NewRetryingSink returns a sink that will retry writes to a sink, backing
|
||||
|
@ -81,13 +83,22 @@ retry:
|
|||
|
||||
// Close closes the sink and the underlying sink.
|
||||
func (rs *RetryingSink) Close() error {
|
||||
select {
|
||||
case <-rs.closed:
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
rs.once.Do(func() {
|
||||
close(rs.closed)
|
||||
return rs.sink.Close()
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs RetryingSink) String() string {
|
||||
// Serialize a copy of the RetryingSink without the sync.Once, to avoid
|
||||
// a data race.
|
||||
rs2 := map[string]interface{}{
|
||||
"sink": rs.sink,
|
||||
"strategy": rs.strategy,
|
||||
"closed": rs.closed,
|
||||
}
|
||||
return fmt.Sprint(rs2)
|
||||
}
|
||||
|
||||
// RetryStrategy defines a strategy for retrying event sink writes.
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"github.com/boltdb/bolt"
|
||||
"github.com/docker/swarmkit/agent/exec"
|
||||
"github.com/docker/swarmkit/api"
|
||||
"github.com/docker/swarmkit/picker"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
|
@ -17,7 +17,7 @@ type Config struct {
|
|||
|
||||
// Managers provides the manager backend used by the agent. It will be
|
||||
// updated with managers weights as observed by the agent.
|
||||
Managers picker.Remotes
|
||||
Managers remotes.Remotes
|
||||
|
||||
// Executor specifies the executor to use for the agent.
|
||||
Executor exec.Executor
|
||||
|
|
|
@ -21,7 +21,7 @@ import (
|
|||
"github.com/docker/swarmkit/ioutils"
|
||||
"github.com/docker/swarmkit/log"
|
||||
"github.com/docker/swarmkit/manager"
|
||||
"github.com/docker/swarmkit/picker"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
@ -178,7 +178,7 @@ func (n *Node) run(ctx context.Context) (err error) {
|
|||
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
|
||||
n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
|
||||
if n.config.JoinAddr != "" {
|
||||
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, picker.DefaultObservationWeight)
|
||||
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -204,7 +204,7 @@ func (n *Node) run(ctx context.Context) (err error) {
|
|||
}()
|
||||
|
||||
certDir := filepath.Join(n.config.StateDir, "certificates")
|
||||
securityConfig, err := ca.LoadOrCreateSecurityConfig(ctx, certDir, n.config.JoinToken, ca.ManagerRole, picker.NewPicker(n.remotes), issueResponseChan)
|
||||
securityConfig, err := ca.LoadOrCreateSecurityConfig(ctx, certDir, n.config.JoinToken, ca.ManagerRole, n.remotes, issueResponseChan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ func (n *Node) run(ctx context.Context) (err error) {
|
|||
}
|
||||
}()
|
||||
|
||||
updates := ca.RenewTLSConfig(ctx, securityConfig, certDir, picker.NewPicker(n.remotes), forceCertRenewal)
|
||||
updates := ca.RenewTLSConfig(ctx, securityConfig, certDir, n.remotes, forceCertRenewal)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
@ -533,31 +533,29 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state := grpc.Idle
|
||||
client := api.NewHealthClient(conn)
|
||||
for {
|
||||
s, err := conn.WaitForStateChange(ctx, state)
|
||||
resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
|
||||
if err != nil {
|
||||
n.setControlSocket(nil)
|
||||
return err
|
||||
}
|
||||
if s == grpc.Ready {
|
||||
n.setControlSocket(conn)
|
||||
if ready != nil {
|
||||
close(ready)
|
||||
ready = nil
|
||||
}
|
||||
} else if state == grpc.Shutdown {
|
||||
n.setControlSocket(nil)
|
||||
if resp.Status == api.HealthCheckResponse_SERVING {
|
||||
break
|
||||
}
|
||||
state = s
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
n.setControlSocket(conn)
|
||||
if ready != nil {
|
||||
close(ready)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) waitRole(ctx context.Context, role string) {
|
||||
func (n *Node) waitRole(ctx context.Context, role string) error {
|
||||
n.roleCond.L.Lock()
|
||||
if role == n.role {
|
||||
n.roleCond.L.Unlock()
|
||||
return
|
||||
return nil
|
||||
}
|
||||
finishCh := make(chan struct{})
|
||||
defer close(finishCh)
|
||||
|
@ -572,18 +570,24 @@ func (n *Node) waitRole(ctx context.Context, role string) {
|
|||
defer n.roleCond.L.Unlock()
|
||||
for role != n.role {
|
||||
n.roleCond.Wait()
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
|
||||
for {
|
||||
n.waitRole(ctx, ca.ManagerRole)
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remoteAddr, _ := n.remotes.Select(n.nodeID)
|
||||
m, err := manager.New(&manager.Config{
|
||||
ForceNewCluster: n.config.ForceNewCluster,
|
||||
|
@ -620,14 +624,14 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
|
|||
go func(ready chan struct{}) {
|
||||
select {
|
||||
case <-ready:
|
||||
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
|
||||
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, remotes.DefaultObservationWeight)
|
||||
case <-connCtx.Done():
|
||||
}
|
||||
}(ready)
|
||||
ready = nil
|
||||
}
|
||||
|
||||
n.waitRole(ctx, ca.AgentRole)
|
||||
err = n.waitRole(ctx, ca.AgentRole)
|
||||
|
||||
n.Lock()
|
||||
n.manager = nil
|
||||
|
@ -641,6 +645,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
|
|||
<-done
|
||||
}
|
||||
connCancel()
|
||||
n.setControlSocket(nil)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -651,15 +656,15 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
|
|||
type persistentRemotes struct {
|
||||
sync.RWMutex
|
||||
c *sync.Cond
|
||||
picker.Remotes
|
||||
remotes.Remotes
|
||||
storePath string
|
||||
lastSavedState []api.Peer
|
||||
}
|
||||
|
||||
func newPersistentRemotes(f string, remotes ...api.Peer) *persistentRemotes {
|
||||
func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
|
||||
pr := &persistentRemotes{
|
||||
storePath: f,
|
||||
Remotes: picker.NewRemotes(remotes...),
|
||||
Remotes: remotes.NewRemotes(peers...),
|
||||
}
|
||||
pr.c = sync.NewCond(pr.RLocker())
|
||||
return pr
|
||||
|
|
|
@ -79,6 +79,9 @@ func (sr *statusReporter) run(ctx context.Context) {
|
|||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
sr.mu.Lock() // released during wait, below.
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -88,27 +91,29 @@ func (sr *statusReporter) run(ctx context.Context) {
|
|||
}
|
||||
}()
|
||||
|
||||
sr.mu.Lock() // released during wait, below.
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
for {
|
||||
if len(sr.statuses) == 0 {
|
||||
sr.cond.Wait()
|
||||
}
|
||||
|
||||
for taskID, status := range sr.statuses {
|
||||
if sr.closed {
|
||||
// TODO(stevvooe): Add support here for waiting until all
|
||||
// statuses are flushed before shutting down.
|
||||
return
|
||||
}
|
||||
if sr.closed {
|
||||
// TODO(stevvooe): Add support here for waiting until all
|
||||
// statuses are flushed before shutting down.
|
||||
return
|
||||
}
|
||||
|
||||
for taskID, status := range sr.statuses {
|
||||
delete(sr.statuses, taskID) // delete the entry, while trying to send.
|
||||
|
||||
sr.mu.Unlock()
|
||||
err := sr.reporter.UpdateTaskStatus(ctx, taskID, status)
|
||||
sr.mu.Lock()
|
||||
|
||||
// reporter might be closed during UpdateTaskStatus call
|
||||
if sr.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed reporting status to agent")
|
||||
|
||||
|
|
69
vendor/src/github.com/docker/swarmkit/agent/resource.go
vendored
Normal file
69
vendor/src/github.com/docker/swarmkit/agent/resource.go
vendored
Normal file
|
@ -0,0 +1,69 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"github.com/docker/swarmkit/api"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type resourceAllocator struct {
|
||||
agent *Agent
|
||||
}
|
||||
|
||||
// ResourceAllocator is an interface to allocate resource such as
|
||||
// network attachments from a worker node.
|
||||
type ResourceAllocator interface {
|
||||
// AttachNetwork creates a network attachment in the manager
|
||||
// given a target network and a unique ID representing the
|
||||
// connecting entity and optionally a list of ipv4/ipv6
|
||||
// addresses to be assigned to the attachment. AttachNetwork
|
||||
// returns a unique ID for the attachment if successfull or an
|
||||
// error in case of failure.
|
||||
AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error)
|
||||
|
||||
// DetachNetworks deletes a network attachment for the passed
|
||||
// attachment ID. The attachment ID is obtained from a
|
||||
// previous AttachNetwork call.
|
||||
DetachNetwork(ctx context.Context, aID string) error
|
||||
}
|
||||
|
||||
// AttachNetwork creates a network attachment.
|
||||
func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error) {
|
||||
var taskID string
|
||||
if err := r.agent.withSession(ctx, func(session *session) error {
|
||||
client := api.NewResourceAllocatorClient(session.conn)
|
||||
r, err := client.AttachNetwork(ctx, &api.AttachNetworkRequest{
|
||||
Config: &api.NetworkAttachmentConfig{
|
||||
Target: target,
|
||||
Addresses: addresses,
|
||||
},
|
||||
ContainerID: id,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taskID = r.AttachmentID
|
||||
return nil
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
// DetachNetwork deletes a network attachment.
|
||||
func (r *resourceAllocator) DetachNetwork(ctx context.Context, aID string) error {
|
||||
return r.agent.withSession(ctx, func(session *session) error {
|
||||
client := api.NewResourceAllocatorClient(session.conn)
|
||||
_, err := client.DetachNetwork(ctx, &api.DetachNetworkRequest{
|
||||
AttachmentID: aID,
|
||||
})
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// ResourceAllocator provides an interface to access resource
|
||||
// allocation methods such as AttachNetwork and DetachNetwork.
|
||||
func (a *Agent) ResourceAllocator() ResourceAllocator {
|
||||
return &resourceAllocator{agent: a}
|
||||
}
|
|
@ -6,8 +6,8 @@ import (
|
|||
|
||||
"github.com/docker/swarmkit/api"
|
||||
"github.com/docker/swarmkit/log"
|
||||
"github.com/docker/swarmkit/picker"
|
||||
"github.com/docker/swarmkit/protobuf/ptypes"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
@ -307,7 +307,7 @@ func (s *session) close() error {
|
|||
return errSessionClosed
|
||||
default:
|
||||
if s.conn != nil {
|
||||
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -picker.DefaultObservationWeight)
|
||||
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -remotes.DefaultObservationWeight)
|
||||
s.conn.Close()
|
||||
}
|
||||
close(s.closed)
|
||||
|
|
|
@ -134,8 +134,6 @@ func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error {
|
|||
if err := PutTaskStatus(tx, task.ID, &task.Status); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
status = &task.Status
|
||||
} else {
|
||||
task.Status = *status // overwrite the stale manager status with ours.
|
||||
}
|
||||
|
@ -181,7 +179,7 @@ func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
|
|||
go func() {
|
||||
<-ctx.Done()
|
||||
w.mu.Lock()
|
||||
defer w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
delete(w.listeners, key) // remove the listener if the context is closed.
|
||||
}()
|
||||
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
package api
|
||||
|
||||
//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto
|
||||
//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto
|
||||
|
|
1105
vendor/src/github.com/docker/swarmkit/api/resource.pb.go
vendored
Normal file
1105
vendor/src/github.com/docker/swarmkit/api/resource.pb.go
vendored
Normal file
File diff suppressed because it is too large
Load diff
34
vendor/src/github.com/docker/swarmkit/api/resource.proto
vendored
Normal file
34
vendor/src/github.com/docker/swarmkit/api/resource.proto
vendored
Normal file
|
@ -0,0 +1,34 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package docker.swarmkit.v1;
|
||||
|
||||
import "types.proto";
|
||||
import "gogoproto/gogo.proto";
|
||||
import "plugin/plugin.proto";
|
||||
|
||||
// Allocator is the API provided by a manager group for agents to control the allocation of certain entities.
|
||||
//
|
||||
// API methods on this service are used only by agent nodes.
|
||||
service ResourceAllocator {
|
||||
rpc AttachNetwork(AttachNetworkRequest) returns (AttachNetworkResponse) {
|
||||
option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-worker" roles: "swarm-manager" };
|
||||
};
|
||||
rpc DetachNetwork(DetachNetworkRequest) returns (DetachNetworkResponse) {
|
||||
option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-worker" roles: "swarm-manager" };
|
||||
};
|
||||
}
|
||||
|
||||
message AttachNetworkRequest {
|
||||
NetworkAttachmentConfig config = 1;
|
||||
string container_id = 2 [(gogoproto.customname) = "ContainerID"];
|
||||
}
|
||||
|
||||
message AttachNetworkResponse {
|
||||
string attachment_id = 1 [(gogoproto.customname) = "AttachmentID"];
|
||||
}
|
||||
|
||||
message DetachNetworkRequest {
|
||||
string attachment_id = 1 [(gogoproto.customname) = "AttachmentID"];
|
||||
}
|
||||
|
||||
message DetachNetworkResponse {}
|
File diff suppressed because it is too large
Load diff
|
@ -72,19 +72,7 @@ message ServiceSpec {
|
|||
// UpdateConfig controls the rate and policy of updates.
|
||||
UpdateConfig update = 6;
|
||||
|
||||
// NetworkAttachmentConfig specifies how a service should be attached to a particular network.
|
||||
//
|
||||
// For now, this is a simple struct, but this can include future information
|
||||
// instructing Swarm on how this service should work on the particular
|
||||
// network.
|
||||
message NetworkAttachmentConfig {
|
||||
// Target specifies the target network for attachment. This value may be a
|
||||
// network name or identifier. Only identifiers are supported at this time.
|
||||
string target = 1;
|
||||
// Aliases specifies a list of discoverable alternate names for the service on this Target.
|
||||
repeated string aliases = 2;
|
||||
}
|
||||
repeated NetworkAttachmentConfig networks = 7;
|
||||
repeated NetworkAttachmentConfig networks = 7 [deprecated=true];
|
||||
|
||||
// Service endpoint specifies the user provided configuration
|
||||
// to properly discover and load balance a service.
|
||||
|
@ -103,6 +91,7 @@ message GlobalService {
|
|||
|
||||
message TaskSpec {
|
||||
oneof runtime {
|
||||
NetworkAttachmentSpec attachment = 8;
|
||||
ContainerSpec container = 1;
|
||||
}
|
||||
|
||||
|
@ -118,6 +107,19 @@ message TaskSpec {
|
|||
// LogDriver specifies the log driver to use for the task. Any runtime will
|
||||
// direct logs into the specified driver for the duration of the task.
|
||||
Driver log_driver = 6;
|
||||
|
||||
// Networks specifies the list of network attachment
|
||||
// configurations (which specify the network and per-network
|
||||
// aliases) that this task spec is bound to.
|
||||
repeated NetworkAttachmentConfig networks = 7;
|
||||
}
|
||||
|
||||
// NetworkAttachmentSpec specifies runtime parameters required to attach
|
||||
// a container to a network.
|
||||
message NetworkAttachmentSpec {
|
||||
// ContainerID spcifies a unique ID of the container for which
|
||||
// this attachment is for.
|
||||
string container_id = 1 [(gogoproto.customname) = "ContainerID"];
|
||||
}
|
||||
|
||||
// Container specifies runtime parameters for a container.
|
||||
|
@ -234,6 +236,15 @@ message NetworkSpec {
|
|||
bool internal = 4;
|
||||
|
||||
IPAMOptions ipam = 5 [(gogoproto.customname) = "IPAM"];
|
||||
|
||||
// Attachable allows external(to swarm) entities to manually
|
||||
// attach to this network. With this flag enabled, external
|
||||
// entities such as containers running in an worker node in
|
||||
// the cluster can manually attach to this network and access
|
||||
// the services attached to this network. If this flag is not
|
||||
// enabled(default case) no manual attachment to this network
|
||||
// can happen.
|
||||
bool attachable = 6;
|
||||
}
|
||||
|
||||
// ClusterSpec specifies global cluster settings.
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -407,6 +407,23 @@ message TaskStatus {
|
|||
}
|
||||
}
|
||||
|
||||
// NetworkAttachmentConfig specifies how a service should be attached to a particular network.
|
||||
//
|
||||
// For now, this is a simple struct, but this can include future information
|
||||
// instructing Swarm on how this service should work on the particular
|
||||
// network.
|
||||
message NetworkAttachmentConfig {
|
||||
// Target specifies the target network for attachment. This value may be a
|
||||
// network name or identifier. Only identifiers are supported at this time.
|
||||
string target = 1;
|
||||
// Aliases specifies a list of discoverable alternate names for the service on this Target.
|
||||
repeated string aliases = 2;
|
||||
// Addresses specifies a list of ipv4 and ipv6 addresses
|
||||
// preferred. If these addresses are not available then the
|
||||
// attachment might fail.
|
||||
repeated string addresses = 3;
|
||||
}
|
||||
|
||||
// IPAMConfig specifies parameters for IP Address Management.
|
||||
message IPAMConfig {
|
||||
// TODO(stevvooe): It may make more sense to manage IPAM and network
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
"github.com/docker/swarmkit/api"
|
||||
"github.com/docker/swarmkit/identity"
|
||||
"github.com/docker/swarmkit/ioutils"
|
||||
"github.com/docker/swarmkit/picker"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
@ -155,7 +155,7 @@ func (rca *RootCA) IssueAndSaveNewCertificates(paths CertPaths, cn, ou, org stri
|
|||
|
||||
// RequestAndSaveNewCertificates gets new certificates issued, either by signing them locally if a signer is
|
||||
// available, or by requesting them from the remote server at remoteAddr.
|
||||
func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, paths CertPaths, token string, picker *picker.Picker, transport credentials.TransportAuthenticator, nodeInfo chan<- api.IssueNodeCertificateResponse) (*tls.Certificate, error) {
|
||||
func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, paths CertPaths, token string, remotes remotes.Remotes, transport credentials.TransportAuthenticator, nodeInfo chan<- api.IssueNodeCertificateResponse) (*tls.Certificate, error) {
|
||||
// Create a new key/pair and CSR for the new manager
|
||||
// Write the new CSR and the new key to a temporary location so we can survive crashes on rotation
|
||||
tempPaths := genTempPaths(paths)
|
||||
|
@ -170,7 +170,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, paths Cert
|
|||
// responding properly (for example, it may have just been demoted).
|
||||
var signedCert []byte
|
||||
for i := 0; i != 5; i++ {
|
||||
signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, picker, transport, nodeInfo)
|
||||
signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, remotes, transport, nodeInfo)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -423,33 +423,38 @@ func GetLocalRootCA(baseDir string) (RootCA, error) {
|
|||
}
|
||||
|
||||
// GetRemoteCA returns the remote endpoint's CA certificate
|
||||
func GetRemoteCA(ctx context.Context, d digest.Digest, picker *picker.Picker) (RootCA, error) {
|
||||
// We need a valid picker to be able to Dial to a remote CA
|
||||
if picker == nil {
|
||||
return RootCA{}, fmt.Errorf("valid remote address picker required")
|
||||
}
|
||||
|
||||
func GetRemoteCA(ctx context.Context, d digest.Digest, r remotes.Remotes) (RootCA, error) {
|
||||
// This TLS Config is intentionally using InsecureSkipVerify. Either we're
|
||||
// doing TOFU, in which case we don't validate the remote CA, or we're using
|
||||
// a user supplied hash to check the integrity of the CA certificate.
|
||||
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecureCreds),
|
||||
grpc.WithBackoffMaxDelay(10 * time.Second),
|
||||
grpc.WithPicker(picker)}
|
||||
grpc.WithTimeout(5 * time.Second),
|
||||
grpc.WithBackoffMaxDelay(5 * time.Second),
|
||||
}
|
||||
|
||||
firstAddr, err := picker.PickAddr()
|
||||
peer, err := r.Select()
|
||||
if err != nil {
|
||||
return RootCA{}, err
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(firstAddr, opts...)
|
||||
conn, err := grpc.Dial(peer.Addr, opts...)
|
||||
if err != nil {
|
||||
return RootCA{}, err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := api.NewCAClient(conn)
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
r.Observe(peer, -remotes.DefaultObservationWeight)
|
||||
return
|
||||
}
|
||||
r.Observe(peer, remotes.DefaultObservationWeight)
|
||||
}()
|
||||
response, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{})
|
||||
if err != nil {
|
||||
return RootCA{}, err
|
||||
|
@ -596,16 +601,12 @@ func GenerateAndWriteNewKey(paths CertPaths) (csr, key []byte, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// GetRemoteSignedCertificate submits a CSR to a remote CA server address
|
||||
// available through a picker, and that is part of a CA identified by a
|
||||
// specific certificate pool.
|
||||
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, picker *picker.Picker, creds credentials.TransportAuthenticator, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) {
|
||||
// GetRemoteSignedCertificate submits a CSR to a remote CA server address,
|
||||
// and that is part of a CA identified by a specific certificate pool.
|
||||
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportAuthenticator, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) {
|
||||
if rootCAPool == nil {
|
||||
return nil, fmt.Errorf("valid root CA pool required")
|
||||
}
|
||||
if picker == nil {
|
||||
return nil, fmt.Errorf("valid remote address picker required")
|
||||
}
|
||||
|
||||
if creds == nil {
|
||||
// This is our only non-MTLS request, and it happens when we are boostraping our TLS certs
|
||||
|
@ -613,17 +614,18 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
|
|||
creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
|
||||
}
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(creds),
|
||||
grpc.WithBackoffMaxDelay(10 * time.Second),
|
||||
grpc.WithPicker(picker)}
|
||||
|
||||
firstAddr, err := picker.PickAddr()
|
||||
peer, err := r.Select()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(firstAddr, opts...)
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(creds),
|
||||
grpc.WithTimeout(5 * time.Second),
|
||||
grpc.WithBackoffMaxDelay(5 * time.Second),
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(peer.Addr, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -655,8 +657,11 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
|
|||
// Exponential backoff with Max of 30 seconds to wait for a new retry
|
||||
for {
|
||||
// Send the Request and retrieve the certificate
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
|
||||
if err != nil {
|
||||
r.Observe(peer, -remotes.DefaultObservationWeight)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -672,6 +677,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
|
|||
// retry until the certificate gets updated per our
|
||||
// current request.
|
||||
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
|
||||
r.Observe(peer, remotes.DefaultObservationWeight)
|
||||
return statusResponse.Certificate.Certificate, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
"github.com/docker/distribution/digest"
|
||||
"github.com/docker/swarmkit/api"
|
||||
"github.com/docker/swarmkit/identity"
|
||||
"github.com/docker/swarmkit/picker"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -183,7 +183,7 @@ func getCAHashFromToken(token string) (digest.Digest, error) {
|
|||
// LoadOrCreateSecurityConfig encapsulates the security logic behind joining a cluster.
|
||||
// Every node requires at least a set of TLS certificates with which to join the cluster with.
|
||||
// In the case of a manager, these certificates will be used both for client and server credentials.
|
||||
func LoadOrCreateSecurityConfig(ctx context.Context, baseCertDir, token, proposedRole string, picker *picker.Picker, nodeInfo chan<- api.IssueNodeCertificateResponse) (*SecurityConfig, error) {
|
||||
func LoadOrCreateSecurityConfig(ctx context.Context, baseCertDir, token, proposedRole string, remotes remotes.Remotes, nodeInfo chan<- api.IssueNodeCertificateResponse) (*SecurityConfig, error) {
|
||||
paths := NewConfigPaths(baseCertDir)
|
||||
|
||||
var (
|
||||
|
@ -217,7 +217,7 @@ func LoadOrCreateSecurityConfig(ctx context.Context, baseCertDir, token, propose
|
|||
// just been demoted, for example).
|
||||
|
||||
for i := 0; i != 5; i++ {
|
||||
rootCA, err = GetRemoteCA(ctx, d, picker)
|
||||
rootCA, err = GetRemoteCA(ctx, d, remotes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -267,7 +267,7 @@ func LoadOrCreateSecurityConfig(ctx context.Context, baseCertDir, token, propose
|
|||
} else {
|
||||
// There was an error loading our Credentials, let's get a new certificate issued
|
||||
// Last argument is nil because at this point we don't have any valid TLS creds
|
||||
tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, paths.Node, token, picker, nil, nodeInfo)
|
||||
tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, paths.Node, token, remotes, nil, nodeInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ func LoadOrCreateSecurityConfig(ctx context.Context, baseCertDir, token, propose
|
|||
|
||||
// RenewTLSConfig will continuously monitor for the necessity of renewing the local certificates, either by
|
||||
// issuing them locally if key-material is available, or requesting them from a remote CA.
|
||||
func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string, picker *picker.Picker, renew <-chan struct{}) <-chan CertificateUpdate {
|
||||
func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string, remotes remotes.Remotes, renew <-chan struct{}) <-chan CertificateUpdate {
|
||||
paths := NewConfigPaths(baseCertDir)
|
||||
updates := make(chan CertificateUpdate)
|
||||
|
||||
|
@ -344,7 +344,7 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string,
|
|||
tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx,
|
||||
paths.Node,
|
||||
"",
|
||||
picker,
|
||||
remotes,
|
||||
s.ClientTLSCreds,
|
||||
nil)
|
||||
if err != nil {
|
||||
|
|
|
@ -165,7 +165,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|||
nodes, err = store.FindNodes(tx, store.All)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing all services in store while trying to allocate during init: %v", err)
|
||||
return fmt.Errorf("error listing all nodes in store while trying to allocate during init: %v", err)
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
|
@ -420,9 +420,9 @@ func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
|
|||
}
|
||||
|
||||
func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
|
||||
// If service is nil or if task network attachments have
|
||||
// already been filled in no need to do anything else.
|
||||
if s == nil || len(t.Networks) != 0 {
|
||||
// If task network attachments have already been filled in no
|
||||
// need to do anything else.
|
||||
if len(t.Networks) != 0 {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -431,19 +431,31 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
|
|||
// The service to which this task belongs is trying to expose
|
||||
// ports to the external world. Automatically attach the task
|
||||
// to the ingress network.
|
||||
if s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0 {
|
||||
if s != nil && s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0 {
|
||||
networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
|
||||
}
|
||||
|
||||
a.store.View(func(tx store.ReadTx) {
|
||||
for _, na := range s.Spec.Networks {
|
||||
// Always prefer NetworkAttachmentConfig in the TaskSpec
|
||||
specNetworks := t.Spec.Networks
|
||||
if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
|
||||
specNetworks = s.Spec.Networks
|
||||
}
|
||||
|
||||
for _, na := range specNetworks {
|
||||
n := store.GetNetwork(tx, na.Target)
|
||||
if n != nil {
|
||||
var aliases []string
|
||||
var addresses []string
|
||||
|
||||
for _, a := range na.Aliases {
|
||||
aliases = append(aliases, a)
|
||||
}
|
||||
networks = append(networks, &api.NetworkAttachment{Network: n, Aliases: aliases})
|
||||
for _, a := range na.Addresses {
|
||||
addresses = append(addresses, a)
|
||||
}
|
||||
|
||||
networks = append(networks, &api.NetworkAttachment{Network: n, Aliases: aliases, Addresses: addresses})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -508,12 +520,12 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
|
|||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Populate network attachments in the task
|
||||
// based on service spec.
|
||||
a.taskCreateNetworkAttachments(t, s)
|
||||
}
|
||||
|
||||
// Populate network attachments in the task
|
||||
// based on service spec.
|
||||
a.taskCreateNetworkAttachments(t, s)
|
||||
|
||||
nc.unallocatedTasks[t.ID] = t
|
||||
}
|
||||
|
||||
|
|
|
@ -197,8 +197,14 @@ func (na *NetworkAllocator) ServiceAllocate(s *api.Service) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Always prefer NetworkAttachmentConfig in the TaskSpec
|
||||
specNetworks := s.Spec.Task.Networks
|
||||
if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
|
||||
specNetworks = s.Spec.Networks
|
||||
}
|
||||
|
||||
outer:
|
||||
for _, nAttach := range s.Spec.Networks {
|
||||
for _, nAttach := range specNetworks {
|
||||
for _, vip := range s.Endpoint.VirtualIPs {
|
||||
if vip.NetworkID == nAttach.Target {
|
||||
continue outer
|
||||
|
@ -286,7 +292,7 @@ func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool {
|
|||
func (na *NetworkAllocator) IsServiceAllocated(s *api.Service) bool {
|
||||
// If endpoint mode is VIP and allocator does not have the
|
||||
// service in VIP allocated set then it is not allocated.
|
||||
if len(s.Spec.Networks) != 0 &&
|
||||
if (len(s.Spec.Task.Networks) != 0 || len(s.Spec.Networks) != 0) &&
|
||||
(s.Spec.Endpoint == nil ||
|
||||
s.Spec.Endpoint.Mode == api.ResolutionModeVirtualIP) {
|
||||
if _, ok := na.services[s.ID]; !ok {
|
||||
|
@ -527,7 +533,11 @@ func (na *NetworkAllocator) allocateNetworkIPs(nAttach *api.NetworkAttachment) e
|
|||
var err error
|
||||
addr, _, err = net.ParseCIDR(rawAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
addr = net.ParseIP(rawAddr)
|
||||
|
||||
if addr == nil {
|
||||
return fmt.Errorf("could not parse address string %s: %v", rawAddr, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -171,7 +171,12 @@ func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRe
|
|||
}
|
||||
|
||||
for _, s := range services {
|
||||
for _, na := range s.Spec.Networks {
|
||||
specNetworks := s.Spec.Task.Networks
|
||||
if len(specNetworks) == 0 {
|
||||
specNetworks = s.Spec.Networks
|
||||
}
|
||||
|
||||
for _, na := range specNetworks {
|
||||
if na.Target == request.NetworkID {
|
||||
return nil, grpc.Errorf(codes.FailedPrecondition, "network %s is in use", request.NetworkID)
|
||||
}
|
||||
|
|
|
@ -327,8 +327,20 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
|
|||
return nil
|
||||
}
|
||||
// temporary disable network update
|
||||
if request.Spec != nil && !reflect.DeepEqual(request.Spec.Networks, service.Spec.Networks) {
|
||||
return errNetworkUpdateNotSupported
|
||||
if request.Spec != nil {
|
||||
requestSpecNetworks := request.Spec.Task.Networks
|
||||
if len(requestSpecNetworks) == 0 {
|
||||
requestSpecNetworks = request.Spec.Networks
|
||||
}
|
||||
|
||||
specNetworks := service.Spec.Task.Networks
|
||||
if len(specNetworks) == 0 {
|
||||
specNetworks = service.Spec.Networks
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(requestSpecNetworks, specNetworks) {
|
||||
return errNetworkUpdateNotSupported
|
||||
}
|
||||
}
|
||||
|
||||
// orchestrator is designed to be stateless, so it should not deal
|
||||
|
|
|
@ -19,8 +19,8 @@ import (
|
|||
"github.com/docker/swarmkit/manager/state"
|
||||
"github.com/docker/swarmkit/manager/state/store"
|
||||
"github.com/docker/swarmkit/manager/state/watch"
|
||||
"github.com/docker/swarmkit/picker"
|
||||
"github.com/docker/swarmkit/protobuf/ptypes"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -153,7 +153,7 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
|
|||
// TODO(stevvooe): Calculate weight of manager selection based on
|
||||
// cluster-level observations, such as number of connections and
|
||||
// load.
|
||||
Weight: picker.DefaultObservationWeight,
|
||||
Weight: remotes.DefaultObservationWeight,
|
||||
})
|
||||
}
|
||||
return mgrs
|
||||
|
@ -209,7 +209,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
|
|||
for _, p := range peers {
|
||||
mgrs = append(mgrs, &api.WeightedPeer{
|
||||
Peer: p,
|
||||
Weight: picker.DefaultObservationWeight,
|
||||
Weight: remotes.DefaultObservationWeight,
|
||||
})
|
||||
}
|
||||
d.mu.Lock()
|
||||
|
@ -854,7 +854,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
|
|||
|
||||
for {
|
||||
// After each message send, we need to check the nodes sessionID hasn't
|
||||
// changed. If it has, we will the stream and make the node
|
||||
// changed. If it has, we will shut down the stream and make the node
|
||||
// re-register.
|
||||
node, err := d.nodes.GetWithSession(nodeID, sessionID)
|
||||
if err != nil {
|
||||
|
|
|
@ -86,7 +86,7 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// CheckRateLimit returs error if node with specified id is allowed to re-register
|
||||
// CheckRateLimit returns error if node with specified id is allowed to re-register
|
||||
// again.
|
||||
func (s *nodeStore) CheckRateLimit(id string) error {
|
||||
s.mu.Lock()
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/docker/swarmkit/manager/keymanager"
|
||||
"github.com/docker/swarmkit/manager/orchestrator"
|
||||
"github.com/docker/swarmkit/manager/raftpicker"
|
||||
"github.com/docker/swarmkit/manager/resourceapi"
|
||||
"github.com/docker/swarmkit/manager/scheduler"
|
||||
"github.com/docker/swarmkit/manager/state/raft"
|
||||
"github.com/docker/swarmkit/manager/state/store"
|
||||
|
@ -275,9 +276,12 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
}
|
||||
|
||||
baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
|
||||
baseResourceAPI := resourceapi.New(m.RaftNode.MemoryStore())
|
||||
healthServer := health.NewHealthServer()
|
||||
localHealthServer := health.NewHealthServer()
|
||||
|
||||
authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
|
||||
authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
|
||||
authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.Dispatcher, authorize)
|
||||
authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
|
||||
authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
|
||||
|
@ -289,6 +293,7 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, cs, m.RaftNode, ca.WithMetadataForwardTLSInfo)
|
||||
proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, cs, m.RaftNode, ca.WithMetadataForwardTLSInfo)
|
||||
proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, cs, m.RaftNode, ca.WithMetadataForwardTLSInfo)
|
||||
proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, cs, m.RaftNode, ca.WithMetadataForwardTLSInfo)
|
||||
|
||||
// localProxyControlAPI is a special kind of proxy. It is only wired up
|
||||
// to receive requests from a trusted local socket, and these requests
|
||||
|
@ -306,10 +311,13 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
api.RegisterRaftServer(m.server, authenticatedRaftAPI)
|
||||
api.RegisterHealthServer(m.server, authenticatedHealthAPI)
|
||||
api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
|
||||
api.RegisterControlServer(m.localserver, localProxyControlAPI)
|
||||
api.RegisterControlServer(m.server, authenticatedControlAPI)
|
||||
api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
|
||||
api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
|
||||
|
||||
api.RegisterControlServer(m.localserver, localProxyControlAPI)
|
||||
api.RegisterHealthServer(m.localserver, localHealthServer)
|
||||
|
||||
errServe := make(chan error, 2)
|
||||
for proto, l := range m.listeners {
|
||||
go m.serveListener(ctx, errServe, proto, l)
|
||||
|
@ -317,11 +325,14 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
|
||||
// Set the raft server as serving for the health server
|
||||
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)
|
||||
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)
|
||||
|
||||
defer func() {
|
||||
m.server.Stop()
|
||||
m.localserver.Stop()
|
||||
}()
|
||||
|
||||
if err := m.RaftNode.JoinAndStart(); err != nil {
|
||||
for _, lis := range m.listeners {
|
||||
lis.Close()
|
||||
}
|
||||
return fmt.Errorf("can't initialize raft node: %v", err)
|
||||
}
|
||||
|
||||
|
@ -336,13 +347,11 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
}()
|
||||
|
||||
if err := raft.WaitForLeader(ctx, m.RaftNode); err != nil {
|
||||
m.server.Stop()
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := raft.WaitForCluster(ctx, m.RaftNode)
|
||||
if err != nil {
|
||||
m.server.Stop()
|
||||
return err
|
||||
}
|
||||
raftConfig := c.Spec.Raft
|
||||
|
@ -527,29 +536,31 @@ func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error {
|
|||
|
||||
}
|
||||
|
||||
// handleLeadershipEvents reads out and discards all of the messages when the manager is stopped,
|
||||
// otherwise it handles the is leader event or is follower event.
|
||||
// handleLeadershipEvents handles the is leader event or is follower event.
|
||||
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
|
||||
for leadershipEvent := range leadershipCh {
|
||||
// read out and discard all of the messages when we've stopped
|
||||
// don't acquire the mutex yet. if stopped is closed, we don't need
|
||||
// this stops this loop from starving Run()'s attempt to Lock
|
||||
for {
|
||||
select {
|
||||
case <-m.stopped:
|
||||
continue
|
||||
default:
|
||||
// do nothing, we're not stopped
|
||||
}
|
||||
// we're not stopping so NOW acquire the mutex
|
||||
m.mu.Lock()
|
||||
newState := leadershipEvent.(raft.LeadershipState)
|
||||
case leadershipEvent := <-leadershipCh:
|
||||
m.mu.Lock()
|
||||
select {
|
||||
case <-m.stopped:
|
||||
m.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
newState := leadershipEvent.(raft.LeadershipState)
|
||||
|
||||
if newState == raft.IsLeader {
|
||||
m.becomeLeader(ctx)
|
||||
} else if newState == raft.IsFollower {
|
||||
m.becomeFollower()
|
||||
if newState == raft.IsLeader {
|
||||
m.becomeLeader(ctx)
|
||||
} else if newState == raft.IsFollower {
|
||||
m.becomeFollower()
|
||||
}
|
||||
m.mu.Unlock()
|
||||
case <-m.stopped:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -609,7 +620,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
|
|||
m.globalOrchestrator = orchestrator.NewGlobalOrchestrator(s)
|
||||
m.taskReaper = orchestrator.NewTaskReaper(s)
|
||||
m.scheduler = scheduler.New(s)
|
||||
m.keyManager = keymanager.New(m.RaftNode.MemoryStore(), keymanager.DefaultConfig())
|
||||
m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
|
||||
|
||||
// TODO(stevvooe): Allocate a context that can be used to
|
||||
// shutdown underlying manager processes when leadership is
|
||||
|
|
|
@ -288,16 +288,8 @@ func (u *Updater) updateTask(ctx context.Context, slot slot, updated *api.Task)
|
|||
return err
|
||||
}
|
||||
|
||||
u.removeOldTasks(ctx, batch, slot)
|
||||
|
||||
for _, t := range slot {
|
||||
if t.DesiredState == api.TaskStateRunning {
|
||||
// Wait for the old task to stop or time out, and then set the new one
|
||||
// to RUNNING.
|
||||
delayStartCh = u.restarts.DelayStart(ctx, nil, t, updated.ID, 0, true)
|
||||
break
|
||||
}
|
||||
}
|
||||
oldTask := u.removeOldTasks(ctx, batch, slot)
|
||||
delayStartCh = u.restarts.DelayStart(ctx, nil, oldTask, updated.ID, 0, true)
|
||||
|
||||
return nil
|
||||
|
||||
|
@ -333,34 +325,29 @@ func (u *Updater) useExistingTask(ctx context.Context, slot slot, existing *api.
|
|||
}
|
||||
}
|
||||
if len(removeTasks) != 0 || existing.DesiredState != api.TaskStateRunning {
|
||||
var delayStartCh <-chan struct{}
|
||||
_, err := u.store.Batch(func(batch *store.Batch) error {
|
||||
u.removeOldTasks(ctx, batch, removeTasks)
|
||||
oldTask := u.removeOldTasks(ctx, batch, removeTasks)
|
||||
|
||||
if existing.DesiredState != api.TaskStateRunning {
|
||||
err := batch.Update(func(tx store.Tx) error {
|
||||
t := store.GetTask(tx, existing.ID)
|
||||
if t == nil {
|
||||
return fmt.Errorf("task %s not found while trying to start it", existing.ID)
|
||||
}
|
||||
if t.DesiredState >= api.TaskStateRunning {
|
||||
return fmt.Errorf("task %s was already started when reached by updater", existing.ID)
|
||||
}
|
||||
t.DesiredState = api.TaskStateRunning
|
||||
return store.UpdateTask(tx, t)
|
||||
})
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("starting task %s failed", existing.ID)
|
||||
}
|
||||
delayStartCh = u.restarts.DelayStart(ctx, nil, oldTask, existing.ID, 0, true)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("updater batch transaction failed")
|
||||
}
|
||||
|
||||
if delayStartCh != nil {
|
||||
<-delayStartCh
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, removeTasks []*api.Task) {
|
||||
// removeOldTasks shuts down the given tasks and returns one of the tasks that
|
||||
// was shut down, or nil.
|
||||
func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, removeTasks []*api.Task) *api.Task {
|
||||
var removedTask *api.Task
|
||||
for _, original := range removeTasks {
|
||||
err := batch.Update(func(tx store.Tx) error {
|
||||
t := store.GetTask(tx, original.ID)
|
||||
|
@ -375,8 +362,12 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove
|
|||
})
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("shutting down stale task %s failed", original.ID)
|
||||
} else {
|
||||
removedTask = original
|
||||
}
|
||||
}
|
||||
|
||||
return removedTask
|
||||
}
|
||||
|
||||
func (u *Updater) isTaskDirty(t *api.Task) bool {
|
||||
|
|
124
vendor/src/github.com/docker/swarmkit/manager/resourceapi/allocator.go
vendored
Normal file
124
vendor/src/github.com/docker/swarmkit/manager/resourceapi/allocator.go
vendored
Normal file
|
@ -0,0 +1,124 @@
|
|||
package resourceapi
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/docker/swarmkit/api"
|
||||
"github.com/docker/swarmkit/ca"
|
||||
"github.com/docker/swarmkit/identity"
|
||||
"github.com/docker/swarmkit/manager/state/store"
|
||||
"github.com/docker/swarmkit/protobuf/ptypes"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
var (
|
||||
errInvalidArgument = errors.New("invalid argument")
|
||||
)
|
||||
|
||||
// ResourceAllocator handles resource allocation of cluster entities.
|
||||
type ResourceAllocator struct {
|
||||
store *store.MemoryStore
|
||||
}
|
||||
|
||||
// New returns an instance of the allocator
|
||||
func New(store *store.MemoryStore) *ResourceAllocator {
|
||||
return &ResourceAllocator{store: store}
|
||||
}
|
||||
|
||||
// AttachNetwork allows the node to request the resources
|
||||
// allocation needed for a network attachment on the specific node.
|
||||
// - Returns `InvalidArgument` if the Spec is malformed.
|
||||
// - Returns `NotFound` if the Network is not found.
|
||||
// - Returns `PermissionDenied` if the Network is not manually attachable.
|
||||
// - Returns an error if the creation fails.
|
||||
func (ra *ResourceAllocator) AttachNetwork(ctx context.Context, request *api.AttachNetworkRequest) (*api.AttachNetworkResponse, error) {
|
||||
nodeInfo, err := ca.RemoteNode(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var network *api.Network
|
||||
ra.store.View(func(tx store.ReadTx) {
|
||||
network = store.GetNetwork(tx, request.Config.Target)
|
||||
if network == nil {
|
||||
if networks, err := store.FindNetworks(tx, store.ByName(request.Config.Target)); err == nil && len(networks) == 1 {
|
||||
network = networks[0]
|
||||
}
|
||||
}
|
||||
})
|
||||
if network == nil {
|
||||
return nil, grpc.Errorf(codes.NotFound, "network %s not found", request.Config.Target)
|
||||
}
|
||||
|
||||
if !network.Spec.Attachable {
|
||||
return nil, grpc.Errorf(codes.PermissionDenied, "network %s not manually attachable", request.Config.Target)
|
||||
}
|
||||
|
||||
t := &api.Task{
|
||||
ID: identity.NewID(),
|
||||
NodeID: nodeInfo.NodeID,
|
||||
Spec: api.TaskSpec{
|
||||
Runtime: &api.TaskSpec_Attachment{
|
||||
Attachment: &api.NetworkAttachmentSpec{
|
||||
ContainerID: request.ContainerID,
|
||||
},
|
||||
},
|
||||
Networks: []*api.NetworkAttachmentConfig{
|
||||
{
|
||||
Target: network.ID,
|
||||
Addresses: request.Config.Addresses,
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: api.TaskStatus{
|
||||
State: api.TaskStateNew,
|
||||
Timestamp: ptypes.MustTimestampProto(time.Now()),
|
||||
Message: "created",
|
||||
},
|
||||
DesiredState: api.TaskStateRunning,
|
||||
// TODO: Add Network attachment.
|
||||
}
|
||||
|
||||
if err := ra.store.Update(func(tx store.Tx) error {
|
||||
return store.CreateTask(tx, t)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &api.AttachNetworkResponse{AttachmentID: t.ID}, nil
|
||||
}
|
||||
|
||||
// DetachNetwork allows the node to request the release of
|
||||
// the resources associated to the network attachment.
|
||||
// - Returns `InvalidArgument` if attachment ID is not provided.
|
||||
// - Returns `NotFound` if the attachment is not found.
|
||||
// - Returns an error if the deletion fails.
|
||||
func (ra *ResourceAllocator) DetachNetwork(ctx context.Context, request *api.DetachNetworkRequest) (*api.DetachNetworkResponse, error) {
|
||||
if request.AttachmentID == "" {
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
|
||||
}
|
||||
|
||||
nodeInfo, err := ca.RemoteNode(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ra.store.Update(func(tx store.Tx) error {
|
||||
t := store.GetTask(tx, request.AttachmentID)
|
||||
if t == nil {
|
||||
return grpc.Errorf(codes.NotFound, "attachment %s not found", request.AttachmentID)
|
||||
}
|
||||
if t.NodeID != nodeInfo.NodeID {
|
||||
return grpc.Errorf(codes.PermissionDenied, "attachment %s doesn't belong to this node", request.AttachmentID)
|
||||
}
|
||||
|
||||
return store.DeleteTask(tx, request.AttachmentID)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &api.DetachNetworkResponse{}, nil
|
||||
}
|
|
@ -33,7 +33,8 @@ type Cluster struct {
|
|||
|
||||
// removed contains the list of removed Members,
|
||||
// those ids cannot be reused
|
||||
removed map[uint64]bool
|
||||
removed map[uint64]bool
|
||||
heartbeatTicks int
|
||||
|
||||
PeersBroadcast *watch.Queue
|
||||
}
|
||||
|
@ -43,21 +44,40 @@ type Member struct {
|
|||
*api.RaftMember
|
||||
|
||||
api.RaftClient
|
||||
Conn *grpc.ClientConn
|
||||
Conn *grpc.ClientConn
|
||||
tick int
|
||||
active bool
|
||||
}
|
||||
|
||||
// NewCluster creates a new Cluster neighbors
|
||||
// list for a raft Member
|
||||
func NewCluster() *Cluster {
|
||||
// NewCluster creates a new Cluster neighbors list for a raft Member.
|
||||
// Member marked as inactive if there was no call ReportActive for heartbeatInterval.
|
||||
func NewCluster(heartbeatTicks int) *Cluster {
|
||||
// TODO(abronan): generate Cluster ID for federation
|
||||
|
||||
return &Cluster{
|
||||
members: make(map[uint64]*Member),
|
||||
removed: make(map[uint64]bool),
|
||||
heartbeatTicks: heartbeatTicks,
|
||||
PeersBroadcast: watch.NewQueue(),
|
||||
}
|
||||
}
|
||||
|
||||
// Tick increases ticks for all members. After heartbeatTicks node marked as
|
||||
// inactive.
|
||||
func (c *Cluster) Tick() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for _, m := range c.members {
|
||||
if !m.active {
|
||||
continue
|
||||
}
|
||||
m.tick++
|
||||
if m.tick > c.heartbeatTicks {
|
||||
m.active = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Members returns the list of raft Members in the Cluster.
|
||||
func (c *Cluster) Members() map[uint64]*Member {
|
||||
members := make(map[uint64]*Member)
|
||||
|
@ -106,26 +126,42 @@ func (c *Cluster) AddMember(member *Member) error {
|
|||
if c.removed[member.RaftID] {
|
||||
return ErrIDRemoved
|
||||
}
|
||||
member.active = true
|
||||
member.tick = 0
|
||||
|
||||
c.members[member.RaftID] = member
|
||||
|
||||
c.broadcastUpdate()
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveMember removes a node from the Cluster Memberlist.
|
||||
// RemoveMember removes a node from the Cluster Memberlist, and adds it to
|
||||
// the removed list.
|
||||
func (c *Cluster) RemoveMember(id uint64) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.removed[id] = true
|
||||
|
||||
if c.members[id] != nil {
|
||||
conn := c.members[id].Conn
|
||||
if conn != nil {
|
||||
_ = conn.Close()
|
||||
return c.clearMember(id)
|
||||
}
|
||||
|
||||
// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
|
||||
// to the removed list.
|
||||
func (c *Cluster) ClearMember(id uint64) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
return c.clearMember(id)
|
||||
}
|
||||
|
||||
func (c *Cluster) clearMember(id uint64) error {
|
||||
m, ok := c.members[id]
|
||||
if ok {
|
||||
if m.Conn != nil {
|
||||
m.Conn.Close()
|
||||
}
|
||||
delete(c.members, id)
|
||||
}
|
||||
|
||||
c.removed[id] = true
|
||||
c.broadcastUpdate()
|
||||
return nil
|
||||
}
|
||||
|
@ -152,7 +188,6 @@ func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *M
|
|||
newMember := *oldMember
|
||||
newMember.Conn = newConn.Conn
|
||||
newMember.RaftClient = newConn.RaftClient
|
||||
|
||||
c.members[id] = &newMember
|
||||
|
||||
return nil
|
||||
|
@ -168,11 +203,40 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
|
|||
// Clear resets the list of active Members and removed Members.
|
||||
func (c *Cluster) Clear() {
|
||||
c.mu.Lock()
|
||||
for _, member := range c.members {
|
||||
if member.Conn != nil {
|
||||
member.Conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
c.members = make(map[uint64]*Member)
|
||||
c.removed = make(map[uint64]bool)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// ReportActive reports that member is acive (called ProcessRaftMessage),
|
||||
func (c *Cluster) ReportActive(id uint64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
m, ok := c.members[id]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
m.tick = 0
|
||||
m.active = true
|
||||
}
|
||||
|
||||
// Active returns true if node is active.
|
||||
func (c *Cluster) Active(id uint64) bool {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
m, ok := c.members[id]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return m.active
|
||||
}
|
||||
|
||||
// ValidateConfigurationChange takes a proposed ConfChange and
|
||||
// ensures that it is valid.
|
||||
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||
|
@ -223,8 +287,7 @@ func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
|
|||
continue
|
||||
}
|
||||
|
||||
connState, err := m.Conn.State()
|
||||
if err == nil && connState == grpc.Ready {
|
||||
if c.Active(m.RaftID) {
|
||||
nreachable++
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,6 @@ type Node struct {
|
|||
|
||||
Address string
|
||||
StateDir string
|
||||
Error error
|
||||
|
||||
raftStore *raft.MemoryStorage
|
||||
memoryStore *store.MemoryStore
|
||||
|
@ -119,6 +118,7 @@ type Node struct {
|
|||
leadershipBroadcast *watch.Queue
|
||||
|
||||
// used to coordinate shutdown
|
||||
// Lock should be used only in stop(), all other functions should use RLock.
|
||||
stopMu sync.RWMutex
|
||||
// used for membership management checks
|
||||
membershipLock sync.Mutex
|
||||
|
@ -176,7 +176,7 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
|
|||
n := &Node{
|
||||
Ctx: ctx,
|
||||
cancel: cancel,
|
||||
cluster: membership.NewCluster(),
|
||||
cluster: membership.NewCluster(cfg.ElectionTick),
|
||||
tlsCredentials: opts.TLSCredentials,
|
||||
raftStore: raftStore,
|
||||
Address: opts.Addr,
|
||||
|
@ -224,10 +224,15 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
|
|||
}
|
||||
|
||||
// JoinAndStart joins and starts the raft server
|
||||
func (n *Node) JoinAndStart() error {
|
||||
func (n *Node) JoinAndStart() (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
n.done()
|
||||
}
|
||||
}()
|
||||
|
||||
loadAndStartErr := n.loadAndStart(n.Ctx, n.opts.ForceNewCluster)
|
||||
if loadAndStartErr != nil && loadAndStartErr != errNoWAL {
|
||||
n.ticker.Stop()
|
||||
return loadAndStartErr
|
||||
}
|
||||
|
||||
|
@ -270,6 +275,9 @@ func (n *Node) JoinAndStart() error {
|
|||
n.Node = raft.StartNode(n.Config, []raft.Peer{})
|
||||
|
||||
if err := n.registerNodes(resp.Members); err != nil {
|
||||
if walErr := n.wal.Close(); err != nil {
|
||||
n.Config.Logger.Errorf("raft: error closing WAL: %v", walErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
|
@ -281,6 +289,9 @@ func (n *Node) JoinAndStart() error {
|
|||
}
|
||||
n.Node = raft.StartNode(n.Config, []raft.Peer{peer})
|
||||
if err := n.Campaign(n.Ctx); err != nil {
|
||||
if walErr := n.wal.Close(); err != nil {
|
||||
n.Config.Logger.Errorf("raft: error closing WAL: %v", walErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -324,15 +335,24 @@ func (n *Node) MemoryStore() *store.MemoryStore {
|
|||
return n.memoryStore
|
||||
}
|
||||
|
||||
func (n *Node) done() {
|
||||
n.cluster.Clear()
|
||||
|
||||
n.ticker.Stop()
|
||||
n.leadershipBroadcast.Close()
|
||||
n.cluster.PeersBroadcast.Close()
|
||||
n.memoryStore.Close()
|
||||
|
||||
close(n.doneCh)
|
||||
}
|
||||
|
||||
// Run is the main loop for a Raft node, it goes along the state machine,
|
||||
// acting on the messages received from other Raft nodes in the cluster.
|
||||
//
|
||||
// Before running the main loop, it first starts the raft node based on saved
|
||||
// cluster state. If no saved state exists, it starts a single-node cluster.
|
||||
func (n *Node) Run(ctx context.Context) error {
|
||||
defer func() {
|
||||
close(n.doneCh)
|
||||
}()
|
||||
defer n.done()
|
||||
|
||||
wasLeader := false
|
||||
|
||||
|
@ -340,7 +360,7 @@ func (n *Node) Run(ctx context.Context) error {
|
|||
select {
|
||||
case <-n.ticker.C():
|
||||
n.Tick()
|
||||
|
||||
n.cluster.Tick()
|
||||
case rd := <-n.Ready():
|
||||
raftConfig := DefaultRaftConfig()
|
||||
n.memoryStore.View(func(readTx store.ReadTx) {
|
||||
|
@ -453,9 +473,6 @@ func (n *Node) Run(ctx context.Context) error {
|
|||
return ErrMemberRemoved
|
||||
case <-n.stopCh:
|
||||
n.stop()
|
||||
n.leadershipBroadcast.Close()
|
||||
n.cluster.PeersBroadcast.Close()
|
||||
n.memoryStore.Close()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -481,13 +498,6 @@ func (n *Node) stop() {
|
|||
n.waitProp.Wait()
|
||||
n.asyncTasks.Wait()
|
||||
|
||||
members := n.cluster.Members()
|
||||
for _, member := range members {
|
||||
if member.Conn != nil {
|
||||
_ = member.Conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
n.Stop()
|
||||
n.ticker.Stop()
|
||||
if err := n.wal.Close(); err != nil {
|
||||
|
@ -517,20 +527,26 @@ func (n *Node) IsLeader() bool {
|
|||
return n.isLeader()
|
||||
}
|
||||
|
||||
// leader returns the id of the leader, without the protection of lock
|
||||
// leader returns the id of the leader, without the protection of lock and
|
||||
// membership check, so it's caller task.
|
||||
func (n *Node) leader() uint64 {
|
||||
if !n.IsMember() {
|
||||
return 0
|
||||
}
|
||||
return n.Node.Status().Lead
|
||||
}
|
||||
|
||||
// Leader returns the id of the leader, with the protection of lock
|
||||
func (n *Node) Leader() uint64 {
|
||||
func (n *Node) Leader() (uint64, error) {
|
||||
n.stopMu.RLock()
|
||||
defer n.stopMu.RUnlock()
|
||||
|
||||
return n.leader()
|
||||
if !n.IsMember() {
|
||||
return 0, ErrNoRaftMember
|
||||
}
|
||||
leader := n.leader()
|
||||
if leader == 0 {
|
||||
return 0, ErrNoClusterLeader
|
||||
}
|
||||
|
||||
return leader, nil
|
||||
}
|
||||
|
||||
// ReadyForProposals returns true if the node has broadcasted a message
|
||||
|
@ -760,6 +776,8 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
|
|||
return nil, ErrMemberRemoved
|
||||
}
|
||||
|
||||
n.cluster.ReportActive(msg.Message.From)
|
||||
|
||||
if msg.Message.Type == raftpb.MsgProp {
|
||||
// We don't accepted forwarded proposals. Our
|
||||
// current architecture depends on only the leader
|
||||
|
@ -923,15 +941,17 @@ func (n *Node) SubscribePeers() (q chan events.Event, cancel func()) {
|
|||
func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
|
||||
memberlist := make(map[uint64]*api.RaftMember)
|
||||
members := n.cluster.Members()
|
||||
leaderID := n.Leader()
|
||||
leaderID, err := n.Leader()
|
||||
if err != nil {
|
||||
leaderID = 0
|
||||
}
|
||||
|
||||
for id, member := range members {
|
||||
reachability := api.RaftMemberStatus_REACHABLE
|
||||
leader := false
|
||||
|
||||
if member.RaftID != n.Config.ID {
|
||||
connState, err := member.Conn.State()
|
||||
if err != nil || connState != grpc.Ready {
|
||||
if !n.cluster.Active(member.RaftID) {
|
||||
reachability = api.RaftMemberStatus_UNREACHABLE
|
||||
}
|
||||
}
|
||||
|
@ -1111,7 +1131,10 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
|
|||
if err != nil {
|
||||
n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, conn.Addr, err)
|
||||
} else {
|
||||
n.cluster.ReplaceMemberConnection(m.To, conn, newConn)
|
||||
err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn)
|
||||
if err != nil {
|
||||
newConn.Conn.Close()
|
||||
}
|
||||
}
|
||||
} else if m.Type == raftpb.MsgSnap {
|
||||
n.ReportSnapshot(m.To, raft.SnapshotFinish)
|
||||
|
@ -1129,12 +1152,14 @@ type applyResult struct {
|
|||
// an error or until the raft node finalizes all the proposals on node
|
||||
// shutdown.
|
||||
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
|
||||
n.waitProp.Add(1)
|
||||
defer n.waitProp.Done()
|
||||
|
||||
n.stopMu.RLock()
|
||||
if !n.canSubmitProposal() {
|
||||
n.stopMu.RUnlock()
|
||||
return nil, ErrStopped
|
||||
}
|
||||
n.waitProp.Add(1)
|
||||
defer n.waitProp.Done()
|
||||
n.stopMu.RUnlock()
|
||||
|
||||
r.ID = n.reqIDGen.Next()
|
||||
|
||||
|
|
|
@ -445,18 +445,24 @@ func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
n.cluster.Clear()
|
||||
oldMembers := n.cluster.Members()
|
||||
|
||||
if !forceNewCluster {
|
||||
for _, member := range snapshot.Membership.Members {
|
||||
if err := n.registerNode(&api.RaftMember{RaftID: member.RaftID, NodeID: member.NodeID, Addr: member.Addr}); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(oldMembers, member.RaftID)
|
||||
}
|
||||
}
|
||||
|
||||
for _, removedMember := range snapshot.Membership.Removed {
|
||||
n.cluster.RemoveMember(removedMember)
|
||||
delete(oldMembers, removedMember)
|
||||
}
|
||||
|
||||
for member := range oldMembers {
|
||||
n.cluster.ClearMember(member)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -35,19 +35,19 @@ func Register(server *grpc.Server, node *Node) {
|
|||
// WaitForLeader waits until node observe some leader in cluster. It returns
|
||||
// error if ctx was cancelled before leader appeared.
|
||||
func WaitForLeader(ctx context.Context, n *Node) error {
|
||||
l := n.Leader()
|
||||
if l != 0 {
|
||||
_, err := n.Leader()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
ticker := time.NewTicker(50 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for l == 0 {
|
||||
for err != nil {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
l = n.Leader()
|
||||
_, err = n.Leader()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package picker
|
||||
package remotes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -8,9 +8,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/docker/swarmkit/api"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")
|
||||
|
@ -204,134 +201,3 @@ func (mwr *remotesWeightedRandom) observe(peer api.Peer, weight float64) {
|
|||
|
||||
mwr.remotes[peer] = int(math.Ceil(wn))
|
||||
}
|
||||
|
||||
// Picker implements a grpc Picker
|
||||
type Picker struct {
|
||||
r Remotes
|
||||
peer api.Peer // currently selected remote peer
|
||||
conn *grpc.Conn
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
var _ grpc.Picker = &Picker{}
|
||||
|
||||
// NewPicker returns a Picker
|
||||
func NewPicker(r Remotes, initial ...string) *Picker {
|
||||
var peer api.Peer
|
||||
if len(initial) == 0 {
|
||||
peer, _ = r.Select() // empty in case of error
|
||||
} else {
|
||||
peer = api.Peer{Addr: initial[0]}
|
||||
}
|
||||
return &Picker{r: r, peer: peer}
|
||||
}
|
||||
|
||||
// Init does initial processing for the Picker, e.g., initiate some connections.
|
||||
func (p *Picker) Init(cc *grpc.ClientConn) error {
|
||||
p.mu.Lock()
|
||||
peer := p.peer
|
||||
p.mu.Unlock()
|
||||
|
||||
p.r.ObserveIfExists(peer, DefaultObservationWeight)
|
||||
c, err := grpc.NewConn(cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
p.conn = c
|
||||
p.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
|
||||
// or some error happens.
|
||||
func (p *Picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
|
||||
p.mu.Lock()
|
||||
peer := p.peer
|
||||
p.mu.Unlock()
|
||||
transport, err := p.conn.Wait(ctx)
|
||||
if err != nil {
|
||||
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
|
||||
}
|
||||
|
||||
return transport, err
|
||||
}
|
||||
|
||||
// PickAddr picks a peer address for connecting. This will be called repeated for
|
||||
// connecting/reconnecting.
|
||||
func (p *Picker) PickAddr() (string, error) {
|
||||
p.mu.Lock()
|
||||
peer := p.peer
|
||||
p.mu.Unlock()
|
||||
|
||||
p.r.ObserveIfExists(peer, -DefaultObservationWeight) // downweight the current addr
|
||||
|
||||
var err error
|
||||
peer, err = p.r.Select()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
p.peer = peer
|
||||
p.mu.Unlock()
|
||||
return peer.Addr, err
|
||||
}
|
||||
|
||||
// State returns the connectivity state of the underlying connections.
|
||||
func (p *Picker) State() (grpc.ConnectivityState, error) {
|
||||
return p.conn.State(), nil
|
||||
}
|
||||
|
||||
// WaitForStateChange blocks until the state changes to something other than
|
||||
// the sourceState. It returns the new state or error.
|
||||
func (p *Picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) {
|
||||
p.mu.Lock()
|
||||
conn := p.conn
|
||||
peer := p.peer
|
||||
p.mu.Unlock()
|
||||
|
||||
state, err := conn.WaitForStateChange(ctx, sourceState)
|
||||
if err != nil {
|
||||
return state, err
|
||||
}
|
||||
|
||||
// TODO(stevvooe): We may want to actually score the transition by checking
|
||||
// sourceState.
|
||||
|
||||
// TODO(stevvooe): This is questionable, but we'll see how it works.
|
||||
switch state {
|
||||
case grpc.Idle:
|
||||
p.r.ObserveIfExists(peer, DefaultObservationWeight)
|
||||
case grpc.Connecting:
|
||||
p.r.ObserveIfExists(peer, DefaultObservationWeight)
|
||||
case grpc.Ready:
|
||||
p.r.ObserveIfExists(peer, DefaultObservationWeight)
|
||||
case grpc.TransientFailure:
|
||||
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
|
||||
case grpc.Shutdown:
|
||||
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
|
||||
}
|
||||
|
||||
return state, err
|
||||
}
|
||||
|
||||
// Reset the current connection and force a reconnect to another address.
|
||||
func (p *Picker) Reset() error {
|
||||
p.mu.Lock()
|
||||
conn := p.conn
|
||||
p.mu.Unlock()
|
||||
|
||||
conn.NotifyReset()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes all the Conn's owned by this Picker.
|
||||
func (p *Picker) Close() error {
|
||||
p.mu.Lock()
|
||||
conn := p.conn
|
||||
p.mu.Unlock()
|
||||
|
||||
return conn.Close()
|
||||
}
|
|
@ -17,7 +17,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/google/certificate-transparency/go"
|
||||
"github.com/mreiferson/go-httpclient"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -25,6 +24,7 @@ import (
|
|||
const (
|
||||
AddChainPath = "/ct/v1/add-chain"
|
||||
AddPreChainPath = "/ct/v1/add-pre-chain"
|
||||
AddJSONPath = "/ct/v1/add-json"
|
||||
GetSTHPath = "/ct/v1/get-sth"
|
||||
GetEntriesPath = "/ct/v1/get-entries"
|
||||
)
|
||||
|
@ -57,6 +57,12 @@ type addChainResponse struct {
|
|||
Signature string `json:"signature"` // Log signature for this SCT
|
||||
}
|
||||
|
||||
// addJSONRequest represents the JSON request body sent ot the add-json CT
|
||||
// method.
|
||||
type addJSONRequest struct {
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
// getSTHResponse respresents the JSON response to the get-sth CT method
|
||||
type getSTHResponse struct {
|
||||
TreeSize uint64 `json:"tree_size"` // Number of certs in the current tree
|
||||
|
@ -102,18 +108,12 @@ type getEntryAndProofResponse struct {
|
|||
// New constructs a new LogClient instance.
|
||||
// |uri| is the base URI of the CT log instance to interact with, e.g.
|
||||
// http://ct.googleapis.com/pilot
|
||||
func New(uri string) *LogClient {
|
||||
var c LogClient
|
||||
c.uri = uri
|
||||
transport := &httpclient.Transport{
|
||||
ConnectTimeout: 10 * time.Second,
|
||||
RequestTimeout: 30 * time.Second,
|
||||
ResponseHeaderTimeout: 30 * time.Second,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
DisableKeepAlives: false,
|
||||
// |hc| is the underlying client to be used for HTTP requests to the CT log.
|
||||
func New(uri string, hc *http.Client) *LogClient {
|
||||
if hc == nil {
|
||||
hc = new(http.Client)
|
||||
}
|
||||
c.httpClient = &http.Client{Transport: transport}
|
||||
return &c
|
||||
return &LogClient{uri: uri, httpClient: hc}
|
||||
}
|
||||
|
||||
// Makes a HTTP call to |uri|, and attempts to parse the response as a JSON
|
||||
|
@ -154,7 +154,6 @@ func (c *LogClient) postAndParse(uri string, req interface{}, res interface{}) (
|
|||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
httpReq.Header.Set("Keep-Alive", "timeout=15, max=100")
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
// Read all of the body, if there is one, so that the http.Client can do
|
||||
|
@ -277,6 +276,37 @@ func (c *LogClient) AddChainWithContext(ctx context.Context, chain []ct.ASN1Cert
|
|||
return c.addChainWithRetry(ctx, AddChainPath, chain)
|
||||
}
|
||||
|
||||
func (c *LogClient) AddJSON(data interface{}) (*ct.SignedCertificateTimestamp, error) {
|
||||
req := addJSONRequest{
|
||||
Data: data,
|
||||
}
|
||||
var resp addChainResponse
|
||||
_, _, err := c.postAndParse(c.uri+AddJSONPath, &req, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rawLogID, err := base64.StdEncoding.DecodeString(resp.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rawSignature, err := base64.StdEncoding.DecodeString(resp.Signature)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ds, err := ct.UnmarshalDigitallySigned(bytes.NewReader(rawSignature))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var logID ct.SHA256Hash
|
||||
copy(logID[:], rawLogID)
|
||||
return &ct.SignedCertificateTimestamp{
|
||||
SCTVersion: resp.SCTVersion,
|
||||
LogID: logID,
|
||||
Timestamp: resp.Timestamp,
|
||||
Extensions: ct.CTExtensions(resp.Extensions),
|
||||
Signature: *ds}, nil
|
||||
}
|
||||
|
||||
// GetSTH retrieves the current STH from the log.
|
||||
// Returns a populated SignedTreeHead, or a non-nil error.
|
||||
func (c *LogClient) GetSTH() (sth *ct.SignedTreeHead, err error) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"container/list"
|
||||
"crypto"
|
||||
"encoding/asn1"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -23,6 +24,8 @@ const (
|
|||
const (
|
||||
MaxCertificateLength = (1 << 24) - 1
|
||||
MaxExtensionsLength = (1 << 16) - 1
|
||||
MaxSCTInListLength = (1 << 16) - 1
|
||||
MaxSCTListLength = (1 << 16) - 1
|
||||
)
|
||||
|
||||
func writeUint(w io.Writer, value uint64, numBytes int) error {
|
||||
|
@ -80,13 +83,12 @@ func readVarBytes(r io.Reader, numLenBytes int) ([]byte, error) {
|
|||
return nil, err
|
||||
}
|
||||
data := make([]byte, l)
|
||||
n, err := r.Read(data)
|
||||
if err != nil {
|
||||
if n, err := io.ReadFull(r, data); err != nil {
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return nil, fmt.Errorf("short read: expected %d but got %d", l, n)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if n != int(l) {
|
||||
return nil, fmt.Errorf("short read: expected %d but got %d", l, n)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
|
@ -510,3 +512,52 @@ func SerializeSTHSignatureInput(sth SignedTreeHead) ([]byte, error) {
|
|||
return nil, fmt.Errorf("unsupported STH version %d", sth.Version)
|
||||
}
|
||||
}
|
||||
|
||||
// SCTListSerializedLength determines the length of the required buffer should a SCT List need to be serialized
|
||||
func SCTListSerializedLength(scts []SignedCertificateTimestamp) (int, error) {
|
||||
if len(scts) == 0 {
|
||||
return 0, fmt.Errorf("SCT List empty")
|
||||
}
|
||||
|
||||
sctListLen := 0
|
||||
for i, sct := range scts {
|
||||
n, err := sct.SerializedLength()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("unable to determine length of SCT in position %d: %v", i, err)
|
||||
}
|
||||
if n > MaxSCTInListLength {
|
||||
return 0, fmt.Errorf("SCT in position %d too large: %d", i, n)
|
||||
}
|
||||
sctListLen += 2 + n
|
||||
}
|
||||
|
||||
return sctListLen, nil
|
||||
}
|
||||
|
||||
// SerializeSCTList serializes the passed-in slice of SignedCertificateTimestamp into a
|
||||
// byte slice as a SignedCertificateTimestampList (see RFC6962 Section 3.3)
|
||||
func SerializeSCTList(scts []SignedCertificateTimestamp) ([]byte, error) {
|
||||
size, err := SCTListSerializedLength(scts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fullSize := 2 + size // 2 bytes for length + size of SCT list
|
||||
if fullSize > MaxSCTListLength {
|
||||
return nil, fmt.Errorf("SCT List too large to serialize: %d", fullSize)
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
buf.Grow(fullSize)
|
||||
if err = writeUint(buf, uint64(size), 2); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, sct := range scts {
|
||||
serialized, err := SerializeSCT(sct)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = writeVarBytes(buf, serialized, 2); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return asn1.Marshal(buf.Bytes()) // transform to Octet String
|
||||
}
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
*.sw[op]
|
|
@ -1,11 +0,0 @@
|
|||
language: go
|
||||
go:
|
||||
- 1.1
|
||||
install:
|
||||
- go get github.com/bmizerany/assert
|
||||
script:
|
||||
- pushd $TRAVIS_BUILD_DIR
|
||||
- go test
|
||||
- popd
|
||||
notifications:
|
||||
email: false
|
|
@ -1,41 +0,0 @@
|
|||
## go-httpclient
|
||||
|
||||
**requires Go 1.1+** as of `v0.4.0` the API has been completely re-written for Go 1.1 (for a Go
|
||||
1.0.x compatible release see [1adef50](https://github.com/mreiferson/go-httpclient/tree/1adef50))
|
||||
|
||||
[](http://travis-ci.org/mreiferson/go-httpclient)
|
||||
|
||||
Provides an HTTP Transport that implements the `RoundTripper` interface and
|
||||
can be used as a built in replacement for the standard library's, providing:
|
||||
|
||||
* connection timeouts
|
||||
* request timeouts
|
||||
|
||||
This is a thin wrapper around `http.Transport` that sets dial timeouts and uses
|
||||
Go's internal timer scheduler to call the Go 1.1+ `CancelRequest()` API.
|
||||
|
||||
### Example
|
||||
|
||||
```go
|
||||
transport := &httpclient.Transport{
|
||||
ConnectTimeout: 1*time.Second,
|
||||
RequestTimeout: 10*time.Second,
|
||||
ResponseHeaderTimeout: 5*time.Second,
|
||||
}
|
||||
defer transport.Close()
|
||||
|
||||
client := &http.Client{Transport: transport}
|
||||
req, _ := http.NewRequest("GET", "http://127.0.0.1/test", nil)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
```
|
||||
|
||||
*Note:* you will want to re-use a single client object rather than creating one for each request, otherwise you will end up [leaking connections](https://code.google.com/p/go/issues/detail?id=4049#c3).
|
||||
|
||||
### Reference Docs
|
||||
|
||||
For API docs see [godoc](http://godoc.org/github.com/mreiferson/go-httpclient).
|
|
@ -1,237 +0,0 @@
|
|||
/*
|
||||
Provides an HTTP Transport that implements the `RoundTripper` interface and
|
||||
can be used as a built in replacement for the standard library's, providing:
|
||||
|
||||
* connection timeouts
|
||||
* request timeouts
|
||||
|
||||
This is a thin wrapper around `http.Transport` that sets dial timeouts and uses
|
||||
Go's internal timer scheduler to call the Go 1.1+ `CancelRequest()` API.
|
||||
*/
|
||||
package httpclient
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// returns the current version of the package
|
||||
func Version() string {
|
||||
return "0.4.1"
|
||||
}
|
||||
|
||||
// Transport implements the RoundTripper interface and can be used as a replacement
|
||||
// for Go's built in http.Transport implementing end-to-end request timeouts.
|
||||
//
|
||||
// transport := &httpclient.Transport{
|
||||
// ConnectTimeout: 1*time.Second,
|
||||
// ResponseHeaderTimeout: 5*time.Second,
|
||||
// RequestTimeout: 10*time.Second,
|
||||
// }
|
||||
// defer transport.Close()
|
||||
//
|
||||
// client := &http.Client{Transport: transport}
|
||||
// req, _ := http.NewRequest("GET", "http://127.0.0.1/test", nil)
|
||||
// resp, err := client.Do(req)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// defer resp.Body.Close()
|
||||
//
|
||||
type Transport struct {
|
||||
// Proxy specifies a function to return a proxy for a given
|
||||
// *http.Request. If the function returns a non-nil error, the
|
||||
// request is aborted with the provided error.
|
||||
// If Proxy is nil or returns a nil *url.URL, no proxy is used.
|
||||
Proxy func(*http.Request) (*url.URL, error)
|
||||
|
||||
// Dial specifies the dial function for creating TCP
|
||||
// connections. This will override the Transport's ConnectTimeout and
|
||||
// ReadWriteTimeout settings.
|
||||
// If Dial is nil, a dialer is generated on demand matching the Transport's
|
||||
// options.
|
||||
Dial func(network, addr string) (net.Conn, error)
|
||||
|
||||
// TLSClientConfig specifies the TLS configuration to use with
|
||||
// tls.Client. If nil, the default configuration is used.
|
||||
TLSClientConfig *tls.Config
|
||||
|
||||
// DisableKeepAlives, if true, prevents re-use of TCP connections
|
||||
// between different HTTP requests.
|
||||
DisableKeepAlives bool
|
||||
|
||||
// DisableCompression, if true, prevents the Transport from
|
||||
// requesting compression with an "Accept-Encoding: gzip"
|
||||
// request header when the Request contains no existing
|
||||
// Accept-Encoding value. If the Transport requests gzip on
|
||||
// its own and gets a gzipped response, it's transparently
|
||||
// decoded in the Response.Body. However, if the user
|
||||
// explicitly requested gzip it is not automatically
|
||||
// uncompressed.
|
||||
DisableCompression bool
|
||||
|
||||
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
|
||||
// (keep-alive) to keep per-host. If zero,
|
||||
// http.DefaultMaxIdleConnsPerHost is used.
|
||||
MaxIdleConnsPerHost int
|
||||
|
||||
// ConnectTimeout, if non-zero, is the maximum amount of time a dial will wait for
|
||||
// a connect to complete.
|
||||
ConnectTimeout time.Duration
|
||||
|
||||
// ResponseHeaderTimeout, if non-zero, specifies the amount of
|
||||
// time to wait for a server's response headers after fully
|
||||
// writing the request (including its body, if any). This
|
||||
// time does not include the time to read the response body.
|
||||
ResponseHeaderTimeout time.Duration
|
||||
|
||||
// RequestTimeout, if non-zero, specifies the amount of time for the entire
|
||||
// request to complete (including all of the above timeouts + entire response body).
|
||||
// This should never be less than the sum total of the above two timeouts.
|
||||
RequestTimeout time.Duration
|
||||
|
||||
// ReadWriteTimeout, if non-zero, will set a deadline for every Read and
|
||||
// Write operation on the request connection.
|
||||
ReadWriteTimeout time.Duration
|
||||
|
||||
// TCPWriteBufferSize, the size of the operating system's write
|
||||
// buffer associated with the connection.
|
||||
TCPWriteBufferSize int
|
||||
|
||||
// TCPReadBuffserSize, the size of the operating system's read
|
||||
// buffer associated with the connection.
|
||||
TCPReadBufferSize int
|
||||
|
||||
starter sync.Once
|
||||
transport *http.Transport
|
||||
}
|
||||
|
||||
// Close cleans up the Transport, currently a no-op
|
||||
func (t *Transport) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Transport) lazyStart() {
|
||||
if t.Dial == nil {
|
||||
t.Dial = func(netw, addr string) (net.Conn, error) {
|
||||
c, err := net.DialTimeout(netw, addr, t.ConnectTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if t.TCPReadBufferSize != 0 || t.TCPWriteBufferSize != 0 {
|
||||
if tcpCon, ok := c.(*net.TCPConn); ok {
|
||||
if t.TCPWriteBufferSize != 0 {
|
||||
if err = tcpCon.SetWriteBuffer(t.TCPWriteBufferSize); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if t.TCPReadBufferSize != 0 {
|
||||
if err = tcpCon.SetReadBuffer(t.TCPReadBufferSize); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err = errors.New("Not Tcp Connection")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if t.ReadWriteTimeout > 0 {
|
||||
timeoutConn := &rwTimeoutConn{
|
||||
TCPConn: c.(*net.TCPConn),
|
||||
rwTimeout: t.ReadWriteTimeout,
|
||||
}
|
||||
return timeoutConn, nil
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
t.transport = &http.Transport{
|
||||
Dial: t.Dial,
|
||||
Proxy: t.Proxy,
|
||||
TLSClientConfig: t.TLSClientConfig,
|
||||
DisableKeepAlives: t.DisableKeepAlives,
|
||||
DisableCompression: t.DisableCompression,
|
||||
MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
|
||||
ResponseHeaderTimeout: t.ResponseHeaderTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Transport) CancelRequest(req *http.Request) {
|
||||
t.starter.Do(t.lazyStart)
|
||||
|
||||
t.transport.CancelRequest(req)
|
||||
}
|
||||
|
||||
func (t *Transport) CloseIdleConnections() {
|
||||
t.starter.Do(t.lazyStart)
|
||||
|
||||
t.transport.CloseIdleConnections()
|
||||
}
|
||||
|
||||
func (t *Transport) RegisterProtocol(scheme string, rt http.RoundTripper) {
|
||||
t.starter.Do(t.lazyStart)
|
||||
|
||||
t.transport.RegisterProtocol(scheme, rt)
|
||||
}
|
||||
|
||||
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
|
||||
t.starter.Do(t.lazyStart)
|
||||
|
||||
if t.RequestTimeout > 0 {
|
||||
timer := time.AfterFunc(t.RequestTimeout, func() {
|
||||
t.transport.CancelRequest(req)
|
||||
})
|
||||
|
||||
resp, err = t.transport.RoundTrip(req)
|
||||
if err != nil {
|
||||
timer.Stop()
|
||||
} else {
|
||||
resp.Body = &bodyCloseInterceptor{ReadCloser: resp.Body, timer: timer}
|
||||
}
|
||||
} else {
|
||||
resp, err = t.transport.RoundTrip(req)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type bodyCloseInterceptor struct {
|
||||
io.ReadCloser
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
func (bci *bodyCloseInterceptor) Close() error {
|
||||
bci.timer.Stop()
|
||||
return bci.ReadCloser.Close()
|
||||
}
|
||||
|
||||
// A net.Conn that sets a deadline for every Read or Write operation
|
||||
type rwTimeoutConn struct {
|
||||
*net.TCPConn
|
||||
rwTimeout time.Duration
|
||||
}
|
||||
|
||||
func (c *rwTimeoutConn) Read(b []byte) (int, error) {
|
||||
err := c.TCPConn.SetDeadline(time.Now().Add(c.rwTimeout))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return c.TCPConn.Read(b)
|
||||
}
|
||||
|
||||
func (c *rwTimeoutConn) Write(b []byte) (int, error) {
|
||||
err := c.TCPConn.SetDeadline(time.Now().Add(c.rwTimeout))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return c.TCPConn.Write(b)
|
||||
}
|
Loading…
Add table
Reference in a new issue