|
@@ -130,9 +130,13 @@ func (s *scheduler) dispatch(e *edge) {
|
|
|
pf := &pipeFactory{s: s, e: e}
|
|
|
|
|
|
// unpark the edge
|
|
|
- debugSchedulerPreUnpark(e, inc, updates, out)
|
|
|
+ if debugScheduler {
|
|
|
+ debugSchedulerPreUnpark(e, inc, updates, out)
|
|
|
+ }
|
|
|
e.unpark(inc, updates, out, pf)
|
|
|
- debugSchedulerPostUnpark(e, inc)
|
|
|
+ if debugScheduler {
|
|
|
+ debugSchedulerPostUnpark(e, inc)
|
|
|
+ }
|
|
|
|
|
|
postUnpark:
|
|
|
// set up new requests that didn't complete/were added by this run
|
|
@@ -361,9 +365,6 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro
|
|
|
}
|
|
|
|
|
|
func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) {
|
|
|
- if !debugScheduler {
|
|
|
- return
|
|
|
- }
|
|
|
logrus.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s %s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state, e.edge.Vertex.Digest())
|
|
|
|
|
|
for i, dep := range e.deps {
|
|
@@ -371,7 +372,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
|
|
|
if dep.req != nil {
|
|
|
des = dep.req.Request().(*edgeRequest).desiredState
|
|
|
}
|
|
|
- logrus.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil)
|
|
|
+ logrus.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v preprocessfunc=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil, e.preprocessFunc(dep) != nil)
|
|
|
}
|
|
|
|
|
|
for i, in := range inc {
|
|
@@ -400,9 +401,6 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
|
|
|
}
|
|
|
|
|
|
func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) {
|
|
|
- if !debugScheduler {
|
|
|
- return
|
|
|
- }
|
|
|
for i, in := range inc {
|
|
|
logrus.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed)
|
|
|
}
|