1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459 |
- // Copyright (C) 2019-2023 Nicola Murino
- //
- // This program is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Affero General Public License as published
- // by the Free Software Foundation, version 3.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Affero General Public License for more details.
- //
- // You should have received a copy of the GNU Affero General Public License
- // along with this program. If not, see <https://www.gnu.org/licenses/>.
- // Package common defines code shared among file transfer packages and protocols
- package common
- import (
- "context"
- "errors"
- "fmt"
- "net"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/pires/go-proxyproto"
- "github.com/drakkan/sftpgo/v2/internal/command"
- "github.com/drakkan/sftpgo/v2/internal/dataprovider"
- "github.com/drakkan/sftpgo/v2/internal/httpclient"
- "github.com/drakkan/sftpgo/v2/internal/logger"
- "github.com/drakkan/sftpgo/v2/internal/metric"
- "github.com/drakkan/sftpgo/v2/internal/plugin"
- "github.com/drakkan/sftpgo/v2/internal/smtp"
- "github.com/drakkan/sftpgo/v2/internal/util"
- "github.com/drakkan/sftpgo/v2/internal/vfs"
- )
- // constants
- const (
- logSender = "common"
- uploadLogSender = "Upload"
- downloadLogSender = "Download"
- renameLogSender = "Rename"
- rmdirLogSender = "Rmdir"
- mkdirLogSender = "Mkdir"
- symlinkLogSender = "Symlink"
- removeLogSender = "Remove"
- chownLogSender = "Chown"
- chmodLogSender = "Chmod"
- chtimesLogSender = "Chtimes"
- copyLogSender = "Copy"
- truncateLogSender = "Truncate"
- operationDownload = "download"
- operationUpload = "upload"
- operationFirstDownload = "first-download"
- operationFirstUpload = "first-upload"
- operationDelete = "delete"
- operationCopy = "copy"
- // Pre-download action name
- OperationPreDownload = "pre-download"
- // Pre-upload action name
- OperationPreUpload = "pre-upload"
- operationPreDelete = "pre-delete"
- operationRename = "rename"
- operationMkdir = "mkdir"
- operationRmdir = "rmdir"
- // SSH command action name
- OperationSSHCmd = "ssh_cmd"
- chtimesFormat = "2006-01-02T15:04:05" // YYYY-MM-DDTHH:MM:SS
- idleTimeoutCheckInterval = 3 * time.Minute
- periodicTimeoutCheckInterval = 1 * time.Minute
- )
- // Stat flags
- const (
- StatAttrUIDGID = 1
- StatAttrPerms = 2
- StatAttrTimes = 4
- StatAttrSize = 8
- )
- // Transfer types
- const (
- TransferUpload = iota
- TransferDownload
- )
- // Supported protocols
- const (
- ProtocolSFTP = "SFTP"
- ProtocolSCP = "SCP"
- ProtocolSSH = "SSH"
- ProtocolFTP = "FTP"
- ProtocolWebDAV = "DAV"
- ProtocolHTTP = "HTTP"
- ProtocolHTTPShare = "HTTPShare"
- ProtocolDataRetention = "DataRetention"
- ProtocolOIDC = "OIDC"
- protocolEventAction = "EventAction"
- )
- // Upload modes
- const (
- UploadModeStandard = iota
- UploadModeAtomic
- UploadModeAtomicWithResume
- )
- func init() {
- Connections.clients = clientsMap{
- clients: make(map[string]int),
- }
- Connections.perUserConns = make(map[string]int)
- Connections.mapping = make(map[string]int)
- Connections.sshMapping = make(map[string]int)
- }
- // errors definitions
- var (
- ErrPermissionDenied = errors.New("permission denied")
- ErrNotExist = errors.New("no such file or directory")
- ErrOpUnsupported = errors.New("operation unsupported")
- ErrGenericFailure = errors.New("failure")
- ErrQuotaExceeded = errors.New("denying write due to space limit")
- ErrReadQuotaExceeded = errors.New("denying read due to quota limit")
- ErrConnectionDenied = errors.New("you are not allowed to connect")
- ErrNoBinding = errors.New("no binding configured")
- ErrCrtRevoked = errors.New("your certificate has been revoked")
- ErrNoCredentials = errors.New("no credential provided")
- ErrInternalFailure = errors.New("internal failure")
- ErrTransferAborted = errors.New("transfer aborted")
- ErrShuttingDown = errors.New("the service is shutting down")
- errNoTransfer = errors.New("requested transfer not found")
- errTransferMismatch = errors.New("transfer mismatch")
- )
- var (
- // Config is the configuration for the supported protocols
- Config Configuration
- // Connections is the list of active connections
- Connections ActiveConnections
- // QuotaScans is the list of active quota scans
- QuotaScans ActiveScans
- // ActiveMetadataChecks holds the active metadata checks
- ActiveMetadataChecks MetadataChecks
- transfersChecker TransfersChecker
- supportedProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP, ProtocolWebDAV,
- ProtocolHTTP, ProtocolHTTPShare, ProtocolOIDC}
- disconnHookProtocols = []string{ProtocolSFTP, ProtocolSCP, ProtocolSSH, ProtocolFTP}
- // the map key is the protocol, for each protocol we can have multiple rate limiters
- rateLimiters map[string][]*rateLimiter
- isShuttingDown atomic.Bool
- ftpLoginCommands = []string{"PASS", "USER"}
- )
- // Initialize sets the common configuration
- func Initialize(c Configuration, isShared int) error {
- isShuttingDown.Store(false)
- Config = c
- Config.Actions.ExecuteOn = util.RemoveDuplicates(Config.Actions.ExecuteOn, true)
- Config.Actions.ExecuteSync = util.RemoveDuplicates(Config.Actions.ExecuteSync, true)
- Config.ProxyAllowed = util.RemoveDuplicates(Config.ProxyAllowed, true)
- Config.idleLoginTimeout = 2 * time.Minute
- Config.idleTimeoutAsDuration = time.Duration(Config.IdleTimeout) * time.Minute
- startPeriodicChecks(periodicTimeoutCheckInterval, isShared)
- Config.defender = nil
- Config.allowList = nil
- Config.rateLimitersList = nil
- rateLimiters = make(map[string][]*rateLimiter)
- for _, rlCfg := range c.RateLimitersConfig {
- if rlCfg.isEnabled() {
- if err := rlCfg.validate(); err != nil {
- return fmt.Errorf("rate limiters initialization error: %w", err)
- }
- rateLimiter := rlCfg.getLimiter()
- for _, protocol := range rlCfg.Protocols {
- rateLimiters[protocol] = append(rateLimiters[protocol], rateLimiter)
- }
- }
- }
- if len(rateLimiters) > 0 {
- rateLimitersList, err := dataprovider.NewIPList(dataprovider.IPListTypeRateLimiterSafeList)
- if err != nil {
- return fmt.Errorf("unable to initialize ratelimiters list: %w", err)
- }
- Config.rateLimitersList = rateLimitersList
- }
- if c.DefenderConfig.Enabled {
- if !util.Contains(supportedDefenderDrivers, c.DefenderConfig.Driver) {
- return fmt.Errorf("unsupported defender driver %q", c.DefenderConfig.Driver)
- }
- var defender Defender
- var err error
- switch c.DefenderConfig.Driver {
- case DefenderDriverProvider:
- defender, err = newDBDefender(&c.DefenderConfig)
- default:
- defender, err = newInMemoryDefender(&c.DefenderConfig)
- }
- if err != nil {
- return fmt.Errorf("defender initialization error: %v", err)
- }
- logger.Info(logSender, "", "defender initialized with config %+v", c.DefenderConfig)
- Config.defender = defender
- }
- if c.AllowListStatus > 0 {
- allowList, err := dataprovider.NewIPList(dataprovider.IPListTypeAllowList)
- if err != nil {
- return fmt.Errorf("unable to initialize the allow list: %w", err)
- }
- logger.Info(logSender, "", "allow list initialized")
- Config.allowList = allowList
- }
- vfs.SetTempPath(c.TempPath)
- dataprovider.SetTempPath(c.TempPath)
- vfs.SetAllowSelfConnections(c.AllowSelfConnections)
- vfs.SetRenameMode(c.RenameMode)
- dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
- transfersChecker = getTransfersChecker(isShared)
- return nil
- }
- // CheckClosing returns an error if the service is closing
- func CheckClosing() error {
- if isShuttingDown.Load() {
- return ErrShuttingDown
- }
- return nil
- }
- // WaitForTransfers waits, for the specified grace time, for currently ongoing
- // client-initiated transfer sessions to completes.
- // A zero graceTime means no wait
- func WaitForTransfers(graceTime int) {
- if graceTime == 0 {
- return
- }
- if isShuttingDown.Swap(true) {
- return
- }
- if activeHooks.Load() == 0 && getActiveConnections() == 0 {
- return
- }
- graceTimer := time.NewTimer(time.Duration(graceTime) * time.Second)
- ticker := time.NewTicker(3 * time.Second)
- for {
- select {
- case <-ticker.C:
- hooks := activeHooks.Load()
- logger.Info(logSender, "", "active hooks: %d", hooks)
- if hooks == 0 && getActiveConnections() == 0 {
- logger.Info(logSender, "", "no more active connections, graceful shutdown")
- ticker.Stop()
- graceTimer.Stop()
- return
- }
- case <-graceTimer.C:
- logger.Info(logSender, "", "grace time expired, hard shutdown")
- ticker.Stop()
- return
- }
- }
- }
- // getActiveConnections returns the number of connections with active transfers
- func getActiveConnections() int {
- var activeConns int
- Connections.RLock()
- for _, c := range Connections.connections {
- if len(c.GetTransfers()) > 0 {
- activeConns++
- }
- }
- Connections.RUnlock()
- logger.Info(logSender, "", "number of connections with active transfers: %d", activeConns)
- return activeConns
- }
- // LimitRate blocks until all the configured rate limiters
- // allow one event to happen.
- // It returns an error if the time to wait exceeds the max
- // allowed delay
- func LimitRate(protocol, ip string) (time.Duration, error) {
- if Config.rateLimitersList != nil {
- isListed, _, err := Config.rateLimitersList.IsListed(ip, protocol)
- if err == nil && isListed {
- return 0, nil
- }
- }
- for _, limiter := range rateLimiters[protocol] {
- if delay, err := limiter.Wait(ip, protocol); err != nil {
- logger.Debug(logSender, "", "protocol %s ip %s: %v", protocol, ip, err)
- return delay, err
- }
- }
- return 0, nil
- }
- // Reload reloads the whitelist, the IP filter plugin and the defender's block and safe lists
- func Reload() error {
- plugin.Handler.ReloadFilter()
- return nil
- }
- // IsBanned returns true if the specified IP address is banned
- func IsBanned(ip, protocol string) bool {
- if plugin.Handler.IsIPBanned(ip, protocol) {
- return true
- }
- if Config.defender == nil {
- return false
- }
- return Config.defender.IsBanned(ip, protocol)
- }
- // GetDefenderBanTime returns the ban time for the given IP
- // or nil if the IP is not banned or the defender is disabled
- func GetDefenderBanTime(ip string) (*time.Time, error) {
- if Config.defender == nil {
- return nil, nil
- }
- return Config.defender.GetBanTime(ip)
- }
- // GetDefenderHosts returns hosts that are banned or for which some violations have been detected
- func GetDefenderHosts() ([]dataprovider.DefenderEntry, error) {
- if Config.defender == nil {
- return nil, nil
- }
- return Config.defender.GetHosts()
- }
- // GetDefenderHost returns a defender host by ip, if any
- func GetDefenderHost(ip string) (dataprovider.DefenderEntry, error) {
- if Config.defender == nil {
- return dataprovider.DefenderEntry{}, errors.New("defender is disabled")
- }
- return Config.defender.GetHost(ip)
- }
- // DeleteDefenderHost removes the specified IP address from the defender lists
- func DeleteDefenderHost(ip string) bool {
- if Config.defender == nil {
- return false
- }
- return Config.defender.DeleteHost(ip)
- }
- // GetDefenderScore returns the score for the given IP
- func GetDefenderScore(ip string) (int, error) {
- if Config.defender == nil {
- return 0, nil
- }
- return Config.defender.GetScore(ip)
- }
- // AddDefenderEvent adds the specified defender event for the given IP
- func AddDefenderEvent(ip, protocol string, event HostEvent) {
- if Config.defender == nil {
- return
- }
- Config.defender.AddEvent(ip, protocol, event)
- }
- func startPeriodicChecks(duration time.Duration, isShared int) {
- startEventScheduler()
- spec := fmt.Sprintf("@every %s", duration)
- _, err := eventScheduler.AddFunc(spec, Connections.checkTransfers)
- util.PanicOnError(err)
- logger.Info(logSender, "", "scheduled overquota transfers check, schedule %q", spec)
- if isShared == 1 {
- logger.Info(logSender, "", "add reload configs task")
- _, err := eventScheduler.AddFunc("@every 10m", smtp.ReloadProviderConf)
- util.PanicOnError(err)
- }
- if Config.IdleTimeout > 0 {
- ratio := idleTimeoutCheckInterval / periodicTimeoutCheckInterval
- spec = fmt.Sprintf("@every %s", duration*ratio)
- _, err = eventScheduler.AddFunc(spec, Connections.checkIdles)
- util.PanicOnError(err)
- logger.Info(logSender, "", "scheduled idle connections check, schedule %q", spec)
- }
- }
- // ActiveTransfer defines the interface for the current active transfers
- type ActiveTransfer interface {
- GetID() int64
- GetType() int
- GetSize() int64
- GetDownloadedSize() int64
- GetUploadedSize() int64
- GetVirtualPath() string
- GetStartTime() time.Time
- SignalClose(err error)
- Truncate(fsPath string, size int64) (int64, error)
- GetRealFsPath(fsPath string) string
- SetTimes(fsPath string, atime time.Time, mtime time.Time) bool
- GetTruncatedSize() int64
- HasSizeLimit() bool
- }
- // ActiveConnection defines the interface for the current active connections
- type ActiveConnection interface {
- GetID() string
- GetUsername() string
- GetRole() string
- GetMaxSessions() int
- GetLocalAddress() string
- GetRemoteAddress() string
- GetClientVersion() string
- GetProtocol() string
- GetConnectionTime() time.Time
- GetLastActivity() time.Time
- GetCommand() string
- Disconnect() error
- AddTransfer(t ActiveTransfer)
- RemoveTransfer(t ActiveTransfer)
- GetTransfers() []ConnectionTransfer
- SignalTransferClose(transferID int64, err error)
- CloseFS() error
- }
- // StatAttributes defines the attributes for set stat commands
- type StatAttributes struct {
- Mode os.FileMode
- Atime time.Time
- Mtime time.Time
- UID int
- GID int
- Flags int
- Size int64
- }
- // ConnectionTransfer defines the trasfer details
- type ConnectionTransfer struct {
- ID int64 `json:"-"`
- OperationType string `json:"operation_type"`
- StartTime int64 `json:"start_time"`
- Size int64 `json:"size"`
- VirtualPath string `json:"path"`
- HasSizeLimit bool `json:"-"`
- ULSize int64 `json:"-"`
- DLSize int64 `json:"-"`
- }
- func (t *ConnectionTransfer) getConnectionTransferAsString() string {
- result := ""
- switch t.OperationType {
- case operationUpload:
- result += "UL "
- case operationDownload:
- result += "DL "
- }
- result += fmt.Sprintf("%q ", t.VirtualPath)
- if t.Size > 0 {
- elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(t.StartTime))
- speed := float64(t.Size) / float64(util.GetTimeAsMsSinceEpoch(time.Now())-t.StartTime)
- result += fmt.Sprintf("Size: %s Elapsed: %s Speed: \"%.1f KB/s\"", util.ByteCountIEC(t.Size),
- util.GetDurationAsString(elapsed), speed)
- }
- return result
- }
- // Configuration defines configuration parameters common to all supported protocols
- type Configuration struct {
- // Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
- // 0 means disabled
- IdleTimeout int `json:"idle_timeout" mapstructure:"idle_timeout"`
- // UploadMode 0 means standard, the files are uploaded directly to the requested path.
- // 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
- // when the client ends the upload. Atomic mode avoid problems such as a web server that
- // serves partial files when the files are being uploaded.
- // In atomic mode if there is an upload error the temporary file is deleted and so the requested
- // upload path will not contain a partial file.
- // 2 means atomic with resume support: as atomic but if there is an upload error the temporary
- // file is renamed to the requested path and not deleted, this way a client can reconnect and resume
- // the upload.
- UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
- // Actions to execute for SFTP file operations and SSH commands
- Actions ProtocolActions `json:"actions" mapstructure:"actions"`
- // SetstatMode 0 means "normal mode": requests for changing permissions and owner/group are executed.
- // 1 means "ignore mode": requests for changing permissions and owner/group are silently ignored.
- // 2 means "ignore mode for cloud fs": requests for changing permissions and owner/group are
- // silently ignored for cloud based filesystem such as S3, GCS, Azure Blob. Requests for changing
- // modification times are ignored for cloud based filesystem if they are not supported.
- SetstatMode int `json:"setstat_mode" mapstructure:"setstat_mode"`
- // RenameMode defines how to handle directory renames. By default, renaming of non-empty directories
- // is not allowed for cloud storage providers (S3, GCS, Azure Blob). Set to 1 to enable recursive
- // renames for these providers, they may be slow, there is no atomic rename API like for local
- // filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry
- RenameMode int `json:"rename_mode" mapstructure:"rename_mode"`
- // TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
- // If you set this option you must make sure that the defined path exists, is accessible for writing
- // by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise
- // the renaming for atomic uploads will become a copy and therefore may take a long time.
- // The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
- TempPath string `json:"temp_path" mapstructure:"temp_path"`
- // Support for HAProxy PROXY protocol.
- // If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGNIX, you can enable
- // the proxy protocol. It provides a convenient way to safely transport connection information
- // such as a client's address across multiple layers of NAT or TCP proxies to get the real
- // client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported.
- // - 0 means disabled
- // - 1 means proxy protocol enabled. Proxy header will be used and requests without proxy header will be accepted.
- // - 2 means proxy protocol required. Proxy header will be used and requests without proxy header will be rejected.
- // If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too,
- // for example for HAProxy add "send-proxy" or "send-proxy-v2" to each server configuration line.
- ProxyProtocol int `json:"proxy_protocol" mapstructure:"proxy_protocol"`
- // List of IP addresses and IP ranges allowed to send the proxy header.
- // If proxy protocol is set to 1 and we receive a proxy header from an IP that is not in the list then the
- // connection will be accepted and the header will be ignored.
- // If proxy protocol is set to 2 and we receive a proxy header from an IP that is not in the list then the
- // connection will be rejected.
- ProxyAllowed []string `json:"proxy_allowed" mapstructure:"proxy_allowed"`
- // Absolute path to an external program or an HTTP URL to invoke as soon as SFTPGo starts.
- // If you define an HTTP URL it will be invoked using a `GET` request.
- // Please note that SFTPGo services may not yet be available when this hook is run.
- // Leave empty do disable.
- StartupHook string `json:"startup_hook" mapstructure:"startup_hook"`
- // Absolute path to an external program or an HTTP URL to invoke after a user connects
- // and before he tries to login. It allows you to reject the connection based on the source
- // ip address. Leave empty do disable.
- PostConnectHook string `json:"post_connect_hook" mapstructure:"post_connect_hook"`
- // Absolute path to an external program or an HTTP URL to invoke after an SSH/FTP connection ends.
- // Leave empty do disable.
- PostDisconnectHook string `json:"post_disconnect_hook" mapstructure:"post_disconnect_hook"`
- // Absolute path to an external program or an HTTP URL to invoke after a data retention check completes.
- // Leave empty do disable.
- DataRetentionHook string `json:"data_retention_hook" mapstructure:"data_retention_hook"`
- // Maximum number of concurrent client connections. 0 means unlimited
- MaxTotalConnections int `json:"max_total_connections" mapstructure:"max_total_connections"`
- // Maximum number of concurrent client connections from the same host (IP). 0 means unlimited
- MaxPerHostConnections int `json:"max_per_host_connections" mapstructure:"max_per_host_connections"`
- // Defines the status of the global allow list. 0 means disabled, 1 enabled.
- // If enabled, only the listed IPs/networks can access the configured services, all other
- // client connections will be dropped before they even try to authenticate.
- // Ensure to enable this setting only after adding some allowed ip/networks from the WebAdmin/REST API
- AllowListStatus int `json:"allowlist_status" mapstructure:"allowlist_status"`
- // Allow users on this instance to use other users/virtual folders on this instance as storage backend.
- // Enable this setting if you know what you are doing.
- AllowSelfConnections int `json:"allow_self_connections" mapstructure:"allow_self_connections"`
- // Defender configuration
- DefenderConfig DefenderConfig `json:"defender" mapstructure:"defender"`
- // Rate limiter configurations
- RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
- idleTimeoutAsDuration time.Duration
- idleLoginTimeout time.Duration
- defender Defender
- allowList *dataprovider.IPList
- rateLimitersList *dataprovider.IPList
- }
- // IsAtomicUploadEnabled returns true if atomic upload is enabled
- func (c *Configuration) IsAtomicUploadEnabled() bool {
- return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
- }
- // GetProxyListener returns a wrapper for the given listener that supports the
- // HAProxy Proxy Protocol
- func (c *Configuration) GetProxyListener(listener net.Listener) (*proxyproto.Listener, error) {
- var err error
- if c.ProxyProtocol > 0 {
- var policyFunc func(upstream net.Addr) (proxyproto.Policy, error)
- if c.ProxyProtocol == 1 && len(c.ProxyAllowed) > 0 {
- policyFunc, err = proxyproto.LaxWhiteListPolicy(c.ProxyAllowed)
- if err != nil {
- return nil, err
- }
- }
- if c.ProxyProtocol == 2 {
- if len(c.ProxyAllowed) == 0 {
- policyFunc = func(upstream net.Addr) (proxyproto.Policy, error) {
- return proxyproto.REQUIRE, nil
- }
- } else {
- policyFunc, err = proxyproto.StrictWhiteListPolicy(c.ProxyAllowed)
- if err != nil {
- return nil, err
- }
- }
- }
- return &proxyproto.Listener{
- Listener: listener,
- Policy: policyFunc,
- ReadHeaderTimeout: 10 * time.Second,
- }, nil
- }
- return nil, errors.New("proxy protocol not configured")
- }
- // GetRateLimitersStatus returns the rate limiters status
- func (c *Configuration) GetRateLimitersStatus() (bool, []string) {
- enabled := false
- var protocols []string
- for _, rlCfg := range c.RateLimitersConfig {
- if rlCfg.isEnabled() {
- enabled = true
- protocols = append(protocols, rlCfg.Protocols...)
- }
- }
- return enabled, util.RemoveDuplicates(protocols, false)
- }
- // IsAllowListEnabled returns true if the global allow list is enabled
- func (c *Configuration) IsAllowListEnabled() bool {
- return c.AllowListStatus > 0
- }
- // ExecuteStartupHook runs the startup hook if defined
- func (c *Configuration) ExecuteStartupHook() error {
- if c.StartupHook == "" {
- return nil
- }
- if strings.HasPrefix(c.StartupHook, "http") {
- var url *url.URL
- url, err := url.Parse(c.StartupHook)
- if err != nil {
- logger.Warn(logSender, "", "Invalid startup hook %q: %v", c.StartupHook, err)
- return err
- }
- startTime := time.Now()
- resp, err := httpclient.RetryableGet(url.String())
- if err != nil {
- logger.Warn(logSender, "", "Error executing startup hook: %v", err)
- return err
- }
- defer resp.Body.Close()
- logger.Debug(logSender, "", "Startup hook executed, elapsed: %v, response code: %v", time.Since(startTime), resp.StatusCode)
- return nil
- }
- if !filepath.IsAbs(c.StartupHook) {
- err := fmt.Errorf("invalid startup hook %q", c.StartupHook)
- logger.Warn(logSender, "", "Invalid startup hook %q", c.StartupHook)
- return err
- }
- startTime := time.Now()
- timeout, env, args := command.GetConfig(c.StartupHook, command.HookStartup)
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- cmd := exec.CommandContext(ctx, c.StartupHook, args...)
- cmd.Env = env
- err := cmd.Run()
- logger.Debug(logSender, "", "Startup hook executed, elapsed: %s, error: %v", time.Since(startTime), err)
- return nil
- }
- func (c *Configuration) executePostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
- startNewHook()
- defer hookEnded()
- ipAddr := util.GetIPFromRemoteAddress(remoteAddr)
- connDuration := int64(time.Since(connectionTime) / time.Millisecond)
- if strings.HasPrefix(c.PostDisconnectHook, "http") {
- var url *url.URL
- url, err := url.Parse(c.PostDisconnectHook)
- if err != nil {
- logger.Warn(protocol, connID, "Invalid post disconnect hook %q: %v", c.PostDisconnectHook, err)
- return
- }
- q := url.Query()
- q.Add("ip", ipAddr)
- q.Add("protocol", protocol)
- q.Add("username", username)
- q.Add("connection_duration", strconv.FormatInt(connDuration, 10))
- url.RawQuery = q.Encode()
- startTime := time.Now()
- resp, err := httpclient.RetryableGet(url.String())
- respCode := 0
- if err == nil {
- respCode = resp.StatusCode
- resp.Body.Close()
- }
- logger.Debug(protocol, connID, "Post disconnect hook response code: %v, elapsed: %v, err: %v",
- respCode, time.Since(startTime), err)
- return
- }
- if !filepath.IsAbs(c.PostDisconnectHook) {
- logger.Debug(protocol, connID, "invalid post disconnect hook %q", c.PostDisconnectHook)
- return
- }
- timeout, env, args := command.GetConfig(c.PostDisconnectHook, command.HookPostDisconnect)
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- startTime := time.Now()
- cmd := exec.CommandContext(ctx, c.PostDisconnectHook, args...)
- cmd.Env = append(env,
- fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
- fmt.Sprintf("SFTPGO_CONNECTION_USERNAME=%s", username),
- fmt.Sprintf("SFTPGO_CONNECTION_DURATION=%d", connDuration),
- fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
- err := cmd.Run()
- logger.Debug(protocol, connID, "Post disconnect hook executed, elapsed: %s error: %v", time.Since(startTime), err)
- }
- func (c *Configuration) checkPostDisconnectHook(remoteAddr, protocol, username, connID string, connectionTime time.Time) {
- if c.PostDisconnectHook == "" {
- return
- }
- if !util.Contains(disconnHookProtocols, protocol) {
- return
- }
- go c.executePostDisconnectHook(remoteAddr, protocol, username, connID, connectionTime)
- }
- // ExecutePostConnectHook executes the post connect hook if defined
- func (c *Configuration) ExecutePostConnectHook(ipAddr, protocol string) error {
- if c.PostConnectHook == "" {
- return nil
- }
- if strings.HasPrefix(c.PostConnectHook, "http") {
- var url *url.URL
- url, err := url.Parse(c.PostConnectHook)
- if err != nil {
- logger.Warn(protocol, "", "Login from ip %q denied, invalid post connect hook %q: %v",
- ipAddr, c.PostConnectHook, err)
- return getPermissionDeniedError(protocol)
- }
- q := url.Query()
- q.Add("ip", ipAddr)
- q.Add("protocol", protocol)
- url.RawQuery = q.Encode()
- resp, err := httpclient.RetryableGet(url.String())
- if err != nil {
- logger.Warn(protocol, "", "Login from ip %q denied, error executing post connect hook: %v", ipAddr, err)
- return getPermissionDeniedError(protocol)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- logger.Warn(protocol, "", "Login from ip %q denied, post connect hook response code: %v", ipAddr, resp.StatusCode)
- return getPermissionDeniedError(protocol)
- }
- return nil
- }
- if !filepath.IsAbs(c.PostConnectHook) {
- err := fmt.Errorf("invalid post connect hook %q", c.PostConnectHook)
- logger.Warn(protocol, "", "Login from ip %q denied: %v", ipAddr, err)
- return getPermissionDeniedError(protocol)
- }
- timeout, env, args := command.GetConfig(c.PostConnectHook, command.HookPostConnect)
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- cmd := exec.CommandContext(ctx, c.PostConnectHook, args...)
- cmd.Env = append(env,
- fmt.Sprintf("SFTPGO_CONNECTION_IP=%s", ipAddr),
- fmt.Sprintf("SFTPGO_CONNECTION_PROTOCOL=%s", protocol))
- err := cmd.Run()
- if err != nil {
- logger.Warn(protocol, "", "Login from ip %q denied, connect hook error: %v", ipAddr, err)
- return getPermissionDeniedError(protocol)
- }
- return nil
- }
- // SSHConnection defines an ssh connection.
- // Each SSH connection can open several channels for SFTP or SSH commands
- type SSHConnection struct {
- id string
- conn net.Conn
- lastActivity atomic.Int64
- }
- // NewSSHConnection returns a new SSHConnection
- func NewSSHConnection(id string, conn net.Conn) *SSHConnection {
- c := &SSHConnection{
- id: id,
- conn: conn,
- }
- c.lastActivity.Store(time.Now().UnixNano())
- return c
- }
- // GetID returns the ID for this SSHConnection
- func (c *SSHConnection) GetID() string {
- return c.id
- }
- // UpdateLastActivity updates last activity for this connection
- func (c *SSHConnection) UpdateLastActivity() {
- c.lastActivity.Store(time.Now().UnixNano())
- }
- // GetLastActivity returns the last connection activity
- func (c *SSHConnection) GetLastActivity() time.Time {
- return time.Unix(0, c.lastActivity.Load())
- }
- // Close closes the underlying network connection
- func (c *SSHConnection) Close() error {
- return c.conn.Close()
- }
- // ActiveConnections holds the currect active connections with the associated transfers
- type ActiveConnections struct {
- // clients contains both authenticated and estabilished connections and the ones waiting
- // for authentication
- clients clientsMap
- transfersCheckStatus atomic.Bool
- sync.RWMutex
- connections []ActiveConnection
- mapping map[string]int
- sshConnections []*SSHConnection
- sshMapping map[string]int
- perUserConns map[string]int
- }
- // internal method, must be called within a locked block
- func (conns *ActiveConnections) addUserConnection(username string) {
- if username == "" {
- return
- }
- conns.perUserConns[username]++
- }
- // internal method, must be called within a locked block
- func (conns *ActiveConnections) removeUserConnection(username string) {
- if username == "" {
- return
- }
- if val, ok := conns.perUserConns[username]; ok {
- conns.perUserConns[username]--
- if val > 1 {
- return
- }
- delete(conns.perUserConns, username)
- }
- }
- // GetActiveSessions returns the number of active sessions for the given username.
- // We return the open sessions for any protocol
- func (conns *ActiveConnections) GetActiveSessions(username string) int {
- conns.RLock()
- defer conns.RUnlock()
- return conns.perUserConns[username]
- }
- // Add adds a new connection to the active ones
- func (conns *ActiveConnections) Add(c ActiveConnection) error {
- conns.Lock()
- defer conns.Unlock()
- if username := c.GetUsername(); username != "" {
- if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
- if val := conns.perUserConns[username]; val >= maxSessions {
- return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
- }
- }
- conns.addUserConnection(username)
- }
- conns.mapping[c.GetID()] = len(conns.connections)
- conns.connections = append(conns.connections, c)
- metric.UpdateActiveConnectionsSize(len(conns.connections))
- logger.Debug(c.GetProtocol(), c.GetID(), "connection added, local address %q, remote address %q, num open connections: %d",
- c.GetLocalAddress(), c.GetRemoteAddress(), len(conns.connections))
- return nil
- }
- // Swap replaces an existing connection with the given one.
- // This method is useful if you have to change some connection details
- // for example for FTP is used to update the connection once the user
- // authenticates
- func (conns *ActiveConnections) Swap(c ActiveConnection) error {
- conns.Lock()
- defer conns.Unlock()
- if idx, ok := conns.mapping[c.GetID()]; ok {
- conn := conns.connections[idx]
- conns.removeUserConnection(conn.GetUsername())
- if username := c.GetUsername(); username != "" {
- if maxSessions := c.GetMaxSessions(); maxSessions > 0 {
- if val, ok := conns.perUserConns[username]; ok && val >= maxSessions {
- conns.addUserConnection(conn.GetUsername())
- return fmt.Errorf("too many open sessions: %d/%d", val, maxSessions)
- }
- }
- conns.addUserConnection(username)
- }
- err := conn.CloseFS()
- conns.connections[idx] = c
- logger.Debug(logSender, c.GetID(), "connection swapped, close fs error: %v", err)
- conn = nil
- return nil
- }
- return errors.New("connection to swap not found")
- }
- // Remove removes a connection from the active ones
- func (conns *ActiveConnections) Remove(connectionID string) {
- conns.Lock()
- defer conns.Unlock()
- if idx, ok := conns.mapping[connectionID]; ok {
- conn := conns.connections[idx]
- err := conn.CloseFS()
- lastIdx := len(conns.connections) - 1
- conns.connections[idx] = conns.connections[lastIdx]
- conns.connections[lastIdx] = nil
- conns.connections = conns.connections[:lastIdx]
- delete(conns.mapping, connectionID)
- if idx != lastIdx {
- conns.mapping[conns.connections[idx].GetID()] = idx
- }
- conns.removeUserConnection(conn.GetUsername())
- metric.UpdateActiveConnectionsSize(lastIdx)
- logger.Debug(conn.GetProtocol(), conn.GetID(), "connection removed, local address %q, remote address %q close fs error: %v, num open connections: %d",
- conn.GetLocalAddress(), conn.GetRemoteAddress(), err, lastIdx)
- if conn.GetProtocol() == ProtocolFTP && conn.GetUsername() == "" && !util.Contains(ftpLoginCommands, conn.GetCommand()) {
- ip := util.GetIPFromRemoteAddress(conn.GetRemoteAddress())
- logger.ConnectionFailedLog("", ip, dataprovider.LoginMethodNoAuthTryed, conn.GetProtocol(),
- dataprovider.ErrNoAuthTryed.Error())
- metric.AddNoAuthTryed()
- AddDefenderEvent(ip, ProtocolFTP, HostEventNoLoginTried)
- dataprovider.ExecutePostLoginHook(&dataprovider.User{}, dataprovider.LoginMethodNoAuthTryed, ip,
- conn.GetProtocol(), dataprovider.ErrNoAuthTryed)
- }
- Config.checkPostDisconnectHook(conn.GetRemoteAddress(), conn.GetProtocol(), conn.GetUsername(),
- conn.GetID(), conn.GetConnectionTime())
- return
- }
- logger.Debug(logSender, "", "connection id %q to remove not found!", connectionID)
- }
- // Close closes an active connection.
- // It returns true on success
- func (conns *ActiveConnections) Close(connectionID, role string) bool {
- conns.RLock()
- var result bool
- if idx, ok := conns.mapping[connectionID]; ok {
- c := conns.connections[idx]
- if role == "" || c.GetRole() == role {
- defer func(conn ActiveConnection) {
- err := conn.Disconnect()
- logger.Debug(conn.GetProtocol(), conn.GetID(), "close connection requested, close err: %v", err)
- }(c)
- result = true
- }
- }
- conns.RUnlock()
- return result
- }
- // AddSSHConnection adds a new ssh connection to the active ones
- func (conns *ActiveConnections) AddSSHConnection(c *SSHConnection) {
- conns.Lock()
- defer conns.Unlock()
- conns.sshMapping[c.GetID()] = len(conns.sshConnections)
- conns.sshConnections = append(conns.sshConnections, c)
- logger.Debug(logSender, c.GetID(), "ssh connection added, num open connections: %d", len(conns.sshConnections))
- }
- // RemoveSSHConnection removes a connection from the active ones
- func (conns *ActiveConnections) RemoveSSHConnection(connectionID string) {
- conns.Lock()
- defer conns.Unlock()
- if idx, ok := conns.sshMapping[connectionID]; ok {
- lastIdx := len(conns.sshConnections) - 1
- conns.sshConnections[idx] = conns.sshConnections[lastIdx]
- conns.sshConnections[lastIdx] = nil
- conns.sshConnections = conns.sshConnections[:lastIdx]
- delete(conns.sshMapping, connectionID)
- if idx != lastIdx {
- conns.sshMapping[conns.sshConnections[idx].GetID()] = idx
- }
- logger.Debug(logSender, connectionID, "ssh connection removed, num open ssh connections: %d", lastIdx)
- return
- }
- logger.Warn(logSender, "", "ssh connection to remove with id %q not found!", connectionID)
- }
- func (conns *ActiveConnections) checkIdles() {
- conns.RLock()
- for _, sshConn := range conns.sshConnections {
- idleTime := time.Since(sshConn.GetLastActivity())
- if idleTime > Config.idleTimeoutAsDuration {
- // we close an SSH connection if it has no active connections associated
- idToMatch := fmt.Sprintf("_%s_", sshConn.GetID())
- toClose := true
- for _, conn := range conns.connections {
- if strings.Contains(conn.GetID(), idToMatch) {
- if time.Since(conn.GetLastActivity()) <= Config.idleTimeoutAsDuration {
- toClose = false
- break
- }
- }
- }
- if toClose {
- defer func(c *SSHConnection) {
- err := c.Close()
- logger.Debug(logSender, c.GetID(), "close idle SSH connection, idle time: %v, close err: %v",
- time.Since(c.GetLastActivity()), err)
- }(sshConn)
- }
- }
- }
- for _, c := range conns.connections {
- idleTime := time.Since(c.GetLastActivity())
- isUnauthenticatedFTPUser := (c.GetProtocol() == ProtocolFTP && c.GetUsername() == "")
- if idleTime > Config.idleTimeoutAsDuration || (isUnauthenticatedFTPUser && idleTime > Config.idleLoginTimeout) {
- defer func(conn ActiveConnection) {
- err := conn.Disconnect()
- logger.Debug(conn.GetProtocol(), conn.GetID(), "close idle connection, idle time: %v, username: %q close err: %v",
- time.Since(conn.GetLastActivity()), conn.GetUsername(), err)
- }(c)
- }
- }
- conns.RUnlock()
- }
- func (conns *ActiveConnections) checkTransfers() {
- if conns.transfersCheckStatus.Load() {
- logger.Warn(logSender, "", "the previous transfer check is still running, skipping execution")
- return
- }
- conns.transfersCheckStatus.Store(true)
- defer conns.transfersCheckStatus.Store(false)
- conns.RLock()
- if len(conns.connections) < 2 {
- conns.RUnlock()
- return
- }
- var wg sync.WaitGroup
- logger.Debug(logSender, "", "start concurrent transfers check")
- // update the current size for transfers to monitors
- for _, c := range conns.connections {
- for _, t := range c.GetTransfers() {
- if t.HasSizeLimit {
- wg.Add(1)
- go func(transfer ConnectionTransfer, connID string) {
- defer wg.Done()
- transfersChecker.UpdateTransferCurrentSizes(transfer.ULSize, transfer.DLSize, transfer.ID, connID)
- }(t, c.GetID())
- }
- }
- }
- conns.RUnlock()
- logger.Debug(logSender, "", "waiting for the update of the transfers current size")
- wg.Wait()
- logger.Debug(logSender, "", "getting overquota transfers")
- overquotaTransfers := transfersChecker.GetOverquotaTransfers()
- logger.Debug(logSender, "", "number of overquota transfers: %v", len(overquotaTransfers))
- if len(overquotaTransfers) == 0 {
- return
- }
- conns.RLock()
- defer conns.RUnlock()
- for _, c := range conns.connections {
- for _, overquotaTransfer := range overquotaTransfers {
- if c.GetID() == overquotaTransfer.ConnID {
- logger.Info(logSender, c.GetID(), "user %q is overquota, try to close transfer id %v",
- c.GetUsername(), overquotaTransfer.TransferID)
- var err error
- if overquotaTransfer.TransferType == TransferDownload {
- err = getReadQuotaExceededError(c.GetProtocol())
- } else {
- err = getQuotaExceededError(c.GetProtocol())
- }
- c.SignalTransferClose(overquotaTransfer.TransferID, err)
- }
- }
- }
- logger.Debug(logSender, "", "transfers check completed")
- }
- // AddClientConnection stores a new client connection
- func (conns *ActiveConnections) AddClientConnection(ipAddr string) {
- conns.clients.add(ipAddr)
- }
- // RemoveClientConnection removes a disconnected client from the tracked ones
- func (conns *ActiveConnections) RemoveClientConnection(ipAddr string) {
- conns.clients.remove(ipAddr)
- }
- // GetClientConnections returns the total number of client connections
- func (conns *ActiveConnections) GetClientConnections() int32 {
- return conns.clients.getTotal()
- }
- // IsNewConnectionAllowed returns an error if the maximum number of concurrent allowed
- // connections is exceeded or a whitelist is defined and the specified ipAddr is not listed
- // or the service is shutting down
- func (conns *ActiveConnections) IsNewConnectionAllowed(ipAddr, protocol string) error {
- if isShuttingDown.Load() {
- return ErrShuttingDown
- }
- if Config.allowList != nil {
- isListed, _, err := Config.allowList.IsListed(ipAddr, protocol)
- if err != nil {
- logger.Error(logSender, "", "unable to query allow list, connection denied, ip %q, protocol %s, err: %v",
- ipAddr, protocol, err)
- return ErrConnectionDenied
- }
- if !isListed {
- return ErrConnectionDenied
- }
- }
- if Config.MaxTotalConnections == 0 && Config.MaxPerHostConnections == 0 {
- return nil
- }
- if Config.MaxPerHostConnections > 0 {
- if total := conns.clients.getTotalFrom(ipAddr); total > Config.MaxPerHostConnections {
- logger.Info(logSender, "", "active connections from %s %d/%d", ipAddr, total, Config.MaxPerHostConnections)
- AddDefenderEvent(ipAddr, protocol, HostEventLimitExceeded)
- return ErrConnectionDenied
- }
- }
- if Config.MaxTotalConnections > 0 {
- if total := conns.clients.getTotal(); total > int32(Config.MaxTotalConnections) {
- logger.Info(logSender, "", "active client connections %d/%d", total, Config.MaxTotalConnections)
- return ErrConnectionDenied
- }
- // on a single SFTP connection we could have multiple SFTP channels or commands
- // so we check the estabilished connections too
- conns.RLock()
- defer conns.RUnlock()
- if sess := len(conns.connections); sess >= Config.MaxTotalConnections {
- logger.Info(logSender, "", "active client sessions %d/%d", sess, Config.MaxTotalConnections)
- return ErrConnectionDenied
- }
- }
- return nil
- }
- // GetStats returns stats for active connections
- func (conns *ActiveConnections) GetStats(role string) []ConnectionStatus {
- conns.RLock()
- defer conns.RUnlock()
- stats := make([]ConnectionStatus, 0, len(conns.connections))
- node := dataprovider.GetNodeName()
- for _, c := range conns.connections {
- if role == "" || c.GetRole() == role {
- stat := ConnectionStatus{
- Username: c.GetUsername(),
- ConnectionID: c.GetID(),
- ClientVersion: c.GetClientVersion(),
- RemoteAddress: c.GetRemoteAddress(),
- ConnectionTime: util.GetTimeAsMsSinceEpoch(c.GetConnectionTime()),
- LastActivity: util.GetTimeAsMsSinceEpoch(c.GetLastActivity()),
- Protocol: c.GetProtocol(),
- Command: c.GetCommand(),
- Transfers: c.GetTransfers(),
- Node: node,
- }
- stats = append(stats, stat)
- }
- }
- return stats
- }
- // ConnectionStatus returns the status for an active connection
- type ConnectionStatus struct {
- // Logged in username
- Username string `json:"username"`
- // Unique identifier for the connection
- ConnectionID string `json:"connection_id"`
- // client's version string
- ClientVersion string `json:"client_version,omitempty"`
- // Remote address for this connection
- RemoteAddress string `json:"remote_address"`
- // Connection time as unix timestamp in milliseconds
- ConnectionTime int64 `json:"connection_time"`
- // Last activity as unix timestamp in milliseconds
- LastActivity int64 `json:"last_activity"`
- // Protocol for this connection
- Protocol string `json:"protocol"`
- // active uploads/downloads
- Transfers []ConnectionTransfer `json:"active_transfers,omitempty"`
- // SSH command or WebDAV method
- Command string `json:"command,omitempty"`
- // Node identifier, omitted for single node installations
- Node string `json:"node,omitempty"`
- }
- // GetConnectionDuration returns the connection duration as string
- func (c *ConnectionStatus) GetConnectionDuration() string {
- elapsed := time.Since(util.GetTimeFromMsecSinceEpoch(c.ConnectionTime))
- return util.GetDurationAsString(elapsed)
- }
- // GetConnectionInfo returns connection info.
- // Protocol,Client Version and RemoteAddress are returned.
- func (c *ConnectionStatus) GetConnectionInfo() string {
- var result strings.Builder
- result.WriteString(fmt.Sprintf("%v. Client: %q From: %q", c.Protocol, c.ClientVersion, c.RemoteAddress))
- if c.Command == "" {
- return result.String()
- }
- switch c.Protocol {
- case ProtocolSSH, ProtocolFTP:
- result.WriteString(fmt.Sprintf(". Command: %q", c.Command))
- case ProtocolWebDAV:
- result.WriteString(fmt.Sprintf(". Method: %q", c.Command))
- }
- return result.String()
- }
- // GetTransfersAsString returns the active transfers as string
- func (c *ConnectionStatus) GetTransfersAsString() string {
- result := ""
- for _, t := range c.Transfers {
- if result != "" {
- result += ". "
- }
- result += t.getConnectionTransferAsString()
- }
- return result
- }
- // ActiveQuotaScan defines an active quota scan for a user
- type ActiveQuotaScan struct {
- // Username to which the quota scan refers
- Username string `json:"username"`
- // quota scan start time as unix timestamp in milliseconds
- StartTime int64 `json:"start_time"`
- Role string `json:"-"`
- }
- // ActiveVirtualFolderQuotaScan defines an active quota scan for a virtual folder
- type ActiveVirtualFolderQuotaScan struct {
- // folder name to which the quota scan refers
- Name string `json:"name"`
- // quota scan start time as unix timestamp in milliseconds
- StartTime int64 `json:"start_time"`
- }
- // ActiveScans holds the active quota scans
- type ActiveScans struct {
- sync.RWMutex
- UserScans []ActiveQuotaScan
- FolderScans []ActiveVirtualFolderQuotaScan
- }
- // GetUsersQuotaScans returns the active users quota scans
- func (s *ActiveScans) GetUsersQuotaScans(role string) []ActiveQuotaScan {
- s.RLock()
- defer s.RUnlock()
- scans := make([]ActiveQuotaScan, 0, len(s.UserScans))
- for _, scan := range s.UserScans {
- if role == "" || role == scan.Role {
- scans = append(scans, ActiveQuotaScan{
- Username: scan.Username,
- StartTime: scan.StartTime,
- })
- }
- }
- return scans
- }
- // AddUserQuotaScan adds a user to the ones with active quota scans.
- // Returns false if the user has a quota scan already running
- func (s *ActiveScans) AddUserQuotaScan(username, role string) bool {
- s.Lock()
- defer s.Unlock()
- for _, scan := range s.UserScans {
- if scan.Username == username {
- return false
- }
- }
- s.UserScans = append(s.UserScans, ActiveQuotaScan{
- Username: username,
- StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
- Role: role,
- })
- return true
- }
- // RemoveUserQuotaScan removes a user from the ones with active quota scans.
- // Returns false if the user has no active quota scans
- func (s *ActiveScans) RemoveUserQuotaScan(username string) bool {
- s.Lock()
- defer s.Unlock()
- for idx, scan := range s.UserScans {
- if scan.Username == username {
- lastIdx := len(s.UserScans) - 1
- s.UserScans[idx] = s.UserScans[lastIdx]
- s.UserScans = s.UserScans[:lastIdx]
- return true
- }
- }
- return false
- }
- // GetVFoldersQuotaScans returns the active quota scans for virtual folders
- func (s *ActiveScans) GetVFoldersQuotaScans() []ActiveVirtualFolderQuotaScan {
- s.RLock()
- defer s.RUnlock()
- scans := make([]ActiveVirtualFolderQuotaScan, len(s.FolderScans))
- copy(scans, s.FolderScans)
- return scans
- }
- // AddVFolderQuotaScan adds a virtual folder to the ones with active quota scans.
- // Returns false if the folder has a quota scan already running
- func (s *ActiveScans) AddVFolderQuotaScan(folderName string) bool {
- s.Lock()
- defer s.Unlock()
- for _, scan := range s.FolderScans {
- if scan.Name == folderName {
- return false
- }
- }
- s.FolderScans = append(s.FolderScans, ActiveVirtualFolderQuotaScan{
- Name: folderName,
- StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
- })
- return true
- }
- // RemoveVFolderQuotaScan removes a folder from the ones with active quota scans.
- // Returns false if the folder has no active quota scans
- func (s *ActiveScans) RemoveVFolderQuotaScan(folderName string) bool {
- s.Lock()
- defer s.Unlock()
- for idx, scan := range s.FolderScans {
- if scan.Name == folderName {
- lastIdx := len(s.FolderScans) - 1
- s.FolderScans[idx] = s.FolderScans[lastIdx]
- s.FolderScans = s.FolderScans[:lastIdx]
- return true
- }
- }
- return false
- }
- // MetadataCheck defines an active metadata check
- type MetadataCheck struct {
- // Username to which the metadata check refers
- Username string `json:"username"`
- // check start time as unix timestamp in milliseconds
- StartTime int64 `json:"start_time"`
- Role string `json:"-"`
- }
- // MetadataChecks holds the active metadata checks
- type MetadataChecks struct {
- sync.RWMutex
- checks []MetadataCheck
- }
- // Get returns the active metadata checks
- func (c *MetadataChecks) Get(role string) []MetadataCheck {
- c.RLock()
- defer c.RUnlock()
- checks := make([]MetadataCheck, 0, len(c.checks))
- for _, check := range c.checks {
- if role == "" || role == check.Role {
- checks = append(checks, MetadataCheck{
- Username: check.Username,
- StartTime: check.StartTime,
- })
- }
- }
- return checks
- }
- // Add adds a user to the ones with active metadata checks.
- // Return false if a metadata check is already active for the specified user
- func (c *MetadataChecks) Add(username, role string) bool {
- c.Lock()
- defer c.Unlock()
- for idx := range c.checks {
- if c.checks[idx].Username == username {
- return false
- }
- }
- c.checks = append(c.checks, MetadataCheck{
- Username: username,
- StartTime: util.GetTimeAsMsSinceEpoch(time.Now()),
- Role: role,
- })
- return true
- }
- // Remove removes a user from the ones with active metadata checks
- func (c *MetadataChecks) Remove(username string) bool {
- c.Lock()
- defer c.Unlock()
- for idx := range c.checks {
- if c.checks[idx].Username == username {
- lastIdx := len(c.checks) - 1
- c.checks[idx] = c.checks[lastIdx]
- c.checks = c.checks[:lastIdx]
- return true
- }
- }
- return false
- }
|