network_proxy_test.go 8.1 KB


  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/ishidawataru/sctp"
  12. // this takes care of the incontainer flag
  13. _ "github.com/docker/libnetwork/testutils"
  14. )
  15. var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
  16. var testBufSize = len(testBuf)
  17. type EchoServer interface {
  18. Run()
  19. Close()
  20. LocalAddr() net.Addr
  21. }
  22. type EchoServerOptions struct {
  23. TCPHalfClose bool
  24. }
  25. type StreamEchoServer struct {
  26. listener net.Listener
  27. testCtx *testing.T
  28. opts EchoServerOptions
  29. }
  30. type UDPEchoServer struct {
  31. conn net.PacketConn
  32. testCtx *testing.T
  33. }
  34. func NewEchoServer(t *testing.T, proto, address string, opts EchoServerOptions) EchoServer {
  35. var server EchoServer
  36. if !strings.HasPrefix(proto, "tcp") && opts.TCPHalfClose {
  37. t.Fatalf("TCPHalfClose is not supported for %s", proto)
  38. }
  39. switch {
  40. case strings.HasPrefix(proto, "tcp"):
  41. listener, err := net.Listen(proto, address)
  42. if err != nil {
  43. t.Fatal(err)
  44. }
  45. server = &StreamEchoServer{listener: listener, testCtx: t, opts: opts}
  46. case strings.HasPrefix(proto, "udp"):
  47. socket, err := net.ListenPacket(proto, address)
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. server = &UDPEchoServer{conn: socket, testCtx: t}
  52. case strings.HasPrefix(proto, "sctp"):
  53. addr, err := sctp.ResolveSCTPAddr(proto, address)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. listener, err := sctp.ListenSCTP(proto, addr)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. server = &StreamEchoServer{listener: listener, testCtx: t}
  62. default:
  63. t.Fatalf("unknown protocol: %s", proto)
  64. }
  65. return server
  66. }
  67. func (server *StreamEchoServer) Run() {
  68. go func() {
  69. for {
  70. client, err := server.listener.Accept()
  71. if err != nil {
  72. return
  73. }
  74. go func(client net.Conn) {
  75. if server.opts.TCPHalfClose {
  76. data, err := ioutil.ReadAll(client)
  77. if err != nil {
  78. server.testCtx.Logf("io.ReadAll() failed for the client: %v\n", err.Error())
  79. }
  80. if _, err := client.Write(data); err != nil {
  81. server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
  82. }
  83. client.(*net.TCPConn).CloseWrite()
  84. } else {
  85. if _, err := io.Copy(client, client); err != nil {
  86. server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
  87. }
  88. client.Close()
  89. }
  90. }(client)
  91. }
  92. }()
  93. }
  94. func (server *StreamEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
  95. func (server *StreamEchoServer) Close() { server.listener.Close() }
  96. func (server *UDPEchoServer) Run() {
  97. go func() {
  98. readBuf := make([]byte, 1024)
  99. for {
  100. read, from, err := server.conn.ReadFrom(readBuf)
  101. if err != nil {
  102. return
  103. }
  104. for i := 0; i != read; {
  105. written, err := server.conn.WriteTo(readBuf[i:read], from)
  106. if err != nil {
  107. break
  108. }
  109. i += written
  110. }
  111. }
  112. }()
  113. }
  114. func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
  115. func (server *UDPEchoServer) Close() { server.conn.Close() }
  116. func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose bool) {
  117. defer proxy.Close()
  118. go proxy.Run()
  119. var client net.Conn
  120. var err error
  121. if strings.HasPrefix(proto, "sctp") {
  122. var a *sctp.SCTPAddr
  123. a, err = sctp.ResolveSCTPAddr(proto, addr)
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. client, err = sctp.DialSCTP(proto, nil, a)
  128. } else {
  129. client, err = net.Dial(proto, addr)
  130. }
  131. if err != nil {
  132. t.Fatalf("Can't connect to the proxy: %v", err)
  133. }
  134. defer client.Close()
  135. client.SetDeadline(time.Now().Add(10 * time.Second))
  136. if _, err = client.Write(testBuf); err != nil {
  137. t.Fatal(err)
  138. }
  139. if halfClose {
  140. if proto != "tcp" {
  141. t.Fatalf("halfClose is not supported for %s", proto)
  142. }
  143. client.(*net.TCPConn).CloseWrite()
  144. }
  145. recvBuf := make([]byte, testBufSize)
  146. if _, err = client.Read(recvBuf); err != nil {
  147. t.Fatal(err)
  148. }
  149. if !bytes.Equal(testBuf, recvBuf) {
  150. t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
  151. }
  152. }
  153. func testProxy(t *testing.T, proto string, proxy Proxy, halfClose bool) {
  154. testProxyAt(t, proto, proxy, proxy.FrontendAddr().String(), halfClose)
  155. }
  156. func testTCP4Proxy(t *testing.T, halfClose bool) {
  157. backend := NewEchoServer(t, "tcp", "127.0.0.1:0", EchoServerOptions{TCPHalfClose: halfClose})
  158. defer backend.Close()
  159. backend.Run()
  160. frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  161. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  162. if err != nil {
  163. t.Fatal(err)
  164. }
  165. testProxy(t, "tcp", proxy, halfClose)
  166. }
  167. func TestTCP4Proxy(t *testing.T) {
  168. testTCP4Proxy(t, false)
  169. }
  170. func TestTCP4ProxyHalfClose(t *testing.T) {
  171. testTCP4Proxy(t, true)
  172. }
  173. func TestTCP6Proxy(t *testing.T) {
  174. t.Skip("Need to start CI docker with --ipv6")
  175. backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
  176. defer backend.Close()
  177. backend.Run()
  178. frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
  179. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  180. if err != nil {
  181. t.Fatal(err)
  182. }
  183. testProxy(t, "tcp", proxy, false)
  184. }
  185. func TestTCPDualStackProxy(t *testing.T) {
  186. // If I understand `godoc -src net favoriteAddrFamily` (used by the
  187. // net.Listen* functions) correctly this should work, but it doesn't.
  188. t.Skip("No support for dual stack yet")
  189. backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
  190. defer backend.Close()
  191. backend.Run()
  192. frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
  193. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  194. if err != nil {
  195. t.Fatal(err)
  196. }
  197. ipv4ProxyAddr := &net.TCPAddr{
  198. IP: net.IPv4(127, 0, 0, 1),
  199. Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
  200. }
  201. testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String(), false)
  202. }
  203. func TestUDP4Proxy(t *testing.T) {
  204. backend := NewEchoServer(t, "udp", "127.0.0.1:0", EchoServerOptions{})
  205. defer backend.Close()
  206. backend.Run()
  207. frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  208. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. testProxy(t, "udp", proxy, false)
  213. }
  214. func TestUDP6Proxy(t *testing.T) {
  215. t.Skip("Need to start CI docker with --ipv6")
  216. backend := NewEchoServer(t, "udp", "[::1]:0", EchoServerOptions{})
  217. defer backend.Close()
  218. backend.Run()
  219. frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
  220. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  221. if err != nil {
  222. t.Fatal(err)
  223. }
  224. testProxy(t, "udp", proxy, false)
  225. }
  226. func TestUDPWriteError(t *testing.T) {
  227. frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  228. // Hopefully, this port will be free: */
  229. backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
  230. proxy, err := NewProxy(frontendAddr, backendAddr)
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. defer proxy.Close()
  235. go proxy.Run()
  236. client, err := net.Dial("udp", "127.0.0.1:25587")
  237. if err != nil {
  238. t.Fatalf("Can't connect to the proxy: %v", err)
  239. }
  240. defer client.Close()
  241. // Make sure the proxy doesn't stop when there is no actual backend:
  242. client.Write(testBuf)
  243. client.Write(testBuf)
  244. backend := NewEchoServer(t, "udp", "127.0.0.1:25587", EchoServerOptions{})
  245. defer backend.Close()
  246. backend.Run()
  247. client.SetDeadline(time.Now().Add(10 * time.Second))
  248. if _, err = client.Write(testBuf); err != nil {
  249. t.Fatal(err)
  250. }
  251. recvBuf := make([]byte, testBufSize)
  252. if _, err = client.Read(recvBuf); err != nil {
  253. t.Fatal(err)
  254. }
  255. if !bytes.Equal(testBuf, recvBuf) {
  256. t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
  257. }
  258. }
  259. func TestSCTP4Proxy(t *testing.T) {
  260. backend := NewEchoServer(t, "sctp", "127.0.0.1:0", EchoServerOptions{})
  261. defer backend.Close()
  262. backend.Run()
  263. frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}, Port: 0}
  264. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  265. if err != nil {
  266. t.Fatal(err)
  267. }
  268. testProxy(t, "sctp", proxy, false)
  269. }
  270. func TestSCTP6Proxy(t *testing.T) {
  271. t.Skip("Need to start CI docker with --ipv6")
  272. backend := NewEchoServer(t, "sctp", "[::1]:0", EchoServerOptions{})
  273. defer backend.Close()
  274. backend.Run()
  275. frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv6loopback}}, Port: 0}
  276. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  277. if err != nil {
  278. t.Fatal(err)
  279. }
  280. testProxy(t, "sctp", proxy, false)
  281. }