pipe.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // +build windows
  2. package winio
  3. import (
  4. "errors"
  5. "io"
  6. "net"
  7. "os"
  8. "syscall"
  9. "time"
  10. "unsafe"
  11. )
  12. //sys connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) = ConnectNamedPipe
  13. //sys createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateNamedPipeW
  14. //sys createFile(name string, access uint32, mode uint32, sa *syscall.SecurityAttributes, createmode uint32, attrs uint32, templatefile syscall.Handle) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateFileW
  15. //sys waitNamedPipe(name string, timeout uint32) (err error) = WaitNamedPipeW
  16. //sys getNamedPipeInfo(pipe syscall.Handle, flags *uint32, outSize *uint32, inSize *uint32, maxInstances *uint32) (err error) = GetNamedPipeInfo
  17. //sys getNamedPipeHandleState(pipe syscall.Handle, state *uint32, curInstances *uint32, maxCollectionCount *uint32, collectDataTimeout *uint32, userName *uint16, maxUserNameSize uint32) (err error) = GetNamedPipeHandleStateW
  18. //sys localAlloc(uFlags uint32, length uint32) (ptr uintptr) = LocalAlloc
  // Windows error codes and named-pipe API values used by this file. The names
  // follow the corresponding Win32 identifiers with a leading "c".
  const (
  	// Error from createFile that makes DialPipe wait (waitNamedPipe) and retry.
  	cERROR_PIPE_BUSY = syscall.Errno(231)
  	// Error that connectPipe treats as a successful connection.
  	cERROR_PIPE_CONNECTED = syscall.Errno(535)
  	// Error from waitNamedPipe that DialPipe reports as ErrTimeout.
  	cERROR_SEM_TIMEOUT = syscall.Errno(121)
  	// Open-mode flags passed to createNamedPipe; the second one is only set
  	// for the first instance of a pipe.
  	cPIPE_ACCESS_DUPLEX = 0x3
  	cFILE_FLAG_FIRST_PIPE_INSTANCE = 0x80000
  	// Attribute flags passed to createFile when opening the client end of a pipe.
  	cSECURITY_SQOS_PRESENT = 0x100000
  	cSECURITY_ANONYMOUS = 0
  	// Pipe-mode flag that is always set on server pipe instances.
  	cPIPE_REJECT_REMOTE_CLIENTS = 0x8
  	// Value passed as maxInstances to createNamedPipe.
  	cPIPE_UNLIMITED_INSTANCES = 255
  	// Timeout values passed to waitNamedPipe.
  	cNMPWAIT_USE_DEFAULT_WAIT = 0
  	cNMPWAIT_NOWAIT = 1
  	// Pipe-type bit: set by makeServerPipeHandle when PipeConfig.MessageMode is
  	// true, and tested by DialPipe against the flags from getNamedPipeInfo.
  	cPIPE_TYPE_MESSAGE = 4
  	// Read-mode bit tested by DialPipe against the state from
  	// getNamedPipeHandleState; pipes with it set are rejected.
  	cPIPE_READMODE_MESSAGE = 2
  )
  var (
  	// ErrPipeListenerClosed is returned for pipe operations on listeners that have been closed.
  	// This error should match net.errClosing since docker takes a dependency on its text.
  	ErrPipeListenerClosed = errors.New("use of closed network connection")
  	// errPipeWriteClosed is returned by Write and CloseWrite on a message byte
  	// pipe once CloseWrite has already succeeded.
  	errPipeWriteClosed = errors.New("pipe has been closed for write")
  )
  // win32Pipe is the connection type returned by DialPipe and Accept for pipes
  // that are not in message mode. It embeds *win32File and remembers the pipe
  // path, which is reported by both LocalAddr and RemoteAddr.
  type win32Pipe struct {
  	*win32File
  	path string
  }
  // win32MessageBytePipe is the connection type returned for message-mode pipes.
  // It wraps win32Pipe and adds CloseWrite support, which is signalled to the
  // peer with a zero-byte write.
  type win32MessageBytePipe struct {
  	win32Pipe
  	// writeClosed is set once CloseWrite has succeeded; later writes fail.
  	writeClosed bool
  	// readEOF is set once a Read has returned io.EOF; later reads return io.EOF.
  	readEOF bool
  }
  // pipeAddress is a pipe path used as an address value; its Network and String
  // methods are defined in this file.
  type pipeAddress string
  50. func (f *win32Pipe) LocalAddr() net.Addr {
  51. return pipeAddress(f.path)
  52. }
  53. func (f *win32Pipe) RemoteAddr() net.Addr {
  54. return pipeAddress(f.path)
  55. }
  56. func (f *win32Pipe) SetDeadline(t time.Time) error {
  57. f.SetReadDeadline(t)
  58. f.SetWriteDeadline(t)
  59. return nil
  60. }
  61. // CloseWrite closes the write side of a message pipe in byte mode.
  62. func (f *win32MessageBytePipe) CloseWrite() error {
  63. if f.writeClosed {
  64. return errPipeWriteClosed
  65. }
  66. err := f.win32File.Flush()
  67. if err != nil {
  68. return err
  69. }
  70. _, err = f.win32File.Write(nil)
  71. if err != nil {
  72. return err
  73. }
  74. f.writeClosed = true
  75. return nil
  76. }
  77. // Write writes bytes to a message pipe in byte mode. Zero-byte writes are ignored, since
  78. // they are used to implement CloseWrite().
  79. func (f *win32MessageBytePipe) Write(b []byte) (int, error) {
  80. if f.writeClosed {
  81. return 0, errPipeWriteClosed
  82. }
  83. if len(b) == 0 {
  84. return 0, nil
  85. }
  86. return f.win32File.Write(b)
  87. }
  88. // Read reads bytes from a message pipe in byte mode. A read of a zero-byte message on a message
  89. // mode pipe will return io.EOF, as will all subsequent reads.
  90. func (f *win32MessageBytePipe) Read(b []byte) (int, error) {
  91. if f.readEOF {
  92. return 0, io.EOF
  93. }
  94. n, err := f.win32File.Read(b)
  95. if err == io.EOF {
  96. // If this was the result of a zero-byte read, then
  97. // it is possible that the read was due to a zero-size
  98. // message. Since we are simulating CloseWrite with a
  99. // zero-byte message, ensure that all future Read() calls
  100. // also return EOF.
  101. f.readEOF = true
  102. }
  103. return n, err
  104. }
  105. func (s pipeAddress) Network() string {
  106. return "pipe"
  107. }
  108. func (s pipeAddress) String() string {
  109. return string(s)
  110. }
  111. // DialPipe connects to a named pipe by path, timing out if the connection
  112. // takes longer than the specified duration. If timeout is nil, then the timeout
  113. // is the default timeout established by the pipe server.
  114. func DialPipe(path string, timeout *time.Duration) (net.Conn, error) {
  115. var absTimeout time.Time
  116. if timeout != nil {
  117. absTimeout = time.Now().Add(*timeout)
  118. }
  119. var err error
  120. var h syscall.Handle
  121. for {
  122. h, err = createFile(path, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.OPEN_EXISTING, syscall.FILE_FLAG_OVERLAPPED|cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  123. if err != cERROR_PIPE_BUSY {
  124. break
  125. }
  126. now := time.Now()
  127. var ms uint32
  128. if absTimeout.IsZero() {
  129. ms = cNMPWAIT_USE_DEFAULT_WAIT
  130. } else if now.After(absTimeout) {
  131. ms = cNMPWAIT_NOWAIT
  132. } else {
  133. ms = uint32(absTimeout.Sub(now).Nanoseconds() / 1000 / 1000)
  134. }
  135. err = waitNamedPipe(path, ms)
  136. if err != nil {
  137. if err == cERROR_SEM_TIMEOUT {
  138. return nil, ErrTimeout
  139. }
  140. break
  141. }
  142. }
  143. if err != nil {
  144. return nil, &os.PathError{Op: "open", Path: path, Err: err}
  145. }
  146. var flags uint32
  147. err = getNamedPipeInfo(h, &flags, nil, nil, nil)
  148. if err != nil {
  149. return nil, err
  150. }
  151. var state uint32
  152. err = getNamedPipeHandleState(h, &state, nil, nil, nil, nil, 0)
  153. if err != nil {
  154. return nil, err
  155. }
  156. if state&cPIPE_READMODE_MESSAGE != 0 {
  157. return nil, &os.PathError{Op: "open", Path: path, Err: errors.New("message readmode pipes not supported")}
  158. }
  159. f, err := makeWin32File(h)
  160. if err != nil {
  161. syscall.Close(h)
  162. return nil, err
  163. }
  164. // If the pipe is in message mode, return a message byte pipe, which
  165. // supports CloseWrite().
  166. if flags&cPIPE_TYPE_MESSAGE != 0 {
  167. return &win32MessageBytePipe{
  168. win32Pipe: win32Pipe{win32File: f, path: path},
  169. }, nil
  170. }
  171. return &win32Pipe{win32File: f, path: path}, nil
  172. }
  // acceptResponse is the reply that listenerRoutine sends for one Accept call:
  // either a connected pipe file or the error that prevented the connection.
  type acceptResponse struct {
  	f *win32File
  	err error
  }
  // win32PipeListener is the listener created by ListenPipe. Accept and Close
  // talk to the background listenerRoutine through the channels below.
  type win32PipeListener struct {
  	// firstHandle is the first pipe instance created by ListenPipe; it is
  	// closed by listenerRoutine when the routine exits.
  	firstHandle syscall.Handle
  	// path is the pipe path, also reported by Addr.
  	path string
  	// securityDescriptor is passed to makeServerPipeHandle for every new
  	// pipe instance; it may be nil.
  	securityDescriptor []byte
  	// config is a copy of the PipeConfig given to ListenPipe.
  	config PipeConfig
  	// acceptCh carries, per Accept call, the channel on which the reply is expected.
  	acceptCh chan (chan acceptResponse)
  	// closeCh receives a value from Close to request shutdown.
  	closeCh chan int
  	// doneCh is closed by listenerRoutine once it has shut down.
  	doneCh chan int
  }
  186. func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
  187. var flags uint32 = cPIPE_ACCESS_DUPLEX | syscall.FILE_FLAG_OVERLAPPED
  188. if first {
  189. flags |= cFILE_FLAG_FIRST_PIPE_INSTANCE
  190. }
  191. var mode uint32 = cPIPE_REJECT_REMOTE_CLIENTS
  192. if c.MessageMode {
  193. mode |= cPIPE_TYPE_MESSAGE
  194. }
  195. sa := &syscall.SecurityAttributes{}
  196. sa.Length = uint32(unsafe.Sizeof(*sa))
  197. if securityDescriptor != nil {
  198. len := uint32(len(securityDescriptor))
  199. sa.SecurityDescriptor = localAlloc(0, len)
  200. defer localFree(sa.SecurityDescriptor)
  201. copy((*[0xffff]byte)(unsafe.Pointer(sa.SecurityDescriptor))[:], securityDescriptor)
  202. }
  203. h, err := createNamedPipe(path, flags, mode, cPIPE_UNLIMITED_INSTANCES, uint32(c.OutputBufferSize), uint32(c.InputBufferSize), 0, sa)
  204. if err != nil {
  205. return 0, &os.PathError{Op: "open", Path: path, Err: err}
  206. }
  207. return h, nil
  208. }
  209. func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
  210. h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false)
  211. if err != nil {
  212. return nil, err
  213. }
  214. f, err := makeWin32File(h)
  215. if err != nil {
  216. syscall.Close(h)
  217. return nil, err
  218. }
  219. return f, nil
  220. }
  221. func (l *win32PipeListener) listenerRoutine() {
  222. closed := false
  223. for !closed {
  224. select {
  225. case <-l.closeCh:
  226. closed = true
  227. case responseCh := <-l.acceptCh:
  228. p, err := l.makeServerPipe()
  229. if err == nil {
  230. // Wait for the client to connect.
  231. ch := make(chan error)
  232. go func() {
  233. ch <- connectPipe(p)
  234. }()
  235. select {
  236. case err = <-ch:
  237. if err != nil {
  238. p.Close()
  239. p = nil
  240. }
  241. case <-l.closeCh:
  242. // Abort the connect request by closing the handle.
  243. p.Close()
  244. p = nil
  245. err = <-ch
  246. if err == nil || err == ErrFileClosed {
  247. err = ErrPipeListenerClosed
  248. }
  249. closed = true
  250. }
  251. }
  252. responseCh <- acceptResponse{p, err}
  253. }
  254. }
  255. syscall.Close(l.firstHandle)
  256. l.firstHandle = 0
  257. // Notify Close() and Accept() callers that the handle has been closed.
  258. close(l.doneCh)
  259. }
  // PipeConfig contains configuration for the pipe listener.
  type PipeConfig struct {
  	// SecurityDescriptor contains a Windows security descriptor in SDDL format.
  	// An empty string means no security descriptor is set on the pipe.
  	SecurityDescriptor string
  	// MessageMode determines whether the pipe is in byte or message mode. In either
  	// case the pipe is read in byte mode by default. The only practical difference in
  	// this implementation is that CloseWrite() is only supported for message mode pipes;
  	// CloseWrite() is implemented as a zero-byte write, but zero-byte writes are only
  	// transferred to the reader (and returned as io.EOF in this implementation)
  	// when the pipe is in message mode.
  	MessageMode bool
  	// InputBufferSize specifies the size of the input buffer, in bytes.
  	InputBufferSize int32
  	// OutputBufferSize specifies the size of the output buffer, in bytes.
  	OutputBufferSize int32
  }
  276. // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
  277. // The pipe must not already exist.
  278. func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
  279. var (
  280. sd []byte
  281. err error
  282. )
  283. if c == nil {
  284. c = &PipeConfig{}
  285. }
  286. if c.SecurityDescriptor != "" {
  287. sd, err = SddlToSecurityDescriptor(c.SecurityDescriptor)
  288. if err != nil {
  289. return nil, err
  290. }
  291. }
  292. h, err := makeServerPipeHandle(path, sd, c, true)
  293. if err != nil {
  294. return nil, err
  295. }
  296. // Immediately open and then close a client handle so that the named pipe is
  297. // created but not currently accepting connections.
  298. h2, err := createFile(path, 0, 0, nil, syscall.OPEN_EXISTING, cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  299. if err != nil {
  300. syscall.Close(h)
  301. return nil, err
  302. }
  303. syscall.Close(h2)
  304. l := &win32PipeListener{
  305. firstHandle: h,
  306. path: path,
  307. securityDescriptor: sd,
  308. config: *c,
  309. acceptCh: make(chan (chan acceptResponse)),
  310. closeCh: make(chan int),
  311. doneCh: make(chan int),
  312. }
  313. go l.listenerRoutine()
  314. return l, nil
  315. }
  316. func connectPipe(p *win32File) error {
  317. c, err := p.prepareIo()
  318. if err != nil {
  319. return err
  320. }
  321. defer p.wg.Done()
  322. err = connectNamedPipe(p.handle, &c.o)
  323. _, err = p.asyncIo(c, nil, 0, err)
  324. if err != nil && err != cERROR_PIPE_CONNECTED {
  325. return err
  326. }
  327. return nil
  328. }
  329. func (l *win32PipeListener) Accept() (net.Conn, error) {
  330. ch := make(chan acceptResponse)
  331. select {
  332. case l.acceptCh <- ch:
  333. response := <-ch
  334. err := response.err
  335. if err != nil {
  336. return nil, err
  337. }
  338. if l.config.MessageMode {
  339. return &win32MessageBytePipe{
  340. win32Pipe: win32Pipe{win32File: response.f, path: l.path},
  341. }, nil
  342. }
  343. return &win32Pipe{win32File: response.f, path: l.path}, nil
  344. case <-l.doneCh:
  345. return nil, ErrPipeListenerClosed
  346. }
  347. }
  348. func (l *win32PipeListener) Close() error {
  349. select {
  350. case l.closeCh <- 1:
  351. <-l.doneCh
  352. case <-l.doneCh:
  353. }
  354. return nil
  355. }
  356. func (l *win32PipeListener) Addr() net.Addr {
  357. return pipeAddress(l.path)
  358. }