queue.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "reflect"
  20. "slices"
  21. "sync"
  22. "time"
  23. "github.com/88250/gulu"
  24. "github.com/siyuan-note/logging"
  25. "github.com/siyuan-note/siyuan/kernel/util"
  26. )
  27. var (
  28. taskQueue []*Task
  29. queueLock = sync.Mutex{}
  30. )
// Task describes one unit of work stored in the task queue.
type Task struct {
	Action  string        // Action name, one of the task.* constants.
	Handler reflect.Value // Reflected handler function; invoked through Handler.Call.
	Args    []interface{} // Arguments passed to the handler.
	Created time.Time     // Time at which the task was created.
	Async   bool          // true means this is an asynchronous task: it does not block the task queue and is executed as soon as the Delay condition is met.
	Delay   time.Duration // Time that must have elapsed since Created before the task may be popped.
	Timeout time.Duration // Maximum time execTask waits for the handler to finish.
}
  40. func AppendTask(action string, handler interface{}, args ...interface{}) {
  41. appendTaskWithDelayTimeout(action, false, 0, 24*time.Hour, handler, args...)
  42. }
  43. func AppendAsyncTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
  44. appendTaskWithDelayTimeout(action, true, delay, 24*time.Hour, handler, args...)
  45. }
  46. func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
  47. appendTaskWithDelayTimeout(action, false, 0, timeout, handler, args...)
  48. }
  49. func appendTaskWithDelayTimeout(action string, async bool, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
  50. if util.IsExiting.Load() {
  51. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  52. return
  53. }
  54. task := &Task{
  55. Action: action,
  56. Handler: reflect.ValueOf(handler),
  57. Args: args,
  58. Created: time.Now(),
  59. Async: async,
  60. Delay: delay,
  61. Timeout: timeout,
  62. }
  63. if gulu.Str.Contains(action, uniqueActions) {
  64. if currentTasks := getCurrentTasks(); containTask(task, currentTasks) {
  65. //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
  66. return
  67. }
  68. }
  69. queueLock.Lock()
  70. defer queueLock.Unlock()
  71. taskQueue = append(taskQueue, task)
  72. }
  73. func containTask(task *Task, tasks []*Task) bool {
  74. for _, t := range tasks {
  75. if t.Action == task.Action {
  76. if len(t.Args) != len(task.Args) {
  77. return false
  78. }
  79. for i, arg := range t.Args {
  80. if arg != task.Args[i] {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. }
  87. return false
  88. }
  89. func getCurrentTasks() (ret []*Task) {
  90. queueLock.Lock()
  91. defer queueLock.Unlock()
  92. currentTaskLock.Lock()
  93. if nil != currentTask {
  94. ret = append(ret, currentTask)
  95. }
  96. currentTaskLock.Unlock()
  97. for _, task := range taskQueue {
  98. ret = append(ret, task)
  99. }
  100. return
  101. }
// Action names identifying the kinds of tasks that can be queued.
const (
	RepoCheckout                    = "task.repo.checkout"                 // Check out from a snapshot
	DatabaseIndexFull               = "task.database.index.full"           // Rebuild the index
	DatabaseIndex                   = "task.database.index"                // Database index
	DatabaseIndexCommit             = "task.database.index.commit"         // Database index commit
	DatabaseIndexRef                = "task.database.index.ref"            // Database index references
	DatabaseIndexFix                = "task.database.index.fix"            // Database index correction
	OCRImage                        = "task.ocr.image"                     // Extract text from images via OCR
	HistoryGenerateFile             = "task.history.generateFile"          // Generate file history
	HistoryDatabaseIndexFull        = "task.history.database.index.full"   // Rebuild the history database index
	HistoryDatabaseIndexCommit      = "task.history.database.index.commit" // History database index commit
	DatabaseIndexEmbedBlock         = "task.database.index.embedBlock"     // Database index embed blocks
	ReloadUI                        = "task.reload.ui"                     // Reload the UI
	AssetContentDatabaseIndexFull   = "task.asset.database.index.full"     // Rebuild the asset content database index
	AssetContentDatabaseIndexCommit = "task.asset.database.index.commit"   // Asset content database index commit
	CacheVirtualBlockRef            = "task.cache.virtualBlockRef"         // Cache virtual block references
	ReloadAttributeView             = "task.reload.attributeView"          // Reload the attribute view
	ReloadProtyle                   = "task.reload.protyle"                // Reload the editor
	SetRefDynamicText               = "task.ref.setDynamicText"            // Set the dynamic anchor text of references
	SetDefRefCount                  = "task.def.setRefCount"               // Set the reference count of definitions
	PushMsg                         = "task.push.msg"                      // Push a message
)
// uniqueActions lists the unique actions, i.e. actions for which only one
// equivalent task may exist in the queue or be executing at a time.
// appendTaskWithDelayTimeout consults this list and drops a new task for one
// of these actions when containTask finds an equivalent one.
var uniqueActions = []string{
	RepoCheckout,
	DatabaseIndexFull,
	DatabaseIndexCommit,
	OCRImage,
	HistoryGenerateFile,
	HistoryDatabaseIndexFull,
	HistoryDatabaseIndexCommit,
	AssetContentDatabaseIndexFull,
	AssetContentDatabaseIndexCommit,
	ReloadAttributeView,
	ReloadProtyle,
	SetRefDynamicText,
	SetDefRefCount,
}
  140. func ContainIndexTask() bool {
  141. tasks := getCurrentTasks()
  142. for _, task := range tasks {
  143. if gulu.Str.Contains(task.Action, []string{DatabaseIndexFull, DatabaseIndex}) {
  144. return true
  145. }
  146. }
  147. return false
  148. }
  149. func StatusJob() {
  150. var items []map[string]interface{}
  151. count := map[string]int{}
  152. actionLangs := util.TaskActionLangs[util.Lang]
  153. queueLock.Lock()
  154. for _, task := range taskQueue {
  155. action := task.Action
  156. if c := count[action]; 2 < c {
  157. logging.LogWarnf("too many tasks [%s], ignore show its status", action)
  158. continue
  159. }
  160. count[action]++
  161. if nil != actionLangs {
  162. if label := actionLangs[task.Action]; nil != label {
  163. action = label.(string)
  164. } else {
  165. continue
  166. }
  167. }
  168. item := map[string]interface{}{"action": action}
  169. items = append(items, item)
  170. }
  171. defer queueLock.Unlock()
  172. currentTaskLock.Lock()
  173. if nil != currentTask && nil != actionLangs {
  174. if label := actionLangs[currentTask.Action]; nil != label {
  175. items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
  176. }
  177. }
  178. currentTaskLock.Unlock()
  179. if 1 > len(items) {
  180. items = []map[string]interface{}{}
  181. }
  182. data := map[string]interface{}{}
  183. data["tasks"] = items
  184. util.PushBackgroundTask(data)
  185. }
  186. func ExecTaskJob() {
  187. task := popTask()
  188. if nil == task {
  189. return
  190. }
  191. if util.IsExiting.Load() {
  192. return
  193. }
  194. execTask(task)
  195. }
  196. func popTask() (ret *Task) {
  197. queueLock.Lock()
  198. defer queueLock.Unlock()
  199. if 1 > len(taskQueue) {
  200. return
  201. }
  202. for i, task := range taskQueue {
  203. if time.Since(task.Created) <= task.Delay {
  204. continue
  205. }
  206. if !task.Async {
  207. ret = task
  208. taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
  209. return
  210. }
  211. }
  212. return
  213. }
  214. func ExecAsyncTaskJob() {
  215. tasks := popAsyncTasks()
  216. if 1 > len(tasks) {
  217. return
  218. }
  219. if util.IsExiting.Load() {
  220. return
  221. }
  222. for _, task := range tasks {
  223. go func() {
  224. execTask(task)
  225. }()
  226. }
  227. }
  228. func popAsyncTasks() (ret []*Task) {
  229. queueLock.Lock()
  230. defer queueLock.Unlock()
  231. if 1 > len(taskQueue) {
  232. return
  233. }
  234. var popedIndexes []int
  235. for i, task := range taskQueue {
  236. if !task.Async {
  237. continue
  238. }
  239. if time.Since(task.Created) <= task.Delay {
  240. continue
  241. }
  242. if task.Async {
  243. ret = append(ret, task)
  244. popedIndexes = append(popedIndexes, i)
  245. }
  246. }
  247. if 0 < len(popedIndexes) {
  248. var newQueue []*Task
  249. for i, task := range taskQueue {
  250. if !slices.Contains(popedIndexes, i) {
  251. newQueue = append(newQueue, task)
  252. }
  253. }
  254. taskQueue = newQueue
  255. }
  256. return
  257. }
  258. var (
  259. currentTask *Task
  260. currentTaskLock = sync.Mutex{}
  261. )
  262. func execTask(task *Task) {
  263. if nil == task {
  264. return
  265. }
  266. defer logging.Recover()
  267. args := make([]reflect.Value, len(task.Args))
  268. for i, v := range task.Args {
  269. if nil == v {
  270. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  271. } else {
  272. args[i] = reflect.ValueOf(v)
  273. }
  274. }
  275. if !task.Async {
  276. currentTaskLock.Lock()
  277. currentTask = task
  278. currentTaskLock.Unlock()
  279. }
  280. ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
  281. defer cancel()
  282. ch := make(chan bool, 1)
  283. go func() {
  284. task.Handler.Call(args)
  285. ch <- true
  286. }()
  287. select {
  288. case <-ctx.Done():
  289. logging.LogWarnf("task [%s] timeout", task.Action)
  290. case <-ch:
  291. //logging.LogInfof("task [%s] done", task.Action)
  292. }
  293. if !task.Async {
  294. currentTaskLock.Lock()
  295. currentTask = nil
  296. currentTaskLock.Unlock()
  297. }
  298. }