common.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953
  1. // Package common defines code shared among file transfer packages and protocols
  2. package common
  3. import (
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "os/exec"
  12. "path/filepath"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. "github.com/pires/go-proxyproto"
  18. "github.com/drakkan/sftpgo/dataprovider"
  19. "github.com/drakkan/sftpgo/httpclient"
  20. "github.com/drakkan/sftpgo/logger"
  21. "github.com/drakkan/sftpgo/metrics"
  22. "github.com/drakkan/sftpgo/utils"
  23. )
  // Names used as the sender of the log entries written by this package.
  const (
  	logSender         = "common"
  	uploadLogSender   = "Upload"
  	downloadLogSender = "Download"
  	renameLogSender   = "Rename"
  	rmdirLogSender    = "Rmdir"
  	mkdirLogSender    = "Mkdir"
  	symlinkLogSender  = "Symlink"
  	removeLogSender   = "Remove"
  	chownLogSender    = "Chown"
  	chmodLogSender    = "Chmod"
  	chtimesLogSender  = "Chtimes"
  	truncateLogSender = "Truncate"
  )

  // Identifiers for the supported operations.
  const (
  	operationDownload  = "download"
  	operationUpload    = "upload"
  	operationDelete    = "delete"
  	operationPreDelete = "pre-delete"
  	operationRename    = "rename"
  	operationSSHCmd    = "ssh_cmd"
  )

  const (
  	// chtimesFormat is a time layout in the form YYYY-MM-DDTHH:MM:SS
  	chtimesFormat = "2006-01-02T15:04:05"
  	// idleTimeoutCheckInterval is the period of the ticker that looks for
  	// idle connections
  	idleTimeoutCheckInterval = 3 * time.Minute
  )
  // Stat flags. Each flag is a distinct bit so that several of them can be
  // combined in a single integer value.
  const (
  	StatAttrUIDGID = 1 << iota // 1
  	StatAttrPerms              // 2
  	StatAttrTimes              // 4
  	StatAttrSize               // 8
  )
  // Transfer types
  const (
  	TransferUpload   = iota // 0
  	TransferDownload = iota // 1
  )
  // Supported protocols. These are the string identifiers of the protocols
  // handled by the file transfer packages.
  const (
  	ProtocolSFTP   = "SFTP"
  	ProtocolSCP    = "SCP"
  	ProtocolSSH    = "SSH"
  	ProtocolFTP    = "FTP"
  	ProtocolWebDAV = "DAV" // note: the identifier is "DAV", not "WebDAV"
  	ProtocolHTTP   = "HTTP"
  )
  // Upload modes
  const (
  	UploadModeStandard         = iota // 0
  	UploadModeAtomic           = iota // 1
  	UploadModeAtomicWithResume = iota // 2
  )
  74. func init() {
  75. Connections.clients = clientsMap{
  76. clients: make(map[string]int),
  77. }
  78. }
  // Error values defined by this package. The exported ones are available to
  // the other packages, the unexported ones are for internal use.
  var (
  	// exported errors
  	ErrPermissionDenied     = errors.New("permission denied")
  	ErrNotExist             = errors.New("no such file or directory")
  	ErrOpUnsupported        = errors.New("operation unsupported")
  	ErrGenericFailure       = errors.New("failure")
  	ErrQuotaExceeded        = errors.New("denying write due to space limit")
  	ErrSkipPermissionsCheck = errors.New("permission check skipped")
  	ErrConnectionDenied     = errors.New("you are not allowed to connect")
  	ErrNoBinding            = errors.New("no binding configured")
  	ErrCrtRevoked           = errors.New("your certificate has been revoked")
  	// internal errors
  	errNoTransfer       = errors.New("requested transfer not found")
  	errTransferMismatch = errors.New("transfer mismatch")
  )
  93. var (
  94. // Config is the configuration for the supported protocols
  95. Config Configuration
  96. // Connections is the list of active connections
  97. Connections ActiveConnections
  98. // QuotaScans is the list of active quota scans
  99. QuotaScans ActiveScans
  100. idleTimeoutTicker *time.Ticker
  101. idleTimeoutTickerDone chan bool
  102. supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV, ProtocolHTTP}
  103. // the map key is the protocol, for each protocol we can have multiple rate limiters
  104. rateLimiters map[string][]*rateLimiter
  105. )
  106. // Initialize sets the common configuration
  107. func Initialize(c Configuration) error {
  108. Config = c
  109. Config.idleLoginTimeout = 2 * time.Minute
  110. Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
  111. if Config.IdleTimeout > 0 {
  112. startIdleTimeoutTicker(idleTimeoutCheckInterval)
  113. }
  114. Config.defender = nil
  115. if c.DefenderConfig.Enabled {
  116. defender, err := newInMemoryDefender(&c.DefenderConfig)
  117. if err != nil {
  118. return fmt.Errorf("defender initialization error: %v", err)
  119. }
  120. logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
  121. Config.defender = defender
  122. }
  123. rateLimiters = make(map[string][]*rateLimiter)
  124. for _, rlCfg := range c.RateLimitersConfig {
  125. if rlCfg.isEnabled() {
  126. if err := rlCfg.validate(); err != nil {
  127. return fmt.Errorf("rate limiters initialization error: %v", err)
  128. }
  129. rateLimiter := rlCfg.getLimiter()
  130. for _, protocol := range rlCfg.Protocols {
  131. rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
  132. }
  133. }
  134. }
  135. return nil
  136. }
  137. // LimitRate blocks until all the configured rate limiters
  138. // allow one event to happen.
  139. // It returns an error if the time to wait exceeds the max
  140. // allowed delay
  141. func LimitRate(protocol, ip string) (time.Duration, error) {
  142. for _, limiter := range rateLimiters[protocol] {
  143. if delay, err := limiter.Wait(ip); err != nil {
  144. logger.Debug(logSender, "", "protocol %v ip %v: %v", protocol, ip, err)
  145. return delay, err
  146. }
  147. }
  148. return 0, nil
  149. }
  150. // ReloadDefender reloads the defender's block and safe lists
  151. func ReloadDefender() error {
  152. if Config.defender == nil {
  153. return nil
  154. }
  155. return Config.defender.Reload()
  156. }
  157. // IsBanned returns true if the specified IP address is banned
  158. func IsBanned(ip string) bool {
  159. if Config.defender == nil {
  160. return false
  161. }
  162. return Config.defender.IsBanned(ip)
  163. }
  164. // GetDefenderBanTime returns the ban time for the given IP
  165. // or nil if the IP is not banned or the defender is disabled
  166. func GetDefenderBanTime(ip string) *time.Time {
  167. if Config.defender == nil {
  168. return nil
  169. }
  170. return Config.defender.GetBanTime(ip)
  171. }
  172. // Unban removes the specified IP address from the banned ones
  173. func Unban(ip string) bool {
  174. if Config.defender == nil {
  175. return false
  176. }
  177. return Config.defender.Unban(ip)
  178. }
  179. // GetDefenderScore returns the score for the given IP
  180. func GetDefenderScore(ip string) int {
  181. if Config.defender == nil {
  182. return 0
  183. }
  184. return Config.defender.GetScore(ip)
  185. }
  186. // AddDefenderEvent adds the specified defender event for the given IP
  187. func AddDefenderEvent(ip string, event HostEvent) {
  188. if Config.defender == nil {
  189. return
  190. }
  191. Config.defender.AddEvent(ip, event)
  192. }
  193. // the ticker cannot be started/stopped from multiple goroutines
  194. func startIdleTimeoutTicker(duration time.Duration) {
  195. stopIdleTimeoutTicker()
  196. idleTimeoutTicker = time.NewTicker(duration)
  197. idleTimeoutTickerDone = make(chan bool)
  198. go func() {
  199. for {
  200. select {
  201. case <-idleTimeoutTickerDone:
  202. return
  203. case <-idleTimeoutTicker.C:
  204. Connections.checkIdles()
  205. }
  206. }
  207. }()
  208. }
  209. func stopIdleTimeoutTicker() {
  210. if idleTimeoutTicker != nil {
  211. idleTimeoutTicker.Stop()
  212. idleTimeoutTickerDone <- true
  213. idleTimeoutTicker = nil
  214. }
  215. }
  // ActiveTransfer is the interface that an active transfer must implement
  // to be attached to a connection.
  // NOTE(review): the per-method notes are inferred from names and
  // signatures, confirm them against the implementations
  type ActiveTransfer interface {
  	// details about the transfer
  	GetID() uint64
  	GetType() int // presumably TransferUpload or TransferDownload
  	GetSize() int64
  	GetVirtualPath() string
  	GetStartTime() time.Time
  	// actions on the transfer
  	SignalClose()
  	Truncate(fsPath string, size int64) (int64, error)
  	GetRealFsPath(fsPath string) string
  }
  227. // ActiveConnection defines the interface for the current active connections
  228. type ActiveConnection interface {
  229. GetID() string
  230. GetUsername() string
  231. GetRemoteAddress() string
  232. GetClientVersion() string
  233. GetProtocol() string
  234. GetConnectionTime() time.Time
  235. GetLastActivity() time.Time
  236. GetCommand() string
  237. Disconnect() error
  238. AddTransfer(t ActiveTransfer)
  239. RemoveTransfer(t ActiveTransfer)
  240. GetTransfers() []ConnectionTransfer
  241. CloseFS() error
  242. }
  // StatAttributes holds the attributes for set stat commands.
  // NOTE(review): Flags presumably combines the StatAttr* constants to tell
  // which attributes are set, confirm against the callers
  type StatAttributes struct {
  	Mode             os.FileMode
  	Atime, Mtime     time.Time
  	UID, GID, Flags  int
  	Size             int64
  }
  // ConnectionTransfer defines the transfer details to expose
  type ConnectionTransfer struct {
  	// ID is excluded from the JSON serialization
  	ID            uint64 `json:"-"`
  	OperationType string `json:"operation_type"`
  	// StartTime is handled as milliseconds since epoch by
  	// getConnectionTransferAsString
  	StartTime   int64  `json:"start_time"`
  	Size        int64  `json:"size"`
  	VirtualPath string `json:"path"`
  }
  261. func (t *ConnectionTransfer) getConnectionTransferAsString() string {
  262. result := ""
  263. switch t.OperationType {
  264. case operationUpload:
  265. result += "UL "
  266. case operationDownload:
  267. result += "DL "
  268. }
  269. result += fmt.Sprintf("%#v ", t.VirtualPath)
  270. if t.Size > 0 {
  271. elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(t.StartTime))
  272. speed := float64(t.Size) / float64(utils.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
  273. result += fmt.Sprintf("Size: %#v Elapsed: %#v Speed: \"%.1f KB/s\"", utils.ByteCountIEC(t.Size),
  274. utils.GetDurationAsString(elapsed), speed)
  275. }
  276. return result
  277. }
  278. // Configuration defines configuration parameters common to all supported protocols
  279. type Configuration struct {
  280. // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
  281. // 0 means disabled
  282. IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
  283. // UploadMode 0 means standard, the files are uploaded directly to the requested path.
  284. // 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
  285. // when the client ends the upload. Atomic mode avoid problems such as a web server that
  286. // serves partial files when the files are being uploaded.
  287. // In atomic mode if there is an upload error the temporary file is deleted and so the requested
  288. // upload path will not contain a partial file.
  289. // 2 means atomic with resume support: as atomic but if there is an upload error the temporary
  290. // file is renamed to the requested path and not deleted, this way a client can reconnect and resume
  291. // the upload.
  292. UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
  293. // Actions to execute for SFTP file operations and SSH commands
  294. Actions ProtocolActions `json:"actions" mapstructure:"actions"`
  295. // SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
  296. // 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
  297. // 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group/time are
  298. // silently ignored for cloud based filesystem such as S3, GCS, Azure Blob
  299. SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
  300. // Support for HAProxy PROXY protocol.
  301. // If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGNIX, you can enable
  302. // the proxy protocol. It provides a convenient way to safely transport connection information
  303. // such as a client's address across multiple layers of NAT or TCP proxies to get the real
  304. // client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
  305. // - 0 means disabled
  306. // - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
  307. // - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
  308. // If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
  309. // for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
  310. ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
  311. // List of IP addresses and IP ranges allowed to send the proxy header.
  312. // If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
  313. // connection will be accepted and the header will be ignored.
  314. // If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
  315. // connection will be rejected.
  316. ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
  317. // Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
  318. // If you define an HTTP URL it will be invoked using a `GET` request.
  319. // Please note that SFTPGo services may not yet be available when this hook is run.
  320. // Leave empty do disable.
  321. StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
  322. // Absolute path to an external program or an HTTP URL to invoke after a user connects
  323. // and before he tries to login. It allows you to reject the connection based on the source
  324. // ip address. Leave empty do disable.
  325. PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
  326. // Maximum number of concurrent client connections. 0 means unlimited
  327. MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
  328. // Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
  329. MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
  330. // Defender configuration
  331. DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
  332. // Rate limiter configurations
  333. RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
  334. idleTimeoutAsDuration time.Duration
  335. idleLoginTimeout time.Duration
  336. defender Defender
  337. }
  338. // IsAtomicUploadEnabled returns true if atomic upload is enabled
  339. func (c *Configuration) IsAtomicUploadEnabled() bool {
  340. return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
  341. }
  342. // GetProxyListener returns a wrapper for the given listener that supports the
  343. // HAProxy Proxy Protocol or nil if the proxy protocol is not configured
  344. func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
  345. var proxyListener *proxyproto.Listener
  346. var err error
  347. if c.ProxyProtocol > 0 {
  348. var policyFunc func(upstream net.Addr) (proxyproto.Policy, error)
  349. if c.ProxyProtocol == 1 && len(c.ProxyAllowed) > 0 {
  350. policyFunc, err = proxyproto.LaxWhiteListPolicy(c.ProxyAllowed)
  351. if err != nil {
  352. return nil, err
  353. }
  354. }
  355. if c.ProxyProtocol == 2 {
  356. if len(c.ProxyAllowed) == 0 {
  357. policyFunc = func(upstream net.Addr) (proxyproto.Policy, error) {
  358. return proxyproto.REQUIRE, nil
  359. }
  360. } else {
  361. policyFunc, err = proxyproto.StrictWhiteListPolicy(c.ProxyAllowed)
  362. if err != nil {
  363. return nil, err
  364. }
  365. }
  366. }
  367. proxyListener = &proxyproto.Listener{
  368. Listener: listener,
  369. Policy: policyFunc,
  370. }
  371. }
  372. return proxyListener, nil
  373. }
  374. // ExecuteStartupHook runs the startup hook if defined
  375. func (c *Configuration) ExecuteStartupHook() error {
  376. if c.StartupHook == "" {
  377. return nil
  378. }
  379. if strings.HasPrefix(c.StartupHook, "http") {
  380. var url *url.URL
  381. url, err := url.Parse(c.StartupHook)
  382. if err != nil {
  383. logger.Warn(logSender, "", "Invalid startup hook %#v: %v", c.StartupHook, err)
  384. return err
  385. }
  386. startTime := time.Now()
  387. httpClient := httpclient.GetRetraybleHTTPClient()
  388. resp, err := httpClient.Get(url.String())
  389. if err != nil {
  390. logger.Warn(logSender, "", "Error executing startup hook: %v", err)
  391. return err
  392. }
  393. defer resp.Body.Close()
  394. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
  395. return nil
  396. }
  397. if !filepath.IsAbs(c.StartupHook) {
  398. err := fmt.Errorf("invalid startup hook %#v", c.StartupHook)
  399. logger.Warn(logSender, "", "Invalid startup hook %#v", c.StartupHook)
  400. return err
  401. }
  402. startTime := time.Now()
  403. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  404. defer cancel()
  405. cmd := exec.CommandContext(ctx, c.StartupHook)
  406. err := cmd.Run()
  407. logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, error: %v", time.Since(startTime), err)
  408. return nil
  409. }
  410. // ExecutePostConnectHook executes the post connect hook if defined
  411. func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
  412. if c.PostConnectHook == "" {
  413. return nil
  414. }
  415. if strings.HasPrefix(c.PostConnectHook, "http") {
  416. var url *url.URL
  417. url, err := url.Parse(c.PostConnectHook)
  418. if err != nil {
  419. logger.Warn(protocol, "", "Login from ip %#v denied, invalid post connect hook %#v: %v",
  420. ipAddr, c.PostConnectHook, err)
  421. return err
  422. }
  423. httpClient := httpclient.GetRetraybleHTTPClient()
  424. q := url.Query()
  425. q.Add("ip", ipAddr)
  426. q.Add("protocol", protocol)
  427. url.RawQuery = q.Encode()
  428. resp, err := httpClient.Get(url.String())
  429. if err != nil {
  430. logger.Warn(protocol, "", "Login from ip %#v denied, error executing post connect hook: %v", ipAddr, err)
  431. return err
  432. }
  433. defer resp.Body.Close()
  434. if resp.StatusCode != http.StatusOK {
  435. logger.Warn(protocol, "", "Login from ip %#v denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
  436. return errUnexpectedHTTResponse
  437. }
  438. return nil
  439. }
  440. if !filepath.IsAbs(c.PostConnectHook) {
  441. err := fmt.Errorf("invalid post connect hook %#v", c.PostConnectHook)
  442. logger.Warn(protocol, "", "Login from ip %#v denied: %v", ipAddr, err)
  443. return err
  444. }
  445. ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
  446. defer cancel()
  447. cmd := exec.CommandContext(ctx, c.PostConnectHook)
  448. cmd.Env = append(os.Environ(),
  449. fmt.Sprintf("SFTPGO_CONNECTION_IP=%v", ipAddr),
  450. fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%v", protocol))
  451. err := cmd.Run()
  452. if err != nil {
  453. logger.Warn(protocol, "", "Login from ip %#v denied, connect hook error: %v", ipAddr, err)
  454. }
  455. return err
  456. }
  // SSHConnection defines an ssh connection.
  // Each SSH connection can open several channels for SFTP or SSH commands
  type SSHConnection struct {
  	id   string
  	conn net.Conn
  	// last activity as unix nanoseconds, it is always accessed using the
  	// sync/atomic functions
  	lastActivity int64
  }

  // NewSSHConnection returns a new SSHConnection for the given network
  // connection. The last activity is set to the current time
  func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
  	now := time.Now()
  	sshConn := &SSHConnection{id: id, conn: conn}
  	sshConn.lastActivity = now.UnixNano()
  	return sshConn
  }

  // GetID returns the ID for this SSHConnection
  func (c *SSHConnection) GetID() string {
  	return c.id
  }

  // UpdateLastActivity sets the last activity for this connection to the
  // current time
  func (c *SSHConnection) UpdateLastActivity() {
  	nowNano := time.Now().UnixNano()
  	atomic.StoreInt64(&c.lastActivity, nowNano)
  }

  // GetLastActivity returns the last connection activity
  func (c *SSHConnection) GetLastActivity() time.Time {
  	nano := atomic.LoadInt64(&c.lastActivity)
  	return time.Unix(0, nano)
  }

  // Close closes the underlying network connection
  func (c *SSHConnection) Close() error {
  	return c.conn.Close()
  }
  488. // ActiveConnections holds the currect active connections with the associated transfers
  489. type ActiveConnections struct {
  490. // clients contains both authenticated and estabilished connections and the ones waiting
  491. // for authentication
  492. clients clientsMap
  493. sync.RWMutex
  494. connections []ActiveConnection
  495. sshConnections []*SSHConnection
  496. }
  497. // GetActiveSessions returns the number of active sessions for the given username.
  498. // We return the open sessions for any protocol
  499. func (conns *ActiveConnections) GetActiveSessions(username string) int {
  500. conns.RLock()
  501. defer conns.RUnlock()
  502. numSessions := 0
  503. for _, c := range conns.connections {
  504. if c.GetUsername() == username {
  505. numSessions++
  506. }
  507. }
  508. return numSessions
  509. }
  510. // Add adds a new connection to the active ones
  511. func (conns *ActiveConnections) Add(c ActiveConnection) {
  512. conns.Lock()
  513. defer conns.Unlock()
  514. conns.connections = append(conns.connections, c)
  515. metrics.UpdateActiveConnectionsSize(len(conns.connections))
  516. logger.Debug(c.GetProtocol(), c.GetID(), "connection added, num open connections: %v", len(conns.connections))
  517. }
  518. // Swap replaces an existing connection with the given one.
  519. // This method is useful if you have to change some connection details
  520. // for example for FTP is used to update the connection once the user
  521. // authenticates
  522. func (conns *ActiveConnections) Swap(c ActiveConnection) error {
  523. conns.Lock()
  524. defer conns.Unlock()
  525. for idx, conn := range conns.connections {
  526. if conn.GetID() == c.GetID() {
  527. conn = nil
  528. conns.connections[idx] = c
  529. return nil
  530. }
  531. }
  532. return errors.New("connection to swap not found")
  533. }
  534. // Remove removes a connection from the active ones
  535. func (conns *ActiveConnections) Remove(connectionID string) {
  536. conns.Lock()
  537. defer conns.Unlock()
  538. for idx, conn := range conns.connections {
  539. if conn.GetID() == connectionID {
  540. err := conn.CloseFS()
  541. lastIdx := len(conns.connections) - 1
  542. conns.connections[idx] = conns.connections[lastIdx]
  543. conns.connections[lastIdx] = nil
  544. conns.connections = conns.connections[:lastIdx]
  545. metrics.UpdateActiveConnectionsSize(lastIdx)
  546. logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, close fs error: %v, num open connections: %v",
  547. err, lastIdx)
  548. return
  549. }
  550. }
  551. logger.Warn(logSender, "", "connection id %#v to remove not found!", connectionID)
  552. }
  553. // Close closes an active connection.
  554. // It returns true on success
  555. func (conns *ActiveConnections) Close(connectionID string) bool {
  556. conns.RLock()
  557. result := false
  558. for _, c := range conns.connections {
  559. if c.GetID() == connectionID {
  560. defer func(conn ActiveConnection) {
  561. err := conn.Disconnect()
  562. logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
  563. }(c)
  564. result = true
  565. break
  566. }
  567. }
  568. conns.RUnlock()
  569. return result
  570. }
  571. // AddSSHConnection adds a new ssh connection to the active ones
  572. func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
  573. conns.Lock()
  574. defer conns.Unlock()
  575. conns.sshConnections = append(conns.sshConnections, c)
  576. logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %v", len(conns.sshConnections))
  577. }
  578. // RemoveSSHConnection removes a connection from the active ones
  579. func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
  580. conns.Lock()
  581. defer conns.Unlock()
  582. for idx, conn := range conns.sshConnections {
  583. if conn.GetID() == connectionID {
  584. lastIdx := len(conns.sshConnections) - 1
  585. conns.sshConnections[idx] = conns.sshConnections[lastIdx]
  586. conns.sshConnections[lastIdx] = nil
  587. conns.sshConnections = conns.sshConnections[:lastIdx]
  588. logger.Debug(logSender, conn.GetID(), "ssh connection removed, num open ssh connections: %v", lastIdx)
  589. return
  590. }
  591. }
  592. logger.Warn(logSender, "", "ssh connection to remove with id %#v not found!", connectionID)
  593. }
  594. func (conns *ActiveConnections) checkIdles() {
  595. conns.RLock()
  596. for _, sshConn := range conns.sshConnections {
  597. idleTime := time.Since(sshConn.GetLastActivity())
  598. if idleTime > Config.idleTimeoutAsDuration {
  599. // we close the an ssh connection if it has no active connections associated
  600. idToMatch := fmt.Sprintf("_%v_", sshConn.GetID())
  601. toClose := true
  602. for _, conn := range conns.connections {
  603. if strings.Contains(conn.GetID(), idToMatch) {
  604. toClose = false
  605. break
  606. }
  607. }
  608. if toClose {
  609. defer func(c *SSHConnection) {
  610. err := c.Close()
  611. logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
  612. time.Since(c.GetLastActivity()), err)
  613. }(sshConn)
  614. }
  615. }
  616. }
  617. for _, c := range conns.connections {
  618. idleTime := time.Since(c.GetLastActivity())
  619. isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
  620. if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
  621. defer func(conn ActiveConnection, isFTPNoAuth bool) {
  622. err := conn.Disconnect()
  623. logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %#v close err: %v",
  624. time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
  625. if isFTPNoAuth {
  626. ip := utils.GetIPFromRemoteAddress(c.GetRemoteAddress())
  627. logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTryed, c.GetProtocol(), "client idle")
  628. metrics.AddNoAuthTryed()
  629. AddDefenderEvent(ip, HostEventNoLoginTried)
  630. dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTryed, ip, c.GetProtocol(),
  631. dataprovider.ErrNoAuthTryed)
  632. }
  633. }(c, isUnauthenticatedFTPUser)
  634. }
  635. }
  636. conns.RUnlock()
  637. }
  638. // AddClientConnection stores a new client connection
  639. func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
  640. conns.clients.add(ipAddr)
  641. }
  642. // RemoveClientConnection removes a disconnected client from the tracked ones
  643. func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
  644. conns.clients.remove(ipAddr)
  645. }
  646. // GetClientConnections returns the total number of client connections
  647. func (conns *ActiveConnections) GetClientConnections() int32 {
  648. return conns.clients.getTotal()
  649. }
  650. // IsNewConnectionAllowed returns false if the maximum number of concurrent allowed connections is exceeded
  651. func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr string) bool {
  652. if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
  653. return true
  654. }
  655. if Config.MaxPerHostConnections > 0 {
  656. if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
  657. logger.Debug(logSender, "", "active connections from %v %v/%v", ipAddr, total, Config.MaxPerHostConnections)
  658. AddDefenderEvent(ipAddr, HostEventLimitExceeded)
  659. return false
  660. }
  661. }
  662. if Config.MaxTotalConnections > 0 {
  663. if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
  664. logger.Debug(logSender, "", "active client connections %v/%v", total, Config.MaxTotalConnections)
  665. return false
  666. }
  667. // on a single SFTP connection we could have multiple SFTP channels or commands
  668. // so we check the estabilished connections too
  669. conns.RLock()
  670. defer conns.RUnlock()
  671. return len(conns.connections) < Config.MaxTotalConnections
  672. }
  673. return true
  674. }
  675. // GetStats returns stats for active connections
  676. func (conns *ActiveConnections) GetStats() []*ConnectionStatus {
  677. conns.RLock()
  678. defer conns.RUnlock()
  679. stats := make([]*ConnectionStatus, 0, len(conns.connections))
  680. for _, c := range conns.connections {
  681. stat := &ConnectionStatus{
  682. Username: c.GetUsername(),
  683. ConnectionID: c.GetID(),
  684. ClientVersion: c.GetClientVersion(),
  685. RemoteAddress: c.GetRemoteAddress(),
  686. ConnectionTime: utils.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
  687. LastActivity: utils.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
  688. Protocol: c.GetProtocol(),
  689. Command: c.GetCommand(),
  690. Transfers: c.GetTransfers(),
  691. }
  692. stats = append(stats, stat)
  693. }
  694. return stats
  695. }
  696. // ConnectionStatus returns the status for an active connection
  697. type ConnectionStatus struct {
  698. // Logged in username
  699. Username string `json:"username"`
  700. // Unique identifier for the connection
  701. ConnectionID string `json:"connection_id"`
  702. // client's version string
  703. ClientVersion string `json:"client_version,omitempty"`
  704. // Remote address for this connection
  705. RemoteAddress string `json:"remote_address"`
  706. // Connection time as unix timestamp in milliseconds
  707. ConnectionTime int64 `json:"connection_time"`
  708. // Last activity as unix timestamp in milliseconds
  709. LastActivity int64 `json:"last_activity"`
  710. // Protocol for this connection
  711. Protocol string `json:"protocol"`
  712. // active uploads/downloads
  713. Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
  714. // SSH command or WebDAV method
  715. Command string `json:"command,omitempty"`
  716. }
  717. // GetConnectionDuration returns the connection duration as string
  718. func (c *ConnectionStatus) GetConnectionDuration() string {
  719. elapsed := time.Since(utils.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
  720. return utils.GetDurationAsString(elapsed)
  721. }
  722. // GetConnectionInfo returns connection info.
  723. // Protocol,Client Version and RemoteAddress are returned.
  724. func (c *ConnectionStatus) GetConnectionInfo() string {
  725. var result strings.Builder
  726. result.WriteString(fmt.Sprintf("%v. Client: %#v From: %#v", c.Protocol, c.ClientVersion, c.RemoteAddress))
  727. if c.Command == "" {
  728. return result.String()
  729. }
  730. switch c.Protocol {
  731. case ProtocolSSH, ProtocolFTP:
  732. result.WriteString(fmt.Sprintf(". Command: %#v", c.Command))
  733. case ProtocolWebDAV:
  734. result.WriteString(fmt.Sprintf(". Method: %#v", c.Command))
  735. }
  736. return result.String()
  737. }
  738. // GetTransfersAsString returns the active transfers as string
  739. func (c *ConnectionStatus) GetTransfersAsString() string {
  740. result := ""
  741. for _, t := range c.Transfers {
  742. if result != "" {
  743. result += ". "
  744. }
  745. result += t.getConnectionTransferAsString()
  746. }
  747. return result
  748. }
  // ActiveQuotaScan describes a quota scan in progress for the home
  // directory of a user
  type ActiveQuotaScan struct {
  	// Username is the user the quota scan refers to
  	Username string `json:"username"`
  	// StartTime is the quota scan start time as unix timestamp in milliseconds
  	StartTime int64 `json:"start_time"`
  }
  // ActiveVirtualFolderQuotaScan describes a quota scan in progress for a
  // virtual folder
  type ActiveVirtualFolderQuotaScan struct {
  	// Name is the name of the folder the quota scan refers to
  	Name string `json:"name"`
  	// StartTime is the quota scan start time as unix timestamp in milliseconds
  	StartTime int64 `json:"start_time"`
  }
  763. // ActiveScans holds the active quota scans
  764. type ActiveScans struct {
  765. sync.RWMutex
  766. UserHomeScans []ActiveQuotaScan
  767. FolderScans []ActiveVirtualFolderQuotaScan
  768. }
  769. // GetUsersQuotaScans returns the active quota scans for users home directories
  770. func (s *ActiveScans) GetUsersQuotaScans() []ActiveQuotaScan {
  771. s.RLock()
  772. defer s.RUnlock()
  773. scans := make([]ActiveQuotaScan, len(s.UserHomeScans))
  774. copy(scans, s.UserHomeScans)
  775. return scans
  776. }
  777. // AddUserQuotaScan adds a user to the ones with active quota scans.
  778. // Returns false if the user has a quota scan already running
  779. func (s *ActiveScans) AddUserQuotaScan(username string) bool {
  780. s.Lock()
  781. defer s.Unlock()
  782. for _, scan := range s.UserHomeScans {
  783. if scan.Username == username {
  784. return false
  785. }
  786. }
  787. s.UserHomeScans = append(s.UserHomeScans, ActiveQuotaScan{
  788. Username: username,
  789. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  790. })
  791. return true
  792. }
  793. // RemoveUserQuotaScan removes a user from the ones with active quota scans.
  794. // Returns false if the user has no active quota scans
  795. func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
  796. s.Lock()
  797. defer s.Unlock()
  798. indexToRemove := -1
  799. for i, scan := range s.UserHomeScans {
  800. if scan.Username == username {
  801. indexToRemove = i
  802. break
  803. }
  804. }
  805. if indexToRemove >= 0 {
  806. s.UserHomeScans[indexToRemove] = s.UserHomeScans[len(s.UserHomeScans)-1]
  807. s.UserHomeScans = s.UserHomeScans[:len(s.UserHomeScans)-1]
  808. return true
  809. }
  810. return false
  811. }
  812. // GetVFoldersQuotaScans returns the active quota scans for virtual folders
  813. func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
  814. s.RLock()
  815. defer s.RUnlock()
  816. scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
  817. copy(scans, s.FolderScans)
  818. return scans
  819. }
  820. // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
  821. // Returns false if the folder has a quota scan already running
  822. func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
  823. s.Lock()
  824. defer s.Unlock()
  825. for _, scan := range s.FolderScans {
  826. if scan.Name == folderName {
  827. return false
  828. }
  829. }
  830. s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
  831. Name: folderName,
  832. StartTime: utils.GetTimeAsMsSinceEpoch(time.Now()),
  833. })
  834. return true
  835. }
  836. // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
  837. // Returns false if the folder has no active quota scans
  838. func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
  839. s.Lock()
  840. defer s.Unlock()
  841. indexToRemove := -1
  842. for i, scan := range s.FolderScans {
  843. if scan.Name == folderName {
  844. indexToRemove = i
  845. break
  846. }
  847. }
  848. if indexToRemove >= 0 {
  849. s.FolderScans[indexToRemove] = s.FolderScans[len(s.FolderScans)-1]
  850. s.FolderScans = s.FolderScans[:len(s.FolderScans)-1]
  851. return true
  852. }
  853. return false
  854. }