metrics.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/crowdsecurity/crowdsec/pkg/types"
  12. log "github.com/sirupsen/logrus"
  13. "gopkg.in/yaml.v2"
  14. "github.com/olekukonko/tablewriter"
  15. dto "github.com/prometheus/client_model/go"
  16. "github.com/prometheus/prom2json"
  17. "github.com/spf13/cobra"
  18. )
  19. func lapiMetricsToTable(table *tablewriter.Table, stats map[string]map[string]map[string]int) error {
  20. //stats : machine -> route -> method -> count
  21. /*we want consistent display order*/
  22. machineKeys := []string{}
  23. for k := range stats {
  24. machineKeys = append(machineKeys, k)
  25. }
  26. sort.Strings(machineKeys)
  27. for _, machine := range machineKeys {
  28. //oneRow : route -> method -> count
  29. machineRow := stats[machine]
  30. for routeName, route := range machineRow {
  31. for methodName, count := range route {
  32. row := []string{}
  33. row = append(row, machine)
  34. row = append(row, routeName)
  35. row = append(row, methodName)
  36. if count != 0 {
  37. row = append(row, fmt.Sprintf("%d", count))
  38. } else {
  39. row = append(row, "-")
  40. }
  41. table.Append(row)
  42. }
  43. }
  44. }
  45. return nil
  46. }
  47. func metricsToTable(table *tablewriter.Table, stats map[string]map[string]int, keys []string) error {
  48. var sortedKeys []string
  49. if table == nil {
  50. return fmt.Errorf("nil table")
  51. }
  52. //sort keys to keep consistent order when printing
  53. sortedKeys = []string{}
  54. for akey := range stats {
  55. sortedKeys = append(sortedKeys, akey)
  56. }
  57. sort.Strings(sortedKeys)
  58. //
  59. for _, alabel := range sortedKeys {
  60. astats, ok := stats[alabel]
  61. if !ok {
  62. continue
  63. }
  64. row := []string{}
  65. row = append(row, alabel) //name
  66. for _, sl := range keys {
  67. if v, ok := astats[sl]; ok && v != 0 {
  68. numberToShow := fmt.Sprintf("%d", v)
  69. if !noUnit {
  70. numberToShow = formatNumber(v)
  71. }
  72. row = append(row, numberToShow)
  73. } else {
  74. row = append(row, "-")
  75. }
  76. }
  77. table.Append(row)
  78. }
  79. return nil
  80. }
  81. /*This is a complete rip from prom2json*/
  82. func ShowPrometheus(url string) {
  83. mfChan := make(chan *dto.MetricFamily, 1024)
  84. // Start with the DefaultTransport for sane defaults.
  85. transport := http.DefaultTransport.(*http.Transport).Clone()
  86. // Conservatively disable HTTP keep-alives as this program will only
  87. // ever need a single HTTP request.
  88. transport.DisableKeepAlives = true
  89. // Timeout early if the server doesn't even return the headers.
  90. transport.ResponseHeaderTimeout = time.Minute
  91. go func() {
  92. defer types.CatchPanic("crowdsec/ShowPrometheus")
  93. err := prom2json.FetchMetricFamilies(url, mfChan, transport)
  94. if err != nil {
  95. log.Fatalf("failed to fetch prometheus metrics : %v", err)
  96. }
  97. }()
  98. result := []*prom2json.Family{}
  99. for mf := range mfChan {
  100. result = append(result, prom2json.NewFamily(mf))
  101. }
  102. log.Debugf("Finished reading prometheus output, %d entries", len(result))
  103. /*walk*/
  104. lapi_decisions_stats := map[string]struct {
  105. NonEmpty int
  106. Empty int
  107. }{}
  108. acquis_stats := map[string]map[string]int{}
  109. parsers_stats := map[string]map[string]int{}
  110. buckets_stats := map[string]map[string]int{}
  111. lapi_stats := map[string]map[string]int{}
  112. lapi_machine_stats := map[string]map[string]map[string]int{}
  113. lapi_bouncer_stats := map[string]map[string]map[string]int{}
  114. for idx, fam := range result {
  115. if !strings.HasPrefix(fam.Name, "cs_") {
  116. continue
  117. }
  118. log.Tracef("round %d", idx)
  119. for _, m := range fam.Metrics {
  120. metric := m.(prom2json.Metric)
  121. name, ok := metric.Labels["name"]
  122. if !ok {
  123. log.Debugf("no name in Metric %v", metric.Labels)
  124. }
  125. source, ok := metric.Labels["source"]
  126. if !ok {
  127. log.Debugf("no source in Metric %v for %s", metric.Labels, fam.Name)
  128. } else {
  129. if srctype, ok := metric.Labels["type"]; ok {
  130. source = srctype + ":" + source
  131. }
  132. }
  133. value := m.(prom2json.Metric).Value
  134. machine := metric.Labels["machine"]
  135. bouncer := metric.Labels["bouncer"]
  136. route := metric.Labels["route"]
  137. method := metric.Labels["method"]
  138. fval, err := strconv.ParseFloat(value, 32)
  139. if err != nil {
  140. log.Errorf("Unexpected int value %s : %s", value, err)
  141. }
  142. ival := int(fval)
  143. switch fam.Name {
  144. /*buckets*/
  145. case "cs_bucket_created_total":
  146. if _, ok := buckets_stats[name]; !ok {
  147. buckets_stats[name] = make(map[string]int)
  148. }
  149. buckets_stats[name]["instanciation"] += ival
  150. case "cs_buckets":
  151. if _, ok := buckets_stats[name]; !ok {
  152. buckets_stats[name] = make(map[string]int)
  153. }
  154. buckets_stats[name]["curr_count"] += ival
  155. case "cs_bucket_overflowed_total":
  156. if _, ok := buckets_stats[name]; !ok {
  157. buckets_stats[name] = make(map[string]int)
  158. }
  159. buckets_stats[name]["overflow"] += ival
  160. case "cs_bucket_poured_total":
  161. if _, ok := buckets_stats[name]; !ok {
  162. buckets_stats[name] = make(map[string]int)
  163. }
  164. if _, ok := acquis_stats[source]; !ok {
  165. acquis_stats[source] = make(map[string]int)
  166. }
  167. buckets_stats[name]["pour"] += ival
  168. acquis_stats[source]["pour"] += ival
  169. case "cs_bucket_underflowed_total":
  170. if _, ok := buckets_stats[name]; !ok {
  171. buckets_stats[name] = make(map[string]int)
  172. }
  173. buckets_stats[name]["underflow"] += ival
  174. /*acquis*/
  175. case "cs_parser_hits_total":
  176. if _, ok := acquis_stats[source]; !ok {
  177. acquis_stats[source] = make(map[string]int)
  178. }
  179. acquis_stats[source]["reads"] += ival
  180. case "cs_parser_hits_ok_total":
  181. if _, ok := acquis_stats[source]; !ok {
  182. acquis_stats[source] = make(map[string]int)
  183. }
  184. acquis_stats[source]["parsed"] += ival
  185. case "cs_parser_hits_ko_total":
  186. if _, ok := acquis_stats[source]; !ok {
  187. acquis_stats[source] = make(map[string]int)
  188. }
  189. acquis_stats[source]["unparsed"] += ival
  190. case "cs_node_hits_total":
  191. if _, ok := parsers_stats[name]; !ok {
  192. parsers_stats[name] = make(map[string]int)
  193. }
  194. parsers_stats[name]["hits"] += ival
  195. case "cs_node_hits_ok_total":
  196. if _, ok := parsers_stats[name]; !ok {
  197. parsers_stats[name] = make(map[string]int)
  198. }
  199. parsers_stats[name]["parsed"] += ival
  200. case "cs_node_hits_ko_total":
  201. if _, ok := parsers_stats[name]; !ok {
  202. parsers_stats[name] = make(map[string]int)
  203. }
  204. parsers_stats[name]["unparsed"] += ival
  205. case "cs_lapi_route_requests_total":
  206. if _, ok := lapi_stats[route]; !ok {
  207. lapi_stats[route] = make(map[string]int)
  208. }
  209. lapi_stats[route][method] += ival
  210. case "cs_lapi_machine_requests_total":
  211. if _, ok := lapi_machine_stats[machine]; !ok {
  212. lapi_machine_stats[machine] = make(map[string]map[string]int)
  213. }
  214. if _, ok := lapi_machine_stats[machine][route]; !ok {
  215. lapi_machine_stats[machine][route] = make(map[string]int)
  216. }
  217. lapi_machine_stats[machine][route][method] += ival
  218. case "cs_lapi_bouncer_requests_total":
  219. if _, ok := lapi_bouncer_stats[bouncer]; !ok {
  220. lapi_bouncer_stats[bouncer] = make(map[string]map[string]int)
  221. }
  222. if _, ok := lapi_bouncer_stats[bouncer][route]; !ok {
  223. lapi_bouncer_stats[bouncer][route] = make(map[string]int)
  224. }
  225. lapi_bouncer_stats[bouncer][route][method] += ival
  226. case "cs_lapi_decisions_ko_total", "cs_lapi_decisions_ok_total":
  227. if _, ok := lapi_decisions_stats[bouncer]; !ok {
  228. lapi_decisions_stats[bouncer] = struct {
  229. NonEmpty int
  230. Empty int
  231. }{}
  232. }
  233. x := lapi_decisions_stats[bouncer]
  234. if fam.Name == "cs_lapi_decisions_ko_total" {
  235. x.Empty += ival
  236. } else if fam.Name == "cs_lapi_decisions_ok_total" {
  237. x.NonEmpty += ival
  238. }
  239. lapi_decisions_stats[bouncer] = x
  240. default:
  241. continue
  242. }
  243. }
  244. }
  245. if csConfig.Cscli.Output == "human" {
  246. acquisTable := tablewriter.NewWriter(os.Stdout)
  247. acquisTable.SetHeader([]string{"Source", "Lines read", "Lines parsed", "Lines unparsed", "Lines poured to bucket"})
  248. keys := []string{"reads", "parsed", "unparsed", "pour"}
  249. if err := metricsToTable(acquisTable, acquis_stats, keys); err != nil {
  250. log.Warningf("while collecting acquis stats : %s", err)
  251. }
  252. bucketsTable := tablewriter.NewWriter(os.Stdout)
  253. bucketsTable.SetHeader([]string{"Bucket", "Current Count", "Overflows", "Instanciated", "Poured", "Expired"})
  254. keys = []string{"curr_count", "overflow", "instanciation", "pour", "underflow"}
  255. if err := metricsToTable(bucketsTable, buckets_stats, keys); err != nil {
  256. log.Warningf("while collecting acquis stats : %s", err)
  257. }
  258. parsersTable := tablewriter.NewWriter(os.Stdout)
  259. parsersTable.SetHeader([]string{"Parsers", "Hits", "Parsed", "Unparsed"})
  260. keys = []string{"hits", "parsed", "unparsed"}
  261. if err := metricsToTable(parsersTable, parsers_stats, keys); err != nil {
  262. log.Warningf("while collecting acquis stats : %s", err)
  263. }
  264. lapiMachinesTable := tablewriter.NewWriter(os.Stdout)
  265. lapiMachinesTable.SetHeader([]string{"Machine", "Route", "Method", "Hits"})
  266. if err := lapiMetricsToTable(lapiMachinesTable, lapi_machine_stats); err != nil {
  267. log.Warningf("while collecting machine lapi stats : %s", err)
  268. }
  269. //lapiMetricsToTable
  270. lapiBouncersTable := tablewriter.NewWriter(os.Stdout)
  271. lapiBouncersTable.SetHeader([]string{"Bouncer", "Route", "Method", "Hits"})
  272. if err := lapiMetricsToTable(lapiBouncersTable, lapi_bouncer_stats); err != nil {
  273. log.Warningf("while collecting bouncer lapi stats : %s", err)
  274. }
  275. lapiDecisionsTable := tablewriter.NewWriter(os.Stdout)
  276. lapiDecisionsTable.SetHeader([]string{"Bouncer", "Empty answers", "Non-empty answers"})
  277. for bouncer, hits := range lapi_decisions_stats {
  278. row := []string{}
  279. row = append(row, bouncer)
  280. row = append(row, fmt.Sprintf("%d", hits.Empty))
  281. row = append(row, fmt.Sprintf("%d", hits.NonEmpty))
  282. lapiDecisionsTable.Append(row)
  283. }
  284. /*unfortunately, we can't reuse metricsToTable as the structure is too different :/*/
  285. lapiTable := tablewriter.NewWriter(os.Stdout)
  286. lapiTable.SetHeader([]string{"Route", "Method", "Hits"})
  287. sortedKeys := []string{}
  288. for akey := range lapi_stats {
  289. sortedKeys = append(sortedKeys, akey)
  290. }
  291. sort.Strings(sortedKeys)
  292. for _, alabel := range sortedKeys {
  293. astats := lapi_stats[alabel]
  294. subKeys := []string{}
  295. for skey := range astats {
  296. subKeys = append(subKeys, skey)
  297. }
  298. sort.Strings(subKeys)
  299. for _, sl := range subKeys {
  300. row := []string{}
  301. row = append(row, alabel)
  302. row = append(row, sl)
  303. row = append(row, fmt.Sprintf("%d", astats[sl]))
  304. lapiTable.Append(row)
  305. }
  306. }
  307. if bucketsTable.NumLines() > 0 {
  308. log.Printf("Buckets Metrics:")
  309. bucketsTable.SetAlignment(tablewriter.ALIGN_LEFT)
  310. bucketsTable.Render()
  311. }
  312. if acquisTable.NumLines() > 0 {
  313. log.Printf("Acquisition Metrics:")
  314. acquisTable.SetAlignment(tablewriter.ALIGN_LEFT)
  315. acquisTable.Render()
  316. }
  317. if parsersTable.NumLines() > 0 {
  318. log.Printf("Parser Metrics:")
  319. parsersTable.SetAlignment(tablewriter.ALIGN_LEFT)
  320. parsersTable.Render()
  321. }
  322. if lapiTable.NumLines() > 0 {
  323. log.Printf("Local Api Metrics:")
  324. lapiTable.SetAlignment(tablewriter.ALIGN_LEFT)
  325. lapiTable.Render()
  326. }
  327. if lapiMachinesTable.NumLines() > 0 {
  328. log.Printf("Local Api Machines Metrics:")
  329. lapiMachinesTable.SetAlignment(tablewriter.ALIGN_LEFT)
  330. lapiMachinesTable.Render()
  331. }
  332. if lapiBouncersTable.NumLines() > 0 {
  333. log.Printf("Local Api Bouncers Metrics:")
  334. lapiBouncersTable.SetAlignment(tablewriter.ALIGN_LEFT)
  335. lapiBouncersTable.Render()
  336. }
  337. if lapiDecisionsTable.NumLines() > 0 {
  338. log.Printf("Local Api Bouncers Decisions:")
  339. lapiDecisionsTable.SetAlignment(tablewriter.ALIGN_LEFT)
  340. lapiDecisionsTable.Render()
  341. }
  342. } else if csConfig.Cscli.Output == "json" {
  343. for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats} {
  344. x, err := json.MarshalIndent(val, "", " ")
  345. if err != nil {
  346. log.Fatalf("failed to unmarshal metrics : %v", err)
  347. }
  348. fmt.Printf("%s\n", string(x))
  349. }
  350. } else if csConfig.Cscli.Output == "raw" {
  351. for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats} {
  352. x, err := yaml.Marshal(val)
  353. if err != nil {
  354. log.Fatalf("failed to unmarshal metrics : %v", err)
  355. }
  356. fmt.Printf("%s\n", string(x))
  357. }
  358. }
  359. }
  360. var noUnit bool
  361. func NewMetricsCmd() *cobra.Command {
  362. /* ---- UPDATE COMMAND */
  363. var cmdMetrics = &cobra.Command{
  364. Use: "metrics",
  365. Short: "Display crowdsec prometheus metrics.",
  366. Long: `Fetch metrics from the prometheus server and display them in a human-friendly way`,
  367. Args: cobra.ExactArgs(0),
  368. DisableAutoGenTag: true,
  369. Run: func(cmd *cobra.Command, args []string) {
  370. if err := csConfig.LoadPrometheus(); err != nil {
  371. log.Fatalf(err.Error())
  372. }
  373. if !csConfig.Prometheus.Enabled {
  374. log.Warningf("Prometheus is not enabled, can't show metrics")
  375. os.Exit(1)
  376. }
  377. if prometheusURL == "" {
  378. prometheusURL = csConfig.Cscli.PrometheusUrl
  379. }
  380. if prometheusURL == "" {
  381. log.Errorf("No prometheus url, please specify in %s or via -u", *csConfig.FilePath)
  382. os.Exit(1)
  383. }
  384. ShowPrometheus(prometheusURL + "/metrics")
  385. },
  386. }
  387. cmdMetrics.PersistentFlags().StringVarP(&prometheusURL, "url", "u", "", "Prometheus url (http://<ip>:<port>/metrics)")
  388. cmdMetrics.PersistentFlags().BoolVar(&noUnit, "no-unit", false, "Show the real number instead of formatted with units")
  389. return cmdMetrics
  390. }