From bcf088f586a7b8fb85d8dc1445c7289be1e22f8c Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Fri, 20 Aug 2021 09:35:06 +0200 Subject: [PATCH] data provider: update internal caches if the data provider is shared --- config/config.go | 2 ++ config/config_test.go | 3 ++ dataprovider/bolt.go | 5 +++ dataprovider/cacheduser.go | 7 +++++ dataprovider/dataprovider.go | 60 ++++++++++++++++++++++++++++++++++++ dataprovider/memory.go | 5 +++ dataprovider/mysql.go | 4 +++ dataprovider/pgsql.go | 4 +++ dataprovider/sqlcommon.go | 30 ++++++++++++++++++ dataprovider/sqlite.go | 5 +++ dataprovider/sqlqueries.go | 4 +++ docs/full-configuration.md | 1 + sftpgo.json | 3 +- 13 files changed, 132 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 6899044c..a32d208b 100644 --- a/config/config.go +++ b/config/config.go @@ -252,6 +252,7 @@ func Init() { SkipNaturalKeysValidation: false, DelayedQuotaUpdate: 0, CreateDefaultAdmin: false, + IsShared: 0, }, HTTPDConfig: httpd.Conf{ Bindings: []httpd.Binding{defaultHTTPDBinding}, @@ -1065,6 +1066,7 @@ func setViperDefaults() { viper.SetDefault("data_provider.skip_natural_keys_validation", globalConf.ProviderConf.SkipNaturalKeysValidation) viper.SetDefault("data_provider.delayed_quota_update", globalConf.ProviderConf.DelayedQuotaUpdate) viper.SetDefault("data_provider.create_default_admin", globalConf.ProviderConf.CreateDefaultAdmin) + viper.SetDefault("data_provider.is_shared", globalConf.ProviderConf.IsShared) viper.SetDefault("httpd.templates_path", globalConf.HTTPDConfig.TemplatesPath) viper.SetDefault("httpd.static_files_path", globalConf.HTTPDConfig.StaticFilesPath) viper.SetDefault("httpd.backups_path", globalConf.HTTPDConfig.BackupsPath) diff --git a/config/config_test.go b/config/config_test.go index c67a7456..06da40db 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -844,6 +844,7 @@ func TestConfigFromEnv(t *testing.T) { os.Setenv("SFTPGO_WEBDAVD__BINDINGS__0__PORT", "12000") os.Setenv("SFTPGO_DATA_PROVIDER__PASSWORD_HASHING__ARGON2_OPTIONS__ITERATIONS", "41") os.Setenv("SFTPGO_DATA_PROVIDER__POOL_SIZE", "10") + os.Setenv("SFTPGO_DATA_PROVIDER__IS_SHARED", "1") os.Setenv("SFTPGO_DATA_PROVIDER__ACTIONS__EXECUTE_ON", "add") os.Setenv("SFTPGO_KMS__SECRETS__URL", "local") os.Setenv("SFTPGO_KMS__SECRETS__MASTER_KEY_PATH", "path") @@ -853,6 +854,7 @@ func TestConfigFromEnv(t *testing.T) { os.Unsetenv("SFTPGO_WEBDAVD__BINDINGS__0__PORT") os.Unsetenv("SFTPGO_DATA_PROVIDER__PASSWORD_HASHING__ARGON2_OPTIONS__ITERATIONS") os.Unsetenv("SFTPGO_DATA_PROVIDER__POOL_SIZE") + os.Unsetenv("SFTPGO_DATA_PROVIDER__IS_SHARED") os.Unsetenv("SFTPGO_DATA_PROVIDER__ACTIONS__EXECUTE_ON") os.Unsetenv("SFTPGO_KMS__SECRETS__URL") os.Unsetenv("SFTPGO_KMS__SECRETS__MASTER_KEY_PATH") @@ -866,6 +868,7 @@ func TestConfigFromEnv(t *testing.T) { dataProviderConf := config.GetProviderConf() assert.Equal(t, uint32(41), dataProviderConf.PasswordHashing.Argon2Options.Iterations) assert.Equal(t, 10, dataProviderConf.PoolSize) + assert.Equal(t, 1, dataProviderConf.IsShared) assert.Len(t, dataProviderConf.Actions.ExecuteOn, 1) assert.Contains(t, dataProviderConf.Actions.ExecuteOn, "add") kmsConfig := config.GetKMSConfig() diff --git a/dataprovider/bolt.go b/dataprovider/bolt.go index 11cbfaf7..0f3c095f 100644 --- a/dataprovider/bolt.go +++ b/dataprovider/bolt.go @@ -670,6 +670,11 @@ func (p *BoltProvider) dumpUsers() ([]User, error) { return users, err } +// bolt provider cannot be shared, so we always return no recently updated users +func (p *BoltProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) { + return nil, nil +} + func (p *BoltProvider) getUsers(limit int, offset int, order string) ([]User, error) { users := make([]User, 0, limit) var err error diff --git a/dataprovider/cacheduser.go b/dataprovider/cacheduser.go index 6e5a97f6..c5bf4877 100644 --- a/dataprovider/cacheduser.go +++ b/dataprovider/cacheduser.go @@ -6,6 +6,7 @@ import ( "golang.org/x/net/webdav" + "github.com/drakkan/sftpgo/v2/logger" "github.com/drakkan/sftpgo/v2/util" ) @@ -67,16 +68,22 @@ func (cache *usersCache) swap(user *User) { if cachedUser, ok := cache.users[user.Username]; ok { if cachedUser.User.Password != user.Password { + providerLog(logger.LevelDebug, "current password different from the cached one for user %#v, removing from cache", + user.Username) // the password changed, the cached user is no longer valid delete(cache.users, user.Username) return } if cachedUser.User.isFsEqual(user) { // the updated user has the same fs as the cached one, we can preserve the lock filesystem + providerLog(logger.LevelDebug, "current password and fs unchanged for for user %#v, swap cached one", + user.Username) cachedUser.User = *user cache.users[user.Username] = cachedUser } else { // filesystem changed, the cached user is no longer valid + providerLog(logger.LevelDebug, "current fs different from the cached one for user %#v, removing from cache", + user.Username) delete(cache.users, user.Username) } } diff --git a/dataprovider/dataprovider.go b/dataprovider/dataprovider.go index 74a4ece6..7b9b334a 100644 --- a/dataprovider/dataprovider.go +++ b/dataprovider/dataprovider.go @@ -133,9 +133,13 @@ var ( pbkdfPwdPrefixes = []string{pbkdf2SHA1Prefix, pbkdf2SHA256Prefix, pbkdf2SHA512Prefix, pbkdf2SHA256B64SaltPrefix} pbkdfPwdB64SaltPrefixes = []string{pbkdf2SHA256B64SaltPrefix} unixPwdPrefixes = []string{md5cryptPwdPrefix, md5cryptApr1PwdPrefix, sha512cryptPwdPrefix} + sharedProviders = []string{PGSQLDataProviderName, MySQLDataProviderName, CockroachDataProviderName} logSender = "dataProvider" availabilityTicker *time.Ticker availabilityTickerDone chan bool + updateCachesTicker *time.Ticker + updateCachesTickerDone chan bool + lastCachesUpdate int64 credentialsDirPath string sqlTableUsers = "users" sqlTableFolders = "folders" @@ -337,6 +341,12 @@ type Config struct { // on first start. // You can also create the first admin user by using the web interface or by loading initial data. CreateDefaultAdmin bool `json:"create_default_admin" mapstructure:"create_default_admin"` + // If the data provider is shared across multiple SFTPGo instances, set this parameter to 1. + // MySQL, PostgreSQL and CockroachDB can be shared, this setting is ignored for other data + // providers. For shared data providers, SFTPGo periodically reloads the latest updated users, + // based on the "updated_at" field, and updates its internal caches if users are updated from + // a different instance. This check, if enabled, is executed every 10 minutes + IsShared int `json:"is_shared" mapstructure:"is_shared"` } // BackupData defines the structure for the backup/restore files @@ -391,6 +401,7 @@ type Provider interface { deleteUser(user *User) error getUsers(limit int, offset int, order string) ([]User, error) dumpUsers() ([]User, error) + getRecentlyUpdatedUsers(after int64) ([]User, error) updateLastLogin(username string) error updateAdminLastLogin(username string) error setUpdatedAt(username string) @@ -484,6 +495,7 @@ func Initialize(cnf Config, basePath string, checkAdmins bool) error { } atomic.StoreInt32(&isAdminCreated, int32(len(admins))) startAvailabilityTimer() + startUpdateCachesTimer() delayedQuotaUpdater.start() return nil } @@ -1133,6 +1145,11 @@ func Close() error { availabilityTickerDone <- true availabilityTicker = nil } + if updateCachesTicker != nil { + updateCachesTicker.Stop() + updateCachesTickerDone <- true + updateCachesTicker = nil + } return provider.close() } @@ -1861,6 +1878,49 @@ func getSSLMode() string { return "" } +func checkCacheUpdates() { + providerLog(logger.LevelDebug, "start caches check, update time %v", util.GetTimeFromMsecSinceEpoch(lastCachesUpdate)) + checkTime := util.GetTimeAsMsSinceEpoch(time.Now()) + users, err := provider.getRecentlyUpdatedUsers(lastCachesUpdate) + if err != nil { + providerLog(logger.LevelWarn, "unable to get recently updated users: %v", err) + return + } + for _, user := range users { + providerLog(logger.LevelDebug, "invalidate caches for user %#v", user.Username) + webDAVUsersCache.swap(&user) + cachedPasswords.Remove(user.Username) + } + + lastCachesUpdate = checkTime + providerLog(logger.LevelDebug, "end caches check, new update time %v", util.GetTimeFromMsecSinceEpoch(lastCachesUpdate)) +} + +func startUpdateCachesTimer() { + if config.IsShared == 0 { + return + } + if !util.IsStringInSlice(config.Driver, sharedProviders) { + providerLog(logger.LevelWarn, "update caches not supported for provider %v", config.Driver) + return + } + lastCachesUpdate = util.GetTimeAsMsSinceEpoch(time.Now()) + providerLog(logger.LevelDebug, "update caches check started for provider %v", config.Driver) + updateCachesTicker = time.NewTicker(1 * time.Minute) + updateCachesTickerDone = make(chan bool) + + go func() { + for { + select { + case <-updateCachesTickerDone: + return + case <-updateCachesTicker.C: + checkCacheUpdates() + } + } + }() +} + func startAvailabilityTimer() { availabilityTicker = time.NewTicker(30 * time.Second) availabilityTickerDone = make(chan bool) diff --git a/dataprovider/memory.go b/dataprovider/memory.go index b949a906..6dee59cf 100644 --- a/dataprovider/memory.go +++ b/dataprovider/memory.go @@ -364,6 +364,11 @@ func (p *MemoryProvider) dumpFolders() ([]vfs.BaseVirtualFolder, error) { return folders, nil } +// memory provider cannot be shared, so we always return no recently updated users +func (p *MemoryProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) { + return nil, nil +} + func (p *MemoryProvider) getUsers(limit int, offset int, order string) ([]User, error) { users := make([]User, 0, limit) var err error diff --git a/dataprovider/mysql.go b/dataprovider/mysql.go index 8ecc035f..24ceb99e 100644 --- a/dataprovider/mysql.go +++ b/dataprovider/mysql.go @@ -166,6 +166,10 @@ func (p *MySQLProvider) dumpUsers() ([]User, error) { return sqlCommonDumpUsers(p.dbHandle) } +func (p *MySQLProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) { + return sqlCommonGetRecentlyUpdatedUsers(after, p.dbHandle) +} + func (p *MySQLProvider) getUsers(limit int, offset int, order string) ([]User, error) { return sqlCommonGetUsers(limit, offset, order, p.dbHandle) } diff --git a/dataprovider/pgsql.go b/dataprovider/pgsql.go index fbbf3da6..44e98536 100644 --- a/dataprovider/pgsql.go +++ b/dataprovider/pgsql.go @@ -179,6 +179,10 @@ func (p *PGSQLProvider) dumpUsers() ([]User, error) { return sqlCommonDumpUsers(p.dbHandle) } +func (p *PGSQLProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) { + return sqlCommonGetRecentlyUpdatedUsers(after, p.dbHandle) +} + func (p *PGSQLProvider) getUsers(limit int, offset int, order string) ([]User, error) { return sqlCommonGetUsers(limit, offset, order, p.dbHandle) } diff --git a/dataprovider/sqlcommon.go b/dataprovider/sqlcommon.go index f5f93243..d936faaf 100644 --- a/dataprovider/sqlcommon.go +++ b/dataprovider/sqlcommon.go @@ -677,6 +677,36 @@ func sqlCommonDumpUsers(dbHandle sqlQuerier) ([]User, error) { return getUsersWithVirtualFolders(ctx, users, dbHandle) } +func sqlCommonGetRecentlyUpdatedUsers(after int64, dbHandle sqlQuerier) ([]User, error) { + users := make([]User, 0, 10) + ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout) + defer cancel() + q := getRecentlyUpdatedUsersQuery() + stmt, err := dbHandle.PrepareContext(ctx, q) + if err != nil { + providerLog(logger.LevelWarn, "error preparing database query %#v: %v", q, err) + return nil, err + } + defer stmt.Close() + + rows, err := stmt.QueryContext(ctx, after) + if err == nil { + defer rows.Close() + for rows.Next() { + u, err := getUserFromDbRow(rows) + if err != nil { + return users, err + } + users = append(users, u) + } + } + err = rows.Err() + if err != nil { + return users, err + } + return getUsersWithVirtualFolders(ctx, users, dbHandle) +} + func sqlCommonGetUsers(limit int, offset int, order string, dbHandle sqlQuerier) ([]User, error) { users := make([]User, 0, limit) ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout) diff --git a/dataprovider/sqlite.go b/dataprovider/sqlite.go index f131e16c..23e89ccf 100644 --- a/dataprovider/sqlite.go +++ b/dataprovider/sqlite.go @@ -162,6 +162,11 @@ func (p *SQLiteProvider) dumpUsers() ([]User, error) { return sqlCommonDumpUsers(p.dbHandle) } +// SQLite provider cannot be shared, so we always return no recently updated users +func (p *SQLiteProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) { + return nil, nil +} + func (p *SQLiteProvider) getUsers(limit int, offset int, order string) ([]User, error) { return sqlCommonGetUsers(limit, offset, order, p.dbHandle) } diff --git a/dataprovider/sqlqueries.go b/dataprovider/sqlqueries.go index 42824f2a..8310f98b 100644 --- a/dataprovider/sqlqueries.go +++ b/dataprovider/sqlqueries.go @@ -140,6 +140,10 @@ func getUsersQuery(order string) string { order, sqlPlaceholders[0], sqlPlaceholders[1]) } +func getRecentlyUpdatedUsersQuery() string { + return fmt.Sprintf(`SELECT %v FROM %v WHERE updated_at >= %v`, selectUserFields, sqlTableUsers, sqlPlaceholders[0]) +} + func getDumpUsersQuery() string { return fmt.Sprintf(`SELECT %v FROM %v`, selectUserFields, sqlTableUsers) } diff --git a/docs/full-configuration.md b/docs/full-configuration.md index 8a9ebabd..a1693c09 100644 --- a/docs/full-configuration.md +++ b/docs/full-configuration.md @@ -202,6 +202,7 @@ The configuration file contains the following sections: - `update_mode`, integer. Defines how the database will be initialized/updated. 0 means automatically. 1 means manually using the initprovider sub-command. - `skip_natural_keys_validation`, boolean. If `true` you can use any UTF-8 character for natural keys as username, admin name, folder name. These keys are used in URIs for REST API and Web admin. If `false` only unreserved URI characters are allowed: ALPHA / DIGIT / "-" / "." / "_" / "~". Default: `false`. - `create_default_admin`, boolean. If enabled, a default admin user with username `admin` and password `password` will be created on first start. The default values can be overridden using the environment variables `SFTPGO_DEFAULT_ADMIN_USERNAME` and `SFTPGO_DEFAULT_ADMIN_PASSWORD`. You can also create the first admin user by using the web interface or by loading initial data. Default `false`. + - `is_shared`, integer. If the data provider is shared across multiple SFTPGo instances, set this parameter to `1`. `MySQL`, `PostgreSQL` and `CockroachDB` can be shared, this setting is ignored for other data providers. For shared data providers, SFTPGo periodically reloads the latest updated users, based on the `updated_at` field, and updates its internal caches if users are updated from a different instance. This check, if enabled, is executed every 10 minutes. Default: `0`. - **"httpd"**, the configuration for the HTTP server used to serve REST API and to expose the built-in web interface - `bindings`, list of structs. Each struct has the following fields: - `port`, integer. The port used for serving HTTP requests. Default: 8080. diff --git a/sftpgo.json b/sftpgo.json index 74a516f3..347b01c5 100644 --- a/sftpgo.json +++ b/sftpgo.json @@ -190,7 +190,8 @@ "password_caching": true, "update_mode": 0, "skip_natural_keys_validation": false, - "create_default_admin": false + "create_default_admin": false, + "is_shared": 0 }, "httpd": { "bindings": [