- package solver
- import (
- "context"
- "sync"
- "time"
- "github.com/moby/buildkit/solver/internal/pipe"
- digest "github.com/opencontainers/go-digest"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- )
- type edgeStatusType int
- const (
- edgeStatusInitial edgeStatusType = iota
- edgeStatusCacheFast
- edgeStatusCacheSlow
- edgeStatusComplete
- )
- func (t edgeStatusType) String() string {
- return []string{"initial", "cache-fast", "cache-slow", "complete"}[t]
- }
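- // Added note (illustrative, not from the original source): edgeStatusType
- // values form the ordered ladder an edge climbs while the scheduler unparks it:
- //
- //	initial -> cache-fast -> cache-slow -> complete
- //
- // e.g. edgeStatusCacheSlow.String() == "cache-slow"; comparisons such as
- // dep.state >= edgeStatusCacheSlow below rely on this ordering.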
- func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
- e := &edge{
- edge: ed,
- op: op,
- depRequests: map[pipe.Receiver]*dep{},
- keyMap: map[string]struct{}{},
- cacheRecords: map[string]*CacheRecord{},
- cacheRecordsLoaded: map[string]struct{}{},
- index: index,
- }
- return e
- }
- type edge struct {
- edge Edge
- op activeOp
- edgeState
- depRequests map[pipe.Receiver]*dep
- deps []*dep
- cacheMapReq pipe.Receiver
- cacheMapDone bool
- cacheMapIndex int
- cacheMapDigests []digest.Digest
- execReq pipe.Receiver
- execCacheLoad bool
- err error
- cacheRecords map[string]*CacheRecord
- cacheRecordsLoaded map[string]struct{}
- keyMap map[string]struct{}
- noCacheMatchPossible bool
- allDepsCompletedCacheFast bool
- allDepsCompletedCacheSlow bool
- allDepsStateCacheSlow bool
- allDepsCompleted bool
- hasActiveOutgoing bool
- releaserCount int
- keysDidChange bool
- index *edgeIndex
- secondaryExporters []expDep
- }
- // dep holds state for a dependent edge
- type dep struct {
- req pipe.Receiver
- edgeState
- index Index
- keyMap map[string]*CacheKey
- slowCacheReq pipe.Receiver
- slowCacheComplete bool
- slowCacheFoundKey bool
- slowCacheKey *ExportableCacheKey
- err error
- }
- // expDep holds secondary exporter info for a dependency
- type expDep struct {
- index int
- cacheKey CacheKeyWithSelector
- }
- func newDep(i Index) *dep {
- return &dep{index: i, keyMap: map[string]*CacheKey{}}
- }
- // edgePipe is a pipe for requests between two edges
- type edgePipe struct {
- *pipe.Pipe
- From, Target *edge
- mu sync.Mutex
- }
- // edgeState holds basic mutable state info for an edge
- type edgeState struct {
- state edgeStatusType
- result *SharedCachedResult
- cacheMap *CacheMap
- keys []ExportableCacheKey
- }
- type edgeRequest struct {
- desiredState edgeStatusType
- currentState edgeState
- currentKeys int
- }
- // incrementReferenceCount increases the number of times release needs to be
- // called to release the edge. Called on merging edges.
- func (e *edge) incrementReferenceCount() {
- e.releaserCount++
- }
- // release releases the edge resources
- func (e *edge) release() {
- if e.releaserCount > 0 {
- e.releaserCount--
- return
- }
- e.index.Release(e)
- if e.result != nil {
- go e.result.Release(context.TODO())
- }
- }
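- // Illustrative sketch (added, not part of the original flow): because merging
- // edges calls incrementReferenceCount, release must be called once per merge
- // plus once by the original owner before resources are actually freed:
- //
- //	e.incrementReferenceCount() // another edge was merged into e
- //	e.release()                 // releaserCount 1 -> 0, nothing freed yet
- //	e.release()                 // final release: index entry and result freed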
- // commitOptions returns parameters for the op execution
- func (e *edge) commitOptions() ([]*CacheKey, []CachedResult) {
- k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
- if len(e.deps) == 0 {
- keys := make([]*CacheKey, 0, len(e.cacheMapDigests))
- for _, dgst := range e.cacheMapDigests {
- keys = append(keys, NewCacheKey(dgst, e.edge.Index))
- }
- return keys, nil
- }
- inputs := make([][]CacheKeyWithSelector, len(e.deps))
- results := make([]CachedResult, len(e.deps))
- for i, dep := range e.deps {
- for _, k := range dep.result.CacheKeys() {
- inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: k, Selector: e.cacheMap.Deps[i].Selector})
- }
- if dep.slowCacheKey != nil {
- inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
- }
- results[i] = dep.result
- }
- k.deps = inputs
- return []*CacheKey{k}, results
- }
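- // Shape of commitOptions output (added illustration, hypothetical edge with
- // two deps):
- //
- //	keys, results := e.commitOptions()
- //	// len(keys) == 1; keys[0].deps[i] holds the CacheKeyWithSelector list for
- //	// dep i, built from dep.result.CacheKeys() plus the slow cache key when
- //	// present; results[i] == e.deps[i].result
- //
- // With no deps it instead returns one key per cacheMapDigest and nil results.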
- // isComplete returns true if edge state is final and will never change
- func (e *edge) isComplete() bool {
- return e.err != nil || e.result != nil
- }
- // finishIncoming finalizes the incoming pipe request
- func (e *edge) finishIncoming(req pipe.Sender) {
- err := e.err
- if req.Request().Canceled && err == nil {
- err = context.Canceled
- }
- if debugScheduler {
- logrus.Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
- }
- req.Finalize(&e.edgeState, err)
- }
- // updateIncoming updates the current value of incoming pipe request
- func (e *edge) updateIncoming(req pipe.Sender) {
- if debugScheduler {
- logrus.Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
- }
- req.Update(&e.edgeState)
- }
- // probeCache is called with unprocessed cache keys for a dependency. If a
- // key can match the edge, it is added to the dependency's keyMap.
- func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool {
- if len(depKeys) == 0 {
- return false
- }
- if e.op.IgnoreCache() {
- return false
- }
- keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index)
- if err != nil {
- e.err = errors.Wrap(err, "error on cache query")
- }
- found := false
- for _, k := range keys {
- if _, ok := d.keyMap[k.ID]; !ok {
- d.keyMap[k.ID] = k
- found = true
- }
- }
- return found
- }
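- // Example call (mirrors the usage in processUpdate below; illustrative only):
- //
- //	changed := e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
- //	// changed reports whether the query added new entries to dep.keyMap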
- // checkDepMatchPossible checks if any cache matches are possible past this point
- func (e *edge) checkDepMatchPossible(dep *dep) {
- depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil
- if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keyMap) == 0) {
- e.noCacheMatchPossible = true
- }
- }
- // slowCacheFunc returns the result-based cache func for a dependency if it exists
- func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
- if e.cacheMap == nil {
- return nil
- }
- return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc
- }
- // preprocessFunc returns the preprocess func for a dependency if it exists
- func (e *edge) preprocessFunc(dep *dep) PreprocessFunc {
- if e.cacheMap == nil {
- return nil
- }
- return e.cacheMap.Deps[int(dep.index)].PreprocessFunc
- }
- // allDepsHaveKeys checks if all dependencies have at least one key. Used to
- // determine if there is enough data to compute the combined cache key for the edge.
- func (e *edge) allDepsHaveKeys(matching bool) bool {
- if e.cacheMap == nil {
- return false
- }
- for _, d := range e.deps {
- cond := len(d.keys) == 0
- if matching {
- cond = len(d.keyMap) == 0
- }
- if cond && d.slowCacheKey == nil && d.result == nil {
- return false
- }
- }
- return true
- }
- // currentIndexKey returns the current combined cache key for the edge, built
- // from all dependency cache keys
- func (e *edge) currentIndexKey() *CacheKey {
- if e.cacheMap == nil {
- return nil
- }
- keys := make([][]CacheKeyWithSelector, len(e.deps))
- for i, d := range e.deps {
- if len(d.keys) == 0 && d.result == nil {
- return nil
- }
- for _, k := range d.keys {
- keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
- }
- if d.result != nil {
- for _, rk := range d.result.CacheKeys() {
- keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: rk})
- }
- if d.slowCacheKey != nil {
- keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}})
- }
- }
- }
- k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
- k.deps = keys
- return k
- }
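- // Added note: currentIndexKey returns nil while any dependency still has
- // neither keys nor a result, i.e. the combined index key cannot be formed yet.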
- // slow cache keys can be computed in 2 phases if there are multiple deps:
- // first evaluate the deps that didn't match any definition-based keys
- func (e *edge) skipPhase2SlowCache(dep *dep) bool {
- isPhase1 := false
- for _, dep := range e.deps {
- if (!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil || dep.state < edgeStatusCacheSlow) && len(dep.keyMap) == 0 {
- isPhase1 = true
- break
- }
- }
- if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 {
- return true
- }
- return false
- }
- func (e *edge) skipPhase2FastCache(dep *dep) bool {
- isPhase1 := false
- for _, dep := range e.deps {
- if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
- isPhase1 = true
- break
- }
- }
- if isPhase1 && len(dep.keyMap) > 0 {
- return true
- }
- return false
- }
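- // Worked example for the two-phase rule above (hypothetical deps): with depA
- // matching a definition-based key and depB matching none, phase 1 computes
- // depB's content-based key first; skipPhase2SlowCache(depA) then returns true
- // so depA's slow cache computation is deferred until phase 2.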
- // unpark is called by the scheduler with incoming requests and updates for
- // previous calls.
- // To avoid deadlocks and resource leaks this function needs to follow
- // the following rules:
- // 1) this function needs to return unclosed outgoing requests if some incoming
- // requests were not completed
- // 2) this function may not return outgoing requests if it has completed all
- // incoming requests
- func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
- // process all incoming changes
- depChanged := false
- for _, upt := range updates {
- if changed := e.processUpdate(upt); changed {
- depChanged = true
- }
- }
- if depChanged {
- // the dep responses had changes. need to reevaluate edge state
- e.recalcCurrentState()
- }
- desiredState, done := e.respondToIncoming(incoming, allPipes)
- if done {
- return
- }
- cacheMapReq := false
- // set up new outgoing requests if needed
- if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) {
- index := e.cacheMapIndex
- e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
- cm, err := e.op.CacheMap(ctx, index)
- return cm, errors.Wrap(err, "failed to load cache key")
- })
- cacheMapReq = true
- }
- // execute op
- if e.execReq == nil && desiredState == edgeStatusComplete {
- if ok := e.execIfPossible(f); ok {
- return
- }
- }
- if e.execReq == nil {
- if added := e.createInputRequests(desiredState, f, false); !added && !e.hasActiveOutgoing && !cacheMapReq {
- logrus.Errorf("buildkit scheluding error: leaving incoming open. forcing solve. Please report this with BUILDKIT_SCHEDULER_DEBUG=1")
- debugSchedulerPreUnpark(e, incoming, updates, allPipes)
- e.createInputRequests(desiredState, f, true)
- }
- }
- }
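- // Added summary of the unpark contract (hypothetical cycle): an incoming
- // request for edgeStatusComplete first triggers a CacheMap request, then dep
- // requests via createInputRequests, then execIfPossible; only when
- // isComplete() is true does respondToIncoming finish every incoming sender,
- // at which point no outgoing request is left open (rule 2 above).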
- func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey {
- return ExportableCacheKey{
- CacheKey: k,
- Exporter: &exporter{k: k, records: records, override: e.edge.Vertex.Options().ExportCache},
- }
- }
- func (e *edge) markFailed(f *pipeFactory, err error) {
- e.err = err
- e.postpone(f)
- }
- // processUpdate is called by unpark for every updated pipe request
- func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
- // response for cachemap request
- if upt == e.cacheMapReq && upt.Status().Completed {
- if err := upt.Status().Err; err != nil {
- e.cacheMapReq = nil
- if !upt.Status().Canceled && e.err == nil {
- e.err = err
- }
- } else {
- resp := upt.Status().Value.(*cacheMapResp)
- e.cacheMap = resp.CacheMap
- e.cacheMapDone = resp.complete
- e.cacheMapIndex++
- if len(e.deps) == 0 {
- e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
- if !e.op.IgnoreCache() {
- keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
- if err != nil {
- logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
- } else {
- for _, k := range keys {
- records, err := e.op.Cache().Records(k)
- if err != nil {
- logrus.Errorf("error receiving cache records: %v", err)
- continue
- }
- for _, r := range records {
- e.cacheRecords[r.ID] = r
- }
- e.keys = append(e.keys, e.makeExportable(k, records))
- }
- }
- }
- e.state = edgeStatusCacheSlow
- }
- if e.allDepsHaveKeys(false) {
- e.keysDidChange = true
- }
- // probe keys that were loaded before cache map
- for i, dep := range e.deps {
- e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
- e.checkDepMatchPossible(dep)
- }
- if !e.cacheMapDone {
- e.cacheMapReq = nil
- }
- }
- return true
- }
- // response for exec request
- if upt == e.execReq && upt.Status().Completed {
- if err := upt.Status().Err; err != nil {
- e.execReq = nil
- if e.execCacheLoad {
- for k := range e.cacheRecordsLoaded {
- delete(e.cacheRecords, k)
- }
- } else if !upt.Status().Canceled && e.err == nil {
- e.err = err
- }
- } else {
- e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
- e.state = edgeStatusComplete
- }
- return true
- }
- // response for requests to dependencies
- if dep, ok := e.depRequests[upt]; ok {
- if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
- if e.err == nil {
- e.err = err
- }
- dep.err = err
- }
- state := upt.Status().Value.(*edgeState)
- if len(dep.keys) < len(state.keys) {
- newKeys := state.keys[len(dep.keys):]
- if e.cacheMap != nil {
- e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
- dep.edgeState.keys = state.keys
- if e.allDepsHaveKeys(false) {
- e.keysDidChange = true
- }
- }
- depChanged = true
- }
- if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
- e.keysDidChange = true
- }
- recheck := state.state != dep.state
- dep.edgeState = *state
- if recheck && e.cacheMap != nil {
- e.checkDepMatchPossible(dep)
- depChanged = true
- }
- return
- }
- // response for result based cache function
- for i, dep := range e.deps {
- if upt == dep.slowCacheReq && upt.Status().Completed {
- if err := upt.Status().Err; err != nil {
- dep.slowCacheReq = nil
- if !upt.Status().Canceled && e.err == nil {
- e.err = upt.Status().Err
- }
- } else if !dep.slowCacheComplete {
- dgst := upt.Status().Value.(digest.Digest)
- if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
- k := NewCacheKey(dgst, -1)
- dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
- slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
- defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
- for _, dk := range dep.result.CacheKeys() {
- defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector})
- }
- dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})
- // connect def key to slow key
- e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
- }
- dep.slowCacheComplete = true
- e.keysDidChange = true
- e.checkDepMatchPossible(dep) // not matching a key here doesn't set noCacheMatchPossible to true
- }
- return true
- }
- }
- return
- }
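- // Added note: processUpdate distinguishes four kinds of receivers:
- // e.cacheMapReq (cache map arrived, definition-based keys probed), e.execReq
- // (cache load or op execution finished), e.depRequests entries (dependency
- // state changed) and dep.slowCacheReq (content-based cache key computed).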
- // recalcCurrentState is called by unpark to recompute internal state after
- // the state of dependencies has changed
- func (e *edge) recalcCurrentState() {
- // TODO: fast pass to detect incomplete results
- newKeys := map[string]*CacheKey{}
- for i, dep := range e.deps {
- if i == 0 {
- for id, k := range dep.keyMap {
- if _, ok := e.keyMap[id]; ok {
- continue
- }
- newKeys[id] = k
- }
- } else {
- for id := range newKeys {
- if _, ok := dep.keyMap[id]; !ok {
- delete(newKeys, id)
- }
- }
- }
- if len(newKeys) == 0 {
- break
- }
- }
- for key := range newKeys {
- e.keyMap[key] = struct{}{}
- }
- for _, r := range newKeys {
- // TODO: add all deps automatically
- mergedKey := r.clone()
- mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps))
- for i, dep := range e.deps {
- if dep.result != nil {
- for _, dk := range dep.result.CacheKeys() {
- mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dk})
- }
- if dep.slowCacheKey != nil {
- mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
- }
- } else {
- for _, k := range dep.keys {
- mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
- }
- }
- }
- records, err := e.op.Cache().Records(mergedKey)
- if err != nil {
- logrus.Errorf("error receiving cache records: %v", err)
- continue
- }
- for _, r := range records {
- if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
- e.cacheRecords[r.ID] = r
- }
- }
- e.keys = append(e.keys, e.makeExportable(mergedKey, records))
- }
- // detect lower/upper bound for current state
- allDepsCompletedCacheFast := e.cacheMap != nil
- allDepsCompletedCacheSlow := e.cacheMap != nil
- allDepsStateCacheSlow := true
- allDepsCompleted := true
- stLow := edgeStatusInitial // minimal possible state
- stHigh := edgeStatusCacheSlow // maximum possible state
- if e.cacheMap != nil {
- for _, dep := range e.deps {
- isSlowCacheIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
- isSlowIncomplete := (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
- if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
- stLow = dep.state
- if stLow > edgeStatusCacheSlow {
- stLow = edgeStatusCacheSlow
- }
- }
- effectiveState := dep.state
- if dep.state == edgeStatusCacheSlow && isSlowCacheIncomplete {
- effectiveState = edgeStatusCacheFast
- }
- if dep.state == edgeStatusComplete && isSlowCacheIncomplete {
- effectiveState = edgeStatusCacheFast
- }
- if effectiveState < stHigh {
- stHigh = effectiveState
- }
- if isSlowIncomplete || dep.state < edgeStatusComplete {
- allDepsCompleted = false
- }
- if dep.state < edgeStatusCacheFast {
- allDepsCompletedCacheFast = false
- }
- if isSlowCacheIncomplete || dep.state < edgeStatusCacheSlow {
- allDepsCompletedCacheSlow = false
- }
- if dep.state < edgeStatusCacheSlow && len(dep.keyMap) == 0 {
- allDepsStateCacheSlow = false
- }
- }
- if stLow > e.state {
- e.state = stLow
- }
- if stHigh > e.state {
- e.state = stHigh
- }
- if !e.cacheMapDone && len(e.keys) == 0 {
- e.state = edgeStatusInitial
- }
- e.allDepsCompletedCacheFast = e.cacheMapDone && allDepsCompletedCacheFast
- e.allDepsCompletedCacheSlow = e.cacheMapDone && allDepsCompletedCacheSlow
- e.allDepsStateCacheSlow = e.cacheMapDone && allDepsStateCacheSlow
- e.allDepsCompleted = e.cacheMapDone && allDepsCompleted
- if e.allDepsStateCacheSlow && len(e.cacheRecords) > 0 && e.state == edgeStatusCacheFast {
- openKeys := map[string]struct{}{}
- for _, dep := range e.deps {
- isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
- if !isSlowIncomplete {
- openDepKeys := map[string]struct{}{}
- for key := range dep.keyMap {
- if _, ok := e.keyMap[key]; !ok {
- openDepKeys[key] = struct{}{}
- }
- }
- if len(openKeys) != 0 {
- for k := range openKeys {
- if _, ok := openDepKeys[k]; !ok {
- delete(openKeys, k)
- }
- }
- } else {
- openKeys = openDepKeys
- }
- if len(openKeys) == 0 {
- e.state = edgeStatusCacheSlow
- if debugScheduler {
- logrus.Debugf("upgrade to cache-slow because no open keys")
- }
- }
- }
- }
- }
- }
- }
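- // Illustration of the key merge above (hypothetical key IDs): with
- // dep0.keyMap = {"a","b"} and dep1.keyMap = {"b"}, only "b" survives the
- // intersection; it becomes a merged key whose deps[i] slices are filled from
- // each dep's result keys, slow cache key or plain keys, and the cache
- // records found for it are added to e.cacheRecords and e.keys.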
- // respondToIncoming responds to all incoming requests, completing or
- // updating them when possible.
- func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
- // detect the result state for the requests
- allIncomingCanComplete := true
- desiredState := e.state
- allCanceled := true
- // check incoming requests
- // check if all requests can be either answered or canceled
- if !e.isComplete() {
- for _, req := range incoming {
- if !req.Request().Canceled {
- allCanceled = false
- if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
- desiredState = r.desiredState
- if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
- allIncomingCanComplete = false
- }
- }
- }
- }
- }
- // do not set allIncomingCanComplete if active outgoing requests can modify the state
- if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing {
- allIncomingCanComplete = false
- }
- if debugScheduler {
- logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
- }
- if allIncomingCanComplete && e.hasActiveOutgoing {
- // cancel all current requests
- for _, p := range allPipes {
- p.Cancel()
- }
- // can close all but one request
- var leaveOpen pipe.Sender
- for _, req := range incoming {
- if !req.Request().Canceled {
- leaveOpen = req
- break
- }
- }
- for _, req := range incoming {
- if leaveOpen == nil || leaveOpen == req {
- leaveOpen = req
- continue
- }
- e.finishIncoming(req)
- }
- return desiredState, true
- }
- // can complete, finish and return
- if allIncomingCanComplete && !e.hasActiveOutgoing {
- for _, req := range incoming {
- e.finishIncoming(req)
- }
- return desiredState, true
- }
- // update incoming based on current state
- for _, req := range incoming {
- r := req.Request().Payload.(*edgeRequest)
- if req.Request().Canceled {
- e.finishIncoming(req)
- } else if !e.hasActiveOutgoing && e.state >= r.desiredState {
- e.finishIncoming(req)
- } else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled {
- e.updateIncoming(req)
- }
- }
- return desiredState, false
- }
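- // Added illustration (assumed request): a sender asking for
- // edgeStatusCacheFast is finished as soon as e.state >= cache-fast and no
- // outgoing requests are active; a sender asking for edgeStatusComplete stays
- // open (receiving updateIncoming updates) until isComplete() reports a final
- // result or error.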
- // createInputRequests creates new requests for dependencies or async functions
- // that need to complete to continue processing the edge
- func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, force bool) bool {
- addedNew := false
- // initialize deps state
- if e.deps == nil {
- e.depRequests = make(map[pipe.Receiver]*dep)
- e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
- for i := range e.edge.Vertex.Inputs() {
- e.deps = append(e.deps, newDep(Index(i)))
- }
- }
- // cycle all dependencies. set up outgoing requests if needed
- for _, dep := range e.deps {
- desiredStateDep := dep.state
- if e.noCacheMatchPossible || force {
- desiredStateDep = edgeStatusComplete
- } else if dep.state == edgeStatusInitial && desiredState > dep.state {
- desiredStateDep = edgeStatusCacheFast
- } else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
- // wait for all deps to complete the cache-fast state before continuing with slow cache
- if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keyMap) == 0 || e.allDepsHaveKeys(true) {
- if !e.skipPhase2FastCache(dep) && e.cacheMap != nil {
- desiredStateDep = edgeStatusCacheSlow
- }
- }
- } else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
- // if all deps have completed cache-slow or a content-based cache key for the input is available
- if (len(dep.keyMap) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
- if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
- desiredStateDep = edgeStatusComplete
- }
- }
- } else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
- if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
- desiredStateDep = edgeStatusComplete
- }
- }
- // outgoing request is needed
- if dep.state < desiredStateDep {
- addNew := true
- if dep.req != nil && !dep.req.Status().Completed {
- if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
- dep.req.Cancel()
- } else {
- addNew = false
- }
- }
- if addNew {
- req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
- currentState: dep.edgeState,
- desiredState: desiredStateDep,
- currentKeys: len(dep.keys),
- })
- e.depRequests[req] = dep
- dep.req = req
- addedNew = true
- }
- }
- // initialize function to compute cache key based on dependency result
- if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil {
- pfn := e.preprocessFunc(dep)
- fn := e.slowCacheFunc(dep)
- res := dep.result
- func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) {
- dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
- v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res)
- return v, errors.Wrap(err, "failed to compute cache key")
- })
- }(pfn, fn, res, dep.index)
- addedNew = true
- }
- }
- return addedNew
- }
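- // Example escalation path (illustrative, single dep): a dep at
- // edgeStatusInitial is first requested at cache-fast; once cache-fast is
- // reached (and the phase rules allow it) the request is re-issued at
- // cache-slow, and finally at complete when the op has to execute or a
- // content-based key is still needed, cancelling any stale in-flight request
- // along the way.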
- // execIfPossible creates a request for getting the edge result if there is
- // enough state
- func (e *edge) execIfPossible(f *pipeFactory) bool {
- if len(e.cacheRecords) > 0 {
- if e.keysDidChange {
- e.postpone(f)
- return true
- }
- e.execReq = f.NewFuncRequest(e.loadCache)
- e.execCacheLoad = true
- for req := range e.depRequests {
- req.Cancel()
- }
- return true
- } else if e.allDepsCompleted {
- if e.keysDidChange {
- e.postpone(f)
- return true
- }
- e.execReq = f.NewFuncRequest(e.execOp)
- e.execCacheLoad = false
- return true
- }
- return false
- }
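- // Added note: execIfPossible prefers a cache load whenever cacheRecords
- // exist, cancelling outstanding dep requests, and only falls back to running
- // the op once allDepsCompleted holds; if keys changed since the last unpark
- // it schedules a no-op request via postpone so the new keys are evaluated on
- // the next cycle instead.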
- // postpone delays exec to the next unpark invocation if there are unprocessed keys
- func (e *edge) postpone(f *pipeFactory) {
- f.NewFuncRequest(func(context.Context) (interface{}, error) {
- return nil, nil
- })
- }
- // loadCache creates a request to load edge result from cache
- func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
- recs := make([]*CacheRecord, 0, len(e.cacheRecords))
- for _, r := range e.cacheRecords {
- recs = append(recs, r)
- }
- rec := getBestResult(recs)
- e.cacheRecordsLoaded[rec.ID] = struct{}{}
- logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
- res, err := e.op.LoadCache(ctx, rec)
- if err != nil {
- logrus.Debugf("load cache for %s err: %v", e.edge.Vertex.Name(), err)
- return nil, errors.Wrap(err, "failed to load cache")
- }
- return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil
- }
- // execOp creates a request to execute the vertex operation
- func (e *edge) execOp(ctx context.Context) (interface{}, error) {
- cacheKeys, inputs := e.commitOptions()
- results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs))
- if err != nil {
- return nil, errors.WithStack(err)
- }
- index := e.edge.Index
- if len(results) <= int(index) {
- return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results))
- }
- res := results[int(index)]
- for i := range results {
- if i != int(index) {
- go results[i].Release(context.TODO())
- }
- }
- var exporters []CacheExporter
- for _, cacheKey := range cacheKeys {
- ck, err := e.op.Cache().Save(cacheKey, res, time.Now())
- if err != nil {
- return nil, err
- }
- if exp, ok := ck.Exporter.(*exporter); ok {
- exp.edge = e
- }
- exps := make([]CacheExporter, 0, len(subExporters))
- for _, exp := range subExporters {
- exps = append(exps, exp.Exporter)
- }
- exporters = append(exporters, ck.Exporter)
- exporters = append(exporters, exps...)
- }
- ek := make([]ExportableCacheKey, 0, len(cacheKeys))
- for _, ck := range cacheKeys {
- ek = append(ek, ExportableCacheKey{
- CacheKey: ck,
- Exporter: &mergedExporter{exporters: exporters},
- })
- }
- return NewCachedResult(res, ek), nil
- }
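- // Sketch of execOp's bookkeeping (hypothetical vertex with 3 outputs and
- // e.edge.Index == 1): results[1] is kept, results[0] and results[2] are
- // released in the background, each commit cache key is saved through
- // e.op.Cache().Save, and the returned CachedResult carries every key wrapped
- // in a mergedExporter that also includes the sub-exporters from Exec.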
- func toResultSlice(cres []CachedResult) (out []Result) {
- out = make([]Result, len(cres))
- for i := range cres {
- out[i] = cres[i].(Result)
- }
- return out
- }
- func isEqualState(s1, s2 edgeState) bool {
- if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) {
- return false
- }
- return true
- }
- func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyWithSelector {
- out := make([]CacheKeyWithSelector, len(keys))
- for i, k := range keys {
- out[i] = CacheKeyWithSelector{Selector: selector, CacheKey: k}
- }
- return out
- }
|