|
@@ -32,9 +32,10 @@ type State struct {
|
|
StartedAt time.Time
|
|
StartedAt time.Time
|
|
FinishedAt time.Time
|
|
FinishedAt time.Time
|
|
Health *Health
|
|
Health *Health
|
|
|
|
+ Removed bool `json:"-"`
|
|
|
|
|
|
- waitStop chan struct{}
|
|
|
|
- waitRemove chan struct{}
|
|
|
|
|
|
+ stopWaiters []chan<- StateStatus
|
|
|
|
+ removeOnlyWaiters []chan<- StateStatus
|
|
}
|
|
}
|
|
|
|
|
|
// StateStatus is used to return container wait results.
|
|
// StateStatus is used to return container wait results.
|
|
@@ -57,12 +58,9 @@ func (s StateStatus) Err() error {
|
|
return s.err
|
|
return s.err
|
|
}
|
|
}
|
|
|
|
|
|
-// NewState creates a default state object with a fresh channel for state changes.
|
|
|
|
|
|
+// NewState creates a default state object.
|
|
func NewState() *State {
|
|
func NewState() *State {
|
|
- return &State{
|
|
|
|
- waitStop: make(chan struct{}),
|
|
|
|
- waitRemove: make(chan struct{}),
|
|
|
|
- }
|
|
|
|
|
|
+ return &State{}
|
|
}
|
|
}
|
|
|
|
|
|
// String returns a human-readable description of the state
|
|
// String returns a human-readable description of the state
|
|
@@ -182,11 +180,10 @@ func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateS
|
|
s.Lock()
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
defer s.Unlock()
|
|
|
|
|
|
- if condition == WaitConditionNotRunning && !s.Running {
|
|
|
|
- // Buffer so we can put it in the channel now.
|
|
|
|
- resultC := make(chan StateStatus, 1)
|
|
|
|
|
|
+ // Buffer so we can put status and finish even nobody receives it.
|
|
|
|
+ resultC := make(chan StateStatus, 1)
|
|
|
|
|
|
- // Send the current status.
|
|
|
|
|
|
+ if s.conditionAlreadyMet(condition) {
|
|
resultC <- StateStatus{
|
|
resultC <- StateStatus{
|
|
exitCode: s.ExitCode(),
|
|
exitCode: s.ExitCode(),
|
|
err: s.Err(),
|
|
err: s.Err(),
|
|
@@ -195,19 +192,16 @@ func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateS
|
|
return resultC
|
|
return resultC
|
|
}
|
|
}
|
|
|
|
|
|
- // If we are waiting only for removal, the waitStop channel should
|
|
|
|
- // remain nil and block forever.
|
|
|
|
- var waitStop chan struct{}
|
|
|
|
- if condition < WaitConditionRemoved {
|
|
|
|
- waitStop = s.waitStop
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Always wait for removal, just in case the container gets removed
|
|
|
|
- // while it is still in a "created" state, in which case it is never
|
|
|
|
- // actually stopped.
|
|
|
|
- waitRemove := s.waitRemove
|
|
|
|
|
|
+ waitC := make(chan StateStatus, 1)
|
|
|
|
|
|
- resultC := make(chan StateStatus, 1)
|
|
|
|
|
|
+ // Removal wakes up both removeOnlyWaiters and stopWaiters
|
|
|
|
+ // Container could be removed while still in "created" state
|
|
|
|
+ // in which case it is never actually stopped
|
|
|
|
+ if condition == WaitConditionRemoved {
|
|
|
|
+ s.removeOnlyWaiters = append(s.removeOnlyWaiters, waitC)
|
|
|
|
+ } else {
|
|
|
|
+ s.stopWaiters = append(s.stopWaiters, waitC)
|
|
|
|
+ }
|
|
|
|
|
|
go func() {
|
|
go func() {
|
|
select {
|
|
select {
|
|
@@ -218,23 +212,25 @@ func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateS
|
|
err: ctx.Err(),
|
|
err: ctx.Err(),
|
|
}
|
|
}
|
|
return
|
|
return
|
|
- case <-waitStop:
|
|
|
|
- case <-waitRemove:
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- s.Lock()
|
|
|
|
- result := StateStatus{
|
|
|
|
- exitCode: s.ExitCode(),
|
|
|
|
- err: s.Err(),
|
|
|
|
|
|
+ case status := <-waitC:
|
|
|
|
+ resultC <- status
|
|
}
|
|
}
|
|
- s.Unlock()
|
|
|
|
-
|
|
|
|
- resultC <- result
|
|
|
|
}()
|
|
}()
|
|
|
|
|
|
return resultC
|
|
return resultC
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (s *State) conditionAlreadyMet(condition WaitCondition) bool {
|
|
|
|
+ switch condition {
|
|
|
|
+ case WaitConditionNotRunning:
|
|
|
|
+ return !s.Running
|
|
|
|
+ case WaitConditionRemoved:
|
|
|
|
+ return s.Removed
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return false
|
|
|
|
+}
|
|
|
|
+
|
|
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
|
|
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
|
|
func (s *State) IsRunning() bool {
|
|
func (s *State) IsRunning() bool {
|
|
s.Lock()
|
|
s.Lock()
|
|
@@ -292,8 +288,8 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
|
|
}
|
|
}
|
|
s.ExitCodeValue = exitStatus.ExitCode
|
|
s.ExitCodeValue = exitStatus.ExitCode
|
|
s.OOMKilled = exitStatus.OOMKilled
|
|
s.OOMKilled = exitStatus.OOMKilled
|
|
- close(s.waitStop) // fire waiters for stop
|
|
|
|
- s.waitStop = make(chan struct{})
|
|
|
|
|
|
+
|
|
|
|
+ s.notifyAndClear(&s.stopWaiters)
|
|
}
|
|
}
|
|
|
|
|
|
// SetRestarting sets the container state to "restarting" without locking.
|
|
// SetRestarting sets the container state to "restarting" without locking.
|
|
@@ -308,8 +304,8 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
|
|
s.FinishedAt = time.Now().UTC()
|
|
s.FinishedAt = time.Now().UTC()
|
|
s.ExitCodeValue = exitStatus.ExitCode
|
|
s.ExitCodeValue = exitStatus.ExitCode
|
|
s.OOMKilled = exitStatus.OOMKilled
|
|
s.OOMKilled = exitStatus.OOMKilled
|
|
- close(s.waitStop) // fire waiters for stop
|
|
|
|
- s.waitStop = make(chan struct{})
|
|
|
|
|
|
+
|
|
|
|
+ s.notifyAndClear(&s.stopWaiters)
|
|
}
|
|
}
|
|
|
|
|
|
// SetError sets the container's error state. This is useful when we want to
|
|
// SetError sets the container's error state. This is useful when we want to
|
|
@@ -374,22 +370,19 @@ func (s *State) IsDead() bool {
|
|
return res
|
|
return res
|
|
}
|
|
}
|
|
|
|
|
|
-// SetRemoved assumes this container is already in the "dead" state and
|
|
|
|
-// closes the internal waitRemove channel to unblock callers waiting for a
|
|
|
|
-// container to be removed.
|
|
|
|
|
|
+// SetRemoved assumes this container is already in the "dead" state and notifies all waiters.
|
|
func (s *State) SetRemoved() {
|
|
func (s *State) SetRemoved() {
|
|
s.SetRemovalError(nil)
|
|
s.SetRemovalError(nil)
|
|
}
|
|
}
|
|
|
|
|
|
// SetRemovalError is to be called in case a container remove failed.
|
|
// SetRemovalError is to be called in case a container remove failed.
|
|
-// It sets an error and closes the internal waitRemove channel to unblock
|
|
|
|
-// callers waiting for the container to be removed.
|
|
|
|
|
|
+// It sets an error and notifies all waiters.
|
|
func (s *State) SetRemovalError(err error) {
|
|
func (s *State) SetRemovalError(err error) {
|
|
s.SetError(err)
|
|
s.SetError(err)
|
|
s.Lock()
|
|
s.Lock()
|
|
- close(s.waitRemove) // Unblock those waiting on remove.
|
|
|
|
- // Recreate the channel so next ContainerWait will work
|
|
|
|
- s.waitRemove = make(chan struct{})
|
|
|
|
|
|
+ s.Removed = true
|
|
|
|
+ s.notifyAndClear(&s.removeOnlyWaiters)
|
|
|
|
+ s.notifyAndClear(&s.stopWaiters)
|
|
s.Unlock()
|
|
s.Unlock()
|
|
}
|
|
}
|
|
|
|
|
|
@@ -400,3 +393,15 @@ func (s *State) Err() error {
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
|
|
|
|
+ result := StateStatus{
|
|
|
|
+ exitCode: s.ExitCodeValue,
|
|
|
|
+ err: s.Err(),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, c := range *waiters {
|
|
|
|
+ c <- result
|
|
|
|
+ }
|
|
|
|
+ *waiters = nil
|
|
|
|
+}
|