iocp.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. //go:build windows
  2. package jobobject
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "unsafe"
  8. "github.com/Microsoft/hcsshim/internal/log"
  9. "github.com/Microsoft/hcsshim/internal/queue"
  10. "github.com/Microsoft/hcsshim/internal/winapi"
  11. "github.com/sirupsen/logrus"
  12. "golang.org/x/sys/windows"
  13. )
  14. var (
  15. ioInitOnce sync.Once
  16. initIOErr error
  17. // Global iocp handle that will be re-used for every job object
  18. ioCompletionPort windows.Handle
  19. // Mapping of job handle to queue to place notifications in.
  20. jobMap sync.Map
  21. )
  22. // MsgAllProcessesExited is a type representing a message that every process in a job has exited.
  23. type MsgAllProcessesExited struct{}
  24. // MsgUnimplemented represents a message that we are aware of, but that isn't implemented currently.
  25. // This should not be treated as an error.
  26. type MsgUnimplemented struct{}
  27. // pollIOCP polls the io completion port forever.
  28. func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
  29. var (
  30. overlapped uintptr
  31. code uint32
  32. key uintptr
  33. )
  34. for {
  35. err := windows.GetQueuedCompletionStatus(iocpHandle, &code, &key, (**windows.Overlapped)(unsafe.Pointer(&overlapped)), windows.INFINITE)
  36. if err != nil {
  37. log.G(ctx).WithError(err).Error("failed to poll for job object message")
  38. continue
  39. }
  40. if val, ok := jobMap.Load(key); ok {
  41. msq, ok := val.(*queue.MessageQueue)
  42. if !ok {
  43. log.G(ctx).WithField("value", msq).Warn("encountered non queue type in job map")
  44. continue
  45. }
  46. notification, err := parseMessage(code, overlapped)
  47. if err != nil {
  48. log.G(ctx).WithFields(logrus.Fields{
  49. "code": code,
  50. "overlapped": overlapped,
  51. }).Warn("failed to parse job object message")
  52. continue
  53. }
  54. if err := msq.Enqueue(notification); err == queue.ErrQueueClosed {
  55. // Write will only return an error when the queue is closed.
  56. // The only time a queue would ever be closed is when we call `Close` on
  57. // the job it belongs to which also removes it from the jobMap, so something
  58. // went wrong here. We can't return as this is reading messages for all jobs
  59. // so just log it and move on.
  60. log.G(ctx).WithFields(logrus.Fields{
  61. "code": code,
  62. "overlapped": overlapped,
  63. }).Warn("tried to write to a closed queue")
  64. continue
  65. }
  66. } else {
  67. log.G(ctx).Warn("received a message for a job not present in the mapping")
  68. }
  69. }
  70. }
  71. func parseMessage(code uint32, overlapped uintptr) (interface{}, error) {
  72. // Check code and parse out relevant information related to that notification
  73. // that we care about. For now all we handle is the message that all processes
  74. // in the job have exited.
  75. switch code {
  76. case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
  77. return MsgAllProcessesExited{}, nil
  78. // Other messages for completeness and a check to make sure that if we fall
  79. // into the default case that this is a code we don't know how to handle.
  80. case winapi.JOB_OBJECT_MSG_END_OF_JOB_TIME:
  81. case winapi.JOB_OBJECT_MSG_END_OF_PROCESS_TIME:
  82. case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT:
  83. case winapi.JOB_OBJECT_MSG_NEW_PROCESS:
  84. case winapi.JOB_OBJECT_MSG_EXIT_PROCESS:
  85. case winapi.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS:
  86. case winapi.JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT:
  87. case winapi.JOB_OBJECT_MSG_JOB_MEMORY_LIMIT:
  88. case winapi.JOB_OBJECT_MSG_NOTIFICATION_LIMIT:
  89. default:
  90. return nil, fmt.Errorf("unknown job notification type: %d", code)
  91. }
  92. return MsgUnimplemented{}, nil
  93. }
  94. // Assigns an IO completion port to get notified of events for the registered job
  95. // object.
  96. func attachIOCP(job windows.Handle, iocp windows.Handle) error {
  97. info := winapi.JOBOBJECT_ASSOCIATE_COMPLETION_PORT{
  98. CompletionKey: job,
  99. CompletionPort: iocp,
  100. }
  101. _, err := windows.SetInformationJobObject(job, windows.JobObjectAssociateCompletionPortInformation, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info)))
  102. return err
  103. }