notify.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. json2 "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "time"
  9. "github.com/IceWhaleTech/CasaOS-Common/utils/logger"
  10. "github.com/IceWhaleTech/CasaOS/common"
  11. model2 "github.com/IceWhaleTech/CasaOS/model"
  12. "github.com/IceWhaleTech/CasaOS/model/notify"
  13. "github.com/IceWhaleTech/CasaOS/service/model"
  14. "github.com/IceWhaleTech/CasaOS/types"
  15. "go.uber.org/zap"
  16. "golang.org/x/sync/syncmap"
  17. socketio "github.com/googollee/go-socket.io"
  18. "github.com/gorilla/websocket"
  19. "gorm.io/gorm"
  20. )
  21. type NotifyServer interface {
  22. GetLog(id string) model.AppNotify
  23. AddLog(log model.AppNotify)
  24. UpdateLog(log model.AppNotify)
  25. UpdateLogByCustomID(log model.AppNotify)
  26. DelLog(id string)
  27. GetList(c int) (list []model.AppNotify)
  28. MarkRead(id string, state int)
  29. // SendText(m model.AppNotify)
  30. // SendUninstallAppBySocket(app notifyCommon.Application)
  31. SendFileOperateNotify(nowSend bool)
  32. //SendInstallAppBySocket(app notifyCommon.Application)
  33. SendNotify(name string, message map[string]interface{})
  34. SettingSystemTempData(message map[string]interface{})
  35. GetSystemTempMap() syncmap.Map
  36. }
// notifyServer is the implementation of NotifyServer returned by
// NewNotifyService.
type notifyServer struct {
	// db is the handle used for every AppNotify query in this file.
	db *gorm.DB
	// SystemTempMap holds the key/value pairs written by
	// SettingSystemTempData; keys are strings, values are arbitrary.
	SystemTempMap syncmap.Map //[string]interface{}
}
  41. func (i *notifyServer) SettingSystemTempData(message map[string]interface{}) {
  42. for k, v := range message {
  43. i.SystemTempMap.Store(k, v)
  44. //i.SystemTempMap[k] = v
  45. }
  46. }
  47. func (i *notifyServer) SendNotify(name string, message map[string]interface{}) {
  48. msg := make(map[string]string)
  49. for k, v := range message {
  50. bt, _ := json.Marshal(v)
  51. msg[k] = string(bt)
  52. }
  53. response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, name, msg)
  54. if err != nil {
  55. logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
  56. return
  57. }
  58. if response.StatusCode() != http.StatusOK {
  59. logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
  60. }
  61. // SocketServer.BroadcastToRoom("/", "public", path, message)
  62. }
  63. // Send periodic broadcast messages
  64. func (i *notifyServer) SendFileOperateNotify(nowSend bool) {
  65. if nowSend {
  66. len := 0
  67. FileQueue.Range(func(k, v interface{}) bool {
  68. len++
  69. return true
  70. })
  71. model := notify.NotifyModel{}
  72. listMsg := make(map[string]interface{})
  73. if len == 0 {
  74. model.Data = []string{}
  75. listMsg["file_operate"] = model
  76. msg := make(map[string]string)
  77. for k, v := range listMsg {
  78. bt, _ := json.Marshal(v)
  79. msg[k] = string(bt)
  80. }
  81. response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, "casaos:file:operate", msg)
  82. if err != nil {
  83. logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
  84. }
  85. if response.StatusCode() != http.StatusOK {
  86. logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
  87. }
  88. return
  89. }
  90. model.State = "NORMAL"
  91. list := []notify.File{}
  92. OpStrArrbak := OpStrArr
  93. for _, v := range OpStrArrbak {
  94. tempItem, ok := FileQueue.Load(v)
  95. temp := tempItem.(model2.FileOperate)
  96. if !ok {
  97. continue
  98. }
  99. task := notify.File{}
  100. task.Id = v
  101. task.ProcessedSize = temp.ProcessedSize
  102. task.TotalSize = temp.TotalSize
  103. task.To = temp.To
  104. task.Type = temp.Type
  105. if task.ProcessedSize == 0 {
  106. task.Status = "STARTING"
  107. } else {
  108. task.Status = "PROCESSING"
  109. }
  110. if temp.Finished || temp.ProcessedSize >= temp.TotalSize {
  111. task.Finished = true
  112. task.Status = "FINISHED"
  113. FileQueue.Delete(v)
  114. OpStrArr = OpStrArr[1:]
  115. go ExecOpFile()
  116. list = append(list, task)
  117. continue
  118. }
  119. for _, v := range temp.Item {
  120. if v.Size != v.ProcessedSize {
  121. task.ProcessingPath = v.From
  122. break
  123. }
  124. }
  125. list = append(list, task)
  126. }
  127. model.Data = list
  128. listMsg["file_operate"] = model
  129. msg := make(map[string]string)
  130. for k, v := range listMsg {
  131. bt, _ := json.Marshal(v)
  132. msg[k] = string(bt)
  133. }
  134. response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, "casaos:file:operate", msg)
  135. if err != nil {
  136. logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
  137. }
  138. if response.StatusCode() != http.StatusOK {
  139. logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
  140. }
  141. } else {
  142. for {
  143. len := 0
  144. FileQueue.Range(func(k, v interface{}) bool {
  145. len++
  146. return true
  147. })
  148. if len == 0 {
  149. return
  150. }
  151. listMsg := make(map[string]interface{})
  152. model := notify.NotifyModel{}
  153. model.State = "NORMAL"
  154. list := []notify.File{}
  155. OpStrArrbak := OpStrArr
  156. for _, v := range OpStrArrbak {
  157. tempItem, ok := FileQueue.Load(v)
  158. temp := tempItem.(model2.FileOperate)
  159. if !ok {
  160. continue
  161. }
  162. task := notify.File{}
  163. task.Id = v
  164. task.ProcessedSize = temp.ProcessedSize
  165. task.TotalSize = temp.TotalSize
  166. task.To = temp.To
  167. task.Type = temp.Type
  168. if task.ProcessedSize == 0 {
  169. task.Status = "STARTING"
  170. } else {
  171. task.Status = "PROCESSING"
  172. }
  173. if temp.Finished || temp.ProcessedSize >= temp.TotalSize {
  174. task.Finished = true
  175. task.Status = "FINISHED"
  176. FileQueue.Delete(v)
  177. OpStrArr = OpStrArr[1:]
  178. go ExecOpFile()
  179. list = append(list, task)
  180. continue
  181. }
  182. for _, v := range temp.Item {
  183. if v.Size != v.ProcessedSize {
  184. task.ProcessingPath = v.From
  185. break
  186. }
  187. }
  188. list = append(list, task)
  189. }
  190. model.Data = list
  191. listMsg["file_operate"] = model
  192. msg := make(map[string]string)
  193. for k, v := range listMsg {
  194. bt, _ := json.Marshal(v)
  195. msg[k] = string(bt)
  196. }
  197. response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, "casaos:file:operate", msg)
  198. if err != nil {
  199. logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
  200. }
  201. if response.StatusCode() != http.StatusOK {
  202. logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
  203. }
  204. time.Sleep(time.Second * 3)
  205. }
  206. }
  207. }
  208. // func (i *notifyServer) SendInstallAppBySocket(app notifyCommon.Application) {
  209. // SocketServer.BroadcastToRoom("/", "public", "app_install", app)
  210. // }
  211. // func (i *notifyServer) SendUninstallAppBySocket(app notifyCommon.Application) {
  212. // SocketServer.BroadcastToRoom("/", "public", "app_uninstall", app)
  213. // }
  214. func (i *notifyServer) SSR() {
  215. server := socketio.NewServer(nil)
  216. fmt.Println(server)
  217. }
  218. func (i notifyServer) GetList(c int) (list []model.AppNotify) {
  219. i.db.Where("class = ?", c).Where(i.db.Where("state = ?", types.NOTIFY_DYNAMICE).Or("state = ?", types.NOTIFY_UNREAD)).Find(&list)
  220. return
  221. }
  222. func (i *notifyServer) AddLog(log model.AppNotify) {
  223. i.db.Create(&log)
  224. }
  225. func (i *notifyServer) UpdateLog(log model.AppNotify) {
  226. i.db.Save(&log)
  227. }
  228. func (i *notifyServer) UpdateLogByCustomID(log model.AppNotify) {
  229. if len(log.CustomId) == 0 {
  230. return
  231. }
  232. i.db.Model(&model.AppNotify{}).Select("*").Where("custom_id = ? ", log.CustomId).Updates(log)
  233. }
  234. func (i *notifyServer) GetLog(id string) model.AppNotify {
  235. var log model.AppNotify
  236. i.db.Where("custom_id = ? ", id).First(&log)
  237. return log
  238. }
  239. func (i *notifyServer) MarkRead(id string, state int) {
  240. if id == "0" {
  241. i.db.Model(&model.AppNotify{}).Where("1 = ?", 1).Update("state", state)
  242. return
  243. }
  244. i.db.Model(&model.AppNotify{}).Where("id = ? ", id).Update("state", state)
  245. }
  246. func (i *notifyServer) DelLog(id string) {
  247. var log model.AppNotify
  248. i.db.Where("custom_id = ?", id).Delete(&log)
  249. }
  250. func SendMeg() {
  251. // for {
  252. // mt, message, err := ws.ReadMessage()
  253. // if err != nil {
  254. // break
  255. // }
  256. // notify := model.NotifyMssage{}
  257. // json2.Unmarshal(message, &notify)
  258. // if notify.Type == "read" {
  259. // service.MyService.Notify().MarkRead(notify.Data, types.NOTIFY_READ)
  260. // }
  261. // if notify.Type == "app" {
  262. // go func(ws *websocket.Conn) {
  263. for {
  264. list := MyService.Notify().GetList(types.NOTIFY_APP)
  265. json, _ := json2.Marshal(list)
  266. if len(list) > 0 {
  267. var temp []*websocket.Conn
  268. for _, v := range WebSocketConns {
  269. err := v.WriteMessage(1, json)
  270. if err == nil {
  271. temp = append(temp, v)
  272. }
  273. }
  274. WebSocketConns = temp
  275. for _, v := range list {
  276. MyService.Notify().MarkRead(v.Id, types.NOTIFY_READ)
  277. }
  278. }
  279. if len(WebSocketConns) == 0 {
  280. SocketRun = false
  281. }
  282. time.Sleep(time.Second * 2)
  283. }
  284. // }(ws)
  285. // }
  286. // }
  287. }
  288. // func (i notifyServer) SendText(m model.AppNotify) {
  289. // list := []model.AppNotify{}
  290. // list = append(list, m)
  291. // json, _ := json2.Marshal(list)
  292. // var temp []*websocket.Conn
  293. // for _, v := range WebSocketConns {
  294. // err := v.WriteMessage(1, json)
  295. // if err == nil {
  296. // temp = append(temp, v)
  297. // }
  298. // }
  299. // WebSocketConns = temp
  300. // if len(WebSocketConns) == 0 {
  301. // SocketRun = false
  302. // }
  303. // }
// GetSystemTempMap returns SystemTempMap by value.
//
// NOTE(review): syncmap.Map is documented as not safe to copy after first
// use, and this return copies it on every call. Fixing that means changing
// the NotifyServer interface to return a pointer, so it is left as is here.
func (i *notifyServer) GetSystemTempMap() syncmap.Map {
	return i.SystemTempMap
}
  307. func NewNotifyService(db *gorm.DB) NotifyServer {
  308. return &notifyServer{db: db, SystemTempMap: syncmap.Map{}}
  309. }