network_proxy_test.go 8.3 KB


  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net"
  8. "runtime"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/ishidawataru/sctp"
  13. "gotest.tools/v3/skip"
  14. // this takes care of the incontainer flag
  15. _ "github.com/docker/docker/libnetwork/testutils"
  16. )
  17. var testBuf = []byte("Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo")
  18. var testBufSize = len(testBuf)
  19. type EchoServer interface {
  20. Run()
  21. Close()
  22. LocalAddr() net.Addr
  23. }
  24. type EchoServerOptions struct {
  25. TCPHalfClose bool
  26. }
  27. type StreamEchoServer struct {
  28. listener net.Listener
  29. testCtx *testing.T
  30. opts EchoServerOptions
  31. }
  32. type UDPEchoServer struct {
  33. conn net.PacketConn
  34. testCtx *testing.T
  35. }
  36. func NewEchoServer(t *testing.T, proto, address string, opts EchoServerOptions) EchoServer {
  37. var server EchoServer
  38. if !strings.HasPrefix(proto, "tcp") && opts.TCPHalfClose {
  39. t.Fatalf("TCPHalfClose is not supported for %s", proto)
  40. }
  41. switch {
  42. case strings.HasPrefix(proto, "tcp"):
  43. listener, err := net.Listen(proto, address)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. server = &StreamEchoServer{listener: listener, testCtx: t, opts: opts}
  48. case strings.HasPrefix(proto, "udp"):
  49. socket, err := net.ListenPacket(proto, address)
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. server = &UDPEchoServer{conn: socket, testCtx: t}
  54. case strings.HasPrefix(proto, "sctp"):
  55. addr, err := sctp.ResolveSCTPAddr(proto, address)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. listener, err := sctp.ListenSCTP(proto, addr)
  60. if err != nil {
  61. t.Fatal(err)
  62. }
  63. server = &StreamEchoServer{listener: listener, testCtx: t}
  64. default:
  65. t.Fatalf("unknown protocol: %s", proto)
  66. }
  67. return server
  68. }
  69. func (server *StreamEchoServer) Run() {
  70. go func() {
  71. for {
  72. client, err := server.listener.Accept()
  73. if err != nil {
  74. return
  75. }
  76. go func(client net.Conn) {
  77. if server.opts.TCPHalfClose {
  78. data, err := ioutil.ReadAll(client)
  79. if err != nil {
  80. server.testCtx.Logf("io.ReadAll() failed for the client: %v\n", err.Error())
  81. }
  82. if _, err := client.Write(data); err != nil {
  83. server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
  84. }
  85. client.(*net.TCPConn).CloseWrite()
  86. } else {
  87. if _, err := io.Copy(client, client); err != nil {
  88. server.testCtx.Logf("can't echo to the client: %v\n", err.Error())
  89. }
  90. client.Close()
  91. }
  92. }(client)
  93. }
  94. }()
  95. }
  96. func (server *StreamEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
  97. func (server *StreamEchoServer) Close() { server.listener.Close() }
  98. func (server *UDPEchoServer) Run() {
  99. go func() {
  100. readBuf := make([]byte, 1024)
  101. for {
  102. read, from, err := server.conn.ReadFrom(readBuf)
  103. if err != nil {
  104. return
  105. }
  106. for i := 0; i != read; {
  107. written, err := server.conn.WriteTo(readBuf[i:read], from)
  108. if err != nil {
  109. break
  110. }
  111. i += written
  112. }
  113. }
  114. }()
  115. }
  116. func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
  117. func (server *UDPEchoServer) Close() { server.conn.Close() }
  118. func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose bool) {
  119. defer proxy.Close()
  120. go proxy.Run()
  121. var client net.Conn
  122. var err error
  123. if strings.HasPrefix(proto, "sctp") {
  124. var a *sctp.SCTPAddr
  125. a, err = sctp.ResolveSCTPAddr(proto, addr)
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. client, err = sctp.DialSCTP(proto, nil, a)
  130. } else {
  131. client, err = net.Dial(proto, addr)
  132. }
  133. if err != nil {
  134. t.Fatalf("Can't connect to the proxy: %v", err)
  135. }
  136. defer client.Close()
  137. client.SetDeadline(time.Now().Add(10 * time.Second))
  138. if _, err = client.Write(testBuf); err != nil {
  139. t.Fatal(err)
  140. }
  141. if halfClose {
  142. if proto != "tcp" {
  143. t.Fatalf("halfClose is not supported for %s", proto)
  144. }
  145. client.(*net.TCPConn).CloseWrite()
  146. }
  147. recvBuf := make([]byte, testBufSize)
  148. if _, err = client.Read(recvBuf); err != nil {
  149. t.Fatal(err)
  150. }
  151. if !bytes.Equal(testBuf, recvBuf) {
  152. t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
  153. }
  154. }
  155. func testProxy(t *testing.T, proto string, proxy Proxy, halfClose bool) {
  156. testProxyAt(t, proto, proxy, proxy.FrontendAddr().String(), halfClose)
  157. }
  158. func testTCP4Proxy(t *testing.T, halfClose bool) {
  159. backend := NewEchoServer(t, "tcp", "127.0.0.1:0", EchoServerOptions{TCPHalfClose: halfClose})
  160. defer backend.Close()
  161. backend.Run()
  162. frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  163. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  164. if err != nil {
  165. t.Fatal(err)
  166. }
  167. testProxy(t, "tcp", proxy, halfClose)
  168. }
  169. func TestTCP4Proxy(t *testing.T) {
  170. testTCP4Proxy(t, false)
  171. }
  172. func TestTCP4ProxyHalfClose(t *testing.T) {
  173. testTCP4Proxy(t, true)
  174. }
  175. func TestTCP6Proxy(t *testing.T) {
  176. t.Skip("Need to start CI docker with --ipv6")
  177. backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
  178. defer backend.Close()
  179. backend.Run()
  180. frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
  181. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  182. if err != nil {
  183. t.Fatal(err)
  184. }
  185. testProxy(t, "tcp", proxy, false)
  186. }
  187. func TestTCPDualStackProxy(t *testing.T) {
  188. // If I understand `godoc -src net favoriteAddrFamily` (used by the
  189. // net.Listen* functions) correctly this should work, but it doesn't.
  190. t.Skip("No support for dual stack yet")
  191. backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
  192. defer backend.Close()
  193. backend.Run()
  194. frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
  195. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  196. if err != nil {
  197. t.Fatal(err)
  198. }
  199. ipv4ProxyAddr := &net.TCPAddr{
  200. IP: net.IPv4(127, 0, 0, 1),
  201. Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
  202. }
  203. testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String(), false)
  204. }
  205. func TestUDP4Proxy(t *testing.T) {
  206. backend := NewEchoServer(t, "udp", "127.0.0.1:0", EchoServerOptions{})
  207. defer backend.Close()
  208. backend.Run()
  209. frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  210. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  211. if err != nil {
  212. t.Fatal(err)
  213. }
  214. testProxy(t, "udp", proxy, false)
  215. }
  216. func TestUDP6Proxy(t *testing.T) {
  217. t.Skip("Need to start CI docker with --ipv6")
  218. backend := NewEchoServer(t, "udp", "[::1]:0", EchoServerOptions{})
  219. defer backend.Close()
  220. backend.Run()
  221. frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
  222. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. testProxy(t, "udp", proxy, false)
  227. }
  228. func TestUDPWriteError(t *testing.T) {
  229. frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  230. // Hopefully, this port will be free: */
  231. backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
  232. proxy, err := NewProxy(frontendAddr, backendAddr)
  233. if err != nil {
  234. t.Fatal(err)
  235. }
  236. defer proxy.Close()
  237. go proxy.Run()
  238. client, err := net.Dial("udp", "127.0.0.1:25587")
  239. if err != nil {
  240. t.Fatalf("Can't connect to the proxy: %v", err)
  241. }
  242. defer client.Close()
  243. // Make sure the proxy doesn't stop when there is no actual backend:
  244. client.Write(testBuf)
  245. client.Write(testBuf)
  246. backend := NewEchoServer(t, "udp", "127.0.0.1:25587", EchoServerOptions{})
  247. defer backend.Close()
  248. backend.Run()
  249. client.SetDeadline(time.Now().Add(10 * time.Second))
  250. if _, err = client.Write(testBuf); err != nil {
  251. t.Fatal(err)
  252. }
  253. recvBuf := make([]byte, testBufSize)
  254. if _, err = client.Read(recvBuf); err != nil {
  255. t.Fatal(err)
  256. }
  257. if !bytes.Equal(testBuf, recvBuf) {
  258. t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
  259. }
  260. }
  261. func TestSCTP4Proxy(t *testing.T) {
  262. skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
  263. backend := NewEchoServer(t, "sctp", "127.0.0.1:0", EchoServerOptions{})
  264. defer backend.Close()
  265. backend.Run()
  266. frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}, Port: 0}
  267. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. testProxy(t, "sctp", proxy, false)
  272. }
  273. func TestSCTP6Proxy(t *testing.T) {
  274. t.Skip("Need to start CI docker with --ipv6")
  275. skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
  276. backend := NewEchoServer(t, "sctp", "[::1]:0", EchoServerOptions{})
  277. defer backend.Close()
  278. backend.Run()
  279. frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv6loopback}}, Port: 0}
  280. proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. testProxy(t, "sctp", proxy, false)
  285. }