123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- package apiserver
- import (
- "context"
- "fmt"
- "io"
- "net"
- "net/http"
- "os"
- "strings"
- "time"
- "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers"
- "github.com/crowdsecurity/crowdsec/pkg/csconfig"
- "github.com/crowdsecurity/crowdsec/pkg/csplugin"
- "github.com/crowdsecurity/crowdsec/pkg/database"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- "github.com/gin-gonic/gin"
- "github.com/go-co-op/gocron"
- "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
- "gopkg.in/natefinch/lumberjack.v2"
- "gopkg.in/tomb.v2"
- )
- var (
- keyLength = 32
- )
- type APIServer struct {
- URL string
- TLS *csconfig.TLSCfg
- dbClient *database.Client
- logFile string
- controller *controllers.Controller
- flushScheduler *gocron.Scheduler
- router *gin.Engine
- httpServer *http.Server
- apic *apic
- httpServerTomb tomb.Tomb
- consoleConfig *csconfig.ConsoleConfig
- }
- // RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one.
- func CustomRecoveryWithWriter() gin.HandlerFunc {
- return func(c *gin.Context) {
- defer func() {
- if err := recover(); err != nil {
- // Check for a broken connection, as it is not really a
- // condition that warrants a panic stack trace.
- var brokenPipe bool
- if ne, ok := err.(*net.OpError); ok {
- if se, ok := ne.Err.(*os.SyscallError); ok {
- if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
- brokenPipe = true
- }
- }
- }
- // because of https://github.com/golang/net/blob/39120d07d75e76f0079fe5d27480bcb965a21e4c/http2/server.go
- // and because it seems gin doesn't handle those neither, we need to "hand define" some errors to properly catch them
- if strErr, ok := err.(error); ok {
- //stolen from http2/server.go in x/net
- var (
- errClientDisconnected = errors.New("client disconnected")
- errClosedBody = errors.New("body closed by handler")
- errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
- errStreamClosed = errors.New("http2: stream closed")
- )
- if strErr == errClientDisconnected ||
- strErr == errClosedBody ||
- strErr == errHandlerComplete ||
- strErr == errStreamClosed {
- brokenPipe = true
- }
- }
- if brokenPipe {
- log.Warningf("client %s disconnected : %s", c.ClientIP(), err)
- c.Abort()
- } else {
- filename := types.WriteStackTrace(err)
- log.Warningf("client %s error : %s", c.ClientIP(), err)
- log.Warningf("stacktrace written to %s, please join to your issue", filename)
- c.AbortWithStatus(http.StatusInternalServerError)
- }
- }
- }()
- c.Next()
- }
- }
- func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
- var flushScheduler *gocron.Scheduler
- dbClient, err := database.NewClient(config.DbConfig)
- if err != nil {
- return &APIServer{}, fmt.Errorf("unable to init database client: %s", err)
- }
- if config.DbConfig.Flush != nil {
- flushScheduler, err = dbClient.StartFlushScheduler(config.DbConfig.Flush)
- if err != nil {
- return &APIServer{}, err
- }
- }
- logFile := ""
- if config.LogMedia == "file" {
- logFile = fmt.Sprintf("%s/crowdsec_api.log", config.LogDir)
- }
- if log.GetLevel() < log.DebugLevel {
- gin.SetMode(gin.ReleaseMode)
- }
- log.Debugf("starting router, logging to %s", logFile)
- router := gin.New()
- if config.TrustedProxies != nil && config.UseForwardedForHeaders {
- if err := router.SetTrustedProxies(*config.TrustedProxies); err != nil {
- return &APIServer{}, errors.Wrap(err, "while setting trusted_proxies")
- }
- router.ForwardedByClientIP = true
- } else {
- router.ForwardedByClientIP = false
- }
- /*The logger that will be used by handlers*/
- clog := log.New()
- if err := types.ConfigureLogger(clog); err != nil {
- return nil, errors.Wrap(err, "while configuring gin logger")
- }
- if config.LogLevel != nil {
- clog.SetLevel(*config.LogLevel)
- }
- /*Configure logs*/
- if logFile != "" {
- _maxsize := 500
- if config.LogMaxSize != 0 {
- _maxsize = config.LogMaxSize
- }
- _maxfiles := 3
- if config.LogMaxFiles != 0 {
- _maxfiles = config.LogMaxFiles
- }
- _maxage := 28
- if config.LogMaxAge != 0 {
- _maxage = config.LogMaxAge
- }
- _compress := true
- if config.CompressLogs != nil {
- _compress = *config.CompressLogs
- }
- /*cf. https://github.com/natefinch/lumberjack/issues/82
- let's create the file beforehand w/ the right perms */
- // check if file exists
- _, err := os.Stat(logFile)
- // create file if not exists, purposefully ignore errors
- if os.IsNotExist(err) {
- file, _ := os.OpenFile(logFile, os.O_RDWR|os.O_CREATE, 0600)
- file.Close()
- }
- LogOutput := &lumberjack.Logger{
- Filename: logFile,
- MaxSize: _maxsize, //megabytes
- MaxBackups: _maxfiles,
- MaxAge: _maxage, //days
- Compress: _compress, //disabled by default
- }
- clog.SetOutput(LogOutput)
- }
- gin.DefaultErrorWriter = clog.WriterLevel(log.ErrorLevel)
- gin.DefaultWriter = clog.Writer()
- router.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
- return fmt.Sprintf("%s - [%s] \"%s %s %s %d %s \"%s\" %s\"\n",
- param.ClientIP,
- param.TimeStamp.Format(time.RFC1123),
- param.Method,
- param.Path,
- param.Request.Proto,
- param.StatusCode,
- param.Latency,
- param.Request.UserAgent(),
- param.ErrorMessage,
- )
- }))
- router.NoRoute(func(c *gin.Context) {
- c.JSON(http.StatusNotFound, gin.H{"message": "Page or Method not found"})
- return
- })
- router.Use(CustomRecoveryWithWriter())
- controller := &controllers.Controller{
- DBClient: dbClient,
- Ectx: context.Background(),
- Router: router,
- Profiles: config.Profiles,
- Log: clog,
- ConsoleConfig: config.ConsoleConfig,
- }
- var apiClient *apic
- if config.OnlineClient != nil && config.OnlineClient.Credentials != nil {
- log.Printf("Loading CAPI pusher")
- apiClient, err = NewAPIC(config.OnlineClient, dbClient, config.ConsoleConfig)
- if err != nil {
- return &APIServer{}, err
- }
- controller.CAPIChan = apiClient.alertToPush
- } else {
- apiClient = nil
- controller.CAPIChan = nil
- }
- return &APIServer{
- URL: config.ListenURI,
- TLS: config.TLS,
- logFile: logFile,
- dbClient: dbClient,
- controller: controller,
- flushScheduler: flushScheduler,
- router: router,
- apic: apiClient,
- httpServerTomb: tomb.Tomb{},
- consoleConfig: config.ConsoleConfig,
- }, nil
- }
- func (s *APIServer) Router() (*gin.Engine, error) {
- return s.router, nil
- }
- func (s *APIServer) Run() error {
- defer types.CatchPanic("lapi/runServer")
- s.httpServer = &http.Server{
- Addr: s.URL,
- Handler: s.router,
- }
- if s.apic != nil {
- s.apic.pushTomb.Go(func() error {
- if err := s.apic.Push(); err != nil {
- log.Errorf("capi push: %s", err)
- return err
- }
- return nil
- })
- s.apic.pullTomb.Go(func() error {
- if err := s.apic.Pull(); err != nil {
- log.Errorf("capi pull: %s", err)
- return err
- }
- return nil
- })
- s.apic.metricsTomb.Go(func() error {
- if err := s.apic.SendMetrics(); err != nil {
- log.Errorf("capi metrics: %s", err)
- return err
- }
- return nil
- })
- }
- s.httpServerTomb.Go(func() error {
- go func() {
- if s.TLS != nil && s.TLS.CertFilePath != "" && s.TLS.KeyFilePath != "" {
- if err := s.httpServer.ListenAndServeTLS(s.TLS.CertFilePath, s.TLS.KeyFilePath); err != nil {
- log.Fatalf(err.Error())
- }
- } else {
- if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
- log.Fatalf(err.Error())
- }
- }
- }()
- <-s.httpServerTomb.Dying()
- return nil
- })
- return nil
- }
- func (s *APIServer) Close() {
- if s.apic != nil {
- s.apic.Shutdown() // stop apic first since it use dbClient
- }
- s.dbClient.Ent.Close()
- if s.flushScheduler != nil {
- s.flushScheduler.Stop()
- }
- }
- func (s *APIServer) Shutdown() error {
- s.Close()
- if err := s.httpServer.Shutdown(context.TODO()); err != nil {
- return err
- }
- //close io.writer logger given to gin
- if pipe, ok := gin.DefaultErrorWriter.(*io.PipeWriter); ok {
- pipe.Close()
- }
- if pipe, ok := gin.DefaultWriter.(*io.PipeWriter); ok {
- pipe.Close()
- }
- s.httpServerTomb.Kill(nil)
- if err := s.httpServerTomb.Wait(); err != nil {
- return errors.Wrap(err, "while waiting on httpServerTomb")
- }
- return nil
- }
- func (s *APIServer) AttachPluginBroker(broker *csplugin.PluginBroker) {
- s.controller.PluginChannel = broker.PluginChannel
- }
- func (s *APIServer) InitController() error {
- err := s.controller.Init()
- return err
- }
|