|
@@ -19,6 +19,7 @@ package task
|
|
|
import (
|
|
|
"context"
|
|
|
"reflect"
|
|
|
+ "slices"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
@@ -37,23 +38,24 @@ type Task struct {
|
|
|
Handler reflect.Value
|
|
|
Args []interface{}
|
|
|
Created time.Time
|
|
|
+ Async bool // 为 true 说明是异步任务,不会阻塞任务队列,满足 Delay 条件后立即执行
|
|
|
Delay time.Duration
|
|
|
Timeout time.Duration
|
|
|
}
|
|
|
|
|
|
func AppendTask(action string, handler interface{}, args ...interface{}) {
|
|
|
- appendTaskWithDelayTimeout(action, 0, 24*time.Hour, handler, args...)
|
|
|
+ appendTaskWithDelayTimeout(action, false, 0, 24*time.Hour, handler, args...)
|
|
|
}
|
|
|
|
|
|
-func AppendTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
|
|
|
- appendTaskWithDelayTimeout(action, delay, 24*time.Hour, handler, args...)
|
|
|
+func AppendAsyncTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
|
|
|
+ appendTaskWithDelayTimeout(action, true, delay, 24*time.Hour, handler, args...)
|
|
|
}
|
|
|
|
|
|
func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
|
|
|
- appendTaskWithDelayTimeout(action, 0, timeout, handler, args...)
|
|
|
+ appendTaskWithDelayTimeout(action, false, 0, timeout, handler, args...)
|
|
|
}
|
|
|
|
|
|
-func appendTaskWithDelayTimeout(action string, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
|
|
|
+func appendTaskWithDelayTimeout(action string, async bool, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
|
|
|
if util.IsExiting.Load() {
|
|
|
//logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
|
|
|
return
|
|
@@ -64,6 +66,7 @@ func appendTaskWithDelayTimeout(action string, delay, timeout time.Duration, han
|
|
|
Handler: reflect.ValueOf(handler),
|
|
|
Args: args,
|
|
|
Created: time.Now(),
|
|
|
+ Async: async,
|
|
|
Delay: delay,
|
|
|
Timeout: timeout,
|
|
|
}
|
|
@@ -175,13 +178,11 @@ func StatusJob() {
|
|
|
if nil != actionLangs {
|
|
|
if label := actionLangs[task.Action]; nil != label {
|
|
|
action = label.(string)
|
|
|
+ } else {
|
|
|
+ continue
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if "" == action {
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
item := map[string]interface{}{"action": action}
|
|
|
items = append(items, item)
|
|
|
}
|
|
@@ -204,8 +205,8 @@ func StatusJob() {
|
|
|
}
|
|
|
|
|
|
func ExecTaskJob() {
|
|
|
- task := popTask()
|
|
|
- if nil == task {
|
|
|
+ syncTask, asyncTasks := popTasks()
|
|
|
+ if nil == syncTask && 1 > len(asyncTasks) {
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -213,10 +214,18 @@ func ExecTaskJob() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- execTask(task)
|
|
|
+ for _, asyncTask := range asyncTasks {
|
|
|
+ go func() {
|
|
|
+ execTask(asyncTask)
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ if nil != syncTask {
|
|
|
+ execTask(syncTask)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func popTask() (ret *Task) {
|
|
|
+func popTasks() (syncTask *Task, asyncTasks []*Task) {
|
|
|
queueLock.Lock()
|
|
|
defer queueLock.Unlock()
|
|
|
|
|
@@ -224,13 +233,32 @@ func popTask() (ret *Task) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ var popedIndexes []int
|
|
|
for i, task := range taskQueue {
|
|
|
- if time.Since(task.Created) > task.Delay {
|
|
|
- ret = task
|
|
|
- taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
|
|
|
- return
|
|
|
+ if time.Since(task.Created) <= task.Delay {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if task.Async {
|
|
|
+ asyncTasks = append(asyncTasks, task)
|
|
|
+ popedIndexes = append(popedIndexes, i)
|
|
|
+ } else {
|
|
|
+ if nil == syncTask {
|
|
|
+ syncTask = task
|
|
|
+ popedIndexes = append(popedIndexes, i)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if 0 < len(popedIndexes) {
|
|
|
+ var newQueue []*Task
|
|
|
+ for i, task := range taskQueue {
|
|
|
+ if !slices.Contains(popedIndexes, i) {
|
|
|
+ newQueue = append(newQueue, task)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ taskQueue = newQueue
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -240,6 +268,10 @@ var (
|
|
|
)
|
|
|
|
|
|
func execTask(task *Task) {
|
|
|
+ if nil == task {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
defer logging.Recover()
|
|
|
|
|
|
args := make([]reflect.Value, len(task.Args))
|
|
@@ -251,9 +283,11 @@ func execTask(task *Task) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- currentTaskLock.Lock()
|
|
|
- currentTask = task
|
|
|
- currentTaskLock.Unlock()
|
|
|
+ if !task.Async {
|
|
|
+ currentTaskLock.Lock()
|
|
|
+ currentTask = task
|
|
|
+ currentTaskLock.Unlock()
|
|
|
+ }
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
|
|
|
defer cancel()
|
|
@@ -267,10 +301,12 @@ func execTask(task *Task) {
|
|
|
case <-ctx.Done():
|
|
|
logging.LogWarnf("task [%s] timeout", task.Action)
|
|
|
case <-ch:
|
|
|
- //logging.LogInfof("task [%s] done", task.Action)
|
|
|
+ logging.LogInfof("task [%s] done", task.Action)
|
|
|
}
|
|
|
|
|
|
- currentTaskLock.Lock()
|
|
|
- currentTask = nil
|
|
|
- currentTaskLock.Unlock()
|
|
|
+ if !task.Async {
|
|
|
+ currentTaskLock.Lock()
|
|
|
+ currentTask = nil
|
|
|
+ currentTaskLock.Unlock()
|
|
|
+ }
|
|
|
}
|