123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- package analytic
- import (
- "context"
- "fmt"
- "net/http"
- "sync"
- "time"
- "github.com/0xJacky/Nginx-UI/internal/helper"
- "github.com/0xJacky/Nginx-UI/model"
- "github.com/0xJacky/Nginx-UI/query"
- "github.com/gorilla/websocket"
- "github.com/uozi-tech/cosy/logger"
- )
// NodeRecordManager owns one run of RetrieveNodesStatus and lets it be
// started, stopped and restarted.
type NodeRecordManager struct {
	mu     sync.Mutex         // held by Start and Stop for their whole duration
	wg     sync.WaitGroup     // tracks the goroutine launched by Start
	ctx    context.Context    // context handed to RetrieveNodesStatus
	cancel context.CancelFunc // cancels ctx; invoked by Stop
}
// RetryConfig describes the exponential backoff applied to reconnection
// attempts against a node.
type RetryConfig struct {
	// BaseInterval is the delay scheduled after the first consecutive failure.
	BaseInterval time.Duration
	// MaxInterval caps the delay produced by the exponential backoff.
	MaxInterval time.Duration
	// MaxRetries is the number of consecutive failures at which shouldRetry
	// switches to its long-backoff handling.
	MaxRetries int
	// BackoffMultiple multiplies the delay for each further consecutive failure.
	BackoffMultiple float64
	// ResetAfter is the window after the last successful connection within
	// which a node that has reached MaxRetries gets its failure count cleared.
	ResetAfter time.Duration
}

// defaultRetryConfig is the backoff policy used by the node status workers:
// 5s after the first failure, growing by 1.5x per further failure, never more
// than 5 minutes.
var defaultRetryConfig = RetryConfig{
	BaseInterval:    5 * time.Second,
	MaxInterval:     5 * time.Minute,
	MaxRetries:      10,
	BackoffMultiple: 1.5,
	ResetAfter:      30 * time.Second,
}
// NodeRetryState is the per-node bookkeeping used by the reconnection backoff.
type NodeRetryState struct {
	FailureCount    int       // consecutive failed connection attempts
	LastRetryTime   time.Time // when the most recent failure was recorded
	LastSuccessTime time.Time // last successful connection (creation time initially)
	NextRetryTime   time.Time // earliest moment the next attempt is allowed
}

// retryStates maps an environment ID to its retry state; the map is guarded
// by retryMutex.
var (
	retryStates = make(map[uint64]*NodeRetryState)
	retryMutex  sync.Mutex
)

// getRetryState returns the retry state for envID, lazily creating one that
// permits an immediate attempt. The same pointer is returned on every call
// until the entry is removed from retryStates.
func getRetryState(envID uint64) *NodeRetryState {
	retryMutex.Lock()
	defer retryMutex.Unlock()

	state, found := retryStates[envID]
	if !found {
		now := time.Now()
		state = &NodeRetryState{
			LastSuccessTime: now,
			NextRetryTime:   now,
		}
		retryStates[envID] = state
	}
	return state
}
- // updateNodeStatus safely updates node status with proper timestamp
- func updateNodeStatus(envID uint64, status bool, reason string) {
- mutex.Lock()
- defer mutex.Unlock()
- now := time.Now()
- if NodeMap[envID] != nil {
- NodeMap[envID].Status = status
- NodeMap[envID].ResponseAt = now
- logger.Debugf("updateNodeStatus: Node[%d] status updated to %t (%s) at %v",
- envID, status, reason, now)
- } else {
- logger.Debugf("updateNodeStatus: Warning - Node[%d] not found in NodeMap", envID)
- }
- }
- // calculateNextRetryInterval calculates the next retry interval using exponential backoff
- func calculateNextRetryInterval(state *NodeRetryState, config RetryConfig) time.Duration {
- if state.FailureCount == 0 {
- return config.BaseInterval
- }
- interval := config.BaseInterval
- for i := 0; i < state.FailureCount-1; i++ {
- interval = time.Duration(float64(interval) * config.BackoffMultiple)
- if interval > config.MaxInterval {
- interval = config.MaxInterval
- break
- }
- }
- logger.Debugf("calculateNextRetryInterval: FailureCount=%d, NextInterval=%v",
- state.FailureCount, interval)
- return interval
- }
- // shouldRetry determines if we should retry connection for a node
- func shouldRetry(envID uint64, config RetryConfig) bool {
- state := getRetryState(envID)
- now := time.Now()
- // Check if we've exceeded max retries
- if state.FailureCount >= config.MaxRetries {
- // If we've been successful recently, reset the failure count
- if now.Sub(state.LastSuccessTime) < config.ResetAfter {
- logger.Debugf("shouldRetry: Resetting failure count for node %d due to recent success", envID)
- state.FailureCount = 0
- state.NextRetryTime = now
- return true
- }
- // Too many failures, back off for a longer period
- if now.Before(state.NextRetryTime) {
- logger.Debugf("shouldRetry: Node %d in backoff period until %v (failures: %d)",
- envID, state.NextRetryTime, state.FailureCount)
- return false
- }
- // Reset after long backoff period
- logger.Debugf("shouldRetry: Resetting failure count for node %d after backoff period", envID)
- state.FailureCount = config.MaxRetries / 2 // Start from middle to avoid immediate max again
- state.NextRetryTime = now
- return true
- }
- // Normal retry logic
- if now.Before(state.NextRetryTime) {
- return false
- }
- return true
- }
- // markConnectionFailure marks a connection failure and calculates next retry time
- func markConnectionFailure(envID uint64, config RetryConfig, err error) {
- state := getRetryState(envID)
- now := time.Now()
- state.FailureCount++
- state.LastRetryTime = now
- nextInterval := calculateNextRetryInterval(state, config)
- state.NextRetryTime = now.Add(nextInterval)
- logger.Debugf("markConnectionFailure: Node %d failed (count: %d), next retry at %v, error: %v",
- envID, state.FailureCount, state.NextRetryTime, err)
- // Update node status to offline
- updateNodeStatus(envID, false, "connection_failed")
- }
- // markConnectionSuccess marks a successful connection
- func markConnectionSuccess(envID uint64) {
- state := getRetryState(envID)
- now := time.Now()
- state.FailureCount = 0
- state.LastSuccessTime = now
- state.NextRetryTime = now // Can retry immediately if connection drops
- logger.Debugf("markConnectionSuccess: Node %d connection successful, failure count reset", envID)
- // Status will be updated in nodeAnalyticRecord when we receive actual data
- }
- // logCurrentNodeStatus logs current node status for debugging
- func logCurrentNodeStatus(prefix string) {
- mutex.Lock()
- defer mutex.Unlock()
- if NodeMap == nil {
- logger.Debugf("%s: NodeMap is nil", prefix)
- return
- }
- logger.Debugf("%s: Current NodeMap contains %d nodes", prefix, len(NodeMap))
- for envID, node := range NodeMap {
- if node == nil {
- logger.Debugf("%s: Node[%d] is nil", prefix, envID)
- continue
- }
- // Also log retry state
- retryMutex.Lock()
- state := retryStates[envID]
- retryMutex.Unlock()
- retryInfo := "no_retry_state"
- if state != nil {
- retryInfo = fmt.Sprintf("failures=%d,next_retry=%v",
- state.FailureCount, state.NextRetryTime)
- }
- logger.Debugf("%s: Node[%d] - Status: %t, ResponseAt: %v, RetryState: %s",
- prefix, envID, node.Status, node.ResponseAt, retryInfo)
- }
- }
- // NewNodeRecordManager creates a new NodeRecordManager with the provided context
- func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
- logger.Debug("Creating new NodeRecordManager")
- ctx, cancel := context.WithCancel(parentCtx)
- return &NodeRecordManager{
- ctx: ctx,
- cancel: cancel,
- }
- }
- // Start begins retrieving node status using the manager's context
- func (m *NodeRecordManager) Start() {
- m.mu.Lock()
- defer m.mu.Unlock()
- logger.Debug("NodeRecordManager: Starting node status retrieval")
- logCurrentNodeStatus("NodeRecordManager.Start - Before start")
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
- RetrieveNodesStatus(m.ctx)
- }()
- logger.Debug("NodeRecordManager: Started successfully")
- }
- // Stop cancels the current context and waits for operations to complete
- func (m *NodeRecordManager) Stop() {
- m.mu.Lock()
- defer m.mu.Unlock()
- logger.Debug("NodeRecordManager: Stopping node status retrieval")
- logCurrentNodeStatus("NodeRecordManager.Stop - Before stop")
- m.cancel()
- m.wg.Wait()
- logger.Debug("NodeRecordManager: Stopped successfully")
- logCurrentNodeStatus("NodeRecordManager.Stop - After stop")
- }
- // Restart stops and then restarts the node status retrieval
- func (m *NodeRecordManager) Restart() {
- logger.Debug("NodeRecordManager: Restarting node status retrieval")
- logCurrentNodeStatus("NodeRecordManager.Restart - Before restart")
- m.Stop()
- logger.Debug("NodeRecordManager: Creating new context for restart")
- // Create new context
- m.ctx, m.cancel = context.WithCancel(context.Background())
- // Start retrieval with new context
- m.Start()
- logger.Debug("NodeRecordManager: Restart completed")
- logCurrentNodeStatus("NodeRecordManager.Restart - After restart")
- }
- // For backward compatibility
- var (
- defaultManager *NodeRecordManager
- restartMu sync.Mutex
- )
- // InitDefaultManager initializes the default NodeRecordManager
- func InitDefaultManager() {
- logger.Debug("Initializing default NodeRecordManager")
- logCurrentNodeStatus("InitDefaultManager - Before init")
- if defaultManager != nil {
- logger.Debug("Default manager exists, stopping it first")
- defaultManager.Stop()
- }
- defaultManager = NewNodeRecordManager(context.Background())
- defaultManager.Start()
- logger.Debug("Default NodeRecordManager initialized")
- logCurrentNodeStatus("InitDefaultManager - After init")
- }
- // RestartRetrieveNodesStatus restarts the node status retrieval process
- // Kept for backward compatibility
- func RestartRetrieveNodesStatus() {
- restartMu.Lock()
- defer restartMu.Unlock()
- logger.Debug("RestartRetrieveNodesStatus called")
- logCurrentNodeStatus("RestartRetrieveNodesStatus - Before restart")
- if defaultManager == nil {
- logger.Debug("Default manager is nil, initializing new one")
- InitDefaultManager()
- return
- }
- logger.Debug("Restarting existing default manager")
- defaultManager.Restart()
- logger.Debug("RestartRetrieveNodesStatus completed")
- logCurrentNodeStatus("RestartRetrieveNodesStatus - After restart")
- }
- // StartRetrieveNodesStatus starts the node status retrieval with a custom context
- func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
- logger.Debug("Starting node status retrieval with custom context")
- logCurrentNodeStatus("StartRetrieveNodesStatus - Before start")
- manager := NewNodeRecordManager(ctx)
- manager.Start()
- logger.Debug("Custom NodeRecordManager started")
- logCurrentNodeStatus("StartRetrieveNodesStatus - After start")
- return manager
- }
- // StartDefaultManager starts the default node status retrieval manager
- // This should be called at system startup
- func StartDefaultManager() {
- restartMu.Lock()
- defer restartMu.Unlock()
- logger.Debug("StartDefaultManager called")
- logCurrentNodeStatus("StartDefaultManager - Before start")
- if defaultManager != nil {
- logger.Info("DefaultManager already running, restarting...")
- logger.Debug("Default manager exists, performing restart")
- defaultManager.Restart()
- return
- }
- logger.Info("Starting default NodeRecordManager...")
- logger.Debug("No default manager exists, initializing new one")
- InitDefaultManager()
- logger.Debug("StartDefaultManager completed")
- logCurrentNodeStatus("StartDefaultManager - After start")
- }
- // cleanupDisabledNodes removes retry states for environments that are no longer enabled
- func cleanupDisabledNodes(enabledEnvIDs []uint64) {
- retryMutex.Lock()
- defer retryMutex.Unlock()
- // Create a map for quick lookup
- enabledMap := make(map[uint64]bool)
- for _, id := range enabledEnvIDs {
- enabledMap[id] = true
- }
- // Remove retry states for disabled environments
- var cleanedUp []uint64
- for envID := range retryStates {
- if !enabledMap[envID] {
- delete(retryStates, envID)
- cleanedUp = append(cleanedUp, envID)
- }
- }
- if len(cleanedUp) > 0 {
- logger.Debugf("cleanupDisabledNodes: Cleaned up retry states for disabled environments: %v", cleanedUp)
- }
- }
- // removeFromNodeMap removes disabled nodes from NodeMap
- func removeFromNodeMap(enabledEnvIDs []uint64) {
- mutex.Lock()
- defer mutex.Unlock()
- // Create a map for quick lookup
- enabledMap := make(map[uint64]bool)
- for _, id := range enabledEnvIDs {
- enabledMap[id] = true
- }
- // Remove nodes for disabled environments
- var removed []uint64
- for envID := range NodeMap {
- if !enabledMap[envID] {
- delete(NodeMap, envID)
- removed = append(removed, envID)
- }
- }
- if len(removed) > 0 {
- logger.Debugf("removeFromNodeMap: Removed disabled nodes from NodeMap: %v", removed)
- }
- }
- // checkEnvironmentStillEnabled checks if an environment is still enabled
- func checkEnvironmentStillEnabled(envID uint64) bool {
- env := query.Environment
- environment, err := env.Where(env.ID.Eq(envID), env.Enabled.Is(true)).First()
- if err != nil {
- logger.Debugf("checkEnvironmentStillEnabled: Environment ID %d no longer enabled or not found", envID)
- return false
- }
- return environment != nil
- }
- func RetrieveNodesStatus(ctx context.Context) {
- logger.Info("RetrieveNodesStatus start")
- logger.Debug("RetrieveNodesStatus: Initializing node status retrieval")
- defer logger.Info("RetrieveNodesStatus exited")
- defer logger.Debug("RetrieveNodesStatus: Cleanup completed")
- mutex.Lock()
- if NodeMap == nil {
- logger.Debug("RetrieveNodesStatus: NodeMap is nil, creating new one")
- NodeMap = make(TNodeMap)
- } else {
- logger.Debugf("RetrieveNodesStatus: NodeMap already exists with %d nodes", len(NodeMap))
- }
- mutex.Unlock()
- logCurrentNodeStatus("RetrieveNodesStatus - Initial state")
- // Add periodic environment checking ticker
- envCheckTicker := time.NewTicker(30 * time.Second) // Check every 30 seconds
- defer envCheckTicker.Stop()
- env := query.Environment
- envs, err := env.Where(env.Enabled.Is(true)).Find()
- if err != nil {
- logger.Error(err)
- logger.Debug("RetrieveNodesStatus: Failed to query enabled environments")
- return
- }
- logger.Debugf("RetrieveNodesStatus: Found %d enabled environments", len(envs))
- for i, e := range envs {
- logger.Debugf("RetrieveNodesStatus: Environment[%d] - ID: %d, Name: %s, Enabled: %t",
- i, e.ID, e.Name, e.Enabled)
- }
- // Get current enabled environment IDs
- var enabledEnvIDs []uint64
- for _, e := range envs {
- enabledEnvIDs = append(enabledEnvIDs, e.ID)
- }
- // Clean up disabled nodes
- cleanupDisabledNodes(enabledEnvIDs)
- removeFromNodeMap(enabledEnvIDs)
- var wg sync.WaitGroup
- defer wg.Wait()
- // Channel to signal when environment list changes
- envUpdateChan := make(chan []uint64, 1)
- // Start environment monitoring goroutine
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer logger.Debug("RetrieveNodesStatus: Environment monitor goroutine completed")
- for {
- select {
- case <-ctx.Done():
- logger.Debug("RetrieveNodesStatus: Environment monitor context cancelled")
- return
- case <-envCheckTicker.C:
- // Re-check enabled environments
- currentEnvs, err := env.Where(env.Enabled.Is(true)).Find()
- if err != nil {
- logger.Error("RetrieveNodesStatus: Failed to re-query environments:", err)
- continue
- }
- var currentEnabledIDs []uint64
- for _, e := range currentEnvs {
- currentEnabledIDs = append(currentEnabledIDs, e.ID)
- }
- // Check if environment list changed
- if !equalUint64Slices(enabledEnvIDs, currentEnabledIDs) {
- logger.Debugf("RetrieveNodesStatus: Environment list changed from %v to %v", enabledEnvIDs, currentEnabledIDs)
- cleanupDisabledNodes(currentEnabledIDs)
- removeFromNodeMap(currentEnabledIDs)
- // Update the list
- enabledEnvIDs = currentEnabledIDs
- // Notify about the change
- select {
- case envUpdateChan <- currentEnabledIDs:
- default:
- // Non-blocking send
- }
- }
- }
- }
- }()
- for _, env := range envs {
- wg.Add(1)
- logger.Debugf("RetrieveNodesStatus: Starting goroutine for environment ID: %d, Name: %s", env.ID, env.Name)
- go func(e *model.Environment) {
- defer wg.Done()
- defer logger.Debugf("RetrieveNodesStatus: Goroutine completed for environment ID: %d", e.ID)
- // Retry ticker - check every 1 second but use backoff logic to determine actual retry
- retryTicker := time.NewTicker(1 * time.Second)
- defer retryTicker.Stop()
- for {
- select {
- case <-ctx.Done():
- logger.Debugf("RetrieveNodesStatus: Context cancelled for environment ID: %d", e.ID)
- return
- case newEnabledIDs := <-envUpdateChan:
- // Check if this environment is still enabled
- found := false
- for _, id := range newEnabledIDs {
- if id == e.ID {
- found = true
- break
- }
- }
- if !found {
- logger.Debugf("RetrieveNodesStatus: Environment ID %d has been disabled, stopping goroutine", e.ID)
- return
- }
- case <-retryTicker.C:
- // Double-check if environment is still enabled before retrying
- if !checkEnvironmentStillEnabled(e.ID) {
- logger.Debugf("RetrieveNodesStatus: Environment ID %d no longer enabled, stopping goroutine", e.ID)
- // Clean up retry state
- retryMutex.Lock()
- delete(retryStates, e.ID)
- retryMutex.Unlock()
- return
- }
- // Check if we should retry based on backoff logic
- if !shouldRetry(e.ID, defaultRetryConfig) {
- continue // Skip this iteration
- }
- logger.Debugf("RetrieveNodesStatus: Attempting connection to environment ID: %d", e.ID)
- if err := nodeAnalyticRecord(e, ctx); err != nil {
- logger.Error(err)
- logger.Debugf("RetrieveNodesStatus: Connection failed for environment ID: %d, error: %v", e.ID, err)
- markConnectionFailure(e.ID, defaultRetryConfig, err)
- } else {
- logger.Debugf("RetrieveNodesStatus: Connection successful for environment ID: %d", e.ID)
- markConnectionSuccess(e.ID)
- }
- }
- }
- }(env)
- }
- logger.Debug("RetrieveNodesStatus: All goroutines started, waiting for completion")
- }
// equalUint64Slices reports whether a and b have the same length and contain
// the same set of distinct values, in any order. Multiplicity is not
// compared, so [1 1 2] and [1 2 2] are considered equal.
func equalUint64Slices(a, b []uint64) bool {
	if len(a) != len(b) {
		return false
	}

	toSet := func(values []uint64) map[uint64]struct{} {
		set := make(map[uint64]struct{}, len(values))
		for _, v := range values {
			set[v] = struct{}{}
		}
		return set
	}

	setA, setB := toSet(a), toSet(b)
	// Two sets are equal exactly when they have the same size and one
	// contains every element of the other.
	if len(setA) != len(setB) {
		return false
	}
	for v := range setB {
		if _, ok := setA[v]; !ok {
			return false
		}
	}
	return true
}
- func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
- logger.Debugf("nodeAnalyticRecord: Starting for environment ID: %d, Name: %s", env.ID, env.Name)
- scopeCtx, cancel := context.WithCancel(ctx)
- defer cancel()
- node, err := InitNode(env)
- mutex.Lock()
- NodeMap[env.ID] = node
- mutex.Unlock()
- logger.Debugf("nodeAnalyticRecord: Node initialized for environment ID: %d", env.ID)
- if err != nil {
- logger.Debugf("nodeAnalyticRecord: InitNode failed for environment ID: %d, error: %v", env.ID, err)
- return err
- }
- u, err := env.GetWebSocketURL("/api/analytic/intro")
- if err != nil {
- logger.Debugf("nodeAnalyticRecord: GetWebSocketURL failed for environment ID: %d, error: %v", env.ID, err)
- return err
- }
- logger.Debugf("nodeAnalyticRecord: Connecting to WebSocket URL: %s for environment ID: %d", u, env.ID)
- header := http.Header{}
- header.Set("X-Node-Secret", env.Token)
- dial := &websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: 5 * time.Second,
- }
- c, _, err := dial.Dial(u, header)
- if err != nil {
- logger.Debugf("nodeAnalyticRecord: WebSocket dial failed for environment ID: %d, error: %v", env.ID, err)
- return err
- }
- defer c.Close()
- logger.Debugf("nodeAnalyticRecord: WebSocket connection established for environment ID: %d", env.ID)
- go func() {
- <-scopeCtx.Done()
- logger.Debugf("nodeAnalyticRecord: Context cancelled, closing WebSocket for environment ID: %d", env.ID)
- _ = c.Close()
- }()
- var nodeStat NodeStat
- messageCount := 0
- for {
- err = c.ReadJSON(&nodeStat)
- if err != nil {
- if helper.IsUnexpectedWebsocketError(err) {
- logger.Debugf("nodeAnalyticRecord: Unexpected WebSocket error for environment ID: %d, error: %v", env.ID, err)
- return err
- }
- logger.Debugf("nodeAnalyticRecord: WebSocket read completed for environment ID: %d", env.ID)
- return nil
- }
- messageCount++
- logger.Debugf("nodeAnalyticRecord: Received message #%d from environment ID: %d", messageCount, env.ID)
- // set online
- nodeStat.Status = true
- nodeStat.ResponseAt = time.Now()
- mutex.Lock()
- if NodeMap[env.ID] != nil {
- NodeMap[env.ID].NodeStat = nodeStat
- logger.Debugf("nodeAnalyticRecord: Updated NodeStat for environment ID: %d, Status: %t, ResponseAt: %v",
- env.ID, nodeStat.Status, nodeStat.ResponseAt)
- } else {
- logger.Debugf("nodeAnalyticRecord: Warning - Node not found in NodeMap for environment ID: %d", env.ID)
- }
- mutex.Unlock()
- }
- }
|