Merge pull request #29433 from aaronlehmann/swarm-plugins-1.13

[1.13] Support v2 plugins in swarm mode
This commit is contained in:
Victor Vieux 2016-12-16 12:47:52 -08:00 committed by GitHub
commit 7d4318c83c
21 changed files with 374 additions and 249 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/docker/api/types/network"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/docker/plugin"
networktypes "github.com/docker/libnetwork/types"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/agent/secrets"
@ -45,12 +46,39 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
}
}
// add v1 plugins
addPlugins("Volume", info.Plugins.Volume)
// Add builtin driver "overlay" (the only builtin multi-host driver) to
// the plugin list by default.
addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...))
addPlugins("Authorization", info.Plugins.Authorization)
// add v2 plugins
v2Plugins, err := plugin.GetManager().List()
if err == nil {
for _, plgn := range v2Plugins {
for _, typ := range plgn.Config.Interface.Types {
if typ.Prefix != "docker" || !plgn.Enabled {
continue
}
plgnTyp := typ.Capability
if typ.Capability == "volumedriver" {
plgnTyp = "Volume"
} else if typ.Capability == "networkdriver" {
plgnTyp = "Network"
}
plgnName := plgn.Name
if plgn.Tag != "" {
plgnName += ":" + plgn.Tag
}
plugins[api.PluginDescription{
Type: plgnTyp,
Name: plgnName,
}] = struct{}{}
}
}
}
pluginFields := make([]api.PluginDescription, 0, len(plugins))
for k := range plugins {
pluginFields = append(pluginFields, k)

View file

@ -139,19 +139,25 @@ func (d *SwarmDaemon) getServiceTasks(c *check.C, service string) []swarm.Task {
return tasks
}
func (d *SwarmDaemon) checkServiceRunningTasks(service string) func(*check.C) (interface{}, check.CommentInterface) {
func (d *SwarmDaemon) checkServiceTasksInState(service string, state swarm.TaskState, message string) func(*check.C) (interface{}, check.CommentInterface) {
return func(c *check.C) (interface{}, check.CommentInterface) {
tasks := d.getServiceTasks(c, service)
var runningCount int
var count int
for _, task := range tasks {
if task.Status.State == swarm.TaskStateRunning {
runningCount++
if task.Status.State == state {
if message == "" || strings.Contains(task.Status.Message, message) {
count++
}
}
}
return runningCount, nil
return count, nil
}
}
func (d *SwarmDaemon) checkServiceRunningTasks(service string) func(*check.C) (interface{}, check.CommentInterface) {
return d.checkServiceTasksInState(service, swarm.TaskStateRunning, "")
}
func (d *SwarmDaemon) checkServiceUpdateState(service string) func(*check.C) (interface{}, check.CommentInterface) {
return func(c *check.C) (interface{}, check.CommentInterface) {
service := d.getService(c, service)

View file

@ -279,6 +279,22 @@ func (s *DockerExternalVolumeSuite) TearDownSuite(c *check.C) {
c.Assert(err, checker.IsNil)
}
func (s *DockerExternalVolumeSuite) TestVolumeCLICreateOptionConflict(c *check.C) {
dockerCmd(c, "volume", "create", "test")
out, _, err := dockerCmdWithError("volume", "create", "test", "--driver", volumePluginName)
c.Assert(err, check.NotNil, check.Commentf("volume create exception name already in use with another driver"))
c.Assert(out, checker.Contains, "A volume named test already exists")
out, _ = dockerCmd(c, "volume", "inspect", "--format={{ .Driver }}", "test")
_, _, err = dockerCmdWithError("volume", "create", "test", "--driver", strings.TrimSpace(out))
c.Assert(err, check.IsNil)
// make sure hidden --name option conflicts with positional arg name
out, _, err = dockerCmdWithError("volume", "create", "--name", "test2", "test2")
c.Assert(err, check.NotNil, check.Commentf("Conflicting options: either specify --name or provide positional arg, not both"))
}
func (s *DockerExternalVolumeSuite) TestExternalVolumeDriverNamed(c *check.C) {
err := s.d.StartWithBusybox()
c.Assert(err, checker.IsNil)

View file

@ -0,0 +1,52 @@
// +build !windows
package main
import (
"encoding/json"
"strings"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/pkg/integration/checker"
"github.com/go-check/check"
)
func (s *DockerSwarmSuite) TestSwarmVolumePlugin(c *check.C) {
d := s.AddDaemon(c, true, true)
out, err := d.Cmd("service", "create", "--mount", "type=volume,source=my-volume,destination=/foo,volume-driver=customvolumedriver", "--name", "top", "busybox", "top")
c.Assert(err, checker.IsNil, check.Commentf(out))
// Make sure task stays pending before plugin is available
waitAndAssert(c, defaultReconciliationTimeout, d.checkServiceTasksInState("top", swarm.TaskStatePending, "missing plugin on 1 node"), checker.Equals, 1)
plugin := newVolumePlugin(c, "customvolumedriver")
defer plugin.Close()
// create a dummy volume to trigger lazy loading of the plugin
out, err = d.Cmd("volume", "create", "-d", "customvolumedriver", "hello")
// TODO(aaronl): It will take about 15 seconds for swarm to realize the
// plugin was loaded. Switching the test over to plugin v2 would avoid
// this long delay.
// make sure task has been deployed.
waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, 1)
out, err = d.Cmd("ps", "-q")
c.Assert(err, checker.IsNil)
containerID := strings.TrimSpace(out)
out, err = d.Cmd("inspect", "-f", "{{json .Mounts}}", containerID)
c.Assert(err, checker.IsNil)
var mounts []struct {
Name string
Driver string
}
c.Assert(json.NewDecoder(strings.NewReader(out)).Decode(&mounts), checker.IsNil)
c.Assert(len(mounts), checker.Equals, 1, check.Commentf(out))
c.Assert(mounts[0].Name, checker.Equals, "my-volume")
c.Assert(mounts[0].Driver, checker.Equals, "customvolumedriver")
}

View file

@ -29,21 +29,6 @@ func (s *DockerSuite) TestVolumeCLICreate(c *check.C) {
c.Assert(name, check.Equals, "test2")
}
func (s *DockerSuite) TestVolumeCLICreateOptionConflict(c *check.C) {
dockerCmd(c, "volume", "create", "test")
out, _, err := dockerCmdWithError("volume", "create", "test", "--driver", "nosuchdriver")
c.Assert(err, check.NotNil, check.Commentf("volume create exception name already in use with another driver"))
c.Assert(out, checker.Contains, "A volume named test already exists")
out, _ = dockerCmd(c, "volume", "inspect", "--format={{ .Driver }}", "test")
_, _, err = dockerCmdWithError("volume", "create", "test", "--driver", strings.TrimSpace(out))
c.Assert(err, check.IsNil)
// make sure hidden --name option conflicts with positional arg name
out, _, err = dockerCmdWithError("volume", "create", "--name", "test2", "test2")
c.Assert(err, check.NotNil, check.Commentf("Conflicting options: either specify --name or provide positional arg, not both"))
}
func (s *DockerSuite) TestVolumeCLIInspect(c *check.C) {
c.Assert(
exec.Command(dockerBinary, "volume", "inspect", "doesntexist").Run(),

View file

@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit b5f07ce49c66d2f5feee83998b23d4c905b78155
github.com/docker/swarmkit 99adeb1c6b33cebc81c31dd05b163080033062f2
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -37,6 +37,8 @@ type Agent struct {
started chan struct{}
startOnce sync.Once // start only once
ready chan struct{}
leaving chan struct{}
leaveOnce sync.Once
stopped chan struct{} // requests shutdown
stopOnce sync.Once // only allow stop to be called once
closed chan struct{} // only closed in run
@ -53,6 +55,7 @@ func New(config *Config) (*Agent, error) {
config: config,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
leaving: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
@ -78,6 +81,37 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}
// Leave instructs the agent to leave the cluster. This method will shutdown
// assignment processing and remove all assignments from the node.
// Leave blocks until worker has finished closing all task managers or agent
// is closed.
func (a *Agent) Leave(ctx context.Context) error {
select {
case <-a.started:
default:
return errAgentNotStarted
}
a.leaveOnce.Do(func() {
close(a.leaving)
})
// agent could be closed while Leave is in progress
var err error
ch := make(chan struct{})
go func() {
err = a.worker.Wait(ctx)
close(ch)
}()
select {
case <-ch:
return err
case <-a.closed:
return ErrClosed
}
}
// Stop shuts down the agent, blocking until full shutdown. If the agent is not
// started, Stop will block until the agent has fully shutdown.
func (a *Agent) Stop(ctx context.Context) error {
@ -151,6 +185,7 @@ func (a *Agent) run(ctx context.Context) {
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
leaving = a.leaving
subscriptions = map[string]context.CancelFunc{}
)
@ -171,7 +206,21 @@ func (a *Agent) run(ctx context.Context) {
select {
case operation := <-sessionq:
operation.response <- operation.fn(session)
case <-leaving:
leaving = nil
// TODO(stevvooe): Signal to the manager that the node is leaving.
// when leaving we remove all assignments.
if err := a.worker.Assign(ctx, nil); err != nil {
log.G(ctx).WithError(err).Error("failed removing all assignments")
}
case msg := <-session.assignments:
// if we have left, accept no more assignments
if leaving == nil {
continue
}
switch msg.Type {
case api.AssignmentsMessage_COMPLETE:
// Need to assign secrets before tasks, because tasks might depend on new secrets

View file

@ -193,12 +193,11 @@ func (_m *MockLogPublisherProvider) EXPECT() *_MockLogPublisherProviderRecorder
return _m.recorder
}
func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error) {
func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error) {
ret := _m.ctrl.Call(_m, "Publisher", ctx, subscriptionID)
ret0, _ := ret[0].(LogPublisher)
ret1, _ := ret[1].(func())
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
ret1, _ := ret[1].(error)
return ret0, ret1
}
func (_mr *_MockLogPublisherProviderRecorder) Publisher(arg0, arg1 interface{}) *gomock.Call {

View file

@ -1,6 +1,7 @@
package agent
import (
"sync"
"time"
"github.com/docker/swarmkit/agent/exec"
@ -19,8 +20,10 @@ type taskManager struct {
updateq chan *api.Task
shutdown chan struct{}
closed chan struct{}
shutdown chan struct{}
shutdownOnce sync.Once
closed chan struct{}
closeOnce sync.Once
}
func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
@ -48,20 +51,15 @@ func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
}
}
// Close shuts down the task manager, blocking until it is stopped.
// Close shuts down the task manager, blocking until it is closed.
func (tm *taskManager) Close() error {
select {
case <-tm.closed:
return nil
case <-tm.shutdown:
default:
tm.shutdownOnce.Do(func() {
close(tm.shutdown)
}
})
select {
case <-tm.closed:
return nil
}
<-tm.closed
return nil
}
func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
@ -106,7 +104,8 @@ func (tm *taskManager) run(ctx context.Context) {
// always check for shutdown before running.
select {
case <-tm.shutdown:
continue // ignore run request and handle shutdown
shutdown = tm.shutdown // a little questionable
continue // ignore run request and handle shutdown
case <-tm.closed:
continue
default:
@ -230,25 +229,19 @@ func (tm *taskManager) run(ctx context.Context) {
continue // wait until operation actually exits.
}
// TODO(stevvooe): This should be left for the repear.
// make an attempt at removing. this is best effort. any errors will be
// retried by the reaper later.
if err := tm.ctlr.Remove(ctx); err != nil {
log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
}
if err := tm.ctlr.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing controller")
}
// disable everything, and prepare for closing.
statusq = nil
errs = nil
shutdown = nil
close(tm.closed)
tm.closeOnce.Do(func() {
close(tm.closed)
})
case <-tm.closed:
return
case <-ctx.Done():
tm.closeOnce.Do(func() {
close(tm.closed)
})
return
}
}

View file

@ -40,6 +40,9 @@ type Worker interface {
// Subscribe to log messages matching the subscription.
Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error
// Wait blocks until all task managers have closed
Wait(ctx context.Context) error
}
// statusReporterKey protects removal map from panic.
@ -57,6 +60,9 @@ type worker struct {
taskManagers map[string]*taskManager
mu sync.RWMutex
closed bool
closers sync.WaitGroup // keeps track of active closers
}
func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {
@ -106,6 +112,10 @@ func (w *worker) Init(ctx context.Context) error {
// Close performs worker cleanup when no longer needed.
func (w *worker) Close() {
w.mu.Lock()
w.closed = true
w.mu.Unlock()
w.taskevents.Close()
}
@ -118,6 +128,10 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return ErrClosed
}
log.G(ctx).WithFields(logrus.Fields{
"len(assignments)": len(assignments),
}).Debug("(*worker).Assign")
@ -140,6 +154,10 @@ func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return ErrClosed
}
log.G(ctx).WithFields(logrus.Fields{
"len(assignments)": len(assignments),
}).Debug("(*worker).Update")
@ -222,10 +240,22 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig
}
closeManager := func(tm *taskManager) {
// when a task is no longer assigned, we shutdown the task manager for
// it and leave cleanup to the sweeper.
if err := tm.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing task manager")
go func(tm *taskManager) {
defer w.closers.Done()
// when a task is no longer assigned, we shutdown the task manager
if err := tm.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing task manager")
}
}(tm)
// make an attempt at removing. this is best effort. any errors will be
// retried by the reaper later.
if err := tm.ctlr.Remove(ctx); err != nil {
log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
}
if err := tm.ctlr.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing controller")
}
}
@ -359,6 +389,8 @@ func (w *worker) taskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (
return nil, err
}
w.taskManagers[task.ID] = tm
// keep track of active tasks
w.closers.Add(1)
return tm, nil
}
@ -484,3 +516,18 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
}
}
}
func (w *worker) Wait(ctx context.Context) error {
ch := make(chan struct{})
go func() {
w.closers.Wait()
close(ch)
}()
select {
case <-ch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

View file

@ -43,9 +43,8 @@ func (*NodeCertificateStatusRequest) ProtoMessage() {}
func (*NodeCertificateStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptorCa, []int{0} }
type NodeCertificateStatusResponse struct {
Status *IssuanceStatus `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
Certificate *Certificate `protobuf:"bytes,2,opt,name=certificate" json:"certificate,omitempty"`
RootCABundle []byte `protobuf:"bytes,3,opt,name=root_ca_bundle,json=rootCaBundle,proto3" json:"root_ca_bundle,omitempty"`
Status *IssuanceStatus `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
Certificate *Certificate `protobuf:"bytes,2,opt,name=certificate" json:"certificate,omitempty"`
}
func (m *NodeCertificateStatusResponse) Reset() { *m = NodeCertificateStatusResponse{} }
@ -182,9 +181,8 @@ func (m *NodeCertificateStatusResponse) Copy() *NodeCertificateStatusResponse {
}
o := &NodeCertificateStatusResponse{
Status: m.Status.Copy(),
Certificate: m.Certificate.Copy(),
RootCABundle: m.RootCABundle,
Status: m.Status.Copy(),
Certificate: m.Certificate.Copy(),
}
return o
@ -276,7 +274,7 @@ func (this *NodeCertificateStatusResponse) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s := make([]string, 0, 6)
s = append(s, "&api.NodeCertificateStatusResponse{")
if this.Status != nil {
s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n")
@ -284,7 +282,6 @@ func (this *NodeCertificateStatusResponse) GoString() string {
if this.Certificate != nil {
s = append(s, "Certificate: "+fmt.Sprintf("%#v", this.Certificate)+",\n")
}
s = append(s, "RootCABundle: "+fmt.Sprintf("%#v", this.RootCABundle)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -642,12 +639,6 @@ func (m *NodeCertificateStatusResponse) MarshalTo(data []byte) (int, error) {
}
i += n2
}
if len(m.RootCABundle) > 0 {
data[i] = 0x1a
i++
i = encodeVarintCa(data, i, uint64(len(m.RootCABundle)))
i += copy(data[i:], m.RootCABundle)
}
return i, nil
}
@ -1112,10 +1103,6 @@ func (m *NodeCertificateStatusResponse) Size() (n int) {
l = m.Certificate.Size()
n += 1 + l + sovCa(uint64(l))
}
l = len(m.RootCABundle)
if l > 0 {
n += 1 + l + sovCa(uint64(l))
}
return n
}
@ -1213,7 +1200,6 @@ func (this *NodeCertificateStatusResponse) String() string {
s := strings.Join([]string{`&NodeCertificateStatusResponse{`,
`Status:` + strings.Replace(fmt.Sprintf("%v", this.Status), "IssuanceStatus", "IssuanceStatus", 1) + `,`,
`Certificate:` + strings.Replace(fmt.Sprintf("%v", this.Certificate), "Certificate", "Certificate", 1) + `,`,
`RootCABundle:` + fmt.Sprintf("%v", this.RootCABundle) + `,`,
`}`,
}, "")
return s
@ -1462,37 +1448,6 @@ func (m *NodeCertificateStatusResponse) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field RootCABundle", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCa
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCa
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.RootCABundle = append(m.RootCABundle[:0], data[iNdEx:postIndex]...)
if m.RootCABundle == nil {
m.RootCABundle = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCa(data[iNdEx:])
@ -2141,44 +2096,42 @@ var (
func init() { proto.RegisterFile("ca.proto", fileDescriptorCa) }
var fileDescriptorCa = []byte{
// 624 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x54, 0xc1, 0x6e, 0xd3, 0x4a,
0x14, 0xed, 0xb8, 0x7d, 0x69, 0x7b, 0x93, 0x97, 0x56, 0xd3, 0x56, 0x0a, 0x69, 0xea, 0x54, 0x66,
0xd1, 0x6e, 0x48, 0xdb, 0x80, 0x58, 0xc0, 0x86, 0xda, 0x48, 0x55, 0x85, 0x40, 0x68, 0x2a, 0xd8,
0x46, 0xae, 0x7d, 0x09, 0x56, 0x12, 0x8f, 0xf1, 0x8c, 0x0b, 0xdd, 0x21, 0x51, 0xf1, 0x07, 0x08,
0xbe, 0x82, 0xef, 0xa8, 0x58, 0xb1, 0x42, 0xac, 0x22, 0xea, 0x0f, 0x40, 0x7c, 0x02, 0xf2, 0xd8,
0xa6, 0x69, 0xeb, 0x84, 0xb2, 0x8a, 0xe7, 0xce, 0x39, 0x67, 0xce, 0x3d, 0x73, 0x33, 0x30, 0xe7,
0xd8, 0xad, 0x20, 0xe4, 0x92, 0x53, 0xea, 0x72, 0xa7, 0x87, 0x61, 0x4b, 0xbc, 0xb6, 0xc3, 0x41,
0xcf, 0x93, 0xad, 0xa3, 0x9d, 0x7a, 0x59, 0x1e, 0x07, 0x28, 0x52, 0x40, 0xbd, 0x2c, 0x02, 0x74,
0xf2, 0xc5, 0x72, 0x97, 0x77, 0xb9, 0xfa, 0xdc, 0x4a, 0xbe, 0xb2, 0xea, 0x52, 0xd0, 0x8f, 0xba,
0x9e, 0xbf, 0x95, 0xfe, 0xa4, 0x45, 0xc3, 0x82, 0xc6, 0x13, 0xee, 0xa2, 0x85, 0xa1, 0xf4, 0x5e,
0x78, 0x8e, 0x2d, 0xf1, 0x40, 0xda, 0x32, 0x12, 0x0c, 0x5f, 0x45, 0x28, 0x24, 0xbd, 0x09, 0xb3,
0x3e, 0x77, 0xb1, 0xe3, 0xb9, 0x35, 0xb2, 0x4e, 0x36, 0xe7, 0x4d, 0x88, 0x87, 0xcd, 0x52, 0x42,
0xd9, 0x7f, 0xc8, 0x4a, 0xc9, 0xd6, 0xbe, 0x6b, 0x7c, 0x23, 0xb0, 0x36, 0x46, 0x45, 0x04, 0xdc,
0x17, 0x48, 0xef, 0x41, 0x49, 0xa8, 0x8a, 0x52, 0x29, 0xb7, 0x8d, 0xd6, 0xd5, 0x86, 0x5a, 0xfb,
0x42, 0x44, 0xb6, 0xef, 0xe4, 0xdc, 0x8c, 0x41, 0x77, 0xa1, 0xec, 0x9c, 0x0b, 0xd7, 0x34, 0x25,
0xd0, 0x2c, 0x12, 0x18, 0x39, 0x9f, 0x8d, 0x72, 0xe8, 0x5d, 0xa8, 0x86, 0x9c, 0xcb, 0x8e, 0x63,
0x77, 0x0e, 0x23, 0xdf, 0xed, 0x63, 0x6d, 0x7a, 0x9d, 0x6c, 0x56, 0xcc, 0xc5, 0x78, 0xd8, 0xac,
0x30, 0xce, 0xa5, 0xb5, 0x6b, 0xaa, 0x3a, 0xab, 0x24, 0x38, 0xcb, 0x4e, 0x57, 0xc6, 0x09, 0x81,
0xd5, 0xc4, 0x15, 0x5e, 0xea, 0x2e, 0x4f, 0xe7, 0x0e, 0xcc, 0x84, 0xbc, 0x8f, 0xaa, 0xa9, 0x6a,
0xbb, 0x51, 0xe4, 0x29, 0x61, 0x32, 0xde, 0x47, 0x53, 0xab, 0x11, 0xa6, 0xd0, 0xf4, 0x06, 0x4c,
0x3b, 0x22, 0x54, 0x8d, 0x54, 0xcc, 0xd9, 0x78, 0xd8, 0x9c, 0xb6, 0x0e, 0x18, 0x4b, 0x6a, 0x74,
0x19, 0xfe, 0x93, 0xbc, 0x87, 0xbe, 0xf2, 0x37, 0xcf, 0xd2, 0x85, 0xf1, 0x81, 0x40, 0xa3, 0xd8,
0x46, 0x16, 0xef, 0x75, 0x6e, 0x89, 0x3e, 0x85, 0x05, 0x05, 0x1a, 0xe0, 0xe0, 0x10, 0x43, 0xf1,
0xd2, 0x0b, 0x94, 0x85, 0x6a, 0x7b, 0x63, 0x9c, 0xef, 0x83, 0x00, 0x9d, 0xd6, 0xe3, 0x3f, 0x70,
0x56, 0x4d, 0xf8, 0xe7, 0x6b, 0x63, 0x0d, 0x56, 0xf7, 0x50, 0xa6, 0xf9, 0x5d, 0x4d, 0xc7, 0x78,
0x00, 0x8d, 0xe2, 0xed, 0xcc, 0xf5, 0xfa, 0xc5, 0x8b, 0x4d, 0x9c, 0x57, 0x2e, 0xdc, 0x9b, 0xb1,
0x02, 0x4b, 0x7b, 0x28, 0x9f, 0xf9, 0x7d, 0xee, 0xf4, 0x1e, 0xe1, 0x71, 0x2e, 0x1c, 0xc2, 0xf2,
0xc5, 0x72, 0x26, 0xb8, 0x06, 0x10, 0xa9, 0x62, 0xa7, 0x87, 0xc7, 0x99, 0xde, 0x7c, 0x94, 0xc3,
0xe8, 0x7d, 0x98, 0x3d, 0xc2, 0x50, 0x78, 0xdc, 0xcf, 0x86, 0x68, 0xb5, 0xa8, 0xf1, 0xe7, 0x29,
0xc4, 0x9c, 0x39, 0x1d, 0x36, 0xa7, 0x58, 0xce, 0x68, 0x9f, 0x68, 0xa0, 0x59, 0xbb, 0xf4, 0x1d,
0x51, 0x67, 0x5f, 0x69, 0x8a, 0x6e, 0x15, 0x69, 0x4d, 0x48, 0xa7, 0xbe, 0x7d, 0x7d, 0x42, 0xda,
0x9e, 0x31, 0xf7, 0xe5, 0xf3, 0xcf, 0x4f, 0x9a, 0xb6, 0x48, 0xe8, 0x1b, 0xa8, 0x8c, 0x06, 0x40,
0x37, 0xc6, 0x68, 0x5d, 0x4e, 0xae, 0xbe, 0xf9, 0x77, 0x60, 0x76, 0xd8, 0x8a, 0x3a, 0x6c, 0x01,
0xfe, 0x57, 0xc8, 0x5b, 0x03, 0xdb, 0xb7, 0xbb, 0x18, 0xb6, 0x3f, 0x6a, 0xa0, 0xe6, 0x2a, 0x8b,
0xa2, 0x68, 0x2a, 0x8b, 0xa3, 0x98, 0xf0, 0x37, 0x2a, 0x8e, 0x62, 0xd2, 0xc0, 0x8f, 0x44, 0xf1,
0x9e, 0xc0, 0x4a, 0xe1, 0xdb, 0x43, 0xb7, 0xc7, 0x8d, 0xf5, 0xb8, 0xc7, 0xae, 0xbe, 0xf3, 0x0f,
0x8c, 0xcb, 0x46, 0xcc, 0xc6, 0xe9, 0x99, 0x3e, 0xf5, 0xfd, 0x4c, 0x9f, 0xfa, 0x75, 0xa6, 0x93,
0xb7, 0xb1, 0x4e, 0x4e, 0x63, 0x9d, 0x7c, 0x8d, 0x75, 0xf2, 0x23, 0xd6, 0xc9, 0x61, 0x49, 0x3d,
0xb7, 0xb7, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x34, 0x4d, 0x5b, 0xfa, 0xd3, 0x05, 0x00, 0x00,
// 586 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x54, 0xcb, 0x6e, 0xd3, 0x40,
0x14, 0xcd, 0x38, 0x25, 0x69, 0x6f, 0x42, 0x8a, 0xa6, 0x89, 0x14, 0xf2, 0x70, 0x2a, 0xb3, 0x68,
0x37, 0x38, 0x6d, 0x60, 0x05, 0x1b, 0x92, 0x20, 0x55, 0x11, 0x02, 0x21, 0x47, 0xb0, 0xad, 0x5c,
0x67, 0x08, 0x56, 0x12, 0x8f, 0xf1, 0x8c, 0x0b, 0xd9, 0x21, 0x51, 0xf1, 0x07, 0x08, 0x56, 0x7c,
0x02, 0xdf, 0x11, 0xb1, 0x62, 0xc9, 0x2a, 0x22, 0xfe, 0x00, 0xc4, 0x27, 0x20, 0x8f, 0x6d, 0x9a,
0x87, 0x13, 0xda, 0x55, 0x3c, 0xd7, 0xe7, 0x9c, 0x7b, 0xee, 0xc9, 0xf5, 0xc0, 0xb6, 0xa1, 0xab,
0xb6, 0x43, 0x39, 0xc5, 0xb8, 0x47, 0x8d, 0x01, 0x71, 0x54, 0xf6, 0x56, 0x77, 0x46, 0x03, 0x93,
0xab, 0xe7, 0xc7, 0xa5, 0x0c, 0x1f, 0xdb, 0x84, 0x05, 0x80, 0x52, 0x86, 0xd9, 0xc4, 0x88, 0x0e,
0xf9, 0x3e, 0xed, 0x53, 0xf1, 0x58, 0xf7, 0x9f, 0xc2, 0xea, 0x9e, 0x3d, 0x74, 0xfb, 0xa6, 0x55,
0x0f, 0x7e, 0x82, 0xa2, 0xd2, 0x86, 0xca, 0x33, 0xda, 0x23, 0x6d, 0xe2, 0x70, 0xf3, 0x95, 0x69,
0xe8, 0x9c, 0x74, 0xb9, 0xce, 0x5d, 0xa6, 0x91, 0x37, 0x2e, 0x61, 0x1c, 0xdf, 0x81, 0xb4, 0x45,
0x7b, 0xe4, 0xd4, 0xec, 0x15, 0xd1, 0x3e, 0x3a, 0xdc, 0x69, 0x81, 0x37, 0xad, 0xa5, 0x7c, 0x4a,
0xe7, 0xb1, 0x96, 0xf2, 0x5f, 0x75, 0x7a, 0xca, 0x57, 0x04, 0xd5, 0x35, 0x2a, 0xcc, 0xa6, 0x16,
0x23, 0xf8, 0x01, 0xa4, 0x98, 0xa8, 0x08, 0x95, 0x4c, 0x43, 0x51, 0x57, 0x07, 0x52, 0x3b, 0x8c,
0xb9, 0xba, 0x65, 0x44, 0xdc, 0x90, 0x81, 0x9b, 0x90, 0x31, 0x2e, 0x85, 0x8b, 0x92, 0x10, 0xa8,
0xc5, 0x09, 0xcc, 0xf5, 0xd7, 0xe6, 0x39, 0xca, 0x05, 0x82, 0xb2, 0xaf, 0x4e, 0x96, 0x5c, 0x46,
0x53, 0xde, 0x87, 0x2d, 0x87, 0x0e, 0x89, 0x30, 0x97, 0x6b, 0x54, 0xe2, 0xb4, 0x7d, 0xa6, 0x46,
0x87, 0xa4, 0x25, 0x15, 0x91, 0x26, 0xd0, 0xf8, 0x36, 0x24, 0x0d, 0xe6, 0x08, 0x43, 0xd9, 0x56,
0xda, 0x9b, 0xd6, 0x92, 0xed, 0xae, 0xa6, 0xf9, 0x35, 0x9c, 0x87, 0x1b, 0x9c, 0x0e, 0x88, 0x55,
0x4c, 0xfa, 0xa1, 0x69, 0xc1, 0x41, 0xf9, 0x84, 0xa0, 0x12, 0x6f, 0x23, 0x8c, 0xe9, 0x2a, 0x69,
0xe3, 0xe7, 0xb0, 0x2b, 0x40, 0x23, 0x32, 0x3a, 0x23, 0x0e, 0x7b, 0x6d, 0xda, 0xc2, 0x42, 0xae,
0x71, 0xb0, 0xce, 0x77, 0xd7, 0x26, 0x86, 0xfa, 0xf4, 0x1f, 0x5c, 0xcb, 0xf9, 0xfc, 0xcb, 0xb3,
0x52, 0x85, 0xf2, 0x09, 0xe1, 0x1a, 0xa5, 0xbc, 0xdd, 0x5c, 0x4d, 0x47, 0x79, 0x04, 0x95, 0xf8,
0xd7, 0xa1, 0xeb, 0xfd, 0xc5, 0x3f, 0xc8, 0x77, 0x9e, 0x5d, 0xcc, 0xbf, 0x00, 0x7b, 0x27, 0x84,
0xbf, 0xb0, 0x86, 0xd4, 0x18, 0x3c, 0x21, 0xe3, 0x48, 0xd8, 0x81, 0xfc, 0x62, 0x39, 0x14, 0xac,
0x02, 0xb8, 0xa2, 0x78, 0x3a, 0x20, 0xe3, 0x50, 0x6f, 0xc7, 0x8d, 0x60, 0xf8, 0x21, 0xa4, 0xcf,
0x89, 0xc3, 0x4c, 0x6a, 0x85, 0xcb, 0x50, 0x8e, 0x1b, 0xfc, 0x65, 0x00, 0x69, 0x6d, 0x4d, 0xa6,
0xb5, 0x84, 0x16, 0x31, 0x1a, 0x17, 0x12, 0x48, 0xed, 0x26, 0xfe, 0x80, 0x44, 0xef, 0x95, 0xa1,
0x70, 0x3d, 0x4e, 0x6b, 0x43, 0x3a, 0xa5, 0xa3, 0xab, 0x13, 0x82, 0xf1, 0x94, 0xed, 0xef, 0xdf,
0x7e, 0x7f, 0x91, 0xa4, 0x5b, 0x08, 0xbf, 0x83, 0xec, 0x7c, 0x00, 0xf8, 0x60, 0x8d, 0xd6, 0x72,
0x72, 0xa5, 0xc3, 0xff, 0x03, 0xc3, 0x66, 0x05, 0xd1, 0x6c, 0x17, 0x6e, 0x0a, 0xe4, 0xdd, 0x91,
0x6e, 0xe9, 0x7d, 0xe2, 0x34, 0x3e, 0x4b, 0x20, 0xf6, 0x2a, 0x8c, 0x22, 0x6e, 0x2b, 0xe3, 0xa3,
0xd8, 0xf0, 0x19, 0xc5, 0x47, 0xb1, 0x69, 0xe1, 0xe7, 0xa2, 0xf8, 0x88, 0xa0, 0x10, 0x7b, 0x87,
0xe0, 0xa3, 0x75, 0x6b, 0xbd, 0xee, 0xd2, 0x2a, 0x1d, 0x5f, 0x83, 0xb1, 0x6c, 0xa4, 0x55, 0x99,
0xcc, 0xe4, 0xc4, 0xcf, 0x99, 0x9c, 0xf8, 0x33, 0x93, 0xd1, 0x7b, 0x4f, 0x46, 0x13, 0x4f, 0x46,
0x3f, 0x3c, 0x19, 0xfd, 0xf2, 0x64, 0x74, 0x96, 0x12, 0xd7, 0xe6, 0xbd, 0xbf, 0x01, 0x00, 0x00,
0xff, 0xff, 0xe7, 0x80, 0x3b, 0x00, 0x9b, 0x05, 0x00, 0x00,
}

View file

@ -36,7 +36,6 @@ message NodeCertificateStatusRequest {
message NodeCertificateStatusResponse {
IssuanceStatus status = 1;
Certificate certificate = 2;
bytes root_ca_bundle = 3 [(gogoproto.customname) = "RootCABundle"];
}
message IssueNodeCertificateRequest {

View file

@ -102,15 +102,13 @@ type RootCA struct {
// Key will only be used by the original manager to put the private
// key-material in raft, no signing operations depend on it.
Key []byte
// Cert includes the PEM encoded Certificate bundle for the Root CA
// Cert includes the PEM encoded Certificate for the Root CA
Cert []byte
Pool *x509.CertPool
// Digest of the serialized bytes of the certificate
Digest digest.Digest
// This signer will be nil if the node doesn't have the appropriate key material
Signer cfsigner.Signer
// Path stores the location on disk where the RootCA lives
Path CertPaths
}
// CanSign ensures that the signer has all three necessary elements needed to operate
@ -165,9 +163,9 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
// Get the remote manager to issue a CA signed certificate for this node
// Retry up to 5 times in case the manager we first try to contact isn't
// responding properly (for example, it may have just been demoted).
var response *api.NodeCertificateStatusResponse
var signedCert []byte
for i := 0; i != 5; i++ {
response, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo)
signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo)
if err == nil {
break
}
@ -179,7 +177,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
// Доверяй, но проверяй.
// Before we overwrite our local key + certificate, let's make sure the server gave us one that is valid
// Create an X509Cert so we can .Verify()
certBlock, _ := pem.Decode(response.Certificate.Certificate)
certBlock, _ := pem.Decode(signedCert)
if certBlock == nil {
return nil, errors.New("failed to parse certificate PEM")
}
@ -187,34 +185,17 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
if err != nil {
return nil, err
}
// We retrieve the certificate with the current root pool, so we know this was issued by a legitimate manager.
// However, there might have been a server-side root rotation, so we verify this cert with a new pool.
// If we got a valid response.RootCABundle, turn it into a Pool, and verify the newly minted certificate using it.
var (
newRootErr error
newRootCA RootCA
)
rootCAPool := rca.Pool
if response.RootCABundle != nil {
newRootCA, newRootErr = NewRootCA(response.RootCABundle, nil, rca.Path, time.Minute)
if newRootErr == nil {
// The response.RootCABundle we got from the remote server seems to be good, use it
rootCAPool = newRootCA.Pool
}
}
// Create VerifyOptions with either the new certificate bundle, or the old pool
// Include our current root pool
opts := x509.VerifyOptions{
Roots: rootCAPool,
Roots: rca.Pool,
}
// Check to see if this certificate was signed by one of the CAs, and isn't expired
// Check to see if this certificate was signed by our CA, and isn't expired
if _, err := X509Cert.Verify(opts); err != nil {
return nil, err
}
// Create a valid TLSKeyPair out of the PEM encoded private key and certificate
tlsKeyPair, err := tls.X509KeyPair(response.Certificate.Certificate, key)
tlsKeyPair, err := tls.X509KeyPair(signedCert, key)
if err != nil {
return nil, err
}
@ -230,16 +211,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
return nil, err
}
// If a CA certificate bundle exists it has been validated before. If it's different, let's write it to disk.
// Root rotation should always happen by appending a new CA cert, and later removing the old one,
// so it's safer to do it in this order of operations (write root, write certificate)
if newRootErr == nil && !bytes.Equal(rca.Cert, response.RootCABundle) {
if err := newRootCA.saveCertificate(); err != nil {
return nil, err
}
}
if err := kw.Write(response.Certificate.Certificate, key, kekUpdate); err != nil {
if err := kw.Write(signedCert, key, kekUpdate); err != nil {
return nil, err
}
@ -344,28 +316,10 @@ func (rca *RootCA) AppendFirstRootPEM(cert []byte) ([]byte, error) {
return certChain, nil
}
func (rca *RootCA) saveCertificate() error {
if rca.Cert == nil {
return errors.New("no valid certificate bundle found")
}
if rca.Path.Cert == "" {
return errors.New("no path found for this root CA")
}
// Make sure the necessary dirs exist and they are writable
err := os.MkdirAll(filepath.Dir(rca.Path.Cert), 0755)
if err != nil {
return err
}
return ioutils.AtomicWriteFile(rca.Path.Cert, rca.Cert, 0644)
}
// NewRootCA creates a new RootCA object from unparsed PEM cert bundle and key byte
// slices. key may be nil, and in this case NewRootCA will return a RootCA
// without a signer.
func NewRootCA(certBytes, keyBytes []byte, paths CertPaths, certExpiry time.Duration) (RootCA, error) {
func NewRootCA(certBytes, keyBytes []byte, certExpiry time.Duration) (RootCA, error) {
// Parse all the certificates in the cert bundle
parsedCerts, err := helpers.ParseCertificatesPEM(certBytes)
if err != nil {
@ -391,7 +345,7 @@ func NewRootCA(certBytes, keyBytes []byte, paths CertPaths, certExpiry time.Dura
if len(keyBytes) == 0 {
// This RootCA does not have a valid signer.
return RootCA{Cert: certBytes, Digest: digest, Pool: pool, Path: paths}, nil
return RootCA{Cert: certBytes, Digest: digest, Pool: pool}, nil
}
var (
@ -433,7 +387,7 @@ func NewRootCA(certBytes, keyBytes []byte, paths CertPaths, certExpiry time.Dura
keyBlock, _ := pem.Decode(keyBytes)
if keyBlock == nil {
// This RootCA does not have a valid signer.
return RootCA{Cert: certBytes, Digest: digest, Pool: pool, Path: paths}, nil
return RootCA{Cert: certBytes, Digest: digest, Pool: pool}, nil
}
if passphraseStr != "" && !x509.IsEncryptedPEMBlock(keyBlock) {
keyBytes, err = EncryptECPrivateKey(keyBytes, passphraseStr)
@ -442,7 +396,7 @@ func NewRootCA(certBytes, keyBytes []byte, paths CertPaths, certExpiry time.Dura
}
}
return RootCA{Signer: signer, Key: keyBytes, Digest: digest, Cert: certBytes, Pool: pool, Path: paths}, nil
return RootCA{Signer: signer, Key: keyBytes, Digest: digest, Cert: certBytes, Pool: pool}, nil
}
func ensureCertKeyMatch(cert *x509.Certificate, key crypto.PublicKey) error {
@ -460,7 +414,8 @@ func ensureCertKeyMatch(cert *x509.Certificate, key crypto.PublicKey) error {
return errors.New("certificate key mismatch")
}
// GetLocalRootCA returns the PEM-encoded root CA Certificate if it exists
// GetLocalRootCA validates if the contents of the file are a valid self-signed
// CA certificate, and returns the PEM-encoded Certificate if so
func GetLocalRootCA(paths CertPaths) (RootCA, error) {
// Check if we have a Certificate file
cert, err := ioutil.ReadFile(paths.Cert)
@ -472,7 +427,17 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) {
return RootCA{}, err
}
return NewRootCA(cert, nil, paths, DefaultNodeCertExpiration)
key, err := ioutil.ReadFile(paths.Key)
if err != nil {
if !os.IsNotExist(err) {
return RootCA{}, err
}
// There may not be a local key. It's okay to pass in a nil
// key. We'll get a root CA without a signer.
key = nil
}
return NewRootCA(cert, key, DefaultNodeCertExpiration)
}
func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) {
@ -565,13 +530,13 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {
return RootCA{}, err
}
rootCA, err := NewRootCA(cert, key, paths, DefaultNodeCertExpiration)
rootCA, err := NewRootCA(cert, key, DefaultNodeCertExpiration)
if err != nil {
return RootCA{}, err
}
// save the cert to disk
if err := rootCA.saveCertificate(); err != nil {
if err := saveRootCA(rootCA, paths); err != nil {
return RootCA{}, err
}
@ -580,7 +545,7 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {
// GetRemoteSignedCertificate submits a CSR to a remote CA server address,
// and that is part of a CA identified by a specific certificate pool.
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) (*api.NodeCertificateStatusResponse, error) {
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) {
if rootCAPool == nil {
return nil, errors.New("valid root CA pool required")
}
@ -632,7 +597,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
}
// If the certificate was issued, return
if statusResponse.Status != nil && statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Certificate == nil {
return nil, errors.New("no certificate in CertificateStatus response")
}
@ -644,7 +609,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
// current request.
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
r.Observe(peer, remotes.DefaultObservationWeight)
return statusResponse, nil
return statusResponse.Certificate.Certificate, nil
}
}
@ -678,6 +643,17 @@ func readCertValidity(kr KeyReader) (time.Time, time.Time, error) {
}
func saveRootCA(rootCA RootCA, paths CertPaths) error {
// Make sure the necessary dirs exist and they are writable
err := os.MkdirAll(filepath.Dir(paths.Cert), 0755)
if err != nil {
return err
}
// If the root certificate got returned successfully, save the rootCA to disk.
return ioutils.AtomicWriteFile(paths.Cert, rootCA.Cert, 0644)
}
// GenerateNewCSR returns a newly generated key and CSR signed with said key
func GenerateNewCSR() (csr, key []byte, err error) {
req := &cfcsr.CertificateRequest{

View file

@ -119,15 +119,8 @@ func (s *SecurityConfig) UpdateRootCA(cert, key []byte, certExpiry time.Duration
s.mu.Lock()
defer s.mu.Unlock()
// Create a new RootCA, keeping the path of the old RootCA
rootCA, err := NewRootCA(cert, key, s.rootCA.Path, certExpiry)
if err != nil {
return err
}
// Attempt to write the new certificate to disk
err = rootCA.saveCertificate()
rootCA, err := NewRootCA(cert, key, certExpiry)
if err == nil {
// No errors, save the current rootCA
s.rootCA = &rootCA
}
@ -238,8 +231,7 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote
}
// Save root CA certificate to disk
rootCA.Path = paths
if err = rootCA.saveCertificate(); err != nil {
if err = saveRootCA(rootCA, paths); err != nil {
return RootCA{}, err
}
@ -410,6 +402,7 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
// Since the expiration of the certificate is managed remotely we should update our
// retry timer on every iteration of this loop.
// Retrieve the current certificate expiration information.
validFrom, validUntil, err := readCertValidity(s.KeyReader())
if err != nil {
// We failed to read the expiration, let's stick with the starting default

View file

@ -142,9 +142,8 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
// If this certificate has a final state, return it immediately (both pending and renew are transition states)
if isFinalState(node.Certificate.Status) {
return &api.NodeCertificateStatusResponse{
Status: &node.Certificate.Status,
Certificate: &node.Certificate,
RootCABundle: s.securityConfig.RootCA().Cert,
Status: &node.Certificate.Status,
Certificate: &node.Certificate,
}, nil
}
@ -165,9 +164,8 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
if isFinalState(v.Node.Certificate.Status) {
cert := v.Node.Certificate.Copy()
return &api.NodeCertificateStatusResponse{
Status: &cert.Status,
Certificate: cert,
RootCABundle: s.securityConfig.RootCA().Cert,
Status: &cert.Status,
Certificate: cert,
}, nil
}
}

View file

@ -746,7 +746,7 @@ func (na *NetworkAllocator) allocatePools(n *api.Network) (map[string]string, er
}
for i, ic := range ipamConfigs {
poolID, poolIP, _, err := ipam.RequestPool(asName, ic.Subnet, ic.Range, nil, false)
poolID, poolIP, _, err := ipam.RequestPool(asName, ic.Subnet, ic.Range, dOptions, false)
if err != nil {
// Rollback by releasing all the resources allocated so far.
releasePools(ipam, ipamConfigs[:i], pools)

View file

@ -2,6 +2,7 @@ package scheduler
import (
"fmt"
"strings"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/constraint"
@ -93,6 +94,15 @@ type PluginFilter struct {
t *api.Task
}
func referencesVolumePlugin(mount api.Mount) bool {
return mount.Type == api.MountTypeVolume &&
mount.VolumeOptions != nil &&
mount.VolumeOptions.DriverConfig != nil &&
mount.VolumeOptions.DriverConfig.Name != "" &&
mount.VolumeOptions.DriverConfig.Name != "local"
}
// SetTask returns true when the filter is enabled for a given task.
func (f *PluginFilter) SetTask(t *api.Task) bool {
c := t.Spec.GetContainer()
@ -100,12 +110,9 @@ func (f *PluginFilter) SetTask(t *api.Task) bool {
var volumeTemplates bool
if c != nil {
for _, mount := range c.Mounts {
if mount.Type == api.MountTypeVolume &&
mount.VolumeOptions != nil &&
mount.VolumeOptions.DriverConfig != nil &&
mount.VolumeOptions.DriverConfig.Name != "" &&
mount.VolumeOptions.DriverConfig.Name != "local" {
if referencesVolumePlugin(mount) {
volumeTemplates = true
break
}
}
}
@ -128,7 +135,7 @@ func (f *PluginFilter) Check(n *NodeInfo) bool {
container := f.t.Spec.GetContainer()
if container != nil {
for _, mount := range container.Mounts {
if mount.VolumeOptions != nil && mount.VolumeOptions.DriverConfig != nil {
if referencesVolumePlugin(mount) {
if !f.pluginExistsOnNode("Volume", mount.VolumeOptions.DriverConfig.Name, nodePlugins) {
return false
}
@ -138,16 +145,30 @@ func (f *PluginFilter) Check(n *NodeInfo) bool {
// Check if all network plugins required by task are installed on node
for _, tn := range f.t.Networks {
if !f.pluginExistsOnNode("Network", tn.Network.DriverState.Name, nodePlugins) {
return false
if tn.Network != nil && tn.Network.DriverState != nil && tn.Network.DriverState.Name != "" {
if !f.pluginExistsOnNode("Network", tn.Network.DriverState.Name, nodePlugins) {
return false
}
}
}
return true
}
// pluginExistsOnNode returns true if the (pluginName, pluginType) pair is present in nodePlugins
func (f *PluginFilter) pluginExistsOnNode(pluginType string, pluginName string, nodePlugins []api.PluginDescription) bool {
for _, np := range nodePlugins {
if pluginType == np.Type && pluginName == np.Name {
if pluginType != np.Type {
continue
}
if pluginName == np.Name {
return true
}
// This does not use the reference package to avoid the
// overhead of parsing references as part of the scheduling
// loop. This is okay only because plugin names are a very
// strict subset of the reference grammar that is always
// name:tag.
if strings.HasPrefix(np.Name, pluginName) && np.Name[len(pluginName):] == ":latest" {
return true
}
}

View file

@ -11,12 +11,7 @@ var (
// Always check for readiness first.
&ReadyFilter{},
&ResourceFilter{},
// TODO(stevvooe): Do not filter based on plugins since they are lazy
// loaded in the engine. We can add this back when we can schedule
// plugins in the future.
// &PluginFilter{},
&PluginFilter{},
&ConstraintFilter{},
}
)

View file

@ -297,12 +297,10 @@ func (n *Node) run(ctx context.Context) (err error) {
go func() {
managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
wg.Done()
cancel()
}()
go func() {
agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
wg.Done()
cancel()
}()
go func() {
@ -330,6 +328,14 @@ func (n *Node) Stop(ctx context.Context) error {
default:
return errNodeNotStarted
}
// ask agent to clean up assignments
n.Lock()
if n.agent != nil {
if err := n.agent.Leave(ctx); err != nil {
log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
}
}
n.Unlock()
n.stopOnce.Do(func() {
close(n.stopped)

View file

@ -14,7 +14,7 @@ var (
// errInvalidName is a typed error returned when creating a volume with a name that is not valid on the platform
errInvalidName = errors.New("volume name is not valid on this platform")
// errNameConflict is a typed error returned on create when a volume exists with the given name, but for a different driver
errNameConflict = errors.New("conflict: volume name must be unique")
errNameConflict = errors.New("volume name must be unique")
)
// OpErr is the error type returned by functions in the store package. It describes

View file

@ -290,8 +290,17 @@ func (s *VolumeStore) checkConflict(name, driverName string) (volume.Volume, err
vDriverName := v.DriverName()
var conflict bool
if driverName != "" && vDriverName != driverName {
conflict = true
if driverName != "" {
// Retrieve canonical driver name to avoid inconsistencies (for example
// "plugin" vs. "plugin:latest")
vd, err := volumedrivers.GetDriver(driverName)
if err != nil {
return nil, err
}
if vDriverName != vd.Name() {
conflict = true
}
}
// let's check if the found volume ref