websocket.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // SiYuan - Build Your Eternal Digital Garden
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package util
  17. import (
  18. "sync"
  19. "github.com/88250/gulu"
  20. "github.com/88250/melody"
  21. )
  22. var (
  23. WebSocketServer = melody.New()
  24. // map[string]map[string]*melody.Session{}
  25. sessions = sync.Map{} // {appId, {sessionId, session}}
  26. )
  27. // BroadcastByType 广播所有实例上 typ 类型的会话。
  28. func BroadcastByType(typ, cmd string, code int, msg string, data interface{}) {
  29. typeSessions := SessionsByType(typ)
  30. for _, sess := range typeSessions {
  31. event := NewResult()
  32. event.Cmd = cmd
  33. event.Code = code
  34. event.Msg = msg
  35. event.Data = data
  36. sess.Write(event.Bytes())
  37. }
  38. }
  39. func SessionsByType(typ string) (ret []*melody.Session) {
  40. ret = []*melody.Session{}
  41. sessions.Range(func(key, value interface{}) bool {
  42. appSessions := value.(*sync.Map)
  43. appSessions.Range(func(key, value interface{}) bool {
  44. session := value.(*melody.Session)
  45. if t, ok := session.Get("type"); ok && typ == t {
  46. ret = append(ret, session)
  47. }
  48. return true
  49. })
  50. return true
  51. })
  52. return
  53. }
  54. func AddPushChan(session *melody.Session) {
  55. appID := session.Request.URL.Query().Get("app")
  56. session.Set("app", appID)
  57. id := session.Request.URL.Query().Get("id")
  58. session.Set("id", id)
  59. typ := session.Request.URL.Query().Get("type")
  60. session.Set("type", typ)
  61. if appSessions, ok := sessions.Load(appID); !ok {
  62. appSess := &sync.Map{}
  63. appSess.Store(id, session)
  64. sessions.Store(appID, appSess)
  65. } else {
  66. (appSessions.(*sync.Map)).Store(id, session)
  67. }
  68. }
  69. func RemovePushChan(session *melody.Session) {
  70. app, _ := session.Get("app")
  71. id, _ := session.Get("id")
  72. if nil == app || nil == id {
  73. return
  74. }
  75. appSess, _ := sessions.Load(app)
  76. if nil != appSess {
  77. appSessions := appSess.(*sync.Map)
  78. appSessions.Delete(id)
  79. if 1 > lenOfSyncMap(appSessions) {
  80. sessions.Delete(app)
  81. }
  82. }
  83. }
  // lenOfSyncMap returns the number of entries visited by one Range pass over m.
  func lenOfSyncMap(m *sync.Map) (ret int) {
  	countOne := func(_, _ interface{}) bool {
  		ret++
  		return true
  	}
  	m.Range(countOne)
  	return ret
  }
  91. func ClosePushChan(id string) {
  92. sessions.Range(func(key, value interface{}) bool {
  93. appSessions := value.(*sync.Map)
  94. appSessions.Range(func(key, value interface{}) bool {
  95. session := value.(*melody.Session)
  96. if sid, _ := session.Get("id"); sid == id {
  97. session.CloseWithMsg([]byte(" close websocket"))
  98. RemovePushChan(session)
  99. }
  100. return true
  101. })
  102. return true
  103. })
  104. }
  105. func ReloadUI() {
  106. evt := NewCmdResult("reloadui", 0, PushModeBroadcast, 0)
  107. PushEvent(evt)
  108. }
  109. func PushTxErr(msg string, code int, data interface{}) {
  110. BroadcastByType("main", "txerr", code, msg, data)
  111. }
  112. func PushUpdateMsg(msgId string, msg string, timeout int) {
  113. BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  114. return
  115. }
  116. func PushMsg(msg string, timeout int) (msgId string) {
  117. msgId = gulu.Rand.String(7)
  118. BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  119. return
  120. }
  121. func PushErrMsg(msg string, timeout int) (msgId string) {
  122. msgId = gulu.Rand.String(7)
  123. BroadcastByType("main", "msg", -1, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout})
  124. return
  125. }
  126. func PushStatusBar(msg string) {
  127. BroadcastByType("main", "statusbar", 0, msg, nil)
  128. }
  // Codes passed as the code argument of PushProgress.
  const (
  	PushProgressCodeProgressed = iota // 0: progress is known
  	PushProgressCodeEndless           // 1: progress is not known
  	PushProgressCodeEnd               // 2: close the progress display
  )
  134. func ClearPushProgress(total int) {
  135. PushProgress(PushProgressCodeEnd, total, total, "")
  136. }
  137. func PushEndlessProgress(msg string) {
  138. PushProgress(PushProgressCodeEndless, 1, 1, msg)
  139. }
  140. func PushProgress(code, current, total int, msg string) {
  141. BroadcastByType("main", "progress", code, msg, map[string]interface{}{
  142. "current": current,
  143. "total": total,
  144. })
  145. }
  146. // PushClearMsg 会清空指定消息。
  147. func PushClearMsg(msgId string) {
  148. BroadcastByType("main", "cmsg", 0, "", map[string]interface{}{"id": msgId})
  149. }
  150. // PushClearProgress 取消进度遮罩。
  151. func PushClearProgress() {
  152. BroadcastByType("main", "cprogress", 0, "", nil)
  153. }
  154. func PushDownloadProgress(id string, percent float32) {
  155. evt := NewCmdResult("downloadProgress", 0, PushModeBroadcast, 0)
  156. evt.Data = map[string]interface{}{
  157. "id": id,
  158. "percent": percent,
  159. }
  160. PushEvent(evt)
  161. }
  162. func PushEvent(event *Result) {
  163. msg := event.Bytes()
  164. mode := event.PushMode
  165. if "reload" == event.Cmd {
  166. mode = event.ReloadPushMode
  167. }
  168. switch mode {
  169. case PushModeBroadcast:
  170. Broadcast(msg)
  171. case PushModeSingleSelf:
  172. single(msg, event.AppId, event.SessionId)
  173. case PushModeBroadcastExcludeSelf:
  174. broadcastOthers(msg, event.SessionId)
  175. case PushModeBroadcastExcludeSelfApp:
  176. broadcastOtherApps(msg, event.AppId)
  177. case PushModeNone:
  178. }
  179. }
  180. func single(msg []byte, appId, sid string) {
  181. sessions.Range(func(key, value interface{}) bool {
  182. appSessions := value.(*sync.Map)
  183. if key != appId {
  184. return true
  185. }
  186. appSessions.Range(func(key, value interface{}) bool {
  187. session := value.(*melody.Session)
  188. if id, _ := session.Get("id"); id == sid {
  189. session.Write(msg)
  190. }
  191. return true
  192. })
  193. return true
  194. })
  195. }
  196. func Broadcast(msg []byte) {
  197. sessions.Range(func(key, value interface{}) bool {
  198. appSessions := value.(*sync.Map)
  199. appSessions.Range(func(key, value interface{}) bool {
  200. session := value.(*melody.Session)
  201. session.Write(msg)
  202. return true
  203. })
  204. return true
  205. })
  206. }
  207. func broadcastOtherApps(msg []byte, excludeApp string) {
  208. sessions.Range(func(key, value interface{}) bool {
  209. appSessions := value.(*sync.Map)
  210. appSessions.Range(func(key, value interface{}) bool {
  211. session := value.(*melody.Session)
  212. if app, _ := session.Get("app"); app == excludeApp {
  213. return true
  214. }
  215. session.Write(msg)
  216. return true
  217. })
  218. return true
  219. })
  220. }
  221. func broadcastOthers(msg []byte, excludeSID string) {
  222. sessions.Range(func(key, value interface{}) bool {
  223. appSessions := value.(*sync.Map)
  224. appSessions.Range(func(key, value interface{}) bool {
  225. session := value.(*melody.Session)
  226. if id, _ := session.Get("id"); id == excludeSID {
  227. return true
  228. }
  229. session.Write(msg)
  230. return true
  231. })
  232. return true
  233. })
  234. }
  235. func CountSessions() (ret int) {
  236. sessions.Range(func(key, value interface{}) bool {
  237. ret++
  238. return true
  239. })
  240. return
  241. }