queue.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "reflect"
  20. "slices"
  21. "sync"
  22. "time"
  23. "github.com/88250/gulu"
  24. "github.com/siyuan-note/logging"
  25. "github.com/siyuan-note/siyuan/kernel/util"
  26. )
var (
	// taskQueue holds the tasks waiting to be executed, in the order they were appended.
	taskQueue []*Task
	// queueLock guards every read and write of taskQueue.
	queueLock = sync.Mutex{}
)
// Task describes one unit of work held in the task queue.
type Task struct {
	// Action is the action name of the task (see the task.* constants in this file).
	Action string
	// Handler is the reflected function that is invoked with Args when the task runs.
	Handler reflect.Value
	// Args are the arguments passed to Handler; a nil entry is replaced by the
	// zero value of the corresponding parameter type before the call.
	Args []interface{}
	// Created is the time at which the task was built for enqueueing.
	Created time.Time
	// Async being true means this is an asynchronous task: it does not block the
	// task queue and is executed as soon as the Delay condition is met.
	Async bool
	// Delay is how long after Created the task must wait before it may be popped.
	Delay time.Duration
	// Timeout is how long execTask waits for Handler before logging a timeout warning.
	Timeout time.Duration
}
  40. func AppendTask(action string, handler interface{}, args ...interface{}) {
  41. appendTaskWithDelayTimeout(action, false, 0, 24*time.Hour, handler, args...)
  42. }
  43. func AppendAsyncTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
  44. appendTaskWithDelayTimeout(action, true, delay, 24*time.Hour, handler, args...)
  45. }
  46. func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
  47. appendTaskWithDelayTimeout(action, false, 0, timeout, handler, args...)
  48. }
  49. func appendTaskWithDelayTimeout(action string, async bool, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
  50. if util.IsExiting.Load() {
  51. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  52. return
  53. }
  54. task := &Task{
  55. Action: action,
  56. Handler: reflect.ValueOf(handler),
  57. Args: args,
  58. Created: time.Now(),
  59. Async: async,
  60. Delay: delay,
  61. Timeout: timeout,
  62. }
  63. if gulu.Str.Contains(action, uniqueActions) {
  64. if currentTasks := getCurrentTasks(); containTask(task, currentTasks) {
  65. //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
  66. return
  67. }
  68. }
  69. queueLock.Lock()
  70. defer queueLock.Unlock()
  71. taskQueue = append(taskQueue, task)
  72. }
  73. func containTask(task *Task, tasks []*Task) bool {
  74. for _, t := range tasks {
  75. if t.Action == task.Action {
  76. if len(t.Args) != len(task.Args) {
  77. return false
  78. }
  79. for i, arg := range t.Args {
  80. if !reflect.DeepEqual(arg, task.Args[i]) {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. }
  87. return false
  88. }
  89. func getCurrentTasks() (ret []*Task) {
  90. queueLock.Lock()
  91. defer queueLock.Unlock()
  92. currentTaskLock.Lock()
  93. if nil != currentTask {
  94. ret = append(ret, currentTask)
  95. }
  96. currentTaskLock.Unlock()
  97. for _, task := range taskQueue {
  98. ret = append(ret, task)
  99. }
  100. return
  101. }
// Action names of the tasks that can be placed in the task queue.
const (
	RepoCheckout                    = "task.repo.checkout"                 // Check out from a snapshot
	RepoAutoPurge                   = "task.repo.autoPurge"                // Automatically purge the data repository
	DatabaseIndexFull               = "task.database.index.full"           // Rebuild the index
	DatabaseIndex                   = "task.database.index"                // Database index
	DatabaseIndexCommit             = "task.database.index.commit"         // Database index commit
	DatabaseIndexRef                = "task.database.index.ref"            // Database index of refs
	DatabaseIndexFix                = "task.database.index.fix"            // Database index correction
	OCRImage                        = "task.ocr.image"                     // Extract text from an image via OCR
	HistoryGenerateFile             = "task.history.generateFile"          // Generate file history
	HistoryDatabaseIndexFull        = "task.history.database.index.full"   // Rebuild the history database index
	HistoryDatabaseIndexCommit      = "task.history.database.index.commit" // History database index commit
	DatabaseIndexEmbedBlock         = "task.database.index.embedBlock"     // Database index of embed blocks
	ReloadUI                        = "task.reload.ui"                     // Reload the UI
	AssetContentDatabaseIndexFull   = "task.asset.database.index.full"     // Rebuild the asset content database index
	AssetContentDatabaseIndexCommit = "task.asset.database.index.commit"   // Asset content database index commit
	CacheVirtualBlockRef            = "task.cache.virtualBlockRef"         // Cache virtual block refs
	ReloadAttributeView             = "task.reload.attributeView"          // Reload the attribute view
	ReloadProtyle                   = "task.reload.protyle"                // Reload the editor
	SetRefDynamicText               = "task.ref.setDynamicText"            // Set the dynamic anchor text of refs
	SetDefRefCount                  = "task.def.setRefCount"               // Set the ref count of definitions
	UpdateIDs                       = "task.update.ids"                    // Update IDs
	PushMsg                         = "task.push.msg"                      // Push a message
)
// uniqueActions lists the unique tasks, i.e. tasks of which only one pending or
// executing instance may exist in the queue. A task whose action is listed here
// is not enqueued when an equal task is already queued or currently running.
var uniqueActions = []string{
	RepoCheckout,
	RepoAutoPurge,
	DatabaseIndexFull,
	DatabaseIndexCommit,
	OCRImage,
	HistoryGenerateFile,
	HistoryDatabaseIndexFull,
	HistoryDatabaseIndexCommit,
	AssetContentDatabaseIndexFull,
	AssetContentDatabaseIndexCommit,
	ReloadAttributeView,
	ReloadProtyle,
	SetRefDynamicText,
	SetDefRefCount,
	UpdateIDs,
}
  144. func ContainIndexTask() bool {
  145. tasks := getCurrentTasks()
  146. for _, task := range tasks {
  147. if gulu.Str.Contains(task.Action, []string{DatabaseIndexFull, DatabaseIndex}) {
  148. return true
  149. }
  150. }
  151. return false
  152. }
  153. func StatusJob() {
  154. var items []map[string]interface{}
  155. count := map[string]int{}
  156. actionLangs := util.TaskActionLangs[util.Lang]
  157. queueLock.Lock()
  158. for _, task := range taskQueue {
  159. action := task.Action
  160. if c := count[action]; 2 < c {
  161. logging.LogWarnf("too many tasks [%s], ignore show its status", action)
  162. continue
  163. }
  164. count[action]++
  165. if nil != actionLangs {
  166. if label := actionLangs[task.Action]; nil != label {
  167. action = label.(string)
  168. } else {
  169. continue
  170. }
  171. }
  172. item := map[string]interface{}{"action": action}
  173. items = append(items, item)
  174. }
  175. defer queueLock.Unlock()
  176. currentTaskLock.Lock()
  177. if nil != currentTask && nil != actionLangs {
  178. if label := actionLangs[currentTask.Action]; nil != label {
  179. items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
  180. }
  181. }
  182. currentTaskLock.Unlock()
  183. if 1 > len(items) {
  184. items = []map[string]interface{}{}
  185. }
  186. data := map[string]interface{}{}
  187. data["tasks"] = items
  188. util.PushBackgroundTask(data)
  189. }
  190. func ExecTaskJob() {
  191. task := popTask()
  192. if nil == task {
  193. return
  194. }
  195. if util.IsExiting.Load() {
  196. return
  197. }
  198. execTask(task)
  199. }
  200. func popTask() (ret *Task) {
  201. queueLock.Lock()
  202. defer queueLock.Unlock()
  203. if 1 > len(taskQueue) {
  204. return
  205. }
  206. for i, task := range taskQueue {
  207. if time.Since(task.Created) <= task.Delay {
  208. continue
  209. }
  210. if !task.Async {
  211. ret = task
  212. taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
  213. return
  214. }
  215. }
  216. return
  217. }
  218. func ExecAsyncTaskJob() {
  219. tasks := popAsyncTasks()
  220. if 1 > len(tasks) {
  221. return
  222. }
  223. if util.IsExiting.Load() {
  224. return
  225. }
  226. for _, task := range tasks {
  227. go func() {
  228. execTask(task)
  229. }()
  230. }
  231. }
  232. func popAsyncTasks() (ret []*Task) {
  233. queueLock.Lock()
  234. defer queueLock.Unlock()
  235. if 1 > len(taskQueue) {
  236. return
  237. }
  238. var popedIndexes []int
  239. for i, task := range taskQueue {
  240. if !task.Async {
  241. continue
  242. }
  243. if time.Since(task.Created) <= task.Delay {
  244. continue
  245. }
  246. if task.Async {
  247. ret = append(ret, task)
  248. popedIndexes = append(popedIndexes, i)
  249. }
  250. }
  251. if 0 < len(popedIndexes) {
  252. var newQueue []*Task
  253. for i, task := range taskQueue {
  254. if !slices.Contains(popedIndexes, i) {
  255. newQueue = append(newQueue, task)
  256. }
  257. }
  258. taskQueue = newQueue
  259. }
  260. return
  261. }
var (
	// currentTask is the synchronous task being executed, or nil when none is running.
	currentTask *Task
	// currentTaskLock guards currentTask.
	currentTaskLock = sync.Mutex{}
)
  266. func execTask(task *Task) {
  267. if nil == task {
  268. return
  269. }
  270. defer logging.Recover()
  271. args := make([]reflect.Value, len(task.Args))
  272. for i, v := range task.Args {
  273. if nil == v {
  274. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  275. } else {
  276. args[i] = reflect.ValueOf(v)
  277. }
  278. }
  279. if !task.Async {
  280. currentTaskLock.Lock()
  281. currentTask = task
  282. currentTaskLock.Unlock()
  283. }
  284. ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
  285. defer cancel()
  286. ch := make(chan bool, 1)
  287. go func() {
  288. task.Handler.Call(args)
  289. ch <- true
  290. }()
  291. select {
  292. case <-ctx.Done():
  293. logging.LogWarnf("task [%s] timeout", task.Action)
  294. case <-ch:
  295. //logging.LogInfof("task [%s] done", task.Action)
  296. }
  297. if !task.Async {
  298. currentTaskLock.Lock()
  299. currentTask = nil
  300. currentTaskLock.Unlock()
  301. }
  302. }