瀏覽代碼

gcsfs: allow to customize upload part size/time

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
Nicola Murino 2 年之前
父節點
當前提交
0296e0cafa

+ 1 - 1
go.mod

@@ -51,7 +51,7 @@ require (
 	github.com/rs/cors v1.8.3-0.20220619195839-da52b0701de5
 	github.com/rs/xid v1.4.0
 	github.com/rs/zerolog v1.28.0
-	github.com/sftpgo/sdk v0.1.3-0.20221211151321-578e45601b27
+	github.com/sftpgo/sdk v0.1.3-0.20221217110036-383c1bb50fa0
 	github.com/shirou/gopsutil/v3 v3.22.11
 	github.com/spf13/afero v1.9.3
 	github.com/spf13/cobra v1.6.1

+ 2 - 2
go.sum

@@ -1452,8 +1452,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
 github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
 github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
 github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJxNzj3QBOf7dZwupeVC+mG1Lo=
-github.com/sftpgo/sdk v0.1.3-0.20221211151321-578e45601b27 h1:DjNme+rcw3zaiEkWyyrtimqDZd/83GW4qZhUghBkyrI=
-github.com/sftpgo/sdk v0.1.3-0.20221211151321-578e45601b27/go.mod h1:3GpW3Qy8IHH6kex0ny+Y6ayeYb9OJxz8Pxh3IZgAs2E=
+github.com/sftpgo/sdk v0.1.3-0.20221217110036-383c1bb50fa0 h1:e1OQroqX8SWV06Z270CxG2/v//Wx1026iXKTDRn5J1E=
+github.com/sftpgo/sdk v0.1.3-0.20221217110036-383c1bb50fa0/go.mod h1:3GpW3Qy8IHH6kex0ny+Y6ayeYb9OJxz8Pxh3IZgAs2E=
 github.com/shirou/gopsutil/v3 v3.22.11 h1:kxsPKS+Eeo+VnEQ2XCaGJepeP6KY53QoRTETx3+1ndM=
 github.com/shirou/gopsutil/v3 v3.22.11/go.mod h1:xl0EeL4vXJ+hQMAGN8B9VFpxukEMA0XdevQOe5MZ1oY=
 github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=

+ 1 - 1
internal/common/protocol_test.go

@@ -5870,7 +5870,7 @@ func TestEventRulePasswordExpiration(t *testing.T) {
 		email := lastReceivedEmail.get()
 		assert.Len(t, email.To, 1)
 		assert.Contains(t, email.To, user.Email)
-		assert.Contains(t, email.Data, "Your SFTPGo password expires in 5 days")
+		assert.Contains(t, email.Data, "your SFTPGo password expires in 5 days")
 		err = client.RemoveDirectory(dirName)
 		assert.NoError(t, err)
 	}

+ 10 - 0
internal/httpd/httpd_test.go

@@ -5015,6 +5015,8 @@ func TestUserHiddenFields(t *testing.T) {
 	u2.FsConfig.GCSConfig.Bucket = "test"
 	u2.FsConfig.GCSConfig.Credentials = kms.NewPlainSecret("fake credentials")
 	u2.FsConfig.GCSConfig.ACL = "bucketOwnerRead"
+	u2.FsConfig.GCSConfig.UploadPartSize = 5
+	u2.FsConfig.GCSConfig.UploadPartMaxTime = 20
 	user2, _, err := httpdtest.AddUser(u2, http.StatusCreated)
 	assert.NoError(t, err)
 
@@ -11719,6 +11721,8 @@ func TestLoginInvalidFs(t *testing.T) {
 	u.Filters.AllowAPIKeyAuth = true
 	u.FsConfig.Provider = sdk.GCSFilesystemProvider
 	u.FsConfig.GCSConfig.Bucket = "test"
+	u.FsConfig.GCSConfig.UploadPartSize = 1
+	u.FsConfig.GCSConfig.UploadPartMaxTime = 10
 	u.FsConfig.GCSConfig.Credentials = kms.NewPlainSecret("invalid JSON for credentials")
 	user, _, err := httpdtest.AddUser(u, http.StatusCreated)
 	assert.NoError(t, err)
@@ -19226,6 +19230,8 @@ func TestWebUserGCSMock(t *testing.T) {
 	user.FsConfig.GCSConfig.KeyPrefix = "somedir/subdir/"
 	user.FsConfig.GCSConfig.StorageClass = "standard"
 	user.FsConfig.GCSConfig.ACL = "publicReadWrite"
+	user.FsConfig.GCSConfig.UploadPartSize = 16
+	user.FsConfig.GCSConfig.UploadPartMaxTime = 32
 	form := make(url.Values)
 	form.Set(csrfFormToken, csrfToken)
 	form.Set("username", user.Username)
@@ -19252,6 +19258,8 @@ func TestWebUserGCSMock(t *testing.T) {
 	form.Set("gcs_storage_class", user.FsConfig.GCSConfig.StorageClass)
 	form.Set("gcs_acl", user.FsConfig.GCSConfig.ACL)
 	form.Set("gcs_key_prefix", user.FsConfig.GCSConfig.KeyPrefix)
+	form.Set("gcs_upload_part_size", strconv.FormatInt(user.FsConfig.GCSConfig.UploadPartSize, 10))
+	form.Set("gcs_upload_part_max_time", strconv.FormatInt(int64(user.FsConfig.GCSConfig.UploadPartMaxTime), 10))
 	form.Set("pattern_path0", "/dir1")
 	form.Set("patterns0", "*.jpg,*.png")
 	form.Set("pattern_type0", "allowed")
@@ -19292,6 +19300,8 @@ func TestWebUserGCSMock(t *testing.T) {
 	assert.Equal(t, user.FsConfig.GCSConfig.StorageClass, updateUser.FsConfig.GCSConfig.StorageClass)
 	assert.Equal(t, user.FsConfig.GCSConfig.ACL, updateUser.FsConfig.GCSConfig.ACL)
 	assert.Equal(t, user.FsConfig.GCSConfig.KeyPrefix, updateUser.FsConfig.GCSConfig.KeyPrefix)
+	assert.Equal(t, user.FsConfig.GCSConfig.UploadPartSize, updateUser.FsConfig.GCSConfig.UploadPartSize)
+	assert.Equal(t, user.FsConfig.GCSConfig.UploadPartMaxTime, updateUser.FsConfig.GCSConfig.UploadPartMaxTime)
 	if assert.Len(t, updateUser.Filters.FilePatterns, 1) {
 		assert.Equal(t, "/dir1", updateUser.Filters.FilePatterns[0].Path)
 		assert.Len(t, updateUser.Filters.FilePatterns[0].AllowedPatterns, 2)

+ 8 - 0
internal/httpd/webadmin.go

@@ -1513,6 +1513,14 @@ func getGCSConfig(r *http.Request) (vfs.GCSFsConfig, error) {
 	config.StorageClass = strings.TrimSpace(r.Form.Get("gcs_storage_class"))
 	config.ACL = strings.TrimSpace(r.Form.Get("gcs_acl"))
 	config.KeyPrefix = r.Form.Get("gcs_key_prefix")
+	uploadPartSize, err := strconv.ParseInt(r.Form.Get("gcs_upload_part_size"), 10, 64)
+	if err == nil {
+		config.UploadPartSize = uploadPartSize
+	}
+	uploadPartMaxTime, err := strconv.Atoi(r.Form.Get("gcs_upload_part_max_time"))
+	if err == nil {
+		config.UploadPartMaxTime = uploadPartMaxTime
+	}
 	autoCredentials := r.Form.Get("gcs_auto_credentials")
 	if autoCredentials != "" {
 		config.AutomaticCredentials = 1

+ 7 - 0
internal/httpdtest/httpdtest.go

@@ -2037,6 +2037,13 @@ func compareGCSConfig(expected *vfs.Filesystem, actual *vfs.Filesystem) error {
 	if expected.GCSConfig.AutomaticCredentials != actual.GCSConfig.AutomaticCredentials {
 		return errors.New("GCS automatic credentials mismatch")
 	}
+	if expected.GCSConfig.UploadPartSize != actual.GCSConfig.UploadPartSize {
+		return errors.New("GCS upload part size mismatch")
+	}
+	if expected.GCSConfig.UploadPartMaxTime != actual.GCSConfig.UploadPartMaxTime {
+		fmt.Printf("aaaaaaaaaa %v, %v", expected.GCSConfig.UploadPartMaxTime, actual.GCSConfig.UploadPartMaxTime)
+		return errors.New("GCS upload part max time mismatch")
+	}
 	return nil
 }
 

+ 2 - 0
internal/vfs/filesystem.go

@@ -308,6 +308,8 @@ func (f *Filesystem) GetACopy() Filesystem {
 				StorageClass:         f.GCSConfig.StorageClass,
 				ACL:                  f.GCSConfig.ACL,
 				KeyPrefix:            f.GCSConfig.KeyPrefix,
+				UploadPartSize:       f.GCSConfig.UploadPartSize,
+				UploadPartMaxTime:    f.GCSConfig.UploadPartMaxTime,
 			},
 			Credentials: f.GCSConfig.Credentials.Clone(),
 		},

+ 41 - 3
internal/vfs/gcsfs.go

@@ -170,8 +170,27 @@ func (fs *GCSFs) Create(name string, flag int) (File, *PipeWriter, func(), error
 	p := NewPipeWriter(w)
 	bkt := fs.svc.Bucket(fs.config.Bucket)
 	obj := bkt.Object(name)
+	if flag == -1 {
+		obj = obj.If(storage.Conditions{DoesNotExist: true})
+	} else {
+		attrs, statErr := fs.headObject(name)
+		if statErr == nil {
+			obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
+		} else if fs.IsNotExist(statErr) {
+			obj = obj.If(storage.Conditions{DoesNotExist: true})
+		} else {
+			fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr)
+		}
+	}
+
 	ctx, cancelFn := context.WithCancel(context.Background())
 	objectWriter := obj.NewWriter(ctx)
+	if fs.config.UploadPartSize > 0 {
+		objectWriter.ChunkSize = int(fs.config.UploadPartSize) * 1024 * 1024
+	}
+	if fs.config.UploadPartMaxTime > 0 {
+		objectWriter.ChunkRetryDeadline = time.Duration(fs.config.UploadPartMaxTime) * time.Second
+	}
 	var contentType string
 	if flag == -1 {
 		contentType = dirMimeType
@@ -231,7 +250,17 @@ func (fs *GCSFs) Rename(source, target string) error {
 	} else {
 		src := fs.svc.Bucket(fs.config.Bucket).Object(realSourceName)
 		dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
-		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+		attrs, statErr := fs.headObject(target)
+		if statErr == nil {
+			dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
+		} else if fs.IsNotExist(statErr) {
+			dst = dst.If(storage.Conditions{DoesNotExist: true})
+		} else {
+			fsLog(fs, logger.LevelWarn, "unable to set precondition for rename, target %q, stat err: %v",
+				target, statErr)
+		}
+
+		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
 		defer cancelFn()
 
 		copier := dst.CopierFrom(src)
@@ -276,11 +305,20 @@ func (fs *GCSFs) Remove(name string, isDir bool) error {
 			name += "/"
 		}
 	}
+	obj := fs.svc.Bucket(fs.config.Bucket).Object(name)
+	attrs, statErr := fs.headObject(name)
+	if statErr == nil {
+		obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
+	} else {
+		fsLog(fs, logger.LevelWarn, "unable to set precondition for deleting %q, stat err: %v",
+			name, statErr)
+	}
+
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 
-	err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
-	if fs.IsNotExist(err) && isDir {
+	err := obj.Delete(ctx)
+	if isDir && fs.IsNotExist(err) {
 		// we can have directories without a trailing "/" (created using v2.1.0 and before)
 		err = fs.svc.Bucket(fs.config.Bucket).Object(strings.TrimSuffix(name, "/")).Delete(ctx)
 	}

+ 12 - 0
internal/vfs/vfs.go

@@ -378,6 +378,12 @@ func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
 	if c.ACL != other.ACL {
 		return false
 	}
+	if c.UploadPartSize != other.UploadPartSize {
+		return false
+	}
+	if c.UploadPartMaxTime != other.UploadPartMaxTime {
+		return false
+	}
 	if c.Credentials == nil {
 		c.Credentials = kms.NewEmptySecret()
 	}
@@ -416,6 +422,12 @@ func (c *GCSFsConfig) validate() error {
 	}
 	c.StorageClass = strings.TrimSpace(c.StorageClass)
 	c.ACL = strings.TrimSpace(c.ACL)
+	if c.UploadPartSize < 0 {
+		c.UploadPartSize = 0
+	}
+	if c.UploadPartMaxTime < 0 {
+		c.UploadPartMaxTime = 0
+	}
 	return nil
 }
 

+ 6 - 0
openapi/openapi.yaml

@@ -5107,6 +5107,12 @@ components:
           type: string
           description: 'key_prefix is similar to a chroot directory for a local filesystem. If specified the user will only see contents that starts with this prefix and so you can restrict access to a specific virtual folder. The prefix, if not empty, must not start with "/" and must end with "/". If empty the whole bucket contents will be available'
           example: folder/subfolder/
+        upload_part_size:
+          type: integer
+          description: 'The buffer size (in MB) to use for multipart uploads. The default value is 16MB. 0 means use the default'
+        upload_part_max_time:
+          type: integer
+          description: 'The maximum time allowed, in seconds, to upload a single chunk. The default value is 32. 0 means use the default'
       description: 'Google Cloud Storage configuration details. The "credentials" field must be populated only when adding/updating a user. It will be always omitted, since there are sensitive data, when you search/get users'
     AzureBlobFsConfig:
       type: object

+ 1 - 1
templates/email/password-expiration.html

@@ -15,5 +15,5 @@ along with this program.  If not, see <https://www.gnu.org/licenses/>.
 -->
 Hi {{.Username}},
 <br>
-<p>Your SFTPGo password {{if le .Days 0}}has expired{{else}}expires in {{.Days}} {{if eq .Days 1}}day{{else}}days{{end}}{{end}}.</p>
+<p>your SFTPGo password {{if le .Days 0}}has expired{{else}}expires in {{.Days}} {{if eq .Days 1}}day{{else}}days{{end}}{{end}}.</p>
 <p>Please login to the WebClient and set a new password.</p>

+ 21 - 0
templates/webadmin/fsconfig.html

@@ -237,6 +237,27 @@ along with this program.  If not, see <https://www.gnu.org/licenses/>.
             </div>
         </div>
 
+        <div class="form-group row fsconfig fsconfig-gcsfs">
+            <label for="idGCSUploadPartSize" class="col-sm-2 col-form-label">UL Part Size (MB)</label>
+            <div class="col-sm-3">
+                <input type="number" class="form-control" id="idGCSUploadPartSize" name="gcs_upload_part_size" placeholder=""
+                    value="{{.GCSConfig.UploadPartSize}}" aria-describedby="GCSPartSizeHelpBlock">
+                <small id="GCSPartSizeHelpBlock" class="form-text text-muted">
+                    The buffer size for multipart uploads. Zero means the default (16 MB)
+                </small>
+            </div>
+            <div class="col-sm-2"></div>
+            <label for="idGCSUploadTimeout" class="col-sm-2 col-form-label">UL Part Timeout (secs)</label>
+            <div class="col-sm-3">
+                <input type="number" class="form-control" id="idGCSUploadTimeout" name="gcs_upload_part_max_time"
+                    placeholder="" value="{{.GCSConfig.UploadPartMaxTime}}" min="0"
+                    aria-describedby="GCSUploadTimeoutHelpBlock">
+                <small id="GCSUploadTimeoutHelpBlock" class="form-text text-muted">
+                    Max time limit, in seconds, to upload a single part. 0 means the default (32 secs)
+                </small>
+            </div>
+        </div>
+
         <div class="form-group fsconfig fsconfig-gcsfs">
             <div class="form-check">
                 <input type="checkbox" class="form-check-input" id="idGCSAutoCredentials" name="gcs_auto_credentials"