consul_resolver.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package upstream
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "strings"
  7. "time"
  8. )
  9. // ConsulResolver handles DNS resolution through consul
  10. type ConsulResolver struct {
  11. resolver string // e.g., "127.0.0.1:8600"
  12. }
  13. // NewConsulResolver creates a new consul resolver
  14. func NewConsulResolver(resolver string) *ConsulResolver {
  15. return &ConsulResolver{
  16. resolver: resolver,
  17. }
  18. }
  19. // ResolveService resolves a consul service to actual IP addresses and ports
  20. func (cr *ConsulResolver) ResolveService(serviceURL string) ([]string, error) {
  21. // Parse consul service URL (e.g., "service.consul service=redacted-net resolve")
  22. serviceName := cr.extractServiceName(serviceURL)
  23. if serviceName == "" {
  24. return nil, fmt.Errorf("could not extract service name from: %s", serviceURL)
  25. }
  26. // Create a custom resolver that uses the consul DNS server
  27. dialer := &net.Dialer{
  28. Timeout: 5 * time.Second,
  29. }
  30. resolver := &net.Resolver{
  31. PreferGo: true,
  32. Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
  33. return dialer.DialContext(ctx, network, cr.resolver)
  34. },
  35. }
  36. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  37. defer cancel()
  38. // Query consul for service SRV records
  39. _, srvRecords, err := resolver.LookupSRV(ctx, "", "", serviceName+".service.consul")
  40. if err != nil {
  41. // Fallback to A record lookup if SRV fails
  42. ips, err := resolver.LookupIPAddr(ctx, serviceName+".service.consul")
  43. if err != nil {
  44. return nil, fmt.Errorf("failed to resolve service %s: %v", serviceName, err)
  45. }
  46. // Return IP addresses with default port (80)
  47. var addresses []string
  48. for _, ip := range ips {
  49. addresses = append(addresses, fmt.Sprintf("%s:80", ip.IP.String()))
  50. }
  51. return addresses, nil
  52. }
  53. // Convert SRV records to address:port format
  54. var addresses []string
  55. for _, srv := range srvRecords {
  56. // Resolve the target hostname to IP
  57. ips, err := resolver.LookupIPAddr(ctx, srv.Target)
  58. if err != nil {
  59. continue // Skip this record if resolution fails
  60. }
  61. for _, ip := range ips {
  62. addresses = append(addresses, fmt.Sprintf("%s:%d", ip.IP.String(), srv.Port))
  63. }
  64. }
  65. if len(addresses) == 0 {
  66. return nil, fmt.Errorf("no addresses found for service %s", serviceName)
  67. }
  68. return addresses, nil
  69. }
  70. // extractServiceName extracts the service name from consul service URL
  71. func (cr *ConsulResolver) extractServiceName(serviceURL string) string {
  72. serviceURL = strings.TrimSpace(serviceURL)
  73. // Handle empty input
  74. if serviceURL == "" {
  75. return ""
  76. }
  77. // Parse "service.consul service=redacted-net resolve" format
  78. if strings.Contains(serviceURL, "service=") {
  79. parts := strings.Fields(serviceURL)
  80. for _, part := range parts {
  81. if strings.HasPrefix(part, "service=") {
  82. serviceName := strings.TrimPrefix(part, "service=")
  83. // Handle edge cases like "service=" or "service= "
  84. serviceName = strings.TrimSpace(serviceName)
  85. if serviceName == "" {
  86. return ""
  87. }
  88. return serviceName
  89. }
  90. }
  91. }
  92. // Fallback: try to extract from hostname format like "my-service.service.consul"
  93. if strings.Contains(serviceURL, ".service.consul") {
  94. parts := strings.Split(serviceURL, ".")
  95. if len(parts) > 0 {
  96. serviceName := strings.TrimSpace(parts[0])
  97. if serviceName == "" {
  98. return ""
  99. }
  100. return serviceName
  101. }
  102. }
  103. return ""
  104. }
  105. // TestConsulTargets performs availability test specifically for consul targets
  106. func TestConsulTargets(consulTargets []ProxyTarget) map[string]*Status {
  107. result := make(map[string]*Status)
  108. // Group consul targets by resolver
  109. consulTargetsByResolver := make(map[string][]ProxyTarget)
  110. for _, target := range consulTargets {
  111. if target.Resolver != "" {
  112. consulTargetsByResolver[target.Resolver] = append(consulTargetsByResolver[target.Resolver], target)
  113. } else {
  114. // No resolver specified, mark as offline
  115. key := target.Host + ":" + target.Port
  116. result[key] = &Status{
  117. Online: false,
  118. Latency: 0,
  119. }
  120. }
  121. }
  122. // Test each resolver group
  123. for resolver, targets := range consulTargetsByResolver {
  124. consulResolver := NewConsulResolver(resolver)
  125. for _, target := range targets {
  126. key := target.Host + ":" + target.Port
  127. // Try to resolve the consul service
  128. addresses, err := consulResolver.ResolveService(target.ServiceURL)
  129. if err != nil {
  130. // If resolution fails, mark as offline
  131. result[key] = &Status{
  132. Online: false,
  133. Latency: 0,
  134. }
  135. continue
  136. }
  137. // Test the first resolved address as representative
  138. if len(addresses) > 0 {
  139. addressResults := AvailabilityTest(addresses[:1])
  140. if status, exists := addressResults[addresses[0]]; exists {
  141. result[key] = status
  142. } else {
  143. result[key] = &Status{
  144. Online: false,
  145. Latency: 0,
  146. }
  147. }
  148. } else {
  149. result[key] = &Status{
  150. Online: false,
  151. Latency: 0,
  152. }
  153. }
  154. }
  155. }
  156. return result
  157. }
  158. // EnhancedAvailabilityTest performs availability test with consul resolution support
  159. // Deprecated: Use TestConsulTargets for consul targets and AvailabilityTest for regular targets
  160. func EnhancedAvailabilityTest(targets []ProxyTarget) map[string]*Status {
  161. result := make(map[string]*Status)
  162. // Group targets by type
  163. consulTargets := make([]ProxyTarget, 0)
  164. regularTargets := make([]string, 0)
  165. for _, target := range targets {
  166. if target.IsConsul && target.Resolver != "" {
  167. consulTargets = append(consulTargets, target)
  168. } else {
  169. // Regular target - use existing format for traditional AvailabilityTest
  170. key := target.Host + ":" + target.Port
  171. regularTargets = append(regularTargets, key)
  172. }
  173. }
  174. // Use traditional AvailabilityTest for regular targets (more efficient)
  175. if len(regularTargets) > 0 {
  176. regularResults := AvailabilityTest(regularTargets)
  177. // Merge results
  178. for k, v := range regularResults {
  179. result[k] = v
  180. }
  181. }
  182. // Test consul targets with DNS resolution
  183. if len(consulTargets) > 0 {
  184. consulResults := TestConsulTargets(consulTargets)
  185. for k, v := range consulResults {
  186. result[k] = v
  187. }
  188. }
  189. return result
  190. }