edge.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963
  1. package solver
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/moby/buildkit/solver/internal/pipe"
  7. digest "github.com/opencontainers/go-digest"
  8. "github.com/pkg/errors"
  9. "github.com/sirupsen/logrus"
  10. )
  11. type edgeStatusType int
  12. const (
  13. edgeStatusInitial edgeStatusType = iota
  14. edgeStatusCacheFast
  15. edgeStatusCacheSlow
  16. edgeStatusComplete
  17. )
  18. func (t edgeStatusType) String() string {
  19. return []string{"initial", "cache-fast", "cache-slow", "complete"}[t]
  20. }
  21. func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
  22. e := &edge{
  23. edge: ed,
  24. op: op,
  25. depRequests: map[pipe.Receiver]*dep{},
  26. keyMap: map[string]struct{}{},
  27. cacheRecords: map[string]*CacheRecord{},
  28. cacheRecordsLoaded: map[string]struct{}{},
  29. index: index,
  30. }
  31. return e
  32. }
// edge tracks all mutable solve state for a single Edge (a vertex output)
// while the scheduler drives it towards a cached or executed result.
// It is only mutated from unpark/processUpdate calls made by the scheduler.
type edge struct {
	edge Edge      // the immutable edge definition this state belongs to
	op   activeOp  // operation used for cache map, slow cache and exec calls

	edgeState                            // embedded current state reported to callers
	depRequests map[pipe.Receiver]*dep   // outgoing dep request -> dep bookkeeping
	deps        []*dep                   // per-input dependency state, indexed by input

	cacheMapReq     pipe.Receiver  // in-flight CacheMap request, nil when none
	cacheMapDone    bool           // true once all cache maps have been received
	cacheMapIndex   int            // next cache map index to request
	cacheMapDigests []digest.Digest // digests collected for edges without deps
	execReq         pipe.Receiver  // in-flight exec or cache-load request
	execCacheLoad   bool           // true when execReq is a cache load, not an exec
	err             error          // sticky error; once set the edge is complete

	cacheRecords       map[string]*CacheRecord // known cache records by ID
	cacheRecordsLoaded map[string]struct{}     // records already attempted via loadCache
	keyMap             map[string]struct{}     // cache key IDs already merged into e.keys

	noCacheMatchPossible      bool // no cache match can appear anymore; force exec path
	allDepsCompletedCacheFast bool // all deps reached cache-fast (and cache map done)
	allDepsCompletedCacheSlow bool // all deps reached cache-slow (and cache map done)
	allDepsStateCacheSlow     bool // all deps at cache-slow state or have matching keys
	allDepsCompleted          bool // all deps fully complete incl. slow cache
	hasActiveOutgoing         bool // some outgoing pipe request is still active

	releaserCount int  // extra release() calls required (incremented on edge merge)
	keysDidChange bool // new keys arrived since last exec decision; postpone exec
	index         *edgeIndex

	secondaryExporters []expDep
}
// dep holds state for a dependant edge
type dep struct {
	req pipe.Receiver // active outgoing request to the input edge, if any
	edgeState           // last state snapshot received from the input edge
	index Index         // which input of the vertex this dep corresponds to

	keyMap map[string]*CacheKey // cache keys of the input that matched a query

	slowCacheReq      pipe.Receiver // in-flight result-based cache key computation
	slowCacheComplete bool          // slow cache digest has been computed (or skipped)
	slowCacheFoundKey bool          // the slow cache key matched an existing record
	slowCacheKey      *ExportableCacheKey // content-based cache key, nil until computed
	err               error
}
// expDep holds secondary exporter info for dependency
type expDep struct {
	index    int                  // dependency index the exporter applies to
	cacheKey CacheKeyWithSelector // key (plus selector) to export for that dep
}
  77. func newDep(i Index) *dep {
  78. return &dep{index: i, keyMap: map[string]*CacheKey{}}
  79. }
