Przeglądaj źródła

data provider: update internal caches if the data provider is shared

Nicola Murino 3 lat temu
rodzic
commit
bcf088f586

+ 2 - 0
config/config.go

@@ -252,6 +252,7 @@ func Init() {
 			SkipNaturalKeysValidation: false,
 			DelayedQuotaUpdate:        0,
 			CreateDefaultAdmin:        false,
+			IsShared:                  0,
 		},
 		HTTPDConfig: httpd.Conf{
 			Bindings:           []httpd.Binding{defaultHTTPDBinding},
@@ -1065,6 +1066,7 @@ func setViperDefaults() {
 	viper.SetDefault("data_provider.skip_natural_keys_validation", globalConf.ProviderConf.SkipNaturalKeysValidation)
 	viper.SetDefault("data_provider.delayed_quota_update", globalConf.ProviderConf.DelayedQuotaUpdate)
 	viper.SetDefault("data_provider.create_default_admin", globalConf.ProviderConf.CreateDefaultAdmin)
+	viper.SetDefault("data_provider.is_shared", globalConf.ProviderConf.IsShared)
 	viper.SetDefault("httpd.templates_path", globalConf.HTTPDConfig.TemplatesPath)
 	viper.SetDefault("httpd.static_files_path", globalConf.HTTPDConfig.StaticFilesPath)
 	viper.SetDefault("httpd.backups_path", globalConf.HTTPDConfig.BackupsPath)

+ 3 - 0
config/config_test.go

@@ -844,6 +844,7 @@ func TestConfigFromEnv(t *testing.T) {
 	os.Setenv("SFTPGO_WEBDAVD__BINDINGS__0__PORT", "12000")
 	os.Setenv("SFTPGO_DATA_PROVIDER__PASSWORD_HASHING__ARGON2_OPTIONS__ITERATIONS", "41")
 	os.Setenv("SFTPGO_DATA_PROVIDER__POOL_SIZE", "10")
+	os.Setenv("SFTPGO_DATA_PROVIDER__IS_SHARED", "1")
 	os.Setenv("SFTPGO_DATA_PROVIDER__ACTIONS__EXECUTE_ON", "add")
 	os.Setenv("SFTPGO_KMS__SECRETS__URL", "local")
 	os.Setenv("SFTPGO_KMS__SECRETS__MASTER_KEY_PATH", "path")
@@ -853,6 +854,7 @@ func TestConfigFromEnv(t *testing.T) {
 		os.Unsetenv("SFTPGO_WEBDAVD__BINDINGS__0__PORT")
 		os.Unsetenv("SFTPGO_DATA_PROVIDER__PASSWORD_HASHING__ARGON2_OPTIONS__ITERATIONS")
 		os.Unsetenv("SFTPGO_DATA_PROVIDER__POOL_SIZE")
+		os.Unsetenv("SFTPGO_DATA_PROVIDER__IS_SHARED")
 		os.Unsetenv("SFTPGO_DATA_PROVIDER__ACTIONS__EXECUTE_ON")
 		os.Unsetenv("SFTPGO_KMS__SECRETS__URL")
 		os.Unsetenv("SFTPGO_KMS__SECRETS__MASTER_KEY_PATH")
@@ -866,6 +868,7 @@ func TestConfigFromEnv(t *testing.T) {
 	dataProviderConf := config.GetProviderConf()
 	assert.Equal(t, uint32(41), dataProviderConf.PasswordHashing.Argon2Options.Iterations)
 	assert.Equal(t, 10, dataProviderConf.PoolSize)
+	assert.Equal(t, 1, dataProviderConf.IsShared)
 	assert.Len(t, dataProviderConf.Actions.ExecuteOn, 1)
 	assert.Contains(t, dataProviderConf.Actions.ExecuteOn, "add")
 	kmsConfig := config.GetKMSConfig()

+ 5 - 0
dataprovider/bolt.go

@@ -670,6 +670,11 @@ func (p *BoltProvider) dumpUsers() ([]User, error) {
 	return users, err
 }
 
+// getRecentlyUpdatedUsers always returns no users and no error: the bolt provider cannot be shared, so the after argument is ignored
+func (p *BoltProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) {
+	return nil, nil
+}
+
 func (p *BoltProvider) getUsers(limit int, offset int, order string) ([]User, error) {
 	users := make([]User, 0, limit)
 	var err error

+ 7 - 0
dataprovider/cacheduser.go

@@ -6,6 +6,7 @@ import (
 
 	"golang.org/x/net/webdav"
 
+	"github.com/drakkan/sftpgo/v2/logger"
 	"github.com/drakkan/sftpgo/v2/util"
 )
 
@@ -67,16 +68,22 @@ func (cache *usersCache) swap(user *User) {
 
 	if cachedUser, ok := cache.users[user.Username]; ok {
 		if cachedUser.User.Password != user.Password {
+			providerLog(logger.LevelDebug, "current password different from the cached one for user %#v, removing from cache",
+				user.Username)
 			// the password changed, the cached user is no longer valid
 			delete(cache.users, user.Username)
 			return
 		}
 		if cachedUser.User.isFsEqual(user) {
 			// the updated user has the same fs as the cached one, we can preserve the lock filesystem
+			providerLog(logger.LevelDebug, "current password and fs unchanged for for user %#v, swap cached one",
+				user.Username)
 			cachedUser.User = *user
 			cache.users[user.Username] = cachedUser
 		} else {
 			// filesystem changed, the cached user is no longer valid
+			providerLog(logger.LevelDebug, "current fs different from the cached one for user %#v, removing from cache",
+				user.Username)
 			delete(cache.users, user.Username)
 		}
 	}

+ 60 - 0
dataprovider/dataprovider.go

@@ -133,9 +133,13 @@ var (
 	pbkdfPwdPrefixes        = []string{pbkdf2SHA1Prefix, pbkdf2SHA256Prefix, pbkdf2SHA512Prefix, pbkdf2SHA256B64SaltPrefix}
 	pbkdfPwdB64SaltPrefixes = []string{pbkdf2SHA256B64SaltPrefix}
 	unixPwdPrefixes         = []string{md5cryptPwdPrefix, md5cryptApr1PwdPrefix, sha512cryptPwdPrefix}
+	sharedProviders         = []string{PGSQLDataProviderName, MySQLDataProviderName, CockroachDataProviderName}
 	logSender               = "dataProvider"
 	availabilityTicker      *time.Ticker
 	availabilityTickerDone  chan bool
+	updateCachesTicker      *time.Ticker
+	updateCachesTickerDone  chan bool
+	lastCachesUpdate        int64
 	credentialsDirPath      string
 	sqlTableUsers           = "users"
 	sqlTableFolders         = "folders"
@@ -337,6 +341,12 @@ type Config struct {
 	// on first start.
 	// You can also create the first admin user by using the web interface or by loading initial data.
 	CreateDefaultAdmin bool `json:"create_default_admin" mapstructure:"create_default_admin"`
+	// If the data provider is shared across multiple SFTPGo instances, set this parameter to 1.
+	// MySQL, PostgreSQL and CockroachDB can be shared; this setting is ignored for other data
+	// providers. For shared data providers, SFTPGo periodically reloads the latest updated users,
+	// based on the "updated_at" field, and updates its internal caches if users are updated from
+	// a different instance. This check, if enabled, is executed every 10 minutes.
+	IsShared int `json:"is_shared" mapstructure:"is_shared"`
 }
 
 // BackupData defines the structure for the backup/restore files
@@ -391,6 +401,7 @@ type Provider interface {
 	deleteUser(user *User) error
 	getUsers(limit int, offset int, order string) ([]User, error)
 	dumpUsers() ([]User, error)
+	getRecentlyUpdatedUsers(after int64) ([]User, error)
 	updateLastLogin(username string) error
 	updateAdminLastLogin(username string) error
 	setUpdatedAt(username string)
@@ -484,6 +495,7 @@ func Initialize(cnf Config, basePath string, checkAdmins bool) error {
 	}
 	atomic.StoreInt32(&isAdminCreated, int32(len(admins)))
 	startAvailabilityTimer()
+	startUpdateCachesTimer()
 	delayedQuotaUpdater.start()
 	return nil
 }
@@ -1133,6 +1145,11 @@ func Close() error {
 		availabilityTickerDone <- true
 		availabilityTicker = nil
 	}
+	if updateCachesTicker != nil {
+		updateCachesTicker.Stop()
+		updateCachesTickerDone <- true
+		updateCachesTicker = nil
+	}
 	return provider.close()
 }
 
@@ -1861,6 +1878,49 @@ func getSSLMode() string {
 	return ""
 }
 
+// checkCacheUpdates asks the provider for the users updated since lastCachesUpdate
+// and, for each of them, swaps the entry in the WebDAV users cache and removes the
+// cached password.
+// On success lastCachesUpdate is set to the time taken before the query was issued;
+// if the provider returns an error a warning is logged and lastCachesUpdate is left
+// unchanged, so the next run checks the same interval again.
+func checkCacheUpdates() {
+	providerLog(logger.LevelDebug, "start caches check, update time %v", util.GetTimeFromMsecSinceEpoch(lastCachesUpdate))
+	// take the timestamp before running the query so that updates happening while
+	// the query runs fall inside the next checked interval
+	checkTime := util.GetTimeAsMsSinceEpoch(time.Now())
+	users, err := provider.getRecentlyUpdatedUsers(lastCachesUpdate)
+	if err != nil {
+		providerLog(logger.LevelWarn, "unable to get recently updated users: %v", err)
+		return
+	}
+	for idx := range users {
+		// point at the slice element instead of taking the address of the range
+		// variable: no per-iteration copy and no implicit aliasing
+		user := &users[idx]
+		providerLog(logger.LevelDebug, "invalidate caches for user %#v", user.Username)
+		webDAVUsersCache.swap(user)
+		cachedPasswords.Remove(user.Username)
+	}
+
+	lastCachesUpdate = checkTime
+	providerLog(logger.LevelDebug, "end caches check, new update time %v", util.GetTimeFromMsecSinceEpoch(lastCachesUpdate))
+}
+
+// startUpdateCachesTimer starts the periodic caches check used when the data
+// provider is shared.
+// It returns immediately if config.IsShared is 0, and logs a warning and returns if
+// config.Driver is not listed in sharedProviders. Otherwise it stores the current
+// time in lastCachesUpdate, creates the ticker and starts a goroutine that calls
+// checkCacheUpdates on every tick until a value is received on
+// updateCachesTickerDone.
+func startUpdateCachesTimer() {
+	if config.IsShared == 0 {
+		return
+	}
+	if !util.IsStringInSlice(config.Driver, sharedProviders) {
+		providerLog(logger.LevelWarn, "update caches not supported for provider %v", config.Driver)
+		return
+	}
+	lastCachesUpdate = util.GetTimeAsMsSinceEpoch(time.Now())
+	providerLog(logger.LevelDebug, "update caches check started for provider %v", config.Driver)
+	// the documented interval for the is_shared check is 10 minutes
+	updateCachesTicker = time.NewTicker(10 * time.Minute)
+	updateCachesTickerDone = make(chan bool)
+
+	go func() {
+		for {
+			select {
+			case <-updateCachesTickerDone:
+				return
+			case <-updateCachesTicker.C:
+				checkCacheUpdates()
+			}
+		}
+	}()
+}
+
 func startAvailabilityTimer() {
 	availabilityTicker = time.NewTicker(30 * time.Second)
 	availabilityTickerDone = make(chan bool)

+ 5 - 0
dataprovider/memory.go

@@ -364,6 +364,11 @@ func (p *MemoryProvider) dumpFolders() ([]vfs.BaseVirtualFolder, error) {
 	return folders, nil
 }
 
+// getRecentlyUpdatedUsers always returns no users and no error: the memory provider cannot be shared, so the after argument is ignored
+func (p *MemoryProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) {
+	return nil, nil
+}
+
 func (p *MemoryProvider) getUsers(limit int, offset int, order string) ([]User, error) {
 	users := make([]User, 0, limit)
 	var err error

+ 4 - 0
dataprovider/mysql.go

@@ -166,6 +166,10 @@ func (p *MySQLProvider) dumpUsers() ([]User, error) {
 	return sqlCommonDumpUsers(p.dbHandle)
 }
 
+// getRecentlyUpdatedUsers delegates to sqlCommonGetRecentlyUpdatedUsers using the
+// provider's database handle and returns its result unchanged
+func (p *MySQLProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) {
+	return sqlCommonGetRecentlyUpdatedUsers(after, p.dbHandle)
+}
+
 func (p *MySQLProvider) getUsers(limit int, offset int, order string) ([]User, error) {
 	return sqlCommonGetUsers(limit, offset, order, p.dbHandle)
 }

+ 4 - 0
dataprovider/pgsql.go

@@ -179,6 +179,10 @@ func (p *PGSQLProvider) dumpUsers() ([]User, error) {
 	return sqlCommonDumpUsers(p.dbHandle)
 }
 
+// getRecentlyUpdatedUsers delegates to sqlCommonGetRecentlyUpdatedUsers using the
+// provider's database handle and returns its result unchanged
+func (p *PGSQLProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) {
+	return sqlCommonGetRecentlyUpdatedUsers(after, p.dbHandle)
+}
+
 func (p *PGSQLProvider) getUsers(limit int, offset int, order string) ([]User, error) {
 	return sqlCommonGetUsers(limit, offset, order, p.dbHandle)
 }

+ 30 - 0
dataprovider/sqlcommon.go

@@ -677,6 +677,36 @@ func sqlCommonDumpUsers(dbHandle sqlQuerier) ([]User, error) {
 	return getUsersWithVirtualFolders(ctx, users, dbHandle)
 }
 
+// sqlCommonGetRecentlyUpdatedUsers runs the query built by
+// getRecentlyUpdatedUsersQuery, binding after to its placeholder, and returns the
+// matching users after passing them through getUsersWithVirtualFolders.
+// If preparing or executing the query fails, a nil slice and the error are
+// returned. If reading a row fails, the users collected so far are returned
+// together with the error.
+func sqlCommonGetRecentlyUpdatedUsers(after int64, dbHandle sqlQuerier) ([]User, error) {
+	users := make([]User, 0, 10)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
+	defer cancel()
+	q := getRecentlyUpdatedUsersQuery()
+	stmt, err := dbHandle.PrepareContext(ctx, q)
+	if err != nil {
+		providerLog(logger.LevelWarn, "error preparing database query %#v: %v", q, err)
+		return nil, err
+	}
+	defer stmt.Close()
+
+	rows, err := stmt.QueryContext(ctx, after)
+	if err != nil {
+		// rows is nil when the query fails, so return now: calling rows.Err()
+		// on it would dereference a nil pointer
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		u, err := getUserFromDbRow(rows)
+		if err != nil {
+			return users, err
+		}
+		users = append(users, u)
+	}
+	// report errors that stopped the iteration early
+	if err := rows.Err(); err != nil {
+		return users, err
+	}
+	return getUsersWithVirtualFolders(ctx, users, dbHandle)
+}
+
 func sqlCommonGetUsers(limit int, offset int, order string, dbHandle sqlQuerier) ([]User, error) {
 	users := make([]User, 0, limit)
 	ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)

+ 5 - 0
dataprovider/sqlite.go

@@ -162,6 +162,11 @@ func (p *SQLiteProvider) dumpUsers() ([]User, error) {
 	return sqlCommonDumpUsers(p.dbHandle)
 }
 
+// getRecentlyUpdatedUsers always returns no users and no error: the SQLite provider cannot be shared, so the after argument is ignored
+func (p *SQLiteProvider) getRecentlyUpdatedUsers(after int64) ([]User, error) {
+	return nil, nil
+}
+
 func (p *SQLiteProvider) getUsers(limit int, offset int, order string) ([]User, error) {
 	return sqlCommonGetUsers(limit, offset, order, p.dbHandle)
 }

+ 4 - 0
dataprovider/sqlqueries.go

@@ -140,6 +140,10 @@ func getUsersQuery(order string) string {
 		order, sqlPlaceholders[0], sqlPlaceholders[1])
 }
 
+func getRecentlyUpdatedUsersQuery() string {
+	return fmt.Sprintf(`SELECT %v FROM %v WHERE updated_at >= %v`, selectUserFields, sqlTableUsers, sqlPlaceholders[0])
+}
+
 func getDumpUsersQuery() string {
 	return fmt.Sprintf(`SELECT %v FROM %v`, selectUserFields, sqlTableUsers)
 }

+ 1 - 0
docs/full-configuration.md

@@ -202,6 +202,7 @@ The configuration file contains the following sections:
   - `update_mode`, integer. Defines how the database will be initialized/updated. 0 means automatically. 1 means manually using the initprovider sub-command.
   - `skip_natural_keys_validation`, boolean. If `true` you can use any UTF-8 character for natural keys as username, admin name, folder name. These keys are used in URIs for REST API and Web admin. If `false` only unreserved URI characters are allowed: ALPHA / DIGIT / "-" / "." / "_" / "~". Default: `false`.
   - `create_default_admin`, boolean. If enabled, a default admin user with username `admin` and password `password` will be created on first start. The default values can be overridden using the environment variables `SFTPGO_DEFAULT_ADMIN_USERNAME` and `SFTPGO_DEFAULT_ADMIN_PASSWORD`. You can also create the first admin user by using the web interface or by loading initial data. Default `false`.
+  - `is_shared`, integer. If the data provider is shared across multiple SFTPGo instances, set this parameter to `1`. `MySQL`, `PostgreSQL` and `CockroachDB` can be shared; this setting is ignored for other data providers. For shared data providers, SFTPGo periodically reloads the latest updated users, based on the `updated_at` field, and updates its internal caches if users are updated from a different instance. This check, if enabled, is executed every 10 minutes. Default: `0`.
 - **"httpd"**, the configuration for the HTTP server used to serve REST API and to expose the built-in web interface
   - `bindings`, list of structs. Each struct has the following fields:
     - `port`, integer. The port used for serving HTTP requests. Default: 8080.

+ 2 - 1
sftpgo.json

@@ -190,7 +190,8 @@
     "password_caching": true,
     "update_mode": 0,
     "skip_natural_keys_validation": false,
-    "create_default_admin": false
+    "create_default_admin": false,
+    "is_shared": 0
   },
   "httpd": {
     "bindings": [