2020-11-30 09:37:17 +00:00
package apiclient
import (
"bytes"
"encoding/json"
"fmt"
"io"
2023-06-22 13:01:34 +00:00
"math/rand"
2020-11-30 09:37:17 +00:00
"net/http"
"net/http/httputil"
"net/url"
2023-06-22 13:01:34 +00:00
"sync"
"time"
2020-11-30 09:37:17 +00:00
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
2023-06-22 13:01:34 +00:00
"github.com/crowdsecurity/crowdsec/pkg/fflag"
"github.com/crowdsecurity/crowdsec/pkg/models"
2020-11-30 09:37:17 +00:00
)
// APIKeyTransport is an http.RoundTripper that authenticates outgoing
// requests with a static API key sent in the "X-Api-Key" header.
type APIKeyTransport struct {
	// APIKey is the value sent in the X-Api-Key header. RoundTrip fails
	// with an error while it is empty.
	APIKey string

	// Transport is the underlying HTTP transport to use when making requests.
	// It will default to http.DefaultTransport if nil.
	Transport http.RoundTripper

	// URL and VersionPrefix are carried alongside the credentials; RoundTrip
	// itself does not read them.
	// NOTE(review): presumably the API base URL and version path segment,
	// consumed by callers outside this type — confirm.
	URL           *url.URL
	VersionPrefix string

	// UserAgent, when non-empty, is added to every request as a
	// User-Agent header.
	UserAgent string
}
// RoundTrip implements the RoundTripper interface.
func ( t * APIKeyTransport ) RoundTrip ( req * http . Request ) ( * http . Response , error ) {
if t . APIKey == "" {
2020-11-30 15:15:07 +00:00
return nil , errors . New ( "APIKey is empty" )
2020-11-30 09:37:17 +00:00
}
// We must make a copy of the Request so
// that we don't modify the Request we were given. This is required by the
// specification of http.RoundTripper.
req = cloneRequest ( req )
req . Header . Add ( "X-Api-Key" , t . APIKey )
if t . UserAgent != "" {
req . Header . Add ( "User-Agent" , t . UserAgent )
}
log . Debugf ( "req-api: %s %s" , req . Method , req . URL . String ( ) )
if log . GetLevel ( ) >= log . TraceLevel {
dump , _ := httputil . DumpRequest ( req , true )
log . Tracef ( "auth-api request: %s" , string ( dump ) )
}
// Make the HTTP request.
resp , err := t . transport ( ) . RoundTrip ( req )
if err != nil {
log . Errorf ( "auth-api: auth with api key failed return nil response, error: %s" , err )
return resp , err
}
if log . GetLevel ( ) >= log . TraceLevel {
dump , _ := httputil . DumpResponse ( resp , true )
log . Tracef ( "auth-api response: %s" , string ( dump ) )
}
log . Debugf ( "resp-api: http %d" , resp . StatusCode )
return resp , err
}
// Client returns a new http.Client whose requests all go through t, and are
// therefore authenticated with the API key.
func (t *APIKeyTransport) Client() *http.Client {
	client := new(http.Client)
	client.Transport = t

	return client
}
// transport returns the round tripper that actually sends the request:
// the configured Transport, or http.DefaultTransport when none is set.
func (t *APIKeyTransport) transport() http.RoundTripper {
	if t.Transport == nil {
		return http.DefaultTransport
	}

	return t.Transport
}
2023-01-09 13:49:21 +00:00
// retryRoundTripper wraps another http.RoundTripper and re-sends a request
// when the response status is one of retryStatusCodes.
type retryRoundTripper struct {
	// next is the round tripper that performs each attempt.
	next http.RoundTripper
	// maxAttempts is the total number of attempts, the first one included.
	maxAttempts int
	// retryStatusCodes lists the HTTP status codes that trigger a retry.
	retryStatusCodes []int
	// withBackOff enables a growing, randomized wait between attempts.
	withBackOff bool
	// onBeforeRequest, when set, is called before every attempt with the
	// zero-based attempt index.
	onBeforeRequest func(attempt int)
}

