manager.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. package plugin
  2. import (
  3. "encoding/json"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "reflect"
  9. "regexp"
  10. "runtime"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "github.com/docker/distribution/reference"
  15. "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/image"
  17. "github.com/docker/docker/layer"
  18. "github.com/docker/docker/pkg/authorization"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/pkg/pubsub"
  21. "github.com/docker/docker/pkg/system"
  22. "github.com/docker/docker/plugin/v2"
  23. "github.com/docker/docker/registry"
  24. "github.com/opencontainers/go-digest"
  25. specs "github.com/opencontainers/runtime-spec/specs-go"
  26. "github.com/pkg/errors"
  27. "github.com/sirupsen/logrus"
  28. )
  29. const configFileName = "config.json"
  30. const rootFSFileName = "rootfs"
  31. var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
  32. // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
  33. type Executor interface {
  34. Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
  35. Restore(id string, stdout, stderr io.WriteCloser) error
  36. IsRunning(id string) (bool, error)
  37. Signal(id string, signal int) error
  38. }
  39. func (pm *Manager) restorePlugin(p *v2.Plugin) error {
  40. if p.IsEnabled() {
  41. return pm.restore(p)
  42. }
  43. return nil
  44. }
  45. type eventLogger func(id, name, action string)
// ManagerConfig defines configuration needed to start new manager.
type ManagerConfig struct {
	// Store is the plugin store the manager looks plugins up in
	// (GetV2Plugin), lists for GC (GetAll) and repopulates on reload (SetAll).
	Store *Store // remove
	// RegistryService, when non-nil, is wrapped by NewManager so that every
	// resolved repository is marked with the "plugin" class.
	RegistryService registry.Service
	// LiveRestoreEnabled controls reload: when false, plugins that are
	// enabled are explicitly enabled again after being restored.
	LiveRestoreEnabled bool // TODO: remove
	// LogPluginEvent is a plugin event callback; it is not invoked in this
	// part of the file — presumably called on plugin lifecycle actions.
	LogPluginEvent eventLogger
	// Root is the directory holding one sub-directory per plugin ID, plus
	// the "tmp" and "storage/blobs" directories. NewManager creates it.
	Root string
	// ExecRoot is created by NewManager; the <ExecRoot>/<id> directory is
	// removed when a plugin's exit event is handled.
	ExecRoot string
	// CreateExecutor is called once by NewManager to build the Executor.
	CreateExecutor ExecutorCreator
	// AuthzMiddleware is not used in this part of the file — presumably the
	// authorization middleware shared with the daemon; verify against callers.
	AuthzMiddleware *authorization.Middleware
}
// ExecutorCreator is used in the manager config to pass in an `Executor`.
// NewManager calls it with the manager under construction and aborts with
// the returned error if it fails.
type ExecutorCreator func(*Manager) (Executor, error)
// Manager controls the plugin subsystem.
type Manager struct {
	// config is the configuration passed to NewManager (with the registry
	// service wrapped as a pluginRegistryService when one was supplied).
	config ManagerConfig
	mu     sync.RWMutex // protects cMap
	muGC   sync.RWMutex // protects blobstore deletions
	// cMap associates every loaded plugin with its controller.
	cMap map[*v2.Plugin]*controller
	// blobStore is rooted at <Root>/storage/blobs; GC prunes it.
	blobStore *basicBlobStore
	// publisher is created by NewManager after the plugins were reloaded.
	publisher *pubsub.Publisher
	// executor is the Executor returned by config.CreateExecutor.
	executor Executor
}
// controller represents the manager's control on a plugin.
type controller struct {
	// restart is read by HandleExitEvent: when true the plugin is enabled
	// again after it exited.
	restart bool
	// exitChan, when non-nil, is closed by HandleExitEvent to announce that
	// the plugin exited.
	exitChan chan bool
	// timeoutInSecs is not used in this part of the file — presumably a
	// timeout, in seconds, applied when talking to the plugin; TODO confirm.
	timeoutInSecs int
}
// pluginRegistryService ensures that all resolved repositories
// are of the plugin class.
// It embeds the wrapped registry.Service, so every method other than
// ResolveRepository is forwarded unchanged.
type pluginRegistryService struct {
	registry.Service
}
  80. func (s pluginRegistryService) ResolveRepository(name reference.Named) (repoInfo *registry.RepositoryInfo, err error) {
  81. repoInfo, err = s.Service.ResolveRepository(name)
  82. if repoInfo != nil {
  83. repoInfo.Class = "plugin"
  84. }
  85. return
  86. }
  87. // NewManager returns a new plugin manager.
  88. func NewManager(config ManagerConfig) (*Manager, error) {
  89. if config.RegistryService != nil {
  90. config.RegistryService = pluginRegistryService{config.RegistryService}
  91. }
  92. manager := &Manager{
  93. config: config,
  94. }
  95. for _, dirName := range []string{manager.config.Root, manager.config.ExecRoot, manager.tmpDir()} {
  96. if err := os.MkdirAll(dirName, 0700); err != nil {
  97. return nil, errors.Wrapf(err, "failed to mkdir %v", dirName)
  98. }
  99. }
  100. var err error
  101. manager.executor, err = config.CreateExecutor(manager)
  102. if err != nil {
  103. return nil, err
  104. }
  105. manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
  106. if err != nil {
  107. return nil, err
  108. }
  109. manager.cMap = make(map[*v2.Plugin]*controller)
  110. if err := manager.reload(); err != nil {
  111. return nil, errors.Wrap(err, "failed to restore plugins")
  112. }
  113. manager.publisher = pubsub.NewPublisher(0, 0)
  114. return manager, nil
  115. }
  116. func (pm *Manager) tmpDir() string {
  117. return filepath.Join(pm.config.Root, "tmp")
  118. }
  119. // HandleExitEvent is called when the executor receives the exit event
  120. // In the future we may change this, but for now all we care about is the exit event.
  121. func (pm *Manager) HandleExitEvent(id string) error {
  122. p, err := pm.config.Store.GetV2Plugin(id)
  123. if err != nil {
  124. return err
  125. }
  126. os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
  127. pm.mu.RLock()
  128. c := pm.cMap[p]
  129. if c.exitChan != nil {
  130. close(c.exitChan)
  131. }
  132. restart := c.restart
  133. pm.mu.RUnlock()
  134. if restart {
  135. pm.enable(p, c, true)
  136. }
  137. return nil
  138. }
  139. func handleLoadError(err error, id string) {
  140. if err == nil {
  141. return
  142. }
  143. logger := logrus.WithError(err).WithField("id", id)
  144. if os.IsNotExist(errors.Cause(err)) {
  145. // Likely some error while removing on an older version of docker
  146. logger.Warn("missing plugin config, skipping: this may be caused due to a failed remove and requires manual cleanup.")
  147. return
  148. }
  149. logger.Error("error loading plugin, skipping")
  150. }
  151. func (pm *Manager) reload() error { // todo: restore
  152. dir, err := ioutil.ReadDir(pm.config.Root)
  153. if err != nil {
  154. return errors.Wrapf(err, "failed to read %v", pm.config.Root)
  155. }
  156. plugins := make(map[string]*v2.Plugin)
  157. for _, v := range dir {
  158. if validFullID.MatchString(v.Name()) {
  159. p, err := pm.loadPlugin(v.Name())
  160. if err != nil {
  161. handleLoadError(err, v.Name())
  162. continue
  163. }
  164. plugins[p.GetID()] = p
  165. } else {
  166. if validFullID.MatchString(strings.TrimSuffix(v.Name(), "-removing")) {
  167. // There was likely some error while removing this plugin, let's try to remove again here
  168. if err := system.EnsureRemoveAll(v.Name()); err != nil {
  169. logrus.WithError(err).WithField("id", v.Name()).Warn("error while attempting to clean up previously removed plugin")
  170. }
  171. }
  172. }
  173. }
  174. pm.config.Store.SetAll(plugins)
  175. var wg sync.WaitGroup
  176. wg.Add(len(plugins))
  177. for _, p := range plugins {
  178. c := &controller{} // todo: remove this
  179. pm.cMap[p] = c
  180. go func(p *v2.Plugin) {
  181. defer wg.Done()
  182. if err := pm.restorePlugin(p); err != nil {
  183. logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
  184. return
  185. }
  186. if p.Rootfs != "" {
  187. p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
  188. }
  189. // We should only enable rootfs propagation for certain plugin types that need it.
  190. for _, typ := range p.PluginObj.Config.Interface.Types {
  191. if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
  192. if p.PluginObj.Config.PropagatedMount != "" {
  193. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  194. // check if we need to migrate an older propagated mount from before
  195. // these mounts were stored outside the plugin rootfs
  196. if _, err := os.Stat(propRoot); os.IsNotExist(err) {
  197. rootfsProp := filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
  198. if _, err := os.Stat(rootfsProp); err == nil {
  199. if err := os.Rename(rootfsProp, propRoot); err != nil {
  200. logrus.WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
  201. }
  202. }
  203. }
  204. if err := os.MkdirAll(propRoot, 0755); err != nil {
  205. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
  206. }
  207. }
  208. }
  209. }
  210. pm.save(p)
  211. requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()
  212. if requiresManualRestore {
  213. // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
  214. if err := pm.enable(p, c, true); err != nil {
  215. logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
  216. }
  217. }
  218. }(p)
  219. }
  220. wg.Wait()
  221. return nil
  222. }
  223. // Get looks up the requested plugin in the store.
  224. func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
  225. return pm.config.Store.GetV2Plugin(idOrName)
  226. }
  227. func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
  228. p := filepath.Join(pm.config.Root, id, configFileName)
  229. dt, err := ioutil.ReadFile(p)
  230. if err != nil {
  231. return nil, errors.Wrapf(err, "error reading %v", p)
  232. }
  233. var plugin v2.Plugin
  234. if err := json.Unmarshal(dt, &plugin); err != nil {
  235. return nil, errors.Wrapf(err, "error decoding %v", p)
  236. }
  237. return &plugin, nil
  238. }
  239. func (pm *Manager) save(p *v2.Plugin) error {
  240. pluginJSON, err := json.Marshal(p)
  241. if err != nil {
  242. return errors.Wrap(err, "failed to marshal plugin json")
  243. }
  244. if err := ioutils.AtomicWriteFile(filepath.Join(pm.config.Root, p.GetID(), configFileName), pluginJSON, 0600); err != nil {
  245. return errors.Wrap(err, "failed to write atomically plugin json")
  246. }
  247. return nil
  248. }
  249. // GC cleans up unreferenced blobs. This is recommended to run in a goroutine
  250. func (pm *Manager) GC() {
  251. pm.muGC.Lock()
  252. defer pm.muGC.Unlock()
  253. whitelist := make(map[digest.Digest]struct{})
  254. for _, p := range pm.config.Store.GetAll() {
  255. whitelist[p.Config] = struct{}{}
  256. for _, b := range p.Blobsums {
  257. whitelist[b] = struct{}{}
  258. }
  259. }
  260. pm.blobStore.gc(whitelist)
  261. }
  262. type logHook struct{ id string }
  263. func (logHook) Levels() []logrus.Level {
  264. return logrus.AllLevels
  265. }
  266. func (l logHook) Fire(entry *logrus.Entry) error {
  267. entry.Data = logrus.Fields{"plugin": l.id}
  268. return nil
  269. }
  270. func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
  271. logger := logrus.New()
  272. logger.Hooks.Add(logHook{id})
  273. return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel)
  274. }
  275. func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
  276. if !isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
  277. return errors.New("incorrect privileges")
  278. }
  279. return nil
  280. }
  281. func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
  282. if len(arrOne) != len(arrOther) {
  283. return false
  284. }
  285. sort.Sort(arrOne)
  286. sort.Sort(arrOther)
  287. for i := 1; i < arrOne.Len(); i++ {
  288. if !compare(arrOne[i], arrOther[i]) {
  289. return false
  290. }
  291. }
  292. return true
  293. }
  294. func isEqualPrivilege(a, b types.PluginPrivilege) bool {
  295. if a.Name != b.Name {
  296. return false
  297. }
  298. return reflect.DeepEqual(a.Value, b.Value)
  299. }
  300. func configToRootFS(c []byte) (*image.RootFS, string, error) {
  301. // TODO @jhowardmsft LCOW - Will need to revisit this.
  302. os := runtime.GOOS
  303. var pluginConfig types.PluginConfig
  304. if err := json.Unmarshal(c, &pluginConfig); err != nil {
  305. return nil, "", err
  306. }
  307. // validation for empty rootfs is in distribution code
  308. if pluginConfig.Rootfs == nil {
  309. return nil, os, nil
  310. }
  311. return rootFSFromPlugin(pluginConfig.Rootfs), os, nil
  312. }
  313. func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
  314. rootFS := image.RootFS{
  315. Type: pluginfs.Type,
  316. DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
  317. }
  318. for i := range pluginfs.DiffIds {
  319. rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
  320. }
  321. return &rootFS
  322. }