eventmanager.go 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/csv"
  19. "errors"
  20. "fmt"
  21. "io"
  22. "mime"
  23. "mime/multipart"
  24. "net/http"
  25. "net/textproto"
  26. "net/url"
  27. "os"
  28. "os/exec"
  29. "path"
  30. "path/filepath"
  31. "strconv"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/bmatcuk/doublestar/v4"
  37. "github.com/klauspost/compress/zip"
  38. "github.com/robfig/cron/v3"
  39. "github.com/rs/xid"
  40. "github.com/sftpgo/sdk"
  41. "github.com/wneessen/go-mail"
  42. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  43. "github.com/drakkan/sftpgo/v2/internal/logger"
  44. "github.com/drakkan/sftpgo/v2/internal/plugin"
  45. "github.com/drakkan/sftpgo/v2/internal/smtp"
  46. "github.com/drakkan/sftpgo/v2/internal/util"
  47. "github.com/drakkan/sftpgo/v2/internal/vfs"
  48. )
  49. const (
  50. ipBlockedEventName = "IP Blocked"
  51. maxAttachmentsSize = int64(10 * 1024 * 1024)
  52. )
var (
	// eventManager handles the supported event rules actions.
	// It is populated by the package init function.
	eventManager eventRulesContainer
	// multipartQuoteEscaper backslash-escapes backslashes and double quotes.
	// NOTE(review): presumably used to quote multipart header parameters,
	// the usage is outside this chunk - confirm.
	multipartQuoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
)
  58. func init() {
  59. eventManager = eventRulesContainer{
  60. schedulesMapping: make(map[string][]cron.EntryID),
  61. // arbitrary maximum number of concurrent asynchronous tasks,
  62. // each task could execute multiple actions
  63. concurrencyGuard: make(chan struct{}, 200),
  64. }
  65. dataprovider.SetEventRulesCallbacks(eventManager.loadRules, eventManager.RemoveRule,
  66. func(operation, executor, ip, objectType, objectName, role string, object plugin.Renderer) {
  67. eventManager.handleProviderEvent(EventParams{
  68. Name: executor,
  69. ObjectName: objectName,
  70. Event: operation,
  71. Status: 1,
  72. ObjectType: objectType,
  73. IP: ip,
  74. Role: role,
  75. Timestamp: time.Now().UnixNano(),
  76. Object: object,
  77. })
  78. })
  79. }
// HandleCertificateEvent checks and executes action rules for certificate events.
// It forwards params to the package-level event manager.
func HandleCertificateEvent(params EventParams) {
	eventManager.handleCertificateEvent(params)
}
// eventRulesContainer stores event rules by trigger.
// The embedded RWMutex is taken by the methods that read or modify the
// rule slices.
type eventRulesContainer struct {
	sync.RWMutex
	// lastLoad is the value stored by setLastLoadTime; loadRules stores a
	// time expressed as milliseconds since epoch.
	lastLoad atomic.Int64
	// FsEvents holds the rules triggered by filesystem events
	FsEvents []dataprovider.EventRule
	// ProviderEvents holds the rules triggered by provider events
	ProviderEvents []dataprovider.EventRule
	// Schedules holds the rules triggered by a schedule
	Schedules []dataprovider.EventRule
	// IPBlockedEvents holds the rules triggered by IP blocked events
	IPBlockedEvents []dataprovider.EventRule
	// CertificateEvents holds the rules triggered by certificate events
	CertificateEvents []dataprovider.EventRule
	// schedulesMapping maps a rule name to the cron entry IDs registered for it
	schedulesMapping map[string][]cron.EntryID
	// concurrencyGuard is a buffered channel: addAsyncTask sends to it and
	// removeAsyncTask receives from it, so its capacity bounds the number
	// of tasks accounted at the same time.
	concurrencyGuard chan struct{}
}
// addAsyncTask increments the active hooks counter and then reserves a slot
// in concurrencyGuard. The send blocks while the channel buffer is full.
func (r *eventRulesContainer) addAsyncTask() {
	activeHooks.Add(1)
	r.concurrencyGuard <- struct{}{}
}
// removeAsyncTask decrements the active hooks counter and releases a slot
// previously reserved in concurrencyGuard by addAsyncTask.
func (r *eventRulesContainer) removeAsyncTask() {
	activeHooks.Add(-1)
	<-r.concurrencyGuard
}
// getLastLoadTime atomically returns the value stored by setLastLoadTime.
func (r *eventRulesContainer) getLastLoadTime() int64 {
	return r.lastLoad.Load()
}
// setLastLoadTime atomically stores modTime as the last load time.
func (r *eventRulesContainer) setLastLoadTime(modTime int64) {
	r.lastLoad.Store(modTime)
}
  110. // RemoveRule deletes the rule with the specified name
  111. func (r *eventRulesContainer) RemoveRule(name string) {
  112. r.Lock()
  113. defer r.Unlock()
  114. r.removeRuleInternal(name)
  115. eventManagerLog(logger.LevelDebug, "event rules updated after delete, fs events: %d, provider events: %d, schedules: %d",
  116. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules))
  117. }
  118. func (r *eventRulesContainer) removeRuleInternal(name string) {
  119. for idx := range r.FsEvents {
  120. if r.FsEvents[idx].Name == name {
  121. lastIdx := len(r.FsEvents) - 1
  122. r.FsEvents[idx] = r.FsEvents[lastIdx]
  123. r.FsEvents = r.FsEvents[:lastIdx]
  124. eventManagerLog(logger.LevelDebug, "removed rule %q from fs events", name)
  125. return
  126. }
  127. }
  128. for idx := range r.ProviderEvents {
  129. if r.ProviderEvents[idx].Name == name {
  130. lastIdx := len(r.ProviderEvents) - 1
  131. r.ProviderEvents[idx] = r.ProviderEvents[lastIdx]
  132. r.ProviderEvents = r.ProviderEvents[:lastIdx]
  133. eventManagerLog(logger.LevelDebug, "removed rule %q from provider events", name)
  134. return
  135. }
  136. }
  137. for idx := range r.IPBlockedEvents {
  138. if r.IPBlockedEvents[idx].Name == name {
  139. lastIdx := len(r.IPBlockedEvents) - 1
  140. r.IPBlockedEvents[idx] = r.IPBlockedEvents[lastIdx]
  141. r.IPBlockedEvents = r.IPBlockedEvents[:lastIdx]
  142. eventManagerLog(logger.LevelDebug, "removed rule %q from IP blocked events", name)
  143. return
  144. }
  145. }
  146. for idx := range r.CertificateEvents {
  147. if r.CertificateEvents[idx].Name == name {
  148. lastIdx := len(r.CertificateEvents) - 1
  149. r.CertificateEvents[idx] = r.CertificateEvents[lastIdx]
  150. r.CertificateEvents = r.CertificateEvents[:lastIdx]
  151. eventManagerLog(logger.LevelDebug, "removed rule %q from certificate events", name)
  152. return
  153. }
  154. }
  155. for idx := range r.Schedules {
  156. if r.Schedules[idx].Name == name {
  157. if schedules, ok := r.schedulesMapping[name]; ok {
  158. for _, entryID := range schedules {
  159. eventManagerLog(logger.LevelDebug, "removing scheduled entry id %d for rule %q", entryID, name)
  160. eventScheduler.Remove(entryID)
  161. }
  162. delete(r.schedulesMapping, name)
  163. }
  164. lastIdx := len(r.Schedules) - 1
  165. r.Schedules[idx] = r.Schedules[lastIdx]
  166. r.Schedules = r.Schedules[:lastIdx]
  167. eventManagerLog(logger.LevelDebug, "removed rule %q from scheduled events", name)
  168. return
  169. }
  170. }
  171. }
  172. func (r *eventRulesContainer) addUpdateRuleInternal(rule dataprovider.EventRule) {
  173. r.removeRuleInternal(rule.Name)
  174. if rule.DeletedAt > 0 {
  175. deletedAt := util.GetTimeFromMsecSinceEpoch(rule.DeletedAt)
  176. if deletedAt.Add(30 * time.Minute).Before(time.Now()) {
  177. eventManagerLog(logger.LevelDebug, "removing rule %q deleted at %s", rule.Name, deletedAt)
  178. go dataprovider.RemoveEventRule(rule) //nolint:errcheck
  179. }
  180. return
  181. }
  182. if rule.Status != 1 || rule.Trigger == dataprovider.EventTriggerOnDemand {
  183. return
  184. }
  185. switch rule.Trigger {
  186. case dataprovider.EventTriggerFsEvent:
  187. r.FsEvents = append(r.FsEvents, rule)
  188. eventManagerLog(logger.LevelDebug, "added rule %q to fs events", rule.Name)
  189. case dataprovider.EventTriggerProviderEvent:
  190. r.ProviderEvents = append(r.ProviderEvents, rule)
  191. eventManagerLog(logger.LevelDebug, "added rule %q to provider events", rule.Name)
  192. case dataprovider.EventTriggerIPBlocked:
  193. r.IPBlockedEvents = append(r.IPBlockedEvents, rule)
  194. eventManagerLog(logger.LevelDebug, "added rule %q to IP blocked events", rule.Name)
  195. case dataprovider.EventTriggerCertificate:
  196. r.CertificateEvents = append(r.CertificateEvents, rule)
  197. eventManagerLog(logger.LevelDebug, "added rule %q to certificate events", rule.Name)
  198. case dataprovider.EventTriggerSchedule:
  199. for _, schedule := range rule.Conditions.Schedules {
  200. cronSpec := schedule.GetCronSpec()
  201. job := &eventCronJob{
  202. ruleName: dataprovider.ConvertName(rule.Name),
  203. }
  204. entryID, err := eventScheduler.AddJob(cronSpec, job)
  205. if err != nil {
  206. eventManagerLog(logger.LevelError, "unable to add scheduled rule %q, cron string %q: %v", rule.Name, cronSpec, err)
  207. return
  208. }
  209. r.schedulesMapping[rule.Name] = append(r.schedulesMapping[rule.Name], entryID)
  210. eventManagerLog(logger.LevelDebug, "schedule for rule %q added, id: %d, cron string %q, active scheduling rules: %d",
  211. rule.Name, entryID, cronSpec, len(r.schedulesMapping))
  212. }
  213. r.Schedules = append(r.Schedules, rule)
  214. eventManagerLog(logger.LevelDebug, "added rule %q to scheduled events", rule.Name)
  215. default:
  216. eventManagerLog(logger.LevelError, "unsupported trigger: %d", rule.Trigger)
  217. }
  218. }
  219. func (r *eventRulesContainer) loadRules() {
  220. eventManagerLog(logger.LevelDebug, "loading updated rules")
  221. modTime := util.GetTimeAsMsSinceEpoch(time.Now())
  222. rules, err := dataprovider.GetRecentlyUpdatedRules(r.getLastLoadTime())
  223. if err != nil {
  224. eventManagerLog(logger.LevelError, "unable to load event rules: %v", err)
  225. return
  226. }
  227. eventManagerLog(logger.LevelDebug, "recently updated event rules loaded: %d", len(rules))
  228. if len(rules) > 0 {
  229. r.Lock()
  230. defer r.Unlock()
  231. for _, rule := range rules {
  232. r.addUpdateRuleInternal(rule)
  233. }
  234. }
  235. eventManagerLog(logger.LevelDebug, "event rules updated, fs events: %d, provider events: %d, schedules: %d, ip blocked events: %d, certificate events: %d",
  236. len(r.FsEvents), len(r.ProviderEvents), len(r.Schedules), len(r.IPBlockedEvents), len(r.CertificateEvents))
  237. r.setLastLoadTime(modTime)
  238. }
  239. func (r *eventRulesContainer) checkProviderEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  240. if !util.Contains(conditions.ProviderEvents, params.Event) {
  241. return false
  242. }
  243. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  244. return false
  245. }
  246. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  247. return false
  248. }
  249. if len(conditions.Options.ProviderObjects) > 0 && !util.Contains(conditions.Options.ProviderObjects, params.ObjectType) {
  250. return false
  251. }
  252. return true
  253. }
  254. func (r *eventRulesContainer) checkFsEventMatch(conditions dataprovider.EventConditions, params EventParams) bool {
  255. if !util.Contains(conditions.FsEvents, params.Event) {
  256. return false
  257. }
  258. if !checkEventConditionPatterns(params.Name, conditions.Options.Names) {
  259. return false
  260. }
  261. if !checkEventConditionPatterns(params.Role, conditions.Options.RoleNames) {
  262. return false
  263. }
  264. if !checkEventGroupConditionPatters(params.Groups, conditions.Options.GroupNames) {
  265. return false
  266. }
  267. if !checkEventConditionPatterns(params.VirtualPath, conditions.Options.FsPaths) {
  268. return false
  269. }
  270. if len(conditions.Options.Protocols) > 0 && !util.Contains(conditions.Options.Protocols, params.Protocol) {
  271. return false
  272. }
  273. if params.Event == operationUpload || params.Event == operationDownload {
  274. if conditions.Options.MinFileSize > 0 {
  275. if params.FileSize < conditions.Options.MinFileSize {
  276. return false
  277. }
  278. }
  279. if conditions.Options.MaxFileSize > 0 {
  280. if params.FileSize > conditions.Options.MaxFileSize {
  281. return false
  282. }
  283. }
  284. }
  285. return true
  286. }
