فهرست منبع

Merge remote-tracking branch 'origin/dev' into dev

Vanessa 2 سال پیش
والد
کامیت
49d038f411
2فایلهای تغییر یافته به همراه83 افزوده شده و 14 حذف شده
  1. 2 1
      kernel/model/conf.go
  2. 81 13
      kernel/model/sync.go

+ 2 - 1
kernel/model/conf.go

@@ -469,7 +469,7 @@ func Close(force bool, execInstallPkg int) (exitCode int) {
 	if !force {
 		if Conf.Sync.Enabled && 3 != Conf.Sync.Mode &&
 			((IsSubscriber() && conf.ProviderSiYuan == Conf.Sync.Provider) || conf.ProviderSiYuan != Conf.Sync.Provider) {
-			syncData(true, false)
+			syncData(true, false, false)
 			if 0 != ExitSyncSucc {
 				exitCode = 1
 				return
@@ -509,6 +509,7 @@ func Close(force bool, execInstallPkg int) (exitCode int) {
 		time.Sleep(4 * time.Second)
 	}
 	logging.LogInfof("exited kernel")
+	webSocketConn.Close()
 	util.WebSocketServer.Close()
 	go func() {
 		time.Sleep(500 * time.Millisecond)

+ 81 - 13
kernel/model/sync.go

@@ -23,6 +23,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"sync"
 	"time"
@@ -176,10 +177,10 @@ func BootSyncData() {
 }
 
 func SyncData(byHand bool) {
-	syncData(false, byHand)
+	syncData(false, byHand, false)
 }
 
-func syncData(exit, byHand bool) {
+func syncData(exit, byHand, byWebSocket bool) {
 	defer logging.Recover()
 
 	if !checkSync(false, exit, byHand) {
@@ -219,6 +220,23 @@ func syncData(exit, byHand bool) {
 		code = 2
 	}
 	util.BroadcastByType("main", "syncing", code, msg, nil)
+
+	if nil == webSocketConn {
+		// 如果 websocket 连接已经断开,则重新连接
+		connectSyncWebSocket()
+	}
+
+	if 1 == Conf.Sync.Mode && !byWebSocket {
+		// 如果处于自动同步模式且不是又 WS 触发的同步,则通知其他设备上的内核进行同步
+		request := map[string]interface{}{
+			"cmd":    "synced",
+			"synced": Conf.Sync.Synced,
+		}
+		if writeErr := webSocketConn.WriteJSON(request); nil != writeErr {
+			logging.LogErrorf("write websocket message failed: %v", writeErr)
+		}
+	}
+
 	return
 }
 
@@ -600,6 +618,23 @@ func isProviderOnline(byHand bool) (ret bool) {
 	return
 }
 
+var (
+	webSocketConn     *websocket.Conn
+	webSocketConnLock = sync.Mutex{}
+)
+
+type OnlineKernel struct {
+	ID       string `json:"id"`
+	Hostname string `json:"hostname"`
+	OS       string `json:"os"`
+	Ver      string `json:"ver"`
+}
+
+var (
+	onlineKernels     []*OnlineKernel
+	onlineKernelsLock = sync.Mutex{}
+)
+
 func connectSyncWebSocket() {
 	defer logging.Recover()
 
@@ -607,19 +642,27 @@ func connectSyncWebSocket() {
 		return
 	}
 
+	webSocketConnLock.Lock()
+	defer webSocketConnLock.Unlock()
+
+	if nil != webSocketConn {
+		return
+	}
+
 	if "1602224134353" != Conf.User.UserId {
 		return
 	}
 
 	logging.LogInfof("connecting sync websocket...")
-	c, dialErr := dialSyncWebSocket()
+	var dialErr error
+	webSocketConn, dialErr = dialSyncWebSocket()
 	if nil != dialErr {
 		logging.LogWarnf("connect sync websocket failed: %s", dialErr)
 		return
 	}
 
 	logging.LogInfof("sync websocket connected")
-	c.SetCloseHandler(func(code int, text string) error {
+	webSocketConn.SetCloseHandler(func(code int, text string) error {
 		logging.LogWarnf("sync websocket closed: %d, %s", code, text)
 		return nil
 	})
@@ -628,13 +671,13 @@ func connectSyncWebSocket() {
 		defer logging.Recover()
 
 		for {
-			result := map[string]interface{}{}
-			if readErr := c.ReadJSON(&result); nil != readErr {
+			result := gulu.Ret.NewResult()
+			if readErr := webSocketConn.ReadJSON(&result); nil != readErr {
 				reconnected := false
 				for retries := 0; retries < 7; retries++ {
 					time.Sleep(7 * time.Second)
 					logging.LogWarnf("reconnecting sync websocket...")
-					c, dialErr = dialSyncWebSocket()
+					webSocketConn, dialErr = dialSyncWebSocket()
 					if nil != dialErr {
 						logging.LogWarnf("reconnect sync websocket failed: %s", dialErr)
 						continue
@@ -646,6 +689,7 @@ func connectSyncWebSocket() {
 				}
 				if !reconnected {
 					logging.LogWarnf("reconnect sync websocket failed, do not retry")
+					webSocketConn = nil
 					return
 				}
 
@@ -653,6 +697,26 @@ func connectSyncWebSocket() {
 			}
 
 			logging.LogInfof("sync websocket message: %v", result)
+			data := result.Data.(map[string]interface{})
+			switch data["cmd"].(string) {
+			case "synced":
+				syncData(false, false, true)
+			case "kernels":
+				onlineKernelsLock.Lock()
+
+				onlineKernels = []*OnlineKernel{}
+				for _, kernel := range data["kernels"].([]interface{}) {
+					kernelMap := kernel.(map[string]interface{})
+					onlineKernels = append(onlineKernels, &OnlineKernel{
+						ID:       kernelMap["id"].(string),
+						Hostname: kernelMap["hostname"].(string),
+						OS:       kernelMap["os"].(string),
+						Ver:      kernelMap["ver"].(string),
+					})
+				}
+
+				onlineKernelsLock.Unlock()
+			}
 		}
 	}()
 
@@ -673,14 +737,18 @@ func connectSyncWebSocket() {
 	}()
 }
 
+var kernelID = gulu.Rand.String(7)
+
 func dialSyncWebSocket() (c *websocket.Conn, err error) {
-	path := "/apis/siyuan/dejavu/ws"
-	endpoint := util.AliyunWebSocketServer + path
-	//endpoint := "ws://127.0.0.1:64388" + path
+	//endpoint := "ws://127.0.0.1:64388" + "/apis/siyuan/dejavu/ws"
+	endpoint := util.AliyunWebSocketServer + "/apis/siyuan/dejavu/ws"
 	header := http.Header{
-		"x-siyuan-uid":    []string{Conf.User.UserId},
-		"x-siyuan-kernel": []string{gulu.Rand.String(7)},
-		"x-siyuan-ver":    []string{util.Ver},
+		"x-siyuan-uid":      []string{Conf.User.UserId},
+		"x-siyuan-kernel":   []string{kernelID},
+		"x-siyuan-ver":      []string{util.Ver},
+		"x-siyuan-os":       []string{runtime.GOOS},
+		"x-siyuan-hostname": []string{util.GetDeviceName()},
+		"x-siyuan-repo":     []string{Conf.Sync.CloudName},
 	}
 	c, _, err = websocket.DefaultDialer.Dial(endpoint, header)
 	return