Browse Source

Socketio modification (#771)

link 2 years ago
parent
commit
cd79e51f8f
10 changed files with 95 additions and 237 deletions
  1. 1 1
      cmd/migration-tool/main.go
  2. 7 5
      main.go
  3. 12 32
      route/periodical.go
  4. 1 0
      route/route.go
  5. 40 36
      route/socket.go
  6. 12 1
      route/v1/file.go
  7. 2 3
      route/v1/system.go
  8. 10 157
      service/notify.go
  9. 9 2
      service/service.go
  10. 1 0
      service/system.go

+ 1 - 1
cmd/migration-tool/main.go

@@ -48,7 +48,7 @@ func init() {
 	sqliteDB = sqlite.GetDb(dbFlag)
 	// gredis.GetRedisConn(config.RedisInfo),
 
-	service.MyService = service.NewService(sqliteDB, "")
+	service.MyService = service.NewService(sqliteDB, "", nil)
 }
 
 func main() {

+ 7 - 5
main.go

@@ -11,7 +11,6 @@ import (
 	"github.com/IceWhaleTech/CasaOS-Common/model"
 	"github.com/IceWhaleTech/CasaOS-Common/utils/constants"
 	"github.com/IceWhaleTech/CasaOS-Common/utils/logger"
-	"github.com/IceWhaleTech/CasaOS/model/notify"
 	"github.com/IceWhaleTech/CasaOS/pkg/cache"
 	"github.com/IceWhaleTech/CasaOS/pkg/config"
 	"github.com/IceWhaleTech/CasaOS/pkg/sqlite"
@@ -21,6 +20,7 @@ import (
 	"github.com/IceWhaleTech/CasaOS/service"
 	"github.com/IceWhaleTech/CasaOS/types"
 	"github.com/coreos/go-systemd/daemon"
+	"github.com/gin-gonic/gin"
 	"go.uber.org/zap"
 
 	"github.com/robfig/cron"
@@ -53,7 +53,7 @@ func init() {
 	sqliteDB = sqlite.GetDb(*dbFlag)
 	// gredis.GetRedisConn(config.RedisInfo),
 
-	service.MyService = service.NewService(sqliteDB, config.CommonInfo.RuntimePath)
+	service.MyService = service.NewService(sqliteDB, config.CommonInfo.RuntimePath, route.SocketIo())
 
 	service.Cache = cache.Init()
 
@@ -74,15 +74,17 @@ func init() {
 // @name Authorization
 // @BasePath /v1
 func main() {
-	service.NotifyMsg = make(chan notify.Message, 10)
 	if *versionFlag {
 		return
 	}
-	go route.SocketInit(service.NotifyMsg)
 	// model.Setup()
 	// gredis.Setup()
 
 	r := route.InitRouter()
+	defer service.SocketServer.Close()
+	r.GET("/v1/socketio/*any", gin.WrapH(service.SocketServer))
+	r.POST("/v1/socketio/*any", gin.WrapH(service.SocketServer))
+
 	// service.SyncTask(sqliteDB)
 	cron2 := cron.New()
 	// every day execution
@@ -108,7 +110,7 @@ func main() {
 	if err != nil {
 		panic(err)
 	}
-	routers := []string{"sys", "port", "file", "folder", "batch", "image", "samba", "notify"}
+	routers := []string{"sys", "port", "file", "folder", "batch", "image", "samba", "notify", "socketio"}
 	for _, v := range routers {
 		err = service.MyService.Gateway().CreateRoute(&model.Route{
 			Path:   "/v1/" + v,

+ 12 - 32
route/periodical.go

@@ -22,37 +22,6 @@ import (
 	"github.com/IceWhaleTech/CasaOS/service"
 )
 
-func SendNetINfoBySocket() {
-	netList := service.MyService.System().GetNetInfo()
-	newNet := []model.IOCountersStat{}
-	nets := service.MyService.System().GetNet(true)
-	for _, n := range netList {
-		for _, netCardName := range nets {
-			if n.Name == netCardName {
-				item := *(*model.IOCountersStat)(unsafe.Pointer(&n))
-				item.State = strings.TrimSpace(service.MyService.System().GetNetState(n.Name))
-				item.Time = time.Now().Unix()
-				newNet = append(newNet, item)
-				break
-			}
-		}
-	}
-	service.MyService.Notify().SendNetInfoBySocket(newNet)
-}
-
-func SendCPUBySocket() {
-	cpu := service.MyService.System().GetCpuPercent()
-	num := service.MyService.System().GetCpuCoreNum()
-	cpuData := make(map[string]interface{})
-	cpuData["percent"] = cpu
-	cpuData["num"] = num
-	service.MyService.Notify().SendCPUInfoBySocket(cpuData)
-}
-
-func SendMemBySocket() {
-	service.MyService.Notify().SendMemInfoBySocket(service.MyService.System().GetMemInfo())
-}
-
 func SendAllHardwareStatusBySocket() {
 	netList := service.MyService.System().GetNetInfo()
 	newNet := []model.IOCountersStat{}
@@ -89,7 +58,18 @@ func SendAllHardwareStatusBySocket() {
 
 	memInfo := service.MyService.System().GetMemInfo()
 
-	service.MyService.Notify().SendAllHardwareStatusBySocket(memInfo, cpuData, newNet)
+	body := make(map[string]interface{})
+
+	body["sys_mem"] = memInfo
+
+	body["sys_cpu"] = cpuData
+
+	body["sys_net"] = newNet
+	systemTempMap := service.MyService.Notify().GetSystemTempMap()
+	for k, v := range systemTempMap {
+		body[k] = v
+	}
+	service.MyService.Notify().SendNotify("sys_hardware_status", body)
 }
 
 // func MonitoryUSB() {

+ 1 - 0
route/route.go

@@ -119,6 +119,7 @@ func InitRouter() *gin.Engine {
 			v1FolderGroup.PUT("/name", v1.RenamePath)
 			v1FolderGroup.GET("", v1.DirPath)   ///file/dirpath
 			v1FolderGroup.POST("", v1.MkdirAll) ///file/mkdir
+			v1FolderGroup.GET("/size", v1.GetSize)
 		}
 		v1BatchGroup := v1Group.Group("/batch")
 		v1BatchGroup.Use()

+ 40 - 36
route/socket.go

@@ -11,48 +11,52 @@
 package route
 
 import (
-	"strconv"
-	"time"
-
-	"github.com/IceWhaleTech/CasaOS-Common/utils/port"
-	"github.com/IceWhaleTech/CasaOS/model/notify"
-	"github.com/IceWhaleTech/CasaOS/pkg/config"
+	"github.com/IceWhaleTech/CasaOS-Common/utils/logger"
 	"github.com/IceWhaleTech/CasaOS/service"
-	f "github.com/ambelovsky/gosf"
+	socketio "github.com/googollee/go-socket.io"
+	"go.uber.org/zap"
 )
 
-func SocketInit(msg chan notify.Message) {
-	// set socket port
-	socketPort := 0
-	if len(config.ServerInfo.SocketPort) == 0 {
-		socketPort, _ = port.GetAvailablePort("tcp")
-		config.ServerInfo.SocketPort = strconv.Itoa(socketPort)
-		config.Cfg.Section("server").Key("SocketPort").SetValue(strconv.Itoa(socketPort))
-		config.Cfg.SaveTo(config.SystemConfigInfo.ConfigPath)
-	} else {
-		socketPort, _ = strconv.Atoi(config.ServerInfo.SocketPort)
-		if !port.IsPortAvailable(socketPort, "tcp") {
-			socketPort, _ := port.GetAvailablePort("tcp")
-			config.ServerInfo.SocketPort = strconv.Itoa(socketPort)
-			config.Cfg.Section("server").Key("SocketPort").SetValue(strconv.Itoa(socketPort))
-			config.Cfg.SaveTo(config.SystemConfigInfo.ConfigPath)
-		}
-	}
-
-	f.OnConnect(func(c *f.Client, request *f.Request) {
+func SocketIo() *socketio.Server {
+	server := socketio.NewServer(nil)
+	server.OnConnect("/", func(s socketio.Conn) error {
+		s.SetContext("")
+		logger.Info("connected", zap.Any("id", s.ID()))
+		s.Join("public")
 		service.ClientCount += 1
+		return nil
 	})
-	f.OnDisconnect(func(c *f.Client, request *f.Request) {
-		service.ClientCount -= 1
+
+	server.OnEvent("/", "notice", func(s socketio.Conn, msg string) {
+		logger.Info("notice", zap.Any("msg", msg))
+		s.Emit("reply", "have "+msg)
 	})
-	go func(msg chan notify.Message) {
-		for v := range msg {
-			f.Broadcast("", v.Path, &v.Msg)
-			time.Sleep(time.Millisecond * 100)
-		}
-	}(msg)
 
-	f.Startup(map[string]interface{}{
-		"port": socketPort,
+	// server.OnEvent("/chat", "msg", func(s socketio.Conn, msg string) string {
+	// 	s.SetContext(msg)
+	// 	return "recv " + msg
+	// })
+
+	// server.OnEvent("/", "bye", func(s socketio.Conn) string {
+	// 	last := s.Context().(string)
+	// 	s.Emit("bye", last)
+	// 	s.Close()
+	// 	return last
+	// })
+
+	server.OnError("/", func(s socketio.Conn, e error) {
+		logger.Error("meet error", zap.Any("error", e))
+	})
+
+	server.OnDisconnect("/", func(s socketio.Conn, reason string) {
+		service.ClientCount -= 1
+		logger.Info("closed", zap.Any("reason", reason))
 	})
+
+	go func() {
+		if err := server.Serve(); err != nil {
+			logger.Error("error when trying to  listen socketio ", zap.Any("error", err))
+		}
+	}()
+	return server
 }

+ 12 - 1
route/v1/file.go

@@ -184,7 +184,7 @@ func GetDownloadFile(c *gin.Context) {
 func GetDownloadSingleFile(c *gin.Context) {
 	filePath := c.Query("path")
 	if len(filePath) == 0 {
-		c.JSON(service.ClientCount, model.Result{
+		c.JSON(common_err.CLIENT_ERROR, model.Result{
 			Success: common_err.INVALID_PARAMS,
 			Message: common_err.GetMsg(common_err.INVALID_PARAMS),
 		})
@@ -649,3 +649,14 @@ func DeleteOperateFileOrDir(c *gin.Context) {
 	go service.MyService.Notify().SendFileOperateNotify(true)
 	c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
 }
+func GetSize(c *gin.Context) {
+	json := make(map[string]string)
+	c.ShouldBind(&json)
+	path := json["path"]
+	size, err := file.GetFileOrDirSize(path)
+	if err != nil {
+		c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
+		return
+	}
+	c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: size})
+}

+ 2 - 3
route/v1/system.go

@@ -336,11 +336,10 @@ func GetSystemProxy(c *gin.Context) {
 
 func PutSystemState(c *gin.Context) {
 	state := c.Param("state")
-	if state == "off" {
+	if strings.ToLower(state) == "off" {
 		service.MyService.System().SystemShutdown()
-	} else if state == "restart" {
+	} else if strings.ToLower(state) == "restart" {
 		service.MyService.System().SystemReboot()
-
 	}
 	c.JSON(http.StatusOK, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: "The operation will be completed shortly."})
 }

+ 10 - 157
service/notify.go

@@ -10,14 +10,14 @@ import (
 	"github.com/IceWhaleTech/CasaOS/model/notify"
 	"github.com/IceWhaleTech/CasaOS/service/model"
 	"github.com/IceWhaleTech/CasaOS/types"
-	"github.com/ambelovsky/gosf"
+
 	socketio "github.com/googollee/go-socket.io"
 	"github.com/gorilla/websocket"
 	"gorm.io/gorm"
 )
 
 var (
-	NotifyMsg   chan notify.Message
+	//NotifyMsg   chan notify.Message
 	ClientCount int
 )
 
@@ -31,12 +31,9 @@ type NotifyServer interface {
 	MarkRead(id string, state int)
 	//	SendText(m model.AppNotify)
 	SendUninstallAppBySocket(app notifyCommon.Application)
-	SendNetInfoBySocket(netList []model2.IOCountersStat)
-	SendCPUInfoBySocket(cpu map[string]interface{})
-	SendMemInfoBySocket(mem map[string]interface{})
+
 	SendFileOperateNotify(nowSend bool)
 	SendInstallAppBySocket(app notifyCommon.Application)
-	SendAllHardwareStatusBySocket(mem map[string]interface{}, cpu map[string]interface{}, netList []model2.IOCountersStat)
 	SendStorageBySocket(message notify.StorageMessage)
 	SendNotify(path string, message map[string]interface{})
 	SettingSystemTempData(message map[string]interface{})
@@ -55,57 +52,11 @@ func (i *notifyServer) SettingSystemTempData(message map[string]interface{}) {
 }
 
 func (i *notifyServer) SendNotify(path string, message map[string]interface{}) {
-	msg := gosf.Message{}
-	msg.Body = message
-	msg.Success = true
-	msg.Text = path
-
-	notify := notify.Message{}
-	notify.Path = path
-	notify.Msg = msg
-
-	NotifyMsg <- notify
+	SocketServer.BroadcastToRoom("/", "public", path, message)
 }
 
 func (i *notifyServer) SendStorageBySocket(message notify.StorageMessage) {
-	body := make(map[string]interface{})
-	body["data"] = message
-
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "storage_status"
-
-	notify := notify.Message{}
-	notify.Path = "storage_status"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
-}
-
-func (i *notifyServer) SendAllHardwareStatusBySocket(mem map[string]interface{}, cpu map[string]interface{}, netList []model2.IOCountersStat) {
-	body := make(map[string]interface{})
-
-	body["sys_mem"] = mem
-
-	body["sys_cpu"] = cpu
-
-	body["sys_net"] = netList
-
-	for k, v := range i.SystemTempMap {
-		body[k] = v
-	}
-
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "sys_hardware_status"
-
-	notify := notify.Message{}
-	notify.Path = "sys_hardware_status"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
+	SocketServer.BroadcastToRoom("/", "public", "storage_status", message)
 }
 
 // Send periodic broadcast messages
@@ -122,17 +73,8 @@ func (i *notifyServer) SendFileOperateNotify(nowSend bool) {
 		listMsg := make(map[string]interface{})
 		if len == 0 {
 			model.Data = []string{}
-
 			listMsg["file_operate"] = model
-			msg := gosf.Message{}
-			msg.Success = true
-			msg.Body = listMsg
-			msg.Text = "file_operate"
-
-			notify := notify.Message{}
-			notify.Path = "file_operate"
-			notify.Msg = msg
-			NotifyMsg <- notify
+			SocketServer.BroadcastToRoom("/", "public", "file_operate", listMsg)
 			return
 		}
 
@@ -180,16 +122,7 @@ func (i *notifyServer) SendFileOperateNotify(nowSend bool) {
 		model.Data = list
 
 		listMsg["file_operate"] = model
-
-		msg := gosf.Message{}
-		msg.Success = true
-		msg.Body = listMsg
-		msg.Text = "file_operate"
-
-		notify := notify.Message{}
-		notify.Path = "file_operate"
-		notify.Msg = msg
-		NotifyMsg <- notify
+		SocketServer.BroadcastToRoom("/", "public", "file_operate", listMsg)
 	} else {
 		for {
 
@@ -246,99 +179,19 @@ func (i *notifyServer) SendFileOperateNotify(nowSend bool) {
 			model.Data = list
 
 			listMsg["file_operate"] = model
-
-			msg := gosf.Message{}
-			msg.Success = true
-			msg.Body = listMsg
-			msg.Text = "file_operate"
-
-			notify := notify.Message{}
-			notify.Path = "file_operate"
-			notify.Msg = msg
-			NotifyMsg <- notify
+			SocketServer.BroadcastToRoom("/", "public", "file_operate", listMsg)
 			time.Sleep(time.Second * 3)
 		}
 	}
 }
 
-func (i *notifyServer) SendMemInfoBySocket(mem map[string]interface{}) {
-	body := make(map[string]interface{})
-	body["data"] = mem
-
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "sys_mem"
-
-	notify := notify.Message{}
-	notify.Path = "sys_mem"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
-}
-
 func (i *notifyServer) SendInstallAppBySocket(app notifyCommon.Application) {
-	body := make(map[string]interface{})
-	body["data"] = app
+	SocketServer.BroadcastToRoom("/", "public", "app_install", app)
 
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "app_install"
-
-	notify := notify.Message{}
-	notify.Path = "app_install"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
-}
-
-func (i *notifyServer) SendCPUInfoBySocket(cpu map[string]interface{}) {
-	body := make(map[string]interface{})
-	body["data"] = cpu
-
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "sys_cpu"
-
-	notify := notify.Message{}
-	notify.Path = "sys_cpu"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
-}
-
-func (i *notifyServer) SendNetInfoBySocket(netList []model2.IOCountersStat) {
-	body := make(map[string]interface{})
-	body["data"] = netList
-
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "sys_net"
-
-	notify := notify.Message{}
-	notify.Path = "sys_net"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
 }
 
 func (i *notifyServer) SendUninstallAppBySocket(app notifyCommon.Application) {
-	body := make(map[string]interface{})
-	body["data"] = app
-
-	msg := gosf.Message{}
-	msg.Body = body
-	msg.Success = true
-	msg.Text = "app_uninstall"
-
-	notify := notify.Message{}
-	notify.Path = "app_uninstall"
-	notify.Msg = msg
-
-	NotifyMsg <- notify
+	SocketServer.BroadcastToRoom("/", "public", "app_uninstall", app)
 }
 
 func (i *notifyServer) SSR() {

+ 9 - 2
service/service.go

@@ -12,15 +12,18 @@ package service
 
 import (
 	"github.com/IceWhaleTech/CasaOS-Common/external"
+	"github.com/IceWhaleTech/CasaOS-Common/utils/logger"
+	socketio "github.com/googollee/go-socket.io"
 	"github.com/gorilla/websocket"
 	"github.com/patrickmn/go-cache"
+	"go.uber.org/zap"
 	"gorm.io/gorm"
 )
 
 var Cache *cache.Cache
 
 var MyService Repository
-
+var SocketServer *socketio.Server
 var (
 	WebSocketConns []*websocket.Conn
 	SocketRun      bool
@@ -37,7 +40,11 @@ type Repository interface {
 	Gateway() external.ManagementService
 }
 
-func NewService(db *gorm.DB, RuntimePath string) Repository {
+func NewService(db *gorm.DB, RuntimePath string, socket *socketio.Server) Repository {
+	if socket == nil {
+		logger.Error("socket is nil", zap.Any("error", "socket is nil"))
+	}
+	SocketServer = socket
 	gatewayManagement, err := external.NewManagementService(RuntimePath)
 	if err != nil && len(RuntimePath) > 0 {
 		panic(err)

+ 1 - 0
service/system.go

@@ -384,6 +384,7 @@ func (s *systemService) GetCPUPower() map[string]string {
 }
 
 func (s *systemService) SystemReboot() error {
+	//cmd := exec.Command("/bin/bash", "-c", "reboot")
 	arg := []string{"6"}
 	cmd := exec.Command("init", arg...)
 	_, err := cmd.CombinedOutput()