node_record.go

package analytic

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/helper"
	"github.com/0xJacky/Nginx-UI/model"
	"github.com/0xJacky/Nginx-UI/query"
	"github.com/gorilla/websocket"
	"github.com/uozi-tech/cosy/logger"
)
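
// Note: NodeMap, TNodeMap, NodeStat, InitNode and the package-level mutex used
// throughout this file are declared elsewhere in the analytic package.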

// NodeRecordManager manages the node status retrieval process
type NodeRecordManager struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	mu     sync.Mutex
}

// RetryConfig holds configuration for retry logic
type RetryConfig struct {
	BaseInterval    time.Duration // Base retry interval
	MaxInterval     time.Duration // Maximum retry interval
	MaxRetries      int           // Maximum consecutive failures before giving up temporarily
	BackoffMultiple float64       // Multiplier for exponential backoff
	ResetAfter      time.Duration // Time to reset failure count if successful
}

// Default retry configuration
var defaultRetryConfig = RetryConfig{
	BaseInterval:    5 * time.Second,  // Start with 5 seconds
	MaxInterval:     5 * time.Minute,  // Max 5 minutes between retries
	MaxRetries:      10,               // Max 10 consecutive failures
	BackoffMultiple: 1.5,              // 1.5x backoff each time
	ResetAfter:      30 * time.Second, // Reset failure count after 30s of success
}
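
// With the defaults above, calculateNextRetryInterval produces retry delays of
// roughly 5s, 7.5s, 11.25s, ~16.9s, ~25.3s, ... (1.5x per consecutive failure),
// capped at 5 minutes.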

// NodeRetryState tracks retry state for each node
type NodeRetryState struct {
	FailureCount    int
	LastRetryTime   time.Time
	LastSuccessTime time.Time
	NextRetryTime   time.Time
}

var (
	retryStates = make(map[uint64]*NodeRetryState)
	retryMutex  sync.Mutex
)

// getRetryState gets or creates retry state for a node
func getRetryState(envID uint64) *NodeRetryState {
	retryMutex.Lock()
	defer retryMutex.Unlock()

	if state, exists := retryStates[envID]; exists {
		return state
	}

	state := &NodeRetryState{
		FailureCount:    0,
		LastSuccessTime: time.Now(),
		NextRetryTime:   time.Now(),
	}
	retryStates[envID] = state
	return state
}

// updateNodeStatus safely updates node status with proper timestamp
func updateNodeStatus(envID uint64, status bool, reason string) {
	mutex.Lock()
	defer mutex.Unlock()

	now := time.Now()
	if NodeMap[envID] != nil {
		NodeMap[envID].Status = status
		NodeMap[envID].ResponseAt = now
		logger.Debugf("updateNodeStatus: Node[%d] status updated to %t (%s) at %v",
			envID, status, reason, now)
	} else {
		logger.Debugf("updateNodeStatus: Warning - Node[%d] not found in NodeMap", envID)
	}
}

// calculateNextRetryInterval calculates the next retry interval using exponential backoff
func calculateNextRetryInterval(state *NodeRetryState, config RetryConfig) time.Duration {
	if state.FailureCount == 0 {
		return config.BaseInterval
	}

	interval := config.BaseInterval
	for i := 0; i < state.FailureCount-1; i++ {
		interval = time.Duration(float64(interval) * config.BackoffMultiple)
		if interval > config.MaxInterval {
			interval = config.MaxInterval
			break
		}
	}

	logger.Debugf("calculateNextRetryInterval: FailureCount=%d, NextInterval=%v",
		state.FailureCount, interval)
	return interval
}

// shouldRetry determines if we should retry connection for a node
func shouldRetry(envID uint64, config RetryConfig) bool {
	state := getRetryState(envID)
	now := time.Now()

	// Check if we've exceeded max retries
	if state.FailureCount >= config.MaxRetries {
		// If we've been successful recently, reset the failure count
		if now.Sub(state.LastSuccessTime) < config.ResetAfter {
			logger.Debugf("shouldRetry: Resetting failure count for node %d due to recent success", envID)
			state.FailureCount = 0
			state.NextRetryTime = now
			return true
		}

		// Too many failures, back off for a longer period
		if now.Before(state.NextRetryTime) {
			logger.Debugf("shouldRetry: Node %d in backoff period until %v (failures: %d)",
				envID, state.NextRetryTime, state.FailureCount)
			return false
		}

		// Reset after long backoff period
		logger.Debugf("shouldRetry: Resetting failure count for node %d after backoff period", envID)
		state.FailureCount = config.MaxRetries / 2 // Start from middle to avoid immediate max again
		state.NextRetryTime = now
		return true
	}

	// Normal retry logic
	if now.Before(state.NextRetryTime) {
		return false
	}
	return true
}

// markConnectionFailure marks a connection failure and calculates next retry time
func markConnectionFailure(envID uint64, config RetryConfig, err error) {
	state := getRetryState(envID)
	now := time.Now()

	state.FailureCount++
	state.LastRetryTime = now

	nextInterval := calculateNextRetryInterval(state, config)
	state.NextRetryTime = now.Add(nextInterval)

	logger.Debugf("markConnectionFailure: Node %d failed (count: %d), next retry at %v, error: %v",
		envID, state.FailureCount, state.NextRetryTime, err)

	// Update node status to offline
	updateNodeStatus(envID, false, "connection_failed")
}

// markConnectionSuccess marks a successful connection
func markConnectionSuccess(envID uint64) {
	state := getRetryState(envID)
	now := time.Now()

	state.FailureCount = 0
	state.LastSuccessTime = now
	state.NextRetryTime = now // Can retry immediately if connection drops

	logger.Debugf("markConnectionSuccess: Node %d connection successful, failure count reset", envID)
	// Status will be updated in nodeAnalyticRecord when we receive actual data
}

// logCurrentNodeStatus logs current node status for debugging
func logCurrentNodeStatus(prefix string) {
	mutex.Lock()
	defer mutex.Unlock()

	if NodeMap == nil {
		logger.Debugf("%s: NodeMap is nil", prefix)
		return
	}

	logger.Debugf("%s: Current NodeMap contains %d nodes", prefix, len(NodeMap))
	for envID, node := range NodeMap {
		if node == nil {
			logger.Debugf("%s: Node[%d] is nil", prefix, envID)
			continue
		}

		// Also log retry state
		retryMutex.Lock()
		state := retryStates[envID]
		retryMutex.Unlock()

		retryInfo := "no_retry_state"
		if state != nil {
			retryInfo = fmt.Sprintf("failures=%d,next_retry=%v",
				state.FailureCount, state.NextRetryTime)
		}

		logger.Debugf("%s: Node[%d] - Status: %t, ResponseAt: %v, RetryState: %s",
			prefix, envID, node.Status, node.ResponseAt, retryInfo)
	}
}

// NewNodeRecordManager creates a new NodeRecordManager with the provided context
func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
	logger.Debug("Creating new NodeRecordManager")
	ctx, cancel := context.WithCancel(parentCtx)
	return &NodeRecordManager{
		ctx:    ctx,
		cancel: cancel,
	}
}

// Start begins retrieving node status using the manager's context
func (m *NodeRecordManager) Start() {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger.Debug("NodeRecordManager: Starting node status retrieval")
	logCurrentNodeStatus("NodeRecordManager.Start - Before start")

	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		RetrieveNodesStatus(m.ctx)
	}()

	logger.Debug("NodeRecordManager: Started successfully")
}

// Stop cancels the current context and waits for operations to complete
func (m *NodeRecordManager) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger.Debug("NodeRecordManager: Stopping node status retrieval")
	logCurrentNodeStatus("NodeRecordManager.Stop - Before stop")

	m.cancel()
	m.wg.Wait()

	logger.Debug("NodeRecordManager: Stopped successfully")
	logCurrentNodeStatus("NodeRecordManager.Stop - After stop")
}

// Restart stops and then restarts the node status retrieval
func (m *NodeRecordManager) Restart() {
	logger.Debug("NodeRecordManager: Restarting node status retrieval")
	logCurrentNodeStatus("NodeRecordManager.Restart - Before restart")

	m.Stop()

	logger.Debug("NodeRecordManager: Creating new context for restart")
	// Create new context
	m.ctx, m.cancel = context.WithCancel(context.Background())

	// Start retrieval with new context
	m.Start()

	logger.Debug("NodeRecordManager: Restart completed")
	logCurrentNodeStatus("NodeRecordManager.Restart - After restart")
}

// For backward compatibility
var (
	defaultManager *NodeRecordManager
	restartMu      sync.Mutex
)

// InitDefaultManager initializes the default NodeRecordManager
func InitDefaultManager() {
	logger.Debug("Initializing default NodeRecordManager")
	logCurrentNodeStatus("InitDefaultManager - Before init")

	if defaultManager != nil {
		logger.Debug("Default manager exists, stopping it first")
		defaultManager.Stop()
	}

	defaultManager = NewNodeRecordManager(context.Background())
	defaultManager.Start()

	logger.Debug("Default NodeRecordManager initialized")
	logCurrentNodeStatus("InitDefaultManager - After init")
}

// RestartRetrieveNodesStatus restarts the node status retrieval process
// Kept for backward compatibility
func RestartRetrieveNodesStatus() {
	restartMu.Lock()
	defer restartMu.Unlock()

	logger.Debug("RestartRetrieveNodesStatus called")
	logCurrentNodeStatus("RestartRetrieveNodesStatus - Before restart")

	if defaultManager == nil {
		logger.Debug("Default manager is nil, initializing new one")
		InitDefaultManager()
		return
	}

	logger.Debug("Restarting existing default manager")
	defaultManager.Restart()

	logger.Debug("RestartRetrieveNodesStatus completed")
	logCurrentNodeStatus("RestartRetrieveNodesStatus - After restart")
}

// StartRetrieveNodesStatus starts the node status retrieval with a custom context
func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
	logger.Debug("Starting node status retrieval with custom context")
	logCurrentNodeStatus("StartRetrieveNodesStatus - Before start")

	manager := NewNodeRecordManager(ctx)
	manager.Start()

	logger.Debug("Custom NodeRecordManager started")
	logCurrentNodeStatus("StartRetrieveNodesStatus - After start")
	return manager
}

// StartDefaultManager starts the default node status retrieval manager
// This should be called at system startup
func StartDefaultManager() {
	restartMu.Lock()
	defer restartMu.Unlock()

	logger.Debug("StartDefaultManager called")
	logCurrentNodeStatus("StartDefaultManager - Before start")

	if defaultManager != nil {
		logger.Info("DefaultManager already running, restarting...")
		logger.Debug("Default manager exists, performing restart")
		defaultManager.Restart()
		return
	}

	logger.Info("Starting default NodeRecordManager...")
	logger.Debug("No default manager exists, initializing new one")
	InitDefaultManager()

	logger.Debug("StartDefaultManager completed")
	logCurrentNodeStatus("StartDefaultManager - After start")
}

// cleanupDisabledNodes removes retry states for environments that are no longer enabled
func cleanupDisabledNodes(enabledEnvIDs []uint64) {
	retryMutex.Lock()
	defer retryMutex.Unlock()

	// Create a map for quick lookup
	enabledMap := make(map[uint64]bool)
	for _, id := range enabledEnvIDs {
		enabledMap[id] = true
	}

	// Remove retry states for disabled environments
	var cleanedUp []uint64
	for envID := range retryStates {
		if !enabledMap[envID] {
			delete(retryStates, envID)
			cleanedUp = append(cleanedUp, envID)
		}
	}

	if len(cleanedUp) > 0 {
		logger.Debugf("cleanupDisabledNodes: Cleaned up retry states for disabled environments: %v", cleanedUp)
	}
}

// removeFromNodeMap removes disabled nodes from NodeMap
func removeFromNodeMap(enabledEnvIDs []uint64) {
	mutex.Lock()
	defer mutex.Unlock()

	// Create a map for quick lookup
	enabledMap := make(map[uint64]bool)
	for _, id := range enabledEnvIDs {
		enabledMap[id] = true
	}

	// Remove nodes for disabled environments
	var removed []uint64
	for envID := range NodeMap {
		if !enabledMap[envID] {
			delete(NodeMap, envID)
			removed = append(removed, envID)
		}
	}

	if len(removed) > 0 {
		logger.Debugf("removeFromNodeMap: Removed disabled nodes from NodeMap: %v", removed)
	}
}

// checkEnvironmentStillEnabled checks if an environment is still enabled
func checkEnvironmentStillEnabled(envID uint64) bool {
	env := query.Environment
	environment, err := env.Where(env.ID.Eq(envID), env.Enabled.Is(true)).First()
	if err != nil {
		logger.Debugf("checkEnvironmentStillEnabled: Environment ID %d no longer enabled or not found", envID)
		return false
	}
	return environment != nil
}

func RetrieveNodesStatus(ctx context.Context) {
	logger.Info("RetrieveNodesStatus start")
	logger.Debug("RetrieveNodesStatus: Initializing node status retrieval")
	defer logger.Info("RetrieveNodesStatus exited")
	defer logger.Debug("RetrieveNodesStatus: Cleanup completed")

	mutex.Lock()
	if NodeMap == nil {
		logger.Debug("RetrieveNodesStatus: NodeMap is nil, creating new one")
		NodeMap = make(TNodeMap)
	} else {
		logger.Debugf("RetrieveNodesStatus: NodeMap already exists with %d nodes", len(NodeMap))
	}
	mutex.Unlock()

	logCurrentNodeStatus("RetrieveNodesStatus - Initial state")

	// Add periodic environment checking ticker
	envCheckTicker := time.NewTicker(30 * time.Second) // Check every 30 seconds
	defer envCheckTicker.Stop()

	env := query.Environment
	envs, err := env.Where(env.Enabled.Is(true)).Find()
	if err != nil {
		logger.Error(err)
		logger.Debug("RetrieveNodesStatus: Failed to query enabled environments")
		return
	}

	logger.Debugf("RetrieveNodesStatus: Found %d enabled environments", len(envs))
	for i, e := range envs {
		logger.Debugf("RetrieveNodesStatus: Environment[%d] - ID: %d, Name: %s, Enabled: %t",
			i, e.ID, e.Name, e.Enabled)
	}

	// Get current enabled environment IDs
	var enabledEnvIDs []uint64
	for _, e := range envs {
		enabledEnvIDs = append(enabledEnvIDs, e.ID)
	}

	// Clean up disabled nodes
	cleanupDisabledNodes(enabledEnvIDs)
	removeFromNodeMap(enabledEnvIDs)

	var wg sync.WaitGroup
	defer wg.Wait()

	// Channel to signal when environment list changes
	envUpdateChan := make(chan []uint64, 1)

	// Start environment monitoring goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer logger.Debug("RetrieveNodesStatus: Environment monitor goroutine completed")
		for {
			select {
			case <-ctx.Done():
				logger.Debug("RetrieveNodesStatus: Environment monitor context cancelled")
				return
			case <-envCheckTicker.C:
				// Re-check enabled environments
				currentEnvs, err := env.Where(env.Enabled.Is(true)).Find()
				if err != nil {
					logger.Error("RetrieveNodesStatus: Failed to re-query environments:", err)
					continue
				}

				var currentEnabledIDs []uint64
				for _, e := range currentEnvs {
					currentEnabledIDs = append(currentEnabledIDs, e.ID)
				}

				// Check if environment list changed
				if !equalUint64Slices(enabledEnvIDs, currentEnabledIDs) {
					logger.Debugf("RetrieveNodesStatus: Environment list changed from %v to %v", enabledEnvIDs, currentEnabledIDs)
					cleanupDisabledNodes(currentEnabledIDs)
					removeFromNodeMap(currentEnabledIDs)

					// Update the list
					enabledEnvIDs = currentEnabledIDs

					// Notify about the change
					select {
					case envUpdateChan <- currentEnabledIDs:
					default:
						// Non-blocking send
					}
				}
			}
		}
	}()

	for _, env := range envs {
		wg.Add(1)
		logger.Debugf("RetrieveNodesStatus: Starting goroutine for environment ID: %d, Name: %s", env.ID, env.Name)
		go func(e *model.Environment) {
			defer wg.Done()
			defer logger.Debugf("RetrieveNodesStatus: Goroutine completed for environment ID: %d", e.ID)

			// Retry ticker - check every 1 second but use backoff logic to determine actual retry
			retryTicker := time.NewTicker(1 * time.Second)
			defer retryTicker.Stop()

			for {
				select {
				case <-ctx.Done():
					logger.Debugf("RetrieveNodesStatus: Context cancelled for environment ID: %d", e.ID)
					return
				case newEnabledIDs := <-envUpdateChan:
					// Check if this environment is still enabled
					found := false
					for _, id := range newEnabledIDs {
						if id == e.ID {
							found = true
							break
						}
					}
					if !found {
						logger.Debugf("RetrieveNodesStatus: Environment ID %d has been disabled, stopping goroutine", e.ID)
						return
					}
				case <-retryTicker.C:
					// Double-check if environment is still enabled before retrying
					if !checkEnvironmentStillEnabled(e.ID) {
						logger.Debugf("RetrieveNodesStatus: Environment ID %d no longer enabled, stopping goroutine", e.ID)
						// Clean up retry state
						retryMutex.Lock()
						delete(retryStates, e.ID)
						retryMutex.Unlock()
						return
					}

					// Check if we should retry based on backoff logic
					if !shouldRetry(e.ID, defaultRetryConfig) {
						continue // Skip this iteration
					}

					logger.Debugf("RetrieveNodesStatus: Attempting connection to environment ID: %d", e.ID)
					if err := nodeAnalyticRecord(e, ctx); err != nil {
						logger.Error(err)
						logger.Debugf("RetrieveNodesStatus: Connection failed for environment ID: %d, error: %v", e.ID, err)
						markConnectionFailure(e.ID, defaultRetryConfig, err)
					} else {
						logger.Debugf("RetrieveNodesStatus: Connection successful for environment ID: %d", e.ID)
						markConnectionSuccess(e.ID)
					}
				}
			}
		}(env)
	}

	logger.Debug("RetrieveNodesStatus: All goroutines started, waiting for completion")
}

// equalUint64Slices reports whether two uint64 slices contain the same IDs,
// regardless of order (after a length check, the slices are compared as sets).
func equalUint64Slices(a, b []uint64) bool {
	if len(a) != len(b) {
		return false
	}

	// Create maps for comparison
	mapA := make(map[uint64]bool)
	mapB := make(map[uint64]bool)
	for _, v := range a {
		mapA[v] = true
	}
	for _, v := range b {
		mapB[v] = true
	}

	// Compare maps
	for k := range mapA {
		if !mapB[k] {
			return false
		}
	}
	for k := range mapB {
		if !mapA[k] {
			return false
		}
	}
	return true
}

func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
	logger.Debugf("nodeAnalyticRecord: Starting for environment ID: %d, Name: %s", env.ID, env.Name)

	scopeCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	node, err := InitNode(env)

	mutex.Lock()
	NodeMap[env.ID] = node
	mutex.Unlock()

	logger.Debugf("nodeAnalyticRecord: Node initialized for environment ID: %d", env.ID)

	if err != nil {
		logger.Debugf("nodeAnalyticRecord: InitNode failed for environment ID: %d, error: %v", env.ID, err)
		return err
	}

	u, err := env.GetWebSocketURL("/api/analytic/intro")
	if err != nil {
		logger.Debugf("nodeAnalyticRecord: GetWebSocketURL failed for environment ID: %d, error: %v", env.ID, err)
		return err
	}

	logger.Debugf("nodeAnalyticRecord: Connecting to WebSocket URL: %s for environment ID: %d", u, env.ID)

	header := http.Header{}
	header.Set("X-Node-Secret", env.Token)

	dial := &websocket.Dialer{
		Proxy:            http.ProxyFromEnvironment,
		HandshakeTimeout: 5 * time.Second,
	}

	c, _, err := dial.Dial(u, header)
	if err != nil {
		logger.Debugf("nodeAnalyticRecord: WebSocket dial failed for environment ID: %d, error: %v", env.ID, err)
		return err
	}
	defer c.Close()

	logger.Debugf("nodeAnalyticRecord: WebSocket connection established for environment ID: %d", env.ID)

	go func() {
		<-scopeCtx.Done()
		logger.Debugf("nodeAnalyticRecord: Context cancelled, closing WebSocket for environment ID: %d", env.ID)
		_ = c.Close()
	}()

	var nodeStat NodeStat
	messageCount := 0

	for {
		err = c.ReadJSON(&nodeStat)
		if err != nil {
			if helper.IsUnexpectedWebsocketError(err) {
				logger.Debugf("nodeAnalyticRecord: Unexpected WebSocket error for environment ID: %d, error: %v", env.ID, err)
				return err
			}
			logger.Debugf("nodeAnalyticRecord: WebSocket read completed for environment ID: %d", env.ID)
			return nil
		}

		messageCount++
		logger.Debugf("nodeAnalyticRecord: Received message #%d from environment ID: %d", messageCount, env.ID)

		// set online
		nodeStat.Status = true
		nodeStat.ResponseAt = time.Now()

		mutex.Lock()
		if NodeMap[env.ID] != nil {
			NodeMap[env.ID].NodeStat = nodeStat
			logger.Debugf("nodeAnalyticRecord: Updated NodeStat for environment ID: %d, Status: %t, ResponseAt: %v",
				env.ID, nodeStat.Status, nodeStat.ResponseAt)
		} else {
			logger.Debugf("nodeAnalyticRecord: Warning - Node not found in NodeMap for environment ID: %d", env.ID)
		}
		mutex.Unlock()
	}
}
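
// Usage sketch (illustrative; not part of the original file). Callers typically
// start the default manager once at system startup and restart it when the set
// of environments changes, or manage the lifetime themselves with a context:
//
//	analytic.StartDefaultManager()        // at startup
//	analytic.RestartRetrieveNodesStatus() // after environments change
//
//	// caller-managed lifetime
//	mgr := analytic.StartRetrieveNodesStatus(ctx)
//	defer mgr.Stop()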