12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125 |
- // Copyright (C) 2019-2022 Nicola Murino
- //
- // This program is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Affero General Public License as published
- // by the Free Software Foundation, version 3.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Affero General Public License for more details.
- //
- // You should have received a copy of the GNU Affero General Public License
- // along with this program. If not, see <https://www.gnu.org/licenses/>.
- package common
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/robfig/cron/v3"
- "github.com/rs/xid"
- "github.com/drakkan/sftpgo/v2/internal/dataprovider"
- "github.com/drakkan/sftpgo/v2/internal/logger"
- "github.com/drakkan/sftpgo/v2/internal/plugin"
- "github.com/drakkan/sftpgo/v2/internal/smtp"
- "github.com/drakkan/sftpgo/v2/internal/util"
- "github.com/drakkan/sftpgo/v2/internal/vfs"
- )
// ipBlockedEventName is the label identifying the "IP Blocked" event.
const ipBlockedEventName = "IP Blocked"
- var (
- // eventManager handle the supported event rules actions
- eventManager eventRulesContainer
- )
- func init() {
- eventManager = eventRulesContainer{
- schedulesMapping: make(map[string][]cron.EntryID),
- // arbitrary maximum number of concurrent asynchronous tasks,
- // each task could execute multiple actions
- concurrencyGuard: make(chan struct{}, 200),
- }
- dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
- func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
- eventManager.handleProviderEvent(EventParams{
- Name: executor,
- ObjectName: objectName,
- Event: operation,
- Status: 1,
- ObjectType: objectType,
- IP: ip,
- Timestamp: time.Now().UnixNano(),
- Object: object,
- })
- })
- }
- // HandleCertificateEvent checks and executes action rules for certificate events
- func HandleCertificateEvent(params EventParams) {
- eventManager.handleCertificateEvent(params)
- }
- // eventRulesContainer stores event rules by trigger
- type eventRulesContainer struct {
- sync.RWMutex
- lastLoad int64
- FsEvents []dataprovider.EventRule
- ProviderEvents []dataprovider.EventRule
- Schedules []dataprovider.EventRule
- IPBlockedEvents []dataprovider.EventRule
- CertificateEvents []dataprovider.EventRule
- schedulesMapping map[string][]cron.EntryID
- concurrencyGuard chan struct{}
- }
- func (r *eventRulesContainer) addAsyncTask() {
- r.concurrencyGuard <- struct{}{}
- }
- func (r *eventRulesContainer) removeAsyncTask() {
- <-r.concurrencyGuard
- }
- func (r *eventRulesContainer) getLastLoadTime() int64 {
- return atomic.LoadInt64(&r.lastLoad)
- }
- func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
- atomic.StoreInt64(&r.lastLoad, modTime)
- }
- // RemoveRule deletes the rule with the specified name
- func (r *eventRulesContainer) RemoveRule(name string) {
- r.Lock()
- defer r.Unlock()
- r.removeRuleInternal(name)
- eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
- len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
- }
- func (r *eventRulesContainer) removeRuleInternal(name string) {
- for idx := range r.FsEvents {
- if r.FsEvents[idx].Name == name {
- lastIdx := len(r.FsEvents) - 1
- r.FsEvents[idx] = r.FsEvents[lastIdx]
- r.FsEvents = r.FsEvents[:lastIdx]
- eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
- return
- }
- }
- for idx := range r.ProviderEvents {
- if r.ProviderEvents[idx].Name == name {
- lastIdx := len(r.ProviderEvents) - 1
- r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
- r.ProviderEvents = r.ProviderEvents[:lastIdx]
- eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
- return
- }
- }
- for idx := range r.IPBlockedEvents {
- if r.IPBlockedEvents[idx].Name == name {
- lastIdx := len(r.IPBlockedEvents) - 1
- r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
- r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
- eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
- return
- }
- }
- for idx := range r.CertificateEvents {
- if r.CertificateEvents[idx].Name == name {
- lastIdx := len(r.CertificateEvents) - 1
- r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
- r.CertificateEvents = r.CertificateEvents[:lastIdx]
- eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
- return
- }
- }
- for idx := range r.Schedules {
- if r.Schedules[idx].Name == name {
- if schedules, ok := r.schedulesMapping[name]; ok {
- for _, entryID := range schedules {
- eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
- eventScheduler.Remove(entryID)
- }
- delete(r.schedulesMapping, name)
- }
- lastIdx := len(r.Schedules) - 1
- r.Schedules[idx] = r.Schedules[lastIdx]
- r.Schedules = r.Schedules[:lastIdx]
- eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
- return
- }
- }
- }
- func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
- r.removeRuleInternal(rule.Name)
- if rule.DeletedAt > 0 {
- deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
- if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
- eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
- go dataprovider.RemoveEventRule(rule) //nolint:errcheck
- }
- return
- }
- switch rule.Trigger {
- case dataprovider.EventTriggerFsEvent:
- r.FsEvents = append(r.FsEvents, rule)
- eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
- case dataprovider.EventTriggerProviderEvent:
- r.ProviderEvents = append(r.ProviderEvents, rule)
- eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
- case dataprovider.EventTriggerIPBlocked:
- r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
- eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
- case dataprovider.EventTriggerCertificate:
- r.CertificateEvents = append(r.CertificateEvents, rule)
- eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
- case dataprovider.EventTriggerSchedule:
- for _, schedule := range rule.Conditions.Schedules {
- cronSpec := schedule.GetCronSpec()
- job := &eventCronJob{
- ruleName: dataprovider.ConvertName(rule.Name),
- }
- entryID, err := eventScheduler.AddJob(cronSpec, job)
- if err != nil {
- eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
- return
- }
- r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
- eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
- rule.Name, entryID, cronSpec, len(r.schedulesMapping))
- }
- r.Schedules = append(r.Schedules, rule)
- eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
- default:
- eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
- }
- }
- func (r *eventRulesContainer) loadRules() {
- eventManagerLog(logger.LevelDebug, "loading updated rules")
- modTime := util.GetTimeAsMsSinceEpoch(time.Now())
- rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
- if err != nil {
- eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
- return
- }
- eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
- if len(rules) > 0 {
- r.Lock()
- defer r.Unlock()
- for _, rule := range rules {
- r.addUpdateRuleInternal(rule)
- }
- }
- eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
- len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
- r.setLastLoadTime(modTime)
- }
- func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
- if !util.Contains(conditions.ProviderEvents, params.Event) {
- return false
- }
- if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
- return false
- }
- if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
- return false
- }
- return true
- }
- func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
- if !util.Contains(conditions.FsEvents, params.Event) {
- return false
- }
- if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
- return false
- }
- if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
- if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
- return false
- }
- }
- if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
- return false
- }
- if params.Event == operationUpload || params.Event == operationDownload {
- if conditions.Options.MinFileSize > 0 {
- if params.FileSize < conditions.Options.MinFileSize {
- return false
- }
- }
- if conditions.Options.MaxFileSize > 0 {
- if params.FileSize > conditions.Options.MaxFileSize {
- return false
- }
- }
- }
- return true
- }
- // hasFsRules returns true if there are any rules for filesystem event triggers
- func (r *eventRulesContainer) hasFsRules() bool {
- r.RLock()
- defer r.RUnlock()
- return len(r.FsEvents) > 0
- }
- // handleFsEvent executes the rules actions defined for the specified event
- func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
- if params.Protocol == protocolEventAction {
- return nil
- }
- r.RLock()
- var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
- for _, rule := range r.FsEvents {
- if r.checkFsEventMatch(rule.Conditions, params) {
- if err := rule.CheckActionsConsistency(""); err != nil {
- eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
- rule.Name, err, params.Event)
- continue
- }
- hasSyncActions := false
- for _, action := range rule.Actions {
- if action.Options.ExecuteSync {
- hasSyncActions = true
- break
- }
- }
- if hasSyncActions {
- rulesWithSyncActions = append(rulesWithSyncActions, rule)
- } else {
- rulesAsync = append(rulesAsync, rule)
- }
- }
- }
- r.RUnlock()
- params.sender = params.Name
- if len(rulesAsync) > 0 {
- go executeAsyncRulesActions(rulesAsync, params)
- }
- if len(rulesWithSyncActions) > 0 {
- return executeSyncRulesActions(rulesWithSyncActions, params)
- }
- return nil
- }
- // username is populated for user objects
- func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
- r.RLock()
- defer r.RUnlock()
- var rules []dataprovider.EventRule
- for _, rule := range r.ProviderEvents {
- if r.checkProviderEventMatch(rule.Conditions, params) {
- if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
- rules = append(rules, rule)
- } else {
- eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
- rule.Name, err, params.Event, params.ObjectType)
- }
- }
- }
- if len(rules) > 0 {
- params.sender = params.ObjectName
- go executeAsyncRulesActions(rules, params)
- }
- }
- func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
- r.RLock()
- defer r.RUnlock()
- if len(r.IPBlockedEvents) == 0 {
- return
- }
- var rules []dataprovider.EventRule
- for _, rule := range r.IPBlockedEvents {
- if err := rule.CheckActionsConsistency(""); err == nil {
- rules = append(rules, rule)
- } else {
- eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
- rule.Name, err, params.Event)
- }
- }
- if len(rules) > 0 {
- go executeAsyncRulesActions(rules, params)
- }
- }
- func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
- r.RLock()
- defer r.RUnlock()
- if len(r.CertificateEvents) == 0 {
- return
- }
- var rules []dataprovider.EventRule
- for _, rule := range r.CertificateEvents {
- if err := rule.CheckActionsConsistency(""); err == nil {
- rules = append(rules, rule)
- } else {
- eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
- rule.Name, err, params.Event)
- }
- }
- if len(rules) > 0 {
- go executeAsyncRulesActions(rules, params)
- }
- }
- // EventParams defines the supported event parameters
- type EventParams struct {
- Name string
- Event string
- Status int
- VirtualPath string
- FsPath string
- VirtualTargetPath string
- FsTargetPath string
- ObjectName string
- ObjectType string
- FileSize int64
- Protocol string
- IP string
- Timestamp int64
- Object plugin.Renderer
- sender string
- }
- // getUsers returns users with group settings not applied
- func (p *EventParams) getUsers() ([]dataprovider.User, error) {
- if p.sender == "" {
- return dataprovider.DumpUsers()
- }
- user, err := dataprovider.UserExists(p.sender)
- if err != nil {
- return nil, fmt.Errorf("error getting user %q: %w", p.sender, err)
- }
- return []dataprovider.User{user}, nil
- }
- func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
- if p.sender == "" {
- return dataprovider.DumpFolders()
- }
- folder, err := dataprovider.GetFolderByName(p.sender)
- if err != nil {
- return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
- }
- return []vfs.BaseVirtualFolder{folder}, nil
- }
- func (p *EventParams) getStringReplacements(addObjectData bool) []string {
- replacements := []string{
- "{{Name}}", p.Name,
- "{{Event}}", p.Event,
- "{{Status}}", fmt.Sprintf("%d", p.Status),
- "{{VirtualPath}}", p.VirtualPath,
- "{{FsPath}}", p.FsPath,
- "{{VirtualTargetPath}}", p.VirtualTargetPath,
- "{{FsTargetPath}}", p.FsTargetPath,
- "{{ObjectName}}", p.ObjectName,
- "{{ObjectType}}", p.ObjectType,
- "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
- "{{Protocol}}", p.Protocol,
- "{{IP}}", p.IP,
- "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
- }
- if addObjectData {
- data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
- if err == nil {
- replacements = append(replacements, "{{ObjectData}}", string(data))
- }
- }
- return replacements
- }
// replaceWithReplacer applies replacer to input. Inputs that do not
// contain "{{" cannot hold a placeholder and are returned unchanged
// without invoking the replacer.
func replaceWithReplacer(input string, replacer *strings.Replacer) string {
	if strings.Contains(input, "{{") {
		return replacer.Replace(input)
	}
	return input
}
- func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
- matched, err := path.Match(p.Pattern, name)
- if err != nil {
- eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
- return false
- }
- if p.InverseMatch {
- return !matched
- }
- return matched
- }
- // checkConditionPatterns returns false if patterns are defined and no match is found
- func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
- if len(patterns) == 0 {
- return true
- }
- for _, p := range patterns {
- if checkEventConditionPattern(p, name) {
- return true
- }
- }
- return false
- }
- func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
- if len(c.QueryParameters) > 0 {
- u, err := url.Parse(c.Endpoint)
- if err != nil {
- return "", fmt.Errorf("invalid endpoint: %w", err)
- }
- q := u.Query()
- for _, keyVal := range c.QueryParameters {
- q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
- }
- u.RawQuery = q.Encode()
- return u.String(), nil
- }
- return c.Endpoint, nil
- }
- func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params EventParams) error {
- if !c.Password.IsEmpty() {
- if err := c.Password.TryDecrypt(); err != nil {
- return fmt.Errorf("unable to decrypt password: %w", err)
- }
- }
- addObjectData := false
- if params.Object != nil {
- if !addObjectData {
- if strings.Contains(c.Body, "{{ObjectData}}") {
- addObjectData = true
- }
- }
- }
- replacements := params.getStringReplacements(addObjectData)
- replacer := strings.NewReplacer(replacements...)
- endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
- if err != nil {
- return err
- }
- var body io.Reader
- if c.Body != "" && c.Method != http.MethodGet {
- body = bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))
- }
- req, err := http.NewRequest(c.Method, endpoint, body)
- if err != nil {
- return err
- }
- if c.Username != "" {
- req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetAdditionalData())
- }
- for _, keyVal := range c.Headers {
- req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
- }
- client := c.GetHTTPClient()
- defer client.CloseIdleConnections()
- startTime := time.Now()
- resp, err := client.Do(req)
- if err != nil {
- eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
- endpoint, time.Since(startTime), err)
- return err
- }
- defer resp.Body.Close()
- eventManagerLog(logger.LevelDebug, "http notification sent, endopoint: %s, elapsed: %s, status code: %d",
- endpoint, time.Since(startTime), resp.StatusCode)
- if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
- return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- }
- return nil
- }
- func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params EventParams) error {
- envVars := make([]string, 0, len(c.EnvVars))
- addObjectData := false
- if params.Object != nil {
- for _, k := range c.EnvVars {
- if strings.Contains(k.Value, "{{ObjectData}}") {
- addObjectData = true
- break
- }
- }
- }
- replacements := params.getStringReplacements(addObjectData)
- replacer := strings.NewReplacer(replacements...)
- for _, keyVal := range c.EnvVars {
- envVars = append(envVars, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
- }
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
- defer cancel()
- cmd := exec.CommandContext(ctx, c.Cmd)
- cmd.Env = append(cmd.Env, os.Environ()...)
- cmd.Env = append(cmd.Env, envVars...)
- startTime := time.Now()
- err := cmd.Run()
- eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
- c.Cmd, time.Since(startTime), err)
- return err
- }
- func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params EventParams) error {
- addObjectData := false
- if params.Object != nil {
- if strings.Contains(c.Body, "{{ObjectData}}") {
- addObjectData = true
- }
- }
- replacements := params.getStringReplacements(addObjectData)
- replacer := strings.NewReplacer(replacements...)
- body := replaceWithReplacer(c.Body, replacer)
- subject := replaceWithReplacer(c.Subject, replacer)
- startTime := time.Now()
- err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain)
- eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
- time.Since(startTime), err)
- return err
- }
- func getUserForEventAction(username string) (dataprovider.User, error) {
- user, err := dataprovider.GetUserWithGroupSettings(username)
- if err != nil {
- return dataprovider.User{}, err
- }
- user.Filters.DisableFsChecks = false
- user.Filters.FilePatterns = nil
- for k := range user.Permissions {
- user.Permissions[k] = []string{dataprovider.PermAny}
- }
- return user, err
- }
- func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
- fs, fsPath, err := conn.GetFsAndResolvedPath(item)
- if err != nil {
- return err
- }
- return conn.RemoveFile(fs, fsPath, item, info)
- }
- func executeDeleteFsAction(deletes []string, replacer *strings.Replacer, username string) error {
- user, err := getUserForEventAction(username)
- if err != nil {
- return err
- }
- connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
- err = user.CheckFsRoot(connectionID)
- defer user.CloseFs() //nolint:errcheck
- if err != nil {
- return err
- }
- conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
- for _, item := range deletes {
- item = replaceWithReplacer(item, replacer)
- info, err := conn.DoStat(item, 0, false)
- if err != nil {
- if conn.IsNotExistError(err) {
- continue
- }
- return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
- }
- if info.IsDir() {
- if err = conn.RemoveDir(item); err != nil {
- return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
- }
- } else {
- if err = executeDeleteFileFsAction(conn, item, info); err != nil {
- return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
- }
- }
- eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
- }
- return nil
- }
- func executeMkDirsFsAction(dirs []string, replacer *strings.Replacer, username string) error {
- user, err := getUserForEventAction(username)
- if err != nil {
- return err
- }
- connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
- err = user.CheckFsRoot(connectionID)
- defer user.CloseFs() //nolint:errcheck
- if err != nil {
- return err
- }
- conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
- for _, item := range dirs {
- item = replaceWithReplacer(item, replacer)
- if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
- return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
- }
- if err = conn.createDirIfMissing(item); err != nil {
- return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
- }
- eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
- }
- return nil
- }
- func executeRenameFsAction(renames []dataprovider.KeyValue, replacer *strings.Replacer, username string) error {
- user, err := getUserForEventAction(username)
- if err != nil {
- return err
- }
- connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
- err = user.CheckFsRoot(connectionID)
- defer user.CloseFs() //nolint:errcheck
- if err != nil {
- return err
- }
- conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
- for _, item := range renames {
- source := replaceWithReplacer(item.Key, replacer)
- target := replaceWithReplacer(item.Value, replacer)
- if err = conn.Rename(source, target); err != nil {
- return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
- }
- eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
- }
- return nil
- }
- func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, params EventParams) error {
- addObjectData := false
- replacements := params.getStringReplacements(addObjectData)
- replacer := strings.NewReplacer(replacements...)
- switch c.Type {
- case dataprovider.FilesystemActionRename:
- return executeRenameFsAction(c.Renames, replacer, params.sender)
- case dataprovider.FilesystemActionDelete:
- return executeDeleteFsAction(c.Deletes, replacer, params.sender)
- case dataprovider.FilesystemActionMkdirs:
- return executeMkDirsFsAction(c.MkDirs, replacer, params.sender)
- default:
- return fmt.Errorf("unsupported filesystem action %d", c.Type)
- }
- }
- func executeQuotaResetForUser(user dataprovider.User) error {
- if err := user.LoadAndApplyGroupSettings(); err != nil {
- eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
- user.Username, err)
- return err
- }
- if !QuotaScans.AddUserQuotaScan(user.Username) {
- eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %s", user.Username)
- return fmt.Errorf("another quota scan is in progress for user %s", user.Username)
- }
- defer QuotaScans.RemoveUserQuotaScan(user.Username)
- numFiles, size, err := user.ScanQuota()
- if err != nil {
- eventManagerLog(logger.LevelError, "error scanning quota for user %s: %v", user.Username, err)
- return err
- }
- err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
- if err != nil {
- eventManagerLog(logger.LevelError, "error updating quota for user %s: %v", user.Username, err)
- return err
- }
- return nil
- }
- func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
- users, err := params.getUsers()
- if err != nil {
- return fmt.Errorf("unable to get users: %w", err)
- }
- var failedResets []string
- executed := 0
- for _, user := range users {
- // if sender is set, the conditions have already been evaluated
- if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
- eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
- user.Username)
- continue
- }
- executed++
- if err = executeQuotaResetForUser(user); err != nil {
- failedResets = append(failedResets, user.Username)
- continue
- }
- }
- if len(failedResets) > 0 {
- return fmt.Errorf("quota reset failed for users: %+v", failedResets)
- }
- if executed == 0 {
- eventManagerLog(logger.LevelError, "no user quota reset executed")
- return errors.New("no user quota reset executed")
- }
- return nil
- }
- func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
- folders, err := params.getFolders()
- if err != nil {
- return fmt.Errorf("unable to get folders: %w", err)
- }
- var failedResets []string
- executed := 0
- for _, folder := range folders {
- // if sender is set, the conditions have already been evaluated
- if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
- eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
- folder.Name)
- continue
- }
- if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
- eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %s", folder.Name)
- failedResets = append(failedResets, folder.Name)
- continue
- }
- executed++
- f := vfs.VirtualFolder{
- BaseVirtualFolder: folder,
- VirtualPath: "/",
- }
- numFiles, size, err := f.ScanQuota()
- QuotaScans.RemoveVFolderQuotaScan(folder.Name)
- if err != nil {
- eventManagerLog(logger.LevelError, "error scanning quota for folder %s: %v", folder.Name, err)
- failedResets = append(failedResets, folder.Name)
- continue
- }
- err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
- if err != nil {
- eventManagerLog(logger.LevelError, "error updating quota for folder %s: %v", folder.Name, err)
- failedResets = append(failedResets, folder.Name)
- }
- }
- if len(failedResets) > 0 {
- return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
- }
- if executed == 0 {
- eventManagerLog(logger.LevelError, "no folder quota reset executed")
- return errors.New("no folder quota reset executed")
- }
- return nil
- }
- func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
- users, err := params.getUsers()
- if err != nil {
- return fmt.Errorf("unable to get users: %w", err)
- }
- var failedResets []string
- executed := 0
- for _, user := range users {
- // if sender is set, the conditions have already been evaluated
- if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
- eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
- user.Username)
- continue
- }
- executed++
- err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
- if err != nil {
- eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
- failedResets = append(failedResets, user.Username)
- }
- }
- if len(failedResets) > 0 {
- return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
- }
- if executed == 0 {
- eventManagerLog(logger.LevelError, "no transfer quota reset executed")
- return errors.New("no transfer quota reset executed")
- }
- return nil
- }
- func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention) error {
- if err := user.LoadAndApplyGroupSettings(); err != nil {
- eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
- user.Username, err)
- return err
- }
- check := RetentionCheck{
- Folders: folders,
- }
- c := RetentionChecks.Add(check, &user)
- if c == nil {
- eventManagerLog(logger.LevelError, "another retention check is already in progress for user %s", user.Username)
- return fmt.Errorf("another retention check is in progress for user %s", user.Username)
- }
- if err := c.Start(); err != nil {
- eventManagerLog(logger.LevelError, "error checking retention for user %s: %v", user.Username, err)
- return err
- }
- return nil
- }
- func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
- conditions dataprovider.ConditionOptions, params EventParams,
- ) error {
- users, err := params.getUsers()
- if err != nil {
- return fmt.Errorf("unable to get users: %w", err)
- }
- var failedChecks []string
- executed := 0
- for _, user := range users {
- // if sender is set, the conditions have already been evaluated
- if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
- eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
- user.Username)
- continue
- }
- executed++
- if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
- failedChecks = append(failedChecks, user.Username)
- continue
- }
- }
- if len(failedChecks) > 0 {
- return fmt.Errorf("retention check failed for users: %+v", failedChecks)
- }
- if executed == 0 {
- eventManagerLog(logger.LevelError, "no retention check executed")
- return errors.New("no retention check executed")
- }
- return nil
- }
- func executeRuleAction(action dataprovider.BaseEventAction, params EventParams, conditions dataprovider.ConditionOptions) error {
- switch action.Type {
- case dataprovider.ActionTypeHTTP:
- return executeHTTPRuleAction(action.Options.HTTPConfig, params)
- case dataprovider.ActionTypeCommand:
- return executeCommandRuleAction(action.Options.CmdConfig, params)
- case dataprovider.ActionTypeEmail:
- return executeEmailRuleAction(action.Options.EmailConfig, params)
- case dataprovider.ActionTypeBackup:
- return dataprovider.ExecuteBackup()
- case dataprovider.ActionTypeUserQuotaReset:
- return executeUsersQuotaResetRuleAction(conditions, params)
- case dataprovider.ActionTypeFolderQuotaReset:
- return executeFoldersQuotaResetRuleAction(conditions, params)
- case dataprovider.ActionTypeTransferQuotaReset:
- return executeTransferQuotaResetRuleAction(conditions, params)
- case dataprovider.ActionTypeDataRetentionCheck:
- return executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
- case dataprovider.ActionTypeFilesystem:
- return executeFsRuleAction(action.Options.FsConfig, params)
- default:
- return fmt.Errorf("unsupported action type: %d", action.Type)
- }
- }
- func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
- var errRes error
- for _, rule := range rules {
- var failedActions []string
- for _, action := range rule.Actions {
- if !action.Options.IsFailureAction && action.Options.ExecuteSync {
- startTime := time.Now()
- if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
- eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
- action.Name, rule.Name, time.Since(startTime), err)
- failedActions = append(failedActions, action.Name)
- // we return the last error, it is ok for now
- errRes = err
- if action.Options.StopOnFailure {
- break
- }
- } else {
- eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
- action.Name, rule.Name, time.Since(startTime))
- }
- }
- }
- // execute async actions if any, including failure actions
- go executeRuleAsyncActions(rule, params, failedActions)
- }
- return errRes
- }
- func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
- eventManager.addAsyncTask()
- defer eventManager.removeAsyncTask()
- for _, rule := range rules {
- executeRuleAsyncActions(rule, params, nil)
- }
- }
- func executeRuleAsyncActions(rule dataprovider.EventRule, params EventParams, failedActions []string) {
- for _, action := range rule.Actions {
- if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
- startTime := time.Now()
- if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
- eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
- action.Name, rule.Name, time.Since(startTime), err)
- failedActions = append(failedActions, action.Name)
- if action.Options.StopOnFailure {
- break
- }
- } else {
- eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
- action.Name, rule.Name, time.Since(startTime))
- }
- }
- }
- if len(failedActions) > 0 {
- // execute failure actions
- for _, action := range rule.Actions {
- if action.Options.IsFailureAction {
- startTime := time.Now()
- if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
- eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
- action.Name, rule.Name, time.Since(startTime), err)
- if action.Options.StopOnFailure {
- break
- }
- } else {
- eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
- action.Name, rule.Name, time.Since(startTime))
- }
- }
- }
- }
- }
// eventCronJob is the job that executes a scheduled event rule.
// Only the rule name is stored: the rule itself is loaded from the
// data provider each time the job runs.
type eventCronJob struct {
	// ruleName is the name of the event rule to execute
	ruleName string
}
- func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
- if rule.GuardFromConcurrentExecution() {
- task, err := dataprovider.GetTaskByName(rule.Name)
- if _, ok := err.(*util.RecordNotFoundError); ok {
- eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
- task = dataprovider.Task{
- Name: rule.Name,
- UpdateAt: 0,
- Version: 0,
- }
- err = dataprovider.AddTask(rule.Name)
- if err != nil {
- eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
- return task, err
- }
- } else {
- eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
- }
- return task, err
- }
- return dataprovider.Task{}, nil
- }
- func (j *eventCronJob) Run() {
- eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
- rule, err := dataprovider.EventRuleExists(j.ruleName)
- if err != nil {
- eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
- return
- }
- if err = rule.CheckActionsConsistency(""); err != nil {
- eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
- return
- }
- task, err := j.getTask(rule)
- if err != nil {
- return
- }
- if task.Name != "" {
- updateInterval := 5 * time.Minute
- updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
- if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
- eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
- return
- }
- err = dataprovider.UpdateTask(rule.Name, task.Version)
- if err != nil {
- eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
- rule.Name, err)
- return
- }
- ticker := time.NewTicker(updateInterval)
- done := make(chan bool)
- defer func() {
- done <- true
- ticker.Stop()
- }()
- go func(taskName string) {
- eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
- for {
- select {
- case <-done:
- eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
- return
- case <-ticker.C:
- err := dataprovider.UpdateTaskTimestamp(taskName)
- eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
- }
- }
- }(task.Name)
- executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
- } else {
- executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
- }
- eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
- }
- func eventManagerLog(level logger.LogLevel, format string, v ...any) {
- logger.Log(level, "eventmanager", "", format, v...)
- }
|