apiserver.go 8.5 KB


  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers"
  12. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  13. "github.com/crowdsecurity/crowdsec/pkg/csplugin"
  14. "github.com/crowdsecurity/crowdsec/pkg/database"
  15. "github.com/crowdsecurity/crowdsec/pkg/types"
  16. "github.com/gin-gonic/gin"
  17. "github.com/go-co-op/gocron"
  18. "github.com/pkg/errors"
  19. log "github.com/sirupsen/logrus"
  20. "gopkg.in/natefinch/lumberjack.v2"
  21. "gopkg.in/tomb.v2"
  22. )
  // keyLength is a package-level length setting, fixed at 32. Nothing in this
  // file reads it — presumably it sizes generated keys elsewhere in the package;
  // TODO confirm against its callers.
  var keyLength = 32
  26. type APIServer struct {
  27. URL string
  28. TLS *csconfig.TLSCfg
  29. dbClient *database.Client
  30. logFile string
  31. controller *controllers.Controller
  32. flushScheduler *gocron.Scheduler
  33. router *gin.Engine
  34. httpServer *http.Server
  35. apic *apic
  36. httpServerTomb tomb.Tomb
  37. consoleConfig *csconfig.ConsoleConfig
  38. }
  39. // RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one.
  40. func CustomRecoveryWithWriter() gin.HandlerFunc {
  41. return func(c *gin.Context) {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. // Check for a broken connection, as it is not really a
  45. // condition that warrants a panic stack trace.
  46. var brokenPipe bool
  47. if ne, ok := err.(*net.OpError); ok {
  48. if se, ok := ne.Err.(*os.SyscallError); ok {
  49. if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
  50. brokenPipe = true
  51. }
  52. }
  53. }
  54. // because of https://github.com/golang/net/blob/39120d07d75e76f0079fe5d27480bcb965a21e4c/http2/server.go
  55. // and because it seems gin doesn't handle those neither, we need to "hand define" some errors to properly catch them
  56. if strErr, ok := err.(error); ok {
  57. //stolen from http2/server.go in x/net
  58. var (
  59. errClientDisconnected = errors.New("client disconnected")
  60. errClosedBody = errors.New("body closed by handler")
  61. errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
  62. errStreamClosed = errors.New("http2: stream closed")
  63. )
  64. if strErr == errClientDisconnected ||
  65. strErr == errClosedBody ||
  66. strErr == errHandlerComplete ||
  67. strErr == errStreamClosed {
  68. brokenPipe = true
  69. }
  70. }
  71. if brokenPipe {
  72. log.Warningf("client %s disconnected : %s", c.ClientIP(), err)
  73. c.Abort()
  74. } else {
  75. filename := types.WriteStackTrace(err)
  76. log.Warningf("client %s error : %s", c.ClientIP(), err)
  77. log.Warningf("stacktrace written to %s, please join to your issue", filename)
  78. c.AbortWithStatus(http.StatusInternalServerError)
  79. }
  80. }
  81. }()
  82. c.Next()
  83. }
  84. }
  85. func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
  86. var flushScheduler *gocron.Scheduler
  87. dbClient, err := database.NewClient(config.DbConfig)
  88. if err != nil {
  89. return &APIServer{}, fmt.Errorf("unable to init database client: %s", err)
  90. }
  91. if config.DbConfig.Flush != nil {
  92. flushScheduler, err = dbClient.StartFlushScheduler(config.DbConfig.Flush)
  93. if err != nil {
  94. return &APIServer{}, err
  95. }
  96. }
  97. logFile := ""
  98. if config.LogMedia == "file" {
  99. logFile = fmt.Sprintf("%s/crowdsec_api.log", config.LogDir)
  100. }
  101. if log.GetLevel() < log.DebugLevel {
  102. gin.SetMode(gin.ReleaseMode)
  103. }
  104. log.Debugf("starting router, logging to %s", logFile)
  105. router := gin.New()
  106. if config.TrustedProxies != nil && config.UseForwardedForHeaders {
  107. if err := router.SetTrustedProxies(*config.TrustedProxies); err != nil {
  108. return &APIServer{}, errors.Wrap(err, "while setting trusted_proxies")
  109. }
  110. router.ForwardedByClientIP = true
  111. } else {
  112. router.ForwardedByClientIP = false
  113. }
  114. /*The logger that will be used by handlers*/
  115. clog := log.New()
  116. if err := types.ConfigureLogger(clog); err != nil {
  117. return nil, errors.Wrap(err, "while configuring gin logger")
  118. }
  119. if config.LogLevel != nil {
  120. clog.SetLevel(*config.LogLevel)
  121. }
  122. /*Configure logs*/
  123. if logFile != "" {
  124. _maxsize := 500
  125. if config.LogMaxSize != 0 {
  126. _maxsize = config.LogMaxSize
  127. }
  128. _maxfiles := 3
  129. if config.LogMaxFiles != 0 {
  130. _maxfiles = config.LogMaxFiles
  131. }
  132. _maxage := 28
  133. if config.LogMaxAge != 0 {
  134. _maxage = config.LogMaxAge
  135. }
  136. _compress := true
  137. if config.CompressLogs != nil {
  138. _compress = *config.CompressLogs
  139. }
  140. /*cf. https://github.com/natefinch/lumberjack/issues/82
  141. let's create the file beforehand w/ the right perms */
  142. // check if file exists
  143. _, err := os.Stat(logFile)
  144. // create file if not exists, purposefully ignore errors
  145. if os.IsNotExist(err) {
  146. file, _ := os.OpenFile(logFile, os.O_RDWR|os.O_CREATE, 0600)
  147. file.Close()
  148. }
  149. LogOutput := &lumberjack.Logger{
  150. Filename: logFile,
  151. MaxSize: _maxsize, //megabytes
  152. MaxBackups: _maxfiles,
  153. MaxAge: _maxage, //days
  154. Compress: _compress, //disabled by default
  155. }
  156. clog.SetOutput(LogOutput)
  157. }
  158. gin.DefaultErrorWriter = clog.WriterLevel(log.ErrorLevel)
  159. gin.DefaultWriter = clog.Writer()
  160. router.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
  161. return fmt.Sprintf("%s - [%s] \"%s %s %s %d %s \"%s\" %s\"\n",
  162. param.ClientIP,
  163. param.TimeStamp.Format(time.RFC1123),
  164. param.Method,
  165. param.Path,
  166. param.Request.Proto,
  167. param.StatusCode,
  168. param.Latency,
  169. param.Request.UserAgent(),
  170. param.ErrorMessage,
  171. )
  172. }))
  173. router.NoRoute(func(c *gin.Context) {
  174. c.JSON(http.StatusNotFound, gin.H{"message": "Page or Method not found"})
  175. return
  176. })
  177. router.Use(CustomRecoveryWithWriter())
  178. controller := &controllers.Controller{
  179. DBClient: dbClient,
  180. Ectx: context.Background(),
  181. Router: router,
  182. Profiles: config.Profiles,
  183. Log: clog,
  184. ConsoleConfig: config.ConsoleConfig,
  185. }
  186. var apiClient *apic
  187. if config.OnlineClient != nil && config.OnlineClient.Credentials != nil {
  188. log.Printf("Loading CAPI pusher")
  189. apiClient, err = NewAPIC(config.OnlineClient, dbClient, config.ConsoleConfig)
  190. if err != nil {
  191. return &APIServer{}, err
  192. }
  193. controller.CAPIChan = apiClient.alertToPush
  194. } else {
  195. apiClient = nil
  196. controller.CAPIChan = nil
  197. }
  198. return &APIServer{
  199. URL: config.ListenURI,
  200. TLS: config.TLS,
  201. logFile: logFile,
  202. dbClient: dbClient,
  203. controller: controller,
  204. flushScheduler: flushScheduler,
  205. router: router,
  206. apic: apiClient,
  207. httpServerTomb: tomb.Tomb{},
  208. consoleConfig: config.ConsoleConfig,
  209. }, nil
  210. }
  211. func (s *APIServer) Router() (*gin.Engine, error) {
  212. return s.router, nil
  213. }
  214. func (s *APIServer) Run() error {
  215. defer types.CatchPanic("lapi/runServer")
  216. s.httpServer = &http.Server{
  217. Addr: s.URL,
  218. Handler: s.router,
  219. }
  220. if s.apic != nil {
  221. s.apic.pushTomb.Go(func() error {
  222. if err := s.apic.Push(); err != nil {
  223. log.Errorf("capi push: %s", err)
  224. return err
  225. }
  226. return nil
  227. })
  228. s.apic.pullTomb.Go(func() error {
  229. if err := s.apic.Pull(); err != nil {
  230. log.Errorf("capi pull: %s", err)
  231. return err
  232. }
  233. return nil
  234. })
  235. s.apic.metricsTomb.Go(func() error {
  236. if err := s.apic.SendMetrics(); err != nil {
  237. log.Errorf("capi metrics: %s", err)
  238. return err
  239. }
  240. return nil
  241. })
  242. }
  243. s.httpServerTomb.Go(func() error {
  244. go func() {
  245. if s.TLS != nil && s.TLS.CertFilePath != "" && s.TLS.KeyFilePath != "" {
  246. if err := s.httpServer.ListenAndServeTLS(s.TLS.CertFilePath, s.TLS.KeyFilePath); err != nil {
  247. log.Fatalf(err.Error())
  248. }
  249. } else {
  250. if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
  251. log.Fatalf(err.Error())
  252. }
  253. }
  254. }()
  255. <-s.httpServerTomb.Dying()
  256. return nil
  257. })
  258. return nil
  259. }
  260. func (s *APIServer) Close() {
  261. if s.apic != nil {
  262. s.apic.Shutdown() // stop apic first since it use dbClient
  263. }
  264. s.dbClient.Ent.Close()
  265. if s.flushScheduler != nil {
  266. s.flushScheduler.Stop()
  267. }
  268. }
  269. func (s *APIServer) Shutdown() error {
  270. s.Close()
  271. if err := s.httpServer.Shutdown(context.TODO()); err != nil {
  272. return err
  273. }
  274. //close io.writer logger given to gin
  275. if pipe, ok := gin.DefaultErrorWriter.(*io.PipeWriter); ok {
  276. pipe.Close()
  277. }
  278. if pipe, ok := gin.DefaultWriter.(*io.PipeWriter); ok {
  279. pipe.Close()
  280. }
  281. s.httpServerTomb.Kill(nil)
  282. if err := s.httpServerTomb.Wait(); err != nil {
  283. return errors.Wrap(err, "while waiting on httpServerTomb")
  284. }
  285. return nil
  286. }
  287. func (s *APIServer) AttachPluginBroker(broker *csplugin.PluginBroker) {
  288. s.controller.PluginChannel = broker.PluginChannel
  289. }
  290. func (s *APIServer) InitController() error {
  291. err := s.controller.Init()
  292. return err
  293. }