123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- package loki
- /*
- https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail
- */
- import (
- "context"
- "fmt"
- "net/url"
- "strconv"
- "strings"
- "time"
- "github.com/pkg/errors"
- "github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
- tomb "gopkg.in/tomb.v2"
- yaml "gopkg.in/yaml.v2"
- "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
- lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- )
// Package-level tunables for the Loki datasource.
const (
	// lokiLimit is the value given to Config.Limit when the YAML
	// configuration leaves it at zero.
	lokiLimit int = 100

	// The readiness constants below are not referenced in the visible part
	// of this file.
	// NOTE(review): presumably left over from an earlier readiness polling
	// loop; confirm before removing.
	readyLoop    int           = 3
	readySleep   time.Duration = 10 * time.Second
	readyTimeout time.Duration = 3 * time.Second
)
- var linesRead = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "cs_lokisource_hits_total",
- Help: "Total lines that were read.",
- },
- []string{"source"})
// LokiAuthConfiguration holds the credentials handed to the Loki client as
// its Username and Password.
type LokiAuthConfiguration struct {
	// Username is read from the "username" YAML key, or from the user part of
	// a DSN.
	Username string `yaml:"username"`

	// Password is read from the "password" YAML key, or from the password
	// part of a DSN.
	Password string `yaml:"password"`
}
- type LokiConfiguration struct {
- URL string `yaml:"url"` // Loki url
- Prefix string `yaml:"prefix"` // Loki prefix
- Query string `yaml:"query"` // LogQL query
- Limit int `yaml:"limit"` // Limit of logs to read
- DelayFor time.Duration `yaml:"delay_for"`
- Since time.Duration `yaml:"since"`
- Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki
- WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
- Auth LokiAuthConfiguration `yaml:"auth"`
- configuration.DataSourceCommonCfg `yaml:",inline"`
- }
- type LokiSource struct {
- Config LokiConfiguration
- Client *lokiclient.LokiClient
- logger *log.Entry
- lokiWebsocket string
- }
- func (l *LokiSource) GetMetrics() []prometheus.Collector {
- return []prometheus.Collector{linesRead}
- }
- func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
- return []prometheus.Collector{linesRead}
- }
- func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
- err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
- if err != nil {
- return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
- }
- if l.Config.Query == "" {
- return errors.New("Loki query is mandatory")
- }
- if l.Config.WaitForReady == 0 {
- l.Config.WaitForReady = 10 * time.Second
- }
- if l.Config.Mode == "" {
- l.Config.Mode = configuration.TAIL_MODE
- }
- if l.Config.Prefix == "" {
- l.Config.Prefix = "/"
- }
- if !strings.HasSuffix(l.Config.Prefix, "/") {
- l.Config.Prefix += "/"
- }
- if l.Config.Limit == 0 {
- l.Config.Limit = lokiLimit
- }
- if l.Config.Mode == configuration.TAIL_MODE {
- l.logger.Infof("Resetting since")
- l.Config.Since = 0
- }
- return nil
- }
- func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
- l.Config = LokiConfiguration{}
- l.logger = logger
- err := l.UnmarshalConfig(config)
- if err != nil {
- return err
- }
- l.logger.Infof("Since value: %s", l.Config.Since.String())
- clientConfig := lokiclient.Config{
- LokiURL: l.Config.URL,
- Headers: l.Config.Headers,
- Limit: l.Config.Limit,
- Query: l.Config.Query,
- Since: l.Config.Since,
- Username: l.Config.Auth.Username,
- Password: l.Config.Auth.Password,
- }
- l.Client = lokiclient.NewLokiClient(clientConfig)
- l.Client.Logger = logger.WithField("component", "lokiclient")
- return nil
- }
- func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
- l.logger = logger
- l.Config = LokiConfiguration{}
- l.Config.Mode = configuration.CAT_MODE
- l.Config.Labels = labels
- l.Config.UniqueId = uuid
- u, err := url.Parse(dsn)
- if err != nil {
- return errors.Wrap(err, "can't parse dsn configuration : "+dsn)
- }
- if u.Scheme != "loki" {
- return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
- }
- if u.Host == "" {
- return errors.New("Empty loki host")
- }
- scheme := "http"
- params := u.Query()
- if q := params.Get("ssl"); q != "" {
- scheme = "https"
- }
- if q := params.Get("query"); q != "" {
- l.Config.Query = q
- }
- if w := params.Get("wait_for_ready"); w != "" {
- l.Config.WaitForReady, err = time.ParseDuration(w)
- if err != nil {
- return err
- }
- } else {
- l.Config.WaitForReady = 10 * time.Second
- }
- if d := params.Get("delay_for"); d != "" {
- delayFor, err := time.ParseDuration(d)
- if err != nil {
- return err
- }
- l.Config.DelayFor = delayFor
- }
- if s := params.Get("since"); s != "" {
- l.Config.Since, err = time.ParseDuration(s)
- if err != nil {
- return errors.Wrap(err, "can't parse since in DSN configuration")
- }
- }
- if limit := params.Get("limit"); limit != "" {
- limit, err := strconv.Atoi(limit)
- if err != nil {
- return errors.Wrap(err, "can't parse limit in DSN configuration")
- }
- l.Config.Limit = limit
- } else {
- l.Config.Limit = 5000 // max limit allowed by loki
- }
- if logLevel := params.Get("log_level"); logLevel != "" {
- level, err := log.ParseLevel(logLevel)
- if err != nil {
- return errors.Wrap(err, "can't parse log_level in DSN configuration")
- }
- l.Config.LogLevel = &level
- l.logger.Logger.SetLevel(level)
- }
- l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
- if u.User != nil {
- l.Config.Auth.Username = u.User.Username()
- l.Config.Auth.Password, _ = u.User.Password()
- }
- clientConfig := lokiclient.Config{
- LokiURL: l.Config.URL,
- Headers: l.Config.Headers,
- Limit: l.Config.Limit,
- Query: l.Config.Query,
- Since: l.Config.Since,
- Username: l.Config.Auth.Username,
- Password: l.Config.Auth.Password,
- }
- l.Client = lokiclient.NewLokiClient(clientConfig)
- l.Client.Logger = logger.WithField("component", "lokiclient")
- return nil
- }
- func (l *LokiSource) GetMode() string {
- return l.Config.Mode
- }
- func (l *LokiSource) GetName() string {
- return "loki"
- }
- // OneShotAcquisition reads a set of file and returns when done
- func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
- l.logger.Debug("Loki one shot acquisition")
- readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
- defer cancel()
- err := l.Client.Ready(readyCtx)
- if err != nil {
- return errors.Wrap(err, "loki is not ready")
- }
- ctx, cancel := context.WithCancel(context.Background())
- c := l.Client.QueryRange(ctx)
- for {
- select {
- case <-t.Dying():
- l.logger.Debug("Loki one shot acquisition stopped")
- cancel()
- return nil
- case resp, ok := <-c:
- if !ok {
- l.logger.Info("Loki acuiqisition done, chan closed")
- cancel()
- return nil
- }
- for _, stream := range resp.Data.Result {
- for _, entry := range stream.Entries {
- l.readOneEntry(entry, l.Config.Labels, out)
- }
- }
- }
- }
- }
- func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) {
- ll := types.Line{}
- ll.Raw = entry.Line
- ll.Time = entry.Timestamp
- ll.Src = l.Config.URL
- ll.Labels = labels
- ll.Process = true
- ll.Module = l.GetName()
- linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
- out <- types.Event{
- Line: ll,
- Process: true,
- Type: types.LOG,
- ExpectMode: types.TIMEMACHINE,
- }
- }
- func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
- readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
- defer cancel()
- err := l.Client.Ready(readyCtx)
- if err != nil {
- return errors.Wrap(err, "loki is not ready")
- }
- ll := l.logger.WithField("websocket url", l.lokiWebsocket)
- t.Go(func() error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- respChan, err := l.Client.Tail(ctx)
- if err != nil {
- ll.Errorf("could not start loki tail: %s", err)
- return errors.Wrap(err, "could not start loki tail")
- }
- for {
- select {
- case resp, ok := <-respChan:
- if !ok {
- ll.Warnf("loki channel closed")
- return err
- }
- if len(resp.DroppedEntries) > 0 {
- ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
- }
- for _, stream := range resp.Streams {
- for _, entry := range stream.Entries {
- l.readOneEntry(entry, l.Config.Labels, out)
- }
- }
- case <-t.Dying():
- return nil
- }
- }
- })
- return nil
- }
- func (l *LokiSource) CanRun() error {
- return nil
- }
- func (l *LokiSource) GetUuid() string {
- return l.Config.UniqueId
- }
- func (l *LokiSource) Dump() interface{} {
- return l
- }
- // SupportedModes returns the supported modes by the acquisition module
- func (l *LokiSource) SupportedModes() []string {
- return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
- }
|