Jelajahi Sumber

fix #723 : intercept http2 stream closed errors (#724)

* fix #723 : intercept http2 stream closed errors

* factorize the 'dump stacktrace' code
Thibault "bui" Koechlin 4 tahun lalu
induk
melakukan
4bb34d8e77

+ 1 - 1
go.mod

@@ -60,7 +60,7 @@ require (
 	github.com/vjeantet/grok v1.0.1 // indirect
 	golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
 	golang.org/x/mod v0.4.1
-	golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
+	golang.org/x/net v0.0.0-20201224014010-6772e930b67b
 	golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4
 	golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf // indirect
 	golang.org/x/text v0.3.5 // indirect

+ 53 - 1
pkg/apiserver/apiserver.go

@@ -3,7 +3,10 @@ package apiserver
 import (
 	"context"
 	"fmt"
+	"net"
 	"net/http"
+	"os"
+	"strings"
 	"time"
 
 	"github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers"
@@ -36,6 +39,55 @@ type APIServer struct {
 	httpServerTomb tomb.Tomb
 }
 
+// RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one.
+func CustomRecoveryWithWriter() gin.HandlerFunc {
+	return func(c *gin.Context) {
+		defer func() {
+			if err := recover(); err != nil {
+				// Check for a broken connection, as it is not really a
+				// condition that warrants a panic stack trace.
+				var brokenPipe bool
+				if ne, ok := err.(*net.OpError); ok {
+					if se, ok := ne.Err.(*os.SyscallError); ok {
+						if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
+							brokenPipe = true
+						}
+					}
+				}
+
+				// because of https://github.com/golang/net/blob/39120d07d75e76f0079fe5d27480bcb965a21e4c/http2/server.go
+				// and because it seems gin doesn't handle those neither, we need to "hand define" some errors to properly catch them
+				if strErr, ok := err.(error); ok {
+					//stolen from http2/server.go in x/net
+					var (
+						errClientDisconnected = errors.New("client disconnected")
+						errClosedBody         = errors.New("body closed by handler")
+						errHandlerComplete    = errors.New("http2: request body closed due to handler exiting")
+						errStreamClosed       = errors.New("http2: stream closed")
+					)
+					if strErr == errClientDisconnected ||
+						strErr == errClosedBody ||
+						strErr == errHandlerComplete ||
+						strErr == errStreamClosed {
+						brokenPipe = true
+					}
+				}
+
+				if brokenPipe {
+					log.Warningf("client %s disconnected : %s", c.ClientIP(), err)
+					c.Abort()
+				} else {
+					filename := types.WriteStackTrace(err)
+					log.Warningf("client %s error : %s", c.ClientIP(), err)
+					log.Warningf("stacktrace written to %s, please join to your issue", filename)
+					c.AbortWithStatus(http.StatusInternalServerError)
+				}
+			}
+		}()
+		c.Next()
+	}
+}
+
 func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
 	var flushScheduler *gocron.Scheduler
 	dbClient, err := database.NewClient(config.DbConfig)
@@ -111,7 +163,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
 		c.JSON(http.StatusNotFound, gin.H{"message": "Page or Method not found"})
 		return
 	})
