Revendor swarmkit

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Lehmann 2016-07-22 10:26:45 -07:00
parent 76b27c0808
commit d626875a94
21 changed files with 1304 additions and 477 deletions


@ -139,7 +139,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267
# cluster
clone git github.com/docker/swarmkit 38857c06dafcf939a56d2650d8e0011b5aace384
clone git github.com/docker/swarmkit 4d7e44321726f011d010cdb72d2230f5db2b604e
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b


@ -2,10 +2,13 @@ package exec
import (
"fmt"
"reflect"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -182,6 +185,10 @@ func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus,
// is completed.
defer func() {
logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
if !reflect.DeepEqual(status, task.Status) {
status.Timestamp = ptypes.MustTimestampProto(time.Now())
}
}()
// extract the container status from the container, if supported.

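The deferred block above refreshes the status timestamp only when reconciliation actually changed something, so unchanged statuses keep their original timestamps. A minimal standalone sketch of that guard, with a hypothetical Status struct standing in for api.TaskStatus and the ptypes helpers:

package main

import (
	"fmt"
	"reflect"
	"time"
)

// Status stands in for swarmkit's api.TaskStatus.
type Status struct {
	State     string
	Timestamp time.Time
}

// reconcile installs next into cur, stamping the time only on a real change,
// mirroring the reflect.DeepEqual guard in the deferred function above.
func reconcile(cur *Status, next Status) {
	if !reflect.DeepEqual(next, *cur) {
		next.Timestamp = time.Now()
		*cur = next
	}
}

func main() {
	cur := Status{State: "running"}
	reconcile(&cur, Status{State: "running"}) // no change: timestamp stays zero
	reconcile(&cur, Status{State: "failed"})  // real change: timestamp is set
	fmt.Println(cur.State, !cur.Timestamp.IsZero()) // failed true
}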

@ -31,10 +31,10 @@ const stateFilename = "state.json"
// NodeConfig provides values for a Node.
type NodeConfig struct {
// Hostname the name of host for agent instance.
// Hostname is the name of host for agent instance.
Hostname string
// JoinAddrs specifies node that should be used for the initial connection to
// JoinAddr specifies a node that should be used for the initial connection to
// other managers in the cluster. It should be a single address, and it is optional;
// the actual remotes come from the stored state.
JoinAddr string
@ -60,6 +60,10 @@ type NodeConfig struct {
// and raft members connect to.
ListenRemoteAPI string
// AdvertiseRemoteAPI specifies the address that should be advertised
// for connections to the remote API (including the raft service).
AdvertiseRemoteAPI string
// Executor specifies the executor to use for the agent.
Executor exec.Executor
@ -425,6 +429,9 @@ func (n *Node) CertificateRequested() <-chan struct{} {
func (n *Node) setControlSocket(conn *grpc.ClientConn) {
n.Lock()
if n.conn != nil {
n.conn.Close()
}
n.conn = conn
n.connCond.Broadcast()
n.Unlock()
@ -478,7 +485,7 @@ func (n *Node) NodeMembership() api.NodeSpec_Membership {
return n.nodeMembership
}
// Manager return manager instance started by node. May be nil.
// Manager returns manager instance started by node. May be nil.
func (n *Node) Manager() *manager.Manager {
n.RLock()
defer n.RUnlock()
@ -542,6 +549,8 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
opts := []grpc.DialOption{}
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
// Using listen address instead of advertised address because this is a
// local connection.
addr := n.config.ListenControlAPI
opts = append(opts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
@ -571,11 +580,11 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
}
}
func (n *Node) waitRole(ctx context.Context, role string) error {
func (n *Node) waitRole(ctx context.Context, role string) {
n.roleCond.L.Lock()
if role == n.role {
n.roleCond.L.Unlock()
return nil
return
}
finishCh := make(chan struct{})
defer close(finishCh)
@ -591,17 +600,14 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
for role != n.role {
n.roleCond.Wait()
if ctx.Err() != nil {
return ctx.Err()
return
}
}
return nil
}
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
for {
if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
return err
}
n.waitRole(ctx, ca.ManagerRole)
if ctx.Err() != nil {
return ctx.Err()
}
@ -612,6 +618,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
"tcp": n.config.ListenRemoteAPI,
"unix": n.config.ListenControlAPI,
},
AdvertiseAddr: n.config.AdvertiseRemoteAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
JoinRaft: remoteAddr.Addr,
@ -647,25 +654,24 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
ready = nil
}
if err := n.waitRole(ctx, ca.AgentRole); err != nil {
m.Stop(context.Background())
}
n.waitRole(ctx, ca.AgentRole)
n.Lock()
n.manager = nil
n.Unlock()
select {
case <-done:
case <-ctx.Done():
err = ctx.Err()
m.Stop(context.Background())
return ctx.Err()
<-done
}
connCancel()
n.Lock()
n.manager = nil
if n.conn != nil {
n.conn.Close()
if err != nil {
return err
}
n.Unlock()
}
}

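waitRole no longer returns an error: it blocks on a sync.Cond until the role matches and simply returns on cancellation, leaving callers like runManager to check ctx.Err() themselves. Because sync.Cond cannot be cancelled directly, a helper goroutine broadcasts when the context ends. A self-contained sketch of the pattern, with illustrative names rather than swarmkit's:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// roleWaiter mimics the waitRole pattern above: callers block on a sync.Cond
// until the role matches, and a helper goroutine wakes them on cancellation.
type roleWaiter struct {
	mu   sync.Mutex
	cond *sync.Cond
	role string
}

func newRoleWaiter(role string) *roleWaiter {
	rw := &roleWaiter{role: role}
	rw.cond = sync.NewCond(&rw.mu)
	return rw
}

func (rw *roleWaiter) setRole(role string) {
	rw.mu.Lock()
	rw.role = role
	rw.cond.Broadcast()
	rw.mu.Unlock()
}

// waitRole returns once the role matches or ctx is cancelled; like the new
// swarmkit version it returns nothing, leaving callers to check ctx.Err().
func (rw *roleWaiter) waitRole(ctx context.Context, role string) {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	finish := make(chan struct{})
	defer close(finish)
	go func() {
		select {
		case <-ctx.Done():
			rw.cond.Broadcast() // wake the waiter so it can notice cancellation
		case <-finish:
		}
	}()

	for rw.role != role && ctx.Err() == nil {
		rw.cond.Wait()
	}
}

func main() {
	rw := newRoleWaiter("agent")
	go func() {
		time.Sleep(10 * time.Millisecond)
		rw.setRole("manager")
	}()
	rw.waitRole(context.Background(), "manager")
	fmt.Println("role reached: manager")
}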

@ -118,10 +118,10 @@ func WalkTaskStatus(tx *bolt.Tx, fn func(id string, status *api.TaskStatus) erro
// PutTask places the task into the database.
func PutTask(tx *bolt.Tx, task *api.Task) error {
return withCreateTaskBucketIfNotExists(tx, task.ID, func(bkt *bolt.Bucket) error {
task = task.Copy()
task.Status = api.TaskStatus{} // blank out the status.
taskCopy := *task
taskCopy.Status = api.TaskStatus{} // blank out the status.
p, err := proto.Marshal(task)
p, err := proto.Marshal(&taskCopy)
if err != nil {
return err
}

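PutTask now blanks the status on a stack-allocated shallow copy instead of a deep Copy(), so the caller's task is never mutated and no allocation is wasted. A small sketch of the same idea, with a stand-in Task struct and JSON in place of protobuf marshaling:

package main

import (
	"encoding/json"
	"fmt"
)

// Task stands in for swarmkit's api.Task; Status is persisted separately.
type Task struct {
	ID     string
	Status string
}

// marshalWithoutStatus blanks the status on a shallow copy, leaving the
// caller's Task untouched.
func marshalWithoutStatus(task *Task) ([]byte, error) {
	taskCopy := *task
	taskCopy.Status = ""
	return json.Marshal(&taskCopy)
}

func main() {
	t := &Task{ID: "abc", Status: "running"}
	p, err := marshalWithoutStatus(t)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(p), "caller still sees:", t.Status)
}

The tasksEqual change in the next hunk applies the same shallow-copy trick before zeroing fields for comparison.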

@ -247,10 +247,11 @@ func (tm *taskManager) run(ctx context.Context) {
//
// This used to decide whether or not to propagate a task update to a controller.
func tasksEqual(a, b *api.Task) bool {
a, b = a.Copy(), b.Copy()
// shallow copy
copyA, copyB := *a, *b
a.Status, b.Status = api.TaskStatus{}, api.TaskStatus{}
a.Meta, b.Meta = api.Meta{}, api.Meta{}
copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{}
copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{}
return reflect.DeepEqual(a, b)
return reflect.DeepEqual(&copyA, &copyB)
}


@ -71,6 +71,9 @@ type Service struct {
// the optional fields like node_port or virtual_ip and it
// could be auto allocated by the system.
Endpoint *Endpoint `protobuf:"bytes,4,opt,name=endpoint" json:"endpoint,omitempty"`
// UpdateStatus contains the status of an update, if one is in
// progress.
UpdateStatus *UpdateStatus `protobuf:"bytes,5,opt,name=update_status,json=updateStatus" json:"update_status,omitempty"`
}
func (m *Service) Reset() { *m = Service{} }
@ -278,10 +281,11 @@ func (m *Service) Copy() *Service {
}
o := &Service{
ID: m.ID,
Meta: *m.Meta.Copy(),
Spec: *m.Spec.Copy(),
Endpoint: m.Endpoint.Copy(),
ID: m.ID,
Meta: *m.Meta.Copy(),
Spec: *m.Spec.Copy(),
Endpoint: m.Endpoint.Copy(),
UpdateStatus: m.UpdateStatus.Copy(),
}
return o
@ -464,7 +468,7 @@ func (this *Service) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 8)
s := make([]string, 0, 9)
s = append(s, "&api.Service{")
s = append(s, "ID: "+fmt.Sprintf("%#v", this.ID)+",\n")
s = append(s, "Meta: "+strings.Replace(this.Meta.GoString(), `&`, ``, 1)+",\n")
@ -472,6 +476,9 @@ func (this *Service) GoString() string {
if this.Endpoint != nil {
s = append(s, "Endpoint: "+fmt.Sprintf("%#v", this.Endpoint)+",\n")
}
if this.UpdateStatus != nil {
s = append(s, "UpdateStatus: "+fmt.Sprintf("%#v", this.UpdateStatus)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
@ -785,6 +792,16 @@ func (m *Service) MarshalTo(data []byte) (int, error) {
}
i += n13
}
if m.UpdateStatus != nil {
data[i] = 0x2a
i++
i = encodeVarintObjects(data, i, uint64(m.UpdateStatus.Size()))
n14, err := m.UpdateStatus.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n14
}
return i, nil
}
@ -807,11 +824,11 @@ func (m *Endpoint) MarshalTo(data []byte) (int, error) {
data[i] = 0xa
i++
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n14, err := m.Spec.MarshalTo(data[i:])
n15, err := m.Spec.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n14
i += n15
}
if len(m.Ports) > 0 {
for _, msg := range m.Ports {
@ -894,19 +911,19 @@ func (m *Task) MarshalTo(data []byte) (int, error) {
data[i] = 0x12
i++
i = encodeVarintObjects(data, i, uint64(m.Meta.Size()))
n15, err := m.Meta.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n15
data[i] = 0x1a
i++
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n16, err := m.Spec.MarshalTo(data[i:])
n16, err := m.Meta.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n16
data[i] = 0x1a
i++
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n17, err := m.Spec.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n17
if len(m.ServiceID) > 0 {
data[i] = 0x22
i++
@ -927,27 +944,27 @@ func (m *Task) MarshalTo(data []byte) (int, error) {
data[i] = 0x3a
i++
i = encodeVarintObjects(data, i, uint64(m.Annotations.Size()))
n17, err := m.Annotations.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n17
data[i] = 0x42
i++
i = encodeVarintObjects(data, i, uint64(m.ServiceAnnotations.Size()))
n18, err := m.ServiceAnnotations.MarshalTo(data[i:])
n18, err := m.Annotations.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n18
data[i] = 0x4a
data[i] = 0x42
i++
i = encodeVarintObjects(data, i, uint64(m.Status.Size()))
n19, err := m.Status.MarshalTo(data[i:])
i = encodeVarintObjects(data, i, uint64(m.ServiceAnnotations.Size()))
n19, err := m.ServiceAnnotations.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n19
data[i] = 0x4a
i++
i = encodeVarintObjects(data, i, uint64(m.Status.Size()))
n20, err := m.Status.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n20
if m.DesiredState != 0 {
data[i] = 0x50
i++
@ -969,21 +986,21 @@ func (m *Task) MarshalTo(data []byte) (int, error) {
data[i] = 0x62
i++
i = encodeVarintObjects(data, i, uint64(m.Endpoint.Size()))
n20, err := m.Endpoint.MarshalTo(data[i:])
n21, err := m.Endpoint.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n20
i += n21
}
if m.LogDriver != nil {
data[i] = 0x6a
i++
i = encodeVarintObjects(data, i, uint64(m.LogDriver.Size()))
n21, err := m.LogDriver.MarshalTo(data[i:])
n22, err := m.LogDriver.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n21
i += n22
}
return i, nil
}
@ -1007,11 +1024,11 @@ func (m *NetworkAttachment) MarshalTo(data []byte) (int, error) {
data[i] = 0xa
i++
i = encodeVarintObjects(data, i, uint64(m.Network.Size()))
n22, err := m.Network.MarshalTo(data[i:])
n23, err := m.Network.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n22
i += n23
}
if len(m.Addresses) > 0 {
for _, s := range m.Addresses {
@ -1070,38 +1087,38 @@ func (m *Network) MarshalTo(data []byte) (int, error) {
data[i] = 0x12
i++
i = encodeVarintObjects(data, i, uint64(m.Meta.Size()))
n23, err := m.Meta.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n23
data[i] = 0x1a
i++
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n24, err := m.Spec.MarshalTo(data[i:])
n24, err := m.Meta.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n24
data[i] = 0x1a
i++
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n25, err := m.Spec.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n25
if m.DriverState != nil {
data[i] = 0x22
i++
i = encodeVarintObjects(data, i, uint64(m.DriverState.Size()))
n25, err := m.DriverState.MarshalTo(data[i:])
n26, err := m.DriverState.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n25
i += n26
}
if m.IPAM != nil {
data[i] = 0x2a
i++
i = encodeVarintObjects(data, i, uint64(m.IPAM.Size()))
n26, err := m.IPAM.MarshalTo(data[i:])
n27, err := m.IPAM.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n26
i += n27
}
return i, nil
}
@ -1130,27 +1147,27 @@ func (m *Cluster) MarshalTo(data []byte) (int, error) {
data[i] = 0x12
i++
i = encodeVarintObjects(data, i, uint64(m.Meta.Size()))
n27, err := m.Meta.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n27
data[i] = 0x1a
i++
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n28, err := m.Spec.MarshalTo(data[i:])
n28, err := m.Meta.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n28
data[i] = 0x22
data[i] = 0x1a
i++
i = encodeVarintObjects(data, i, uint64(m.RootCA.Size()))
n29, err := m.RootCA.MarshalTo(data[i:])
i = encodeVarintObjects(data, i, uint64(m.Spec.Size()))
n29, err := m.Spec.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n29
data[i] = 0x22
i++
i = encodeVarintObjects(data, i, uint64(m.RootCA.Size()))
n30, err := m.RootCA.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n30
if len(m.NetworkBootstrapKeys) > 0 {
for _, msg := range m.NetworkBootstrapKeys {
data[i] = 0x2a
@ -1260,6 +1277,10 @@ func (m *Service) Size() (n int) {
l = m.Endpoint.Size()
n += 1 + l + sovObjects(uint64(l))
}
if m.UpdateStatus != nil {
l = m.UpdateStatus.Size()
n += 1 + l + sovObjects(uint64(l))
}
return n
}
@ -1467,6 +1488,7 @@ func (this *Service) String() string {
`Meta:` + strings.Replace(strings.Replace(this.Meta.String(), "Meta", "Meta", 1), `&`, ``, 1) + `,`,
`Spec:` + strings.Replace(strings.Replace(this.Spec.String(), "ServiceSpec", "ServiceSpec", 1), `&`, ``, 1) + `,`,
`Endpoint:` + strings.Replace(fmt.Sprintf("%v", this.Endpoint), "Endpoint", "Endpoint", 1) + `,`,
`UpdateStatus:` + strings.Replace(fmt.Sprintf("%v", this.UpdateStatus), "UpdateStatus", "UpdateStatus", 1) + `,`,
`}`,
}, "")
return s
@ -2160,6 +2182,39 @@ func (m *Service) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field UpdateStatus", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowObjects
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthObjects
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.UpdateStatus == nil {
m.UpdateStatus = &UpdateStatus{}
}
if err := m.UpdateStatus.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipObjects(data[iNdEx:])
@ -3527,67 +3582,68 @@ var (
)
var fileDescriptorObjects = []byte{
// 981 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xbc, 0x56, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0xaf, 0xed, 0x8d, 0xed, 0x7d, 0x8e, 0x23, 0x31, 0x54, 0xd5, 0x36, 0x84, 0xa4, 0xb8, 0x02,
0x71, 0x40, 0xae, 0x28, 0x05, 0x81, 0xa0, 0x42, 0xb6, 0x13, 0x81, 0x05, 0x81, 0x68, 0x5a, 0x85,
0xe3, 0x6a, 0xb2, 0x3b, 0x35, 0x8b, 0xed, 0xdd, 0xd5, 0xcc, 0x24, 0x55, 0x6e, 0x88, 0x0f, 0xc0,
0x47, 0xe0, 0xab, 0x70, 0x8d, 0x10, 0x07, 0x8e, 0x9c, 0x2a, 0xda, 0x1b, 0x27, 0xf8, 0x08, 0xbc,
0xf9, 0xb3, 0xf6, 0x56, 0x5e, 0x87, 0x56, 0xaa, 0x72, 0x58, 0x69, 0xfe, 0xfc, 0x7e, 0xbf, 0x79,
0xef, 0xcd, 0x7b, 0x6f, 0x07, 0xba, 0xd9, 0xc9, 0x0f, 0x3c, 0x52, 0xb2, 0x9f, 0x8b, 0x4c, 0x65,
0x84, 0xc4, 0x59, 0x34, 0xe5, 0xa2, 0x2f, 0x1f, 0x33, 0x31, 0x9f, 0x26, 0xaa, 0x7f, 0xf6, 0xfe,
0x76, 0x47, 0x9d, 0xe7, 0xdc, 0x01, 0xb6, 0x3b, 0x32, 0xe7, 0x51, 0x31, 0xb9, 0xa9, 0x92, 0x39,
0x97, 0x8a, 0xcd, 0xf3, 0x3b, 0x8b, 0x91, 0xdb, 0xba, 0x3e, 0xc9, 0x26, 0x99, 0x19, 0xde, 0xd1,
0x23, 0xbb, 0xda, 0xfb, 0xb5, 0x06, 0xde, 0x21, 0x57, 0x8c, 0x7c, 0x0a, 0xad, 0x33, 0x2e, 0x64,
0x92, 0xa5, 0x41, 0xed, 0x56, 0xed, 0xdd, 0xce, 0xdd, 0x37, 0xfa, 0xab, 0x27, 0xf7, 0x8f, 0x2d,
0x64, 0xe8, 0x5d, 0x3c, 0xd9, 0xbb, 0x46, 0x0b, 0x06, 0xf9, 0x0c, 0x20, 0x12, 0x9c, 0x29, 0x1e,
0x87, 0x4c, 0x05, 0x75, 0xc3, 0x7f, 0xb3, 0x8a, 0xff, 0xb0, 0x30, 0x8a, 0xfa, 0x8e, 0x30, 0x50,
0x9a, 0x7d, 0x9a, 0xc7, 0x05, 0xbb, 0xf1, 0x42, 0x6c, 0x47, 0x18, 0xa8, 0xde, 0xdf, 0x0d, 0xf0,
0xbe, 0xc9, 0x62, 0x4e, 0x6e, 0x40, 0x3d, 0x89, 0x8d, 0xf1, 0xfe, 0xb0, 0xf9, 0xec, 0xc9, 0x5e,
0x7d, 0xbc, 0x4f, 0x71, 0x85, 0xdc, 0x05, 0x6f, 0x8e, 0x1e, 0x3a, 0xb3, 0x82, 0x2a, 0x61, 0x1d,
0x01, 0xe7, 0x93, 0xc1, 0x92, 0x8f, 0xc0, 0xd3, 0x61, 0x75, 0xc6, 0xec, 0x54, 0x71, 0xf4, 0x99,
0x0f, 0x10, 0x53, 0xf0, 0x34, 0x9e, 0x1c, 0x40, 0x27, 0xe6, 0x32, 0x12, 0x49, 0xae, 0x74, 0x24,
0x3d, 0x43, 0xbf, 0xbd, 0x8e, 0xbe, 0xbf, 0x84, 0xd2, 0x32, 0x0f, 0x23, 0xd2, 0x44, 0x3f, 0xd5,
0xa9, 0x0c, 0x36, 0x8c, 0xc2, 0xee, 0x5a, 0x03, 0x0c, 0xca, 0x99, 0xe0, 0x38, 0xe4, 0x4b, 0xd8,
0x9a, 0xb3, 0x94, 0x4d, 0xb8, 0x08, 0x9d, 0x4a, 0xd3, 0xa8, 0xbc, 0x55, 0xe9, 0xba, 0x45, 0x5a,
0x21, 0xda, 0x9d, 0x97, 0xa7, 0xe8, 0x0e, 0x30, 0xa5, 0x58, 0xf4, 0xfd, 0x9c, 0xa7, 0x2a, 0x68,
0x19, 0x95, 0xb7, 0x2b, 0x6d, 0xe1, 0xea, 0x71, 0x26, 0xa6, 0x83, 0x05, 0x98, 0x96, 0x88, 0xe4,
0x0b, 0xe8, 0x44, 0x5c, 0xa8, 0xe4, 0x51, 0x12, 0xe1, 0xa5, 0x05, 0x6d, 0xa3, 0xb3, 0x57, 0xa5,
0x33, 0x5a, 0xc2, 0x9c, 0x53, 0x65, 0x66, 0xef, 0xb7, 0x1a, 0xb4, 0x1e, 0x70, 0x71, 0x96, 0x44,
0xaf, 0xf6, 0xba, 0x3f, 0x79, 0xee, 0xba, 0x2b, 0x2d, 0x73, 0xc7, 0xae, 0xdc, 0xf8, 0xc7, 0xd0,
0xe6, 0x69, 0x9c, 0x67, 0x09, 0x06, 0xc8, 0x5b, 0x9f, 0x2d, 0x07, 0x0e, 0x43, 0x17, 0xe8, 0xde,
0x2f, 0x75, 0x68, 0x17, 0xcb, 0xe4, 0x9e, 0xb3, 0xc0, 0xd6, 0xde, 0xad, 0xcb, 0x24, 0xb4, 0x09,
0xee, 0xf0, 0x7b, 0xb0, 0x91, 0x67, 0x42, 0x49, 0x74, 0xb6, 0xb1, 0x2e, 0x4d, 0x8e, 0x10, 0x30,
0xca, 0xd2, 0x47, 0xc9, 0x84, 0x5a, 0x30, 0xf9, 0x0e, 0x3a, 0x67, 0x89, 0x50, 0xa7, 0x6c, 0x16,
0x26, 0xb9, 0x44, 0xa7, 0x35, 0xf7, 0x9d, 0xcb, 0x8e, 0xec, 0x1f, 0x5b, 0xfc, 0xf8, 0x68, 0xb8,
0x85, 0xa1, 0x86, 0xc5, 0x54, 0x52, 0x70, 0x52, 0xe3, 0x5c, 0x6e, 0x1f, 0x82, 0xbf, 0xd8, 0x21,
0xef, 0x01, 0xa4, 0x36, 0x2b, 0xc2, 0xc5, 0x3d, 0x75, 0x91, 0xec, 0xbb, 0x5c, 0xc1, 0xeb, 0xf2,
0x1d, 0x60, 0x1c, 0x13, 0x02, 0x1e, 0x8b, 0x63, 0x61, 0x6e, 0xcd, 0xa7, 0x66, 0xdc, 0xfb, 0x7d,
0x03, 0xbc, 0x87, 0x4c, 0x4e, 0xaf, 0xba, 0xb2, 0xf5, 0x99, 0x2b, 0xf7, 0x8c, 0xee, 0x48, 0x9b,
0x02, 0xda, 0x1d, 0x6f, 0xe9, 0x8e, 0x4b, 0x0c, 0xed, 0x8e, 0x03, 0x58, 0x77, 0xe4, 0x2c, 0x53,
0xa6, 0x7c, 0x3d, 0x6a, 0xc6, 0xe4, 0x36, 0xb4, 0x52, 0x2c, 0x59, 0x4d, 0x6f, 0x1a, 0x3a, 0x20,
0xbd, 0xa9, 0xab, 0x18, 0xb9, 0x4d, 0xbd, 0x85, 0x44, 0x2c, 0x15, 0x96, 0xa6, 0x19, 0x96, 0x1f,
0xf6, 0x01, 0xe9, 0x4a, 0xae, 0x32, 0x21, 0x07, 0x4b, 0x58, 0x51, 0x2a, 0x25, 0x26, 0x39, 0x86,
0xd7, 0x0b, 0x7b, 0xcb, 0x82, 0xed, 0x97, 0x11, 0x24, 0x4e, 0xa1, 0xb4, 0x53, 0x6a, 0x4d, 0xfe,
0xfa, 0xd6, 0x64, 0x22, 0x58, 0xd5, 0x9a, 0x86, 0xd0, 0xc5, 0x3e, 0x97, 0x08, 0x6c, 0xf5, 0x7a,
0x85, 0x07, 0x80, 0x22, 0x5b, 0x6b, 0xba, 0xbd, 0x13, 0xe1, 0x74, 0xd3, 0x71, 0xcc, 0x8c, 0x0c,
0xa0, 0xed, 0xf2, 0x46, 0x06, 0x1d, 0x93, 0xbb, 0x2f, 0xd8, 0x92, 0x16, 0xb4, 0xe7, 0x8a, 0x76,
0xf3, 0x65, 0x8a, 0x16, 0x3b, 0x05, 0xcc, 0xb2, 0x49, 0x18, 0x8b, 0x04, 0xff, 0x7d, 0x41, 0xd7,
0x70, 0xb7, 0xab, 0xb8, 0xfb, 0x06, 0x41, 0x7d, 0x44, 0xdb, 0x61, 0xef, 0xa7, 0x1a, 0xbc, 0xb6,
0x62, 0x14, 0xf9, 0x10, 0xb3, 0xc2, 0x2e, 0x5e, 0xf6, 0xdf, 0x75, 0x3c, 0x5a, 0x60, 0xc9, 0x0e,
0xf8, 0xba, 0x46, 0xb8, 0x94, 0xdc, 0x56, 0xbf, 0x4f, 0x97, 0x0b, 0x24, 0x80, 0x16, 0x9b, 0x25,
0x4c, 0xef, 0x35, 0xcc, 0x5e, 0x31, 0xed, 0xfd, 0x5c, 0x87, 0x96, 0x13, 0xbb, 0xea, 0x0e, 0xea,
0x8e, 0x5d, 0xa9, 0xac, 0xfb, 0xb0, 0x69, 0xc3, 0xe9, 0x52, 0xc2, 0xfb, 0xdf, 0xa0, 0x76, 0x2c,
0xde, 0xa6, 0xc3, 0x7d, 0xf0, 0x92, 0x9c, 0xcd, 0xdd, 0x9f, 0xb2, 0xf2, 0xe4, 0xf1, 0xd1, 0xe0,
0xf0, 0xdb, 0xdc, 0x66, 0x76, 0x1b, 0x1d, 0xf5, 0xf4, 0x02, 0x35, 0xb4, 0xde, 0x3f, 0x18, 0x90,
0xd1, 0xec, 0x54, 0x2a, 0x2e, 0xae, 0x3a, 0x20, 0xee, 0xd8, 0x95, 0x80, 0x8c, 0xa0, 0x25, 0xb2,
0x4c, 0x85, 0x11, 0xbb, 0x2c, 0x16, 0x14, 0x21, 0xa3, 0xc1, 0x70, 0x4b, 0x13, 0x75, 0x23, 0xb1,
0x73, 0xda, 0xd4, 0xd4, 0x11, 0xc3, 0x26, 0x7f, 0xa3, 0x68, 0xbf, 0x27, 0xb8, 0x22, 0x95, 0x60,
0x79, 0x38, 0xe5, 0xe7, 0xfa, 0x49, 0xd1, 0x58, 0xf7, 0x18, 0x38, 0x48, 0x23, 0x71, 0x6e, 0x02,
0xf5, 0x15, 0x3f, 0xa7, 0xd7, 0x9d, 0xc0, 0xb0, 0xe0, 0xe3, 0xa2, 0x24, 0x9f, 0xc3, 0x0e, 0x5f,
0xc0, 0xb4, 0x62, 0x38, 0xc3, 0x17, 0x19, 0xfe, 0x58, 0xc2, 0x68, 0x86, 0x8a, 0xa6, 0xb7, 0x79,
0xf4, 0x26, 0x2f, 0x4b, 0x7d, 0x6d, 0x11, 0x23, 0x0d, 0x18, 0xee, 0x5c, 0x3c, 0xdd, 0xbd, 0xf6,
0x27, 0x7e, 0xff, 0x3e, 0xdd, 0xad, 0xfd, 0xf8, 0x6c, 0xb7, 0x76, 0x81, 0xdf, 0x1f, 0xf8, 0xfd,
0x85, 0xdf, 0x49, 0xd3, 0xbc, 0x4b, 0x3f, 0xf8, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xb9, 0x27, 0xf6,
0x9e, 0x07, 0x0b, 0x00, 0x00,
// 1000 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xbc, 0x56, 0x4d, 0x6f, 0x1b, 0x45,
0x18, 0xae, 0x93, 0x8d, 0xed, 0x7d, 0x1d, 0x47, 0x62, 0xa8, 0xaa, 0x6d, 0x08, 0x49, 0x71, 0x05,
0xe2, 0x80, 0x5c, 0x51, 0x0a, 0xa2, 0x82, 0x0a, 0xd9, 0x4e, 0x04, 0x16, 0x04, 0xa2, 0x69, 0x09,
0xc7, 0xd5, 0x64, 0x77, 0x6a, 0x16, 0xdb, 0xbb, 0xab, 0x99, 0x71, 0xaa, 0xdc, 0x10, 0x3f, 0x00,
0x89, 0x3f, 0xc0, 0x5f, 0xe1, 0x9a, 0x03, 0x07, 0x8e, 0x9c, 0x2a, 0xda, 0x1b, 0x27, 0xf8, 0x09,
0xbc, 0xf3, 0xb1, 0xf6, 0x46, 0x5e, 0x87, 0x56, 0xaa, 0x72, 0x58, 0x69, 0x3e, 0x9e, 0xe7, 0x99,
0xf7, 0x6b, 0xde, 0x59, 0x68, 0x67, 0x27, 0x3f, 0xf0, 0x48, 0xc9, 0x6e, 0x2e, 0x32, 0x95, 0x11,
0x12, 0x67, 0xd1, 0x98, 0x8b, 0xae, 0x7c, 0xc2, 0xc4, 0x74, 0x9c, 0xa8, 0xee, 0xe9, 0xfb, 0xdb,
0x2d, 0x75, 0x96, 0x73, 0x07, 0xd8, 0x6e, 0xc9, 0x9c, 0x47, 0xc5, 0xe4, 0xa6, 0x4a, 0xa6, 0x5c,
0x2a, 0x36, 0xcd, 0xef, 0xcc, 0x47, 0x6e, 0xeb, 0xfa, 0x28, 0x1b, 0x65, 0x66, 0x78, 0x47, 0x8f,
0xec, 0x6a, 0xe7, 0xb7, 0x1a, 0x78, 0x87, 0x5c, 0x31, 0xf2, 0x09, 0x34, 0x4e, 0xb9, 0x90, 0x49,
0x96, 0x06, 0xb5, 0x5b, 0xb5, 0x77, 0x5b, 0x77, 0xdf, 0xe8, 0x2e, 0x9f, 0xdc, 0x3d, 0xb6, 0x90,
0xbe, 0x77, 0xfe, 0x74, 0xef, 0x1a, 0x2d, 0x18, 0xe4, 0x53, 0x80, 0x48, 0x70, 0xa6, 0x78, 0x1c,
0x32, 0x15, 0xac, 0x19, 0xfe, 0x9b, 0x55, 0xfc, 0x47, 0x85, 0x51, 0xd4, 0x77, 0x84, 0x9e, 0xd2,
0xec, 0x59, 0x1e, 0x17, 0xec, 0xf5, 0x17, 0x62, 0x3b, 0x42, 0x4f, 0x75, 0xfe, 0x5e, 0x07, 0xef,
0xeb, 0x2c, 0xe6, 0xe4, 0x06, 0xac, 0x25, 0xb1, 0x31, 0xde, 0xef, 0xd7, 0x9f, 0x3f, 0xdd, 0x5b,
0x1b, 0xee, 0x53, 0x5c, 0x21, 0x77, 0xc1, 0x9b, 0xa2, 0x87, 0xce, 0xac, 0xa0, 0x4a, 0x58, 0x47,
0xc0, 0xf9, 0x64, 0xb0, 0xe4, 0x23, 0xf0, 0x74, 0x58, 0x9d, 0x31, 0x3b, 0x55, 0x1c, 0x7d, 0xe6,
0x43, 0xc4, 0x14, 0x3c, 0x8d, 0x27, 0x07, 0xd0, 0x8a, 0xb9, 0x8c, 0x44, 0x92, 0x2b, 0x1d, 0x49,
0xcf, 0xd0, 0x6f, 0xaf, 0xa2, 0xef, 0x2f, 0xa0, 0xb4, 0xcc, 0xc3, 0x88, 0xd4, 0xd1, 0x4f, 0x35,
0x93, 0xc1, 0x86, 0x51, 0xd8, 0x5d, 0x69, 0x80, 0x41, 0x39, 0x13, 0x1c, 0x87, 0x7c, 0x01, 0x5b,
0x53, 0x96, 0xb2, 0x11, 0x17, 0xa1, 0x53, 0xa9, 0x1b, 0x95, 0xb7, 0x2a, 0x5d, 0xb7, 0x48, 0x2b,
0x44, 0xdb, 0xd3, 0xf2, 0x14, 0xdd, 0x01, 0xa6, 0x14, 0x8b, 0xbe, 0x9f, 0xf2, 0x54, 0x05, 0x0d,
0xa3, 0xf2, 0x76, 0xa5, 0x2d, 0x5c, 0x3d, 0xc9, 0xc4, 0xb8, 0x37, 0x07, 0xd3, 0x12, 0x91, 0x7c,
0x0e, 0xad, 0x88, 0x0b, 0x95, 0x3c, 0x4e, 0x22, 0x4c, 0x5a, 0xd0, 0x34, 0x3a, 0x7b, 0x55, 0x3a,
0x83, 0x05, 0xcc, 0x39, 0x55, 0x66, 0x76, 0x7e, 0x59, 0x83, 0xc6, 0x43, 0x2e, 0x4e, 0x93, 0xe8,
0xd5, 0xa6, 0xfb, 0xfe, 0x85, 0x74, 0x57, 0x5a, 0xe6, 0x8e, 0x5d, 0xca, 0xf8, 0xc7, 0xd0, 0xe4,
0x69, 0x9c, 0x67, 0x09, 0x06, 0xc8, 0x5b, 0x5d, 0x2d, 0x07, 0x0e, 0x43, 0xe7, 0x68, 0x0c, 0x6e,
0xdb, 0x56, 0x71, 0x78, 0x21, 0xd7, 0xb7, 0xaa, 0xe8, 0xdf, 0x1a, 0xa0, 0x4b, 0xd2, 0xe6, 0xac,
0x34, 0xeb, 0xfc, 0xba, 0x06, 0xcd, 0x42, 0x9d, 0xdc, 0x73, 0x8e, 0xd4, 0x56, 0x4b, 0x15, 0x58,
0xed, 0x89, 0xf3, 0xe1, 0x1e, 0x6c, 0xe4, 0x99, 0x50, 0x12, 0x63, 0xb6, 0xbe, 0xaa, 0xda, 0x8e,
0x10, 0x30, 0xc8, 0xd2, 0xc7, 0xc9, 0x88, 0x5a, 0x30, 0xf9, 0x0e, 0x5a, 0xa7, 0x89, 0x50, 0x33,
0x36, 0x09, 0x93, 0x5c, 0x62, 0xec, 0x34, 0xf7, 0x9d, 0xcb, 0x8e, 0xec, 0x1e, 0x5b, 0xfc, 0xf0,
0xa8, 0xbf, 0x85, 0x19, 0x83, 0xf9, 0x54, 0x52, 0x70, 0x52, 0xc3, 0x5c, 0x6e, 0x1f, 0x82, 0x3f,
0xdf, 0x21, 0xef, 0x01, 0xa4, 0xb6, 0xb8, 0xc2, 0x79, 0xba, 0xdb, 0x48, 0xf6, 0x5d, 0xc9, 0x61,
0xd6, 0x7d, 0x07, 0x18, 0xc6, 0x84, 0x80, 0xc7, 0xe2, 0x58, 0x98, 0xe4, 0xfb, 0xd4, 0x8c, 0x3b,
0xbf, 0x6f, 0x80, 0xf7, 0x88, 0xc9, 0xf1, 0x55, 0x37, 0x08, 0x7d, 0xe6, 0x52, 0xb9, 0xa0, 0x3b,
0xd2, 0x56, 0x92, 0x76, 0xc7, 0x5b, 0xb8, 0xe3, 0xea, 0x4b, 0xbb, 0xe3, 0x00, 0xd6, 0x1d, 0x39,
0xc9, 0x94, 0xa9, 0x0c, 0x8f, 0x9a, 0x31, 0xb9, 0x0d, 0x8d, 0x14, 0x6f, 0xbe, 0xa6, 0xd7, 0x0d,
0x1d, 0x90, 0x5e, 0xd7, 0xcd, 0x00, 0xb9, 0x75, 0xbd, 0x85, 0x44, 0xbc, 0x71, 0x2c, 0x4d, 0x33,
0xac, 0x10, 0x6c, 0x27, 0xd2, 0xdd, 0xdc, 0xca, 0xba, 0xee, 0x2d, 0x60, 0xc5, 0x8d, 0x2b, 0x31,
0xc9, 0x31, 0xbc, 0x5e, 0xd8, 0x5b, 0x16, 0x6c, 0xbe, 0x8c, 0x20, 0x71, 0x0a, 0xa5, 0x9d, 0x52,
0x87, 0xf3, 0x57, 0x77, 0x38, 0x13, 0xc1, 0xaa, 0x0e, 0xd7, 0x87, 0x36, 0xb6, 0xcb, 0x44, 0xe0,
0x8b, 0xa1, 0x57, 0x78, 0x00, 0x28, 0xb2, 0xb5, 0xe2, 0xd1, 0x70, 0x22, 0x9c, 0x6e, 0x3a, 0x8e,
0x99, 0x91, 0x1e, 0x34, 0x5d, 0xdd, 0xc8, 0xa0, 0x65, 0x6a, 0xf7, 0x05, 0x3b, 0xdb, 0x9c, 0x76,
0xe1, 0xee, 0x6f, 0xbe, 0xd4, 0xdd, 0xbf, 0x0f, 0x30, 0xc9, 0x46, 0x61, 0x2c, 0x12, 0x7c, 0x42,
0x83, 0xb6, 0xe1, 0x6e, 0x57, 0x71, 0xf7, 0x0d, 0x82, 0xfa, 0x88, 0xb6, 0xc3, 0xce, 0x4f, 0x35,
0x78, 0x6d, 0xc9, 0x28, 0xf2, 0x21, 0x56, 0x85, 0x5d, 0xbc, 0xec, 0xf9, 0x76, 0x3c, 0x5a, 0x60,
0xc9, 0x0e, 0xf8, 0xfa, 0x8e, 0x70, 0x29, 0xb9, 0xbd, 0xfd, 0x3e, 0x5d, 0x2c, 0x90, 0x00, 0x1a,
0x6c, 0x92, 0x30, 0xbd, 0xb7, 0x6e, 0xf6, 0x8a, 0x69, 0xe7, 0x67, 0x6c, 0xc4, 0x4e, 0xec, 0xaa,
0x1b, 0xb1, 0x3b, 0x76, 0xe9, 0x66, 0x3d, 0x80, 0x4d, 0x1b, 0x4e, 0x57, 0x12, 0xde, 0xff, 0x06,
0xb5, 0x65, 0xf1, 0xb6, 0x1c, 0x1e, 0x80, 0x97, 0xe4, 0x6c, 0xea, 0x9a, 0x70, 0xe5, 0xc9, 0xc3,
0xa3, 0xde, 0xe1, 0x37, 0xb9, 0xad, 0xec, 0x26, 0x3a, 0xea, 0xe9, 0x05, 0x6a, 0x68, 0x9d, 0x7f,
0x30, 0x20, 0x83, 0xc9, 0x4c, 0x2a, 0x2e, 0xae, 0x3a, 0x20, 0xee, 0xd8, 0xa5, 0x80, 0x0c, 0xa0,
0x21, 0xb2, 0x4c, 0x85, 0x11, 0xbb, 0x2c, 0x16, 0x14, 0x21, 0x83, 0x5e, 0x7f, 0x4b, 0x13, 0x75,
0x23, 0xb1, 0x73, 0x5a, 0xd7, 0xd4, 0x01, 0xc3, 0x26, 0x7f, 0xa3, 0x68, 0xbf, 0x27, 0xb8, 0x22,
0x95, 0x60, 0x79, 0x38, 0xe6, 0x67, 0xfa, 0xb5, 0x5a, 0x5f, 0xf5, 0x4f, 0x71, 0x90, 0x46, 0xe2,
0xcc, 0x04, 0xea, 0x4b, 0x7e, 0x46, 0xaf, 0x3b, 0x81, 0x7e, 0xc1, 0xc7, 0x45, 0x49, 0x3e, 0x83,
0x1d, 0x3e, 0x87, 0x69, 0xc5, 0x70, 0x82, 0x3f, 0x76, 0xf8, 0xb0, 0x84, 0xd1, 0x04, 0x15, 0x4d,
0x6f, 0xf3, 0xe8, 0x4d, 0x5e, 0x96, 0xfa, 0xca, 0x22, 0x06, 0x1a, 0xd0, 0xdf, 0x39, 0x7f, 0xb6,
0x7b, 0xed, 0x4f, 0xfc, 0xfe, 0x7d, 0xb6, 0x5b, 0xfb, 0xf1, 0xf9, 0x6e, 0xed, 0x1c, 0xbf, 0x3f,
0xf0, 0xfb, 0x0b, 0xbf, 0x93, 0xba, 0xf9, 0xbd, 0xfd, 0xe0, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff,
0x56, 0x49, 0xe6, 0x55, 0x4e, 0x0b, 0x00, 0x00,
}


@ -62,6 +62,10 @@ message Service {
// the optional fields like node_port or virtual_ip and it
// could be auto allocated by the system.
Endpoint endpoint = 4;
// UpdateStatus contains the status of an update, if one is in
// progress.
UpdateStatus update_status = 5;
}
// Endpoint specifies all the network parameters required to

File diff suppressed because it is too large


@ -277,6 +277,54 @@ message UpdateConfig {
// Amount of time between updates.
Duration delay = 2 [(gogoproto.nullable) = false];
enum FailureAction {
PAUSE = 0;
CONTINUE = 1;
// TODO(aaronl): Add ROLLBACK as a supported failure mode.
// (#486)
}
// FailureAction is the action to take when an update fails.
// Currently, a failure is defined as a single updated task failing to
// reach the RUNNING state. In the future, there will be configuration
// to define what is treated as a failure (see #486 for a proposal).
FailureAction failure_action = 3;
}
// UpdateStatus is the status of an update in progress.
message UpdateStatus {
enum UpdateState {
UNKNOWN = 0;
UPDATING = 1;
PAUSED = 2;
COMPLETED = 3;
// TODO(aaronl): add ROLLING_BACK, ROLLED_BACK as part of
// rollback support.
}
// State is the state of this update. It indicates whether the
// update is in progress, completed, or is paused.
UpdateState state = 1;
// StartedAt is the time at which the update was started.
Timestamp started_at = 2;
// CompletedAt is the time at which the update completed.
Timestamp completed_at = 3;
// TODO(aaronl): Consider adding a timestamp showing when the most
// recent task update took place. Currently, this is nontrivial
// because each service update kicks off a replacement update, so
// updating the service object with a timestamp at every step along
// the rolling update would cause the rolling update to be constantly
// restarted.
// Message explains how the update got into its current state. For
// example, if the update is paused, it will explain what is preventing
// the update from proceeding (typically the failure of a task to start up
// when OnFailure is PAUSE).
string message = 4;
}
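A sketch of how a consumer might render the new field. The UpdateState values mirror the enum above with local stand-ins; the real generated constants (api.UpdateStatus_UPDATING and friends) appear in the updater changes later in this commit:

package main

import "fmt"

// UpdateState mirrors the proto enum above.
type UpdateState int32

const (
	UpdateStateUnknown UpdateState = iota
	UpdateStateUpdating
	UpdateStatePaused
	UpdateStateCompleted
)

// UpdateStatus is a local stand-in for the new api.UpdateStatus message.
type UpdateStatus struct {
	State   UpdateState
	Message string
}

// describe shows how a client might render the field; a nil status means no
// update has ever been recorded for the service.
func describe(s *UpdateStatus) string {
	if s == nil {
		return "no update recorded"
	}
	switch s.State {
	case UpdateStateUpdating:
		return "updating: " + s.Message
	case UpdateStatePaused:
		return "paused: " + s.Message
	case UpdateStateCompleted:
		return "completed: " + s.Message
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(describe(&UpdateStatus{
		State:   UpdateStatePaused,
		Message: "update paused due to failure or early termination of task t1",
	}))
}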
// TaskState enumerates the states that a task progresses through within an


@ -122,7 +122,7 @@ func AuthorizeForwardedRoleAndOrg(ctx context.Context, authorizedRoles, forwarde
// This was a forwarded request. Authorize the forwarder, and
// check if the forwarded role matches one of the authorized
// roles.
forwardedID, forwardedOrg, forwardedOUs := forwardedTLSInfoFromContext(ctx)
_, forwardedID, forwardedOrg, forwardedOUs := forwardedTLSInfoFromContext(ctx)
if len(forwardedOUs) == 0 || forwardedID == "" || forwardedOrg == "" {
return "", grpc.Errorf(codes.PermissionDenied, "Permission denied: missing information in forwarded request")
@ -178,6 +178,10 @@ type RemoteNodeInfo struct {
// ForwardedBy contains information for the node that forwarded this
// request. It is set to nil if the request was received directly.
ForwardedBy *RemoteNodeInfo
// RemoteAddr is the address that this node is connecting to the cluster
// from.
RemoteAddr string
}
// RemoteNode returns the node ID and role from the client's TLS certificate.
@ -195,18 +199,30 @@ func RemoteNode(ctx context.Context) (RemoteNodeInfo, error) {
org = certSubj.Organization[0]
}
peer, ok := peer.FromContext(ctx)
if !ok {
return RemoteNodeInfo{}, grpc.Errorf(codes.PermissionDenied, "Permission denied: no peer info")
}
directInfo := RemoteNodeInfo{
Roles: certSubj.OrganizationalUnit,
NodeID: certSubj.CommonName,
Organization: org,
RemoteAddr: peer.Addr.String(),
}
if isForwardedRequest(ctx) {
cn, org, ous := forwardedTLSInfoFromContext(ctx)
remoteAddr, cn, org, ous := forwardedTLSInfoFromContext(ctx)
if len(ous) == 0 || cn == "" || org == "" {
return RemoteNodeInfo{}, grpc.Errorf(codes.PermissionDenied, "Permission denied: missing information in forwarded request")
}
return RemoteNodeInfo{Roles: ous, NodeID: cn, Organization: org, ForwardedBy: &directInfo}, nil
return RemoteNodeInfo{
Roles: ous,
NodeID: cn,
Organization: org,
ForwardedBy: &directInfo,
RemoteAddr: remoteAddr,
}, nil
}
return directInfo, nil


@ -3,6 +3,7 @@ package ca
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
const (
@ -10,20 +11,24 @@ const (
certCNKey = "forwarded_cert_cn"
certOUKey = "forwarded_cert_ou"
certOrgKey = "forwarded_cert_org"
remoteAddrKey = "remote_addr"
)
// forwardedTLSInfoFromContext obtains forwarded TLS CN/OU from the grpc.MD
// object in ctx.
func forwardedTLSInfoFromContext(ctx context.Context) (string, string, []string) {
var cn, org string
func forwardedTLSInfoFromContext(ctx context.Context) (remoteAddr string, cn string, org string, ous []string) {
md, _ := metadata.FromContext(ctx)
if len(md[remoteAddrKey]) != 0 {
remoteAddr = md[remoteAddrKey][0]
}
if len(md[certCNKey]) != 0 {
cn = md[certCNKey][0]
}
if len(md[certOrgKey]) != 0 {
org = md[certOrgKey][0]
}
return cn, org, md[certOUKey]
ous = md[certOUKey]
return
}
func isForwardedRequest(ctx context.Context) bool {
@ -54,6 +59,7 @@ func WithMetadataForwardTLSInfo(ctx context.Context) (context.Context, error) {
org = certSubj.Organization[0]
}
}
// If there's no TLS cert, forward with blank TLS metadata.
// Note that the presence of this blank metadata is extremely
// important. Without it, it would look like manager is making
@ -62,6 +68,10 @@ func WithMetadataForwardTLSInfo(ctx context.Context) (context.Context, error) {
md[certCNKey] = []string{cn}
md[certOrgKey] = []string{org}
md[certOUKey] = ous
peer, ok := peer.FromContext(ctx)
if ok {
md[remoteAddrKey] = []string{peer.Addr.String()}
}
return metadata.NewContext(ctx, md), nil
}
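The remote address travels between managers as plain gRPC metadata under the new remote_addr key. A hedged sketch of the round trip, using the era's metadata.FromContext/NewContext API as this file does (helper names are illustrative):

package forward

import (
	"golang.org/x/net/context"

	"google.golang.org/grpc/metadata"
)

const remoteAddrKey = "remote_addr"

// attachRemoteAddr records addr in the outgoing metadata, as
// WithMetadataForwardTLSInfo does above.
func attachRemoteAddr(ctx context.Context, addr string) context.Context {
	md, ok := metadata.FromContext(ctx)
	if !ok {
		md = metadata.MD{}
	} else {
		copied := metadata.MD{}
		for k, v := range md {
			copied[k] = v
		}
		md = copied // avoid mutating metadata shared with the caller
	}
	md[remoteAddrKey] = []string{addr}
	return metadata.NewContext(ctx, md)
}

// remoteAddrFrom reads the value back, tolerating its absence exactly like
// forwardedTLSInfoFromContext.
func remoteAddrFrom(ctx context.Context) (remoteAddr string) {
	md, _ := metadata.FromContext(ctx)
	if len(md[remoteAddrKey]) != 0 {
		remoteAddr = md[remoteAddrKey][0]
	}
	return
}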


@ -16,7 +16,7 @@ import (
)
var (
// alpnProtoStr are the specified application level protocols for gRPC.
// alpnProtoStr is the specified application level protocols for gRPC.
alpnProtoStr = []string{"h2"}
)


@ -3,6 +3,7 @@ package controlapi
import (
"errors"
"reflect"
"strconv"
"github.com/docker/engine-api/types/reference"
"github.com/docker/swarmkit/api"
@ -144,6 +145,10 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
return nil
}
if len(epSpec.Ports) > 0 && epSpec.Mode == api.ResolutionModeDNSRoundRobin {
return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: ports can't be used with dnsrr mode")
}
portSet := make(map[api.PortConfig]struct{})
for _, port := range epSpec.Ports {
if _, ok := portSet[*port]; ok {
@ -175,6 +180,59 @@ func validateServiceSpec(spec *api.ServiceSpec) error {
return nil
}
// checkPortConflicts makes a best-effort attempt to find out whether the
// passed-in spec has port conflicts with existing services.
func (s *Server) checkPortConflicts(spec *api.ServiceSpec) error {
if spec.Endpoint == nil {
return nil
}
pcToString := func(pc *api.PortConfig) string {
port := strconv.FormatUint(uint64(pc.PublishedPort), 10)
return port + "/" + pc.Protocol.String()
}
reqPorts := make(map[string]bool)
for _, pc := range spec.Endpoint.Ports {
if pc.PublishedPort > 0 {
reqPorts[pcToString(pc)] = true
}
}
if len(reqPorts) == 0 {
return nil
}
var (
services []*api.Service
err error
)
s.store.View(func(tx store.ReadTx) {
services, err = store.FindServices(tx, store.All)
})
if err != nil {
return err
}
for _, service := range services {
if service.Spec.Endpoint != nil {
for _, pc := range service.Spec.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service %s", pc.PublishedPort, service.ID)
}
}
}
if service.Endpoint != nil {
for _, pc := range service.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service %s", pc.PublishedPort, service.ID)
}
}
}
}
return nil
}
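The conflict check keys every endpoint port by published-port/protocol, skipping unpublished (auto-assigned) ports. A self-contained sketch of that normalization and scan, with a stand-in PortConfig:

package main

import (
	"fmt"
	"strconv"
)

// PortConfig stands in for api.PortConfig; Protocol would normally be an enum.
type PortConfig struct {
	PublishedPort uint32
	Protocol      string
}

func pcToString(pc *PortConfig) string {
	return strconv.FormatUint(uint64(pc.PublishedPort), 10) + "/" + pc.Protocol
}

// conflicts reports the first requested port that an existing service already
// publishes, mirroring checkPortConflicts' best-effort scan.
func conflicts(requested, existing []*PortConfig) (string, bool) {
	reqPorts := make(map[string]bool)
	for _, pc := range requested {
		if pc.PublishedPort > 0 { // 0 means auto-assigned, which can't conflict
			reqPorts[pcToString(pc)] = true
		}
	}
	for _, pc := range existing {
		if reqPorts[pcToString(pc)] {
			return pcToString(pc), true
		}
	}
	return "", false
}

func main() {
	req := []*PortConfig{{PublishedPort: 8080, Protocol: "tcp"}}
	old := []*PortConfig{{PublishedPort: 8080, Protocol: "tcp"}}
	if key, ok := conflicts(req, old); ok {
		fmt.Println("port already in use:", key)
	}
}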
// CreateService creates and returns a Service based on the provided ServiceSpec.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
@ -185,6 +243,10 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
return nil, err
}
if err := s.checkPortConflicts(request.Spec); err != nil {
return nil, err
}
// TODO(aluzzardi): Consider using `Name` as a primary key to handle
// duplicate creations. See #65
service := &api.Service{
@ -239,6 +301,19 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
var service *api.Service
s.store.View(func(tx store.ReadTx) {
service = store.GetService(tx, request.ServiceID)
})
if service == nil {
return nil, grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
}
if request.Spec.Endpoint != nil && !reflect.DeepEqual(request.Spec.Endpoint, service.Spec.Endpoint) {
if err := s.checkPortConflicts(request.Spec); err != nil {
return nil, err
}
}
err := s.store.Update(func(tx store.Tx) error {
service = store.GetService(tx, request.ServiceID)
if service == nil {
@ -257,6 +332,10 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
service.Meta.Version = *request.ServiceVersion
service.Spec = *request.Spec.Copy()
// Reset update status
service.UpdateStatus = nil
return store.UpdateService(tx, service)
})
if err != nil {


@ -52,7 +52,7 @@ var (
// ErrSessionInvalid returned when the session in use is no longer valid.
// The node should re-register and start a new session.
ErrSessionInvalid = errors.New("session invalid")
// ErrNodeNotFound returned when the Node doesn't exists in raft.
// ErrNodeNotFound returned when the Node doesn't exist in raft.
ErrNodeNotFound = errors.New("node not found")
)


@ -33,7 +33,7 @@ import (
const (
// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
defaultTaskHistoryRetentionLimit = 10
defaultTaskHistoryRetentionLimit = 5
)
// Config is used to tune the Manager.
@ -49,6 +49,9 @@ type Config struct {
// ProtoAddr fields will be used to create listeners otherwise.
ProtoListener map[string]net.Listener
// AdvertiseAddr is the address to advertise to other cluster members.
AdvertiseAddr string
// JoinRaft is an optional address of a node in an existing raft
// cluster to join.
JoinRaft string
@ -120,41 +123,17 @@ func New(config *Config) (*Manager, error) {
tcpAddr := config.ProtoAddr["tcp"]
if config.AdvertiseAddr != "" {
tcpAddr = config.AdvertiseAddr
}
if tcpAddr == "" {
return nil, errors.New("no tcp listen address or listener provided")
}
listenHost, listenPort, err := net.SplitHostPort(tcpAddr)
if err == nil {
ip := net.ParseIP(listenHost)
if ip != nil && ip.IsUnspecified() {
// Find our local IP address associated with the default route.
// This may not be the appropriate address to use for internal
// cluster communications, but it seems like the best default.
// The admin can override this address if necessary.
conn, err := net.Dial("udp", "8.8.8.8:53")
if err != nil {
return nil, fmt.Errorf("could not determine local IP address: %v", err)
}
localAddr := conn.LocalAddr().String()
conn.Close()
listenHost, _, err = net.SplitHostPort(localAddr)
if err != nil {
return nil, fmt.Errorf("could not split local IP address: %v", err)
}
tcpAddr = net.JoinHostPort(listenHost, listenPort)
}
}
// TODO(stevvooe): Reported address of manager is plumbed to listen addr
// for now, may want to make this separate. This can be tricky to get right
// so we need to make it easy to override. This needs to be the address
// through which agent nodes access the manager.
dispatcherConfig.Addr = tcpAddr
err = os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)
err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)
if err != nil {
return nil, fmt.Errorf("failed to create socket directory: %v", err)
}
@ -359,7 +338,7 @@ func (m *Manager) Run(parent context.Context) error {
if err != nil {
log.G(ctx).WithError(err).Error("failed to create allocator")
// TODO(stevvooe): It doesn't seem correct here to fail
// creating the allocator but then use it anyways.
// creating the allocator but then use it anyway.
}
go func(keyManager *keymanager.KeyManager) {

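The block removed above used a common trick to turn an unspecified listen address into something advertisable: dial a UDP "connection" toward a public address (no packets are sent) and read the local end that the default route selects. With AdvertiseAddr now explicit, the manager no longer guesses; a standalone sketch of the removed technique:

package main

import (
	"fmt"
	"net"
)

// advertisableAddr resolves host:port with an unspecified host (0.0.0.0 or
// ::) into the local IP associated with the default route, the same trick the
// removed manager code used before AdvertiseAddr made it configurable.
func advertisableAddr(listenAddr string) (string, error) {
	host, port, err := net.SplitHostPort(listenAddr)
	if err != nil {
		return "", err
	}
	ip := net.ParseIP(host)
	if ip == nil || !ip.IsUnspecified() {
		return listenAddr, nil // already concrete
	}
	conn, err := net.Dial("udp", "8.8.8.8:53") // no traffic sent; just picks a route
	if err != nil {
		return "", fmt.Errorf("could not determine local IP address: %v", err)
	}
	defer conn.Close()
	localHost, _, err := net.SplitHostPort(conn.LocalAddr().String())
	if err != nil {
		return "", err
	}
	return net.JoinHostPort(localHost, port), nil
}

func main() {
	addr, err := advertisableAddr("0.0.0.0:4242")
	fmt.Println(addr, err)
}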

@ -62,8 +62,14 @@ func (r *ReplicatedOrchestrator) Run(ctx context.Context) error {
if err = r.initTasks(ctx, readTx); err != nil {
return
}
err = r.initServices(readTx)
err = r.initCluster(readTx)
if err = r.initServices(readTx); err != nil {
return
}
if err = r.initCluster(readTx); err != nil {
return
}
})
if err != nil {
return err


@ -31,8 +31,13 @@ type instanceRestartInfo struct {
}
type delayedStart struct {
// cancel is called to cancel the delayed start.
cancel func()
doneCh chan struct{}
// waiter is set to true if the next restart is waiting for this delay
// to complete.
waiter bool
}
// RestartSupervisor initiates and manages restarts. It's responsible for
@ -40,7 +45,7 @@ type delayedStart struct {
type RestartSupervisor struct {
mu sync.Mutex
store *store.MemoryStore
delays map[string]delayedStart
delays map[string]*delayedStart
history map[instanceTuple]*instanceRestartInfo
historyByService map[string]map[instanceTuple]struct{}
taskTimeout time.Duration
@ -50,18 +55,59 @@ type RestartSupervisor struct {
func NewRestartSupervisor(store *store.MemoryStore) *RestartSupervisor {
return &RestartSupervisor{
store: store,
delays: make(map[string]delayedStart),
delays: make(map[string]*delayedStart),
history: make(map[instanceTuple]*instanceRestartInfo),
historyByService: make(map[string]map[instanceTuple]struct{}),
taskTimeout: defaultOldTaskTimeout,
}
}
func (r *RestartSupervisor) waitRestart(ctx context.Context, oldDelay *delayedStart, cluster *api.Cluster, taskID string) {
// Wait for the last restart delay to elapse.
select {
case <-oldDelay.doneCh:
case <-ctx.Done():
return
}
// Start the next restart
err := r.store.Update(func(tx store.Tx) error {
t := store.GetTask(tx, taskID)
if t == nil {
return nil
}
service := store.GetService(tx, t.ServiceID)
if service == nil {
return nil
}
return r.Restart(ctx, tx, cluster, service, *t)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to restart task after waiting for previous restart")
}
}
// Restart initiates a new task to replace t if appropriate under the service's
// restart policy.
func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *api.Cluster, service *api.Service, t api.Task) error {
// TODO(aluzzardi): This function should not depend on `service`.
// Is the old task still in the process of restarting? If so, wait for
// its restart delay to elapse, to avoid tight restart loops (for
// example, when the image doesn't exist).
r.mu.Lock()
oldDelay, ok := r.delays[t.ID]
if ok {
if !oldDelay.waiter {
oldDelay.waiter = true
go r.waitRestart(ctx, oldDelay, cluster, t.ID)
}
r.mu.Unlock()
return nil
}
r.mu.Unlock()
t.DesiredState = api.TaskStateShutdown
err := store.UpdateTask(tx, &t)
if err != nil {
@ -87,10 +133,10 @@ func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *a
n := store.GetNode(tx, t.NodeID)
restartTask.DesiredState = api.TaskStateAccepted
restartTask.DesiredState = api.TaskStateReady
var restartDelay time.Duration
// Restart delay does not applied to drained nodes
// Restart delay is not applied to drained nodes
if n == nil || n.Spec.Availability != api.NodeAvailabilityDrain {
if t.Spec.Restart != nil && t.Spec.Restart.Delay != nil {
var err error
@ -254,7 +300,7 @@ func (r *RestartSupervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask
<-oldDelay.doneCh
r.mu.Lock()
}
r.delays[newTaskID] = delayedStart{cancel: cancel, doneCh: doneCh}
r.delays[newTaskID] = &delayedStart{cancel: cancel, doneCh: doneCh}
r.mu.Unlock()
var watch chan events.Event

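The new waiter flag coalesces restarts that arrive while a delay is still pending: only the first caller queues a follow-up, and later ones return immediately, which breaks tight restart loops. A stripped-down sketch of the pattern (names are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// delayedStart mirrors the struct in the diff: doneCh closes when the delay
// elapses, and waiter marks that a follow-up restart is already queued.
type delayedStart struct {
	doneCh chan struct{}
	waiter bool
}

type supervisor struct {
	mu     sync.Mutex
	delays map[string]*delayedStart
}

// restart either runs fn now, or (if a delay is pending) queues exactly one
// follow-up to run after the delay, coalescing any further requests.
func (s *supervisor) restart(id string, fn func()) {
	s.mu.Lock()
	if d, ok := s.delays[id]; ok {
		if !d.waiter {
			d.waiter = true
			go func() {
				<-d.doneCh // wait for the previous delay to elapse
				fn()
			}()
		}
		s.mu.Unlock()
		return
	}
	s.mu.Unlock()
	fn()
}

func main() {
	s := &supervisor{delays: map[string]*delayedStart{}}
	d := &delayedStart{doneCh: make(chan struct{})}
	s.delays["t1"] = d

	for i := 0; i < 3; i++ { // three requests, only one queued follow-up
		s.restart("t1", func() { fmt.Println("restarted") })
	}
	close(d.doneCh)
	time.Sleep(50 * time.Millisecond) // let the queued restart run
}

Storing *delayedStart pointers in the map, rather than values as before, is what allows the waiter bit to be flipped in place.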

@ -56,7 +56,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea
continue
}
// TODO(aluzzardi): This is shady. We should have a more generic condition.
if t.DesiredState != api.TaskStateAccepted || !isReplicatedService(service) {
if t.DesiredState != api.TaskStateReady || !isReplicatedService(service) {
continue
}
restartDelay := defaultRestartDelay
@ -80,7 +80,7 @@ func (r *ReplicatedOrchestrator) initTasks(ctx context.Context, readTx store.Rea
_ = batch.Update(func(tx store.Tx) error {
t := store.GetTask(tx, t.ID)
// TODO(aluzzardi): This is shady as well. We should have a more generic condition.
if t == nil || t.DesiredState != api.TaskStateAccepted {
if t == nil || t.DesiredState != api.TaskStateReady {
return nil
}
r.restarts.DelayStart(ctx, tx, nil, t.ID, restartDelay, true)


@ -8,6 +8,7 @@ import (
"golang.org/x/net/context"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state"
@ -43,13 +44,17 @@ func (u *UpdateSupervisor) Update(ctx context.Context, cluster *api.Cluster, ser
id := service.ID
if update, ok := u.updates[id]; ok {
if !update.isServiceDirty(service) {
// There's already an update working towards this goal.
return
}
update.Cancel()
}
update := NewUpdater(u.store, u.restarts)
update := NewUpdater(u.store, u.restarts, cluster, service)
u.updates[id] = update
go func() {
update.Run(ctx, cluster, service, tasks)
update.Run(ctx, tasks)
u.l.Lock()
if u.updates[id] == update {
delete(u.updates, id)
@ -74,6 +79,9 @@ type Updater struct {
watchQueue *watch.Queue
restarts *RestartSupervisor
cluster *api.Cluster
newService *api.Service
// stopChan signals to the state machine to stop running.
stopChan chan struct{}
// doneChan is closed when the state machine terminates.
@ -81,11 +89,13 @@ type Updater struct {
}
// NewUpdater creates a new Updater.
func NewUpdater(store *store.MemoryStore, restartSupervisor *RestartSupervisor) *Updater {
func NewUpdater(store *store.MemoryStore, restartSupervisor *RestartSupervisor, cluster *api.Cluster, newService *api.Service) *Updater {
return &Updater{
store: store,
watchQueue: store.WatchQueue(),
restarts: restartSupervisor,
cluster: cluster.Copy(),
newService: newService.Copy(),
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
}
@ -98,22 +108,35 @@ func (u *Updater) Cancel() {
}
// Run starts the update and returns only once it's complete or cancelled.
func (u *Updater) Run(ctx context.Context, cluster *api.Cluster, service *api.Service, tasks []*api.Task) {
func (u *Updater) Run(ctx context.Context, tasks []*api.Task) {
defer close(u.doneChan)
service := u.newService
// If the update is in a PAUSED state, we should not do anything.
if service.UpdateStatus != nil && service.UpdateStatus.State == api.UpdateStatus_PAUSED {
return
}
dirtyTasks := []*api.Task{}
for _, t := range tasks {
if !reflect.DeepEqual(service.Spec.Task, t.Spec) ||
(t.Endpoint != nil &&
!reflect.DeepEqual(service.Spec.Endpoint, t.Endpoint.Spec)) {
if u.isTaskDirty(t) {
dirtyTasks = append(dirtyTasks, t)
}
}
// Abort immediately if all tasks are clean.
if len(dirtyTasks) == 0 {
if service.UpdateStatus != nil && service.UpdateStatus.State == api.UpdateStatus_UPDATING {
u.completeUpdate(ctx, service.ID)
}
return
}
// If there's no update in progress, we are starting one.
if service.UpdateStatus == nil {
u.startUpdate(ctx, service.ID)
}
parallelism := 0
if service.Spec.Update != nil {
parallelism = int(service.Spec.Update.Parallelism)
@ -130,39 +153,76 @@ func (u *Updater) Run(ctx context.Context, cluster *api.Cluster, service *api.Se
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
go func() {
u.worker(ctx, cluster, service, taskQueue)
u.worker(ctx, taskQueue)
wg.Done()
}()
}
for _, t := range dirtyTasks {
// Wait for a worker to pick up the task or abort the update, whichever comes first.
select {
case <-u.stopChan:
break
var failedTaskWatch chan events.Event
case taskQueue <- t:
if service.Spec.Update == nil || service.Spec.Update.FailureAction == api.UpdateConfig_PAUSE {
var cancelWatch func()
failedTaskWatch, cancelWatch = state.Watch(
u.store.WatchQueue(),
state.EventUpdateTask{
Task: &api.Task{ServiceID: service.ID, Status: api.TaskStatus{State: api.TaskStateRunning}},
Checks: []state.TaskCheckFunc{state.TaskCheckServiceID, state.TaskCheckStateGreaterThan},
},
)
defer cancelWatch()
}
stopped := false
taskLoop:
for _, t := range dirtyTasks {
retryLoop:
for {
// Wait for a worker to pick up the task or abort the update, whichever comes first.
select {
case <-u.stopChan:
stopped = true
break taskLoop
case ev := <-failedTaskWatch:
failedTask := ev.(state.EventUpdateTask).Task
// If this failed/completed task has a spec matching
// the one we're updating to, we should pause the
// update.
if !u.isTaskDirty(failedTask) {
stopped = true
message := fmt.Sprintf("update paused due to failure or early termination of task %s", failedTask.ID)
u.pauseUpdate(ctx, service.ID, message)
break taskLoop
}
case taskQueue <- t:
break retryLoop
}
}
}
close(taskQueue)
wg.Wait()
if !stopped {
u.completeUpdate(ctx, service.ID)
}
}
func (u *Updater) worker(ctx context.Context, cluster *api.Cluster, service *api.Service, queue <-chan *api.Task) {
func (u *Updater) worker(ctx context.Context, queue <-chan *api.Task) {
for t := range queue {
updated := newTask(cluster, service, t.Slot)
updated := newTask(u.cluster, u.newService, t.Slot)
updated.DesiredState = api.TaskStateReady
if isGlobalService(service) {
if isGlobalService(u.newService) {
updated.NodeID = t.NodeID
}
if err := u.updateTask(ctx, service, t, updated); err != nil {
if err := u.updateTask(ctx, t, updated); err != nil {
log.G(ctx).WithError(err).WithField("task.id", t.ID).Error("update failed")
}
if service.Spec.Update != nil && (service.Spec.Update.Delay.Seconds != 0 || service.Spec.Update.Delay.Nanos != 0) {
delay, err := ptypes.Duration(&service.Spec.Update.Delay)
if u.newService.Spec.Update != nil && (u.newService.Spec.Update.Delay.Seconds != 0 || u.newService.Spec.Update.Delay.Nanos != 0) {
delay, err := ptypes.Duration(&u.newService.Spec.Update.Delay)
if err != nil {
log.G(ctx).WithError(err).Error("invalid update delay")
continue
@ -176,7 +236,7 @@ func (u *Updater) worker(ctx context.Context, cluster *api.Cluster, service *api
}
}
func (u *Updater) updateTask(ctx context.Context, service *api.Service, original, updated *api.Task) error {
func (u *Updater) updateTask(ctx context.Context, original, updated *api.Task) error {
log.G(ctx).Debugf("replacing %s with %s", original.ID, updated.ID)
// Kick off the watch before even creating the updated task. This is in order to avoid missing any event.
taskUpdates, cancel := state.Watch(u.watchQueue, state.EventUpdateTask{
@ -231,3 +291,86 @@ func (u *Updater) updateTask(ctx context.Context, service *api.Service, original
}
}
}
func (u *Updater) isTaskDirty(t *api.Task) bool {
return !reflect.DeepEqual(u.newService.Spec.Task, t.Spec) ||
(t.Endpoint != nil && !reflect.DeepEqual(u.newService.Spec.Endpoint, t.Endpoint.Spec))
}
func (u *Updater) isServiceDirty(service *api.Service) bool {
return !reflect.DeepEqual(u.newService.Spec.Task, service.Spec.Task) ||
!reflect.DeepEqual(u.newService.Spec.Endpoint, service.Spec.Endpoint)
}
func (u *Updater) startUpdate(ctx context.Context, serviceID string) {
err := u.store.Update(func(tx store.Tx) error {
service := store.GetService(tx, serviceID)
if service == nil {
return nil
}
if service.UpdateStatus != nil {
return nil
}
service.UpdateStatus = &api.UpdateStatus{
State: api.UpdateStatus_UPDATING,
Message: "update in progress",
StartedAt: ptypes.MustTimestampProto(time.Now()),
}
return store.UpdateService(tx, service)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to mark update of service %s in progress", serviceID)
}
}
func (u *Updater) pauseUpdate(ctx context.Context, serviceID, message string) {
log.G(ctx).Debugf("pausing update of service %s", serviceID)
err := u.store.Update(func(tx store.Tx) error {
service := store.GetService(tx, serviceID)
if service == nil {
return nil
}
if service.UpdateStatus == nil {
// The service was updated since we started this update
return nil
}
service.UpdateStatus.State = api.UpdateStatus_PAUSED
service.UpdateStatus.Message = message
return store.UpdateService(tx, service)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to pause update of service %s", serviceID)
}
}
func (u *Updater) completeUpdate(ctx context.Context, serviceID string) {
log.G(ctx).Debugf("update of service %s complete", serviceID)
err := u.store.Update(func(tx store.Tx) error {
service := store.GetService(tx, serviceID)
if service == nil {
return nil
}
if service.UpdateStatus == nil {
// The service was changed since we started this update
return nil
}
service.UpdateStatus.State = api.UpdateStatus_COMPLETED
service.UpdateStatus.Message = "update completed"
service.UpdateStatus.CompletedAt = ptypes.MustTimestampProto(time.Now())
return store.UpdateService(tx, service)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to mark update of service %s complete", serviceID)
}
}


@ -2,8 +2,10 @@ package raft
import (
"errors"
"fmt"
"math"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
@ -537,13 +539,33 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
}
}
remoteAddr := req.Addr
// If the joining node sent an address like 0.0.0.0:4242, automatically
// determine its actual address based on the GRPC connection. This
// avoids the need for a prospective member to know its own address.
requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
if err != nil {
return nil, fmt.Errorf("invalid address %s in raft join request", remoteAddr)
}
requestIP := net.ParseIP(requestHost)
if requestIP != nil && requestIP.IsUnspecified() {
remoteHost, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
if err != nil {
return nil, err
}
remoteAddr = net.JoinHostPort(remoteHost, requestPort)
}
// We do not bother submitting a configuration change for the
// new member if we can't contact it back using its address
if err := n.checkHealth(ctx, req.Addr, 5*time.Second); err != nil {
if err := n.checkHealth(ctx, remoteAddr, 5*time.Second); err != nil {
return nil, err
}
err = n.addMember(ctx, req.Addr, raftID, nodeInfo.NodeID)
err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
if err != nil {
log.WithError(err).Errorf("failed to add member")
return nil, err

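The Join handler now rewrites an unspecified host (0.0.0.0 or ::) using the host actually observed on the gRPC connection, keeping the requested port. A self-contained sketch of that substitution:

package main

import (
	"fmt"
	"net"
)

// normalizeJoinAddr keeps reqAddr unless its host is unspecified, in which
// case the host seen on the wire (observedAddr) is substituted while the
// requested port is preserved, as in the Join change above.
func normalizeJoinAddr(reqAddr, observedAddr string) (string, error) {
	host, port, err := net.SplitHostPort(reqAddr)
	if err != nil {
		return "", fmt.Errorf("invalid address %s in raft join request", reqAddr)
	}
	if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
		remoteHost, _, err := net.SplitHostPort(observedAddr)
		if err != nil {
			return "", err
		}
		return net.JoinHostPort(remoteHost, port), nil
	}
	return reqAddr, nil
}

func main() {
	addr, _ := normalizeJoinAddr("0.0.0.0:4242", "10.0.0.5:53682")
	fmt.Println(addr) // 10.0.0.5:4242
}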

@ -40,6 +40,11 @@ func TaskCheckNodeID(t1, t2 *api.Task) bool {
return t1.NodeID == t2.NodeID
}
// TaskCheckServiceID is a TaskCheckFunc for matching service IDs.
func TaskCheckServiceID(t1, t2 *api.Task) bool {
return t1.ServiceID == t2.ServiceID
}
// TaskCheckStateGreaterThan is a TaskCheckFunc for checking task state.
func TaskCheckStateGreaterThan(t1, t2 *api.Task) bool {
return t2.Status.State > t1.Status.State
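These check functions are composed into event filters; the updater changes above pass exactly this pair to watch for any task of a service moving past RUNNING. A self-contained sketch of how such a predicate list evaluates, with a stand-in Task:

package main

import "fmt"

// Task stands in for api.Task.
type Task struct {
	ServiceID string
	State     int
}

// TaskCheckFunc compares an event's task (t2) against a template (t1),
// matching the shape of swarmkit's state.TaskCheckFunc.
type TaskCheckFunc func(t1, t2 *Task) bool

func TaskCheckServiceID(t1, t2 *Task) bool { return t1.ServiceID == t2.ServiceID }

func TaskCheckStateGreaterThan(t1, t2 *Task) bool { return t2.State > t1.State }

// matches applies every check, the way a watch event selector would.
func matches(template, event *Task, checks []TaskCheckFunc) bool {
	for _, c := range checks {
		if !c(template, event) {
			return false
		}
	}
	return true
}

func main() {
	template := &Task{ServiceID: "svc1", State: 5 /* e.g. RUNNING */}
	event := &Task{ServiceID: "svc1", State: 6 /* e.g. a later state */}
	fmt.Println(matches(template, event, []TaskCheckFunc{TaskCheckServiceID, TaskCheckStateGreaterThan}))
}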