system.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. package hcs
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "strconv"
  7. "sync"
  8. "syscall"
  9. "time"
  10. "github.com/Microsoft/hcsshim/internal/interop"
  11. "github.com/Microsoft/hcsshim/internal/schema1"
  12. "github.com/Microsoft/hcsshim/internal/timeout"
  13. "github.com/sirupsen/logrus"
  14. )
  15. // currentContainerStarts is used to limit the number of concurrent container
  16. // starts.
  17. var currentContainerStarts containerStarts
  18. type containerStarts struct {
  19. maxParallel int
  20. inProgress int
  21. sync.Mutex
  22. }
  23. func init() {
  24. mpsS := os.Getenv("HCSSHIM_MAX_PARALLEL_START")
  25. if len(mpsS) > 0 {
  26. mpsI, err := strconv.Atoi(mpsS)
  27. if err != nil || mpsI < 0 {
  28. return
  29. }
  30. currentContainerStarts.maxParallel = mpsI
  31. }
  32. }
  33. type System struct {
  34. handleLock sync.RWMutex
  35. handle hcsSystem
  36. id string
  37. callbackNumber uintptr
  38. }
  39. // CreateComputeSystem creates a new compute system with the given configuration but does not start it.
  40. func CreateComputeSystem(id string, hcsDocumentInterface interface{}) (*System, error) {
  41. operation := "CreateComputeSystem"
  42. title := "hcsshim::" + operation
  43. computeSystem := &System{
  44. id: id,
  45. }
  46. hcsDocumentB, err := json.Marshal(hcsDocumentInterface)
  47. if err != nil {
  48. return nil, err
  49. }
  50. hcsDocument := string(hcsDocumentB)
  51. logrus.Debugf(title+" ID=%s config=%s", id, hcsDocument)
  52. var (
  53. resultp *uint16
  54. identity syscall.Handle
  55. )
  56. completed := false
  57. go syscallWatcher(fmt.Sprintf("CreateCompleteSystem %s: %s", id, hcsDocument), &completed)
  58. createError := hcsCreateComputeSystem(id, hcsDocument, identity, &computeSystem.handle, &resultp)
  59. completed = true
  60. if createError == nil || IsPending(createError) {
  61. if err := computeSystem.registerCallback(); err != nil {
  62. // Terminate the compute system if it still exists. We're okay to
  63. // ignore a failure here.
  64. computeSystem.Terminate()
  65. return nil, makeSystemError(computeSystem, operation, "", err, nil)
  66. }
  67. }
  68. events, err := processAsyncHcsResult(createError, resultp, computeSystem.callbackNumber, hcsNotificationSystemCreateCompleted, &timeout.SystemCreate)
  69. if err != nil {
  70. if err == ErrTimeout {
  71. // Terminate the compute system if it still exists. We're okay to
  72. // ignore a failure here.
  73. computeSystem.Terminate()
  74. }
  75. return nil, makeSystemError(computeSystem, operation, hcsDocument, err, events)
  76. }
  77. logrus.Debugf(title+" succeeded id=%s handle=%d", id, computeSystem.handle)
  78. return computeSystem, nil
  79. }
  80. // OpenComputeSystem opens an existing compute system by ID.
  81. func OpenComputeSystem(id string) (*System, error) {
  82. operation := "OpenComputeSystem"
  83. title := "hcsshim::" + operation
  84. logrus.Debugf(title+" ID=%s", id)
  85. computeSystem := &System{
  86. id: id,
  87. }
  88. var (
  89. handle hcsSystem
  90. resultp *uint16
  91. )
  92. err := hcsOpenComputeSystem(id, &handle, &resultp)
  93. events := processHcsResult(resultp)
  94. if err != nil {
  95. return nil, makeSystemError(computeSystem, operation, "", err, events)
  96. }
  97. computeSystem.handle = handle
  98. if err := computeSystem.registerCallback(); err != nil {
  99. return nil, makeSystemError(computeSystem, operation, "", err, nil)
  100. }
  101. logrus.Debugf(title+" succeeded id=%s handle=%d", id, handle)
  102. return computeSystem, nil
  103. }
  104. // GetComputeSystems gets a list of the compute systems on the system that match the query
  105. func GetComputeSystems(q schema1.ComputeSystemQuery) ([]schema1.ContainerProperties, error) {
  106. operation := "GetComputeSystems"
  107. title := "hcsshim::" + operation
  108. queryb, err := json.Marshal(q)
  109. if err != nil {
  110. return nil, err
  111. }
  112. query := string(queryb)
  113. logrus.Debugf(title+" query=%s", query)
  114. var (
  115. resultp *uint16
  116. computeSystemsp *uint16
  117. )
  118. completed := false
  119. go syscallWatcher(fmt.Sprintf("GetComputeSystems %s:", query), &completed)
  120. err = hcsEnumerateComputeSystems(query, &computeSystemsp, &resultp)
  121. completed = true
  122. events := processHcsResult(resultp)
  123. if err != nil {
  124. return nil, &HcsError{Op: operation, Err: err, Events: events}
  125. }
  126. if computeSystemsp == nil {
  127. return nil, ErrUnexpectedValue
  128. }
  129. computeSystemsRaw := interop.ConvertAndFreeCoTaskMemBytes(computeSystemsp)
  130. computeSystems := []schema1.ContainerProperties{}
  131. if err := json.Unmarshal(computeSystemsRaw, &computeSystems); err != nil {
  132. return nil, err
  133. }
  134. logrus.Debugf(title + " succeeded")
  135. return computeSystems, nil
  136. }
  137. // Start synchronously starts the computeSystem.
  138. func (computeSystem *System) Start() error {
  139. computeSystem.handleLock.RLock()
  140. defer computeSystem.handleLock.RUnlock()
  141. title := "hcsshim::ComputeSystem::Start ID=" + computeSystem.ID()
  142. logrus.Debugf(title)
  143. if computeSystem.handle == 0 {
  144. return makeSystemError(computeSystem, "Start", "", ErrAlreadyClosed, nil)
  145. }
  146. // This is a very simple backoff-retry loop to limit the number
  147. // of parallel container starts if environment variable
  148. // HCSSHIM_MAX_PARALLEL_START is set to a positive integer.
  149. // It should generally only be used as a workaround to various
  150. // platform issues that exist between RS1 and RS4 as of Aug 2018
  151. if currentContainerStarts.maxParallel > 0 {
  152. for {
  153. currentContainerStarts.Lock()
  154. if currentContainerStarts.inProgress < currentContainerStarts.maxParallel {
  155. currentContainerStarts.inProgress++
  156. currentContainerStarts.Unlock()
  157. break
  158. }
  159. if currentContainerStarts.inProgress == currentContainerStarts.maxParallel {
  160. currentContainerStarts.Unlock()
  161. time.Sleep(100 * time.Millisecond)
  162. }
  163. }
  164. // Make sure we decrement the count when we are done.
  165. defer func() {
  166. currentContainerStarts.Lock()
  167. currentContainerStarts.inProgress--
  168. currentContainerStarts.Unlock()
  169. }()
  170. }
  171. var resultp *uint16
  172. completed := false
  173. go syscallWatcher(fmt.Sprintf("StartComputeSystem %s:", computeSystem.ID()), &completed)
  174. err := hcsStartComputeSystem(computeSystem.handle, "", &resultp)
  175. completed = true
  176. events, err := processAsyncHcsResult(err, resultp, computeSystem.callbackNumber, hcsNotificationSystemStartCompleted, &timeout.SystemStart)
  177. if err != nil {
  178. return makeSystemError(computeSystem, "Start", "", err, events)
  179. }
  180. logrus.Debugf(title + " succeeded")
  181. return nil
  182. }
  183. // ID returns the compute system's identifier.
  184. func (computeSystem *System) ID() string {
  185. return computeSystem.id
  186. }
  187. // Shutdown requests a compute system shutdown, if IsPending() on the error returned is true,
  188. // it may not actually be shut down until Wait() succeeds.
  189. func (computeSystem *System) Shutdown() error {
  190. computeSystem.handleLock.RLock()
  191. defer computeSystem.handleLock.RUnlock()
  192. title := "hcsshim::ComputeSystem::Shutdown"
  193. logrus.Debugf(title)
  194. if computeSystem.handle == 0 {
  195. return makeSystemError(computeSystem, "Shutdown", "", ErrAlreadyClosed, nil)
  196. }
  197. var resultp *uint16
  198. completed := false
  199. go syscallWatcher(fmt.Sprintf("ShutdownComputeSystem %s:", computeSystem.ID()), &completed)
  200. err := hcsShutdownComputeSystem(computeSystem.handle, "", &resultp)
  201. completed = true
  202. events := processHcsResult(resultp)
  203. if err != nil {
  204. return makeSystemError(computeSystem, "Shutdown", "", err, events)
  205. }
  206. logrus.Debugf(title + " succeeded")
  207. return nil
  208. }
  209. // Terminate requests a compute system terminate, if IsPending() on the error returned is true,
  210. // it may not actually be shut down until Wait() succeeds.
  211. func (computeSystem *System) Terminate() error {
  212. computeSystem.handleLock.RLock()
  213. defer computeSystem.handleLock.RUnlock()
  214. title := "hcsshim::ComputeSystem::Terminate ID=" + computeSystem.ID()
  215. logrus.Debugf(title)
  216. if computeSystem.handle == 0 {
  217. return makeSystemError(computeSystem, "Terminate", "", ErrAlreadyClosed, nil)
  218. }
  219. var resultp *uint16
  220. completed := false
  221. go syscallWatcher(fmt.Sprintf("TerminateComputeSystem %s:", computeSystem.ID()), &completed)
  222. err := hcsTerminateComputeSystem(computeSystem.handle, "", &resultp)
  223. completed = true
  224. events := processHcsResult(resultp)
  225. if err != nil {
  226. return makeSystemError(computeSystem, "Terminate", "", err, events)
  227. }
  228. logrus.Debugf(title + " succeeded")
  229. return nil
  230. }
  231. // Wait synchronously waits for the compute system to shutdown or terminate.
  232. func (computeSystem *System) Wait() error {
  233. title := "hcsshim::ComputeSystem::Wait ID=" + computeSystem.ID()
  234. logrus.Debugf(title)
  235. err := waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil)
  236. if err != nil {
  237. return makeSystemError(computeSystem, "Wait", "", err, nil)
  238. }
  239. logrus.Debugf(title + " succeeded")
  240. return nil
  241. }
  242. // WaitTimeout synchronously waits for the compute system to terminate or the duration to elapse.
  243. // If the timeout expires, IsTimeout(err) == true
  244. func (computeSystem *System) WaitTimeout(timeout time.Duration) error {
  245. title := "hcsshim::ComputeSystem::WaitTimeout ID=" + computeSystem.ID()
  246. logrus.Debugf(title)
  247. err := waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, &timeout)
  248. if err != nil {
  249. return makeSystemError(computeSystem, "WaitTimeout", "", err, nil)
  250. }
  251. logrus.Debugf(title + " succeeded")
  252. return nil
  253. }
  254. func (computeSystem *System) Properties(types ...schema1.PropertyType) (*schema1.ContainerProperties, error) {
  255. computeSystem.handleLock.RLock()
  256. defer computeSystem.handleLock.RUnlock()
  257. queryj, err := json.Marshal(schema1.PropertyQuery{types})
  258. if err != nil {
  259. return nil, makeSystemError(computeSystem, "Properties", "", err, nil)
  260. }
  261. var resultp, propertiesp *uint16
  262. completed := false
  263. go syscallWatcher(fmt.Sprintf("GetComputeSystemProperties %s:", computeSystem.ID()), &completed)
  264. err = hcsGetComputeSystemProperties(computeSystem.handle, string(queryj), &propertiesp, &resultp)
  265. completed = true
  266. events := processHcsResult(resultp)
  267. if err != nil {
  268. return nil, makeSystemError(computeSystem, "Properties", "", err, events)
  269. }
  270. if propertiesp == nil {
  271. return nil, ErrUnexpectedValue
  272. }
  273. propertiesRaw := interop.ConvertAndFreeCoTaskMemBytes(propertiesp)
  274. properties := &schema1.ContainerProperties{}
  275. if err := json.Unmarshal(propertiesRaw, properties); err != nil {
  276. return nil, makeSystemError(computeSystem, "Properties", "", err, nil)
  277. }
  278. return properties, nil
  279. }
  280. // Pause pauses the execution of the computeSystem. This feature is not enabled in TP5.
  281. func (computeSystem *System) Pause() error {
  282. computeSystem.handleLock.RLock()
  283. defer computeSystem.handleLock.RUnlock()
  284. title := "hcsshim::ComputeSystem::Pause ID=" + computeSystem.ID()
  285. logrus.Debugf(title)
  286. if computeSystem.handle == 0 {
  287. return makeSystemError(computeSystem, "Pause", "", ErrAlreadyClosed, nil)
  288. }
  289. var resultp *uint16
  290. completed := false
  291. go syscallWatcher(fmt.Sprintf("PauseComputeSystem %s:", computeSystem.ID()), &completed)
  292. err := hcsPauseComputeSystem(computeSystem.handle, "", &resultp)
  293. completed = true
  294. events, err := processAsyncHcsResult(err, resultp, computeSystem.callbackNumber, hcsNotificationSystemPauseCompleted, &timeout.SystemPause)
  295. if err != nil {
  296. return makeSystemError(computeSystem, "Pause", "", err, events)
  297. }
  298. logrus.Debugf(title + " succeeded")
  299. return nil
  300. }
  301. // Resume resumes the execution of the computeSystem. This feature is not enabled in TP5.
  302. func (computeSystem *System) Resume() error {
  303. computeSystem.handleLock.RLock()
  304. defer computeSystem.handleLock.RUnlock()
  305. title := "hcsshim::ComputeSystem::Resume ID=" + computeSystem.ID()
  306. logrus.Debugf(title)
  307. if computeSystem.handle == 0 {
  308. return makeSystemError(computeSystem, "Resume", "", ErrAlreadyClosed, nil)
  309. }
  310. var resultp *uint16
  311. completed := false
  312. go syscallWatcher(fmt.Sprintf("ResumeComputeSystem %s:", computeSystem.ID()), &completed)
  313. err := hcsResumeComputeSystem(computeSystem.handle, "", &resultp)
  314. completed = true
  315. events, err := processAsyncHcsResult(err, resultp, computeSystem.callbackNumber, hcsNotificationSystemResumeCompleted, &timeout.SystemResume)
  316. if err != nil {
  317. return makeSystemError(computeSystem, "Resume", "", err, events)
  318. }
  319. logrus.Debugf(title + " succeeded")
  320. return nil
  321. }
  322. // CreateProcess launches a new process within the computeSystem.
  323. func (computeSystem *System) CreateProcess(c interface{}) (*Process, error) {
  324. computeSystem.handleLock.RLock()
  325. defer computeSystem.handleLock.RUnlock()
  326. title := "hcsshim::ComputeSystem::CreateProcess ID=" + computeSystem.ID()
  327. var (
  328. processInfo hcsProcessInformation
  329. processHandle hcsProcess
  330. resultp *uint16
  331. )
  332. if computeSystem.handle == 0 {
  333. return nil, makeSystemError(computeSystem, "CreateProcess", "", ErrAlreadyClosed, nil)
  334. }
  335. configurationb, err := json.Marshal(c)
  336. if err != nil {
  337. return nil, makeSystemError(computeSystem, "CreateProcess", "", err, nil)
  338. }
  339. configuration := string(configurationb)
  340. logrus.Debugf(title+" config=%s", configuration)
  341. completed := false
  342. go syscallWatcher(fmt.Sprintf("CreateProcess %s: %s", computeSystem.ID(), configuration), &completed)
  343. err = hcsCreateProcess(computeSystem.handle, configuration, &processInfo, &processHandle, &resultp)
  344. completed = true
  345. events := processHcsResult(resultp)
  346. if err != nil {
  347. return nil, makeSystemError(computeSystem, "CreateProcess", configuration, err, events)
  348. }
  349. process := &Process{
  350. handle: processHandle,
  351. processID: int(processInfo.ProcessId),
  352. system: computeSystem,
  353. cachedPipes: &cachedPipes{
  354. stdIn: processInfo.StdInput,
  355. stdOut: processInfo.StdOutput,
  356. stdErr: processInfo.StdError,
  357. },
  358. }
  359. if err := process.registerCallback(); err != nil {
  360. return nil, makeSystemError(computeSystem, "CreateProcess", "", err, nil)
  361. }
  362. logrus.Debugf(title+" succeeded processid=%d", process.processID)
  363. return process, nil
  364. }
  365. // OpenProcess gets an interface to an existing process within the computeSystem.
  366. func (computeSystem *System) OpenProcess(pid int) (*Process, error) {
  367. computeSystem.handleLock.RLock()
  368. defer computeSystem.handleLock.RUnlock()
  369. title := "hcsshim::ComputeSystem::OpenProcess ID=" + computeSystem.ID()
  370. logrus.Debugf(title+" processid=%d", pid)
  371. var (
  372. processHandle hcsProcess
  373. resultp *uint16
  374. )
  375. if computeSystem.handle == 0 {
  376. return nil, makeSystemError(computeSystem, "OpenProcess", "", ErrAlreadyClosed, nil)
  377. }
  378. completed := false
  379. go syscallWatcher(fmt.Sprintf("OpenProcess %s: %d", computeSystem.ID(), pid), &completed)
  380. err := hcsOpenProcess(computeSystem.handle, uint32(pid), &processHandle, &resultp)
  381. completed = true
  382. events := processHcsResult(resultp)
  383. if err != nil {
  384. return nil, makeSystemError(computeSystem, "OpenProcess", "", err, events)
  385. }
  386. process := &Process{
  387. handle: processHandle,
  388. processID: pid,
  389. system: computeSystem,
  390. }
  391. if err := process.registerCallback(); err != nil {
  392. return nil, makeSystemError(computeSystem, "OpenProcess", "", err, nil)
  393. }
  394. logrus.Debugf(title+" succeeded processid=%s", process.processID)
  395. return process, nil
  396. }
  397. // Close cleans up any state associated with the compute system but does not terminate or wait for it.
  398. func (computeSystem *System) Close() error {
  399. computeSystem.handleLock.Lock()
  400. defer computeSystem.handleLock.Unlock()
  401. title := "hcsshim::ComputeSystem::Close ID=" + computeSystem.ID()
  402. logrus.Debugf(title)
  403. // Don't double free this
  404. if computeSystem.handle == 0 {
  405. return nil
  406. }
  407. if err := computeSystem.unregisterCallback(); err != nil {
  408. return makeSystemError(computeSystem, "Close", "", err, nil)
  409. }
  410. completed := false
  411. go syscallWatcher(fmt.Sprintf("CloseComputeSystem %s:", computeSystem.ID()), &completed)
  412. err := hcsCloseComputeSystem(computeSystem.handle)
  413. completed = true
  414. if err != nil {
  415. return makeSystemError(computeSystem, "Close", "", err, nil)
  416. }
  417. computeSystem.handle = 0
  418. logrus.Debugf(title + " succeeded")
  419. return nil
  420. }
  421. func (computeSystem *System) registerCallback() error {
  422. context := &notifcationWatcherContext{
  423. channels: newChannels(),
  424. }
  425. callbackMapLock.Lock()
  426. callbackNumber := nextCallback
  427. nextCallback++
  428. callbackMap[callbackNumber] = context
  429. callbackMapLock.Unlock()
  430. var callbackHandle hcsCallback
  431. err := hcsRegisterComputeSystemCallback(computeSystem.handle, notificationWatcherCallback, callbackNumber, &callbackHandle)
  432. if err != nil {
  433. return err
  434. }
  435. context.handle = callbackHandle
  436. computeSystem.callbackNumber = callbackNumber
  437. return nil
  438. }
  439. func (computeSystem *System) unregisterCallback() error {
  440. callbackNumber := computeSystem.callbackNumber
  441. callbackMapLock.RLock()
  442. context := callbackMap[callbackNumber]
  443. callbackMapLock.RUnlock()
  444. if context == nil {
  445. return nil
  446. }
  447. handle := context.handle
  448. if handle == 0 {
  449. return nil
  450. }
  451. // hcsUnregisterComputeSystemCallback has its own syncronization
  452. // to wait for all callbacks to complete. We must NOT hold the callbackMapLock.
  453. err := hcsUnregisterComputeSystemCallback(handle)
  454. if err != nil {
  455. return err
  456. }
  457. closeChannels(context.channels)
  458. callbackMapLock.Lock()
  459. callbackMap[callbackNumber] = nil
  460. callbackMapLock.Unlock()
  461. handle = 0
  462. return nil
  463. }
  464. // Modifies the System by sending a request to HCS
  465. func (computeSystem *System) Modify(config interface{}) error {
  466. computeSystem.handleLock.RLock()
  467. defer computeSystem.handleLock.RUnlock()
  468. title := "hcsshim::Modify ID=" + computeSystem.id
  469. if computeSystem.handle == 0 {
  470. return makeSystemError(computeSystem, "Modify", "", ErrAlreadyClosed, nil)
  471. }
  472. requestJSON, err := json.Marshal(config)
  473. if err != nil {
  474. return err
  475. }
  476. requestString := string(requestJSON)
  477. logrus.Debugf(title + " " + requestString)
  478. var resultp *uint16
  479. completed := false
  480. go syscallWatcher(fmt.Sprintf("ModifyComputeSystem %s: %s", computeSystem.ID(), requestString), &completed)
  481. err = hcsModifyComputeSystem(computeSystem.handle, requestString, &resultp)
  482. completed = true
  483. events := processHcsResult(resultp)
  484. if err != nil {
  485. return makeSystemError(computeSystem, "Modify", requestString, err, events)
  486. }
  487. logrus.Debugf(title + " succeeded ")
  488. return nil
  489. }