eventmanager.go 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "mime"
  23. "mime/multipart"
  24. "net/http"
  25. "net/textproto"
  26. "net/url"
  27. "os"
  28. "os/exec"
  29. "path"
  30. "path/filepath"
  31. "strconv"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/klauspost/compress/zip"
  37. "github.com/robfig/cron/v3"
  38. "github.com/rs/xid"
  39. "github.com/sftpgo/sdk"
  40. mail "github.com/xhit/go-simple-mail/v2"
  41. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  42. "github.com/drakkan/sftpgo/v2/internal/logger"
  43. "github.com/drakkan/sftpgo/v2/internal/plugin"
  44. "github.com/drakkan/sftpgo/v2/internal/smtp"
  45. "github.com/drakkan/sftpgo/v2/internal/util"
  46. "github.com/drakkan/sftpgo/v2/internal/vfs"
  47. )
  // ipBlockedEventName is the label used for "IP blocked" events; the code
  // that assigns it to event params is outside this section.
  const ipBlockedEventName = "IP Blocked"

  // maxAttachmentsSize is the size limit, in bytes (10 MiB), checked while the
  // compressed data retention report is built.
  const maxAttachmentsSize int64 = 10 << 20
  52. var (
  53. // eventManager handle the supported event rules actions
  54. eventManager eventRulesContainer
  55. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  56. )
  57. func init() {
  58. eventManager = eventRulesContainer{
  59. schedulesMapping: make(map[string][]cron.EntryID),
  60. // arbitrary maximum number of concurrent asynchronous tasks,
  61. // each task could execute multiple actions
  62. concurrencyGuard: make(chan struct{}, 200),
  63. }
  64. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  65. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  66. eventManager.handleProviderEvent(EventParams{
  67. Name: executor,
  68. ObjectName: objectName,
  69. Event: operation,
  70. Status: 1,
  71. ObjectType: objectType,
  72. IP: ip,
  73. Role: role,
  74. Timestamp: time.Now().UnixNano(),
  75. Object: object,
  76. })
  77. })
  78. }
  79. // HandleCertificateEvent checks and executes action rules for certificate events
  80. func HandleCertificateEvent(params EventParams) {
  81. eventManager.handleCertificateEvent(params)
  82. }
  83. // eventRulesContainer stores event rules by trigger
  84. type eventRulesContainer struct {
  85. sync.RWMutex
  86. lastLoad atomic.Int64
  87. FsEvents []dataprovider.EventRule
  88. ProviderEvents []dataprovider.EventRule
  89. Schedules []dataprovider.EventRule
  90. IPBlockedEvents []dataprovider.EventRule
  91. CertificateEvents []dataprovider.EventRule
  92. schedulesMapping map[string][]cron.EntryID
  93. concurrencyGuard chan struct{}
  94. }
  95. func (r *eventRulesContainer) addAsyncTask() {
  96. activeHooks.Add(1)
  97. r.concurrencyGuard <- struct{}{}
  98. }
  99. func (r *eventRulesContainer) removeAsyncTask() {
  100. activeHooks.Add(-1)
  101. <-r.concurrencyGuard
  102. }
  103. func (r *eventRulesContainer) getLastLoadTime() int64 {
  104. return r.lastLoad.Load()
  105. }
  106. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  107. r.lastLoad.Store(modTime)
  108. }
  109. // RemoveRule deletes the rule with the specified name
  110. func (r *eventRulesContainer) RemoveRule(name string) {
  111. r.Lock()
  112. defer r.Unlock()
  113. r.removeRuleInternal(name)
  114. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  115. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  116. }
  117. func (r *eventRulesContainer) removeRuleInternal(name string) {
  118. for idx := range r.FsEvents {
  119. if r.FsEvents[idx].Name == name {
  120. lastIdx := len(r.FsEvents) - 1
  121. r.FsEvents[idx] = r.FsEvents[lastIdx]
  122. r.FsEvents = r.FsEvents[:lastIdx]
  123. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  124. return
  125. }
  126. }
  127. for idx := range r.ProviderEvents {
  128. if r.ProviderEvents[idx].Name == name {
  129. lastIdx := len(r.ProviderEvents) - 1
  130. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  131. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  132. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  133. return
  134. }
  135. }
  136. for idx := range r.IPBlockedEvents {
  137. if r.IPBlockedEvents[idx].Name == name {
  138. lastIdx := len(r.IPBlockedEvents) - 1
  139. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  140. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  141. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  142. return
  143. }
  144. }
  145. for idx := range r.CertificateEvents {
  146. if r.CertificateEvents[idx].Name == name {
  147. lastIdx := len(r.CertificateEvents) - 1
  148. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  149. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  150. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  151. return
  152. }
  153. }
  154. for idx := range r.Schedules {
  155. if r.Schedules[idx].Name == name {
  156. if schedules, ok := r.schedulesMapping[name]; ok {
  157. for _, entryID := range schedules {
  158. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  159. eventScheduler.Remove(entryID)
  160. }
  161. delete(r.schedulesMapping, name)
  162. }
  163. lastIdx := len(r.Schedules) - 1
  164. r.Schedules[idx] = r.Schedules[lastIdx]
  165. r.Schedules = r.Schedules[:lastIdx]
  166. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  167. return
  168. }
  169. }
  170. }
  171. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  172. r.removeRuleInternal(rule.Name)
  173. if rule.DeletedAt > 0 {
  174. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  175. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  176. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  177. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  178. }
  179. return
  180. }
  181. switch rule.Trigger {
  182. case dataprovider.EventTriggerFsEvent:
  183. r.FsEvents = append(r.FsEvents, rule)
  184. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  185. case dataprovider.EventTriggerProviderEvent:
  186. r.ProviderEvents = append(r.ProviderEvents, rule)
  187. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  188. case dataprovider.EventTriggerIPBlocked:
  189. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  190. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  191. case dataprovider.EventTriggerCertificate:
  192. r.CertificateEvents = append(r.CertificateEvents, rule)
  193. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  194. case dataprovider.EventTriggerSchedule:
  195. for _, schedule := range rule.Conditions.Schedules {
  196. cronSpec := schedule.GetCronSpec()
  197. job := &eventCronJob{
  198. ruleName: dataprovider.ConvertName(rule.Name),
  199. }
  200. entryID, err := eventScheduler.AddJob(cronSpec, job)
  201. if err != nil {
  202. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  203. return
  204. }
  205. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  206. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  207. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  208. }
  209. r.Schedules = append(r.Schedules, rule)
  210. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  211. default:
  212. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  213. }
  214. }
  215. func (r *eventRulesContainer) loadRules() {
  216. eventManagerLog(logger.LevelDebug, "loading updated rules")
  217. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  218. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  219. if err != nil {
  220. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  221. return
  222. }
  223. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  224. if len(rules) > 0 {
  225. r.Lock()
  226. defer r.Unlock()
  227. for _, rule := range rules {
  228. r.addUpdateRuleInternal(rule)
  229. }
  230. }
  231. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  232. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  233. r.setLastLoadTime(modTime)
  234. }
  235. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  236. if !util.Contains(conditions.ProviderEvents, params.Event) {
  237. return false
  238. }
  239. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  240. return false
  241. }
  242. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  243. return false
  244. }
  245. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  246. return false
  247. }
  248. return true
  249. }
  250. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  251. if !util.Contains(conditions.FsEvents, params.Event) {
  252. return false
  253. }
  254. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  255. return false
  256. }
  257. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  258. return false
  259. }
  260. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  261. return false
  262. }
  263. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  264. if !checkEventConditionPatterns(params.ObjectName, conditions.Options.FsPaths) {
  265. return false
  266. }
  267. }
  268. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  269. return false
  270. }
  271. if params.Event == operationUpload || params.Event == operationDownload {
  272. if conditions.Options.MinFileSize > 0 {
  273. if params.FileSize < conditions.Options.MinFileSize {
  274. return false
  275. }
  276. }
  277. if conditions.Options.MaxFileSize > 0 {
  278. if params.FileSize > conditions.Options.MaxFileSize {
  279. return false
  280. }
  281. }
  282. }
  283. return true
  284. }
  285. // hasFsRules returns true if there are any rules for filesystem event triggers
  286. func (r *eventRulesContainer) hasFsRules() bool {
  287. r.RLock()
  288. defer r.RUnlock()
  289. return len(r.FsEvents) > 0
  290. }
  291. // handleFsEvent executes the rules actions defined for the specified event
  292. func (r *eventRulesContainer) handleFsEvent(params EventParams) error {
  293. if params.Protocol == protocolEventAction {
  294. return nil
  295. }
  296. r.RLock()
  297. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  298. for _, rule := range r.FsEvents {
  299. if r.checkFsEventMatch(rule.Conditions, params) {
  300. if err := rule.CheckActionsConsistency(""); err != nil {
  301. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  302. rule.Name, err, params.Event)
  303. continue
  304. }
  305. hasSyncActions := false
  306. for _, action := range rule.Actions {
  307. if action.Options.ExecuteSync {
  308. hasSyncActions = true
  309. break
  310. }
  311. }
  312. if hasSyncActions {
  313. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  314. } else {
  315. rulesAsync = append(rulesAsync, rule)
  316. }
  317. }
  318. }
  319. r.RUnlock()
  320. params.sender = params.Name
  321. if len(rulesAsync) > 0 {
  322. go executeAsyncRulesActions(rulesAsync, params)
  323. }
  324. if len(rulesWithSyncActions) > 0 {
  325. return executeSyncRulesActions(rulesWithSyncActions, params)
  326. }
  327. return nil
  328. }
  329. // username is populated for user objects
  330. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  331. r.RLock()
  332. defer r.RUnlock()
  333. var rules []dataprovider.EventRule
  334. for _, rule := range r.ProviderEvents {
  335. if r.checkProviderEventMatch(rule.Conditions, params) {
  336. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  337. rules = append(rules, rule)
  338. } else {
  339. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  340. rule.Name, err, params.Event, params.ObjectType)
  341. }
  342. }
  343. }
  344. if len(rules) > 0 {
  345. params.sender = params.ObjectName
  346. go executeAsyncRulesActions(rules, params)
  347. }
  348. }
  349. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  350. r.RLock()
  351. defer r.RUnlock()
  352. if len(r.IPBlockedEvents) == 0 {
  353. return
  354. }
  355. var rules []dataprovider.EventRule
  356. for _, rule := range r.IPBlockedEvents {
  357. if err := rule.CheckActionsConsistency(""); err == nil {
  358. rules = append(rules, rule)
  359. } else {
  360. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  361. rule.Name, err, params.Event)
  362. }
  363. }
  364. if len(rules) > 0 {
  365. go executeAsyncRulesActions(rules, params)
  366. }
  367. }
  368. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  369. r.RLock()
  370. defer r.RUnlock()
  371. if len(r.CertificateEvents) == 0 {
  372. return
  373. }
  374. var rules []dataprovider.EventRule
  375. for _, rule := range r.CertificateEvents {
  376. if err := rule.CheckActionsConsistency(""); err == nil {
  377. rules = append(rules, rule)
  378. } else {
  379. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  380. rule.Name, err, params.Event)
  381. }
  382. }
  383. if len(rules) > 0 {
  384. go executeAsyncRulesActions(rules, params)
  385. }
  386. }
  387. type executedRetentionCheck struct {
  388. Username string
  389. ActionName string
  390. Results []folderRetentionCheckResult
  391. }
  392. // EventParams defines the supported event parameters
  393. type EventParams struct {
  394. Name string
  395. Groups []sdk.GroupMapping
  396. Event string
  397. Status int
  398. VirtualPath string
  399. FsPath string
  400. VirtualTargetPath string
  401. FsTargetPath string
  402. ObjectName string
  403. ObjectType string
  404. FileSize int64
  405. Protocol string
  406. IP string
  407. Role string
  408. Timestamp int64
  409. Object plugin.Renderer
  410. sender string
  411. updateStatusFromError bool
  412. errors []string
  413. retentionChecks []executedRetentionCheck
  414. }
  415. func (p *EventParams) getACopy() *EventParams {
  416. params := *p
  417. params.errors = make([]string, len(p.errors))
  418. copy(params.errors, p.errors)
  419. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  420. for _, c := range p.retentionChecks {
  421. executedCheck := executedRetentionCheck{
  422. Username: c.Username,
  423. ActionName: c.ActionName,
  424. }
  425. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  426. copy(executedCheck.Results, c.Results)
  427. retentionChecks = append(retentionChecks, executedCheck)
  428. }
  429. params.retentionChecks = retentionChecks
  430. return &params
  431. }
  432. // AddError adds a new error to the event params and update the status if needed
  433. func (p *EventParams) AddError(err error) {
  434. if err == nil {
  435. return
  436. }
  437. if p.updateStatusFromError && p.Status == 1 {
  438. p.Status = 2
  439. }
  440. p.errors = append(p.errors, err.Error())
  441. }
  442. func (p *EventParams) setBackupParams(backupPath string) {
  443. if p.sender != "" {
  444. return
  445. }
  446. p.sender = dataprovider.ActionExecutorSystem
  447. p.FsPath = backupPath
  448. p.ObjectName = filepath.Base(backupPath)
  449. p.VirtualPath = "/" + p.ObjectName
  450. p.Timestamp = time.Now().UnixNano()
  451. info, err := os.Stat(backupPath)
  452. if err == nil {
  453. p.FileSize = info.Size()
  454. }
  455. }
  456. func (p *EventParams) getStatusString() string {
  457. switch p.Status {
  458. case 1:
  459. return "OK"
  460. default:
  461. return "KO"
  462. }
  463. }
  464. // getUsers returns users with group settings not applied
  465. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  466. if p.sender == "" {
  467. users, err := dataprovider.DumpUsers()
  468. if err != nil {
  469. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  470. return users, errors.New("unable to get users")
  471. }
  472. return users, nil
  473. }
  474. user, err := p.getUserFromSender()
  475. if err != nil {
  476. return nil, err
  477. }
  478. return []dataprovider.User{user}, nil
  479. }
  480. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  481. if p.sender == dataprovider.ActionExecutorSystem {
  482. return dataprovider.User{
  483. BaseUser: sdk.BaseUser{
  484. Status: 1,
  485. Username: p.sender,
  486. HomeDir: dataprovider.GetBackupsPath(),
  487. Permissions: map[string][]string{
  488. "/": {dataprovider.PermAny},
  489. },
  490. },
  491. }, nil
  492. }
  493. user, err := dataprovider.UserExists(p.sender, "")
  494. if err != nil {
  495. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  496. return user, fmt.Errorf("error getting user %q", p.sender)
  497. }
  498. return user, nil
  499. }
  500. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  501. if p.sender == "" {
  502. return dataprovider.DumpFolders()
  503. }
  504. folder, err := dataprovider.GetFolderByName(p.sender)
  505. if err != nil {
  506. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  507. }
  508. return []vfs.BaseVirtualFolder{folder}, nil
  509. }
  510. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  511. if len(p.retentionChecks) == 0 {
  512. return nil, errors.New("no data retention report available")
  513. }
  514. var b bytes.Buffer
  515. wr := zip.NewWriter(&b)
  516. for _, check := range p.retentionChecks {
  517. if size := int64(len(b.Bytes())); size > maxAttachmentsSize {
  518. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  519. return nil, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  520. }
  521. data, err := getCSVRetentionReport(check.Results)
  522. if err != nil {
  523. return nil, fmt.Errorf("unable to get CSV report: %w", err)
  524. }
  525. fh := &zip.FileHeader{
  526. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  527. Method: zip.Deflate,
  528. Modified: time.Now().UTC(),
  529. }
  530. f, err := wr.CreateHeader(fh)
  531. if err != nil {
  532. return nil, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  533. }
  534. _, err = io.Copy(f, bytes.NewBuffer(data))
  535. if err != nil {
  536. return nil, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  537. }
  538. }
  539. if err := wr.Close(); err != nil {
  540. return nil, fmt.Errorf("unable to close zip writer: %w", err)
  541. }
  542. return b.Bytes(), nil
  543. }
  544. func (p *EventParams) getRetentionReportsAsMailAttachment() (mail.File, error) {
  545. var result mail.File
  546. data, err := p.getCompressedDataRetentionReport()
  547. if err != nil {
  548. return result, err
  549. }
  550. result.Name = "retention-reports.zip"
  551. result.Data = data
  552. return result, nil
  553. }
  554. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  555. replacements := []string{
  556. "{{Name}}", p.Name,
  557. "{{Event}}", p.Event,
  558. "{{Status}}", fmt.Sprintf("%d", p.Status),
  559. "{{VirtualPath}}", p.VirtualPath,
  560. "{{FsPath}}", p.FsPath,
  561. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  562. "{{FsTargetPath}}", p.FsTargetPath,
  563. "{{ObjectName}}", p.ObjectName,
  564. "{{ObjectType}}", p.ObjectType,
  565. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  566. "{{Protocol}}", p.Protocol,
  567. "{{IP}}", p.IP,
  568. "{{Role}}", p.Role,
  569. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  570. "{{StatusString}}", p.getStatusString(),
  571. }
  572. if p.VirtualPath != "" {
  573. replacements = append(replacements, "{{VirtualDirPath}}", path.Dir(p.VirtualPath))
  574. }
  575. if p.VirtualTargetPath != "" {
  576. replacements = append(replacements, "{{VirtualTargetDirPath}}", path.Dir(p.VirtualTargetPath))
  577. replacements = append(replacements, "{{TargetName}}", path.Base(p.VirtualTargetPath))
  578. }
  579. if len(p.errors) > 0 {
  580. replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
  581. } else {
  582. replacements = append(replacements, "{{ErrorString}}", "")
  583. }
  584. replacements = append(replacements, "{{ObjectData}}", "")
  585. if addObjectData {
  586. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  587. if err == nil {
  588. replacements[len(replacements)-1] = string(data)
  589. }
  590. }
  591. return replacements
  592. }
  593. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  594. var b bytes.Buffer
  595. csvWriter := csv.NewWriter(&b)
  596. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  597. "elapsed (ms)", "info", "error"})
  598. if err != nil {
  599. return nil, err
  600. }
  601. for _, result := range results {
  602. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  603. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  604. result.Info, result.Error})
  605. if err != nil {
  606. return nil, err
  607. }
  608. }
  609. csvWriter.Flush()
  610. err = csvWriter.Error()
  611. return b.Bytes(), err
  612. }
  613. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualPath string, numFiles int,
  614. truncatedSize int64, errTransfer error,
  615. ) error {
  616. errWrite := w.Close()
  617. info, err := conn.doStatInternal(virtualPath, 0, false, false)
  618. if err == nil {
  619. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, info.Size()-truncatedSize)
  620. _, fsPath, errFs := conn.GetFsAndResolvedPath(virtualPath)
  621. if errFs == nil {
  622. if errTransfer == nil {
  623. errTransfer = errWrite
  624. }
  625. ExecuteActionNotification(conn, operationUpload, fsPath, virtualPath, "", "", "", info.Size(), errTransfer) //nolint:errcheck
  626. }
  627. } else {
  628. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", virtualPath, err)
  629. }
  630. return errWrite
  631. }
  632. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  633. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  634. if err != nil {
  635. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  636. return
  637. }
  638. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  639. if vfolder.IsIncludedInUserQuota() {
  640. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  641. }
  642. }
  643. func getFileWriter(conn *BaseConnection, virtualPath string) (io.WriteCloser, int, int64, func(), error) {
  644. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  645. if err != nil {
  646. return nil, 0, 0, nil, err
  647. }
  648. var truncatedSize, fileSize int64
  649. numFiles := 1
  650. isFileOverwrite := false
  651. info, err := fs.Lstat(fsPath)
  652. if err == nil {
  653. fileSize = info.Size()
  654. if info.IsDir() {
  655. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  656. }
  657. if info.Mode().IsRegular() {
  658. isFileOverwrite = true
  659. truncatedSize = fileSize
  660. }
  661. numFiles = 0
  662. }
  663. if err != nil && !fs.IsNotExist(err) {
  664. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  665. }
  666. f, w, cancelFn, err := fs.Create(fsPath, 0)
  667. if err != nil {
  668. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  669. }
  670. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  671. if isFileOverwrite {
  672. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  673. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  674. truncatedSize = 0
  675. }
  676. }
  677. if cancelFn == nil {
  678. cancelFn = func() {}
  679. }
  680. if f != nil {
  681. return f, numFiles, truncatedSize, cancelFn, nil
  682. }
  683. return w, numFiles, truncatedSize, cancelFn, nil
  684. }
  685. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  686. if entryPath == wr.Name {
  687. // skip the archive itself
  688. return nil
  689. }
  690. info, err := conn.DoStat(entryPath, 1, false)
  691. if err != nil {
  692. eventManagerLog(logger.LevelError, "unable to add zip entry %#v, stat error: %v", entryPath, err)
  693. return err
  694. }
  695. entryName, err := getZipEntryName(entryPath, baseDir)
  696. if err != nil {
  697. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  698. return err
  699. }
  700. if _, ok := wr.Entries[entryName]; ok {
  701. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  702. return nil
  703. }
  704. wr.Entries[entryName] = true
  705. if info.IsDir() {
  706. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  707. Name: entryName + "/",
  708. Method: zip.Deflate,
  709. Modified: info.ModTime(),
  710. })
  711. if err != nil {
  712. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  713. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  714. }
  715. contents, err := conn.ListDir(entryPath)
  716. if err != nil {
  717. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  718. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  719. }
  720. for _, info := range contents {
  721. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  722. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  723. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  724. return err
  725. }
  726. }
  727. return nil
  728. }
  729. if !info.Mode().IsRegular() {
  730. // we only allow regular files
  731. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  732. return nil
  733. }
  734. reader, cancelFn, err := getFileReader(conn, entryPath)
  735. if err != nil {
  736. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  737. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  738. }
  739. defer cancelFn()
  740. defer reader.Close()
  741. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  742. Name: entryName,
  743. Method: zip.Deflate,
  744. Modified: info.ModTime(),
  745. })
  746. if err != nil {
  747. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  748. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  749. }
  750. _, err = io.Copy(f, reader)
  751. return err
  752. }
  // getZipEntryName returns the name to use inside the archive for entryPath:
  // the path relative to baseDir, without a leading slash. It returns an empty
  // name if entryPath is equal to baseDir.
  //
  // An error is returned if entryPath is not inside baseDir. The prefix must
  // end on a path separator boundary, so "/foobar/x" is not considered inside
  // "/foo" even though it shares the string prefix.
  func getZipEntryName(entryPath, baseDir string) (string, error) {
  	if !strings.HasPrefix(entryPath, baseDir) {
  		return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  	}
  	relPath := strings.TrimPrefix(entryPath, baseDir)
  	// the remainder is a real sub path only if the prefix stopped at a
  	// separator: either baseDir ends with one (or is empty) or the remainder
  	// starts with one. An empty remainder means the two paths are equal.
  	onBoundary := relPath == "" || baseDir == "" ||
  		strings.HasSuffix(baseDir, "/") || strings.HasPrefix(relPath, "/")
  	if !onBoundary {
  		return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  	}
  	return strings.TrimPrefix(relPath, "/"), nil
  }
  760. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  761. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  762. if err != nil {
  763. return nil, nil, err
  764. }
  765. f, r, cancelFn, err := fs.Open(fsPath, 0)
  766. if err != nil {
  767. return nil, nil, conn.GetFsError(fs, err)
  768. }
  769. if cancelFn == nil {
  770. cancelFn = func() {}
  771. }
  772. if f != nil {
  773. return f, cancelFn, nil
  774. }
  775. return r, cancelFn, nil
  776. }
  777. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  778. reader, cancelFn, err := getFileReader(conn, virtualPath)
  779. if err != nil {
  780. return err
  781. }
  782. defer cancelFn()
  783. defer reader.Close()
  784. _, err = io.Copy(w, reader)
  785. return err
  786. }
  787. func getFileContent(conn *BaseConnection, virtualPath string, expectedSize int) ([]byte, error) {
  788. reader, cancelFn, err := getFileReader(conn, virtualPath)
  789. if err != nil {
  790. return nil, err
  791. }
  792. defer cancelFn()
  793. defer reader.Close()
  794. data := make([]byte, expectedSize)
  795. _, err = io.ReadFull(reader, data)
  796. return data, err
  797. }
  798. func getMailAttachments(user dataprovider.User, attachments []string, replacer *strings.Replacer) ([]mail.File, error) {
  799. var files []mail.File
  800. user, err := getUserForEventAction(user)
  801. if err != nil {
  802. return nil, err
  803. }
  804. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  805. err = user.CheckFsRoot(connectionID)
  806. defer user.CloseFs() //nolint:errcheck
  807. if err != nil {
  808. return nil, fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  809. }
  810. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  811. totalSize := int64(0)
  812. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  813. info, err := conn.DoStat(virtualPath, 0, false)
  814. if err != nil {
  815. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  816. }
  817. if !info.Mode().IsRegular() {
  818. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  819. }
  820. totalSize += info.Size()
  821. if totalSize > maxAttachmentsSize {
  822. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  823. }
  824. data, err := getFileContent(conn, virtualPath, int(info.Size()))
  825. if err != nil {
  826. return nil, fmt.Errorf("unable to get content for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  827. }
  828. files = append(files, mail.File{
  829. Name: path.Base(virtualPath),
  830. Data: data,
  831. })
  832. }
  833. return files, nil
  834. }
  // replaceWithReplacer applies replacer to input. The replacer is used only
  // if input contains "{{", otherwise input is returned unchanged.
  func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  	if strings.Contains(input, "{{") {
  		return replacer.Replace(input)
  	}
  	return input
  }
  841. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  842. matched, err := path.Match(p.Pattern, name)
  843. if err != nil {
  844. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  845. return false
  846. }
  847. if p.InverseMatch {
  848. return !matched
  849. }
  850. return matched
  851. }
  852. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  853. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  854. return false
  855. }
  856. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  857. return false
  858. }
  859. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  860. return false
  861. }
  862. return true
  863. }
  864. // checkConditionPatterns returns false if patterns are defined and no match is found
  865. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  866. if len(patterns) == 0 {
  867. return true
  868. }
  869. for _, p := range patterns {
  870. if checkEventConditionPattern(p, name) {
  871. return true
  872. }
  873. }
  874. return false
  875. }
  876. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  877. if len(patterns) == 0 {
  878. return true
  879. }
  880. for _, group := range groups {
  881. for _, p := range patterns {
  882. if checkEventConditionPattern(p, group.Name) {
  883. return true
  884. }
  885. }
  886. }
  887. return false
  888. }
  889. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  890. if len(c.QueryParameters) > 0 {
  891. u, err := url.Parse(c.Endpoint)
  892. if err != nil {
  893. return "", fmt.Errorf("invalid endpoint: %w", err)
  894. }
  895. q := u.Query()
  896. for _, keyVal := range c.QueryParameters {
  897. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  898. }
  899. u.RawQuery = q.Encode()
  900. return u.String(), nil
  901. }
  902. return c.Endpoint, nil
  903. }
  904. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  905. conn *BaseConnection, replacer *strings.Replacer, params *EventParams,
  906. ) error {
  907. partWriter, err := m.CreatePart(h)
  908. if err != nil {
  909. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  910. return err
  911. }
  912. if part.Body != "" {
  913. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  914. if err != nil {
  915. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  916. return err
  917. }
  918. return nil
  919. }
  920. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  921. data, err := params.getCompressedDataRetentionReport()
  922. if err != nil {
  923. return err
  924. }
  925. _, err = partWriter.Write(data)
  926. if err != nil {
  927. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  928. return err
  929. }
  930. return nil
  931. }
  932. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  933. if err != nil {
  934. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  935. return err
  936. }
  937. return nil
  938. }
  939. func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  940. cancel context.CancelFunc, user dataprovider.User, params *EventParams,
  941. ) (io.ReadCloser, string, error) {
  942. var body io.ReadCloser
  943. if c.Method == http.MethodGet {
  944. return body, "", nil
  945. }
  946. if c.Body != "" {
  947. if c.Body == dataprovider.RetentionReportPlaceHolder {
  948. data, err := params.getCompressedDataRetentionReport()
  949. if err != nil {
  950. return body, "", err
  951. }
  952. return io.NopCloser(bytes.NewBuffer(data)), "", nil
  953. }
  954. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  955. }
  956. if len(c.Parts) > 0 {
  957. r, w := io.Pipe()
  958. m := multipart.NewWriter(w)
  959. var conn *BaseConnection
  960. if user.Username != "" {
  961. var err error
  962. user, err = getUserForEventAction(user)
  963. if err != nil {
  964. return body, "", err
  965. }
  966. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  967. err = user.CheckFsRoot(connectionID)
  968. if err != nil {
  969. user.CloseFs() //nolint:errcheck
  970. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  971. user.Username, err)
  972. }
  973. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  974. }
  975. go func() {
  976. defer w.Close()
  977. defer user.CloseFs() //nolint:errcheck
  978. for _, part := range c.Parts {
  979. h := make(textproto.MIMEHeader)
  980. if part.Body != "" {
  981. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  982. } else {
  983. h.Set("Content-Disposition",
  984. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  985. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  986. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  987. if contentType == "" {
  988. contentType = "application/octet-stream"
  989. }
  990. h.Set("Content-Type", contentType)
  991. }
  992. for _, keyVal := range part.Headers {
  993. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  994. }
  995. if err := writeHTTPPart(m, part, h, conn, replacer, params); err != nil {
  996. cancel()
  997. return
  998. }
  999. }
  1000. m.Close()
  1001. }()
  1002. return r, m.FormDataContentType(), nil
  1003. }
  1004. return body, "", nil
  1005. }
  1006. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1007. if err := c.TryDecryptPassword(); err != nil {
  1008. return err
  1009. }
  1010. addObjectData := false
  1011. if params.Object != nil {
  1012. addObjectData = c.HasObjectData()
  1013. }
  1014. replacements := params.getStringReplacements(addObjectData)
  1015. replacer := strings.NewReplacer(replacements...)
  1016. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  1017. if err != nil {
  1018. return err
  1019. }
  1020. ctx, cancel := c.GetContext()
  1021. defer cancel()
  1022. var user dataprovider.User
  1023. if c.HasMultipartFiles() {
  1024. user, err = params.getUserFromSender()
  1025. if err != nil {
  1026. return err
  1027. }
  1028. }
  1029. body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user, params)
  1030. if err != nil {
  1031. return err
  1032. }
  1033. if body != nil {
  1034. defer body.Close()
  1035. }
  1036. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1037. if err != nil {
  1038. return err
  1039. }
  1040. if contentType != "" {
  1041. req.Header.Set("Content-Type", contentType)
  1042. }
  1043. if c.Username != "" {
  1044. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1045. }
  1046. for _, keyVal := range c.Headers {
  1047. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1048. }
  1049. client := c.GetHTTPClient()
  1050. defer client.CloseIdleConnections()
  1051. startTime := time.Now()
  1052. resp, err := client.Do(req)
  1053. if err != nil {
  1054. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1055. endpoint, time.Since(startTime), err)
  1056. return fmt.Errorf("error sending HTTP request: %w", err)
  1057. }
  1058. defer resp.Body.Close()
  1059. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1060. endpoint, time.Since(startTime), resp.StatusCode)
  1061. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1062. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1063. }
  1064. return nil
  1065. }
  1066. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1067. addObjectData := false
  1068. if params.Object != nil {
  1069. for _, k := range c.EnvVars {
  1070. if strings.Contains(k.Value, "{{ObjectData}}") {
  1071. addObjectData = true
  1072. break
  1073. }
  1074. }
  1075. }
  1076. replacements := params.getStringReplacements(addObjectData)
  1077. replacer := strings.NewReplacer(replacements...)
  1078. args := make([]string, 0, len(c.Args))
  1079. for _, arg := range c.Args {
  1080. args = append(args, replaceWithReplacer(arg, replacer))
  1081. }
  1082. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1083. defer cancel()
  1084. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1085. cmd.Env = []string{}
  1086. for _, keyVal := range c.EnvVars {
  1087. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1088. }
  1089. startTime := time.Now()
  1090. err := cmd.Run()
  1091. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1092. c.Cmd, time.Since(startTime), err)
  1093. return err
  1094. }
  1095. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1096. addObjectData := false
  1097. if params.Object != nil {
  1098. if strings.Contains(c.Body, "{{ObjectData}}") {
  1099. addObjectData = true
  1100. }
  1101. }
  1102. replacements := params.getStringReplacements(addObjectData)
  1103. replacer := strings.NewReplacer(replacements...)
  1104. body := replaceWithReplacer(c.Body, replacer)
  1105. subject := replaceWithReplacer(c.Subject, replacer)
  1106. startTime := time.Now()
  1107. var files []mail.File
  1108. fileAttachments := make([]string, 0, len(c.Attachments))
  1109. for _, attachment := range c.Attachments {
  1110. if attachment == dataprovider.RetentionReportPlaceHolder {
  1111. f, err := params.getRetentionReportsAsMailAttachment()
  1112. if err != nil {
  1113. return err
  1114. }
  1115. files = append(files, f)
  1116. continue
  1117. }
  1118. fileAttachments = append(fileAttachments, attachment)
  1119. }
  1120. if len(fileAttachments) > 0 {
  1121. user, err := params.getUserFromSender()
  1122. if err != nil {
  1123. return err
  1124. }
  1125. res, err := getMailAttachments(user, fileAttachments, replacer)
  1126. if err != nil {
  1127. return err
  1128. }
  1129. files = append(files, res...)
  1130. }
  1131. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  1132. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1133. time.Since(startTime), err)
  1134. if err != nil {
  1135. return fmt.Errorf("unable to send email: %w", err)
  1136. }
  1137. return nil
  1138. }
  1139. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1140. err := user.LoadAndApplyGroupSettings()
  1141. if err != nil {
  1142. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1143. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1144. }
  1145. user.UploadDataTransfer = 0
  1146. user.UploadBandwidth = 0
  1147. user.DownloadBandwidth = 0
  1148. user.Filters.DisableFsChecks = false
  1149. user.Filters.FilePatterns = nil
  1150. user.Filters.BandwidthLimits = nil
  1151. user.Filters.DataTransferLimits = nil
  1152. for k := range user.Permissions {
  1153. user.Permissions[k] = []string{dataprovider.PermAny}
  1154. }
  1155. return user, nil
  1156. }
  1157. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1158. for idx := range paths {
  1159. paths[idx] = util.CleanPath(replaceWithReplacer(paths[idx], replacer))
  1160. }
  1161. return util.RemoveDuplicates(paths, false)
  1162. }
  1163. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1164. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1165. if err != nil {
  1166. return err
  1167. }
  1168. return conn.RemoveFile(fs, fsPath, item, info)
  1169. }
  1170. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1171. user, err := getUserForEventAction(user)
  1172. if err != nil {
  1173. return err
  1174. }
  1175. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1176. err = user.CheckFsRoot(connectionID)
  1177. defer user.CloseFs() //nolint:errcheck
  1178. if err != nil {
  1179. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1180. }
  1181. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1182. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1183. info, err := conn.DoStat(item, 0, false)
  1184. if err != nil {
  1185. if conn.IsNotExistError(err) {
  1186. continue
  1187. }
  1188. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1189. }
  1190. if info.IsDir() {
  1191. if err = conn.RemoveDir(item); err != nil {
  1192. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1193. }
  1194. } else {
  1195. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1196. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1197. }
  1198. }
  1199. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1200. }
  1201. return nil
  1202. }
  1203. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1204. conditions dataprovider.ConditionOptions, params *EventParams,
  1205. ) error {
  1206. users, err := params.getUsers()
  1207. if err != nil {
  1208. return fmt.Errorf("unable to get users: %w", err)
  1209. }
  1210. var failures []string
  1211. executed := 0
  1212. for _, user := range users {
  1213. // if sender is set, the conditions have already been evaluated
  1214. if params.sender == "" {
  1215. if !checkUserConditionOptions(&user, &conditions) {
  1216. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1217. user.Username)
  1218. continue
  1219. }
  1220. }
  1221. executed++
  1222. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1223. params.AddError(err)
  1224. failures = append(failures, user.Username)
  1225. continue
  1226. }
  1227. }
  1228. if len(failures) > 0 {
  1229. return fmt.Errorf("fs delete failed for users: %+v", failures)
  1230. }
  1231. if executed == 0 {
  1232. eventManagerLog(logger.LevelError, "no delete executed")
  1233. return errors.New("no delete executed")
  1234. }
  1235. return nil
  1236. }
  1237. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1238. user, err := getUserForEventAction(user)
  1239. if err != nil {
  1240. return err
  1241. }
  1242. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1243. err = user.CheckFsRoot(connectionID)
  1244. defer user.CloseFs() //nolint:errcheck
  1245. if err != nil {
  1246. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1247. }
  1248. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1249. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1250. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1251. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1252. }
  1253. if err = conn.createDirIfMissing(item); err != nil {
  1254. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1255. }
  1256. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1257. }
  1258. return nil
  1259. }
  1260. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1261. conditions dataprovider.ConditionOptions, params *EventParams,
  1262. ) error {
  1263. users, err := params.getUsers()
  1264. if err != nil {
  1265. return fmt.Errorf("unable to get users: %w", err)
  1266. }
  1267. var failures []string
  1268. executed := 0
  1269. for _, user := range users {
  1270. // if sender is set, the conditions have already been evaluated
  1271. if params.sender == "" {
  1272. if !checkUserConditionOptions(&user, &conditions) {
  1273. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1274. user.Username)
  1275. continue
  1276. }
  1277. }
  1278. executed++
  1279. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1280. failures = append(failures, user.Username)
  1281. continue
  1282. }
  1283. }
  1284. if len(failures) > 0 {
  1285. return fmt.Errorf("fs mkdir failed for users: %+v", failures)
  1286. }
  1287. if executed == 0 {
  1288. eventManagerLog(logger.LevelError, "no mkdir executed")
  1289. return errors.New("no mkdir executed")
  1290. }
  1291. return nil
  1292. }
  1293. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1294. user dataprovider.User,
  1295. ) error {
  1296. user, err := getUserForEventAction(user)
  1297. if err != nil {
  1298. return err
  1299. }
  1300. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1301. err = user.CheckFsRoot(connectionID)
  1302. defer user.CloseFs() //nolint:errcheck
  1303. if err != nil {
  1304. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1305. }
  1306. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1307. for _, item := range renames {
  1308. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1309. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1310. if err = conn.Rename(source, target); err != nil {
  1311. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1312. }
  1313. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1314. }
  1315. return nil
  1316. }
  1317. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1318. user dataprovider.User,
  1319. ) error {
  1320. user, err := getUserForEventAction(user)
  1321. if err != nil {
  1322. return err
  1323. }
  1324. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1325. err = user.CheckFsRoot(connectionID)
  1326. defer user.CloseFs() //nolint:errcheck
  1327. if err != nil {
  1328. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1329. }
  1330. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1331. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1332. if _, err = conn.DoStat(item, 0, false); err != nil {
  1333. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1334. }
  1335. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1336. }
  1337. return nil
  1338. }
  1339. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1340. conditions dataprovider.ConditionOptions, params *EventParams,
  1341. ) error {
  1342. users, err := params.getUsers()
  1343. if err != nil {
  1344. return fmt.Errorf("unable to get users: %w", err)
  1345. }
  1346. var failures []string
  1347. executed := 0
  1348. for _, user := range users {
  1349. // if sender is set, the conditions have already been evaluated
  1350. if params.sender == "" {
  1351. if !checkUserConditionOptions(&user, &conditions) {
  1352. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1353. user.Username)
  1354. continue
  1355. }
  1356. }
  1357. executed++
  1358. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1359. failures = append(failures, user.Username)
  1360. params.AddError(err)
  1361. continue
  1362. }
  1363. }
  1364. if len(failures) > 0 {
  1365. return fmt.Errorf("fs rename failed for users: %+v", failures)
  1366. }
  1367. if executed == 0 {
  1368. eventManagerLog(logger.LevelError, "no rename executed")
  1369. return errors.New("no rename executed")
  1370. }
  1371. return nil
  1372. }
  1373. func getArchiveBaseDir(paths []string) string {
  1374. var parentDirs []string
  1375. for _, p := range paths {
  1376. parentDirs = append(parentDirs, path.Dir(p))
  1377. }
  1378. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1379. baseDir := "/"
  1380. if len(parentDirs) == 1 {
  1381. baseDir = parentDirs[0]
  1382. }
  1383. return baseDir
  1384. }
  1385. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1386. user dataprovider.User,
  1387. ) error {
  1388. user, err := getUserForEventAction(user)
  1389. if err != nil {
  1390. return err
  1391. }
  1392. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1393. err = user.CheckFsRoot(connectionID)
  1394. defer user.CloseFs() //nolint:errcheck
  1395. if err != nil {
  1396. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1397. }
  1398. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1399. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1400. paths := make([]string, 0, len(c.Paths))
  1401. for idx := range c.Paths {
  1402. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1403. if p == name {
  1404. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1405. }
  1406. paths = append(paths, p)
  1407. }
  1408. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name)
  1409. if err != nil {
  1410. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1411. return fmt.Errorf("unable to create archive: %w", err)
  1412. }
  1413. defer cancelFn()
  1414. paths = util.RemoveDuplicates(paths, false)
  1415. baseDir := getArchiveBaseDir(paths)
  1416. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1417. zipWriter := &zipWriterWrapper{
  1418. Name: name,
  1419. Writer: zip.NewWriter(writer),
  1420. Entries: make(map[string]bool),
  1421. }
  1422. for _, item := range paths {
  1423. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1424. closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
  1425. return err
  1426. }
  1427. }
  1428. if err := zipWriter.Writer.Close(); err != nil {
  1429. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1430. closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
  1431. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1432. }
  1433. return closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err)
  1434. }
  1435. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1436. params *EventParams,
  1437. ) error {
  1438. users, err := params.getUsers()
  1439. if err != nil {
  1440. return fmt.Errorf("unable to get users: %w", err)
  1441. }
  1442. var failures []string
  1443. executed := 0
  1444. for _, user := range users {
  1445. // if sender is set, the conditions have already been evaluated
  1446. if params.sender == "" {
  1447. if !checkUserConditionOptions(&user, &conditions) {
  1448. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1449. user.Username)
  1450. continue
  1451. }
  1452. }
  1453. executed++
  1454. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1455. failures = append(failures, user.Username)
  1456. params.AddError(err)
  1457. continue
  1458. }
  1459. }
  1460. if len(failures) > 0 {
  1461. return fmt.Errorf("fs existence check failed for users: %+v", failures)
  1462. }
  1463. if executed == 0 {
  1464. eventManagerLog(logger.LevelError, "no existence check executed")
  1465. return errors.New("no existence check executed")
  1466. }
  1467. return nil
  1468. }
  1469. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1470. conditions dataprovider.ConditionOptions, params *EventParams,
  1471. ) error {
  1472. users, err := params.getUsers()
  1473. if err != nil {
  1474. return fmt.Errorf("unable to get users: %w", err)
  1475. }
  1476. var failures []string
  1477. executed := 0
  1478. for _, user := range users {
  1479. // if sender is set, the conditions have already been evaluated
  1480. if params.sender == "" {
  1481. if !checkUserConditionOptions(&user, &conditions) {
  1482. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1483. user.Username)
  1484. continue
  1485. }
  1486. }
  1487. executed++
  1488. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1489. failures = append(failures, user.Username)
  1490. params.AddError(err)
  1491. continue
  1492. }
  1493. }
  1494. if len(failures) > 0 {
  1495. return fmt.Errorf("fs compress failed for users: %+v", failures)
  1496. }
  1497. if executed == 0 {
  1498. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1499. return errors.New("no file/folder compressed")
  1500. }
  1501. return nil
  1502. }
  1503. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1504. params *EventParams,
  1505. ) error {
  1506. addObjectData := false
  1507. replacements := params.getStringReplacements(addObjectData)
  1508. replacer := strings.NewReplacer(replacements...)
  1509. switch c.Type {
  1510. case dataprovider.FilesystemActionRename:
  1511. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1512. case dataprovider.FilesystemActionDelete:
  1513. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1514. case dataprovider.FilesystemActionMkdirs:
  1515. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1516. case dataprovider.FilesystemActionExist:
  1517. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1518. case dataprovider.FilesystemActionCompress:
  1519. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1520. default:
  1521. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1522. }
  1523. }
  1524. func executeQuotaResetForUser(user *dataprovider.User) error {
  1525. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1526. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1527. user.Username, err)
  1528. return err
  1529. }
  1530. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1531. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1532. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1533. }
  1534. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1535. numFiles, size, err := user.ScanQuota()
  1536. if err != nil {
  1537. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1538. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1539. }
  1540. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1541. if err != nil {
  1542. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1543. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1544. }
  1545. return nil
  1546. }
  1547. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1548. users, err := params.getUsers()
  1549. if err != nil {
  1550. return fmt.Errorf("unable to get users: %w", err)
  1551. }
  1552. var failedResets []string
  1553. executed := 0
  1554. for _, user := range users {
  1555. // if sender is set, the conditions have already been evaluated
  1556. if params.sender == "" {
  1557. if !checkUserConditionOptions(&user, &conditions) {
  1558. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1559. user.Username)
  1560. continue
  1561. }
  1562. }
  1563. executed++
  1564. if err = executeQuotaResetForUser(&user); err != nil {
  1565. params.AddError(err)
  1566. failedResets = append(failedResets, user.Username)
  1567. continue
  1568. }
  1569. }
  1570. if len(failedResets) > 0 {
  1571. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  1572. }
  1573. if executed == 0 {
  1574. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1575. return errors.New("no user quota reset executed")
  1576. }
  1577. return nil
  1578. }
  1579. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1580. folders, err := params.getFolders()
  1581. if err != nil {
  1582. return fmt.Errorf("unable to get folders: %w", err)
  1583. }
  1584. var failedResets []string
  1585. executed := 0
  1586. for _, folder := range folders {
  1587. // if sender is set, the conditions have already been evaluated
  1588. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1589. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1590. folder.Name)
  1591. continue
  1592. }
  1593. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1594. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1595. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1596. failedResets = append(failedResets, folder.Name)
  1597. continue
  1598. }
  1599. executed++
  1600. f := vfs.VirtualFolder{
  1601. BaseVirtualFolder: folder,
  1602. VirtualPath: "/",
  1603. }
  1604. numFiles, size, err := f.ScanQuota()
  1605. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1606. if err != nil {
  1607. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1608. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1609. failedResets = append(failedResets, folder.Name)
  1610. continue
  1611. }
  1612. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1613. if err != nil {
  1614. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1615. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1616. failedResets = append(failedResets, folder.Name)
  1617. }
  1618. }
  1619. if len(failedResets) > 0 {
  1620. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  1621. }
  1622. if executed == 0 {
  1623. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1624. return errors.New("no folder quota reset executed")
  1625. }
  1626. return nil
  1627. }
  1628. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1629. users, err := params.getUsers()
  1630. if err != nil {
  1631. return fmt.Errorf("unable to get users: %w", err)
  1632. }
  1633. var failedResets []string
  1634. executed := 0
  1635. for _, user := range users {
  1636. // if sender is set, the conditions have already been evaluated
  1637. if params.sender == "" {
  1638. if !checkUserConditionOptions(&user, &conditions) {
  1639. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  1640. user.Username)
  1641. continue
  1642. }
  1643. }
  1644. executed++
  1645. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1646. if err != nil {
  1647. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1648. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1649. failedResets = append(failedResets, user.Username)
  1650. }
  1651. }
  1652. if len(failedResets) > 0 {
  1653. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  1654. }
  1655. if executed == 0 {
  1656. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1657. return errors.New("no transfer quota reset executed")
  1658. }
  1659. return nil
  1660. }
  1661. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  1662. params *EventParams, actionName string,
  1663. ) error {
  1664. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1665. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1666. user.Username, err)
  1667. return err
  1668. }
  1669. check := RetentionCheck{
  1670. Folders: folders,
  1671. }
  1672. c := RetentionChecks.Add(check, &user)
  1673. if c == nil {
  1674. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1675. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  1676. }
  1677. defer func() {
  1678. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  1679. Username: user.Username,
  1680. ActionName: actionName,
  1681. Results: c.results,
  1682. })
  1683. }()
  1684. if err := c.Start(); err != nil {
  1685. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  1686. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  1687. }
  1688. return nil
  1689. }
  1690. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  1691. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  1692. ) error {
  1693. users, err := params.getUsers()
  1694. if err != nil {
  1695. return fmt.Errorf("unable to get users: %w", err)
  1696. }
  1697. var failedChecks []string
  1698. executed := 0
  1699. for _, user := range users {
  1700. // if sender is set, the conditions have already been evaluated
  1701. if params.sender == "" {
  1702. if !checkUserConditionOptions(&user, &conditions) {
  1703. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  1704. user.Username)
  1705. continue
  1706. }
  1707. }
  1708. executed++
  1709. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  1710. failedChecks = append(failedChecks, user.Username)
  1711. params.AddError(err)
  1712. continue
  1713. }
  1714. }
  1715. if len(failedChecks) > 0 {
  1716. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  1717. }
  1718. if executed == 0 {
  1719. eventManagerLog(logger.LevelError, "no retention check executed")
  1720. return errors.New("no retention check executed")
  1721. }
  1722. return nil
  1723. }
  1724. func executeMetadataCheckForUser(user *dataprovider.User) error {
  1725. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1726. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1727. user.Username, err)
  1728. return err
  1729. }
  1730. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  1731. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  1732. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  1733. }
  1734. defer ActiveMetadataChecks.Remove(user.Username)
  1735. if err := user.CheckMetadataConsistency(); err != nil {
  1736. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  1737. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  1738. }
  1739. return nil
  1740. }
  1741. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1742. users, err := params.getUsers()
  1743. if err != nil {
  1744. return fmt.Errorf("unable to get users: %w", err)
  1745. }
  1746. var failures []string
  1747. var executed int
  1748. for _, user := range users {
  1749. // if sender is set, the conditions have already been evaluated
  1750. if params.sender == "" {
  1751. if !checkUserConditionOptions(&user, &conditions) {
  1752. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  1753. user.Username)
  1754. continue
  1755. }
  1756. }
  1757. executed++
  1758. if err = executeMetadataCheckForUser(&user); err != nil {
  1759. params.AddError(err)
  1760. failures = append(failures, user.Username)
  1761. continue
  1762. }
  1763. }
  1764. if len(failures) > 0 {
  1765. return fmt.Errorf("metadata check failed for users: %+v", failures)
  1766. }
  1767. if executed == 0 {
  1768. eventManagerLog(logger.LevelError, "no metadata check executed")
  1769. return errors.New("no metadata check executed")
  1770. }
  1771. return nil
  1772. }
  1773. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  1774. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1775. eventManagerLog(logger.LevelError, "skipping password expiration check for user %s, cannot apply group settings: %v",
  1776. user.Username, err)
  1777. return err
  1778. }
  1779. if user.Filters.PasswordExpiration == 0 {
  1780. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  1781. return nil
  1782. }
  1783. days := user.PasswordExpiresIn()
  1784. if days > config.Threshold {
  1785. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  1786. user.Username, days, config.Threshold)
  1787. return nil
  1788. }
  1789. body := new(bytes.Buffer)
  1790. data := make(map[string]any)
  1791. data["Username"] = user.Username
  1792. data["Days"] = days
  1793. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  1794. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  1795. user.Username, err)
  1796. return err
  1797. }
  1798. subject := "SFTPGo password expiration notification"
  1799. startTime := time.Now()
  1800. if err := smtp.SendEmail([]string{user.Email}, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  1801. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  1802. user.Username, err, time.Since(startTime))
  1803. return err
  1804. }
  1805. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  1806. user.Username, days, time.Since(startTime))
  1807. return nil
  1808. }
  1809. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  1810. params *EventParams) error {
  1811. users, err := params.getUsers()
  1812. if err != nil {
  1813. return fmt.Errorf("unable to get users: %w", err)
  1814. }
  1815. var failures []string
  1816. for _, user := range users {
  1817. // if sender is set, the conditions have already been evaluated
  1818. if params.sender == "" {
  1819. if !checkUserConditionOptions(&user, &conditions) {
  1820. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  1821. user.Username)
  1822. continue
  1823. }
  1824. }
  1825. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  1826. params.AddError(err)
  1827. failures = append(failures, user.Username)
  1828. continue
  1829. }
  1830. }
  1831. if len(failures) > 0 {
  1832. return fmt.Errorf("password expiration check failed for users: %+v", failures)
  1833. }
  1834. return nil
  1835. }
  1836. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  1837. conditions dataprovider.ConditionOptions,
  1838. ) error {
  1839. var err error
  1840. switch action.Type {
  1841. case dataprovider.ActionTypeHTTP:
  1842. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  1843. case dataprovider.ActionTypeCommand:
  1844. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  1845. case dataprovider.ActionTypeEmail:
  1846. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  1847. case dataprovider.ActionTypeBackup:
  1848. var backupPath string
  1849. backupPath, err = dataprovider.ExecuteBackup()
  1850. if err == nil {
  1851. params.setBackupParams(backupPath)
  1852. }
  1853. case dataprovider.ActionTypeUserQuotaReset:
  1854. err = executeUsersQuotaResetRuleAction(conditions, params)
  1855. case dataprovider.ActionTypeFolderQuotaReset:
  1856. err = executeFoldersQuotaResetRuleAction(conditions, params)
  1857. case dataprovider.ActionTypeTransferQuotaReset:
  1858. err = executeTransferQuotaResetRuleAction(conditions, params)
  1859. case dataprovider.ActionTypeDataRetentionCheck:
  1860. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  1861. case dataprovider.ActionTypeMetadataCheck:
  1862. err = executeMetadataCheckRuleAction(conditions, params)
  1863. case dataprovider.ActionTypeFilesystem:
  1864. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  1865. case dataprovider.ActionTypePasswordExpirationCheck:
  1866. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  1867. default:
  1868. err = fmt.Errorf("unsupported action type: %d", action.Type)
  1869. }
  1870. if err != nil {
  1871. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  1872. }
  1873. params.AddError(err)
  1874. return err
  1875. }
  1876. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  1877. var errRes error
  1878. for _, rule := range rules {
  1879. var failedActions []string
  1880. paramsCopy := params.getACopy()
  1881. for _, action := range rule.Actions {
  1882. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  1883. startTime := time.Now()
  1884. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  1885. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  1886. action.Name, rule.Name, time.Since(startTime), err)
  1887. failedActions = append(failedActions, action.Name)
  1888. // we return the last error, it is ok for now
  1889. errRes = err
  1890. if action.Options.StopOnFailure {
  1891. break
  1892. }
  1893. } else {
  1894. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  1895. action.Name, rule.Name, time.Since(startTime))
  1896. }
  1897. }
  1898. }
  1899. // execute async actions if any, including failure actions
  1900. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  1901. }
  1902. return errRes
  1903. }
  1904. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  1905. eventManager.addAsyncTask()
  1906. defer eventManager.removeAsyncTask()
  1907. for _, rule := range rules {
  1908. executeRuleAsyncActions(rule, params.getACopy(), nil)
  1909. }
  1910. }
  1911. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  1912. for _, action := range rule.Actions {
  1913. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  1914. startTime := time.Now()
  1915. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1916. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  1917. action.Name, rule.Name, time.Since(startTime), err)
  1918. failedActions = append(failedActions, action.Name)
  1919. if action.Options.StopOnFailure {
  1920. break
  1921. }
  1922. } else {
  1923. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  1924. action.Name, rule.Name, time.Since(startTime))
  1925. }
  1926. }
  1927. }
  1928. if len(failedActions) > 0 {
  1929. params.updateStatusFromError = false
  1930. // execute failure actions
  1931. for _, action := range rule.Actions {
  1932. if action.Options.IsFailureAction {
  1933. startTime := time.Now()
  1934. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  1935. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  1936. action.Name, rule.Name, time.Since(startTime), err)
  1937. if action.Options.StopOnFailure {
  1938. break
  1939. }
  1940. } else {
  1941. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  1942. action.Name, rule.Name, time.Since(startTime))
  1943. }
  1944. }
  1945. }
  1946. }
  1947. }
  1948. type eventCronJob struct {
  1949. ruleName string
  1950. }
  1951. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  1952. if rule.GuardFromConcurrentExecution() {
  1953. task, err := dataprovider.GetTaskByName(rule.Name)
  1954. if _, ok := err.(*util.RecordNotFoundError); ok {
  1955. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  1956. task = dataprovider.Task{
  1957. Name: rule.Name,
  1958. UpdateAt: 0,
  1959. Version: 0,
  1960. }
  1961. err = dataprovider.AddTask(rule.Name)
  1962. if err != nil {
  1963. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  1964. return task, err
  1965. }
  1966. } else {
  1967. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  1968. }
  1969. return task, err
  1970. }
  1971. return dataprovider.Task{}, nil
  1972. }
  1973. func (j *eventCronJob) Run() {
  1974. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  1975. rule, err := dataprovider.EventRuleExists(j.ruleName)
  1976. if err != nil {
  1977. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  1978. return
  1979. }
  1980. if err = rule.CheckActionsConsistency(""); err != nil {
  1981. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  1982. return
  1983. }
  1984. task, err := j.getTask(rule)
  1985. if err != nil {
  1986. return
  1987. }
  1988. if task.Name != "" {
  1989. updateInterval := 5 * time.Minute
  1990. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  1991. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  1992. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  1993. return
  1994. }
  1995. err = dataprovider.UpdateTask(rule.Name, task.Version)
  1996. if err != nil {
  1997. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  1998. rule.Name, err)
  1999. return
  2000. }
  2001. ticker := time.NewTicker(updateInterval)
  2002. done := make(chan bool)
  2003. defer func() {
  2004. done <- true
  2005. ticker.Stop()
  2006. }()
  2007. go func(taskName string) {
  2008. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2009. for {
  2010. select {
  2011. case <-done:
  2012. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2013. return
  2014. case <-ticker.C:
  2015. err := dataprovider.UpdateTaskTimestamp(taskName)
  2016. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2017. }
  2018. }
  2019. }(task.Name)
  2020. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2021. } else {
  2022. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2023. }
  2024. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2025. }
  2026. type zipWriterWrapper struct {
  2027. Name string
  2028. Entries map[string]bool
  2029. Writer *zip.Writer
  2030. }
  2031. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2032. logger.Log(level, "eventmanager", "", format, v...)
  2033. }