cache.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. /*
  2. Copyright © 2021 The CDI Authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cdi
  14. import (
  15. "errors"
  16. "fmt"
  17. "io/fs"
  18. "os"
  19. "path/filepath"
  20. "sort"
  21. "strings"
  22. "sync"
  23. "github.com/container-orchestrated-devices/container-device-interface/internal/multierror"
  24. cdi "github.com/container-orchestrated-devices/container-device-interface/specs-go"
  25. "github.com/fsnotify/fsnotify"
  26. oci "github.com/opencontainers/runtime-spec/specs-go"
  27. )
  28. // Option is an option to change some aspect of default CDI behavior.
  29. type Option func(*Cache) error
  30. // Cache stores CDI Specs loaded from Spec directories.
  31. type Cache struct {
  32. sync.Mutex
  33. specDirs []string
  34. specs map[string][]*Spec
  35. devices map[string]*Device
  36. errors map[string][]error
  37. dirErrors map[string]error
  38. autoRefresh bool
  39. watch *watch
  40. }
  41. // WithAutoRefresh returns an option to control automatic Cache refresh.
  42. // By default auto-refresh is enabled, the list of Spec directories are
  43. // monitored and the Cache is automatically refreshed whenever a change
  44. // is detected. This option can be used to disable this behavior when a
  45. // manually refreshed mode is preferable.
  46. func WithAutoRefresh(autoRefresh bool) Option {
  47. return func(c *Cache) error {
  48. c.autoRefresh = autoRefresh
  49. return nil
  50. }
  51. }
  52. // NewCache creates a new CDI Cache. The cache is populated from a set
  53. // of CDI Spec directories. These can be specified using a WithSpecDirs
  54. // option. The default set of directories is exposed in DefaultSpecDirs.
  55. func NewCache(options ...Option) (*Cache, error) {
  56. c := &Cache{
  57. autoRefresh: true,
  58. watch: &watch{},
  59. }
  60. WithSpecDirs(DefaultSpecDirs...)(c)
  61. c.Lock()
  62. defer c.Unlock()
  63. return c, c.configure(options...)
  64. }
  65. // Configure applies options to the Cache. Updates and refreshes the
  66. // Cache if options have changed.
  67. func (c *Cache) Configure(options ...Option) error {
  68. if len(options) == 0 {
  69. return nil
  70. }
  71. c.Lock()
  72. defer c.Unlock()
  73. return c.configure(options...)
  74. }
  75. // Configure the Cache. Start/stop CDI Spec directory watch, refresh
  76. // the Cache if necessary.
  77. func (c *Cache) configure(options ...Option) error {
  78. var err error
  79. for _, o := range options {
  80. if err = o(c); err != nil {
  81. return fmt.Errorf("failed to apply cache options: %w", err)
  82. }
  83. }
  84. c.dirErrors = make(map[string]error)
  85. c.watch.stop()
  86. if c.autoRefresh {
  87. c.watch.setup(c.specDirs, c.dirErrors)
  88. c.watch.start(&c.Mutex, c.refresh, c.dirErrors)
  89. }
  90. c.refresh()
  91. return nil
  92. }
  93. // Refresh rescans the CDI Spec directories and refreshes the Cache.
  94. // In manual refresh mode the cache is always refreshed. In auto-
  95. // refresh mode the cache is only refreshed if it is out of date.
  96. func (c *Cache) Refresh() error {
  97. c.Lock()
  98. defer c.Unlock()
  99. // force a refresh in manual mode
  100. if refreshed, err := c.refreshIfRequired(!c.autoRefresh); refreshed {
  101. return err
  102. }
  103. // collect and return cached errors, much like refresh() does it
  104. var result error
  105. for _, errors := range c.errors {
  106. result = multierror.Append(result, errors...)
  107. }
  108. return result
  109. }
  110. // Refresh the Cache by rescanning CDI Spec directories and files.
  111. func (c *Cache) refresh() error {
  112. var (
  113. specs = map[string][]*Spec{}
  114. devices = map[string]*Device{}
  115. conflicts = map[string]struct{}{}
  116. specErrors = map[string][]error{}
  117. result []error
  118. )
  119. // collect errors per spec file path and once globally
  120. collectError := func(err error, paths ...string) {
  121. result = append(result, err)
  122. for _, path := range paths {
  123. specErrors[path] = append(specErrors[path], err)
  124. }
  125. }
  126. // resolve conflicts based on device Spec priority (order of precedence)
  127. resolveConflict := func(name string, dev *Device, old *Device) bool {
  128. devSpec, oldSpec := dev.GetSpec(), old.GetSpec()
  129. devPrio, oldPrio := devSpec.GetPriority(), oldSpec.GetPriority()
  130. switch {
  131. case devPrio > oldPrio:
  132. return false
  133. case devPrio == oldPrio:
  134. devPath, oldPath := devSpec.GetPath(), oldSpec.GetPath()
  135. collectError(fmt.Errorf("conflicting device %q (specs %q, %q)",
  136. name, devPath, oldPath), devPath, oldPath)
  137. conflicts[name] = struct{}{}
  138. }
  139. return true
  140. }
  141. _ = scanSpecDirs(c.specDirs, func(path string, priority int, spec *Spec, err error) error {
  142. path = filepath.Clean(path)
  143. if err != nil {
  144. collectError(fmt.Errorf("failed to load CDI Spec %w", err), path)
  145. return nil
  146. }
  147. vendor := spec.GetVendor()
  148. specs[vendor] = append(specs[vendor], spec)
  149. for _, dev := range spec.devices {
  150. qualified := dev.GetQualifiedName()
  151. other, ok := devices[qualified]
  152. if ok {
  153. if resolveConflict(qualified, dev, other) {
  154. continue
  155. }
  156. }
  157. devices[qualified] = dev
  158. }
  159. return nil
  160. })
  161. for conflict := range conflicts {
  162. delete(devices, conflict)
  163. }
  164. c.specs = specs
  165. c.devices = devices
  166. c.errors = specErrors
  167. return multierror.New(result...)
  168. }
  169. // RefreshIfRequired triggers a refresh if necessary.
  170. func (c *Cache) refreshIfRequired(force bool) (bool, error) {
  171. // We need to refresh if
  172. // - it's forced by an explicitly call to Refresh() in manual mode
  173. // - a missing Spec dir appears (added to watch) in auto-refresh mode
  174. if force || (c.autoRefresh && c.watch.update(c.dirErrors)) {
  175. return true, c.refresh()
  176. }
  177. return false, nil
  178. }
  179. // InjectDevices injects the given qualified devices to an OCI Spec. It
  180. // returns any unresolvable devices and an error if injection fails for
  181. // any of the devices.
  182. func (c *Cache) InjectDevices(ociSpec *oci.Spec, devices ...string) ([]string, error) {
  183. var unresolved []string
  184. if ociSpec == nil {
  185. return devices, fmt.Errorf("can't inject devices, nil OCI Spec")
  186. }
  187. c.Lock()
  188. defer c.Unlock()
  189. c.refreshIfRequired(false)
  190. edits := &ContainerEdits{}
  191. specs := map[*Spec]struct{}{}
  192. for _, device := range devices {
  193. d := c.devices[device]
  194. if d == nil {
  195. unresolved = append(unresolved, device)
  196. continue
  197. }
  198. if _, ok := specs[d.GetSpec()]; !ok {
  199. specs[d.GetSpec()] = struct{}{}
  200. edits.Append(d.GetSpec().edits())
  201. }
  202. edits.Append(d.edits())
  203. }
  204. if unresolved != nil {
  205. return unresolved, fmt.Errorf("unresolvable CDI devices %s",
  206. strings.Join(devices, ", "))
  207. }
  208. if err := edits.Apply(ociSpec); err != nil {
  209. return nil, fmt.Errorf("failed to inject devices: %w", err)
  210. }
  211. return nil, nil
  212. }
  213. // highestPrioritySpecDir returns the Spec directory with highest priority
  214. // and its priority.
  215. func (c *Cache) highestPrioritySpecDir() (string, int) {
  216. if len(c.specDirs) == 0 {
  217. return "", -1
  218. }
  219. prio := len(c.specDirs) - 1
  220. dir := c.specDirs[prio]
  221. return dir, prio
  222. }
  223. // WriteSpec writes a Spec file with the given content into the highest
  224. // priority Spec directory. If name has a "json" or "yaml" extension it
  225. // choses the encoding. Otherwise the default YAML encoding is used.
  226. func (c *Cache) WriteSpec(raw *cdi.Spec, name string) error {
  227. var (
  228. specDir string
  229. path string
  230. prio int
  231. spec *Spec
  232. err error
  233. )
  234. specDir, prio = c.highestPrioritySpecDir()
  235. if specDir == "" {
  236. return errors.New("no Spec directories to write to")
  237. }
  238. path = filepath.Join(specDir, name)
  239. if ext := filepath.Ext(path); ext != ".json" && ext != ".yaml" {
  240. path += defaultSpecExt
  241. }
  242. spec, err = newSpec(raw, path, prio)
  243. if err != nil {
  244. return err
  245. }
  246. return spec.write(true)
  247. }
  248. // RemoveSpec removes a Spec with the given name from the highest
  249. // priority Spec directory. This function can be used to remove a
  250. // Spec previously written by WriteSpec(). If the file exists and
  251. // its removal fails RemoveSpec returns an error.
  252. func (c *Cache) RemoveSpec(name string) error {
  253. var (
  254. specDir string
  255. path string
  256. err error
  257. )
  258. specDir, _ = c.highestPrioritySpecDir()
  259. if specDir == "" {
  260. return errors.New("no Spec directories to remove from")
  261. }
  262. path = filepath.Join(specDir, name)
  263. if ext := filepath.Ext(path); ext != ".json" && ext != ".yaml" {
  264. path += defaultSpecExt
  265. }
  266. err = os.Remove(path)
  267. if err != nil && errors.Is(err, fs.ErrNotExist) {
  268. err = nil
  269. }
  270. return err
  271. }
  272. // GetDevice returns the cached device for the given qualified name.
  273. func (c *Cache) GetDevice(device string) *Device {
  274. c.Lock()
  275. defer c.Unlock()
  276. c.refreshIfRequired(false)
  277. return c.devices[device]
  278. }
  279. // ListDevices lists all cached devices by qualified name.
  280. func (c *Cache) ListDevices() []string {
  281. var devices []string
  282. c.Lock()
  283. defer c.Unlock()
  284. c.refreshIfRequired(false)
  285. for name := range c.devices {
  286. devices = append(devices, name)
  287. }
  288. sort.Strings(devices)
  289. return devices
  290. }
  291. // ListVendors lists all vendors known to the cache.
  292. func (c *Cache) ListVendors() []string {
  293. var vendors []string
  294. c.Lock()
  295. defer c.Unlock()
  296. c.refreshIfRequired(false)
  297. for vendor := range c.specs {
  298. vendors = append(vendors, vendor)
  299. }
  300. sort.Strings(vendors)
  301. return vendors
  302. }
  303. // ListClasses lists all device classes known to the cache.
  304. func (c *Cache) ListClasses() []string {
  305. var (
  306. cmap = map[string]struct{}{}
  307. classes []string
  308. )
  309. c.Lock()
  310. defer c.Unlock()
  311. c.refreshIfRequired(false)
  312. for _, specs := range c.specs {
  313. for _, spec := range specs {
  314. cmap[spec.GetClass()] = struct{}{}
  315. }
  316. }
  317. for class := range cmap {
  318. classes = append(classes, class)
  319. }
  320. sort.Strings(classes)
  321. return classes
  322. }
  323. // GetVendorSpecs returns all specs for the given vendor.
  324. func (c *Cache) GetVendorSpecs(vendor string) []*Spec {
  325. c.Lock()
  326. defer c.Unlock()
  327. c.refreshIfRequired(false)
  328. return c.specs[vendor]
  329. }
  330. // GetSpecErrors returns all errors encountered for the spec during the
  331. // last cache refresh.
  332. func (c *Cache) GetSpecErrors(spec *Spec) []error {
  333. var errors []error
  334. c.Lock()
  335. defer c.Unlock()
  336. if errs, ok := c.errors[spec.GetPath()]; ok {
  337. errors = make([]error, len(errs))
  338. copy(errors, errs)
  339. }
  340. return errors
  341. }
  342. // GetErrors returns all errors encountered during the last
  343. // cache refresh.
  344. func (c *Cache) GetErrors() map[string][]error {
  345. c.Lock()
  346. defer c.Unlock()
  347. errors := map[string][]error{}
  348. for path, errs := range c.errors {
  349. errors[path] = errs
  350. }
  351. for path, err := range c.dirErrors {
  352. errors[path] = []error{err}
  353. }
  354. return errors
  355. }
  356. // GetSpecDirectories returns the CDI Spec directories currently in use.
  357. func (c *Cache) GetSpecDirectories() []string {
  358. c.Lock()
  359. defer c.Unlock()
  360. dirs := make([]string, len(c.specDirs))
  361. copy(dirs, c.specDirs)
  362. return dirs
  363. }
  364. // GetSpecDirErrors returns any errors related to configured Spec directories.
  365. func (c *Cache) GetSpecDirErrors() map[string]error {
  366. if c.dirErrors == nil {
  367. return nil
  368. }
  369. c.Lock()
  370. defer c.Unlock()
  371. errors := make(map[string]error)
  372. for dir, err := range c.dirErrors {
  373. errors[dir] = err
  374. }
  375. return errors
  376. }
  377. // Our fsnotify helper wrapper.
  378. type watch struct {
  379. watcher *fsnotify.Watcher
  380. tracked map[string]bool
  381. }
  382. // Setup monitoring for the given Spec directories.
  383. func (w *watch) setup(dirs []string, dirErrors map[string]error) {
  384. var (
  385. dir string
  386. err error
  387. )
  388. w.tracked = make(map[string]bool)
  389. for _, dir = range dirs {
  390. w.tracked[dir] = false
  391. }
  392. w.watcher, err = fsnotify.NewWatcher()
  393. if err != nil {
  394. for _, dir := range dirs {
  395. dirErrors[dir] = fmt.Errorf("failed to create watcher: %w", err)
  396. }
  397. return
  398. }
  399. w.update(dirErrors)
  400. }
  401. // Start watching Spec directories for relevant changes.
  402. func (w *watch) start(m *sync.Mutex, refresh func() error, dirErrors map[string]error) {
  403. go w.watch(w.watcher, m, refresh, dirErrors)
  404. }
  405. // Stop watching directories.
  406. func (w *watch) stop() {
  407. if w.watcher == nil {
  408. return
  409. }
  410. w.watcher.Close()
  411. w.tracked = nil
  412. }
  413. // Watch Spec directory changes, triggering a refresh if necessary.
  414. func (w *watch) watch(fsw *fsnotify.Watcher, m *sync.Mutex, refresh func() error, dirErrors map[string]error) {
  415. watch := fsw
  416. if watch == nil {
  417. return
  418. }
  419. for {
  420. select {
  421. case event, ok := <-watch.Events:
  422. if !ok {
  423. return
  424. }
  425. if (event.Op & (fsnotify.Rename | fsnotify.Remove | fsnotify.Write)) == 0 {
  426. continue
  427. }
  428. if event.Op == fsnotify.Write {
  429. if ext := filepath.Ext(event.Name); ext != ".json" && ext != ".yaml" {
  430. continue
  431. }
  432. }
  433. m.Lock()
  434. if event.Op == fsnotify.Remove && w.tracked[event.Name] {
  435. w.update(dirErrors, event.Name)
  436. } else {
  437. w.update(dirErrors)
  438. }
  439. refresh()
  440. m.Unlock()
  441. case _, ok := <-watch.Errors:
  442. if !ok {
  443. return
  444. }
  445. }
  446. }
  447. }
  448. // Update watch with pending/missing or removed directories.
  449. func (w *watch) update(dirErrors map[string]error, removed ...string) bool {
  450. var (
  451. dir string
  452. ok bool
  453. err error
  454. update bool
  455. )
  456. for dir, ok = range w.tracked {
  457. if ok {
  458. continue
  459. }
  460. err = w.watcher.Add(dir)
  461. if err == nil {
  462. w.tracked[dir] = true
  463. delete(dirErrors, dir)
  464. update = true
  465. } else {
  466. w.tracked[dir] = false
  467. dirErrors[dir] = fmt.Errorf("failed to monitor for changes: %w", err)
  468. }
  469. }
  470. for _, dir = range removed {
  471. w.tracked[dir] = false
  472. dirErrors[dir] = errors.New("directory removed")
  473. update = true
  474. }
  475. return update
  476. }