manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. package plugin
  2. import (
  3. "encoding/json"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "reflect"
  9. "regexp"
  10. "runtime"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "github.com/docker/distribution/reference"
  15. "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/image"
  17. "github.com/docker/docker/layer"
  18. "github.com/docker/docker/libcontainerd"
  19. "github.com/docker/docker/pkg/authorization"
  20. "github.com/docker/docker/pkg/ioutils"
  21. "github.com/docker/docker/pkg/mount"
  22. "github.com/docker/docker/pkg/pubsub"
  23. "github.com/docker/docker/pkg/system"
  24. "github.com/docker/docker/plugin/v2"
  25. "github.com/docker/docker/registry"
  26. "github.com/opencontainers/go-digest"
  27. "github.com/pkg/errors"
  28. "github.com/sirupsen/logrus"
  29. )
  30. const configFileName = "config.json"
  31. const rootFSFileName = "rootfs"
  32. var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
  33. func (pm *Manager) restorePlugin(p *v2.Plugin) error {
  34. if p.IsEnabled() {
  35. return pm.restore(p)
  36. }
  37. return nil
  38. }
// eventLogger is the callback type stored in ManagerConfig.LogPluginEvent.
// It receives a plugin id, a plugin name and an action string.
type eventLogger func(id, name, action string)
// ManagerConfig defines configuration needed to start new manager.
type ManagerConfig struct {
	// Store is the plugin store the manager looks plugins up in and fills on reload.
	Store *Store // remove
	// Executor supplies the containerd client the manager is registered with.
	Executor libcontainerd.Remote
	// RegistryService, when non-nil, is wrapped by NewManager so that resolved
	// repositories are given the "plugin" class.
	RegistryService registry.Service
	// LiveRestoreEnabled, when false, makes reload re-enable plugins that are
	// marked as enabled.
	LiveRestoreEnabled bool // TODO: remove
	// LogPluginEvent is a callback taking (id, name, action); it is not
	// invoked in the visible part of this file.
	LogPluginEvent eventLogger
	// Root is the directory holding one sub-directory per plugin ID, plus the
	// "tmp" directory and the "storage/blobs" blob store.
	Root string
	// ExecRoot holds a per-plugin directory that is removed when the plugin exits.
	ExecRoot string
	// AuthzMiddleware is not used in the visible part of this file.
	AuthzMiddleware *authorization.Middleware
}
// Manager controls the plugin subsystem.
type Manager struct {
	// config is the configuration passed to NewManager (with RegistryService
	// possibly wrapped).
	config ManagerConfig
	mu     sync.RWMutex // protects cMap
	muGC   sync.RWMutex // protects blobstore deletions
	// cMap associates every loaded plugin with its controller.
	cMap map[*v2.Plugin]*controller
	// containerdClient is obtained from config.Executor in NewManager.
	containerdClient libcontainerd.Client
	// blobStore is rooted at "<Root>/storage/blobs".
	blobStore *basicBlobStore
	// publisher is created by NewManager after plugins have been reloaded.
	publisher *pubsub.Publisher
}
// controller represents the manager's control on a plugin.
type controller struct {
	// restart is read by StateChanged to decide whether an exited plugin is
	// enabled again.
	restart bool
	// exitChan, when non-nil, is closed by StateChanged once the plugin exits.
	exitChan chan bool
	// timeoutInSecs is not used in the visible part of this file.
	timeoutInSecs int
}
  67. // pluginRegistryService ensures that all resolved repositories
  68. // are of the plugin class.
  69. type pluginRegistryService struct {
  70. registry.Service
  71. }
  72. func (s pluginRegistryService) ResolveRepository(name reference.Named) (repoInfo *registry.RepositoryInfo, err error) {
  73. repoInfo, err = s.Service.ResolveRepository(name)
  74. if repoInfo != nil {
  75. repoInfo.Class = "plugin"
  76. }
  77. return
  78. }
  79. // NewManager returns a new plugin manager.
  80. func NewManager(config ManagerConfig) (*Manager, error) {
  81. if config.RegistryService != nil {
  82. config.RegistryService = pluginRegistryService{config.RegistryService}
  83. }
  84. manager := &Manager{
  85. config: config,
  86. }
  87. if err := os.MkdirAll(manager.config.Root, 0700); err != nil {
  88. return nil, errors.Wrapf(err, "failed to mkdir %v", manager.config.Root)
  89. }
  90. if err := os.MkdirAll(manager.config.ExecRoot, 0700); err != nil {
  91. return nil, errors.Wrapf(err, "failed to mkdir %v", manager.config.ExecRoot)
  92. }
  93. if err := os.MkdirAll(manager.tmpDir(), 0700); err != nil {
  94. return nil, errors.Wrapf(err, "failed to mkdir %v", manager.tmpDir())
  95. }
  96. if err := setupRoot(manager.config.Root); err != nil {
  97. return nil, err
  98. }
  99. var err error
  100. manager.containerdClient, err = config.Executor.Client(manager) // todo: move to another struct
  101. if err != nil {
  102. return nil, errors.Wrap(err, "failed to create containerd client")
  103. }
  104. manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
  105. if err != nil {
  106. return nil, err
  107. }
  108. manager.cMap = make(map[*v2.Plugin]*controller)
  109. if err := manager.reload(); err != nil {
  110. return nil, errors.Wrap(err, "failed to restore plugins")
  111. }
  112. manager.publisher = pubsub.NewPublisher(0, 0)
  113. return manager, nil
  114. }
  115. func (pm *Manager) tmpDir() string {
  116. return filepath.Join(pm.config.Root, "tmp")
  117. }
  118. // StateChanged updates plugin internals using libcontainerd events.
  119. func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
  120. logrus.Debugf("plugin state changed %s %#v", id, e)
  121. switch e.State {
  122. case libcontainerd.StateExit:
  123. p, err := pm.config.Store.GetV2Plugin(id)
  124. if err != nil {
  125. return err
  126. }
  127. os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
  128. if p.PropagatedMount != "" {
  129. if err := mount.Unmount(p.PropagatedMount); err != nil {
  130. logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
  131. }
  132. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  133. if err := mount.Unmount(propRoot); err != nil {
  134. logrus.Warn("Could not unmount %s: %v", propRoot, err)
  135. }
  136. }
  137. pm.mu.RLock()
  138. c := pm.cMap[p]
  139. if c.exitChan != nil {
  140. close(c.exitChan)
  141. }
  142. restart := c.restart
  143. pm.mu.RUnlock()
  144. if restart {
  145. pm.enable(p, c, true)
  146. }
  147. }
  148. return nil
  149. }
  150. func handleLoadError(err error, id string) {
  151. if err == nil {
  152. return
  153. }
  154. logger := logrus.WithError(err).WithField("id", id)
  155. if os.IsNotExist(errors.Cause(err)) {
  156. // Likely some error while removing on an older version of docker
  157. logger.Warn("missing plugin config, skipping: this may be caused due to a failed remove and requires manual cleanup.")
  158. return
  159. }
  160. logger.Error("error loading plugin, skipping")
  161. }
  162. func (pm *Manager) reload() error { // todo: restore
  163. dir, err := ioutil.ReadDir(pm.config.Root)
  164. if err != nil {
  165. return errors.Wrapf(err, "failed to read %v", pm.config.Root)
  166. }
  167. plugins := make(map[string]*v2.Plugin)
  168. for _, v := range dir {
  169. if validFullID.MatchString(v.Name()) {
  170. p, err := pm.loadPlugin(v.Name())
  171. if err != nil {
  172. handleLoadError(err, v.Name())
  173. continue
  174. }
  175. plugins[p.GetID()] = p
  176. } else {
  177. if validFullID.MatchString(strings.TrimSuffix(v.Name(), "-removing")) {
  178. // There was likely some error while removing this plugin, let's try to remove again here
  179. if err := system.EnsureRemoveAll(v.Name()); err != nil {
  180. logrus.WithError(err).WithField("id", v.Name()).Warn("error while attempting to clean up previously removed plugin")
  181. }
  182. }
  183. }
  184. }
  185. pm.config.Store.SetAll(plugins)
  186. var wg sync.WaitGroup
  187. wg.Add(len(plugins))
  188. for _, p := range plugins {
  189. c := &controller{} // todo: remove this
  190. pm.cMap[p] = c
  191. go func(p *v2.Plugin) {
  192. defer wg.Done()
  193. if err := pm.restorePlugin(p); err != nil {
  194. logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
  195. return
  196. }
  197. if p.Rootfs != "" {
  198. p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
  199. }
  200. // We should only enable rootfs propagation for certain plugin types that need it.
  201. for _, typ := range p.PluginObj.Config.Interface.Types {
  202. if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
  203. if p.PluginObj.Config.PropagatedMount != "" {
  204. propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
  205. // check if we need to migrate an older propagated mount from before
  206. // these mounts were stored outside the plugin rootfs
  207. if _, err := os.Stat(propRoot); os.IsNotExist(err) {
  208. if _, err := os.Stat(p.PropagatedMount); err == nil {
  209. // make sure nothing is mounted here
  210. // don't care about errors
  211. mount.Unmount(p.PropagatedMount)
  212. if err := os.Rename(p.PropagatedMount, propRoot); err != nil {
  213. logrus.WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
  214. }
  215. if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
  216. logrus.WithError(err).WithField("dir", p.PropagatedMount).Error("error migrating propagated mount storage")
  217. }
  218. }
  219. }
  220. if err := os.MkdirAll(propRoot, 0755); err != nil {
  221. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
  222. }
  223. // TODO: sanitize PropagatedMount and prevent breakout
  224. p.PropagatedMount = filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
  225. if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
  226. logrus.Errorf("failed to create PropagatedMount directory at %s: %v", p.PropagatedMount, err)
  227. return
  228. }
  229. }
  230. }
  231. }
  232. pm.save(p)
  233. requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()
  234. if requiresManualRestore {
  235. // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
  236. if err := pm.enable(p, c, true); err != nil {
  237. logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
  238. }
  239. }
  240. }(p)
  241. }
  242. wg.Wait()
  243. return nil
  244. }
  245. // Get looks up the requested plugin in the store.
  246. func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
  247. return pm.config.Store.GetV2Plugin(idOrName)
  248. }
  249. func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
  250. p := filepath.Join(pm.config.Root, id, configFileName)
  251. dt, err := ioutil.ReadFile(p)
  252. if err != nil {
  253. return nil, errors.Wrapf(err, "error reading %v", p)
  254. }
  255. var plugin v2.Plugin
  256. if err := json.Unmarshal(dt, &plugin); err != nil {
  257. return nil, errors.Wrapf(err, "error decoding %v", p)
  258. }
  259. return &plugin, nil
  260. }
  261. func (pm *Manager) save(p *v2.Plugin) error {
  262. pluginJSON, err := json.Marshal(p)
  263. if err != nil {
  264. return errors.Wrap(err, "failed to marshal plugin json")
  265. }
  266. if err := ioutils.AtomicWriteFile(filepath.Join(pm.config.Root, p.GetID(), configFileName), pluginJSON, 0600); err != nil {
  267. return errors.Wrap(err, "failed to write atomically plugin json")
  268. }
  269. return nil
  270. }
  271. // GC cleans up unreferenced blobs. This is recommended to run in a goroutine
  272. func (pm *Manager) GC() {
  273. pm.muGC.Lock()
  274. defer pm.muGC.Unlock()
  275. whitelist := make(map[digest.Digest]struct{})
  276. for _, p := range pm.config.Store.GetAll() {
  277. whitelist[p.Config] = struct{}{}
  278. for _, b := range p.Blobsums {
  279. whitelist[b] = struct{}{}
  280. }
  281. }
  282. pm.blobStore.gc(whitelist)
  283. }
  284. type logHook struct{ id string }
  285. func (logHook) Levels() []logrus.Level {
  286. return logrus.AllLevels
  287. }
  288. func (l logHook) Fire(entry *logrus.Entry) error {
  289. entry.Data = logrus.Fields{"plugin": l.id}
  290. return nil
  291. }
  292. func attachToLog(id string) func(libcontainerd.IOPipe) error {
  293. return func(iop libcontainerd.IOPipe) error {
  294. iop.Stdin.Close()
  295. logger := logrus.New()
  296. logger.Hooks.Add(logHook{id})
  297. // TODO: cache writer per id
  298. w := logger.Writer()
  299. go func() {
  300. io.Copy(w, iop.Stdout)
  301. }()
  302. go func() {
  303. // TODO: update logrus and use logger.WriterLevel
  304. io.Copy(w, iop.Stderr)
  305. }()
  306. return nil
  307. }
  308. }
  309. func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
  310. if !isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
  311. return errors.New("incorrect privileges")
  312. }
  313. return nil
  314. }
  315. func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
  316. if len(arrOne) != len(arrOther) {
  317. return false
  318. }
  319. sort.Sort(arrOne)
  320. sort.Sort(arrOther)
  321. for i := 1; i < arrOne.Len(); i++ {
  322. if !compare(arrOne[i], arrOther[i]) {
  323. return false
  324. }
  325. }
  326. return true
  327. }
  328. func isEqualPrivilege(a, b types.PluginPrivilege) bool {
  329. if a.Name != b.Name {
  330. return false
  331. }
  332. return reflect.DeepEqual(a.Value, b.Value)
  333. }
  334. func configToRootFS(c []byte) (*image.RootFS, layer.Platform, error) {
  335. // TODO @jhowardmsft LCOW - Will need to revisit this. For now, calculate the platform.
  336. platform := layer.Platform(runtime.GOOS)
  337. if system.LCOWSupported() {
  338. platform = "linux"
  339. }
  340. var pluginConfig types.PluginConfig
  341. if err := json.Unmarshal(c, &pluginConfig); err != nil {
  342. return nil, "", err
  343. }
  344. // validation for empty rootfs is in distribution code
  345. if pluginConfig.Rootfs == nil {
  346. return nil, platform, nil
  347. }
  348. return rootFSFromPlugin(pluginConfig.Rootfs), platform, nil
  349. }
  350. func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
  351. rootFS := image.RootFS{
  352. Type: pluginfs.Type,
  353. DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
  354. }
  355. for i := range pluginfs.DiffIds {
  356. rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
  357. }
  358. return &rootFS
  359. }