// edgePipe is a pipe for requests between two edges
type edgePipe struct {
	*pipe.Pipe
	From, Target *edge // requesting edge and the edge being requested
	mu           sync.Mutex
}
// edgeState hold basic mutable state info for an edge
type edgeState struct {
	state    edgeStatusType       // current progress through the state machine
	result   *SharedCachedResult  // final result, non-nil only when complete
	cacheMap *CacheMap            // cache map loaded from the op, nil until received
	keys     []ExportableCacheKey // exportable cache keys discovered so far
}
// edgeRequest is the payload of a pipe request between edges: the state the
// requester wants the target edge to reach, plus a snapshot of what the
// requester has already seen (used to detect changes worth reporting).
type edgeRequest struct {
	desiredState edgeStatusType // state the caller wants the edge to reach
	currentState edgeState      // caller's last observed state of the edge
	currentKeys  int            // number of keys the caller has already seen
}
// incrementReferenceCount increases the number of times release needs to be
// called to release the edge. Called on merging edges.
func (e *edge) incrementReferenceCount() {
	e.releaserCount++
}
// release releases the edge resources
func (e *edge) release() {
	// merged edges require one release() per incrementReferenceCount();
	// only the final call actually frees resources
	if e.releaserCount > 0 {
		e.releaserCount--
		return
	}
	e.index.Release(e)
	if e.result != nil {
		// release the shared result asynchronously; its error is dropped
		go e.result.Release(context.TODO())
	}
}
// commitOptions returns parameters for the op execution: the cache keys to
// save the result under and the dependency results to execute with.
func (e *edge) commitOptions() ([]*CacheKey, []CachedResult) {
	k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
	if len(e.deps) == 0 {
		// no dependencies: one key per collected cache map digest, no inputs
		keys := make([]*CacheKey, 0, len(e.cacheMapDigests))
		for _, dgst := range e.cacheMapDigests {
			keys = append(keys, NewCacheKey(dgst, e.edge.Index))
		}
		return keys, nil
	}

	// with dependencies: attach each dep's definition-based keys (with the
	// cache map selector) plus its content-based slow key, if computed
	inputs := make([][]CacheKeyWithSelector, len(e.deps))
	results := make([]CachedResult, len(e.deps))
	for i, dep := range e.deps {
		for _, k := range dep.result.CacheKeys() {
			inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: k, Selector: e.cacheMap.Deps[i].Selector})
		}
		if dep.slowCacheKey != nil {
			inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
		}
		results[i] = dep.result
	}
	k.deps = inputs
	return []*CacheKey{k}, results
}
  138. // isComplete returns true if edge state is final and will never change
  139. func (e *edge) isComplete() bool {
  140. return e.err != nil || e.result != nil
  141. }
// finishIncoming finalizes the incoming pipe request
func (e *edge) finishIncoming(req pipe.Sender) {
	err := e.err
	// a canceled request with no edge error finishes with context.Canceled
	if req.Request().Canceled && err == nil {
		err = context.Canceled
	}
	if debugScheduler {
		logrus.Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
	}
	req.Finalize(&e.edgeState, err)
}
// updateIncoming updates the current value of incoming pipe request
func (e *edge) updateIncoming(req pipe.Sender) {
	if debugScheduler {
		logrus.Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
	}
	req.Update(&e.edgeState)
}
// probeCache is called with unprocessed cache keys for dependency
// if the key could match the edge, the cacheRecords for dependency are filled
// Returns true when at least one previously unseen key matched.
func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool {
	if len(depKeys) == 0 {
		return false
	}
	if e.op.IgnoreCache() {
		return false
	}
	keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index)
	if err != nil {
		// record the error on the edge but still fall through; keys is
		// empty in this case so the loop below is a no-op
		e.err = errors.Wrap(err, "error on cache query")
	}
	found := false
	for _, k := range keys {
		// only count keys not already known for this dep
		if _, ok := d.keyMap[k.ID]; !ok {
			d.keyMap[k.ID] = k
			found = true
		}
	}
	return found
}
// checkDepMatchPossible checks if any cache matches are possible past this point
// A dep rules out all future matches when it has no matched keys AND either
// its slow cache computation finished without finding a key, or it has no
// slow cache func and already passed the cache-slow state.
func (e *edge) checkDepMatchPossible(dep *dep) {
	depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil
	if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keyMap) == 0) {
		e.noCacheMatchPossible = true
	}
}
  189. // slowCacheFunc returns the result based cache func for dependency if it exists
  190. func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
  191. if e.cacheMap == nil {
  192. return nil
  193. }
  194. return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc
  195. }
  196. // preprocessFunc returns result based cache func
  197. func (e *edge) preprocessFunc(dep *dep) PreprocessFunc {
  198. if e.cacheMap == nil {
  199. return nil
  200. }
  201. return e.cacheMap.Deps[int(dep.index)].PreprocessFunc
  202. }
  203. // allDepsHaveKeys checks if all dependencies have at least one key. used for
  204. // determining if there is enough data for combining cache key for edge
  205. func (e *edge) allDepsHaveKeys(matching bool) bool {
  206. if e.cacheMap == nil {
  207. return false
  208. }
  209. for _, d := range e.deps {
  210. cond := len(d.keys) == 0
  211. if matching {
  212. cond = len(d.keyMap) == 0
  213. }
  214. if cond && d.slowCacheKey == nil && d.result == nil {
  215. return false
  216. }
  217. }
  218. return true
  219. }