-	router.Use(gin.Recovery())
+	router.Use(CustomRecoveryWithWriter())
 	controller := &controllers.Controller{
 		DBClient: dbClient,
 		Ectx:     context.Background(),

+ 1 - 6
pkg/apiserver/controllers/v1/alerts.go

@@ -12,7 +12,6 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/csprofiles"
 	"github.com/crowdsecurity/crowdsec/pkg/database/ent"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
-	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/gin-gonic/gin"
 	"github.com/go-openapi/strfmt"
 	log "github.com/sirupsen/logrus"
@@ -99,7 +98,6 @@ func FormatAlerts(result []*ent.Alert) models.AddAlertsRequest {
 
 // CreateAlert : write received alerts in body to the database
 func (c *Controller) CreateAlert(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/CreateAlert")
 
 	var input models.AddAlertsRequest
 
@@ -148,12 +146,12 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
 
 // FindAlerts : return alerts from database based on the specified filter
 func (c *Controller) FindAlerts(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/FindAlerts")
 	result, err := c.DBClient.QueryAlertWithFilter(gctx.Request.URL.Query())
 	if err != nil {
 		c.HandleDBErrors(gctx, err)
 		return
 	}
+
 	data := FormatAlerts(result)
 
 	if gctx.Request.Method == "HEAD" {
@@ -166,8 +164,6 @@ func (c *Controller) FindAlerts(gctx *gin.Context) {
 
 // FindAlertByID return the alert assiocated to the ID
 func (c *Controller) FindAlertByID(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/FindAlertByID")
-
 	alertIDStr := gctx.Param("alert_id")
 	alertID, err := strconv.Atoi(alertIDStr)
 	if err != nil {
@@ -191,7 +187,6 @@ func (c *Controller) FindAlertByID(gctx *gin.Context) {
 
 // DeleteAlerts : delete alerts from database based on the specified filter
 func (c *Controller) DeleteAlerts(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/DeleteAlerts")
 
 	if gctx.ClientIP() != "127.0.0.1" && gctx.ClientIP() != "::1" {
 		gctx.JSON(http.StatusForbidden, gin.H{"message": fmt.Sprintf("access forbidden from this IP (%s)", gctx.ClientIP())})

+ 0 - 5
pkg/apiserver/controllers/v1/decisions.go

@@ -9,7 +9,6 @@ import (
 
 	"github.com/crowdsecurity/crowdsec/pkg/database/ent"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
-	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/gin-gonic/gin"
 	log "github.com/sirupsen/logrus"
 )
@@ -33,7 +32,6 @@ func FormatDecisions(decisions []*ent.Decision) ([]*models.Decision, error) {
 }
 
 func (c *Controller) GetDecision(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/GetDecision")
 	var err error
 	var results []*models.Decision
 	var data []*ent.Decision
@@ -66,7 +64,6 @@ func (c *Controller) GetDecision(gctx *gin.Context) {
 }
 
 func (c *Controller) DeleteDecisionById(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/DeleteDecisionById")
 	var err error
 
 	decisionIDStr := gctx.Param("decision_id")
@@ -90,7 +87,6 @@ func (c *Controller) DeleteDecisionById(gctx *gin.Context) {
 }
 
 func (c *Controller) DeleteDecisions(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/DeleteDecisions")
 	var err error
 
 	nbDeleted, err := c.DBClient.SoftDeleteDecisionsWithFilter(gctx.Request.URL.Query())
@@ -107,7 +103,6 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) {
 }
 
 func (c *Controller) StreamDecision(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/StreamDecision")
 	var data []*ent.Decision
 	ret := make(map[string][]*models.Decision, 0)
 	ret["new"] = []*models.Decision{}

+ 0 - 2
pkg/apiserver/controllers/v1/machines.go

@@ -4,13 +4,11 @@ import (
 	"net/http"
 
 	"github.com/crowdsecurity/crowdsec/pkg/models"
-	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/gin-gonic/gin"
 	"github.com/go-openapi/strfmt"
 )
 
 func (c *Controller) CreateMachine(gctx *gin.Context) {
-	defer types.CatchPanic("crowdsec/controllersV1/CreateMachine")
 	var err error
 	var input models.WatcherRegistrationRequest
 	if err = gctx.ShouldBindJSON(&input); err != nil {

+ 26 - 37
pkg/types/utils.go

@@ -6,13 +6,11 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"net"
 	"os"
 	"path/filepath"
 	"runtime/debug"
 	"strconv"
 	"strings"
-	"syscall"
 	"time"
 
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
@@ -74,46 +72,37 @@ func Clone(a, b interface{}) error {
 	return nil
 }
 
+func WriteStackTrace(iErr interface{}) string {
+	tmpfile, err := ioutil.TempFile("/tmp/", "crowdsec-crash.*.txt")
+	if err != nil {
+		log.Fatal(err)
+	}
+	if _, err := tmpfile.Write([]byte(fmt.Sprintf("error : %+v\n", iErr))); err != nil {
+		tmpfile.Close()
+		log.Fatal(err)
+	}
+	if _, err := tmpfile.Write([]byte(cwversion.ShowStr())); err != nil {
+		tmpfile.Close()
+		log.Fatal(err)
+	}
+	if _, err := tmpfile.Write(debug.Stack()); err != nil {
+		tmpfile.Close()
+		log.Fatal(err)
+	}
+	if err := tmpfile.Close(); err != nil {
+		log.Fatal(err)
+	}
+	return tmpfile.Name()
+}
+
 //CatchPanic is a util func that we should call from all go-routines to ensure proper stacktrace handling
 func CatchPanic(component string) {
-
 	if r := recover(); r != nil {
-
-		/*mimic gin's behaviour on broken pipe*/
-		var brokenPipe bool
-		if ne, ok := r.(*net.OpError); ok {
-			if se, ok := ne.Err.(*os.SyscallError); ok {
-				if se.Err == syscall.EPIPE || se.Err == syscall.ECONNRESET {
-					brokenPipe = true
-				}
-			}
-		}
-
-		tmpfile, err := ioutil.TempFile("/tmp/", "crowdsec-crash.*.txt")
-		if err != nil {
-			log.Fatal(err)
-		}
-		if _, err := tmpfile.Write([]byte(cwversion.ShowStr())); err != nil {
-			tmpfile.Close()
-			log.Fatal(err)
-		}
-		if _, err := tmpfile.Write(debug.Stack()); err != nil {
-			tmpfile.Close()
-			log.Fatal(err)
-		}
-		if err := tmpfile.Close(); err != nil {
-			log.Fatal(err)
-		}
-
 		log.Errorf("crowdsec - goroutine %s crashed : %s", component, r)
 		log.Errorf("please report this error to https://github.com/crowdsecurity/crowdsec/")
-		log.Errorf("stacktrace/report is written to %s : please join it to your issue", tmpfile.Name())
-
-		/*if it's not a broken pipe error, we don't want to fatal. it can happen from Local API pov*/
-		if !brokenPipe {
-			log.Fatalf("crowdsec stopped")
-		}
-
+		filename := WriteStackTrace(r)
+		log.Errorf("stacktrace/report is written to %s : please join it to your issue", filename)
+		log.Fatalf("crowdsec stopped")
 	}
 }