add Azure Blob support

This commit is contained in:
Nicola Murino 2020-10-25 08:18:48 +01:00
parent db7e81e9d0
commit 5ff8f75917
No known key found for this signature in database
GPG key ID: 2F1FB59433D5A8CB
31 changed files with 1502 additions and 157 deletions

View file

@ -7,7 +7,7 @@
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
Fully featured and highly configurable SFTP server with optional FTP/S and WebDAV support, written in Go.
It can serve local filesystem, S3 or Google Cloud Storage.
It can serve local filesystem, S3 (compatible) Object Storage, Google Cloud Storage and Azure Blob Storage.
## Features
@ -173,6 +173,10 @@ Each user can be mapped to the whole bucket or to a bucket virtual folder. This
Each user can be mapped with a Google Cloud Storage bucket or a bucket virtual folder. This way, the mapped bucket/virtual folder is exposed over SFTP/SCP/FTP/WebDAV. More information about Google Cloud Storage integration can be found [here](./docs/google-cloud-storage.md).
### Azure Blob Storage backend
Each user can be mapped with an Azure Blob Storage container or a container virtual folder. This way, the mapped container/virtual folder is exposed over SFTP/SCP/FTP/WebDAV. More information about Azure Blob Storage integration can be found [here](./docs/azure-blob-storage.md).
### Other Storage backends
Adding new storage backends is quite easy:

View file

