From c900cde8e4635a0fe3ac12d5294b18f08bb4859a Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Tue, 20 Jul 2021 12:51:21 +0200 Subject: [PATCH] notifiers plugin: add settings to retry unhandled events --- common/actions.go | 4 +- config/config.go | 12 +++ config/config_test.go | 8 ++ dataprovider/dataprovider.go | 2 +- docs/custom-actions.md | 4 + docs/full-configuration.md | 2 + go.mod | 4 +- go.sum | 11 +-- sdk/plugin/notifier.go | 149 +++++++++++++++++++++++++++++--- sdk/plugin/notifier/grpc.go | 12 +-- sdk/plugin/notifier/notifier.go | 6 +- sdk/plugin/plugin.go | 31 +++---- 12 files changed, 202 insertions(+), 43 deletions(-) diff --git a/common/actions.go b/common/actions.go index 53bc813f..4546b317 100644 --- a/common/actions.go +++ b/common/actions.go @@ -52,7 +52,7 @@ func InitializeActionHandler(handler ActionHandler) { // ExecutePreAction executes a pre-* action and returns the result func ExecutePreAction(user *dataprovider.User, operation, filePath, virtualPath, protocol string, fileSize int64, openFlags int) error { - plugin.Handler.NotifyFsEvent(operation, user.Username, filePath, "", "", protocol, fileSize, nil) + plugin.Handler.NotifyFsEvent(time.Now(), operation, user.Username, filePath, "", "", protocol, fileSize, nil) if !util.IsStringInSlice(operation, Config.Actions.ExecuteOn) { // for pre-delete we execute the internal handling on error, so we must return errUnconfiguredAction. // Other pre action will deny the operation on error so if we have no configuration we must return @@ -68,7 +68,7 @@ func ExecutePreAction(user *dataprovider.User, operation, filePath, virtualPath, // ExecuteActionNotification executes the defined hook, if any, for the specified action func ExecuteActionNotification(user *dataprovider.User, operation, filePath, virtualPath, target, sshCmd, protocol string, fileSize int64, err error) { - plugin.Handler.NotifyFsEvent(operation, user.Username, filePath, target, sshCmd, protocol, fileSize, err) + plugin.Handler.NotifyFsEvent(time.Now(), operation, user.Username, filePath, target, sshCmd, protocol, fileSize, err) notification := newActionNotification(user, operation, filePath, virtualPath, target, sshCmd, protocol, fileSize, 0, err) if util.IsStringInSlice(operation, Config.Actions.ExecuteSync) { diff --git a/config/config.go b/config/config.go index 643eb617..39824214 100644 --- a/config/config.go +++ b/config/config.go @@ -598,6 +598,18 @@ func getPluginsFromEnv(idx int) { isSet = true } + notifierRetryMaxTime, ok := lookupIntFromEnv(fmt.Sprintf("SFTPGO_PLUGINS__%v__NOTIFIER_OPTIONS__RETRY_MAX_TIME", idx)) + if ok { + pluginConfig.NotifierOptions.RetryMaxTime = int(notifierRetryMaxTime) + isSet = true + } + + notifierRetryQueueMaxSize, ok := lookupIntFromEnv(fmt.Sprintf("SFTPGO_PLUGINS__%v__NOTIFIER_OPTIONS__RETRY_QUEUE_MAX_SIZE", idx)) + if ok { + pluginConfig.NotifierOptions.RetryQueueMaxSize = int(notifierRetryQueueMaxSize) + isSet = true + } + kmsScheme, ok := os.LookupEnv(fmt.Sprintf("SFTPGO_PLUGINS__%v__KMS_OPTIONS__SCHEME", idx)) if ok { pluginConfig.KMSOptions.Scheme = kmsScheme diff --git a/config/config_test.go b/config/config_test.go index 250b6378..71a30e40 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -297,6 +297,8 @@ func TestPluginsFromEnv(t *testing.T) { os.Setenv("SFTPGO_PLUGINS__0__TYPE", "notifier") os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__FS_EVENTS", "upload,download") os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__USER_EVENTS", "add,update") + os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_MAX_TIME", "2") + os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_QUEUE_MAX_SIZE", "1000") os.Setenv("SFTPGO_PLUGINS__0__CMD", "plugin_start_cmd") os.Setenv("SFTPGO_PLUGINS__0__ARGS", "arg1,arg2") os.Setenv("SFTPGO_PLUGINS__0__SHA256SUM", "0a71ded61fccd59c4f3695b51c1b3d180da8d2d77ea09ccee20dac242675c193") @@ -307,6 +309,8 @@ func TestPluginsFromEnv(t *testing.T) { os.Unsetenv("SFTPGO_PLUGINS__0__TYPE") os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__FS_EVENTS") os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__USER_EVENTS") + os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_MAX_TIME") + os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_QUEUE_MAX_SIZE") os.Unsetenv("SFTPGO_PLUGINS__0__CMD") os.Unsetenv("SFTPGO_PLUGINS__0__ARGS") os.Unsetenv("SFTPGO_PLUGINS__0__SHA256SUM") @@ -328,6 +332,8 @@ func TestPluginsFromEnv(t *testing.T) { require.Len(t, pluginConf.NotifierOptions.UserEvents, 2) require.Equal(t, "add", pluginConf.NotifierOptions.UserEvents[0]) require.Equal(t, "update", pluginConf.NotifierOptions.UserEvents[1]) + require.Equal(t, 2, pluginConf.NotifierOptions.RetryMaxTime) + require.Equal(t, 1000, pluginConf.NotifierOptions.RetryQueueMaxSize) require.Equal(t, "plugin_start_cmd", pluginConf.Cmd) require.Len(t, pluginConf.Args, 2) require.Equal(t, "arg1", pluginConf.Args[0]) @@ -361,6 +367,8 @@ func TestPluginsFromEnv(t *testing.T) { require.Len(t, pluginConf.NotifierOptions.UserEvents, 2) require.Equal(t, "add", pluginConf.NotifierOptions.UserEvents[0]) require.Equal(t, "update", pluginConf.NotifierOptions.UserEvents[1]) + require.Equal(t, 2, pluginConf.NotifierOptions.RetryMaxTime) + require.Equal(t, 1000, pluginConf.NotifierOptions.RetryQueueMaxSize) require.Equal(t, "plugin_start_cmd1", pluginConf.Cmd) require.Len(t, pluginConf.Args, 0) require.Equal(t, "0a71ded61fccd59c4f3695b51c1b3d180da8d2d77ea09ccee20dac242675c193", pluginConf.SHA256Sum) diff --git a/dataprovider/dataprovider.go b/dataprovider/dataprovider.go index 83daf9eb..91da042d 100644 --- a/dataprovider/dataprovider.go +++ b/dataprovider/dataprovider.go @@ -2407,7 +2407,7 @@ func executeNotificationCommand(operation string, commandArgs []string, userAsJS } func executeAction(operation string, user *User) { - plugin.Handler.NotifyUserEvent(operation, user) + plugin.Handler.NotifyUserEvent(time.Now(), operation, user) if !util.IsStringInSlice(operation, config.Actions.ExecuteOn) { return } diff --git a/docs/custom-actions.md b/docs/custom-actions.md index 272180d3..e064d58e 100644 --- a/docs/custom-actions.md +++ b/docs/custom-actions.md @@ -98,3 +98,7 @@ If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. Th The HTTP hook will use the global configuration for HTTP clients and will respect the retry configurations. The structure for SFTPGo users can be found within the [OpenAPI schema](../httpd/schema/openapi.yaml). + +## Pub/Sub services + +You can forward SFTPGo events to serveral publish/subscribe systems using the [sftpgo-plugin-pubsub](https://github.com/sftpgo/sftpgo-plugin-pubsub). The notifiers SFTPGo plugins are not suitable for interactive actions such as `pre-*` events. Their scope is to simply forward events to external services. A custom hook is a better choice if you need to react to `pre-*` events. diff --git a/docs/full-configuration.md b/docs/full-configuration.md index abd13dab..1c1eed75 100644 --- a/docs/full-configuration.md +++ b/docs/full-configuration.md @@ -244,6 +244,8 @@ The configuration file contains the following sections: - `notifier_options`, struct. Defines the options for notifier plugins. - `fs_events`, list of strings. Defines the filesystem events that will be notified to this plugin. - `user_events`, list of strings. Defines the user events that will be notified to this plugin. + - `retry_max_time`, integer. Defines the maximum number of seconds an event can be late. SFTPGo adds a timestamp to each event and add to an internal queue any events that a the plugin fails to handle (the plugin returns an error or it is not running). If a plugin fails to handle an event that is too late, based on this configuration, it will be discarded. SFTPGo will try to resend queued events every 30 seconds. 0 means no retry. + - `retry_queue_max_size`, integer. Defines the maximum number of events that the internal queue can hold. Once the queue is full, the events that cannot be sent to the plugin will be discarded. 0 means no limit. - `kms_options`, struct. Defines the options for kms plugins. - `scheme`, string. KMS scheme. Supported schemes are: `awskms`, `gcpkms`, `hashivault`, `azurekeyvault`. - `encrypted_status`, string. Encrypted status for a KMS secret. Supported statuses are: `AWS`, `GCP`, `VaultTransit`, `AzureKeyVault`. diff --git a/go.mod b/go.mod index f1551ab0..76643fd4 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 github.com/StackExchange/wmi v1.2.0 // indirect github.com/alexedwards/argon2id v0.0.0-20210511081203-7d35d68092b8 - github.com/aws/aws-sdk-go v1.40.2 + github.com/aws/aws-sdk-go v1.40.3 github.com/cockroachdb/cockroach-go/v2 v2.1.1 github.com/eikenb/pipeat v0.0.0-20210603033007-44fc3ffce52b github.com/fatih/color v1.12.0 // indirect @@ -62,7 +62,7 @@ require ( golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 google.golang.org/api v0.50.0 - google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea // indirect + google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492 // indirect google.golang.org/grpc v1.39.0 google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index dda73537..fa4b6431 100644 --- a/go.sum +++ b/go.sum @@ -121,8 +121,8 @@ github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZo github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= -github.com/aws/aws-sdk-go v1.40.2 h1:iNaJUKjUeULTsuTGrGbAFG1H5AVSWgo5kwyUDmtJrwk= -github.com/aws/aws-sdk-go v1.40.2/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/aws/aws-sdk-go v1.40.3 h1:NzjcLRsb+C9L1dVPajdNbdzkuPBi0pQJWiQW0eYJGo8= +github.com/aws/aws-sdk-go v1.40.3/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -1019,8 +1019,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1126,8 +1127,8 @@ google.golang.org/genproto v0.0.0-20210608205507-b6d2f5bf0d7d/go.mod h1:UODoCrxH google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/genproto v0.0.0-20210624174822-c5cf32407d0a/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= -google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea h1:8ZyCcgugUqamxp/vZSEJw9CMy7VZlSWYJLLJPi/dSDA= -google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm1dci+enl5hChSFPOmmUF1+uAa/UsgNRWd7k= +google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492 h1:7yQQsvnwjfEahbNNEKcBHv3mR+HnB1ctGY/z1JXzx8M= +google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/sdk/plugin/notifier.go b/sdk/plugin/notifier.go index fcf3e242..7234a555 100644 --- a/sdk/plugin/notifier.go +++ b/sdk/plugin/notifier.go @@ -4,19 +4,25 @@ import ( "crypto/sha256" "fmt" "os/exec" + "sync" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/drakkan/sftpgo/v2/logger" "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier" + "github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto" "github.com/drakkan/sftpgo/v2/util" ) // NotifierConfig defines configuration parameters for notifiers plugins type NotifierConfig struct { - FsEvents []string `json:"fs_events" mapstructure:"fs_events"` - UserEvents []string `json:"user_events" mapstructure:"user_events"` + FsEvents []string `json:"fs_events" mapstructure:"fs_events"` + UserEvents []string `json:"user_events" mapstructure:"user_events"` + RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"` + RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"` } func (c *NotifierConfig) hasActions() bool { @@ -29,15 +35,88 @@ func (c *NotifierConfig) hasActions() bool { return false } +type eventsQueue struct { + sync.RWMutex + fsEvents []*proto.FsEvent + userEvents []*proto.UserEvent +} + +func (q *eventsQueue) addFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) { + q.Lock() + defer q.Unlock() + + q.fsEvents = append(q.fsEvents, &proto.FsEvent{ + Timestamp: timestamppb.New(timestamp), + Action: action, + Username: username, + FsPath: fsPath, + FsTargetPath: fsTargetPath, + SshCmd: sshCmd, + FileSize: fileSize, + Protocol: protocol, + Status: int32(status), + }) +} + +func (q *eventsQueue) addUserEvent(timestamp time.Time, action string, userAsJSON []byte) { + q.Lock() + defer q.Unlock() + + q.userEvents = append(q.userEvents, &proto.UserEvent{ + Timestamp: timestamppb.New(timestamp), + Action: action, + User: userAsJSON, + }) +} + +func (q *eventsQueue) popFsEvent() *proto.FsEvent { + q.Lock() + defer q.Unlock() + + if len(q.fsEvents) == 0 { + return nil + } + truncLen := len(q.fsEvents) - 1 + ev := q.fsEvents[truncLen] + q.fsEvents[truncLen] = nil + q.fsEvents = q.fsEvents[:truncLen] + + return ev +} + +func (q *eventsQueue) popUserEvent() *proto.UserEvent { + q.Lock() + defer q.Unlock() + + if len(q.userEvents) == 0 { + return nil + } + truncLen := len(q.userEvents) - 1 + ev := q.userEvents[truncLen] + q.userEvents[truncLen] = nil + q.userEvents = q.userEvents[:truncLen] + + return ev +} + +func (q *eventsQueue) getSize() int { + q.RLock() + defer q.RUnlock() + + return len(q.userEvents) + len(q.fsEvents) +} + type notifierPlugin struct { config Config notifier notifier.Notifier client *plugin.Client + queue *eventsQueue } func newNotifierPlugin(config Config) (*notifierPlugin, error) { p := ¬ifierPlugin{ config: config, + queue: &eventsQueue{}, } if err := p.initialize(); err != nil { logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config) @@ -101,7 +180,21 @@ func (p *notifierPlugin) initialize() error { return nil } -func (p *notifierPlugin) notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, errAction error) { +func (p *notifierPlugin) canQueueEvent(timestamp time.Time) bool { + if p.config.NotifierOptions.RetryMaxTime == 0 { + return false + } + if time.Now().After(timestamp.Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) { + return false + } + if p.config.NotifierOptions.RetryQueueMaxSize > 0 { + return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize + } + return true +} + +func (p *notifierPlugin) notifyFsAction(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, + protocol string, fileSize int64, errAction error) { if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) { return } @@ -111,13 +204,11 @@ func (p *notifierPlugin) notifyFsAction(action, username, fsPath, fsTargetPath, if errAction != nil { status = 0 } - if err := p.notifier.NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status); err != nil { - logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err) - } + p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status) }() } -func (p *notifierPlugin) notifyUserAction(action string, user Renderer) { +func (p *notifierPlugin) notifyUserAction(timestamp time.Time, action string, user Renderer) { if !util.IsStringInSlice(action, p.config.NotifierOptions.UserEvents) { return } @@ -128,8 +219,46 @@ func (p *notifierPlugin) notifyUserAction(action string, user Renderer) { logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err) return } - if err := p.notifier.NotifyUserEvent(action, userAsJSON); err != nil { - logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err) - } + p.sendUserEvent(timestamp, action, userAsJSON) }() } + +func (p *notifierPlugin) sendFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, + protocol string, fileSize int64, status int) { + if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status); err != nil { + logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err) + if p.canQueueEvent(timestamp) { + p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status) + } + } +} + +func (p *notifierPlugin) sendUserEvent(timestamp time.Time, action string, userAsJSON []byte) { + if err := p.notifier.NotifyUserEvent(timestamp, action, userAsJSON); err != nil { + logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err) + if p.canQueueEvent(timestamp) { + p.queue.addUserEvent(timestamp, action, userAsJSON) + } + } +} + +func (p *notifierPlugin) sendQueuedEvents() { + queueSize := p.queue.getSize() + if queueSize == 0 { + return + } + logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize) + fsEv := p.queue.popFsEvent() + for fsEv != nil { + go p.sendFsEvent(fsEv.Timestamp.AsTime(), fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath, + fsEv.SshCmd, fsEv.Protocol, fsEv.FileSize, int(fsEv.Status)) + fsEv = p.queue.popFsEvent() + } + + userEv := p.queue.popUserEvent() + for userEv != nil { + go p.sendUserEvent(userEv.Timestamp.AsTime(), userEv.Action, userEv.User) + userEv = p.queue.popUserEvent() + } + logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize()) +} diff --git a/sdk/plugin/notifier/grpc.go b/sdk/plugin/notifier/grpc.go index 739d708c..1d8865bc 100644 --- a/sdk/plugin/notifier/grpc.go +++ b/sdk/plugin/notifier/grpc.go @@ -20,12 +20,12 @@ type GRPCClient struct { } // NotifyFsEvent implements the Notifier interface -func (c *GRPCClient) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) error { +func (c *GRPCClient) NotifyFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) error { ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) defer cancel() _, err := c.client.SendFsEvent(ctx, &proto.FsEvent{ - Timestamp: timestamppb.New(time.Now()), + Timestamp: timestamppb.New(timestamp), Action: action, Username: username, FsPath: fsPath, @@ -40,12 +40,12 @@ func (c *GRPCClient) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCm } // NotifyUserEvent implements the Notifier interface -func (c *GRPCClient) NotifyUserEvent(action string, user []byte) error { +func (c *GRPCClient) NotifyUserEvent(timestamp time.Time, action string, user []byte) error { ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) defer cancel() _, err := c.client.SendUserEvent(ctx, &proto.UserEvent{ - Timestamp: timestamppb.New(time.Now()), + Timestamp: timestamppb.New(timestamp), Action: action, User: user, }) @@ -60,13 +60,13 @@ type GRPCServer struct { // SendFsEvent implements the serve side fs notify method func (s *GRPCServer) SendFsEvent(ctx context.Context, req *proto.FsEvent) (*emptypb.Empty, error) { - err := s.Impl.NotifyFsEvent(req.Action, req.Username, req.FsPath, req.FsTargetPath, req.SshCmd, + err := s.Impl.NotifyFsEvent(req.Timestamp.AsTime(), req.Action, req.Username, req.FsPath, req.FsTargetPath, req.SshCmd, req.Protocol, req.FileSize, int(req.Status)) return &emptypb.Empty{}, err } // SendUserEvent implements the serve side user notify method func (s *GRPCServer) SendUserEvent(ctx context.Context, req *proto.UserEvent) (*emptypb.Empty, error) { - err := s.Impl.NotifyUserEvent(req.Action, req.User) + err := s.Impl.NotifyUserEvent(req.Timestamp.AsTime(), req.Action, req.User) return &emptypb.Empty{}, err } diff --git a/sdk/plugin/notifier/notifier.go b/sdk/plugin/notifier/notifier.go index f94b401c..2b753763 100644 --- a/sdk/plugin/notifier/notifier.go +++ b/sdk/plugin/notifier/notifier.go @@ -5,6 +5,7 @@ package notifier import ( "context" + "time" "github.com/hashicorp/go-plugin" "google.golang.org/grpc" @@ -31,8 +32,9 @@ var PluginMap = map[string]plugin.Plugin{ // Notifier defines the interface for notifiers plugins type Notifier interface { - NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) error - NotifyUserEvent(action string, user []byte) error + NotifyFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, + fileSize int64, status int) error + NotifyUserEvent(timestamp time.Time, action string, user []byte) error } // Plugin defines the implementation to serve/connect to a notifier plugin diff --git a/sdk/plugin/plugin.go b/sdk/plugin/plugin.go index 5fd0212f..f01d4a8b 100644 --- a/sdk/plugin/plugin.go +++ b/sdk/plugin/plugin.go @@ -145,30 +145,23 @@ func (m *Manager) validateConfigs() error { } // NotifyFsEvent sends the fs event notifications using any defined notifier plugins -func (m *Manager) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, err error) { +func (m *Manager) NotifyFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, + fileSize int64, err error) { m.notifLock.RLock() defer m.notifLock.RUnlock() for _, n := range m.notifiers { - if n.exited() { - logger.Warn(logSender, "", "notifer plugin %v is not active, unable to send fs event", n.config.Cmd) - continue - } - n.notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err) + n.notifyFsAction(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err) } } // NotifyUserEvent sends the user event notifications using any defined notifier plugins -func (m *Manager) NotifyUserEvent(action string, user Renderer) { +func (m *Manager) NotifyUserEvent(timestamp time.Time, action string, user Renderer) { m.notifLock.RLock() defer m.notifLock.RUnlock() for _, n := range m.notifiers { - if n.exited() { - logger.Warn(logSender, "", "notifer plugin %v is not active, unable to send user event", n.config.Cmd) - continue - } - n.notifyUserAction(action, user) + n.notifyUserAction(timestamp, action, user) } } @@ -192,7 +185,11 @@ func (m *Manager) checkCrashedPlugins() { m.notifLock.RLock() for idx, n := range m.notifiers { if n.exited() { - defer Handler.restartNotifierPlugin(n.config, idx) + defer func(cfg Config, index int) { + Handler.restartNotifierPlugin(cfg, index) + }(n.config, idx) + } else { + n.sendQueuedEvents() } } m.notifLock.RUnlock() @@ -200,7 +197,9 @@ func (m *Manager) checkCrashedPlugins() { m.kmsLock.RLock() for idx, k := range m.kms { if k.exited() { - defer Handler.restartKMSPlugin(k.config, idx) + defer func(cfg Config, index int) { + Handler.restartKMSPlugin(cfg, index) + }(k.config, idx) } } m.kmsLock.RUnlock() @@ -210,7 +209,7 @@ func (m *Manager) restartNotifierPlugin(config Config, idx int) { if atomic.LoadInt32(&m.closed) == 1 { return } - logger.Info(logSender, "", "try to restart notifier crashed plugin %#v, idx: %v", config.Cmd, idx) + logger.Info(logSender, "", "try to restart crashed notifier plugin %#v, idx: %v", config.Cmd, idx) plugin, err := newNotifierPlugin(config) if err != nil { logger.Warn(logSender, "", "unable to restart notifier plugin %#v, err: %v", config.Cmd, err) @@ -218,8 +217,10 @@ func (m *Manager) restartNotifierPlugin(config Config, idx int) { } m.notifLock.Lock() + plugin.queue = m.notifiers[idx].queue m.notifiers[idx] = plugin m.notifLock.Unlock() + plugin.sendQueuedEvents() } func (m *Manager) restartKMSPlugin(config Config, idx int) {