syslog.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package syslogacquisition
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "time"
  7. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  8. syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal"
  9. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  10. "github.com/crowdsecurity/crowdsec/pkg/types"
  11. "github.com/influxdata/go-syslog/v3/rfc3164"
  12. "github.com/influxdata/go-syslog/v3/rfc5424"
  13. "github.com/pkg/errors"
  14. "github.com/prometheus/client_golang/prometheus"
  15. log "github.com/sirupsen/logrus"
  16. "gopkg.in/tomb.v2"
  17. "gopkg.in/yaml.v2"
  18. )
  19. type SyslogConfiguration struct {
  20. Proto string `yaml:"protocol,omitempty"`
  21. Port int `yaml:"listen_port,omitempty"`
  22. Addr string `yaml:"listen_addr,omitempty"`
  23. MaxMessageLen int `yaml:"max_message_len,omitempty"`
  24. configuration.DataSourceCommonCfg `yaml:",inline"`
  25. }
  26. type SyslogSource struct {
  27. config SyslogConfiguration
  28. logger *log.Entry
  29. server *syslogserver.SyslogServer
  30. serverTomb *tomb.Tomb
  31. }
  32. var linesReceived = prometheus.NewCounterVec(
  33. prometheus.CounterOpts{
  34. Name: "cs_syslogsource_hits_total",
  35. Help: "Total lines that were received.",
  36. },
  37. []string{"source"})
  38. var linesParsed = prometheus.NewCounterVec(
  39. prometheus.CounterOpts{
  40. Name: "cs_syslogsource_parsed_total",
  41. Help: "Total lines that were successfully parsed",
  42. },
  43. []string{"source", "type"})
  44. func (s *SyslogSource) GetName() string {
  45. return "syslog"
  46. }
  47. func (s *SyslogSource) GetMode() string {
  48. return s.config.Mode
  49. }
  50. func (s *SyslogSource) Dump() interface{} {
  51. return s
  52. }
  53. func (s *SyslogSource) CanRun() error {
  54. return nil
  55. }
  56. func (s *SyslogSource) GetMetrics() []prometheus.Collector {
  57. return []prometheus.Collector{linesReceived, linesParsed}
  58. }
  59. func (s *SyslogSource) GetAggregMetrics() []prometheus.Collector {
  60. return []prometheus.Collector{linesReceived, linesParsed}
  61. }
  62. func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
  63. return fmt.Errorf("syslog datasource does not support one shot acquisition")
  64. }
  65. func (s *SyslogSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  66. return fmt.Errorf("syslog datasource does not support one shot acquisition")
  67. }
  68. func validatePort(port int) bool {
  69. return port > 0 && port <= 65535
  70. }
  71. func validateAddr(addr string) bool {
  72. return net.ParseIP(addr) != nil
  73. }
  74. func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
  75. s.logger = logger
  76. s.logger.Infof("Starting syslog datasource configuration")
  77. syslogConfig := SyslogConfiguration{}
  78. syslogConfig.Mode = configuration.TAIL_MODE
  79. err := yaml.UnmarshalStrict(yamlConfig, &syslogConfig)
  80. if err != nil {
  81. return errors.Wrap(err, "Cannot parse syslog configuration")
  82. }
  83. if syslogConfig.Addr == "" {
  84. syslogConfig.Addr = "127.0.0.1" //do we want a usable or secure default ?
  85. }
  86. if syslogConfig.Port == 0 {
  87. syslogConfig.Port = 514
  88. }
  89. if syslogConfig.MaxMessageLen == 0 {
  90. syslogConfig.MaxMessageLen = 2048
  91. }
  92. if !validatePort(syslogConfig.Port) {
  93. return fmt.Errorf("invalid port %d", syslogConfig.Port)
  94. }
  95. if !validateAddr(syslogConfig.Addr) {
  96. return fmt.Errorf("invalid listen IP %s", syslogConfig.Addr)
  97. }
  98. s.config = syslogConfig
  99. return nil
  100. }
  101. func (s *SyslogSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  102. c := make(chan syslogserver.SyslogMessage)
  103. s.server = &syslogserver.SyslogServer{Logger: s.logger.WithField("syslog", "internal"), MaxMessageLen: s.config.MaxMessageLen}
  104. s.server.SetChannel(c)
  105. err := s.server.Listen(s.config.Addr, s.config.Port)
  106. if err != nil {
  107. return errors.Wrap(err, "could not start syslog server")
  108. }
  109. s.serverTomb = s.server.StartServer()
  110. t.Go(func() error {
  111. defer types.CatchPanic("crowdsec/acquis/syslog/live")
  112. return s.handleSyslogMsg(out, t, c)
  113. })
  114. return nil
  115. }
  116. func (s *SyslogSource) buildLogFromSyslog(ts *time.Time, hostname *string,
  117. appname *string, pid *string, msg *string) (string, error) {
  118. ret := ""
  119. if msg == nil {
  120. return "", errors.Errorf("missing message field in syslog message")
  121. }
  122. if ts != nil {
  123. ret += ts.Format("Jan 2 15:04:05")
  124. } else {
  125. s.logger.Tracef("%s - missing TS", *msg)
  126. ret += time.Now().UTC().Format("Jan 2 15:04:05")
  127. }
  128. if hostname != nil {
  129. ret += " " + *hostname
  130. } else {
  131. s.logger.Tracef("%s - missing host", *msg)
  132. ret += " unknownhost"
  133. }
  134. if appname != nil {
  135. ret += " " + *appname
  136. } else {
  137. return "", errors.Errorf("missing appname field in syslog message")
  138. }
  139. if pid != nil {
  140. /*
  141. !!! ugly hack !!!
  142. Due to a bug in the syslog parser we use (https://github.com/influxdata/go-syslog/issues/31),
  143. the ProcID field will contain garbage if the message as a ] anywhere in it.
  144. Assume that a correctly formated ProcID only contains number, and if this is not the case, set it to an arbitrary value
  145. */
  146. _, err := strconv.Atoi(*pid)
  147. if err != nil {
  148. ret += "[1]: "
  149. } else {
  150. ret += "[" + *pid + "]: "
  151. }
  152. } else {
  153. ret += ": "
  154. }
  155. if msg != nil {
  156. ret += *msg
  157. }
  158. return ret, nil
  159. }
  160. func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c chan syslogserver.SyslogMessage) error {
  161. for {
  162. select {
  163. case <-t.Dying():
  164. s.logger.Info("Syslog datasource is dying")
  165. s.serverTomb.Kill(nil)
  166. return s.serverTomb.Wait()
  167. case <-s.serverTomb.Dying():
  168. s.logger.Info("Syslog server is dying, exiting")
  169. return nil
  170. case <-s.serverTomb.Dead():
  171. s.logger.Info("Syslog server has exited")
  172. return nil
  173. case syslogLine := <-c:
  174. var line string
  175. var ts time.Time
  176. logger := s.logger.WithField("client", syslogLine.Client)
  177. logger.Tracef("raw: %s", syslogLine)
  178. linesReceived.With(prometheus.Labels{"source": syslogLine.Client}).Inc()
  179. p := rfc5424.NewParser()
  180. m, err := p.Parse(syslogLine.Message)
  181. if err != nil {
  182. logger.Debugf("could not parse as RFC5424 (%s)", err)
  183. p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}))
  184. m, err = p.Parse(syslogLine.Message)
  185. if err != nil {
  186. logger.Errorf("could not parse message: %s", err)
  187. logger.Debugf("could not parse as RFC3164 (%s) : %s", err, syslogLine.Message)
  188. continue
  189. }
  190. msg := m.(*rfc3164.SyslogMessage)
  191. line, err = s.buildLogFromSyslog(msg.Timestamp, msg.Hostname, msg.Appname, msg.ProcID, msg.Message)
  192. if err != nil {
  193. logger.Debugf("could not parse as RFC3164 (%s) : %s", err, syslogLine.Message)
  194. logger.Error(err)
  195. continue
  196. }
  197. linesParsed.With(prometheus.Labels{"source": syslogLine.Client,
  198. "type": "RFC3164"}).Inc()
  199. } else {
  200. msg := m.(*rfc5424.SyslogMessage)
  201. line, err = s.buildLogFromSyslog(msg.Timestamp, msg.Hostname, msg.Appname, msg.ProcID, msg.Message)
  202. if err != nil {
  203. log.Debugf("could not parse message as RFC5424 (%s) : %s", err, syslogLine.Message)
  204. logger.Error(err)
  205. continue
  206. }
  207. linesParsed.With(prometheus.Labels{"source": syslogLine.Client,
  208. "type": "RFC5424"}).Inc()
  209. }
  210. l := types.Line{}
  211. l.Raw = line
  212. l.Module = s.GetName()
  213. l.Labels = s.config.Labels
  214. l.Time = ts
  215. l.Src = syslogLine.Client
  216. l.Process = true
  217. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
  218. }
  219. }
  220. }