@ -84,6 +84,13 @@ func newActionNotification(
endpoint = user.FsConfig.S3Config.Endpoint
} else if user.FsConfig.Provider == dataprovider.GCSFilesystemProvider {
bucket = user.FsConfig.GCSConfig.Bucket
} else if user.FsConfig.Provider == dataprovider.AzureBlobFilesystemProvider {
bucket = user.FsConfig.AzBlobConfig.Container
if user.FsConfig.AzBlobConfig.SASURL != "" {
endpoint = user.FsConfig.AzBlobConfig.SASURL
} else {
endpoint = user.FsConfig.AzBlobConfig.Endpoint
}
}
if err == ErrQuotaExceeded {

View file

@ -28,6 +28,11 @@ func TestNewActionNotification(t *testing.T) {
user.FsConfig.GCSConfig = vfs.GCSFsConfig{
Bucket: "gcsbucket",
}
user.FsConfig.AzBlobConfig = vfs.AzBlobFsConfig{
Container: "azcontainer",
SASURL: "azsasurl",
Endpoint: "azendpoint",
}
a := newActionNotification(user, operationDownload, "path", "target", "", ProtocolSFTP, 123, errors.New("fake error"))
assert.Equal(t, user.Username, a.Username)
assert.Equal(t, 0, len(a.Bucket))
@ -45,6 +50,18 @@ func TestNewActionNotification(t *testing.T) {
assert.Equal(t, "gcsbucket", a.Bucket)
assert.Equal(t, 0, len(a.Endpoint))
assert.Equal(t, 2, a.Status)
user.FsConfig.Provider = dataprovider.AzureBlobFilesystemProvider
a = newActionNotification(user, operationDownload, "path", "target", "", ProtocolSCP, 123, nil)
assert.Equal(t, "azcontainer", a.Bucket)
assert.Equal(t, "azsasurl", a.Endpoint)
assert.Equal(t, 1, a.Status)
user.FsConfig.AzBlobConfig.SASURL = ""
a = newActionNotification(user, operationDownload, "path", "target", "", ProtocolSCP, 123, nil)
assert.Equal(t, "azcontainer", a.Bucket)
assert.Equal(t, "azendpoint", a.Endpoint)
assert.Equal(t, 1, a.Status)
}
func TestActionHTTP(t *testing.T) {

View file

@ -995,7 +995,7 @@ func validateFilesystemConfig(user *User) error {
if err != nil {
return &ValidationError{err: fmt.Sprintf("could not validate s3config: %v", err)}
}
if len(user.FsConfig.S3Config.AccessSecret) > 0 {
if user.FsConfig.S3Config.AccessSecret != "" {
vals := strings.Split(user.FsConfig.S3Config.AccessSecret, "$")
if !strings.HasPrefix(user.FsConfig.S3Config.AccessSecret, "$aes$") || len(vals) != 4 {
accessSecret, err := utils.EncryptData(user.FsConfig.S3Config.AccessSecret)
@ -1012,10 +1012,27 @@ func validateFilesystemConfig(user *User) error {
return &ValidationError{err: fmt.Sprintf("could not validate GCS config: %v", err)}
}
return nil
} else if user.FsConfig.Provider == AzureBlobFilesystemProvider {
err := vfs.ValidateAzBlobFsConfig(&user.FsConfig.AzBlobConfig)
if err != nil {
return &ValidationError{err: fmt.Sprintf("could not validate Azure Blob config: %v", err)}
}
if user.FsConfig.AzBlobConfig.AccountKey != "" {
vals := strings.Split(user.FsConfig.AzBlobConfig.AccountKey, "$")
if !strings.HasPrefix(user.FsConfig.AzBlobConfig.AccountKey, "$aes$") || len(vals) != 4 {
accountKey, err := utils.EncryptData(user.FsConfig.AzBlobConfig.AccountKey)
if err != nil {
return &ValidationError{err: fmt.Sprintf("could not encrypt Azure blob account key: %v", err)}
}
user.FsConfig.AzBlobConfig.AccountKey = accountKey
}
}
return nil
}
user.FsConfig.Provider = LocalFilesystemProvider
user.FsConfig.S3Config = vfs.S3FsConfig{}
user.FsConfig.GCSConfig = vfs.GCSFsConfig{}
user.FsConfig.AzBlobConfig = vfs.AzBlobFsConfig{}
return nil
}
@ -1248,6 +1265,8 @@ func HideUserSensitiveData(user *User) User {
user.FsConfig.S3Config.AccessSecret = utils.RemoveDecryptionKey(user.FsConfig.S3Config.AccessSecret)
} else if user.FsConfig.Provider == GCSFilesystemProvider {
user.FsConfig.GCSConfig.Credentials = nil
} else if user.FsConfig.Provider == AzureBlobFilesystemProvider {
user.FsConfig.AzBlobConfig.AccountKey = utils.RemoveDecryptionKey(user.FsConfig.AzBlobConfig.AccountKey)
}
return *user
}

View file

@ -124,16 +124,18 @@ type FilesystemProvider int
// supported values for FilesystemProvider
const (
LocalFilesystemProvider FilesystemProvider = iota // Local
S3FilesystemProvider // Amazon S3 compatible
GCSFilesystemProvider // Google Cloud Storage
LocalFilesystemProvider FilesystemProvider = iota // Local
S3FilesystemProvider // Amazon S3 compatible
GCSFilesystemProvider // Google Cloud Storage
AzureBlobFilesystemProvider // Azure Blob Storage
)
// Filesystem defines cloud storage filesystem details
type Filesystem struct {
Provider FilesystemProvider `json:"provider"`
S3Config vfs.S3FsConfig `json:"s3config,omitempty"`
GCSConfig vfs.GCSFsConfig `json:"gcsconfig,omitempty"`
Provider FilesystemProvider `json:"provider"`
S3Config vfs.S3FsConfig `json:"s3config,omitempty"`
GCSConfig vfs.GCSFsConfig `json:"gcsconfig,omitempty"`
AzBlobConfig vfs.AzBlobFsConfig `json:"azblobconfig,omitempty"`
}
// User defines a SFTPGo user
@ -196,6 +198,8 @@ func (u *User) GetFilesystem(connectionID string) (vfs.Fs, error) {
config := u.FsConfig.GCSConfig
config.CredentialFile = u.getGCSCredentialsFilePath()
return vfs.NewGCSFs(connectionID, u.GetHomeDir(), config)
} else if u.FsConfig.Provider == AzureBlobFilesystemProvider {
return vfs.NewAzBlobFs(connectionID, u.GetHomeDir(), u.FsConfig.AzBlobConfig)
}
return vfs.NewOsFs(connectionID, u.GetHomeDir(), u.VirtualFolders), nil
}
@ -626,6 +630,8 @@ func (u *User) GetInfoString() string {
result += "Storage: S3 "
} else if u.FsConfig.Provider == GCSFilesystemProvider {
result += "Storage: GCS "
} else if u.FsConfig.Provider == AzureBlobFilesystemProvider {
result += "Storage: Azure "
}
if len(u.PublicKeys) > 0 {
result += fmt.Sprintf("Public keys: %v ", len(u.PublicKeys))
@ -725,6 +731,17 @@ func (u *User) getACopy() User {
StorageClass: u.FsConfig.GCSConfig.StorageClass,
KeyPrefix: u.FsConfig.GCSConfig.KeyPrefix,
},
AzBlobConfig: vfs.AzBlobFsConfig{
Container: u.FsConfig.AzBlobConfig.Container,
AccountName: u.FsConfig.AzBlobConfig.AccountName,
AccountKey: u.FsConfig.AzBlobConfig.AccountKey,
Endpoint: u.FsConfig.AzBlobConfig.Endpoint,
SASURL: u.FsConfig.AzBlobConfig.SASURL,
KeyPrefix: u.FsConfig.AzBlobConfig.KeyPrefix,
UploadPartSize: u.FsConfig.AzBlobConfig.UploadPartSize,
UploadConcurrency: u.FsConfig.AzBlobConfig.UploadConcurrency,
UseEmulator: u.FsConfig.AzBlobConfig.UseEmulator,
},
}
return User{

View file

@ -45,7 +45,7 @@ For each account, the following properties can be configured:
- `allowed_extensions`, list of, case insensitive, allowed files extension. Shell like expansion is not supported so you have to specify `.jpg` and not `*.jpg`. Any file that does not end with this suffix will be denied
- `denied_extensions`, list of, case insensitive, denied files extension. Denied file extensions are evaluated before the allowed ones
- `path`, SFTP/SCP path, if no other specific filter is defined, the filter apply for sub directories too. For example if filters are defined for the paths `/` and `/sub` then the filters for `/` are applied for any file outside the `/sub` directory
- `fs_provider`, filesystem to serve via SFTP. Local filesystem and S3 Compatible Object Storage are supported
- `fs_provider`, filesystem to serve via SFTP. Local filesystem, S3 Compatible Object Storage, Google Cloud Storage and Azure Blob Storage are supported
- `s3_bucket`, required for S3 filesystem
- `s3_region`, required for S3 filesystem. Must match the region for your bucket. You can find here the list of available [AWS regions](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions). For example if your bucket is at `Frankfurt` you have to set the region to `eu-central-1`
- `s3_access_key`
@ -60,6 +60,15 @@ For each account, the following properties can be configured:
- `gcs_automatic_credentials`, integer. Set to 1 to use Application Default Credentials strategy or set to 0 to use explicit credentials via `gcs_credentials`
- `gcs_storage_class`
- `gcs_key_prefix`, allows to restrict access to the folder identified by this prefix and its contents
- `az_container`, Azure Blob Storage container
- `az_account_name`, Azure account name. leave blank to use SAS URL
- `az_account_key`, Azure account key. leave blank to use SAS URL. If provided it is stored encrypted (AES-256-GCM)
- `az_sas_url`, Azure shared access signature URL
- `az_endpoint`, Default is "blob.core.windows.net". If you use the emulator the endpoint must include the protocol, for example "http://127.0.0.1:10000"
- `az_upload_part_size`, the buffer size for multipart uploads (MB). Zero means the default (4 MB)
- `az_upload_concurrency`, how many parts are uploaded in parallel. Zero means the default (2)
- `az_key_prefix`, allows to restrict access to the folder identified by this prefix and its contents
- `az_use_emulator`, boolean
These properties are stored inside the data provider.

View file

@ -0,0 +1,20 @@
# Azure Blob Storage backend
To connect SFTPGo to Azure Blob Storage, you need to specify the access credentials. Azure Blob Storage has different options for credentials, we support:
1. Providing an account name and account key.
2. Providing a shared access signature (SAS).
If you authenticate using account and key you also need to specify a container. The endpoint can generally be left blank, the default is `blob.core.windows.net`.
If you provide a SAS URL the container is optional and if given it must match the one inside the shared access signature.
If you want to connect to an emulator such as [Azurite](https://github.com/Azure/Azurite) you need to provide the account name/key pair and an endpoint prefixed with the protocol, for example `http://127.0.0.1:10000`.
Specifying a different `key_prefix`, you can assign different "folders" of the same container to different users. This is similar to a chroot directory for local filesystem. Each SFTPGo user can only access the assigned folder and its contents. The folder identified by `key_prefix` does not need to be pre-created.
For multipart uploads you can customize the parts size and the upload concurrency. Please note that if the upload bandwidth between the client and SFTPGo is greater than the upload bandwidth between SFTPGo and the Azure Blob service then the client should wait for the last parts to be uploaded to Azure after finishing uploading the file to SFTPGo, and it may time out. Keep this in mind if you customize these parameters.
The configured container must exist.
This backend is very similar to the [S3](./s3.md) backend, and it has the same limitations.

View file

@ -14,6 +14,7 @@ The following build tags are available:
- `nogcs`, disable Google Cloud Storage backend, default enabled
- `nos3`, disable S3 Compabible Object Storage backends, default enabled
- `noazblob`, disable Azure Blob Storage backend, default enabled
- `nobolt`, disable Bolt data provider, default enabled
- `nomysql`, disable MySQL data provider, default enabled
- `nopgsql`, disable PostgreSQL data provider, default enabled

View file

@ -23,9 +23,9 @@ The external program can also read the following environment variables:
- `SFTPGO_ACTION_TARGET`, non-empty for `rename` `SFTPGO_ACTION`
- `SFTPGO_ACTION_SSH_CMD`, non-empty for `ssh_cmd` `SFTPGO_ACTION`
- `SFTPGO_ACTION_FILE_SIZE`, non-empty for `upload`, `download` and `delete` `SFTPGO_ACTION`
- `SFTPGO_ACTION_FS_PROVIDER`, `0` for local filesystem, `1` for S3 backend, `2` for Google Cloud Storage (GCS) backend
- `SFTPGO_ACTION_BUCKET`, non-empty for S3 and GCS backends
- `SFTPGO_ACTION_ENDPOINT`, non-empty for S3 backend if configured
- `SFTPGO_ACTION_FS_PROVIDER`, `0` for local filesystem, `1` for S3 backend, `2` for Google Cloud Storage (GCS) backend, `3` for Azure Blob Storage backend
- `SFTPGO_ACTION_BUCKET`, non-empty for S3, GCS and Azure backends
- `SFTPGO_ACTION_ENDPOINT`, non-empty for S3 and Azure backend if configured. For Azure this is the SAS URL, if configured otherwise the endpoint
- `SFTPGO_ACTION_STATUS`, integer. 0 means a generic error occurred. 1 means no error, 2 means quota exceeded error
- `SFTPGO_ACTION_PROTOCOL`, string. Possible values are `SSH`, `SFTP`, `SCP`, `FTP`, `DAV`
@ -40,9 +40,9 @@ If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. Th
- `target_path`, not null for `rename` action
- `ssh_cmd`, not null for `ssh_cmd` action
- `file_size`, not null for `upload`, `download`, `delete` actions
- `fs_provider`, `0` for local filesystem, `1` for S3 backend, `2` for Google Cloud Storage (GCS) backend
- `bucket`, not null for S3 and GCS backends
- `endpoint`, not null for S3 backend if configured
- `fs_provider`, `0` for local filesystem, `1` for S3 backend, `2` for Google Cloud Storage (GCS) backend, `3` for Azure Blob Storage backend
- `bucket`, not null for S3, GCS and Azure backends
- `endpoint`, not null for S3 and Azure backend if configured. For Azure this is the SAS URL, if configured otherwise the endpoint
- `status`, integer. 0 means a generic error occurred. 1 means no error, 2 means quota exceeded error
- `protocol`, string. Possible values are `SSH`, `FTP`, `DAV`

View file

@ -8,6 +8,4 @@ You can optionally specify a [storage class](https://cloud.google.com/storage/do
The configured bucket must exist.
Google Cloud Storage is exposed over HTTPS so if you are running SFTPGo as docker image please be sure to uncomment the line that installs `ca-certificates`, inside your `Dockerfile`, to be able to properly verify certificate authorities.
This backend is very similar to the [S3](./s3.md) backend, and it has the same limitations.

View file

@ -10,13 +10,11 @@ AWS SDK has different options for credentials. [More Detail](https://docs.aws.am
So, you need to provide access keys to activate option 1, or leave them blank to use the other ways to specify credentials.
Most S3 backends require HTTPS connections so if you are running SFTPGo as docker image please be sure to uncomment the line that installs `ca-certificates`, inside your `Dockerfile`, to be able to properly verify certificate authorities.
Specifying a different `key_prefix`, you can assign different "folders" of the same bucket to different users. This is similar to a chroot directory for local filesystem. Each SFTP/SCP user can only access the assigned folder and its contents. The folder identified by `key_prefix` does not need to be pre-created.
SFTPGo uses multipart uploads and parallel downloads for storing and retrieving files from S3.
For multipart uploads you can customize the parts size and the upload concurrency. Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than the upload bandwidth between SFTPGo and S3 then the SFTP client have to wait for the upload of the last parts to S3 after it ends the file upload to SFTPGo, and it may time out. Keep this in mind if you customize these parameters.
For multipart uploads you can customize the parts size and the upload concurrency. Please note that if the upload bandwidth between the client and SFTPGo is greater than the upload bandwidth between SFTPGo and S3 then the client should wait for the last parts to be uploaded to S3 after finishing uploading the file to SFTPGo, and it may time out. Keep this in mind if you customize these parameters.
The configured bucket must exist.
@ -32,7 +30,7 @@ Some SFTP commands don't work over S3:
Other notes:
- `rename` is a two step operation: server-side copy and then deletion. So, it is not atomic as for local filesystem.
- We don't support renaming non empty directories since we should rename all the contents too and this could take a long time: think about directories with thousands of files; for each file we should do an AWS API call.
- We don't support renaming non empty directories since we should rename all the contents too and this could take a long time: think about directories with thousands of files: for each file we should do an AWS API call.
- For server side encryption, you have to configure the mapped bucket to automatically encrypt objects.
- A local home directory is still required to store temporary files.
- Clients that require advanced filesystem-like features such as `sshfs` are not supported.

View file

@ -82,7 +82,9 @@ class SFTPGoApiRequests:
s3_key_prefix='', gcs_bucket='', gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='',
gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[],
denied_extensions=[], allowed_extensions=[], s3_upload_part_size=0, s3_upload_concurrency=0,
max_upload_file_size=0, denied_protocols=[]):
max_upload_file_size=0, denied_protocols=[], az_container="", az_account_name="", az_account_key="",
az_sas_url="", az_endpoint="", az_upload_part_size=0, az_upload_concurrency=0, az_key_prefix="",
az_use_emulator=False):
user = {'id':user_id, 'username':username, 'uid':uid, 'gid':gid,
'max_sessions':max_sessions, 'quota_size':quota_size, 'quota_files':quota_files,
'upload_bandwidth':upload_bandwidth, 'download_bandwidth':download_bandwidth,
@ -106,7 +108,10 @@ class SFTPGoApiRequests:
user.update({'filesystem':self.buildFsConfig(fs_provider, s3_bucket, s3_region, s3_access_key, s3_access_secret,
s3_endpoint, s3_storage_class, s3_key_prefix, gcs_bucket,
gcs_key_prefix, gcs_storage_class, gcs_credentials_file,
gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency)})
gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency,
az_container, az_account_name, az_account_key, az_sas_url,
az_endpoint, az_upload_part_size, az_upload_concurrency, az_key_prefix,
az_use_emulator)})
return user
def buildVirtualFolders(self, vfolders):
@ -228,7 +233,9 @@ class SFTPGoApiRequests:
def buildFsConfig(self, fs_provider, s3_bucket, s3_region, s3_access_key, s3_access_secret, s3_endpoint,
s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
gcs_credentials_file, gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency):
gcs_credentials_file, gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency,
az_container, az_account_name, az_account_key, az_sas_url, az_endpoint, az_upload_part_size,
az_upload_concurrency, az_key_prefix, az_use_emulator):
fs_config = {'provider':0}
if fs_provider == 'S3':
s3config = {'bucket':s3_bucket, 'region':s3_region, 'access_key':s3_access_key, 'access_secret':
@ -246,6 +253,12 @@ class SFTPGoApiRequests:
gcsconfig.update({'credentials':base64.b64encode(creds.read().encode('UTF-8')).decode('UTF-8'),
'automatic_credentials':0})
fs_config.update({'provider':2, 'gcsconfig':gcsconfig})
elif fs_provider == "AzureBlob":
azureconfig = {"container":az_container, "account_name":az_account_name, "account_key":az_account_key,
"sas_url":az_sas_url, "endpoint":az_endpoint, "upload_part_size":az_upload_part_size,
"upload_concurrency":az_upload_concurrency, "key_prefix":az_key_prefix, "use_emulator":
az_use_emulator}
fs_config.update({'provider':3, 'azblobconfig':azureconfig})
return fs_config
def getUsers(self, limit=100, offset=0, order='ASC', username=''):
@ -263,13 +276,17 @@ class SFTPGoApiRequests:
s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='', s3_key_prefix='', gcs_bucket='',
gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='', gcs_automatic_credentials='automatic',
denied_login_methods=[], virtual_folders=[], denied_extensions=[], allowed_extensions=[],
s3_upload_part_size=0, s3_upload_concurrency=0, max_upload_file_size=0, denied_protocols=[]):
s3_upload_part_size=0, s3_upload_concurrency=0, max_upload_file_size=0, denied_protocols=[], az_container="",
az_account_name="", az_account_key="", az_sas_url="", az_endpoint="", az_upload_part_size=0,
az_upload_concurrency=0, az_key_prefix="", az_use_emulator=False):
u = self.buildUserObject(0, username, password, public_keys, home_dir, uid, gid, max_sessions,
quota_size, quota_files, self.buildPermissions(perms, subdirs_permissions), upload_bandwidth, download_bandwidth,
status, expiration_date, allowed_ip, denied_ip, fs_provider, s3_bucket, s3_region, s3_access_key,
s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
gcs_credentials_file, gcs_automatic_credentials, denied_login_methods, virtual_folders, denied_extensions,
allowed_extensions, s3_upload_part_size, s3_upload_concurrency, max_upload_file_size, denied_protocols)
allowed_extensions, s3_upload_part_size, s3_upload_concurrency, max_upload_file_size, denied_protocols,
az_container, az_account_name, az_account_key, az_sas_url, az_endpoint, az_upload_part_size,
az_upload_concurrency, az_key_prefix, az_use_emulator)
r = requests.post(self.userPath, json=u, auth=self.auth, verify=self.verify)
self.printResponse(r)
@ -280,13 +297,17 @@ class SFTPGoApiRequests:
s3_key_prefix='', gcs_bucket='', gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='',
gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[], denied_extensions=[],
allowed_extensions=[], s3_upload_part_size=0, s3_upload_concurrency=0, max_upload_file_size=0,
denied_protocols=[], disconnect=0):
denied_protocols=[], disconnect=0, az_container="",
az_account_name="", az_account_key="", az_sas_url="", az_endpoint="", az_upload_part_size=0,
az_upload_concurrency=0, az_key_prefix="", az_use_emulator=False):
u = self.buildUserObject(user_id, username, password, public_keys, home_dir, uid, gid, max_sessions,
quota_size, quota_files, self.buildPermissions(perms, subdirs_permissions), upload_bandwidth, download_bandwidth,
status, expiration_date, allowed_ip, denied_ip, fs_provider, s3_bucket, s3_region, s3_access_key,
s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
gcs_credentials_file, gcs_automatic_credentials, denied_login_methods, virtual_folders, denied_extensions,
allowed_extensions, s3_upload_part_size, s3_upload_concurrency, max_upload_file_size, denied_protocols)
allowed_extensions, s3_upload_part_size, s3_upload_concurrency, max_upload_file_size, denied_protocols,
az_container, az_account_name, az_account_key, az_sas_url, az_endpoint, az_upload_part_size,
az_upload_concurrency, az_key_prefix, az_use_emulator)
r = requests.put(urlparse.urljoin(self.userPath, 'user/' + str(user_id)), params={'disconnect':disconnect},
json=u, auth=self.auth, verify=self.verify)
self.printResponse(r)
@ -593,7 +614,7 @@ def addCommonUserArguments(parser):
parser.add_argument('--allowed-extensions', type=str, nargs='*', default=[], help='Allowed file extensions case insensitive. '
+'The format is /dir::ext1,ext2. For example: "/somedir::.jpg,.png" "/otherdir/subdir::.zip,.rar". ' +
'Default: %(default)s')
parser.add_argument('--fs', type=str, default='local', choices=['local', 'S3', 'GCS'],
parser.add_argument('--fs', type=str, default='local', choices=['local', 'S3', 'GCS', "AzureBlob"],
help='Filesystem provider. Default: %(default)s')
parser.add_argument('--s3-bucket', type=str, default='', help='Default: %(default)s')
parser.add_argument('--s3-key-prefix', type=str, default='', help='Virtual root directory. If non empty only this ' +
@ -616,6 +637,19 @@ def addCommonUserArguments(parser):
parser.add_argument('--gcs-credentials-file', type=str, default='', help='Default: %(default)s')
parser.add_argument('--gcs-automatic-credentials', type=str, default='automatic', choices=['explicit', 'automatic'],
help='If you provide a credentials file this argument will be setted to "explicit". Default: %(default)s')
parser.add_argument('--az-container', type=str, default='', help='Default: %(default)s')
parser.add_argument('--az-account-name', type=str, default='', help='Default: %(default)s')
parser.add_argument('--az-account-key', type=str, default='', help='Default: %(default)s')
parser.add_argument('--az-sas-url', type=str, default='', help='Shared access signature URL. Default: %(default)s')
parser.add_argument('--az-endpoint', type=str, default='', help='Default: %(default)s')
parser.add_argument('--az-upload-part-size', type=int, default=0, help='The buffer size for multipart uploads (MB). ' +
'Zero means the default (1 MB). Default: %(default)s')
parser.add_argument('--az-upload-concurrency', type=int, default=0, help='How many parts are uploaded in parallel. ' +
'Zero means the default (1). Default: %(default)s')
parser.add_argument('--az-key-prefix', type=str, default='', help='Virtual root directory. If non empty only this ' +
'directory and its contents will be available. Cannot start with "/". For example "folder/subfolder/".' +
' Default: %(default)s')
parser.add_argument('--az-use-emulator', type=bool, default=False, help='Default: %(default)s')
if __name__ == '__main__':
@ -769,7 +803,9 @@ if __name__ == '__main__':
args.s3_endpoint, args.s3_storage_class, args.s3_key_prefix, args.gcs_bucket, args.gcs_key_prefix,
args.gcs_storage_class, args.gcs_credentials_file, args.gcs_automatic_credentials,
args.denied_login_methods, args.virtual_folders, args.denied_extensions, args.allowed_extensions,
args.s3_upload_part_size, args.s3_upload_concurrency, args.max_upload_file_size, args.denied_protocols)
args.s3_upload_part_size, args.s3_upload_concurrency, args.max_upload_file_size, args.denied_protocols,
args.az_container, args.az_account_name, args.az_account_key, args.az_sas_url, args.az_endpoint,
args.az_upload_part_size, args.az_upload_concurrency, args.az_key_prefix, args.az_use_emulator)
elif args.command == 'update-user':
api.updateUser(args.id, args.username, args.password, args.public_keys, args.home_dir, args.uid, args.gid,
args.max_sessions, args.quota_size, args.quota_files, args.permissions, args.upload_bandwidth,
@ -779,7 +815,9 @@ if __name__ == '__main__':
args.s3_key_prefix, args.gcs_bucket, args.gcs_key_prefix, args.gcs_storage_class,
args.gcs_credentials_file, args.gcs_automatic_credentials, args.denied_login_methods,
args.virtual_folders, args.denied_extensions, args.allowed_extensions, args.s3_upload_part_size,
args.s3_upload_concurrency, args.max_upload_file_size, args.denied_protocols, args.disconnect)
args.s3_upload_concurrency, args.max_upload_file_size, args.denied_protocols, args.disconnect,
args.az_container, args.az_account_name, args.az_account_key, args.az_sas_url, args.az_endpoint,
args.az_upload_part_size, args.az_upload_concurrency, args.az_key_prefix, args.az_use_emulator)
elif args.command == 'delete-user':
api.deleteUser(args.id)
elif args.command == 'get-users':