// currentIndexKey returns a combined cache key built from all current
// dependency cache keys, or nil if any dep has neither keys nor a result.
func (e *edge) currentIndexKey() *CacheKey {
	if e.cacheMap == nil {
		return nil
	}

	keys := make([][]CacheKeyWithSelector, len(e.deps))
	for i, d := range e.deps {
		if len(d.keys) == 0 && d.result == nil {
			return nil
		}
		for _, k := range d.keys {
			keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
		}
		if d.result != nil {
			for _, rk := range d.result.CacheKeys() {
				keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: rk})
			}
			// NOTE(review): the slow key is only appended when a result
			// exists — slow cache keys are computed from results, so this
			// nesting appears intentional; confirm before restructuring
			if d.slowCacheKey != nil {
				keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}})
			}
		}
	}

	k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
	k.deps = keys

	return k
}
  246. // slow cache keys can be computed in 2 phases if there are multiple deps.
  247. // first evaluate ones that didn't match any definition based keys
  248. func (e *edge) skipPhase2SlowCache(dep *dep) bool {
  249. isPhase1 := false
  250. for _, dep := range e.deps {
  251. if (!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil || dep.state < edgeStatusCacheSlow) && len(dep.keyMap) == 0 {
  252. isPhase1 = true
  253. break
  254. }
  255. }
  256. if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 {
  257. return true
  258. }
  259. return false
  260. }
  261. func (e *edge) skipPhase2FastCache(dep *dep) bool {
  262. isPhase1 := false
  263. for _, dep := range e.deps {
  264. if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
  265. isPhase1 = true
  266. break
  267. }
  268. }
  269. if isPhase1 && len(dep.keyMap) > 0 {
  270. return true
  271. }
  272. return false
  273. }
  274. // unpark is called by the scheduler with incoming requests and updates for
  275. // previous calls.
  276. // To avoid deadlocks and resource leaks this function needs to follow
  277. // following rules:
  278. // 1) this function needs to return unclosed outgoing requests if some incoming
  279. // requests were not completed
  280. // 2) this function may not return outgoing requests if it has completed all
  281. // incoming requests
  282. func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
  283. // process all incoming changes
  284. depChanged := false
  285. for _, upt := range updates {
  286. if changed := e.processUpdate(upt); changed {
  287. depChanged = true
  288. }
  289. }
  290. if depChanged {
  291. // the dep responses had changes. need to reevaluate edge state
  292. e.recalcCurrentState()
  293. }
  294. desiredState, done := e.respondToIncoming(incoming, allPipes)
  295. if done {
  296. return
  297. }
  298. cacheMapReq := false
  299. // set up new outgoing requests if needed
  300. if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) {
  301. index := e.cacheMapIndex
  302. e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
  303. cm, err := e.op.CacheMap(ctx, index)
  304. return cm, errors.Wrap(err, "failed to load cache key")
  305. })
  306. cacheMapReq = true
  307. }
  308. // execute op
  309. if e.execReq == nil && desiredState == edgeStatusComplete {
  310. if ok := e.execIfPossible(f); ok {
  311. return
  312. }
  313. }
  314. if e.execReq == nil {
  315. if added := e.createInputRequests(desiredState, f, false); !added && !e.hasActiveOutgoing && !cacheMapReq {
  316. logrus.Errorf("buildkit scheluding error: leaving incoming open. forcing solve. Please report this with BUILDKIT_SCHEDULER_DEBUG=1")
  317. debugSchedulerPreUnpark(e, incoming, updates, allPipes)
  318. e.createInputRequests(desiredState, f, true)
  319. }
  320. }
  321. }
