notifiers plugin: replace params with a struct

Fixes #658

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2022-01-02 15:16:35 +01:00
parent 6c6a6e3d16
commit 85c2d474d9
No known key found for this signature in database
GPG key ID: 2F1FB59433D5A8CB
13 changed files with 249 additions and 187 deletions

View file

@ -19,6 +19,7 @@ import (
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/sdk"
"github.com/drakkan/sftpgo/v2/sdk/plugin"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
"github.com/drakkan/sftpgo/v2/util"
)
@ -50,12 +51,7 @@ func InitializeActionHandler(handler ActionHandler) {
actionHandler = handler
}
// ExecutePreAction executes a pre-* action and returns the result
func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath string, fileSize int64, openFlags int) error {
remoteIP := conn.GetRemoteIP()
plugin.Handler.NotifyFsEvent(time.Now().UnixNano(), operation, conn.User.Username, filePath, "", "", conn.protocol,
remoteIP, virtualPath, "", conn.ID, fileSize, nil)
if !util.IsStringInSlice(operation, Config.Actions.ExecuteOn) {
func handleUnconfiguredPreAction(operation string) error {
// for pre-delete we execute the internal handling on error, so we must return errUnconfiguredAction.
// Other pre action will deny the operation on error so if we have no configuration we must return
// a nil error
@ -63,54 +59,55 @@ func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath str
return errUnconfiguredAction
}
return nil
}
// ExecutePreAction executes a pre-* action and returns the result
func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath string, fileSize int64, openFlags int) error {
var event *notifier.FsEvent
hasNotifiersPlugin := plugin.Handler.HasNotifiers()
hasHook := util.IsStringInSlice(operation, Config.Actions.ExecuteOn)
if !hasHook && !hasNotifiersPlugin {
return handleUnconfiguredPreAction(operation)
}
notification := newActionNotification(&conn.User, operation, filePath, virtualPath, "", "", "",
conn.protocol, remoteIP, conn.ID, fileSize, openFlags, nil)
return actionHandler.Handle(notification)
event = newActionNotification(&conn.User, operation, filePath, virtualPath, "", "", "",
conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, openFlags, nil)
if hasNotifiersPlugin {
plugin.Handler.NotifyFsEvent(event)
}
if !hasHook {
return handleUnconfiguredPreAction(operation)
}
return actionHandler.Handle(event)
}
// ExecuteActionNotification executes the defined hook, if any, for the specified action
func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtualPath, target, virtualTarget, sshCmd string,
fileSize int64, err error,
) {
remoteIP := conn.GetRemoteIP()
plugin.Handler.NotifyFsEvent(time.Now().UnixNano(), operation, conn.User.Username, filePath, target, sshCmd, conn.protocol,
remoteIP, virtualPath, virtualTarget, conn.ID, fileSize, err)
hasNotifiersPlugin := plugin.Handler.HasNotifiers()
hasHook := util.IsStringInSlice(operation, Config.Actions.ExecuteOn)
if !hasHook && !hasNotifiersPlugin {
return
}
notification := newActionNotification(&conn.User, operation, filePath, virtualPath, target, virtualTarget, sshCmd,
conn.protocol, remoteIP, conn.ID, fileSize, 0, err)
conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, 0, err)
if hasNotifiersPlugin {
plugin.Handler.NotifyFsEvent(notification)
}
if hasHook {
if util.IsStringInSlice(operation, Config.Actions.ExecuteSync) {
actionHandler.Handle(notification) //nolint:errcheck
return
}
go actionHandler.Handle(notification) //nolint:errcheck
}
}
// ActionHandler handles a notification for a Protocol Action.
type ActionHandler interface {
Handle(notification *ActionNotification) error
}
// ActionNotification defines a notification for a Protocol Action.
type ActionNotification struct {
Action string `json:"action"`
Username string `json:"username"`
Path string `json:"path"`
TargetPath string `json:"target_path,omitempty"`
VirtualPath string `json:"virtual_path"`
VirtualTargetPath string `json:"virtual_target_path,omitempty"`
SSHCmd string `json:"ssh_cmd,omitempty"`
FileSize int64 `json:"file_size,omitempty"`
FsProvider int `json:"fs_provider"`
Bucket string `json:"bucket,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Status int `json:"status"`
Protocol string `json:"protocol"`
IP string `json:"ip"`
SessionID string `json:"session_id"`
Timestamp int64 `json:"timestamp"`
OpenFlags int `json:"open_flags,omitempty"`
Handle(notification *notifier.FsEvent) error
}
func newActionNotification(
@ -119,7 +116,7 @@ func newActionNotification(
fileSize int64,
openFlags int,
err error,
) *ActionNotification {
) *notifier.FsEvent {
var bucket, endpoint string
status := 1
@ -146,7 +143,7 @@ func newActionNotification(
status = 2
}
return &ActionNotification{
return &notifier.FsEvent{
Action: operation,
Username: user.Username,
Path: filePath,
@ -169,28 +166,29 @@ func newActionNotification(
type defaultActionHandler struct{}
func (h *defaultActionHandler) Handle(notification *ActionNotification) error {
if !util.IsStringInSlice(notification.Action, Config.Actions.ExecuteOn) {
func (h *defaultActionHandler) Handle(event *notifier.FsEvent) error {
if !util.IsStringInSlice(event.Action, Config.Actions.ExecuteOn) {
return errUnconfiguredAction
}
if Config.Actions.Hook == "" {
logger.Warn(notification.Protocol, "", "Unable to send notification, no hook is defined")
logger.Warn(event.Protocol, "", "Unable to send notification, no hook is defined")
return errNoHook
}
if strings.HasPrefix(Config.Actions.Hook, "http") {
return h.handleHTTP(notification)
return h.handleHTTP(event)
}
return h.handleCommand(notification)
return h.handleCommand(event)
}
func (h *defaultActionHandler) handleHTTP(notification *ActionNotification) error {
func (h *defaultActionHandler) handleHTTP(event *notifier.FsEvent) error {
u, err := url.Parse(Config.Actions.Hook)
if err != nil {
logger.Warn(notification.Protocol, "", "Invalid hook %#v for operation %#v: %v", Config.Actions.Hook, notification.Action, err)
logger.Error(event.Protocol, "", "Invalid hook %#v for operation %#v: %v",
Config.Actions.Hook, event.Action, err)
return err
}
@ -198,7 +196,7 @@ func (h *defaultActionHandler) handleHTTP(notification *ActionNotification) erro
respCode := 0
var b bytes.Buffer
_ = json.NewEncoder(&b).Encode(notification)
_ = json.NewEncoder(&b).Encode(event)
resp, err := httpclient.RetryablePost(Config.Actions.Hook, "application/json", &b)
if err == nil {
@ -210,16 +208,16 @@ func (h *defaultActionHandler) handleHTTP(notification *ActionNotification) erro
}
}
logger.Debug(notification.Protocol, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
notification.Action, u.Redacted(), respCode, time.Since(startTime), err)
logger.Debug(event.Protocol, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
event.Action, u.Redacted(), respCode, time.Since(startTime), err)
return err
}
func (h *defaultActionHandler) handleCommand(notification *ActionNotification) error {
func (h *defaultActionHandler) handleCommand(event *notifier.FsEvent) error {
if !filepath.IsAbs(Config.Actions.Hook) {
err := fmt.Errorf("invalid notification command %#v", Config.Actions.Hook)
logger.Warn(notification.Protocol, "", "unable to execute notification command: %v", err)
logger.Warn(event.Protocol, "", "unable to execute notification command: %v", err)
return err
}
@ -228,35 +226,35 @@ func (h *defaultActionHandler) handleCommand(notification *ActionNotification) e
defer cancel()
cmd := exec.CommandContext(ctx, Config.Actions.Hook)
cmd.Env = append(os.Environ(), notificationAsEnvVars(notification)...)
cmd.Env = append(os.Environ(), notificationAsEnvVars(event)...)
startTime := time.Now()
err := cmd.Run()
logger.Debug(notification.Protocol, "", "executed command %#v, elapsed: %v, error: %v",
logger.Debug(event.Protocol, "", "executed command %#v, elapsed: %v, error: %v",
Config.Actions.Hook, time.Since(startTime), err)
return err
}
func notificationAsEnvVars(notification *ActionNotification) []string {
func notificationAsEnvVars(event *notifier.FsEvent) []string {
return []string{
fmt.Sprintf("SFTPGO_ACTION=%v", notification.Action),
fmt.Sprintf("SFTPGO_ACTION_USERNAME=%v", notification.Username),
fmt.Sprintf("SFTPGO_ACTION_PATH=%v", notification.Path),
fmt.Sprintf("SFTPGO_ACTION_TARGET=%v", notification.TargetPath),
fmt.Sprintf("SFTPGO_ACTION_VIRTUAL_PATH=%v", notification.VirtualPath),
fmt.Sprintf("SFTPGO_ACTION_VIRTUAL_TARGET=%v", notification.VirtualTargetPath),
fmt.Sprintf("SFTPGO_ACTION_SSH_CMD=%v", notification.SSHCmd),
fmt.Sprintf("SFTPGO_ACTION_FILE_SIZE=%v", notification.FileSize),
fmt.Sprintf("SFTPGO_ACTION_FS_PROVIDER=%v", notification.FsProvider),
fmt.Sprintf("SFTPGO_ACTION_BUCKET=%v", notification.Bucket),
fmt.Sprintf("SFTPGO_ACTION_ENDPOINT=%v", notification.Endpoint),
fmt.Sprintf("SFTPGO_ACTION_STATUS=%v", notification.Status),
fmt.Sprintf("SFTPGO_ACTION_PROTOCOL=%v", notification.Protocol),
fmt.Sprintf("SFTPGO_ACTION_IP=%v", notification.IP),
fmt.Sprintf("SFTPGO_ACTION_SESSION_ID=%v", notification.SessionID),
fmt.Sprintf("SFTPGO_ACTION_OPEN_FLAGS=%v", notification.OpenFlags),
fmt.Sprintf("SFTPGO_ACTION_TIMESTAMP=%v", notification.Timestamp),
fmt.Sprintf("SFTPGO_ACTION=%v", event.Action),
fmt.Sprintf("SFTPGO_ACTION_USERNAME=%v", event.Username),
fmt.Sprintf("SFTPGO_ACTION_PATH=%v", event.Path),
fmt.Sprintf("SFTPGO_ACTION_TARGET=%v", event.TargetPath),
fmt.Sprintf("SFTPGO_ACTION_VIRTUAL_PATH=%v", event.VirtualPath),
fmt.Sprintf("SFTPGO_ACTION_VIRTUAL_TARGET=%v", event.VirtualTargetPath),
fmt.Sprintf("SFTPGO_ACTION_SSH_CMD=%v", event.SSHCmd),
fmt.Sprintf("SFTPGO_ACTION_FILE_SIZE=%v", event.FileSize),
fmt.Sprintf("SFTPGO_ACTION_FS_PROVIDER=%v", event.FsProvider),
fmt.Sprintf("SFTPGO_ACTION_BUCKET=%v", event.Bucket),
fmt.Sprintf("SFTPGO_ACTION_ENDPOINT=%v", event.Endpoint),
fmt.Sprintf("SFTPGO_ACTION_STATUS=%v", event.Status),
fmt.Sprintf("SFTPGO_ACTION_PROTOCOL=%v", event.Protocol),
fmt.Sprintf("SFTPGO_ACTION_IP=%v", event.IP),
fmt.Sprintf("SFTPGO_ACTION_SESSION_ID=%v", event.SessionID),
fmt.Sprintf("SFTPGO_ACTION_OPEN_FLAGS=%v", event.OpenFlags),
fmt.Sprintf("SFTPGO_ACTION_TIMESTAMP=%v", event.Timestamp),
}
}

View file

@ -15,6 +15,8 @@ import (
"github.com/drakkan/sftpgo/v2/dataprovider"
"github.com/drakkan/sftpgo/v2/sdk"
"github.com/drakkan/sftpgo/v2/sdk/plugin"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
"github.com/drakkan/sftpgo/v2/vfs"
)
@ -146,6 +148,8 @@ func TestActionCMD(t *testing.T) {
c := NewBaseConnection("id", ProtocolSFTP, "", "", *user)
ExecuteActionNotification(c, OperationSSHCmd, "path", "vpath", "target", "vtarget", "sha1sum", 0, nil)
ExecuteActionNotification(c, operationDownload, "path", "vpath", "", "", "", 0, nil)
Config.Actions = actionsCopy
}
@ -235,11 +239,42 @@ func TestPreDeleteAction(t *testing.T) {
Config.Actions = actionsCopy
}
func TestUnconfiguredHook(t *testing.T) {
actionsCopy := Config.Actions
Config.Actions = ProtocolActions{
ExecuteOn: []string{operationDownload},
Hook: "",
}
pluginsConfig := []plugin.Config{
{
Type: "notifier",
},
}
err := plugin.Initialize(pluginsConfig, true)
assert.Error(t, err)
assert.True(t, plugin.Handler.HasNotifiers())
c := NewBaseConnection("id", ProtocolSFTP, "", "", dataprovider.User{})
err = ExecutePreAction(c, OperationPreDownload, "", "", 0, 0)
assert.NoError(t, err)
err = ExecutePreAction(c, operationPreDelete, "", "", 0, 0)
assert.ErrorIs(t, err, errUnconfiguredAction)
ExecuteActionNotification(c, operationDownload, "", "", "", "", "", 0, nil)
err = plugin.Initialize(nil, true)
assert.NoError(t, err)
assert.False(t, plugin.Handler.HasNotifiers())
Config.Actions = actionsCopy
}
type actionHandlerStub struct {
called bool
}
func (h *actionHandlerStub) Handle(notification *ActionNotification) error {
func (h *actionHandlerStub) Handle(event *notifier.FsEvent) error {
h.called = true
return nil
@ -253,7 +288,7 @@ func TestInitializeActionHandler(t *testing.T) {
InitializeActionHandler(&defaultActionHandler{})
})
err := actionHandler.Handle(&ActionNotification{})
err := actionHandler.Handle(&notifier.FsEvent{})
assert.NoError(t, err)
assert.True(t, handler.called)

View file

@ -14,6 +14,7 @@ import (
"github.com/drakkan/sftpgo/v2/httpclient"
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/sdk/plugin"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
"github.com/drakkan/sftpgo/v2/util"
)
@ -33,7 +34,16 @@ const (
)
func executeAction(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
plugin.Handler.NotifyProviderEvent(time.Now().UnixNano(), operation, executor, objectType, objectName, ip, object)
if plugin.Handler.HasNotifiers() {
plugin.Handler.NotifyProviderEvent(&notifier.ProviderEvent{
Action: operation,
Username: executor,
ObjectType: objectType,
ObjectName: objectName,
IP: ip,
Timestamp: time.Now().UnixNano(),
}, object)
}
if config.Actions.Hook == "" {
return
}

View file

@ -1745,10 +1745,20 @@ func validateIPFilters(user *User) error {
return nil
}
func validateBandwidthLimit(bl sdk.BandwidthLimit) error {
for _, source := range bl.Sources {
_, _, err := net.ParseCIDR(source)
if err != nil {
return util.NewValidationError(fmt.Sprintf("could not parse bandwidth limit source %#v: %v", source, err))
}
}
return nil
}
func validateBandwidthLimitFilters(user *User) error {
for idx, bandwidthLimit := range user.Filters.BandwidthLimits {
user.Filters.BandwidthLimits[idx].Sources = util.RemoveDuplicates(bandwidthLimit.Sources)
if err := bandwidthLimit.Validate(); err != nil {
if err := validateBandwidthLimit(bandwidthLimit); err != nil {
return err
}
if bandwidthLimit.DownloadBandwidth < 0 {

View file

@ -55,8 +55,8 @@ The configuration file contains the following sections:
- `idle_timeout`, integer. Time in minutes after which an idle client will be disconnected. 0 means disabled. Default: 15
- `upload_mode` integer. 0 means standard: the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. 2 means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. Default: 0
- `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions. See [Custom Actions](./custom-actions.md) for more details
- `execute_on`, list of strings. Valid values are `pre-download`, `download`, `pre-upload`, `upload`, `pre-delete`, `delete`, `rename`, `ssh_cmd`. Leave empty to disable actions.
- `execute_sync`, list of strings. Actions to be performed synchronously. The `pre-delete` action is always executed synchronously while the other ones are asynchronous. Executing an action synchronously means that SFTPGo will not return a result code to the client (which is waiting for it) until your hook have completed its execution. Leave empty to execute only the `pre-delete` hook synchronously
- `execute_on`, list of strings. Valid values are `pre-download`, `download`, `pre-upload`, `upload`, `pre-delete`, `delete`, `rename`, `mkdir`, `rmdir`, `ssh_cmd`. Leave empty to disable actions.
- `execute_sync`, list of strings. Actions, defined in the `execute_on` list above, to be performed synchronously. The `pre-*` actions are always executed synchronously while the other ones are asynchronous. Executing an action synchronously means that SFTPGo will not return a result code to the client (which is waiting for it) until your hook have completed its execution. Leave empty to execute only the defined `pre-*` hook synchronously
- `hook`, string. Absolute path to the command to execute or HTTP URL to notify.
- `setstat_mode`, integer. 0 means "normal mode": requests for changing permissions, owner/group and access/modification times are executed. 1 means "ignore mode": requests for changing permissions, owner/group and access/modification times are silently ignored. 2 means "ignore mode if not supported": requests for changing permissions and owner/group are silently ignored for cloud filesystems and executed for local/SFTP filesystem. Requests for changing modification times are always executed for local/SFTP filesystems and are executed for cloud based filesystems if the target is a file and there is a metadata plugin available. A metadata plugin can be found [here](https://github.com/sftpgo/sftpgo-plugin-metadata).
- `temp_path`, string. Defines the path for temporary files such as those used for atomic uploads or file pipes. If you set this option you must make sure that the defined path exists, is accessible for writing by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise the renaming for atomic uploads will become a copy and therefore may take a long time. The temporary files are not namespaced. The default is generally fine. Leave empty for the default.

2
go.mod
View file

@ -39,7 +39,7 @@ require (
github.com/rs/cors v1.8.2
github.com/rs/xid v1.3.0
github.com/rs/zerolog v1.26.2-0.20211219225053-665519c4da50
github.com/shirou/gopsutil/v3 v3.21.11
github.com/shirou/gopsutil/v3 v3.21.12
github.com/spf13/afero v1.7.1
github.com/spf13/cobra v1.3.0
github.com/spf13/viper v1.10.1

6
go.sum
View file

@ -740,8 +740,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJxNzj3QBOf7dZwupeVC+mG1Lo=
github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4/go.mod h1:MnkX001NG75g3p8bhFycnyIjeQoOjGL6CEIsdE/nKSY=
github.com/shirou/gopsutil/v3 v3.21.11 h1:d5tOAP5+bmJ8Hf2+4bxOSkQ/64+sjEbjU9nSW9nJgG0=
github.com/shirou/gopsutil/v3 v3.21.11/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA=
github.com/shirou/gopsutil/v3 v3.21.12 h1:VoGxEW2hpmz0Vt3wUvHIl9fquzYLNpVpgNNB7pGJimA=
github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
@ -798,8 +798,6 @@ github.com/wagslane/go-password-validator v0.3.0 h1:vfxOPzGHkz5S146HDpavl0cw1DSV
github.com/wagslane/go-password-validator v0.3.0/go.mod h1:TI1XJ6T5fRdRnHqHt14pvy1tNVnrwe7m3/f1f2fDphQ=
github.com/xhit/go-simple-mail/v2 v2.10.0 h1:nib6RaJ4qVh5HD9UE9QJqnUZyWp3upv+Z6CFxaMj0V8=
github.com/xhit/go-simple-mail/v2 v2.10.0/go.mod h1:kA1XbQfCI4JxQ9ccSN6VFyIEkkugOm7YiPkA5hKiQn4=
github.com/yl2chen/cidranger v1.0.2 h1:lbOWZVCG1tCRX4u24kuM1Tb4nHqWkDxwLdoS+SevawU=
github.com/yl2chen/cidranger v1.0.2/go.mod h1:9U1yz7WPYDwf0vpNWFaeRh0bjwz5RVgRy/9UEQfHl0g=
github.com/yl2chen/cidranger v1.0.3-0.20210928021809-d1cb2c52f37a h1:XfF01GyP+0eWCaVp0y6rNN+kFp7pt9Da4UUYrJ5XPWA=
github.com/yl2chen/cidranger v1.0.3-0.20210928021809-d1cb2c52f37a/go.mod h1:aXb8yZQEWo1XHGMf1qQfnb83GR/EJ2EBlwtUgAaNBoE=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

View file

@ -12,7 +12,6 @@ import (
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
"github.com/drakkan/sftpgo/v2/util"
)
@ -37,48 +36,25 @@ func (c *NotifierConfig) hasActions() bool {
type eventsQueue struct {
sync.RWMutex
fsEvents []*proto.FsEvent
providerEvents []*proto.ProviderEvent
fsEvents []*notifier.FsEvent
providerEvents []*notifier.ProviderEvent
}
func (q *eventsQueue) addFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip string,
fileSize int64, status int,
) {
func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
q.Lock()
defer q.Unlock()
q.fsEvents = append(q.fsEvents, &proto.FsEvent{
Timestamp: timestamp,
Action: action,
Username: username,
FsPath: fsPath,
FsTargetPath: fsTargetPath,
SshCmd: sshCmd,
FileSize: fileSize,
Protocol: protocol,
Ip: ip,
Status: int32(status),
})
q.fsEvents = append(q.fsEvents, event)
}
func (q *eventsQueue) addProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
objectAsJSON []byte,
) {
func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
q.Lock()
defer q.Unlock()
q.providerEvents = append(q.providerEvents, &proto.ProviderEvent{
Timestamp: timestamp,
Action: action,
ObjectType: objectType,
Username: username,
Ip: ip,
ObjectName: objectName,
ObjectData: objectAsJSON,
})
q.providerEvents = append(q.providerEvents, event)
}
func (q *eventsQueue) popFsEvent() *proto.FsEvent {
func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
q.Lock()
defer q.Unlock()
@ -93,7 +69,7 @@ func (q *eventsQueue) popFsEvent() *proto.FsEvent {
return ev
}
func (q *eventsQueue) popProviderEvent() *proto.ProviderEvent {
func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
q.Lock()
defer q.Unlock()
@ -193,7 +169,7 @@ func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
if p.config.NotifierOptions.RetryMaxTime == 0 {
return false
}
if time.Now().After(util.GetTimeFromMsecSinceEpoch(timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
return false
}
if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
@ -202,58 +178,47 @@ func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
return true
}
func (p *notifierPlugin) notifyFsAction(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
protocol, ip, virtualPath, virtualTargetPath, sessionID string, fileSize int64, errAction error) {
if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
if !util.IsStringInSlice(event.Action, p.config.NotifierOptions.FsEvents) {
return
}
go func() {
status := 1
if errAction != nil {
status = 0
}
p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, virtualPath, virtualTargetPath,
sessionID, fileSize, status)
p.sendFsEvent(event)
}()
}
func (p *notifierPlugin) notifyProviderAction(timestamp int64, action, username, objectType, objectName, ip string,
object Renderer,
) {
if !util.IsStringInSlice(action, p.config.NotifierOptions.ProviderEvents) ||
!util.IsStringInSlice(objectType, p.config.NotifierOptions.ProviderObjects) {
func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
if !util.IsStringInSlice(event.Action, p.config.NotifierOptions.ProviderEvents) ||
!util.IsStringInSlice(event.ObjectType, p.config.NotifierOptions.ProviderObjects) {
return
}
go func() {
objectAsJSON, err := object.RenderAsJSON(action != "delete")
objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
if err != nil {
logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err)
logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
return
}
p.sendProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
event.ObjectData = objectAsJSON
p.sendProviderEvent(event)
}()
}
func (p *notifierPlugin) sendFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
protocol, ip, virtualPath, virtualTargetPath, sessionID string, fileSize int64, status int) {
if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
virtualPath, virtualTargetPath, sessionID, fileSize, status); err != nil {
func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
if err := p.notifier.NotifyFsEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
if p.canQueueEvent(timestamp) {
p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, fileSize, status)
if p.canQueueEvent(event.Timestamp) {
p.queue.addFsEvent(event)
}
}
}
func (p *notifierPlugin) sendProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
objectAsJSON []byte,
) {
if err := p.notifier.NotifyProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON); err != nil {
func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
if err := p.notifier.NotifyProviderEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
if p.canQueueEvent(timestamp) {
p.queue.addProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
if p.canQueueEvent(event.Timestamp) {
p.queue.addProviderEvent(event)
}
}
}
@ -266,16 +231,17 @@ func (p *notifierPlugin) sendQueuedEvents() {
logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
fsEv := p.queue.popFsEvent()
for fsEv != nil {
go p.sendFsEvent(fsEv.Timestamp, fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
fsEv.SshCmd, fsEv.Protocol, fsEv.Ip, fsEv.VirtualPath, fsEv.VirtualTargetPath, fsEv.SessionId,
fsEv.FileSize, int(fsEv.Status))
go func(ev *notifier.FsEvent) {
p.sendFsEvent(ev)
}(fsEv)
fsEv = p.queue.popFsEvent()
}
providerEv := p.queue.popProviderEvent()
for providerEv != nil {
go p.sendProviderEvent(providerEv.Timestamp, providerEv.Action, providerEv.Username, providerEv.ObjectType,
providerEv.ObjectName, providerEv.Ip, providerEv.ObjectData)
go func(ev *notifier.ProviderEvent) {
p.sendProviderEvent(ev)
}(providerEv)
providerEv = p.queue.popProviderEvent()
}
logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())

