notifier.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package plugin
import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"os/exec"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-plugin"
	"google.golang.org/protobuf/types/known/timestamppb"

	"github.com/drakkan/sftpgo/v2/logger"
	"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
	"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
	"github.com/drakkan/sftpgo/v2/util"
)
// NotifierConfig defines configuration parameters for notifier plugins.
type NotifierConfig struct {
	// FsEvents lists the filesystem actions that are forwarded to the plugin.
	// Actions not in this list are ignored by notifyFsAction.
	FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
	// UserEvents lists the user actions that are forwarded to the plugin.
	// Actions not in this list are ignored by notifyUserAction.
	UserEvents []string `json:"user_events" mapstructure:"user_events"`
	// RetryMaxTime is the number of seconds, counted from the event
	// timestamp, during which a failed notification may still be queued
	// for a later retry. 0 disables queuing.
	RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
	// RetryQueueMaxSize is the size the retry queue must stay below for a
	// new event to be queued. Values <= 0 mean no size check is done.
	RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
}
  23. func (c *NotifierConfig) hasActions() bool {
  24. if len(c.FsEvents) > 0 {
  25. return true
  26. }
  27. if len(c.UserEvents) > 0 {
  28. return true
  29. }
  30. return false
  31. }
// eventsQueue stores the filesystem and user events kept for a later send
// attempt. The embedded RWMutex is taken by every method that reads or
// modifies the two slices.
type eventsQueue struct {
	sync.RWMutex
	// fsEvents holds the queued filesystem events
	fsEvents []*proto.FsEvent
	// userEvents holds the queued user events
	userEvents []*proto.UserEvent
}
  37. func (q *eventsQueue) addFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) {
  38. q.Lock()
  39. defer q.Unlock()
  40. q.fsEvents = append(q.fsEvents, &proto.FsEvent{
  41. Timestamp: timestamppb.New(timestamp),
  42. Action: action,
  43. Username: username,
  44. FsPath: fsPath,
  45. FsTargetPath: fsTargetPath,
  46. SshCmd: sshCmd,
  47. FileSize: fileSize,
  48. Protocol: protocol,
  49. Status: int32(status),
  50. })
  51. }
  52. func (q *eventsQueue) addUserEvent(timestamp time.Time, action string, userAsJSON []byte) {
  53. q.Lock()
  54. defer q.Unlock()
  55. q.userEvents = append(q.userEvents, &proto.UserEvent{
  56. Timestamp: timestamppb.New(timestamp),
  57. Action: action,
  58. User: userAsJSON,
  59. })
  60. }
  61. func (q *eventsQueue) popFsEvent() *proto.FsEvent {
  62. q.Lock()
  63. defer q.Unlock()
  64. if len(q.fsEvents) == 0 {
  65. return nil
  66. }
  67. truncLen := len(q.fsEvents) - 1
  68. ev := q.fsEvents[truncLen]
  69. q.fsEvents[truncLen] = nil
  70. q.fsEvents = q.fsEvents[:truncLen]
  71. return ev
  72. }
  73. func (q *eventsQueue) popUserEvent() *proto.UserEvent {
  74. q.Lock()
  75. defer q.Unlock()
  76. if len(q.userEvents) == 0 {
  77. return nil
  78. }
  79. truncLen := len(q.userEvents) - 1
  80. ev := q.userEvents[truncLen]
  81. q.userEvents[truncLen] = nil
  82. q.userEvents = q.userEvents[:truncLen]
  83. return ev
  84. }
  85. func (q *eventsQueue) getSize() int {
  86. q.RLock()
  87. defer q.RUnlock()
  88. return len(q.userEvents) + len(q.fsEvents)
  89. }
// notifierPlugin wraps a running notifier plugin together with the queue of
// events that could not be delivered to it.
type notifierPlugin struct {
	// config is the plugin configuration: command, arguments, checksum and
	// notifier options are read from it
	config Config
	// notifier is the implementation dispensed by the plugin RPC client,
	// set by initialize and used to deliver events
	notifier notifier.Notifier
	// client is the go-plugin client for the plugin process, set by initialize
	client *plugin.Client
	// queue stores the events whose delivery failed and that are kept for
	// a later send attempt
	queue *eventsQueue
}
  96. func newNotifierPlugin(config Config) (*notifierPlugin, error) {
  97. p := &notifierPlugin{
  98. config: config,
  99. queue: &eventsQueue{},
  100. }
  101. if err := p.initialize(); err != nil {
  102. logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
  103. return nil, err
  104. }
  105. return p, nil
  106. }