View file

@ -45,38 +45,34 @@ func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter,
// Read reads the contents to downloads.
func (t *transfer) Read(p []byte) (n int, err error) {
t.Connection.UpdateLastActivity()
var readed int
var e error
readed, e = t.reader.Read(p)
atomic.AddInt64(&t.BytesSent, int64(readed))
n, err = t.reader.Read(p)
atomic.AddInt64(&t.BytesSent, int64(n))
if e != nil && e != io.EOF {
t.TransferError(e)
return readed, e
if err != nil && err != io.EOF {
t.TransferError(err)
return
}
t.HandleThrottle()
return readed, e
return
}
// Write writes the uploaded contents.
func (t *transfer) Write(p []byte) (n int, err error) {
t.Connection.UpdateLastActivity()
var written int
var e error
written, e = t.writer.Write(p)
atomic.AddInt64(&t.BytesReceived, int64(written))
n, err = t.writer.Write(p)
atomic.AddInt64(&t.BytesReceived, int64(n))
if t.MaxWriteSize > 0 && e == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
e = common.ErrQuotaExceeded
if t.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
err = common.ErrQuotaExceeded
}
if e != nil {
t.TransferError(e)
return written, e
if err != nil {
t.TransferError(err)
return
}
t.HandleThrottle()
return written, e
return
}
// Seek sets the offset to resume an upload or a download

1
go.mod
View file

@ -5,6 +5,7 @@ go 1.14
require (
cloud.google.com/go v0.69.1 // indirect
cloud.google.com/go/storage v1.12.0
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962
github.com/alexedwards/argon2id v0.0.0-20200802152012-2464efd3196b
github.com/aws/aws-sdk-go v1.35.9

18
go.sum
View file

@ -37,6 +37,20 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.12.0 h1:4y3gHptW1EHVtcPAVE0eBBlFuGqEejTTG3KdIE0lUX4=
cloud.google.com/go/storage v1.12.0/go.mod h1:fFLk2dp2oAhDz8QFKwqrjdJvxSp/W2g7nillojlL5Ho=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-pipeline-go v0.2.2 h1:6oiIS9yaG6XCCzhgAgKFfIWyo4LLCiDhZot6ltoThhY=
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
github.com/Azure/azure-storage-blob-go v0.10.0/go.mod h1:ep1edmW+kNQx4UfWM9heESNmQdijykocJ0YOxmMX8SE=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.3/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA=
github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g=
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 h1:KeNholpO2xKjgaaSyd+DyQRrsQjhbSeS7qe4nEw8aQw=
@ -212,6 +226,8 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
@ -294,6 +310,8 @@ github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
github.com/magiconair/properties v1.8.4 h1:8KGKTcQQGm0Kv7vEbKFErAoAOFyyacLStRtQSeYtvkY=
github.com/magiconair/properties v1.8.4/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d h1:oNAwILwmgWKFpuU+dXvI6dl9jG2mAWAZLX3r9s0PPiw=
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=

View file

@ -118,9 +118,13 @@ func updateUser(w http.ResponseWriter, r *http.Request) {
}
currentPermissions := user.Permissions
currentS3AccessSecret := ""
currentAzAccountKey := ""
if user.FsConfig.Provider == dataprovider.S3FilesystemProvider {
currentS3AccessSecret = user.FsConfig.S3Config.AccessSecret
}
if user.FsConfig.Provider == dataprovider.AzureBlobFilesystemProvider {
currentAzAccountKey = user.FsConfig.AzBlobConfig.AccountKey
}
user.Permissions = make(map[string][]string)
err = render.DecodeJSON(r.Body, &user)
if err != nil {
@ -131,13 +135,8 @@ func updateUser(w http.ResponseWriter, r *http.Request) {
if len(user.Permissions) == 0 {
user.Permissions = currentPermissions
}
// we use the new access secret if different from the old one and not empty
if user.FsConfig.Provider == dataprovider.S3FilesystemProvider {
if utils.RemoveDecryptionKey(currentS3AccessSecret) == user.FsConfig.S3Config.AccessSecret ||
(len(user.FsConfig.S3Config.AccessSecret) == 0 && len(user.FsConfig.S3Config.AccessKey) > 0) {
user.FsConfig.S3Config.AccessSecret = currentS3AccessSecret
}
}
updateEncryptedSecrets(&user, currentS3AccessSecret, currentAzAccountKey)
if user.ID != userID {
sendAPIResponse(w, r, err, "user ID in request body does not match user ID in path parameter", http.StatusBadRequest)
return
@ -181,3 +180,19 @@ func disconnectUser(username string) {
}
}
}
func updateEncryptedSecrets(user *dataprovider.User, currentS3AccessSecret, currentAzAccountKey string) {
// we use the new access secret if different from the old one and not empty
if user.FsConfig.Provider == dataprovider.S3FilesystemProvider {
if utils.RemoveDecryptionKey(currentS3AccessSecret) == user.FsConfig.S3Config.AccessSecret ||
(user.FsConfig.S3Config.AccessSecret == "" && user.FsConfig.S3Config.AccessKey != "") {
user.FsConfig.S3Config.AccessSecret = currentS3AccessSecret
}
}
if user.FsConfig.Provider == dataprovider.AzureBlobFilesystemProvider {
if utils.RemoveDecryptionKey(currentAzAccountKey) == user.FsConfig.AzBlobConfig.AccountKey ||
(user.FsConfig.AzBlobConfig.AccountKey == "" && user.FsConfig.AzBlobConfig.AccountName != "") {
user.FsConfig.AzBlobConfig.AccountKey = currentAzAccountKey
}
}
}

View file

@ -620,6 +620,9 @@ func compareUserFsConfig(expected *dataprovider.User, actual *dataprovider.User)
if err := compareGCSConfig(expected, actual); err != nil {
return err
}
if err := compareAzBlobConfig(expected, actual); err != nil {
return err
}
return nil
}
@ -633,8 +636,8 @@ func compareS3Config(expected *dataprovider.User, actual *dataprovider.User) err
if expected.FsConfig.S3Config.AccessKey != actual.FsConfig.S3Config.AccessKey {
return errors.New("S3 access key mismatch")
}
if err := checkS3AccessSecret(expected.FsConfig.S3Config.AccessSecret, actual.FsConfig.S3Config.AccessSecret); err != nil {
return err
if err := checkEncryptedSecret(expected.FsConfig.S3Config.AccessSecret, actual.FsConfig.S3Config.AccessSecret); err != nil {
return fmt.Errorf("S3 access secret mismatch: %v", err)
}
if expected.FsConfig.S3Config.Endpoint != actual.FsConfig.S3Config.Endpoint {
return errors.New("S3 endpoint mismatch")
@ -672,29 +675,61 @@ func compareGCSConfig(expected *dataprovider.User, actual *dataprovider.User) er
return nil
}
func checkS3AccessSecret(expectedAccessSecret, actualAccessSecret string) error {
func compareAzBlobConfig(expected *dataprovider.User, actual *dataprovider.User) error {
if expected.FsConfig.AzBlobConfig.Container != actual.FsConfig.AzBlobConfig.Container {
return errors.New("Azure Blob container mismatch")
}
if expected.FsConfig.AzBlobConfig.AccountName != actual.FsConfig.AzBlobConfig.AccountName {
return errors.New("Azure Blob account name mismatch")
}
if err := checkEncryptedSecret(expected.FsConfig.AzBlobConfig.AccountKey, actual.FsConfig.AzBlobConfig.AccountKey); err != nil {
return fmt.Errorf("Azure Blob account key mismatch: %v", err)
}
if expected.FsConfig.AzBlobConfig.Endpoint != actual.FsConfig.AzBlobConfig.Endpoint {
return errors.New("Azure Blob endpoint mismatch")
}
if expected.FsConfig.AzBlobConfig.SASURL != actual.FsConfig.AzBlobConfig.SASURL {
return errors.New("Azure Blob SASL URL mismatch")
}
if expected.FsConfig.AzBlobConfig.UploadPartSize != actual.FsConfig.AzBlobConfig.UploadPartSize {
return errors.New("Azure Blob upload part size mismatch")
}
if expected.FsConfig.AzBlobConfig.UploadConcurrency != actual.FsConfig.AzBlobConfig.UploadConcurrency {
return errors.New("Azure Blob upload concurrency mismatch")
}
if expected.FsConfig.AzBlobConfig.KeyPrefix != actual.FsConfig.AzBlobConfig.KeyPrefix &&
expected.FsConfig.AzBlobConfig.KeyPrefix+"/" != actual.FsConfig.AzBlobConfig.KeyPrefix {
return errors.New("Azure Blob key prefix mismatch")
}
if expected.FsConfig.AzBlobConfig.UseEmulator != actual.FsConfig.AzBlobConfig.UseEmulator {
return errors.New("Azure Blob use emulator mismatch")
}
return nil
}
func checkEncryptedSecret(expectedAccessSecret, actualAccessSecret string) error {
if len(expectedAccessSecret) > 0 {
vals := strings.Split(expectedAccessSecret, "$")
if strings.HasPrefix(expectedAccessSecret, "$aes$") && len(vals) == 4 {
expectedAccessSecret = utils.RemoveDecryptionKey(expectedAccessSecret)
if expectedAccessSecret != actualAccessSecret {
return fmt.Errorf("S3 access secret mismatch, expected: %v", expectedAccessSecret)
return fmt.Errorf("secret mismatch, expected: %v", expectedAccessSecret)
}
} else {
// here we check that actualAccessSecret is aes encrypted without the nonce
parts := strings.Split(actualAccessSecret, "$")
if !strings.HasPrefix(actualAccessSecret, "$aes$") || len(parts) != 3 {
return errors.New("Invalid S3 access secret")
return errors.New("invalid secret")
}
if len(parts) == len(vals) {
if expectedAccessSecret != actualAccessSecret {
return errors.New("S3 encrypted access secret mismatch")
return errors.New("encrypted secret mismatch")
}
}
}
} else {
if expectedAccessSecret != actualAccessSecret {
return errors.New("S3 access secret mismatch")
return errors.New("secret mismatch")
}
}
return nil

View file

@ -433,6 +433,23 @@ func TestAddUserInvalidFsConfig(t *testing.T) {
u.FsConfig.GCSConfig.Credentials = invalidBase64{}
_, _, err = httpd.AddUser(u, http.StatusBadRequest)
assert.NoError(t, err)
u = getTestUser()
u.FsConfig.Provider = dataprovider.AzureBlobFilesystemProvider
u.FsConfig.AzBlobConfig.SASURL = "http://foo\x7f.com/"
_, _, err = httpd.AddUser(u, http.StatusBadRequest)
assert.NoError(t, err)
u.FsConfig.AzBlobConfig.SASURL = ""
u.FsConfig.AzBlobConfig.AccountName = "name"
_, _, err = httpd.AddUser(u, http.StatusBadRequest)
assert.NoError(t, err)
u.FsConfig.AzBlobConfig.Container = "container"
_, _, err = httpd.AddUser(u, http.StatusBadRequest)
assert.NoError(t, err)
u.FsConfig.AzBlobConfig.AccountKey = "key"
u.FsConfig.AzBlobConfig.KeyPrefix = "/amedir/subdir/"
_, _, err = httpd.AddUser(u, http.StatusBadRequest)
assert.NoError(t, err)
}
func TestAddUserInvalidVirtualFolders(t *testing.T) {
@ -1024,6 +1041,50 @@ func TestUserGCSConfig(t *testing.T) {
assert.NoError(t, err)
}
func TestUserAzureBlobConfig(t *testing.T) {
user, _, err := httpd.AddUser(getTestUser(), http.StatusOK)
assert.NoError(t, err)
user.FsConfig.Provider = dataprovider.AzureBlobFilesystemProvider
user.FsConfig.AzBlobConfig.Container = "test"
user.FsConfig.AzBlobConfig.AccountName = "Server-Account-Name"
user.FsConfig.AzBlobConfig.AccountKey = "Server-Account-Key"
user.FsConfig.AzBlobConfig.Endpoint = "http://127.0.0.1:9000"
user.FsConfig.AzBlobConfig.UploadPartSize = 8
user, _, err = httpd.UpdateUser(user, http.StatusOK, "")
assert.NoError(t, err)
_, err = httpd.RemoveUser(user, http.StatusOK)
assert.NoError(t, err)
user.Password = defaultPassword
user.ID = 0
secret, _ := utils.EncryptData("Server-Account-Key")
user.FsConfig.AzBlobConfig.AccountKey = secret
user, _, err = httpd.AddUser(user, http.StatusOK)
assert.NoError(t, err)
user.FsConfig.Provider = dataprovider.AzureBlobFilesystemProvider
user.FsConfig.AzBlobConfig.Container = "test-container"
user.FsConfig.AzBlobConfig.AccountKey = "Server-Account-Key1"
user.FsConfig.AzBlobConfig.Endpoint = "http://localhost:9001"
user.FsConfig.AzBlobConfig.KeyPrefix = "somedir/subdir"
user.FsConfig.AzBlobConfig.UploadConcurrency = 5
user, _, err = httpd.UpdateUser(user, http.StatusOK, "")
assert.NoError(t, err)
user.FsConfig.Provider = dataprovider.LocalFilesystemProvider
user.FsConfig.AzBlobConfig = vfs.AzBlobFsConfig{}
user, _, err = httpd.UpdateUser(user, http.StatusOK, "")
assert.NoError(t, err)
// test user without access key and access secret (sas)
user.FsConfig.Provider = dataprovider.AzureBlobFilesystemProvider
user.FsConfig.AzBlobConfig.SASURL = "https://myaccount.blob.core.windows.net/pictures/profile.jpg?sv=2012-02-12&st=2009-02-09&se=2009-02-10&sr=c&sp=r&si=YWJjZGVmZw%3d%3d&sig=dD80ihBh5jfNpymO5Hg1IdiJIEvHcJpCMiCMnN%2fRnbI%3d"
user.FsConfig.AzBlobConfig.KeyPrefix = "somedir/subdir"
user.FsConfig.AzBlobConfig.UploadPartSize = 6
user.FsConfig.AzBlobConfig.UploadConcurrency = 4
user, _, err = httpd.UpdateUser(user, http.StatusOK, "")
assert.NoError(t, err)
_, err = httpd.RemoveUser(user, http.StatusOK)
assert.NoError(t, err)
}
func TestUpdateUserNoCredentials(t *testing.T) {
user, _, err := httpd.AddUser(getTestUser(), http.StatusOK)
assert.NoError(t, err)
@ -2736,6 +2797,96 @@ func TestWebUserGCSMock(t *testing.T) {
err = os.Remove(credentialsFilePath)
assert.NoError(t, err)
}
func TestWebUserAzureBlobMock(t *testing.T) {
user := getTestUser()
userAsJSON := getUserAsJSON(t, user)
req, _ := http.NewRequest(http.MethodPost, userPath, bytes.NewBuffer(userAsJSON))
rr := executeRequest(req)
checkResponseCode(t, http.StatusOK, rr.Code)
err := render.DecodeJSON(rr.Body, &user)
assert.NoError(t, err)
user.FsConfig.Provider = dataprovider.AzureBlobFilesystemProvider
user.FsConfig.AzBlobConfig.Container = "container"
user.FsConfig.AzBlobConfig.AccountName = "aname"
user.FsConfig.AzBlobConfig.AccountKey = "access-skey"
user.FsConfig.AzBlobConfig.Endpoint = "http://127.0.0.1:9000/path?b=c"
user.FsConfig.AzBlobConfig.KeyPrefix = "somedir/subdir/"
user.FsConfig.AzBlobConfig.UploadPartSize = 5
user.FsConfig.AzBlobConfig.UploadConcurrency = 4
user.FsConfig.AzBlobConfig.UseEmulator = true
form := make(url.Values)
form.Set("username", user.Username)
form.Set("home_dir", user.HomeDir)
form.Set("uid", "0")
form.Set("gid", strconv.FormatInt(int64(user.GID), 10))
form.Set("max_sessions", strconv.FormatInt(int64(user.MaxSessions), 10))
form.Set("quota_size", strconv.FormatInt(user.QuotaSize, 10))
form.Set("quota_files", strconv.FormatInt(int64(user.QuotaFiles), 10))
form.Set("upload_bandwidth", "0")
form.Set("download_bandwidth", "0")
form.Set("permissions", "*")
form.Set("sub_dirs_permissions", "")
form.Set("status", strconv.Itoa(user.Status))
form.Set("expiration_date", "2020-01-01 00:00:00")
form.Set("allowed_ip", "")
form.Set("denied_ip", "")
form.Set("fs_provider", "3")
form.Set("az_container", user.FsConfig.AzBlobConfig.Container)
form.Set("az_account_name", user.FsConfig.AzBlobConfig.AccountName)
form.Set("az_account_key", user.FsConfig.AzBlobConfig.AccountKey)
form.Set("az_sas_url", user.FsConfig.AzBlobConfig.SASURL)
form.Set("az_endpoint", user.FsConfig.AzBlobConfig.Endpoint)
form.Set("az_key_prefix", user.FsConfig.AzBlobConfig.KeyPrefix)
form.Set("az_use_emulator", "checked")
form.Set("allowed_extensions", "/dir1::.jpg,.png")
form.Set("denied_extensions", "/dir2::.zip")
form.Set("max_upload_file_size", "0")
// test invalid az_upload_part_size
form.Set("az_upload_part_size", "a")
b, contentType, _ := getMultipartFormData(form, "", "")
req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
req.Header.Set("Content-Type", contentType)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr.Code)
// test invalid az_upload_concurrency
form.Set("az_upload_part_size", strconv.FormatInt(user.FsConfig.AzBlobConfig.UploadPartSize, 10))
form.Set("az_upload_concurrency", "a")
b, contentType, _ = getMultipartFormData(form, "", "")
req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
req.Header.Set("Content-Type", contentType)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr.Code)
// now add the user
form.Set("az_upload_concurrency", strconv.Itoa(user.FsConfig.AzBlobConfig.UploadConcurrency))
b, contentType, _ = getMultipartFormData(form, "", "")
req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
req.Header.Set("Content-Type", contentType)
rr = executeRequest(req)
checkResponseCode(t, http.StatusSeeOther, rr.Code)
req, _ = http.NewRequest(http.MethodGet, userPath+"?limit=1&offset=0&order=ASC&username="+user.Username, nil)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr.Code)
var users []dataprovider.User
err = render.DecodeJSON(rr.Body, &users)
assert.NoError(t, err)
assert.Equal(t, 1, len(users))
updateUser := users[0]
assert.Equal(t, int64(1577836800000), updateUser.ExpirationDate)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.Container, user.FsConfig.AzBlobConfig.Container)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.AccountName, user.FsConfig.AzBlobConfig.AccountName)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.Endpoint, user.FsConfig.AzBlobConfig.Endpoint)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.SASURL, user.FsConfig.AzBlobConfig.SASURL)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.KeyPrefix, user.FsConfig.AzBlobConfig.KeyPrefix)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.UploadPartSize, user.FsConfig.AzBlobConfig.UploadPartSize)
assert.Equal(t, updateUser.FsConfig.AzBlobConfig.UploadConcurrency, user.FsConfig.AzBlobConfig.UploadConcurrency)
assert.Equal(t, 2, len(updateUser.Filters.FileExtensions))
if !strings.HasPrefix(updateUser.FsConfig.AzBlobConfig.AccountKey, "$aes$") {
t.Error("azure account secret is not encrypted")
}
req, _ = http.NewRequest(http.MethodDelete, userPath+"/"+strconv.FormatInt(user.ID, 10), nil)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr.Code)
}
func TestAddWebFoldersMock(t *testing.T) {
mappedPath := filepath.Clean(os.TempDir())

View file

@ -353,6 +353,47 @@ func TestCompareUserGCSConfig(t *testing.T) {
expected.FsConfig.GCSConfig.AutomaticCredentials = 0
}
func TestCompareUserAzureConfig(t *testing.T) {
expected := &dataprovider.User{}
actual := &dataprovider.User{}
expected.FsConfig.AzBlobConfig.Container = "a"
err := compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.Container = ""
expected.FsConfig.AzBlobConfig.AccountName = "aname"
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.AccountName = ""
expected.FsConfig.AzBlobConfig.AccountKey = "akey"
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.AccountKey = ""
expected.FsConfig.AzBlobConfig.Endpoint = "endpt"
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.Endpoint = ""
expected.FsConfig.AzBlobConfig.SASURL = "url"
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.SASURL = ""
expected.FsConfig.AzBlobConfig.UploadPartSize = 1
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.UploadPartSize = 0
expected.FsConfig.AzBlobConfig.UploadConcurrency = 1
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.UploadConcurrency = 0
expected.FsConfig.AzBlobConfig.KeyPrefix = "prefix/"
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.KeyPrefix = ""
expected.FsConfig.AzBlobConfig.UseEmulator = true
err = compareUserFsConfig(expected, actual)
assert.Error(t, err)
expected.FsConfig.AzBlobConfig.UseEmulator = false
}
func TestGCSWebInvalidFormFile(t *testing.T) {
form := make(url.Values)
form.Set("username", "test_username")

View file

@ -925,10 +925,8 @@ components:
minLength: 1
access_key:
type: string
minLength: 1
access_secret:
type: string
minLength: 1
description: the access secret is stored encrypted (AES-256-GCM)
endpoint:
type: string
@ -980,6 +978,37 @@ components:
- bucket
nullable: true
description: Google Cloud Storage configuration details
AzureBlobFsConfig:
type: object
properties:
container:
type: string
account_name:
type: string
description: Storage Account Name, leave blank to use SAS URL
account_key:
type: string
description: Storage Account Key leave blank to use SAS URL. The access key is stored encrypted (AES-256-GCM)
sas_url:
type: string
description: Shared access signature URL, leave blank if using account/key
endpoint:
type: string
description: optional endpoint. Default is "blob.core.windows.net". If you use the emulator the endpoint must include the protocol, for example "http://127.0.0.1:10000"
upload_part_size:
type: integer
description: the buffer size (in MB) to use for multipart uploads. If this value is set to zero, the default value (4MB) will be used.
upload_concurrency:
type: integer
description: the number of parts to upload in parallel. If this value is set to zero, the default value (2) will be used
key_prefix:
type: string
description: key_prefix is similar to a chroot directory for a local filesystem. If specified the user will only see contents that starts with this prefix and so you can restrict access to a specific virtual folder. The prefix, if not empty, must not start with "/" and must end with "/". If empty the whole container contents will be available
example: folder/subfolder/
use_emulator:
type: boolean
nullable: true
description: Azure Blob Storage configuration details
FilesystemConfig:
type: object
properties:
@ -989,15 +1018,19 @@ components:
- 0
- 1
- 2
- 3
description: >
Providers:
* `0` - local filesystem
* `0` - Local filesystem
* `1` - S3 Compatible Object Storage
* `2` - Google Cloud Storage
* `3` - Azure Blob Storage
s3config:
$ref: '#/components/schemas/S3Config'
gcsconfig:
$ref: '#/components/schemas/GCSConfig'
azblobconfig:
$ref: '#/components/schemas/AzureBlobFsConfig'
description: Storage filesystem details
BaseVirtualFolder:
type: object

View file

@ -431,6 +431,22 @@ func getFsConfigFromUserPostFields(r *http.Request) (dataprovider.Filesystem, er
}
fs.GCSConfig.Credentials = fileBytes
fs.GCSConfig.AutomaticCredentials = 0
} else if fs.Provider == dataprovider.AzureBlobFilesystemProvider {
fs.AzBlobConfig.Container = r.Form.Get("az_container")
fs.AzBlobConfig.AccountName = r.Form.Get("az_account_name")
fs.AzBlobConfig.AccountKey = r.Form.Get("az_account_key")
fs.AzBlobConfig.SASURL = r.Form.Get("az_sas_url")
fs.AzBlobConfig.Endpoint = r.Form.Get("az_endpoint")
fs.AzBlobConfig.KeyPrefix = r.Form.Get("az_key_prefix")
fs.AzBlobConfig.UseEmulator = len(r.Form.Get("az_use_emulator")) > 0
fs.AzBlobConfig.UploadPartSize, err = strconv.ParseInt(r.Form.Get("az_upload_part_size"), 10, 64)
if err != nil {
return fs, err
}
fs.AzBlobConfig.UploadConcurrency, err = strconv.Atoi(r.Form.Get("az_upload_concurrency"))
if err != nil {
return fs, err
}
}
return fs, nil
}

View file

@ -42,8 +42,8 @@ provides:
description: |
Fully featured and highly configurable SFTP server
SFTPGo has optional FTP/S and WebDAV support.
It can serve local filesystem, S3 (Compatible) Object Storages
and Google Cloud Storage
It can serve local filesystem, S3 (Compatible) Object Storage,
Google Cloud Storage and Azure Blob Storage.
vendor: "SFTPGo"
homepage: "https://github.com/drakkan/sftpgo"
license: "GPL-3.0"

View file

@ -13,5 +13,5 @@ Depends: ${shlibs:Depends}, ${misc:Depends}
Recommends: bash-completion, python3-requests, python3-pygments, mime-support
Description: Fully featured and highly configurable SFTP server
SFTPGo has optional FTP/S and WebDAV support.
It can serve local filesystem, S3 (Compatible) Object Storages
and Google Cloud Storage
It can serve local filesystem, S3 (Compatible) Object Storage,
Google Cloud Storage and Azure Blob Storage.

View file

@ -82,20 +82,18 @@ func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter,
// It handles download bandwidth throttling too
func (t *transfer) ReadAt(p []byte, off int64) (n int, err error) {
t.Connection.UpdateLastActivity()
var readed int
var e error
readed, e = t.readerAt.ReadAt(p, off)
atomic.AddInt64(&t.BytesSent, int64(readed))
n, err = t.readerAt.ReadAt(p, off)
atomic.AddInt64(&t.BytesSent, int64(n))
if e != nil && e != io.EOF {
if err != nil && err != io.EOF {
if t.GetType() == common.TransferDownload {
t.TransferError(e)
t.TransferError(err)
}
return readed, e
return
}
t.HandleThrottle()
return readed, e
return
}
// WriteAt writes len(p) bytes to the uploaded file starting at byte offset off and updates the bytes received.
@ -107,21 +105,19 @@ func (t *transfer) WriteAt(p []byte, off int64) (n int, err error) {
t.TransferError(err)
return 0, err
}
var written int
var e error
written, e = t.writerAt.WriteAt(p, off)
atomic.AddInt64(&t.BytesReceived, int64(written))
n, err = t.writerAt.WriteAt(p, off)
atomic.AddInt64(&t.BytesReceived, int64(n))
if t.MaxWriteSize > 0 && e == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
e = common.ErrQuotaExceeded
if t.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
err = common.ErrQuotaExceeded
}
if e != nil {
t.TransferError(e)
return written, e
if err != nil {
t.TransferError(err)
return
}
t.HandleThrottle()
return written, e
return
}
// Close it is called when the transfer is completed.

View file

@ -278,6 +278,7 @@
<option value="0" {{if eq .User.FsConfig.Provider 0 }}selected{{end}}>local</option>
<option value="1" {{if eq .User.FsConfig.Provider 1 }}selected{{end}}>Amazon S3 (Compatible)</option>
<option value="2" {{if eq .User.FsConfig.Provider 2 }}selected{{end}}>Google Cloud Storage</option>
<option value="3" {{if eq .User.FsConfig.Provider 3 }}selected{{end}}>Azure Blob Storage</option>
</select>
</div>
</div>
@ -399,6 +400,81 @@
</div>
</div>
<div class="form-group row azblob">
<label for="idAzContainer" class="col-sm-2 col-form-label">Container</label>
<div class="col-sm-3">
<input type="text" class="form-control" id="idAzContainer" name="az_container" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.Container}}" maxlength="255">
</div>
<div class="col-sm-2"></div>
<label for="idAzAccountName" class="col-sm-2 col-form-label">Account Name</label>
<div class="col-sm-3">
<input type="text" class="form-control" id="idAzAccountName" name="az_account_name" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.AccountName}}" maxlength="255">
</div>
</div>
<div class="form-group row azblob">
<label for="idAzAccountKey" class="col-sm-2 col-form-label">Account Key</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="idAzAccountKey" name="az_account_key" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.AccountKey}}" maxlength="255">
</div>
</div>
<div class="form-group row azblob">
<label for="idAzSASURL" class="col-sm-2 col-form-label">SAS URL</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="idAzSASURL" name="az_sas_url" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.SASURL}}" maxlength="255">
</div>
</div>
<div class="form-group row azblob">
<label for="idAzEndpoint" class="col-sm-2 col-form-label">Endpoint</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="idAzEndpoint" name="az_endpoint" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.Endpoint}}" maxlength="255">
</div>
</div>
<div class="form-group row azblob">
<label for="idAzPartSize" class="col-sm-2 col-form-label">UL Part Size (MB)</label>
<div class="col-sm-3">
<input type="number" class="form-control" id="idAzPartSize" name="az_upload_part_size" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.UploadPartSize}}" aria-describedby="AzPartSizeHelpBlock">
<small id="AzPartSizeHelpBlock" class="form-text text-muted">
The buffer size for multipart uploads. Zero means the default (4 MB)
</small>
</div>
<div class="col-sm-2"></div>
<label for="idAzUploadConcurrency" class="col-sm-2 col-form-label">UL Concurrency</label>
<div class="col-sm-3">
<input type="number" class="form-control" id="idAzUploadConcurrency" name="az_upload_concurrency" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.UploadConcurrency}}" min="0" aria-describedby="AzConcurrencyHelpBlock">
<small id="AzConcurrencyHelpBlock" class="form-text text-muted">
How many parts are uploaded in parallel. Zero means the default (2)
</small>
</div>
</div>
<div class="form-group row azblob">
<label for="idAzKeyPrefix" class="col-sm-2 col-form-label">Key Prefix</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="idAzKeyPrefix" name="az_key_prefix" placeholder=""
value="{{.User.FsConfig.AzBlobConfig.KeyPrefix}}" maxlength="255" aria-describedby="AzKeyPrefixHelpBlock">
<small id="AzKeyPrefixHelpBlock" class="form-text text-muted">
Similar to a chroot for local filesystem. Cannot start with "/". Example: "somedir/subdir/".
</small>
</div>
</div>
<div class="form-group azblob">
<div class="form-check">
<input type="checkbox" class="form-check-input" id="idUseEmulator" name="az_use_emulator" {{if .User.FsConfig.AzBlobConfig.UseEmulator}}checked{{end}}>
<label for="idUseEmulator" class="form-check-label">Use Azure Blob emulator</label>
</div>
</div>
{{if not .IsAdd}}
<div class="form-group">
<div class="form-check">
@ -461,15 +537,27 @@
if (val == '1'){
$('.form-group.row.gcs').hide();
$('.form-group.gcs').hide();
$('.form-group.row.azblob').hide();
$('.form-group.azblob').hide();
$('.form-group.row.s3').show();
} else if (val == '2'){
$('.form-group.row.gcs').show();
$('.form-group.gcs').show();
$('.form-group.row.azblob').hide();
$('.form-group.azblob').hide();
$('.form-group.row.s3').hide();
} else if (val == '3'){
$('.form-group.row.azblob').show();
$('.form-group.azblob').show();
$('.form-group.row.gcs').hide();
$('.form-group.gcs').hide();
$('.form-group.row.s3').hide();
} else {
$('.form-group.row.gcs').hide();
$('.form-group.gcs').hide();
$('.form-group.row.s3').hide();
$('.form-group.row.azblob').hide();
$('.form-group.azblob').hide();
}
}
</script>