// hasFsRules returns true if there are any rules for filesystem event triggers.
// The rules list is read while holding the read lock.
func (r *eventRulesContainer) hasFsRules() bool {
	r.RLock()
	defer r.RUnlock()
	return len(r.FsEvents) > 0
}
  293. // handleFsEvent executes the rules actions defined for the specified event.
  294. // The boolean parameter indicates whether a sync action was executed
  295. func (r *eventRulesContainer) handleFsEvent(params EventParams) (bool, error) {
  296. if params.Protocol == protocolEventAction {
  297. return false, nil
  298. }
  299. r.RLock()
  300. var rulesWithSyncActions, rulesAsync []dataprovider.EventRule
  301. for _, rule := range r.FsEvents {
  302. if r.checkFsEventMatch(rule.Conditions, params) {
  303. if err := rule.CheckActionsConsistency(""); err != nil {
  304. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  305. rule.Name, err, params.Event)
  306. continue
  307. }
  308. hasSyncActions := false
  309. for _, action := range rule.Actions {
  310. if action.Options.ExecuteSync {
  311. hasSyncActions = true
  312. break
  313. }
  314. }
  315. if hasSyncActions {
  316. rulesWithSyncActions = append(rulesWithSyncActions, rule)
  317. } else {
  318. rulesAsync = append(rulesAsync, rule)
  319. }
  320. }
  321. }
  322. r.RUnlock()
  323. params.sender = params.Name
  324. if len(rulesAsync) > 0 {
  325. go executeAsyncRulesActions(rulesAsync, params)
  326. }
  327. if len(rulesWithSyncActions) > 0 {
  328. return true, executeSyncRulesActions(rulesWithSyncActions, params)
  329. }
  330. return false, nil
  331. }
  332. // username is populated for user objects
  333. func (r *eventRulesContainer) handleProviderEvent(params EventParams) {
  334. r.RLock()
  335. defer r.RUnlock()
  336. var rules []dataprovider.EventRule
  337. for _, rule := range r.ProviderEvents {
  338. if r.checkProviderEventMatch(rule.Conditions, params) {
  339. if err := rule.CheckActionsConsistency(params.ObjectType); err == nil {
  340. rules = append(rules, rule)
  341. } else {
  342. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q object type %q",
  343. rule.Name, err, params.Event, params.ObjectType)
  344. }
  345. }
  346. }
  347. if len(rules) > 0 {
  348. params.sender = params.ObjectName
  349. go executeAsyncRulesActions(rules, params)
  350. }
  351. }
  352. func (r *eventRulesContainer) handleIPBlockedEvent(params EventParams) {
  353. r.RLock()
  354. defer r.RUnlock()
  355. if len(r.IPBlockedEvents) == 0 {
  356. return
  357. }
  358. var rules []dataprovider.EventRule
  359. for _, rule := range r.IPBlockedEvents {
  360. if err := rule.CheckActionsConsistency(""); err == nil {
  361. rules = append(rules, rule)
  362. } else {
  363. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  364. rule.Name, err, params.Event)
  365. }
  366. }
  367. if len(rules) > 0 {
  368. go executeAsyncRulesActions(rules, params)
  369. }
  370. }
  371. func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
  372. r.RLock()
  373. defer r.RUnlock()
  374. if len(r.CertificateEvents) == 0 {
  375. return
  376. }
  377. var rules []dataprovider.EventRule
  378. for _, rule := range r.CertificateEvents {
  379. if err := rule.CheckActionsConsistency(""); err == nil {
  380. rules = append(rules, rule)
  381. } else {
  382. eventManagerLog(logger.LevelWarn, "rule %q skipped: %v, event %q",
  383. rule.Name, err, params.Event)
  384. }
  385. }
  386. if len(rules) > 0 {
  387. go executeAsyncRulesActions(rules, params)
  388. }
  389. }
