network_proxy.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package docker
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "github.com/dotcloud/docker/utils"
  6. "io"
  7. "log"
  8. "net"
  9. "sync"
  10. "syscall"
  11. "time"
  12. )
  13. const (
  14. UDPConnTrackTimeout = 90 * time.Second
  15. UDPBufSize = 2048
  16. )
  17. type Proxy interface {
  18. // Start forwarding traffic back and forth the front and back-end
  19. // addresses.
  20. Run()
  21. // Stop forwarding traffic and close both ends of the Proxy.
  22. Close()
  23. // Return the address on which the proxy is listening.
  24. FrontendAddr() net.Addr
  25. // Return the proxied address.
  26. BackendAddr() net.Addr
  27. }
  28. type TCPProxy struct {
  29. listener *net.TCPListener
  30. frontendAddr *net.TCPAddr
  31. backendAddr *net.TCPAddr
  32. }
  33. func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
  34. listener, err := net.ListenTCP("tcp", frontendAddr)
  35. if err != nil {
  36. return nil, err
  37. }
  38. // If the port in frontendAddr was 0 then ListenTCP will have a picked
  39. // a port to listen on, hence the call to Addr to get that actual port:
  40. return &TCPProxy{
  41. listener: listener,
  42. frontendAddr: listener.Addr().(*net.TCPAddr),
  43. backendAddr: backendAddr,
  44. }, nil
  45. }
  46. func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
  47. backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
  48. if err != nil {
  49. log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error())
  50. client.Close()
  51. return
  52. }
  53. event := make(chan int64)
  54. var broker = func(to, from *net.TCPConn) {
  55. written, err := io.Copy(to, from)
  56. if err != nil {
  57. // If the socket we are writing to is shutdown with
  58. // SHUT_WR, forward it to the other end of the pipe:
  59. if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
  60. from.CloseWrite()
  61. }
  62. }
  63. to.CloseRead()
  64. event <- written
  65. }
  66. utils.Debugf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr())
  67. go broker(client, backend)
  68. go broker(backend, client)
  69. var transferred int64 = 0
  70. for i := 0; i < 2; i++ {
  71. select {
  72. case written := <-event:
  73. transferred += written
  74. case <-quit:
  75. // Interrupt the two brokers and "join" them.
  76. client.Close()
  77. backend.Close()
  78. for ; i < 2; i++ {
  79. transferred += <-event
  80. }
  81. goto done
  82. }
  83. }
  84. client.Close()
  85. backend.Close()
  86. done:
  87. utils.Debugf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr())
  88. }
  89. func (proxy *TCPProxy) Run() {
  90. quit := make(chan bool)
  91. defer close(quit)
  92. utils.Debugf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr)
  93. for {
  94. client, err := proxy.listener.Accept()
  95. if err != nil {
  96. utils.Errorf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
  97. return
  98. }
  99. go proxy.clientLoop(client.(*net.TCPConn), quit)
  100. }
  101. }
  102. func (proxy *TCPProxy) Close() { proxy.listener.Close() }
  103. func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  104. func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
  105. // A net.Addr where the IP is split into two fields so you can use it as a key
  106. // in a map:
  107. type connTrackKey struct {
  108. IPHigh uint64
  109. IPLow uint64
  110. Port int
  111. }
  112. func newConnTrackKey(addr *net.UDPAddr) *connTrackKey {
  113. if len(addr.IP) == net.IPv4len {
  114. return &connTrackKey{
  115. IPHigh: 0,
  116. IPLow: uint64(binary.BigEndian.Uint32(addr.IP)),
  117. Port: addr.Port,
  118. }
  119. }
  120. return &connTrackKey{
  121. IPHigh: binary.BigEndian.Uint64(addr.IP[:8]),
  122. IPLow: binary.BigEndian.Uint64(addr.IP[8:]),
  123. Port: addr.Port,
  124. }
  125. }
  126. type connTrackMap map[connTrackKey]*net.UDPConn
  127. type UDPProxy struct {
  128. listener *net.UDPConn
  129. frontendAddr *net.UDPAddr
  130. backendAddr *net.UDPAddr
  131. connTrackTable connTrackMap
  132. connTrackLock sync.Mutex
  133. }
  134. func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
  135. listener, err := net.ListenUDP("udp", frontendAddr)
  136. if err != nil {
  137. return nil, err
  138. }
  139. return &UDPProxy{
  140. listener: listener,
  141. frontendAddr: listener.LocalAddr().(*net.UDPAddr),
  142. backendAddr: backendAddr,
  143. connTrackTable: make(connTrackMap),
  144. }, nil
  145. }
  146. func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) {
  147. defer func() {
  148. proxy.connTrackLock.Lock()
  149. delete(proxy.connTrackTable, *clientKey)
  150. proxy.connTrackLock.Unlock()
  151. utils.Debugf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String())
  152. proxyConn.Close()
  153. }()
  154. readBuf := make([]byte, UDPBufSize)
  155. for {
  156. proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout))
  157. again:
  158. read, err := proxyConn.Read(readBuf)
  159. if err != nil {
  160. if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED {
  161. // This will happen if the last write failed
  162. // (e.g: nothing is actually listening on the
  163. // proxied port on the container), ignore it
  164. // and continue until UDPConnTrackTimeout
  165. // expires:
  166. goto again
  167. }
  168. return
  169. }
  170. for i := 0; i != read; {
  171. written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr)
  172. if err != nil {
  173. return
  174. }
  175. i += written
  176. utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String())
  177. }
  178. }
  179. }
  180. func (proxy *UDPProxy) Run() {
  181. readBuf := make([]byte, UDPBufSize)
  182. utils.Debugf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr)
  183. for {
  184. read, from, err := proxy.listener.ReadFromUDP(readBuf)
  185. if err != nil {
  186. // NOTE: Apparently ReadFrom doesn't return
  187. // ECONNREFUSED like Read do (see comment in
  188. // UDPProxy.replyLoop)
  189. utils.Errorf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
  190. break
  191. }
  192. fromKey := newConnTrackKey(from)
  193. proxy.connTrackLock.Lock()
  194. proxyConn, hit := proxy.connTrackTable[*fromKey]
  195. if !hit {
  196. proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr)
  197. if err != nil {
  198. log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
  199. continue
  200. }
  201. proxy.connTrackTable[*fromKey] = proxyConn
  202. go proxy.replyLoop(proxyConn, from, fromKey)
  203. }
  204. proxy.connTrackLock.Unlock()
  205. for i := 0; i != read; {
  206. written, err := proxyConn.Write(readBuf[i:read])
  207. if err != nil {
  208. log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err)
  209. break
  210. }
  211. i += written
  212. utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String())
  213. }
  214. }
  215. }
  216. func (proxy *UDPProxy) Close() {
  217. proxy.listener.Close()
  218. proxy.connTrackLock.Lock()
  219. defer proxy.connTrackLock.Unlock()
  220. for _, conn := range proxy.connTrackTable {
  221. conn.Close()
  222. }
  223. }
  224. func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  225. func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
  226. func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
  227. switch frontendAddr.(type) {
  228. case *net.UDPAddr:
  229. return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
  230. case *net.TCPAddr:
  231. return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
  232. default:
  233. panic(fmt.Errorf("Unsupported protocol"))
  234. }
  235. }