123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- package service
- import (
- "context"
- "encoding/json"
- json2 "encoding/json"
- "fmt"
- "net/http"
- "time"
- "github.com/IceWhaleTech/CasaOS-Common/utils/logger"
- "github.com/IceWhaleTech/CasaOS/common"
- model2 "github.com/IceWhaleTech/CasaOS/model"
- "github.com/IceWhaleTech/CasaOS/model/notify"
- "github.com/IceWhaleTech/CasaOS/service/model"
- "github.com/IceWhaleTech/CasaOS/types"
- "go.uber.org/zap"
- "golang.org/x/sync/syncmap"
- socketio "github.com/googollee/go-socket.io"
- "github.com/gorilla/websocket"
- "gorm.io/gorm"
- )
- type NotifyServer interface {
- GetLog(id string) model.AppNotify
- AddLog(log model.AppNotify)
- UpdateLog(log model.AppNotify)
- UpdateLogByCustomID(log model.AppNotify)
- DelLog(id string)
- GetList(c int) (list []model.AppNotify)
- MarkRead(id string, state int)
- // SendText(m model.AppNotify)
- // SendUninstallAppBySocket(app notifyCommon.Application)
- SendFileOperateNotify(nowSend bool)
- //SendInstallAppBySocket(app notifyCommon.Application)
- SendNotify(name string, message map[string]interface{})
- SettingSystemTempData(message map[string]interface{})
- GetSystemTempMap() syncmap.Map
- }
- type notifyServer struct {
- db *gorm.DB
- SystemTempMap syncmap.Map //[string]interface{}
- }
- func (i *notifyServer) SettingSystemTempData(message map[string]interface{}) {
- for k, v := range message {
- i.SystemTempMap.Store(k, v)
- //i.SystemTempMap[k] = v
- }
- }
- func (i *notifyServer) SendNotify(name string, message map[string]interface{}) {
- msg := make(map[string]string)
- for k, v := range message {
- bt, _ := json.Marshal(v)
- msg[k] = string(bt)
- }
- response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, name, msg)
- if err != nil {
- logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
- return
- }
- if response.StatusCode() != http.StatusOK {
- logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
- }
- // SocketServer.BroadcastToRoom("/", "public", path, message)
- }
- // Send periodic broadcast messages
- func (i *notifyServer) SendFileOperateNotify(nowSend bool) {
- if nowSend {
- len := 0
- FileQueue.Range(func(k, v interface{}) bool {
- len++
- return true
- })
- model := notify.NotifyModel{}
- listMsg := make(map[string]interface{})
- if len == 0 {
- model.Data = []string{}
- listMsg["file_operate"] = model
- msg := make(map[string]string)
- for k, v := range listMsg {
- bt, _ := json.Marshal(v)
- msg[k] = string(bt)
- }
- response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, "casaos:file:operate", msg)
- if err != nil {
- logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
- }
- if response.StatusCode() != http.StatusOK {
- logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
- }
- return
- }
- model.State = "NORMAL"
- list := []notify.File{}
- OpStrArrbak := OpStrArr
- for _, v := range OpStrArrbak {
- tempItem, ok := FileQueue.Load(v)
- temp := tempItem.(model2.FileOperate)
- if !ok {
- continue
- }
- task := notify.File{}
- task.Id = v
- task.ProcessedSize = temp.ProcessedSize
- task.TotalSize = temp.TotalSize
- task.To = temp.To
- task.Type = temp.Type
- if task.ProcessedSize == 0 {
- task.Status = "STARTING"
- } else {
- task.Status = "PROCESSING"
- }
- if temp.Finished || temp.ProcessedSize >= temp.TotalSize {
- task.Finished = true
- task.Status = "FINISHED"
- FileQueue.Delete(v)
- OpStrArr = OpStrArr[1:]
- go ExecOpFile()
- list = append(list, task)
- continue
- }
- for _, v := range temp.Item {
- if v.Size != v.ProcessedSize {
- task.ProcessingPath = v.From
- break
- }
- }
- list = append(list, task)
- }
- model.Data = list
- listMsg["file_operate"] = model
- msg := make(map[string]string)
- for k, v := range listMsg {
- bt, _ := json.Marshal(v)
- msg[k] = string(bt)
- }
- response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, "casaos:file:operate", msg)
- if err != nil {
- logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
- }
- if response.StatusCode() != http.StatusOK {
- logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
- }
- } else {
- for {
- len := 0
- FileQueue.Range(func(k, v interface{}) bool {
- len++
- return true
- })
- if len == 0 {
- return
- }
- listMsg := make(map[string]interface{})
- model := notify.NotifyModel{}
- model.State = "NORMAL"
- list := []notify.File{}
- OpStrArrbak := OpStrArr
- for _, v := range OpStrArrbak {
- tempItem, ok := FileQueue.Load(v)
- temp := tempItem.(model2.FileOperate)
- if !ok {
- continue
- }
- task := notify.File{}
- task.Id = v
- task.ProcessedSize = temp.ProcessedSize
- task.TotalSize = temp.TotalSize
- task.To = temp.To
- task.Type = temp.Type
- if task.ProcessedSize == 0 {
- task.Status = "STARTING"
- } else {
- task.Status = "PROCESSING"
- }
- if temp.Finished || temp.ProcessedSize >= temp.TotalSize {
- task.Finished = true
- task.Status = "FINISHED"
- FileQueue.Delete(v)
- OpStrArr = OpStrArr[1:]
- go ExecOpFile()
- list = append(list, task)
- continue
- }
- for _, v := range temp.Item {
- if v.Size != v.ProcessedSize {
- task.ProcessingPath = v.From
- break
- }
- }
- list = append(list, task)
- }
- model.Data = list
- listMsg["file_operate"] = model
- msg := make(map[string]string)
- for k, v := range listMsg {
- bt, _ := json.Marshal(v)
- msg[k] = string(bt)
- }
- response, err := MyService.MessageBus().PublishEventWithResponse(context.Background(), common.SERVICENAME, "casaos:file:operate", msg)
- if err != nil {
- logger.Error("failed to publish event to message bus", zap.Error(err), zap.Any("event", msg))
- }
- if response.StatusCode() != http.StatusOK {
- logger.Error("failed to publish event to message bus", zap.String("status", response.Status()), zap.Any("response", response))
- }
- time.Sleep(time.Second * 3)
- }
- }
- }
- // func (i *notifyServer) SendInstallAppBySocket(app notifyCommon.Application) {
- // SocketServer.BroadcastToRoom("/", "public", "app_install", app)
- // }
- // func (i *notifyServer) SendUninstallAppBySocket(app notifyCommon.Application) {
- // SocketServer.BroadcastToRoom("/", "public", "app_uninstall", app)
- // }
- func (i *notifyServer) SSR() {
- server := socketio.NewServer(nil)
- fmt.Println(server)
- }
- func (i notifyServer) GetList(c int) (list []model.AppNotify) {
- i.db.Where("class = ?", c).Where(i.db.Where("state = ?", types.NOTIFY_DYNAMICE).Or("state = ?", types.NOTIFY_UNREAD)).Find(&list)
- return
- }
- func (i *notifyServer) AddLog(log model.AppNotify) {
- i.db.Create(&log)
- }
- func (i *notifyServer) UpdateLog(log model.AppNotify) {
- i.db.Save(&log)
- }
- func (i *notifyServer) UpdateLogByCustomID(log model.AppNotify) {
- if len(log.CustomId) == 0 {
- return
- }
- i.db.Model(&model.AppNotify{}).Select("*").Where("custom_id = ? ", log.CustomId).Updates(log)
- }
- func (i *notifyServer) GetLog(id string) model.AppNotify {
- var log model.AppNotify
- i.db.Where("custom_id = ? ", id).First(&log)
- return log
- }
- func (i *notifyServer) MarkRead(id string, state int) {
- if id == "0" {
- i.db.Model(&model.AppNotify{}).Where("1 = ?", 1).Update("state", state)
- return
- }
- i.db.Model(&model.AppNotify{}).Where("id = ? ", id).Update("state", state)
- }
- func (i *notifyServer) DelLog(id string) {
- var log model.AppNotify
- i.db.Where("custom_id = ?", id).Delete(&log)
- }
- func SendMeg() {
- // for {
- // mt, message, err := ws.ReadMessage()
- // if err != nil {
- // break
- // }
- // notify := model.NotifyMssage{}
- // json2.Unmarshal(message, ¬ify)
- // if notify.Type == "read" {
- // service.MyService.Notify().MarkRead(notify.Data, types.NOTIFY_READ)
- // }
- // if notify.Type == "app" {
- // go func(ws *websocket.Conn) {
- for {
- list := MyService.Notify().GetList(types.NOTIFY_APP)
- json, _ := json2.Marshal(list)
- if len(list) > 0 {
- var temp []*websocket.Conn
- for _, v := range WebSocketConns {
- err := v.WriteMessage(1, json)
- if err == nil {
- temp = append(temp, v)
- }
- }
- WebSocketConns = temp
- for _, v := range list {
- MyService.Notify().MarkRead(v.Id, types.NOTIFY_READ)
- }
- }
- if len(WebSocketConns) == 0 {
- SocketRun = false
- }
- time.Sleep(time.Second * 2)
- }
- // }(ws)
- // }
- // }
- }
- // func (i notifyServer) SendText(m model.AppNotify) {
- // list := []model.AppNotify{}
- // list = append(list, m)
- // json, _ := json2.Marshal(list)
- // var temp []*websocket.Conn
- // for _, v := range WebSocketConns {
- // err := v.WriteMessage(1, json)
- // if err == nil {
- // temp = append(temp, v)
- // }
- // }
- // WebSocketConns = temp
- // if len(WebSocketConns) == 0 {
- // SocketRun = false
- // }
- // }
- func (i *notifyServer) GetSystemTempMap() syncmap.Map {
- return i.SystemTempMap
- }
- func NewNotifyService(db *gorm.DB) NotifyServer {
- return ¬ifyServer{db: db, SystemTempMap: syncmap.Map{}}
- }
|