pipeline.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package scheduler
  2. import (
  3. "sort"
  4. "github.com/docker/swarmkit/api"
  5. )
  6. var (
  7. defaultFilters = []Filter{
  8. // Always check for readiness first.
  9. &ReadyFilter{},
  10. &ResourceFilter{},
  11. &PluginFilter{},
  12. &ConstraintFilter{},
  13. }
  14. )
  15. type checklistEntry struct {
  16. f Filter
  17. enabled bool
  18. // failureCount counts the number of nodes that this filter failed
  19. // against.
  20. failureCount int
  21. }
  22. type checklistByFailures []checklistEntry
  23. func (c checklistByFailures) Len() int { return len(c) }
  24. func (c checklistByFailures) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
  25. func (c checklistByFailures) Less(i, j int) bool { return c[i].failureCount < c[j].failureCount }
  26. // Pipeline runs a set of filters against nodes.
  27. type Pipeline struct {
  28. // checklist is a slice of filters to run
  29. checklist []checklistEntry
  30. }
  31. // NewPipeline returns a pipeline with the default set of filters.
  32. func NewPipeline() *Pipeline {
  33. p := &Pipeline{}
  34. for _, f := range defaultFilters {
  35. p.checklist = append(p.checklist, checklistEntry{f: f})
  36. }
  37. return p
  38. }
  39. // Process a node through the filter pipeline.
  40. // Returns true if all filters pass, false otherwise.
  41. func (p *Pipeline) Process(n *NodeInfo) bool {
  42. for i, entry := range p.checklist {
  43. if entry.enabled && !entry.f.Check(n) {
  44. // Immediately stop on first failure.
  45. p.checklist[i].failureCount++
  46. return false
  47. }
  48. }
  49. for i := range p.checklist {
  50. p.checklist[i].failureCount = 0
  51. }
  52. return true
  53. }
  54. // SetTask sets up the filters to process a new task. Once this is called,
  55. // Process can be called repeatedly to try to assign the task various nodes.
  56. func (p *Pipeline) SetTask(t *api.Task) {
  57. for i := range p.checklist {
  58. p.checklist[i].enabled = p.checklist[i].f.SetTask(t)
  59. p.checklist[i].failureCount = 0
  60. }
  61. }
  62. // Explain returns a string explaining why a task could not be scheduled.
  63. func (p *Pipeline) Explain() string {
  64. var explanation string
  65. // Sort from most failures to least
  66. sortedByFailures := make([]checklistEntry, len(p.checklist))
  67. copy(sortedByFailures, p.checklist)
  68. sort.Sort(sort.Reverse(checklistByFailures(sortedByFailures)))
  69. for _, entry := range sortedByFailures {
  70. if entry.failureCount > 0 {
  71. if len(explanation) > 0 {
  72. explanation += "; "
  73. }
  74. explanation += entry.f.Explain(entry.failureCount)
  75. }
  76. }
  77. return explanation
  78. }