main.go

package main

import (
	"fmt"
	"io/ioutil"
	_ "net/http/pprof" // registers pprof handlers on the default HTTP mux
	"strings"
	"time"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition"
	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
	"github.com/crowdsecurity/crowdsec/pkg/outputs"
	"github.com/crowdsecurity/crowdsec/pkg/parser"
	"github.com/crowdsecurity/crowdsec/pkg/types"
	log "github.com/sirupsen/logrus"
	"gopkg.in/natefinch/lumberjack.v2"
	"gopkg.in/tomb.v2"
	"gopkg.in/yaml.v2"
)

var (
	/*tombs for the parsers, buckets and outputs.*/
	acquisTomb  tomb.Tomb
	parsersTomb tomb.Tomb
	bucketsTomb tomb.Tomb
	outputsTomb tomb.Tomb

	holders []leaky.BucketFactory
	buckets *leaky.Buckets
	cConfig *csconfig.CrowdSec

	/*settings*/
	lastProcessedItem time.Time /*timestamp of the last item processed in time-machine mode; used to GC buckets when we dump them.*/
)
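
// configureLogger configures the global logrus logger: when logMode is "file",
// output is rotated through lumberjack; "stdout" keeps the default output and
// any other mode is rejected. The log level also controls the timestamp format
// and caller reporting.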
func configureLogger(logMode string, logFolder string, logLevel log.Level) error {
	/*Configure logs*/
	if logMode == "file" {
		log.SetOutput(&lumberjack.Logger{
			Filename:   logFolder + "/crowdsec.log",
			MaxSize:    500, // megabytes
			MaxBackups: 3,
			MaxAge:     28,   // days
			Compress:   true, // compress rotated files (lumberjack default is false)
		})
		log.SetFormatter(&log.TextFormatter{TimestampFormat: "02-01-2006 15:04:05", FullTimestamp: true})
	} else if logMode != "stdout" {
		return fmt.Errorf("log mode '%s' unknown", logMode)
	}

	log.Printf("setting loglevel to %s", logLevel)
	log.SetLevel(logLevel)
	log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
	if logLevel >= log.InfoLevel {
		log.SetFormatter(&log.TextFormatter{TimestampFormat: "02-01-2006 15:04:05", FullTimestamp: true})
	}
	if logLevel >= log.DebugLevel {
		log.SetReportCaller(true)
	}
	return nil
}
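
// main wires the processing pipeline: acquisition reads log lines into
// inputLineChan, parser routines turn them into events on inputEventChan,
// bucket routines pour those events into the leaky buckets, and output
// routines run overflows through the post-overflow stages and output profiles.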
func main() {
	var (
		err                 error
		p                   parser.UnixParser
		parserNodes         []parser.Node = make([]parser.Node, 0)
		postOverflowNodes   []parser.Node = make([]parser.Node, 0)
		nbParser            int           = 1
		parserCTX           *parser.UnixParserCtx
		postOverflowCTX     *parser.UnixParserCtx
		acquisitionCTX      *acquisition.FileAcquisCtx
		CustomParsers       []parser.Stagefile
		CustomPostoverflows []parser.Stagefile
		CustomScenarios     []parser.Stagefile
		outputEventChan     chan types.Event
	)

	inputLineChan := make(chan types.Event)
	inputEventChan := make(chan types.Event)

	cConfig = csconfig.NewCrowdSecConfig()

	// Handle command line arguments
	if err := cConfig.GetOPT(); err != nil {
		log.Fatalf(err.Error())
	}

	if err = configureLogger(cConfig.LogMode, cConfig.LogFolder, cConfig.LogLevel); err != nil {
		log.Fatal(err.Error())
	}

	log.Infof("Crowdwatch %s", cwversion.VersionStr())

	if cConfig.Prometheus {
		registerPrometheus()
		cConfig.Profiling = true
	}

	log.Infof("Loading grok library")
	/* load base regexps for the two grok parsers */
	parserCTX, err = p.Init(map[string]interface{}{"patterns": cConfig.ConfigFolder + string("/patterns/"), "data": cConfig.DataFolder})
	if err != nil {
		log.Errorf("failed to initialize parser : %v", err)
		return
	}
	postOverflowCTX, err = p.Init(map[string]interface{}{"patterns": cConfig.ConfigFolder + string("/patterns/"), "data": cConfig.DataFolder})
	if err != nil {
		log.Errorf("failed to initialize postoverflow : %v", err)
		return
	}

	/*enable profiling*/
	if cConfig.Profiling {
		go runTachymeter(cConfig.HTTPListen)
		parserCTX.Profiling = true
		postOverflowCTX.Profiling = true
	}

	/*
		Load enrichers
	*/
	log.Infof("Loading enrich plugins")
	parserPlugins, err := parser.Loadplugin(cConfig.DataFolder)
	if err != nil {
		log.Errorf("Failed to load enrich plugins : %v", err)
	}
	parser.ECTX = append(parser.ECTX, parserPlugins)

	/*parse the validatormode option if present. Mostly used for testing purposes.*/
	if cConfig.ValidatorMode != "" {
		//the user provided 'parser:file.yaml,postoverflow:file.yaml,scenario:file.yaml' : load only those
		validators := strings.Split(cConfig.ValidatorMode, ",")
		for _, val := range validators {
			splittedValidator := strings.Split(val, ":")
			if len(splittedValidator) != 2 {
				log.Fatalf("expected format is parser:file,scenario:file,postoverflow:file")
			}
			configType := splittedValidator[0]
			configFile := splittedValidator[1]

			var parsedFile []parser.Stagefile
			dataFile, err := ioutil.ReadFile(configFile)
			if err != nil {
				log.Fatalf("failed opening %s : %s", configFile, err)
			}
			if err := yaml.UnmarshalStrict(dataFile, &parsedFile); err != nil {
				log.Fatalf("failed unmarshalling %s : %s", configFile, err)
			}
			switch configType {
			case "parser":
				CustomParsers = parsedFile
			case "scenario":
				CustomScenarios = parsedFile
			case "postoverflow":
				CustomPostoverflows = parsedFile
			default:
				log.Fatalf("wrong type, expected format is parser:file,scenario:file,postoverflow:file")
			}
		}
	}

	/* load the parser nodes */
	if cConfig.ValidatorMode != "" && len(CustomParsers) > 0 {
		log.Infof("Loading (validatormode) parsers")
		parserNodes, err = parser.LoadStages(CustomParsers, parserCTX)
	} else {
		log.Infof("Loading parsers")
		parserNodes, err = parser.LoadStageDir(cConfig.ConfigFolder+"/parsers/", parserCTX)
	}
	if err != nil {
		log.Fatalf("failed to load parser config : %v", err)
	}
	/* parsers loaded */

	/* load the post-overflow stages */
	if cConfig.ValidatorMode != "" && len(CustomPostoverflows) > 0 {
		log.Infof("Loading (validatormode) postoverflow parsers")
		postOverflowNodes, err = parser.LoadStages(CustomPostoverflows, postOverflowCTX)
	} else {
		log.Infof("Loading postoverflow parsers")
		postOverflowNodes, err = parser.LoadStageDir(cConfig.ConfigFolder+"/postoverflows/", postOverflowCTX)
	}
	if err != nil {
		log.Fatalf("failed to load postoverflow config : %v", err)
	}
	log.Infof("Loaded Nodes : %d parser, %d postoverflow", len(parserNodes), len(postOverflowNodes))
	/* post-overflows loaded */

	/* Loading buckets / scenarios */
	if cConfig.ValidatorMode != "" && len(CustomScenarios) > 0 {
		log.Infof("Loading (validatormode) scenarios")
		bucketFiles := []string{}
		for _, scenario := range CustomScenarios {
			bucketFiles = append(bucketFiles, scenario.Filename)
		}
		holders, outputEventChan, err = leaky.LoadBuckets(bucketFiles, cConfig.DataFolder)
	} else {
		log.Infof("Loading scenarios")
		holders, outputEventChan, err = leaky.Init(map[string]string{"patterns": cConfig.ConfigFolder + "/scenarios/", "data": cConfig.DataFolder})
	}
	if err != nil {
		log.Fatalf("Scenario loading failed : %v", err)
	}
	/* buckets/scenarios loaded */

	/*keep track of the enabled scenario names for consensus profiling*/
	var scenariosEnabled string
	for _, x := range holders {
		if scenariosEnabled != "" {
			scenariosEnabled += ","
		}
		scenariosEnabled += x.Name
	}

	buckets = leaky.NewBuckets()

	/*restore a previous bucket state if requested*/
	if cConfig.RestoreMode != "" {
		log.Warningf("Restoring buckets state from %s", cConfig.RestoreMode)
		if err := leaky.LoadBucketsState(cConfig.RestoreMode, buckets, holders); err != nil {
			log.Fatalf("unable to restore buckets : %s", err)
		}
	}

	if cConfig.Profiling {
		//force profiling in all buckets
		for holderIndex := range holders {
			holders[holderIndex].Profiling = true
		}
	}

	/*
		Load output profiles
	*/
	log.Infof("Loading output profiles")
	outputProfiles, err := outputs.LoadOutputProfiles(cConfig.ConfigFolder + "/profiles.yaml")
	if err != nil || len(outputProfiles) == 0 {
		log.Fatalf("Failed to load output profiles : %v", err)
	}

	/* Linting is done */
	if cConfig.Linter {
		return
	}

	outputRunner, err := outputs.NewOutput(cConfig.OutputConfig, cConfig.Daemonize)
	if err != nil {
		log.Fatalf("output plugins initialization error : %s", err.Error())
	}

	/* Init the API connector */
	if cConfig.APIMode {
		log.Infof("Loading API client")
		var apiConfig = map[string]string{
			"path":    cConfig.ConfigFolder + "/api.yaml",
			"profile": scenariosEnabled,
		}
		if err := outputRunner.InitAPI(apiConfig); err != nil {
			log.Fatalf(err.Error())
		}
	}

	/*if the user is in "single file mode" (might be writing a scenario or a parser), allow loading **without** parsers or scenarios */
	if cConfig.SingleFile == "" {
		if len(parserNodes) == 0 {
			log.Fatalf("no parser(s) loaded, abort.")
		}
		if len(holders) == 0 {
			log.Fatalf("no bucket(s) loaded, abort.")
		}
		if len(outputProfiles) == 0 {
			log.Fatalf("no output profile(s) loaded, abort.")
		}
	}
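
	// nbParser controls how many goroutines are started for each stage
	// (parsing, bucket pour and output); it is currently fixed to 1.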
	log.Infof("Starting processing routines")
	//start go-routines for parsing, bucket pour and outputs.
	for i := 0; i < nbParser; i++ {
		parsersTomb.Go(func() error {
			err := runParse(inputLineChan, inputEventChan, *parserCTX, parserNodes)
			if err != nil {
				log.Errorf("runParse error : %s", err)
				return err
			}
			return nil
		})
	}

	for i := 0; i < nbParser; i++ {
		bucketsTomb.Go(func() error {
			err := runPour(inputEventChan, holders, buckets)
			if err != nil {
				log.Errorf("runPour error : %s", err)
				return err
			}
			return nil
		})
	}

	for i := 0; i < nbParser; i++ {
		outputsTomb.Go(func() error {
			err := runOutput(inputEventChan, outputEventChan, holders, buckets, *postOverflowCTX, postOverflowNodes, outputProfiles, outputRunner)
			if err != nil {
				log.Errorf("runOutput error : %s", err)
				return err
			}
			return nil
		})
	}

	log.Warningf("Starting processing data")

	//Init the acquisition : from the cli or from the acquis.yaml file
	acquisitionCTX, err = loadAcquisition()
	if err != nil {
		log.Fatalf("Failed to start acquisition : %s", err)
	}

	//start reading in the background
	acquisition.AcquisStartReading(acquisitionCTX, inputLineChan, &acquisTomb)

	if err = serve(*outputRunner); err != nil {
		log.Fatalf(err.Error())
	}
}