// makeExportable wraps a cache key and its records into an ExportableCacheKey
// whose exporter honors the vertex's ExportCache option.
func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey {
	return ExportableCacheKey{
		CacheKey: k,
		Exporter: &exporter{k: k, records: records, override: e.edge.Vertex.Options().ExportCache},
	}
}
// markFailed records err as the edge's sticky error and schedules another
// unpark round so incoming requests get finalized with the error.
func (e *edge) markFailed(f *pipeFactory, err error) {
	e.err = err
	e.postpone(f)
}
// processUpdate is called by unpark for every updated pipe request
// Returns true when dependency-derived state changed and the edge state
// needs recalculating.
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
	// response for cachemap request
	if upt == e.cacheMapReq && upt.Status().Completed {
		if err := upt.Status().Err; err != nil {
			e.cacheMapReq = nil
			// cancellation is not an edge failure; other errors are sticky
			if !upt.Status().Canceled && e.err == nil {
				e.err = err
			}
		} else {
			resp := upt.Status().Value.(*cacheMapResp)
			e.cacheMap = resp.CacheMap
			e.cacheMapDone = resp.complete
			e.cacheMapIndex++
			if len(e.deps) == 0 {
				// no deps: query the cache store directly for this digest
				e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
				if !e.op.IgnoreCache() {
					keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
					if err != nil {
						logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
					} else {
						for _, k := range keys {
							records, err := e.op.Cache().Records(k)
							if err != nil {
								logrus.Errorf("error receiving cache records: %v", err)
								continue
							}

							for _, r := range records {
								e.cacheRecords[r.ID] = r
							}

							e.keys = append(e.keys, e.makeExportable(k, records))
						}
					}
				}
				e.state = edgeStatusCacheSlow
			}
			if e.allDepsHaveKeys(false) {
				e.keysDidChange = true
			}
			// probe keys that were loaded before cache map
			for i, dep := range e.deps {
				e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
				e.checkDepMatchPossible(dep)
			}
			// more cache maps to fetch: clear the request so unpark issues
			// the next one
			if !e.cacheMapDone {
				e.cacheMapReq = nil
			}
		}
		return true
	}

	// response for exec request
	if upt == e.execReq && upt.Status().Completed {
		if err := upt.Status().Err; err != nil {
			e.execReq = nil
			if e.execCacheLoad {
				// a failed cache load invalidates the records we tried;
				// drop them so exec can be attempted instead
				for k := range e.cacheRecordsLoaded {
					delete(e.cacheRecords, k)
				}
			} else if !upt.Status().Canceled && e.err == nil {
				e.err = err
			}
		} else {
			e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
			e.state = edgeStatusComplete
		}
		return true
	}

	// response for requests to dependencies
	if dep, ok := e.depRequests[upt]; ok {
		if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
			if e.err == nil {
				e.err = err
			}
			dep.err = err
		}

		state := upt.Status().Value.(*edgeState)

		// only keys appended since our last snapshot need probing
		if len(dep.keys) < len(state.keys) {
			newKeys := state.keys[len(dep.keys):]
			if e.cacheMap != nil {
				e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
				dep.edgeState.keys = state.keys
				if e.allDepsHaveKeys(false) {
					e.keysDidChange = true
				}
			}
			depChanged = true
		}
		if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
			e.keysDidChange = true
		}

		recheck := state.state != dep.state

		dep.edgeState = *state

		if recheck && e.cacheMap != nil {
			e.checkDepMatchPossible(dep)
			depChanged = true
		}

		// naked return: yields the named depChanged accumulated above
		return
	}

	// response for result based cache function
	for i, dep := range e.deps {
		if upt == dep.slowCacheReq && upt.Status().Completed {
			if err := upt.Status().Err; err != nil {
				dep.slowCacheReq = nil
				if !upt.Status().Canceled && e.err == nil {
					e.err = upt.Status().Err
				}
			} else if !dep.slowCacheComplete {
				dgst := upt.Status().Value.(digest.Digest)
				if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
					k := NewCacheKey(dgst, -1)
					dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
					slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
					defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
					for _, dk := range dep.result.CacheKeys() {
						defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector})
					}
					dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})

					// connect def key to slow key
					// best-effort: the query result and error are discarded
					e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
				}

				dep.slowCacheComplete = true
				e.keysDidChange = true
				e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
			}
			return true
		}
	}

	return
}
// recalcCurrentState is called by unpark to recompute internal state after
// the state of dependencies has changed
func (e *edge) recalcCurrentState() {
	// TODO: fast pass to detect incomplete results

	// collect key IDs matched by ALL deps (intersection across deps) that
	// we have not merged into e.keys yet
	newKeys := map[string]*CacheKey{}

	for i, dep := range e.deps {
		if i == 0 {
			// seed with the first dep's unseen keys
			for id, k := range dep.keyMap {
				if _, ok := e.keyMap[id]; ok {
					continue
				}
				newKeys[id] = k
			}
		} else {
			// intersect: drop any key the current dep doesn't have
			for id := range newKeys {
				if _, ok := dep.keyMap[id]; !ok {
					delete(newKeys, id)
				}
			}
		}
		if len(newKeys) == 0 {
			break
		}
	}

	for key := range newKeys {
		e.keyMap[key] = struct{}{}
	}

	for _, r := range newKeys {
		// TODO: add all deps automatically
		mergedKey := r.clone()
		mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps))
		for i, dep := range e.deps {
			if dep.result != nil {
				// prefer the dep's result-derived keys plus its slow key
				for _, dk := range dep.result.CacheKeys() {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dk})
				}
				if dep.slowCacheKey != nil {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
				}
			} else {
				for _, k := range dep.keys {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
				}
			}
		}

		records, err := e.op.Cache().Records(mergedKey)
		if err != nil {
			logrus.Errorf("error receiving cache records: %v", err)
			continue
		}

		for _, r := range records {
			// records whose load already failed stay excluded
			if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
				e.cacheRecords[r.ID] = r
			}
		}

		e.keys = append(e.keys, e.makeExportable(mergedKey, records))
	}

	// detect lower/upper bound for current state
	allDepsCompletedCacheFast := e.cacheMap != nil
	allDepsCompletedCacheSlow := e.cacheMap != nil
	allDepsStateCacheSlow := true
	allDepsCompleted := true
	stLow := edgeStatusInitial    // minimal possible state
	stHigh := edgeStatusCacheSlow // maximum possible state
	if e.cacheMap != nil {
		for _, dep := range e.deps {
			// dep reached cache-slow/complete but its slow (content) cache
			// key is still being computed
			isSlowCacheIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
			// same, but preprocess funcs also count as pending slow work
			isSlowIncomplete := (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))

			if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
				stLow = dep.state
				if stLow > edgeStatusCacheSlow {
					stLow = edgeStatusCacheSlow
				}
			}
			effectiveState := dep.state
			if dep.state == edgeStatusCacheSlow && isSlowCacheIncomplete {
				effectiveState = edgeStatusCacheFast
			}
			if dep.state == edgeStatusComplete && isSlowCacheIncomplete {
				effectiveState = edgeStatusCacheFast
			}
			if effectiveState < stHigh {
				stHigh = effectiveState
			}
			if isSlowIncomplete || dep.state < edgeStatusComplete {
				allDepsCompleted = false
			}
			if dep.state < edgeStatusCacheFast {
				allDepsCompletedCacheFast = false
			}
			if isSlowCacheIncomplete || dep.state < edgeStatusCacheSlow {
				allDepsCompletedCacheSlow = false
			}
			if dep.state < edgeStatusCacheSlow && len(dep.keyMap) == 0 {
				allDepsStateCacheSlow = false
			}
		}
		if stLow > e.state {
			e.state = stLow
		}
		if stHigh > e.state {
			e.state = stHigh
		}
		// without a complete cache map and no keys, we can't claim progress
		if !e.cacheMapDone && len(e.keys) == 0 {
			e.state = edgeStatusInitial
		}

		e.allDepsCompletedCacheFast = e.cacheMapDone && allDepsCompletedCacheFast
		e.allDepsCompletedCacheSlow = e.cacheMapDone && allDepsCompletedCacheSlow
		e.allDepsStateCacheSlow = e.cacheMapDone && allDepsStateCacheSlow
		e.allDepsCompleted = e.cacheMapDone && allDepsCompleted

		// upgrade fast->slow when every dep's matched keys are already
		// reflected in e.keyMap (intersection of "open" keys is empty)
		if e.allDepsStateCacheSlow && len(e.cacheRecords) > 0 && e.state == edgeStatusCacheFast {
			openKeys := map[string]struct{}{}
			for _, dep := range e.deps {
				isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
				if !isSlowIncomplete {
					openDepKeys := map[string]struct{}{}
					for key := range dep.keyMap {
						if _, ok := e.keyMap[key]; !ok {
							openDepKeys[key] = struct{}{}
						}
					}
					if len(openKeys) != 0 {
						for k := range openKeys {
							if _, ok := openDepKeys[k]; !ok {
								delete(openKeys, k)
							}
						}
					} else {
						openKeys = openDepKeys
					}
					if len(openKeys) == 0 {
						e.state = edgeStatusCacheSlow
						if debugScheduler {
							logrus.Debugf("upgrade to cache-slow because no open keys")
						}
					}
				}
			}
		}
	}
}
// respondToIncoming responds to all incoming requests. completing or
// updating them when possible
// Returns the highest desired state across requests and whether all
// incoming requests were fully handled (true means unpark can stop).
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
	// detect the result state for the requests
	allIncomingCanComplete := true
	desiredState := e.state
	allCanceled := true

	// check incoming requests
	// check if all requests can be either answered or canceled
	if !e.isComplete() {
		for _, req := range incoming {
			if !req.Request().Canceled {
				allCanceled = false
				if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
					desiredState = r.desiredState
					if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
						allIncomingCanComplete = false
					}
				}
			}
		}
	}

	// do not set allIncomingCanComplete if active ongoing can modify the state
	if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing {
		allIncomingCanComplete = false
	}

	if debugScheduler {
		logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
	}

	if allIncomingCanComplete && e.hasActiveOutgoing {
		// cancel all current requests
		for _, p := range allPipes {
			p.Cancel()
		}

		// can close all but one requests
		var leaveOpen pipe.Sender
		for _, req := range incoming {
			if !req.Request().Canceled {
				leaveOpen = req
				break
			}
		}
		for _, req := range incoming {
			// one request stays open so cancellation results still flow back
			if leaveOpen == nil || leaveOpen == req {
				leaveOpen = req
				continue
			}
			e.finishIncoming(req)
		}
		return desiredState, true
	}

	// can complete, finish and return
	if allIncomingCanComplete && !e.hasActiveOutgoing {
		for _, req := range incoming {
			e.finishIncoming(req)
		}
		return desiredState, true
	}

	// update incoming based on current state
	for _, req := range incoming {
		r := req.Request().Payload.(*edgeRequest)
		if req.Request().Canceled {
			e.finishIncoming(req)
		} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
			e.finishIncoming(req)
		} else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled {
			e.updateIncoming(req)
		}
	}
	return desiredState, false
}
// createInputRequests creates new requests for dependencies or async functions
// that need to complete to continue processing the edge
// Returns true if any new outgoing request was added. With force=true every
// dep is driven straight to the complete state.
func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, force bool) bool {
	addedNew := false

	// initialize deps state
	if e.deps == nil {
		e.depRequests = make(map[pipe.Receiver]*dep)
		e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
		for i := range e.edge.Vertex.Inputs() {
			e.deps = append(e.deps, newDep(Index(i)))
		}
	}

	// cycle all dependencies. set up outgoing requests if needed
	for _, dep := range e.deps {
		desiredStateDep := dep.state

		if e.noCacheMatchPossible || force {
			desiredStateDep = edgeStatusComplete
		} else if dep.state == edgeStatusInitial && desiredState > dep.state {
			desiredStateDep = edgeStatusCacheFast
		} else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
			// wait all deps to complete cache fast before continuing with slow cache
			if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keyMap) == 0 || e.allDepsHaveKeys(true) {
				if !e.skipPhase2FastCache(dep) && e.cacheMap != nil {
					desiredStateDep = edgeStatusCacheSlow
				}
			}
		} else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
			// if all deps have completed cache-slow or content based cache for input is available
			if (len(dep.keyMap) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
				if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
					desiredStateDep = edgeStatusComplete
				}
			}
		} else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
			if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
				desiredStateDep = edgeStatusComplete
			}
		}

		// outgoing request is needed
		if dep.state < desiredStateDep {
			addNew := true
			if dep.req != nil && !dep.req.Status().Completed {
				// an active request with a stale desired state is canceled
				// and replaced; with the same desired state it is reused
				if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
					dep.req.Cancel()
				} else {
					addNew = false
				}
			}
			if addNew {
				req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
					currentState: dep.edgeState,
					desiredState: desiredStateDep,
					currentKeys:  len(dep.keys),
				})
				e.depRequests[req] = dep
				dep.req = req
				addedNew = true
			}
		}
		// initialize function to compute cache key based on dependency result
		if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil {
			pfn := e.preprocessFunc(dep)
			fn := e.slowCacheFunc(dep)
			res := dep.result
			// capture all loop-dependent values as parameters so the
			// closure does not see later mutations
			func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) {
				dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
					v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res)
					return v, errors.Wrap(err, "failed to compute cache key")
				})
			}(pfn, fn, res, dep.index)
			addedNew = true
		}
	}
	return addedNew
}
// execIfPossible creates a request for getting the edge result if there is
// enough state
// Returns true if a request was scheduled (load-from-cache, exec, or a
// postponement while fresh keys are pending); false if nothing could start.
func (e *edge) execIfPossible(f *pipeFactory) bool {
	if len(e.cacheRecords) > 0 {
		// new keys arrived since the last decision: defer to next unpark
		if e.keysDidChange {
			e.postpone(f)
			return true
		}
		e.execReq = f.NewFuncRequest(e.loadCache)
		e.execCacheLoad = true
		// loading from cache makes pending dep requests unnecessary
		for req := range e.depRequests {
			req.Cancel()
		}
		return true
	} else if e.allDepsCompleted {
		if e.keysDidChange {
			e.postpone(f)
			return true
		}
		e.execReq = f.NewFuncRequest(e.execOp)
		e.execCacheLoad = false
		return true
	}
	return false
}
  773. // postpone delays exec to next unpark invocation if we have unprocessed keys
  774. func (e *edge) postpone(f *pipeFactory) {
  775. f.NewFuncRequest(func(context.Context) (interface{}, error) {
  776. return nil, nil
  777. })
  778. }
