Ver código fonte

notifiers plugin: add settings to retry unhandled events

Nicola Murino 4 anos atrás
pai
commit
c900cde8e4

+ 2 - 2
common/actions.go

@@ -52,7 +52,7 @@ func InitializeActionHandler(handler ActionHandler) {
 
 // ExecutePreAction executes a pre-* action and returns the result
 func ExecutePreAction(user *dataprovider.User, operation, filePath, virtualPath, protocol string, fileSize int64, openFlags int) error {
-	plugin.Handler.NotifyFsEvent(operation, user.Username, filePath, "", "", protocol, fileSize, nil)
+	plugin.Handler.NotifyFsEvent(time.Now(), operation, user.Username, filePath, "", "", protocol, fileSize, nil)
 	if !util.IsStringInSlice(operation, Config.Actions.ExecuteOn) {
 		// for pre-delete we execute the internal handling on error, so we must return errUnconfiguredAction.
 		// Other pre action will deny the operation on error so if we have no configuration we must return
@@ -68,7 +68,7 @@ func ExecutePreAction(user *dataprovider.User, operation, filePath, virtualPath,
 
 // ExecuteActionNotification executes the defined hook, if any, for the specified action
 func ExecuteActionNotification(user *dataprovider.User, operation, filePath, virtualPath, target, sshCmd, protocol string, fileSize int64, err error) {
-	plugin.Handler.NotifyFsEvent(operation, user.Username, filePath, target, sshCmd, protocol, fileSize, err)
+	plugin.Handler.NotifyFsEvent(time.Now(), operation, user.Username, filePath, target, sshCmd, protocol, fileSize, err)
 	notification := newActionNotification(user, operation, filePath, virtualPath, target, sshCmd, protocol, fileSize, 0, err)
 
 	if util.IsStringInSlice(operation, Config.Actions.ExecuteSync) {

+ 12 - 0
config/config.go

@@ -598,6 +598,18 @@ func getPluginsFromEnv(idx int) {
 		isSet = true
 	}
 
+	notifierRetryMaxTime, ok := lookupIntFromEnv(fmt.Sprintf("SFTPGO_PLUGINS__%v__NOTIFIER_OPTIONS__RETRY_MAX_TIME", idx))
+	if ok {
+		pluginConfig.NotifierOptions.RetryMaxTime = int(notifierRetryMaxTime)
+		isSet = true
+	}
+
+	notifierRetryQueueMaxSize, ok := lookupIntFromEnv(fmt.Sprintf("SFTPGO_PLUGINS__%v__NOTIFIER_OPTIONS__RETRY_QUEUE_MAX_SIZE", idx))
+	if ok {
+		pluginConfig.NotifierOptions.RetryQueueMaxSize = int(notifierRetryQueueMaxSize)
+		isSet = true
+	}
+
 	kmsScheme, ok := os.LookupEnv(fmt.Sprintf("SFTPGO_PLUGINS__%v__KMS_OPTIONS__SCHEME", idx))
 	if ok {
 		pluginConfig.KMSOptions.Scheme = kmsScheme

+ 8 - 0
config/config_test.go

@@ -297,6 +297,8 @@ func TestPluginsFromEnv(t *testing.T) {
 	os.Setenv("SFTPGO_PLUGINS__0__TYPE", "notifier")
 	os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__FS_EVENTS", "upload,download")
 	os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__USER_EVENTS", "add,update")
+	os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_MAX_TIME", "2")
+	os.Setenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_QUEUE_MAX_SIZE", "1000")
 	os.Setenv("SFTPGO_PLUGINS__0__CMD", "plugin_start_cmd")
 	os.Setenv("SFTPGO_PLUGINS__0__ARGS", "arg1,arg2")
 	os.Setenv("SFTPGO_PLUGINS__0__SHA256SUM", "0a71ded61fccd59c4f3695b51c1b3d180da8d2d77ea09ccee20dac242675c193")
@@ -307,6 +309,8 @@ func TestPluginsFromEnv(t *testing.T) {
 		os.Unsetenv("SFTPGO_PLUGINS__0__TYPE")
 		os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__FS_EVENTS")
 		os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__USER_EVENTS")
+		os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_MAX_TIME")
+		os.Unsetenv("SFTPGO_PLUGINS__0__NOTIFIER_OPTIONS__RETRY_QUEUE_MAX_SIZE")
 		os.Unsetenv("SFTPGO_PLUGINS__0__CMD")
 		os.Unsetenv("SFTPGO_PLUGINS__0__ARGS")
 		os.Unsetenv("SFTPGO_PLUGINS__0__SHA256SUM")
@@ -328,6 +332,8 @@ func TestPluginsFromEnv(t *testing.T) {
 	require.Len(t, pluginConf.NotifierOptions.UserEvents, 2)
 	require.Equal(t, "add", pluginConf.NotifierOptions.UserEvents[0])
 	require.Equal(t, "update", pluginConf.NotifierOptions.UserEvents[1])
+	require.Equal(t, 2, pluginConf.NotifierOptions.RetryMaxTime)
+	require.Equal(t, 1000, pluginConf.NotifierOptions.RetryQueueMaxSize)
 	require.Equal(t, "plugin_start_cmd", pluginConf.Cmd)
 	require.Len(t, pluginConf.Args, 2)
 	require.Equal(t, "arg1", pluginConf.Args[0])
@@ -361,6 +367,8 @@ func TestPluginsFromEnv(t *testing.T) {
 	require.Len(t, pluginConf.NotifierOptions.UserEvents, 2)
 	require.Equal(t, "add", pluginConf.NotifierOptions.UserEvents[0])
 	require.Equal(t, "update", pluginConf.NotifierOptions.UserEvents[1])
+	require.Equal(t, 2, pluginConf.NotifierOptions.RetryMaxTime)
+	require.Equal(t, 1000, pluginConf.NotifierOptions.RetryQueueMaxSize)
 	require.Equal(t, "plugin_start_cmd1", pluginConf.Cmd)
 	require.Len(t, pluginConf.Args, 0)
 	require.Equal(t, "0a71ded61fccd59c4f3695b51c1b3d180da8d2d77ea09ccee20dac242675c193", pluginConf.SHA256Sum)

+ 1 - 1
dataprovider/dataprovider.go

@@ -2407,7 +2407,7 @@ func executeNotificationCommand(operation string, commandArgs []string, userAsJS
 }
 
 func executeAction(operation string, user *User) {
-	plugin.Handler.NotifyUserEvent(operation, user)
+	plugin.Handler.NotifyUserEvent(time.Now(), operation, user)
 	if !util.IsStringInSlice(operation, config.Actions.ExecuteOn) {
 		return
 	}

+ 4 - 0
docs/custom-actions.md

@@ -98,3 +98,7 @@ If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. Th
 The HTTP hook will use the global configuration for HTTP clients and will respect the retry configurations.
 
 The structure for SFTPGo users can be found within the [OpenAPI schema](../httpd/schema/openapi.yaml).
+
+## Pub/Sub services
+
+You can forward SFTPGo events to serveral publish/subscribe systems using the [sftpgo-plugin-pubsub](https://github.com/sftpgo/sftpgo-plugin-pubsub). The notifiers SFTPGo plugins are not suitable for interactive actions such as `pre-*` events. Their scope is to simply forward events to external services. A custom hook is a better choice if you need to react to `pre-*` events.

+ 2 - 0
docs/full-configuration.md

@@ -244,6 +244,8 @@ The configuration file contains the following sections:
   - `notifier_options`, struct. Defines the options for notifier plugins.
     - `fs_events`, list of strings. Defines the filesystem events that will be notified to this plugin.
     - `user_events`, list of strings. Defines the user events that will be notified to this plugin.
+    - `retry_max_time`, integer. Defines the maximum number of seconds an event can be late. SFTPGo adds a timestamp to each event and add to an internal queue any events that a the plugin fails to handle (the plugin returns an error or it is not running). If a plugin fails to handle an event that is too late, based on this configuration, it will be discarded. SFTPGo will try to resend queued events every 30 seconds. 0 means no retry.
+    - `retry_queue_max_size`, integer. Defines the maximum number of events that the internal queue can hold. Once the queue is full, the events that cannot be sent to the plugin will be discarded. 0 means no limit.
   - `kms_options`, struct. Defines the options for kms plugins.
     - `scheme`, string. KMS scheme. Supported schemes are: `awskms`, `gcpkms`, `hashivault`, `azurekeyvault`.
     - `encrypted_status`, string. Encrypted status for a KMS secret. Supported statuses are: `AWS`, `GCP`, `VaultTransit`, `AzureKeyVault`.

+ 2 - 2
go.mod

@@ -8,7 +8,7 @@ require (
 	github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962
 	github.com/StackExchange/wmi v1.2.0 // indirect
 	github.com/alexedwards/argon2id v0.0.0-20210511081203-7d35d68092b8
-	github.com/aws/aws-sdk-go v1.40.2
+	github.com/aws/aws-sdk-go v1.40.3
 	github.com/cockroachdb/cockroach-go/v2 v2.1.1
 	github.com/eikenb/pipeat v0.0.0-20210603033007-44fc3ffce52b
 	github.com/fatih/color v1.12.0 // indirect
@@ -62,7 +62,7 @@ require (
 	golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
 	golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
 	google.golang.org/api v0.50.0
-	google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea // indirect
+	google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492 // indirect
 	google.golang.org/grpc v1.39.0
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0

+ 6 - 5
go.sum

@@ -121,8 +121,8 @@ github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZo
 github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
 github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
 github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
-github.com/aws/aws-sdk-go v1.40.2 h1:iNaJUKjUeULTsuTGrGbAFG1H5AVSWgo5kwyUDmtJrwk=
-github.com/aws/aws-sdk-go v1.40.2/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
+github.com/aws/aws-sdk-go v1.40.3 h1:NzjcLRsb+C9L1dVPajdNbdzkuPBi0pQJWiQW0eYJGo8=
+github.com/aws/aws-sdk-go v1.40.3/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
 github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -1019,8 +1019,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
 golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
-golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
 golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -1126,8 +1127,8 @@ google.golang.org/genproto v0.0.0-20210608205507-b6d2f5bf0d7d/go.mod h1:UODoCrxH
 google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24=
 google.golang.org/genproto v0.0.0-20210624174822-c5cf32407d0a/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24=
 google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24=
-google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea h1:8ZyCcgugUqamxp/vZSEJw9CMy7VZlSWYJLLJPi/dSDA=
-google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm1dci+enl5hChSFPOmmUF1+uAa/UsgNRWd7k=
+google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492 h1:7yQQsvnwjfEahbNNEKcBHv3mR+HnB1ctGY/z1JXzx8M=
+google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
 google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
 google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=

+ 139 - 10
sdk/plugin/notifier.go

@@ -4,19 +4,25 @@ import (
 	"crypto/sha256"
 	"fmt"
 	"os/exec"
+	"sync"
+	"time"
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-plugin"
+	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/drakkan/sftpgo/v2/logger"
 	"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
+	"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
 	"github.com/drakkan/sftpgo/v2/util"
 )
 
 // NotifierConfig defines configuration parameters for notifiers plugins
 type NotifierConfig struct {
-	FsEvents   []string `json:"fs_events" mapstructure:"fs_events"`
-	UserEvents []string `json:"user_events" mapstructure:"user_events"`
+	FsEvents          []string `json:"fs_events" mapstructure:"fs_events"`
+	UserEvents        []string `json:"user_events" mapstructure:"user_events"`
+	RetryMaxTime      int      `json:"retry_max_time" mapstructure:"retry_max_time"`
+	RetryQueueMaxSize int      `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
 }
 
 func (c *NotifierConfig) hasActions() bool {
@@ -29,15 +35,88 @@ func (c *NotifierConfig) hasActions() bool {
 	return false
 }
 
+type eventsQueue struct {
+	sync.RWMutex
+	fsEvents   []*proto.FsEvent
+	userEvents []*proto.UserEvent
+}
+
+func (q *eventsQueue) addFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) {
+	q.Lock()
+	defer q.Unlock()
+
+	q.fsEvents = append(q.fsEvents, &proto.FsEvent{
+		Timestamp:    timestamppb.New(timestamp),
+		Action:       action,
+		Username:     username,
+		FsPath:       fsPath,
+		FsTargetPath: fsTargetPath,
+		SshCmd:       sshCmd,
+		FileSize:     fileSize,
+		Protocol:     protocol,
+		Status:       int32(status),
+	})
+}
+
+func (q *eventsQueue) addUserEvent(timestamp time.Time, action string, userAsJSON []byte) {
+	q.Lock()
+	defer q.Unlock()
+
+	q.userEvents = append(q.userEvents, &proto.UserEvent{
+		Timestamp: timestamppb.New(timestamp),
+		Action:    action,
+		User:      userAsJSON,
+	})
+}
+
+func (q *eventsQueue) popFsEvent() *proto.FsEvent {
+	q.Lock()
+	defer q.Unlock()
+
+	if len(q.fsEvents) == 0 {
+		return nil
+	}
+	truncLen := len(q.fsEvents) - 1
+	ev := q.fsEvents[truncLen]
+	q.fsEvents[truncLen] = nil
+	q.fsEvents = q.fsEvents[:truncLen]
+
+	return ev
+}
+
+func (q *eventsQueue) popUserEvent() *proto.UserEvent {
+	q.Lock()
+	defer q.Unlock()
+
+	if len(q.userEvents) == 0 {
+		return nil
+	}
+	truncLen := len(q.userEvents) - 1
+	ev := q.userEvents[truncLen]
+	q.userEvents[truncLen] = nil
+	q.userEvents = q.userEvents[:truncLen]
+
+	return ev
+}
+
+func (q *eventsQueue) getSize() int {
+	q.RLock()
+	defer q.RUnlock()
+
+	return len(q.userEvents) + len(q.fsEvents)
+}
+
 type notifierPlugin struct {
 	config   Config
 	notifier notifier.Notifier
 	client   *plugin.Client
+	queue    *eventsQueue
 }
 
 func newNotifierPlugin(config Config) (*notifierPlugin, error) {
 	p := &notifierPlugin{
 		config: config,
+		queue:  &eventsQueue{},
 	}
 	if err := p.initialize(); err != nil {
 		logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
@@ -101,7 +180,21 @@ func (p *notifierPlugin) initialize() error {
 	return nil
 }
 
-func (p *notifierPlugin) notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, errAction error) {
+func (p *notifierPlugin) canQueueEvent(timestamp time.Time) bool {
+	if p.config.NotifierOptions.RetryMaxTime == 0 {
+		return false
+	}
+	if time.Now().After(timestamp.Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
+		return false
+	}
+	if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
+		return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
+	}
+	return true
+}
+
+func (p *notifierPlugin) notifyFsAction(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd,
+	protocol string, fileSize int64, errAction error) {
 	if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
 		return
 	}
@@ -111,13 +204,11 @@ func (p *notifierPlugin) notifyFsAction(action, username, fsPath, fsTargetPath,
 		if errAction != nil {
 			status = 0
 		}
-		if err := p.notifier.NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status); err != nil {
-			logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
-		}
+		p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status)
 	}()
 }
 
-func (p *notifierPlugin) notifyUserAction(action string, user Renderer) {
+func (p *notifierPlugin) notifyUserAction(timestamp time.Time, action string, user Renderer) {
 	if !util.IsStringInSlice(action, p.config.NotifierOptions.UserEvents) {
 		return
 	}
@@ -128,8 +219,46 @@ func (p *notifierPlugin) notifyUserAction(action string, user Renderer) {
 			logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err)
 			return
 		}
-		if err := p.notifier.NotifyUserEvent(action, userAsJSON); err != nil {
-			logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
-		}
+		p.sendUserEvent(timestamp, action, userAsJSON)
 	}()
 }
+
+func (p *notifierPlugin) sendFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd,
+	protocol string, fileSize int64, status int) {
+	if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status); err != nil {
+		logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
+		if p.canQueueEvent(timestamp) {
+			p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, status)
+		}
+	}
+}
+
+func (p *notifierPlugin) sendUserEvent(timestamp time.Time, action string, userAsJSON []byte) {
+	if err := p.notifier.NotifyUserEvent(timestamp, action, userAsJSON); err != nil {
+		logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
+		if p.canQueueEvent(timestamp) {
+			p.queue.addUserEvent(timestamp, action, userAsJSON)
+		}
+	}
+}
+
+func (p *notifierPlugin) sendQueuedEvents() {
+	queueSize := p.queue.getSize()
+	if queueSize == 0 {
+		return
+	}
+	logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
+	fsEv := p.queue.popFsEvent()
+	for fsEv != nil {
+		go p.sendFsEvent(fsEv.Timestamp.AsTime(), fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
+			fsEv.SshCmd, fsEv.Protocol, fsEv.FileSize, int(fsEv.Status))
+		fsEv = p.queue.popFsEvent()
+	}
+
+	userEv := p.queue.popUserEvent()
+	for userEv != nil {
+		go p.sendUserEvent(userEv.Timestamp.AsTime(), userEv.Action, userEv.User)
+		userEv = p.queue.popUserEvent()
+	}
+	logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
+}

+ 6 - 6
sdk/plugin/notifier/grpc.go

@@ -20,12 +20,12 @@ type GRPCClient struct {
 }
 
 // NotifyFsEvent implements the Notifier interface
-func (c *GRPCClient) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) error {
+func (c *GRPCClient) NotifyFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) error {
 	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
 	defer cancel()
 
 	_, err := c.client.SendFsEvent(ctx, &proto.FsEvent{
-		Timestamp:    timestamppb.New(time.Now()),
+		Timestamp:    timestamppb.New(timestamp),
 		Action:       action,
 		Username:     username,
 		FsPath:       fsPath,
@@ -40,12 +40,12 @@ func (c *GRPCClient) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCm
 }
 
 // NotifyUserEvent implements the Notifier interface
-func (c *GRPCClient) NotifyUserEvent(action string, user []byte) error {
+func (c *GRPCClient) NotifyUserEvent(timestamp time.Time, action string, user []byte) error {
 	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
 	defer cancel()
 
 	_, err := c.client.SendUserEvent(ctx, &proto.UserEvent{
-		Timestamp: timestamppb.New(time.Now()),
+		Timestamp: timestamppb.New(timestamp),
 		Action:    action,
 		User:      user,
 	})
@@ -60,13 +60,13 @@ type GRPCServer struct {
 
 // SendFsEvent implements the serve side fs notify method
 func (s *GRPCServer) SendFsEvent(ctx context.Context, req *proto.FsEvent) (*emptypb.Empty, error) {
-	err := s.Impl.NotifyFsEvent(req.Action, req.Username, req.FsPath, req.FsTargetPath, req.SshCmd,
+	err := s.Impl.NotifyFsEvent(req.Timestamp.AsTime(), req.Action, req.Username, req.FsPath, req.FsTargetPath, req.SshCmd,
 		req.Protocol, req.FileSize, int(req.Status))
 	return &emptypb.Empty{}, err
 }
 
 // SendUserEvent implements the serve side user notify method
 func (s *GRPCServer) SendUserEvent(ctx context.Context, req *proto.UserEvent) (*emptypb.Empty, error) {
-	err := s.Impl.NotifyUserEvent(req.Action, req.User)
+	err := s.Impl.NotifyUserEvent(req.Timestamp.AsTime(), req.Action, req.User)
 	return &emptypb.Empty{}, err
 }

+ 4 - 2
sdk/plugin/notifier/notifier.go

@@ -5,6 +5,7 @@ package notifier
 
 import (
 	"context"
+	"time"
 
 	"github.com/hashicorp/go-plugin"
 	"google.golang.org/grpc"
@@ -31,8 +32,9 @@ var PluginMap = map[string]plugin.Plugin{
 
 // Notifier defines the interface for notifiers plugins
 type Notifier interface {
-	NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, status int) error
-	NotifyUserEvent(action string, user []byte) error
+	NotifyFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string,
+		fileSize int64, status int) error
+	NotifyUserEvent(timestamp time.Time, action string, user []byte) error
 }
 
 // Plugin defines the implementation to serve/connect to a notifier plugin

+ 16 - 15
sdk/plugin/plugin.go

@@ -145,30 +145,23 @@ func (m *Manager) validateConfigs() error {
 }
 
 // NotifyFsEvent sends the fs event notifications using any defined notifier plugins
-func (m *Manager) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, err error) {
+func (m *Manager) NotifyFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol string,
+	fileSize int64, err error) {
 	m.notifLock.RLock()
 	defer m.notifLock.RUnlock()
 
 	for _, n := range m.notifiers {
-		if n.exited() {
-			logger.Warn(logSender, "", "notifer plugin %v is not active, unable to send fs event", n.config.Cmd)
-			continue
-		}
-		n.notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err)
+		n.notifyFsAction(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err)
 	}
 }
 
 // NotifyUserEvent sends the user event notifications using any defined notifier plugins
-func (m *Manager) NotifyUserEvent(action string, user Renderer) {
+func (m *Manager) NotifyUserEvent(timestamp time.Time, action string, user Renderer) {
 	m.notifLock.RLock()
 	defer m.notifLock.RUnlock()
 
 	for _, n := range m.notifiers {
-		if n.exited() {
-			logger.Warn(logSender, "", "notifer plugin %v is not active, unable to send user event", n.config.Cmd)
-			continue
-		}
-		n.notifyUserAction(action, user)
+		n.notifyUserAction(timestamp, action, user)
 	}
 }
 
@@ -192,7 +185,11 @@ func (m *Manager) checkCrashedPlugins() {
 	m.notifLock.RLock()
 	for idx, n := range m.notifiers {
 		if n.exited() {
-			defer Handler.restartNotifierPlugin(n.config, idx)
+			defer func(cfg Config, index int) {
+				Handler.restartNotifierPlugin(cfg, index)
+			}(n.config, idx)
+		} else {
+			n.sendQueuedEvents()
 		}
 	}
 	m.notifLock.RUnlock()
@@ -200,7 +197,9 @@ func (m *Manager) checkCrashedPlugins() {
 	m.kmsLock.RLock()
 	for idx, k := range m.kms {
 		if k.exited() {
-			defer Handler.restartKMSPlugin(k.config, idx)
+			defer func(cfg Config, index int) {
+				Handler.restartKMSPlugin(cfg, index)
+			}(k.config, idx)
 		}
 	}
 	m.kmsLock.RUnlock()
@@ -210,7 +209,7 @@ func (m *Manager) restartNotifierPlugin(config Config, idx int) {
 	if atomic.LoadInt32(&m.closed) == 1 {
 		return
 	}
-	logger.Info(logSender, "", "try to restart notifier crashed plugin %#v, idx: %v", config.Cmd, idx)
+	logger.Info(logSender, "", "try to restart crashed notifier plugin %#v, idx: %v", config.Cmd, idx)
 	plugin, err := newNotifierPlugin(config)
 	if err != nil {
 		logger.Warn(logSender, "", "unable to restart notifier plugin %#v, err: %v", config.Cmd, err)
@@ -218,8 +217,10 @@ func (m *Manager) restartNotifierPlugin(config Config, idx int) {
 	}
 
 	m.notifLock.Lock()
+	plugin.queue = m.notifiers[idx].queue
 	m.notifiers[idx] = plugin
 	m.notifLock.Unlock()
+	plugin.sendQueuedEvents()
 }
 
 func (m *Manager) restartKMSPlugin(config Config, idx int) {