syslog.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package syslogacquisition
  2. import (
  3. "fmt"
  4. "net"
  5. "strings"
  6. "time"
  7. "github.com/pkg/errors"
  8. "github.com/prometheus/client_golang/prometheus"
  9. log "github.com/sirupsen/logrus"
  10. "gopkg.in/tomb.v2"
  11. "gopkg.in/yaml.v2"
  12. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  13. "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc3164"
  14. "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc5424"
  15. syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/server"
  16. "github.com/crowdsecurity/crowdsec/pkg/types"
  17. )
  18. type SyslogConfiguration struct {
  19. Proto string `yaml:"protocol,omitempty"`
  20. Port int `yaml:"listen_port,omitempty"`
  21. Addr string `yaml:"listen_addr,omitempty"`
  22. MaxMessageLen int `yaml:"max_message_len,omitempty"`
  23. configuration.DataSourceCommonCfg `yaml:",inline"`
  24. }
  25. type SyslogSource struct {
  26. config SyslogConfiguration
  27. logger *log.Entry
  28. server *syslogserver.SyslogServer
  29. serverTomb *tomb.Tomb
  30. }
  31. var linesReceived = prometheus.NewCounterVec(
  32. prometheus.CounterOpts{
  33. Name: "cs_syslogsource_hits_total",
  34. Help: "Total lines that were received.",
  35. },
  36. []string{"source"})
  37. var linesParsed = prometheus.NewCounterVec(
  38. prometheus.CounterOpts{
  39. Name: "cs_syslogsource_parsed_total",
  40. Help: "Total lines that were successfully parsed",
  41. },
  42. []string{"source", "type"})
  43. func (s *SyslogSource) GetName() string {
  44. return "syslog"
  45. }
  46. func (s *SyslogSource) GetMode() string {
  47. return s.config.Mode
  48. }
  49. func (s *SyslogSource) Dump() interface{} {
  50. return s
  51. }
  52. func (s *SyslogSource) CanRun() error {
  53. return nil
  54. }
  55. func (s *SyslogSource) GetMetrics() []prometheus.Collector {
  56. return []prometheus.Collector{linesReceived, linesParsed}
  57. }
  58. func (s *SyslogSource) GetAggregMetrics() []prometheus.Collector {
  59. return []prometheus.Collector{linesReceived, linesParsed}
  60. }
  61. func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
  62. return fmt.Errorf("syslog datasource does not support one shot acquisition")
  63. }
  64. func (s *SyslogSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  65. return fmt.Errorf("syslog datasource does not support one shot acquisition")
  66. }
  67. func validatePort(port int) bool {
  68. return port > 0 && port <= 65535
  69. }
  70. func validateAddr(addr string) bool {
  71. return net.ParseIP(addr) != nil
  72. }
  73. func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error {
  74. s.config = SyslogConfiguration{}
  75. s.config.Mode = configuration.TAIL_MODE
  76. err := yaml.UnmarshalStrict(yamlConfig, &s.config)
  77. if err != nil {
  78. return errors.Wrap(err, "Cannot parse syslog configuration")
  79. }
  80. if s.config.Addr == "" {
  81. s.config.Addr = "127.0.0.1" //do we want a usable or secure default ?
  82. }
  83. if s.config.Port == 0 {
  84. s.config.Port = 514
  85. }
  86. if s.config.MaxMessageLen == 0 {
  87. s.config.MaxMessageLen = 2048
  88. }
  89. if !validatePort(s.config.Port) {
  90. return fmt.Errorf("invalid port %d", s.config.Port)
  91. }
  92. if !validateAddr(s.config.Addr) {
  93. return fmt.Errorf("invalid listen IP %s", s.config.Addr)
  94. }
  95. return nil
  96. }
  97. func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
  98. s.logger = logger
  99. s.logger.Infof("Starting syslog datasource configuration")
  100. err := s.UnmarshalConfig(yamlConfig)
  101. if err != nil {
  102. return err
  103. }
  104. return nil
  105. }
  106. func (s *SyslogSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  107. c := make(chan syslogserver.SyslogMessage)
  108. s.server = &syslogserver.SyslogServer{Logger: s.logger.WithField("syslog", "internal"), MaxMessageLen: s.config.MaxMessageLen}
  109. s.server.SetChannel(c)
  110. err := s.server.Listen(s.config.Addr, s.config.Port)
  111. if err != nil {
  112. return errors.Wrap(err, "could not start syslog server")
  113. }
  114. s.serverTomb = s.server.StartServer()
  115. t.Go(func() error {
  116. defer types.CatchPanic("crowdsec/acquis/syslog/live")
  117. return s.handleSyslogMsg(out, t, c)
  118. })
  119. return nil
  120. }
  121. func (s *SyslogSource) buildLogFromSyslog(ts time.Time, hostname string,
  122. appname string, pid string, msg string) string {
  123. ret := ""
  124. if !ts.IsZero() {
  125. ret += ts.Format("Jan 2 15:04:05")
  126. } else {
  127. s.logger.Tracef("%s - missing TS", msg)
  128. ret += time.Now().UTC().Format("Jan 2 15:04:05")
  129. }
  130. if hostname != "" {
  131. ret += " " + hostname
  132. } else {
  133. s.logger.Tracef("%s - missing host", msg)
  134. ret += " unknownhost"
  135. }
  136. if appname != "" {
  137. ret += " " + appname
  138. }
  139. if pid != "" {
  140. ret += "[" + pid + "]: "
  141. } else {
  142. ret += ": "
  143. }
  144. if msg != "" {
  145. ret += msg
  146. }
  147. return ret
  148. }
  149. func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c chan syslogserver.SyslogMessage) error {
  150. killed := false
  151. for {
  152. select {
  153. case <-t.Dying():
  154. if !killed {
  155. s.logger.Info("Syslog datasource is dying")
  156. s.serverTomb.Kill(nil)
  157. killed = true
  158. }
  159. case <-s.serverTomb.Dead():
  160. s.logger.Info("Syslog server has exited")
  161. return nil
  162. case syslogLine := <-c:
  163. var line string
  164. var ts time.Time
  165. logger := s.logger.WithField("client", syslogLine.Client)
  166. logger.Tracef("raw: %s", syslogLine)
  167. linesReceived.With(prometheus.Labels{"source": syslogLine.Client}).Inc()
  168. p := rfc3164.NewRFC3164Parser(rfc3164.WithCurrentYear())
  169. err := p.Parse(syslogLine.Message)
  170. if err != nil {
  171. logger.Debugf("could not parse as RFC3164 (%s)", err)
  172. p2 := rfc5424.NewRFC5424Parser()
  173. err = p2.Parse(syslogLine.Message)
  174. if err != nil {
  175. logger.Errorf("could not parse message: %s", err)
  176. logger.Debugf("could not parse as RFC5424 (%s) : %s", err, syslogLine.Message)
  177. continue
  178. }
  179. line = s.buildLogFromSyslog(p2.Timestamp, p2.Hostname, p2.Tag, p2.PID, p2.Message)
  180. } else {
  181. line = s.buildLogFromSyslog(p.Timestamp, p.Hostname, p.Tag, p.PID, p.Message)
  182. }
  183. line = strings.TrimSuffix(line, "\n")
  184. l := types.Line{}
  185. l.Raw = line
  186. l.Module = s.GetName()
  187. l.Labels = s.config.Labels
  188. l.Time = ts
  189. l.Src = syslogLine.Client
  190. l.Process = true
  191. if !s.config.UseTimeMachine {
  192. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
  193. } else {
  194. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
  195. }
  196. }
  197. }
  198. }