console_linux.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. //go:build linux
  2. // +build linux
  3. /*
  4. Copyright The containerd Authors.
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. */
  15. package console
  16. import (
  17. "io"
  18. "os"
  19. "sync"
  20. "golang.org/x/sys/unix"
  21. )
  // maxEvents is the capacity of the event buffer that Epoller.Wait hands to
  // each EpollWait call, i.e. the most events handled per wake-up.
  const maxEvents = 128
  25. // Epoller manages multiple epoll consoles using edge-triggered epoll api so we
  26. // dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP.
  27. // For more details, see:
  28. // - https://github.com/systemd/systemd/pull/4262
  29. // - https://github.com/moby/moby/issues/27202
  30. //
  31. // Example usage of Epoller and EpollConsole can be as follow:
  32. //
  33. // epoller, _ := NewEpoller()
  34. // epollConsole, _ := epoller.Add(console)
  35. // go epoller.Wait()
  36. // var (
  37. // b bytes.Buffer
  38. // wg sync.WaitGroup
  39. // )
  40. // wg.Add(1)
  41. // go func() {
  42. // io.Copy(&b, epollConsole)
  43. // wg.Done()
  44. // }()
  45. // // perform I/O on the console
  46. // epollConsole.Shutdown(epoller.CloseConsole)
  47. // wg.Wait()
  48. // epollConsole.Close()
  49. type Epoller struct {
  50. efd int
  51. mu sync.Mutex
  52. fdMapping map[int]*EpollConsole
  53. closeOnce sync.Once
  54. }
  55. // NewEpoller returns an instance of epoller with a valid epoll fd.
  56. func NewEpoller() (*Epoller, error) {
  57. efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
  58. if err != nil {
  59. return nil, err
  60. }
  61. return &Epoller{
  62. efd: efd,
  63. fdMapping: make(map[int]*EpollConsole),
  64. }, nil
  65. }
  66. // Add creates an epoll console based on the provided console. The console will
  67. // be registered with EPOLLET (i.e. using edge-triggered notification) and its
  68. // file descriptor will be set to non-blocking mode. After this, user should use
  69. // the return console to perform I/O.
  70. func (e *Epoller) Add(console Console) (*EpollConsole, error) {
  71. sysfd := int(console.Fd())
  72. // Set sysfd to non-blocking mode
  73. if err := unix.SetNonblock(sysfd, true); err != nil {
  74. return nil, err
  75. }
  76. ev := unix.EpollEvent{
  77. Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
  78. Fd: int32(sysfd),
  79. }
  80. if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil {
  81. return nil, err
  82. }
  83. ef := &EpollConsole{
  84. Console: console,
  85. sysfd: sysfd,
  86. readc: sync.NewCond(&sync.Mutex{}),
  87. writec: sync.NewCond(&sync.Mutex{}),
  88. }
  89. e.mu.Lock()
  90. e.fdMapping[sysfd] = ef
  91. e.mu.Unlock()
  92. return ef, nil
  93. }
  94. // Wait starts the loop to wait for its consoles' notifications and signal
  95. // appropriate console that it can perform I/O.
  96. func (e *Epoller) Wait() error {
  97. events := make([]unix.EpollEvent, maxEvents)
  98. for {
  99. n, err := unix.EpollWait(e.efd, events, -1)
  100. if err != nil {
  101. // EINTR: The call was interrupted by a signal handler before either
  102. // any of the requested events occurred or the timeout expired
  103. if err == unix.EINTR {
  104. continue
  105. }
  106. return err
  107. }
  108. for i := 0; i < n; i++ {
  109. ev := &events[i]
  110. // the console is ready to be read from
  111. if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  112. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  113. epfile.signalRead()
  114. }
  115. }
  116. // the console is ready to be written to
  117. if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  118. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  119. epfile.signalWrite()
  120. }
  121. }
  122. }
  123. }
  124. }
  125. // CloseConsole unregisters the console's file descriptor from epoll interface
  126. func (e *Epoller) CloseConsole(fd int) error {
  127. e.mu.Lock()
  128. defer e.mu.Unlock()
  129. delete(e.fdMapping, fd)
  130. return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
  131. }
  132. func (e *Epoller) getConsole(sysfd int) *EpollConsole {
  133. e.mu.Lock()
  134. f := e.fdMapping[sysfd]
  135. e.mu.Unlock()
  136. return f
  137. }
  138. // Close closes the epoll fd
  139. func (e *Epoller) Close() error {
  140. closeErr := os.ErrClosed // default to "file already closed"
  141. e.closeOnce.Do(func() {
  142. closeErr = unix.Close(e.efd)
  143. })
  144. return closeErr
  145. }
  146. // EpollConsole acts like a console but registers its file descriptor with an
  147. // epoll fd and uses epoll API to perform I/O.
  148. type EpollConsole struct {
  149. Console
  150. readc *sync.Cond
  151. writec *sync.Cond
  152. sysfd int
  153. closed bool
  154. }
  155. // Read reads up to len(p) bytes into p. It returns the number of bytes read
  156. // (0 <= n <= len(p)) and any error encountered.
  157. //
  158. // If the console's read returns EAGAIN or EIO, we assume that it's a
  159. // temporary error because the other side went away and wait for the signal
  160. // generated by epoll event to continue.
  161. func (ec *EpollConsole) Read(p []byte) (n int, err error) {
  162. var read int
  163. ec.readc.L.Lock()
  164. defer ec.readc.L.Unlock()
  165. for {
  166. read, err = ec.Console.Read(p[n:])
  167. n += read
  168. if err != nil {
  169. var hangup bool
  170. if perr, ok := err.(*os.PathError); ok {
  171. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  172. } else {
  173. hangup = (err == unix.EAGAIN || err == unix.EIO)
  174. }
  175. // if the other end disappear, assume this is temporary and wait for the
  176. // signal to continue again. Unless we didnt read anything and the
  177. // console is already marked as closed then we should exit
  178. if hangup && !(n == 0 && len(p) > 0 && ec.closed) {
  179. ec.readc.Wait()
  180. continue
  181. }
  182. }
  183. break
  184. }
  185. // if we didnt read anything then return io.EOF to end gracefully
  186. if n == 0 && len(p) > 0 && err == nil {
  187. err = io.EOF
  188. }
  189. // signal for others that we finished the read
  190. ec.readc.Signal()
  191. return n, err
  192. }
  193. // Writes len(p) bytes from p to the console. It returns the number of bytes
  194. // written from p (0 <= n <= len(p)) and any error encountered that caused
  195. // the write to stop early.
  196. //
  197. // If writes to the console returns EAGAIN or EIO, we assume that it's a
  198. // temporary error because the other side went away and wait for the signal
  199. // generated by epoll event to continue.
  200. func (ec *EpollConsole) Write(p []byte) (n int, err error) {
  201. var written int
  202. ec.writec.L.Lock()
  203. defer ec.writec.L.Unlock()
  204. for {
  205. written, err = ec.Console.Write(p[n:])
  206. n += written
  207. if err != nil {
  208. var hangup bool
  209. if perr, ok := err.(*os.PathError); ok {
  210. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  211. } else {
  212. hangup = (err == unix.EAGAIN || err == unix.EIO)
  213. }
  214. // if the other end disappears, assume this is temporary and wait for the
  215. // signal to continue again.
  216. if hangup {
  217. ec.writec.Wait()
  218. continue
  219. }
  220. }
  221. // unrecoverable error, break the loop and return the error
  222. break
  223. }
  224. if n < len(p) && err == nil {
  225. err = io.ErrShortWrite
  226. }
  227. // signal for others that we finished the write
  228. ec.writec.Signal()
  229. return n, err
  230. }
  231. // Shutdown closes the file descriptor and signals call waiters for this fd.
  232. // It accepts a callback which will be called with the console's fd. The
  233. // callback typically will be used to do further cleanup such as unregister the
  234. // console's fd from the epoll interface.
  235. // User should call Shutdown and wait for all I/O operation to be finished
  236. // before closing the console.
  237. func (ec *EpollConsole) Shutdown(close func(int) error) error {
  238. ec.readc.L.Lock()
  239. defer ec.readc.L.Unlock()
  240. ec.writec.L.Lock()
  241. defer ec.writec.L.Unlock()
  242. ec.readc.Broadcast()
  243. ec.writec.Broadcast()
  244. ec.closed = true
  245. return close(ec.sysfd)
  246. }
  247. // signalRead signals that the console is readable.
  248. func (ec *EpollConsole) signalRead() {
  249. ec.readc.L.Lock()
  250. ec.readc.Signal()
  251. ec.readc.L.Unlock()
  252. }
  253. // signalWrite signals that the console is writable.
  254. func (ec *EpollConsole) signalWrite() {
  255. ec.writec.L.Lock()
  256. ec.writec.Signal()
  257. ec.writec.L.Unlock()
  258. }