loki.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package loki
  2. /*
  3. https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail
  4. */
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/pkg/errors"
  13. "github.com/prometheus/client_golang/prometheus"
  14. log "github.com/sirupsen/logrus"
  15. tomb "gopkg.in/tomb.v2"
  16. yaml "gopkg.in/yaml.v2"
  17. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  18. lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
  19. "github.com/crowdsecurity/crowdsec/pkg/types"
  20. )
  21. const (
  22. readyTimeout time.Duration = 3 * time.Second
  23. readyLoop int = 3
  24. readySleep time.Duration = 10 * time.Second
  25. lokiLimit int = 100
  26. )
  27. var linesRead = prometheus.NewCounterVec(
  28. prometheus.CounterOpts{
  29. Name: "cs_lokisource_hits_total",
  30. Help: "Total lines that were read.",
  31. },
  32. []string{"source"})
  33. type LokiAuthConfiguration struct {
  34. Username string `yaml:"username"`
  35. Password string `yaml:"password"`
  36. }
  37. type LokiConfiguration struct {
  38. URL string `yaml:"url"` // Loki url
  39. Prefix string `yaml:"prefix"` // Loki prefix
  40. Query string `yaml:"query"` // LogQL query
  41. Limit int `yaml:"limit"` // Limit of logs to read
  42. DelayFor time.Duration `yaml:"delay_for"`
  43. Since time.Duration `yaml:"since"`
  44. Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki
  45. WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
  46. Auth LokiAuthConfiguration `yaml:"auth"`
  47. configuration.DataSourceCommonCfg `yaml:",inline"`
  48. }
  49. type LokiSource struct {
  50. Config LokiConfiguration
  51. Client *lokiclient.LokiClient
  52. logger *log.Entry
  53. lokiWebsocket string
  54. }
  55. func (l *LokiSource) GetMetrics() []prometheus.Collector {
  56. return []prometheus.Collector{linesRead}
  57. }
  58. func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
  59. return []prometheus.Collector{linesRead}
  60. }
  61. func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
  62. err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
  63. if err != nil {
  64. return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
  65. }
  66. if l.Config.Query == "" {
  67. return errors.New("Loki query is mandatory")
  68. }
  69. if l.Config.WaitForReady == 0 {
  70. l.Config.WaitForReady = 10 * time.Second
  71. }
  72. if l.Config.Mode == "" {
  73. l.Config.Mode = configuration.TAIL_MODE
  74. }
  75. if l.Config.Prefix == "" {
  76. l.Config.Prefix = "/"
  77. }
  78. if !strings.HasSuffix(l.Config.Prefix, "/") {
  79. l.Config.Prefix += "/"
  80. }
  81. if l.Config.Limit == 0 {
  82. l.Config.Limit = lokiLimit
  83. }
  84. if l.Config.Mode == configuration.TAIL_MODE {
  85. l.logger.Infof("Resetting since")
  86. l.Config.Since = 0
  87. }
  88. return nil
  89. }
  90. func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
  91. l.Config = LokiConfiguration{}
  92. l.logger = logger
  93. err := l.UnmarshalConfig(config)
  94. if err != nil {
  95. return err
  96. }
  97. l.logger.Infof("Since value: %s", l.Config.Since.String())
  98. clientConfig := lokiclient.Config{
  99. LokiURL: l.Config.URL,
  100. Headers: l.Config.Headers,
  101. Limit: l.Config.Limit,
  102. Query: l.Config.Query,
  103. Since: l.Config.Since,
  104. Username: l.Config.Auth.Username,
  105. Password: l.Config.Auth.Password,
  106. }
  107. l.Client = lokiclient.NewLokiClient(clientConfig)
  108. l.Client.Logger = logger.WithField("component", "lokiclient")
  109. return nil
  110. }
  111. func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
  112. l.logger = logger
  113. l.Config = LokiConfiguration{}
  114. l.Config.Mode = configuration.CAT_MODE
  115. l.Config.Labels = labels
  116. l.Config.UniqueId = uuid
  117. u, err := url.Parse(dsn)
  118. if err != nil {
  119. return errors.Wrap(err, "can't parse dsn configuration : "+dsn)
  120. }
  121. if u.Scheme != "loki" {
  122. return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
  123. }
  124. if u.Host == "" {
  125. return errors.New("Empty loki host")
  126. }
  127. scheme := "http"
  128. params := u.Query()
  129. if q := params.Get("ssl"); q != "" {
  130. scheme = "https"
  131. }
  132. if q := params.Get("query"); q != "" {
  133. l.Config.Query = q
  134. }
  135. if w := params.Get("wait_for_ready"); w != "" {
  136. l.Config.WaitForReady, err = time.ParseDuration(w)
  137. if err != nil {
  138. return err
  139. }
  140. } else {
  141. l.Config.WaitForReady = 10 * time.Second
  142. }
  143. if d := params.Get("delay_for"); d != "" {
  144. delayFor, err := time.ParseDuration(d)
  145. if err != nil {
  146. return err
  147. }
  148. l.Config.DelayFor = delayFor
  149. }
  150. if s := params.Get("since"); s != "" {
  151. l.Config.Since, err = time.ParseDuration(s)
  152. if err != nil {
  153. return errors.Wrap(err, "can't parse since in DSN configuration")
  154. }
  155. }
  156. if limit := params.Get("limit"); limit != "" {
  157. limit, err := strconv.Atoi(limit)
  158. if err != nil {
  159. return errors.Wrap(err, "can't parse limit in DSN configuration")
  160. }
  161. l.Config.Limit = limit
  162. } else {
  163. l.Config.Limit = 5000 // max limit allowed by loki
  164. }
  165. if logLevel := params.Get("log_level"); logLevel != "" {
  166. level, err := log.ParseLevel(logLevel)
  167. if err != nil {
  168. return errors.Wrap(err, "can't parse log_level in DSN configuration")
  169. }
  170. l.Config.LogLevel = &level
  171. l.logger.Logger.SetLevel(level)
  172. }
  173. l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
  174. if u.User != nil {
  175. l.Config.Auth.Username = u.User.Username()
  176. l.Config.Auth.Password, _ = u.User.Password()
  177. }
  178. clientConfig := lokiclient.Config{
  179. LokiURL: l.Config.URL,
  180. Headers: l.Config.Headers,
  181. Limit: l.Config.Limit,
  182. Query: l.Config.Query,
  183. Since: l.Config.Since,
  184. Username: l.Config.Auth.Username,
  185. Password: l.Config.Auth.Password,
  186. }
  187. l.Client = lokiclient.NewLokiClient(clientConfig)
  188. l.Client.Logger = logger.WithField("component", "lokiclient")
  189. return nil
  190. }
  191. func (l *LokiSource) GetMode() string {
  192. return l.Config.Mode
  193. }
  194. func (l *LokiSource) GetName() string {
  195. return "loki"
  196. }
  197. // OneShotAcquisition reads a set of file and returns when done
  198. func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  199. l.logger.Debug("Loki one shot acquisition")
  200. readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
  201. defer cancel()
  202. err := l.Client.Ready(readyCtx)
  203. if err != nil {
  204. return errors.Wrap(err, "loki is not ready")
  205. }
  206. ctx, cancel := context.WithCancel(context.Background())
  207. c := l.Client.QueryRange(ctx)
  208. for {
  209. select {
  210. case <-t.Dying():
  211. l.logger.Debug("Loki one shot acquisition stopped")
  212. cancel()
  213. return nil
  214. case resp, ok := <-c:
  215. if !ok {
  216. l.logger.Info("Loki acuiqisition done, chan closed")
  217. cancel()
  218. return nil
  219. }
  220. for _, stream := range resp.Data.Result {
  221. for _, entry := range stream.Entries {
  222. l.readOneEntry(entry, l.Config.Labels, out)
  223. }
  224. }
  225. }
  226. }
  227. }
  228. func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) {
  229. ll := types.Line{}
  230. ll.Raw = entry.Line
  231. ll.Time = entry.Timestamp
  232. ll.Src = l.Config.URL
  233. ll.Labels = labels
  234. ll.Process = true
  235. ll.Module = l.GetName()
  236. linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
  237. out <- types.Event{
  238. Line: ll,
  239. Process: true,
  240. Type: types.LOG,
  241. ExpectMode: types.TIMEMACHINE,
  242. }
  243. }
  244. func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  245. readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
  246. defer cancel()
  247. err := l.Client.Ready(readyCtx)
  248. if err != nil {
  249. return errors.Wrap(err, "loki is not ready")
  250. }
  251. ll := l.logger.WithField("websocket url", l.lokiWebsocket)
  252. t.Go(func() error {
  253. ctx, cancel := context.WithCancel(context.Background())
  254. defer cancel()
  255. respChan, err := l.Client.Tail(ctx)
  256. if err != nil {
  257. ll.Errorf("could not start loki tail: %s", err)
  258. return errors.Wrap(err, "could not start loki tail")
  259. }
  260. for {
  261. select {
  262. case resp, ok := <-respChan:
  263. if !ok {
  264. ll.Warnf("loki channel closed")
  265. return err
  266. }
  267. if len(resp.DroppedEntries) > 0 {
  268. ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
  269. }
  270. for _, stream := range resp.Streams {
  271. for _, entry := range stream.Entries {
  272. l.readOneEntry(entry, l.Config.Labels, out)
  273. }
  274. }
  275. case <-t.Dying():
  276. return nil
  277. }
  278. }
  279. })
  280. return nil
  281. }
  282. func (l *LokiSource) CanRun() error {
  283. return nil
  284. }
  285. func (l *LokiSource) GetUuid() string {
  286. return l.Config.UniqueId
  287. }
  288. func (l *LokiSource) Dump() interface{} {
  289. return l
  290. }
  291. // SupportedModes returns the supported modes by the acquisition module
  292. func (l *LokiSource) SupportedModes() []string {
  293. return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
  294. }