manager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. package plugin
  2. import (
  3. "encoding/json"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "reflect"
  9. "regexp"
  10. "runtime"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "github.com/docker/distribution/reference"
  15. "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/image"
  17. "github.com/docker/docker/layer"
  18. "github.com/docker/docker/pkg/authorization"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/pkg/mount"
  21. "github.com/docker/docker/pkg/pubsub"
  22. "github.com/docker/docker/pkg/system"
  23. "github.com/docker/docker/plugin/v2"
  24. "github.com/docker/docker/registry"
  25. "github.com/opencontainers/go-digest"
  26. specs "github.com/opencontainers/runtime-spec/specs-go"
  27. "github.com/pkg/errors"
  28. "github.com/sirupsen/logrus"
  29. )
  30. const configFileName = "config.json"
  31. const rootFSFileName = "rootfs"
  32. var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
  33. // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
  34. type Executor interface {
  35. Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
  36. Restore(id string, stdout, stderr io.WriteCloser) error
  37. IsRunning(id string) (bool, error)
  38. Signal(id string, signal int) error
  39. }
  40. func (pm *Manager) restorePlugin(p *v2.Plugin) error {
  41. if p.IsEnabled() {
  42. return pm.restore(p)
  43. }
  44. return nil
  45. }
  46. type eventLogger func(id, name, action string)
  47. // ManagerConfig defines configuration needed to start new manager.
  48. type ManagerConfig struct {
  49. Store *Store // remove
  50. RegistryService registry.Service
  51. LiveRestoreEnabled bool // TODO: remove
  52. LogPluginEvent eventLogger
  53. Root string
  54. ExecRoot string
  55. CreateExecutor ExecutorCreator
  56. AuthzMiddleware *authorization.Middleware
  57. }
  58. // ExecutorCreator is used in the manager config to pass in an `Executor`
  59. type ExecutorCreator func(*Manager) (Executor, error)
  60. // Manager controls the plugin subsystem.
  61. type Manager struct {
  62. config ManagerConfig
  63. mu sync.RWMutex // protects cMap
  64. muGC sync.RWMutex // protects blobstore deletions
  65. cMap map[*v2.Plugin]*controller
  66. blobStore *basicBlobStore
  67. publisher *pubsub.Publisher
  68. executor Executor
  69. }
  70. // controller represents the manager's control on a plugin.
  71. type controller struct {
  72. restart bool
  73. exitChan chan bool
  74. timeoutInSecs int
  75. }
  76. // pluginRegistryService ensures that all resolved repositories
  77. // are of the plugin class.
  78. type pluginRegistryService struct {
  79. registry.Service
  80. }
  81. func (s pluginRegistryService) ResolveRepository(name reference.Named) (repoInfo *registry.RepositoryInfo, err error) {
  82. repoInfo, err = s.Service.ResolveRepository(name)
  83. if repoInfo != nil {
  84. repoInfo.Class = "plugin"
  85. }
  86. return
  87. }
  88. // NewManager returns a new plugin manager.
  89. func NewManager(config ManagerConfig) (*Manager, error) {
  90. if config.RegistryService != nil {
  91. config.RegistryService = pluginRegistryService{config.RegistryService}
  92. }
  93. manager := &Manager{
  94. config: config,
  95. }
  96. for _, dirName := range []string{manager.config.Root, manager.config.ExecRoot, manager.tmpDir()} {
  97. if err := os.MkdirAll(dirName, 0700); err != nil {
  98. return nil, errors.Wrapf(err, "failed to mkdir %v", dirName)
  99. }
  100. }
  101. if err := setupRoot(manager.config.Root); err != nil {
  102. return nil, err
  103. }
  104. var err error
  105. manager.executor, err = config.CreateExecutor(manager)
  106. if err != nil {
  107. return nil, err
  108. }
  109. manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
  110. if err != nil {
  111. return nil, err
  112. }
  113. manager.cMap = make(map[*v2.Plugin]*controller)
  114. if err := manager.reload(); err != nil {
  115. return nil, errors.Wrap(err, "failed to restore plugins")
  116. }
  117. manager.publisher = pubsub.NewPublisher(0, 0)
  118. return manager, nil
  119. }
  120. func (pm *Manager) tmpDir() string {
  121. return filepath.Join(pm.config.Root, "tmp")
  122. }
  123. // HandleExitEvent is called when the executor receives the exit event
  124. // In the future we may change this, but for now all we care about is the exit event.
  125. func (pm *Manager) HandleExitEvent(id string) error {
  126. p, err := pm.config.Store.GetV2Plugin(id)
  127. if err != nil {
  128. return err
  129. }
  130. os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
  131. if p.PropagatedMount != "" {
  132. if err := mount.Unmount(p.PropagatedMount); err != nil {
  133. logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
  134. }
  135. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  136. if err := mount.Unmount(propRoot); err != nil {
  137. logrus.Warn("Could not unmount %s: %v", propRoot, err)
  138. }
  139. }
  140. pm.mu.RLock()
  141. c := pm.cMap[p]
  142. if c.exitChan != nil {
  143. close(c.exitChan)
  144. }
  145. restart := c.restart
  146. pm.mu.RUnlock()
  147. if restart {
  148. pm.enable(p, c, true)
  149. }
  150. return nil
  151. }
  152. func handleLoadError(err error, id string) {
  153. if err == nil {
  154. return
  155. }
  156. logger := logrus.WithError(err).WithField("id", id)
  157. if os.IsNotExist(errors.Cause(err)) {
  158. // Likely some error while removing on an older version of docker
  159. logger.Warn("missing plugin config, skipping: this may be caused due to a failed remove and requires manual cleanup.")
  160. return
  161. }
  162. logger.Error("error loading plugin, skipping")
  163. }
  164. func (pm *Manager) reload() error { // todo: restore
  165. dir, err := ioutil.ReadDir(pm.config.Root)
  166. if err != nil {
  167. return errors.Wrapf(err, "failed to read %v", pm.config.Root)
  168. }
  169. plugins := make(map[string]*v2.Plugin)
  170. for _, v := range dir {
  171. if validFullID.MatchString(v.Name()) {
  172. p, err := pm.loadPlugin(v.Name())
  173. if err != nil {
  174. handleLoadError(err, v.Name())
  175. continue
  176. }
  177. plugins[p.GetID()] = p
  178. } else {
  179. if validFullID.MatchString(strings.TrimSuffix(v.Name(), "-removing")) {
  180. // There was likely some error while removing this plugin, let's try to remove again here
  181. if err := system.EnsureRemoveAll(v.Name()); err != nil {
  182. logrus.WithError(err).WithField("id", v.Name()).Warn("error while attempting to clean up previously removed plugin")
  183. }
  184. }
  185. }
  186. }
  187. pm.config.Store.SetAll(plugins)
  188. var wg sync.WaitGroup
  189. wg.Add(len(plugins))
  190. for _, p := range plugins {
  191. c := &controller{} // todo: remove this
  192. pm.cMap[p] = c
  193. go func(p *v2.Plugin) {
  194. defer wg.Done()
  195. if err := pm.restorePlugin(p); err != nil {
  196. logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
  197. return
  198. }
  199. if p.Rootfs != "" {
  200. p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
  201. }
  202. // We should only enable rootfs propagation for certain plugin types that need it.
  203. for _, typ := range p.PluginObj.Config.Interface.Types {
  204. if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
  205. if p.PluginObj.Config.PropagatedMount != "" {
  206. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  207. // check if we need to migrate an older propagated mount from before
  208. // these mounts were stored outside the plugin rootfs
  209. if _, err := os.Stat(propRoot); os.IsNotExist(err) {
  210. if _, err := os.Stat(p.PropagatedMount); err == nil {
  211. // make sure nothing is mounted here
  212. // don't care about errors
  213. mount.Unmount(p.PropagatedMount)
  214. if err := os.Rename(p.PropagatedMount, propRoot); err != nil {
  215. logrus.WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
  216. }
  217. if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
  218. logrus.WithError(err).WithField("dir", p.PropagatedMount).Error("error migrating propagated mount storage")
  219. }
  220. }
  221. }
  222. if err := os.MkdirAll(propRoot, 0755); err != nil {
  223. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
  224. }
  225. // TODO: sanitize PropagatedMount and prevent breakout
  226. p.PropagatedMount = filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
  227. if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
  228. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", p.PropagatedMount, err)
  229. return
  230. }
  231. }
  232. }
  233. }
  234. pm.save(p)
  235. requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()
  236. if requiresManualRestore {
  237. // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
  238. if err := pm.enable(p, c, true); err != nil {
  239. logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
  240. }
  241. }
  242. }(p)
  243. }
  244. wg.Wait()
  245. return nil
  246. }
  247. // Get looks up the requested plugin in the store.
  248. func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
  249. return pm.config.Store.GetV2Plugin(idOrName)
  250. }
  251. func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
  252. p := filepath.Join(pm.config.Root, id, configFileName)
  253. dt, err := ioutil.ReadFile(p)
  254. if err != nil {
  255. return nil, errors.Wrapf(err, "error reading %v", p)
  256. }
  257. var plugin v2.Plugin
  258. if err := json.Unmarshal(dt, &plugin); err != nil {
  259. return nil, errors.Wrapf(err, "error decoding %v", p)
  260. }
  261. return &plugin, nil
  262. }
  263. func (pm *Manager) save(p *v2.Plugin) error {
  264. pluginJSON, err := json.Marshal(p)
  265. if err != nil {
  266. return errors.Wrap(err, "failed to marshal plugin json")
  267. }
  268. if err := ioutils.AtomicWriteFile(filepath.Join(pm.config.Root, p.GetID(), configFileName), pluginJSON, 0600); err != nil {
  269. return errors.Wrap(err, "failed to write atomically plugin json")
  270. }
  271. return nil
  272. }
  273. // GC cleans up unreferenced blobs. This is recommended to run in a goroutine
  274. func (pm *Manager) GC() {
  275. pm.muGC.Lock()
  276. defer pm.muGC.Unlock()
  277. whitelist := make(map[digest.Digest]struct{})
  278. for _, p := range pm.config.Store.GetAll() {
  279. whitelist[p.Config] = struct{}{}
  280. for _, b := range p.Blobsums {
  281. whitelist[b] = struct{}{}
  282. }
  283. }
  284. pm.blobStore.gc(whitelist)
  285. }
  286. type logHook struct{ id string }
  287. func (logHook) Levels() []logrus.Level {
  288. return logrus.AllLevels
  289. }
  290. func (l logHook) Fire(entry *logrus.Entry) error {
  291. entry.Data = logrus.Fields{"plugin": l.id}
  292. return nil
  293. }
  294. func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
  295. logger := logrus.New()
  296. logger.Hooks.Add(logHook{id})
  297. return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel)
  298. }
  299. func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
  300. if !isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
  301. return errors.New("incorrect privileges")
  302. }
  303. return nil
  304. }
  305. func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
  306. if len(arrOne) != len(arrOther) {
  307. return false
  308. }
  309. sort.Sort(arrOne)
  310. sort.Sort(arrOther)
  311. for i := 1; i < arrOne.Len(); i++ {
  312. if !compare(arrOne[i], arrOther[i]) {
  313. return false
  314. }
  315. }
  316. return true
  317. }
  318. func isEqualPrivilege(a, b types.PluginPrivilege) bool {
  319. if a.Name != b.Name {
  320. return false
  321. }
  322. return reflect.DeepEqual(a.Value, b.Value)
  323. }
  324. func configToRootFS(c []byte) (*image.RootFS, layer.OS, error) {
  325. // TODO @jhowardmsft LCOW - Will need to revisit this. For now, calculate the operating system.
  326. os := layer.OS(runtime.GOOS)
  327. if system.LCOWSupported() {
  328. os = "linux"
  329. }
  330. var pluginConfig types.PluginConfig
  331. if err := json.Unmarshal(c, &pluginConfig); err != nil {
  332. return nil, "", err
  333. }
  334. // validation for empty rootfs is in distribution code
  335. if pluginConfig.Rootfs == nil {
  336. return nil, os, nil
  337. }
  338. return rootFSFromPlugin(pluginConfig.Rootfs), os, nil
  339. }
  340. func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
  341. rootFS := image.RootFS{
  342. Type: pluginfs.Type,
  343. DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
  344. }
  345. for i := range pluginfs.DiffIds {
  346. rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
  347. }
  348. return &rootFS
  349. }