manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package plugin
  2. import (
  3. "encoding/json"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "reflect"
  9. "regexp"
  10. "runtime"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "github.com/docker/distribution/reference"
  15. "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/image"
  17. "github.com/docker/docker/layer"
  18. "github.com/docker/docker/pkg/authorization"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/pkg/mount"
  21. "github.com/docker/docker/pkg/pubsub"
  22. "github.com/docker/docker/pkg/system"
  23. "github.com/docker/docker/plugin/v2"
  24. "github.com/docker/docker/registry"
  25. "github.com/opencontainers/go-digest"
  26. specs "github.com/opencontainers/runtime-spec/specs-go"
  27. "github.com/pkg/errors"
  28. "github.com/sirupsen/logrus"
  29. )
  // File names used inside a plugin's directory under the manager root.
  const (
  	// configFileName is the per-plugin JSON file read by loadPlugin and
  	// written by save.
  	configFileName = "config.json"
  	// rootFSFileName is the name of the plugin's root filesystem directory.
  	rootFSFileName = "rootfs"
  )

  // validFullID matches a full plugin ID: exactly 64 lowercase hex characters.
  var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
  33. // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
  34. type Executor interface {
  35. Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
  36. Restore(id string, stdout, stderr io.WriteCloser) error
  37. IsRunning(id string) (bool, error)
  38. Signal(id string, signal int) error
  39. }
  40. func (pm *Manager) restorePlugin(p *v2.Plugin) error {
  41. if p.IsEnabled() {
  42. return pm.restore(p)
  43. }
  44. return nil
  45. }
  // eventLogger is the callback type stored in ManagerConfig.LogPluginEvent.
  // It is given an id, a name and an action string.
  type eventLogger func(id, name, action string)
  47. // ManagerConfig defines configuration needed to start new manager.
  48. type ManagerConfig struct {
  49. Store *Store // remove
  50. RegistryService registry.Service
  51. LiveRestoreEnabled bool // TODO: remove
  52. LogPluginEvent eventLogger
  53. Root string
  54. ExecRoot string
  55. CreateExecutor ExecutorCreator
  56. AuthzMiddleware *authorization.Middleware
  57. }
  58. // ExecutorCreator is used in the manager config to pass in an `Executor`
  59. type ExecutorCreator func(*Manager) (Executor, error)
  60. // Manager controls the plugin subsystem.
  61. type Manager struct {
  62. config ManagerConfig
  63. mu sync.RWMutex // protects cMap
  64. muGC sync.RWMutex // protects blobstore deletions
  65. cMap map[*v2.Plugin]*controller
  66. blobStore *basicBlobStore
  67. publisher *pubsub.Publisher
  68. executor Executor
  69. }
  // controller represents the manager's control on a plugin.
  type controller struct {
  	// restart: when true, HandleExitEvent enables the plugin again after it
  	// has exited.
  	restart bool

  	// exitChan, when non-nil, is closed by HandleExitEvent.
  	exitChan chan bool

  	// timeoutInSecs is a timeout expressed in seconds.
  	// NOTE(review): its consumer is not visible in this part of the file.
  	timeoutInSecs int
  }
  76. // pluginRegistryService ensures that all resolved repositories
  77. // are of the plugin class.
  78. type pluginRegistryService struct {
  79. registry.Service
  80. }
  81. func (s pluginRegistryService) ResolveRepository(name reference.Named) (repoInfo *registry.RepositoryInfo, err error) {
  82. repoInfo, err = s.Service.ResolveRepository(name)
  83. if repoInfo != nil {
  84. repoInfo.Class = "plugin"
  85. }
  86. return
  87. }
  88. // NewManager returns a new plugin manager.
  89. func NewManager(config ManagerConfig) (*Manager, error) {
  90. if config.RegistryService != nil {
  91. config.RegistryService = pluginRegistryService{config.RegistryService}
  92. }
  93. manager := &Manager{
  94. config: config,
  95. }
  96. if err := os.MkdirAll(manager.config.Root, 0700); err != nil {
  97. return nil, errors.Wrapf(err, "failed to mkdir %v", manager.config.Root)
  98. }
  99. if err := os.MkdirAll(manager.config.ExecRoot, 0700); err != nil {
  100. return nil, errors.Wrapf(err, "failed to mkdir %v", manager.config.ExecRoot)
  101. }
  102. if err := os.MkdirAll(manager.tmpDir(), 0700); err != nil {
  103. return nil, errors.Wrapf(err, "failed to mkdir %v", manager.tmpDir())
  104. }
  105. if err := setupRoot(manager.config.Root); err != nil {
  106. return nil, err
  107. }
  108. var err error
  109. manager.executor, err = config.CreateExecutor(manager)
  110. if err != nil {
  111. return nil, err
  112. }
  113. manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
  114. if err != nil {
  115. return nil, err
  116. }
  117. manager.cMap = make(map[*v2.Plugin]*controller)
  118. if err := manager.reload(); err != nil {
  119. return nil, errors.Wrap(err, "failed to restore plugins")
  120. }
  121. manager.publisher = pubsub.NewPublisher(0, 0)
  122. return manager, nil
  123. }
  124. func (pm *Manager) tmpDir() string {
  125. return filepath.Join(pm.config.Root, "tmp")
  126. }
  127. // HandleExitEvent is called when the executor receives the exit event
  128. // In the future we may change this, but for now all we care about is the exit event.
  129. func (pm *Manager) HandleExitEvent(id string) error {
  130. p, err := pm.config.Store.GetV2Plugin(id)
  131. if err != nil {
  132. return err
  133. }
  134. os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
  135. if p.PropagatedMount != "" {
  136. if err := mount.Unmount(p.PropagatedMount); err != nil {
  137. logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
  138. }
  139. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  140. if err := mount.Unmount(propRoot); err != nil {
  141. logrus.Warn("Could not unmount %s: %v", propRoot, err)
  142. }
  143. }
  144. pm.mu.RLock()
  145. c := pm.cMap[p]
  146. if c.exitChan != nil {
  147. close(c.exitChan)
  148. }
  149. restart := c.restart
  150. pm.mu.RUnlock()
  151. if restart {
  152. pm.enable(p, c, true)
  153. }
  154. return nil
  155. }
  156. func handleLoadError(err error, id string) {
  157. if err == nil {
  158. return
  159. }
  160. logger := logrus.WithError(err).WithField("id", id)
  161. if os.IsNotExist(errors.Cause(err)) {
  162. // Likely some error while removing on an older version of docker
  163. logger.Warn("missing plugin config, skipping: this may be caused due to a failed remove and requires manual cleanup.")
  164. return
  165. }
  166. logger.Error("error loading plugin, skipping")
  167. }
  168. func (pm *Manager) reload() error { // todo: restore
  169. dir, err := ioutil.ReadDir(pm.config.Root)
  170. if err != nil {
  171. return errors.Wrapf(err, "failed to read %v", pm.config.Root)
  172. }
  173. plugins := make(map[string]*v2.Plugin)
  174. for _, v := range dir {
  175. if validFullID.MatchString(v.Name()) {
  176. p, err := pm.loadPlugin(v.Name())
  177. if err != nil {
  178. handleLoadError(err, v.Name())
  179. continue
  180. }
  181. plugins[p.GetID()] = p
  182. } else {
  183. if validFullID.MatchString(strings.TrimSuffix(v.Name(), "-removing")) {
  184. // There was likely some error while removing this plugin, let's try to remove again here
  185. if err := system.EnsureRemoveAll(v.Name()); err != nil {
  186. logrus.WithError(err).WithField("id", v.Name()).Warn("error while attempting to clean up previously removed plugin")
  187. }
  188. }
  189. }
  190. }
  191. pm.config.Store.SetAll(plugins)
  192. var wg sync.WaitGroup
  193. wg.Add(len(plugins))
  194. for _, p := range plugins {
  195. c := &controller{} // todo: remove this
  196. pm.cMap[p] = c
  197. go func(p *v2.Plugin) {
  198. defer wg.Done()
  199. if err := pm.restorePlugin(p); err != nil {
  200. logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
  201. return
  202. }
  203. if p.Rootfs != "" {
  204. p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
  205. }
  206. // We should only enable rootfs propagation for certain plugin types that need it.
  207. for _, typ := range p.PluginObj.Config.Interface.Types {
  208. if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
  209. if p.PluginObj.Config.PropagatedMount != "" {
  210. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  211. // check if we need to migrate an older propagated mount from before
  212. // these mounts were stored outside the plugin rootfs
  213. if _, err := os.Stat(propRoot); os.IsNotExist(err) {
  214. if _, err := os.Stat(p.PropagatedMount); err == nil {
  215. // make sure nothing is mounted here
  216. // don't care about errors
  217. mount.Unmount(p.PropagatedMount)
  218. if err := os.Rename(p.PropagatedMount, propRoot); err != nil {
  219. logrus.WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
  220. }
  221. if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
  222. logrus.WithError(err).WithField("dir", p.PropagatedMount).Error("error migrating propagated mount storage")
  223. }
  224. }
  225. }
  226. if err := os.MkdirAll(propRoot, 0755); err != nil {
  227. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
  228. }
  229. // TODO: sanitize PropagatedMount and prevent breakout
  230. p.PropagatedMount = filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
  231. if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
  232. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", p.PropagatedMount, err)
  233. return
  234. }
  235. }
  236. }
  237. }
  238. pm.save(p)
  239. requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()
  240. if requiresManualRestore {
  241. // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
  242. if err := pm.enable(p, c, true); err != nil {
  243. logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
  244. }
  245. }
  246. }(p)
  247. }
  248. wg.Wait()
  249. return nil
  250. }
  251. // Get looks up the requested plugin in the store.
  252. func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
  253. return pm.config.Store.GetV2Plugin(idOrName)
  254. }
  255. func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
  256. p := filepath.Join(pm.config.Root, id, configFileName)
  257. dt, err := ioutil.ReadFile(p)
  258. if err != nil {
  259. return nil, errors.Wrapf(err, "error reading %v", p)
  260. }
  261. var plugin v2.Plugin
  262. if err := json.Unmarshal(dt, &plugin); err != nil {
  263. return nil, errors.Wrapf(err, "error decoding %v", p)
  264. }
  265. return &plugin, nil
  266. }
  267. func (pm *Manager) save(p *v2.Plugin) error {
  268. pluginJSON, err := json.Marshal(p)
  269. if err != nil {
  270. return errors.Wrap(err, "failed to marshal plugin json")
  271. }
  272. if err := ioutils.AtomicWriteFile(filepath.Join(pm.config.Root, p.GetID(), configFileName), pluginJSON, 0600); err != nil {
  273. return errors.Wrap(err, "failed to write atomically plugin json")
  274. }
  275. return nil
  276. }
  277. // GC cleans up unreferenced blobs. This is recommended to run in a goroutine
  278. func (pm *Manager) GC() {
  279. pm.muGC.Lock()
  280. defer pm.muGC.Unlock()
  281. whitelist := make(map[digest.Digest]struct{})
  282. for _, p := range pm.config.Store.GetAll() {
  283. whitelist[p.Config] = struct{}{}
  284. for _, b := range p.Blobsums {
  285. whitelist[b] = struct{}{}
  286. }
  287. }
  288. pm.blobStore.gc(whitelist)
  289. }
  290. type logHook struct{ id string }
  291. func (logHook) Levels() []logrus.Level {
  292. return logrus.AllLevels
  293. }
  294. func (l logHook) Fire(entry *logrus.Entry) error {
  295. entry.Data = logrus.Fields{"plugin": l.id}
  296. return nil
  297. }
  298. func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
  299. logger := logrus.New()
  300. logger.Hooks.Add(logHook{id})
  301. return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel)
  302. }
  303. func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
  304. if !isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
  305. return errors.New("incorrect privileges")
  306. }
  307. return nil
  308. }
  309. func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
  310. if len(arrOne) != len(arrOther) {
  311. return false
  312. }
  313. sort.Sort(arrOne)
  314. sort.Sort(arrOther)
  315. for i := 1; i < arrOne.Len(); i++ {
  316. if !compare(arrOne[i], arrOther[i]) {
  317. return false
  318. }
  319. }
  320. return true
  321. }
  322. func isEqualPrivilege(a, b types.PluginPrivilege) bool {
  323. if a.Name != b.Name {
  324. return false
  325. }
  326. return reflect.DeepEqual(a.Value, b.Value)
  327. }
  328. func configToRootFS(c []byte) (*image.RootFS, layer.Platform, error) {
  329. // TODO @jhowardmsft LCOW - Will need to revisit this. For now, calculate the platform.
  330. platform := layer.Platform(runtime.GOOS)
  331. if system.LCOWSupported() {
  332. platform = "linux"
  333. }
  334. var pluginConfig types.PluginConfig
  335. if err := json.Unmarshal(c, &pluginConfig); err != nil {
  336. return nil, "", err
  337. }
  338. // validation for empty rootfs is in distribution code
  339. if pluginConfig.Rootfs == nil {
  340. return nil, platform, nil
  341. }
  342. return rootFSFromPlugin(pluginConfig.Rootfs), platform, nil
  343. }
  344. func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
  345. rootFS := image.RootFS{
  346. Type: pluginfs.Type,
  347. DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
  348. }
  349. for i := range pluginfs.DiffIds {
  350. rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
  351. }
  352. return &rootFS
  353. }