pipeline.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package scheduler
  2. import (
  3. "sort"
  4. "github.com/docker/swarmkit/api"
  5. )
  6. var (
  7. defaultFilters = []Filter{
  8. // Always check for readiness first.
  9. &ReadyFilter{},
  10. &ResourceFilter{},
  11. &PluginFilter{},
  12. &ConstraintFilter{},
  13. &PlatformFilter{},
  14. &HostPortFilter{},
  15. }
  16. )
  17. type checklistEntry struct {
  18. f Filter
  19. enabled bool
  20. // failureCount counts the number of nodes that this filter failed
  21. // against.
  22. failureCount int
  23. }
  24. type checklistByFailures []checklistEntry
  25. func (c checklistByFailures) Len() int { return len(c) }
  26. func (c checklistByFailures) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
  27. func (c checklistByFailures) Less(i, j int) bool { return c[i].failureCount < c[j].failureCount }
  28. // Pipeline runs a set of filters against nodes.
  29. type Pipeline struct {
  30. // checklist is a slice of filters to run
  31. checklist []checklistEntry
  32. }
  33. // NewPipeline returns a pipeline with the default set of filters.
  34. func NewPipeline() *Pipeline {
  35. p := &Pipeline{}
  36. for _, f := range defaultFilters {
  37. p.checklist = append(p.checklist, checklistEntry{f: f})
  38. }
  39. return p
  40. }
  41. // Process a node through the filter pipeline.
  42. // Returns true if all filters pass, false otherwise.
  43. func (p *Pipeline) Process(n *NodeInfo) bool {
  44. for i, entry := range p.checklist {
  45. if entry.enabled && !entry.f.Check(n) {
  46. // Immediately stop on first failure.
  47. p.checklist[i].failureCount++
  48. return false
  49. }
  50. }
  51. for i := range p.checklist {
  52. p.checklist[i].failureCount = 0
  53. }
  54. return true
  55. }
  56. // SetTask sets up the filters to process a new task. Once this is called,
  57. // Process can be called repeatedly to try to assign the task various nodes.
  58. func (p *Pipeline) SetTask(t *api.Task) {
  59. for i := range p.checklist {
  60. p.checklist[i].enabled = p.checklist[i].f.SetTask(t)
  61. p.checklist[i].failureCount = 0
  62. }
  63. }
  64. // Explain returns a string explaining why a task could not be scheduled.
  65. func (p *Pipeline) Explain() string {
  66. var explanation string
  67. // Sort from most failures to least
  68. sortedByFailures := make([]checklistEntry, len(p.checklist))
  69. copy(sortedByFailures, p.checklist)
  70. sort.Sort(sort.Reverse(checklistByFailures(sortedByFailures)))
  71. for _, entry := range sortedByFailures {
  72. if entry.failureCount > 0 {
  73. if len(explanation) > 0 {
  74. explanation += "; "
  75. }
  76. explanation += entry.f.Explain(entry.failureCount)
  77. }
  78. }
  79. return explanation
  80. }