websocket.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package util
  17. import (
  18. "sync"
  19. "time"
  20. "github.com/88250/gulu"
  21. "github.com/olahol/melody"
  22. "github.com/siyuan-note/eventbus"
  23. )
  24. var (
  25. WebSocketServer = melody.New()
  26. // map[string]map[string]*melody.Session{}
  27. sessions = sync.Map{} // {appId, {sessionId, session}}
  28. )
  29. // BroadcastByType 广播所有实例上 typ 类型的会话。
  30. func BroadcastByType(typ, cmd string, code int, msg string, data interface{}) {
  31. typeSessions := SessionsByType(typ)
  32. for _, sess := range typeSessions {
  33. event := NewResult()
  34. event.Cmd = cmd
  35. event.Code = code
  36. event.Msg = msg
  37. event.Data = data
  38. sess.Write(event.Bytes())
  39. }
  40. }
  41. func SessionsByType(typ string) (ret []*melody.Session) {
  42. ret = []*melody.Session{}
  43. sessions.Range(func(key, value interface{}) bool {
  44. appSessions := value.(*sync.Map)
  45. appSessions.Range(func(key, value interface{}) bool {
  46. session := value.(*melody.Session)
  47. if t, ok := session.Get("type"); ok && typ == t {
  48. ret = append(ret, session)
  49. }
  50. return true
  51. })
  52. return true
  53. })
  54. return
  55. }
  56. func AddPushChan(session *melody.Session) {
  57. appID := session.Request.URL.Query().Get("app")
  58. session.Set("app", appID)
  59. id := session.Request.URL.Query().Get("id")
  60. session.Set("id", id)
  61. typ := session.Request.URL.Query().Get("type")
  62. session.Set("type", typ)
  63. if appSessions, ok := sessions.Load(appID); !ok {
  64. appSess := &sync.Map{}
  65. appSess.Store(id, session)
  66. sessions.Store(appID, appSess)
  67. } else {
  68. (appSessions.(*sync.Map)).Store(id, session)
  69. }
  70. }
  71. func RemovePushChan(session *melody.Session) {
  72. app, _ := session.Get("app")
  73. id, _ := session.Get("id")
  74. if nil == app || nil == id {
  75. return
  76. }
  77. appSess, _ := sessions.Load(app)
  78. if nil != appSess {
  79. appSessions := appSess.(*sync.Map)
  80. appSessions.Delete(id)
  81. if 1 > lenOfSyncMap(appSessions) {
  82. sessions.Delete(app)
  83. }
  84. }
  85. }
  86. func lenOfSyncMap(m *sync.Map) (ret int) {
  87. m.Range(func(key, value interface{}) bool {
  88. ret++
  89. return true
  90. })
  91. return
  92. }
  93. func ClosePushChan(id string) {
  94. sessions.Range(func(key, value interface{}) bool {
  95. appSessions := value.(*sync.Map)
  96. appSessions.Range(func(key, value interface{}) bool {
  97. session := value.(*melody.Session)
  98. if sid, _ := session.Get("id"); sid == id {
  99. session.CloseWithMsg([]byte(" close websocket"))
  100. RemovePushChan(session)
  101. }
  102. return true
  103. })
  104. return true
  105. })
  106. }
  107. func ReloadUIResetScroll() {
  108. BroadcastByType("main", "reloadui", 0, "", map[string]interface{}{"resetScroll": true})
  109. }
  110. func ReloadUI() {
  111. BroadcastByType("main", "reloadui", 0, "", nil)
  112. }
  113. func PushTxErr(msg string, code int, data interface{}) {
  114. BroadcastByType("main", "txerr", code, msg, data)
  115. }
  116. func PushUpdateMsg(msgId string, msg string, timeout int) {
  117. BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  118. return
  119. }
  120. func PushMsg(msg string, timeout int) (msgId string) {
  121. msgId = gulu.Rand.String(7)
  122. BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  123. return
  124. }
  125. func PushErrMsg(msg string, timeout int) (msgId string) {
  126. msgId = gulu.Rand.String(7)
  127. BroadcastByType("main", "msg", -1, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  128. return
  129. }
  130. func PushStatusBar(msg string) {
  131. msg += " (" + time.Now().Format("2006-01-02 15:04:05") + ")"
  132. BroadcastByType("main", "statusbar", 0, msg, nil)
  133. }
  134. func PushBackgroundTask(data map[string]interface{}) {
  135. BroadcastByType("main", "backgroundtask", 0, "", data)
  136. }
  137. func PushReloadFiletree() {
  138. BroadcastByType("filetree", "reloadFiletree", 0, "", nil)
  139. }
  140. type BlockStatResult struct {
  141. RuneCount int `json:"runeCount"`
  142. WordCount int `json:"wordCount"`
  143. LinkCount int `json:"linkCount"`
  144. ImageCount int `json:"imageCount"`
  145. RefCount int `json:"refCount"`
  146. }
  147. func ContextPushMsg(context map[string]interface{}, msg string) {
  148. switch context[eventbus.CtxPushMsg].(int) {
  149. case eventbus.CtxPushMsgToNone:
  150. break
  151. case eventbus.CtxPushMsgToProgress:
  152. PushEndlessProgress(msg)
  153. case eventbus.CtxPushMsgToStatusBar:
  154. PushStatusBar(msg)
  155. case eventbus.CtxPushMsgToStatusBarAndProgress:
  156. PushStatusBar(msg)
  157. PushEndlessProgress(msg)
  158. }
  159. }
  160. const (
  161. PushProgressCodeProgressed = 0 // 有进度
  162. PushProgressCodeEndless = 1 // 无进度
  163. PushProgressCodeEnd = 2 // 关闭进度
  164. )
  165. func PushClearAllMsg() {
  166. ClearPushProgress(100)
  167. PushClearMsg("")
  168. }
  169. func ClearPushProgress(total int) {
  170. PushProgress(PushProgressCodeEnd, total, total, "")
  171. }
  172. func PushEndlessProgress(msg string) {
  173. PushProgress(PushProgressCodeEndless, 1, 1, msg)
  174. }
  175. func PushProgress(code, current, total int, msg string) {
  176. BroadcastByType("main", "progress", code, msg, map[string]interface{}{
  177. "current": current,
  178. "total": total,
  179. })
  180. }
  181. // PushClearMsg 会清空指定消息。
  182. func PushClearMsg(msgId string) {
  183. BroadcastByType("main", "cmsg", 0, "", map[string]interface{}{"id": msgId})
  184. }
  185. // PushClearProgress 取消进度遮罩。
  186. func PushClearProgress() {
  187. BroadcastByType("main", "cprogress", 0, "", nil)
  188. }
  189. func PushReloadAttrView(avID string) {
  190. BroadcastByType("protyle", "refreshAttributeView", 0, "", map[string]interface{}{"id": avID})
  191. }
  192. func PushReloadDoc(rootID string) {
  193. BroadcastByType("main", "reloaddoc", 0, "", rootID)
  194. }
  195. func PushSaveDoc(rootID, typ string, sources interface{}) {
  196. evt := NewCmdResult("savedoc", 0, PushModeBroadcast)
  197. evt.Data = map[string]interface{}{
  198. "rootID": rootID,
  199. "type": typ,
  200. "sources": sources,
  201. }
  202. PushEvent(evt)
  203. }
  204. func PushProtyleReload(rootID string) {
  205. BroadcastByType("protyle", "reload", 0, "", rootID)
  206. }
  207. func PushProtyleLoading(rootID, msg string) {
  208. BroadcastByType("protyle", "addLoading", 0, msg, rootID)
  209. }
  210. func PushDownloadProgress(id string, percent float32) {
  211. evt := NewCmdResult("downloadProgress", 0, PushModeBroadcast)
  212. evt.Data = map[string]interface{}{
  213. "id": id,
  214. "percent": percent,
  215. }
  216. PushEvent(evt)
  217. }
  218. func PushEvent(event *Result) {
  219. msg := event.Bytes()
  220. mode := event.PushMode
  221. switch mode {
  222. case PushModeBroadcast:
  223. Broadcast(msg)
  224. case PushModeSingleSelf:
  225. single(msg, event.AppId, event.SessionId)
  226. case PushModeBroadcastExcludeSelf:
  227. broadcastOthers(msg, event.SessionId)
  228. case PushModeBroadcastExcludeSelfApp:
  229. broadcastOtherApps(msg, event.AppId)
  230. case PushModeBroadcastApp:
  231. broadcastApp(msg, event.AppId)
  232. case PushModeBroadcastMainExcludeSelfApp:
  233. broadcastOtherAppMains(msg, event.AppId)
  234. }
  235. }
  236. func single(msg []byte, appId, sid string) {
  237. sessions.Range(func(key, value interface{}) bool {
  238. appSessions := value.(*sync.Map)
  239. if key != appId {
  240. return true
  241. }
  242. appSessions.Range(func(key, value interface{}) bool {
  243. session := value.(*melody.Session)
  244. if id, _ := session.Get("id"); id == sid {
  245. session.Write(msg)
  246. }
  247. return true
  248. })
  249. return true
  250. })
  251. }
  252. func Broadcast(msg []byte) {
  253. sessions.Range(func(key, value interface{}) bool {
  254. appSessions := value.(*sync.Map)
  255. appSessions.Range(func(key, value interface{}) bool {
  256. session := value.(*melody.Session)
  257. session.Write(msg)
  258. return true
  259. })
  260. return true
  261. })
  262. }
  263. func broadcastOtherApps(msg []byte, excludeApp string) {
  264. sessions.Range(func(key, value interface{}) bool {
  265. appSessions := value.(*sync.Map)
  266. appSessions.Range(func(key, value interface{}) bool {
  267. session := value.(*melody.Session)
  268. if app, _ := session.Get("app"); app == excludeApp {
  269. return true
  270. }
  271. session.Write(msg)
  272. return true
  273. })
  274. return true
  275. })
  276. }
  277. func broadcastOtherAppMains(msg []byte, excludeApp string) {
  278. sessions.Range(func(key, value interface{}) bool {
  279. appSessions := value.(*sync.Map)
  280. appSessions.Range(func(key, value interface{}) bool {
  281. session := value.(*melody.Session)
  282. if app, _ := session.Get("app"); app == excludeApp {
  283. return true
  284. }
  285. if t, ok := session.Get("type"); ok && "main" != t {
  286. return true
  287. }
  288. session.Write(msg)
  289. return true
  290. })
  291. return true
  292. })
  293. }
  294. func broadcastApp(msg []byte, app string) {
  295. sessions.Range(func(key, value interface{}) bool {
  296. appSessions := value.(*sync.Map)
  297. appSessions.Range(func(key, value interface{}) bool {
  298. session := value.(*melody.Session)
  299. if sessionApp, _ := session.Get("app"); sessionApp != app {
  300. return true
  301. }
  302. session.Write(msg)
  303. return true
  304. })
  305. return true
  306. })
  307. }
  308. func broadcastOthers(msg []byte, excludeSID string) {
  309. sessions.Range(func(key, value interface{}) bool {
  310. appSessions := value.(*sync.Map)
  311. appSessions.Range(func(key, value interface{}) bool {
  312. session := value.(*melody.Session)
  313. if id, _ := session.Get("id"); id == excludeSID {
  314. return true
  315. }
  316. session.Write(msg)
  317. return true
  318. })
  319. return true
  320. })
  321. }
  322. func CountSessions() (ret int) {
  323. sessions.Range(func(key, value interface{}) bool {
  324. ret++
  325. return true
  326. })
  327. return
  328. }