eventmanager.go 75 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "mime"
  23. "mime/multipart"
  24. "net/http"
  25. "net/textproto"
  26. "net/url"
  27. "os"
  28. "os/exec"
  29. "path"
  30. "path/filepath"
  31. "strconv"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/bmatcuk/doublestar/v4"
  37. "github.com/klauspost/compress/zip"
  38. "github.com/robfig/cron/v3"
  39. "github.com/rs/xid"
  40. "github.com/sftpgo/sdk"
  41. mail "github.com/xhit/go-simple-mail/v2"
  42. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  43. "github.com/drakkan/sftpgo/v2/internal/logger"
  44. "github.com/drakkan/sftpgo/v2/internal/plugin"
  45. "github.com/drakkan/sftpgo/v2/internal/smtp"
  46. "github.com/drakkan/sftpgo/v2/internal/util"
  47. "github.com/drakkan/sftpgo/v2/internal/vfs"
  48. )
  49. const (
  50. ipBlockedEventName = "IP Blocked"
  51. maxAttachmentsSize = int64(10 * 1024 * 1024)
  52. )
  53. var (
  54. // eventManager handle the supported event rules actions
  55. eventManager eventRulesContainer
  56. multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  57. )
  58. func init() {
  59. eventManager = eventRulesContainer{
  60. schedulesMapping: make(map[string][]cron.EntryID),
  61. // arbitrary maximum number of concurrent asynchronous tasks,
  62. // each task could execute multiple actions
  63. concurrencyGuard: make(chan struct{}, 200),
  64. }
  65. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  66. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  67. eventManager.handleProviderEvent(EventParams{
  68. Name: executor,
  69. ObjectName: objectName,
  70. Event: operation,
  71. Status: 1,
  72. ObjectType: objectType,
  73. IP: ip,
  74. Role: role,
  75. Timestamp: time.Now().UnixNano(),
  76. Object: object,
  77. })
  78. })
  79. }
  80. // HandleCertificateEvent checks and executes action rules for certificate events
  81. func HandleCertificateEvent(params EventParams) {
  82. eventManager.handleCertificateEvent(params)
  83. }
  84. // eventRulesContainer stores event rules by trigger
  85. type eventRulesContainer struct {
  86. sync.RWMutex
  87. lastLoad atomic.Int64
  88. FsEvents []dataprovider.EventRule
  89. ProviderEvents []dataprovider.EventRule
  90. Schedules []dataprovider.EventRule
  91. IPBlockedEvents []dataprovider.EventRule
  92. CertificateEvents []dataprovider.EventRule
  93. schedulesMapping map[string][]cron.EntryID
  94. concurrencyGuard chan struct{}
  95. }
  96. func (r *eventRulesContainer) addAsyncTask() {
  97. activeHooks.Add(1)
  98. r.concurrencyGuard <- struct{}{}
  99. }
  100. func (r *eventRulesContainer) removeAsyncTask() {
  101. activeHooks.Add(-1)
  102. <-r.concurrencyGuard
  103. }
  104. func (r *eventRulesContainer) getLastLoadTime() int64 {
  105. return r.lastLoad.Load()
  106. }
  107. func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
  108. r.lastLoad.Store(modTime)
  109. }
  110. // RemoveRule deletes the rule with the specified name
  111. func (r *eventRulesContainer) RemoveRule(name string) {
  112. r.Lock()
  113. defer r.Unlock()
  114. r.removeRuleInternal(name)
  115. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  116. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  117. }
  118. func (r *eventRulesContainer) removeRuleInternal(name string) {
  119. for idx := range r.FsEvents {
  120. if r.FsEvents[idx].Name == name {
  121. lastIdx := len(r.FsEvents) - 1
  122. r.FsEvents[idx] = r.FsEvents[lastIdx]
  123. r.FsEvents = r.FsEvents[:lastIdx]
  124. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  125. return
  126. }
  127. }
  128. for idx := range r.ProviderEvents {
  129. if r.ProviderEvents[idx].Name == name {
  130. lastIdx := len(r.ProviderEvents) - 1
  131. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  132. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  133. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  134. return
  135. }
  136. }
  137. for idx := range r.IPBlockedEvents {
  138. if r.IPBlockedEvents[idx].Name == name {
  139. lastIdx := len(r.IPBlockedEvents) - 1
  140. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  141. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  142. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  143. return
  144. }
  145. }
  146. for idx := range r.CertificateEvents {
  147. if r.CertificateEvents[idx].Name == name {
  148. lastIdx := len(r.CertificateEvents) - 1
  149. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  150. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  151. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  152. return
  153. }
  154. }
  155. for idx := range r.Schedules {
  156. if r.Schedules[idx].Name == name {
  157. if schedules, ok := r.schedulesMapping[name]; ok {
  158. for _, entryID := range schedules {
  159. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  160. eventScheduler.Remove(entryID)
  161. }
  162. delete(r.schedulesMapping, name)
  163. }
  164. lastIdx := len(r.Schedules) - 1
  165. r.Schedules[idx] = r.Schedules[lastIdx]
  166. r.Schedules = r.Schedules[:lastIdx]
  167. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  168. return
  169. }
  170. }
  171. }
  172. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  173. r.removeRuleInternal(rule.Name)
  174. if rule.DeletedAt > 0 {
  175. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  176. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  177. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  178. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  179. }
  180. return
  181. }
  182. switch rule.Trigger {
  183. case dataprovider.EventTriggerFsEvent:
  184. r.FsEvents = append(r.FsEvents, rule)
  185. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  186. case dataprovider.EventTriggerProviderEvent:
  187. r.ProviderEvents = append(r.ProviderEvents, rule)
  188. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  189. case dataprovider.EventTriggerIPBlocked:
  190. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  191. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  192. case dataprovider.EventTriggerCertificate:
  193. r.CertificateEvents = append(r.CertificateEvents, rule)
  194. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  195. case dataprovider.EventTriggerSchedule:
  196. for _, schedule := range rule.Conditions.Schedules {
  197. cronSpec := schedule.GetCronSpec()
  198. job := &eventCronJob{
  199. ruleName: dataprovider.ConvertName(rule.Name),
  200. }
  201. entryID, err := eventScheduler.AddJob(cronSpec, job)
  202. if err != nil {
  203. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  204. return
  205. }
  206. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  207. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  208. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  209. }
  210. r.Schedules = append(r.Schedules, rule)
  211. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  212. default:
  213. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  214. }
  215. }
  216. func (r *eventRulesContainer) loadRules() {
  217. eventManagerLog(logger.LevelDebug, "loading updated rules")
  218. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  219. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  220. if err != nil {
  221. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  222. return
  223. }
  224. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  225. if len(rules) > 0 {
  226. r.Lock()
  227. defer r.Unlock()
  228. for _, rule := range rules {
  229. r.addUpdateRuleInternal(rule)
  230. }
  231. }
  232. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  233. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  234. r.setLastLoadTime(modTime)
  235. }
  236. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  237. if !util.Contains(conditions.ProviderEvents, params.Event) {
  238. return false
  239. }
  240. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  241. return false
  242. }
  243. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  244. return false
  245. }
  246. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  247. return false
  248. }
  249. return true
  250. }
  251. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  252. if !util.Contains(conditions.FsEvents, params.Event) {
  253. return false
  254. }
  255. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  256. return false
  257. }
  258. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  259. return false
  260. }
  261. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  262. return false
  263. }
  264. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  265. return false
  266. }
  267. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  268. return false
  269. }
  270. if params.Event == operationUpload || params.Event == operationDownload {
  271. if conditions.Options.MinFileSize > 0 {
  272. if params.FileSize < conditions.Options.MinFileSize {
  273. return false
  274. }
  275. }
  276. if conditions.Options.MaxFileSize > 0 {
  277. if params.FileSize > conditions.Options.MaxFileSize {
  278. return false
  279. }
  280. }
  281. }
  282. return true
  283. }
  284. // hasFsRules returns true if there are any rules for filesystem event triggers
  285. func (r *eventRulesContainer) hasFsRules() bool {
  286. r.RLock()
  287. defer r.RUnlock()
  288. return len(r.FsEvents) > 0
  289. }
  290. // handleFsEvent executes the rules actions defined for the specified event.
  291. // The boolean parameter indicates whether a sync action was executed
  292. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  293. if params.Protocol == protocolEventAction {
  294. return false, nil
  295. }
  296. r.RLock()
  297. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  298. for _, rule := range r.FsEvents {
  299. if r.checkFsEventMatch(rule.Conditions, params) {
  300. if err := rule.CheckActionsConsistency(""); err != nil {
  301. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  302. rule.Name, err, params.Event)
  303. continue
  304. }
  305. hasSyncActions := false
  306. for _, action := range rule.Actions {
  307. if action.Options.ExecuteSync {
  308. hasSyncActions = true
  309. break
  310. }
  311. }
  312. if hasSyncActions {
  313. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  314. } else {
  315. rulesAsync = append(rulesAsync, rule)
  316. }
  317. }
  318. }
  319. r.RUnlock()
  320. params.sender = params.Name
  321. if len(rulesAsync) > 0 {
  322. go executeAsyncRulesActions(rulesAsync, params)
  323. }
  324. if len(rulesWithSyncActions) > 0 {
  325. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  326. }
  327. return false, nil
  328. }
  329. // username is populated for user objects
  330. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  331. r.RLock()
  332. defer r.RUnlock()
  333. var rules []dataprovider.EventRule
  334. for _, rule := range r.ProviderEvents {
  335. if r.checkProviderEventMatch(rule.Conditions, params) {
  336. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  337. rules = append(rules, rule)
  338. } else {
  339. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  340. rule.Name, err, params.Event, params.ObjectType)
  341. }
  342. }
  343. }
  344. if len(rules) > 0 {
  345. params.sender = params.ObjectName
  346. go executeAsyncRulesActions(rules, params)
  347. }
  348. }
  349. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  350. r.RLock()
  351. defer r.RUnlock()
  352. if len(r.IPBlockedEvents) == 0 {
  353. return
  354. }
  355. var rules []dataprovider.EventRule
  356. for _, rule := range r.IPBlockedEvents {
  357. if err := rule.CheckActionsConsistency(""); err == nil {
  358. rules = append(rules, rule)
  359. } else {
  360. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  361. rule.Name, err, params.Event)
  362. }
  363. }
  364. if len(rules) > 0 {
  365. go executeAsyncRulesActions(rules, params)
  366. }
  367. }
  368. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  369. r.RLock()
  370. defer r.RUnlock()
  371. if len(r.CertificateEvents) == 0 {
  372. return
  373. }
  374. var rules []dataprovider.EventRule
  375. for _, rule := range r.CertificateEvents {
  376. if err := rule.CheckActionsConsistency(""); err == nil {
  377. rules = append(rules, rule)
  378. } else {
  379. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  380. rule.Name, err, params.Event)
  381. }
  382. }
  383. if len(rules) > 0 {
  384. go executeAsyncRulesActions(rules, params)
  385. }
  386. }
  387. type executedRetentionCheck struct {
  388. Username string
  389. ActionName string
  390. Results []folderRetentionCheckResult
  391. }
  392. // EventParams defines the supported event parameters
  393. type EventParams struct {
  394. Name string
  395. Groups []sdk.GroupMapping
  396. Event string
  397. Status int
  398. VirtualPath string
  399. FsPath string
  400. VirtualTargetPath string
  401. FsTargetPath string
  402. ObjectName string
  403. ObjectType string
  404. FileSize int64
  405. Protocol string
  406. IP string
  407. Role string
  408. Timestamp int64
  409. Object plugin.Renderer
  410. sender string
  411. updateStatusFromError bool
  412. errors []string
  413. retentionChecks []executedRetentionCheck
  414. }
  415. func (p *EventParams) getACopy() *EventParams {
  416. params := *p
  417. params.errors = make([]string, len(p.errors))
  418. copy(params.errors, p.errors)
  419. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  420. for _, c := range p.retentionChecks {
  421. executedCheck := executedRetentionCheck{
  422. Username: c.Username,
  423. ActionName: c.ActionName,
  424. }
  425. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  426. copy(executedCheck.Results, c.Results)
  427. retentionChecks = append(retentionChecks, executedCheck)
  428. }
  429. params.retentionChecks = retentionChecks
  430. return &params
  431. }
  432. // AddError adds a new error to the event params and update the status if needed
  433. func (p *EventParams) AddError(err error) {
  434. if err == nil {
  435. return
  436. }
  437. if p.updateStatusFromError && p.Status == 1 {
  438. p.Status = 2
  439. }
  440. p.errors = append(p.errors, err.Error())
  441. }
  442. func (p *EventParams) setBackupParams(backupPath string) {
  443. if p.sender != "" {
  444. return
  445. }
  446. p.sender = dataprovider.ActionExecutorSystem
  447. p.FsPath = backupPath
  448. p.ObjectName = filepath.Base(backupPath)
  449. p.VirtualPath = "/" + p.ObjectName
  450. p.Timestamp = time.Now().UnixNano()
  451. info, err := os.Stat(backupPath)
  452. if err == nil {
  453. p.FileSize = info.Size()
  454. }
  455. }
  456. func (p *EventParams) getStatusString() string {
  457. switch p.Status {
  458. case 1:
  459. return "OK"
  460. default:
  461. return "KO"
  462. }
  463. }
  464. // getUsers returns users with group settings not applied
  465. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  466. if p.sender == "" {
  467. users, err := dataprovider.DumpUsers()
  468. if err != nil {
  469. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  470. return users, errors.New("unable to get users")
  471. }
  472. return users, nil
  473. }
  474. user, err := p.getUserFromSender()
  475. if err != nil {
  476. return nil, err
  477. }
  478. return []dataprovider.User{user}, nil
  479. }
  480. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  481. if p.sender == dataprovider.ActionExecutorSystem {
  482. return dataprovider.User{
  483. BaseUser: sdk.BaseUser{
  484. Status: 1,
  485. Username: p.sender,
  486. HomeDir: dataprovider.GetBackupsPath(),
  487. Permissions: map[string][]string{
  488. "/": {dataprovider.PermAny},
  489. },
  490. },
  491. }, nil
  492. }
  493. user, err := dataprovider.UserExists(p.sender, "")
  494. if err != nil {
  495. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  496. return user, fmt.Errorf("error getting user %q", p.sender)
  497. }
  498. return user, nil
  499. }
  500. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  501. if p.sender == "" {
  502. return dataprovider.DumpFolders()
  503. }
  504. folder, err := dataprovider.GetFolderByName(p.sender)
  505. if err != nil {
  506. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  507. }
  508. return []vfs.BaseVirtualFolder{folder}, nil
  509. }
  510. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  511. if len(p.retentionChecks) == 0 {
  512. return nil, errors.New("no data retention report available")
  513. }
  514. var b bytes.Buffer
  515. wr := zip.NewWriter(&b)
  516. for _, check := range p.retentionChecks {
  517. if size := int64(len(b.Bytes())); size > maxAttachmentsSize {
  518. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  519. return nil, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(size))
  520. }
  521. data, err := getCSVRetentionReport(check.Results)
  522. if err != nil {
  523. return nil, fmt.Errorf("unable to get CSV report: %w", err)
  524. }
  525. fh := &zip.FileHeader{
  526. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  527. Method: zip.Deflate,
  528. Modified: time.Now().UTC(),
  529. }
  530. f, err := wr.CreateHeader(fh)
  531. if err != nil {
  532. return nil, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  533. }
  534. _, err = io.Copy(f, bytes.NewBuffer(data))
  535. if err != nil {
  536. return nil, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  537. }
  538. }
  539. if err := wr.Close(); err != nil {
  540. return nil, fmt.Errorf("unable to close zip writer: %w", err)
  541. }
  542. return b.Bytes(), nil
  543. }
  544. func (p *EventParams) getRetentionReportsAsMailAttachment() (mail.File, error) {
  545. var result mail.File
  546. data, err := p.getCompressedDataRetentionReport()
  547. if err != nil {
  548. return result, err
  549. }
  550. result.Name = "retention-reports.zip"
  551. result.Data = data
  552. return result, nil
  553. }
  554. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  555. replacements := []string{
  556. "{{Name}}", p.Name,
  557. "{{Event}}", p.Event,
  558. "{{Status}}", fmt.Sprintf("%d", p.Status),
  559. "{{VirtualPath}}", p.VirtualPath,
  560. "{{FsPath}}", p.FsPath,
  561. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  562. "{{FsTargetPath}}", p.FsTargetPath,
  563. "{{ObjectName}}", p.ObjectName,
  564. "{{ObjectType}}", p.ObjectType,
  565. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  566. "{{Protocol}}", p.Protocol,
  567. "{{IP}}", p.IP,
  568. "{{Role}}", p.Role,
  569. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  570. "{{StatusString}}", p.getStatusString(),
  571. }
  572. if p.VirtualPath != "" {
  573. replacements = append(replacements, "{{VirtualDirPath}}", path.Dir(p.VirtualPath))
  574. }
  575. if p.VirtualTargetPath != "" {
  576. replacements = append(replacements, "{{VirtualTargetDirPath}}", path.Dir(p.VirtualTargetPath))
  577. replacements = append(replacements, "{{TargetName}}", path.Base(p.VirtualTargetPath))
  578. }
  579. if len(p.errors) > 0 {
  580. replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
  581. } else {
  582. replacements = append(replacements, "{{ErrorString}}", "")
  583. }
  584. replacements = append(replacements, "{{ObjectData}}", "")
  585. if addObjectData {
  586. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  587. if err == nil {
  588. replacements[len(replacements)-1] = string(data)
  589. }
  590. }
  591. return replacements
  592. }
  593. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  594. var b bytes.Buffer
  595. csvWriter := csv.NewWriter(&b)
  596. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  597. "elapsed (ms)", "info", "error"})
  598. if err != nil {
  599. return nil, err
  600. }
  601. for _, result := range results {
  602. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  603. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  604. result.Info, result.Error})
  605. if err != nil {
  606. return nil, err
  607. }
  608. }
  609. csvWriter.Flush()
  610. err = csvWriter.Error()
  611. return b.Bytes(), err
  612. }
  613. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
  614. numFiles int, truncatedSize int64, errTransfer error, operation string,
  615. ) error {
  616. errWrite := w.Close()
  617. targetPath := virtualSourcePath
  618. if virtualTargetPath != "" {
  619. targetPath = virtualTargetPath
  620. }
  621. info, err := conn.doStatInternal(targetPath, 0, false, false)
  622. if err == nil {
  623. updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
  624. var fsSrcPath, fsDstPath string
  625. var errSrcFs, errDstFs error
  626. if virtualSourcePath != "" {
  627. _, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
  628. }
  629. if virtualTargetPath != "" {
  630. _, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
  631. }
  632. if errSrcFs == nil && errDstFs == nil {
  633. if errTransfer == nil {
  634. errTransfer = errWrite
  635. }
  636. if operation == operationCopy {
  637. logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
  638. "", "", "", info.Size(), conn.localAddr, conn.remoteAddr)
  639. }
  640. ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer) //nolint:errcheck
  641. }
  642. } else {
  643. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
  644. }
  645. if errTransfer != nil {
  646. return errTransfer
  647. }
  648. return errWrite
  649. }
  650. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  651. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  652. if err != nil {
  653. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  654. return
  655. }
  656. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  657. if vfolder.IsIncludedInUserQuota() {
  658. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  659. }
  660. }
  661. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  662. if numFiles == 0 {
  663. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  664. return conn.GetPermissionDeniedError()
  665. }
  666. } else {
  667. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  668. return conn.GetPermissionDeniedError()
  669. }
  670. }
  671. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  672. if !q.HasSpace {
  673. return conn.GetQuotaExceededError()
  674. }
  675. if expectedSize != -1 {
  676. sizeDiff := expectedSize - truncatedSize
  677. if sizeDiff > 0 {
  678. remainingSize := q.GetRemainingSize()
  679. if remainingSize > 0 && remainingSize < sizeDiff {
  680. return conn.GetQuotaExceededError()
  681. }
  682. }
  683. }
  684. return nil
  685. }
  686. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  687. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  688. if err != nil {
  689. return nil, 0, 0, nil, err
  690. }
  691. var truncatedSize, fileSize int64
  692. numFiles := 1
  693. isFileOverwrite := false
  694. info, err := fs.Lstat(fsPath)
  695. if err == nil {
  696. fileSize = info.Size()
  697. if info.IsDir() {
  698. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  699. }
  700. if info.Mode().IsRegular() {
  701. isFileOverwrite = true
  702. truncatedSize = fileSize
  703. }
  704. numFiles = 0
  705. }
  706. if err != nil && !fs.IsNotExist(err) {
  707. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  708. }
  709. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  710. return nil, numFiles, truncatedSize, nil, err
  711. }
  712. f, w, cancelFn, err := fs.Create(fsPath, 0)
  713. if err != nil {
  714. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  715. }
  716. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  717. if isFileOverwrite {
  718. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  719. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  720. truncatedSize = 0
  721. }
  722. }
  723. if cancelFn == nil {
  724. cancelFn = func() {}
  725. }
  726. if f != nil {
  727. return f, numFiles, truncatedSize, cancelFn, nil
  728. }
  729. return w, numFiles, truncatedSize, cancelFn, nil
  730. }
  731. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  732. if entryPath == wr.Name {
  733. // skip the archive itself
  734. return nil
  735. }
  736. info, err := conn.DoStat(entryPath, 1, false)
  737. if err != nil {
  738. eventManagerLog(logger.LevelError, "unable to add zip entry %#v, stat error: %v", entryPath, err)
  739. return err
  740. }
  741. entryName, err := getZipEntryName(entryPath, baseDir)
  742. if err != nil {
  743. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  744. return err
  745. }
  746. if _, ok := wr.Entries[entryName]; ok {
  747. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  748. return nil
  749. }
  750. wr.Entries[entryName] = true
  751. if info.IsDir() {
  752. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  753. Name: entryName + "/",
  754. Method: zip.Deflate,
  755. Modified: info.ModTime(),
  756. })
  757. if err != nil {
  758. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  759. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  760. }
  761. contents, err := conn.ListDir(entryPath)
  762. if err != nil {
  763. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  764. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  765. }
  766. for _, info := range contents {
  767. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  768. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  769. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  770. return err
  771. }
  772. }
  773. return nil
  774. }
  775. if !info.Mode().IsRegular() {
  776. // we only allow regular files
  777. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  778. return nil
  779. }
  780. reader, cancelFn, err := getFileReader(conn, entryPath)
  781. if err != nil {
  782. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  783. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  784. }
  785. defer cancelFn()
  786. defer reader.Close()
  787. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  788. Name: entryName,
  789. Method: zip.Deflate,
  790. Modified: info.ModTime(),
  791. })
  792. if err != nil {
  793. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  794. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  795. }
  796. _, err = io.Copy(f, reader)
  797. return err
  798. }
  799. func getZipEntryName(entryPath, baseDir string) (string, error) {
  800. if !strings.HasPrefix(entryPath, baseDir) {
  801. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  802. }
  803. entryPath = strings.TrimPrefix(entryPath, baseDir)
  804. return strings.TrimPrefix(entryPath, "/"), nil
  805. }
  806. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  807. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  808. return nil, nil, conn.GetPermissionDeniedError()
  809. }
  810. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  811. if err != nil {
  812. return nil, nil, err
  813. }
  814. f, r, cancelFn, err := fs.Open(fsPath, 0)
  815. if err != nil {
  816. return nil, nil, conn.GetFsError(fs, err)
  817. }
  818. if cancelFn == nil {
  819. cancelFn = func() {}
  820. }
  821. if f != nil {
  822. return f, cancelFn, nil
  823. }
  824. return r, cancelFn, nil
  825. }
  826. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  827. reader, cancelFn, err := getFileReader(conn, virtualPath)
  828. if err != nil {
  829. return err
  830. }
  831. defer cancelFn()
  832. defer reader.Close()
  833. _, err = io.Copy(w, reader)
  834. return err
  835. }
  836. func getFileContent(conn *BaseConnection, virtualPath string, expectedSize int) ([]byte, error) {
  837. reader, cancelFn, err := getFileReader(conn, virtualPath)
  838. if err != nil {
  839. return nil, err
  840. }
  841. defer cancelFn()
  842. defer reader.Close()
  843. data := make([]byte, expectedSize)
  844. _, err = io.ReadFull(reader, data)
  845. return data, err
  846. }
  847. func getMailAttachments(user dataprovider.User, attachments []string, replacer *strings.Replacer) ([]mail.File, error) {
  848. var files []mail.File
  849. user, err := getUserForEventAction(user)
  850. if err != nil {
  851. return nil, err
  852. }
  853. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  854. err = user.CheckFsRoot(connectionID)
  855. defer user.CloseFs() //nolint:errcheck
  856. if err != nil {
  857. return nil, fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  858. }
  859. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  860. totalSize := int64(0)
  861. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  862. info, err := conn.DoStat(virtualPath, 0, false)
  863. if err != nil {
  864. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  865. }
  866. if !info.Mode().IsRegular() {
  867. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  868. }
  869. totalSize += info.Size()
  870. if totalSize > maxAttachmentsSize {
  871. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  872. }
  873. data, err := getFileContent(conn, virtualPath, int(info.Size()))
  874. if err != nil {
  875. return nil, fmt.Errorf("unable to get content for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  876. }
  877. files = append(files, mail.File{
  878. Name: path.Base(virtualPath),
  879. Data: data,
  880. })
  881. }
  882. return files, nil
  883. }
  884. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  885. if !strings.Contains(input, "{{") {
  886. return input
  887. }
  888. return replacer.Replace(input)
  889. }
  890. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  891. var matched bool
  892. var err error
  893. if strings.Contains(p.Pattern, "**") {
  894. matched, err = doublestar.Match(p.Pattern, name)
  895. } else {
  896. matched, err = path.Match(p.Pattern, name)
  897. }
  898. if err != nil {
  899. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  900. return false
  901. }
  902. if p.InverseMatch {
  903. return !matched
  904. }
  905. return matched
  906. }
  907. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  908. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  909. return false
  910. }
  911. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  912. return false
  913. }
  914. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  915. return false
  916. }
  917. return true
  918. }
  919. // checkConditionPatterns returns false if patterns are defined and no match is found
  920. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  921. if len(patterns) == 0 {
  922. return true
  923. }
  924. for _, p := range patterns {
  925. if checkEventConditionPattern(p, name) {
  926. return true
  927. }
  928. }
  929. return false
  930. }
  931. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  932. if len(patterns) == 0 {
  933. return true
  934. }
  935. for _, group := range groups {
  936. for _, p := range patterns {
  937. if checkEventConditionPattern(p, group.Name) {
  938. return true
  939. }
  940. }
  941. }
  942. return false
  943. }
  944. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  945. if len(c.QueryParameters) > 0 {
  946. u, err := url.Parse(c.Endpoint)
  947. if err != nil {
  948. return "", fmt.Errorf("invalid endpoint: %w", err)
  949. }
  950. q := u.Query()
  951. for _, keyVal := range c.QueryParameters {
  952. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  953. }
  954. u.RawQuery = q.Encode()
  955. return u.String(), nil
  956. }
  957. return c.Endpoint, nil
  958. }
  959. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  960. conn *BaseConnection, replacer *strings.Replacer, params *EventParams,
  961. ) error {
  962. partWriter, err := m.CreatePart(h)
  963. if err != nil {
  964. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  965. return err
  966. }
  967. if part.Body != "" {
  968. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  969. if err != nil {
  970. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  971. return err
  972. }
  973. return nil
  974. }
  975. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  976. data, err := params.getCompressedDataRetentionReport()
  977. if err != nil {
  978. return err
  979. }
  980. _, err = partWriter.Write(data)
  981. if err != nil {
  982. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  983. return err
  984. }
  985. return nil
  986. }
  987. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  988. if err != nil {
  989. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  990. return err
  991. }
  992. return nil
  993. }
  994. func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  995. cancel context.CancelFunc, user dataprovider.User, params *EventParams,
  996. ) (io.ReadCloser, string, error) {
  997. var body io.ReadCloser
  998. if c.Method == http.MethodGet {
  999. return body, "", nil
  1000. }
  1001. if c.Body != "" {
  1002. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1003. data, err := params.getCompressedDataRetentionReport()
  1004. if err != nil {
  1005. return body, "", err
  1006. }
  1007. return io.NopCloser(bytes.NewBuffer(data)), "", nil
  1008. }
  1009. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  1010. }
  1011. if len(c.Parts) > 0 {
  1012. r, w := io.Pipe()
  1013. m := multipart.NewWriter(w)
  1014. var conn *BaseConnection
  1015. if user.Username != "" {
  1016. var err error
  1017. user, err = getUserForEventAction(user)
  1018. if err != nil {
  1019. return body, "", err
  1020. }
  1021. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1022. err = user.CheckFsRoot(connectionID)
  1023. if err != nil {
  1024. user.CloseFs() //nolint:errcheck
  1025. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1026. user.Username, err)
  1027. }
  1028. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1029. }
  1030. go func() {
  1031. defer w.Close()
  1032. defer user.CloseFs() //nolint:errcheck
  1033. for _, part := range c.Parts {
  1034. h := make(textproto.MIMEHeader)
  1035. if part.Body != "" {
  1036. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1037. } else {
  1038. h.Set("Content-Disposition",
  1039. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1040. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  1041. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1042. if contentType == "" {
  1043. contentType = "application/octet-stream"
  1044. }
  1045. h.Set("Content-Type", contentType)
  1046. }
  1047. for _, keyVal := range part.Headers {
  1048. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1049. }
  1050. if err := writeHTTPPart(m, part, h, conn, replacer, params); err != nil {
  1051. cancel()
  1052. return
  1053. }
  1054. }
  1055. m.Close()
  1056. }()
  1057. return r, m.FormDataContentType(), nil
  1058. }
  1059. return body, "", nil
  1060. }
  1061. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1062. if err := c.TryDecryptPassword(); err != nil {
  1063. return err
  1064. }
  1065. addObjectData := false
  1066. if params.Object != nil {
  1067. addObjectData = c.HasObjectData()
  1068. }
  1069. replacements := params.getStringReplacements(addObjectData)
  1070. replacer := strings.NewReplacer(replacements...)
  1071. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  1072. if err != nil {
  1073. return err
  1074. }
  1075. ctx, cancel := c.GetContext()
  1076. defer cancel()
  1077. var user dataprovider.User
  1078. if c.HasMultipartFiles() {
  1079. user, err = params.getUserFromSender()
  1080. if err != nil {
  1081. return err
  1082. }
  1083. }
  1084. body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user, params)
  1085. if err != nil {
  1086. return err
  1087. }
  1088. if body != nil {
  1089. defer body.Close()
  1090. }
  1091. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1092. if err != nil {
  1093. return err
  1094. }
  1095. if contentType != "" {
  1096. req.Header.Set("Content-Type", contentType)
  1097. }
  1098. if c.Username != "" {
  1099. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1100. }
  1101. for _, keyVal := range c.Headers {
  1102. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1103. }
  1104. client := c.GetHTTPClient()
  1105. defer client.CloseIdleConnections()
  1106. startTime := time.Now()
  1107. resp, err := client.Do(req)
  1108. if err != nil {
  1109. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1110. endpoint, time.Since(startTime), err)
  1111. return fmt.Errorf("error sending HTTP request: %w", err)
  1112. }
  1113. defer resp.Body.Close()
  1114. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1115. endpoint, time.Since(startTime), resp.StatusCode)
  1116. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1117. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1118. }
  1119. return nil
  1120. }
  1121. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1122. addObjectData := false
  1123. if params.Object != nil {
  1124. for _, k := range c.EnvVars {
  1125. if strings.Contains(k.Value, "{{ObjectData}}") {
  1126. addObjectData = true
  1127. break
  1128. }
  1129. }
  1130. }
  1131. replacements := params.getStringReplacements(addObjectData)
  1132. replacer := strings.NewReplacer(replacements...)
  1133. args := make([]string, 0, len(c.Args))
  1134. for _, arg := range c.Args {
  1135. args = append(args, replaceWithReplacer(arg, replacer))
  1136. }
  1137. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1138. defer cancel()
  1139. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1140. cmd.Env = []string{}
  1141. for _, keyVal := range c.EnvVars {
  1142. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1143. }
  1144. startTime := time.Now()
  1145. err := cmd.Run()
  1146. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1147. c.Cmd, time.Since(startTime), err)
  1148. return err
  1149. }
  1150. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1151. addObjectData := false
  1152. if params.Object != nil {
  1153. if strings.Contains(c.Body, "{{ObjectData}}") {
  1154. addObjectData = true
  1155. }
  1156. }
  1157. replacements := params.getStringReplacements(addObjectData)
  1158. replacer := strings.NewReplacer(replacements...)
  1159. body := replaceWithReplacer(c.Body, replacer)
  1160. subject := replaceWithReplacer(c.Subject, replacer)
  1161. startTime := time.Now()
  1162. var files []mail.File
  1163. fileAttachments := make([]string, 0, len(c.Attachments))
  1164. for _, attachment := range c.Attachments {
  1165. if attachment == dataprovider.RetentionReportPlaceHolder {
  1166. f, err := params.getRetentionReportsAsMailAttachment()
  1167. if err != nil {
  1168. return err
  1169. }
  1170. files = append(files, f)
  1171. continue
  1172. }
  1173. fileAttachments = append(fileAttachments, attachment)
  1174. }
  1175. if len(fileAttachments) > 0 {
  1176. user, err := params.getUserFromSender()
  1177. if err != nil {
  1178. return err
  1179. }
  1180. res, err := getMailAttachments(user, fileAttachments, replacer)
  1181. if err != nil {
  1182. return err
  1183. }
  1184. files = append(files, res...)
  1185. }
  1186. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  1187. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1188. time.Since(startTime), err)
  1189. if err != nil {
  1190. return fmt.Errorf("unable to send email: %w", err)
  1191. }
  1192. return nil
  1193. }
  1194. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1195. err := user.LoadAndApplyGroupSettings()
  1196. if err != nil {
  1197. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1198. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1199. }
  1200. user.UploadDataTransfer = 0
  1201. user.UploadBandwidth = 0
  1202. user.DownloadBandwidth = 0
  1203. user.Filters.DisableFsChecks = false
  1204. user.Filters.FilePatterns = nil
  1205. user.Filters.BandwidthLimits = nil
  1206. user.Filters.DataTransferLimits = nil
  1207. for k := range user.Permissions {
  1208. user.Permissions[k] = []string{dataprovider.PermAny}
  1209. }
  1210. return user, nil
  1211. }
  1212. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1213. results := make([]string, 0, len(paths))
  1214. for _, p := range paths {
  1215. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1216. }
  1217. return util.RemoveDuplicates(results, false)
  1218. }
  1219. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1220. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1221. if err != nil {
  1222. return err
  1223. }
  1224. return conn.RemoveFile(fs, fsPath, item, info)
  1225. }
  1226. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1227. user, err := getUserForEventAction(user)
  1228. if err != nil {
  1229. return err
  1230. }
  1231. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1232. err = user.CheckFsRoot(connectionID)
  1233. defer user.CloseFs() //nolint:errcheck
  1234. if err != nil {
  1235. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1236. }
  1237. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1238. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1239. info, err := conn.DoStat(item, 0, false)
  1240. if err != nil {
  1241. if conn.IsNotExistError(err) {
  1242. continue
  1243. }
  1244. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1245. }
  1246. if info.IsDir() {
  1247. if err = conn.RemoveDir(item); err != nil {
  1248. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1249. }
  1250. } else {
  1251. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1252. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1253. }
  1254. }
  1255. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1256. }
  1257. return nil
  1258. }
  1259. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1260. conditions dataprovider.ConditionOptions, params *EventParams,
  1261. ) error {
  1262. users, err := params.getUsers()
  1263. if err != nil {
  1264. return fmt.Errorf("unable to get users: %w", err)
  1265. }
  1266. var failures []string
  1267. executed := 0
  1268. for _, user := range users {
  1269. // if sender is set, the conditions have already been evaluated
  1270. if params.sender == "" {
  1271. if !checkUserConditionOptions(&user, &conditions) {
  1272. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1273. user.Username)
  1274. continue
  1275. }
  1276. }
  1277. executed++
  1278. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1279. params.AddError(err)
  1280. failures = append(failures, user.Username)
  1281. continue
  1282. }
  1283. }
  1284. if len(failures) > 0 {
  1285. return fmt.Errorf("fs delete failed for users: %+v", failures)
  1286. }
  1287. if executed == 0 {
  1288. eventManagerLog(logger.LevelError, "no delete executed")
  1289. return errors.New("no delete executed")
  1290. }
  1291. return nil
  1292. }
  1293. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1294. user, err := getUserForEventAction(user)
  1295. if err != nil {
  1296. return err
  1297. }
  1298. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1299. err = user.CheckFsRoot(connectionID)
  1300. defer user.CloseFs() //nolint:errcheck
  1301. if err != nil {
  1302. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1303. }
  1304. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1305. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1306. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1307. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1308. }
  1309. if err = conn.createDirIfMissing(item); err != nil {
  1310. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1311. }
  1312. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1313. }
  1314. return nil
  1315. }
  1316. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1317. conditions dataprovider.ConditionOptions, params *EventParams,
  1318. ) error {
  1319. users, err := params.getUsers()
  1320. if err != nil {
  1321. return fmt.Errorf("unable to get users: %w", err)
  1322. }
  1323. var failures []string
  1324. executed := 0
  1325. for _, user := range users {
  1326. // if sender is set, the conditions have already been evaluated
  1327. if params.sender == "" {
  1328. if !checkUserConditionOptions(&user, &conditions) {
  1329. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1330. user.Username)
  1331. continue
  1332. }
  1333. }
  1334. executed++
  1335. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1336. failures = append(failures, user.Username)
  1337. continue
  1338. }
  1339. }
  1340. if len(failures) > 0 {
  1341. return fmt.Errorf("fs mkdir failed for users: %+v", failures)
  1342. }
  1343. if executed == 0 {
  1344. eventManagerLog(logger.LevelError, "no mkdir executed")
  1345. return errors.New("no mkdir executed")
  1346. }
  1347. return nil
  1348. }
  1349. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1350. user dataprovider.User,
  1351. ) error {
  1352. user, err := getUserForEventAction(user)
  1353. if err != nil {
  1354. return err
  1355. }
  1356. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1357. err = user.CheckFsRoot(connectionID)
  1358. defer user.CloseFs() //nolint:errcheck
  1359. if err != nil {
  1360. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1361. }
  1362. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1363. for _, item := range renames {
  1364. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1365. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1366. if err = conn.renameInternal(source, target, true); err != nil {
  1367. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1368. }
  1369. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1370. }
  1371. return nil
  1372. }
  1373. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1374. user dataprovider.User,
  1375. ) error {
  1376. user, err := getUserForEventAction(user)
  1377. if err != nil {
  1378. return err
  1379. }
  1380. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1381. err = user.CheckFsRoot(connectionID)
  1382. defer user.CloseFs() //nolint:errcheck
  1383. if err != nil {
  1384. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1385. }
  1386. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1387. for _, item := range copy {
  1388. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1389. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1390. if strings.HasSuffix(item.Key, "/") {
  1391. source += "/"
  1392. }
  1393. if strings.HasSuffix(item.Value, "/") {
  1394. target += "/"
  1395. }
  1396. if err = conn.Copy(source, target); err != nil {
  1397. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1398. }
  1399. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1400. }
  1401. return nil
  1402. }
  1403. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1404. user dataprovider.User,
  1405. ) error {
  1406. user, err := getUserForEventAction(user)
  1407. if err != nil {
  1408. return err
  1409. }
  1410. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1411. err = user.CheckFsRoot(connectionID)
  1412. defer user.CloseFs() //nolint:errcheck
  1413. if err != nil {
  1414. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1415. }
  1416. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1417. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1418. if _, err = conn.DoStat(item, 0, false); err != nil {
  1419. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1420. }
  1421. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1422. }
  1423. return nil
  1424. }
  1425. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1426. conditions dataprovider.ConditionOptions, params *EventParams,
  1427. ) error {
  1428. users, err := params.getUsers()
  1429. if err != nil {
  1430. return fmt.Errorf("unable to get users: %w", err)
  1431. }
  1432. var failures []string
  1433. executed := 0
  1434. for _, user := range users {
  1435. // if sender is set, the conditions have already been evaluated
  1436. if params.sender == "" {
  1437. if !checkUserConditionOptions(&user, &conditions) {
  1438. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1439. user.Username)
  1440. continue
  1441. }
  1442. }
  1443. executed++
  1444. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1445. failures = append(failures, user.Username)
  1446. params.AddError(err)
  1447. continue
  1448. }
  1449. }
  1450. if len(failures) > 0 {
  1451. return fmt.Errorf("fs rename failed for users: %+v", failures)
  1452. }
  1453. if executed == 0 {
  1454. eventManagerLog(logger.LevelError, "no rename executed")
  1455. return errors.New("no rename executed")
  1456. }
  1457. return nil
  1458. }
  1459. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1460. conditions dataprovider.ConditionOptions, params *EventParams,
  1461. ) error {
  1462. users, err := params.getUsers()
  1463. if err != nil {
  1464. return fmt.Errorf("unable to get users: %w", err)
  1465. }
  1466. var failures []string
  1467. var executed int
  1468. for _, user := range users {
  1469. // if sender is set, the conditions have already been evaluated
  1470. if params.sender == "" {
  1471. if !checkUserConditionOptions(&user, &conditions) {
  1472. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1473. user.Username)
  1474. continue
  1475. }
  1476. }
  1477. executed++
  1478. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1479. failures = append(failures, user.Username)
  1480. params.AddError(err)
  1481. continue
  1482. }
  1483. }
  1484. if len(failures) > 0 {
  1485. return fmt.Errorf("fs copy failed for users: %+v", failures)
  1486. }
  1487. if executed == 0 {
  1488. eventManagerLog(logger.LevelError, "no copy executed")
  1489. return errors.New("no copy executed")
  1490. }
  1491. return nil
  1492. }
  1493. func getArchiveBaseDir(paths []string) string {
  1494. var parentDirs []string
  1495. for _, p := range paths {
  1496. parentDirs = append(parentDirs, path.Dir(p))
  1497. }
  1498. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1499. baseDir := "/"
  1500. if len(parentDirs) == 1 {
  1501. baseDir = parentDirs[0]
  1502. }
  1503. return baseDir
  1504. }
  1505. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1506. if info.IsDir() {
  1507. var dirSize int64
  1508. entries, err := conn.ListDir(p)
  1509. if err != nil {
  1510. return 0, err
  1511. }
  1512. for _, entry := range entries {
  1513. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1514. if err != nil {
  1515. return 0, err
  1516. }
  1517. dirSize += size
  1518. }
  1519. return dirSize, nil
  1520. }
  1521. if info.Mode().IsRegular() {
  1522. return info.Size(), nil
  1523. }
  1524. return 0, nil
  1525. }
  1526. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1527. q, _ := conn.HasSpace(false, false, zipPath)
  1528. if q.HasSpace && q.GetRemainingSize() > 0 {
  1529. var size int64
  1530. for _, item := range paths {
  1531. info, err := conn.DoStat(item, 1, false)
  1532. if err != nil {
  1533. return size, err
  1534. }
  1535. itemSize, err := getSizeForPath(conn, item, info)
  1536. if err != nil {
  1537. return size, err
  1538. }
  1539. size += itemSize
  1540. }
  1541. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1542. // we assume the zip size will be half of the real size
  1543. return size / 2, nil
  1544. }
  1545. return -1, nil
  1546. }
  1547. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1548. user dataprovider.User,
  1549. ) error {
  1550. user, err := getUserForEventAction(user)
  1551. if err != nil {
  1552. return err
  1553. }
  1554. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1555. err = user.CheckFsRoot(connectionID)
  1556. defer user.CloseFs() //nolint:errcheck
  1557. if err != nil {
  1558. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1559. }
  1560. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1561. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1562. paths := make([]string, 0, len(c.Paths))
  1563. for idx := range c.Paths {
  1564. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1565. if p == name {
  1566. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1567. }
  1568. paths = append(paths, p)
  1569. }
  1570. paths = util.RemoveDuplicates(paths, false)
  1571. estimatedSize, err := estimateZipSize(conn, name, paths)
  1572. if err != nil {
  1573. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1574. return fmt.Errorf("unable to estimate archive size: %w", err)
  1575. }
  1576. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1577. if err != nil {
  1578. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1579. return fmt.Errorf("unable to create archive: %w", err)
  1580. }
  1581. defer cancelFn()
  1582. baseDir := getArchiveBaseDir(paths)
  1583. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1584. zipWriter := &zipWriterWrapper{
  1585. Name: name,
  1586. Writer: zip.NewWriter(writer),
  1587. Entries: make(map[string]bool),
  1588. }
  1589. for _, item := range paths {
  1590. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1591. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload) //nolint:errcheck
  1592. return err
  1593. }
  1594. }
  1595. if err := zipWriter.Writer.Close(); err != nil {
  1596. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1597. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload) //nolint:errcheck
  1598. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1599. }
  1600. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload)
  1601. }
  1602. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1603. params *EventParams,
  1604. ) error {
  1605. users, err := params.getUsers()
  1606. if err != nil {
  1607. return fmt.Errorf("unable to get users: %w", err)
  1608. }
  1609. var failures []string
  1610. executed := 0
  1611. for _, user := range users {
  1612. // if sender is set, the conditions have already been evaluated
  1613. if params.sender == "" {
  1614. if !checkUserConditionOptions(&user, &conditions) {
  1615. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1616. user.Username)
  1617. continue
  1618. }
  1619. }
  1620. executed++
  1621. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1622. failures = append(failures, user.Username)
  1623. params.AddError(err)
  1624. continue
  1625. }
  1626. }
  1627. if len(failures) > 0 {
  1628. return fmt.Errorf("fs existence check failed for users: %+v", failures)
  1629. }
  1630. if executed == 0 {
  1631. eventManagerLog(logger.LevelError, "no existence check executed")
  1632. return errors.New("no existence check executed")
  1633. }
  1634. return nil
  1635. }
  1636. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1637. conditions dataprovider.ConditionOptions, params *EventParams,
  1638. ) error {
  1639. users, err := params.getUsers()
  1640. if err != nil {
  1641. return fmt.Errorf("unable to get users: %w", err)
  1642. }
  1643. var failures []string
  1644. executed := 0
  1645. for _, user := range users {
  1646. // if sender is set, the conditions have already been evaluated
  1647. if params.sender == "" {
  1648. if !checkUserConditionOptions(&user, &conditions) {
  1649. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1650. user.Username)
  1651. continue
  1652. }
  1653. }
  1654. executed++
  1655. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1656. failures = append(failures, user.Username)
  1657. params.AddError(err)
  1658. continue
  1659. }
  1660. }
  1661. if len(failures) > 0 {
  1662. return fmt.Errorf("fs compress failed for users: %+v", failures)
  1663. }
  1664. if executed == 0 {
  1665. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1666. return errors.New("no file/folder compressed")
  1667. }
  1668. return nil
  1669. }
  1670. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1671. params *EventParams,
  1672. ) error {
  1673. addObjectData := false
  1674. replacements := params.getStringReplacements(addObjectData)
  1675. replacer := strings.NewReplacer(replacements...)
  1676. switch c.Type {
  1677. case dataprovider.FilesystemActionRename:
  1678. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1679. case dataprovider.FilesystemActionDelete:
  1680. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1681. case dataprovider.FilesystemActionMkdirs:
  1682. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1683. case dataprovider.FilesystemActionExist:
  1684. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1685. case dataprovider.FilesystemActionCompress:
  1686. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1687. case dataprovider.FilesystemActionCopy:
  1688. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1689. default:
  1690. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1691. }
  1692. }
  1693. func executeQuotaResetForUser(user *dataprovider.User) error {
  1694. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1695. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1696. user.Username, err)
  1697. return err
  1698. }
  1699. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1700. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1701. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1702. }
  1703. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1704. numFiles, size, err := user.ScanQuota()
  1705. if err != nil {
  1706. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1707. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1708. }
  1709. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1710. if err != nil {
  1711. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1712. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1713. }
  1714. return nil
  1715. }
  1716. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1717. users, err := params.getUsers()
  1718. if err != nil {
  1719. return fmt.Errorf("unable to get users: %w", err)
  1720. }
  1721. var failedResets []string
  1722. executed := 0
  1723. for _, user := range users {
  1724. // if sender is set, the conditions have already been evaluated
  1725. if params.sender == "" {
  1726. if !checkUserConditionOptions(&user, &conditions) {
  1727. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1728. user.Username)
  1729. continue
  1730. }
  1731. }
  1732. executed++
  1733. if err = executeQuotaResetForUser(&user); err != nil {
  1734. params.AddError(err)
  1735. failedResets = append(failedResets, user.Username)
  1736. continue
  1737. }
  1738. }
  1739. if len(failedResets) > 0 {
  1740. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  1741. }
  1742. if executed == 0 {
  1743. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1744. return errors.New("no user quota reset executed")
  1745. }
  1746. return nil
  1747. }
  1748. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1749. folders, err := params.getFolders()
  1750. if err != nil {
  1751. return fmt.Errorf("unable to get folders: %w", err)
  1752. }
  1753. var failedResets []string
  1754. executed := 0
  1755. for _, folder := range folders {
  1756. // if sender is set, the conditions have already been evaluated
  1757. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1758. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1759. folder.Name)
  1760. continue
  1761. }
  1762. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1763. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1764. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1765. failedResets = append(failedResets, folder.Name)
  1766. continue
  1767. }
  1768. executed++
  1769. f := vfs.VirtualFolder{
  1770. BaseVirtualFolder: folder,
  1771. VirtualPath: "/",
  1772. }
  1773. numFiles, size, err := f.ScanQuota()
  1774. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1775. if err != nil {
  1776. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1777. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1778. failedResets = append(failedResets, folder.Name)
  1779. continue
  1780. }
  1781. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1782. if err != nil {
  1783. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1784. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1785. failedResets = append(failedResets, folder.Name)
  1786. }
  1787. }
  1788. if len(failedResets) > 0 {
  1789. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  1790. }
  1791. if executed == 0 {
  1792. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1793. return errors.New("no folder quota reset executed")
  1794. }
  1795. return nil
  1796. }
  1797. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1798. users, err := params.getUsers()
  1799. if err != nil {
  1800. return fmt.Errorf("unable to get users: %w", err)
  1801. }
  1802. var failedResets []string
  1803. executed := 0
  1804. for _, user := range users {
  1805. // if sender is set, the conditions have already been evaluated
  1806. if params.sender == "" {
  1807. if !checkUserConditionOptions(&user, &conditions) {
  1808. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  1809. user.Username)
  1810. continue
  1811. }
  1812. }
  1813. executed++
  1814. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1815. if err != nil {
  1816. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1817. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1818. failedResets = append(failedResets, user.Username)
  1819. }
  1820. }
  1821. if len(failedResets) > 0 {
  1822. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  1823. }
  1824. if executed == 0 {
  1825. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1826. return errors.New("no transfer quota reset executed")
  1827. }
  1828. return nil
  1829. }
  1830. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  1831. params *EventParams, actionName string,
  1832. ) error {
  1833. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1834. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1835. user.Username, err)
  1836. return err
  1837. }
  1838. check := RetentionCheck{
  1839. Folders: folders,
  1840. }
  1841. c := RetentionChecks.Add(check, &user)
  1842. if c == nil {
  1843. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1844. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  1845. }
  1846. defer func() {
  1847. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  1848. Username: user.Username,
  1849. ActionName: actionName,
  1850. Results: c.results,
  1851. })
  1852. }()
  1853. if err := c.Start(); err != nil {
  1854. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  1855. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  1856. }
  1857. return nil
  1858. }
  1859. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  1860. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  1861. ) error {
  1862. users, err := params.getUsers()
  1863. if err != nil {
  1864. return fmt.Errorf("unable to get users: %w", err)
  1865. }
  1866. var failedChecks []string
  1867. executed := 0
  1868. for _, user := range users {
  1869. // if sender is set, the conditions have already been evaluated
  1870. if params.sender == "" {
  1871. if !checkUserConditionOptions(&user, &conditions) {
  1872. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  1873. user.Username)
  1874. continue
  1875. }
  1876. }
  1877. executed++
  1878. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  1879. failedChecks = append(failedChecks, user.Username)
  1880. params.AddError(err)
  1881. continue
  1882. }
  1883. }
  1884. if len(failedChecks) > 0 {
  1885. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  1886. }
  1887. if executed == 0 {
  1888. eventManagerLog(logger.LevelError, "no retention check executed")
  1889. return errors.New("no retention check executed")
  1890. }
  1891. return nil
  1892. }
  1893. func executeMetadataCheckForUser(user *dataprovider.User) error {
  1894. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1895. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1896. user.Username, err)
  1897. return err
  1898. }
  1899. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  1900. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  1901. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  1902. }
  1903. defer ActiveMetadataChecks.Remove(user.Username)
  1904. if err := user.CheckMetadataConsistency(); err != nil {
  1905. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  1906. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  1907. }
  1908. return nil
  1909. }
  1910. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1911. users, err := params.getUsers()
  1912. if err != nil {
  1913. return fmt.Errorf("unable to get users: %w", err)
  1914. }
  1915. var failures []string
  1916. var executed int
  1917. for _, user := range users {
  1918. // if sender is set, the conditions have already been evaluated
  1919. if params.sender == "" {
  1920. if !checkUserConditionOptions(&user, &conditions) {
  1921. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  1922. user.Username)
  1923. continue
  1924. }
  1925. }
  1926. executed++
  1927. if err = executeMetadataCheckForUser(&user); err != nil {
  1928. params.AddError(err)
  1929. failures = append(failures, user.Username)
  1930. continue
  1931. }
  1932. }
  1933. if len(failures) > 0 {
  1934. return fmt.Errorf("metadata check failed for users: %+v", failures)
  1935. }
  1936. if executed == 0 {
  1937. eventManagerLog(logger.LevelError, "no metadata check executed")
  1938. return errors.New("no metadata check executed")
  1939. }
  1940. return nil
  1941. }
  1942. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  1943. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1944. eventManagerLog(logger.LevelError, "skipping password expiration check for user %s, cannot apply group settings: %v",
  1945. user.Username, err)
  1946. return err
  1947. }
  1948. if user.Filters.PasswordExpiration == 0 {
  1949. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  1950. return nil
  1951. }
  1952. days := user.PasswordExpiresIn()
  1953. if days > config.Threshold {
  1954. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  1955. user.Username, days, config.Threshold)
  1956. return nil
  1957. }
  1958. body := new(bytes.Buffer)
  1959. data := make(map[string]any)
  1960. data["Username"] = user.Username
  1961. data["Days"] = days
  1962. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  1963. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  1964. user.Username, err)
  1965. return err
  1966. }
  1967. subject := "SFTPGo password expiration notification"
  1968. startTime := time.Now()
  1969. if err := smtp.SendEmail([]string{user.Email}, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  1970. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  1971. user.Username, err, time.Since(startTime))
  1972. return err
  1973. }
  1974. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  1975. user.Username, days, time.Since(startTime))
  1976. return nil
  1977. }
  1978. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  1979. params *EventParams) error {
  1980. users, err := params.getUsers()
  1981. if err != nil {
  1982. return fmt.Errorf("unable to get users: %w", err)
  1983. }
  1984. var failures []string
  1985. for _, user := range users {
  1986. // if sender is set, the conditions have already been evaluated
  1987. if params.sender == "" {
  1988. if !checkUserConditionOptions(&user, &conditions) {
  1989. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  1990. user.Username)
  1991. continue
  1992. }
  1993. }
  1994. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  1995. params.AddError(err)
  1996. failures = append(failures, user.Username)
  1997. continue
  1998. }
  1999. }
  2000. if len(failures) > 0 {
  2001. return fmt.Errorf("password expiration check failed for users: %+v", failures)
  2002. }
  2003. return nil
  2004. }
  2005. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  2006. conditions dataprovider.ConditionOptions,
  2007. ) error {
  2008. var err error
  2009. switch action.Type {
  2010. case dataprovider.ActionTypeHTTP:
  2011. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2012. case dataprovider.ActionTypeCommand:
  2013. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2014. case dataprovider.ActionTypeEmail:
  2015. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2016. case dataprovider.ActionTypeBackup:
  2017. var backupPath string
  2018. backupPath, err = dataprovider.ExecuteBackup()
  2019. if err == nil {
  2020. params.setBackupParams(backupPath)
  2021. }
  2022. case dataprovider.ActionTypeUserQuotaReset:
  2023. err = executeUsersQuotaResetRuleAction(conditions, params)
  2024. case dataprovider.ActionTypeFolderQuotaReset:
  2025. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2026. case dataprovider.ActionTypeTransferQuotaReset:
  2027. err = executeTransferQuotaResetRuleAction(conditions, params)
  2028. case dataprovider.ActionTypeDataRetentionCheck:
  2029. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2030. case dataprovider.ActionTypeMetadataCheck:
  2031. err = executeMetadataCheckRuleAction(conditions, params)
  2032. case dataprovider.ActionTypeFilesystem:
  2033. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2034. case dataprovider.ActionTypePasswordExpirationCheck:
  2035. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2036. default:
  2037. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2038. }
  2039. if err != nil {
  2040. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2041. }
  2042. params.AddError(err)
  2043. return err
  2044. }
  2045. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2046. var errRes error
  2047. for _, rule := range rules {
  2048. var failedActions []string
  2049. paramsCopy := params.getACopy()
  2050. for _, action := range rule.Actions {
  2051. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2052. startTime := time.Now()
  2053. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2054. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2055. action.Name, rule.Name, time.Since(startTime), err)
  2056. failedActions = append(failedActions, action.Name)
  2057. // we return the last error, it is ok for now
  2058. errRes = err
  2059. if action.Options.StopOnFailure {
  2060. break
  2061. }
  2062. } else {
  2063. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2064. action.Name, rule.Name, time.Since(startTime))
  2065. }
  2066. }
  2067. }
  2068. // execute async actions if any, including failure actions
  2069. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2070. }
  2071. return errRes
  2072. }
  2073. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2074. eventManager.addAsyncTask()
  2075. defer eventManager.removeAsyncTask()
  2076. for _, rule := range rules {
  2077. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2078. }
  2079. }
  2080. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2081. for _, action := range rule.Actions {
  2082. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2083. startTime := time.Now()
  2084. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2085. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2086. action.Name, rule.Name, time.Since(startTime), err)
  2087. failedActions = append(failedActions, action.Name)
  2088. if action.Options.StopOnFailure {
  2089. break
  2090. }
  2091. } else {
  2092. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2093. action.Name, rule.Name, time.Since(startTime))
  2094. }
  2095. }
  2096. }
  2097. if len(failedActions) > 0 {
  2098. params.updateStatusFromError = false
  2099. // execute failure actions
  2100. for _, action := range rule.Actions {
  2101. if action.Options.IsFailureAction {
  2102. startTime := time.Now()
  2103. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2104. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2105. action.Name, rule.Name, time.Since(startTime), err)
  2106. if action.Options.StopOnFailure {
  2107. break
  2108. }
  2109. } else {
  2110. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2111. action.Name, rule.Name, time.Since(startTime))
  2112. }
  2113. }
  2114. }
  2115. }
  2116. }
  2117. type eventCronJob struct {
  2118. ruleName string
  2119. }
  2120. func (j *eventCronJob) getTask(rule dataprovider.EventRule) (dataprovider.Task, error) {
  2121. if rule.GuardFromConcurrentExecution() {
  2122. task, err := dataprovider.GetTaskByName(rule.Name)
  2123. if err != nil {
  2124. if _, ok := err.(*util.RecordNotFoundError); ok {
  2125. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2126. task = dataprovider.Task{
  2127. Name: rule.Name,
  2128. UpdateAt: 0,
  2129. Version: 0,
  2130. }
  2131. err = dataprovider.AddTask(rule.Name)
  2132. if err != nil {
  2133. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2134. return task, err
  2135. }
  2136. } else {
  2137. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2138. }
  2139. }
  2140. return task, err
  2141. }
  2142. return dataprovider.Task{}, nil
  2143. }
  2144. func (j *eventCronJob) Run() {
  2145. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2146. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2147. if err != nil {
  2148. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2149. return
  2150. }
  2151. if err = rule.CheckActionsConsistency(""); err != nil {
  2152. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2153. return
  2154. }
  2155. task, err := j.getTask(rule)
  2156. if err != nil {
  2157. return
  2158. }
  2159. if task.Name != "" {
  2160. updateInterval := 5 * time.Minute
  2161. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2162. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2163. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2164. return
  2165. }
  2166. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2167. if err != nil {
  2168. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2169. rule.Name, err)
  2170. return
  2171. }
  2172. ticker := time.NewTicker(updateInterval)
  2173. done := make(chan bool)
  2174. defer func() {
  2175. done <- true
  2176. ticker.Stop()
  2177. }()
  2178. go func(taskName string) {
  2179. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2180. for {
  2181. select {
  2182. case <-done:
  2183. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2184. return
  2185. case <-ticker.C:
  2186. err := dataprovider.UpdateTaskTimestamp(taskName)
  2187. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2188. }
  2189. }
  2190. }(task.Name)
  2191. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2192. } else {
  2193. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2194. }
  2195. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2196. }
  2197. type zipWriterWrapper struct {
  2198. Name string
  2199. Entries map[string]bool
  2200. Writer *zip.Writer
  2201. }
  2202. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2203. logger.Log(level, "eventmanager", "", format, v...)
  2204. }