queue.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "reflect"
  20. "slices"
  21. "sync"
  22. "time"
  23. "github.com/88250/gulu"
  24. "github.com/siyuan-note/logging"
  25. "github.com/siyuan-note/siyuan/kernel/util"
  26. )
  27. var (
  28. taskQueue []*Task
  29. queueLock = sync.Mutex{}
  30. )
// Task describes one unit of work held in the task queue.
type Task struct {
	Action  string        // Action identifier, e.g. one of the task.* constants declared in this file.
	Handler reflect.Value // reflect.ValueOf the handler passed when the task was appended; invoked via Call.
	Args    []interface{} // Arguments passed to Handler; also compared when de-duplicating unique actions.
	Created time.Time     // Time at which the task was appended.
	Async   bool          // true marks an asynchronous task: it does not block the task queue and is executed as soon as its Delay condition is met.
	Delay   time.Duration // The task is not popped until more than Delay has elapsed since Created.
	Timeout time.Duration // How long execTask waits for Handler before logging a timeout and moving on.
}
  40. func AppendTask(action string, handler interface{}, args ...interface{}) {
  41. appendTaskWithDelayTimeout(action, false, 0, 24*time.Hour, handler, args...)
  42. }
  43. func AppendAsyncTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
  44. appendTaskWithDelayTimeout(action, true, delay, 24*time.Hour, handler, args...)
  45. }
  46. func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
  47. appendTaskWithDelayTimeout(action, false, 0, timeout, handler, args...)
  48. }
  49. func appendTaskWithDelayTimeout(action string, async bool, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
  50. if util.IsExiting.Load() {
  51. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  52. return
  53. }
  54. task := &Task{
  55. Action: action,
  56. Handler: reflect.ValueOf(handler),
  57. Args: args,
  58. Created: time.Now(),
  59. Async: async,
  60. Delay: delay,
  61. Timeout: timeout,
  62. }
  63. if gulu.Str.Contains(action, uniqueActions) {
  64. if currentTasks := getCurrentTasks(); containTask(task, currentTasks) {
  65. //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
  66. return
  67. }
  68. }
  69. queueLock.Lock()
  70. defer queueLock.Unlock()
  71. taskQueue = append(taskQueue, task)
  72. }
  73. func containTask(task *Task, tasks []*Task) bool {
  74. for _, t := range tasks {
  75. if t.Action == task.Action {
  76. if len(t.Args) != len(task.Args) {
  77. return false
  78. }
  79. for i, arg := range t.Args {
  80. if arg != task.Args[i] {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. }
  87. return false
  88. }
  89. func getCurrentTasks() (ret []*Task) {
  90. queueLock.Lock()
  91. defer queueLock.Unlock()
  92. currentTaskLock.Lock()
  93. if nil != currentTask {
  94. ret = append(ret, currentTask)
  95. }
  96. currentTaskLock.Unlock()
  97. for _, task := range taskQueue {
  98. ret = append(ret, task)
  99. }
  100. return
  101. }
// Action identifiers of the tasks that can be placed in the task queue.
const (
	RepoCheckout                    = "task.repo.checkout"                 // Check out from a snapshot
	DatabaseIndexFull               = "task.database.index.full"           // Rebuild the index
	DatabaseIndex                   = "task.database.index"                // Database index
	DatabaseIndexCommit             = "task.database.index.commit"         // Database index commit
	DatabaseIndexRef                = "task.database.index.ref"            // Database index of refs
	DatabaseIndexFix                = "task.database.index.fix"            // Database index fix
	OCRImage                        = "task.ocr.image"                     // Extract text from images via OCR
	HistoryGenerateFile             = "task.history.generateFile"          // Generate file history
	HistoryDatabaseIndexFull        = "task.history.database.index.full"   // Rebuild the history database index
	HistoryDatabaseIndexCommit      = "task.history.database.index.commit" // History database index commit
	DatabaseIndexEmbedBlock         = "task.database.index.embedBlock"     // Database index of embed blocks
	ReloadUI                        = "task.reload.ui"                     // Reload the UI
	AssetContentDatabaseIndexFull   = "task.asset.database.index.full"     // Rebuild the asset content database index
	AssetContentDatabaseIndexCommit = "task.asset.database.index.commit"   // Asset content database index commit
	CacheVirtualBlockRef            = "task.cache.virtualBlockRef"         // Cache virtual block refs
	ReloadAttributeView             = "task.reload.attributeView"          // Reload the attribute view
	SetRefDynamicText               = "task.ref.setDynamicText"            // Set the dynamic anchor text of refs
	SetDefRefCount                  = "task.def.setRefCount"               // Set the ref count of definitions
	PushMsg                         = "task.push.msg"                      // Push a message
)
// uniqueActions lists the actions whose tasks must be unique: when a task with
// one of these actions is appended while a matching task (as decided by
// containTask) is already executing or queued, the new task is ignored.
var uniqueActions = []string{
	RepoCheckout,
	DatabaseIndexFull,
	DatabaseIndexCommit,
	OCRImage,
	HistoryGenerateFile,
	HistoryDatabaseIndexFull,
	HistoryDatabaseIndexCommit,
	AssetContentDatabaseIndexFull,
	AssetContentDatabaseIndexCommit,
	ReloadAttributeView,
	SetRefDynamicText,
	SetDefRefCount,
}
  138. func ContainIndexTask() bool {
  139. tasks := getCurrentTasks()
  140. for _, task := range tasks {
  141. if gulu.Str.Contains(task.Action, []string{DatabaseIndexFull, DatabaseIndex}) {
  142. return true
  143. }
  144. }
  145. return false
  146. }
  147. func StatusJob() {
  148. var items []map[string]interface{}
  149. count := map[string]int{}
  150. actionLangs := util.TaskActionLangs[util.Lang]
  151. queueLock.Lock()
  152. for _, task := range taskQueue {
  153. action := task.Action
  154. if c := count[action]; 2 < c {
  155. logging.LogWarnf("too many tasks [%s], ignore show its status", action)
  156. continue
  157. }
  158. count[action]++
  159. if nil != actionLangs {
  160. if label := actionLangs[task.Action]; nil != label {
  161. action = label.(string)
  162. } else {
  163. continue
  164. }
  165. }
  166. item := map[string]interface{}{"action": action}
  167. items = append(items, item)
  168. }
  169. defer queueLock.Unlock()
  170. currentTaskLock.Lock()
  171. if nil != currentTask && nil != actionLangs {
  172. if label := actionLangs[currentTask.Action]; nil != label {
  173. items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
  174. }
  175. }
  176. currentTaskLock.Unlock()
  177. if 1 > len(items) {
  178. items = []map[string]interface{}{}
  179. }
  180. data := map[string]interface{}{}
  181. data["tasks"] = items
  182. util.PushBackgroundTask(data)
  183. }
  184. func ExecTaskJob() {
  185. task := popTask()
  186. if nil == task {
  187. return
  188. }
  189. if util.IsExiting.Load() {
  190. return
  191. }
  192. execTask(task)
  193. }
  194. func popTask() (ret *Task) {
  195. queueLock.Lock()
  196. defer queueLock.Unlock()
  197. if 1 > len(taskQueue) {
  198. return
  199. }
  200. for i, task := range taskQueue {
  201. if time.Since(task.Created) <= task.Delay {
  202. continue
  203. }
  204. if !task.Async {
  205. ret = task
  206. taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
  207. return
  208. }
  209. }
  210. return
  211. }
  212. func ExecAsyncTaskJob() {
  213. tasks := popAsyncTasks()
  214. if 1 > len(tasks) {
  215. return
  216. }
  217. if util.IsExiting.Load() {
  218. return
  219. }
  220. for _, task := range tasks {
  221. go func() {
  222. execTask(task)
  223. }()
  224. }
  225. }
  226. func popAsyncTasks() (ret []*Task) {
  227. queueLock.Lock()
  228. defer queueLock.Unlock()
  229. if 1 > len(taskQueue) {
  230. return
  231. }
  232. var popedIndexes []int
  233. for i, task := range taskQueue {
  234. if !task.Async {
  235. continue
  236. }
  237. if time.Since(task.Created) <= task.Delay {
  238. continue
  239. }
  240. if task.Async {
  241. ret = append(ret, task)
  242. popedIndexes = append(popedIndexes, i)
  243. }
  244. }
  245. if 0 < len(popedIndexes) {
  246. var newQueue []*Task
  247. for i, task := range taskQueue {
  248. if !slices.Contains(popedIndexes, i) {
  249. newQueue = append(newQueue, task)
  250. }
  251. }
  252. taskQueue = newQueue
  253. }
  254. return
  255. }
  256. var (
  257. currentTask *Task
  258. currentTaskLock = sync.Mutex{}
  259. )
  260. func execTask(task *Task) {
  261. if nil == task {
  262. return
  263. }
  264. defer logging.Recover()
  265. args := make([]reflect.Value, len(task.Args))
  266. for i, v := range task.Args {
  267. if nil == v {
  268. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  269. } else {
  270. args[i] = reflect.ValueOf(v)
  271. }
  272. }
  273. if !task.Async {
  274. currentTaskLock.Lock()
  275. currentTask = task
  276. currentTaskLock.Unlock()
  277. }
  278. ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
  279. defer cancel()
  280. ch := make(chan bool, 1)
  281. go func() {
  282. task.Handler.Call(args)
  283. ch <- true
  284. }()
  285. select {
  286. case <-ctx.Done():
  287. logging.LogWarnf("task [%s] timeout", task.Action)
  288. case <-ch:
  289. //logging.LogInfof("task [%s] done", task.Action)
  290. }
  291. if !task.Async {
  292. currentTaskLock.Lock()
  293. currentTask = nil
  294. currentTaskLock.Unlock()
  295. }
  296. }