mmsghdr_unix.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Copyright 2017 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. //go:build aix || linux || netbsd
  5. // +build aix linux netbsd
  6. package socket
  7. import (
  8. "net"
  9. "os"
  10. "sync"
  11. "syscall"
  12. )
  13. type mmsghdrs []mmsghdr
  14. func (hs mmsghdrs) unpack(ms []Message, parseFn func([]byte, string) (net.Addr, error), hint string) error {
  15. for i := range hs {
  16. ms[i].N = int(hs[i].Len)
  17. ms[i].NN = hs[i].Hdr.controllen()
  18. ms[i].Flags = hs[i].Hdr.flags()
  19. if parseFn != nil {
  20. var err error
  21. ms[i].Addr, err = parseFn(hs[i].Hdr.name(), hint)
  22. if err != nil {
  23. return err
  24. }
  25. }
  26. }
  27. return nil
  28. }
  29. // mmsghdrsPacker packs Message-slices into mmsghdrs (re-)using pre-allocated buffers.
  30. type mmsghdrsPacker struct {
  31. // hs are the pre-allocated mmsghdrs.
  32. hs mmsghdrs
  33. // sockaddrs is the pre-allocated buffer for the Hdr.Name buffers.
  34. // We use one large buffer for all messages and slice it up.
  35. sockaddrs []byte
  36. // vs are the pre-allocated iovecs.
  37. // We allocate one large buffer for all messages and slice it up. This allows to reuse the buffer
  38. // if the number of buffers per message is distributed differently between calls.
  39. vs []iovec
  40. }
  41. func (p *mmsghdrsPacker) prepare(ms []Message) {
  42. n := len(ms)
  43. if n <= cap(p.hs) {
  44. p.hs = p.hs[:n]
  45. } else {
  46. p.hs = make(mmsghdrs, n)
  47. }
  48. if n*sizeofSockaddrInet6 <= cap(p.sockaddrs) {
  49. p.sockaddrs = p.sockaddrs[:n*sizeofSockaddrInet6]
  50. } else {
  51. p.sockaddrs = make([]byte, n*sizeofSockaddrInet6)
  52. }
  53. nb := 0
  54. for _, m := range ms {
  55. nb += len(m.Buffers)
  56. }
  57. if nb <= cap(p.vs) {
  58. p.vs = p.vs[:nb]
  59. } else {
  60. p.vs = make([]iovec, nb)
  61. }
  62. }
  63. func (p *mmsghdrsPacker) pack(ms []Message, parseFn func([]byte, string) (net.Addr, error), marshalFn func(net.Addr, []byte) int) mmsghdrs {
  64. p.prepare(ms)
  65. hs := p.hs
  66. vsRest := p.vs
  67. saRest := p.sockaddrs
  68. for i := range hs {
  69. nvs := len(ms[i].Buffers)
  70. vs := vsRest[:nvs]
  71. vsRest = vsRest[nvs:]
  72. var sa []byte
  73. if parseFn != nil {
  74. sa = saRest[:sizeofSockaddrInet6]
  75. saRest = saRest[sizeofSockaddrInet6:]
  76. } else if marshalFn != nil {
  77. n := marshalFn(ms[i].Addr, saRest)
  78. if n > 0 {
  79. sa = saRest[:n]
  80. saRest = saRest[n:]
  81. }
  82. }
  83. hs[i].Hdr.pack(vs, ms[i].Buffers, ms[i].OOB, sa)
  84. }
  85. return hs
  86. }
  87. // syscaller is a helper to invoke recvmmsg and sendmmsg via the RawConn.Read/Write interface.
  88. // It is reusable, to amortize the overhead of allocating a closure for the function passed to
  89. // RawConn.Read/Write.
  90. type syscaller struct {
  91. n int
  92. operr error
  93. hs mmsghdrs
  94. flags int
  95. boundRecvmmsgF func(uintptr) bool
  96. boundSendmmsgF func(uintptr) bool
  97. }
  98. func (r *syscaller) init() {
  99. r.boundRecvmmsgF = r.recvmmsgF
  100. r.boundSendmmsgF = r.sendmmsgF
  101. }
  102. func (r *syscaller) recvmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
  103. r.n = 0
  104. r.operr = nil
  105. r.hs = hs
  106. r.flags = flags
  107. if err := c.Read(r.boundRecvmmsgF); err != nil {
  108. return r.n, err
  109. }
  110. if r.operr != nil {
  111. return r.n, os.NewSyscallError("recvmmsg", r.operr)
  112. }
  113. return r.n, nil
  114. }
  115. func (r *syscaller) recvmmsgF(s uintptr) bool {
  116. r.n, r.operr = recvmmsg(s, r.hs, r.flags)
  117. return ioComplete(r.flags, r.operr)
  118. }
  119. func (r *syscaller) sendmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
  120. r.n = 0
  121. r.operr = nil
  122. r.hs = hs
  123. r.flags = flags
  124. if err := c.Write(r.boundSendmmsgF); err != nil {
  125. return r.n, err
  126. }
  127. if r.operr != nil {
  128. return r.n, os.NewSyscallError("sendmmsg", r.operr)
  129. }
  130. return r.n, nil
  131. }
  132. func (r *syscaller) sendmmsgF(s uintptr) bool {
  133. r.n, r.operr = sendmmsg(s, r.hs, r.flags)
  134. return ioComplete(r.flags, r.operr)
  135. }
  136. // mmsgTmps holds reusable temporary helpers for recvmmsg and sendmmsg.
  137. type mmsgTmps struct {
  138. packer mmsghdrsPacker
  139. syscaller syscaller
  140. }
  141. var defaultMmsgTmpsPool = mmsgTmpsPool{
  142. p: sync.Pool{
  143. New: func() interface{} {
  144. tmps := new(mmsgTmps)
  145. tmps.syscaller.init()
  146. return tmps
  147. },
  148. },
  149. }
  150. type mmsgTmpsPool struct {
  151. p sync.Pool
  152. }
  153. func (p *mmsgTmpsPool) Get() *mmsgTmps {
  154. return p.p.Get().(*mmsgTmps)
  155. }
  156. func (p *mmsgTmpsPool) Put(tmps *mmsgTmps) {
  157. p.p.Put(tmps)
  158. }