manager.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package plugin
  2. import (
  3. "encoding/json"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/libcontainerd"
  10. "github.com/docker/docker/plugin/store"
  11. "github.com/docker/docker/plugin/v2"
  12. "github.com/docker/docker/registry"
  13. )
  14. var (
  15. manager *Manager
  16. )
  17. func (pm *Manager) restorePlugin(p *v2.Plugin) error {
  18. p.RuntimeSourcePath = filepath.Join(pm.runRoot, p.GetID())
  19. if p.IsEnabled() {
  20. return pm.restore(p)
  21. }
  22. return nil
  23. }
  24. type eventLogger func(id, name, action string)
  25. // Manager controls the plugin subsystem.
  26. type Manager struct {
  27. libRoot string
  28. runRoot string
  29. pluginStore *store.Store
  30. containerdClient libcontainerd.Client
  31. registryService registry.Service
  32. liveRestore bool
  33. pluginEventLogger eventLogger
  34. }
  35. // GetManager returns the singleton plugin Manager
  36. func GetManager() *Manager {
  37. return manager
  38. }
  39. // Init (was NewManager) instantiates the singleton Manager.
  40. // TODO: revert this to NewManager once we get rid of all the singletons.
  41. func Init(root string, ps *store.Store, remote libcontainerd.Remote, rs registry.Service, liveRestore bool, evL eventLogger) (err error) {
  42. if manager != nil {
  43. return nil
  44. }
  45. root = filepath.Join(root, "plugins")
  46. manager = &Manager{
  47. libRoot: root,
  48. runRoot: "/run/docker",
  49. pluginStore: ps,
  50. registryService: rs,
  51. liveRestore: liveRestore,
  52. pluginEventLogger: evL,
  53. }
  54. if err := os.MkdirAll(manager.runRoot, 0700); err != nil {
  55. return err
  56. }
  57. manager.containerdClient, err = remote.Client(manager)
  58. if err != nil {
  59. return err
  60. }
  61. if err := manager.init(); err != nil {
  62. return err
  63. }
  64. return nil
  65. }
  66. // StateChanged updates plugin internals using libcontainerd events.
  67. func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
  68. logrus.Debugf("plugin state changed %s %#v", id, e)
  69. switch e.State {
  70. case libcontainerd.StateExit:
  71. p, err := pm.pluginStore.GetByID(id)
  72. if err != nil {
  73. return err
  74. }
  75. p.RLock()
  76. if p.ExitChan != nil {
  77. close(p.ExitChan)
  78. }
  79. restart := p.Restart
  80. p.RUnlock()
  81. p.RemoveFromDisk()
  82. if restart {
  83. pm.enable(p, true)
  84. }
  85. }
  86. return nil
  87. }
  88. func (pm *Manager) init() error {
  89. dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
  90. if err != nil {
  91. if os.IsNotExist(err) {
  92. return nil
  93. }
  94. return err
  95. }
  96. defer dt.Close()
  97. plugins := make(map[string]*v2.Plugin)
  98. if err := json.NewDecoder(dt).Decode(&plugins); err != nil {
  99. return err
  100. }
  101. pm.pluginStore.SetAll(plugins)
  102. var group sync.WaitGroup
  103. group.Add(len(plugins))
  104. for _, p := range plugins {
  105. go func(p *v2.Plugin) {
  106. defer group.Done()
  107. if err := pm.restorePlugin(p); err != nil {
  108. logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
  109. return
  110. }
  111. pm.pluginStore.Update(p)
  112. requiresManualRestore := !pm.liveRestore && p.IsEnabled()
  113. if requiresManualRestore {
  114. // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
  115. if err := pm.enable(p, true); err != nil {
  116. logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
  117. }
  118. }
  119. }(p)
  120. }
  121. group.Wait()
  122. return nil
  123. }
  124. type logHook struct{ id string }
  125. func (logHook) Levels() []logrus.Level {
  126. return logrus.AllLevels
  127. }
  128. func (l logHook) Fire(entry *logrus.Entry) error {
  129. entry.Data = logrus.Fields{"plugin": l.id}
  130. return nil
  131. }
  132. func attachToLog(id string) func(libcontainerd.IOPipe) error {
  133. return func(iop libcontainerd.IOPipe) error {
  134. iop.Stdin.Close()
  135. logger := logrus.New()
  136. logger.Hooks.Add(logHook{id})
  137. // TODO: cache writer per id
  138. w := logger.Writer()
  139. go func() {
  140. io.Copy(w, iop.Stdout)
  141. }()
  142. go func() {
  143. // TODO: update logrus and use logger.WriterLevel
  144. io.Copy(w, iop.Stderr)
  145. }()
  146. return nil
  147. }
  148. }