tcp_proxy.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package proxy
  2. import (
  3. "github.com/dotcloud/docker/utils"
  4. "io"
  5. "log"
  6. "net"
  7. "syscall"
  8. )
  9. type TCPProxy struct {
  10. listener *net.TCPListener
  11. frontendAddr *net.TCPAddr
  12. backendAddr *net.TCPAddr
  13. }
  14. func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
  15. listener, err := net.ListenTCP("tcp", frontendAddr)
  16. if err != nil {
  17. return nil, err
  18. }
  19. // If the port in frontendAddr was 0 then ListenTCP will have a picked
  20. // a port to listen on, hence the call to Addr to get that actual port:
  21. return &TCPProxy{
  22. listener: listener,
  23. frontendAddr: listener.Addr().(*net.TCPAddr),
  24. backendAddr: backendAddr,
  25. }, nil
  26. }
  27. func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
  28. backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
  29. if err != nil {
  30. log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error())
  31. client.Close()
  32. return
  33. }
  34. event := make(chan int64)
  35. var broker = func(to, from *net.TCPConn) {
  36. written, err := io.Copy(to, from)
  37. if err != nil {
  38. // If the socket we are writing to is shutdown with
  39. // SHUT_WR, forward it to the other end of the pipe:
  40. if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
  41. from.CloseWrite()
  42. }
  43. }
  44. to.CloseRead()
  45. event <- written
  46. }
  47. utils.Debugf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr())
  48. go broker(client, backend)
  49. go broker(backend, client)
  50. var transferred int64 = 0
  51. for i := 0; i < 2; i++ {
  52. select {
  53. case written := <-event:
  54. transferred += written
  55. case <-quit:
  56. // Interrupt the two brokers and "join" them.
  57. client.Close()
  58. backend.Close()
  59. for ; i < 2; i++ {
  60. transferred += <-event
  61. }
  62. goto done
  63. }
  64. }
  65. client.Close()
  66. backend.Close()
  67. done:
  68. utils.Debugf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr())
  69. }
  70. func (proxy *TCPProxy) Run() {
  71. quit := make(chan bool)
  72. defer close(quit)
  73. utils.Debugf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr)
  74. for {
  75. client, err := proxy.listener.Accept()
  76. if err != nil {
  77. utils.Debugf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error())
  78. return
  79. }
  80. go proxy.clientLoop(client.(*net.TCPConn), quit)
  81. }
  82. }
  83. func (proxy *TCPProxy) Close() { proxy.listener.Close() }
  84. func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  85. func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }