@@ -2,6 +2,7 @@ package analytic
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"sync"
 	"time"
@@ -21,8 +22,196 @@ type NodeRecordManager struct {
 	mu     sync.Mutex
 }
 
+// RetryConfig holds configuration for retry logic
+type RetryConfig struct {
+	BaseInterval    time.Duration // Base retry interval
+	MaxInterval     time.Duration // Maximum retry interval
+	MaxRetries      int           // Maximum consecutive failures before giving up temporarily
+	BackoffMultiple float64       // Multiplier for exponential backoff
+	ResetAfter      time.Duration // Time to reset failure count if successful
+}
+
+// Default retry configuration
+var defaultRetryConfig = RetryConfig{
+	BaseInterval:    5 * time.Second,  // Start with 5 seconds
+	MaxInterval:     5 * time.Minute,  // Max 5 minutes between retries
+	MaxRetries:      10,               // Max 10 consecutive failures
+	BackoffMultiple: 1.5,              // 1.5x backoff each time
+	ResetAfter:      30 * time.Second, // Reset failure count after 30s of success
+}
+
+// NodeRetryState tracks retry state for each node
+type NodeRetryState struct {
+	FailureCount    int
+	LastRetryTime   time.Time
+	LastSuccessTime time.Time
+	NextRetryTime   time.Time
+}
+
+var (
+	retryStates = make(map[uint64]*NodeRetryState)
+	retryMutex  sync.Mutex
+)
+
+// getRetryState gets or creates retry state for a node
+func getRetryState(envID uint64) *NodeRetryState {
+	retryMutex.Lock()
+	defer retryMutex.Unlock()
+
+	if state, exists := retryStates[envID]; exists {
+		return state
+	}
+
+	state := &NodeRetryState{
+		FailureCount:    0,
+		LastSuccessTime: time.Now(),
+		NextRetryTime:   time.Now(),
+	}
+	retryStates[envID] = state
+	return state
+}
+
+// updateNodeStatus safely updates node status with proper timestamp
+func updateNodeStatus(envID uint64, status bool, reason string) {
+	mutex.Lock()
+	defer mutex.Unlock()
+
+	now := time.Now()
+	if NodeMap[envID] != nil {
+		NodeMap[envID].Status = status
+		NodeMap[envID].ResponseAt = now
+		logger.Debugf("updateNodeStatus: Node[%d] status updated to %t (%s) at %v",
+			envID, status, reason, now)
+	} else {
+		logger.Debugf("updateNodeStatus: Warning - Node[%d] not found in NodeMap", envID)
+	}
+}
+
+// calculateNextRetryInterval calculates the next retry interval using exponential backoff
+func calculateNextRetryInterval(state *NodeRetryState, config RetryConfig) time.Duration {
+	if state.FailureCount == 0 {
+		return config.BaseInterval
+	}
+
+	interval := config.BaseInterval
+	for i := 0; i < state.FailureCount-1; i++ {
+		interval = time.Duration(float64(interval) * config.BackoffMultiple)
+		if interval > config.MaxInterval {
+			interval = config.MaxInterval
+			break
+		}
+	}
+
+	logger.Debugf("calculateNextRetryInterval: FailureCount=%d, NextInterval=%v",
+		state.FailureCount, interval)
+	return interval
+}
+
+// shouldRetry determines if we should retry connection for a node
+func shouldRetry(envID uint64, config RetryConfig) bool {
+	state := getRetryState(envID)
+	now := time.Now()
+
+	// Check if we've exceeded max retries
+	if state.FailureCount >= config.MaxRetries {
+		// If we've been successful recently, reset the failure count
+		if now.Sub(state.LastSuccessTime) < config.ResetAfter {
+			logger.Debugf("shouldRetry: Resetting failure count for node %d due to recent success", envID)
+			state.FailureCount = 0
+			state.NextRetryTime = now
+			return true
+		}
+
+		// Too many failures, back off for a longer period
+		if now.Before(state.NextRetryTime) {
+			logger.Debugf("shouldRetry: Node %d in backoff period until %v (failures: %d)",
+				envID, state.NextRetryTime, state.FailureCount)
+			return false
+		}
+
+		// Reset after long backoff period
+		logger.Debugf("shouldRetry: Resetting failure count for node %d after backoff period", envID)
+		state.FailureCount = config.MaxRetries / 2 // Start from middle to avoid immediate max again
+		state.NextRetryTime = now
+		return true
+	}
+
+	// Normal retry logic
+	if now.Before(state.NextRetryTime) {
+		return false
+	}
+
+	return true
+}
+
+// markConnectionFailure marks a connection failure and calculates next retry time
+func markConnectionFailure(envID uint64, config RetryConfig, err error) {
+	state := getRetryState(envID)
+	now := time.Now()
+
+	state.FailureCount++
+	state.LastRetryTime = now
+
+	nextInterval := calculateNextRetryInterval(state, config)
+	state.NextRetryTime = now.Add(nextInterval)
+
+	logger.Debugf("markConnectionFailure: Node %d failed (count: %d), next retry at %v, error: %v",
+		envID, state.FailureCount, state.NextRetryTime, err)
+
+	// Update node status to offline
+	updateNodeStatus(envID, false, "connection_failed")
+}
+
+// markConnectionSuccess marks a successful connection
+func markConnectionSuccess(envID uint64) {
+	state := getRetryState(envID)
+	now := time.Now()
+
+	state.FailureCount = 0
+	state.LastSuccessTime = now
+	state.NextRetryTime = now // Can retry immediately if connection drops
+
+	logger.Debugf("markConnectionSuccess: Node %d connection successful, failure count reset", envID)
+
+	// Status will be updated in nodeAnalyticRecord when we receive actual data
+}
+
+// logCurrentNodeStatus logs current node status for debugging
+func logCurrentNodeStatus(prefix string) {
+	mutex.Lock()
+	defer mutex.Unlock()
+
+	if NodeMap == nil {
+		logger.Debugf("%s: NodeMap is nil", prefix)
+		return
+	}
+
+	logger.Debugf("%s: Current NodeMap contains %d nodes", prefix, len(NodeMap))
+	for envID, node := range NodeMap {
+		if node == nil {
+			logger.Debugf("%s: Node[%d] is nil", prefix, envID)
+			continue
+		}
+
+		// Also log retry state
+		retryMutex.Lock()
+		state := retryStates[envID]
+		retryMutex.Unlock()
+
+		retryInfo := "no_retry_state"
+		if state != nil {
+			retryInfo = fmt.Sprintf("failures=%d,next_retry=%v",
+				state.FailureCount, state.NextRetryTime)
+		}
+
+		logger.Debugf("%s: Node[%d] - Status: %t, ResponseAt: %v, RetryState: %s",
+			prefix, envID, node.Status, node.ResponseAt, retryInfo)
+	}
+}
+
 // NewNodeRecordManager creates a new NodeRecordManager with the provided context
 func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
+	logger.Debug("Creating new NodeRecordManager")
 	ctx, cancel := context.WithCancel(parentCtx)
 	return &NodeRecordManager{
 		ctx:    ctx,
@@ -35,11 +224,16 @@ func (m *NodeRecordManager) Start() {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
+	logger.Debug("NodeRecordManager: Starting node status retrieval")
+	logCurrentNodeStatus("NodeRecordManager.Start - Before start")
+
 	m.wg.Add(1)
 	go func() {
 		defer m.wg.Done()
 		RetrieveNodesStatus(m.ctx)
 	}()
+
+	logger.Debug("NodeRecordManager: Started successfully")
 }
 
 // Stop cancels the current context and waits for operations to complete
@@ -47,19 +241,32 @@ func (m *NodeRecordManager) Stop() {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
+	logger.Debug("NodeRecordManager: Stopping node status retrieval")
+	logCurrentNodeStatus("NodeRecordManager.Stop - Before stop")
+
 	m.cancel()
 	m.wg.Wait()
+
+	logger.Debug("NodeRecordManager: Stopped successfully")
+	logCurrentNodeStatus("NodeRecordManager.Stop - After stop")
 }
 
 // Restart stops and then restarts the node status retrieval
 func (m *NodeRecordManager) Restart() {
+	logger.Debug("NodeRecordManager: Restarting node status retrieval")
+	logCurrentNodeStatus("NodeRecordManager.Restart - Before restart")
+
 	m.Stop()
 
+	logger.Debug("NodeRecordManager: Creating new context for restart")
 	// Create new context
 	m.ctx, m.cancel = context.WithCancel(context.Background())
 
 	// Start retrieval with new context
 	m.Start()
+
+	logger.Debug("NodeRecordManager: Restart completed")
+	logCurrentNodeStatus("NodeRecordManager.Restart - After restart")
 }
 
 // For backward compatibility
@@ -70,11 +277,18 @@ var (
 
 // InitDefaultManager initializes the default NodeRecordManager
 func InitDefaultManager() {
+	logger.Debug("Initializing default NodeRecordManager")
+	logCurrentNodeStatus("InitDefaultManager - Before init")
+
 	if defaultManager != nil {
+		logger.Debug("Default manager exists, stopping it first")
 		defaultManager.Stop()
 	}
 	defaultManager = NewNodeRecordManager(context.Background())
 	defaultManager.Start()
+
+	logger.Debug("Default NodeRecordManager initialized")
+	logCurrentNodeStatus("InitDefaultManager - After init")
 }
 
 // RestartRetrieveNodesStatus restarts the node status retrieval process
@@ -83,18 +297,32 @@ func RestartRetrieveNodesStatus() {
 	restartMu.Lock()
 	defer restartMu.Unlock()
 
+	logger.Debug("RestartRetrieveNodesStatus called")
+	logCurrentNodeStatus("RestartRetrieveNodesStatus - Before restart")
+
 	if defaultManager == nil {
+		logger.Debug("Default manager is nil, initializing new one")
 		InitDefaultManager()
 		return
 	}
 
+	logger.Debug("Restarting existing default manager")
 	defaultManager.Restart()
+
+	logger.Debug("RestartRetrieveNodesStatus completed")
+	logCurrentNodeStatus("RestartRetrieveNodesStatus - After restart")
 }
 
 // StartRetrieveNodesStatus starts the node status retrieval with a custom context
 func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
+	logger.Debug("Starting node status retrieval with custom context")
+	logCurrentNodeStatus("StartRetrieveNodesStatus - Before start")
+
 	manager := NewNodeRecordManager(ctx)
 	manager.Start()
+
+	logger.Debug("Custom NodeRecordManager started")
+	logCurrentNodeStatus("StartRetrieveNodesStatus - After start")
 	return manager
 }
 
@@ -104,68 +332,277 @@ func StartDefaultManager() {
 	restartMu.Lock()
 	defer restartMu.Unlock()
 
+	logger.Debug("StartDefaultManager called")
+	logCurrentNodeStatus("StartDefaultManager - Before start")
+
 	if defaultManager != nil {
 		logger.Info("DefaultManager already running, restarting...")
+		logger.Debug("Default manager exists, performing restart")
 		defaultManager.Restart()
 		return
 	}
 
 	logger.Info("Starting default NodeRecordManager...")
+	logger.Debug("No default manager exists, initializing new one")
 	InitDefaultManager()
+
+	logger.Debug("StartDefaultManager completed")
+	logCurrentNodeStatus("StartDefaultManager - After start")
+}
+
+// cleanupDisabledNodes removes retry states for environments that are no longer enabled
+func cleanupDisabledNodes(enabledEnvIDs []uint64) {
+	retryMutex.Lock()
+	defer retryMutex.Unlock()
+
+	// Create a map for quick lookup
+	enabledMap := make(map[uint64]bool)
+	for _, id := range enabledEnvIDs {
+		enabledMap[id] = true
+	}
+
+	// Remove retry states for disabled environments
+	var cleanedUp []uint64
+	for envID := range retryStates {
+		if !enabledMap[envID] {
+			delete(retryStates, envID)
+			cleanedUp = append(cleanedUp, envID)
+		}
+	}
+
+	if len(cleanedUp) > 0 {
+		logger.Debugf("cleanupDisabledNodes: Cleaned up retry states for disabled environments: %v", cleanedUp)
+	}
+}
+
+// removeFromNodeMap removes disabled nodes from NodeMap
+func removeFromNodeMap(enabledEnvIDs []uint64) {
+	mutex.Lock()
+	defer mutex.Unlock()
+
+	// Create a map for quick lookup
+	enabledMap := make(map[uint64]bool)
+	for _, id := range enabledEnvIDs {
+		enabledMap[id] = true
+	}
+
+	// Remove nodes for disabled environments
+	var removed []uint64
+	for envID := range NodeMap {
+		if !enabledMap[envID] {
+			delete(NodeMap, envID)
+			removed = append(removed, envID)
+		}
+	}
+
+	if len(removed) > 0 {
+		logger.Debugf("removeFromNodeMap: Removed disabled nodes from NodeMap: %v", removed)
+	}
+}
+
+// checkEnvironmentStillEnabled checks if an environment is still enabled
+func checkEnvironmentStillEnabled(envID uint64) bool {
+	env := query.Environment
+	environment, err := env.Where(env.ID.Eq(envID), env.Enabled.Is(true)).First()
+	if err != nil {
+		logger.Debugf("checkEnvironmentStillEnabled: Environment ID %d no longer enabled or not found", envID)
+		return false
+	}
+	return environment != nil
 }
 
 func RetrieveNodesStatus(ctx context.Context) {
 	logger.Info("RetrieveNodesStatus start")
+	logger.Debug("RetrieveNodesStatus: Initializing node status retrieval")
 	defer logger.Info("RetrieveNodesStatus exited")
+	defer logger.Debug("RetrieveNodesStatus: Cleanup completed")
 
 	mutex.Lock()
 	if NodeMap == nil {
+		logger.Debug("RetrieveNodesStatus: NodeMap is nil, creating new one")
 		NodeMap = make(TNodeMap)
+	} else {
+		logger.Debugf("RetrieveNodesStatus: NodeMap already exists with %d nodes", len(NodeMap))
 	}
 	mutex.Unlock()
 
+	logCurrentNodeStatus("RetrieveNodesStatus - Initial state")
+
+	// Add periodic environment checking ticker
+	envCheckTicker := time.NewTicker(30 * time.Second) // Check every 30 seconds
+	defer envCheckTicker.Stop()
+
 	env := query.Environment
 	envs, err := env.Where(env.Enabled.Is(true)).Find()
 	if err != nil {
 		logger.Error(err)
+		logger.Debug("RetrieveNodesStatus: Failed to query enabled environments")
 		return
 	}
 
+	logger.Debugf("RetrieveNodesStatus: Found %d enabled environments", len(envs))
+	for i, e := range envs {
+		logger.Debugf("RetrieveNodesStatus: Environment[%d] - ID: %d, Name: %s, Enabled: %t",
+			i, e.ID, e.Name, e.Enabled)
+	}
+
+	// Get current enabled environment IDs
+	var enabledEnvIDs []uint64
+	for _, e := range envs {
+		enabledEnvIDs = append(enabledEnvIDs, e.ID)
+	}
+
+	// Clean up disabled nodes
+	cleanupDisabledNodes(enabledEnvIDs)
+	removeFromNodeMap(enabledEnvIDs)
+
 	var wg sync.WaitGroup
 	defer wg.Wait()
 
+	// Channel to signal when environment list changes
+	envUpdateChan := make(chan []uint64, 1)
+
+	// Start environment monitoring goroutine
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer logger.Debug("RetrieveNodesStatus: Environment monitor goroutine completed")
+
+		for {
+			select {
+			case <-ctx.Done():
+				logger.Debug("RetrieveNodesStatus: Environment monitor context cancelled")
+				return
+			case <-envCheckTicker.C:
+				// Re-check enabled environments
+				currentEnvs, err := env.Where(env.Enabled.Is(true)).Find()
+				if err != nil {
+					logger.Error("RetrieveNodesStatus: Failed to re-query environments:", err)
+					continue
+				}
+
+				var currentEnabledIDs []uint64
+				for _, e := range currentEnvs {
+					currentEnabledIDs = append(currentEnabledIDs, e.ID)
+				}
+
+				// Check if environment list changed
+				if !equalUint64Slices(enabledEnvIDs, currentEnabledIDs) {
+					logger.Debugf("RetrieveNodesStatus: Environment list changed from %v to %v", enabledEnvIDs, currentEnabledIDs)
+					cleanupDisabledNodes(currentEnabledIDs)
+					removeFromNodeMap(currentEnabledIDs)
+
+					// Update the list
+					enabledEnvIDs = currentEnabledIDs
+
+					// Notify about the change
+					select {
+					case envUpdateChan <- currentEnabledIDs:
+					default:
+						// Non-blocking send
+					}
+				}
+			}
+		}
+	}()
+
 	for _, env := range envs {
 		wg.Add(1)
+		logger.Debugf("RetrieveNodesStatus: Starting goroutine for environment ID: %d, Name: %s", env.ID, env.Name)
 		go func(e *model.Environment) {
 			defer wg.Done()
-			retryTicker := time.NewTicker(5 * time.Second)
+			defer logger.Debugf("RetrieveNodesStatus: Goroutine completed for environment ID: %d", e.ID)
+
+			// Retry ticker - check every 1 second but use backoff logic to determine actual retry
+			retryTicker := time.NewTicker(1 * time.Second)
 			defer retryTicker.Stop()
 
 			for {
 				select {
 				case <-ctx.Done():
+					logger.Debugf("RetrieveNodesStatus: Context cancelled for environment ID: %d", e.ID)
 					return
-				default:
+				case newEnabledIDs := <-envUpdateChan:
+					// Check if this environment is still enabled
+					found := false
+					for _, id := range newEnabledIDs {
+						if id == e.ID {
+							found = true
+							break
+						}
+					}
+					if !found {
+						logger.Debugf("RetrieveNodesStatus: Environment ID %d has been disabled, stopping goroutine", e.ID)
+						return
+					}
+				case <-retryTicker.C:
+					// Double-check if environment is still enabled before retrying
+					if !checkEnvironmentStillEnabled(e.ID) {
+						logger.Debugf("RetrieveNodesStatus: Environment ID %d no longer enabled, stopping goroutine", e.ID)
+						// Clean up retry state
+						retryMutex.Lock()
+						delete(retryStates, e.ID)
+						retryMutex.Unlock()
+						return
+					}
+
+					// Check if we should retry based on backoff logic
+					if !shouldRetry(e.ID, defaultRetryConfig) {
+						continue // Skip this iteration
+					}
+
+					logger.Debugf("RetrieveNodesStatus: Attempting connection to environment ID: %d", e.ID)
 					if err := nodeAnalyticRecord(e, ctx); err != nil {
 						logger.Error(err)
-						mutex.Lock()
-						if NodeMap[e.ID] != nil {
-							NodeMap[e.ID].Status = false
-						}
-						mutex.Unlock()
-						select {
-						case <-retryTicker.C:
-						case <-ctx.Done():
-							return
-						}
+						logger.Debugf("RetrieveNodesStatus: Connection failed for environment ID: %d, error: %v", e.ID, err)
+						markConnectionFailure(e.ID, defaultRetryConfig, err)
+					} else {
+						logger.Debugf("RetrieveNodesStatus: Connection successful for environment ID: %d", e.ID)
+						markConnectionSuccess(e.ID)
 					}
 				}
 			}
 		}(env)
 	}
+
+	logger.Debug("RetrieveNodesStatus: All goroutines started, waiting for completion")
+}
+
+// equalUint64Slices compares two uint64 slices for equality
+func equalUint64Slices(a, b []uint64) bool {
+	if len(a) != len(b) {
+		return false
+	}
+
+	// Create maps for comparison
+	mapA := make(map[uint64]bool)
+	mapB := make(map[uint64]bool)
+
+	for _, v := range a {
+		mapA[v] = true
+	}
+	for _, v := range b {
+		mapB[v] = true
+	}
+
+	// Compare maps
+	for k := range mapA {
+		if !mapB[k] {
+			return false
+		}
+	}
+	for k := range mapB {
+		if !mapA[k] {
+			return false
+		}
+	}
+
+	return true
 }
 
 func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
+	logger.Debugf("nodeAnalyticRecord: Starting for environment ID: %d, Name: %s", env.ID, env.Name)
+
 	scopeCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -175,17 +612,22 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
 	NodeMap[env.ID] = node
 	mutex.Unlock()
 
+	logger.Debugf("nodeAnalyticRecord: Node initialized for environment ID: %d", env.ID)
+
 	if err != nil {
+		logger.Debugf("nodeAnalyticRecord: InitNode failed for environment ID: %d, error: %v", env.ID, err)
 		return err
 	}
 
 	u, err := env.GetWebSocketURL("/api/analytic/intro")
 	if err != nil {
+		logger.Debugf("nodeAnalyticRecord: GetWebSocketURL failed for environment ID: %d, error: %v", env.ID, err)
 		return err
 	}
 
-	header := http.Header{}
+	logger.Debugf("nodeAnalyticRecord: Connecting to WebSocket URL: %s for environment ID: %d", u, env.ID)
 
+	header := http.Header{}
 	header.Set("X-Node-Secret", env.Token)
 
 	dial := &websocket.Dialer{
@@ -195,33 +637,48 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
 
 	c, _, err := dial.Dial(u, header)
 	if err != nil {
+		logger.Debugf("nodeAnalyticRecord: WebSocket dial failed for environment ID: %d, error: %v", env.ID, err)
 		return err
 	}
 
 	defer c.Close()
+	logger.Debugf("nodeAnalyticRecord: WebSocket connection established for environment ID: %d", env.ID)
 
 	go func() {
 		<-scopeCtx.Done()
+		logger.Debugf("nodeAnalyticRecord: Context cancelled, closing WebSocket for environment ID: %d", env.ID)
 		_ = c.Close()
 	}()
 
 	var nodeStat NodeStat
+	messageCount := 0
 
 	for {
 		err = c.ReadJSON(&nodeStat)
 		if err != nil {
 			if helper.IsUnexpectedWebsocketError(err) {
+				logger.Debugf("nodeAnalyticRecord: Unexpected WebSocket error for environment ID: %d, error: %v", env.ID, err)
 				return err
 			}
+			logger.Debugf("nodeAnalyticRecord: WebSocket read completed for environment ID: %d", env.ID)
 			return nil
 		}
 
+		messageCount++
+		logger.Debugf("nodeAnalyticRecord: Received message #%d from environment ID: %d", messageCount, env.ID)
+
 		// set online
 		nodeStat.Status = true
 		nodeStat.ResponseAt = time.Now()
 
 		mutex.Lock()
-		NodeMap[env.ID].NodeStat = nodeStat
+		if NodeMap[env.ID] != nil {
+			NodeMap[env.ID].NodeStat = nodeStat
+			logger.Debugf("nodeAnalyticRecord: Updated NodeStat for environment ID: %d, Status: %t, ResponseAt: %v",
+				env.ID, nodeStat.Status, nodeStat.ResponseAt)
+		} else {
+			logger.Debugf("nodeAnalyticRecord: Warning - Node not found in NodeMap for environment ID: %d", env.ID)
+		}
 		mutex.Unlock()
 	}
 }
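
Note on the backoff behaviour introduced above: with defaultRetryConfig the per-node wait grows by BackoffMultiple (1.5x) for each consecutive failure, starting at BaseInterval (5s) and capped at MaxInterval (5m), while the 1-second retryTicker only controls how often shouldRetry is consulted. The following standalone sketch is not part of the patch; nextInterval is a hypothetical helper that mirrors the patch's calculateNextRetryInterval logic, shown here only to illustrate the schedule it produces.

// Standalone sketch: reproduces the retry schedule implied by defaultRetryConfig.
package main

import (
	"fmt"
	"time"
)

// nextInterval mirrors calculateNextRetryInterval: base interval for the first
// failure, multiplied by `multiple` per additional failure, capped at `max`.
func nextInterval(failureCount int, base, max time.Duration, multiple float64) time.Duration {
	if failureCount == 0 {
		return base
	}
	interval := base
	for i := 0; i < failureCount-1; i++ {
		interval = time.Duration(float64(interval) * multiple)
		if interval > max {
			interval = max
			break
		}
	}
	return interval
}

func main() {
	// defaultRetryConfig: BaseInterval=5s, MaxInterval=5m, BackoffMultiple=1.5, MaxRetries=10
	for failures := 1; failures <= 10; failures++ {
		fmt.Printf("failures=%2d -> next retry in %v\n",
			failures, nextInterval(failures, 5*time.Second, 5*time.Minute, 1.5))
	}
	// Prints roughly 5s, 7.5s, 11.25s, ... up to about 3m12s at 10 failures.
}

Since 5s x 1.5^9 is roughly 3m12s, the 5m cap is never reached within MaxRetries=10 consecutive failures; beyond that, shouldRetry holds the node in its backoff window and later resumes counting from MaxRetries/2.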