// executedRetentionCheck stores the results of a retention check together
// with the username and the action name they refer to. Username and action
// name are used to build the CSV file name inside the compressed report.
type executedRetentionCheck struct {
	Username   string
	ActionName string
	// Results contains one entry for each CSV row of the report
	Results []folderRetentionCheckResult
}
// EventParams defines the supported event parameters.
// The exported fields are exposed as placeholders by getStringReplacements,
// the unexported ones track the state of the actions execution.
type EventParams struct {
	// Name is matched against the rule name patterns. For filesystem events
	// it is also used as sender, for provider events it is set to the executor.
	Name string
	// Groups is matched against the rule group name patterns for filesystem events
	Groups []sdk.GroupMapping
	// Event is the name of the event/operation
	Event string
	// Status is 1 for OK. AddError sets it to 2 if updateStatusFromError is set
	Status            int
	VirtualPath       string
	FsPath            string
	VirtualTargetPath string
	FsTargetPath      string
	ObjectName        string
	ObjectType        string
	// FileSize is the size in bytes when set from os.Stat by setBackupParams
	FileSize int64
	// NOTE(review): the unit of Elapsed is not visible in this chunk,
	// presumably milliseconds - confirm
	Elapsed  int64
	Protocol string
	IP       string
	Role     string
	// Timestamp is set from time.Now().UnixNano() by the code in this chunk
	Timestamp int64
	// Object is rendered as JSON for the {{ObjectData}} placeholder
	Object plugin.Renderer
	// sender identifies the user or folder the actions are executed for.
	// If empty getUsers and getFolders return all the users/folders.
	sender string
	// updateStatusFromError enables the status update in AddError
	updateStatusFromError bool
	// errors contains the messages collected by AddError
	errors []string
	// retentionChecks contains the executed retention checks used to build
	// the retention reports
	retentionChecks []executedRetentionCheck
}
  419. func (p *EventParams) getACopy() *EventParams {
  420. params := *p
  421. params.errors = make([]string, len(p.errors))
  422. copy(params.errors, p.errors)
  423. retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
  424. for _, c := range p.retentionChecks {
  425. executedCheck := executedRetentionCheck{
  426. Username: c.Username,
  427. ActionName: c.ActionName,
  428. }
  429. executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
  430. copy(executedCheck.Results, c.Results)
  431. retentionChecks = append(retentionChecks, executedCheck)
  432. }
  433. params.retentionChecks = retentionChecks
  434. return &params
  435. }
  436. // AddError adds a new error to the event params and update the status if needed
  437. func (p *EventParams) AddError(err error) {
  438. if err == nil {
  439. return
  440. }
  441. if p.updateStatusFromError && p.Status == 1 {
  442. p.Status = 2
  443. }
  444. p.errors = append(p.errors, err.Error())
  445. }
  446. func (p *EventParams) setBackupParams(backupPath string) {
  447. if p.sender != "" {
  448. return
  449. }
  450. p.sender = dataprovider.ActionExecutorSystem
  451. p.FsPath = backupPath
  452. p.ObjectName = filepath.Base(backupPath)
  453. p.VirtualPath = "/" + p.ObjectName
  454. p.Timestamp = time.Now().UnixNano()
  455. info, err := os.Stat(backupPath)
  456. if err == nil {
  457. p.FileSize = info.Size()
  458. }
  459. }
  460. func (p *EventParams) getStatusString() string {
  461. switch p.Status {
  462. case 1:
  463. return "OK"
  464. default:
  465. return "KO"
  466. }
  467. }
  468. // getUsers returns users with group settings not applied
  469. func (p *EventParams) getUsers() ([]dataprovider.User, error) {
  470. if p.sender == "" {
  471. users, err := dataprovider.DumpUsers()
  472. if err != nil {
  473. eventManagerLog(logger.LevelError, "unable to get users: %+v", err)
  474. return users, errors.New("unable to get users")
  475. }
  476. return users, nil
  477. }
  478. user, err := p.getUserFromSender()
  479. if err != nil {
  480. return nil, err
  481. }
  482. return []dataprovider.User{user}, nil
  483. }
  484. func (p *EventParams) getUserFromSender() (dataprovider.User, error) {
  485. if p.sender == dataprovider.ActionExecutorSystem {
  486. return dataprovider.User{
  487. BaseUser: sdk.BaseUser{
  488. Status: 1,
  489. Username: p.sender,
  490. HomeDir: dataprovider.GetBackupsPath(),
  491. Permissions: map[string][]string{
  492. "/": {dataprovider.PermAny},
  493. },
  494. },
  495. }, nil
  496. }
  497. user, err := dataprovider.UserExists(p.sender, "")
  498. if err != nil {
  499. eventManagerLog(logger.LevelError, "unable to get user %q: %+v", p.sender, err)
  500. return user, fmt.Errorf("error getting user %q", p.sender)
  501. }
  502. return user, nil
  503. }
  504. func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
  505. if p.sender == "" {
  506. return dataprovider.DumpFolders()
  507. }
  508. folder, err := dataprovider.GetFolderByName(p.sender)
  509. if err != nil {
  510. return nil, fmt.Errorf("error getting folder %q: %w", p.sender, err)
  511. }
  512. return []vfs.BaseVirtualFolder{folder}, nil
  513. }
  514. func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
  515. if len(p.retentionChecks) == 0 {
  516. return nil, errors.New("no data retention report available")
  517. }
  518. var b bytes.Buffer
  519. if _, err := p.writeCompressedDataRetentionReports(&b); err != nil {
  520. return nil, err
  521. }
  522. return b.Bytes(), nil
  523. }
  524. func (p *EventParams) writeCompressedDataRetentionReports(w io.Writer) (int64, error) {
  525. var n int64
  526. wr := zip.NewWriter(w)
  527. for _, check := range p.retentionChecks {
  528. data, err := getCSVRetentionReport(check.Results)
  529. if err != nil {
  530. return n, fmt.Errorf("unable to get CSV report: %w", err)
  531. }
  532. dataSize := int64(len(data))
  533. n += dataSize
  534. // we suppose a 3:1 compression ratio
  535. if n > (maxAttachmentsSize * 3) {
  536. eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s",
  537. util.ByteCountIEC(n))
  538. return n, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(n))
  539. }
  540. fh := &zip.FileHeader{
  541. Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
  542. Method: zip.Deflate,
  543. Modified: time.Now().UTC(),
  544. }
  545. f, err := wr.CreateHeader(fh)
  546. if err != nil {
  547. return n, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
  548. }
  549. _, err = io.CopyN(f, bytes.NewBuffer(data), dataSize)
  550. if err != nil {
  551. return n, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
  552. }
  553. }
  554. if err := wr.Close(); err != nil {
  555. return n, fmt.Errorf("unable to close zip writer: %w", err)
  556. }
  557. return n, nil
  558. }
  559. func (p *EventParams) getRetentionReportsAsMailAttachment() (*mail.File, error) {
  560. if len(p.retentionChecks) == 0 {
  561. return nil, errors.New("no data retention report available")
  562. }
  563. return &mail.File{
  564. Name: "retention-reports.zip",
  565. Header: make(map[string][]string),
  566. Writer: p.writeCompressedDataRetentionReports,
  567. }, nil
  568. }
  569. func (p *EventParams) getStringReplacements(addObjectData bool) []string {
  570. replacements := []string{
  571. "{{Name}}", p.Name,
  572. "{{Event}}", p.Event,
  573. "{{Status}}", fmt.Sprintf("%d", p.Status),
  574. "{{VirtualPath}}", p.VirtualPath,
  575. "{{FsPath}}", p.FsPath,
  576. "{{VirtualTargetPath}}", p.VirtualTargetPath,
  577. "{{FsTargetPath}}", p.FsTargetPath,
  578. "{{ObjectName}}", p.ObjectName,
  579. "{{ObjectType}}", p.ObjectType,
  580. "{{FileSize}}", fmt.Sprintf("%d", p.FileSize),
  581. "{{Elapsed}}", fmt.Sprintf("%d", p.Elapsed),
  582. "{{Protocol}}", p.Protocol,
  583. "{{IP}}", p.IP,
  584. "{{Role}}", p.Role,
  585. "{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
  586. "{{StatusString}}", p.getStatusString(),
  587. }
  588. if p.VirtualPath != "" {
  589. replacements = append(replacements, "{{VirtualDirPath}}", path.Dir(p.VirtualPath))
  590. }
  591. if p.VirtualTargetPath != "" {
  592. replacements = append(replacements, "{{VirtualTargetDirPath}}", path.Dir(p.VirtualTargetPath))
  593. replacements = append(replacements, "{{TargetName}}", path.Base(p.VirtualTargetPath))
  594. }
  595. if len(p.errors) > 0 {
  596. replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
  597. } else {
  598. replacements = append(replacements, "{{ErrorString}}", "")
  599. }
  600. replacements = append(replacements, "{{ObjectData}}", "")
  601. if addObjectData {
  602. data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
  603. if err == nil {
  604. replacements[len(replacements)-1] = string(data)
  605. }
  606. }
  607. return replacements
  608. }
  609. func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
  610. var b bytes.Buffer
  611. csvWriter := csv.NewWriter(&b)
  612. err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
  613. "elapsed (ms)", "info", "error"})
  614. if err != nil {
  615. return nil, err
  616. }
  617. for _, result := range results {
  618. err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
  619. strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
  620. result.Info, result.Error})
  621. if err != nil {
  622. return nil, err
  623. }
  624. }
  625. csvWriter.Flush()
  626. err = csvWriter.Error()
  627. return b.Bytes(), err
  628. }
  629. func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSourcePath, virtualTargetPath string,
  630. numFiles int, truncatedSize int64, errTransfer error, operation string, startTime time.Time,
  631. ) error {
  632. errWrite := w.Close()
  633. targetPath := virtualSourcePath
  634. if virtualTargetPath != "" {
  635. targetPath = virtualTargetPath
  636. }
  637. info, err := conn.doStatInternal(targetPath, 0, false, false)
  638. if err == nil {
  639. updateUserQuotaAfterFileWrite(conn, targetPath, numFiles, info.Size()-truncatedSize)
  640. var fsSrcPath, fsDstPath string
  641. var errSrcFs, errDstFs error
  642. if virtualSourcePath != "" {
  643. _, fsSrcPath, errSrcFs = conn.GetFsAndResolvedPath(virtualSourcePath)
  644. }
  645. if virtualTargetPath != "" {
  646. _, fsDstPath, errDstFs = conn.GetFsAndResolvedPath(virtualTargetPath)
  647. }
  648. if errSrcFs == nil && errDstFs == nil {
  649. elapsed := time.Since(startTime).Nanoseconds() / 1000000
  650. if errTransfer == nil {
  651. errTransfer = errWrite
  652. }
  653. if operation == operationCopy {
  654. logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
  655. "", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
  656. }
  657. ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed) //nolint:errcheck
  658. }
  659. } else {
  660. eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
  661. }
  662. if errTransfer != nil {
  663. return errTransfer
  664. }
  665. return errWrite
  666. }
  667. func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
  668. vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
  669. if err != nil {
  670. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  671. return
  672. }
  673. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
  674. if vfolder.IsIncludedInUserQuota() {
  675. dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
  676. }
  677. }
  678. func checkWriterPermsAndQuota(conn *BaseConnection, virtualPath string, numFiles int, expectedSize, truncatedSize int64) error {
  679. if numFiles == 0 {
  680. if !conn.User.HasPerm(dataprovider.PermOverwrite, path.Dir(virtualPath)) {
  681. return conn.GetPermissionDeniedError()
  682. }
  683. } else {
  684. if !conn.User.HasPerm(dataprovider.PermUpload, path.Dir(virtualPath)) {
  685. return conn.GetPermissionDeniedError()
  686. }
  687. }
  688. q, _ := conn.HasSpace(numFiles > 0, false, virtualPath)
  689. if !q.HasSpace {
  690. return conn.GetQuotaExceededError()
  691. }
  692. if expectedSize != -1 {
  693. sizeDiff := expectedSize - truncatedSize
  694. if sizeDiff > 0 {
  695. remainingSize := q.GetRemainingSize()
  696. if remainingSize > 0 && remainingSize < sizeDiff {
  697. return conn.GetQuotaExceededError()
  698. }
  699. }
  700. }
  701. return nil
  702. }
  703. func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64) (io.WriteCloser, int, int64, func(), error) {
  704. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  705. if err != nil {
  706. return nil, 0, 0, nil, err
  707. }
  708. var truncatedSize, fileSize int64
  709. numFiles := 1
  710. isFileOverwrite := false
  711. info, err := fs.Lstat(fsPath)
  712. if err == nil {
  713. fileSize = info.Size()
  714. if info.IsDir() {
  715. return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
  716. }
  717. if info.Mode().IsRegular() {
  718. isFileOverwrite = true
  719. truncatedSize = fileSize
  720. }
  721. numFiles = 0
  722. }
  723. if err != nil && !fs.IsNotExist(err) {
  724. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  725. }
  726. if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
  727. return nil, numFiles, truncatedSize, nil, err
  728. }
  729. f, w, cancelFn, err := fs.Create(fsPath, 0)
  730. if err != nil {
  731. return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
  732. }
  733. vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
  734. if isFileOverwrite {
  735. if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
  736. updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
  737. truncatedSize = 0
  738. }
  739. }
  740. if cancelFn == nil {
  741. cancelFn = func() {}
  742. }
  743. if f != nil {
  744. return f, numFiles, truncatedSize, cancelFn, nil
  745. }
  746. return w, numFiles, truncatedSize, cancelFn, nil
  747. }
  748. func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
  749. if entryPath == wr.Name {
  750. // skip the archive itself
  751. return nil
  752. }
  753. info, err := conn.DoStat(entryPath, 1, false)
  754. if err != nil {
  755. eventManagerLog(logger.LevelError, "unable to add zip entry %#v, stat error: %v", entryPath, err)
  756. return err
  757. }
  758. entryName, err := getZipEntryName(entryPath, baseDir)
  759. if err != nil {
  760. eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
  761. return err
  762. }
  763. if _, ok := wr.Entries[entryName]; ok {
  764. eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
  765. return nil
  766. }
  767. wr.Entries[entryName] = true
  768. if info.IsDir() {
  769. _, err = wr.Writer.CreateHeader(&zip.FileHeader{
  770. Name: entryName + "/",
  771. Method: zip.Deflate,
  772. Modified: info.ModTime(),
  773. })
  774. if err != nil {
  775. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  776. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  777. }
  778. contents, err := conn.ListDir(entryPath)
  779. if err != nil {
  780. eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
  781. return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
  782. }
  783. for _, info := range contents {
  784. fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
  785. if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
  786. eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
  787. return err
  788. }
  789. }
  790. return nil
  791. }
  792. if !info.Mode().IsRegular() {
  793. // we only allow regular files
  794. eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
  795. return nil
  796. }
  797. reader, cancelFn, err := getFileReader(conn, entryPath)
  798. if err != nil {
  799. eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
  800. return fmt.Errorf("unable to open %q: %w", entryPath, err)
  801. }
  802. defer cancelFn()
  803. defer reader.Close()
  804. f, err := wr.Writer.CreateHeader(&zip.FileHeader{
  805. Name: entryName,
  806. Method: zip.Deflate,
  807. Modified: info.ModTime(),
  808. })
  809. if err != nil {
  810. eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
  811. return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
  812. }
  813. _, err = io.Copy(f, reader)
  814. return err
  815. }
  816. func getZipEntryName(entryPath, baseDir string) (string, error) {
  817. if !strings.HasPrefix(entryPath, baseDir) {
  818. return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
  819. }
  820. entryPath = strings.TrimPrefix(entryPath, baseDir)
  821. return strings.TrimPrefix(entryPath, "/"), nil
  822. }
  823. func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
  824. if !conn.User.HasPerm(dataprovider.PermDownload, path.Dir(virtualPath)) {
  825. return nil, nil, conn.GetPermissionDeniedError()
  826. }
  827. fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
  828. if err != nil {
  829. return nil, nil, err
  830. }
  831. f, r, cancelFn, err := fs.Open(fsPath, 0)
  832. if err != nil {
  833. return nil, nil, conn.GetFsError(fs, err)
  834. }
  835. if cancelFn == nil {
  836. cancelFn = func() {}
  837. }
  838. if f != nil {
  839. return f, cancelFn, nil
  840. }
  841. return r, cancelFn, nil
  842. }
  843. func writeFileContent(conn *BaseConnection, virtualPath string, w io.Writer) error {
  844. reader, cancelFn, err := getFileReader(conn, virtualPath)
  845. if err != nil {
  846. return err
  847. }
  848. defer cancelFn()
  849. defer reader.Close()
  850. _, err = io.Copy(w, reader)
  851. return err
  852. }
  853. func getFileContentFn(conn *BaseConnection, virtualPath string, size int64) func(w io.Writer) (int64, error) {
  854. return func(w io.Writer) (int64, error) {
  855. reader, cancelFn, err := getFileReader(conn, virtualPath)
  856. if err != nil {
  857. return 0, err
  858. }
  859. defer cancelFn()
  860. defer reader.Close()
  861. return io.CopyN(w, reader, size)
  862. }
  863. }
  864. func getMailAttachments(conn *BaseConnection, attachments []string, replacer *strings.Replacer) ([]*mail.File, error) {
  865. var files []*mail.File
  866. totalSize := int64(0)
  867. for _, virtualPath := range replacePathsPlaceholders(attachments, replacer) {
  868. info, err := conn.DoStat(virtualPath, 0, false)
  869. if err != nil {
  870. return nil, fmt.Errorf("unable to get info for file %q, user %q: %w", virtualPath, conn.User.Username, err)
  871. }
  872. if !info.Mode().IsRegular() {
  873. return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
  874. }
  875. totalSize += info.Size()
  876. if totalSize > maxAttachmentsSize {
  877. return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
  878. }
  879. files = append(files, &mail.File{
  880. Name: path.Base(virtualPath),
  881. Header: make(map[string][]string),
  882. Writer: getFileContentFn(conn, virtualPath, info.Size()),
  883. })
  884. }
  885. return files, nil
  886. }
  887. func replaceWithReplacer(input string, replacer *strings.Replacer) string {
  888. if !strings.Contains(input, "{{") {
  889. return input
  890. }
  891. return replacer.Replace(input)
  892. }
  893. func checkEventConditionPattern(p dataprovider.ConditionPattern, name string) bool {
  894. var matched bool
  895. var err error
  896. if strings.Contains(p.Pattern, "**") {
  897. matched, err = doublestar.Match(p.Pattern, name)
  898. } else {
  899. matched, err = path.Match(p.Pattern, name)
  900. }
  901. if err != nil {
  902. eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
  903. return false
  904. }
  905. if p.InverseMatch {
  906. return !matched
  907. }
  908. return matched
  909. }
  910. func checkUserConditionOptions(user *dataprovider.User, conditions *dataprovider.ConditionOptions) bool {
  911. if !checkEventConditionPatterns(user.Username, conditions.Names) {
  912. return false
  913. }
  914. if !checkEventConditionPatterns(user.Role, conditions.RoleNames) {
  915. return false
  916. }
  917. if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
  918. return false
  919. }
  920. return true
  921. }
  922. // checkConditionPatterns returns false if patterns are defined and no match is found
  923. func checkEventConditionPatterns(name string, patterns []dataprovider.ConditionPattern) bool {
  924. if len(patterns) == 0 {
  925. return true
  926. }
  927. for _, p := range patterns {
  928. if checkEventConditionPattern(p, name) {
  929. return true
  930. }
  931. }
  932. return false
  933. }
  934. func checkEventGroupConditionPatters(groups []sdk.GroupMapping, patterns []dataprovider.ConditionPattern) bool {
  935. if len(patterns) == 0 {
  936. return true
  937. }
  938. for _, group := range groups {
  939. for _, p := range patterns {
  940. if checkEventConditionPattern(p, group.Name) {
  941. return true
  942. }
  943. }
  944. }
  945. return false
  946. }
  947. func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer) (string, error) {
  948. u, err := url.Parse(c.Endpoint)
  949. if err != nil {
  950. return "", fmt.Errorf("invalid endpoint: %w", err)
  951. }
  952. if strings.Contains(u.Path, "{{") {
  953. pathComponents := strings.Split(u.Path, "/")
  954. for idx := range pathComponents {
  955. part := replaceWithReplacer(pathComponents[idx], replacer)
  956. if part != pathComponents[idx] {
  957. pathComponents[idx] = url.PathEscape(part)
  958. }
  959. }
  960. u.Path = ""
  961. u = u.JoinPath(pathComponents...)
  962. }
  963. if len(c.QueryParameters) > 0 {
  964. q := u.Query()
  965. for _, keyVal := range c.QueryParameters {
  966. q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  967. }
  968. u.RawQuery = q.Encode()
  969. }
  970. return u.String(), nil
  971. }
  972. func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
  973. conn *BaseConnection, replacer *strings.Replacer, params *EventParams,
  974. ) error {
  975. partWriter, err := m.CreatePart(h)
  976. if err != nil {
  977. eventManagerLog(logger.LevelError, "unable to create part %q, err: %v", part.Name, err)
  978. return err
  979. }
  980. if part.Body != "" {
  981. _, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, replacer)))
  982. if err != nil {
  983. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  984. return err
  985. }
  986. return nil
  987. }
  988. if part.Filepath == dataprovider.RetentionReportPlaceHolder {
  989. data, err := params.getCompressedDataRetentionReport()
  990. if err != nil {
  991. return err
  992. }
  993. _, err = partWriter.Write(data)
  994. if err != nil {
  995. eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
  996. return err
  997. }
  998. return nil
  999. }
  1000. err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
  1001. if err != nil {
  1002. eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
  1003. return err
  1004. }
  1005. return nil
  1006. }
  1007. func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
  1008. cancel context.CancelFunc, user dataprovider.User, params *EventParams,
  1009. ) (io.ReadCloser, string, error) {
  1010. var body io.ReadCloser
  1011. if c.Method == http.MethodGet {
  1012. return body, "", nil
  1013. }
  1014. if c.Body != "" {
  1015. if c.Body == dataprovider.RetentionReportPlaceHolder {
  1016. data, err := params.getCompressedDataRetentionReport()
  1017. if err != nil {
  1018. return body, "", err
  1019. }
  1020. return io.NopCloser(bytes.NewBuffer(data)), "", nil
  1021. }
  1022. return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
  1023. }
  1024. if len(c.Parts) > 0 {
  1025. r, w := io.Pipe()
  1026. m := multipart.NewWriter(w)
  1027. var conn *BaseConnection
  1028. if user.Username != "" {
  1029. var err error
  1030. user, err = getUserForEventAction(user)
  1031. if err != nil {
  1032. return body, "", err
  1033. }
  1034. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1035. err = user.CheckFsRoot(connectionID)
  1036. if err != nil {
  1037. user.CloseFs() //nolint:errcheck
  1038. return body, "", fmt.Errorf("error getting multipart file/s, unable to check root fs for user %q: %w",
  1039. user.Username, err)
  1040. }
  1041. conn = NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1042. }
  1043. go func() {
  1044. defer w.Close()
  1045. defer user.CloseFs() //nolint:errcheck
  1046. for _, part := range c.Parts {
  1047. h := make(textproto.MIMEHeader)
  1048. if part.Body != "" {
  1049. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
  1050. } else {
  1051. h.Set("Content-Disposition",
  1052. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  1053. multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
  1054. contentType := mime.TypeByExtension(path.Ext(part.Filepath))
  1055. if contentType == "" {
  1056. contentType = "application/octet-stream"
  1057. }
  1058. h.Set("Content-Type", contentType)
  1059. }
  1060. for _, keyVal := range part.Headers {
  1061. h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1062. }
  1063. if err := writeHTTPPart(m, part, h, conn, replacer, params); err != nil {
  1064. cancel()
  1065. return
  1066. }
  1067. }
  1068. m.Close()
  1069. }()
  1070. return r, m.FormDataContentType(), nil
  1071. }
  1072. return body, "", nil
  1073. }
  1074. func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
  1075. if err := c.TryDecryptPassword(); err != nil {
  1076. return err
  1077. }
  1078. addObjectData := false
  1079. if params.Object != nil {
  1080. addObjectData = c.HasObjectData()
  1081. }
  1082. replacements := params.getStringReplacements(addObjectData)
  1083. replacer := strings.NewReplacer(replacements...)
  1084. endpoint, err := getHTTPRuleActionEndpoint(c, replacer)
  1085. if err != nil {
  1086. return err
  1087. }
  1088. ctx, cancel := c.GetContext()
  1089. defer cancel()
  1090. var user dataprovider.User
  1091. if c.HasMultipartFiles() {
  1092. user, err = params.getUserFromSender()
  1093. if err != nil {
  1094. return err
  1095. }
  1096. }
  1097. body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user, params)
  1098. if err != nil {
  1099. return err
  1100. }
  1101. if body != nil {
  1102. defer body.Close()
  1103. }
  1104. req, err := http.NewRequestWithContext(ctx, c.Method, endpoint, body)
  1105. if err != nil {
  1106. return err
  1107. }
  1108. if contentType != "" {
  1109. req.Header.Set("Content-Type", contentType)
  1110. }
  1111. if c.Username != "" {
  1112. req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetPayload())
  1113. }
  1114. for _, keyVal := range c.Headers {
  1115. req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
  1116. }
  1117. client := c.GetHTTPClient()
  1118. defer client.CloseIdleConnections()
  1119. startTime := time.Now()
  1120. resp, err := client.Do(req)
  1121. if err != nil {
  1122. eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
  1123. endpoint, time.Since(startTime), err)
  1124. return fmt.Errorf("error sending HTTP request: %w", err)
  1125. }
  1126. defer resp.Body.Close()
  1127. eventManagerLog(logger.LevelDebug, "http notification sent, endpoint: %s, elapsed: %s, status code: %d",
  1128. endpoint, time.Since(startTime), resp.StatusCode)
  1129. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
  1130. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  1131. }
  1132. return nil
  1133. }
  1134. func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
  1135. addObjectData := false
  1136. if params.Object != nil {
  1137. for _, k := range c.EnvVars {
  1138. if strings.Contains(k.Value, "{{ObjectData}}") {
  1139. addObjectData = true
  1140. break
  1141. }
  1142. }
  1143. }
  1144. replacements := params.getStringReplacements(addObjectData)
  1145. replacer := strings.NewReplacer(replacements...)
  1146. args := make([]string, 0, len(c.Args))
  1147. for _, arg := range c.Args {
  1148. args = append(args, replaceWithReplacer(arg, replacer))
  1149. }
  1150. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
  1151. defer cancel()
  1152. cmd := exec.CommandContext(ctx, c.Cmd, args...)
  1153. cmd.Env = []string{}
  1154. for _, keyVal := range c.EnvVars {
  1155. cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
  1156. }
  1157. startTime := time.Now()
  1158. err := cmd.Run()
  1159. eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
  1160. c.Cmd, time.Since(startTime), err)
  1161. return err
  1162. }
  1163. func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
  1164. addObjectData := false
  1165. if params.Object != nil {
  1166. if strings.Contains(c.Body, "{{ObjectData}}") {
  1167. addObjectData = true
  1168. }
  1169. }
  1170. replacements := params.getStringReplacements(addObjectData)
  1171. replacer := strings.NewReplacer(replacements...)
  1172. body := replaceWithReplacer(c.Body, replacer)
  1173. subject := replaceWithReplacer(c.Subject, replacer)
  1174. startTime := time.Now()
  1175. var files []*mail.File
  1176. fileAttachments := make([]string, 0, len(c.Attachments))
  1177. for _, attachment := range c.Attachments {
  1178. if attachment == dataprovider.RetentionReportPlaceHolder {
  1179. f, err := params.getRetentionReportsAsMailAttachment()
  1180. if err != nil {
  1181. return err
  1182. }
  1183. files = append(files, f)
  1184. continue
  1185. }
  1186. fileAttachments = append(fileAttachments, attachment)
  1187. }
  1188. if len(fileAttachments) > 0 {
  1189. user, err := params.getUserFromSender()
  1190. if err != nil {
  1191. return err
  1192. }
  1193. user, err = getUserForEventAction(user)
  1194. if err != nil {
  1195. return err
  1196. }
  1197. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1198. err = user.CheckFsRoot(connectionID)
  1199. defer user.CloseFs() //nolint:errcheck
  1200. if err != nil {
  1201. return fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
  1202. }
  1203. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1204. res, err := getMailAttachments(conn, fileAttachments, replacer)
  1205. if err != nil {
  1206. return err
  1207. }
  1208. files = append(files, res...)
  1209. }
  1210. err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
  1211. eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
  1212. time.Since(startTime), err)
  1213. if err != nil {
  1214. return fmt.Errorf("unable to send email: %w", err)
  1215. }
  1216. return nil
  1217. }
  1218. func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
  1219. err := user.LoadAndApplyGroupSettings()
  1220. if err != nil {
  1221. eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
  1222. return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
  1223. }
  1224. user.UploadDataTransfer = 0
  1225. user.UploadBandwidth = 0
  1226. user.DownloadBandwidth = 0
  1227. user.Filters.DisableFsChecks = false
  1228. user.Filters.FilePatterns = nil
  1229. user.Filters.BandwidthLimits = nil
  1230. user.Filters.DataTransferLimits = nil
  1231. for k := range user.Permissions {
  1232. user.Permissions[k] = []string{dataprovider.PermAny}
  1233. }
  1234. return user, nil
  1235. }
  1236. func replacePathsPlaceholders(paths []string, replacer *strings.Replacer) []string {
  1237. results := make([]string, 0, len(paths))
  1238. for _, p := range paths {
  1239. results = append(results, util.CleanPath(replaceWithReplacer(p, replacer)))
  1240. }
  1241. return util.RemoveDuplicates(results, false)
  1242. }
  1243. func executeDeleteFileFsAction(conn *BaseConnection, item string, info os.FileInfo) error {
  1244. fs, fsPath, err := conn.GetFsAndResolvedPath(item)
  1245. if err != nil {
  1246. return err
  1247. }
  1248. return conn.RemoveFile(fs, fsPath, item, info)
  1249. }
  1250. func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer, user dataprovider.User) error {
  1251. user, err := getUserForEventAction(user)
  1252. if err != nil {
  1253. return err
  1254. }
  1255. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1256. err = user.CheckFsRoot(connectionID)
  1257. defer user.CloseFs() //nolint:errcheck
  1258. if err != nil {
  1259. return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
  1260. }
  1261. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1262. for _, item := range replacePathsPlaceholders(deletes, replacer) {
  1263. info, err := conn.DoStat(item, 0, false)
  1264. if err != nil {
  1265. if conn.IsNotExistError(err) {
  1266. continue
  1267. }
  1268. return fmt.Errorf("unable to check item to delete %q, user %q: %w", item, user.Username, err)
  1269. }
  1270. if info.IsDir() {
  1271. if err = conn.RemoveDir(item); err != nil {
  1272. return fmt.Errorf("unable to remove dir %q, user %q: %w", item, user.Username, err)
  1273. }
  1274. } else {
  1275. if err = executeDeleteFileFsAction(conn, item, info); err != nil {
  1276. return fmt.Errorf("unable to remove file %q, user %q: %w", item, user.Username, err)
  1277. }
  1278. }
  1279. eventManagerLog(logger.LevelDebug, "item %q removed for user %q", item, user.Username)
  1280. }
  1281. return nil
  1282. }
  1283. func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
  1284. conditions dataprovider.ConditionOptions, params *EventParams,
  1285. ) error {
  1286. users, err := params.getUsers()
  1287. if err != nil {
  1288. return fmt.Errorf("unable to get users: %w", err)
  1289. }
  1290. var failures []string
  1291. executed := 0
  1292. for _, user := range users {
  1293. // if sender is set, the conditions have already been evaluated
  1294. if params.sender == "" {
  1295. if !checkUserConditionOptions(&user, &conditions) {
  1296. eventManagerLog(logger.LevelDebug, "skipping fs delete for user %s, condition options don't match",
  1297. user.Username)
  1298. continue
  1299. }
  1300. }
  1301. executed++
  1302. if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
  1303. params.AddError(err)
  1304. failures = append(failures, user.Username)
  1305. continue
  1306. }
  1307. }
  1308. if len(failures) > 0 {
  1309. return fmt.Errorf("fs delete failed for users: %+v", failures)
  1310. }
  1311. if executed == 0 {
  1312. eventManagerLog(logger.LevelError, "no delete executed")
  1313. return errors.New("no delete executed")
  1314. }
  1315. return nil
  1316. }
  1317. func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, user dataprovider.User) error {
  1318. user, err := getUserForEventAction(user)
  1319. if err != nil {
  1320. return err
  1321. }
  1322. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1323. err = user.CheckFsRoot(connectionID)
  1324. defer user.CloseFs() //nolint:errcheck
  1325. if err != nil {
  1326. return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
  1327. }
  1328. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1329. for _, item := range replacePathsPlaceholders(dirs, replacer) {
  1330. if err = conn.CheckParentDirs(path.Dir(item)); err != nil {
  1331. return fmt.Errorf("unable to check parent dirs for %q, user %q: %w", item, user.Username, err)
  1332. }
  1333. if err = conn.createDirIfMissing(item); err != nil {
  1334. return fmt.Errorf("unable to create dir %q, user %q: %w", item, user.Username, err)
  1335. }
  1336. eventManagerLog(logger.LevelDebug, "directory %q created for user %q", item, user.Username)
  1337. }
  1338. return nil
  1339. }
  1340. func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
  1341. conditions dataprovider.ConditionOptions, params *EventParams,
  1342. ) error {
  1343. users, err := params.getUsers()
  1344. if err != nil {
  1345. return fmt.Errorf("unable to get users: %w", err)
  1346. }
  1347. var failures []string
  1348. executed := 0
  1349. for _, user := range users {
  1350. // if sender is set, the conditions have already been evaluated
  1351. if params.sender == "" {
  1352. if !checkUserConditionOptions(&user, &conditions) {
  1353. eventManagerLog(logger.LevelDebug, "skipping fs mkdir for user %s, condition options don't match",
  1354. user.Username)
  1355. continue
  1356. }
  1357. }
  1358. executed++
  1359. if err = executeMkDirsFsActionForUser(dirs, replacer, user); err != nil {
  1360. failures = append(failures, user.Username)
  1361. continue
  1362. }
  1363. }
  1364. if len(failures) > 0 {
  1365. return fmt.Errorf("fs mkdir failed for users: %+v", failures)
  1366. }
  1367. if executed == 0 {
  1368. eventManagerLog(logger.LevelError, "no mkdir executed")
  1369. return errors.New("no mkdir executed")
  1370. }
  1371. return nil
  1372. }
  1373. func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1374. user dataprovider.User,
  1375. ) error {
  1376. user, err := getUserForEventAction(user)
  1377. if err != nil {
  1378. return err
  1379. }
  1380. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1381. err = user.CheckFsRoot(connectionID)
  1382. defer user.CloseFs() //nolint:errcheck
  1383. if err != nil {
  1384. return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
  1385. }
  1386. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1387. for _, item := range renames {
  1388. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1389. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1390. if err = conn.renameInternal(source, target, true); err != nil {
  1391. return fmt.Errorf("unable to rename %q->%q, user %q: %w", source, target, user.Username, err)
  1392. }
  1393. eventManagerLog(logger.LevelDebug, "rename %q->%q ok, user %q", source, target, user.Username)
  1394. }
  1395. return nil
  1396. }
  1397. func executeCopyFsActionForUser(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1398. user dataprovider.User,
  1399. ) error {
  1400. user, err := getUserForEventAction(user)
  1401. if err != nil {
  1402. return err
  1403. }
  1404. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1405. err = user.CheckFsRoot(connectionID)
  1406. defer user.CloseFs() //nolint:errcheck
  1407. if err != nil {
  1408. return fmt.Errorf("copy error, unable to check root fs for user %q: %w", user.Username, err)
  1409. }
  1410. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1411. for _, item := range copy {
  1412. source := util.CleanPath(replaceWithReplacer(item.Key, replacer))
  1413. target := util.CleanPath(replaceWithReplacer(item.Value, replacer))
  1414. if strings.HasSuffix(item.Key, "/") {
  1415. source += "/"
  1416. }
  1417. if strings.HasSuffix(item.Value, "/") {
  1418. target += "/"
  1419. }
  1420. if err = conn.Copy(source, target); err != nil {
  1421. return fmt.Errorf("unable to copy %q->%q, user %q: %w", source, target, user.Username, err)
  1422. }
  1423. eventManagerLog(logger.LevelDebug, "copy %q->%q ok, user %q", source, target, user.Username)
  1424. }
  1425. return nil
  1426. }
  1427. func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
  1428. user dataprovider.User,
  1429. ) error {
  1430. user, err := getUserForEventAction(user)
  1431. if err != nil {
  1432. return err
  1433. }
  1434. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1435. err = user.CheckFsRoot(connectionID)
  1436. defer user.CloseFs() //nolint:errcheck
  1437. if err != nil {
  1438. return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
  1439. }
  1440. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1441. for _, item := range replacePathsPlaceholders(exist, replacer) {
  1442. if _, err = conn.DoStat(item, 0, false); err != nil {
  1443. return fmt.Errorf("error checking existence for path %q, user %q: %w", item, user.Username, err)
  1444. }
  1445. eventManagerLog(logger.LevelDebug, "path %q exists for user %q", item, user.Username)
  1446. }
  1447. return nil
  1448. }
  1449. func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
  1450. conditions dataprovider.ConditionOptions, params *EventParams,
  1451. ) error {
  1452. users, err := params.getUsers()
  1453. if err != nil {
  1454. return fmt.Errorf("unable to get users: %w", err)
  1455. }
  1456. var failures []string
  1457. executed := 0
  1458. for _, user := range users {
  1459. // if sender is set, the conditions have already been evaluated
  1460. if params.sender == "" {
  1461. if !checkUserConditionOptions(&user, &conditions) {
  1462. eventManagerLog(logger.LevelDebug, "skipping fs rename for user %s, condition options don't match",
  1463. user.Username)
  1464. continue
  1465. }
  1466. }
  1467. executed++
  1468. if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
  1469. failures = append(failures, user.Username)
  1470. params.AddError(err)
  1471. continue
  1472. }
  1473. }
  1474. if len(failures) > 0 {
  1475. return fmt.Errorf("fs rename failed for users: %+v", failures)
  1476. }
  1477. if executed == 0 {
  1478. eventManagerLog(logger.LevelError, "no rename executed")
  1479. return errors.New("no rename executed")
  1480. }
  1481. return nil
  1482. }
  1483. func executeCopyFsRuleAction(copy []dataprovider.KeyValue, replacer *strings.Replacer,
  1484. conditions dataprovider.ConditionOptions, params *EventParams,
  1485. ) error {
  1486. users, err := params.getUsers()
  1487. if err != nil {
  1488. return fmt.Errorf("unable to get users: %w", err)
  1489. }
  1490. var failures []string
  1491. var executed int
  1492. for _, user := range users {
  1493. // if sender is set, the conditions have already been evaluated
  1494. if params.sender == "" {
  1495. if !checkUserConditionOptions(&user, &conditions) {
  1496. eventManagerLog(logger.LevelDebug, "skipping fs copy for user %s, condition options don't match",
  1497. user.Username)
  1498. continue
  1499. }
  1500. }
  1501. executed++
  1502. if err = executeCopyFsActionForUser(copy, replacer, user); err != nil {
  1503. failures = append(failures, user.Username)
  1504. params.AddError(err)
  1505. continue
  1506. }
  1507. }
  1508. if len(failures) > 0 {
  1509. return fmt.Errorf("fs copy failed for users: %+v", failures)
  1510. }
  1511. if executed == 0 {
  1512. eventManagerLog(logger.LevelError, "no copy executed")
  1513. return errors.New("no copy executed")
  1514. }
  1515. return nil
  1516. }
  1517. func getArchiveBaseDir(paths []string) string {
  1518. var parentDirs []string
  1519. for _, p := range paths {
  1520. parentDirs = append(parentDirs, path.Dir(p))
  1521. }
  1522. parentDirs = util.RemoveDuplicates(parentDirs, false)
  1523. baseDir := "/"
  1524. if len(parentDirs) == 1 {
  1525. baseDir = parentDirs[0]
  1526. }
  1527. return baseDir
  1528. }
  1529. func getSizeForPath(conn *BaseConnection, p string, info os.FileInfo) (int64, error) {
  1530. if info.IsDir() {
  1531. var dirSize int64
  1532. entries, err := conn.ListDir(p)
  1533. if err != nil {
  1534. return 0, err
  1535. }
  1536. for _, entry := range entries {
  1537. size, err := getSizeForPath(conn, path.Join(p, entry.Name()), entry)
  1538. if err != nil {
  1539. return 0, err
  1540. }
  1541. dirSize += size
  1542. }
  1543. return dirSize, nil
  1544. }
  1545. if info.Mode().IsRegular() {
  1546. return info.Size(), nil
  1547. }
  1548. return 0, nil
  1549. }
  1550. func estimateZipSize(conn *BaseConnection, zipPath string, paths []string) (int64, error) {
  1551. q, _ := conn.HasSpace(false, false, zipPath)
  1552. if q.HasSpace && q.GetRemainingSize() > 0 {
  1553. var size int64
  1554. for _, item := range paths {
  1555. info, err := conn.DoStat(item, 1, false)
  1556. if err != nil {
  1557. return size, err
  1558. }
  1559. itemSize, err := getSizeForPath(conn, item, info)
  1560. if err != nil {
  1561. return size, err
  1562. }
  1563. size += itemSize
  1564. }
  1565. eventManagerLog(logger.LevelDebug, "archive paths %v, archive name %q, size: %d", paths, zipPath, size)
  1566. // we assume the zip size will be half of the real size
  1567. return size / 2, nil
  1568. }
  1569. return -1, nil
  1570. }
  1571. func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1572. user dataprovider.User,
  1573. ) error {
  1574. user, err := getUserForEventAction(user)
  1575. if err != nil {
  1576. return err
  1577. }
  1578. connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
  1579. err = user.CheckFsRoot(connectionID)
  1580. defer user.CloseFs() //nolint:errcheck
  1581. if err != nil {
  1582. return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
  1583. }
  1584. conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
  1585. name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
  1586. paths := make([]string, 0, len(c.Paths))
  1587. for idx := range c.Paths {
  1588. p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
  1589. if p == name {
  1590. return fmt.Errorf("cannot compress the archive to create: %q", name)
  1591. }
  1592. paths = append(paths, p)
  1593. }
  1594. paths = util.RemoveDuplicates(paths, false)
  1595. estimatedSize, err := estimateZipSize(conn, name, paths)
  1596. if err != nil {
  1597. eventManagerLog(logger.LevelError, "unable to estimate size for archive %q: %v", name, err)
  1598. return fmt.Errorf("unable to estimate archive size: %w", err)
  1599. }
  1600. writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name, estimatedSize)
  1601. if err != nil {
  1602. eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
  1603. return fmt.Errorf("unable to create archive: %w", err)
  1604. }
  1605. defer cancelFn()
  1606. baseDir := getArchiveBaseDir(paths)
  1607. eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
  1608. zipWriter := &zipWriterWrapper{
  1609. Name: name,
  1610. Writer: zip.NewWriter(writer),
  1611. Entries: make(map[string]bool),
  1612. }
  1613. startTime := time.Now()
  1614. for _, item := range paths {
  1615. if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
  1616. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1617. return err
  1618. }
  1619. }
  1620. if err := zipWriter.Writer.Close(); err != nil {
  1621. eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
  1622. closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime) //nolint:errcheck
  1623. return fmt.Errorf("unable to close zip file %q: %w", name, err)
  1624. }
  1625. return closeWriterAndUpdateQuota(writer, conn, name, "", numFiles, truncatedSize, err, operationUpload, startTime)
  1626. }
  1627. func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
  1628. params *EventParams,
  1629. ) error {
  1630. users, err := params.getUsers()
  1631. if err != nil {
  1632. return fmt.Errorf("unable to get users: %w", err)
  1633. }
  1634. var failures []string
  1635. executed := 0
  1636. for _, user := range users {
  1637. // if sender is set, the conditions have already been evaluated
  1638. if params.sender == "" {
  1639. if !checkUserConditionOptions(&user, &conditions) {
  1640. eventManagerLog(logger.LevelDebug, "skipping fs exist for user %s, condition options don't match",
  1641. user.Username)
  1642. continue
  1643. }
  1644. }
  1645. executed++
  1646. if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
  1647. failures = append(failures, user.Username)
  1648. params.AddError(err)
  1649. continue
  1650. }
  1651. }
  1652. if len(failures) > 0 {
  1653. return fmt.Errorf("fs existence check failed for users: %+v", failures)
  1654. }
  1655. if executed == 0 {
  1656. eventManagerLog(logger.LevelError, "no existence check executed")
  1657. return errors.New("no existence check executed")
  1658. }
  1659. return nil
  1660. }
  1661. func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
  1662. conditions dataprovider.ConditionOptions, params *EventParams,
  1663. ) error {
  1664. users, err := params.getUsers()
  1665. if err != nil {
  1666. return fmt.Errorf("unable to get users: %w", err)
  1667. }
  1668. var failures []string
  1669. executed := 0
  1670. for _, user := range users {
  1671. // if sender is set, the conditions have already been evaluated
  1672. if params.sender == "" {
  1673. if !checkUserConditionOptions(&user, &conditions) {
  1674. eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, condition options don't match",
  1675. user.Username)
  1676. continue
  1677. }
  1678. }
  1679. executed++
  1680. if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
  1681. failures = append(failures, user.Username)
  1682. params.AddError(err)
  1683. continue
  1684. }
  1685. }
  1686. if len(failures) > 0 {
  1687. return fmt.Errorf("fs compress failed for users: %+v", failures)
  1688. }
  1689. if executed == 0 {
  1690. eventManagerLog(logger.LevelError, "no file/folder compressed")
  1691. return errors.New("no file/folder compressed")
  1692. }
  1693. return nil
  1694. }
  1695. func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
  1696. params *EventParams,
  1697. ) error {
  1698. addObjectData := false
  1699. replacements := params.getStringReplacements(addObjectData)
  1700. replacer := strings.NewReplacer(replacements...)
  1701. switch c.Type {
  1702. case dataprovider.FilesystemActionRename:
  1703. return executeRenameFsRuleAction(c.Renames, replacer, conditions, params)
  1704. case dataprovider.FilesystemActionDelete:
  1705. return executeDeleteFsRuleAction(c.Deletes, replacer, conditions, params)
  1706. case dataprovider.FilesystemActionMkdirs:
  1707. return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
  1708. case dataprovider.FilesystemActionExist:
  1709. return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
  1710. case dataprovider.FilesystemActionCompress:
  1711. return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
  1712. case dataprovider.FilesystemActionCopy:
  1713. return executeCopyFsRuleAction(c.Copy, replacer, conditions, params)
  1714. default:
  1715. return fmt.Errorf("unsupported filesystem action %d", c.Type)
  1716. }
  1717. }
  1718. func executeQuotaResetForUser(user *dataprovider.User) error {
  1719. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1720. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1721. user.Username, err)
  1722. return err
  1723. }
  1724. if !QuotaScans.AddUserQuotaScan(user.Username, user.Role) {
  1725. eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
  1726. return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
  1727. }
  1728. defer QuotaScans.RemoveUserQuotaScan(user.Username)
  1729. numFiles, size, err := user.ScanQuota()
  1730. if err != nil {
  1731. eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
  1732. return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
  1733. }
  1734. err = dataprovider.UpdateUserQuota(user, numFiles, size, true)
  1735. if err != nil {
  1736. eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
  1737. return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
  1738. }
  1739. return nil
  1740. }
  1741. func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1742. users, err := params.getUsers()
  1743. if err != nil {
  1744. return fmt.Errorf("unable to get users: %w", err)
  1745. }
  1746. var failedResets []string
  1747. executed := 0
  1748. for _, user := range users {
  1749. // if sender is set, the conditions have already been evaluated
  1750. if params.sender == "" {
  1751. if !checkUserConditionOptions(&user, &conditions) {
  1752. eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, condition options don't match",
  1753. user.Username)
  1754. continue
  1755. }
  1756. }
  1757. executed++
  1758. if err = executeQuotaResetForUser(&user); err != nil {
  1759. params.AddError(err)
  1760. failedResets = append(failedResets, user.Username)
  1761. continue
  1762. }
  1763. }
  1764. if len(failedResets) > 0 {
  1765. return fmt.Errorf("quota reset failed for users: %+v", failedResets)
  1766. }
  1767. if executed == 0 {
  1768. eventManagerLog(logger.LevelError, "no user quota reset executed")
  1769. return errors.New("no user quota reset executed")
  1770. }
  1771. return nil
  1772. }
  1773. func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1774. folders, err := params.getFolders()
  1775. if err != nil {
  1776. return fmt.Errorf("unable to get folders: %w", err)
  1777. }
  1778. var failedResets []string
  1779. executed := 0
  1780. for _, folder := range folders {
  1781. // if sender is set, the conditions have already been evaluated
  1782. if params.sender == "" && !checkEventConditionPatterns(folder.Name, conditions.Names) {
  1783. eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
  1784. folder.Name)
  1785. continue
  1786. }
  1787. if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
  1788. eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
  1789. params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
  1790. failedResets = append(failedResets, folder.Name)
  1791. continue
  1792. }
  1793. executed++
  1794. f := vfs.VirtualFolder{
  1795. BaseVirtualFolder: folder,
  1796. VirtualPath: "/",
  1797. }
  1798. numFiles, size, err := f.ScanQuota()
  1799. QuotaScans.RemoveVFolderQuotaScan(folder.Name)
  1800. if err != nil {
  1801. eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
  1802. params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
  1803. failedResets = append(failedResets, folder.Name)
  1804. continue
  1805. }
  1806. err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
  1807. if err != nil {
  1808. eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
  1809. params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
  1810. failedResets = append(failedResets, folder.Name)
  1811. }
  1812. }
  1813. if len(failedResets) > 0 {
  1814. return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
  1815. }
  1816. if executed == 0 {
  1817. eventManagerLog(logger.LevelError, "no folder quota reset executed")
  1818. return errors.New("no folder quota reset executed")
  1819. }
  1820. return nil
  1821. }
  1822. func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1823. users, err := params.getUsers()
  1824. if err != nil {
  1825. return fmt.Errorf("unable to get users: %w", err)
  1826. }
  1827. var failedResets []string
  1828. executed := 0
  1829. for _, user := range users {
  1830. // if sender is set, the conditions have already been evaluated
  1831. if params.sender == "" {
  1832. if !checkUserConditionOptions(&user, &conditions) {
  1833. eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, condition options don't match",
  1834. user.Username)
  1835. continue
  1836. }
  1837. }
  1838. executed++
  1839. err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
  1840. if err != nil {
  1841. eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
  1842. params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
  1843. failedResets = append(failedResets, user.Username)
  1844. }
  1845. }
  1846. if len(failedResets) > 0 {
  1847. return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
  1848. }
  1849. if executed == 0 {
  1850. eventManagerLog(logger.LevelError, "no transfer quota reset executed")
  1851. return errors.New("no transfer quota reset executed")
  1852. }
  1853. return nil
  1854. }
  1855. func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
  1856. params *EventParams, actionName string,
  1857. ) error {
  1858. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1859. eventManagerLog(logger.LevelError, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
  1860. user.Username, err)
  1861. return err
  1862. }
  1863. check := RetentionCheck{
  1864. Folders: folders,
  1865. }
  1866. c := RetentionChecks.Add(check, &user)
  1867. if c == nil {
  1868. eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
  1869. return fmt.Errorf("another retention check is in progress for user %q", user.Username)
  1870. }
  1871. defer func() {
  1872. params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
  1873. Username: user.Username,
  1874. ActionName: actionName,
  1875. Results: c.results,
  1876. })
  1877. }()
  1878. if err := c.Start(); err != nil {
  1879. eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
  1880. return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
  1881. }
  1882. return nil
  1883. }
  1884. func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
  1885. conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
  1886. ) error {
  1887. users, err := params.getUsers()
  1888. if err != nil {
  1889. return fmt.Errorf("unable to get users: %w", err)
  1890. }
  1891. var failedChecks []string
  1892. executed := 0
  1893. for _, user := range users {
  1894. // if sender is set, the conditions have already been evaluated
  1895. if params.sender == "" {
  1896. if !checkUserConditionOptions(&user, &conditions) {
  1897. eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, condition options don't match",
  1898. user.Username)
  1899. continue
  1900. }
  1901. }
  1902. executed++
  1903. if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
  1904. failedChecks = append(failedChecks, user.Username)
  1905. params.AddError(err)
  1906. continue
  1907. }
  1908. }
  1909. if len(failedChecks) > 0 {
  1910. return fmt.Errorf("retention check failed for users: %+v", failedChecks)
  1911. }
  1912. if executed == 0 {
  1913. eventManagerLog(logger.LevelError, "no retention check executed")
  1914. return errors.New("no retention check executed")
  1915. }
  1916. return nil
  1917. }
  1918. func executeMetadataCheckForUser(user *dataprovider.User) error {
  1919. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1920. eventManagerLog(logger.LevelError, "skipping scheduled quota reset for user %s, cannot apply group settings: %v",
  1921. user.Username, err)
  1922. return err
  1923. }
  1924. if !ActiveMetadataChecks.Add(user.Username, user.Role) {
  1925. eventManagerLog(logger.LevelError, "another metadata check is already in progress for user %q", user.Username)
  1926. return fmt.Errorf("another metadata check is in progress for user %q", user.Username)
  1927. }
  1928. defer ActiveMetadataChecks.Remove(user.Username)
  1929. if err := user.CheckMetadataConsistency(); err != nil {
  1930. eventManagerLog(logger.LevelError, "error checking metadata consistence for user %q: %v", user.Username, err)
  1931. return fmt.Errorf("error checking metadata consistence for user %q: %w", user.Username, err)
  1932. }
  1933. return nil
  1934. }
  1935. func executeMetadataCheckRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
  1936. users, err := params.getUsers()
  1937. if err != nil {
  1938. return fmt.Errorf("unable to get users: %w", err)
  1939. }
  1940. var failures []string
  1941. var executed int
  1942. for _, user := range users {
  1943. // if sender is set, the conditions have already been evaluated
  1944. if params.sender == "" {
  1945. if !checkUserConditionOptions(&user, &conditions) {
  1946. eventManagerLog(logger.LevelDebug, "skipping metadata check for user %q, condition options don't match",
  1947. user.Username)
  1948. continue
  1949. }
  1950. }
  1951. executed++
  1952. if err = executeMetadataCheckForUser(&user); err != nil {
  1953. params.AddError(err)
  1954. failures = append(failures, user.Username)
  1955. continue
  1956. }
  1957. }
  1958. if len(failures) > 0 {
  1959. return fmt.Errorf("metadata check failed for users: %+v", failures)
  1960. }
  1961. if executed == 0 {
  1962. eventManagerLog(logger.LevelError, "no metadata check executed")
  1963. return errors.New("no metadata check executed")
  1964. }
  1965. return nil
  1966. }
  1967. func executePwdExpirationCheckForUser(user *dataprovider.User, config dataprovider.EventActionPasswordExpiration) error {
  1968. if err := user.LoadAndApplyGroupSettings(); err != nil {
  1969. eventManagerLog(logger.LevelError, "skipping password expiration check for user %s, cannot apply group settings: %v",
  1970. user.Username, err)
  1971. return err
  1972. }
  1973. if user.Filters.PasswordExpiration == 0 {
  1974. eventManagerLog(logger.LevelDebug, "password expiration not set for user %q skipping check", user.Username)
  1975. return nil
  1976. }
  1977. days := user.PasswordExpiresIn()
  1978. if days > config.Threshold {
  1979. eventManagerLog(logger.LevelDebug, "password for user %q expires in %d days, threshold %d, no need to notify",
  1980. user.Username, days, config.Threshold)
  1981. return nil
  1982. }
  1983. body := new(bytes.Buffer)
  1984. data := make(map[string]any)
  1985. data["Username"] = user.Username
  1986. data["Days"] = days
  1987. if err := smtp.RenderPasswordExpirationTemplate(body, data); err != nil {
  1988. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v",
  1989. user.Username, err)
  1990. return err
  1991. }
  1992. subject := "SFTPGo password expiration notification"
  1993. startTime := time.Now()
  1994. if err := smtp.SendEmail([]string{user.Email}, subject, body.String(), smtp.EmailContentTypeTextHTML); err != nil {
  1995. eventManagerLog(logger.LevelError, "unable to notify password expiration for user %s: %v, elapsed: %s",
  1996. user.Username, err, time.Since(startTime))
  1997. return err
  1998. }
  1999. eventManagerLog(logger.LevelDebug, "password expiration email sent to user %s, days: %d, elapsed: %s",
  2000. user.Username, days, time.Since(startTime))
  2001. return nil
  2002. }
  2003. func executePwdExpirationCheckRuleAction(config dataprovider.EventActionPasswordExpiration, conditions dataprovider.ConditionOptions,
  2004. params *EventParams) error {
  2005. users, err := params.getUsers()
  2006. if err != nil {
  2007. return fmt.Errorf("unable to get users: %w", err)
  2008. }
  2009. var failures []string
  2010. for _, user := range users {
  2011. // if sender is set, the conditions have already been evaluated
  2012. if params.sender == "" {
  2013. if !checkUserConditionOptions(&user, &conditions) {
  2014. eventManagerLog(logger.LevelDebug, "skipping password check for user %q, condition options don't match",
  2015. user.Username)
  2016. continue
  2017. }
  2018. }
  2019. if err = executePwdExpirationCheckForUser(&user, config); err != nil {
  2020. params.AddError(err)
  2021. failures = append(failures, user.Username)
  2022. continue
  2023. }
  2024. }
  2025. if len(failures) > 0 {
  2026. return fmt.Errorf("password expiration check failed for users: %+v", failures)
  2027. }
  2028. return nil
  2029. }
  2030. func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
  2031. conditions dataprovider.ConditionOptions,
  2032. ) error {
  2033. var err error
  2034. switch action.Type {
  2035. case dataprovider.ActionTypeHTTP:
  2036. err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
  2037. case dataprovider.ActionTypeCommand:
  2038. err = executeCommandRuleAction(action.Options.CmdConfig, params)
  2039. case dataprovider.ActionTypeEmail:
  2040. err = executeEmailRuleAction(action.Options.EmailConfig, params)
  2041. case dataprovider.ActionTypeBackup:
  2042. var backupPath string
  2043. backupPath, err = dataprovider.ExecuteBackup()
  2044. if err == nil {
  2045. params.setBackupParams(backupPath)
  2046. }
  2047. case dataprovider.ActionTypeUserQuotaReset:
  2048. err = executeUsersQuotaResetRuleAction(conditions, params)
  2049. case dataprovider.ActionTypeFolderQuotaReset:
  2050. err = executeFoldersQuotaResetRuleAction(conditions, params)
  2051. case dataprovider.ActionTypeTransferQuotaReset:
  2052. err = executeTransferQuotaResetRuleAction(conditions, params)
  2053. case dataprovider.ActionTypeDataRetentionCheck:
  2054. err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
  2055. case dataprovider.ActionTypeMetadataCheck:
  2056. err = executeMetadataCheckRuleAction(conditions, params)
  2057. case dataprovider.ActionTypeFilesystem:
  2058. err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
  2059. case dataprovider.ActionTypePasswordExpirationCheck:
  2060. err = executePwdExpirationCheckRuleAction(action.Options.PwdExpirationConfig, conditions, params)
  2061. default:
  2062. err = fmt.Errorf("unsupported action type: %d", action.Type)
  2063. }
  2064. if err != nil {
  2065. err = fmt.Errorf("action %q failed: %w", action.Name, err)
  2066. }
  2067. params.AddError(err)
  2068. return err
  2069. }
  2070. func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
  2071. var errRes error
  2072. for _, rule := range rules {
  2073. var failedActions []string
  2074. paramsCopy := params.getACopy()
  2075. for _, action := range rule.Actions {
  2076. if !action.Options.IsFailureAction && action.Options.ExecuteSync {
  2077. startTime := time.Now()
  2078. if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
  2079. eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
  2080. action.Name, rule.Name, time.Since(startTime), err)
  2081. failedActions = append(failedActions, action.Name)
  2082. // we return the last error, it is ok for now
  2083. errRes = err
  2084. if action.Options.StopOnFailure {
  2085. break
  2086. }
  2087. } else {
  2088. eventManagerLog(logger.LevelDebug, "executed sync action %q for rule %q, elapsed: %s",
  2089. action.Name, rule.Name, time.Since(startTime))
  2090. }
  2091. }
  2092. }
  2093. // execute async actions if any, including failure actions
  2094. go executeRuleAsyncActions(rule, paramsCopy, failedActions)
  2095. }
  2096. return errRes
  2097. }
  2098. func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams) {
  2099. eventManager.addAsyncTask()
  2100. defer eventManager.removeAsyncTask()
  2101. for _, rule := range rules {
  2102. executeRuleAsyncActions(rule, params.getACopy(), nil)
  2103. }
  2104. }
  2105. func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
  2106. for _, action := range rule.Actions {
  2107. if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
  2108. startTime := time.Now()
  2109. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2110. eventManagerLog(logger.LevelError, "unable to execute action %q for rule %q, elapsed %s, err: %v",
  2111. action.Name, rule.Name, time.Since(startTime), err)
  2112. failedActions = append(failedActions, action.Name)
  2113. if action.Options.StopOnFailure {
  2114. break
  2115. }
  2116. } else {
  2117. eventManagerLog(logger.LevelDebug, "executed action %q for rule %q, elapsed %s",
  2118. action.Name, rule.Name, time.Since(startTime))
  2119. }
  2120. }
  2121. }
  2122. if len(failedActions) > 0 {
  2123. params.updateStatusFromError = false
  2124. // execute failure actions
  2125. for _, action := range rule.Actions {
  2126. if action.Options.IsFailureAction {
  2127. startTime := time.Now()
  2128. if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
  2129. eventManagerLog(logger.LevelError, "unable to execute failure action %q for rule %q, elapsed %s, err: %v",
  2130. action.Name, rule.Name, time.Since(startTime), err)
  2131. if action.Options.StopOnFailure {
  2132. break
  2133. }
  2134. } else {
  2135. eventManagerLog(logger.LevelDebug, "executed failure action %q for rule %q, elapsed: %s",
  2136. action.Name, rule.Name, time.Since(startTime))
  2137. }
  2138. }
  2139. }
  2140. }
  2141. }
  // eventCronJob is the job that executes a scheduled event rule, identified
  // by name, each time its Run method is invoked.
  2142. type eventCronJob struct {
  // ruleName is the name of the event rule to load and execute
  2143. ruleName string
  2144. }
  2145. func (j *eventCronJob) getTask(rule *dataprovider.EventRule) (dataprovider.Task, error) {
  2146. if rule.GuardFromConcurrentExecution() {
  2147. task, err := dataprovider.GetTaskByName(rule.Name)
  2148. if err != nil {
  2149. if errors.Is(err, util.ErrNotFound) {
  2150. eventManagerLog(logger.LevelDebug, "adding task for rule %q", rule.Name)
  2151. task = dataprovider.Task{
  2152. Name: rule.Name,
  2153. UpdateAt: 0,
  2154. Version: 0,
  2155. }
  2156. err = dataprovider.AddTask(rule.Name)
  2157. if err != nil {
  2158. eventManagerLog(logger.LevelWarn, "unable to add task for rule %q: %v", rule.Name, err)
  2159. return task, err
  2160. }
  2161. } else {
  2162. eventManagerLog(logger.LevelWarn, "unable to get task for rule %q: %v", rule.Name, err)
  2163. }
  2164. }
  2165. return task, err
  2166. }
  2167. return dataprovider.Task{}, nil
  2168. }
  2169. func (j *eventCronJob) Run() {
  2170. eventManagerLog(logger.LevelDebug, "executing scheduled rule %q", j.ruleName)
  2171. rule, err := dataprovider.EventRuleExists(j.ruleName)
  2172. if err != nil {
  2173. eventManagerLog(logger.LevelError, "unable to load rule with name %q", j.ruleName)
  2174. return
  2175. }
  2176. if err := rule.CheckActionsConsistency(""); err != nil {
  2177. eventManagerLog(logger.LevelWarn, "scheduled rule %q skipped: %v", rule.Name, err)
  2178. return
  2179. }
  2180. task, err := j.getTask(&rule)
  2181. if err != nil {
  2182. return
  2183. }
  2184. if task.Name != "" {
  2185. updateInterval := 5 * time.Minute
  2186. updatedAt := util.GetTimeFromMsecSinceEpoch(task.UpdateAt)
  2187. if updatedAt.Add(updateInterval*2 + 1).After(time.Now()) {
  2188. eventManagerLog(logger.LevelDebug, "task for rule %q too recent: %s, skip execution", rule.Name, updatedAt)
  2189. return
  2190. }
  2191. err = dataprovider.UpdateTask(rule.Name, task.Version)
  2192. if err != nil {
  2193. eventManagerLog(logger.LevelInfo, "unable to update task timestamp for rule %q, skip execution, err: %v",
  2194. rule.Name, err)
  2195. return
  2196. }
  2197. ticker := time.NewTicker(updateInterval)
  2198. done := make(chan bool)
  2199. defer func() {
  2200. done <- true
  2201. ticker.Stop()
  2202. }()
  2203. go func(taskName string) {
  2204. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker started", taskName)
  2205. for {
  2206. select {
  2207. case <-done:
  2208. eventManagerLog(logger.LevelDebug, "update task %q timestamp worker finished", taskName)
  2209. return
  2210. case <-ticker.C:
  2211. err := dataprovider.UpdateTaskTimestamp(taskName)
  2212. eventManagerLog(logger.LevelInfo, "updated timestamp for task %q, err: %v", taskName, err)
  2213. }
  2214. }
  2215. }(task.Name)
  2216. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2217. } else {
  2218. executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2219. }
  2220. eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
  2221. }
  2222. // RunOnDemandRule executes actions for a rule with on-demand trigger
  2223. func RunOnDemandRule(name string) error {
  2224. eventManagerLog(logger.LevelDebug, "executing on demand rule %q", name)
  2225. rule, err := dataprovider.EventRuleExists(name)
  2226. if err != nil {
  2227. eventManagerLog(logger.LevelDebug, "unable to load rule with name %q", name)
  2228. return util.NewRecordNotFoundError(fmt.Sprintf("rule %q does not exist", name))
  2229. }
  2230. if rule.Trigger != dataprovider.EventTriggerOnDemand {
  2231. eventManagerLog(logger.LevelDebug, "cannot run rule %q as on demand, trigger: %d", name, rule.Trigger)
  2232. return util.NewValidationError(fmt.Sprintf("rule %q is not defined as on-demand", name))
  2233. }
  2234. if rule.Status != 1 {
  2235. eventManagerLog(logger.LevelDebug, "on-demand rule %q is inactive", name)
  2236. return util.NewValidationError(fmt.Sprintf("rule %q is inactive", name))
  2237. }
  2238. if err := rule.CheckActionsConsistency(""); err != nil {
  2239. eventManagerLog(logger.LevelError, "on-demand rule %q has incompatible actions: %v", name, err)
  2240. return util.NewValidationError(fmt.Sprintf("rule %q has incosistent actions", name))
  2241. }
  2242. eventManagerLog(logger.LevelDebug, "on-demand rule %q started", name)
  2243. go executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
  2244. return nil
  2245. }
  2246. type zipWriterWrapper struct {
  2247. Name string
  2248. Entries map[string]bool
  2249. Writer *zip.Writer
  2250. }
  2251. func eventManagerLog(level logger.LogLevel, format string, v ...any) {
  2252. logger.Log(level, "eventmanager", "", format, v...)
  2253. }