View file

@ -69,14 +69,35 @@ type GRPCServer struct {
// SendFsEvent implements the serve side fs notify method
func (s *GRPCServer) SendFsEvent(ctx context.Context, req *proto.FsEvent) (*emptypb.Empty, error) {
err := s.Impl.NotifyFsEvent(req.Timestamp, req.Action, req.Username, req.FsPath, req.FsTargetPath, req.SshCmd,
req.Protocol, req.Ip, req.VirtualPath, req.VirtualTargetPath, req.SessionId, req.FileSize, int(req.Status))
event := &FsEvent{
Action: req.Action,
Username: req.Username,
Path: req.FsPath,
TargetPath: req.FsTargetPath,
VirtualPath: req.VirtualPath,
SSHCmd: req.SshCmd,
FileSize: req.FileSize,
Status: int(req.Status),
Protocol: req.Protocol,
IP: req.Ip,
SessionID: req.SessionId,
Timestamp: req.Timestamp,
}
err := s.Impl.NotifyFsEvent(event)
return &emptypb.Empty{}, err
}
// SendProviderEvent implements the serve side provider event notify method
func (s *GRPCServer) SendProviderEvent(ctx context.Context, req *proto.ProviderEvent) (*emptypb.Empty, error) {
err := s.Impl.NotifyProviderEvent(req.Timestamp, req.Action, req.Username, req.ObjectType, req.ObjectName,
req.Ip, req.ObjectData)
event := &ProviderEvent{
Action: req.Action,
Username: req.Username,
ObjectType: req.ObjectType,
ObjectName: req.ObjectName,
IP: req.Ip,
ObjectData: req.ObjectData,
Timestamp: req.Timestamp,
}
err := s.Impl.NotifyProviderEvent(event)
return &emptypb.Empty{}, err
}

View file

@ -30,11 +30,42 @@ var PluginMap = map[string]plugin.Plugin{
PluginName: &Plugin{},
}
// FsEvent defines a file system event
type FsEvent struct {
Action string `json:"action"`
Username string `json:"username"`
Path string `json:"path"`
TargetPath string `json:"target_path,omitempty"`
VirtualPath string `json:"virtual_path"`
VirtualTargetPath string `json:"virtual_target_path,omitempty"`
SSHCmd string `json:"ssh_cmd,omitempty"`
FileSize int64 `json:"file_size,omitempty"`
FsProvider int `json:"fs_provider"`
Bucket string `json:"bucket,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Status int `json:"status"`
Protocol string `json:"protocol"`
IP string `json:"ip"`
SessionID string `json:"session_id"`
Timestamp int64 `json:"timestamp"`
OpenFlags int `json:"open_flags,omitempty"`
}
// ProviderEvent defines a provider event
type ProviderEvent struct {
Action string
Username string
ObjectType string
ObjectName string
IP string
ObjectData []byte
Timestamp int64
}
// Notifier defines the interface for notifiers plugins
type Notifier interface {
NotifyFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
virtualPath, virtualTargetPath, sessionID string, fileSize int64, status int) error
NotifyProviderEvent(timestamp int64, action, username, objectType, objectName, ip string, object []byte) error
NotifyFsEvent(event *FsEvent) error
NotifyProviderEvent(event *ProviderEvent) error
}
// Plugin defines the implementation to serve/connect to a notifier plugin

View file

@ -95,6 +95,7 @@ type Manager struct {
authScopes int
hasSearcher bool
hasMetadater bool
hasNotifiers bool
}
// Initialize initializes the configured plugins
@ -172,6 +173,7 @@ func (m *Manager) validateConfigs() error {
kmsEncryptions := make(map[string]bool)
m.hasSearcher = false
m.hasMetadater = false
m.hasNotifiers = false
for _, config := range m.Configs {
if config.Type == kmsplugin.PluginName {
@ -196,32 +198,35 @@ func (m *Manager) validateConfigs() error {
}
m.hasMetadater = true
}
if config.Type == notifier.PluginName {
m.hasNotifiers = true
}
}
return nil
}
// HasNotifiers returns true if there is at least a notifier plugin
func (m *Manager) HasNotifiers() bool {
return m.hasNotifiers
}
// NotifyFsEvent sends the fs event notifications using any defined notifier plugins
func (m *Manager) NotifyFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
virtualPath, virtualTargetPath, sessionID string, fileSize int64, err error,
) {
func (m *Manager) NotifyFsEvent(event *notifier.FsEvent) {
m.notifLock.RLock()
defer m.notifLock.RUnlock()
for _, n := range m.notifiers {
n.notifyFsAction(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, virtualPath,
virtualTargetPath, sessionID, fileSize, err)
n.notifyFsAction(event)
}
}
// NotifyProviderEvent sends the provider event notifications using any defined notifier plugins
func (m *Manager) NotifyProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
object Renderer,
) {
func (m *Manager) NotifyProviderEvent(event *notifier.ProviderEvent, object Renderer) {
m.notifLock.RLock()
defer m.notifLock.RUnlock()
for _, n := range m.notifiers {
n.notifyProviderAction(timestamp, action, username, objectType, objectName, ip, object)
n.notifyProviderAction(event, object)
}
}

View file

@ -1,8 +1,6 @@
package sdk
import (
"fmt"
"net"
"strings"
"github.com/drakkan/sftpgo/v2/kms"
@ -139,17 +137,6 @@ type BandwidthLimit struct {
DownloadBandwidth int64 `json:"download_bandwidth,omitempty"`
}
// Validate returns an error if the bandwidth limit is not valid
func (l *BandwidthLimit) Validate() error {
for _, source := range l.Sources {
_, _, err := net.ParseCIDR(source)
if err != nil {
return util.NewValidationError(fmt.Sprintf("could not parse bandwidth limit source %#v: %v", source, err))
}
}
return nil
}
// GetSourcesAsString returns the sources as comma separated string
func (l *BandwidthLimit) GetSourcesAsString() string {
return strings.Join(l.Sources, ",")

View file

@ -171,7 +171,8 @@ func TestMain(m *testing.M) {
if runtime.GOOS == osWindows {
scriptArgs = "%*"
} else {
commonConf.Actions.ExecuteOn = []string{"download", "upload", "rename", "delete", "ssh_cmd"}
commonConf.Actions.ExecuteOn = []string{"download", "upload", "rename", "delete", "ssh_cmd",
"pre-download", "pre-upload"}
commonConf.Actions.Hook = hookCmdPath
scriptArgs = "$@"
}