tcp_proxy.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package proxy
  2. import (
  3. "io"
  4. "net"
  5. "syscall"
  6. "github.com/Sirupsen/logrus"
  7. )
  8. // TCPProxy is a proxy for TCP connections. It implements the Proxy interface to
  9. // handle TCP traffic forwarding between the frontend and backend addresses.
  10. type TCPProxy struct {
  11. listener *net.TCPListener
  12. frontendAddr *net.TCPAddr
  13. backendAddr *net.TCPAddr
  14. }
  15. // NewTCPProxy creates a new TCPProxy.
  16. func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
  17. listener, err := net.ListenTCP("tcp", frontendAddr)
  18. if err != nil {
  19. return nil, err
  20. }
  21. // If the port in frontendAddr was 0 then ListenTCP will have a picked
  22. // a port to listen on, hence the call to Addr to get that actual port:
  23. return &TCPProxy{
  24. listener: listener,
  25. frontendAddr: listener.Addr().(*net.TCPAddr),
  26. backendAddr: backendAddr,
  27. }, nil
  28. }
  29. func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
  30. backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
  31. if err != nil {
  32. logrus.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
  33. client.Close()
  34. return
  35. }
  36. event := make(chan int64)
  37. var broker = func(to, from *net.TCPConn) {
  38. written, err := io.Copy(to, from)
  39. if err != nil {
  40. // If the socket we are writing to is shutdown with
  41. // SHUT_WR, forward it to the other end of the pipe:
  42. if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
  43. from.CloseWrite()
  44. }
  45. }
  46. to.CloseRead()
  47. event <- written
  48. }
  49. go broker(client, backend)
  50. go broker(backend, client)
  51. var transferred int64
  52. for i := 0; i < 2; i++ {
  53. select {
  54. case written := <-event:
  55. transferred += written
  56. case <-quit:
  57. // Interrupt the two brokers and "join" them.
  58. client.Close()
  59. backend.Close()
  60. for ; i < 2; i++ {
  61. transferred += <-event
  62. }
  63. return
  64. }
  65. }
  66. client.Close()
  67. backend.Close()
  68. }
  69. // Run starts forwarding the traffic using TCP.
  70. func (proxy *TCPProxy) Run() {
  71. quit := make(chan bool)
  72. defer close(quit)
  73. for {
  74. client, err := proxy.listener.Accept()
  75. if err != nil {
  76. logrus.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
  77. return
  78. }
  79. go proxy.clientLoop(client.(*net.TCPConn), quit)
  80. }
  81. }
  82. // Close stops forwarding the traffic.
  83. func (proxy *TCPProxy) Close() { proxy.listener.Close() }
  84. // FrontendAddr returns the TCP address on which the proxy is listening.
  85. func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  86. // BackendAddr returns the TCP proxied address.
  87. func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }