tcp_proxy.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package proxy
  2. import (
  3. "io"
  4. "net"
  5. "syscall"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. type TCPProxy struct {
  9. listener *net.TCPListener
  10. frontendAddr *net.TCPAddr
  11. backendAddr *net.TCPAddr
  12. }
  13. func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
  14. listener, err := net.ListenTCP("tcp", frontendAddr)
  15. if err != nil {
  16. return nil, err
  17. }
  18. // If the port in frontendAddr was 0 then ListenTCP will have a picked
  19. // a port to listen on, hence the call to Addr to get that actual port:
  20. return &TCPProxy{
  21. listener: listener,
  22. frontendAddr: listener.Addr().(*net.TCPAddr),
  23. backendAddr: backendAddr,
  24. }, nil
  25. }
  26. func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
  27. backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
  28. if err != nil {
  29. log.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
  30. client.Close()
  31. return
  32. }
  33. event := make(chan int64)
  34. var broker = func(to, from *net.TCPConn) {
  35. written, err := io.Copy(to, from)
  36. if err != nil {
  37. // If the socket we are writing to is shutdown with
  38. // SHUT_WR, forward it to the other end of the pipe:
  39. if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
  40. from.CloseWrite()
  41. }
  42. }
  43. to.CloseRead()
  44. event <- written
  45. }
  46. go broker(client, backend)
  47. go broker(backend, client)
  48. var transferred int64 = 0
  49. for i := 0; i < 2; i++ {
  50. select {
  51. case written := <-event:
  52. transferred += written
  53. case <-quit:
  54. // Interrupt the two brokers and "join" them.
  55. client.Close()
  56. backend.Close()
  57. for ; i < 2; i++ {
  58. transferred += <-event
  59. }
  60. return
  61. }
  62. }
  63. client.Close()
  64. backend.Close()
  65. }
  66. func (proxy *TCPProxy) Run() {
  67. quit := make(chan bool)
  68. defer close(quit)
  69. for {
  70. client, err := proxy.listener.Accept()
  71. if err != nil {
  72. log.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
  73. return
  74. }
  75. go proxy.clientLoop(client.(*net.TCPConn), quit)
  76. }
  77. }
  78. func (proxy *TCPProxy) Close() { proxy.listener.Close() }
  79. func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
  80. func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }