websocket.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package util
  17. import (
  18. "sync"
  19. "time"
  20. "github.com/88250/gulu"
  21. "github.com/olahol/melody"
  22. "github.com/siyuan-note/eventbus"
  23. )
var (
	// WebSocketServer is the package-wide melody instance, created when the
	// package is initialized.
	WebSocketServer = melody.New()

	// sessions is a two-level registry of push sessions: the outer key is the
	// app ID and the value is a *sync.Map from session ID to *melody.Session.
	// Conceptually: map[string]map[string]*melody.Session{}
	sessions = sync.Map{} // {appId, {sessionId, session}}
)
  29. func BroadcastByTypeAndApp(typ, app, cmd string, code int, msg string, data interface{}) {
  30. appSessions, ok := sessions.Load(app)
  31. if !ok {
  32. return
  33. }
  34. appSessions.(*sync.Map).Range(func(key, value interface{}) bool {
  35. session := value.(*melody.Session)
  36. if t, ok := session.Get("type"); ok && typ == t {
  37. event := NewResult()
  38. event.Cmd = cmd
  39. event.Code = code
  40. event.Msg = msg
  41. event.Data = data
  42. session.Write(event.Bytes())
  43. }
  44. return true
  45. })
  46. }
  47. // BroadcastByType 广播所有实例上 typ 类型的会话。
  48. func BroadcastByType(typ, cmd string, code int, msg string, data interface{}) {
  49. typeSessions := SessionsByType(typ)
  50. for _, sess := range typeSessions {
  51. event := NewResult()
  52. event.Cmd = cmd
  53. event.Code = code
  54. event.Msg = msg
  55. event.Data = data
  56. sess.Write(event.Bytes())
  57. }
  58. }
  59. func SessionsByType(typ string) (ret []*melody.Session) {
  60. ret = []*melody.Session{}
  61. sessions.Range(func(key, value interface{}) bool {
  62. appSessions := value.(*sync.Map)
  63. appSessions.Range(func(key, value interface{}) bool {
  64. session := value.(*melody.Session)
  65. if t, ok := session.Get("type"); ok && typ == t {
  66. ret = append(ret, session)
  67. }
  68. return true
  69. })
  70. return true
  71. })
  72. return
  73. }
  74. func AddPushChan(session *melody.Session) {
  75. appID := session.Request.URL.Query().Get("app")
  76. session.Set("app", appID)
  77. id := session.Request.URL.Query().Get("id")
  78. session.Set("id", id)
  79. typ := session.Request.URL.Query().Get("type")
  80. session.Set("type", typ)
  81. if appSessions, ok := sessions.Load(appID); !ok {
  82. appSess := &sync.Map{}
  83. appSess.Store(id, session)
  84. sessions.Store(appID, appSess)
  85. } else {
  86. (appSessions.(*sync.Map)).Store(id, session)
  87. }
  88. }
  89. func RemovePushChan(session *melody.Session) {
  90. app, _ := session.Get("app")
  91. id, _ := session.Get("id")
  92. if nil == app || nil == id {
  93. return
  94. }
  95. appSess, _ := sessions.Load(app)
  96. if nil != appSess {
  97. appSessions := appSess.(*sync.Map)
  98. appSessions.Delete(id)
  99. if 1 > lenOfSyncMap(appSessions) {
  100. sessions.Delete(app)
  101. }
  102. }
  103. }
  104. func lenOfSyncMap(m *sync.Map) (ret int) {
  105. m.Range(func(key, value interface{}) bool {
  106. ret++
  107. return true
  108. })
  109. return
  110. }
  111. func ClosePushChan(id string) {
  112. sessions.Range(func(key, value interface{}) bool {
  113. appSessions := value.(*sync.Map)
  114. appSessions.Range(func(key, value interface{}) bool {
  115. session := value.(*melody.Session)
  116. if sid, _ := session.Get("id"); sid == id {
  117. session.CloseWithMsg([]byte(" close websocket"))
  118. RemovePushChan(session)
  119. }
  120. return true
  121. })
  122. return true
  123. })
  124. }
  125. func ReloadUIResetScroll() {
  126. BroadcastByType("main", "reloadui", 0, "", map[string]interface{}{"resetScroll": true})
  127. }
  128. func ReloadUI() {
  129. BroadcastByType("main", "reloadui", 0, "", nil)
  130. }
  131. func PushTxErr(msg string, code int, data interface{}) {
  132. BroadcastByType("main", "txerr", code, msg, data)
  133. }
  134. func PushUpdateMsg(msgId string, msg string, timeout int) {
  135. BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  136. return
  137. }
  138. func PushMsg(msg string, timeout int) (msgId string) {
  139. msgId = gulu.Rand.String(7)
  140. BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  141. return
  142. }
  143. func PushErrMsg(msg string, timeout int) (msgId string) {
  144. msgId = gulu.Rand.String(7)
  145. BroadcastByType("main", "msg", -1, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  146. return
  147. }
  148. func PushStatusBar(msg string) {
  149. msg += " (" + time.Now().Format("2006-01-02 15:04:05") + ")"
  150. BroadcastByType("main", "statusbar", 0, msg, nil)
  151. }
  152. func PushBackgroundTask(data map[string]interface{}) {
  153. BroadcastByType("main", "backgroundtask", 0, "", data)
  154. }
  155. func PushReloadFiletree() {
  156. BroadcastByType("filetree", "reloadFiletree", 0, "", nil)
  157. }
  158. type BlockStatResult struct {
  159. RuneCount int `json:"runeCount"`
  160. WordCount int `json:"wordCount"`
  161. LinkCount int `json:"linkCount"`
  162. ImageCount int `json:"imageCount"`
  163. RefCount int `json:"refCount"`
  164. BlockCount int `json:"blockCount"`
  165. }
  166. func ContextPushMsg(context map[string]interface{}, msg string) {
  167. switch context[eventbus.CtxPushMsg].(int) {
  168. case eventbus.CtxPushMsgToNone:
  169. break
  170. case eventbus.CtxPushMsgToProgress:
  171. PushEndlessProgress(msg)
  172. case eventbus.CtxPushMsgToStatusBar:
  173. PushStatusBar(msg)
  174. case eventbus.CtxPushMsgToStatusBarAndProgress:
  175. PushStatusBar(msg)
  176. PushEndlessProgress(msg)
  177. }
  178. }
  179. const (
  180. PushProgressCodeProgressed = 0 // 有进度
  181. PushProgressCodeEndless = 1 // 无进度
  182. PushProgressCodeEnd = 2 // 关闭进度
  183. )
  184. func PushClearAllMsg() {
  185. ClearPushProgress(100)
  186. PushClearMsg("")
  187. }
  188. func ClearPushProgress(total int) {
  189. PushProgress(PushProgressCodeEnd, total, total, "")
  190. }
  191. func PushEndlessProgress(msg string) {
  192. PushProgress(PushProgressCodeEndless, 1, 1, msg)
  193. }
  194. func PushProgress(code, current, total int, msg string) {
  195. BroadcastByType("main", "progress", code, msg, map[string]interface{}{
  196. "current": current,
  197. "total": total,
  198. })
  199. }
  200. // PushClearMsg 会清空指定消息。
  201. func PushClearMsg(msgId string) {
  202. BroadcastByType("main", "cmsg", 0, "", map[string]interface{}{"id": msgId})
  203. }
  204. // PushClearProgress 取消进度遮罩。
  205. func PushClearProgress() {
  206. BroadcastByType("main", "cprogress", 0, "", nil)
  207. }
  208. func PushReloadDoc(rootID string) {
  209. BroadcastByType("main", "reloaddoc", 0, "", rootID)
  210. }
  211. func PushSaveDoc(rootID, typ string, sources interface{}) {
  212. evt := NewCmdResult("savedoc", 0, PushModeBroadcast)
  213. evt.Data = map[string]interface{}{
  214. "rootID": rootID,
  215. "type": typ,
  216. "sources": sources,
  217. }
  218. PushEvent(evt)
  219. }
  220. func PushReloadDocInfo(docInfo map[string]any) {
  221. BroadcastByType("filetree", "reloadDocInfo", 0, "", docInfo)
  222. }
  223. func PushReloadProtyle(rootID string) {
  224. BroadcastByType("protyle", "reload", 0, "", rootID)
  225. }
  226. func PushSetRefDynamicText(rootID, blockID, defBlockID, refText string) {
  227. BroadcastByType("main", "setRefDynamicText", 0, "", map[string]interface{}{"rootID": rootID, "blockID": blockID, "defBlockID": defBlockID, "refText": refText})
  228. }
  229. func PushSetDefRefCount(rootID, blockID string, refIDs []string, refCount, rootRefCount int) {
  230. BroadcastByType("main", "setDefRefCount", 0, "", map[string]interface{}{"rootID": rootID, "blockID": blockID, "refCount": refCount, "rootRefCount": rootRefCount, "refIDs": refIDs})
  231. }
  232. func PushProtyleLoading(rootID, msg string) {
  233. BroadcastByType("protyle", "addLoading", 0, msg, rootID)
  234. }
  235. func PushReloadEmojiConf() {
  236. BroadcastByType("main", "reloadEmojiConf", 0, "", nil)
  237. }
  238. func PushDownloadProgress(id string, percent float32) {
  239. evt := NewCmdResult("downloadProgress", 0, PushModeBroadcast)
  240. evt.Data = map[string]interface{}{
  241. "id": id,
  242. "percent": percent,
  243. }
  244. PushEvent(evt)
  245. }
  246. func PushEvent(event *Result) {
  247. msg := event.Bytes()
  248. mode := event.PushMode
  249. switch mode {
  250. case PushModeBroadcast:
  251. Broadcast(msg)
  252. case PushModeSingleSelf:
  253. single(msg, event.AppId, event.SessionId)
  254. case PushModeBroadcastExcludeSelf:
  255. broadcastOthers(msg, event.SessionId)
  256. case PushModeBroadcastExcludeSelfApp:
  257. broadcastOtherApps(msg, event.AppId)
  258. case PushModeBroadcastApp:
  259. broadcastApp(msg, event.AppId)
  260. case PushModeBroadcastMainExcludeSelfApp:
  261. broadcastOtherAppMains(msg, event.AppId)
  262. }
  263. }
  264. func single(msg []byte, appId, sid string) {
  265. sessions.Range(func(key, value interface{}) bool {
  266. appSessions := value.(*sync.Map)
  267. if key != appId {
  268. return true
  269. }
  270. appSessions.Range(func(key, value interface{}) bool {
  271. session := value.(*melody.Session)
  272. if id, _ := session.Get("id"); id == sid {
  273. session.Write(msg)
  274. }
  275. return true
  276. })
  277. return true
  278. })
  279. }
  280. func Broadcast(msg []byte) {
  281. sessions.Range(func(key, value interface{}) bool {
  282. appSessions := value.(*sync.Map)
  283. appSessions.Range(func(key, value interface{}) bool {
  284. session := value.(*melody.Session)
  285. session.Write(msg)
  286. return true
  287. })
  288. return true
  289. })
  290. }
  291. func broadcastOtherApps(msg []byte, excludeApp string) {
  292. sessions.Range(func(key, value interface{}) bool {
  293. appSessions := value.(*sync.Map)
  294. appSessions.Range(func(key, value interface{}) bool {
  295. session := value.(*melody.Session)
  296. if app, _ := session.Get("app"); app == excludeApp {
  297. return true
  298. }
  299. session.Write(msg)
  300. return true
  301. })
  302. return true
  303. })
  304. }
  305. func broadcastOtherAppMains(msg []byte, excludeApp string) {
  306. sessions.Range(func(key, value interface{}) bool {
  307. appSessions := value.(*sync.Map)
  308. appSessions.Range(func(key, value interface{}) bool {
  309. session := value.(*melody.Session)
  310. if app, _ := session.Get("app"); app == excludeApp {
  311. return true
  312. }
  313. if t, ok := session.Get("type"); ok && "main" != t {
  314. return true
  315. }
  316. session.Write(msg)
  317. return true
  318. })
  319. return true
  320. })
  321. }
  322. func broadcastApp(msg []byte, app string) {
  323. sessions.Range(func(key, value interface{}) bool {
  324. appSessions := value.(*sync.Map)
  325. appSessions.Range(func(key, value interface{}) bool {
  326. session := value.(*melody.Session)
  327. if sessionApp, _ := session.Get("app"); sessionApp != app {
  328. return true
  329. }
  330. session.Write(msg)
  331. return true
  332. })
  333. return true
  334. })
  335. }
  336. func broadcastOthers(msg []byte, excludeSID string) {
  337. sessions.Range(func(key, value interface{}) bool {
  338. appSessions := value.(*sync.Map)
  339. appSessions.Range(func(key, value interface{}) bool {
  340. session := value.(*melody.Session)
  341. if id, _ := session.Get("id"); id == excludeSID {
  342. return true
  343. }
  344. session.Write(msg)
  345. return true
  346. })
  347. return true
  348. })
  349. }
  350. func CountSessions() (ret int) {
  351. sessions.Range(func(key, value interface{}) bool {
  352. ret++
  353. return true
  354. })
  355. return
  356. }