@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io/fs"
 	"math"
 	"os"
 	"path/filepath"
@@ -478,9 +479,9 @@ func (c *Manager) Delete() error {
 	return remove(c.path)
 }
 
-func (c *Manager) Procs(recursive bool) ([]uint64, error) {
-	var processes []uint64
-	err := filepath.Walk(c.path, func(p string, info os.FileInfo, err error) error {
+func (c *Manager) getTasks(recursive bool, tType string) ([]uint64, error) {
+	var tasks []uint64
+	err := filepath.Walk(c.path, func(p string, info fs.FileInfo, err error) error {
 		if err != nil {
 			return err
 		}
@@ -491,17 +492,25 @@ func (c *Manager) Procs(recursive bool) ([]uint64, error) {
 			return filepath.SkipDir
 		}
 		_, name := filepath.Split(p)
-		if name != cgroupProcs {
+		if name != tType {
 			return nil
 		}
-		procs, err := parseCgroupProcsFile(p)
+		curTasks, err := parseCgroupTasksFile(p)
 		if err != nil {
 			return err
 		}
-		processes = append(processes, procs...)
+		tasks = append(tasks, curTasks...)
 		return nil
 	})
-	return processes, err
+	return tasks, err
+}
+
+func (c *Manager) Procs(recursive bool) ([]uint64, error) {
+	return c.getTasks(recursive, cgroupProcs)
+}
+
+func (c *Manager) Threads(recursive bool) ([]uint64, error) {
+	return c.getTasks(recursive, cgroupThreads)
 }
 
 func (c *Manager) MoveTo(destination *Manager) error {
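The shared walker keeps Procs's behavior and adds Threads, which differs only in matching cgroup.threads instead of cgroup.procs during the walk. A minimal caller sketch, assuming this file belongs to the cgroup2 package of containerd/cgroups (the import path, slice, and group names below are illustrative assumptions):

package main

import (
	"fmt"
	"log"

	"github.com/containerd/cgroups/v3/cgroup2"
)

func main() {
	// Load an existing systemd-managed cgroup; names are hypothetical.
	m, err := cgroup2.LoadSystemd("system.slice", "example.scope")
	if err != nil {
		log.Fatal(err)
	}

	// recursive=true descends into child cgroups; both calls share getTasks
	// and differ only in which file name they match.
	pids, err := m.Procs(true) // collects IDs from cgroup.procs files
	if err != nil {
		log.Fatal(err)
	}
	tids, err := m.Threads(true) // collects IDs from cgroup.threads files
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d processes, %d threads\n", len(pids), len(tids))
}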
@@ -559,6 +568,7 @@ func (c *Manager) Stat() (*stats.Metrics, error) {
 		NrPeriods:     out["nr_periods"],
 		NrThrottled:   out["nr_throttled"],
 		ThrottledUsec: out["throttled_usec"],
+		PSI:           getStatPSIFromFile(filepath.Join(c.path, "cpu.pressure")),
 	}
 	metrics.Memory = &stats.MemoryStat{
 		Anon: out["anon"],
@@ -594,8 +604,11 @@ func (c *Manager) Stat() (*stats.Metrics, error) {
 		ThpCollapseAlloc: out["thp_collapse_alloc"],
 		Usage:            getStatFileContentUint64(filepath.Join(c.path, "memory.current")),
 		UsageLimit:       getStatFileContentUint64(filepath.Join(c.path, "memory.max")),
+		MaxUsage:         getStatFileContentUint64(filepath.Join(c.path, "memory.peak")),
 		SwapUsage:        getStatFileContentUint64(filepath.Join(c.path, "memory.swap.current")),
 		SwapLimit:        getStatFileContentUint64(filepath.Join(c.path, "memory.swap.max")),
+		SwapMaxUsage:     getStatFileContentUint64(filepath.Join(c.path, "memory.swap.peak")),
+		PSI:              getStatPSIFromFile(filepath.Join(c.path, "memory.pressure")),
 	}
 	if len(memoryEvents) > 0 {
 		metrics.MemoryEvents = &stats.MemoryEvents{
@@ -606,7 +619,10 @@ func (c *Manager) Stat() (*stats.Metrics, error) {
 			OomKill: memoryEvents["oom_kill"],
 		}
 	}
-	metrics.Io = &stats.IOStat{Usage: readIoStats(c.path)}
+	metrics.Io = &stats.IOStat{
+		Usage: readIoStats(c.path),
+		PSI:   getStatPSIFromFile(filepath.Join(c.path, "io.pressure")),
+	}
 	metrics.Rdma = &stats.RdmaStat{
 		Current: rdmaStats(filepath.Join(c.path, "rdma.current")),
 		Limit:   rdmaStats(filepath.Join(c.path, "rdma.max")),
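All three new PSI fields read the kernel's pressure files (cpu.pressure, memory.pressure, io.pressure), which share one format: a `some` line and, where supported, a `full` line of avg10/avg60/avg300/total pairs. A rough standalone sketch of parsing that format directly (getStatPSIFromFile's return type is not visible in these hunks, and the path below is hypothetical):

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// readPressure parses a cgroup v2 PSI file into kind -> key -> value,
// e.g. out["some"]["avg10"] == "0.00".
func readPressure(path string) (map[string]map[string]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	out := make(map[string]map[string]string)
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		// Lines look like: some avg10=0.00 avg60=0.00 avg300=0.00 total=0
		fields := strings.Fields(sc.Text())
		if len(fields) < 2 {
			continue
		}
		kv := make(map[string]string)
		for _, pair := range fields[1:] {
			if k, v, ok := strings.Cut(pair, "="); ok {
				kv[k] = v
			}
		}
		out[fields[0]] = kv
	}
	return out, sc.Err()
}

func main() {
	// Hypothetical path; substitute the cgroup you care about.
	psi, err := readPressure("/sys/fs/cgroup/system.slice/cpu.pressure")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("some avg10 =", psi["some"]["avg10"])
}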
@@ -870,14 +886,7 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
 			newSystemdProperty("TasksMax", uint64(resources.Pids.Max)))
 	}
 
-	statusChan := make(chan string, 1)
-	if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err == nil {
-		select {
-		case <-statusChan:
-		case <-time.After(time.Second):
-			logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
-		}
-	} else if !isUnitExists(err) {
+	if err := startUnit(conn, group, properties, pid == -1); err != nil {
 		return &Manager{}, err
 	}
 
@@ -886,6 +895,60 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
 	}, nil
 }
 
+func startUnit(conn *systemdDbus.Conn, group string, properties []systemdDbus.Property, ignoreExists bool) error {
+	ctx := context.TODO()
+
+	statusChan := make(chan string, 1)
+	defer close(statusChan)
+
+	retry := true
+	started := false
+
+	for !started {
+		if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err != nil {
+			if !isUnitExists(err) {
+				return err
+			}
+
+			if ignoreExists {
+				return nil
+			}
+
+			if retry {
+				retry = false
+				// When a unit of the same name already exists, it may be a leftover failed unit.
+				// Resetting it once lets systemd clear the failed state so the start can be retried.
+				attemptFailedUnitReset(conn, group)
+				continue
+			}
+
+			return err
+		} else {
+			started = true
+		}
+	}
+
+	select {
+	case s := <-statusChan:
+		if s != "done" {
+			attemptFailedUnitReset(conn, group)
+			return fmt.Errorf("error creating systemd unit `%s`: got `%s`", group, s)
+		}
+	case <-time.After(30 * time.Second):
+		logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
+	}
+
+	return nil
+}
+
+func attemptFailedUnitReset(conn *systemdDbus.Conn, group string) {
+	err := conn.ResetFailedUnitContext(context.TODO(), group)
+
+	if err != nil {
+		logrus.Warnf("Unable to reset failed unit: %v", err)
+	}
+}
+
 func LoadSystemd(slice, group string) (*Manager, error) {
 	if slice == "" {
 		slice = defaultSlice
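Note that the call site in NewSystemd passes pid == -1 through as startUnit's ignoreExists flag, so creating a unit without attaching a process now treats an already-existing unit as success. A hedged usage sketch (the import path and unit names are assumptions, not taken from this diff):

package main

import (
	"log"

	"github.com/containerd/cgroups/v3/cgroup2"
)

func main() {
	// Empty Resources for this sketch: no limits are applied; the sketch
	// assumes NewSystemd only adds properties for fields that are set.
	res := &cgroup2.Resources{}

	// pid == -1 creates the transient unit without attaching a process; with
	// this change it also means startUnit returns nil if the unit already exists.
	m, err := cgroup2.NewSystemd("system.slice", "example.scope", -1, res)
	if err != nil {
		log.Fatal(err)
	}

	// Delete (shown earlier in this file) removes the cgroup path when done.
	if err := m.Delete(); err != nil {
		log.Fatal(err)
	}
}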