console_linux.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. // +build linux
  2. /*
  3. Copyright The containerd Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package console
  15. import (
  16. "io"
  17. "os"
  18. "sync"
  19. "golang.org/x/sys/unix"
  20. )
  // maxEvents is the length of the event buffer handed to every epoll_wait call
  // in Epoller.Wait, i.e. the most events a single wake-up can report.
  const maxEvents = 128
  24. // Epoller manages multiple epoll consoles using edge-triggered epoll api so we
  25. // dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP.
  26. // For more details, see:
  27. // - https://github.com/systemd/systemd/pull/4262
  28. // - https://github.com/moby/moby/issues/27202
  29. //
  30. // Example usage of Epoller and EpollConsole can be as follow:
  31. //
  32. // epoller, _ := NewEpoller()
  33. // epollConsole, _ := epoller.Add(console)
  34. // go epoller.Wait()
  35. // var (
  36. // b bytes.Buffer
  37. // wg sync.WaitGroup
  38. // )
  39. // wg.Add(1)
  40. // go func() {
  41. // io.Copy(&b, epollConsole)
  42. // wg.Done()
  43. // }()
  44. // // perform I/O on the console
  45. // epollConsole.Shutdown(epoller.CloseConsole)
  46. // wg.Wait()
  47. // epollConsole.Close()
  48. type Epoller struct {
  49. efd int
  50. mu sync.Mutex
  51. fdMapping map[int]*EpollConsole
  52. closeOnce sync.Once
  53. }
  54. // NewEpoller returns an instance of epoller with a valid epoll fd.
  55. func NewEpoller() (*Epoller, error) {
  56. efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
  57. if err != nil {
  58. return nil, err
  59. }
  60. return &Epoller{
  61. efd: efd,
  62. fdMapping: make(map[int]*EpollConsole),
  63. }, nil
  64. }
  65. // Add creates an epoll console based on the provided console. The console will
  66. // be registered with EPOLLET (i.e. using edge-triggered notification) and its
  67. // file descriptor will be set to non-blocking mode. After this, user should use
  68. // the return console to perform I/O.
  69. func (e *Epoller) Add(console Console) (*EpollConsole, error) {
  70. sysfd := int(console.Fd())
  71. // Set sysfd to non-blocking mode
  72. if err := unix.SetNonblock(sysfd, true); err != nil {
  73. return nil, err
  74. }
  75. ev := unix.EpollEvent{
  76. Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
  77. Fd: int32(sysfd),
  78. }
  79. if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil {
  80. return nil, err
  81. }
  82. ef := &EpollConsole{
  83. Console: console,
  84. sysfd: sysfd,
  85. readc: sync.NewCond(&sync.Mutex{}),
  86. writec: sync.NewCond(&sync.Mutex{}),
  87. }
  88. e.mu.Lock()
  89. e.fdMapping[sysfd] = ef
  90. e.mu.Unlock()
  91. return ef, nil
  92. }
  93. // Wait starts the loop to wait for its consoles' notifications and signal
  94. // appropriate console that it can perform I/O.
  95. func (e *Epoller) Wait() error {
  96. events := make([]unix.EpollEvent, maxEvents)
  97. for {
  98. n, err := unix.EpollWait(e.efd, events, -1)
  99. if err != nil {
  100. // EINTR: The call was interrupted by a signal handler before either
  101. // any of the requested events occurred or the timeout expired
  102. if err == unix.EINTR {
  103. continue
  104. }
  105. return err
  106. }
  107. for i := 0; i < n; i++ {
  108. ev := &events[i]
  109. // the console is ready to be read from
  110. if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  111. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  112. epfile.signalRead()
  113. }
  114. }
  115. // the console is ready to be written to
  116. if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  117. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  118. epfile.signalWrite()
  119. }
  120. }
  121. }
  122. }
  123. }
  124. // CloseConsole unregisters the console's file descriptor from epoll interface
  125. func (e *Epoller) CloseConsole(fd int) error {
  126. e.mu.Lock()
  127. defer e.mu.Unlock()
  128. delete(e.fdMapping, fd)
  129. return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
  130. }
  131. func (e *Epoller) getConsole(sysfd int) *EpollConsole {
  132. e.mu.Lock()
  133. f := e.fdMapping[sysfd]
  134. e.mu.Unlock()
  135. return f
  136. }
  137. // Close closes the epoll fd
  138. func (e *Epoller) Close() error {
  139. closeErr := os.ErrClosed // default to "file already closed"
  140. e.closeOnce.Do(func() {
  141. closeErr = unix.Close(e.efd)
  142. })
  143. return closeErr
  144. }
  145. // EpollConsole acts like a console but registers its file descriptor with an
  146. // epoll fd and uses epoll API to perform I/O.
  147. type EpollConsole struct {
  148. Console
  149. readc *sync.Cond
  150. writec *sync.Cond
  151. sysfd int
  152. closed bool
  153. }
  154. // Read reads up to len(p) bytes into p. It returns the number of bytes read
  155. // (0 <= n <= len(p)) and any error encountered.
  156. //
  157. // If the console's read returns EAGAIN or EIO, we assume that it's a
  158. // temporary error because the other side went away and wait for the signal
  159. // generated by epoll event to continue.
  160. func (ec *EpollConsole) Read(p []byte) (n int, err error) {
  161. var read int
  162. ec.readc.L.Lock()
  163. defer ec.readc.L.Unlock()
  164. for {
  165. read, err = ec.Console.Read(p[n:])
  166. n += read
  167. if err != nil {
  168. var hangup bool
  169. if perr, ok := err.(*os.PathError); ok {
  170. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  171. } else {
  172. hangup = (err == unix.EAGAIN || err == unix.EIO)
  173. }
  174. // if the other end disappear, assume this is temporary and wait for the
  175. // signal to continue again. Unless we didnt read anything and the
  176. // console is already marked as closed then we should exit
  177. if hangup && !(n == 0 && len(p) > 0 && ec.closed) {
  178. ec.readc.Wait()
  179. continue
  180. }
  181. }
  182. break
  183. }
  184. // if we didnt read anything then return io.EOF to end gracefully
  185. if n == 0 && len(p) > 0 && err == nil {
  186. err = io.EOF
  187. }
  188. // signal for others that we finished the read
  189. ec.readc.Signal()
  190. return n, err
  191. }
  192. // Writes len(p) bytes from p to the console. It returns the number of bytes
  193. // written from p (0 <= n <= len(p)) and any error encountered that caused
  194. // the write to stop early.
  195. //
  196. // If writes to the console returns EAGAIN or EIO, we assume that it's a
  197. // temporary error because the other side went away and wait for the signal
  198. // generated by epoll event to continue.
  199. func (ec *EpollConsole) Write(p []byte) (n int, err error) {
  200. var written int
  201. ec.writec.L.Lock()
  202. defer ec.writec.L.Unlock()
  203. for {
  204. written, err = ec.Console.Write(p[n:])
  205. n += written
  206. if err != nil {
  207. var hangup bool
  208. if perr, ok := err.(*os.PathError); ok {
  209. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  210. } else {
  211. hangup = (err == unix.EAGAIN || err == unix.EIO)
  212. }
  213. // if the other end disappears, assume this is temporary and wait for the
  214. // signal to continue again.
  215. if hangup {
  216. ec.writec.Wait()
  217. continue
  218. }
  219. }
  220. // unrecoverable error, break the loop and return the error
  221. break
  222. }
  223. if n < len(p) && err == nil {
  224. err = io.ErrShortWrite
  225. }
  226. // signal for others that we finished the write
  227. ec.writec.Signal()
  228. return n, err
  229. }
  230. // Shutdown closes the file descriptor and signals call waiters for this fd.
  231. // It accepts a callback which will be called with the console's fd. The
  232. // callback typically will be used to do further cleanup such as unregister the
  233. // console's fd from the epoll interface.
  234. // User should call Shutdown and wait for all I/O operation to be finished
  235. // before closing the console.
  236. func (ec *EpollConsole) Shutdown(close func(int) error) error {
  237. ec.readc.L.Lock()
  238. defer ec.readc.L.Unlock()
  239. ec.writec.L.Lock()
  240. defer ec.writec.L.Unlock()
  241. ec.readc.Broadcast()
  242. ec.writec.Broadcast()
  243. ec.closed = true
  244. return close(ec.sysfd)
  245. }
  246. // signalRead signals that the console is readable.
  247. func (ec *EpollConsole) signalRead() {
  248. ec.readc.L.Lock()
  249. ec.readc.Signal()
  250. ec.readc.L.Unlock()
  251. }
  252. // signalWrite signals that the console is writable.
  253. func (ec *EpollConsole) signalWrite() {
  254. ec.writec.L.Lock()
  255. ec.writec.Signal()
  256. ec.writec.L.Unlock()
  257. }