udp_proxy.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package proxy
  2. import (
  3. "encoding/binary"
  4. "github.com/dotcloud/docker/utils"
  5. "log"
  6. "net"
  7. "sync"
  8. "syscall"
  9. "time"
  10. )
  11. const (
  12. UDPConnTrackTimeout = 90 * time.Second
  13. UDPBufSize = 2048
  14. )
  15. // A net.Addr where the IP is split into two fields so you can use it as a key
  16. // in a map:
  17. type connTrackKey struct {
  18. IPHigh uint64
  19. IPLow uint64
  20. Port int
  21. }
  22. func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
  23. if len(addr.IP) == net.IPv4len {
  24. return &connTrackKey{
  25. IPHigh: 0,
  26. IPLow: uint64(binary.BigEndian.Uint32(addr.IP)),
  27. Port: addr.Port,
  28. }
  29. }
  30. return &connTrackKey{
  31. IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
  32. IPLow: binary.BigEndian.Uint64(addr.IP[8:]),
  33. Port: addr.Port,
  34. }
  35. }
  36. type connTrackMap map[connTrackKey]*net.UDPConn
  37. type UDPProxy struct {
  38. listener *net.UDPConn
  39. frontendAddr *net.UDPAddr
  40. backendAddr *net.UDPAddr
  41. connTrackTable connTrackMap
  42. connTrackLock sync.Mutex
  43. }
  44. func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
  45. listener, err := net.ListenUDP("udp", frontendAddr)
  46. if err != nil {
  47. return nil, err
  48. }
  49. return &UDPProxy{
  50. listener: listener,
  51. frontendAddr: listener.LocalAddr().(*net.UDPAddr),
  52. backendAddr: backendAddr,
  53. connTrackTable: make(connTrackMap),
  54. }, nil
  55. }
  56. func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
  57. defer func() {
  58. proxy.connTrackLock.Lock()
  59. delete(proxy.connTrackTable, *clientKey)
  60. proxy.connTrackLock.Unlock()
  61. utils.Debugf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String())
  62. proxyConn.Close()
  63. }()
  64. readBuf := make([]byte, UDPBufSize)
  65. for {
  66. proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
  67. again:
  68. read, err := proxyConn.Read(readBuf)
  69. if err != nil {
  70. if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
  71. // This will happen if the last write failed
  72. // (e.g: nothing is actually listening on the
  73. // proxied port on the container), ignore it
  74. // and continue until UDPConnTrackTimeout
  75. // expires:
  76. goto again
  77. }
  78. return
  79. }
  80. for i := 0; i != read; {
  81. written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
  82. if err != nil {
  83. return
  84. }
  85. i += written
  86. utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String())
  87. }
  88. }
  89. }
  90. func (proxy *UDPProxy) Run() {
  91. readBuf := make([]byte, UDPBufSize)
  92. utils.Debugf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr)
  93. for {
  94. read, from, err := proxy.listener.ReadFromUDP(readBuf)
  95. if err != nil {
  96. // NOTE: Apparently ReadFrom doesn't return
  97. // ECONNREFUSED like Read do (see comment in
  98. // UDPProxy.replyLoop)
  99. if utils.IsClosedError(err) {
  100. utils.Debugf("Stopping proxy on udp/%v for udp/%v (socket was closed)", proxy.frontendAddr, proxy.backendAddr)
  101. } else {
  102. utils.Errorf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
  103. }
  104. break
  105. }
  106. fromKey := newConnTrackKey(from)
  107. proxy.connTrackLock.Lock()
  108. proxyConn, hit := proxy.connTrackTable[*fromKey]
  109. if !hit {
  110. proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
  111. if err != nil {
  112. log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
  113. continue
  114. }
  115. proxy.connTrackTable[*fromKey] = proxyConn
  116. go proxy.replyLoop(proxyConn, from, fromKey)
  117. }
  118. proxy.connTrackLock.Unlock()
  119. for i := 0; i != read; {
  120. written, err := proxyConn.Write(readBuf[i:read])
  121. if err != nil {
  122. log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
  123. break
  124. }
  125. i += written
  126. utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String())
  127. }
  128. }
  129. }
  130. func (proxy *UDPProxy) Close() {
  131. proxy.listener.Close()
  132. proxy.connTrackLock.Lock()
  133. defer proxy.connTrackLock.Unlock()
  134. for _, conn := range proxy.connTrackTable {
  135. conn.Close()
  136. }
  137. }
  138. func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  139. func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr }