file.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. //go:build windows
  2. // +build windows
  3. package winio
  4. import (
  5. "errors"
  6. "io"
  7. "runtime"
  8. "sync"
  9. "sync/atomic"
  10. "syscall"
  11. "time"
  12. "golang.org/x/sys/windows"
  13. )
  14. //sys cancelIoEx(file syscall.Handle, o *syscall.Overlapped) (err error) = CancelIoEx
  15. //sys createIoCompletionPort(file syscall.Handle, port syscall.Handle, key uintptr, threadCount uint32) (newport syscall.Handle, err error) = CreateIoCompletionPort
  16. //sys getQueuedCompletionStatus(port syscall.Handle, bytes *uint32, key *uintptr, o **ioOperation, timeout uint32) (err error) = GetQueuedCompletionStatus
  17. //sys setFileCompletionNotificationModes(h syscall.Handle, flags uint8) (err error) = SetFileCompletionNotificationModes
  18. //sys wsaGetOverlappedResult(h syscall.Handle, o *syscall.Overlapped, bytes *uint32, wait bool, flags *uint32) (err error) = ws2_32.WSAGetOverlappedResult
  19. type atomicBool int32
  20. func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
  21. func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
  22. func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
  23. //revive:disable-next-line:predeclared Keep "new" to maintain consistency with "atomic" pkg
  24. func (b *atomicBool) swap(new bool) bool {
  25. var newInt int32
  26. if new {
  27. newInt = 1
  28. }
  29. return atomic.SwapInt32((*int32)(b), newInt) == 1
  30. }
  31. var (
  32. ErrFileClosed = errors.New("file has already been closed")
  33. ErrTimeout = &timeoutError{}
  34. )
  35. type timeoutError struct{}
  36. func (*timeoutError) Error() string { return "i/o timeout" }
  37. func (*timeoutError) Timeout() bool { return true }
  38. func (*timeoutError) Temporary() bool { return true }
  39. type timeoutChan chan struct{}
  40. var ioInitOnce sync.Once
  41. var ioCompletionPort syscall.Handle
  42. // ioResult contains the result of an asynchronous IO operation.
  43. type ioResult struct {
  44. bytes uint32
  45. err error
  46. }
  47. // ioOperation represents an outstanding asynchronous Win32 IO.
  48. type ioOperation struct {
  49. o syscall.Overlapped
  50. ch chan ioResult
  51. }
  52. func initIO() {
  53. h, err := createIoCompletionPort(syscall.InvalidHandle, 0, 0, 0xffffffff)
  54. if err != nil {
  55. panic(err)
  56. }
  57. ioCompletionPort = h
  58. go ioCompletionProcessor(h)
  59. }
  60. // win32File implements Reader, Writer, and Closer on a Win32 handle without blocking in a syscall.
  61. // It takes ownership of this handle and will close it if it is garbage collected.
  62. type win32File struct {
  63. handle syscall.Handle
  64. wg sync.WaitGroup
  65. wgLock sync.RWMutex
  66. closing atomicBool
  67. socket bool
  68. readDeadline deadlineHandler
  69. writeDeadline deadlineHandler
  70. }
  71. type deadlineHandler struct {
  72. setLock sync.Mutex
  73. channel timeoutChan
  74. channelLock sync.RWMutex
  75. timer *time.Timer
  76. timedout atomicBool
  77. }
  78. // makeWin32File makes a new win32File from an existing file handle.
  79. func makeWin32File(h syscall.Handle) (*win32File, error) {
  80. f := &win32File{handle: h}
  81. ioInitOnce.Do(initIO)
  82. _, err := createIoCompletionPort(h, ioCompletionPort, 0, 0xffffffff)
  83. if err != nil {
  84. return nil, err
  85. }
  86. err = setFileCompletionNotificationModes(h, windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS|windows.FILE_SKIP_SET_EVENT_ON_HANDLE)
  87. if err != nil {
  88. return nil, err
  89. }
  90. f.readDeadline.channel = make(timeoutChan)
  91. f.writeDeadline.channel = make(timeoutChan)
  92. return f, nil
  93. }
  94. func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) {
  95. // If we return the result of makeWin32File directly, it can result in an
  96. // interface-wrapped nil, rather than a nil interface value.
  97. f, err := makeWin32File(h)
  98. if err != nil {
  99. return nil, err
  100. }
  101. return f, nil
  102. }
  103. // closeHandle closes the resources associated with a Win32 handle.
  104. func (f *win32File) closeHandle() {
  105. f.wgLock.Lock()
  106. // Atomically set that we are closing, releasing the resources only once.
  107. if !f.closing.swap(true) {
  108. f.wgLock.Unlock()
  109. // cancel all IO and wait for it to complete
  110. _ = cancelIoEx(f.handle, nil)
  111. f.wg.Wait()
  112. // at this point, no new IO can start
  113. syscall.Close(f.handle)
  114. f.handle = 0
  115. } else {
  116. f.wgLock.Unlock()
  117. }
  118. }
  119. // Close closes a win32File.
  120. func (f *win32File) Close() error {
  121. f.closeHandle()
  122. return nil
  123. }
  124. // IsClosed checks if the file has been closed.
  125. func (f *win32File) IsClosed() bool {
  126. return f.closing.isSet()
  127. }
  128. // prepareIO prepares for a new IO operation.
  129. // The caller must call f.wg.Done() when the IO is finished, prior to Close() returning.
  130. func (f *win32File) prepareIO() (*ioOperation, error) {
  131. f.wgLock.RLock()
  132. if f.closing.isSet() {
  133. f.wgLock.RUnlock()
  134. return nil, ErrFileClosed
  135. }
  136. f.wg.Add(1)
  137. f.wgLock.RUnlock()
  138. c := &ioOperation{}
  139. c.ch = make(chan ioResult)
  140. return c, nil
  141. }
  142. // ioCompletionProcessor processes completed async IOs forever.
  143. func ioCompletionProcessor(h syscall.Handle) {
  144. for {
  145. var bytes uint32
  146. var key uintptr
  147. var op *ioOperation
  148. err := getQueuedCompletionStatus(h, &bytes, &key, &op, syscall.INFINITE)
  149. if op == nil {
  150. panic(err)
  151. }
  152. op.ch <- ioResult{bytes, err}
  153. }
  154. }
  155. // todo: helsaawy - create an asyncIO version that takes a context
  156. // asyncIO processes the return value from ReadFile or WriteFile, blocking until
  157. // the operation has actually completed.
  158. func (f *win32File) asyncIO(c *ioOperation, d *deadlineHandler, bytes uint32, err error) (int, error) {
  159. if err != syscall.ERROR_IO_PENDING { //nolint:errorlint // err is Errno
  160. return int(bytes), err
  161. }
  162. if f.closing.isSet() {
  163. _ = cancelIoEx(f.handle, &c.o)
  164. }
  165. var timeout timeoutChan
  166. if d != nil {
  167. d.channelLock.Lock()
  168. timeout = d.channel
  169. d.channelLock.Unlock()
  170. }
  171. var r ioResult
  172. select {
  173. case r = <-c.ch:
  174. err = r.err
  175. if err == syscall.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
  176. if f.closing.isSet() {
  177. err = ErrFileClosed
  178. }
  179. } else if err != nil && f.socket {
  180. // err is from Win32. Query the overlapped structure to get the winsock error.
  181. var bytes, flags uint32
  182. err = wsaGetOverlappedResult(f.handle, &c.o, &bytes, false, &flags)
  183. }
  184. case <-timeout:
  185. _ = cancelIoEx(f.handle, &c.o)
  186. r = <-c.ch
  187. err = r.err
  188. if err == syscall.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
  189. err = ErrTimeout
  190. }
  191. }
  192. // runtime.KeepAlive is needed, as c is passed via native
  193. // code to ioCompletionProcessor, c must remain alive
  194. // until the channel read is complete.
  195. // todo: (de)allocate *ioOperation via win32 heap functions, instead of needing to KeepAlive?
  196. runtime.KeepAlive(c)
  197. return int(r.bytes), err
  198. }
  199. // Read reads from a file handle.
  200. func (f *win32File) Read(b []byte) (int, error) {
  201. c, err := f.prepareIO()
  202. if err != nil {
  203. return 0, err
  204. }
  205. defer f.wg.Done()
  206. if f.readDeadline.timedout.isSet() {
  207. return 0, ErrTimeout
  208. }
  209. var bytes uint32
  210. err = syscall.ReadFile(f.handle, b, &bytes, &c.o)
  211. n, err := f.asyncIO(c, &f.readDeadline, bytes, err)
  212. runtime.KeepAlive(b)
  213. // Handle EOF conditions.
  214. if err == nil && n == 0 && len(b) != 0 {
  215. return 0, io.EOF
  216. } else if err == syscall.ERROR_BROKEN_PIPE { //nolint:errorlint // err is Errno
  217. return 0, io.EOF
  218. } else {
  219. return n, err
  220. }
  221. }
  222. // Write writes to a file handle.
  223. func (f *win32File) Write(b []byte) (int, error) {
  224. c, err := f.prepareIO()
  225. if err != nil {
  226. return 0, err
  227. }
  228. defer f.wg.Done()
  229. if f.writeDeadline.timedout.isSet() {
  230. return 0, ErrTimeout
  231. }
  232. var bytes uint32
  233. err = syscall.WriteFile(f.handle, b, &bytes, &c.o)
  234. n, err := f.asyncIO(c, &f.writeDeadline, bytes, err)
  235. runtime.KeepAlive(b)
  236. return n, err
  237. }
  238. func (f *win32File) SetReadDeadline(deadline time.Time) error {
  239. return f.readDeadline.set(deadline)
  240. }
  241. func (f *win32File) SetWriteDeadline(deadline time.Time) error {
  242. return f.writeDeadline.set(deadline)
  243. }
  244. func (f *win32File) Flush() error {
  245. return syscall.FlushFileBuffers(f.handle)
  246. }
  247. func (f *win32File) Fd() uintptr {
  248. return uintptr(f.handle)
  249. }
  250. func (d *deadlineHandler) set(deadline time.Time) error {
  251. d.setLock.Lock()
  252. defer d.setLock.Unlock()
  253. if d.timer != nil {
  254. if !d.timer.Stop() {
  255. <-d.channel
  256. }
  257. d.timer = nil
  258. }
  259. d.timedout.setFalse()
  260. select {
  261. case <-d.channel:
  262. d.channelLock.Lock()
  263. d.channel = make(chan struct{})
  264. d.channelLock.Unlock()
  265. default:
  266. }
  267. if deadline.IsZero() {
  268. return nil
  269. }
  270. timeoutIO := func() {
  271. d.timedout.setTrue()
  272. close(d.channel)
  273. }
  274. now := time.Now()
  275. duration := deadline.Sub(now)
  276. if deadline.After(now) {
  277. // Deadline is in the future, set a timer to wait
  278. d.timer = time.AfterFunc(duration, timeoutIO)
  279. } else {
  280. // Deadline is in the past. Cancel all pending IO now.
  281. timeoutIO()
  282. }
  283. return nil
  284. }