Vendor swarmkit ae29cf2

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Lehmann, 2017-05-11 15:18:12 -07:00
parent 69c35dad8e
commit 721b7a7fad
24 changed files with 296 additions and 40 deletions


@@ -105,7 +105,7 @@ github.com/docker/containerd 8ef7df579710405c4bb6e0812495671002ce08e0
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit f420c4b9e1535170fc229db97ee8ac32374020b1
github.com/docker/swarmkit ae29cf24355ef2106b63884d2f9b0a6406e5a144
github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e


@@ -2074,7 +2074,7 @@ func init() { proto.RegisterFile("ca.proto", fileDescriptorCa) }
var fileDescriptorCa = []byte{
// 610 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x40,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xee, 0xba, 0x25, 0x6d, 0x27, 0xa1, 0x45, 0xdb, 0x56, 0x32, 0x69, 0xea, 0x54, 0xe6, 0xd0,
0x72, 0xc0, 0x6d, 0x03, 0x27, 0xb8, 0x90, 0x04, 0xa9, 0x8a, 0x50, 0x11, 0xda, 0x08, 0xae, 0x95,
0xe3, 0x2c, 0xc1, 0x8a, 0xe3, 0x35, 0xde, 0x75, 0x20, 0x37, 0x24, 0x10, 0x6f, 0x80, 0xe0, 0xc4,
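
A note on the many one-row changes like the pair above, repeated across all the regenerated .pb.go files in this commit: assuming the standard RFC 1952 gzip layout, the three bytes that change (0x09, 0x6e, 0x88 becoming zeros) all fall inside the header's four-byte MTIME field, so the regenerated descriptors appear to zero the embedded timestamp to make the output deterministic. A small sketch (not part of the commit) that decodes the two header rows:

package main

import "fmt"

func main() {
	oldHdr := []byte{0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff}
	newHdr := []byte{0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff}
	for _, h := range [][]byte{oldHdr, newHdr} {
		// Bytes 4-7 are the little-endian MTIME field of a gzip header.
		mtime := uint32(h[4]) | uint32(h[5])<<8 | uint32(h[6])<<16 | uint32(h[7])<<24
		fmt.Printf("magic=%02x%02x method=%d mtime=%d\n", h[0], h[1], h[2], mtime)
	}
}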


@@ -15993,7 +15993,7 @@ func init() { proto.RegisterFile("control.proto", fileDescriptorControl) }
var fileDescriptorControl = []byte{
// 2096 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x5a, 0x4b, 0x6f, 0x1b, 0xc9,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x5a, 0x4b, 0x6f, 0x1b, 0xc9,
0x11, 0x36, 0x1f, 0x12, 0xa9, 0xa2, 0x44, 0x49, 0x2d, 0x39, 0x21, 0x68, 0x47, 0x32, 0xc6, 0xb1,
0x4d, 0x07, 0x0e, 0x95, 0xa5, 0xb3, 0x88, 0xb3, 0x41, 0x1e, 0x2b, 0xd1, 0xeb, 0x70, 0xb5, 0x2b,
0x1b, 0x23, 0x6b, 0x91, 0x1b, 0x41, 0x91, 0x2d, 0x65, 0x4c, 0x8a, 0xc3, 0xcc, 0x0c, 0xb5, 0x2b,


@@ -3781,7 +3781,7 @@ func init() { proto.RegisterFile("dispatcher.proto", fileDescriptorDispatcher) }
var fileDescriptorDispatcher = []byte{
// 983 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x6f, 0x1b, 0x45,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0xf7, 0x38, 0xce, 0x26, 0x7e, 0x4e, 0x82, 0x19, 0xaa, 0xb0, 0xac, 0x54, 0xc7, 0x6c, 0x68,
0x14, 0xa9, 0x61, 0x53, 0xcc, 0x9f, 0x0b, 0x51, 0x20, 0x8e, 0x2d, 0xc5, 0x6a, 0x93, 0x46, 0x13,
0xb7, 0x3d, 0x5a, 0x6b, 0xef, 0x74, 0xb3, 0x38, 0xde, 0x59, 0x76, 0xc6, 0x2d, 0x3e, 0x20, 0x71,


@@ -700,7 +700,7 @@ func init() { proto.RegisterFile("health.proto", fileDescriptorHealth) }
var fileDescriptorHealth = []byte{
// 287 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xc9, 0x48, 0x4d, 0xcc,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc9, 0x48, 0x4d, 0xcc,
0x29, 0xc9, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x4a, 0xc9, 0x4f, 0xce, 0x4e, 0x2d,
0xd2, 0x2b, 0x2e, 0x4f, 0x2c, 0xca, 0xcd, 0xce, 0x2c, 0xd1, 0x2b, 0x33, 0x94, 0x12, 0x49, 0xcf,
0x4f, 0xcf, 0x07, 0x4b, 0xeb, 0x83, 0x58, 0x10, 0x95, 0x52, 0xc2, 0x05, 0x39, 0xa5, 0xe9, 0x99,


@@ -3366,7 +3366,7 @@ func init() { proto.RegisterFile("logbroker.proto", fileDescriptorLogbroker) }
var fileDescriptorLogbroker = []byte{
// 940 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x95, 0xcf, 0x6f, 0x1b, 0x45,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x95, 0xcf, 0x6f, 0x1b, 0x45,
0x14, 0xc7, 0x33, 0xeb, 0xc4, 0x3f, 0x9e, 0x9b, 0xc4, 0x1d, 0xa7, 0x91, 0x65, 0xa8, 0x6d, 0x6d,
0xa5, 0x62, 0x45, 0xc5, 0x6e, 0x8d, 0x50, 0x91, 0x2a, 0x21, 0x6a, 0x5c, 0x21, 0x0b, 0x37, 0x41,
0x63, 0x47, 0x70, 0x8b, 0xd6, 0xde, 0xe9, 0xb2, 0xf2, 0x7a, 0xc7, 0xec, 0x8c, 0x13, 0x90, 0x38,


@@ -7470,7 +7470,7 @@ func init() { proto.RegisterFile("objects.proto", fileDescriptorObjects) }
var fileDescriptorObjects = []byte{
// 1405 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x57, 0xc1, 0x6f, 0x1b, 0x45,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xc1, 0x6f, 0x1b, 0x45,
0x17, 0xef, 0xda, 0x1b, 0xdb, 0xfb, 0x9c, 0x58, 0xf9, 0xa6, 0xf9, 0xf2, 0x6d, 0xf3, 0x05, 0x3b,
0xb8, 0x02, 0x55, 0xa8, 0x72, 0x4a, 0x29, 0x28, 0x0d, 0x14, 0x6a, 0x27, 0x11, 0xb5, 0x4a, 0x69,
0x34, 0x2d, 0x2d, 0x37, 0x33, 0xd9, 0x9d, 0xba, 0x8b, 0xd7, 0x3b, 0xab, 0x9d, 0xb1, 0x8b, 0x6f,


@@ -3587,7 +3587,7 @@ func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
var fileDescriptorRaft = []byte{
// 949 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x96, 0x4f, 0x6f, 0x1b, 0x45,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x96, 0x4f, 0x6f, 0x1b, 0x45,
0x18, 0xc6, 0x77, 0xed, 0xad, 0x9d, 0xbc, 0x6e, 0xe2, 0x68, 0x42, 0xc2, 0x76, 0x29, 0x8e, 0xbb,
0x45, 0xaa, 0x5b, 0x91, 0xb5, 0x30, 0x48, 0xa0, 0x42, 0x0f, 0xb1, 0x63, 0xc9, 0xa6, 0xad, 0x53,
0x6d, 0x12, 0xe8, 0x2d, 0xac, 0x77, 0x27, 0xee, 0x62, 0x7b, 0xc7, 0xcc, 0x8c, 0x1d, 0xb8, 0xa0,


@@ -1065,7 +1065,7 @@ func init() { proto.RegisterFile("resource.proto", fileDescriptorResource) }
var fileDescriptorResource = []byte{
// 368 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x4a, 0x2d, 0xce,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x4a, 0x2d, 0xce,
0x2f, 0x2d, 0x4a, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x4a, 0xc9, 0x4f, 0xce,
0x4e, 0x2d, 0xd2, 0x2b, 0x2e, 0x4f, 0x2c, 0xca, 0xcd, 0xce, 0x2c, 0xd1, 0x2b, 0x33, 0x94, 0xe2,
0x2e, 0xa9, 0x2c, 0x48, 0x2d, 0x86, 0x28, 0x90, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x33, 0xf5,


@@ -1321,7 +1321,7 @@ func init() { proto.RegisterFile("snapshot.proto", fileDescriptorSnapshot) }
var fileDescriptorSnapshot = []byte{
// 466 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x93, 0x3f, 0x6f, 0x13, 0x4d,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x93, 0x3f, 0x6f, 0x13, 0x4d,
0x10, 0x87, 0xbd, 0xfe, 0x77, 0xce, 0x44, 0xc9, 0xfb, 0xb2, 0xa2, 0x58, 0x19, 0x38, 0x8c, 0xa1,
0x70, 0x75, 0x80, 0x41, 0x02, 0x21, 0x85, 0x22, 0x11, 0x05, 0x05, 0x29, 0xd6, 0x28, 0xa2, 0x3d,
0x9f, 0xc7, 0xce, 0x71, 0xf8, 0xd6, 0xda, 0xd9, 0x38, 0x94, 0xf0, 0xed, 0x5c, 0x52, 0x52, 0x21,


@@ -5884,7 +5884,7 @@ func init() { proto.RegisterFile("specs.proto", fileDescriptorSpecs) }
var fileDescriptorSpecs = []byte{
// 1824 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x73, 0x1b, 0x49,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x73, 0x1b, 0x49,
0x15, 0xb7, 0x6c, 0x59, 0x7f, 0xde, 0xc8, 0x89, 0xd2, 0x24, 0x61, 0xac, 0xb0, 0xb2, 0xa2, 0x0d,
0xc1, 0xcb, 0x16, 0x72, 0x61, 0xa8, 0x25, 0x4b, 0x58, 0x40, 0xb2, 0x84, 0x63, 0x8c, 0x1d, 0x55,
0xdb, 0x1b, 0xc8, 0x49, 0xd5, 0x9e, 0x69, 0x4b, 0x53, 0x1e, 0x75, 0x0f, 0xdd, 0x3d, 0xda, 0xd2,


@@ -16084,7 +16084,7 @@ func init() { proto.RegisterFile("types.proto", fileDescriptorTypes) }
var fileDescriptorTypes = []byte{
// 4658 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x5a, 0x4d, 0x6c, 0x23, 0x47,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x5a, 0x4d, 0x6c, 0x23, 0x47,
0x76, 0x16, 0x7f, 0x45, 0x3e, 0x52, 0x52, 0x4f, 0x8d, 0x3c, 0xd6, 0xd0, 0x63, 0x49, 0x6e, 0x7b,
0xd6, 0x3f, 0xeb, 0xd0, 0xf3, 0x63, 0x1b, 0x63, 0x3b, 0x6b, 0x9b, 0x7f, 0x1a, 0x71, 0x47, 0x22,
0x89, 0x22, 0x35, 0xb3, 0x3e, 0x24, 0x8d, 0x56, 0x77, 0x89, 0x6a, 0xab, 0xd9, 0xc5, 0x74, 0x17,


@@ -4523,7 +4523,7 @@ func init() { proto.RegisterFile("watch.proto", fileDescriptorWatch) }
var fileDescriptorWatch = []byte{
// 1155 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x96, 0xbb, 0x73, 0x1b, 0xd5,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x96, 0xbb, 0x73, 0x1b, 0xd5,
0x17, 0xc7, 0xb5, 0x8a, 0xbc, 0x92, 0x8e, 0xac, 0xc4, 0x73, 0xed, 0x24, 0xfb, 0xd3, 0x2f, 0x48,
0x42, 0x0c, 0xe0, 0x21, 0x41, 0x01, 0x13, 0xc2, 0x00, 0x81, 0x19, 0x4b, 0x16, 0x23, 0x91, 0xb1,
0xec, 0xb9, 0xb6, 0xe3, 0x52, 0xb3, 0xde, 0x3d, 0x56, 0x16, 0xed, 0x43, 0xdc, 0x5d, 0xc9, 0x71,


@@ -3,7 +3,6 @@ package controlapi
import (
"errors"
"reflect"
"strconv"
"strings"
"time"
@@ -433,10 +432,9 @@ func (s *Server) validateNetworks(networks []*api.NetworkAttachmentConfig) error
if network == nil {
continue
}
if network.Spec.Internal {
if allocator.IsIngressNetwork(network) {
return grpc.Errorf(codes.InvalidArgument,
"Service cannot be explicitly attached to %q network which is a swarm internal network",
network.Spec.Annotations.Name)
"Service cannot be explicitly attached to the ingress network %q", network.Spec.Annotations.Name)
}
}
return nil
@@ -490,18 +488,32 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
return nil
}
pcToString := func(pc *api.PortConfig) string {
port := strconv.FormatUint(uint64(pc.PublishedPort), 10)
return port + "/" + pc.Protocol.String()
type portSpec struct {
protocol api.PortConfig_Protocol
publishedPort uint32
}
reqPorts := make(map[string]bool)
for _, pc := range spec.Endpoint.Ports {
if pc.PublishedPort > 0 {
reqPorts[pcToString(pc)] = true
pcToStruct := func(pc *api.PortConfig) portSpec {
return portSpec{
protocol: pc.Protocol,
publishedPort: pc.PublishedPort,
}
}
if len(reqPorts) == 0 {
ingressPorts := make(map[portSpec]struct{})
hostModePorts := make(map[portSpec]struct{})
for _, pc := range spec.Endpoint.Ports {
if pc.PublishedPort == 0 {
continue
}
switch pc.PublishMode {
case api.PublishModeIngress:
ingressPorts[pcToStruct(pc)] = struct{}{}
case api.PublishModeHost:
hostModePorts[pcToStruct(pc)] = struct{}{}
}
}
if len(ingressPorts) == 0 && len(hostModePorts) == 0 {
return nil
}
@@ -517,6 +529,31 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
return err
}
isPortInUse := func(pc *api.PortConfig, service *api.Service) error {
if pc.PublishedPort == 0 {
return nil
}
switch pc.PublishMode {
case api.PublishModeHost:
if _, ok := ingressPorts[pcToStruct(pc)]; ok {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s) as a host-published port", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
// Multiple services with same port in host publish mode can
// coexist - this is handled by the scheduler.
return nil
case api.PublishModeIngress:
_, ingressConflict := ingressPorts[pcToStruct(pc)]
_, hostModeConflict := hostModePorts[pcToStruct(pc)]
if ingressConflict || hostModeConflict {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s) as an ingress port", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
}
return nil
}
for _, service := range services {
// If service ID is the same (and not "") then this is an update
if serviceID != "" && serviceID == service.ID {
@@ -524,15 +561,15 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
}
if service.Spec.Endpoint != nil {
for _, pc := range service.Spec.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
if err := isPortInUse(pc, service); err != nil {
return err
}
}
}
if service.Endpoint != nil {
for _, pc := range service.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
if err := isPortInUse(pc, service); err != nil {
return err
}
}
}
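
The publish-mode rules above boil down to this: two host-mode publications of the same port may coexist (placement is left to the scheduler's new HostPortFilter), while any combination involving ingress conflicts. A minimal, self-contained sketch of those rules, using simplified stand-in types rather than the swarmkit API:

package main

import "fmt"

type mode int

const (
	ingress mode = iota
	host
)

type portSpec struct {
	protocol      string
	publishedPort uint32
	publishMode   mode
}

// conflicts reports whether a requested publication clashes with an existing one.
func conflicts(requested, existing portSpec) bool {
	if requested.protocol != existing.protocol || requested.publishedPort != existing.publishedPort {
		return false
	}
	// Host-mode/host-mode reuse is allowed; the scheduler keeps such tasks
	// off nodes where the port is already taken.
	return !(requested.publishMode == host && existing.publishMode == host)
}

func main() {
	fmt.Println(conflicts(portSpec{"tcp", 8080, host}, portSpec{"tcp", 8080, host}))       // false
	fmt.Println(conflicts(portSpec{"tcp", 8080, host}, portSpec{"tcp", 8080, ingress}))    // true
	fmt.Println(conflicts(portSpec{"tcp", 8080, ingress}, portSpec{"tcp", 8080, ingress})) // true
}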


@@ -529,6 +529,8 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
return nil, err
}
validTaskUpdates := make([]*api.UpdateTaskStatusRequest_TaskStatusUpdate, 0, len(r.Updates))
// Validate task updates
for _, u := range r.Updates {
if u.Status == nil {
@@ -541,7 +543,8 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
t = store.GetTask(tx, u.TaskID)
})
if t == nil {
log.WithField("task.id", u.TaskID).Warn("cannot find target task in store")
// Task may have been deleted
log.WithField("task.id", u.TaskID).Debug("cannot find target task in store")
continue
}
@@ -550,14 +553,13 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
log.WithField("task.id", u.TaskID).Error(err)
return nil, err
}
validTaskUpdates = append(validTaskUpdates, u)
}
d.taskUpdatesLock.Lock()
// Enqueue task updates
for _, u := range r.Updates {
if u.Status == nil {
continue
}
for _, u := range validTaskUpdates {
d.taskUpdates[u.TaskID] = u.Status
}
@@ -606,7 +608,8 @@ func (d *Dispatcher) processUpdates(ctx context.Context) {
logger := log.WithField("task.id", taskID)
task := store.GetTask(tx, taskID)
if task == nil {
logger.Errorf("task unavailable")
// Task may have been deleted
logger.Debug("cannot find target task in store")
return nil
}
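
The dispatcher change above separates validation from enqueueing: updates are validated in one pass, the survivors collected in validTaskUpdates, and the taskUpdatesLock critical section is then reduced to plain map writes. A rough sketch of the pattern, with hypothetical minimal types:

package main

import (
	"fmt"
	"sync"
)

type update struct {
	taskID string
	status *string
}

type dispatcher struct {
	mu      sync.Mutex
	pending map[string]*string
}

func (d *dispatcher) updateTaskStatus(updates []update) {
	// Pass 1: validate outside the lock, keeping only well-formed updates.
	var valid []update
	for _, u := range updates {
		if u.status == nil {
			continue // malformed update; the real code also logs or errors here
		}
		valid = append(valid, u)
	}
	// Pass 2: a short critical section that only performs map writes.
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, u := range valid {
		d.pending[u.taskID] = u.status
	}
}

func main() {
	d := &dispatcher{pending: make(map[string]*string)}
	running := "RUNNING"
	d.updateTaskStatus([]update{{"task1", &running}, {"task2", nil}})
	fmt.Println(len(d.pending)) // 1: the nil-status update was dropped
}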


@@ -28,6 +28,7 @@ import (
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/keymanager"
"github.com/docker/swarmkit/manager/logbroker"
"github.com/docker/swarmkit/manager/metrics"
"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
"github.com/docker/swarmkit/manager/orchestrator/global"
"github.com/docker/swarmkit/manager/orchestrator/replicated"
@@ -123,6 +124,7 @@ type Config struct {
type Manager struct {
config Config
collector *metrics.Collector
caserver *ca.Server
dispatcher *dispatcher.Dispatcher
logbroker *logbroker.LogBroker
@@ -214,6 +216,7 @@ func New(config *Config) (*Manager, error) {
m := &Manager{
config: *config,
collector: metrics.NewCollector(raftNode.MemoryStore()),
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()),
logbroker: logbroker.New(raftNode.MemoryStore()),
@@ -503,6 +506,13 @@ func (m *Manager) Run(parent context.Context) error {
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)
// Start metrics collection.
go func(collector *metrics.Collector) {
if err := collector.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("collector failed with an error")
}
}(m.collector)
close(m.started)
go func() {
@@ -579,6 +589,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {
m.raftNode.Cancel()
m.collector.Stop()
m.dispatcher.Stop()
m.logbroker.Stop()
m.caserver.Stop()


@@ -0,0 +1,104 @@
package metrics
import (
"context"
"strings"
metrics "github.com/docker/go-metrics"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/store"
)
var (
ns = metrics.NewNamespace("swarm", "manager", nil)
nodesMetric metrics.LabeledGauge
)
func init() {
nodesMetric = ns.NewLabeledGauge("nodes", "The number of nodes", "", "state")
for _, state := range api.NodeStatus_State_name {
nodesMetric.WithValues(strings.ToLower(state)).Set(0)
}
metrics.Register(ns)
}
// Collector collects swarmkit metrics
type Collector struct {
store *store.MemoryStore
// stopChan signals to the state machine to stop running.
stopChan chan struct{}
// doneChan is closed when the state machine terminates.
doneChan chan struct{}
}
// NewCollector creates a new metrics collector
func NewCollector(store *store.MemoryStore) *Collector {
return &Collector{
store: store,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
}
}
func (c *Collector) updateNodeState(prevNode, newNode *api.Node) {
// Skip updates if nothing changed.
if prevNode != nil && newNode != nil && prevNode.Status.State == newNode.Status.State {
return
}
if prevNode != nil {
nodesMetric.WithValues(strings.ToLower(prevNode.Status.State.String())).Dec(1)
}
if newNode != nil {
nodesMetric.WithValues(strings.ToLower(newNode.Status.State.String())).Inc(1)
}
}
// Run contains the collector event loop
func (c *Collector) Run(ctx context.Context) error {
defer close(c.doneChan)
watcher, cancel, err := store.ViewAndWatch(c.store, func(readTx store.ReadTx) error {
nodes, err := store.FindNodes(readTx, store.All)
if err != nil {
return err
}
for _, node := range nodes {
c.updateNodeState(nil, node)
}
return nil
})
if err != nil {
return err
}
defer cancel()
for {
select {
case event := <-watcher:
switch v := event.(type) {
case api.EventCreateNode:
c.updateNodeState(nil, v.Node)
case api.EventUpdateNode:
c.updateNodeState(v.OldNode, v.Node)
case api.EventDeleteNode:
c.updateNodeState(v.Node, nil)
}
case <-c.stopChan:
return nil
}
}
}
// Stop stops the collector.
func (c *Collector) Stop() {
close(c.stopChan)
<-c.doneChan
// Clean the metrics on exit.
for _, state := range api.NodeStatus_State_name {
nodesMetric.WithValues(strings.ToLower(state)).Set(0)
}
}
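
The collector pattern in this new file is: seed the gauges from a consistent snapshot (store.ViewAndWatch), then keep them current by applying per-event deltas, decrementing the previous state's count and incrementing the new one. A simplified, self-contained sketch of updateNodeState's transition accounting, with a plain map standing in for the go-metrics labeled gauge:

package main

import "fmt"

// nodesByState stands in for the "swarm_manager_nodes" labeled gauge.
var nodesByState = make(map[string]int)

func updateNodeState(prev, next string) {
	if prev != "" && next != "" && prev == next {
		return // state unchanged: skip the no-op update
	}
	if prev != "" {
		nodesByState[prev]-- // the node left its previous state
	}
	if next != "" {
		nodesByState[next]++ // the node entered its new state
	}
}

func main() {
	updateNodeState("", "ready")     // EventCreateNode
	updateNodeState("ready", "down") // EventUpdateNode
	updateNodeState("down", "")      // EventDeleteNode
	fmt.Println(nodesByState["ready"], nodesByState["down"]) // 0 0
}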


@@ -260,3 +260,44 @@ func (f *PlatformFilter) Explain(nodes int) string {
}
return fmt.Sprintf("unsupported platform on %d nodes", nodes)
}
// HostPortFilter checks that the node has a specific port available.
type HostPortFilter struct {
t *api.Task
}
// SetTask returns true when the filter is enabled for a given task.
func (f *HostPortFilter) SetTask(t *api.Task) bool {
if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
f.t = t
return true
}
}
}
return false
}
// Check returns true if the task can be scheduled into the given node.
func (f *HostPortFilter) Check(n *NodeInfo) bool {
for _, port := range f.t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
if _, ok := n.usedHostPorts[portSpec]; ok {
return false
}
}
}
return true
}
// Explain returns an explanation of a failure.
func (f *HostPortFilter) Explain(nodes int) string {
if nodes == 1 {
return "host-mode port already in use on 1 node"
}
return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
}
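
For context, scheduler filters follow a two-step contract: SetTask reports whether the filter applies to the task at all (so inapplicable filters are skipped entirely), and Check is then run per candidate node. A condensed sketch of HostPortFilter's behavior, with made-up minimal types in place of the swarmkit API:

package main

import "fmt"

type task struct{ hostPorts []uint32 }
type node struct{ usedHostPorts map[uint32]struct{} }

type hostPortFilter struct{ t *task }

func (f *hostPortFilter) SetTask(t *task) bool {
	if len(t.hostPorts) == 0 {
		return false // no host-mode ports: the filter is a no-op for this task
	}
	f.t = t
	return true
}

func (f *hostPortFilter) Check(n *node) bool {
	for _, p := range f.t.hostPorts {
		if _, used := n.usedHostPorts[p]; used {
			return false // port already taken on this node
		}
	}
	return true
}

func main() {
	f := &hostPortFilter{}
	t := &task{hostPorts: []uint32{8080}}
	busy := &node{usedHostPorts: map[uint32]struct{}{8080: {}}}
	free := &node{usedHostPorts: map[uint32]struct{}{}}
	if f.SetTask(t) {
		fmt.Println(f.Check(busy), f.Check(free)) // false true
	}
}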


@@ -8,6 +8,12 @@ import (
"golang.org/x/net/context"
)
// hostPortSpec specifies a used host port.
type hostPortSpec struct {
protocol api.PortConfig_Protocol
publishedPort uint32
}
// NodeInfo contains a node and some additional metadata.
type NodeInfo struct {
*api.Node
@@ -15,6 +21,7 @@ type NodeInfo struct {
ActiveTasksCount int
ActiveTasksCountByService map[string]int
AvailableResources api.Resources
usedHostPorts map[hostPortSpec]struct{}
// recentFailures is a map from service ID to the timestamps of the
// most recent failures the node has experienced from replicas of that
@@ -30,6 +37,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
Tasks: make(map[string]*api.Task),
ActiveTasksCountByService: make(map[string]int),
AvailableResources: availableResources,
usedHostPorts: make(map[hostPortSpec]struct{}),
recentFailures: make(map[string][]time.Time),
}
@@ -57,6 +65,15 @@ func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool {
nodeInfo.AvailableResources.MemoryBytes += reservations.MemoryBytes
nodeInfo.AvailableResources.NanoCPUs += reservations.NanoCPUs
if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
delete(nodeInfo.usedHostPorts, portSpec)
}
}
}
return true
}
@@ -84,6 +101,15 @@ func (nodeInfo *NodeInfo) addTask(t *api.Task) bool {
nodeInfo.AvailableResources.MemoryBytes -= reservations.MemoryBytes
nodeInfo.AvailableResources.NanoCPUs -= reservations.NanoCPUs
if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
nodeInfo.usedHostPorts[portSpec] = struct{}{}
}
}
}
if t.DesiredState <= api.TaskStateRunning {
nodeInfo.ActiveTasksCount++
nodeInfo.ActiveTasksCountByService[t.ServiceID]++


@@ -14,6 +14,7 @@ var (
&PluginFilter{},
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
}
)


@@ -1370,10 +1370,6 @@ func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
// LeaderConn returns current connection to cluster leader or raftselector.ErrIsLeader
// if current machine is leader.
func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) {
if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout {
return nil, errLostQuorum
}
cc, err := n.getLeaderConn()
if err == nil {
return cc, nil
@@ -1381,6 +1377,10 @@ func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) {
if err == raftselector.ErrIsLeader {
return nil, err
}
if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout {
return nil, errLostQuorum
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {


@@ -16,6 +16,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
"github.com/docker/docker/pkg/plugingetter"
metrics "github.com/docker/go-metrics"
"github.com/docker/swarmkit/agent"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
@@ -41,6 +42,9 @@
)
var (
nodeInfo metrics.LabeledGauge
nodeManager metrics.Gauge
errNodeStarted = errors.New("node: already started")
errNodeNotStarted = errors.New("node: not started")
certDirectory = "certificates"
@@ -49,6 +53,16 @@ var (
ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")
)
func init() {
ns := metrics.NewNamespace("swarm", "node", nil)
nodeInfo = ns.NewLabeledGauge("info", "Information related to the swarm", "",
"swarm_id",
"node_id",
)
nodeManager = ns.NewGauge("manager", "Whether this node is a manager or not", "")
metrics.Register(ns)
}
// Config provides values for a Node.
type Config struct {
// Hostname is the name of host for agent instance.
@@ -346,6 +360,17 @@ func (n *Node) run(ctx context.Context) (err error) {
var wg sync.WaitGroup
wg.Add(3)
nodeInfo.WithValues(
securityConfig.ClientTLSCreds.Organization(),
securityConfig.ClientTLSCreds.NodeID(),
).Set(1)
if n.currentRole() == api.NodeRoleManager {
nodeManager.Set(1)
} else {
nodeManager.Set(0)
}
updates := renewer.Start(ctx)
go func() {
for certUpdate := range updates {
@@ -357,6 +382,13 @@
n.role = certUpdate.Role
n.roleCond.Broadcast()
n.Unlock()
// Export the new role.
if n.currentRole() == api.NodeRoleManager {
nodeManager.Set(1)
} else {
nodeManager.Set(0)
}
}
wg.Done()
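
The role gauge above is set once at startup and flipped again whenever the certificate renewer delivers a role change. A trimmed sketch of that export step, reusing only the go-metrics calls visible in this diff (the exportRole helper is invented here for illustration):

package main

import metrics "github.com/docker/go-metrics"

var nodeManager metrics.Gauge

func init() {
	ns := metrics.NewNamespace("swarm", "node", nil)
	nodeManager = ns.NewGauge("manager", "Whether this node is a manager or not", "")
	metrics.Register(ns)
}

// exportRole publishes the node's current role as a 0/1 gauge.
func exportRole(isManager bool) {
	if isManager {
		nodeManager.Set(1)
	} else {
		nodeManager.Set(0)
	}
}

func main() {
	exportRole(true) // e.g. after a role change delivered by the cert renewer
}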


@@ -1144,7 +1144,7 @@ func init() { proto.RegisterFile("plugin.proto", fileDescriptorPlugin) }
var fileDescriptorPlugin = []byte{
// 551 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x52, 0xc1, 0x6e, 0xd3, 0x40,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xad, 0xd3, 0x36, 0x4d, 0xc6, 0x69, 0x29, 0x2b, 0x54, 0xad, 0x7a, 0xb0, 0xab, 0x46, 0x42,
0x41, 0x42, 0xa9, 0xd4, 0x63, 0x6e, 0x94, 0x5c, 0x22, 0x01, 0x45, 0x0e, 0x12, 0x37, 0x2c, 0xd7,
0x3b, 0x4d, 0x96, 0x3a, 0x5e, 0x6b, 0x77, 0x4d, 0x0b, 0x27, 0x7e, 0x80, 0x0f, 0xe0, 0xca, 0xd7,


@@ -6,6 +6,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.0
# metrics
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
github.com/docker/go-metrics d466d4f6fd960e01820085bd7e1a24426ee7ef18
# etcd/raft
github.com/coreos/etcd ea5389a79f40206170582c1ea076191b8622cb8e https://github.com/aaronlehmann/etcd # for https://github.com/coreos/etcd/pull/7830