console_linux.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. // +build linux
  2. /*
  3. Copyright The containerd Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package console
  15. import (
  16. "io"
  17. "os"
  18. "sync"
  19. "golang.org/x/sys/unix"
  20. )
  // maxEvents is the length of the event buffer passed to each epoll_wait call
  // made by Wait, i.e. the most events handled per wake-up.
  const maxEvents = 128
  24. // Epoller manages multiple epoll consoles using edge-triggered epoll api so we
  25. // dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP.
  26. // For more details, see:
  27. // - https://github.com/systemd/systemd/pull/4262
  28. // - https://github.com/moby/moby/issues/27202
  29. //
  30. // Example usage of Epoller and EpollConsole can be as follow:
  31. //
  32. // epoller, _ := NewEpoller()
  33. // epollConsole, _ := epoller.Add(console)
  34. // go epoller.Wait()
  35. // var (
  36. // b bytes.Buffer
  37. // wg sync.WaitGroup
  38. // )
  39. // wg.Add(1)
  40. // go func() {
  41. // io.Copy(&b, epollConsole)
  42. // wg.Done()
  43. // }()
  44. // // perform I/O on the console
  45. // epollConsole.Shutdown(epoller.CloseConsole)
  46. // wg.Wait()
  47. // epollConsole.Close()
  48. type Epoller struct {
  49. efd int
  50. mu sync.Mutex
  51. fdMapping map[int]*EpollConsole
  52. }
  53. // NewEpoller returns an instance of epoller with a valid epoll fd.
  54. func NewEpoller() (*Epoller, error) {
  55. efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return &Epoller{
  60. efd: efd,
  61. fdMapping: make(map[int]*EpollConsole),
  62. }, nil
  63. }
  64. // Add creates a epoll console based on the provided console. The console will
  65. // be registered with EPOLLET (i.e. using edge-triggered notification) and its
  66. // file descriptor will be set to non-blocking mode. After this, user should use
  67. // the return console to perform I/O.
  68. func (e *Epoller) Add(console Console) (*EpollConsole, error) {
  69. sysfd := int(console.Fd())
  70. // Set sysfd to non-blocking mode
  71. if err := unix.SetNonblock(sysfd, true); err != nil {
  72. return nil, err
  73. }
  74. ev := unix.EpollEvent{
  75. Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
  76. Fd: int32(sysfd),
  77. }
  78. if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil {
  79. return nil, err
  80. }
  81. ef := &EpollConsole{
  82. Console: console,
  83. sysfd: sysfd,
  84. readc: sync.NewCond(&sync.Mutex{}),
  85. writec: sync.NewCond(&sync.Mutex{}),
  86. }
  87. e.mu.Lock()
  88. e.fdMapping[sysfd] = ef
  89. e.mu.Unlock()
  90. return ef, nil
  91. }
  92. // Wait starts the loop to wait for its consoles' notifications and signal
  93. // appropriate console that it can perform I/O.
  94. func (e *Epoller) Wait() error {
  95. events := make([]unix.EpollEvent, maxEvents)
  96. for {
  97. n, err := unix.EpollWait(e.efd, events, -1)
  98. if err != nil {
  99. // EINTR: The call was interrupted by a signal handler before either
  100. // any of the requested events occurred or the timeout expired
  101. if err == unix.EINTR {
  102. continue
  103. }
  104. return err
  105. }
  106. for i := 0; i < n; i++ {
  107. ev := &events[i]
  108. // the console is ready to be read from
  109. if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  110. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  111. epfile.signalRead()
  112. }
  113. }
  114. // the console is ready to be written to
  115. if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  116. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  117. epfile.signalWrite()
  118. }
  119. }
  120. }
  121. }
  122. }
  123. // Close unregister the console's file descriptor from epoll interface
  124. func (e *Epoller) CloseConsole(fd int) error {
  125. e.mu.Lock()
  126. defer e.mu.Unlock()
  127. delete(e.fdMapping, fd)
  128. return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
  129. }
  130. func (e *Epoller) getConsole(sysfd int) *EpollConsole {
  131. e.mu.Lock()
  132. f := e.fdMapping[sysfd]
  133. e.mu.Unlock()
  134. return f
  135. }
  136. // Close the epoll fd
  137. func (e *Epoller) Close() error {
  138. return unix.Close(e.efd)
  139. }
  140. // EpollConsole acts like a console but register its file descriptor with a
  141. // epoll fd and uses epoll API to perform I/O.
  142. type EpollConsole struct {
  143. Console
  144. readc *sync.Cond
  145. writec *sync.Cond
  146. sysfd int
  147. closed bool
  148. }
  149. // Read reads up to len(p) bytes into p. It returns the number of bytes read
  150. // (0 <= n <= len(p)) and any error encountered.
  151. //
  152. // If the console's read returns EAGAIN or EIO, we assumes that its a
  153. // temporary error because the other side went away and wait for the signal
  154. // generated by epoll event to continue.
  155. func (ec *EpollConsole) Read(p []byte) (n int, err error) {
  156. var read int
  157. ec.readc.L.Lock()
  158. defer ec.readc.L.Unlock()
  159. for {
  160. read, err = ec.Console.Read(p[n:])
  161. n += read
  162. if err != nil {
  163. var hangup bool
  164. if perr, ok := err.(*os.PathError); ok {
  165. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  166. } else {
  167. hangup = (err == unix.EAGAIN || err == unix.EIO)
  168. }
  169. // if the other end disappear, assume this is temporary and wait for the
  170. // signal to continue again. Unless we didnt read anything and the
  171. // console is already marked as closed then we should exit
  172. if hangup && !(n == 0 && len(p) > 0 && ec.closed) {
  173. ec.readc.Wait()
  174. continue
  175. }
  176. }
  177. break
  178. }
  179. // if we didnt read anything then return io.EOF to end gracefully
  180. if n == 0 && len(p) > 0 && err == nil {
  181. err = io.EOF
  182. }
  183. // signal for others that we finished the read
  184. ec.readc.Signal()
  185. return n, err
  186. }
  187. // Writes len(p) bytes from p to the console. It returns the number of bytes
  188. // written from p (0 <= n <= len(p)) and any error encountered that caused
  189. // the write to stop early.
  190. //
  191. // If writes to the console returns EAGAIN or EIO, we assumes that its a
  192. // temporary error because the other side went away and wait for the signal
  193. // generated by epoll event to continue.
  194. func (ec *EpollConsole) Write(p []byte) (n int, err error) {
  195. var written int
  196. ec.writec.L.Lock()
  197. defer ec.writec.L.Unlock()
  198. for {
  199. written, err = ec.Console.Write(p[n:])
  200. n += written
  201. if err != nil {
  202. var hangup bool
  203. if perr, ok := err.(*os.PathError); ok {
  204. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  205. } else {
  206. hangup = (err == unix.EAGAIN || err == unix.EIO)
  207. }
  208. // if the other end disappear, assume this is temporary and wait for the
  209. // signal to continue again.
  210. if hangup {
  211. ec.writec.Wait()
  212. continue
  213. }
  214. }
  215. // unrecoverable error, break the loop and return the error
  216. break
  217. }
  218. if n < len(p) && err == nil {
  219. err = io.ErrShortWrite
  220. }
  221. // signal for others that we finished the write
  222. ec.writec.Signal()
  223. return n, err
  224. }
  225. // Close closed the file descriptor and signal call waiters for this fd.
  226. // It accepts a callback which will be called with the console's fd. The
  227. // callback typically will be used to do further cleanup such as unregister the
  228. // console's fd from the epoll interface.
  229. // User should call Shutdown and wait for all I/O operation to be finished
  230. // before closing the console.
  231. func (ec *EpollConsole) Shutdown(close func(int) error) error {
  232. ec.readc.L.Lock()
  233. defer ec.readc.L.Unlock()
  234. ec.writec.L.Lock()
  235. defer ec.writec.L.Unlock()
  236. ec.readc.Broadcast()
  237. ec.writec.Broadcast()
  238. ec.closed = true
  239. return close(ec.sysfd)
  240. }
  241. // signalRead signals that the console is readable.
  242. func (ec *EpollConsole) signalRead() {
  243. ec.readc.Signal()
  244. }
  245. // signalWrite signals that the console is writable.
  246. func (ec *EpollConsole) signalWrite() {
  247. ec.writec.Signal()
  248. }