sync.go 21 KB


  1. // SiYuan - Refactor your thinking
  2. // Copyright (c) 2020-present, b3log.org
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package model
  17. import (
  18. "errors"
  19. "fmt"
  20. "net/http"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/88250/go-humanize"
  30. "github.com/88250/gulu"
  31. "github.com/88250/lute/html"
  32. "github.com/gorilla/websocket"
  33. "github.com/siyuan-note/dejavu"
  34. "github.com/siyuan-note/dejavu/cloud"
  35. "github.com/siyuan-note/logging"
  36. "github.com/siyuan-note/siyuan/kernel/cache"
  37. "github.com/siyuan-note/siyuan/kernel/conf"
  38. "github.com/siyuan-note/siyuan/kernel/filesys"
  39. "github.com/siyuan-note/siyuan/kernel/sql"
  40. "github.com/siyuan-note/siyuan/kernel/treenode"
  41. "github.com/siyuan-note/siyuan/kernel/util"
  42. )
  43. func SyncDataDownload() {
  44. defer logging.Recover()
  45. if !checkSync(false, false, true) {
  46. return
  47. }
  48. util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
  49. if !isProviderOnline(true) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈
  50. util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil)
  51. return
  52. }
  53. lockSync()
  54. defer unlockSync()
  55. now := util.CurrentTimeMillis()
  56. Conf.Sync.Synced = now
  57. err := syncRepoDownload()
  58. code := 1
  59. if nil != err {
  60. code = 2
  61. }
  62. util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
  63. }
  64. func SyncDataUpload() {
  65. defer logging.Recover()
  66. if !checkSync(false, false, true) {
  67. return
  68. }
  69. util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
  70. if !isProviderOnline(true) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈
  71. util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil)
  72. return
  73. }
  74. lockSync()
  75. defer unlockSync()
  76. now := util.CurrentTimeMillis()
  77. Conf.Sync.Synced = now
  78. err := syncRepoUpload()
  79. code := 1
  80. if nil != err {
  81. code = 2
  82. }
  83. util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
  84. return
  85. }
  86. var (
  87. syncSameCount = atomic.Int32{}
  88. autoSyncErrCount = 0
  89. fixSyncInterval = 5 * time.Minute
  90. syncPlanTimeLock = sync.Mutex{}
  91. syncPlanTime = time.Now().Add(fixSyncInterval)
  92. BootSyncSucc = -1 // -1:未执行,0:执行成功,1:执行失败
  93. ExitSyncSucc = -1
  94. )
  95. func SyncDataJob() {
  96. syncPlanTimeLock.Lock()
  97. if time.Now().Before(syncPlanTime) {
  98. syncPlanTimeLock.Unlock()
  99. return
  100. }
  101. syncPlanTimeLock.Unlock()
  102. SyncData(false)
  103. }
  104. func BootSyncData() {
  105. defer logging.Recover()
  106. if Conf.Sync.Perception {
  107. connectSyncWebSocket()
  108. }
  109. if !checkSync(true, false, false) {
  110. return
  111. }
  112. if !isProviderOnline(false) {
  113. BootSyncSucc = 1
  114. util.PushErrMsg(Conf.Language(76), 7000)
  115. return
  116. }
  117. lockSync()
  118. defer unlockSync()
  119. util.IncBootProgress(3, "Syncing data from the cloud...")
  120. BootSyncSucc = 0
  121. logging.LogInfof("sync before boot")
  122. now := util.CurrentTimeMillis()
  123. Conf.Sync.Synced = now
  124. util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
  125. err := bootSyncRepo()
  126. code := 1
  127. if nil != err {
  128. code = 2
  129. }
  130. util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
  131. return
  132. }
