metrics.go 6.7 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. log "github.com/sirupsen/logrus"
  12. "gopkg.in/yaml.v2"
  13. "github.com/olekukonko/tablewriter"
  14. dto "github.com/prometheus/client_model/go"
  15. "github.com/prometheus/prom2json"
  16. "github.com/spf13/cobra"
  17. )
  18. func metricsToTable(table *tablewriter.Table, stats map[string]map[string]int, keys []string) error {
  19. var sortedKeys []string
  20. if table == nil {
  21. return fmt.Errorf("nil table")
  22. }
  23. //sort keys to keep consistent order when printing
  24. sortedKeys = []string{}
  25. for akey := range stats {
  26. sortedKeys = append(sortedKeys, akey)
  27. }
  28. sort.Strings(sortedKeys)
  29. //
  30. for _, alabel := range sortedKeys {
  31. astats, ok := stats[alabel]
  32. if !ok {
  33. continue
  34. }
  35. row := []string{}
  36. row = append(row, alabel) //name
  37. for _, sl := range keys {
  38. if v, ok := astats[sl]; ok && v != 0 {
  39. row = append(row, fmt.Sprintf("%d", v))
  40. } else {
  41. row = append(row, "-")
  42. }
  43. }
  44. table.Append(row)
  45. }
  46. return nil
  47. }
  48. /*This is a complete rip from prom2json*/
  49. func ShowPrometheus(url string) {
  50. mfChan := make(chan *dto.MetricFamily, 1024)
  51. // Start with the DefaultTransport for sane defaults.
  52. transport := http.DefaultTransport.(*http.Transport).Clone()
  53. // Conservatively disable HTTP keep-alives as this program will only
  54. // ever need a single HTTP request.
  55. transport.DisableKeepAlives = true
  56. // Timeout early if the server doesn't even return the headers.
  57. transport.ResponseHeaderTimeout = time.Minute
  58. go func() {
  59. err := prom2json.FetchMetricFamilies(url, mfChan, transport)
  60. if err != nil {
  61. log.Fatalf("failed to fetch prometheus metrics : %v", err)
  62. }
  63. }()
  64. result := []*prom2json.Family{}
  65. for mf := range mfChan {
  66. result = append(result, prom2json.NewFamily(mf))
  67. }
  68. log.Debugf("Finished reading prometheus output, %d entries", len(result))
  69. /*walk*/
  70. acquis_stats := map[string]map[string]int{}
  71. parsers_stats := map[string]map[string]int{}
  72. buckets_stats := map[string]map[string]int{}
  73. for idx, fam := range result {
  74. if !strings.HasPrefix(fam.Name, "cs_") {
  75. continue
  76. }
  77. log.Debugf("round %d", idx)
  78. for _, m := range fam.Metrics {
  79. metric := m.(prom2json.Metric)
  80. name, ok := metric.Labels["name"]
  81. if !ok {
  82. log.Debugf("no name in Metric %v", metric.Labels)
  83. }
  84. source, ok := metric.Labels["source"]
  85. if !ok {
  86. log.Debugf("no source in Metric %v", metric.Labels)
  87. }
  88. value := m.(prom2json.Metric).Value
  89. fval, err := strconv.ParseFloat(value, 32)
  90. if err != nil {
  91. log.Errorf("Unexpected int value %s : %s", value, err)
  92. }
  93. ival := int(fval)
  94. switch fam.Name {
  95. /*buckets*/
  96. case "cs_bucket_create":
  97. if _, ok := buckets_stats[name]; !ok {
  98. buckets_stats[name] = make(map[string]int)
  99. }
  100. buckets_stats[name]["instanciation"] += ival
  101. case "cs_bucket_count":
  102. if _, ok := buckets_stats[name]; !ok {
  103. buckets_stats[name] = make(map[string]int)
  104. }
  105. buckets_stats[name]["curr_count"] += ival
  106. case "cs_bucket_overflow":
  107. if _, ok := buckets_stats[name]; !ok {
  108. buckets_stats[name] = make(map[string]int)
  109. }
  110. buckets_stats[name]["overflow"] += ival
  111. case "cs_bucket_pour":
  112. if _, ok := buckets_stats[name]; !ok {
  113. buckets_stats[name] = make(map[string]int)
  114. }
  115. if _, ok := acquis_stats[source]; !ok {
  116. acquis_stats[source] = make(map[string]int)
  117. }
  118. buckets_stats[name]["pour"] += ival
  119. acquis_stats[source]["pour"] += ival
  120. case "cs_bucket_underflow":
  121. if _, ok := buckets_stats[name]; !ok {
  122. buckets_stats[name] = make(map[string]int)
  123. }
  124. buckets_stats[name]["underflow"] += ival
  125. /*acquis*/
  126. case "cs_reader_hits":
  127. if _, ok := acquis_stats[source]; !ok {
  128. acquis_stats[source] = make(map[string]int)
  129. }
  130. acquis_stats[source]["reads"] += ival
  131. case "cs_parser_hits_ok":
  132. if _, ok := acquis_stats[source]; !ok {
  133. acquis_stats[source] = make(map[string]int)
  134. }
  135. acquis_stats[source]["parsed"] += ival
  136. case "cs_parser_hits_ko":
  137. if _, ok := acquis_stats[source]; !ok {
  138. acquis_stats[source] = make(map[string]int)
  139. }
  140. acquis_stats[source]["unparsed"] += ival
  141. case "cs_node_hits":
  142. if _, ok := parsers_stats[name]; !ok {
  143. parsers_stats[name] = make(map[string]int)
  144. }
  145. parsers_stats[name]["hits"] += ival
  146. case "cs_node_hits_ok":
  147. if _, ok := parsers_stats[name]; !ok {
  148. parsers_stats[name] = make(map[string]int)
  149. }
  150. parsers_stats[name]["parsed"] += ival
  151. default:
  152. continue
  153. }
  154. }
  155. }
  156. if config.output == "human" {
  157. acquisTable := tablewriter.NewWriter(os.Stdout)
  158. acquisTable.SetHeader([]string{"Source", "Lines read", "Lines parsed", "Lines unparsed", "Lines poured to bucket"})
  159. keys := []string{"reads", "parsed", "unparsed", "pour"}
  160. if err := metricsToTable(acquisTable, acquis_stats, keys); err != nil {
  161. log.Warningf("while collecting acquis stats : %s", err)
  162. }
  163. bucketsTable := tablewriter.NewWriter(os.Stdout)
  164. bucketsTable.SetHeader([]string{"Bucket", "Current Count", "Overflows", "Instanciated", "Poured", "Expired"})
  165. keys = []string{"curr_count", "overflow", "instanciation", "pour", "underflow"}
  166. if err := metricsToTable(bucketsTable, buckets_stats, keys); err != nil {
  167. log.Warningf("while collecting acquis stats : %s", err)
  168. }
  169. parsersTable := tablewriter.NewWriter(os.Stdout)
  170. parsersTable.SetHeader([]string{"Parsers", "Hits", "Parsed", "Unparsed"})
  171. keys = []string{"hits", "parsed", "unparsed"}
  172. if err := metricsToTable(parsersTable, parsers_stats, keys); err != nil {
  173. log.Warningf("while collecting acquis stats : %s", err)
  174. }
  175. log.Printf("Buckets Metrics:")
  176. bucketsTable.Render()
  177. log.Printf("Acquisition Metrics:")
  178. acquisTable.Render()
  179. log.Printf("Parser Metrics:")
  180. parsersTable.Render()
  181. } else if config.output == "json" {
  182. for _, val := range []map[string]map[string]int{acquis_stats, parsers_stats, buckets_stats} {
  183. x, err := json.MarshalIndent(val, "", " ")
  184. if err != nil {
  185. log.Fatalf("failed to unmarshal metrics : %v", err)
  186. }
  187. fmt.Printf("%s\n", string(x))
  188. }
  189. } else if config.output == "raw" {
  190. for _, val := range []map[string]map[string]int{acquis_stats, parsers_stats, buckets_stats} {
  191. x, err := yaml.Marshal(val)
  192. if err != nil {
  193. log.Fatalf("failed to unmarshal metrics : %v", err)
  194. }
  195. fmt.Printf("%s\n", string(x))
  196. }
  197. }
  198. }
  199. var purl string
  200. func NewMetricsCmd() *cobra.Command {
  201. /* ---- UPDATE COMMAND */
  202. var cmdMetrics = &cobra.Command{
  203. Use: "metrics",
  204. Short: "Display crowdsec prometheus metrics.",
  205. Long: `Fetch metrics from the prometheus server and display them in a human-friendly way`,
  206. Args: cobra.ExactArgs(0),
  207. Run: func(cmd *cobra.Command, args []string) {
  208. ShowPrometheus(purl)
  209. },
  210. }
  211. cmdMetrics.PersistentFlags().StringVarP(&purl, "url", "u", "http://127.0.0.1:6060/metrics", "Prometheus url")
  212. return cmdMetrics
  213. }