724
vfs/azblobfs.go Normal file
View file

@ -0,0 +1,724 @@
// +build !noazblob
package vfs
import (
"context"
"errors"
"fmt"
"io"
"mime"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/eikenb/pipeat"
"github.com/drakkan/sftpgo/logger"
"github.com/drakkan/sftpgo/utils"
"github.com/drakkan/sftpgo/version"
)
const azureDefaultEndpoint = "blob.core.windows.net"
// max time of an azure web request response window (whether or not data is flowing)
// this is the same value used in rclone
var maxTryTimeout = time.Hour * 24 * 365
// AzureBlobFs is a Fs implementation for Azure Blob storage.
type AzureBlobFs struct {
connectionID string
localTempDir string
config AzBlobFsConfig
svc *azblob.ServiceURL
containerURL azblob.ContainerURL
ctxTimeout time.Duration
ctxLongTimeout time.Duration
}
func init() {
version.AddFeature("+azblob")
}
// NewAzBlobFs returns an AzBlobFs object that allows to interact with Azure Blob storage
func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, error) {
fs := AzureBlobFs{
connectionID: connectionID,
localTempDir: localTempDir,
config: config,
ctxTimeout: 30 * time.Second,
ctxLongTimeout: 300 * time.Second,
}
if err := ValidateAzBlobFsConfig(&fs.config); err != nil {
return fs, err
}
if fs.config.AccountKey != "" {
accountKey, err := utils.DecryptData(fs.config.AccountKey)
if err != nil {
return fs, err
}
fs.config.AccountKey = accountKey
}
setConfigDefaults(&fs)
if fs.config.SASURL != "" {
u, err := url.Parse(fs.config.SASURL)
if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err)
}
pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{
Retry: azblob.RetryOptions{
TryTimeout: maxTryTimeout,
},
Telemetry: azblob.TelemetryOptions{
Value: "SFTPGo",
},
})
// Check if we have container level SAS or account level SAS
parts := azblob.NewBlobURLParts(*u)
if parts.ContainerName != "" {
if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
return fs, fmt.Errorf("Container name in SAS URL %#v and container provided %#v do not match",
parts.ContainerName, fs.config.Container)
}
fs.svc = nil
fs.containerURL = azblob.NewContainerURL(*u, pipeline)
} else {
if fs.config.Container == "" {
return fs, errors.New("container is required with this SAS URL")
}
serviceURL := azblob.NewServiceURL(*u, pipeline)
fs.svc = &serviceURL
fs.containerURL = fs.svc.NewContainerURL(fs.config.Container)
}
return fs, nil
}
credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey)
if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err)
}
var u *url.URL
if fs.config.UseEmulator {
// for the emulator we expect the endpoint prefixed with the protocol, for example:
// http://127.0.0.1:10000
u, err = url.Parse(fmt.Sprintf("%s/%s", fs.config.Endpoint, fs.config.AccountName))
} else {
u, err = url.Parse(fmt.Sprintf("https://%s.%s", fs.config.AccountName, fs.config.Endpoint))
}
if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err)
}
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
TryTimeout: maxTryTimeout,
},
Telemetry: azblob.TelemetryOptions{
Value: "SFTPGo",
},
})
serviceURL := azblob.NewServiceURL(*u, pipeline)
fs.svc = &serviceURL
fs.containerURL = fs.svc.NewContainerURL(fs.config.Container)
return fs, nil
}
func setConfigDefaults(fs *AzureBlobFs) {
if fs.config.Endpoint == "" {
fs.config.Endpoint = azureDefaultEndpoint
}
if fs.config.UploadPartSize == 0 {
fs.config.UploadPartSize = 4
}
fs.config.UploadPartSize *= 1024 * 1024
if fs.config.UploadConcurrency == 0 {
fs.config.UploadConcurrency = 2
}
}
// Name returns the name for the Fs implementation
func (fs AzureBlobFs) Name() string {
if fs.config.SASURL != "" {
return fmt.Sprintf("Azure Blob SAS URL %#v", fs.config.Container)
}
return fmt.Sprintf("Azure Blob container %#v", fs.config.Container)
}
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs AzureBlobFs) ConnectionID() string {
return fs.connectionID
}
// Stat returns a FileInfo describing the named file
func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) {
if name == "" || name == "." {
err := fs.checkIfBucketExists()
if err != nil {
return nil, err
}
return NewFileInfo(name, true, 0, time.Now(), false), nil
}
if fs.config.KeyPrefix == name+"/" {
return NewFileInfo(name, true, 0, time.Now(), false), nil
}
prefix := fs.getPrefixForStat(name)
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := fs.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: false,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
Prefix: prefix,
})
if err != nil {
return nil, err
}
marker = listBlob.NextMarker
for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
if fs.isEqual(blobPrefix.Name, name) {
return NewFileInfo(name, true, 0, time.Now(), false), nil
}
}
for _, blobInfo := range listBlob.Segment.BlobItems {
if fs.isEqual(blobInfo.Name, name) {
isDir := false
if blobInfo.Properties.ContentType != nil {
isDir = (*blobInfo.Properties.ContentType == dirMimeType)
}
size := int64(0)
if blobInfo.Properties.ContentLength != nil {
size = *blobInfo.Properties.ContentLength
}
return NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false), nil
}
}
}
return nil, errors.New("404 no such file or directory")
}
// Lstat returns a FileInfo describing the named file
func (fs AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
return fs.Stat(name)
}
// Open opens the named file for reading
func (fs AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
ctx, cancelFn := context.WithCancel(context.Background())
blobDownloadResponse, err := blobBlockURL.Download(ctx, offset, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
if err != nil {
r.Close()
w.Close()
cancelFn()
return nil, nil, nil, err
}
body := blobDownloadResponse.Body(azblob.RetryReaderOptions{
MaxRetryRequests: 3,
})
go func() {
defer cancelFn()
defer body.Close()
n, err := io.Copy(w, body)
w.CloseWithError(err) //nolint:errcheck
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
}()
return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing
func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeWriter(w)
blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
ctx, cancelFn := context.WithCancel(context.Background())
headers := azblob.BlobHTTPHeaders{}
var contentType string
if flag == -1 {
contentType = dirMimeType
} else {
contentType = mime.TypeByExtension(path.Ext(name))
}
if contentType != "" {
headers.ContentType = contentType
}
go func() {
defer cancelFn()
uploadOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(fs.config.UploadPartSize),
BlobHTTPHeaders: headers,
MaxBuffers: fs.config.UploadConcurrency,
}
_, err := azblob.UploadStreamToBlockBlob(ctx, r, blobBlockURL, uploadOptions)
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, r.GetReadedBytes(), err)
}()
return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a StartCopyFromURL call.
func (fs AzureBlobFs) Rename(source, target string) error {
if source == target {
return nil
}
fi, err := fs.Stat(source)
if err != nil {
return err
}
if fi.IsDir() {
contents, err := fs.ReadDir(source)
if err != nil {
return err
}
if len(contents) > 0 {
return fmt.Errorf("Cannot rename non empty directory: %#v", source)
}
}
dstBlobURL := fs.containerURL.NewBlobURL(target)
srcURL := fs.containerURL.NewBlobURL(source).URL()
md := azblob.Metadata{}
mac := azblob.ModifiedAccessConditions{}
bac := azblob.BlobAccessConditions{}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
resp, err := dstBlobURL.StartCopyFromURL(ctx, srcURL, md, mac, bac)
if err != nil {
return err
}
copyStatus := resp.CopyStatus()
nErrors := 0
for copyStatus == azblob.CopyStatusPending {
// Poll until the copy is complete.
time.Sleep(500 * time.Millisecond)
propertiesResp, err := dstBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
// A GetProperties failure may be transient, so allow a couple
// of them before giving up.
nErrors++
if ctx.Err() != nil || nErrors == 3 {
return err
}
} else {
copyStatus = propertiesResp.CopyStatus()
}
}
if copyStatus != azblob.CopyStatusSuccess {
return fmt.Errorf("Copy failed with status: %s", copyStatus)
}
return fs.Remove(source, fi.IsDir())
}
// Remove removes the named file or (empty) directory.
func (fs AzureBlobFs) Remove(name string, isDir bool) error {
if isDir {
contents, err := fs.ReadDir(name)
if err != nil {
return err
}
if len(contents) > 0 {
return fmt.Errorf("Cannot remove non empty directory: %#v", name)
}
}
blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
_, err := blobBlockURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
return err
}
// Mkdir creates a new directory with the specified name and default permissions
func (fs AzureBlobFs) Mkdir(name string) error {
_, err := fs.Stat(name)
if !fs.IsNotExist(err) {
return err
}
_, w, _, err := fs.Create(name, -1)
if err != nil {
return err
}
return w.Close()
}
// Symlink creates source as a symbolic link to target.
func (AzureBlobFs) Symlink(source, target string) error {
return errors.New("403 symlinks are not supported")
}
// Readlink returns the destination of the named symbolic link
func (AzureBlobFs) Readlink(name string) (string, error) {
return "", errors.New("403 readlink is not supported")
}
// Chown changes the numeric uid and gid of the named file.
// Silently ignored.
func (AzureBlobFs) Chown(name string, uid int, gid int) error {
return nil
}
// Chmod changes the mode of the named file to mode.
// Silently ignored.
func (AzureBlobFs) Chmod(name string, mode os.FileMode) error {
return nil
}
// Chtimes changes the access and modification times of the named file.
// Silently ignored.
func (AzureBlobFs) Chtimes(name string, atime, mtime time.Time) error {
return errors.New("403 chtimes is not supported")
}
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (AzureBlobFs) Truncate(name string, size int64) error {
return errors.New("403 truncate is not supported")
}
// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
var result []os.FileInfo
// dirname must be already cleaned
prefix := ""
if dirname != "" && dirname != "." {
prefix = strings.TrimPrefix(dirname, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
}
prefixes := make(map[string]bool)
for marker := (azblob.Marker{}); marker.NotDone(); {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
listBlob, err := fs.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: false,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
Prefix: prefix,
})
if err != nil {
return nil, err
}
marker = listBlob.NextMarker
for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
// we don't support prefixes == "/" this will be sent if a key starts with "/"
if blobPrefix.Name == "/" {
continue
}
name := strings.TrimPrefix(blobPrefix.Name, prefix)
result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
prefixes[strings.TrimSuffix(name, "/")] = true
}
for _, blobInfo := range listBlob.Segment.BlobItems {
name := strings.TrimPrefix(blobInfo.Name, prefix)
size := int64(0)
if blobInfo.Properties.ContentLength != nil {
size = *blobInfo.Properties.ContentLength
}
isDir := false
if blobInfo.Properties.ContentType != nil {
isDir = (*blobInfo.Properties.ContentType == dirMimeType)
if isDir {
// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
if _, ok := prefixes[name]; ok {
continue
}
}
}
result = append(result, NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false))
}
}
return result, nil
}
// IsUploadResumeSupported returns true if upload resume is supported.
// Upload Resume is not supported on Azure Blob
func (AzureBlobFs) IsUploadResumeSupported() bool {
return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// Azure Blob uploads are already atomic, we don't need to upload to a temporary
// file
func (AzureBlobFs) IsAtomicUploadSupported() bool {
return false
}
// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (AzureBlobFs) IsNotExist(err error) bool {
if err == nil {
return false
}
if storageErr, ok := err.(azblob.StorageError); ok {
if storageErr.Response().StatusCode == http.StatusNotFound { //nolint:bodyclose
return true
}
if storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound ||
storageErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
return true
}
}
return strings.Contains(err.Error(), "404")
}
// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (AzureBlobFs) IsPermission(err error) bool {
if err == nil {
return false
}
if storageErr, ok := err.(azblob.StorageError); ok {
code := storageErr.Response().StatusCode //nolint:bodyclose
if code == http.StatusForbidden || code == http.StatusUnauthorized {
return true
}
if storageErr.ServiceCode() == azblob.ServiceCodeInsufficientAccountPermissions ||
storageErr.ServiceCode() == azblob.ServiceCodeInvalidAuthenticationInfo ||
storageErr.ServiceCode() == azblob.ServiceCodeUnauthorizedBlobOverwrite {
return true
}
}
return strings.Contains(err.Error(), "403")
}
// CheckRootPath creates the specified local root directory if it does not exists
func (fs AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
// we need a local directory for temporary files
osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
return osFs.CheckRootPath(username, uid, gid)
}
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
numFiles := 0
size := int64(0)
for marker := (azblob.Marker{}); marker.NotDone(); {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
listBlob, err := fs.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: false,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
Prefix: fs.config.KeyPrefix,
})
if err != nil {
return numFiles, size, err
}
marker = listBlob.NextMarker
for _, blobInfo := range listBlob.Segment.BlobItems {
isDir := false
if blobInfo.Properties.ContentType != nil {
isDir = (*blobInfo.Properties.ContentType == dirMimeType)
}
blobSize := int64(0)
if blobInfo.Properties.ContentLength != nil {
blobSize = *blobInfo.Properties.ContentLength
}
if isDir && blobSize == 0 {
continue
}
numFiles++
size += blobSize
}
}
return numFiles, size, nil
}
// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
return 0, 0, errUnsupported
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
// Azure Blob Storage uploads are already atomic, we never call this method
func (AzureBlobFs) GetAtomicUploadPath(name string) string {
return ""
}
// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs AzureBlobFs) GetRelativePath(name string) string {
rel := path.Clean(name)
if rel == "." {
rel = ""
}
if !path.IsAbs(rel) {
rel = "/" + rel
}
if len(fs.config.KeyPrefix) > 0 {
if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
rel = "/"
}
rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
}
return rel
}
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root
func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
prefix := ""
if root != "" && root != "." {
prefix = strings.TrimPrefix(root, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
}
for marker := (azblob.Marker{}); marker.NotDone(); {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
listBlob, err := fs.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: false,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
Prefix: prefix,
})
if err != nil {
return err
}
marker = listBlob.NextMarker
for _, blobInfo := range listBlob.Segment.BlobItems {
isDir := false
if blobInfo.Properties.ContentType != nil {
isDir = (*blobInfo.Properties.ContentType == dirMimeType)
}
name := path.Clean(blobInfo.Name)
if len(name) == 0 {
continue
}
blobSize := int64(0)
if blobInfo.Properties.ContentLength != nil {
blobSize = *blobInfo.Properties.ContentLength
}
err = walkFn(blobInfo.Name, NewFileInfo(name, isDir, blobSize, blobInfo.Properties.LastModified, false), nil)
if err != nil {
return err
}
}
}
return walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), nil)
}
// Join joins any number of path elements into a single path
func (AzureBlobFs) Join(elem ...string) string {
return strings.TrimPrefix(path.Join(elem...), "/")
}
// HasVirtualFolders returns true if folders are emulated
func (AzureBlobFs) HasVirtualFolders() bool {
return true
}
// ResolvePath returns the matching filesystem path for the specified sftp path
func (fs AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
if !path.IsAbs(virtualPath) {
virtualPath = path.Clean("/" + virtualPath)
}
return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}
// GetMimeType implements MimeTyper interface
func (fs AzureBlobFs) GetMimeType(name string) (string, error) {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
response, err := blobBlockURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
return "", err
}
return response.ContentType(), nil
}
func (fs *AzureBlobFs) isEqual(key string, virtualName string) bool {
if key == virtualName {
return true
}
if key == virtualName+"/" {
return true
}
if key+"/" == virtualName {
return true
}
return false
}
func (fs *AzureBlobFs) checkIfBucketExists() error {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
_, err := fs.containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
return err
}
func (fs *AzureBlobFs) getPrefixForStat(name string) string {
prefix := path.Dir(name)
if prefix == "/" || prefix == "." || prefix == "" {
prefix = ""
} else {
prefix = strings.TrimPrefix(prefix, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
}
return prefix
}

18
vfs/azblobfs_disabled.go Normal file
View file

@ -0,0 +1,18 @@
// +build noazblob
package vfs
import (
"errors"
"github.com/drakkan/sftpgo/version"
)
func init() {
version.AddFeature("-azblob")
}
// NewAzBlobFs returns an error, Azure Blob storage is disabled
func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, error) {
return nil, errors.New("Azure Blob Storage disabled at build time")
}

View file

@ -70,10 +70,10 @@ func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error)
// Name returns the name for the Fs implementation
func (fs GCSFs) Name() string {
return fmt.Sprintf("GCSFs bucket: %#v", fs.config.Bucket)
return fmt.Sprintf("GCSFs bucket %#v", fs.config.Bucket)
}
// ConnectionID returns the SSH connection ID associated to this Fs implementation
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs GCSFs) ConnectionID() string {
return fs.connectionID
}
@ -82,7 +82,7 @@ func (fs GCSFs) ConnectionID() string {
func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
var result FileInfo
var err error
if len(name) == 0 || name == "." {
if name == "" || name == "." {
err := fs.checkIfBucketExists()
if err != nil {
return result, err
@ -111,7 +111,7 @@ func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
metrics.GCSListObjectsCompleted(err)
return result, err
}
if len(attrs.Prefix) > 0 {
if attrs.Prefix != "" {
if fs.isEqual(attrs.Prefix, name) {
result = NewFileInfo(name, true, 0, time.Now(), false)
break
@ -128,7 +128,7 @@ func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
}
}
metrics.GCSListObjectsCompleted(nil)
if len(result.Name()) == 0 {
if result.Name() == "" {
err = errors.New("404 no such file or directory")
}
return result, err
@ -181,7 +181,12 @@ func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), er
obj := bkt.Object(name)
ctx, cancelFn := context.WithCancel(context.Background())
objectWriter := obj.NewWriter(ctx)
contentType := mime.TypeByExtension(path.Ext(name))
var contentType string
if flag == -1 {
contentType = dirMimeType
} else {
contentType = mime.TypeByExtension(path.Ext(name))
}
if contentType != "" {
objectWriter.ObjectAttrs.ContentType = contentType
}
@ -274,7 +279,7 @@ func (fs GCSFs) Mkdir(name string) error {
if !strings.HasSuffix(name, "/") {
name += "/"
}
_, w, _, err := fs.Create(name, 0)
_, w, _, err := fs.Create(name, -1)
if err != nil {
return err
}
@ -322,7 +327,7 @@ func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
var result []os.FileInfo
// dirname must be already cleaned
prefix := ""
if len(dirname) > 0 && dirname != "." {
if dirname != "" && dirname != "." {
prefix = strings.TrimPrefix(dirname, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
@ -346,7 +351,7 @@ func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
metrics.GCSListObjectsCompleted(err)
return result, err
}
if len(attrs.Prefix) > 0 {
if attrs.Prefix != "" {
name, _ := fs.resolve(attrs.Prefix, prefix)
result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
} else {
@ -442,6 +447,10 @@ func (fs GCSFs) ScanRootDirContents() (int, int64, error) {
if !attrs.Deleted.IsZero() {
continue
}
isDir := strings.HasSuffix(attrs.Name, "/")
if isDir && attrs.Size == 0 {
continue
}
numFiles++
size += attrs.Size
}
@ -456,13 +465,13 @@ func (GCSFs) GetDirSize(dirname string) (int, int64, error) {
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
// GCS uploads are already atomic, we never call this method for GCS
func (GCSFs) GetAtomicUploadPath(name string) string {
return ""
}
// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTP users
// This is the path as seen by SFTPGo users
func (fs GCSFs) GetRelativePath(name string) string {
rel := path.Clean(name)
if rel == "." {
@ -484,7 +493,7 @@ func (fs GCSFs) GetRelativePath(name string) string {
// directory in the tree, including root
func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
prefix := ""
if len(root) > 0 && root != "." {
if root != "" && root != "." {
prefix = strings.TrimPrefix(root, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
@ -522,7 +531,7 @@ func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
}
err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
if err != nil {
break
return err
}
}
@ -541,12 +550,12 @@ func (GCSFs) HasVirtualFolders() bool {
return true
}
// ResolvePath returns the matching filesystem path for the specified sftp path
func (fs GCSFs) ResolvePath(sftpPath string) (string, error) {
if !path.IsAbs(sftpPath) {
sftpPath = path.Clean("/" + sftpPath)
// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs GCSFs) ResolvePath(virtualPath string) (string, error) {
if !path.IsAbs(virtualPath) {
virtualPath = path.Clean("/" + virtualPath)
}
return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(sftpPath, "/")), nil
return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}
func (fs *GCSFs) resolve(name string, prefix string) (string, bool) {
@ -558,14 +567,14 @@ func (fs *GCSFs) resolve(name string, prefix string) (string, bool) {
return result, isDir
}
func (fs *GCSFs) isEqual(key string, sftpName string) bool {
if key == sftpName {
func (fs *GCSFs) isEqual(key string, virtualName string) bool {
if key == virtualName {
return true
}
if key == sftpName+"/" {
if key == virtualName+"/" {
return true
}
if key+"/" == sftpName {
if key+"/" == virtualName {
return true
}
return false
@ -582,7 +591,7 @@ func (fs *GCSFs) checkIfBucketExists() error {
func (fs *GCSFs) getPrefixForStat(name string) string {
prefix := path.Dir(name)
if prefix == "/" || prefix == "." || len(prefix) == 0 {
if prefix == "/" || prefix == "." || prefix == "" {
prefix = ""
} else {
prefix = strings.TrimPrefix(prefix, "/")

View file

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"mime"
"os"
"path"
"path/filepath"
@ -55,11 +56,11 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
}
awsConfig := aws.NewConfig()
if len(fs.config.Region) > 0 {
if fs.config.Region != "" {
awsConfig.WithRegion(fs.config.Region)
}
if len(fs.config.AccessSecret) > 0 {
if fs.config.AccessSecret != "" {
accessSecret, err := utils.DecryptData(fs.config.AccessSecret)
if err != nil {
return fs, err
@ -68,7 +69,7 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret, "")
}
if len(fs.config.Endpoint) > 0 {
if fs.config.Endpoint != "" {
awsConfig.Endpoint = aws.String(fs.config.Endpoint)
awsConfig.S3ForcePathStyle = aws.Bool(true)
}
@ -96,10 +97,10 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
// Name returns the name for the Fs implementation
func (fs S3Fs) Name() string {
return fmt.Sprintf("S3Fs bucket: %#v", fs.config.Bucket)
return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
}
// ConnectionID returns the SSH connection ID associated to this Fs implementation
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs S3Fs) ConnectionID() string {
return fs.connectionID
}
@ -151,7 +152,7 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
return true
})
metrics.S3ListObjectsCompleted(err)
if err == nil && len(result.Name()) == 0 {
if err == nil && result.Name() == "" {
err = errors.New("404 no such file or directory")
}
return result, err
@ -201,11 +202,18 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), err
go func() {
defer cancelFn()
key := name
var contentType string
if flag == -1 {
contentType = dirMimeType
} else {
contentType = mime.TypeByExtension(path.Ext(name))
}
response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(fs.config.Bucket),
Key: aws.String(key),
Body: r,
StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
Bucket: aws.String(fs.config.Bucket),
Key: aws.String(key),
Body: r,
StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
ContentEncoding: utils.NilIfEmpty(contentType),
}, func(u *s3manager.Uploader) {
u.Concurrency = fs.config.UploadConcurrency
u.PartSize = fs.config.UploadPartSize
@ -300,7 +308,7 @@ func (fs S3Fs) Mkdir(name string) error {
if !strings.HasSuffix(name, "/") {
name += "/"
}
_, w, _, err := fs.Create(name, 0)
_, w, _, err := fs.Create(name, -1)
if err != nil {
return err
}
@ -446,6 +454,10 @@ func (fs S3Fs) ScanRootDirContents() (int, int64, error) {
Prefix: aws.String(fs.config.KeyPrefix),
}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, fileObject := range page.Contents {
isDir := strings.HasSuffix(*fileObject.Key, "/")
if isDir && *fileObject.Size == 0 {
continue
}
numFiles++
size += *fileObject.Size
}
@ -468,7 +480,7 @@ func (S3Fs) GetAtomicUploadPath(name string) string {
}
// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTP users
// This is the path as seen by SFTPGo users
func (fs S3Fs) GetRelativePath(name string) string {
rel := path.Clean(name)
if rel == "." {
@ -533,12 +545,12 @@ func (S3Fs) HasVirtualFolders() bool {
return true
}
// ResolvePath returns the matching filesystem path for the specified sftp path
func (fs S3Fs) ResolvePath(sftpPath string) (string, error) {
if !path.IsAbs(sftpPath) {
sftpPath = path.Clean("/" + sftpPath)
// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs S3Fs) ResolvePath(virtualPath string) (string, error) {
if !path.IsAbs(virtualPath) {
virtualPath = path.Clean("/" + virtualPath)
}
return fs.Join("/", fs.config.KeyPrefix, sftpPath), nil
return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
}
func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
@ -555,14 +567,14 @@ func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
return result, isDir
}
func (fs *S3Fs) isEqual(s3Key *string, sftpName string) bool {
if *s3Key == sftpName {
func (fs *S3Fs) isEqual(s3Key *string, virtualName string) bool {
if *s3Key == virtualName {
return true
}
if "/"+*s3Key == sftpName {
if "/"+*s3Key == virtualName {
return true
}
if "/"+*s3Key == sftpName+"/" {
if "/"+*s3Key == virtualName+"/" {
return true
}
return false

View file

@ -4,6 +4,7 @@ package vfs
import (
"errors"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
@ -16,6 +17,8 @@ import (
"github.com/drakkan/sftpgo/logger"
)
const dirMimeType = "inode/directory"
// Fs defines the interface for filesystem backends
type Fs interface {
Name() string
@ -126,6 +129,41 @@ type GCSFsConfig struct {
StorageClass string `json:"storage_class,omitempty"`
}
// AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
type AzBlobFsConfig struct {
Container string `json:"container,omitempty"`
// Storage Account Name, leave blank to use SAS URL
AccountName string `json:"account_name,omitempty"`
// Storage Account Key leave blank to use SAS URL.
// The access key is stored encrypted (AES-256-GCM)
AccountKey string `json:"account_key,omitempty"`
// Optional endpoint. Default is "blob.core.windows.net".
// If you use the emulator the endpoint must include the protocol,
// for example "http://127.0.0.1:10000"
Endpoint string `json:"endpoint,omitempty"`
// Shared access signature URL, leave blank if using account/key
SASURL string `json:"sas_url,omitempty"`
// KeyPrefix is similar to a chroot directory for local filesystem.
// If specified then the SFTPGo userd will only see objects that starts
// with this prefix and so you can restrict access to a specific
// folder. The prefix, if not empty, must not start with "/" and must
// end with "/".
// If empty the whole bucket contents will be available
KeyPrefix string `json:"key_prefix,omitempty"`
// The buffer size (in MB) to use for multipart uploads.
// If this value is set to zero, the default value (1MB) for the Azure SDK will be used.
// Please note that if the upload bandwidth between the SFTPGo client and SFTPGo server is
// greater than the upload bandwidth between SFTPGo and Azure then the SFTP client have
// to wait for the upload of the last parts to Azure after it ends the file upload to SFTPGo,
// and it may time out.
// Keep this in mind if you customize these parameters.
UploadPartSize int64 `json:"upload_part_size,omitempty"`
// How many parts are uploaded in parallel
UploadConcurrency int `json:"upload_concurrency,omitempty"`
// Set to true if you use an Azure emulator such as Azurite
UseEmulator bool `json:"use_emulator,omitempty"`
}
// PipeWriter defines a wrapper for pipeat.PipeWriterAt.
type PipeWriter struct {
writer *pipeat.PipeWriterAt
@ -194,7 +232,7 @@ func ValidateS3FsConfig(config *S3FsConfig) error {
if len(config.AccessSecret) == 0 && len(config.AccessKey) > 0 {
return errors.New("access_secret cannot be empty with access_key not empty")
}
if len(config.KeyPrefix) > 0 {
if config.KeyPrefix != "" {
if strings.HasPrefix(config.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
@ -214,10 +252,10 @@ func ValidateS3FsConfig(config *S3FsConfig) error {
// ValidateGCSFsConfig returns nil if the specified GCS config is valid, otherwise an error
func ValidateGCSFsConfig(config *GCSFsConfig, credentialsFilePath string) error {
if len(config.Bucket) == 0 {
if config.Bucket == "" {
return errors.New("bucket cannot be empty")
}
if len(config.KeyPrefix) > 0 {
if config.KeyPrefix != "" {
if strings.HasPrefix(config.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
@ -238,6 +276,36 @@ func ValidateGCSFsConfig(config *GCSFsConfig, credentialsFilePath string) error
return nil
}
// ValidateAzBlobFsConfig returns nil if the specified Azure Blob config is valid, otherwise an error
func ValidateAzBlobFsConfig(config *AzBlobFsConfig) error {
if config.SASURL != "" {
_, err := url.Parse(config.SASURL)
return err
}
if config.Container == "" {
return errors.New("container cannot be empty")
}
if config.AccountName == "" || config.AccountKey == "" {
return errors.New("credentials cannot be empty")
}
if config.KeyPrefix != "" {
if strings.HasPrefix(config.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
config.KeyPrefix = path.Clean(config.KeyPrefix)
if !strings.HasSuffix(config.KeyPrefix, "/") {
config.KeyPrefix += "/"
}
}
if config.UploadPartSize < 0 {
return fmt.Errorf("invalid upload part size: %v", config.UploadPartSize)
}
if config.UploadConcurrency < 0 {
return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
}
return nil
}
// SetPathPermissions calls fs.Chown.
// It does nothing for local filesystem on windows
func SetPathPermissions(fs Fs, path string, uid int, gid int) {

View file

@ -122,29 +122,27 @@ func (f *webDavFile) Read(p []byte) (n int, err error) {
f.TransferError(common.ErrOpUnsupported)
return 0, common.ErrOpUnsupported
}
_, r, cancelFn, err := f.Fs.Open(f.GetFsPath(), 0)
_, r, cancelFn, e := f.Fs.Open(f.GetFsPath(), 0)
f.Lock()
f.reader = r
f.ErrTransfer = err
f.ErrTransfer = e
f.BaseTransfer.SetCancelFn(cancelFn)
f.startOffset = 0
f.Unlock()
if err != nil {
return 0, err
if e != nil {
return 0, e
}
}
var readed int
var e error
readed, e = f.reader.Read(p)
atomic.AddInt64(&f.BytesSent, int64(readed))
n, err = f.reader.Read(p)
atomic.AddInt64(&f.BytesSent, int64(n))
if e != nil && e != io.EOF {
f.TransferError(e)
return readed, e
if err != nil && err != io.EOF {
f.TransferError(err)
return
}
f.HandleThrottle()
return readed, e
return
}
// Write writes the uploaded contents.
@ -154,21 +152,19 @@ func (f *webDavFile) Write(p []byte) (n int, err error) {
}
f.Connection.UpdateLastActivity()
var written int
var e error
written, e = f.writer.Write(p)
atomic.AddInt64(&f.BytesReceived, int64(written))
n, err = f.writer.Write(p)
atomic.AddInt64(&f.BytesReceived, int64(n))
if f.MaxWriteSize > 0 && e == nil && atomic.LoadInt64(&f.BytesReceived) > f.MaxWriteSize {
e = common.ErrQuotaExceeded
if f.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&f.BytesReceived) > f.MaxWriteSize {
err = common.ErrQuotaExceeded
}
if e != nil {
f.TransferError(e)
return written, e
if err != nil {
f.TransferError(err)
return
}
f.HandleThrottle()
return written, e
return
}
// Seek sets the offset for the next Read or Write on the writer to offset,