// SyncData runs a regular (non-exit) sync. byHand reports whether the sync was
// requested manually; it is forwarded unchanged to syncData.
func SyncData(byHand bool) {
	syncData(false, byHand)
}
// lockSync acquires syncLock and then marks a sync as in progress.
// It must be paired with unlockSync.
func lockSync() {
	syncLock.Lock()
	// The flag is only set once the lock is held.
	isSyncing.Store(true)
}
// unlockSync clears the in-progress flag and then releases syncLock.
// It is the counterpart of lockSync.
func unlockSync() {
	// The flag is cleared while the lock is still held.
	isSyncing.Store(false)
	syncLock.Unlock()
}
  144. func syncData(exit, byHand bool) {
  145. defer logging.Recover()
  146. if !checkSync(false, exit, byHand) {
  147. return
  148. }
  149. lockSync()
  150. defer unlockSync()
  151. util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
  152. if !exit && !isProviderOnline(byHand) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈
  153. util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil)
  154. return
  155. }
  156. if exit {
  157. ExitSyncSucc = 0
  158. logging.LogInfof("sync before exit")
  159. msgId := util.PushMsg(Conf.Language(81), 1000*60*15)
  160. defer func() {
  161. util.PushClearMsg(msgId)
  162. }()
  163. }
  164. now := util.CurrentTimeMillis()
  165. Conf.Sync.Synced = now
  166. dataChanged, err := syncRepo(exit, byHand)
  167. code := 1
  168. if nil != err {
  169. code = 2
  170. }
  171. util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
  172. if nil == webSocketConn && Conf.Sync.Perception {
  173. // 如果 websocket 连接已经断开,则重新连接
  174. connectSyncWebSocket()
  175. }
  176. if 1 == Conf.Sync.Mode && nil != webSocketConn && Conf.Sync.Perception && dataChanged {
  177. // 如果处于自动同步模式且不是又 WS 触发的同步,则通知其他设备上的内核进行同步
  178. request := map[string]interface{}{
  179. "cmd": "synced",
  180. "synced": Conf.Sync.Synced,
  181. }
  182. if writeErr := webSocketConn.WriteJSON(request); nil != writeErr {
  183. logging.LogErrorf("write websocket message failed: %v", writeErr)
  184. }
  185. }
  186. return
  187. }
  188. func checkSync(boot, exit, byHand bool) bool {
  189. if 2 == Conf.Sync.Mode && !boot && !exit && !byHand { // 手动模式下只有启动和退出进行同步
  190. return false
  191. }
  192. if 3 == Conf.Sync.Mode && !byHand { // 完全手动模式下只有手动进行同步
  193. return false
  194. }
  195. if !Conf.Sync.Enabled {
  196. if byHand {
  197. util.PushMsg(Conf.Language(124), 5000)
  198. }
  199. return false
  200. }
  201. if !cloud.IsValidCloudDirName(Conf.Sync.CloudName) {
  202. if byHand {
  203. util.PushMsg(Conf.Language(123), 5000)
  204. }
  205. return false
  206. }
  207. switch Conf.Sync.Provider {
  208. case conf.ProviderSiYuan:
  209. if !IsSubscriber() {
  210. return false
  211. }
  212. case conf.ProviderWebDAV, conf.ProviderS3:
  213. if !IsPaidUser() {
  214. return false
  215. }
  216. }
  217. if isSyncing.Load() {
  218. logging.LogWarnf("sync is in progress")
  219. planSyncAfter(fixSyncInterval)
  220. return false
  221. }
  222. if 7 < autoSyncErrCount && !byHand {
  223. logging.LogErrorf("failed to auto-sync too many times, delay auto-sync 64 minutes")
  224. util.PushErrMsg(Conf.Language(125), 1000*60*60)
  225. planSyncAfter(64 * time.Minute)
  226. return false
  227. }
  228. return true
  229. }
  230. // incReindex 增量重建索引。
  231. func incReindex(upserts, removes []string) (upsertRootIDs, removeRootIDs []string) {
  232. upsertRootIDs = []string{}
  233. removeRootIDs = []string{}
  234. util.IncBootProgress(3, "Sync reindexing...")
  235. removeRootIDs = removeIndexes(removes) // 先执行 remove,否则移动文档时 upsert 会被忽略,导致未被索引
  236. upsertRootIDs = upsertIndexes(upserts)
  237. if 1 > len(removeRootIDs) {
  238. removeRootIDs = []string{}
  239. }
  240. if 1 > len(upsertRootIDs) {
  241. upsertRootIDs = []string{}
  242. }
  243. return
  244. }
  245. func removeIndexes(removeFilePaths []string) (removeRootIDs []string) {
  246. bootProgressPart := int32(10 / float64(len(removeFilePaths)))
  247. for _, removeFile := range removeFilePaths {
  248. if !strings.HasSuffix(removeFile, ".sy") {
  249. continue
  250. }
  251. id := strings.TrimSuffix(filepath.Base(removeFile), ".sy")
  252. removeRootIDs = append(removeRootIDs, id)
  253. block := treenode.GetBlockTree(id)
  254. if nil != block {
  255. msg := fmt.Sprintf(Conf.Language(39), block.RootID)
  256. util.IncBootProgress(bootProgressPart, msg)
  257. util.PushStatusBar(msg)
  258. bts := treenode.GetBlockTreesByRootID(block.RootID)
  259. for _, b := range bts {
  260. cache.RemoveBlockIAL(b.ID)
  261. }
  262. cache.RemoveDocIAL(block.Path)
  263. treenode.RemoveBlockTreesByRootID(block.RootID)
  264. sql.RemoveTreeQueue(block.RootID)
  265. }
  266. }
  267. if 1 > len(removeRootIDs) {
  268. removeRootIDs = []string{}
  269. }
  270. return
  271. }
  272. func upsertIndexes(upsertFilePaths []string) (upsertRootIDs []string) {
  273. luteEngine := util.NewLute()
  274. bootProgressPart := int32(10 / float64(len(upsertFilePaths)))
  275. for _, upsertFile := range upsertFilePaths {
  276. if !strings.HasSuffix(upsertFile, ".sy") {
  277. continue
  278. }
  279. upsertFile = filepath.ToSlash(upsertFile)
  280. if strings.HasPrefix(upsertFile, "/") {
  281. upsertFile = upsertFile[1:]
  282. }
  283. idx := strings.Index(upsertFile, "/")
  284. if 0 > idx {
  285. // .sy 直接出现在 data 文件夹下,没有出现在笔记本文件夹下的情况
  286. continue
  287. }
  288. box := upsertFile[:idx]
  289. p := strings.TrimPrefix(upsertFile, box)
  290. msg := fmt.Sprintf(Conf.Language(40), strings.TrimSuffix(path.Base(p), ".sy"))
  291. util.IncBootProgress(bootProgressPart, msg)
  292. util.PushStatusBar(msg)
  293. tree, err0 := filesys.LoadTree(box, p, luteEngine)
  294. if nil != err0 {
  295. continue
  296. }
  297. treenode.IndexBlockTree(tree)
  298. sql.UpsertTreeQueue(tree)
  299. bts := treenode.GetBlockTreesByRootID(tree.ID)
  300. for _, b := range bts {
  301. cache.RemoveBlockIAL(b.ID)
  302. }
  303. cache.RemoveDocIAL(tree.Path)
  304. upsertRootIDs = append(upsertRootIDs, tree.Root.ID)
  305. }
  306. if 1 > len(upsertRootIDs) {
  307. upsertRootIDs = []string{}
  308. }
  309. return
  310. }
  311. func SetCloudSyncDir(name string) {
  312. if !cloud.IsValidCloudDirName(name) {
  313. util.PushErrMsg(Conf.Language(37), 5000)
  314. return
  315. }
  316. if Conf.Sync.CloudName == name {
  317. return
  318. }
  319. Conf.Sync.CloudName = name
  320. Conf.Save()
  321. }
  322. func SetSyncGenerateConflictDoc(b bool) {
  323. Conf.Sync.GenerateConflictDoc = b
  324. Conf.Save()
  325. return
  326. }
  327. func SetSyncEnable(b bool) {
  328. Conf.Sync.Enabled = b
  329. Conf.Save()
  330. return
  331. }
  332. func SetSyncPerception(b bool) {
  333. if util.ContainerDocker == util.Container {
  334. b = false
  335. }
  336. Conf.Sync.Perception = b
  337. Conf.Save()
  338. if b {
  339. connectSyncWebSocket()
  340. } else {
  341. closeSyncWebSocket()
  342. }
  343. return
  344. }
  345. func SetSyncMode(mode int) {
  346. Conf.Sync.Mode = mode
  347. Conf.Save()
  348. return
  349. }
  350. func SetSyncProvider(provider int) (err error) {
  351. Conf.Sync.Provider = provider
  352. Conf.Save()
  353. return
  354. }
  355. func SetSyncProviderS3(s3 *conf.S3) (err error) {
  356. s3.Endpoint = strings.TrimSpace(s3.Endpoint)
  357. s3.Endpoint = util.NormalizeEndpoint(s3.Endpoint)
  358. s3.AccessKey = strings.TrimSpace(s3.AccessKey)
  359. s3.SecretKey = strings.TrimSpace(s3.SecretKey)
  360. s3.Bucket = strings.TrimSpace(s3.Bucket)
  361. s3.Region = strings.TrimSpace(s3.Region)
  362. s3.Timeout = util.NormalizeTimeout(s3.Timeout)
  363. if !cloud.IsValidCloudDirName(s3.Bucket) {
  364. util.PushErrMsg(Conf.Language(37), 5000)
  365. return
  366. }
  367. Conf.Sync.S3 = s3
  368. Conf.Save()
  369. return
  370. }
  371. func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) {
  372. webdav.Endpoint = strings.TrimSpace(webdav.Endpoint)
  373. webdav.Endpoint = util.NormalizeEndpoint(webdav.Endpoint)
  374. // 不支持配置坚果云 WebDAV 进行同步 https://github.com/siyuan-note/siyuan/issues/7657
  375. if strings.Contains(strings.ToLower(webdav.Endpoint), "dav.jianguoyun.com") {
  376. err = errors.New(Conf.Language(194))
  377. return
  378. }
  379. webdav.Username = strings.TrimSpace(webdav.Username)
  380. webdav.Password = strings.TrimSpace(webdav.Password)
  381. webdav.Timeout = util.NormalizeTimeout(webdav.Timeout)
  382. Conf.Sync.WebDAV = webdav
  383. Conf.Save()
  384. return
  385. }
  386. var (
  387. syncLock = sync.Mutex{}
  388. isSyncing = atomic.Bool{}
  389. )
  390. func CreateCloudSyncDir(name string) (err error) {
  391. if conf.ProviderSiYuan != Conf.Sync.Provider {
  392. err = errors.New(Conf.Language(131))
  393. return
  394. }
  395. name = strings.TrimSpace(name)
  396. name = gulu.Str.RemoveInvisible(name)
  397. if !cloud.IsValidCloudDirName(name) {
  398. return errors.New(Conf.Language(37))
  399. }
  400. repo, err := newRepository()
  401. if nil != err {
  402. return
  403. }
  404. err = repo.CreateCloudRepo(name)
  405. if nil != err {
  406. err = errors.New(formatRepoErrorMsg(err))
  407. return
  408. }
  409. return
  410. }
  411. func RemoveCloudSyncDir(name string) (err error) {
  412. if conf.ProviderSiYuan != Conf.Sync.Provider {
  413. err = errors.New(Conf.Language(131))
  414. return
  415. }
  416. msgId := util.PushMsg(Conf.Language(116), 15000)
  417. if "" == name {
  418. return
  419. }
  420. repo, err := newRepository()
  421. if nil != err {
  422. return
  423. }
  424. err = repo.RemoveCloudRepo(name)
  425. if nil != err {
  426. err = errors.New(formatRepoErrorMsg(err))
  427. return
  428. }
  429. util.PushClearMsg(msgId)
  430. time.Sleep(500 * time.Millisecond)
  431. if Conf.Sync.CloudName == name {
  432. Conf.Sync.CloudName = "main"
  433. Conf.Save()
  434. util.PushMsg(Conf.Language(155), 5000)
  435. }
  436. return
  437. }
  438. func ListCloudSyncDir() (syncDirs []*Sync, hSize string, err error) {
  439. syncDirs = []*Sync{}
  440. var dirs []*cloud.Repo
  441. var size int64
  442. repo, err := newRepository()
  443. if nil != err {
  444. return
  445. }
  446. dirs, size, err = repo.GetCloudRepos()
  447. if nil != err {
  448. err = errors.New(formatRepoErrorMsg(err))
  449. return
  450. }
  451. if 1 > len(dirs) {
  452. dirs = append(dirs, &cloud.Repo{
  453. Name: "main",
  454. Size: 0,
  455. Updated: time.Now().Format("2006-01-02 15:04:05"),
  456. })
  457. }
  458. for _, d := range dirs {
  459. dirSize := d.Size
  460. sync := &Sync{
  461. Size: dirSize,
  462. HSize: "-",
  463. Updated: d.Updated,
  464. CloudName: d.Name,
  465. }
  466. if conf.ProviderSiYuan == Conf.Sync.Provider {
  467. sync.HSize = humanize.BytesCustomCeil(uint64(dirSize), 2)
  468. }
  469. syncDirs = append(syncDirs, sync)
  470. }
  471. hSize = "-"
  472. if conf.ProviderSiYuan == Conf.Sync.Provider {
  473. hSize = humanize.BytesCustomCeil(uint64(size), 2)
  474. }
  475. return
  476. }
  477. func formatRepoErrorMsg(err error) string {
  478. msg := html.EscapeString(err.Error())
  479. if errors.Is(err, cloud.ErrCloudAuthFailed) {
  480. msg = Conf.Language(31)
  481. } else if errors.Is(err, cloud.ErrCloudObjectNotFound) {
  482. msg = Conf.Language(129)
  483. } else if errors.Is(err, dejavu.ErrLockCloudFailed) {
  484. msg = Conf.Language(188)
  485. } else if errors.Is(err, dejavu.ErrCloudLocked) {
  486. msg = Conf.Language(189)
  487. } else if errors.Is(err, dejavu.ErrRepoFatal) {
  488. msg = Conf.Language(23)
  489. } else if errors.Is(err, cloud.ErrSystemTimeIncorrect) {
  490. msg = Conf.Language(195)
  491. } else if errors.Is(err, cloud.ErrDeprecatedVersion) {
  492. msg = Conf.Language(212)
  493. } else if errors.Is(err, cloud.ErrCloudCheckFailed) {
  494. msg = Conf.Language(213)
  495. } else if errors.Is(err, cloud.ErrCloudServiceUnavailable) {
  496. msg = Conf.language(219)
  497. } else {
  498. msgLowerCase := strings.ToLower(msg)
  499. if strings.Contains(msgLowerCase, "permission denied") || strings.Contains(msg, "access is denied") {
  500. msg = Conf.Language(33)
  501. } else if strings.Contains(msgLowerCase, "device or resource busy") || strings.Contains(msg, "is being used by another") {
  502. msg = fmt.Sprintf(Conf.Language(85), err)
  503. } else if strings.Contains(msgLowerCase, "cipher: message authentication failed") {
  504. msg = Conf.Language(135)
  505. } else if strings.Contains(msgLowerCase, "no such host") || strings.Contains(msgLowerCase, "connection failed") || strings.Contains(msgLowerCase, "hostname resolution") || strings.Contains(msgLowerCase, "No address associated with hostname") {
  506. msg = Conf.Language(24)
  507. } else if strings.Contains(msgLowerCase, "net/http: request canceled while waiting for connection") || strings.Contains(msgLowerCase, "exceeded while awaiting") || strings.Contains(msgLowerCase, "context deadline exceeded") || strings.Contains(msgLowerCase, "timeout") || strings.Contains(msgLowerCase, "context cancellation while reading body") {
  508. msg = Conf.Language(24)
  509. } else if strings.Contains(msgLowerCase, "connection was") || strings.Contains(msgLowerCase, "reset by peer") || strings.Contains(msgLowerCase, "refused") || strings.Contains(msgLowerCase, "socket") || strings.Contains(msgLowerCase, "closed idle connection") || strings.Contains(msgLowerCase, "eof") {
  510. msg = Conf.Language(28)
  511. }
  512. }
  513. msg += " (Provider: " + conf.ProviderToStr(Conf.Sync.Provider) + ")"
  514. return msg
  515. }
  516. func getSyncIgnoreLines() (ret []string) {
  517. ignore := filepath.Join(util.DataDir, ".siyuan", "syncignore")
  518. err := os.MkdirAll(filepath.Dir(ignore), 0755)
  519. if nil != err {
  520. return
  521. }
  522. if !gulu.File.IsExist(ignore) {
  523. if err = gulu.File.WriteFileSafer(ignore, nil, 0644); nil != err {
  524. logging.LogErrorf("create syncignore [%s] failed: %s", ignore, err)
  525. return
  526. }
  527. }
  528. data, err := os.ReadFile(ignore)
  529. if nil != err {
  530. logging.LogErrorf("read syncignore [%s] failed: %s", ignore, err)
  531. return
  532. }
  533. dataStr := string(data)
  534. dataStr = strings.ReplaceAll(dataStr, "\r\n", "\n")
  535. ret = strings.Split(dataStr, "\n")
  536. // 默认忽略帮助文档
  537. ret = append(ret, "20210808180117-6v0mkxr/**/*")
  538. ret = append(ret, "20210808180117-czj9bvb/**/*")
  539. ret = append(ret, "20211226090932-5lcq56f/**/*")
  540. ret = append(ret, "20240530133126-axarxgx/**/*")
  541. ret = gulu.Str.RemoveDuplicatedElem(ret)
  542. return
  543. }
  544. func IncSync() {
  545. syncSameCount.Store(0)
  546. planSyncAfter(30 * time.Second)
  547. }
  548. func planSyncAfter(d time.Duration) {
  549. syncPlanTimeLock.Lock()
  550. syncPlanTime = time.Now().Add(d)
  551. syncPlanTimeLock.Unlock()
  552. }
  553. func isProviderOnline(byHand bool) (ret bool) {
  554. checkURL := util.GetCloudSyncServer()
  555. skipTlsVerify := false
  556. switch Conf.Sync.Provider {
  557. case conf.ProviderSiYuan:
  558. case conf.ProviderS3:
  559. checkURL = Conf.Sync.S3.Endpoint
  560. skipTlsVerify = Conf.Sync.S3.SkipTlsVerify
  561. case conf.ProviderWebDAV:
  562. checkURL = Conf.Sync.WebDAV.Endpoint
  563. skipTlsVerify = Conf.Sync.WebDAV.SkipTlsVerify
  564. default:
  565. logging.LogWarnf("unknown provider: %d", Conf.Sync.Provider)
  566. return false
  567. }
  568. if ret = util.IsOnline(checkURL, skipTlsVerify); !ret {
  569. if 1 > autoSyncErrCount || byHand {
  570. util.PushErrMsg(Conf.Language(76)+" (Provider: "+conf.ProviderToStr(Conf.Sync.Provider)+")", 5000)
  571. }
  572. if !byHand {
  573. planSyncAfter(fixSyncInterval)
  574. autoSyncErrCount++
  575. }
  576. }
  577. return
  578. }
  579. var (
  580. webSocketConn *websocket.Conn
  581. webSocketConnLock = sync.Mutex{}
  582. )