// loadCache creates a request to load edge result from cache
func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
	recs := make([]*CacheRecord, 0, len(e.cacheRecords))
	for _, r := range e.cacheRecords {
		recs = append(recs, r)
	}

	// pick the preferred record among all candidates
	// (callers only invoke this when len(e.cacheRecords) > 0, see execIfPossible)
	rec := getBestResult(recs)
	// remember the attempt so a failed load can invalidate this record
	e.cacheRecordsLoaded[rec.ID] = struct{}{}

	logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
	res, err := e.op.LoadCache(ctx, rec)
	if err != nil {
		logrus.Debugf("load cache for %s err: %v", e.edge.Vertex.Name(), err)
		return nil, errors.Wrap(err, "failed to load cache")
	}

	return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil
}
// execOp creates a request to execute the vertex operation
func (e *edge) execOp(ctx context.Context) (interface{}, error) {
	cacheKeys, inputs := e.commitOptions()
	results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs))
	if err != nil {
		return nil, errors.WithStack(err)
	}

	index := e.edge.Index
	if len(results) <= int(index) {
		return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results))
	}

	res := results[int(index)]

	// release all results except the one this edge represents
	for i := range results {
		if i != int(index) {
			go results[i].Release(context.TODO())
		}
	}

	var exporters []CacheExporter

	for _, cacheKey := range cacheKeys {
		ck, err := e.op.Cache().Save(cacheKey, res, time.Now())
		if err != nil {
			return nil, err
		}

		if exp, ok := ck.Exporter.(*exporter); ok {
			exp.edge = e
		}

		// NOTE(review): exps is rebuilt from subExporters on every
		// iteration though subExporters does not change in the loop;
		// could be hoisted, but kept as-is to preserve exact behavior
		exps := make([]CacheExporter, 0, len(subExporters))
		for _, exp := range subExporters {
			exps = append(exps, exp.Exporter)
		}

		exporters = append(exporters, ck.Exporter)
		exporters = append(exporters, exps...)
	}

	ek := make([]ExportableCacheKey, 0, len(cacheKeys))
	for _, ck := range cacheKeys {
		ek = append(ek, ExportableCacheKey{
			CacheKey: ck,
			Exporter: &mergedExporter{exporters: exporters},
		})
	}

	return NewCachedResult(res, ek), nil
}
  837. func toResultSlice(cres []CachedResult) (out []Result) {
  838. out = make([]Result, len(cres))
  839. for i := range cres {
  840. out[i] = cres[i].(Result)
  841. }
  842. return out
  843. }
  844. func isEqualState(s1, s2 edgeState) bool {
  845. if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) {
  846. return false
  847. }
  848. return true
  849. }
  850. func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyWithSelector {
  851. out := make([]CacheKeyWithSelector, len(keys))
  852. for i, k := range keys {
  853. out[i] = CacheKeyWithSelector{Selector: selector, CacheKey: k}
  854. }
  855. return out
  856. }