cryptfs: fix quota for overwrites if upload fails
Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
parent
3e44a1dd2d
commit
0e54fa5655
11 changed files with 145 additions and 45 deletions
6
go.mod
6
go.mod
|
@ -68,7 +68,7 @@ require (
|
||||||
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b
|
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b
|
||||||
golang.org/x/net v0.0.0-20221004154528-8021a29435af
|
golang.org/x/net v0.0.0-20221004154528-8021a29435af
|
||||||
golang.org/x/oauth2 v0.0.0-20221006150949-b44042a4b9c1
|
golang.org/x/oauth2 v0.0.0-20221006150949-b44042a4b9c1
|
||||||
golang.org/x/sys v0.0.0-20221006211917-84dc82d7e875
|
golang.org/x/sys v0.0.0-20221010170243-090e33056c14
|
||||||
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
|
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
|
||||||
google.golang.org/api v0.98.0
|
google.golang.org/api v0.98.0
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||||
|
@ -118,7 +118,7 @@ require (
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.1.1 // indirect
|
github.com/klauspost/cpuid/v2 v2.1.2 // indirect
|
||||||
github.com/kr/fs v0.1.0 // indirect
|
github.com/kr/fs v0.1.0 // indirect
|
||||||
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
|
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
|
||||||
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
||||||
|
@ -159,7 +159,7 @@ require (
|
||||||
golang.org/x/tools v0.1.12 // indirect
|
golang.org/x/tools v0.1.12 // indirect
|
||||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||||
google.golang.org/appengine v1.6.7 // indirect
|
google.golang.org/appengine v1.6.7 // indirect
|
||||||
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 // indirect
|
google.golang.org/genproto v0.0.0-20221010155953-15ba04fc1c0e // indirect
|
||||||
google.golang.org/grpc v1.50.0 // indirect
|
google.golang.org/grpc v1.50.0 // indirect
|
||||||
google.golang.org/protobuf v1.28.1 // indirect
|
google.golang.org/protobuf v1.28.1 // indirect
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||||
|
|
12
go.sum
12
go.sum
|
@ -1064,8 +1064,8 @@ github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e
|
||||||
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
|
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
|
||||||
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
|
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
|
||||||
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||||
github.com/klauspost/cpuid/v2 v2.1.1 h1:t0wUqjowdm8ezddV5k0tLWVklVuvLJpoHeb4WBdydm0=
|
github.com/klauspost/cpuid/v2 v2.1.2 h1:XhdX4fqAJUA0yj+kUwMavO0hHrSPAecYdYf1ZmxHvak=
|
||||||
github.com/klauspost/cpuid/v2 v2.1.1/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
|
github.com/klauspost/cpuid/v2 v2.1.2/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
|
||||||
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
||||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||||
|
@ -1916,8 +1916,8 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20221006211917-84dc82d7e875 h1:AzgQNqF+FKwyQ5LbVrVqOcuuFB67N47F9+htZYH0wFM=
|
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc=
|
||||||
golang.org/x/sys v0.0.0-20221006211917-84dc82d7e875/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||||
|
@ -2206,8 +2206,8 @@ google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljW
|
||||||
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
||||||
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
||||||
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78/go.mod h1:iHe1svFLAZg9VWz891+QbRMwUv9O/1Ww+/mngYeThbc=
|
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78/go.mod h1:iHe1svFLAZg9VWz891+QbRMwUv9O/1Ww+/mngYeThbc=
|
||||||
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 h1:Ezh2cpcnP5Rq60sLensUsFnxh7P6513NLvNtCm9iyJ4=
|
google.golang.org/genproto v0.0.0-20221010155953-15ba04fc1c0e h1:halCgTFuLWDRD61piiNSxPsARANGD3Xl16hPrLgLiIg=
|
||||||
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91/go.mod h1:3526vdqwhZAwq4wsRUaVG555sVgsNmIjRtO7t/JH29U=
|
google.golang.org/genproto v0.0.0-20221010155953-15ba04fc1c0e/go.mod h1:3526vdqwhZAwq4wsRUaVG555sVgsNmIjRtO7t/JH29U=
|
||||||
google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
|
google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
|
||||||
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
|
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
|
||||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||||
|
|
|
@ -37,10 +37,14 @@ import (
|
||||||
type MockOsFs struct {
|
type MockOsFs struct {
|
||||||
vfs.Fs
|
vfs.Fs
|
||||||
hasVirtualFolders bool
|
hasVirtualFolders bool
|
||||||
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns the name for the Fs implementation
|
// Name returns the name for the Fs implementation
|
||||||
func (fs *MockOsFs) Name() string {
|
func (fs *MockOsFs) Name() string {
|
||||||
|
if fs.name != "" {
|
||||||
|
return fs.name
|
||||||
|
}
|
||||||
return "mockOsFs"
|
return "mockOsFs"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,9 +61,10 @@ func (fs *MockOsFs) Chtimes(name string, atime, mtime time.Time, isUploading boo
|
||||||
return vfs.ErrVfsUnsupported
|
return vfs.ErrVfsUnsupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockOsFs(hasVirtualFolders bool, connectionID, rootDir string) vfs.Fs {
|
func newMockOsFs(hasVirtualFolders bool, connectionID, rootDir, name string) vfs.Fs {
|
||||||
return &MockOsFs{
|
return &MockOsFs{
|
||||||
Fs: vfs.NewOsFs(connectionID, rootDir, ""),
|
Fs: vfs.NewOsFs(connectionID, rootDir, ""),
|
||||||
|
name: name,
|
||||||
hasVirtualFolders: hasVirtualFolders,
|
hasVirtualFolders: hasVirtualFolders,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,7 +113,7 @@ func TestSetStatMode(t *testing.T) {
|
||||||
}
|
}
|
||||||
user.Permissions = make(map[string][]string)
|
user.Permissions = make(map[string][]string)
|
||||||
user.Permissions["/"] = []string{dataprovider.PermAny}
|
user.Permissions["/"] = []string{dataprovider.PermAny}
|
||||||
fs := newMockOsFs(true, "", user.GetHomeDir())
|
fs := newMockOsFs(true, "", user.GetHomeDir(), "")
|
||||||
conn := NewBaseConnection("", ProtocolWebDAV, "", "", user)
|
conn := NewBaseConnection("", ProtocolWebDAV, "", "", user)
|
||||||
err := conn.handleChmod(fs, fakePath, fakePath, nil)
|
err := conn.handleChmod(fs, fakePath, fakePath, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
@ -429,7 +434,7 @@ func TestMaxWriteSize(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(90), size)
|
assert.Equal(t, int64(90), size)
|
||||||
|
|
||||||
fs = newMockOsFs(true, fs.ConnectionID(), user.GetHomeDir())
|
fs = newMockOsFs(true, fs.ConnectionID(), user.GetHomeDir(), "")
|
||||||
size, err = conn.GetMaxWriteSize(quotaResult, true, 100, fs.IsUploadResumeSupported())
|
size, err = conn.GetMaxWriteSize(quotaResult, true, 100, fs.IsUploadResumeSupported())
|
||||||
assert.EqualError(t, err, ErrOpUnsupported.Error())
|
assert.EqualError(t, err, ErrOpUnsupported.Error())
|
||||||
assert.Equal(t, int64(0), size)
|
assert.Equal(t, int64(0), size)
|
||||||
|
|
|
@ -479,6 +479,71 @@ func TestSetStat(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCryptFsUserUploadErrorOverwrite(t *testing.T) {
|
||||||
|
u := getCryptFsUser()
|
||||||
|
u.QuotaSize = 6000
|
||||||
|
user, _, err := httpdtest.AddUser(u, http.StatusCreated)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
var buf []byte
|
||||||
|
for i := 0; i < 4000; i++ {
|
||||||
|
buf = append(buf, []byte("a")...)
|
||||||
|
}
|
||||||
|
bufSize := int64(len(buf))
|
||||||
|
reader := bytes.NewReader(buf)
|
||||||
|
conn, client, err := getSftpClient(user)
|
||||||
|
if assert.NoError(t, err) {
|
||||||
|
defer conn.Close()
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
f, err := client.Create(testFileName + "_big")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
n, err := io.Copy(f, reader)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, bufSize, n)
|
||||||
|
err = f.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
encryptedSize, err := getEncryptedFileSize(bufSize)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
expectedSize := encryptedSize
|
||||||
|
user, _, err = httpdtest.GetUserByUsername(user.Username, http.StatusOK)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, user.UsedQuotaFiles)
|
||||||
|
assert.Equal(t, expectedSize, user.UsedQuotaSize)
|
||||||
|
// now write a small file
|
||||||
|
f, err = client.Create(testFileName)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
_, err = f.Write(testFileContent)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = f.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
encryptedSize, err = getEncryptedFileSize(int64(len(testFileContent)))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
user, _, err = httpdtest.GetUserByUsername(user.Username, http.StatusOK)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, user.UsedQuotaFiles)
|
||||||
|
assert.Equal(t, expectedSize+encryptedSize, user.UsedQuotaSize)
|
||||||
|
// try to overwrite this file with a big one, this cause an overquota error
|
||||||
|
// the partial file is deleted and the quota updated
|
||||||
|
_, err = reader.Seek(0, io.SeekStart)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
f, err = client.Create(testFileName)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
_, err = io.Copy(f, reader)
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = f.Close()
|
||||||
|
assert.Error(t, err)
|
||||||
|
user, _, err = httpdtest.GetUserByUsername(user.Username, http.StatusOK)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, user.UsedQuotaFiles)
|
||||||
|
assert.Equal(t, expectedSize, user.UsedQuotaSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = httpdtest.RemoveUser(user, http.StatusOK)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = os.RemoveAll(user.GetHomeDir())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
func TestChtimesOpenHandle(t *testing.T) {
|
func TestChtimesOpenHandle(t *testing.T) {
|
||||||
localUser, _, err := httpdtest.AddUser(getTestUser(), http.StatusCreated)
|
localUser, _, err := httpdtest.AddUser(getTestUser(), http.StatusCreated)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
|
@ -306,19 +306,26 @@ func (t *BaseTransfer) TransferError(err error) {
|
||||||
t.BytesReceived.Load(), elapsed)
|
t.BytesReceived.Load(), elapsed)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTransfer) getUploadFileSize() (int64, error) {
|
func (t *BaseTransfer) getUploadFileSize() (int64, int, error) {
|
||||||
var fileSize int64
|
var fileSize int64
|
||||||
|
var deletedFiles int
|
||||||
|
|
||||||
info, err := t.Fs.Stat(t.fsPath)
|
info, err := t.Fs.Stat(t.fsPath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fileSize = info.Size()
|
fileSize = info.Size()
|
||||||
}
|
}
|
||||||
if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
|
if t.ErrTransfer != nil && vfs.IsCryptOsFs(t.Fs) {
|
||||||
errDelete := t.Fs.Remove(t.fsPath, false)
|
errDelete := t.Fs.Remove(t.fsPath, false)
|
||||||
if errDelete != nil {
|
if errDelete != nil {
|
||||||
t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
|
t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
|
||||||
|
} else {
|
||||||
|
fileSize = 0
|
||||||
|
deletedFiles = 1
|
||||||
|
t.BytesReceived.Store(0)
|
||||||
|
t.MinWriteOffset = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fileSize, err
|
return fileSize, deletedFiles, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// return 1 if the file is outside the user home dir
|
// return 1 if the file is outside the user home dir
|
||||||
|
@ -347,10 +354,7 @@ func (t *BaseTransfer) Close() error {
|
||||||
defer t.Connection.RemoveTransfer(t)
|
defer t.Connection.RemoveTransfer(t)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
numFiles := 0
|
numFiles := t.getUploadedFiles()
|
||||||
if t.isNewFile {
|
|
||||||
numFiles = 1
|
|
||||||
}
|
|
||||||
metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
|
metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
|
||||||
t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
|
t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
|
||||||
if t.transferQuota.HasSizeLimits() {
|
if t.transferQuota.HasSizeLimits() {
|
||||||
|
@ -361,7 +365,6 @@ func (t *BaseTransfer) Close() error {
|
||||||
// if quota is exceeded we try to remove the partial file for uploads to local filesystem
|
// if quota is exceeded we try to remove the partial file for uploads to local filesystem
|
||||||
err = t.Fs.Remove(t.File.Name(), false)
|
err = t.Fs.Remove(t.File.Name(), false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
numFiles--
|
|
||||||
t.BytesReceived.Store(0)
|
t.BytesReceived.Store(0)
|
||||||
t.MinWriteOffset = 0
|
t.MinWriteOffset = 0
|
||||||
}
|
}
|
||||||
|
@ -373,13 +376,12 @@ func (t *BaseTransfer) Close() error {
|
||||||
t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
|
t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
|
||||||
t.effectiveFsPath, t.fsPath, err)
|
t.effectiveFsPath, t.fsPath, err)
|
||||||
// the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
|
// the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
|
||||||
numFiles -= t.checkUploadOutsideHomeDir(err)
|
t.checkUploadOutsideHomeDir(err)
|
||||||
} else {
|
} else {
|
||||||
err = t.Fs.Remove(t.effectiveFsPath, false)
|
err = t.Fs.Remove(t.effectiveFsPath, false)
|
||||||
t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, deletion error: %v",
|
t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, deletion error: %v",
|
||||||
t.ErrTransfer, t.effectiveFsPath, err)
|
t.ErrTransfer, t.effectiveFsPath, err)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
numFiles--
|
|
||||||
t.BytesReceived.Store(0)
|
t.BytesReceived.Store(0)
|
||||||
t.MinWriteOffset = 0
|
t.MinWriteOffset = 0
|
||||||
}
|
}
|
||||||
|
@ -393,11 +395,19 @@ func (t *BaseTransfer) Close() error {
|
||||||
ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
|
ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
|
||||||
t.BytesSent.Load(), t.ErrTransfer)
|
t.BytesSent.Load(), t.ErrTransfer)
|
||||||
} else {
|
} else {
|
||||||
uploadFileSize = t.BytesReceived.Load() + t.MinWriteOffset
|
statSize, deletedFiles, errStat := t.getUploadFileSize()
|
||||||
if statSize, errStat := t.getUploadFileSize(); errStat == nil {
|
if errStat == nil {
|
||||||
uploadFileSize = statSize
|
uploadFileSize = statSize
|
||||||
|
} else {
|
||||||
|
uploadFileSize = t.BytesReceived.Load() + t.MinWriteOffset
|
||||||
|
if t.Fs.IsNotExist(errStat) {
|
||||||
|
uploadFileSize = 0
|
||||||
|
numFiles--
|
||||||
|
}
|
||||||
}
|
}
|
||||||
t.Connection.Log(logger.LevelDebug, "upload file size %v", uploadFileSize)
|
numFiles -= deletedFiles
|
||||||
|
t.Connection.Log(logger.LevelDebug, "upload file size %d, num files %d, deleted files %d, fs path %q",
|
||||||
|
uploadFileSize, numFiles, deletedFiles, t.fsPath)
|
||||||
numFiles, uploadFileSize = t.executeUploadHook(numFiles, uploadFileSize)
|
numFiles, uploadFileSize = t.executeUploadHook(numFiles, uploadFileSize)
|
||||||
t.updateQuota(numFiles, uploadFileSize)
|
t.updateQuota(numFiles, uploadFileSize)
|
||||||
t.updateTimes()
|
t.updateTimes()
|
||||||
|
@ -458,6 +468,14 @@ func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize int64) (int, int
|
||||||
return numFiles, fileSize
|
return numFiles, fileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *BaseTransfer) getUploadedFiles() int {
|
||||||
|
numFiles := 0
|
||||||
|
if t.isNewFile {
|
||||||
|
numFiles = 1
|
||||||
|
}
|
||||||
|
return numFiles
|
||||||
|
}
|
||||||
|
|
||||||
func (t *BaseTransfer) updateTimes() {
|
func (t *BaseTransfer) updateTimes() {
|
||||||
if !t.aTime.IsZero() && !t.mTime.IsZero() {
|
if !t.aTime.IsZero() && !t.mTime.IsZero() {
|
||||||
err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
|
err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
|
||||||
|
@ -467,12 +485,12 @@ func (t *BaseTransfer) updateTimes() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
|
func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
|
||||||
// S3 uploads are atomic, if there is an error nothing is uploaded
|
// Uploads on some filesystem (S3 and similar) are atomic, if there is an error nothing is uploaded
|
||||||
if t.File == nil && t.ErrTransfer != nil && !t.Connection.User.HasBufferedSFTP(t.GetVirtualPath()) {
|
if t.File == nil && t.ErrTransfer != nil && vfs.HasImplicitAtomicUploads(t.Fs) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
sizeDiff := fileSize - t.InitialSize
|
sizeDiff := fileSize - t.InitialSize
|
||||||
if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
|
if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff != 0) {
|
||||||
vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
|
vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
|
dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
|
||||||
|
|
|
@ -40,7 +40,6 @@ func TestTransferUpdateQuota(t *testing.T) {
|
||||||
transfer.BytesReceived.Store(123)
|
transfer.BytesReceived.Store(123)
|
||||||
errFake := errors.New("fake error")
|
errFake := errors.New("fake error")
|
||||||
transfer.TransferError(errFake)
|
transfer.TransferError(errFake)
|
||||||
assert.False(t, transfer.updateQuota(1, 0))
|
|
||||||
err := transfer.Close()
|
err := transfer.Close()
|
||||||
if assert.Error(t, err) {
|
if assert.Error(t, err) {
|
||||||
assert.EqualError(t, err, errFake.Error())
|
assert.EqualError(t, err, errFake.Error())
|
||||||
|
@ -61,6 +60,10 @@ func TestTransferUpdateQuota(t *testing.T) {
|
||||||
assert.True(t, transfer.updateQuota(1, 0))
|
assert.True(t, transfer.updateQuota(1, 0))
|
||||||
err = transfer.Close()
|
err = transfer.Close()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
transfer.ErrTransfer = errFake
|
||||||
|
transfer.Fs = newMockOsFs(true, "", "", "S3Fs fake")
|
||||||
|
assert.False(t, transfer.updateQuota(1, 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTransferThrottling(t *testing.T) {
|
func TestTransferThrottling(t *testing.T) {
|
||||||
|
@ -297,13 +300,14 @@ func TestRemovePartialCryptoFile(t *testing.T) {
|
||||||
transfer := NewBaseTransfer(nil, conn, nil, testFile, testFile, "/transfer_test_file", TransferUpload,
|
transfer := NewBaseTransfer(nil, conn, nil, testFile, testFile, "/transfer_test_file", TransferUpload,
|
||||||
0, 0, 0, 0, true, fs, dataprovider.TransferQuota{})
|
0, 0, 0, 0, true, fs, dataprovider.TransferQuota{})
|
||||||
transfer.ErrTransfer = errors.New("test error")
|
transfer.ErrTransfer = errors.New("test error")
|
||||||
_, err = transfer.getUploadFileSize()
|
_, _, err = transfer.getUploadFileSize()
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
err = os.WriteFile(testFile, []byte("test data"), os.ModePerm)
|
err = os.WriteFile(testFile, []byte("test data"), os.ModePerm)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
size, err := transfer.getUploadFileSize()
|
size, deletedFiles, err := transfer.getUploadFileSize()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(9), size)
|
assert.Equal(t, int64(0), size)
|
||||||
|
assert.Equal(t, 1, deletedFiles)
|
||||||
assert.NoFileExists(t, testFile)
|
assert.NoFileExists(t, testFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -496,15 +496,6 @@ func (u *User) GetPermissionsForPath(p string) []string {
|
||||||
return permissions
|
return permissions
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasBufferedSFTP returns true if the user has a SFTP filesystem with buffering enabled
|
|
||||||
func (u *User) HasBufferedSFTP(name string) bool {
|
|
||||||
fs := u.GetFsConfigForPath(name)
|
|
||||||
if fs.Provider == sdk.SFTPFilesystemProvider {
|
|
||||||
return fs.SFTPConfig.BufferSize > 0
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *User) getForbiddenSFTPSelfUsers(username string) ([]string, error) {
|
func (u *User) getForbiddenSFTPSelfUsers(username string) ([]string, error) {
|
||||||
sftpUser, err := UserExists(username)
|
sftpUser, err := UserExists(username)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -54,6 +54,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
azureDefaultEndpoint = "blob.core.windows.net"
|
azureDefaultEndpoint = "blob.core.windows.net"
|
||||||
|
azBlobFsName = "AzureBlobFs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AzureBlobFs is a Fs implementation for Azure Blob storage.
|
// AzureBlobFs is a Fs implementation for Azure Blob storage.
|
||||||
|
@ -158,9 +159,9 @@ func (fs *AzureBlobFs) initFromSASURL() (Fs, error) {
|
||||||
// Name returns the name for the Fs implementation
|
// Name returns the name for the Fs implementation
|
||||||
func (fs *AzureBlobFs) Name() string {
|
func (fs *AzureBlobFs) Name() string {
|
||||||
if !fs.config.SASURL.IsEmpty() {
|
if !fs.config.SASURL.IsEmpty() {
|
||||||
return fmt.Sprintf("Azure Blob with SAS URL, container %#v", fs.config.Container)
|
return fmt.Sprintf("%s with SAS URL, container %q", azBlobFsName, fs.config.Container)
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("Azure Blob container %#v", fs.config.Container)
|
return fmt.Sprintf("%s container %q", azBlobFsName, fs.config.Container)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectionID returns the connection ID associated to this Fs implementation
|
// ConnectionID returns the connection ID associated to this Fs implementation
|
||||||
|
|
|
@ -45,6 +45,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultGCSPageSize = 5000
|
defaultGCSPageSize = 5000
|
||||||
|
gcsfsName = "GCSFs"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -104,7 +105,7 @@ func NewGCSFs(connectionID, localTempDir, mountPath string, config GCSFsConfig)
|
||||||
|
|
||||||
// Name returns the name for the Fs implementation
|
// Name returns the name for the Fs implementation
|
||||||
func (fs *GCSFs) Name() string {
|
func (fs *GCSFs) Name() string {
|
||||||
return fmt.Sprintf("GCSFs bucket %#v", fs.config.Bucket)
|
return fmt.Sprintf("%s bucket %q", gcsfsName, fs.config.Bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectionID returns the connection ID associated to this Fs implementation
|
// ConnectionID returns the connection ID associated to this Fs implementation
|
||||||
|
|
|
@ -57,6 +57,7 @@ const (
|
||||||
// using this mime type for directories improves compatibility with s3fs-fuse
|
// using this mime type for directories improves compatibility with s3fs-fuse
|
||||||
s3DirMimeType = "application/x-directory"
|
s3DirMimeType = "application/x-directory"
|
||||||
s3TransferBufferSize = 256 * 1024
|
s3TransferBufferSize = 256 * 1024
|
||||||
|
s3fsName = "S3Fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// S3Fs is a Fs implementation for AWS S3 compatible object storages
|
// S3Fs is a Fs implementation for AWS S3 compatible object storages
|
||||||
|
@ -139,7 +140,7 @@ func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig)
|
||||||
|
|
||||||
// Name returns the name for the Fs implementation
|
// Name returns the name for the Fs implementation
|
||||||
func (fs *S3Fs) Name() string {
|
func (fs *S3Fs) Name() string {
|
||||||
return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
|
return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectionID returns the connection ID associated to this Fs implementation
|
// ConnectionID returns the connection ID associated to this Fs implementation
|
||||||
|
|
|
@ -746,6 +746,20 @@ func HasTruncateSupport(fs Fs) bool {
|
||||||
return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
|
return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
|
||||||
|
func HasImplicitAtomicUploads(fs Fs) bool {
|
||||||
|
if strings.HasPrefix(fs.Name(), s3fsName) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(fs.Name(), gcsfsName) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(fs.Name(), azBlobFsName) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// HasOpenRWSupport returns true if the fs can open a file
|
// HasOpenRWSupport returns true if the fs can open a file
|
||||||
// for reading and writing at the same time
|
// for reading and writing at the same time
|
||||||
func HasOpenRWSupport(fs Fs) bool {
|
func HasOpenRWSupport(fs Fs) bool {
|
||||||
|
|
Loading…
Reference in a new issue