Vendor swarmkit

Update swarmkit to deec7ba2c4ef48f20ebe9674afbcced606a5339e, from the master branch.

Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>
This commit is contained in:
Akihiro Suda 2016-12-05 04:56:40 +00:00
parent 24ffa2fa77
commit 7375507eea
26 changed files with 277 additions and 157 deletions

View file

@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 53fcdeba083627b1a2616612a103019b73c6d5ce
github.com/docker/swarmkit deec7ba2c4ef48f20ebe9674afbcced606a5339e
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -454,7 +454,7 @@ func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescr
return desc, err
}
// nodesEqual returns true if the node states are functionaly equal, ignoring status,
// nodesEqual returns true if the node states are functionally equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a node update to executor.

View file

@ -7,7 +7,7 @@ import (
"github.com/docker/swarmkit/api"
)
// secrets is a map that keeps all the currenty available secrets to the agent
// secrets is a map that keeps all the currently available secrets to the agent
// mapped by secret ID
type secrets struct {
mu sync.RWMutex

View file

@ -226,16 +226,6 @@ func (s *session) logSubscriptions(ctx context.Context) error {
client := api.NewLogBrokerClient(s.conn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if grpc.Code(err) == codes.Unimplemented {
log.Warning("manager does not support log subscriptions")
// Don't return, because returning would bounce the session
select {
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
if err != nil {
return err
}
@ -243,6 +233,16 @@ func (s *session) logSubscriptions(ctx context.Context) error {
for {
resp, err := subscriptions.Recv()
if grpc.Code(err) == codes.Unimplemented {
log.Warning("manager does not support log subscriptions")
// Don't return, because returning would bounce the session
select {
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
if err != nil {
return err
}

View file

@ -441,10 +441,10 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
for _, tm := range w.taskManagers {
if match(tm.task) {
wg.Add(1)
go func() {
go func(tm *taskManager) {
defer wg.Done()
tm.Logs(ctx, *subscription.Options, publisher)
}()
}(tm)
}
}
w.mu.Unlock()

View file

@ -43,8 +43,9 @@ func (*NodeCertificateStatusRequest) ProtoMessage() {}
func (*NodeCertificateStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptorCa, []int{0} }
type NodeCertificateStatusResponse struct {
Status *IssuanceStatus `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
Certificate *Certificate `protobuf:"bytes,2,opt,name=certificate" json:"certificate,omitempty"`
Status *IssuanceStatus `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
Certificate *Certificate `protobuf:"bytes,2,opt,name=certificate" json:"certificate,omitempty"`
RootCABundle []byte `protobuf:"bytes,3,opt,name=root_ca_bundle,json=rootCaBundle,proto3" json:"root_ca_bundle,omitempty"`
}
func (m *NodeCertificateStatusResponse) Reset() { *m = NodeCertificateStatusResponse{} }
@ -183,8 +184,9 @@ func (m *NodeCertificateStatusResponse) Copy() *NodeCertificateStatusResponse {
}
o := &NodeCertificateStatusResponse{
Status: m.Status.Copy(),
Certificate: m.Certificate.Copy(),
Status: m.Status.Copy(),
Certificate: m.Certificate.Copy(),
RootCABundle: m.RootCABundle,
}
return o
@ -277,7 +279,7 @@ func (this *NodeCertificateStatusResponse) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s := make([]string, 0, 7)
s = append(s, "&api.NodeCertificateStatusResponse{")
if this.Status != nil {
s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n")
@ -285,6 +287,7 @@ func (this *NodeCertificateStatusResponse) GoString() string {
if this.Certificate != nil {
s = append(s, "Certificate: "+fmt.Sprintf("%#v", this.Certificate)+",\n")
}
s = append(s, "RootCABundle: "+fmt.Sprintf("%#v", this.RootCABundle)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -643,6 +646,12 @@ func (m *NodeCertificateStatusResponse) MarshalTo(data []byte) (int, error) {
}
i += n2
}
if len(m.RootCABundle) > 0 {
data[i] = 0x1a
i++
i = encodeVarintCa(data, i, uint64(len(m.RootCABundle)))
i += copy(data[i:], m.RootCABundle)
}
return i, nil
}
@ -1112,6 +1121,10 @@ func (m *NodeCertificateStatusResponse) Size() (n int) {
l = m.Certificate.Size()
n += 1 + l + sovCa(uint64(l))
}
l = len(m.RootCABundle)
if l > 0 {
n += 1 + l + sovCa(uint64(l))
}
return n
}
@ -1212,6 +1225,7 @@ func (this *NodeCertificateStatusResponse) String() string {
s := strings.Join([]string{`&NodeCertificateStatusResponse{`,
`Status:` + strings.Replace(fmt.Sprintf("%v", this.Status), "IssuanceStatus", "IssuanceStatus", 1) + `,`,
`Certificate:` + strings.Replace(fmt.Sprintf("%v", this.Certificate), "Certificate", "Certificate", 1) + `,`,
`RootCABundle:` + fmt.Sprintf("%v", this.RootCABundle) + `,`,
`}`,
}, "")
return s
@ -1461,6 +1475,37 @@ func (m *NodeCertificateStatusResponse) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field RootCABundle", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCa
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCa
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.RootCABundle = append(m.RootCABundle[:0], data[iNdEx:postIndex]...)
if m.RootCABundle == nil {
m.RootCABundle = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCa(data[iNdEx:])
@ -2128,44 +2173,46 @@ var (
func init() { proto.RegisterFile("ca.proto", fileDescriptorCa) }
var fileDescriptorCa = []byte{
// 615 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x4c,
0x14, 0xed, 0xb8, 0xfd, 0xd2, 0xf6, 0x26, 0x5f, 0x8b, 0xa6, 0xad, 0x14, 0xd2, 0xd4, 0xa9, 0xcc,
0xa2, 0x65, 0x81, 0xd3, 0x06, 0x56, 0xb0, 0x21, 0x09, 0x52, 0x15, 0xa1, 0x22, 0x34, 0x11, 0x6c,
0x2b, 0xc7, 0x19, 0x82, 0x15, 0xc7, 0x63, 0x3c, 0xe3, 0x40, 0x76, 0x48, 0x20, 0xde, 0x00, 0xc1,
0x8a, 0x47, 0xe0, 0x39, 0x22, 0x56, 0x48, 0x6c, 0x58, 0x45, 0xc4, 0x0f, 0x80, 0x78, 0x04, 0xe4,
0xb1, 0x4d, 0xf3, 0xe3, 0x84, 0xb2, 0xf2, 0xcc, 0x9d, 0x73, 0xce, 0xbd, 0xf7, 0xcc, 0xf5, 0xc0,
0x86, 0x69, 0xe8, 0xae, 0xc7, 0x04, 0xc3, 0xb8, 0xcd, 0xcc, 0x2e, 0xf5, 0x74, 0xfe, 0xd2, 0xf0,
0x7a, 0x5d, 0x4b, 0xe8, 0xfd, 0xd3, 0x42, 0x56, 0x0c, 0x5c, 0xca, 0x23, 0x40, 0x21, 0xcb, 0x5d,
0x6a, 0x26, 0x9b, 0xdd, 0x0e, 0xeb, 0x30, 0xb9, 0x2c, 0x87, 0xab, 0x38, 0xba, 0xe3, 0xda, 0x7e,
0xc7, 0x72, 0xca, 0xd1, 0x27, 0x0a, 0x6a, 0x75, 0x28, 0x3e, 0x62, 0x6d, 0x5a, 0xa7, 0x9e, 0xb0,
0x9e, 0x59, 0xa6, 0x21, 0x68, 0x53, 0x18, 0xc2, 0xe7, 0x84, 0xbe, 0xf0, 0x29, 0x17, 0xf8, 0x06,
0xac, 0x3b, 0xac, 0x4d, 0x2f, 0xac, 0x76, 0x1e, 0x1d, 0xa2, 0xe3, 0xcd, 0x1a, 0x04, 0xa3, 0x52,
0x26, 0xa4, 0x34, 0x1e, 0x90, 0x4c, 0x78, 0xd4, 0x68, 0x6b, 0x9f, 0x10, 0x1c, 0x2c, 0x50, 0xe1,
0x2e, 0x73, 0x38, 0xc5, 0x77, 0x21, 0xc3, 0x65, 0x44, 0xaa, 0x64, 0x2b, 0x9a, 0x3e, 0xdf, 0x90,
0xde, 0xe0, 0xdc, 0x37, 0x1c, 0x33, 0xe1, 0xc6, 0x0c, 0x5c, 0x85, 0xac, 0x79, 0x29, 0x9c, 0x57,
0xa4, 0x40, 0x29, 0x4d, 0x60, 0x22, 0x3f, 0x99, 0xe4, 0x68, 0xdf, 0x10, 0xec, 0x87, 0xea, 0x74,
0xa6, 0xca, 0xa4, 0xcb, 0x3b, 0xb0, 0xe6, 0x31, 0x9b, 0xca, 0xe2, 0xb6, 0x2a, 0xc5, 0x34, 0xed,
0x90, 0x49, 0x98, 0x4d, 0x6b, 0x4a, 0x1e, 0x11, 0x89, 0xc6, 0xd7, 0x61, 0xd5, 0xe4, 0x9e, 0x2c,
0x28, 0x57, 0x5b, 0x0f, 0x46, 0xa5, 0xd5, 0x7a, 0x93, 0x90, 0x30, 0x86, 0x77, 0xe1, 0x3f, 0xc1,
0xba, 0xd4, 0xc9, 0xaf, 0x86, 0xa6, 0x91, 0x68, 0x83, 0xcf, 0x21, 0x67, 0xf4, 0x0d, 0xcb, 0x36,
0x5a, 0x96, 0x6d, 0x89, 0x41, 0x7e, 0x4d, 0xa6, 0xbb, 0xb9, 0x28, 0x5d, 0xd3, 0xa5, 0xa6, 0x5e,
0x9d, 0x20, 0x90, 0x29, 0xba, 0xf6, 0x1e, 0x41, 0x31, 0xbd, 0xab, 0xd8, 0xf5, 0xab, 0x5c, 0x1e,
0x7e, 0x0c, 0xdb, 0x12, 0xd4, 0xa3, 0xbd, 0x16, 0xf5, 0xf8, 0x73, 0xcb, 0x95, 0x1d, 0x6d, 0x55,
0x8e, 0x96, 0xd6, 0x75, 0xfe, 0x07, 0x4e, 0xb6, 0x42, 0xfe, 0xe5, 0x5e, 0x3b, 0x80, 0xfd, 0x33,
0x2a, 0x08, 0x63, 0xa2, 0x5e, 0x9d, 0x37, 0x5b, 0xbb, 0x0f, 0xc5, 0xf4, 0xe3, 0xb8, 0xea, 0xc3,
0xe9, 0xfb, 0x0e, 0x2b, 0xcf, 0x4d, 0x5f, 0xe7, 0x1e, 0xec, 0x9c, 0x51, 0xf1, 0xc4, 0xb1, 0x99,
0xd9, 0x7d, 0x48, 0x07, 0x89, 0xb0, 0x07, 0xbb, 0xd3, 0xe1, 0x58, 0xf0, 0x00, 0xc0, 0x97, 0xc1,
0x8b, 0x2e, 0x1d, 0xc4, 0x7a, 0x9b, 0x7e, 0x02, 0xc3, 0xf7, 0x60, 0xbd, 0x4f, 0x3d, 0x6e, 0x31,
0x27, 0x9e, 0xad, 0xfd, 0xb4, 0xc6, 0x9f, 0x46, 0x90, 0xda, 0xda, 0x70, 0x54, 0x5a, 0x21, 0x09,
0xa3, 0xf2, 0x56, 0x01, 0xa5, 0x5e, 0xc5, 0x6f, 0x90, 0xcc, 0x3d, 0xd7, 0x14, 0x2e, 0xa7, 0x69,
0x2d, 0x71, 0xa7, 0x70, 0x72, 0x75, 0x42, 0xd4, 0x9e, 0xb6, 0xf1, 0xe5, 0xf3, 0xcf, 0x8f, 0x8a,
0x72, 0x0d, 0xe1, 0x57, 0x90, 0x9b, 0x34, 0x00, 0x1f, 0x2d, 0xd0, 0x9a, 0x75, 0xae, 0x70, 0xfc,
0x77, 0x60, 0x9c, 0x6c, 0x4f, 0x26, 0xdb, 0x86, 0xff, 0x25, 0xf2, 0x56, 0xcf, 0x70, 0x8c, 0x0e,
0xf5, 0x2a, 0x1f, 0x14, 0x90, 0x73, 0x15, 0x5b, 0x91, 0x36, 0x95, 0xe9, 0x56, 0x2c, 0xf9, 0x2b,
0xd3, 0xad, 0x58, 0x36, 0xf0, 0x13, 0x56, 0xbc, 0x43, 0xb0, 0x97, 0xfa, 0x24, 0xe1, 0x93, 0x45,
0x63, 0xbd, 0xe8, 0x0d, 0x2c, 0x9c, 0xfe, 0x03, 0x63, 0xb6, 0x90, 0x5a, 0x71, 0x38, 0x56, 0x57,
0xbe, 0x8f, 0xd5, 0x95, 0x5f, 0x63, 0x15, 0xbd, 0x0e, 0x54, 0x34, 0x0c, 0x54, 0xf4, 0x35, 0x50,
0xd1, 0x8f, 0x40, 0x45, 0xad, 0x8c, 0x7c, 0x85, 0x6f, 0xff, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xcf,
0xc4, 0x68, 0xc2, 0xea, 0x05, 0x00, 0x00,
// 651 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x54, 0xc1, 0x6e, 0xd3, 0x4a,
0x14, 0xed, 0xb8, 0x7d, 0x69, 0x7b, 0xe3, 0x97, 0x56, 0xd3, 0x56, 0x0a, 0x69, 0xea, 0x54, 0x66,
0xd1, 0xb2, 0x20, 0x6d, 0x03, 0x62, 0x01, 0x1b, 0xe2, 0x20, 0x55, 0x15, 0x2a, 0x42, 0x53, 0xc1,
0x36, 0x9a, 0x38, 0x43, 0xb0, 0xe2, 0x78, 0x8c, 0x67, 0x5c, 0xc8, 0x0e, 0x09, 0xc4, 0x1f, 0x20,
0xf8, 0x0a, 0xbe, 0xa3, 0x62, 0x85, 0x84, 0x84, 0x58, 0x45, 0xd4, 0x1f, 0x80, 0xf8, 0x04, 0xe4,
0xb1, 0x43, 0x93, 0xc6, 0x09, 0x65, 0x15, 0xcf, 0xf5, 0x39, 0xe7, 0xde, 0x73, 0x7c, 0x33, 0xb0,
0x64, 0xd3, 0xaa, 0x1f, 0x70, 0xc9, 0x31, 0x6e, 0x73, 0xbb, 0xcb, 0x82, 0xaa, 0x78, 0x49, 0x83,
0x5e, 0xd7, 0x91, 0xd5, 0xd3, 0x83, 0x52, 0x5e, 0xf6, 0x7d, 0x26, 0x12, 0x40, 0x29, 0x2f, 0x7c,
0x66, 0x0f, 0x0f, 0xeb, 0x1d, 0xde, 0xe1, 0xea, 0x71, 0x2f, 0x7e, 0x4a, 0xab, 0x6b, 0xbe, 0x1b,
0x76, 0x1c, 0x6f, 0x2f, 0xf9, 0x49, 0x8a, 0x66, 0x03, 0xca, 0x8f, 0x78, 0x9b, 0x35, 0x58, 0x20,
0x9d, 0x67, 0x8e, 0x4d, 0x25, 0x3b, 0x91, 0x54, 0x86, 0x82, 0xb0, 0x17, 0x21, 0x13, 0x12, 0x5f,
0x87, 0x45, 0x8f, 0xb7, 0x59, 0xd3, 0x69, 0x17, 0xd1, 0x36, 0xda, 0x5d, 0xb6, 0x20, 0x1a, 0x54,
0x72, 0x31, 0xe5, 0xe8, 0x01, 0xc9, 0xc5, 0xaf, 0x8e, 0xda, 0xe6, 0x37, 0x04, 0x5b, 0x53, 0x54,
0x84, 0xcf, 0x3d, 0xc1, 0xf0, 0x5d, 0xc8, 0x09, 0x55, 0x51, 0x2a, 0xf9, 0x9a, 0x59, 0x9d, 0x34,
0x54, 0x3d, 0x12, 0x22, 0xa4, 0x9e, 0x3d, 0xe4, 0xa6, 0x0c, 0x5c, 0x87, 0xbc, 0x7d, 0x21, 0x5c,
0xd4, 0x94, 0x40, 0x25, 0x4b, 0x60, 0xa4, 0x3f, 0x19, 0xe5, 0xe0, 0x3b, 0x50, 0x08, 0x38, 0x97,
0x4d, 0x9b, 0x36, 0x5b, 0xa1, 0xd7, 0x76, 0x59, 0x71, 0x7e, 0x1b, 0xed, 0xea, 0xd6, 0x6a, 0x34,
0xa8, 0xe8, 0x84, 0x73, 0xd9, 0xa8, 0x5b, 0xaa, 0x4e, 0xf4, 0x18, 0xd7, 0xa0, 0xc9, 0xc9, 0xfc,
0x8a, 0x60, 0x33, 0x9e, 0x8a, 0x5d, 0x72, 0x37, 0x4c, 0xe7, 0x36, 0x2c, 0x04, 0xdc, 0x65, 0xca,
0x54, 0xa1, 0x56, 0xce, 0x9a, 0x29, 0x66, 0x12, 0xee, 0x32, 0x4b, 0x2b, 0x22, 0xa2, 0xd0, 0xf8,
0x1a, 0xcc, 0xdb, 0x22, 0x50, 0x46, 0x74, 0x6b, 0x31, 0x1a, 0x54, 0xe6, 0x1b, 0x27, 0x84, 0xc4,
0x35, 0xbc, 0x0e, 0xff, 0x49, 0xde, 0x65, 0x9e, 0x9a, 0x6f, 0x99, 0x24, 0x07, 0x7c, 0x0c, 0x3a,
0x3d, 0xa5, 0x8e, 0x4b, 0x5b, 0x8e, 0xeb, 0xc8, 0x7e, 0x71, 0x41, 0xb5, 0xbb, 0x31, 0xad, 0xdd,
0x89, 0xcf, 0xec, 0x6a, 0x7d, 0x84, 0x40, 0xc6, 0xe8, 0xe6, 0x7b, 0x04, 0xe5, 0x6c, 0x57, 0xe9,
0xd7, 0xba, 0xca, 0x47, 0xc7, 0x8f, 0x61, 0x45, 0x81, 0x7a, 0xac, 0xd7, 0x62, 0x81, 0x78, 0xee,
0xf8, 0xca, 0x51, 0xa1, 0xb6, 0x33, 0x73, 0xae, 0xe3, 0x3f, 0x70, 0x52, 0x88, 0xf9, 0x17, 0x67,
0x73, 0x0b, 0x36, 0x0f, 0x99, 0x4c, 0x3e, 0xc7, 0x64, 0xd8, 0xe6, 0x7d, 0x28, 0x67, 0xbf, 0x4e,
0xa7, 0xde, 0x1e, 0xdf, 0x93, 0x78, 0x72, 0x7d, 0x6c, 0x0d, 0xcc, 0x0d, 0x58, 0x3b, 0x64, 0xf2,
0x89, 0xe7, 0x72, 0xbb, 0xfb, 0x90, 0xf5, 0x87, 0xc2, 0x01, 0xac, 0x8f, 0x97, 0x53, 0xc1, 0x2d,
0x80, 0x50, 0x15, 0x9b, 0x5d, 0xd6, 0x4f, 0xf5, 0x96, 0xc3, 0x21, 0x0c, 0xdf, 0x83, 0xc5, 0x53,
0x16, 0x08, 0x87, 0x7b, 0xe9, 0x4e, 0x6e, 0x66, 0x19, 0x7f, 0x9a, 0x40, 0xac, 0x85, 0xb3, 0x41,
0x65, 0x8e, 0x0c, 0x19, 0xb5, 0xb7, 0x1a, 0x68, 0x8d, 0x3a, 0x7e, 0x83, 0x54, 0xef, 0x09, 0x53,
0x78, 0x2f, 0x4b, 0x6b, 0x46, 0x3a, 0xa5, 0xfd, 0xab, 0x13, 0x12, 0x7b, 0xe6, 0xd2, 0xe7, 0x4f,
0x3f, 0x3f, 0x6a, 0xda, 0x2a, 0xc2, 0xaf, 0x40, 0x1f, 0x0d, 0x00, 0xef, 0x4c, 0xd1, 0xba, 0x9c,
0x5c, 0x69, 0xf7, 0xef, 0xc0, 0xb4, 0xd9, 0x86, 0x6a, 0xb6, 0x02, 0xff, 0x2b, 0xe4, 0xcd, 0x1e,
0xf5, 0x68, 0x87, 0x05, 0xb5, 0x0f, 0x1a, 0xa8, 0xbd, 0x4a, 0xa3, 0xc8, 0xda, 0xca, 0xec, 0x28,
0x66, 0xfc, 0x2b, 0xb3, 0xa3, 0x98, 0xb5, 0xf0, 0x23, 0x51, 0xbc, 0x43, 0xb0, 0x91, 0x79, 0x95,
0xe1, 0xfd, 0x69, 0x6b, 0x3d, 0xed, 0xee, 0x2c, 0x1d, 0xfc, 0x03, 0xe3, 0xf2, 0x20, 0x56, 0xf9,
0xec, 0xdc, 0x98, 0xfb, 0x7e, 0x6e, 0xcc, 0xfd, 0x3a, 0x37, 0xd0, 0xeb, 0xc8, 0x40, 0x67, 0x91,
0x81, 0xbe, 0x44, 0x06, 0xfa, 0x11, 0x19, 0xa8, 0x95, 0x53, 0xb7, 0xf7, 0xad, 0xdf, 0x01, 0x00,
0x00, 0xff, 0xff, 0x69, 0xb6, 0x7e, 0x90, 0x22, 0x06, 0x00, 0x00,
}

View file

@ -36,6 +36,7 @@ message NodeCertificateStatusRequest {
message NodeCertificateStatusResponse {
IssuanceStatus status = 1;
Certificate certificate = 2;
bytes root_ca_bundle = 3 [(gogoproto.customname) = "RootCABundle"];
}
message IssueNodeCertificateRequest {

View file

@ -86,7 +86,7 @@ func (AssignmentsMessage_Type) EnumDescriptor() ([]byte, []int) {
// SessionRequest starts a session.
type SessionRequest struct {
Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
// SessionID can be provided to attempt resuming an exising session. If the
// SessionID can be provided to attempt resuming an existing session. If the
// SessionID is empty or invalid, a new SessionID will be assigned.
//
// See SessionMessage.SessionID for details.

View file

@ -62,7 +62,7 @@ service Dispatcher { // maybe dispatch, al likes this
// SessionRequest starts a session.
message SessionRequest {
NodeDescription description = 1;
// SessionID can be provided to attempt resuming an exising session. If the
// SessionID can be provided to attempt resuming an existing session. If the
// SessionID is empty or invalid, a new SessionID will be assigned.
//
// See SessionMessage.SessionID for details.

View file

@ -6,7 +6,7 @@ import (
"github.com/docker/swarmkit/api"
)
// TasksEqualStable returns true if the tasks are functionaly equal, ignoring status,
// TasksEqualStable returns true if the tasks are functionally equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a task update to a controller.

View file

@ -113,7 +113,7 @@ type Endpoint_VirtualIP struct {
// strictly a logical IP and there may not be any
// interfaces assigned this IP address or any route
// created for this address. More than one to
// accomodate for both IPv4 and IPv6
// accommodate for both IPv4 and IPv6
Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"`
}

View file

@ -98,7 +98,7 @@ message Endpoint {
// strictly a logical IP and there may not be any
// interfaces assigned this IP address or any route
// created for this address. More than one to
// accomodate for both IPv4 and IPv6
// accommodate for both IPv4 and IPv6
string addr = 2;
}

View file

@ -299,7 +299,7 @@ type TaskSpec struct {
Networks []*NetworkAttachmentConfig `protobuf:"bytes,7,rep,name=networks" json:"networks,omitempty"`
// ForceUpdate is a counter that triggers an update even if no relevant
// parameters have been changed. We do this to allow forced restarts
// using the same reconcilation-based mechanism that performs rolling
// using the same reconciliation-based mechanism that performs rolling
// updates.
ForceUpdate uint64 `protobuf:"varint,9,opt,name=force_update,json=forceUpdate,proto3" json:"force_update,omitempty"`
}

View file

@ -115,7 +115,7 @@ message TaskSpec {
// ForceUpdate is a counter that triggers an update even if no relevant
// parameters have been changed. We do this to allow forced restarts
// using the same reconcilation-based mechanism that performs rolling
// using the same reconciliation-based mechanism that performs rolling
// updates.
uint64 force_update = 9;
}

View file

@ -102,13 +102,15 @@ type RootCA struct {
// Key will only be used by the original manager to put the private
// key-material in raft, no signing operations depend on it.
Key []byte
// Cert includes the PEM encoded Certificate for the Root CA
// Cert includes the PEM encoded Certificate bundle for the Root CA
Cert []byte
Pool *x509.CertPool
// Digest of the serialized bytes of the certificate
Digest digest.Digest
// This signer will be nil if the node doesn't have the appropriate key material
Signer cfsigner.Signer
// Path stores the location on disk where the RootCA lives
Path CertPaths
}
// CanSign ensures that the signer has all three necessary elements needed to operate
@ -163,9 +165,9 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
// Get the remote manager to issue a CA signed certificate for this node
// Retry up to 5 times in case the manager we first try to contact isn't
// responding properly (for example, it may have just been demoted).
var signedCert []byte
var response *api.NodeCertificateStatusResponse
for i := 0; i != 5; i++ {
signedCert, err = GetRemoteSignedCertificate(ctx, csr, rca.Pool, config)
response, err = GetRemoteSignedCertificate(ctx, csr, rca.Pool, config)
if err == nil {
break
}
@ -177,7 +179,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
// Доверяй, но проверяй.
// Before we overwrite our local key + certificate, let's make sure the server gave us one that is valid
// Create an X509Cert so we can .Verify()
certBlock, _ := pem.Decode(signedCert)
certBlock, _ := pem.Decode(response.Certificate.Certificate)
if certBlock == nil {
return nil, errors.New("failed to parse certificate PEM")
}
@ -185,17 +187,34 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
if err != nil {
return nil, err
}
// Include our current root pool
opts := x509.VerifyOptions{
Roots: rca.Pool,
// We retrieve the certificate with the current root pool, so we know this was issued by a legitimate manager.
// However, there might have been a server-side root rotation, so we verify this cert with a new pool.
// If we got a valid response.RootCABundle, turn it into a Pool, and verify the newly minted certificate using it.
var (
newRootErr error
newRootCA RootCA
)
rootCAPool := rca.Pool
if response.RootCABundle != nil {
newRootCA, newRootErr = NewRootCA(response.RootCABundle, nil, rca.Path, time.Minute)
if newRootErr == nil {
// The response.RootCABundle we got from the remote server seems to be good, use it
rootCAPool = newRootCA.Pool
}
}
// Check to see if this certificate was signed by our CA, and isn't expired
// Create VerifyOptions with either the new certificate bundle, or the old pool
opts := x509.VerifyOptions{
Roots: rootCAPool,
}
// Check to see if this certificate was signed by one of the CAs, and isn't expired
if _, err := X509Cert.Verify(opts); err != nil {
return nil, err
}
// Create a valid TLSKeyPair out of the PEM encoded private key and certificate
tlsKeyPair, err := tls.X509KeyPair(signedCert, key)
tlsKeyPair, err := tls.X509KeyPair(response.Certificate.Certificate, key)
if err != nil {
return nil, err
}
@ -211,7 +230,16 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
return nil, err
}
if err := kw.Write(signedCert, key, kekUpdate); err != nil {
// If a CA certificate bundle exists it has been validated before. If it's different, let's write it to disk.
// Root rotation should always happen by appending a new CA cert, and later removing the old one,
// so it's safer to do it in this order of operations (write root, write certificate)
if newRootErr == nil && !bytes.Equal(rca.Cert, response.RootCABundle) {
if err := newRootCA.saveCertificate(); err != nil {
return nil, err
}
}
if err := kw.Write(response.Certificate.Certificate, key, kekUpdate); err != nil {
return nil, err
}
@ -316,10 +344,28 @@ func (rca *RootCA) AppendFirstRootPEM(cert []byte) ([]byte, error) {
return certChain, nil
}
func (rca *RootCA) saveCertificate() error {
if rca.Cert == nil {
return errors.New("no valid certificate bundle found")
}
if rca.Path.Cert == "" {
return errors.New("no path found for this root CA")
}
// Make sure the necessary dirs exist and they are writable
err := os.MkdirAll(filepath.Dir(rca.Path.Cert), 0755)
if err != nil {
return err
}
return ioutils.AtomicWriteFile(rca.Path.Cert, rca.Cert, 0644)
}
// NewRootCA creates a new RootCA object from unparsed PEM cert bundle and key byte
// slices. key may be nil, and in this case NewRootCA will return a RootCA
// without a signer.
func NewRootCA(certBytes, keyBytes []byte, certExpiry time.Duration) (RootCA, error) {
func NewRootCA(certBytes, keyBytes []byte, paths CertPaths, certExpiry time.Duration) (RootCA, error) {
// Parse all the certificates in the cert bundle
parsedCerts, err := helpers.ParseCertificatesPEM(certBytes)
if err != nil {
@ -345,7 +391,7 @@ func NewRootCA(certBytes, keyBytes []byte, certExpiry time.Duration) (RootCA, er
if len(keyBytes) == 0 {
// This RootCA does not have a valid signer.
return RootCA{Cert: certBytes, Digest: digest, Pool: pool}, nil
return RootCA{Cert: certBytes, Digest: digest, Pool: pool, Path: paths}, nil
}
var (
@ -387,7 +433,7 @@ func NewRootCA(certBytes, keyBytes []byte, certExpiry time.Duration) (RootCA, er
keyBlock, _ := pem.Decode(keyBytes)
if keyBlock == nil {
// This RootCA does not have a valid signer.
return RootCA{Cert: certBytes, Digest: digest, Pool: pool}, nil
return RootCA{Cert: certBytes, Digest: digest, Pool: pool, Path: paths}, nil
}
if passphraseStr != "" && !x509.IsEncryptedPEMBlock(keyBlock) {
keyBytes, err = EncryptECPrivateKey(keyBytes, passphraseStr)
@ -396,7 +442,7 @@ func NewRootCA(certBytes, keyBytes []byte, certExpiry time.Duration) (RootCA, er
}
}
return RootCA{Signer: signer, Key: keyBytes, Digest: digest, Cert: certBytes, Pool: pool}, nil
return RootCA{Signer: signer, Key: keyBytes, Digest: digest, Cert: certBytes, Pool: pool, Path: paths}, nil
}
func ensureCertKeyMatch(cert *x509.Certificate, key crypto.PublicKey) error {
@ -414,8 +460,7 @@ func ensureCertKeyMatch(cert *x509.Certificate, key crypto.PublicKey) error {
return errors.New("certificate key mismatch")
}
// GetLocalRootCA validates if the contents of the file are a valid self-signed
// CA certificate, and returns the PEM-encoded Certificate if so
// GetLocalRootCA returns the PEM-encoded root CA Certificate if it exists
func GetLocalRootCA(paths CertPaths) (RootCA, error) {
// Check if we have a Certificate file
cert, err := ioutil.ReadFile(paths.Cert)
@ -427,17 +472,7 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) {
return RootCA{}, err
}
key, err := ioutil.ReadFile(paths.Key)
if err != nil {
if !os.IsNotExist(err) {
return RootCA{}, err
}
// There may not be a local key. It's okay to pass in a nil
// key. We'll get a root CA without a signer.
key = nil
}
return NewRootCA(cert, key, DefaultNodeCertExpiration)
return NewRootCA(cert, nil, paths, DefaultNodeCertExpiration)
}
func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) {
@ -530,13 +565,13 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {
return RootCA{}, err
}
rootCA, err := NewRootCA(cert, key, DefaultNodeCertExpiration)
rootCA, err := NewRootCA(cert, key, paths, DefaultNodeCertExpiration)
if err != nil {
return RootCA{}, err
}
// save the cert to disk
if err := saveRootCA(rootCA, paths); err != nil {
if err := rootCA.saveCertificate(); err != nil {
return RootCA{}, err
}
@ -545,7 +580,7 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {
// GetRemoteSignedCertificate submits a CSR to a remote CA server address,
// and that is part of a CA identified by a specific certificate pool.
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x509.CertPool, config CertificateRequestConfig) ([]byte, error) {
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x509.CertPool, config CertificateRequestConfig) (*api.NodeCertificateStatusResponse, error) {
if rootCAPool == nil {
return nil, errors.New("valid root CA pool required")
}
@ -594,7 +629,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
}
// If the certificate was issued, return
if statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Status != nil && statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Certificate == nil {
return nil, errors.New("no certificate in CertificateStatus response")
}
@ -606,7 +641,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
// current request.
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
return statusResponse.Certificate.Certificate, nil
return statusResponse, nil
}
}
@ -640,17 +675,6 @@ func readCertValidity(kr KeyReader) (time.Time, time.Time, error) {
}
func saveRootCA(rootCA RootCA, paths CertPaths) error {
// Make sure the necessary dirs exist and they are writable
err := os.MkdirAll(filepath.Dir(paths.Cert), 0755)
if err != nil {
return err
}
// If the root certificate got returned successfully, save the rootCA to disk.
return ioutils.AtomicWriteFile(paths.Cert, rootCA.Cert, 0644)
}
// GenerateNewCSR returns a newly generated key and CSR signed with said key
func GenerateNewCSR() (csr, key []byte, err error) {
req := &cfcsr.CertificateRequest{

View file

@ -120,8 +120,15 @@ func (s *SecurityConfig) UpdateRootCA(cert, key []byte, certExpiry time.Duration
s.mu.Lock()
defer s.mu.Unlock()
rootCA, err := NewRootCA(cert, key, certExpiry)
// Create a new RootCA, keeping the path of the old RootCA
rootCA, err := NewRootCA(cert, key, s.rootCA.Path, certExpiry)
if err != nil {
return err
}
// Attempt to write the new certificate to disk
err = rootCA.saveCertificate()
if err == nil {
// No errors, save the current rootCA
s.rootCA = &rootCA
}
@ -232,7 +239,8 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote
}
// Save root CA certificate to disk
if err = saveRootCA(rootCA, paths); err != nil {
rootCA.Path = paths
if err = rootCA.saveCertificate(); err != nil {
return RootCA{}, err
}
@ -454,7 +462,6 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
// Since the expiration of the certificate is managed remotely we should update our
// retry timer on every iteration of this loop.
// Retrieve the current certificate expiration information.
validFrom, validUntil, err := readCertValidity(s.KeyReader())
if err != nil {
// We failed to read the expiration, let's stick with the starting default

View file

@ -142,8 +142,9 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
// If this certificate has a final state, return it immediately (both pending and renew are transition states)
if isFinalState(node.Certificate.Status) {
return &api.NodeCertificateStatusResponse{
Status: &node.Certificate.Status,
Certificate: &node.Certificate,
Status: &node.Certificate.Status,
Certificate: &node.Certificate,
RootCABundle: s.securityConfig.RootCA().Cert,
}, nil
}
@ -164,8 +165,9 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
if isFinalState(v.Node.Certificate.Status) {
cert := v.Node.Certificate.Copy()
return &api.NodeCertificateStatusResponse{
Status: &cert.Status,
Certificate: cert,
Status: &cert.Status,
Certificate: cert,
RootCABundle: s.securityConfig.RootCA().Cert,
}, nil
}
}

View file

@ -1,8 +1,13 @@
package log
import "google.golang.org/grpc/grpclog"
import (
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
)
func init() {
ctx := WithModule(context.Background(), "grpc")
// completely replace the grpc logger with the logrus logger.
grpclog.SetLogger(L)
grpclog.SetLogger(G(ctx))
}

View file

@ -573,9 +573,18 @@ func (na *NetworkAllocator) allocateDriverState(n *api.Network) error {
return err
}
var options map[string]string
options := make(map[string]string)
// reconcile the driver specific options from the network spec
// and from the operational state retrieved from the store
if n.Spec.DriverConfig != nil {
options = n.Spec.DriverConfig.Options
for k, v := range n.Spec.DriverConfig.Options {
options[k] = v
}
}
if n.DriverState != nil {
for k, v := range n.DriverState.Options {
options[k] = v
}
}
// Construct IPAM data for driver consumption.

View file

@ -101,7 +101,8 @@ func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRe
err := s.store.Update(func(tx store.Tx) error {
cluster = store.GetCluster(tx, request.ClusterID)
if cluster == nil {
return nil
return grpc.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID)
}
cluster.Meta.Version = *request.ClusterVersion
cluster.Spec = *request.Spec.Copy()
@ -145,9 +146,6 @@ func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRe
if err != nil {
return nil, err
}
if cluster == nil {
return nil, grpc.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID)
}
redactedClusters := redactClusters([]*api.Cluster{cluster})

View file

@ -211,7 +211,7 @@ func (s *Server) UpdateNode(ctx context.Context, request *api.UpdateNodeRequest)
err := s.store.Update(func(tx store.Tx) error {
node = store.GetNode(tx, request.NodeID)
if node == nil {
return nil
return grpc.Errorf(codes.NotFound, "node %s not found", request.NodeID)
}
// Demotion sanity checks.
@ -245,9 +245,6 @@ func (s *Server) UpdateNode(ctx context.Context, request *api.UpdateNodeRequest)
if err != nil {
return nil, err
}
if node == nil {
return nil, grpc.Errorf(codes.NotFound, "node %s not found", request.NodeID)
}
if demote && s.raft != nil {
// TODO(abronan): the remove can potentially fail and leave the node with

View file

@ -66,7 +66,7 @@ func (s *Server) UpdateSecret(ctx context.Context, request *api.UpdateSecretRequ
err := s.store.Update(func(tx store.Tx) error {
secret = store.GetSecret(tx, request.SecretID)
if secret == nil {
return nil
return grpc.Errorf(codes.NotFound, "secret %s not found", request.SecretID)
}
// Check if the Name is different than the current name, or the secret is non-nil and different
@ -85,9 +85,6 @@ func (s *Server) UpdateSecret(ctx context.Context, request *api.UpdateSecretRequ
if err != nil {
return nil, err
}
if secret == nil {
return nil, grpc.Errorf(codes.NotFound, "secret %s not found", request.SecretID)
}
log.G(ctx).WithFields(logrus.Fields{
"secret.ID": request.SecretID,

View file

@ -505,7 +505,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
err := s.store.Update(func(tx store.Tx) error {
service = store.GetService(tx, request.ServiceID)
if service == nil {
return nil
return grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
}
// temporary disable network update
requestSpecNetworks := request.Spec.Task.Networks
@ -552,9 +552,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
if err != nil {
return nil, err
}
if service == nil {
return nil, grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
}
return &api.UpdateServiceResponse{
Service: service,
}, nil

View file

@ -388,26 +388,39 @@ func (m *Manager) Run(parent context.Context) error {
close(m.started)
errCh := make(chan error, 1)
go func() {
err := m.raftNode.Run(ctx)
if err != nil {
errCh <- err
log.G(ctx).WithError(err).Error("raft node stopped")
m.Stop(ctx)
}
}()
if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
returnErr := func(err error) error {
select {
case runErr := <-errCh:
if runErr == raft.ErrMemberRemoved {
return runErr
}
default:
}
return err
}
if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
return returnErr(err)
}
c, err := raft.WaitForCluster(ctx, m.raftNode)
if err != nil {
return err
return returnErr(err)
}
raftConfig := c.Spec.Raft
if err := m.watchForKEKChanges(ctx); err != nil {
return err
return returnErr(err)
}
if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
@ -426,7 +439,8 @@ func (m *Manager) Run(parent context.Context) error {
}
m.mu.Unlock()
m.Stop(ctx)
return err
return returnErr(err)
}
const stopTimeout = 8 * time.Second

View file

@ -549,7 +549,7 @@ func (n *Node) needsSnapshot() bool {
keys := n.keyRotator.GetKeys()
if keys.PendingDEK != nil {
n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
// we want to wait for the last index written with the old DEK to be commited, else a snapshot taken
// we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
// written with the new key to supercede any WAL written with an old DEK.
n.waitForAppliedIndex = n.writtenIndex
@ -892,18 +892,39 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
return n.removeMember(ctx, id)
}
// processRaftMessageLogger is used to lazily create a logger for
// ProcessRaftMessage. Usually nothing will be logged, so it is useful to avoid
// formatting strings and allocating a logger when it won't be used.
func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry {
fields := logrus.Fields{
"method": "(*Node).ProcessRaftMessage",
}
if n.IsMember() {
fields["raft_id"] = fmt.Sprintf("%x", n.Config.ID)
}
if msg != nil && msg.Message != nil {
fields["from"] = fmt.Sprintf("%x", msg.Message.From)
}
return log.G(ctx).WithFields(fields)
}
// ProcessRaftMessage calls 'Step' which advances the
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
n.processRaftMessageLogger(ctx, msg).Debug("received empty message")
return &api.ProcessRaftMessageResponse{}, nil
}
// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error())
}
var sourceHost string
@ -921,16 +942,16 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
if msg.Message.Type == raftpb.MsgVote {
member := n.cluster.GetMember(msg.Message.From)
if member == nil || member.Conn == nil {
log.G(ctx).Errorf("received vote request from unknown member %x", msg.Message.From)
return nil, ErrMemberUnknown
n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
return &api.ProcessRaftMessageResponse{}, nil
}
healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
defer cancel()
if err := member.HealthCheck(healthCtx); err != nil {
log.G(ctx).WithError(err).Warningf("member %x which sent vote request failed health check", msg.Message.From)
return nil, errors.Wrap(err, "member unreachable")
n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check")
return &api.ProcessRaftMessageResponse{}, nil
}
}
@ -939,19 +960,18 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal")
return &api.ProcessRaftMessageResponse{}, nil
}
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return nil, ErrNoRaftMember
}
if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
return nil, err
if n.IsMember() {
if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
}
}
return &api.ProcessRaftMessageResponse{}, nil
@ -1337,7 +1357,7 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.
_, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {

View file

@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/manager/encryption"
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/xnet"
"github.com/pkg/errors"
@ -694,7 +695,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
case <-done:
// Fail out if m.Run() returns error, otherwise wait for
// role change.
if runErr != nil {
if runErr != nil && runErr != raft.ErrMemberRemoved {
err = runErr
} else {
err = <-roleChanged