// OnlineKernel describes one kernel listed in a "kernels" message received over
// the sync websocket.
type OnlineKernel struct {
	ID       string `json:"id"`       // Kernel ID; compared against KernelID to exclude the current kernel.
	Hostname string `json:"hostname"` // The "hostname" field of the message entry.
	OS       string `json:"os"`       // The "os" field of the message entry.
	Ver      string `json:"ver"`      // The "ver" field of the message entry.
}
  589. var (
  590. onlineKernels []*OnlineKernel
  591. onlineKernelsLock = sync.Mutex{}
  592. )
  593. func GetOnlineKernels() (ret []*OnlineKernel) {
  594. ret = []*OnlineKernel{}
  595. onlineKernelsLock.Lock()
  596. tmp := onlineKernels
  597. onlineKernelsLock.Unlock()
  598. for _, kernel := range tmp {
  599. if kernel.ID == KernelID {
  600. continue
  601. }
  602. ret = append(ret, kernel)
  603. }
  604. return
  605. }
  606. var closedSyncWebSocket = atomic.Bool{}
  607. func closeSyncWebSocket() {
  608. defer logging.Recover()
  609. webSocketConnLock.Lock()
  610. defer webSocketConnLock.Unlock()
  611. if nil != webSocketConn {
  612. webSocketConn.Close()
  613. webSocketConn = nil
  614. closedSyncWebSocket.Store(true)
  615. }
  616. logging.LogInfof("sync websocket closed")
  617. }
  618. func connectSyncWebSocket() {
  619. defer logging.Recover()
  620. if !Conf.Sync.Enabled || !IsSubscriber() || conf.ProviderSiYuan != Conf.Sync.Provider {
  621. return
  622. }
  623. if util.ContainerDocker == util.Container {
  624. return
  625. }
  626. webSocketConnLock.Lock()
  627. defer webSocketConnLock.Unlock()
  628. if nil != webSocketConn {
  629. return
  630. }
  631. //logging.LogInfof("connecting sync websocket...")
  632. var dialErr error
  633. webSocketConn, dialErr = dialSyncWebSocket()
  634. if nil != dialErr {
  635. logging.LogWarnf("connect sync websocket failed: %s", dialErr)
  636. return
  637. }
  638. logging.LogInfof("sync websocket connected")
  639. webSocketConn.SetCloseHandler(func(code int, text string) error {
  640. logging.LogWarnf("sync websocket closed: %d, %s", code, text)
  641. return nil
  642. })
  643. go func() {
  644. defer logging.Recover()
  645. for {
  646. result := gulu.Ret.NewResult()
  647. if readErr := webSocketConn.ReadJSON(&result); nil != readErr {
  648. time.Sleep(1 * time.Second)
  649. if closedSyncWebSocket.Load() {
  650. return
  651. }
  652. reconnected := false
  653. for retries := 0; retries < 7; retries++ {
  654. time.Sleep(7 * time.Second)
  655. if nil == Conf.GetUser() {
  656. return
  657. }
  658. //logging.LogInfof("reconnecting sync websocket...")
  659. webSocketConn, dialErr = dialSyncWebSocket()
  660. if nil != dialErr {
  661. logging.LogWarnf("reconnect sync websocket failed: %s", dialErr)
  662. continue
  663. }
  664. logging.LogInfof("sync websocket reconnected")
  665. reconnected = true
  666. break
  667. }
  668. if !reconnected {
  669. logging.LogWarnf("reconnect sync websocket failed, do not retry")
  670. webSocketConn = nil
  671. return
  672. }
  673. continue
  674. }
  675. logging.LogInfof("sync websocket message: %v", result)
  676. data := result.Data.(map[string]interface{})
  677. switch data["cmd"].(string) {
  678. case "synced":
  679. syncData(false, false)
  680. case "kernels":
  681. onlineKernelsLock.Lock()
  682. onlineKernels = []*OnlineKernel{}
  683. for _, kernel := range data["kernels"].([]interface{}) {
  684. kernelMap := kernel.(map[string]interface{})
  685. onlineKernels = append(onlineKernels, &OnlineKernel{
  686. ID: kernelMap["id"].(string),
  687. Hostname: kernelMap["hostname"].(string),
  688. OS: kernelMap["os"].(string),
  689. Ver: kernelMap["ver"].(string),
  690. })
  691. }
  692. onlineKernelsLock.Unlock()
  693. }
  694. }
  695. }()
  696. }
// KernelID identifies this kernel instance. It is generated once at package
// initialization (presumably a random 7-character string — confirm against
// gulu.Rand.String), sent as the "x-siyuan-kernel" header when dialing the sync
// websocket, and used by GetOnlineKernels to exclude the current kernel.
var KernelID = gulu.Rand.String(7)
  698. func dialSyncWebSocket() (c *websocket.Conn, err error) {
  699. endpoint := util.GetCloudWebSocketServer() + "/apis/siyuan/dejavu/ws"
  700. header := http.Header{
  701. "User-Agent": []string{util.UserAgent},
  702. "x-siyuan-uid": []string{Conf.GetUser().UserId},
  703. "x-siyuan-kernel": []string{KernelID},
  704. "x-siyuan-ver": []string{util.Ver},
  705. "x-siyuan-os": []string{runtime.GOOS},
  706. "x-siyuan-hostname": []string{util.GetDeviceName()},
  707. "x-siyuan-repo": []string{Conf.Sync.CloudName},
  708. }
  709. c, _, err = websocket.DefaultDialer.Dial(endpoint, header)
  710. if nil == err {
  711. closedSyncWebSocket.Store(false)
  712. }
  713. return
  714. }