queue.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/88250/gulu"
  23. "github.com/siyuan-note/logging"
  24. "github.com/siyuan-note/siyuan/kernel/util"
  25. )
  26. var (
  27. taskQueue []*Task
  28. queueLock = sync.Mutex{}
  29. )
  30. type Task struct {
  31. Action string
  32. Handler reflect.Value
  33. Args []interface{}
  34. Created time.Time
  35. Timeout time.Duration
  36. }
  37. func AppendTask(action string, handler interface{}, args ...interface{}) {
  38. AppendTaskWithTimeout(action, 24*time.Hour, handler, args...)
  39. }
  40. func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
  41. if util.IsExiting.Load() {
  42. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  43. return
  44. }
  45. currentActions := getCurrentActions()
  46. if gulu.Str.Contains(action, currentActions) && gulu.Str.Contains(action, uniqueActions) {
  47. //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
  48. return
  49. }
  50. queueLock.Lock()
  51. defer queueLock.Unlock()
  52. taskQueue = append(taskQueue, &Task{
  53. Action: action,
  54. Timeout: timeout,
  55. Handler: reflect.ValueOf(handler),
  56. Args: args,
  57. Created: time.Now(),
  58. })
  59. }
  60. func getCurrentActions() (ret []string) {
  61. queueLock.Lock()
  62. currentTaskActionLock.Lock()
  63. if "" != currentTaskAction {
  64. ret = append(ret, currentTaskAction)
  65. }
  66. currentTaskActionLock.Unlock()
  67. for _, task := range taskQueue {
  68. ret = append(ret, task.Action)
  69. }
  70. queueLock.Unlock()
  71. return
  72. }
  73. const (
  74. RepoCheckout = "task.repo.checkout" // 从快照中检出
  75. DatabaseIndexFull = "task.database.index.full" // 重建索引
  76. DatabaseIndex = "task.database.index" // 数据库索引
  77. DatabaseIndexCommit = "task.database.index.commit" // 数据库索引提交
  78. DatabaseIndexRef = "task.database.index.ref" // 数据库索引引用
  79. DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正
  80. OCRImage = "task.ocr.image" // 图片 OCR 提取文本
  81. HistoryGenerateFile = "task.history.generateFile" // 生成文件历史
  82. HistoryDatabaseIndexFull = "task.history.database.index.full" // 历史数据库重建索引
  83. HistoryDatabaseIndexCommit = "task.history.database.index.commit" // 历史数据库索引提交
  84. DatabaseIndexEmbedBlock = "task.database.index.embedBlock" // 数据库索引嵌入块
  85. ReloadUI = "task.reload.ui" // 重载 UI
  86. AssetContentDatabaseIndexFull = "task.asset.database.index.full" // 资源文件数据库重建索引
  87. AssetContentDatabaseIndexCommit = "task.asset.database.index.commit" // 资源文件数据库索引提交
  88. CacheVirtualBlockRef = "task.cache.virtualBlockRef" // 缓存虚拟块引用
  89. )
  90. // uniqueActions 描述了唯一的任务,即队列中只能存在一个在执行的任务。
  91. var uniqueActions = []string{
  92. RepoCheckout,
  93. DatabaseIndexFull,
  94. DatabaseIndexCommit,
  95. OCRImage,
  96. HistoryGenerateFile,
  97. HistoryDatabaseIndexFull,
  98. HistoryDatabaseIndexCommit,
  99. AssetContentDatabaseIndexFull,
  100. AssetContentDatabaseIndexCommit,
  101. }
  102. func ContainIndexTask() bool {
  103. actions := getCurrentActions()
  104. for _, action := range actions {
  105. if gulu.Str.Contains(action, []string{DatabaseIndexFull, DatabaseIndex}) {
  106. return true
  107. }
  108. }
  109. return false
  110. }
  111. func StatusJob() {
  112. var items []map[string]interface{}
  113. count := map[string]int{}
  114. actionLangs := util.TaskActionLangs[util.Lang]
  115. queueLock.Lock()
  116. for _, task := range taskQueue {
  117. action := task.Action
  118. if c := count[action]; 2 < c {
  119. logging.LogWarnf("too many tasks [%s], ignore show its status", action)
  120. continue
  121. }
  122. count[action]++
  123. if nil != actionLangs {
  124. if label := actionLangs[task.Action]; nil != label {
  125. action = label.(string)
  126. }
  127. }
  128. item := map[string]interface{}{"action": action}
  129. items = append(items, item)
  130. }
  131. defer queueLock.Unlock()
  132. currentTaskActionLock.Lock()
  133. if "" != currentTaskAction {
  134. if nil != actionLangs {
  135. if label := actionLangs[currentTaskAction]; nil != label {
  136. items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
  137. }
  138. }
  139. }
  140. currentTaskActionLock.Unlock()
  141. if 1 > len(items) {
  142. items = []map[string]interface{}{}
  143. }
  144. data := map[string]interface{}{}
  145. data["tasks"] = items
  146. util.PushBackgroundTask(data)
  147. }
  148. func ExecTaskJob() {
  149. task := popTask()
  150. if nil == task {
  151. return
  152. }
  153. if util.IsExiting.Load() {
  154. return
  155. }
  156. execTask(task)
  157. }
  158. func popTask() (ret *Task) {
  159. queueLock.Lock()
  160. defer queueLock.Unlock()
  161. if 0 == len(taskQueue) {
  162. return
  163. }
  164. ret = taskQueue[0]
  165. taskQueue = taskQueue[1:]
  166. return
  167. }
  168. var (
  169. currentTaskAction string
  170. currentTaskActionLock = sync.Mutex{}
  171. )
  172. func execTask(task *Task) {
  173. defer logging.Recover()
  174. args := make([]reflect.Value, len(task.Args))
  175. for i, v := range task.Args {
  176. if nil == v {
  177. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  178. } else {
  179. args[i] = reflect.ValueOf(v)
  180. }
  181. }
  182. currentTaskActionLock.Lock()
  183. currentTaskAction = task.Action
  184. currentTaskActionLock.Unlock()
  185. ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
  186. defer cancel()
  187. ch := make(chan bool, 1)
  188. go func() {
  189. task.Handler.Call(args)
  190. ch <- true
  191. }()
  192. select {
  193. case <-ctx.Done():
  194. logging.LogWarnf("task [%s] timeout", task.Action)
  195. case <-ch:
  196. //logging.LogInfof("task [%s] done", task.Action)
  197. }
  198. currentTaskActionLock.Lock()
  199. currentTaskAction = ""
  200. currentTaskActionLock.Unlock()
  201. }