queue.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "reflect"
  20. "slices"
  21. "sync"
  22. "time"
  23. "github.com/88250/gulu"
  24. "github.com/siyuan-note/logging"
  25. "github.com/siyuan-note/siyuan/kernel/util"
  26. )
  27. var (
  28. taskQueue []*Task
  29. queueLock = sync.Mutex{}
  30. )
  // Task describes one unit of work placed on the task queue.
  type Task struct {
  	// Action is the task identifier (one of the "task.*" action constants).
  	Action string
  	// Handler is the reflected function invoked when the task runs.
  	Handler reflect.Value
  	// Args are the arguments passed to Handler.
  	Args []interface{}
  	// Created is the time the task was appended to the queue.
  	Created time.Time
  	// Async, when true, marks an asynchronous task: it does not block the
  	// task queue and is executed as soon as its Delay condition is met.
  	Async bool
  	// Delay is how long after Created the task must wait before it may run.
  	Delay time.Duration
  	// Timeout bounds how long the queue waits for Handler to finish.
  	Timeout time.Duration
  }
  40. func AppendTask(action string, handler interface{}, args ...interface{}) {
  41. appendTaskWithDelayTimeout(action, false, 0, 24*time.Hour, handler, args...)
  42. }
  43. func AppendAsyncTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
  44. appendTaskWithDelayTimeout(action, true, delay, 24*time.Hour, handler, args...)
  45. }
  46. func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
  47. appendTaskWithDelayTimeout(action, false, 0, timeout, handler, args...)
  48. }
  49. func appendTaskWithDelayTimeout(action string, async bool, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
  50. if util.IsExiting.Load() {
  51. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  52. return
  53. }
  54. task := &Task{
  55. Action: action,
  56. Handler: reflect.ValueOf(handler),
  57. Args: args,
  58. Created: time.Now(),
  59. Async: async,
  60. Delay: delay,
  61. Timeout: timeout,
  62. }
  63. if gulu.Str.Contains(action, uniqueActions) {
  64. if currentTasks := getCurrentTasks(); containTask(task, currentTasks) {
  65. //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
  66. return
  67. }
  68. }
  69. queueLock.Lock()
  70. defer queueLock.Unlock()
  71. taskQueue = append(taskQueue, task)
  72. }
  73. func containTask(task *Task, tasks []*Task) bool {
  74. for _, t := range tasks {
  75. if t.Action == task.Action {
  76. if len(t.Args) != len(task.Args) {
  77. return false
  78. }
  79. for i, arg := range t.Args {
  80. if arg != task.Args[i] {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. }
  87. return false
  88. }
  89. func getCurrentTasks() (ret []*Task) {
  90. queueLock.Lock()
  91. defer queueLock.Unlock()
  92. currentTaskLock.Lock()
  93. if nil != currentTask {
  94. ret = append(ret, currentTask)
  95. }
  96. currentTaskLock.Unlock()
  97. for _, task := range taskQueue {
  98. ret = append(ret, task)
  99. }
  100. return
  101. }
  // Task action identifiers.
  const (
  	// RepoCheckout checks out from a snapshot.
  	RepoCheckout = "task.repo.checkout"
  	// DatabaseIndexFull rebuilds the index.
  	DatabaseIndexFull = "task.database.index.full"
  	// DatabaseIndex indexes the database.
  	DatabaseIndex = "task.database.index"
  	// DatabaseIndexCommit commits the database index.
  	DatabaseIndexCommit = "task.database.index.commit"
  	// DatabaseIndexRef indexes references in the database.
  	DatabaseIndexRef = "task.database.index.ref"
  	// DatabaseIndexFix corrects the database index.
  	DatabaseIndexFix = "task.database.index.fix"
  	// OCRImage extracts text from images via OCR.
  	OCRImage = "task.ocr.image"
  	// HistoryGenerateFile generates file history.
  	HistoryGenerateFile = "task.history.generateFile"
  	// HistoryDatabaseIndexFull rebuilds the history database index.
  	HistoryDatabaseIndexFull = "task.history.database.index.full"
  	// HistoryDatabaseIndexCommit commits the history database index.
  	HistoryDatabaseIndexCommit = "task.history.database.index.commit"
  	// DatabaseIndexEmbedBlock indexes embed blocks in the database.
  	DatabaseIndexEmbedBlock = "task.database.index.embedBlock"
  	// ReloadUI reloads the UI.
  	ReloadUI = "task.reload.ui"
  	// AssetContentDatabaseIndexFull rebuilds the asset content database index.
  	AssetContentDatabaseIndexFull = "task.asset.database.index.full"
  	// AssetContentDatabaseIndexCommit commits the asset content database index.
  	AssetContentDatabaseIndexCommit = "task.asset.database.index.commit"
  	// CacheVirtualBlockRef caches virtual block references.
  	CacheVirtualBlockRef = "task.cache.virtualBlockRef"
  	// ReloadAttributeView reloads an attribute view.
  	ReloadAttributeView = "task.reload.attributeView"
  	// PushMsg pushes a message.
  	PushMsg = "task.push.msg"
  )
  121. // uniqueActions 描述了唯一的任务,即队列中只能存在一个在执行的任务。
  122. var uniqueActions = []string{
  123. RepoCheckout,
  124. DatabaseIndexFull,
  125. DatabaseIndexCommit,
  126. OCRImage,
  127. HistoryGenerateFile,
  128. HistoryDatabaseIndexFull,
  129. HistoryDatabaseIndexCommit,
  130. AssetContentDatabaseIndexFull,
  131. AssetContentDatabaseIndexCommit,
  132. ReloadAttributeView,
  133. }
  134. func ContainIndexTask() bool {
  135. tasks := getCurrentTasks()
  136. for _, task := range tasks {
  137. if gulu.Str.Contains(task.Action, []string{DatabaseIndexFull, DatabaseIndex}) {
  138. return true
  139. }
  140. }
  141. return false
  142. }
  143. func StatusJob() {
  144. var items []map[string]interface{}
  145. count := map[string]int{}
  146. actionLangs := util.TaskActionLangs[util.Lang]
  147. queueLock.Lock()
  148. for _, task := range taskQueue {
  149. action := task.Action
  150. if c := count[action]; 2 < c {
  151. logging.LogWarnf("too many tasks [%s], ignore show its status", action)
  152. continue
  153. }
  154. count[action]++
  155. if nil != actionLangs {
  156. if label := actionLangs[task.Action]; nil != label {
  157. action = label.(string)
  158. } else {
  159. continue
  160. }
  161. }
  162. item := map[string]interface{}{"action": action}
  163. items = append(items, item)
  164. }
  165. defer queueLock.Unlock()
  166. currentTaskLock.Lock()
  167. if nil != currentTask && nil != actionLangs {
  168. if label := actionLangs[currentTask.Action]; nil != label {
  169. items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
  170. }
  171. }
  172. currentTaskLock.Unlock()
  173. if 1 > len(items) {
  174. items = []map[string]interface{}{}
  175. }
  176. data := map[string]interface{}{}
  177. data["tasks"] = items
  178. util.PushBackgroundTask(data)
  179. }
  180. func ExecTaskJob() {
  181. task := popTask()
  182. if nil == task {
  183. return
  184. }
  185. if util.IsExiting.Load() {
  186. return
  187. }
  188. execTask(task)
  189. }
  190. func popTask() (ret *Task) {
  191. queueLock.Lock()
  192. defer queueLock.Unlock()
  193. if 1 > len(taskQueue) {
  194. return
  195. }
  196. for i, task := range taskQueue {
  197. if time.Since(task.Created) <= task.Delay {
  198. continue
  199. }
  200. if !task.Async {
  201. ret = task
  202. taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
  203. return
  204. }
  205. }
  206. return
  207. }
  208. func ExecAsyncTaskJob() {
  209. tasks := popAsyncTasks()
  210. if 1 > len(tasks) {
  211. return
  212. }
  213. if util.IsExiting.Load() {
  214. return
  215. }
  216. for _, task := range tasks {
  217. go func() {
  218. execTask(task)
  219. }()
  220. }
  221. }
  222. func popAsyncTasks() (ret []*Task) {
  223. queueLock.Lock()
  224. defer queueLock.Unlock()
  225. if 1 > len(taskQueue) {
  226. return
  227. }
  228. var popedIndexes []int
  229. for i, task := range taskQueue {
  230. if !task.Async {
  231. continue
  232. }
  233. if time.Since(task.Created) <= task.Delay {
  234. continue
  235. }
  236. if task.Async {
  237. ret = append(ret, task)
  238. popedIndexes = append(popedIndexes, i)
  239. }
  240. }
  241. if 0 < len(popedIndexes) {
  242. var newQueue []*Task
  243. for i, task := range taskQueue {
  244. if !slices.Contains(popedIndexes, i) {
  245. newQueue = append(newQueue, task)
  246. }
  247. }
  248. taskQueue = newQueue
  249. }
  250. return
  251. }
  252. var (
  253. currentTask *Task
  254. currentTaskLock = sync.Mutex{}
  255. )
  256. func execTask(task *Task) {
  257. if nil == task {
  258. return
  259. }
  260. defer logging.Recover()
  261. args := make([]reflect.Value, len(task.Args))
  262. for i, v := range task.Args {
  263. if nil == v {
  264. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  265. } else {
  266. args[i] = reflect.ValueOf(v)
  267. }
  268. }
  269. if !task.Async {
  270. currentTaskLock.Lock()
  271. currentTask = task
  272. currentTaskLock.Unlock()
  273. }
  274. ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
  275. defer cancel()
  276. ch := make(chan bool, 1)
  277. go func() {
  278. task.Handler.Call(args)
  279. ch <- true
  280. }()
  281. select {
  282. case <-ctx.Done():
  283. logging.LogWarnf("task [%s] timeout", task.Action)
  284. case <-ch:
  285. //logging.LogInfof("task [%s] done", task.Action)
  286. }
  287. if !task.Async {
  288. currentTaskLock.Lock()
  289. currentTask = nil
  290. currentTaskLock.Unlock()
  291. }
  292. }