|
@@ -17,6 +17,7 @@ package common
|
|
|
import (
|
|
|
"bytes"
|
|
|
"context"
|
|
|
+ "errors"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"net/http"
|
|
@@ -30,6 +31,7 @@ import (
|
|
|
"time"
|
|
|
|
|
|
"github.com/robfig/cron/v3"
|
|
|
+ "github.com/rs/xid"
|
|
|
|
|
|
"github.com/drakkan/sftpgo/v2/internal/dataprovider"
|
|
|
"github.com/drakkan/sftpgo/v2/internal/logger"
|
|
@@ -47,6 +49,9 @@ var (
|
|
|
func init() {
|
|
|
eventManager = eventRulesContainer{
|
|
|
schedulesMapping: make(map[string][]cron.EntryID),
|
|
|
+ // arbitrary maximum number of concurrent asynchronous tasks,
|
|
|
+ // each task could execute multiple actions
|
|
|
+ concurrencyGuard: make(chan struct{}, 200),
|
|
|
}
|
|
|
dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
|
|
|
func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
|
|
@@ -71,6 +76,15 @@ type eventRulesContainer struct {
|
|
|
Schedules []dataprovider.EventRule
|
|
|
schedulesMapping map[string][]cron.EntryID
|
|
|
lastLoad int64
|
|
|
+ concurrencyGuard chan struct{}
|
|
|
+}
|
|
|
+
|
|
|
+func (r *eventRulesContainer) addAsyncTask() {
|
|
|
+ r.concurrencyGuard <- struct{}{}
|
|
|
+}
|
|
|
+
|
|
|
+func (r *eventRulesContainer) removeAsyncTask() {
|
|
|
+ <-r.concurrencyGuard
|
|
|
}
|
|
|
|
|
|
func (r *eventRulesContainer) getLastLoadTime() int64 {
|
|
@@ -245,11 +259,19 @@ func (r *eventRulesContainer) hasFsRules() bool {
|
|
|
|
|
|
// handleFsEvent executes the rules actions defined for the specified event
|
|
|
func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
|
|
|
+ if params.Protocol == protocolEventAction {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
r.RLock()
|
|
|
|
|
|
var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
|
|
|
for _, rule := range r.FsEvents {
|
|
|
if r.checkFsEventMatch(rule.Conditions, params) {
|
|
|
+ if err := rule.CheckActionsConsistency(""); err != nil {
|
|
|
+ eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
|
|
|
+ rule.Name, err, params.Event)
|
|
|
+ continue
|
|
|
+ }
|
|
|
hasSyncActions := false
|
|
|
for _, action := range rule.Actions {
|
|
|
if action.Options.ExecuteSync {
|
|
@@ -267,6 +289,7 @@ func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
|
|
|
|
|
|
r.RUnlock()
|
|
|
|
|
|
+ params.sender = params.Name
|
|
|
if len(rulesAsync) > 0 {
|
|
|
go executeAsyncRulesActions(rulesAsync, params)
|
|
|
}
|
|
@@ -277,6 +300,7 @@ func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// username is populated for user objects
|
|
|
func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
|
|
|
r.RLock()
|
|
|
defer r.RUnlock()
|
|
@@ -284,11 +308,17 @@ func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
|
|
|
var rules []dataprovider.EventRule
|
|
|
for _, rule := range r.ProviderEvents {
|
|
|
if r.checkProviderEventMatch(rule.Conditions, params) {
|
|
|
- rules = append(rules, rule)
|
|
|
+ if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
|
|
|
+ rules = append(rules, rule)
|
|
|
+ } else {
|
|
|
+ eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
|
|
|
+ rule.Name, err, params.Event, params.ObjectType)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if len(rules) > 0 {
|
|
|
+ params.sender = params.ObjectName
|
|
|
go executeAsyncRulesActions(rules, params)
|
|
|
}
|
|
|
}
|
|
@@ -309,6 +339,30 @@ type EventParams struct {
|
|
|
IP string
|
|
|
Timestamp int64
|
|
|
Object plugin.Renderer
|
|
|
+ sender string
|
|
|
+}
|
|
|
+
|
|
|
+// getUsers returns users with group settings not applied
|
|
|
+func (p *EventParams) getUsers() ([]dataprovider.User, error) {
|
|
|
+ if p.sender == "" {
|
|
|
+ return dataprovider.DumpUsers()
|
|
|
+ }
|
|
|
+ user, err := dataprovider.UserExists(p.sender)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error getting user %q: %w", p.sender, err)
|
|
|
+ }
|
|
|
+ return []dataprovider.User{user}, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
|
|
|
+ if p.sender == "" {
|
|
|
+ return dataprovider.DumpFolders()
|
|
|
+ }
|
|
|
+ folder, err := dataprovider.GetFolderByName(p.sender)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
|
|
|
+ }
|
|
|
+ return []vfs.BaseVirtualFolder{folder}, nil
|
|
|
}
|
|
|
|
|
|
func (p *EventParams) getStringReplacements(addObjectData bool) []string {
|
|
@@ -495,6 +549,123 @@ func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params EventP
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+func getUserForEventAction(username string) (dataprovider.User, error) {
|
|
|
+ user, err := dataprovider.GetUserWithGroupSettings(username)
|
|
|
+ if err != nil {
|
|
|
+ return dataprovider.User{}, err
|
|
|
+ }
|
|
|
+ user.Filters.DisableFsChecks = false
|
|
|
+ user.Filters.FilePatterns = nil
|
|
|
+ for k := range user.Permissions {
|
|
|
+ user.Permissions[k] = []string{dataprovider.PermAny}
|
|
|
+ }
|
|
|
+ return user, err
|
|
|
+}
|
|
|
+
|
|
|
+func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
|
|
|
+ fs, fsPath, err := conn.GetFsAndResolvedPath(item)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return conn.RemoveFile(fs, fsPath, item, info)
|
|
|
+}
|
|
|
+
|
|
|
+func executeDeleteFsAction(deletes []string, replacer *strings.Replacer, username string) error {
|
|
|
+ user, err := getUserForEventAction(username)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
|
|
|
+ err = user.CheckFsRoot(connectionID)
|
|
|
+ defer user.CloseFs() //nolint:errcheck
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
|
|
|
+ for _, item := range deletes {
|
|
|
+ item = replaceWithReplacer(item, replacer)
|
|
|
+ info, err := conn.DoStat(item, 0, false)
|
|
|
+ if err != nil {
|
|
|
+ if conn.IsNotExistError(err) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if info.IsDir() {
|
|
|
+ if err = conn.RemoveDir(item); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if err = executeDeleteFileFsAction(conn, item, info); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func executeMkDirsFsAction(dirs []string, replacer *strings.Replacer, username string) error {
|
|
|
+ user, err := getUserForEventAction(username)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
|
|
|
+ err = user.CheckFsRoot(connectionID)
|
|
|
+ defer user.CloseFs() //nolint:errcheck
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
|
|
|
+ for _, item := range dirs {
|
|
|
+ item = replaceWithReplacer(item, replacer)
|
|
|
+ if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if err = conn.CreateDir(item, false); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func executeRenameFsAction(renames []dataprovider.KeyValue, replacer *strings.Replacer, username string) error {
|
|
|
+ user, err := getUserForEventAction(username)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
|
|
|
+ err = user.CheckFsRoot(connectionID)
|
|
|
+ defer user.CloseFs() //nolint:errcheck
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
|
|
|
+ for _, item := range renames {
|
|
|
+ source := replaceWithReplacer(item.Key, replacer)
|
|
|
+ target := replaceWithReplacer(item.Value, replacer)
|
|
|
+ if err = conn.Rename(source, target); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, params EventParams) error {
|
|
|
+ addObjectData := false
|
|
|
+ replacements := params.getStringReplacements(addObjectData)
|
|
|
+ replacer := strings.NewReplacer(replacements...)
|
|
|
+ switch c.Type {
|
|
|
+ case dataprovider.FilesystemActionRename:
|
|
|
+ return executeRenameFsAction(c.Renames, replacer, params.sender)
|
|
|
+ case dataprovider.FilesystemActionDelete:
|
|
|
+ return executeDeleteFsAction(c.Deletes, replacer, params.sender)
|
|
|
+ case dataprovider.FilesystemActionMkdirs:
|
|
|
+ return executeMkDirsFsAction(c.MkDirs, replacer, params.sender)
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("unsupported filesystem action %d", c.Type)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func executeQuotaResetForUser(user dataprovider.User) error {
|
|
|
if err := user.LoadAndApplyGroupSettings(); err != nil {
|
|
|
eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
|
|
@@ -520,18 +691,21 @@ func executeQuotaResetForUser(user dataprovider.User) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions) error {
|
|
|
- users, err := dataprovider.DumpUsers()
|
|
|
+func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
|
|
|
+ users, err := params.getUsers()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("unable to get users: %w", err)
|
|
|
}
|
|
|
var failedResets []string
|
|
|
+ executed := 0
|
|
|
for _, user := range users {
|
|
|
- if !checkEventConditionPatterns(user.Username, conditions.Names) {
|
|
|
+ // if sender is set, the conditions have already been evaluated
|
|
|
+ if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
|
|
|
eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
|
|
|
user.Username)
|
|
|
continue
|
|
|
}
|
|
|
+ executed++
|
|
|
if err = executeQuotaResetForUser(user); err != nil {
|
|
|
failedResets = append(failedResets, user.Username)
|
|
|
continue
|
|
@@ -540,17 +714,23 @@ func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions)
|
|
|
if len(failedResets) > 0 {
|
|
|
return fmt.Errorf("quota reset failed for users: %+v", failedResets)
|
|
|
}
|
|
|
+ if executed == 0 {
|
|
|
+ eventManagerLog(logger.LevelError, "no user quota reset executed")
|
|
|
+ return errors.New("no user quota reset executed")
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions) error {
|
|
|
- folders, err := dataprovider.DumpFolders()
|
|
|
+func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
|
|
|
+ folders, err := params.getFolders()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("unable to get folders: %w", err)
|
|
|
}
|
|
|
var failedResets []string
|
|
|
+ executed := 0
|
|
|
for _, folder := range folders {
|
|
|
- if !checkEventConditionPatterns(folder.Name, conditions.Names) {
|
|
|
+ // if sender is set, the conditions have already been evaluated
|
|
|
+ if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
|
|
|
eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
|
|
|
folder.Name)
|
|
|
continue
|
|
@@ -560,6 +740,7 @@ func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions
|
|
|
failedResets = append(failedResets, folder.Name)
|
|
|
continue
|
|
|
}
|
|
|
+ executed++
|
|
|
f := vfs.VirtualFolder{
|
|
|
BaseVirtualFolder: folder,
|
|
|
VirtualPath: "/",
|
|
@@ -580,21 +761,28 @@ func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions
|
|
|
if len(failedResets) > 0 {
|
|
|
return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
|
|
|
}
|
|
|
+ if executed == 0 {
|
|
|
+ eventManagerLog(logger.LevelError, "no folder quota reset executed")
|
|
|
+ return errors.New("no folder quota reset executed")
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions) error {
|
|
|
- users, err := dataprovider.DumpUsers()
|
|
|
+func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
|
|
|
+ users, err := params.getUsers()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("unable to get users: %w", err)
|
|
|
}
|
|
|
var failedResets []string
|
|
|
+ executed := 0
|
|
|
for _, user := range users {
|
|
|
- if !checkEventConditionPatterns(user.Username, conditions.Names) {
|
|
|
+ // if sender is set, the conditions have already been evaluated
|
|
|
+ if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
|
|
|
eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
|
|
|
user.Username)
|
|
|
continue
|
|
|
}
|
|
|
+ executed++
|
|
|
err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
|
|
|
if err != nil {
|
|
|
eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
|
|
@@ -604,6 +792,10 @@ func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOption
|
|
|
if len(failedResets) > 0 {
|
|
|
return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
|
|
|
}
|
|
|
+ if executed == 0 {
|
|
|
+ eventManagerLog(logger.LevelError, "no transfer quota reset executed")
|
|
|
+ return errors.New("no transfer quota reset executed")
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -629,19 +821,22 @@ func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprov
|
|
|
}
|
|
|
|
|
|
func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
|
|
|
- conditions dataprovider.ConditionOptions,
|
|
|
+ conditions dataprovider.ConditionOptions, params EventParams,
|
|
|
) error {
|
|
|
- users, err := dataprovider.DumpUsers()
|
|
|
+ users, err := params.getUsers()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("unable to get users: %w", err)
|
|
|
}
|
|
|
var failedChecks []string
|
|
|
+ executed := 0
|
|
|
for _, user := range users {
|
|
|
- if !checkEventConditionPatterns(user.Username, conditions.Names) {
|
|
|
+ // if sender is set, the conditions have already been evaluated
|
|
|
+ if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
|
|
|
eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
|
|
|
user.Username)
|
|
|
continue
|
|
|
}
|
|
|
+ executed++
|
|
|
if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
|
|
|
failedChecks = append(failedChecks, user.Username)
|
|
|
continue
|
|
@@ -650,6 +845,10 @@ func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRete
|
|
|
if len(failedChecks) > 0 {
|
|
|
return fmt.Errorf("retention check failed for users: %+v", failedChecks)
|
|
|
}
|
|
|
+ if executed == 0 {
|
|
|
+ eventManagerLog(logger.LevelError, "no retention check executed")
|
|
|
+ return errors.New("no retention check executed")
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -664,13 +863,15 @@ func executeRuleAction(action dataprovider.BaseEventAction, params EventParams,
|
|
|
case dataprovider.ActionTypeBackup:
|
|
|
return dataprovider.ExecuteBackup()
|
|
|
case dataprovider.ActionTypeUserQuotaReset:
|
|
|
- return executeUsersQuotaResetRuleAction(conditions)
|
|
|
+ return executeUsersQuotaResetRuleAction(conditions, params)
|
|
|
case dataprovider.ActionTypeFolderQuotaReset:
|
|
|
- return executeFoldersQuotaResetRuleAction(conditions)
|
|
|
+ return executeFoldersQuotaResetRuleAction(conditions, params)
|
|
|
case dataprovider.ActionTypeTransferQuotaReset:
|
|
|
- return executeTransferQuotaResetRuleAction(conditions)
|
|
|
+ return executeTransferQuotaResetRuleAction(conditions, params)
|
|
|
case dataprovider.ActionTypeDataRetentionCheck:
|
|
|
- return executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions)
|
|
|
+ return executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
|
|
|
+ case dataprovider.ActionTypeFilesystem:
|
|
|
+ return executeFsRuleAction(action.Options.FsConfig, params)
|
|
|
default:
|
|
|
return fmt.Errorf("unsupported action type: %d", action.Type)
|
|
|
}
|
|
@@ -707,6 +908,9 @@ func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams)
|
|
|
}
|
|
|
|
|
|
func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
|
|
|
+ eventManager.addAsyncTask()
|
|
|
+ defer eventManager.removeAsyncTask()
|
|
|
+
|
|
|
for _, rule := range rules {
|
|
|
executeRuleAsyncActions(rule, params, nil)
|
|
|
}
|
|
@@ -784,6 +988,10 @@ func (j *eventCronJob) Run() {
|
|
|
eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
|
|
|
return
|
|
|
}
|
|
|
+ if err = rule.CheckActionsConsistency(""); err != nil {
|
|
|
+ eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
task, err := j.getTask(rule)
|
|
|
if err != nil {
|
|
|
return
|
|
@@ -823,9 +1031,9 @@ func (j *eventCronJob) Run() {
|
|
|
}
|
|
|
}(task.Name)
|
|
|
|
|
|
- executeRuleAsyncActions(rule, EventParams{}, nil)
|
|
|
+ executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
|
|
|
} else {
|
|
|
- executeRuleAsyncActions(rule, EventParams{}, nil)
|
|
|
+ executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
|
|
|
}
|
|
|
eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
|
|
|
}
|