queue.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. // SiYuan - Build Your Eternal Digital Garden
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "context"
  19. "github.com/siyuan-note/siyuan/kernel/util"
  20. "reflect"
  21. "sync"
  22. "time"
  23. "github.com/siyuan-note/logging"
  24. )
  // Package-level state of the task queue.
  var (
  	// taskQueue holds the pending tasks in execution order; index 0 is the
  	// next task handed out by popTask.
  	taskQueue []*Task
  	// taskQueueStatus is declared here but not used in this file.
  	taskQueueStatus int
  	// queueLock guards taskQueue; the enqueue and dequeue functions hold it
  	// while they modify the slice. The zero value is an unlocked mutex.
  	queueLock sync.Mutex
  )

  // Task is one queued unit of work: a handler function together with the
  // arguments it is to be called with.
  type Task struct {
  	// Action identifies the kind of task, e.g. one of the "task.*" constants.
  	Action string
  	// Handler is the reflect.Value of the handler function (see newTask).
  	Handler reflect.Value
  	// Args are the arguments passed to Handler when the task is executed.
  	Args []interface{}
  	// Created is the time at which the task was constructed.
  	Created time.Time
  }
  36. func PrependTask(action string, handler interface{}, args ...interface{}) {
  37. queueLock.Lock()
  38. defer queueLock.Unlock()
  39. if util.IsExiting {
  40. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  41. return
  42. }
  43. cancelTask(action, args...)
  44. taskQueue = append([]*Task{newTask(action, handler, args...)}, taskQueue...)
  45. }
  46. func AppendTask(action string, handler interface{}, args ...interface{}) {
  47. queueLock.Lock()
  48. defer queueLock.Unlock()
  49. if util.IsExiting {
  50. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  51. return
  52. }
  53. cancelTask(action, args...)
  54. taskQueue = append(taskQueue, newTask(action, handler, args...))
  55. }
  56. func cancelTask(action string, args ...interface{}) {
  57. for i := len(taskQueue) - 1; i >= 0; i-- {
  58. task := taskQueue[i]
  59. if action == task.Action {
  60. if len(task.Args) != len(args) {
  61. continue
  62. }
  63. for j, arg := range args {
  64. if arg != task.Args[j] {
  65. continue
  66. }
  67. }
  68. taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
  69. break
  70. }
  71. }
  72. }
  73. func newTask(action string, handler interface{}, args ...interface{}) *Task {
  74. return &Task{
  75. Action: action,
  76. Handler: reflect.ValueOf(handler),
  77. Args: args,
  78. Created: time.Now(),
  79. }
  80. }
  // Action identifiers stored in Task.Action.
  const (
  	// Database indexing.
  	DatabaseIndexFull       = "task.database.index.full"       // Rebuild the whole index
  	DatabaseIndex           = "task.database.index"            // Database index
  	DatabaseIndexCommit     = "task.database.index.commit"     // Database index commit
  	DatabaseIndexRef        = "task.database.index.ref"        // Database index of references
  	DatabaseIndexFix        = "task.database.index.fix"        // Database index correction
  	DatabaseIndexEmbedBlock = "task.database.index.embedBlock" // Database index of embed blocks

  	// Everything else.
  	RepoCheckout       = "task.repo.checkout"       // Check out from a snapshot
  	OCRImage           = "task.ocr.image"           // Extract text from images via OCR
  	HistoryGenerateDoc = "task.history.generateDoc" // Generate file history
  	ReloadUI           = "task.reload.ui"           // Reload the UI
  )
  93. func ContainIndexTask() bool {
  94. for _, task := range taskQueue {
  95. if DatabaseIndex == task.Action || DatabaseIndexFull == task.Action {
  96. return true
  97. }
  98. }
  99. return false
  100. }
  101. func StatusJob() {
  102. tasks := taskQueue
  103. data := map[string]interface{}{}
  104. var items []map[string]interface{}
  105. for _, task := range tasks {
  106. if OCRImage == task.Action || DatabaseIndexEmbedBlock == task.Action {
  107. continue
  108. }
  109. actionLangs := util.TaskActionLangs[util.Lang]
  110. action := task.Action
  111. if nil != actionLangs {
  112. if label := actionLangs[task.Action]; nil != label {
  113. action = label.(string)
  114. }
  115. }
  116. item := map[string]interface{}{
  117. "action": action,
  118. }
  119. items = append(items, item)
  120. }
  121. if 1 > len(items) {
  122. items = []map[string]interface{}{}
  123. }
  124. data["tasks"] = items
  125. util.PushBackgroundTask(data)
  126. }
  127. func ExecTaskJob() {
  128. task := popTask()
  129. if nil == task {
  130. return
  131. }
  132. if util.IsExiting {
  133. return
  134. }
  135. execTask(task)
  136. }
  137. func popTask() (ret *Task) {
  138. queueLock.Lock()
  139. defer queueLock.Unlock()
  140. if 0 == len(taskQueue) {
  141. return
  142. }
  143. ret = taskQueue[0]
  144. taskQueue = taskQueue[1:]
  145. return
  146. }
  147. func execTask(task *Task) {
  148. defer logging.Recover()
  149. args := make([]reflect.Value, len(task.Args))
  150. for i, v := range task.Args {
  151. if nil == v {
  152. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  153. } else {
  154. args[i] = reflect.ValueOf(v)
  155. }
  156. }
  157. ctx, cancel := context.WithTimeout(context.Background(), time.Second*12)
  158. defer cancel()
  159. ch := make(chan bool, 1)
  160. go func() {
  161. task.Handler.Call(args)
  162. ch <- true
  163. }()
  164. select {
  165. case <-ctx.Done():
  166. //logging.LogWarnf("task [%s] timeout", task.Action)
  167. case <-ch:
  168. //logging.LogInfof("task [%s] done", task.Action)
  169. }
  170. }