eventmanager.go 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "os/exec"
  25. "path"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/robfig/cron/v3"
  31. "github.com/rs/xid"
  32. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  33. "github.com/drakkan/sftpgo/v2/internal/logger"
  34. "github.com/drakkan/sftpgo/v2/internal/plugin"
  35. "github.com/drakkan/sftpgo/v2/internal/smtp"
  36. "github.com/drakkan/sftpgo/v2/internal/util"
  37. "github.com/drakkan/sftpgo/v2/internal/vfs"
  38. )
  39. const (
  40. ipBlockedEventName = "IP Blocked"
  41. )
  42. var (
  43. // eventManager handle the supported event rules actions
  44. eventManager eventRulesContainer
  45. )
  46. func init() {
  47. eventManager = eventRulesContainer{
  48. schedulesMapping: make(map[string][]cron.EntryID),
  49. // arbitrary maximum number of concurrent asynchronous tasks,
  50. // each task could execute multiple actions
  51. concurrencyGuard: make(chan struct{}, 200),
  52. }
  53. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  54. func(operation, executor, ip, objectType, objectName string, object plugin.Renderer) {
  55. eventManager.handleProviderEvent(EventParams{
  56. Name: executor,
  57. ObjectName: objectName,
  58. Event: operation,
  59. Status: 1,
  60. ObjectType: objectType,
  61. IP: ip,
  62. Timestamp: time.Now().UnixNano(),
  63. Object: object,
  64. })
  65. })
  66. }
  67. // HandleCertificateEvent checks and executes action rules for certificate events
  68. func HandleCertificateEvent(params EventParams) {
  69. eventManager.handleCertificateEvent(params)
  70. }
  71. // eventRulesContainer stores event rules by trigger
  72. type eventRulesContainer struct {
  73. sync.RWMutex
  74. lastLoad int64
  75. FsEvents []dataprovider.EventRule
  76. ProviderEvents []dataprovider.EventRule
  77. Schedules []dataprovider.EventRule
  78. IPBlockedEvents []dataprovider.EventRule
  79. CertificateEvents []dataprovider.EventRule
  80. schedulesMapping map[string][]cron.EntryID
  81. concurrencyGuard chan struct{}
  82. }
  83. func (r *eventRulesContainer) addAsyncTask() {
  84. r.concurrencyGuard <- struct{}{}
  85. }
  86. func (r *eventRulesContainer) removeAsyncTask() {
  87. <-r.concurrencyGuard
  88. }
  89. func (r *eventRulesContainer) getLastLoadTime() int64 {
  90. return atomic.LoadInt64(&r.lastLoad)
  91. }
  92. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  93. atomic.StoreInt64(&r.lastLoad, modTime)
  94. }
  95. // RemoveRule deletes the rule with the specified name
  96. func (r *eventRulesContainer) RemoveRule(name string) {
  97. r.Lock()
  98. defer r.Unlock()
  99. r.removeRuleInternal(name)
  100. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  101. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  102. }
  103. func (r *eventRulesContainer) removeRuleInternal(name string) {
  104. for idx := range r.FsEvents {
  105. if r.FsEvents[idx].Name == name {
  106. lastIdx := len(r.FsEvents) - 1
  107. r.FsEvents[idx] = r.FsEvents[lastIdx]
  108. r.FsEvents = r.FsEvents[:lastIdx]
  109. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  110. return
  111. }
  112. }
  113. for idx := range r.ProviderEvents {
  114. if r.ProviderEvents[idx].Name == name {
  115. lastIdx := len(r.ProviderEvents) - 1
  116. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  117. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  118. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  119. return
  120. }
  121. }
  122. for idx := range r.IPBlockedEvents {
  123. if r.IPBlockedEvents[idx].Name == name {
  124. lastIdx := len(r.IPBlockedEvents) - 1
  125. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  126. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  127. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  128. return
  129. }
  130. }
  131. for idx := range r.CertificateEvents {
  132. if r.CertificateEvents[idx].Name == name {
  133. lastIdx := len(r.CertificateEvents) - 1
  134. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  135. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  136. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  137. return
  138. }
  139. }
  140. for idx := range r.Schedules {
  141. if r.Schedules[idx].Name == name {
  142. if schedules, ok := r.schedulesMapping[name]; ok {
  143. for _, entryID := range schedules {
  144. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  145. eventScheduler.Remove(entryID)
  146. }
  147. delete(r.schedulesMapping, name)
  148. }
  149. lastIdx := len(r.Schedules) - 1
  150. r.Schedules[idx] = r.Schedules[lastIdx]
  151. r.Schedules = r.Schedules[:lastIdx]
  152. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  153. return
  154. }
  155. }
  156. }
  157. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  158. r.removeRuleInternal(rule.Name)
  159. if rule.DeletedAt > 0 {
  160. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  161. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  162. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  163. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  164. }
  165. return
  166. }
  167. switch rule.Trigger {
  168. case dataprovider.EventTriggerFsEvent:
  169. r.FsEvents = append(r.FsEvents, rule)
  170. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  171. case dataprovider.EventTriggerProviderEvent:
  172. r.ProviderEvents = append(r.ProviderEvents, rule)
  173. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  174. case dataprovider.EventTriggerIPBlocked:
  175. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  176. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  177. case dataprovider.EventTriggerCertificate:
  178. r.CertificateEvents = append(r.CertificateEvents, rule)
  179. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  180. case dataprovider.EventTriggerSchedule:
  181. for _, schedule := range rule.Conditions.Schedules {
  182. cronSpec := schedule.GetCronSpec()
  183. job := &eventCronJob{
  184. ruleName: dataprovider.ConvertName(rule.Name),
  185. }
  186. entryID, err := eventScheduler.AddJob(cronSpec, job)
  187. if err != nil {
  188. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  189. return
  190. }
  191. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  192. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  193. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  194. }
  195. r.Schedules = append(r.Schedules, rule)
  196. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  197. default:
  198. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  199. }
  200. }
  201. func (r *eventRulesContainer) loadRules() {
  202. eventManagerLog(logger.LevelDebug, "loading updated rules")
  203. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  204. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  205. if err != nil {
  206. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  207. return
  208. }
  209. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  210. if len(rules) > 0 {
  211. r.Lock()
  212. defer r.Unlock()
  213. for _, rule := range rules {
  214. r.addUpdateRuleInternal(rule)
  215. }
  216. }
  217. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  218. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  219. r.setLastLoadTime(modTime)
  220. }
  221. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  222. if !util.Contains(conditions.ProviderEvents, params.Event) {
  223. return false
  224. }
  225. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  226. return false
  227. }
  228. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  229. return false
  230. }
  231. return true
  232. }
  233. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  234. if !util.Contains(conditions.FsEvents, params.Event) {
  235. return false
  236. }
  237. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  238. return false
  239. }
  240. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  241. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  242. return false
  243. }
  244. }
  245. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  246. return false
  247. }
  248. if params.Event == operationUpload || params.Event == operationDownload {
  249. if conditions.Options.MinFileSize > 0 {
  250. if params.FileSize < conditions.Options.MinFileSize {
  251. return false
  252. }
  253. }
  254. if conditions.Options.MaxFileSize > 0 {
  255. if params.FileSize > conditions.Options.MaxFileSize {
  256. return false
  257. }
  258. }
  259. }
  260. return true
  261. }
  262. // hasFsRules returns true if there are any rules for filesystem event triggers
  263. func (r *eventRulesContainer) hasFsRules() bool {
  264. r.RLock()
  265. defer r.RUnlock()
  266. return len(r.FsEvents) > 0
  267. }
  268. // handleFsEvent executes the rules actions defined for the specified event
  269. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  270. if params.Protocol == protocolEventAction {
  271. return nil
  272. }
  273. r.RLock()
  274. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  275. for _, rule := range r.FsEvents {
  276. if r.checkFsEventMatch(rule.Conditions, params) {
  277. if err := rule.CheckActionsConsistency(""); err != nil {
  278. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  279. rule.Name, err, params.Event)
  280. continue
  281. }
  282. hasSyncActions := false
  283. for _, action := range rule.Actions {
  284. if action.Options.ExecuteSync {
  285. hasSyncActions = true
  286. break
  287. }
  288. }
  289. if hasSyncActions {
  290. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  291. } else {
  292. rulesAsync = append(rulesAsync, rule)
  293. }
  294. }
  295. }
  296. r.RUnlock()
  297. params.sender = params.Name
  298. if len(rulesAsync) > 0 {
  299. go executeAsyncRulesActions(rulesAsync, params)
  300. }
  301. if len(rulesWithSyncActions) > 0 {
  302. return executeSyncRulesActions(rulesWithSyncActions, params)
  303. }
  304. return nil
  305. }
  306. // username is populated for user objects
  307. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  308. r.RLock()
  309. defer r.RUnlock()
  310. var rules []dataprovider.EventRule
  311. for _, rule := range r.ProviderEvents {
  312. if r.checkProviderEventMatch(rule.Conditions, params) {
  313. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  314. rules = append(rules, rule)
  315. } else {
  316. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  317. rule.Name, err, params.Event, params.ObjectType)
  318. }
  319. }
  320. }
  321. if len(rules) > 0 {
  322. params.sender = params.ObjectName
  323. go executeAsyncRulesActions(rules, params)
  324. }
  325. }
  326. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  327. r.RLock()
  328. defer r.RUnlock()
  329. if len(r.IPBlockedEvents) == 0 {
  330. return
  331. }
  332. var rules []dataprovider.EventRule
  333. for _, rule := range r.IPBlockedEvents {
  334. if err := rule.CheckActionsConsistency(""); err == nil {
  335. rules = append(rules, rule)
  336. } else {
  337. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  338. rule.Name, err, params.Event)
  339. }
  340. }
  341. if len(rules) > 0 {
  342. go executeAsyncRulesActions(rules, params)
  343. }
  344. }
  345. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  346. r.RLock()
  347. defer r.RUnlock()
  348. if len(r.CertificateEvents) == 0 {
  349. return
  350. }
  351. var rules []dataprovider.EventRule
  352. for _, rule := range r.CertificateEvents {
  353. if err := rule.CheckActionsConsistency(""); err == nil {
  354. rules = append(rules, rule)
  355. } else {
  356. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  357. rule.Name, err, params.Event)
  358. }
  359. }
  360. if len(rules) > 0 {
  361. go executeAsyncRulesActions(rules, params)
  362. }
  363. }
  364. // EventParams defines the supported event parameters
  365. type EventParams struct {
  366. Name string
  367. Event string
  368. Status int
  369. VirtualPath string
  370. FsPath string
  371. VirtualTargetPath string
  372. FsTargetPath string
  373. ObjectName string
  374. ObjectType string
  375. FileSize int64
  376. Protocol string
  377. IP string
  378. Timestamp int64
  379. Object plugin.Renderer
  380. sender string
  381. }
  382. // getUsers returns users with group settings not applied
  383. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  384. if p.sender == "" {
  385. return dataprovider.DumpUsers()
  386. }
  387. user, err := dataprovider.UserExists(p.sender)
  388. if err != nil {
  389. return nil, fmt.Errorf("error getting user %q: %w", p.sender, err)
  390. }
  391. return []dataprovider.User{user}, nil
  392. }
  393. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  394. if p.sender == "" {
  395. return dataprovider.DumpFolders()
  396. }
  397. folder, err := dataprovider.GetFolderByName(p.sender)
  398. if err != nil {
  399. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  400. }
  401. return []vfs.BaseVirtualFolder{folder}, nil
  402. }
  403. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  404. replacements := []string{
  405. "{{Name}}", p.Name,
  406. "{{Event}}", p.Event,
  407. "{{Status}}", fmt.Sprintf("%d", p.Status),
  408. "{{VirtualPath}}", p.VirtualPath,
  409. "{{FsPath}}", p.FsPath,
  410. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  411. "{{FsTargetPath}}", p.FsTargetPath,
  412. "{{ObjectName}}", p.ObjectName,
  413. "{{ObjectType}}", p.ObjectType,
  414. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  415. "{{Protocol}}", p.Protocol,
  416. "{{IP}}", p.IP,
  417. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  418. }
  419. if addObjectData {
  420. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  421. if err == nil {
  422. replacements = append(replacements, "{{ObjectData}}", string(data))
  423. }
  424. }
  425. return replacements
  426. }
  427. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  428. if !strings.Contains(input, "{{") {
  429. return input
  430. }
  431. return replacer.Replace(input)
  432. }
  433. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  434. matched, err := path.Match(p.Pattern, name)
  435. if err != nil {
  436. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  437. return false
  438. }
  439. if p.InverseMatch {
  440. return !matched
  441. }
  442. return matched
  443. }
  444. // checkConditionPatterns returns false if patterns are defined and no match is found
  445. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  446. if len(patterns) == 0 {
  447. return true
  448. }
  449. for _, p := range patterns {
  450. if checkEventConditionPattern(p, name) {
  451. return true
  452. }
  453. }
  454. return false
  455. }
  456. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  457. if len(c.QueryParameters) > 0 {
  458. u, err := url.Parse(c.Endpoint)
  459. if err != nil {
  460. return "", fmt.Errorf("invalid endpoint: %w", err)
  461. }
  462. q := u.Query()
  463. for _, keyVal := range c.QueryParameters {
  464. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  465. }
  466. u.RawQuery = q.Encode()
  467. return u.String(), nil
  468. }
  469. return c.Endpoint, nil
  470. }
  471. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params EventParams) error {
  472. if !c.Password.IsEmpty() {
  473. if err := c.Password.TryDecrypt(); err != nil {
  474. return fmt.Errorf("unable to decrypt password: %w", err)
  475. }
  476. }
  477. addObjectData := false
  478. if params.Object != nil {
  479. if !addObjectData {
  480. if strings.Contains(c.Body, "{{ObjectData}}") {
  481. addObjectData = true
  482. }
  483. }
  484. }
  485. replacements := params.getStringReplacements(addObjectData)
  486. replacer := strings.NewReplacer(replacements...)
  487. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  488. if err != nil {
  489. return err
  490. }
  491. var body io.Reader
  492. if c.Body != "" && c.Method != http.MethodGet {
  493. body = bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))
  494. }
  495. req, err := http.NewRequest(c.Method, endpoint, body)
  496. if err != nil {
  497. return err
  498. }
  499. if c.Username != "" {
  500. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetAdditionalData())
  501. }
  502. for _, keyVal := range c.Headers {
  503. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  504. }
  505. client := c.GetHTTPClient()
  506. defer client.CloseIdleConnections()
  507. startTime := time.Now()
  508. resp, err := client.Do(req)
  509. if err != nil {
  510. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  511. endpoint, time.Since(startTime), err)
  512. return err
  513. }
  514. defer resp.Body.Close()
  515. eventManagerLog(logger.LevelDebug, "http notification sent, endopoint: %s, elapsed: %s, status code: %d",
  516. endpoint, time.Since(startTime), resp.StatusCode)
  517. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  518. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  519. }
  520. return nil
  521. }
  522. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params EventParams) error {
  523. envVars := make([]string, 0, len(c.EnvVars))
  524. addObjectData := false
  525. if params.Object != nil {
  526. for _, k := range c.EnvVars {
  527. if strings.Contains(k.Value, "{{ObjectData}}") {
  528. addObjectData = true
  529. break
  530. }
  531. }
  532. }
  533. replacements := params.getStringReplacements(addObjectData)
  534. replacer := strings.NewReplacer(replacements...)
  535. for _, keyVal := range c.EnvVars {
  536. envVars = append(envVars, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  537. }
  538. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  539. defer cancel()
  540. cmd := exec.CommandContext(ctx, c.Cmd)
  541. cmd.Env = append(cmd.Env, os.Environ()...)
  542. cmd.Env = append(cmd.Env, envVars...)
  543. startTime := time.Now()
  544. err := cmd.Run()
  545. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  546. c.Cmd, time.Since(startTime), err)
  547. return err
  548. }
  549. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params EventParams) error {
  550. addObjectData := false
  551. if params.Object != nil {
  552. if strings.Contains(c.Body, "{{ObjectData}}") {
  553. addObjectData = true
  554. }
  555. }
  556. replacements := params.getStringReplacements(addObjectData)
  557. replacer := strings.NewReplacer(replacements...)
  558. body := replaceWithReplacer(c.Body, replacer)
  559. subject := replaceWithReplacer(c.Subject, replacer)
  560. startTime := time.Now()
  561. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain)
  562. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  563. time.Since(startTime), err)
  564. return err
  565. }
  566. func getUserForEventAction(username string) (dataprovider.User, error) {
  567. user, err := dataprovider.GetUserWithGroupSettings(username)
  568. if err != nil {
  569. return dataprovider.User{}, err
  570. }
  571. user.Filters.DisableFsChecks = false
  572. user.Filters.FilePatterns = nil
  573. for k := range user.Permissions {
  574. user.Permissions[k] = []string{dataprovider.PermAny}
  575. }
  576. return user, err
  577. }
  578. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  579. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  580. if err != nil {
  581. return err
  582. }
  583. return conn.RemoveFile(fs, fsPath, item, info)
  584. }
  585. func executeDeleteFsAction(deletes []string, replacer *strings.Replacer, username string) error {
  586. user, err := getUserForEventAction(username)
  587. if err != nil {
  588. return err
  589. }
  590. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  591. err = user.CheckFsRoot(connectionID)
  592. defer user.CloseFs() //nolint:errcheck
  593. if err != nil {
  594. return err
  595. }
  596. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  597. for _, item := range deletes {
  598. item = replaceWithReplacer(item, replacer)
  599. info, err := conn.DoStat(item, 0, false)
  600. if err != nil {
  601. if conn.IsNotExistError(err) {
  602. continue
  603. }
  604. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  605. }
  606. if info.IsDir() {
  607. if err = conn.RemoveDir(item); err != nil {
  608. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  609. }
  610. } else {
  611. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  612. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  613. }
  614. }
  615. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  616. }
  617. return nil
  618. }
  619. func executeMkDirsFsAction(dirs []string, replacer *strings.Replacer, username string) error {
  620. user, err := getUserForEventAction(username)
  621. if err != nil {
  622. return err
  623. }
  624. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  625. err = user.CheckFsRoot(connectionID)
  626. defer user.CloseFs() //nolint:errcheck
  627. if err != nil {
  628. return err
  629. }
  630. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  631. for _, item := range dirs {
  632. item = replaceWithReplacer(item, replacer)
  633. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  634. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  635. }
  636. if err = conn.createDirIfMissing(item); err != nil {
  637. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  638. }
  639. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  640. }
  641. return nil
  642. }
  643. func executeRenameFsAction(renames []dataprovider.KeyValue, replacer *strings.Replacer, username string) error {
  644. user, err := getUserForEventAction(username)
  645. if err != nil {
  646. return err
  647. }
  648. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  649. err = user.CheckFsRoot(connectionID)
  650. defer user.CloseFs() //nolint:errcheck
  651. if err != nil {
  652. return err
  653. }
  654. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  655. for _, item := range renames {
  656. source := replaceWithReplacer(item.Key, replacer)
  657. target := replaceWithReplacer(item.Value, replacer)
  658. if err = conn.Rename(source, target); err != nil {
  659. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  660. }
  661. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  662. }
  663. return nil
  664. }
  665. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, params EventParams) error {
  666. addObjectData := false
  667. replacements := params.getStringReplacements(addObjectData)
  668. replacer := strings.NewReplacer(replacements...)
  669. switch c.Type {
  670. case dataprovider.FilesystemActionRename:
  671. return executeRenameFsAction(c.Renames, replacer, params.sender)
  672. case dataprovider.FilesystemActionDelete:
  673. return executeDeleteFsAction(c.Deletes, replacer, params.sender)
  674. case dataprovider.FilesystemActionMkdirs:
  675. return executeMkDirsFsAction(c.MkDirs, replacer, params.sender)
  676. default:
  677. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  678. }
  679. }
  680. func executeQuotaResetForUser(user dataprovider.User) error {
  681. if err := user.LoadAndApplyGroupSettings(); err != nil {
  682. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  683. user.Username, err)
  684. return err
  685. }
  686. if !QuotaScans.AddUserQuotaScan(user.Username) {
  687. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %s", user.Username)
  688. return fmt.Errorf("another quota scan is in progress for user %s", user.Username)
  689. }
  690. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  691. numFiles, size, err := user.ScanQuota()
  692. if err != nil {
  693. eventManagerLog(logger.LevelError, "error scanning quota for user %s: %v", user.Username, err)
  694. return err
  695. }
  696. err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
  697. if err != nil {
  698. eventManagerLog(logger.LevelError, "error updating quota for user %s: %v", user.Username, err)
  699. return err
  700. }
  701. return nil
  702. }
  703. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
  704. users, err := params.getUsers()
  705. if err != nil {
  706. return fmt.Errorf("unable to get users: %w", err)
  707. }
  708. var failedResets []string
  709. executed := 0
  710. for _, user := range users {
  711. // if sender is set, the conditions have already been evaluated
  712. if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
  713. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
  714. user.Username)
  715. continue
  716. }
  717. executed++
  718. if err = executeQuotaResetForUser(user); err != nil {
  719. failedResets = append(failedResets, user.Username)
  720. continue
  721. }
  722. }
  723. if len(failedResets) > 0 {
  724. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  725. }
  726. if executed == 0 {
  727. eventManagerLog(logger.LevelError, "no user quota reset executed")
  728. return errors.New("no user quota reset executed")
  729. }
  730. return nil
  731. }
  732. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
  733. folders, err := params.getFolders()
  734. if err != nil {
  735. return fmt.Errorf("unable to get folders: %w", err)
  736. }
  737. var failedResets []string
  738. executed := 0
  739. for _, folder := range folders {
  740. // if sender is set, the conditions have already been evaluated
  741. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  742. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  743. folder.Name)
  744. continue
  745. }
  746. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  747. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %s", folder.Name)
  748. failedResets = append(failedResets, folder.Name)
  749. continue
  750. }
  751. executed++
  752. f := vfs.VirtualFolder{
  753. BaseVirtualFolder: folder,
  754. VirtualPath: "/",
  755. }
  756. numFiles, size, err := f.ScanQuota()
  757. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  758. if err != nil {
  759. eventManagerLog(logger.LevelError, "error scanning quota for folder %s: %v", folder.Name, err)
  760. failedResets = append(failedResets, folder.Name)
  761. continue
  762. }
  763. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  764. if err != nil {
  765. eventManagerLog(logger.LevelError, "error updating quota for folder %s: %v", folder.Name, err)
  766. failedResets = append(failedResets, folder.Name)
  767. }
  768. }
  769. if len(failedResets) > 0 {
  770. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  771. }
  772. if executed == 0 {
  773. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  774. return errors.New("no folder quota reset executed")
  775. }
  776. return nil
  777. }
  778. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
  779. users, err := params.getUsers()
  780. if err != nil {
  781. return fmt.Errorf("unable to get users: %w", err)
  782. }
  783. var failedResets []string
  784. executed := 0
  785. for _, user := range users {
  786. // if sender is set, the conditions have already been evaluated
  787. if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
  788. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
  789. user.Username)
  790. continue
  791. }
  792. executed++
  793. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  794. if err != nil {
  795. eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
  796. failedResets = append(failedResets, user.Username)
  797. }
  798. }
  799. if len(failedResets) > 0 {
  800. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  801. }
  802. if executed == 0 {
  803. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  804. return errors.New("no transfer quota reset executed")
  805. }
  806. return nil
  807. }
  808. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention) error {
  809. if err := user.LoadAndApplyGroupSettings(); err != nil {
  810. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  811. user.Username, err)
  812. return err
  813. }
  814. check := RetentionCheck{
  815. Folders: folders,
  816. }
  817. c := RetentionChecks.Add(check, &user)
  818. if c == nil {
  819. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %s", user.Username)
  820. return fmt.Errorf("another retention check is in progress for user %s", user.Username)
  821. }
  822. if err := c.Start(); err != nil {
  823. eventManagerLog(logger.LevelError, "error checking retention for user %s: %v", user.Username, err)
  824. return err
  825. }
  826. return nil
  827. }
  828. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  829. conditions dataprovider.ConditionOptions, params EventParams,
  830. ) error {
  831. users, err := params.getUsers()
  832. if err != nil {
  833. return fmt.Errorf("unable to get users: %w", err)
  834. }
  835. var failedChecks []string
  836. executed := 0
  837. for _, user := range users {
  838. // if sender is set, the conditions have already been evaluated
  839. if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
  840. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, name conditions don't match",
  841. user.Username)
  842. continue
  843. }
  844. executed++
  845. if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
  846. failedChecks = append(failedChecks, user.Username)
  847. continue
  848. }
  849. }
  850. if len(failedChecks) > 0 {
  851. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  852. }
  853. if executed == 0 {
  854. eventManagerLog(logger.LevelError, "no retention check executed")
  855. return errors.New("no retention check executed")
  856. }
  857. return nil
  858. }
  859. func executeRuleAction(action dataprovider.BaseEventAction, params EventParams, conditions dataprovider.ConditionOptions) error {
  860. switch action.Type {
  861. case dataprovider.ActionTypeHTTP:
  862. return executeHTTPRuleAction(action.Options.HTTPConfig, params)
  863. case dataprovider.ActionTypeCommand:
  864. return executeCommandRuleAction(action.Options.CmdConfig, params)
  865. case dataprovider.ActionTypeEmail:
  866. return executeEmailRuleAction(action.Options.EmailConfig, params)
  867. case dataprovider.ActionTypeBackup:
  868. return dataprovider.ExecuteBackup()
  869. case dataprovider.ActionTypeUserQuotaReset:
  870. return executeUsersQuotaResetRuleAction(conditions, params)
  871. case dataprovider.ActionTypeFolderQuotaReset:
  872. return executeFoldersQuotaResetRuleAction(conditions, params)
  873. case dataprovider.ActionTypeTransferQuotaReset:
  874. return executeTransferQuotaResetRuleAction(conditions, params)
  875. case dataprovider.ActionTypeDataRetentionCheck:
  876. return executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
  877. case dataprovider.ActionTypeFilesystem:
  878. return executeFsRuleAction(action.Options.FsConfig, params)
  879. default:
  880. return fmt.Errorf("unsupported action type: %d", action.Type)
  881. }
  882. }
  883. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  884. var errRes error
  885. for _, rule := range rules {
  886. var failedActions []string
  887. for _, action := range rule.Actions {
  888. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  889. startTime := time.Now()
  890. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  891. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  892. action.Name, rule.Name, time.Since(startTime), err)
  893. failedActions = append(failedActions, action.Name)
  894. // we return the last error, it is ok for now
  895. errRes = err
  896. if action.Options.StopOnFailure {
  897. break
  898. }
  899. } else {
  900. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  901. action.Name, rule.Name, time.Since(startTime))
  902. }
  903. }
  904. }
  905. // execute async actions if any, including failure actions
  906. go executeRuleAsyncActions(rule, params, failedActions)
  907. }
  908. return errRes
  909. }
  910. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  911. eventManager.addAsyncTask()
  912. defer eventManager.removeAsyncTask()
  913. for _, rule := range rules {
  914. executeRuleAsyncActions(rule, params, nil)
  915. }
  916. }
  917. func executeRuleAsyncActions(rule dataprovider.EventRule, params EventParams, failedActions []string) {
  918. for _, action := range rule.Actions {
  919. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  920. startTime := time.Now()
  921. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  922. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  923. action.Name, rule.Name, time.Since(startTime), err)
  924. failedActions = append(failedActions, action.Name)
  925. if action.Options.StopOnFailure {
  926. break
  927. }
  928. } else {
  929. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  930. action.Name, rule.Name, time.Since(startTime))
  931. }
  932. }
  933. }
  934. if len(failedActions) > 0 {
  935. // execute failure actions
  936. for _, action := range rule.Actions {
  937. if action.Options.IsFailureAction {
  938. startTime := time.Now()
  939. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  940. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  941. action.Name, rule.Name, time.Since(startTime), err)
  942. if action.Options.StopOnFailure {
  943. break
  944. }
  945. } else {
  946. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  947. action.Name, rule.Name, time.Since(startTime))
  948. }
  949. }
  950. }
  951. }
  952. }
  953. type eventCronJob struct {
  954. ruleName string
  955. }
  956. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  957. if rule.GuardFromConcurrentExecution() {
  958. task, err := dataprovider.GetTaskByName(rule.Name)
  959. if _, ok := err.(*util.RecordNotFoundError); ok {
  960. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  961. task = dataprovider.Task{
  962. Name: rule.Name,
  963. UpdateAt: 0,
  964. Version: 0,
  965. }
  966. err = dataprovider.AddTask(rule.Name)
  967. if err != nil {
  968. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  969. return task, err
  970. }
  971. } else {
  972. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  973. }
  974. return task, err
  975. }
  976. return dataprovider.Task{}, nil
  977. }
  978. func (j *eventCronJob) Run() {
  979. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  980. rule, err := dataprovider.EventRuleExists(j.ruleName)
  981. if err != nil {
  982. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  983. return
  984. }
  985. if err = rule.CheckActionsConsistency(""); err != nil {
  986. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  987. return
  988. }
  989. task, err := j.getTask(rule)
  990. if err != nil {
  991. return
  992. }
  993. if task.Name != "" {
  994. updateInterval := 5 * time.Minute
  995. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  996. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  997. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  998. return
  999. }
  1000. err = dataprovider.UpdateTask(rule.Name, task.Version)
  1001. if err != nil {
  1002. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  1003. rule.Name, err)
  1004. return
  1005. }
  1006. ticker := time.NewTicker(updateInterval)
  1007. done := make(chan bool)
  1008. defer func() {
  1009. done <- true
  1010. ticker.Stop()
  1011. }()
  1012. go func(taskName string) {
  1013. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  1014. for {
  1015. select {
  1016. case <-done:
  1017. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  1018. return
  1019. case <-ticker.C:
  1020. err := dataprovider.UpdateTaskTimestamp(taskName)
  1021. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  1022. }
  1023. }
  1024. }(task.Name)
  1025. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
  1026. } else {
  1027. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
  1028. }
  1029. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  1030. }
  1031. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  1032. logger.Log(level, "eventmanager", "", format, v...)
  1033. }