// exited returns the value reported by the plugin client's Exited method.
// The client is set by initialize, so this must only be called on a
// successfully initialized plugin.
func (p *notifierPlugin) exited() bool {
	return p.client.Exited()
}
// cleanup calls Kill on the plugin client. The client is set by initialize,
// so this must only be called on a successfully initialized plugin.
func (p *notifierPlugin) cleanup() {
	p.client.Kill()
}
  113. func (p *notifierPlugin) initialize() error {
  114. killProcess(p.config.Cmd)
  115. logger.Debug(logSender, "", "create new notifier plugin %#v", p.config.Cmd)
  116. if !p.config.NotifierOptions.hasActions() {
  117. return fmt.Errorf("no actions defined for the notifier plugin %#v", p.config.Cmd)
  118. }
  119. var secureConfig *plugin.SecureConfig
  120. if p.config.SHA256Sum != "" {
  121. secureConfig.Checksum = []byte(p.config.SHA256Sum)
  122. secureConfig.Hash = sha256.New()
  123. }
  124. client := plugin.NewClient(&plugin.ClientConfig{
  125. HandshakeConfig: notifier.Handshake,
  126. Plugins: notifier.PluginMap,
  127. Cmd: exec.Command(p.config.Cmd, p.config.Args...),
  128. AllowedProtocols: []plugin.Protocol{
  129. plugin.ProtocolGRPC,
  130. },
  131. AutoMTLS: p.config.AutoMTLS,
  132. SecureConfig: secureConfig,
  133. Managed: false,
  134. Logger: &logger.HCLogAdapter{
  135. Logger: hclog.New(&hclog.LoggerOptions{
  136. Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
  137. Level: pluginsLogLevel,
  138. DisableTime: true,
  139. }),
  140. },
  141. })
  142. rpcClient, err := client.Client()
  143. if err != nil {
  144. logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
  145. return err
  146. }
  147. raw, err := rpcClient.Dispense(notifier.PluginName)
  148. if err != nil {
  149. logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
  150. notifier.PluginName, p.config.Cmd, err)
  151. return err
  152. }
  153. p.client = client
  154. p.notifier = raw.(notifier.Notifier)
  155. return nil
  156. }
  157. func (p *notifierPlugin) canQueueEvent(timestamp time.Time) bool {
  158. if p.config.NotifierOptions.RetryMaxTime == 0 {
  159. return false
  160. }
  161. if time.Now().After(timestamp.Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
  162. return false
  163. }
  164. if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
  165. return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
  166. }
  167. return true
  168. }
  169. func (p *notifierPlugin) notifyFsAction(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd,
  170. protocol string, fileSize int64, errAction error) {
  171. if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
  172. return
  173. }
  174. go func() {
  175. status := 1
  176. if errAction != nil {
  177. status = 0
  178. }
  179. p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status)
  180. }()
  181. }
  182. func (p *notifierPlugin) notifyUserAction(timestamp time.Time, action string, user Renderer) {
  183. if !util.IsStringInSlice(action, p.config.NotifierOptions.UserEvents) {
  184. return
  185. }
  186. go func() {
  187. userAsJSON, err := user.RenderAsJSON(action != "delete")
  188. if err != nil {
  189. logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err)
  190. return
  191. }
  192. p.sendUserEvent(timestamp, action, userAsJSON)
  193. }()
  194. }
  195. func (p *notifierPlugin) sendFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd,
  196. protocol string, fileSize int64, status int) {
  197. if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status); err != nil {
  198. logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
  199. if p.canQueueEvent(timestamp) {
  200. p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status)
  201. }
  202. }
  203. }
  204. func (p *notifierPlugin) sendUserEvent(timestamp time.Time, action string, userAsJSON []byte) {
  205. if err := p.notifier.NotifyUserEvent(timestamp, action, userAsJSON); err != nil {
  206. logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
  207. if p.canQueueEvent(timestamp) {
  208. p.queue.addUserEvent(timestamp, action, userAsJSON)
  209. }
  210. }
  211. }
  212. func (p *notifierPlugin) sendQueuedEvents() {
  213. queueSize := p.queue.getSize()
  214. if queueSize == 0 {
  215. return
  216. }
  217. logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
  218. fsEv := p.queue.popFsEvent()
  219. for fsEv != nil {
  220. go p.sendFsEvent(fsEv.Timestamp.AsTime(), fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
  221. fsEv.SshCmd, fsEv.Protocol, fsEv.FileSize, int(fsEv.Status))
  222. fsEv = p.queue.popFsEvent()
  223. }
  224. userEv := p.queue.popUserEvent()
  225. for userEv != nil {
  226. go p.sendUserEvent(userEv.Timestamp.AsTime(), userEv.Action, userEv.User)
  227. userEv = p.queue.popUserEvent()
  228. }
  229. logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
  230. }