udp_proxy.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package proxy
import (
	"encoding/binary"
	"net"
	"os"
	"strings"
	"sync"
	"syscall"
	"time"

	log "github.com/Sirupsen/logrus"
)
  11. const (
  12. UDPConnTrackTimeout = 90 * time.Second
  13. UDPBufSize = 2048
  14. )
  15. // A net.Addr where the IP is split into two fields so you can use it as a key
  16. // in a map:
  17. type connTrackKey struct {
  18. IPHigh uint64
  19. IPLow uint64
  20. Port int
  21. }
  22. func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
  23. if len(addr.IP) == net.IPv4len {
  24. return &connTrackKey{
  25. IPHigh: 0,
  26. IPLow: uint64(binary.BigEndian.Uint32(addr.IP)),
  27. Port: addr.Port,
  28. }
  29. }
  30. return &connTrackKey{
  31. IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
  32. IPLow: binary.BigEndian.Uint64(addr.IP[8:]),
  33. Port: addr.Port,
  34. }
  35. }
  36. type connTrackMap map[connTrackKey]*net.UDPConn
  37. type UDPProxy struct {
  38. listener *net.UDPConn
  39. frontendAddr *net.UDPAddr
  40. backendAddr *net.UDPAddr
  41. connTrackTable connTrackMap
  42. connTrackLock sync.Mutex
  43. }
  44. func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
  45. listener, err := net.ListenUDP("udp", frontendAddr)
  46. if err != nil {
  47. return nil, err
  48. }
  49. return &UDPProxy{
  50. listener: listener,
  51. frontendAddr: listener.LocalAddr().(*net.UDPAddr),
  52. backendAddr: backendAddr,
  53. connTrackTable: make(connTrackMap),
  54. }, nil
  55. }
  56. func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
  57. defer func() {
  58. proxy.connTrackLock.Lock()
  59. delete(proxy.connTrackTable, *clientKey)
  60. proxy.connTrackLock.Unlock()
  61. proxyConn.Close()
  62. }()
  63. readBuf := make([]byte, UDPBufSize)
  64. for {
  65. proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
  66. again:
  67. read, err := proxyConn.Read(readBuf)
  68. if err != nil {
  69. if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
  70. // This will happen if the last write failed
  71. // (e.g: nothing is actually listening on the
  72. // proxied port on the container), ignore it
  73. // and continue until UDPConnTrackTimeout
  74. // expires:
  75. goto again
  76. }
  77. return
  78. }
  79. for i := 0; i != read; {
  80. written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
  81. if err != nil {
  82. return
  83. }
  84. i += written
  85. }
  86. }
  87. }
  88. func (proxy *UDPProxy) Run() {
  89. readBuf := make([]byte, UDPBufSize)
  90. for {
  91. read, from, err := proxy.listener.ReadFromUDP(readBuf)
  92. if err != nil {
  93. // NOTE: Apparently ReadFrom doesn't return
  94. // ECONNREFUSED like Read do (see comment in
  95. // UDPProxy.replyLoop)
  96. if !isClosedError(err) {
  97. log.Printf("Stopping proxy on udp/%v for udp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
  98. }
  99. break
  100. }
  101. fromKey := newConnTrackKey(from)
  102. proxy.connTrackLock.Lock()
  103. proxyConn, hit := proxy.connTrackTable[*fromKey]
  104. if !hit {
  105. proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
  106. if err != nil {
  107. log.Printf("Can't proxy a datagram to udp/%s: %s\n", proxy.backendAddr, err)
  108. proxy.connTrackLock.Unlock()
  109. continue
  110. }
  111. proxy.connTrackTable[*fromKey] = proxyConn
  112. go proxy.replyLoop(proxyConn, from, fromKey)
  113. }
  114. proxy.connTrackLock.Unlock()
  115. for i := 0; i != read; {
  116. written, err := proxyConn.Write(readBuf[i:read])
  117. if err != nil {
  118. log.Printf("Can't proxy a datagram to udp/%s: %s\n", proxy.backendAddr, err)
  119. break
  120. }
  121. i += written
  122. }
  123. }
  124. }
  125. func (proxy *UDPProxy) Close() {
  126. proxy.listener.Close()
  127. proxy.connTrackLock.Lock()
  128. defer proxy.connTrackLock.Unlock()
  129. for _, conn := range proxy.connTrackTable {
  130. conn.Close()
  131. }
  132. }
  133. func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  134. func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
  135. func isClosedError(err error) bool {
  136. /* This comparison is ugly, but unfortunately, net.go doesn't export errClosing.
  137. * See:
  138. * http://golang.org/src/pkg/net/net.go
  139. * https://code.google.com/p/go/issues/detail?id=4337
  140. * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ
  141. */
  142. return strings.HasSuffix(err.Error(), "use of closed network connection")
  143. }