file.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160
  1. package v1
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. url2 "net/url"
  11. "os"
  12. "path"
  13. "path/filepath"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "github.com/IceWhaleTech/CasaOS-Common/utils/logger"
  19. "github.com/IceWhaleTech/CasaOS/model"
  20. "github.com/gorilla/websocket"
  21. "github.com/robfig/cron/v3"
  22. "github.com/tidwall/gjson"
  23. "github.com/IceWhaleTech/CasaOS/pkg/utils/common_err"
  24. "github.com/IceWhaleTech/CasaOS/pkg/utils/file"
  25. "github.com/IceWhaleTech/CasaOS/service"
  26. model2 "github.com/IceWhaleTech/CasaOS/service/model"
  27. "github.com/gin-gonic/gin"
  28. uuid "github.com/satori/go.uuid"
  29. "go.uber.org/zap"
  30. "github.com/h2non/filetype"
  31. )
  32. type ListReq struct {
  33. model.PageReq
  34. Path string `json:"path" form:"path"`
  35. //Refresh bool `json:"refresh"`
  36. }
  37. type ObjResp struct {
  38. Name string `json:"name"`
  39. Size int64 `json:"size"`
  40. IsDir bool `json:"is_dir"`
  41. Modified time.Time `json:"modified"`
  42. Sign string `json:"sign"`
  43. Thumb string `json:"thumb"`
  44. Type int `json:"type"`
  45. Path string `json:"path"`
  46. Date time.Time `json:"date"`
  47. Extensions map[string]interface{} `json:"extensions"`
  48. }
  49. type FsListResp struct {
  50. Content []ObjResp `json:"content"`
  51. Total int64 `json:"total"`
  52. Readme string `json:"readme,omitempty"`
  53. Write bool `json:"write,omitempty"`
  54. Provider string `json:"provider,omitempty"`
  55. Index int `json:"index"`
  56. Size int `json:"size"`
  57. }
  58. var (
  59. // 升级成 WebSocket 协议
  60. upgraderFile = websocket.Upgrader{
  61. // 允许CORS跨域请求
  62. CheckOrigin: func(r *http.Request) bool {
  63. return true
  64. },
  65. }
  66. conn *websocket.Conn
  67. err error
  68. )
  69. // @Summary 读取文件
  70. // @Produce application/json
  71. // @Accept application/json
  72. // @Tags file
  73. // @Security ApiKeyAuth
  74. // @Param path query string true "路径"
  75. // @Success 200 {string} string "ok"
  76. // @Router /file/read [get]
  77. func GetFilerContent(c *gin.Context) {
  78. filePath := c.Query("path")
  79. if len(filePath) == 0 {
  80. c.JSON(common_err.CLIENT_ERROR, model.Result{
  81. Success: common_err.INVALID_PARAMS,
  82. Message: common_err.GetMsg(common_err.INVALID_PARAMS),
  83. })
  84. return
  85. }
  86. if !file.Exists(filePath) {
  87. c.JSON(common_err.SERVICE_ERROR, model.Result{
  88. Success: common_err.FILE_DOES_NOT_EXIST,
  89. Message: common_err.GetMsg(common_err.FILE_DOES_NOT_EXIST),
  90. })
  91. return
  92. }
  93. // 文件读取任务是将文件内容读取到内存中。
  94. info, err := ioutil.ReadFile(filePath)
  95. if err != nil {
  96. c.JSON(common_err.SERVICE_ERROR, model.Result{
  97. Success: common_err.FILE_READ_ERROR,
  98. Message: common_err.GetMsg(common_err.FILE_READ_ERROR),
  99. Data: err.Error(),
  100. })
  101. return
  102. }
  103. result := string(info)
  104. c.JSON(common_err.SUCCESS, model.Result{
  105. Success: common_err.SUCCESS,
  106. Message: common_err.GetMsg(common_err.SUCCESS),
  107. Data: result,
  108. })
  109. }
  110. func GetLocalFile(c *gin.Context) {
  111. path := c.Query("path")
  112. if len(path) == 0 {
  113. c.JSON(http.StatusOK, model.Result{
  114. Success: common_err.INVALID_PARAMS,
  115. Message: common_err.GetMsg(common_err.INVALID_PARAMS),
  116. })
  117. return
  118. }
  119. if !file.Exists(path) {
  120. c.JSON(http.StatusOK, model.Result{
  121. Success: common_err.FILE_DOES_NOT_EXIST,
  122. Message: common_err.GetMsg(common_err.FILE_DOES_NOT_EXIST),
  123. })
  124. return
  125. }
  126. c.File(path)
  127. }
  128. // @Summary download
  129. // @Produce application/json
  130. // @Accept application/json
  131. // @Tags file
  132. // @Security ApiKeyAuth
  133. // @Param format query string false "Compression format" Enums(zip,tar,targz)
  134. // @Param files query string true "file list eg: filename1,filename2,filename3 "
  135. // @Success 200 {string} string "ok"
  136. // @Router /file/download [get]
  137. func GetDownloadFile(c *gin.Context) {
  138. t := c.Query("format")
  139. files := c.Query("files")
  140. if len(files) == 0 {
  141. c.JSON(common_err.CLIENT_ERROR, model.Result{
  142. Success: common_err.INVALID_PARAMS,
  143. Message: common_err.GetMsg(common_err.INVALID_PARAMS),
  144. })
  145. return
  146. }
  147. list := strings.Split(files, ",")
  148. for _, v := range list {
  149. if !file.Exists(v) {
  150. c.JSON(common_err.SERVICE_ERROR, model.Result{
  151. Success: common_err.FILE_DOES_NOT_EXIST,
  152. Message: common_err.GetMsg(common_err.FILE_DOES_NOT_EXIST),
  153. })
  154. return
  155. }
  156. }
  157. c.Header("Content-Type", "application/octet-stream")
  158. c.Header("Content-Transfer-Encoding", "binary")
  159. c.Header("Cache-Control", "no-cache")
  160. // handles only single files not folders and multiple files
  161. if len(list) == 1 {
  162. filePath := list[0]
  163. info, err := os.Stat(filePath)
  164. if err != nil {
  165. c.JSON(http.StatusOK, model.Result{
  166. Success: common_err.FILE_DOES_NOT_EXIST,
  167. Message: common_err.GetMsg(common_err.FILE_DOES_NOT_EXIST),
  168. })
  169. return
  170. }
  171. if !info.IsDir() {
  172. // 打开文件
  173. fileTmp, _ := os.Open(filePath)
  174. defer fileTmp.Close()
  175. // 获取文件的名称
  176. fileName := path.Base(filePath)
  177. c.Header("Content-Disposition", "attachment; filename*=utf-8''"+url2.PathEscape(fileName))
  178. c.File(filePath)
  179. return
  180. }
  181. }
  182. extension, ar, err := file.GetCompressionAlgorithm(t)
  183. if err != nil {
  184. c.JSON(common_err.CLIENT_ERROR, model.Result{
  185. Success: common_err.INVALID_PARAMS,
  186. Message: common_err.GetMsg(common_err.INVALID_PARAMS),
  187. })
  188. return
  189. }
  190. err = ar.Create(c.Writer)
  191. if err != nil {
  192. c.JSON(common_err.SERVICE_ERROR, model.Result{
  193. Success: common_err.SERVICE_ERROR,
  194. Message: common_err.GetMsg(common_err.SERVICE_ERROR),
  195. Data: err.Error(),
  196. })
  197. return
  198. }
  199. defer ar.Close()
  200. commonDir := file.CommonPrefix(filepath.Separator, list...)
  201. currentPath := filepath.Base(commonDir)
  202. name := "_" + currentPath
  203. name += extension
  204. c.Header("Content-Disposition", "attachment; filename*=utf-8''"+url.PathEscape(name))
  205. for _, fname := range list {
  206. err = file.AddFile(ar, fname, commonDir)
  207. if err != nil {
  208. log.Printf("Failed to archive %s: %v", fname, err)
  209. }
  210. }
  211. }
  212. func GetDownloadSingleFile(c *gin.Context) {
  213. filePath := c.Query("path")
  214. if len(filePath) == 0 {
  215. c.JSON(common_err.CLIENT_ERROR, model.Result{
  216. Success: common_err.INVALID_PARAMS,
  217. Message: common_err.GetMsg(common_err.INVALID_PARAMS),
  218. })
  219. return
  220. }
  221. fileName := path.Base(filePath)
  222. // c.Header("Content-Disposition", "inline")
  223. c.Header("Content-Disposition", "attachment; filename*=utf-8''"+url2.PathEscape(fileName))
  224. fi, err := os.Open(filePath)
  225. if err != nil {
  226. panic(err)
  227. }
  228. // We only have to pass the file header = first 261 bytes
  229. buffer := make([]byte, 261)
  230. _, _ = fi.Read(buffer)
  231. kind, _ := filetype.Match(buffer)
  232. if kind != filetype.Unknown {
  233. c.Header("Content-Type", kind.MIME.Value)
  234. }
  235. node, err := os.Stat(filePath)
  236. // Set the Last-Modified header to the timestamp
  237. c.Header("Last-Modified", node.ModTime().UTC().Format(http.TimeFormat))
  238. knownSize := node.Size() >= 0
  239. if knownSize {
  240. c.Header("Content-Length", strconv.FormatInt(node.Size(), 10))
  241. }
  242. http.ServeContent(c.Writer, c.Request, fileName, node.ModTime(), fi)
  243. //http.ServeFile(c.Writer, c.Request, filePath)
  244. defer fi.Close()
  245. return
  246. fileTmp, err := os.Open(filePath)
  247. if err != nil {
  248. c.JSON(common_err.SERVICE_ERROR, model.Result{
  249. Success: common_err.FILE_DOES_NOT_EXIST,
  250. Message: common_err.GetMsg(common_err.FILE_DOES_NOT_EXIST),
  251. })
  252. return
  253. }
  254. defer fileTmp.Close()
  255. c.File(filePath)
  256. }
  257. // @Summary 获取目录列表
  258. // @Produce application/json
  259. // @Accept application/json
  260. // @Tags file
  261. // @Security ApiKeyAuth
  262. // @Param path query string false "路径"
  263. // @Success 200 {string} string "ok"
  264. // @Router /file/dirpath [get]
  265. func DirPath(c *gin.Context) {
  266. var req ListReq
  267. if err := c.ShouldBind(&req); err != nil {
  268. c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.CLIENT_ERROR, Message: common_err.GetMsg(common_err.CLIENT_ERROR), Data: err.Error()})
  269. return
  270. }
  271. req.Validate()
  272. info, err := service.MyService.System().GetDirPath(req.Path)
  273. if err != nil {
  274. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  275. return
  276. }
  277. shares := service.MyService.Shares().GetSharesList()
  278. sharesMap := make(map[string]string)
  279. for _, v := range shares {
  280. sharesMap[v.Path] = fmt.Sprint(v.ID)
  281. }
  282. // if len(info) <= (req.Page-1)*req.Size {
  283. // c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.CLIENT_ERROR, Message: common_err.GetMsg(common_err.INVALID_PARAMS), Data: "page out of range"})
  284. // return
  285. // }
  286. forEnd := req.Index * req.Size
  287. if forEnd > len(info) {
  288. forEnd = len(info)
  289. }
  290. for i := (req.Index - 1) * req.Size; i < forEnd; i++ {
  291. if v, ok := sharesMap[info[i].Path]; ok {
  292. ex := make(map[string]interface{})
  293. shareEx := make(map[string]string)
  294. shareEx["shared"] = "true"
  295. shareEx["id"] = v
  296. ex["share"] = shareEx
  297. ex["mounted"] = false
  298. info[i].Extensions = ex
  299. }
  300. }
  301. if strings.HasPrefix(req.Path, "/mnt") || strings.HasPrefix(req.Path, "/media") {
  302. for i := (req.Index - 1) * req.Size; i < forEnd; i++ {
  303. ex := info[i].Extensions
  304. if ex == nil {
  305. ex = make(map[string]interface{})
  306. }
  307. mounted := service.IsMounted(info[i].Path)
  308. ex["mounted"] = mounted
  309. info[i].Extensions = ex
  310. }
  311. }
  312. // Hide the files or folders in operation
  313. fileQueue := make(map[string]string)
  314. if len(service.OpStrArr) > 0 {
  315. for _, v := range service.OpStrArr {
  316. v, ok := service.FileQueue.Load(v)
  317. if !ok {
  318. continue
  319. }
  320. vt := v.(model.FileOperate)
  321. for _, i := range vt.Item {
  322. lastPath := i.From[strings.LastIndex(i.From, "/")+1:]
  323. fileQueue[vt.To+"/"+lastPath] = i.From
  324. }
  325. }
  326. }
  327. pathList := []ObjResp{}
  328. for i := (req.Index - 1) * req.Size; i < forEnd; i++ {
  329. if info[i].Name == ".temp" && info[i].IsDir {
  330. continue
  331. }
  332. if _, ok := fileQueue[info[i].Path]; !ok {
  333. t := ObjResp{}
  334. t.IsDir = info[i].IsDir
  335. t.Name = info[i].Name
  336. t.Modified = info[i].Date
  337. t.Date = info[i].Date
  338. t.Size = info[i].Size
  339. t.Path = info[i].Path
  340. t.Extensions = info[i].Extensions
  341. pathList = append(pathList, t)
  342. }
  343. }
  344. flist := FsListResp{
  345. Content: pathList,
  346. Total: int64(len(info)),
  347. // Readme: "",
  348. // Write: true,
  349. // Provider: "local",
  350. Index: req.Index,
  351. Size: req.Size,
  352. }
  353. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: flist})
  354. }
  355. // @Summary rename file or dir
  356. // @Produce application/json
  357. // @Accept application/json
  358. // @Tags file
  359. // @Security ApiKeyAuth
  360. // @Param oldpath body string true "path of old"
  361. // @Param newpath body string true "path of new"
  362. // @Success 200 {string} string "ok"
  363. // @Router /file/rename [put]
  364. func RenamePath(c *gin.Context) {
  365. json := make(map[string]string)
  366. c.ShouldBind(&json)
  367. op := json["old_path"]
  368. np := json["new_path"]
  369. if len(op) == 0 || len(np) == 0 {
  370. c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  371. return
  372. }
  373. mounted := service.IsMounted(op)
  374. if mounted {
  375. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.MOUNTED_DIRECTIORIES, Message: common_err.GetMsg(common_err.MOUNTED_DIRECTIORIES), Data: common_err.GetMsg(common_err.MOUNTED_DIRECTIORIES)})
  376. return
  377. }
  378. success, err := service.MyService.System().RenameFile(op, np)
  379. c.JSON(common_err.SUCCESS, model.Result{Success: success, Message: common_err.GetMsg(success), Data: err})
  380. }
  381. // @Summary create folder
  382. // @Produce application/json
  383. // @Accept application/json
  384. // @Tags file
  385. // @Security ApiKeyAuth
  386. // @Param path body string true "path of folder"
  387. // @Success 200 {string} string "ok"
  388. // @Router /file/mkdir [post]
  389. func MkdirAll(c *gin.Context) {
  390. json := make(map[string]string)
  391. c.ShouldBind(&json)
  392. path := json["path"]
  393. var code int
  394. if len(path) == 0 {
  395. c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  396. return
  397. }
  398. // decodedPath, err := url.QueryUnescape(path)
  399. // if err != nil {
  400. // c.JSON(http.StatusOK, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  401. // return
  402. // }
  403. code, _ = service.MyService.System().MkdirAll(path)
  404. c.JSON(common_err.SUCCESS, model.Result{Success: code, Message: common_err.GetMsg(code)})
  405. }
  406. // @Summary create file
  407. // @Produce application/json
  408. // @Accept application/json
  409. // @Tags file
  410. // @Security ApiKeyAuth
  411. // @Param path body string true "path of folder (path need to url encode)"
  412. // @Success 200 {string} string "ok"
  413. // @Router /file/create [post]
  414. func PostCreateFile(c *gin.Context) {
  415. json := make(map[string]string)
  416. c.ShouldBind(&json)
  417. path := json["path"]
  418. var code int
  419. if len(path) == 0 {
  420. c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  421. return
  422. }
  423. // decodedPath, err := url.QueryUnescape(path)
  424. // if err != nil {
  425. // c.JSON(http.StatusOK, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  426. // return
  427. // }
  428. code, _ = service.MyService.System().CreateFile(path)
  429. c.JSON(common_err.SUCCESS, model.Result{Success: code, Message: common_err.GetMsg(code)})
  430. }
  431. // @Summary upload file
  432. // @Produce application/json
  433. // @Accept application/json
  434. // @Tags file
  435. // @Security ApiKeyAuth
  436. // @Param path formData string false "file path"
  437. // @Param file formData file true "file"
  438. // @Success 200 {string} string "ok"
  439. // @Router /file/upload [get]
  440. func GetFileUpload(c *gin.Context) {
  441. relative := c.Query("relativePath")
  442. fileName := c.Query("filename")
  443. chunkNumber := c.Query("chunkNumber")
  444. totalChunks, _ := strconv.Atoi(c.DefaultQuery("totalChunks", "0"))
  445. path := c.Query("path")
  446. dirPath := ""
  447. hash := file.GetHashByContent([]byte(fileName))
  448. if file.Exists(path + "/" + relative) {
  449. c.JSON(http.StatusConflict, model.Result{Success: http.StatusConflict, Message: common_err.GetMsg(common_err.FILE_ALREADY_EXISTS)})
  450. return
  451. }
  452. tempDir := filepath.Join(path, ".temp", hash+strconv.Itoa(totalChunks)) + "/"
  453. if fileName != relative {
  454. dirPath = strings.TrimSuffix(relative, fileName)
  455. tempDir += dirPath
  456. file.MkDir(path + "/" + dirPath)
  457. }
  458. tempDir += chunkNumber
  459. if !file.CheckNotExist(tempDir) {
  460. c.JSON(200, model.Result{Success: 200, Message: common_err.GetMsg(common_err.FILE_ALREADY_EXISTS)})
  461. return
  462. }
  463. c.JSON(204, model.Result{Success: 204, Message: common_err.GetMsg(common_err.SUCCESS)})
  464. }
  465. // @Summary upload file
  466. // @Produce application/json
  467. // @Accept multipart/form-data
  468. // @Tags file
  469. // @Security ApiKeyAuth
  470. // @Param path formData string false "file path"
  471. // @Param file formData file true "file"
  472. // @Success 200 {string} string "ok"
  473. // @Router /file/upload [post]
  474. func PostFileUpload(c *gin.Context) {
  475. f, _, _ := c.Request.FormFile("file")
  476. relative := c.PostForm("relativePath")
  477. fileName := c.PostForm("filename")
  478. totalChunks, _ := strconv.Atoi(c.DefaultPostForm("totalChunks", "0"))
  479. chunkNumber := c.PostForm("chunkNumber")
  480. dirPath := ""
  481. path := c.PostForm("path")
  482. hash := file.GetHashByContent([]byte(fileName))
  483. if len(path) == 0 {
  484. logger.Error("path should not be empty")
  485. c.JSON(http.StatusBadRequest, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  486. return
  487. }
  488. tempDir := filepath.Join(path, ".temp", hash+strconv.Itoa(totalChunks)) + "/"
  489. if fileName != relative {
  490. dirPath = strings.TrimSuffix(relative, fileName)
  491. tempDir += dirPath
  492. if err := file.MkDir(path + "/" + dirPath); err != nil {
  493. logger.Error("error when trying to create `"+path+"/"+dirPath+"`", zap.Error(err))
  494. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  495. return
  496. }
  497. }
  498. path += "/" + relative
  499. if !file.CheckNotExist(tempDir + chunkNumber) {
  500. if err := file.RMDir(tempDir + chunkNumber); err != nil {
  501. logger.Error("error when trying to remove existing `"+tempDir+chunkNumber+"`", zap.Error(err))
  502. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  503. return
  504. }
  505. }
  506. if totalChunks > 1 {
  507. if err := file.IsNotExistMkDir(tempDir); err != nil {
  508. logger.Error("error when trying to create `"+tempDir+"`", zap.Error(err))
  509. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  510. return
  511. }
  512. out, err := os.OpenFile(tempDir+chunkNumber, os.O_WRONLY|os.O_CREATE, 0o644)
  513. if err != nil {
  514. logger.Error("error when trying to open `"+tempDir+chunkNumber+"` for creation", zap.Error(err))
  515. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  516. return
  517. }
  518. defer out.Close()
  519. if _, err := io.Copy(out, f); err != nil { // recommend to use https://github.com/iceber/iouring-go for faster copy
  520. logger.Error("error when trying to write to `"+tempDir+chunkNumber+"`", zap.Error(err))
  521. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  522. return
  523. }
  524. fileNum, err := ioutil.ReadDir(tempDir)
  525. if err != nil {
  526. logger.Error("error when trying to read number of files under `"+tempDir+"`", zap.Error(err))
  527. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  528. return
  529. }
  530. if totalChunks == len(fileNum) {
  531. if err := file.SpliceFiles(tempDir, path, totalChunks, 1); err != nil {
  532. logger.Error("error when trying to splice files under `"+tempDir+"`", zap.Error(err))
  533. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  534. return
  535. }
  536. if err := file.RMDir(tempDir); err != nil {
  537. logger.Error("error when trying to remove `"+tempDir+"`", zap.Error(err))
  538. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  539. return
  540. }
  541. }
  542. } else {
  543. out, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0o644)
  544. if err != nil {
  545. logger.Error("error when trying to open `"+path+"` for creation", zap.Error(err))
  546. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: err.Error()})
  547. return
  548. }
  549. defer out.Close()
  550. if _, err := io.Copy(out, f); err != nil { // recommend to use https://github.com/iceber/iouring-go for faster copy
  551. logger.Error("error when trying to write to `"+path+"`", zap.Error(err))
  552. c.JSON(http.StatusInternalServerError, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  553. return
  554. }
  555. }
  556. c.JSON(http.StatusOK, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  557. }
  558. func PostFileOctet(c *gin.Context) {
  559. content_length := c.Request.ContentLength
  560. if content_length <= 0 || content_length > 1024*1024*1024*2*1024 {
  561. log.Printf("content_length error\n")
  562. c.JSON(http.StatusBadRequest, model.Result{Success: common_err.CLIENT_ERROR, Message: common_err.GetMsg(common_err.CLIENT_ERROR), Data: "content_length error"})
  563. return
  564. }
  565. content_type_, has_key := c.Request.Header["Content-Type"]
  566. if !has_key {
  567. log.Printf("Content-Type error\n")
  568. c.JSON(http.StatusBadRequest, model.Result{Success: common_err.CLIENT_ERROR, Message: common_err.GetMsg(common_err.CLIENT_ERROR), Data: "Content-Type error"})
  569. return
  570. }
  571. if len(content_type_) != 1 {
  572. log.Printf("Content-Type count error\n")
  573. c.JSON(http.StatusBadRequest, model.Result{Success: common_err.CLIENT_ERROR, Message: common_err.GetMsg(common_err.CLIENT_ERROR), Data: "Content-Type count error"})
  574. return
  575. }
  576. content_type := content_type_[0]
  577. const BOUNDARY string = "; boundary="
  578. loc := strings.Index(content_type, BOUNDARY)
  579. if loc == -1 {
  580. log.Printf("Content-Type error, no boundary\n")
  581. c.JSON(http.StatusBadRequest, model.Result{Success: common_err.CLIENT_ERROR, Message: common_err.GetMsg(common_err.CLIENT_ERROR), Data: "Content-Type error, no boundary"})
  582. return
  583. }
  584. boundary := []byte(content_type[(loc + len(BOUNDARY)):])
  585. log.Printf("[%s]\n\n", boundary)
  586. read_data := make([]byte, 1024*24)
  587. var read_total int = 0
  588. for {
  589. file_header, file_data, err := file.ParseFromHead(read_data, read_total, append(boundary, []byte("\r\n")...), c.Request.Body)
  590. if err != nil {
  591. log.Printf("%v", err)
  592. return
  593. }
  594. log.Printf("file :%s\n", file_header)
  595. //
  596. //os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0o644)
  597. f, err := os.OpenFile(file_header["path"]+"/"+file_header["filename"], os.O_WRONLY|os.O_CREATE, 0o644)
  598. if err != nil {
  599. log.Printf("create file fail:%v\n", err)
  600. return
  601. }
  602. f.Write(file_data)
  603. file_data = nil
  604. temp_data, reach_end, err := file.ReadToBoundary(boundary, c.Request.Body, f)
  605. f.Close()
  606. if err != nil {
  607. log.Printf("%v\n", err)
  608. return
  609. }
  610. if reach_end {
  611. break
  612. } else {
  613. copy(read_data[0:], temp_data)
  614. read_total = len(temp_data)
  615. continue
  616. }
  617. }
  618. c.JSON(http.StatusOK, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  619. }
  620. // @Summary copy or move file
  621. // @Produce application/json
  622. // @Accept application/json
  623. // @Tags file
  624. // @Security ApiKeyAuth
  625. // @Param body body model.FileOperate true "type:move,copy"
  626. // @Success 200 {string} string "ok"
  627. // @Router /file/operate [post]
  628. func PostOperateFileOrDir(c *gin.Context) {
  629. list := model.FileOperate{}
  630. c.ShouldBind(&list)
  631. if len(list.Item) == 0 {
  632. c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  633. return
  634. }
  635. if list.To == list.Item[0].From[:strings.LastIndex(list.Item[0].From, "/")] {
  636. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SOURCE_DES_SAME, Message: common_err.GetMsg(common_err.SOURCE_DES_SAME)})
  637. return
  638. }
  639. var total int64 = 0
  640. for i := 0; i < len(list.Item); i++ {
  641. size, err := file.GetFileOrDirSize(list.Item[i].From)
  642. if err != nil {
  643. continue
  644. }
  645. list.Item[i].Size = size
  646. total += size
  647. if list.Type == "move" {
  648. mounted := service.IsMounted(list.Item[i].From)
  649. if mounted {
  650. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.MOUNTED_DIRECTIORIES, Message: common_err.GetMsg(common_err.MOUNTED_DIRECTIORIES), Data: common_err.GetMsg(common_err.MOUNTED_DIRECTIORIES)})
  651. return
  652. }
  653. }
  654. }
  655. list.TotalSize = total
  656. list.ProcessedSize = 0
  657. uid := uuid.NewV4().String()
  658. service.FileQueue.Store(uid, list)
  659. service.OpStrArr = append(service.OpStrArr, uid)
  660. if len(service.OpStrArr) == 1 {
  661. go service.ExecOpFile()
  662. go service.CheckFileStatus()
  663. go service.MyService.Notify().SendFileOperateNotify(false)
  664. }
  665. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  666. }
  667. // @Summary delete file
  668. // @Produce application/json
  669. // @Accept application/json
  670. // @Tags file
  671. // @Security ApiKeyAuth
  672. // @Param body body string true "paths eg ["/a/b/c","/d/e/f"]"
  673. // @Success 200 {string} string "ok"
  674. // @Router /file/delete [delete]
  675. func DeleteFile(c *gin.Context) {
  676. paths := []string{}
  677. c.ShouldBind(&paths)
  678. if len(paths) == 0 {
  679. c.JSON(common_err.CLIENT_ERROR, model.Result{Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS)})
  680. return
  681. }
  682. // path := c.Query("path")
  683. // paths := strings.Split(path, ",")
  684. for _, v := range paths {
  685. mounted := service.IsMounted(v)
  686. if mounted {
  687. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.MOUNTED_DIRECTIORIES, Message: common_err.GetMsg(common_err.MOUNTED_DIRECTIORIES), Data: common_err.GetMsg(common_err.MOUNTED_DIRECTIORIES)})
  688. return
  689. }
  690. }
  691. for _, v := range paths {
  692. err := os.RemoveAll(v)
  693. if err != nil {
  694. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.FILE_DELETE_ERROR, Message: common_err.GetMsg(common_err.FILE_DELETE_ERROR), Data: err})
  695. return
  696. }
  697. }
  698. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  699. }
  700. // @Summary update file
  701. // @Produce application/json
  702. // @Accept application/json
  703. // @Tags file
  704. // @Security ApiKeyAuth
  705. // @Param path body string true "path"
  706. // @Param content body string true "content"
  707. // @Success 200 {string} string "ok"
  708. // @Router /file/update [put]
  709. func PutFileContent(c *gin.Context) {
  710. fi := model.FileUpdate{}
  711. c.ShouldBind(&fi)
  712. // path := c.PostForm("path")
  713. // content := c.PostForm("content")
  714. if !file.Exists(fi.FilePath) {
  715. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.FILE_ALREADY_EXISTS, Message: common_err.GetMsg(common_err.FILE_ALREADY_EXISTS)})
  716. return
  717. }
  718. // err := os.Remove(path)
  719. f, err := os.Stat(fi.FilePath)
  720. if err != nil {
  721. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.FILE_ALREADY_EXISTS, Message: common_err.GetMsg(common_err.FILE_ALREADY_EXISTS)})
  722. return
  723. }
  724. fm := f.Mode()
  725. if err != nil {
  726. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.FILE_DELETE_ERROR, Message: common_err.GetMsg(common_err.FILE_DELETE_ERROR), Data: err})
  727. return
  728. }
  729. os.OpenFile(fi.FilePath, os.O_CREATE, fm)
  730. err = file.WriteToFullPath([]byte(fi.FileContent), fi.FilePath, fm)
  731. if err != nil {
  732. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  733. return
  734. }
  735. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  736. }
  737. // @Summary image thumbnail/original image
  738. // @Produce application/json
  739. // @Accept application/json
  740. // @Tags file
  741. // @Security ApiKeyAuth
  742. // @Param path query string true "path"
  743. // @Param type query string false "original,thumbnail" Enums(original,thumbnail)
  744. // @Success 200 {string} string "ok"
  745. // @Router /file/image [get]
  746. func GetFileImage(c *gin.Context) {
  747. t := c.Query("type")
  748. path := c.Query("path")
  749. if !file.Exists(path) {
  750. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.FILE_ALREADY_EXISTS, Message: common_err.GetMsg(common_err.FILE_ALREADY_EXISTS)})
  751. return
  752. }
  753. if t == "thumbnail" {
  754. f, err := file.GetImage(path, 100, 0)
  755. if err != nil {
  756. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  757. return
  758. }
  759. c.Writer.WriteString(string(f))
  760. return
  761. }
  762. f, err := os.Open(path)
  763. if err != nil {
  764. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  765. return
  766. }
  767. defer f.Close()
  768. data, err := ioutil.ReadAll(f)
  769. if err != nil {
  770. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  771. return
  772. }
  773. c.Writer.WriteString(string(data))
  774. }
  775. func DeleteOperateFileOrDir(c *gin.Context) {
  776. id := c.Param("id")
  777. if id == "0" {
  778. service.FileQueue = sync.Map{}
  779. service.OpStrArr = []string{}
  780. } else {
  781. service.FileQueue.Delete(id)
  782. tempList := []string{}
  783. for _, v := range service.OpStrArr {
  784. if v != id {
  785. tempList = append(tempList, v)
  786. }
  787. }
  788. service.OpStrArr = tempList
  789. }
  790. go service.MyService.Notify().SendFileOperateNotify(true)
  791. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  792. }
  793. func GetSize(c *gin.Context) {
  794. json := make(map[string]string)
  795. c.ShouldBind(&json)
  796. path := json["path"]
  797. size, err := file.GetFileOrDirSize(path)
  798. if err != nil {
  799. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  800. return
  801. }
  802. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: size})
  803. }
  804. func GetFileCount(c *gin.Context) {
  805. json := make(map[string]string)
  806. c.ShouldBind(&json)
  807. path := json["path"]
  808. list, err := ioutil.ReadDir(path)
  809. if err != nil {
  810. c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()})
  811. return
  812. }
  813. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: len(list)})
  814. }
  815. type CenterHandler struct {
  816. // 广播通道,有数据则循环每个用户广播出去
  817. broadcast chan []byte
  818. // 注册通道,有用户进来 则推到用户集合map中
  819. register chan *Client
  820. // 注销通道,有用户关闭连接 则将该用户剔出集合map中
  821. unregister chan *Client
  822. // 用户集合,每个用户本身也在跑两个协程,监听用户的读、写的状态
  823. clients map[string]*Client
  824. }
  825. type Client struct {
  826. handler *CenterHandler
  827. conn *websocket.Conn
  828. // 每个用户自己的循环跑起来的状态监控
  829. send chan []byte
  830. ID string `json:"id"`
  831. IP string `json:"ip"`
  832. Name service.Name `json:"name"`
  833. RtcSupported bool `json:"rtcSupported"`
  834. TimerId int `json:"timerId"`
  835. LastBeat time.Time `json:"lastBeat"`
  836. }
  837. type PeerModel struct {
  838. ID string `json:"id"`
  839. Name service.Name `json:"name"`
  840. RtcSupported bool `json:"rtcSupported"`
  841. }
  842. func ConnectWebSocket(c *gin.Context) {
  843. peerId := c.Query("peer")
  844. writer := c.Writer
  845. request := c.Request
  846. key := uuid.NewV4().String()
  847. //peerModel := service.MyService.Peer().GetPeerByUserAgent(c.Request.UserAgent())
  848. peerModel := model2.PeerDriveDBModel{}
  849. name := service.GetName(request)
  850. if conn, err = upgraderFile.Upgrade(writer, request, writer.Header()); err != nil {
  851. log.Println(err)
  852. return
  853. }
  854. client := &Client{handler: &handler, conn: conn, send: make(chan []byte, 256), ID: service.GetPeerId(request, key), IP: service.GetIP(request), Name: name, RtcSupported: true, TimerId: 0, LastBeat: time.Now()}
  855. if peerId != "" || len(peerModel.ID) > 0 {
  856. if len(peerModel.ID) == 0 {
  857. peerModel = service.MyService.Peer().GetPeerByID(peerId)
  858. }
  859. if len(peerModel.ID) > 0 {
  860. key = peerId
  861. client.ID = peerModel.ID
  862. client.Name = service.GetNameByDB(peerModel)
  863. }
  864. }
  865. var list = service.MyService.Peer().GetPeers()
  866. if len(peerModel.ID) == 0 {
  867. peerModel.ID = key
  868. peerModel.DisplayName = name.DisplayName
  869. peerModel.DeviceName = name.DeviceName
  870. peerModel.Model = name.Model
  871. peerModel.OS = name.OS
  872. peerModel.Browser = name.Browser
  873. peerModel.UserAgent = c.Request.UserAgent()
  874. peerModel.IP = client.IP
  875. service.MyService.Peer().CreatePeer(&peerModel)
  876. list = append(list, peerModel)
  877. }
  878. cookie := http.Cookie{
  879. Name: "peerid",
  880. Value: key,
  881. Path: "/",
  882. }
  883. http.SetCookie(writer, &cookie)
  884. if len(list) > 10 {
  885. kickoutList := []Client{}
  886. count := len(list) - 10
  887. for i := len(list) - 1; count > 0 && i > -1; i-- {
  888. if _, ok := handler.clients[list[i].ID]; !ok {
  889. count--
  890. kickoutList = append(kickoutList, Client{ID: list[i].ID, Name: service.GetNameByDB(list[i]), IP: list[i].IP})
  891. service.MyService.Peer().DeletePeer(list[i].ID)
  892. }
  893. }
  894. // if len(kickoutList) > 0 {
  895. // other := make(map[string]interface{})
  896. // other["type"] = "kickout"
  897. // other["peers"] = kickoutList
  898. // otherBy, err := json.Marshal(other)
  899. // fmt.Println(err)
  900. // client.handler.broadcast <- otherBy
  901. // }
  902. }
  903. list = service.MyService.Peer().GetPeers()
  904. if len(list) > 10 {
  905. fmt.Println("解决完后依然有溢出", list)
  906. }
  907. currentPeer := PeerModel{ID: client.ID, Name: client.Name, RtcSupported: client.RtcSupported}
  908. pmsg := make(map[string]interface{})
  909. pmsg["type"] = "peer-joined"
  910. pmsg["peer"] = currentPeer
  911. pby, err := json.Marshal(pmsg)
  912. fmt.Println(err)
  913. for _, v := range handler.clients {
  914. v.send <- pby
  915. }
  916. //client.handler.broadcast <- pby
  917. clients := []PeerModel{}
  918. for _, v := range client.handler.clients {
  919. if _, ok := handler.clients[v.ID]; ok {
  920. clients = append(clients, PeerModel{ID: v.ID, Name: v.Name, RtcSupported: v.RtcSupported})
  921. }
  922. }
  923. other := make(map[string]interface{})
  924. other["type"] = "peers"
  925. other["peers"] = clients
  926. otherBy, err := json.Marshal(other)
  927. fmt.Println(err)
  928. client.send <- otherBy
  929. // 推给监控中心注册到用户集合中
  930. handler.register <- client
  931. client.send <- []byte(`{"type":"ping"}`)
  932. data := make(map[string]string)
  933. data["displayName"] = client.Name.DisplayName
  934. data["deviceName"] = client.Name.DeviceName
  935. data["id"] = client.ID
  936. msg := make(map[string]interface{})
  937. msg["type"] = "display-name"
  938. msg["message"] = data
  939. by, _ := json.Marshal(msg)
  940. client.send <- by
  941. // 每个 client 都挂起 2 个新的协程,监控读、写状态
  942. go client.writePump()
  943. go client.readPump()
  944. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)})
  945. }
  946. var handler = CenterHandler{broadcast: make(chan []byte),
  947. register: make(chan *Client),
  948. unregister: make(chan *Client),
  949. clients: make(map[string]*Client)}
  950. func init() {
  951. // 起个协程跑起来,监听注册、注销、消息 3 个 channel
  952. go handler.monitoring()
  953. crontab := cron.New(cron.WithSeconds()) //精确到秒
  954. //定义定时器调用的任务函数
  955. task := func() {
  956. handler.broadcast <- []byte(`{"type":"ping"}`)
  957. }
  958. //定时任务
  959. spec := "*/30 * * * * ?" //cron表达式,每五秒一次
  960. // 添加定时任务,
  961. crontab.AddFunc(spec, task)
  962. // 启动定时器
  963. crontab.Start()
  964. }
  965. func (c *Client) writePump() {
  966. defer func() {
  967. c.handler.unregister <- c
  968. c.conn.Close()
  969. }()
  970. for {
  971. // 广播推过来的新消息,马上通过websocket推给自己
  972. message, _ := <-c.send
  973. fmt.Println("推送消息", string(message), "1")
  974. if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
  975. return
  976. }
  977. }
  978. }
  979. // 读,监听客户端是否有推送内容过来服务端
  980. func (c *Client) readPump() {
  981. defer func() {
  982. c.handler.unregister <- c
  983. c.conn.Close()
  984. }()
  985. for {
  986. // 循环监听是否该用户是否要发言
  987. _, message, err := c.conn.ReadMessage()
  988. if err != nil {
  989. // 异常关闭的处理
  990. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  991. log.Printf("error: %v", err)
  992. }
  993. c.handler.broadcast <- []byte(`{"type":"peer-left","peerId":"` + c.ID + `"}`)
  994. break
  995. }
  996. // 要的话,推给广播中心,广播中心再推给每个用户
  997. t := gjson.GetBytes(message, "type")
  998. if t.String() == "disconnect" {
  999. c.handler.unregister <- c
  1000. c.conn.Close()
  1001. // clients := []Client{}
  1002. // list := service.MyService.Peer().GetPeers()
  1003. // for _, v := range list {
  1004. // if _, ok := handler.clients[v.ID]; ok {
  1005. // clients = append(clients, *handler.clients[v.ID])
  1006. // } else {
  1007. // clients = append(clients, Client{ID: v.ID, Name: service.GetNameByDB(v), IP: v.IP, Offline: true})
  1008. // }
  1009. // }
  1010. // other := make(map[string]interface{})
  1011. // other["type"] = "peers"
  1012. // other["peers"] = clients
  1013. // otherBy, err := json.Marshal(other)
  1014. // fmt.Println(err)
  1015. c.handler.broadcast <- []byte(`{"type":"peer-left","peerId":"` + c.ID + `"}`)
  1016. //c.handler.broadcast <- otherBy
  1017. break
  1018. } else if t.String() == "pong" {
  1019. c.LastBeat = time.Now()
  1020. continue
  1021. }
  1022. to := gjson.GetBytes(message, "to")
  1023. if len(to.String()) > 0 {
  1024. toC := c.handler.clients[to.String()]
  1025. if toC == nil {
  1026. continue
  1027. }
  1028. data := map[string]interface{}{}
  1029. json.Unmarshal(message, &data)
  1030. data["sender"] = c.ID
  1031. delete(data, "to")
  1032. message, err = json.Marshal(data)
  1033. toC.send <- message
  1034. continue
  1035. }
  1036. c.handler.broadcast <- message
  1037. }
  1038. }
  1039. func (ch *CenterHandler) monitoring() {
  1040. for {
  1041. select {
  1042. // 注册,新用户连接过来会推进注册通道,这里接收推进来的用户指针
  1043. case client := <-ch.register:
  1044. ch.clients[client.ID] = client
  1045. // 注销,关闭连接或连接异常会将用户推出群聊
  1046. case client := <-ch.unregister:
  1047. delete(ch.clients, client.ID)
  1048. // 消息,监听到有新消息到来
  1049. case message := <-ch.broadcast:
  1050. println("消息来了,message:" + string(message))
  1051. // 推送给每个用户的通道,每个用户都有跑协程起了writePump的监听
  1052. for _, client := range ch.clients {
  1053. client.send <- message
  1054. }
  1055. }
  1056. }
  1057. }
  1058. func GetPeers(c *gin.Context) {
  1059. peers := service.MyService.Peer().GetPeers()
  1060. for i := 0; i < len(peers); i++ {
  1061. if _, ok := handler.clients[peers[i].ID]; ok {
  1062. peers[i].Online = true
  1063. }
  1064. }
  1065. c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: peers})
  1066. }