backend_linux.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
  1. // +build linux
  2. package plugin
  3. import (
  4. "archive/tar"
  5. "compress/gzip"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "path"
  13. "path/filepath"
  14. "sort"
  15. "strings"
  16. "github.com/Sirupsen/logrus"
  17. "github.com/docker/distribution/manifest/schema2"
  18. "github.com/docker/distribution/reference"
  19. "github.com/docker/docker/api/types"
  20. "github.com/docker/docker/api/types/filters"
  21. "github.com/docker/docker/distribution"
  22. progressutils "github.com/docker/docker/distribution/utils"
  23. "github.com/docker/docker/distribution/xfer"
  24. "github.com/docker/docker/image"
  25. "github.com/docker/docker/layer"
  26. "github.com/docker/docker/pkg/authorization"
  27. "github.com/docker/docker/pkg/chrootarchive"
  28. "github.com/docker/docker/pkg/mount"
  29. "github.com/docker/docker/pkg/pools"
  30. "github.com/docker/docker/pkg/progress"
  31. "github.com/docker/docker/plugin/v2"
  32. refstore "github.com/docker/docker/reference"
  33. "github.com/opencontainers/go-digest"
  34. "github.com/pkg/errors"
  35. "golang.org/x/net/context"
  36. )
  37. var acceptedPluginFilterTags = map[string]bool{
  38. "enabled": true,
  39. "capability": true,
  40. }
  41. // Disable deactivates a plugin. This means resources (volumes, networks) cant use them.
  42. func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) error {
  43. p, err := pm.config.Store.GetV2Plugin(refOrID)
  44. if err != nil {
  45. return err
  46. }
  47. pm.mu.RLock()
  48. c := pm.cMap[p]
  49. pm.mu.RUnlock()
  50. if !config.ForceDisable && p.GetRefCount() > 0 {
  51. return fmt.Errorf("plugin %s is in use", p.Name())
  52. }
  53. for _, typ := range p.GetTypes() {
  54. if typ.Capability == authorization.AuthZApiImplements {
  55. authzList := pm.config.AuthzMiddleware.GetAuthzPlugins()
  56. for i, authPlugin := range authzList {
  57. if authPlugin.Name() == p.Name() {
  58. // Remove plugin from authzmiddleware chain
  59. authzList = append(authzList[:i], authzList[i+1:]...)
  60. pm.config.AuthzMiddleware.SetAuthzPlugins(authzList)
  61. }
  62. }
  63. }
  64. }
  65. if err := pm.disable(p, c); err != nil {
  66. return err
  67. }
  68. pm.config.LogPluginEvent(p.GetID(), refOrID, "disable")
  69. return nil
  70. }
  71. // Enable activates a plugin, which implies that they are ready to be used by containers.
  72. func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) error {
  73. p, err := pm.config.Store.GetV2Plugin(refOrID)
  74. if err != nil {
  75. return err
  76. }
  77. c := &controller{timeoutInSecs: config.Timeout}
  78. if err := pm.enable(p, c, false); err != nil {
  79. return err
  80. }
  81. pm.config.LogPluginEvent(p.GetID(), refOrID, "enable")
  82. return nil
  83. }
  84. // Inspect examines a plugin config
  85. func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) {
  86. p, err := pm.config.Store.GetV2Plugin(refOrID)
  87. if err != nil {
  88. return nil, err
  89. }
  90. return &p.PluginObj, nil
  91. }
  92. func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error {
  93. if outStream != nil {
  94. // Include a buffer so that slow client connections don't affect
  95. // transfer performance.
  96. progressChan := make(chan progress.Progress, 100)
  97. writesDone := make(chan struct{})
  98. defer func() {
  99. close(progressChan)
  100. <-writesDone
  101. }()
  102. var cancelFunc context.CancelFunc
  103. ctx, cancelFunc = context.WithCancel(ctx)
  104. go func() {
  105. progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
  106. close(writesDone)
  107. }()
  108. config.ProgressOutput = progress.ChanOutput(progressChan)
  109. } else {
  110. config.ProgressOutput = progress.DiscardOutput()
  111. }
  112. return distribution.Pull(ctx, ref, config)
  113. }
  114. type tempConfigStore struct {
  115. config []byte
  116. configDigest digest.Digest
  117. }
  118. func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) {
  119. dgst := digest.FromBytes(c)
  120. s.config = c
  121. s.configDigest = dgst
  122. return dgst, nil
  123. }
  124. func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) {
  125. if d != s.configDigest {
  126. return nil, fmt.Errorf("digest not found")
  127. }
  128. return s.config, nil
  129. }
  130. func (s *tempConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
  131. return configToRootFS(c)
  132. }
  133. func computePrivileges(c types.PluginConfig) (types.PluginPrivileges, error) {
  134. var privileges types.PluginPrivileges
  135. if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" {
  136. privileges = append(privileges, types.PluginPrivilege{
  137. Name: "network",
  138. Description: "permissions to access a network",
  139. Value: []string{c.Network.Type},
  140. })
  141. }
  142. for _, mount := range c.Mounts {
  143. if mount.Source != nil {
  144. privileges = append(privileges, types.PluginPrivilege{
  145. Name: "mount",
  146. Description: "host path to mount",
  147. Value: []string{*mount.Source},
  148. })
  149. }
  150. }
  151. for _, device := range c.Linux.Devices {
  152. if device.Path != nil {
  153. privileges = append(privileges, types.PluginPrivilege{
  154. Name: "device",
  155. Description: "host device to access",
  156. Value: []string{*device.Path},
  157. })
  158. }
  159. }
  160. if c.Linux.AllowAllDevices {
  161. privileges = append(privileges, types.PluginPrivilege{
  162. Name: "allow-all-devices",
  163. Description: "allow 'rwm' access to all devices",
  164. Value: []string{"true"},
  165. })
  166. }
  167. if len(c.Linux.Capabilities) > 0 {
  168. privileges = append(privileges, types.PluginPrivilege{
  169. Name: "capabilities",
  170. Description: "list of additional capabilities required",
  171. Value: c.Linux.Capabilities,
  172. })
  173. }
  174. return privileges, nil
  175. }
  176. // Privileges pulls a plugin config and computes the privileges required to install it.
  177. func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
  178. // create image store instance
  179. cs := &tempConfigStore{}
  180. // DownloadManager not defined because only pulling configuration.
  181. pluginPullConfig := &distribution.ImagePullConfig{
  182. Config: distribution.Config{
  183. MetaHeaders: metaHeader,
  184. AuthConfig: authConfig,
  185. RegistryService: pm.config.RegistryService,
  186. ImageEventLogger: func(string, string, string) {},
  187. ImageStore: cs,
  188. },
  189. Schema2Types: distribution.PluginTypes,
  190. }
  191. if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil {
  192. return nil, err
  193. }
  194. if cs.config == nil {
  195. return nil, errors.New("no configuration pulled")
  196. }
  197. var config types.PluginConfig
  198. if err := json.Unmarshal(cs.config, &config); err != nil {
  199. return nil, err
  200. }
  201. return computePrivileges(config)
  202. }
  203. // Upgrade upgrades a plugin
  204. func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
  205. p, err := pm.config.Store.GetV2Plugin(name)
  206. if err != nil {
  207. return errors.Wrap(err, "plugin must be installed before upgrading")
  208. }
  209. if p.IsEnabled() {
  210. return fmt.Errorf("plugin must be disabled before upgrading")
  211. }
  212. pm.muGC.RLock()
  213. defer pm.muGC.RUnlock()
  214. // revalidate because Pull is public
  215. nameref, err := reference.ParseNormalizedNamed(name)
  216. if err != nil {
  217. return errors.Wrapf(err, "failed to parse %q", name)
  218. }
  219. name = reference.FamiliarString(reference.TagNameOnly(nameref))
  220. tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
  221. if err != nil {
  222. return err
  223. }
  224. defer os.RemoveAll(tmpRootFSDir)
  225. dm := &downloadManager{
  226. tmpDir: tmpRootFSDir,
  227. blobStore: pm.blobStore,
  228. }
  229. pluginPullConfig := &distribution.ImagePullConfig{
  230. Config: distribution.Config{
  231. MetaHeaders: metaHeader,
  232. AuthConfig: authConfig,
  233. RegistryService: pm.config.RegistryService,
  234. ImageEventLogger: pm.config.LogPluginEvent,
  235. ImageStore: dm,
  236. },
  237. DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
  238. Schema2Types: distribution.PluginTypes,
  239. }
  240. err = pm.pull(ctx, ref, pluginPullConfig, outStream)
  241. if err != nil {
  242. go pm.GC()
  243. return err
  244. }
  245. if err := pm.upgradePlugin(p, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil {
  246. return err
  247. }
  248. p.PluginObj.PluginReference = ref.String()
  249. return nil
  250. }
  251. // Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
  252. func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
  253. pm.muGC.RLock()
  254. defer pm.muGC.RUnlock()
  255. // revalidate because Pull is public
  256. nameref, err := reference.ParseNormalizedNamed(name)
  257. if err != nil {
  258. return errors.Wrapf(err, "failed to parse %q", name)
  259. }
  260. name = reference.FamiliarString(reference.TagNameOnly(nameref))
  261. if err := pm.config.Store.validateName(name); err != nil {
  262. return err
  263. }
  264. tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
  265. if err != nil {
  266. return err
  267. }
  268. defer os.RemoveAll(tmpRootFSDir)
  269. dm := &downloadManager{
  270. tmpDir: tmpRootFSDir,
  271. blobStore: pm.blobStore,
  272. }
  273. pluginPullConfig := &distribution.ImagePullConfig{
  274. Config: distribution.Config{
  275. MetaHeaders: metaHeader,
  276. AuthConfig: authConfig,
  277. RegistryService: pm.config.RegistryService,
  278. ImageEventLogger: pm.config.LogPluginEvent,
  279. ImageStore: dm,
  280. },
  281. DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
  282. Schema2Types: distribution.PluginTypes,
  283. }
  284. err = pm.pull(ctx, ref, pluginPullConfig, outStream)
  285. if err != nil {
  286. go pm.GC()
  287. return err
  288. }
  289. p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges)
  290. if err != nil {
  291. return err
  292. }
  293. p.PluginObj.PluginReference = ref.String()
  294. return nil
  295. }
  296. // List displays the list of plugins and associated metadata.
  297. func (pm *Manager) List(pluginFilters filters.Args) ([]types.Plugin, error) {
  298. if err := pluginFilters.Validate(acceptedPluginFilterTags); err != nil {
  299. return nil, err
  300. }
  301. enabledOnly := false
  302. disabledOnly := false
  303. if pluginFilters.Include("enabled") {
  304. if pluginFilters.ExactMatch("enabled", "true") {
  305. enabledOnly = true
  306. } else if pluginFilters.ExactMatch("enabled", "false") {
  307. disabledOnly = true
  308. } else {
  309. return nil, fmt.Errorf("Invalid filter 'enabled=%s'", pluginFilters.Get("enabled"))
  310. }
  311. }
  312. plugins := pm.config.Store.GetAll()
  313. out := make([]types.Plugin, 0, len(plugins))
  314. next:
  315. for _, p := range plugins {
  316. if enabledOnly && !p.PluginObj.Enabled {
  317. continue
  318. }
  319. if disabledOnly && p.PluginObj.Enabled {
  320. continue
  321. }
  322. if pluginFilters.Include("capability") {
  323. for _, f := range p.GetTypes() {
  324. if !pluginFilters.Match("capability", f.Capability) {
  325. continue next
  326. }
  327. }
  328. }
  329. out = append(out, p.PluginObj)
  330. }
  331. return out, nil
  332. }
  333. // Push pushes a plugin to the store.
  334. func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error {
  335. p, err := pm.config.Store.GetV2Plugin(name)
  336. if err != nil {
  337. return err
  338. }
  339. ref, err := reference.ParseNormalizedNamed(p.Name())
  340. if err != nil {
  341. return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name())
  342. }
  343. var po progress.Output
  344. if outStream != nil {
  345. // Include a buffer so that slow client connections don't affect
  346. // transfer performance.
  347. progressChan := make(chan progress.Progress, 100)
  348. writesDone := make(chan struct{})
  349. defer func() {
  350. close(progressChan)
  351. <-writesDone
  352. }()
  353. var cancelFunc context.CancelFunc
  354. ctx, cancelFunc = context.WithCancel(ctx)
  355. go func() {
  356. progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
  357. close(writesDone)
  358. }()
  359. po = progress.ChanOutput(progressChan)
  360. } else {
  361. po = progress.DiscardOutput()
  362. }
  363. // TODO: replace these with manager
  364. is := &pluginConfigStore{
  365. pm: pm,
  366. plugin: p,
  367. }
  368. ls := &pluginLayerProvider{
  369. pm: pm,
  370. plugin: p,
  371. }
  372. rs := &pluginReference{
  373. name: ref,
  374. pluginID: p.Config,
  375. }
  376. uploadManager := xfer.NewLayerUploadManager(3)
  377. imagePushConfig := &distribution.ImagePushConfig{
  378. Config: distribution.Config{
  379. MetaHeaders: metaHeader,
  380. AuthConfig: authConfig,
  381. ProgressOutput: po,
  382. RegistryService: pm.config.RegistryService,
  383. ReferenceStore: rs,
  384. ImageEventLogger: pm.config.LogPluginEvent,
  385. ImageStore: is,
  386. RequireSchema2: true,
  387. },
  388. ConfigMediaType: schema2.MediaTypePluginConfig,
  389. LayerStore: ls,
  390. UploadManager: uploadManager,
  391. }
  392. return distribution.Push(ctx, ref, imagePushConfig)
  393. }
  394. type pluginReference struct {
  395. name reference.Named
  396. pluginID digest.Digest
  397. }
  398. func (r *pluginReference) References(id digest.Digest) []reference.Named {
  399. if r.pluginID != id {
  400. return nil
  401. }
  402. return []reference.Named{r.name}
  403. }
  404. func (r *pluginReference) ReferencesByName(ref reference.Named) []refstore.Association {
  405. return []refstore.Association{
  406. {
  407. Ref: r.name,
  408. ID: r.pluginID,
  409. },
  410. }
  411. }
  412. func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) {
  413. if r.name.String() != ref.String() {
  414. return digest.Digest(""), refstore.ErrDoesNotExist
  415. }
  416. return r.pluginID, nil
  417. }
  418. func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error {
  419. // Read only, ignore
  420. return nil
  421. }
  422. func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error {
  423. // Read only, ignore
  424. return nil
  425. }
  426. func (r *pluginReference) Delete(ref reference.Named) (bool, error) {
  427. // Read only, ignore
  428. return false, nil
  429. }
  430. type pluginConfigStore struct {
  431. pm *Manager
  432. plugin *v2.Plugin
  433. }
  434. func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) {
  435. return digest.Digest(""), errors.New("cannot store config on push")
  436. }
  437. func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) {
  438. if s.plugin.Config != d {
  439. return nil, errors.New("plugin not found")
  440. }
  441. rwc, err := s.pm.blobStore.Get(d)
  442. if err != nil {
  443. return nil, err
  444. }
  445. defer rwc.Close()
  446. return ioutil.ReadAll(rwc)
  447. }
  448. func (s *pluginConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
  449. return configToRootFS(c)
  450. }
  451. type pluginLayerProvider struct {
  452. pm *Manager
  453. plugin *v2.Plugin
  454. }
  455. func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) {
  456. rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs)
  457. var i int
  458. for i = 1; i <= len(rootFS.DiffIDs); i++ {
  459. if layer.CreateChainID(rootFS.DiffIDs[:i]) == id {
  460. break
  461. }
  462. }
  463. if i > len(rootFS.DiffIDs) {
  464. return nil, errors.New("layer not found")
  465. }
  466. return &pluginLayer{
  467. pm: p.pm,
  468. diffIDs: rootFS.DiffIDs[:i],
  469. blobs: p.plugin.Blobsums[:i],
  470. }, nil
  471. }
  472. type pluginLayer struct {
  473. pm *Manager
  474. diffIDs []layer.DiffID
  475. blobs []digest.Digest
  476. }
  477. func (l *pluginLayer) ChainID() layer.ChainID {
  478. return layer.CreateChainID(l.diffIDs)
  479. }
  480. func (l *pluginLayer) DiffID() layer.DiffID {
  481. return l.diffIDs[len(l.diffIDs)-1]
  482. }
  483. func (l *pluginLayer) Parent() distribution.PushLayer {
  484. if len(l.diffIDs) == 1 {
  485. return nil
  486. }
  487. return &pluginLayer{
  488. pm: l.pm,
  489. diffIDs: l.diffIDs[:len(l.diffIDs)-1],
  490. blobs: l.blobs[:len(l.diffIDs)-1],
  491. }
  492. }
  493. func (l *pluginLayer) Open() (io.ReadCloser, error) {
  494. return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1])
  495. }
  496. func (l *pluginLayer) Size() (int64, error) {
  497. return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1])
  498. }
  499. func (l *pluginLayer) MediaType() string {
  500. return schema2.MediaTypeLayer
  501. }
  502. func (l *pluginLayer) Release() {
  503. // Nothing needs to be release, no references held
  504. }
  505. // Remove deletes plugin's root directory.
  506. func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
  507. p, err := pm.config.Store.GetV2Plugin(name)
  508. pm.mu.RLock()
  509. c := pm.cMap[p]
  510. pm.mu.RUnlock()
  511. if err != nil {
  512. return err
  513. }
  514. if !config.ForceRemove {
  515. if p.GetRefCount() > 0 {
  516. return fmt.Errorf("plugin %s is in use", p.Name())
  517. }
  518. if p.IsEnabled() {
  519. return fmt.Errorf("plugin %s is enabled", p.Name())
  520. }
  521. }
  522. if p.IsEnabled() {
  523. if err := pm.disable(p, c); err != nil {
  524. logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err)
  525. }
  526. }
  527. defer func() {
  528. go pm.GC()
  529. }()
  530. id := p.GetID()
  531. pm.config.Store.Remove(p)
  532. pluginDir := filepath.Join(pm.config.Root, id)
  533. if err := recursiveUnmount(pm.config.Root); err != nil {
  534. logrus.WithField("dir", pm.config.Root).WithField("id", id).Warn(err)
  535. }
  536. if err := os.RemoveAll(pluginDir); err != nil {
  537. logrus.Warnf("unable to remove %q from plugin remove: %v", pluginDir, err)
  538. }
  539. pm.config.LogPluginEvent(id, name, "remove")
  540. return nil
  541. }
  542. func getMounts(root string) ([]string, error) {
  543. infos, err := mount.GetMounts()
  544. if err != nil {
  545. return nil, errors.Wrap(err, "failed to read mount table while performing recursive unmount")
  546. }
  547. var mounts []string
  548. for _, m := range infos {
  549. if strings.HasPrefix(m.Mountpoint, root) {
  550. mounts = append(mounts, m.Mountpoint)
  551. }
  552. }
  553. return mounts, nil
  554. }
  555. func recursiveUnmount(root string) error {
  556. mounts, err := getMounts(root)
  557. if err != nil {
  558. return err
  559. }
  560. // sort in reverse-lexicographic order so the root mount will always be last
  561. sort.Sort(sort.Reverse(sort.StringSlice(mounts)))
  562. for i, m := range mounts {
  563. if err := mount.Unmount(m); err != nil {
  564. if i == len(mounts)-1 {
  565. return errors.Wrapf(err, "error performing recursive unmount on %s", root)
  566. }
  567. logrus.WithError(err).WithField("mountpoint", m).Warn("could not unmount")
  568. }
  569. }
  570. return nil
  571. }
  572. // Set sets plugin args
  573. func (pm *Manager) Set(name string, args []string) error {
  574. p, err := pm.config.Store.GetV2Plugin(name)
  575. if err != nil {
  576. return err
  577. }
  578. if err := p.Set(args); err != nil {
  579. return err
  580. }
  581. return pm.save(p)
  582. }
  583. // CreateFromContext creates a plugin from the given pluginDir which contains
  584. // both the rootfs and the config.json and a repoName with optional tag.
  585. func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *types.PluginCreateOptions) (err error) {
  586. pm.muGC.RLock()
  587. defer pm.muGC.RUnlock()
  588. ref, err := reference.ParseNormalizedNamed(options.RepoName)
  589. if err != nil {
  590. return errors.Wrapf(err, "failed to parse reference %v", options.RepoName)
  591. }
  592. if _, ok := ref.(reference.Canonical); ok {
  593. return errors.Errorf("canonical references are not permitted")
  594. }
  595. name := reference.FamiliarString(reference.TagNameOnly(ref))
  596. if err := pm.config.Store.validateName(name); err != nil { // fast check, real check is in createPlugin()
  597. return err
  598. }
  599. tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
  600. if err != nil {
  601. return errors.Wrap(err, "failed to create temp directory")
  602. }
  603. defer os.RemoveAll(tmpRootFSDir)
  604. var configJSON []byte
  605. rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON)
  606. rootFSBlob, err := pm.blobStore.New()
  607. if err != nil {
  608. return err
  609. }
  610. defer rootFSBlob.Close()
  611. gzw := gzip.NewWriter(rootFSBlob)
  612. layerDigester := digest.Canonical.Digester()
  613. rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash()))
  614. if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil {
  615. return err
  616. }
  617. if err := rootFS.Close(); err != nil {
  618. return err
  619. }
  620. if configJSON == nil {
  621. return errors.New("config not found")
  622. }
  623. if err := gzw.Close(); err != nil {
  624. return errors.Wrap(err, "error closing gzip writer")
  625. }
  626. var config types.PluginConfig
  627. if err := json.Unmarshal(configJSON, &config); err != nil {
  628. return errors.Wrap(err, "failed to parse config")
  629. }
  630. if err := pm.validateConfig(config); err != nil {
  631. return err
  632. }
  633. pm.mu.Lock()
  634. defer pm.mu.Unlock()
  635. rootFSBlobsum, err := rootFSBlob.Commit()
  636. if err != nil {
  637. return err
  638. }
  639. defer func() {
  640. if err != nil {
  641. go pm.GC()
  642. }
  643. }()
  644. config.Rootfs = &types.PluginConfigRootfs{
  645. Type: "layers",
  646. DiffIds: []string{layerDigester.Digest().String()},
  647. }
  648. configBlob, err := pm.blobStore.New()
  649. if err != nil {
  650. return err
  651. }
  652. defer configBlob.Close()
  653. if err := json.NewEncoder(configBlob).Encode(config); err != nil {
  654. return errors.Wrap(err, "error encoding json config")
  655. }
  656. configBlobsum, err := configBlob.Commit()
  657. if err != nil {
  658. return err
  659. }
  660. p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil)
  661. if err != nil {
  662. return err
  663. }
  664. p.PluginObj.PluginReference = name
  665. pm.config.LogPluginEvent(p.PluginObj.ID, name, "create")
  666. return nil
  667. }
  668. func (pm *Manager) validateConfig(config types.PluginConfig) error {
  669. return nil // TODO:
  670. }
  671. func splitConfigRootFSFromTar(in io.ReadCloser, config *[]byte) io.ReadCloser {
  672. pr, pw := io.Pipe()
  673. go func() {
  674. tarReader := tar.NewReader(in)
  675. tarWriter := tar.NewWriter(pw)
  676. defer in.Close()
  677. hasRootFS := false
  678. for {
  679. hdr, err := tarReader.Next()
  680. if err == io.EOF {
  681. if !hasRootFS {
  682. pw.CloseWithError(errors.Wrap(err, "no rootfs found"))
  683. return
  684. }
  685. // Signals end of archive.
  686. tarWriter.Close()
  687. pw.Close()
  688. return
  689. }
  690. if err != nil {
  691. pw.CloseWithError(errors.Wrap(err, "failed to read from tar"))
  692. return
  693. }
  694. content := io.Reader(tarReader)
  695. name := path.Clean(hdr.Name)
  696. if path.IsAbs(name) {
  697. name = name[1:]
  698. }
  699. if name == configFileName {
  700. dt, err := ioutil.ReadAll(content)
  701. if err != nil {
  702. pw.CloseWithError(errors.Wrapf(err, "failed to read %s", configFileName))
  703. return
  704. }
  705. *config = dt
  706. }
  707. if parts := strings.Split(name, "/"); len(parts) != 0 && parts[0] == rootFSFileName {
  708. hdr.Name = path.Clean(path.Join(parts[1:]...))
  709. if hdr.Typeflag == tar.TypeLink && strings.HasPrefix(strings.ToLower(hdr.Linkname), rootFSFileName+"/") {
  710. hdr.Linkname = hdr.Linkname[len(rootFSFileName)+1:]
  711. }
  712. if err := tarWriter.WriteHeader(hdr); err != nil {
  713. pw.CloseWithError(errors.Wrap(err, "error writing tar header"))
  714. return
  715. }
  716. if _, err := pools.Copy(tarWriter, content); err != nil {
  717. pw.CloseWithError(errors.Wrap(err, "error copying tar data"))
  718. return
  719. }
  720. hasRootFS = true
  721. } else {
  722. io.Copy(ioutil.Discard, content)
  723. }
  724. }
  725. }()
  726. return pr
  727. }