// ShouldRetry reports whether statusCode is one of the status codes this
// round tripper is configured to retry on.
func (r retryRoundTripper) ShouldRetry(statusCode int) bool {
	for idx := range r.retryStatusCodes {
		if r.retryStatusCodes[idx] == statusCode {
			return true
		}
	}

	return false
}
func ( r retryRoundTripper ) RoundTrip ( req * http . Request ) ( * http . Response , error ) {
var resp * http . Response
var err error
2023-08-16 19:04:07 +00:00
2023-01-09 13:49:21 +00:00
backoff := 0
2023-08-16 19:04:07 +00:00
maxAttempts := r . maxAttempts
if fflag . DisableHttpRetryBackoff . IsEnabled ( ) {
maxAttempts = 1
}
for i := 0 ; i < maxAttempts ; i ++ {
2023-01-09 13:49:21 +00:00
if i > 0 {
2023-08-16 19:04:07 +00:00
if r . withBackOff {
2023-01-09 13:49:21 +00:00
backoff += 10 + rand . Intn ( 20 )
}
log . Infof ( "retrying in %d seconds (attempt %d of %d)" , backoff , i + 1 , r . maxAttempts )
select {
case <- req . Context ( ) . Done ( ) :
return resp , req . Context ( ) . Err ( )
case <- time . After ( time . Duration ( backoff ) * time . Second ) :
}
}
if r . onBeforeRequest != nil {
r . onBeforeRequest ( i )
}
clonedReq := cloneRequest ( req )
resp , err = r . next . RoundTrip ( clonedReq )
2023-01-13 11:58:12 +00:00
if err != nil {
2023-08-16 19:04:07 +00:00
left := maxAttempts - i - 1
if left > 0 {
log . Errorf ( "error while performing request: %s; %d retries left" , err , left )
}
2023-01-13 11:58:12 +00:00
continue
}
if ! r . ShouldRetry ( resp . StatusCode ) {
return resp , nil
2023-01-09 13:49:21 +00:00
}
}
return resp , err
}
2020-11-30 09:37:17 +00:00
type JWTTransport struct {
MachineID * string
Password * strfmt . Password
2022-12-14 15:42:46 +00:00
Token string
2020-11-30 09:37:17 +00:00
Expiration time . Time
Scenarios [ ] string
URL * url . URL
VersionPrefix string
UserAgent string
// Transport is the underlying HTTP transport to use when making requests.
// It will default to http.DefaultTransport if nil.
2023-01-09 13:49:21 +00:00
Transport http . RoundTripper
UpdateScenario func ( ) ( [ ] string , error )
refreshTokenMutex sync . Mutex
2020-11-30 09:37:17 +00:00
}
func ( t * JWTTransport ) refreshJwtToken ( ) error {
var err error
if t . UpdateScenario != nil {
t . Scenarios , err = t . UpdateScenario ( )
if err != nil {
return fmt . Errorf ( "can't update scenario list: %s" , err )
}
2020-12-14 10:54:16 +00:00
log . Debugf ( "scenarios list updated for '%s'" , * t . MachineID )
2020-11-30 09:37:17 +00:00
}
var auth = models . WatcherAuthRequest {
MachineID : t . MachineID ,
Password : t . Password ,
Scenarios : t . Scenarios ,
}
var response models . WatcherAuthResponse
/ *
we don ' t use the main client , so let ' s build the body
* /
2022-02-01 21:08:06 +00:00
var buf io . ReadWriter = & bytes . Buffer { }
2020-11-30 09:37:17 +00:00
enc := json . NewEncoder ( buf )
enc . SetEscapeHTML ( false )
err = enc . Encode ( auth )
if err != nil {
2023-06-22 13:01:34 +00:00
return fmt . Errorf ( "could not encode jwt auth body: %w" , err )
2020-11-30 09:37:17 +00:00
}
2022-08-16 07:46:10 +00:00
req , err := http . NewRequest ( http . MethodPost , fmt . Sprintf ( "%s%s/watchers/login" , t . URL , t . VersionPrefix ) , buf )
2020-11-30 09:37:17 +00:00
if err != nil {
2023-06-22 13:01:34 +00:00
return fmt . Errorf ( "could not create request: %w" , err )
2020-11-30 09:37:17 +00:00
}
req . Header . Add ( "Content-Type" , "application/json" )
2023-01-09 13:49:21 +00:00
client := & http . Client {
Transport : & retryRoundTripper {
next : http . DefaultTransport ,
maxAttempts : 5 ,
withBackOff : true ,
retryStatusCodes : [ ] int { http . StatusTooManyRequests , http . StatusServiceUnavailable , http . StatusGatewayTimeout , http . StatusInternalServerError } ,
} ,
}
2020-11-30 09:37:17 +00:00
if t . UserAgent != "" {
req . Header . Add ( "User-Agent" , t . UserAgent )
}
if log . GetLevel ( ) >= log . TraceLevel {
dump , _ := httputil . DumpRequest ( req , true )
log . Tracef ( "auth-jwt request: %s" , string ( dump ) )
}
log . Debugf ( "auth-jwt(auth): %s %s" , req . Method , req . URL . String ( ) )
resp , err := client . Do ( req )
if err != nil {
2023-06-22 13:01:34 +00:00
return fmt . Errorf ( "could not get jwt token: %w" , err )
2020-11-30 09:37:17 +00:00
}
log . Debugf ( "auth-jwt : http %d" , resp . StatusCode )
if log . GetLevel ( ) >= log . TraceLevel {
dump , _ := httputil . DumpResponse ( resp , true )
log . Tracef ( "auth-jwt response: %s" , string ( dump ) )
}
defer resp . Body . Close ( )
if resp . StatusCode < 200 || resp . StatusCode >= 300 {
2021-09-02 10:46:32 +00:00
log . Debugf ( "received response status %q when fetching %v" , resp . Status , req . URL )
2023-01-09 13:49:21 +00:00
2021-09-02 10:46:32 +00:00
err = CheckResponse ( resp )
if err != nil {
return err
}
2020-11-30 09:37:17 +00:00
}
if err := json . NewDecoder ( resp . Body ) . Decode ( & response ) ; err != nil {
2023-06-22 13:01:34 +00:00
return fmt . Errorf ( "unable to decode response: %w" , err )
2020-11-30 09:37:17 +00:00
}
if err := t . Expiration . UnmarshalText ( [ ] byte ( response . Expire ) ) ; err != nil {
2023-06-22 13:01:34 +00:00
return fmt . Errorf ( "unable to parse jwt expiration: %w" , err )
2020-11-30 09:37:17 +00:00
}
2022-12-14 15:42:46 +00:00
t . Token = response . Token
2020-11-30 09:37:17 +00:00
2022-12-14 15:42:46 +00:00
log . Debugf ( "token %s will expire on %s" , t . Token , t . Expiration . String ( ) )
2020-11-30 09:37:17 +00:00
return nil
}
// RoundTrip implements the RoundTripper interface.
func ( t * JWTTransport ) RoundTrip ( req * http . Request ) ( * http . Response , error ) {
2023-01-09 13:49:21 +00:00
// in a few occasions several goroutines will execute refreshJwtToken concurrently which is useless and will cause overload on CAPI
// we use a mutex to avoid this
2023-02-16 15:16:26 +00:00
//We also bypass the refresh if we are requesting the login endpoint, as it does not require a token, and it leads to do 2 requests instead of one (refresh + actual login request)
2023-01-09 13:49:21 +00:00
t . refreshTokenMutex . Lock ( )
2023-02-16 15:16:26 +00:00
if req . URL . Path != "/" + t . VersionPrefix + "/watchers/login" && ( t . Token == "" || t . Expiration . Add ( - time . Minute ) . Before ( time . Now ( ) . UTC ( ) ) ) {
2020-11-30 09:37:17 +00:00
if err := t . refreshJwtToken ( ) ; err != nil {
2023-01-09 13:49:21 +00:00
t . refreshTokenMutex . Unlock ( )
2020-11-30 09:37:17 +00:00
return nil , err
}
}
2023-01-09 13:49:21 +00:00
t . refreshTokenMutex . Unlock ( )
2020-11-30 09:37:17 +00:00
2022-12-14 15:42:46 +00:00
if t . UserAgent != "" {
req . Header . Add ( "User-Agent" , t . UserAgent )
}
req . Header . Add ( "Authorization" , fmt . Sprintf ( "Bearer %s" , t . Token ) )
2020-11-30 09:37:17 +00:00
if log . GetLevel ( ) >= log . TraceLevel {
2022-12-14 15:42:46 +00:00
//requestToDump := cloneRequest(req)
2020-11-30 09:37:17 +00:00
dump , _ := httputil . DumpRequest ( req , true )
log . Tracef ( "req-jwt: %s" , string ( dump ) )
}
2022-12-14 15:42:46 +00:00
2020-11-30 09:37:17 +00:00
// Make the HTTP request.
resp , err := t . transport ( ) . RoundTrip ( req )
if log . GetLevel ( ) >= log . TraceLevel {
dump , _ := httputil . DumpResponse ( resp , true )
2021-01-15 17:14:50 +00:00
log . Tracef ( "resp-jwt: %s (err:%v)" , string ( dump ) , err )
2020-11-30 09:37:17 +00:00
}
2022-12-14 15:42:46 +00:00
if err != nil {
2021-01-14 15:04:10 +00:00
/*we had an error (network error for example, or 401 because token is refused), reset the token ?*/
2022-12-14 15:42:46 +00:00
t . Token = ""
2023-06-22 13:01:34 +00:00
return resp , fmt . Errorf ( "performing jwt auth: %w" , err )
2020-11-30 09:37:17 +00:00
}
2022-12-14 15:42:46 +00:00
2023-08-16 19:04:07 +00:00
if resp != nil {
log . Debugf ( "resp-jwt: %d" , resp . StatusCode )
}
2022-12-14 15:42:46 +00:00
2020-11-30 09:37:17 +00:00
return resp , nil
}
// Client returns a new http.Client whose requests all go through t, and are
// therefore authenticated with the JWT token.
func (t *JWTTransport) Client() *http.Client {
	client := new(http.Client)
	client.Transport = t

	return client
}
2023-01-09 13:49:21 +00:00
// ResetToken clears the current token under refreshTokenMutex, so that the
// next request that needs a token triggers a new login.
func (t *JWTTransport) ResetToken() {
	log.Debug("resetting jwt token")

	t.refreshTokenMutex.Lock()
	defer t.refreshTokenMutex.Unlock()

	t.Token = ""
}
2020-11-30 09:37:17 +00:00
func ( t * JWTTransport ) transport ( ) http . RoundTripper {
2023-01-09 13:49:21 +00:00
var transport http . RoundTripper
2020-11-30 09:37:17 +00:00
if t . Transport != nil {
2023-01-09 13:49:21 +00:00
transport = t . Transport
} else {
transport = http . DefaultTransport
}
// a round tripper that retries once when the status is unauthorized and 5 times when infrastructure is overloaded
return & retryRoundTripper {
next : & retryRoundTripper {
next : transport ,
maxAttempts : 5 ,
withBackOff : true ,
retryStatusCodes : [ ] int { http . StatusTooManyRequests , http . StatusServiceUnavailable , http . StatusGatewayTimeout } ,
} ,
maxAttempts : 2 ,
withBackOff : false ,
retryStatusCodes : [ ] int { http . StatusUnauthorized , http . StatusForbidden } ,
onBeforeRequest : func ( attempt int ) {
// reset the token only in the second attempt as this is when we know we had a 401 or 403
// the second attempt is supposed to refresh the token
if attempt > 0 {
t . ResetToken ( )
}
} ,
2020-11-30 09:37:17 +00:00
}
}
// cloneRequest returns a clone of the provided *http.Request that can be sent
// independently of the original.
//
// The struct itself is copied shallowly, the Header map is copied deeply, and
// a non-nil Body is read entirely into memory: both r and the clone then get
// their own in-memory reader over the same bytes, so each can be consumed
// once. The original body is closed after it has been read, since nothing
// else holds a reference to it afterwards.
//
// Reading the body is best effort: the function has no way to report an
// error, so on a read failure both bodies carry whatever was read before it.
func cloneRequest(r *http.Request) *http.Request {
	// shallow copy of the struct
	r2 := new(http.Request)
	*r2 = *r

	// deep copy of the Header, always non-nil so callers can add to it
	r2.Header = make(http.Header, len(r.Header))
	for k, s := range r.Header {
		r2.Header[k] = append([]string(nil), s...)
	}

	if r.Body != nil {
		// Best effort, see the function comment.
		payload, _ := io.ReadAll(r.Body)

		// r.Body is replaced below; close the original first so file- or
		// network-backed bodies are not leaked.
		_ = r.Body.Close()

		r.Body = io.NopCloser(bytes.NewReader(payload))
		r2.Body = io.NopCloser(bytes.NewReader(payload))
	}

	return r2
}