queue.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // SiYuan - Build Your Eternal Digital Garden
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package task
  17. import (
  18. "github.com/siyuan-note/siyuan/kernel/util"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/siyuan-note/logging"
  23. )
  24. var (
  25. taskQueue []*Task
  26. taskQueueStatus int
  27. queueLock = sync.Mutex{}
  28. )
// Values held by taskQueueStatus.
const (
	// QueueStatusRunning is the only status in which PrependTask and
	// AppendTask accept new tasks. It is the zero value.
	QueueStatusRunning = iota
	// QueueStatusClosing makes Loop clear the queue and stop.
	QueueStatusClosing
)
// Task is a single unit of queued work: a handler function together with the
// arguments it will be called with.
type Task struct {
	// Action is the task name; together with Args it is compared by
	// cancelTask to find an already queued task.
	Action string
	// Handler is the reflect.Value of the handler passed when the task was
	// created; execTask invokes it through Call.
	Handler reflect.Value
	// Args are the arguments passed to Handler. A nil entry is replaced by
	// the zero value of the corresponding handler parameter in execTask.
	Args []interface{}
	// Created is the time at which newTask built the task.
	Created time.Time
}
  39. func PrependTask(action string, handler interface{}, args ...interface{}) {
  40. queueLock.Lock()
  41. defer queueLock.Unlock()
  42. if QueueStatusRunning != taskQueueStatus {
  43. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  44. return
  45. }
  46. cancelTask(action, args...)
  47. taskQueue = append([]*Task{newTask(action, handler, args...)}, taskQueue...)
  48. }
  49. func AppendTask(action string, handler interface{}, args ...interface{}) {
  50. queueLock.Lock()
  51. defer queueLock.Unlock()
  52. if QueueStatusRunning != taskQueueStatus {
  53. //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
  54. return
  55. }
  56. cancelTask(action, args...)
  57. taskQueue = append(taskQueue, newTask(action, handler, args...))
  58. }
  59. func cancelTask(action string, args ...interface{}) {
  60. for i := len(taskQueue) - 1; i >= 0; i-- {
  61. task := taskQueue[i]
  62. if action == task.Action {
  63. if len(task.Args) != len(args) {
  64. continue
  65. }
  66. for j, arg := range args {
  67. if arg != task.Args[j] {
  68. continue
  69. }
  70. }
  71. taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
  72. break
  73. }
  74. }
  75. }
  76. func newTask(action string, handler interface{}, args ...interface{}) *Task {
  77. return &Task{
  78. Action: action,
  79. Handler: reflect.ValueOf(handler),
  80. Args: args,
  81. Created: time.Now(),
  82. }
  83. }
// Action names used to identify tasks in the queue.
const (
	RepoCheckout            = "task.repo.checkout"             // Check out from a snapshot
	DatabaseIndexFull       = "task.database.index.full"       // Rebuild the index
	DatabaseIndex           = "task.database.index"            // Database index
	DatabaseIndexCommit     = "task.database.index.commit"     // Database index commit
	DatabaseIndexRef        = "task.database.index.ref"        // Database index references
	DatabaseIndexFix        = "task.database.index.fix"        // Database index correction
	OCRImage                = "task.ocr.image"                 // Extract text from images via OCR
	HistoryGenerateDoc      = "task.history.generateDoc"       // Generate file history
	DatabaseIndexEmbedBlock = "task.database.index.embedBlock" // Database index embed blocks
)
  95. func StatusLoop() {
  96. for {
  97. time.Sleep(5 * time.Second)
  98. tasks := taskQueue
  99. data := map[string]interface{}{}
  100. var items []map[string]interface{}
  101. for _, task := range tasks {
  102. if OCRImage == task.Action || DatabaseIndexEmbedBlock == task.Action {
  103. continue
  104. }
  105. actionLangs := util.TaskActionLangs[util.Lang]
  106. action := task.Action
  107. if nil != actionLangs {
  108. if label := actionLangs[task.Action]; nil != label {
  109. action = label.(string)
  110. }
  111. }
  112. item := map[string]interface{}{
  113. "action": action,
  114. }
  115. items = append(items, item)
  116. }
  117. if 1 > len(items) {
  118. items = []map[string]interface{}{}
  119. }
  120. data["tasks"] = items
  121. util.PushBackgroundTask(data)
  122. if 0 < len(tasks) {
  123. time.Sleep(5 * time.Second)
  124. }
  125. }
  126. }
  127. var taskWaitGroup = sync.WaitGroup{}
  128. func Loop() {
  129. for {
  130. time.Sleep(10 * time.Millisecond)
  131. if QueueStatusClosing == taskQueueStatus {
  132. clearQueue()
  133. break
  134. }
  135. task := popTask()
  136. if nil == task {
  137. continue
  138. }
  139. if util.IsExiting {
  140. break
  141. }
  142. taskWaitGroup.Add(1)
  143. go execTask(task)
  144. taskWaitGroup.Wait()
  145. }
  146. }
  147. func clearQueue() {
  148. queueLock.Lock()
  149. defer queueLock.Unlock()
  150. taskQueue = []*Task{}
  151. }
  152. func popTask() (ret *Task) {
  153. queueLock.Lock()
  154. defer queueLock.Unlock()
  155. if 0 == len(taskQueue) {
  156. return
  157. }
  158. ret = taskQueue[0]
  159. taskQueue = taskQueue[1:]
  160. return
  161. }
  162. func execTask(task *Task) {
  163. defer logging.Recover()
  164. args := make([]reflect.Value, len(task.Args))
  165. for i, v := range task.Args {
  166. if nil == v {
  167. args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
  168. } else {
  169. args[i] = reflect.ValueOf(v)
  170. }
  171. }
  172. task.Handler.Call(args)
  173. taskWaitGroup.Done()
  174. }