plugin.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package plugin provides support for the SFTPGo plugin system
  15. package plugin
  16. import (
  17. "crypto/sha256"
  18. "crypto/x509"
  19. "encoding/hex"
  20. "errors"
  21. "fmt"
  22. "os"
  23. "os/exec"
  24. "path/filepath"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/hashicorp/go-hclog"
  30. "github.com/hashicorp/go-plugin"
  31. "github.com/sftpgo/sdk/plugin/auth"
  32. "github.com/sftpgo/sdk/plugin/eventsearcher"
  33. "github.com/sftpgo/sdk/plugin/ipfilter"
  34. kmsplugin "github.com/sftpgo/sdk/plugin/kms"
  35. "github.com/sftpgo/sdk/plugin/notifier"
  36. "github.com/drakkan/sftpgo/v2/internal/kms"
  37. "github.com/drakkan/sftpgo/v2/internal/logger"
  38. "github.com/drakkan/sftpgo/v2/internal/util"
  39. )
  40. const (
  41. logSender = "plugins"
  42. )
  43. var (
  44. // Handler defines the plugins manager
  45. Handler Manager
  46. pluginsLogLevel = hclog.Debug
  47. // ErrNoSearcher defines the error to return for events searches if no plugin is configured
  48. ErrNoSearcher = errors.New("no events searcher plugin defined")
  49. )
  50. // Renderer defines the interface for generic objects rendering
  51. type Renderer interface {
  52. RenderAsJSON(reload bool) ([]byte, error)
  53. }
  54. // Config defines a plugin configuration
  55. type Config struct {
  56. // Plugin type
  57. Type string `json:"type" mapstructure:"type"`
  58. // NotifierOptions defines options for notifiers plugins
  59. NotifierOptions NotifierConfig `json:"notifier_options" mapstructure:"notifier_options"`
  60. // KMSOptions defines options for a KMS plugin
  61. KMSOptions KMSConfig `json:"kms_options" mapstructure:"kms_options"`
  62. // AuthOptions defines options for authentication plugins
  63. AuthOptions AuthConfig `json:"auth_options" mapstructure:"auth_options"`
  64. // Path to the plugin executable
  65. Cmd string `json:"cmd" mapstructure:"cmd"`
  66. // Args to pass to the plugin executable
  67. Args []string `json:"args" mapstructure:"args"`
  68. // SHA256 checksum for the plugin executable.
  69. // If not empty it will be used to verify the integrity of the executable
  70. SHA256Sum string `json:"sha256sum" mapstructure:"sha256sum"`
  71. // If enabled the client and the server automatically negotiate mTLS for
  72. // transport authentication. This ensures that only the original client will
  73. // be allowed to connect to the server, and all other connections will be
  74. // rejected. The client will also refuse to connect to any server that isn't
  75. // the original instance started by the client.
  76. AutoMTLS bool `json:"auto_mtls" mapstructure:"auto_mtls"`
  77. // EnvPrefix defines the prefix for env vars to pass from the SFTPGo process
  78. // environment to the plugin. Set to "none" to not pass any environment
  79. // variable, set to "*" to pass all environment variables. If empty, the
  80. // prefix is returned as the plugin name in uppercase with "-" replaced with
  81. // "_" and a trailing "_". For example if the plugin name is
  82. // sftpgo-plugin-eventsearch the prefix will be SFTPGO_PLUGIN_EVENTSEARCH_
  83. EnvPrefix string `json:"env_prefix" mapstructure:"env_prefix"`
  84. // Additional environment variable names to pass from the SFTPGo process
  85. // environment to the plugin.
  86. EnvVars []string `json:"env_vars" mapstructure:"env_vars"`
  87. // unique identifier for kms plugins
  88. kmsID int
  89. }
  90. func (c *Config) getSecureConfig() (*plugin.SecureConfig, error) {
  91. if c.SHA256Sum != "" {
  92. checksum, err := hex.DecodeString(c.SHA256Sum)
  93. if err != nil {
  94. return nil, fmt.Errorf("invalid sha256 hash %q: %w", c.SHA256Sum, err)
  95. }
  96. return &plugin.SecureConfig{
  97. Checksum: checksum,
  98. Hash: sha256.New(),
  99. }, nil
  100. }
  101. return nil, nil
  102. }
  103. func (c *Config) getEnvVarPrefix() string {
  104. if c.EnvPrefix == "none" {
  105. return ""
  106. }
  107. if c.EnvPrefix != "" {
  108. return c.EnvPrefix
  109. }
  110. prefix := strings.ToUpper(filepath.Base(c.Cmd)) + "_"
  111. return strings.ReplaceAll(prefix, "-", "_")
  112. }
  113. func (c *Config) getCommand() *exec.Cmd {
  114. cmd := exec.Command(c.Cmd, c.Args...)
  115. cmd.Env = []string{}
  116. if envVarPrefix := c.getEnvVarPrefix(); envVarPrefix != "" {
  117. if envVarPrefix == "*" {
  118. logger.Debug(logSender, "", "sharing all the environment variables with plugin %q", c.Cmd)
  119. cmd.Env = append(cmd.Env, os.Environ()...)
  120. return cmd
  121. }
  122. logger.Debug(logSender, "", "adding env vars with prefix %q for plugin %q", envVarPrefix, c.Cmd)
  123. for _, val := range os.Environ() {
  124. if strings.HasPrefix(val, envVarPrefix) {
  125. cmd.Env = append(cmd.Env, val)
  126. }
  127. }
  128. }
  129. logger.Debug(logSender, "", "additional env vars for plugin %q: %+v", c.Cmd, c.EnvVars)
  130. for _, key := range c.EnvVars {
  131. cmd.Env = append(cmd.Env, os.Getenv(key))
  132. }
  133. return cmd
  134. }
  135. func (c *Config) newKMSPluginSecretProvider(base kms.BaseSecret, url, masterKey string) kms.SecretProvider {
  136. return &kmsPluginSecretProvider{
  137. BaseSecret: base,
  138. URL: url,
  139. MasterKey: masterKey,
  140. config: c,
  141. }
  142. }
  143. // Manager handles enabled plugins
  144. type Manager struct {
  145. closed atomic.Bool
  146. done chan bool
  147. // List of configured plugins
  148. Configs []Config `json:"plugins" mapstructure:"plugins"`
  149. notifLock sync.RWMutex
  150. notifiers []*notifierPlugin
  151. kmsLock sync.RWMutex
  152. kms []*kmsPlugin
  153. authLock sync.RWMutex
  154. auths []*authPlugin
  155. searcherLock sync.RWMutex
  156. searcher *searcherPlugin
  157. ipFilterLock sync.RWMutex
  158. filter *ipFilterPlugin
  159. authScopes int
  160. hasSearcher bool
  161. hasNotifiers bool
  162. hasAuths bool
  163. hasIPFilter bool
  164. concurrencyGuard chan struct{}
  165. }
  166. // Initialize initializes the configured plugins
  167. func Initialize(configs []Config, logLevel string) error {
  168. logger.Debug(logSender, "", "initialize")
  169. Handler = Manager{
  170. Configs: configs,
  171. done: make(chan bool),
  172. authScopes: -1,
  173. concurrencyGuard: make(chan struct{}, 250),
  174. }
  175. Handler.closed.Store(false)
  176. setLogLevel(logLevel)
  177. if len(configs) == 0 {
  178. return nil
  179. }
  180. if err := Handler.validateConfigs(); err != nil {
  181. return err
  182. }
  183. if err := initializePlugins(); err != nil {
  184. return err
  185. }
  186. startCheckTicker()
  187. return nil
  188. }
  189. func initializePlugins() error {
  190. kmsID := 0
  191. for idx, config := range Handler.Configs {
  192. switch config.Type {
  193. case notifier.PluginName:
  194. plugin, err := newNotifierPlugin(config)
  195. if err != nil {
  196. return err
  197. }
  198. Handler.notifiers = append(Handler.notifiers, plugin)
  199. case kmsplugin.PluginName:
  200. plugin, err := newKMSPlugin(config)
  201. if err != nil {
  202. return err
  203. }
  204. Handler.kms = append(Handler.kms, plugin)
  205. Handler.Configs[idx].kmsID = kmsID
  206. kmsID++
  207. kms.RegisterSecretProvider(config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus,
  208. Handler.Configs[idx].newKMSPluginSecretProvider)
  209. logger.Info(logSender, "", "registered secret provider for scheme: %v, encrypted status: %v",
  210. config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus)
  211. case auth.PluginName:
  212. plugin, err := newAuthPlugin(config)
  213. if err != nil {
  214. return err
  215. }
  216. Handler.auths = append(Handler.auths, plugin)
  217. if Handler.authScopes == -1 {
  218. Handler.authScopes = config.AuthOptions.Scope
  219. } else {
  220. Handler.authScopes |= config.AuthOptions.Scope
  221. }
  222. case eventsearcher.PluginName:
  223. plugin, err := newSearcherPlugin(config)
  224. if err != nil {
  225. return err
  226. }
  227. Handler.searcher = plugin
  228. case ipfilter.PluginName:
  229. plugin, err := newIPFilterPlugin(config)
  230. if err != nil {
  231. return err
  232. }
  233. Handler.filter = plugin
  234. default:
  235. return fmt.Errorf("unsupported plugin type: %v", config.Type)
  236. }
  237. }
  238. return nil
  239. }
  240. func (m *Manager) validateConfigs() error {
  241. kmsSchemes := make(map[string]bool)
  242. kmsEncryptions := make(map[string]bool)
  243. m.hasSearcher = false
  244. m.hasNotifiers = false
  245. m.hasAuths = false
  246. m.hasIPFilter = false
  247. for _, config := range m.Configs {
  248. switch config.Type {
  249. case kmsplugin.PluginName:
  250. if _, ok := kmsSchemes[config.KMSOptions.Scheme]; ok {
  251. return fmt.Errorf("invalid KMS configuration, duplicated scheme %q", config.KMSOptions.Scheme)
  252. }
  253. if _, ok := kmsEncryptions[config.KMSOptions.EncryptedStatus]; ok {
  254. return fmt.Errorf("invalid KMS configuration, duplicated encrypted status %q", config.KMSOptions.EncryptedStatus)
  255. }
  256. kmsSchemes[config.KMSOptions.Scheme] = true
  257. kmsEncryptions[config.KMSOptions.EncryptedStatus] = true
  258. case eventsearcher.PluginName:
  259. if m.hasSearcher {
  260. return errors.New("only one eventsearcher plugin can be defined")
  261. }
  262. m.hasSearcher = true
  263. case notifier.PluginName:
  264. m.hasNotifiers = true
  265. case auth.PluginName:
  266. m.hasAuths = true
  267. case ipfilter.PluginName:
  268. m.hasIPFilter = true
  269. }
  270. }
  271. return nil
  272. }
  273. // HasAuthenticators returns true if there is at least an auth plugin
  274. func (m *Manager) HasAuthenticators() bool {
  275. return m.hasAuths
  276. }
  277. // HasNotifiers returns true if there is at least a notifier plugin
  278. func (m *Manager) HasNotifiers() bool {
  279. return m.hasNotifiers
  280. }
  281. // NotifyFsEvent sends the fs event notifications using any defined notifier plugins
  282. func (m *Manager) NotifyFsEvent(event *notifier.FsEvent) {
  283. m.notifLock.RLock()
  284. defer m.notifLock.RUnlock()
  285. for _, n := range m.notifiers {
  286. n.notifyFsAction(event)
  287. }
  288. }
  289. // NotifyProviderEvent sends the provider event notifications using any defined notifier plugins
  290. func (m *Manager) NotifyProviderEvent(event *notifier.ProviderEvent, object Renderer) {
  291. m.notifLock.RLock()
  292. defer m.notifLock.RUnlock()
  293. for _, n := range m.notifiers {
  294. n.notifyProviderAction(event, object)
  295. }
  296. }
  297. // NotifyLogEvent sends the log event notifications using any defined notifier plugins
  298. func (m *Manager) NotifyLogEvent(event notifier.LogEventType, protocol, username, ip, role string, err error) {
  299. if !m.hasNotifiers {
  300. return
  301. }
  302. m.notifLock.RLock()
  303. defer m.notifLock.RUnlock()
  304. e := &notifier.LogEvent{
  305. Timestamp: time.Now().UnixNano(),
  306. Event: event,
  307. Protocol: protocol,
  308. Username: username,
  309. IP: ip,
  310. Message: err.Error(),
  311. Role: role,
  312. }
  313. for _, n := range m.notifiers {
  314. n.notifyLogEvent(e)
  315. }
  316. }
  317. // HasSearcher returns true if an event searcher plugin is defined
  318. func (m *Manager) HasSearcher() bool {
  319. return m.hasSearcher
  320. }
  321. // SearchFsEvents returns the filesystem events matching the specified filters
  322. func (m *Manager) SearchFsEvents(searchFilters *eventsearcher.FsEventSearch) ([]byte, error) {
  323. if !m.hasSearcher {
  324. return nil, ErrNoSearcher
  325. }
  326. m.searcherLock.RLock()
  327. plugin := m.searcher
  328. m.searcherLock.RUnlock()
  329. return plugin.searchear.SearchFsEvents(searchFilters)
  330. }
  331. // SearchProviderEvents returns the provider events matching the specified filters
  332. func (m *Manager) SearchProviderEvents(searchFilters *eventsearcher.ProviderEventSearch) ([]byte, error) {
  333. if !m.hasSearcher {
  334. return nil, ErrNoSearcher
  335. }
  336. m.searcherLock.RLock()
  337. plugin := m.searcher
  338. m.searcherLock.RUnlock()
  339. return plugin.searchear.SearchProviderEvents(searchFilters)
  340. }
  341. // SearchLogEvents returns the log events matching the specified filters
  342. func (m *Manager) SearchLogEvents(searchFilters *eventsearcher.LogEventSearch) ([]byte, error) {
  343. if !m.hasSearcher {
  344. return nil, ErrNoSearcher
  345. }
  346. m.searcherLock.RLock()
  347. plugin := m.searcher
  348. m.searcherLock.RUnlock()
  349. return plugin.searchear.SearchLogEvents(searchFilters)
  350. }
  351. // IsIPBanned returns true if the IP filter plugin does not allow the specified ip.
  352. // If no IP filter plugin is defined this method returns false
  353. func (m *Manager) IsIPBanned(ip, protocol string) bool {
  354. if !m.hasIPFilter {
  355. return false
  356. }
  357. m.ipFilterLock.RLock()
  358. plugin := m.filter
  359. m.ipFilterLock.RUnlock()
  360. if plugin.exited() {
  361. logger.Warn(logSender, "", "ip filter plugin is not active, cannot check ip %q", ip)
  362. return false
  363. }
  364. return plugin.filter.CheckIP(ip, protocol) != nil
  365. }
  366. // ReloadFilter sends a reload request to the IP filter plugin
  367. func (m *Manager) ReloadFilter() {
  368. if !m.hasIPFilter {
  369. return
  370. }
  371. m.ipFilterLock.RLock()
  372. plugin := m.filter
  373. m.ipFilterLock.RUnlock()
  374. if err := plugin.filter.Reload(); err != nil {
  375. logger.Error(logSender, "", "unable to reload IP filter plugin: %v", err)
  376. }
  377. }
  378. func (m *Manager) kmsEncrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, string, int32, error) {
  379. m.kmsLock.RLock()
  380. plugin := m.kms[kmsID]
  381. m.kmsLock.RUnlock()
  382. return plugin.Encrypt(secret, url, masterKey)
  383. }
  384. func (m *Manager) kmsDecrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, error) {
  385. m.kmsLock.RLock()
  386. plugin := m.kms[kmsID]
  387. m.kmsLock.RUnlock()
  388. return plugin.Decrypt(secret, url, masterKey)
  389. }
  390. // HasAuthScope returns true if there is an auth plugin that support the specified scope
  391. func (m *Manager) HasAuthScope(scope int) bool {
  392. if m.authScopes == -1 {
  393. return false
  394. }
  395. return m.authScopes&scope != 0
  396. }
  397. // Authenticate tries to authenticate the specified user using an external plugin
  398. func (m *Manager) Authenticate(username, password, ip, protocol string, pkey string,
  399. tlsCert *x509.Certificate, authScope int, userAsJSON []byte,
  400. ) ([]byte, error) {
  401. switch authScope {
  402. case AuthScopePassword:
  403. return m.checkUserAndPass(username, password, ip, protocol, userAsJSON)
  404. case AuthScopePublicKey:
  405. return m.checkUserAndPublicKey(username, pkey, ip, protocol, userAsJSON)
  406. case AuthScopeKeyboardInteractive:
  407. return m.checkUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
  408. case AuthScopeTLSCertificate:
  409. cert, err := util.EncodeTLSCertToPem(tlsCert)
  410. if err != nil {
  411. logger.Error(logSender, "", "unable to encode tls certificate to pem: %v", err)
  412. return nil, fmt.Errorf("unable to encode tls cert to pem: %w", err)
  413. }
  414. return m.checkUserAndTLSCert(username, cert, ip, protocol, userAsJSON)
  415. default:
  416. return nil, fmt.Errorf("unsupported auth scope: %v", authScope)
  417. }
  418. }
  419. // ExecuteKeyboardInteractiveStep executes a keyboard interactive step
  420. func (m *Manager) ExecuteKeyboardInteractiveStep(req *KeyboardAuthRequest) (*KeyboardAuthResponse, error) {
  421. var plugin *authPlugin
  422. m.authLock.Lock()
  423. for _, p := range m.auths {
  424. if p.config.AuthOptions.Scope&AuthScopePassword != 0 {
  425. plugin = p
  426. break
  427. }
  428. }
  429. m.authLock.Unlock()
  430. if plugin == nil {
  431. return nil, errors.New("no auth plugin configured for keyaboard interactive authentication step")
  432. }
  433. return plugin.sendKeyboardIteractiveRequest(req)
  434. }
  435. func (m *Manager) checkUserAndPass(username, password, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  436. var plugin *authPlugin
  437. m.authLock.Lock()
  438. for _, p := range m.auths {
  439. if p.config.AuthOptions.Scope&AuthScopePassword != 0 {
  440. plugin = p
  441. break
  442. }
  443. }
  444. m.authLock.Unlock()
  445. if plugin == nil {
  446. return nil, errors.New("no auth plugin configured for password checking")
  447. }
  448. return plugin.checkUserAndPass(username, password, ip, protocol, userAsJSON)
  449. }
  450. func (m *Manager) checkUserAndPublicKey(username, pubKey, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  451. var plugin *authPlugin
  452. m.authLock.Lock()
  453. for _, p := range m.auths {
  454. if p.config.AuthOptions.Scope&AuthScopePublicKey != 0 {
  455. plugin = p
  456. break
  457. }
  458. }
  459. m.authLock.Unlock()
  460. if plugin == nil {
  461. return nil, errors.New("no auth plugin configured for public key checking")
  462. }
  463. return plugin.checkUserAndPublicKey(username, pubKey, ip, protocol, userAsJSON)
  464. }
  465. func (m *Manager) checkUserAndTLSCert(username, tlsCert, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  466. var plugin *authPlugin
  467. m.authLock.Lock()
  468. for _, p := range m.auths {
  469. if p.config.AuthOptions.Scope&AuthScopeTLSCertificate != 0 {
  470. plugin = p
  471. break
  472. }
  473. }
  474. m.authLock.Unlock()
  475. if plugin == nil {
  476. return nil, errors.New("no auth plugin configured for TLS certificate checking")
  477. }
  478. return plugin.checkUserAndTLSCertificate(username, tlsCert, ip, protocol, userAsJSON)
  479. }
  480. func (m *Manager) checkUserAndKeyboardInteractive(username, ip, protocol string, userAsJSON []byte) ([]byte, error) {
  481. var plugin *authPlugin
  482. m.authLock.Lock()
  483. for _, p := range m.auths {
  484. if p.config.AuthOptions.Scope&AuthScopeKeyboardInteractive != 0 {
  485. plugin = p
  486. break
  487. }
  488. }
  489. m.authLock.Unlock()
  490. if plugin == nil {
  491. return nil, errors.New("no auth plugin configured for keyboard interactive checking")
  492. }
  493. return plugin.checkUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
  494. }
  495. func (m *Manager) checkCrashedPlugins() {
  496. m.notifLock.RLock()
  497. for idx, n := range m.notifiers {
  498. if n.exited() {
  499. defer func(cfg Config, index int) {
  500. Handler.restartNotifierPlugin(cfg, index)
  501. }(n.config, idx)
  502. } else {
  503. n.sendQueuedEvents()
  504. }
  505. }
  506. m.notifLock.RUnlock()
  507. m.kmsLock.RLock()
  508. for idx, k := range m.kms {
  509. if k.exited() {
  510. defer func(cfg Config, index int) {
  511. Handler.restartKMSPlugin(cfg, index)
  512. }(k.config, idx)
  513. }
  514. }
  515. m.kmsLock.RUnlock()
  516. m.authLock.RLock()
  517. for idx, a := range m.auths {
  518. if a.exited() {
  519. defer func(cfg Config, index int) {
  520. Handler.restartAuthPlugin(cfg, index)
  521. }(a.config, idx)
  522. }
  523. }
  524. m.authLock.RUnlock()
  525. if m.hasSearcher {
  526. m.searcherLock.RLock()
  527. if m.searcher.exited() {
  528. defer func(cfg Config) {
  529. Handler.restartSearcherPlugin(cfg)
  530. }(m.searcher.config)
  531. }
  532. m.searcherLock.RUnlock()
  533. }
  534. if m.hasIPFilter {
  535. m.ipFilterLock.RLock()
  536. if m.filter.exited() {
  537. defer func(cfg Config) {
  538. Handler.restartIPFilterPlugin(cfg)
  539. }(m.filter.config)
  540. }
  541. m.ipFilterLock.RUnlock()
  542. }
  543. }
  544. func (m *Manager) restartNotifierPlugin(config Config, idx int) {
  545. if m.closed.Load() {
  546. return
  547. }
  548. logger.Info(logSender, "", "try to restart crashed notifier plugin %q, idx: %v", config.Cmd, idx)
  549. plugin, err := newNotifierPlugin(config)
  550. if err != nil {
  551. logger.Error(logSender, "", "unable to restart notifier plugin %q, err: %v", config.Cmd, err)
  552. return
  553. }
  554. m.notifLock.Lock()
  555. plugin.queue = m.notifiers[idx].queue
  556. m.notifiers[idx] = plugin
  557. m.notifLock.Unlock()
  558. plugin.sendQueuedEvents()
  559. }
  560. func (m *Manager) restartKMSPlugin(config Config, idx int) {
  561. if m.closed.Load() {
  562. return
  563. }
  564. logger.Info(logSender, "", "try to restart crashed kms plugin %q, idx: %v", config.Cmd, idx)
  565. plugin, err := newKMSPlugin(config)
  566. if err != nil {
  567. logger.Error(logSender, "", "unable to restart kms plugin %q, err: %v", config.Cmd, err)
  568. return
  569. }
  570. m.kmsLock.Lock()
  571. m.kms[idx] = plugin
  572. m.kmsLock.Unlock()
  573. }
  574. func (m *Manager) restartAuthPlugin(config Config, idx int) {
  575. if m.closed.Load() {
  576. return
  577. }
  578. logger.Info(logSender, "", "try to restart crashed auth plugin %q, idx: %v", config.Cmd, idx)
  579. plugin, err := newAuthPlugin(config)
  580. if err != nil {
  581. logger.Error(logSender, "", "unable to restart auth plugin %q, err: %v", config.Cmd, err)
  582. return
  583. }
  584. m.authLock.Lock()
  585. m.auths[idx] = plugin
  586. m.authLock.Unlock()
  587. }
  588. func (m *Manager) restartSearcherPlugin(config Config) {
  589. if m.closed.Load() {
  590. return
  591. }
  592. logger.Info(logSender, "", "try to restart crashed searcher plugin %q", config.Cmd)
  593. plugin, err := newSearcherPlugin(config)
  594. if err != nil {
  595. logger.Error(logSender, "", "unable to restart searcher plugin %q, err: %v", config.Cmd, err)
  596. return
  597. }
  598. m.searcherLock.Lock()
  599. m.searcher = plugin
  600. m.searcherLock.Unlock()
  601. }
  602. func (m *Manager) restartIPFilterPlugin(config Config) {
  603. if m.closed.Load() {
  604. return
  605. }
  606. logger.Info(logSender, "", "try to restart crashed IP filter plugin %q", config.Cmd)
  607. plugin, err := newIPFilterPlugin(config)
  608. if err != nil {
  609. logger.Error(logSender, "", "unable to restart IP filter plugin %q, err: %v", config.Cmd, err)
  610. return
  611. }
  612. m.ipFilterLock.Lock()
  613. m.filter = plugin
  614. m.ipFilterLock.Unlock()
  615. }
  616. func (m *Manager) addTask() {
  617. m.concurrencyGuard <- struct{}{}
  618. }
  619. func (m *Manager) removeTask() {
  620. <-m.concurrencyGuard
  621. }
  622. // Cleanup releases all the active plugins
  623. func (m *Manager) Cleanup() {
  624. if m.closed.Swap(true) {
  625. return
  626. }
  627. logger.Debug(logSender, "", "cleanup")
  628. close(m.done)
  629. m.notifLock.Lock()
  630. for _, n := range m.notifiers {
  631. logger.Debug(logSender, "", "cleanup notifier plugin %v", n.config.Cmd)
  632. n.cleanup()
  633. }
  634. m.notifLock.Unlock()
  635. m.kmsLock.Lock()
  636. for _, k := range m.kms {
  637. logger.Debug(logSender, "", "cleanup kms plugin %v", k.config.Cmd)
  638. k.cleanup()
  639. }
  640. m.kmsLock.Unlock()
  641. m.authLock.Lock()
  642. for _, a := range m.auths {
  643. logger.Debug(logSender, "", "cleanup auth plugin %v", a.config.Cmd)
  644. a.cleanup()
  645. }
  646. m.authLock.Unlock()
  647. if m.hasSearcher {
  648. m.searcherLock.Lock()
  649. logger.Debug(logSender, "", "cleanup searcher plugin %v", m.searcher.config.Cmd)
  650. m.searcher.cleanup()
  651. m.searcherLock.Unlock()
  652. }
  653. if m.hasIPFilter {
  654. m.ipFilterLock.Lock()
  655. logger.Debug(logSender, "", "cleanup IP filter plugin %v", m.filter.config.Cmd)
  656. m.filter.cleanup()
  657. m.ipFilterLock.Unlock()
  658. }
  659. }
  660. func setLogLevel(logLevel string) {
  661. switch logLevel {
  662. case "info":
  663. pluginsLogLevel = hclog.Info
  664. case "warn":
  665. pluginsLogLevel = hclog.Warn
  666. case "error":
  667. pluginsLogLevel = hclog.Error
  668. default:
  669. pluginsLogLevel = hclog.Debug
  670. }
  671. }
  672. func startCheckTicker() {
  673. logger.Debug(logSender, "", "start plugins checker")
  674. go func() {
  675. ticker := time.NewTicker(30 * time.Second)
  676. defer ticker.Stop()
  677. for {
  678. select {
  679. case <-Handler.done:
  680. logger.Debug(logSender, "", "handler done, stop plugins checker")
  681. return
  682. case <-ticker.C:
  683. Handler.checkCrashedPlugins()
  684. }
  685. }
  686. }()
  687. }