unix.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package beam
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "syscall"
  7. )
  8. // Send sends a new message on conn with data and f as payload and
  9. // attachment, respectively.
  10. func Send(conn *net.UnixConn, data []byte, f *os.File) error {
  11. var fds []int
  12. if f != nil {
  13. fds = append(fds, int(f.Fd()))
  14. }
  15. return sendUnix(conn, data, fds...)
  16. }
  17. // Receive waits for a new message on conn, and receives its payload
  18. // and attachment, or an error if any.
  19. //
  20. // If more than 1 file descriptor is sent in the message, they are all
  21. // closed except for the first, which is the attachment.
  22. // It is legal for a message to have no attachment or an empty payload.
  23. func Receive(conn *net.UnixConn) ([]byte, *os.File, error) {
  24. for {
  25. data, fds, err := receiveUnix(conn)
  26. if err != nil {
  27. return nil, nil, err
  28. }
  29. var f *os.File
  30. if len(fds) > 1 {
  31. for _, fd := range fds {
  32. syscall.Close(fd)
  33. }
  34. }
  35. if len(fds) >= 1 {
  36. f = os.NewFile(uintptr(fds[0]), "")
  37. }
  38. return data, f, nil
  39. }
  40. panic("impossibru")
  41. return nil, nil, nil
  42. }
  43. // SendPipe creates a new unix socket pair, sends one end as the attachment
  44. // to a beam message with the payload `data`, and returns the other end.
  45. //
  46. // This is a common pattern to open a new service endpoint.
  47. // For example, a service wishing to advertise its presence to clients might
  48. // open an endpoint with:
  49. //
  50. // endpoint, _ := SendPipe(conn, []byte("sql"))
  51. // defer endpoint.Close()
  52. // for {
  53. // conn, _ := endpoint.Receive()
  54. // go func() {
  55. // Handle(conn)
  56. // conn.Close()
  57. // }()
  58. // }
  59. //
  60. // Note that beam does not distinguish between clients and servers in the logical
  61. // sense: any program wishing to establishing a communication with another program
  62. // may use SendPipe() to create an endpoint.
  63. // For example, here is how an application might use it to connect to a database client.
  64. //
  65. // endpoint, _ := SendPipe(conn, []byte("userdb"))
  66. // defer endpoint.Close()
  67. // conn, _ := endpoint.Receive()
  68. // defer conn.Close()
  69. // db := NewDBClient(conn)
  70. //
  71. // In this example note that we only need the first connection out of the endpoint,
  72. // but we could open new ones to retry after a broken connection.
  73. // Note that, because the underlying service transport is abstracted away, this
  74. // allows for arbitrarily complex service discovery and retry logic to take place,
  75. // without complicating application code.
  76. //
  77. func SendPipe(conn *net.UnixConn, data []byte) (endpoint *net.UnixConn, err error) {
  78. local, remote, err := SocketPair()
  79. if err != nil {
  80. return nil, err
  81. }
  82. defer func() {
  83. if err != nil {
  84. local.Close()
  85. remote.Close()
  86. }
  87. }()
  88. endpoint, err = FdConn(int(local.Fd()))
  89. if err != nil {
  90. return nil, err
  91. }
  92. if err := Send(conn, data, remote); err != nil {
  93. return nil, err
  94. }
  95. return endpoint, nil
  96. }
  97. func receiveUnix(conn *net.UnixConn) ([]byte, []int, error) {
  98. buf := make([]byte, 4096)
  99. oob := make([]byte, 4096)
  100. bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
  101. if err != nil {
  102. return nil, nil, err
  103. }
  104. return buf[:bufn], extractFds(oob[:oobn]), nil
  105. }
  106. func sendUnix(conn *net.UnixConn, data []byte, fds ...int) error {
  107. _, _, err := conn.WriteMsgUnix(data, syscall.UnixRights(fds...), nil)
  108. if err == nil {
  109. for _, fd := range fds {
  110. fmt.Printf("Closing sent fd %v\n", fd)
  111. syscall.Close(fd)
  112. }
  113. }
  114. return err
  115. }
  116. func extractFds(oob []byte) (fds []int) {
  117. scms, err := syscall.ParseSocketControlMessage(oob)
  118. if err != nil {
  119. return
  120. }
  121. for _, scm := range scms {
  122. gotFds, err := syscall.ParseUnixRights(&scm)
  123. if err != nil {
  124. continue
  125. }
  126. fds = append(fds, gotFds...)
  127. }
  128. return
  129. }
  130. func socketpair() ([2]int, error) {
  131. return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
  132. }
  133. // SocketPair is a convenience wrapper around the socketpair(2) syscall.
  134. // It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
  135. // not bound to the underlying filesystem.
  136. // Messages sent on one end are received on the other, and vice-versa.
  137. // It is the caller's responsibility to close both ends.
  138. func SocketPair() (*os.File, *os.File, error) {
  139. pair, err := socketpair()
  140. if err != nil {
  141. return nil, nil, err
  142. }
  143. return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
  144. }
  145. func USocketPair() (*net.UnixConn, *net.UnixConn, error) {
  146. a, b, err := SocketPair()
  147. if err != nil {
  148. return nil, nil, err
  149. }
  150. fmt.Printf("SocketPair() = %v, %v\n", a.Fd(), b.Fd())
  151. uA, err := FdConn(int(a.Fd()))
  152. if err != nil {
  153. a.Close()
  154. b.Close()
  155. return nil, nil, err
  156. }
  157. uB, err := FdConn(int(b.Fd()))
  158. if err != nil {
  159. a.Close()
  160. b.Close()
  161. return nil, nil, err
  162. }
  163. return uA, uB, nil
  164. }
  165. // FdConn wraps a file descriptor in a standard *net.UnixConn object, or
  166. // returns an error if the file descriptor does not point to a unix socket.
  167. func FdConn(fd int) (*net.UnixConn, error) {
  168. f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
  169. conn, err := net.FileConn(f)
  170. if err != nil {
  171. return nil, err
  172. }
  173. f.Close()
  174. uconn, ok := conn.(*net.UnixConn)
  175. if !ok {
  176. conn.Close()
  177. return nil, fmt.Errorf("%d: not a unix connection", fd)
  178. }
  179. return uconn, nil
  180. }