queue.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // SiYuan - Build Your Eternal Digital Garden
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/88250/gulu"
  23. "github.com/siyuan-note/logging"
  24. "github.com/siyuan-note/siyuan/kernel/util"
  25. )
  26. var (
  27. taskQueue []*Task
  28. queueLock = sync.Mutex{}
  29. )
  30. type Task struct {
  31. Action string
  32. Handler reflect.Value
  33. Args []interface{}
  34. Created time.Time
  35. }
  36. func AppendTask(action string, handler interface{}, args ...interface{}) {
  37. if util.IsExiting {
  38. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  39. return
  40. }
  41. currentActions := getCurrentActions()
  42. if gulu.Str.Contains(action, currentActions) && gulu.Str.Contains(action, uniqueActions) {
  43. //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
  44. return
  45. }
  46. queueLock.Lock()
  47. defer queueLock.Unlock()
  48. taskQueue = append(taskQueue, newTask(action, handler, args...))
  49. }
  50. func newTask(action string, handler interface{}, args ...interface{}) *Task {
  51. return &Task{
  52. Action: action,
  53. Handler: reflect.ValueOf(handler),
  54. Args: args,
  55. Created: time.Now(),
  56. }
  57. }
  58. func getCurrentActions() (ret []string) {
  59. queueLock.Lock()
  60. defer queueLock.Unlock()
  61. if "" != currentTaskAction {
  62. ret = append(ret, currentTaskAction)
  63. }
  64. for _, task := range taskQueue {
  65. ret = append(ret, task.Action)
  66. }
  67. return
  68. }
  69. const (
  70. RepoCheckout = "task.repo.checkout" // 从快照中检出
  71. DatabaseIndexFull = "task.database.index.full" // 重建索引
  72. DatabaseIndex = "task.database.index" // 数据库索引
  73. DatabaseIndexCommit = "task.database.index.commit" // 数据库索引提交
  74. DatabaseIndexRef = "task.database.index.ref" // 数据库索引引用
  75. DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正
  76. DatabaseCache = "task.database.cache" // 数据库缓存
  77. OCRImage = "task.ocr.image" // 图片 OCR 提取文本
  78. HistoryGenerateDoc = "task.history.generateDoc" // 生成文件历史
  79. DatabaseIndexEmbedBlock = "task.database.index.embedBlock" // 数据库索引嵌入块
  80. ReloadUI = "task.reload.ui" // 重载 UI
  81. )
  82. // uniqueActions 描述了唯一的任务,即队列中只能存在一个在执行的任务。
  83. var uniqueActions = []string{
  84. RepoCheckout,
  85. DatabaseIndexFull,
  86. DatabaseIndexCommit,
  87. OCRImage,
  88. HistoryGenerateDoc,
  89. DatabaseIndexEmbedBlock,
  90. }
  91. func Contain(action string, moreActions ...string) bool {
  92. actions := append(moreActions, action)
  93. actions = gulu.Str.RemoveDuplicatedElem(actions)
  94. for _, task := range taskQueue {
  95. if gulu.Str.Contains(task.Action, actions) {
  96. return true
  97. }
  98. }
  99. return false
  100. }
  101. func StatusJob() {
  102. tasks := taskQueue
  103. var items []map[string]interface{}
  104. count := map[string]int{}
  105. actionLangs := util.TaskActionLangs[util.Lang]
  106. for _, task := range tasks {
  107. action := task.Action
  108. if c := count[action]; 2 < c {
  109. logging.LogWarnf("too many tasks [%s], ignore show its status", action)
  110. continue
  111. }
  112. count[action]++
  113. if nil != actionLangs {
  114. if label := actionLangs[task.Action]; nil != label {
  115. action = label.(string)
  116. }
  117. }
  118. item := map[string]interface{}{"action": action}
  119. items = append(items, item)
  120. }
  121. if "" != currentTaskAction {
  122. if nil != actionLangs {
  123. if label := actionLangs[currentTaskAction]; nil != label {
  124. items = append([]map[string]interface{}{map[string]interface{}{"action": label.(string)}}, items...)
  125. }
  126. }
  127. }
  128. if 1 > len(items) {
  129. items = []map[string]interface{}{}
  130. }
  131. data := map[string]interface{}{}
  132. data["tasks"] = items
  133. util.PushBackgroundTask(data)
  134. }
  135. func ExecTaskJob() {
  136. task := popTask()
  137. if nil == task {
  138. return
  139. }
  140. if util.IsExiting {
  141. return
  142. }
  143. execTask(task)
  144. }
  145. var currentTaskAction string
  146. func popTask() (ret *Task) {
  147. queueLock.Lock()
  148. defer queueLock.Unlock()
  149. if 0 == len(taskQueue) {
  150. return
  151. }
  152. ret = taskQueue[0]
  153. taskQueue = taskQueue[1:]
  154. return
  155. }
  156. func execTask(task *Task) {
  157. defer logging.Recover()
  158. args := make([]reflect.Value, len(task.Args))
  159. for i, v := range task.Args {
  160. if nil == v {
  161. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  162. } else {
  163. args[i] = reflect.ValueOf(v)
  164. }
  165. }
  166. currentTaskAction = task.Action
  167. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  168. defer cancel()
  169. ch := make(chan bool, 1)
  170. go func() {
  171. task.Handler.Call(args)
  172. ch <- true
  173. }()
  174. select {
  175. case <-ctx.Done():
  176. logging.LogWarnf("task [%s] timeout", task.Action)
  177. case <-ch:
  178. //logging.LogInfof("task [%s] done", task.Action)
  179. }
  180. }