runtime.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package parser
  2. /*
  3. This file contains
  4. - the runtime parsing routines
  5. */
  6. import (
  7. "errors"
  8. "fmt"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/antonmedv/expr"
  15. "github.com/mohae/deepcopy"
  16. "github.com/prometheus/client_golang/prometheus"
  17. log "github.com/sirupsen/logrus"
  18. "github.com/crowdsecurity/crowdsec/pkg/types"
  19. )
  20. /* ok, this is kinda experimental, I don't know how bad of an idea it is .. */
  21. func SetTargetByName(target string, value string, evt *types.Event) bool {
  22. if evt == nil {
  23. return false
  24. }
  25. //it's a hack, we do it for the user
  26. target = strings.TrimPrefix(target, "evt.")
  27. log.Debugf("setting target %s to %s", target, value)
  28. defer func() {
  29. if r := recover(); r != nil {
  30. log.Errorf("Runtime error while trying to set '%s': %+v", target, r)
  31. return
  32. }
  33. }()
  34. iter := reflect.ValueOf(evt).Elem()
  35. if (iter == reflect.Value{}) || iter.IsZero() {
  36. log.Tracef("event is nill")
  37. //event is nill
  38. return false
  39. }
  40. for _, f := range strings.Split(target, ".") {
  41. /*
  42. ** According to current Event layout we only have to handle struct and map
  43. */
  44. switch iter.Kind() {
  45. case reflect.Map:
  46. tmp := iter.MapIndex(reflect.ValueOf(f))
  47. /*if we're in a map and the field doesn't exist, the user wants to add it :) */
  48. if (tmp == reflect.Value{}) || tmp.IsZero() {
  49. log.Debugf("map entry is zero in '%s'", target)
  50. }
  51. iter.SetMapIndex(reflect.ValueOf(f), reflect.ValueOf(value))
  52. return true
  53. case reflect.Struct:
  54. tmp := iter.FieldByName(f)
  55. if !tmp.IsValid() {
  56. log.Debugf("'%s' is not a valid target because '%s' is not valid", target, f)
  57. return false
  58. }
  59. if tmp.Kind() == reflect.Ptr {
  60. tmp = reflect.Indirect(tmp)
  61. }
  62. iter = tmp
  63. case reflect.Ptr:
  64. tmp := iter.Elem()
  65. iter = reflect.Indirect(tmp.FieldByName(f))
  66. default:
  67. log.Errorf("unexpected type %s in '%s'", iter.Kind(), target)
  68. return false
  69. }
  70. }
  71. //now we should have the final member :)
  72. if !iter.CanSet() {
  73. log.Errorf("'%s' can't be set", target)
  74. return false
  75. }
  76. if iter.Kind() != reflect.String {
  77. log.Errorf("Expected string, got %v when handling '%s'", iter.Kind(), target)
  78. return false
  79. }
  80. iter.Set(reflect.ValueOf(value))
  81. return true
  82. }
  83. func printStaticTarget(static ExtraField) string {
  84. switch {
  85. case static.Method != "":
  86. return static.Method
  87. case static.Parsed != "":
  88. return fmt.Sprintf(".Parsed[%s]", static.Parsed)
  89. case static.Meta != "":
  90. return fmt.Sprintf(".Meta[%s]", static.Meta)
  91. case static.Enriched != "":
  92. return fmt.Sprintf(".Enriched[%s]", static.Enriched)
  93. case static.TargetByName != "":
  94. return static.TargetByName
  95. default:
  96. return "?"
  97. }
  98. }
  99. func (n *Node) ProcessStatics(statics []ExtraField, event *types.Event) error {
  100. //we have a few cases :
  101. //(meta||key) + (static||reference||expr)
  102. var value string
  103. clog := n.Logger
  104. for _, static := range statics {
  105. value = ""
  106. if static.Value != "" {
  107. value = static.Value
  108. } else if static.RunTimeValue != nil {
  109. output, err := expr.Run(static.RunTimeValue, map[string]interface{}{"evt": event})
  110. if err != nil {
  111. clog.Warningf("failed to run RunTimeValue : %v", err)
  112. continue
  113. }
  114. switch out := output.(type) {
  115. case string:
  116. value = out
  117. case int:
  118. value = strconv.Itoa(out)
  119. case map[string]interface{}:
  120. clog.Warnf("Expression '%s' returned a map, please use ToJsonString() to convert it to string if you want to keep it as is, or refine your expression to extract a string", static.ExpValue)
  121. case []interface{}:
  122. clog.Warnf("Expression '%s' returned an array, please use ToJsonString() to convert it to string if you want to keep it as is, or refine your expression to extract a string", static.ExpValue)
  123. case nil:
  124. clog.Debugf("Expression '%s' returned nil, skipping", static.ExpValue)
  125. default:
  126. clog.Errorf("unexpected return type for RunTimeValue : %T", output)
  127. return errors.New("unexpected return type for RunTimeValue")
  128. }
  129. }
  130. if value == "" {
  131. //allow ParseDate to have empty input
  132. if static.Method != "ParseDate" {
  133. clog.Debugf("Empty value for %s, skip.", printStaticTarget(static))
  134. continue
  135. }
  136. }
  137. if static.Method != "" {
  138. processed := false
  139. /*still way too hackish, but : inject all the results in enriched, and */
  140. if enricherPlugin, ok := n.EnrichFunctions.Registered[static.Method]; ok {
  141. clog.Tracef("Found method '%s'", static.Method)
  142. ret, err := enricherPlugin.EnrichFunc(value, event, enricherPlugin.Ctx, n.Logger)
  143. if err != nil {
  144. clog.Errorf("method '%s' returned an error : %v", static.Method, err)
  145. }
  146. processed = true
  147. clog.Debugf("+ Method %s('%s') returned %d entries to merge in .Enriched\n", static.Method, value, len(ret))
  148. //Hackish check, but those methods do not return any data by design
  149. if len(ret) == 0 && static.Method != "UnmarshalJSON" {
  150. clog.Debugf("+ Method '%s' empty response on '%s'", static.Method, value)
  151. }
  152. for k, v := range ret {
  153. clog.Debugf("\t.Enriched[%s] = '%s'\n", k, v)
  154. event.Enriched[k] = v
  155. }
  156. } else {
  157. clog.Debugf("method '%s' doesn't exist or plugin not initialized", static.Method)
  158. }
  159. if !processed {
  160. clog.Debugf("method '%s' doesn't exist", static.Method)
  161. }
  162. } else if static.Parsed != "" {
  163. clog.Debugf(".Parsed[%s] = '%s'", static.Parsed, value)
  164. event.Parsed[static.Parsed] = value
  165. } else if static.Meta != "" {
  166. clog.Debugf(".Meta[%s] = '%s'", static.Meta, value)
  167. event.Meta[static.Meta] = value
  168. } else if static.Enriched != "" {
  169. clog.Debugf(".Enriched[%s] = '%s'", static.Enriched, value)
  170. event.Enriched[static.Enriched] = value
  171. } else if static.TargetByName != "" {
  172. if !SetTargetByName(static.TargetByName, value, event) {
  173. clog.Errorf("Unable to set value of '%s'", static.TargetByName)
  174. } else {
  175. clog.Debugf("%s = '%s'", static.TargetByName, value)
  176. }
  177. } else {
  178. clog.Fatal("unable to process static : unknown target")
  179. }
  180. }
  181. return nil
  182. }
  183. var NodesHits = prometheus.NewCounterVec(
  184. prometheus.CounterOpts{
  185. Name: "cs_node_hits_total",
  186. Help: "Total events entered node.",
  187. },
  188. []string{"source", "type", "name"},
  189. )
  190. var NodesHitsOk = prometheus.NewCounterVec(
  191. prometheus.CounterOpts{
  192. Name: "cs_node_hits_ok_total",
  193. Help: "Total events successfully exited node.",
  194. },
  195. []string{"source", "type", "name"},
  196. )
  197. var NodesHitsKo = prometheus.NewCounterVec(
  198. prometheus.CounterOpts{
  199. Name: "cs_node_hits_ko_total",
  200. Help: "Total events unsuccessfully exited node.",
  201. },
  202. []string{"source", "type", "name"},
  203. )
  204. func stageidx(stage string, stages []string) int {
  205. for i, v := range stages {
  206. if stage == v {
  207. return i
  208. }
  209. }
  210. return -1
  211. }
  212. type ParserResult struct {
  213. Evt types.Event
  214. Success bool
  215. }
  216. var ParseDump bool
  217. var DumpFolder string
  218. var StageParseCache map[string]map[string][]ParserResult
  219. var StageParseMutex sync.Mutex
  220. func Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error) {
  221. var event = xp
  222. /* the stage is undefined, probably line is freshly acquired, set to first stage !*/
  223. if event.Stage == "" && len(ctx.Stages) > 0 {
  224. event.Stage = ctx.Stages[0]
  225. log.Tracef("no stage, set to : %s", event.Stage)
  226. }
  227. event.Process = false
  228. if event.Time.IsZero() {
  229. event.Time = time.Now().UTC()
  230. }
  231. if event.Parsed == nil {
  232. event.Parsed = make(map[string]string)
  233. }
  234. if event.Enriched == nil {
  235. event.Enriched = make(map[string]string)
  236. }
  237. if event.Meta == nil {
  238. event.Meta = make(map[string]string)
  239. }
  240. if event.Unmarshaled == nil {
  241. event.Unmarshaled = make(map[string]interface{})
  242. }
  243. if event.Type == types.LOG {
  244. log.Tracef("INPUT '%s'", event.Line.Raw)
  245. }
  246. if ParseDump {
  247. if StageParseCache == nil {
  248. StageParseMutex.Lock()
  249. StageParseCache = make(map[string]map[string][]ParserResult)
  250. StageParseCache["success"] = make(map[string][]ParserResult)
  251. StageParseCache["success"][""] = make([]ParserResult, 0)
  252. StageParseMutex.Unlock()
  253. }
  254. }
  255. for _, stage := range ctx.Stages {
  256. if ParseDump {
  257. StageParseMutex.Lock()
  258. if _, ok := StageParseCache[stage]; !ok {
  259. StageParseCache[stage] = make(map[string][]ParserResult)
  260. }
  261. StageParseMutex.Unlock()
  262. }
  263. /* if the node is forward in stages, seek to this stage */
  264. /* this is for example used by testing system to inject logs in post-syslog-parsing phase*/
  265. if stageidx(event.Stage, ctx.Stages) > stageidx(stage, ctx.Stages) {
  266. log.Tracef("skipping stage, we are already at [%s] expecting [%s]", event.Stage, stage)
  267. continue
  268. }
  269. log.Tracef("node stage : %s, current stage : %s", event.Stage, stage)
  270. /* if the stage is wrong, it means that the log didn't manage "pass" a stage with a onsuccess: next_stage tag */
  271. if event.Stage != stage {
  272. log.Debugf("Event not parsed, expected stage '%s' got '%s', abort", stage, event.Stage)
  273. event.Process = false
  274. return event, nil
  275. }
  276. isStageOK := false
  277. for idx, node := range nodes {
  278. //Only process current stage's nodes
  279. if event.Stage != node.Stage {
  280. continue
  281. }
  282. clog := log.WithFields(log.Fields{
  283. "node-name": node.rn,
  284. "stage": event.Stage,
  285. })
  286. clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), node.rn)
  287. if ctx.Profiling {
  288. node.Profiling = true
  289. }
  290. ret, err := node.process(&event, ctx, map[string]interface{}{"evt": &event})
  291. if err != nil {
  292. clog.Errorf("Error while processing node : %v", err)
  293. return event, err
  294. }
  295. clog.Tracef("node (%s) ret : %v", node.rn, ret)
  296. if ParseDump {
  297. StageParseMutex.Lock()
  298. if len(StageParseCache[stage][node.Name]) == 0 {
  299. StageParseCache[stage][node.Name] = make([]ParserResult, 0)
  300. }
  301. StageParseMutex.Unlock()
  302. evtcopy := deepcopy.Copy(event)
  303. parserInfo := ParserResult{Evt: evtcopy.(types.Event), Success: ret}
  304. StageParseMutex.Lock()
  305. StageParseCache[stage][node.Name] = append(StageParseCache[stage][node.Name], parserInfo)
  306. StageParseMutex.Unlock()
  307. }
  308. if ret {
  309. isStageOK = true
  310. }
  311. if ret && node.OnSuccess == "next_stage" {
  312. clog.Debugf("node successful, stop end stage %s", stage)
  313. break
  314. }
  315. //the parsed object moved onto the next phase
  316. if event.Stage != stage {
  317. clog.Tracef("node moved stage, break and redo")
  318. break
  319. }
  320. }
  321. if !isStageOK {
  322. log.Debugf("Log didn't finish stage %s", event.Stage)
  323. event.Process = false
  324. return event, nil
  325. }
  326. }
  327. event.Process = true
  328. return event, nil
  329. }