From a66d20729162a107af564f377847f1163c0b4d11 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Sun, 20 Nov 2022 13:06:58 +0100 Subject: [PATCH] fix SeaweedFS rename compatibility Signed-off-by: Nicola Murino --- internal/vfs/s3fs.go | 100 ++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 53 deletions(-) diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index 06975c78..294b56a5 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -295,63 +295,53 @@ func (fs *S3Fs) Rename(source, target string) error { if err != nil { return err } - copySource := fs.Join(fs.config.Bucket, source) if fi.IsDir() { hasContents, err := fs.hasContents(source) if err != nil { return err } if hasContents { - return fmt.Errorf("cannot rename non empty directory: %#v", source) + return fmt.Errorf("cannot rename non empty directory: %q", source) } - if !strings.HasSuffix(copySource, "/") { - copySource += "/" + if err := fs.mkdirInternal(target); err != nil { + return err } - if !strings.HasSuffix(target, "/") { - target += "/" - } - } - var contentType string - if fi.IsDir() { - contentType = s3DirMimeType } else { - contentType = mime.TypeByExtension(path.Ext(source)) - } - copySource = pathEscape(copySource) + contentType := mime.TypeByExtension(path.Ext(source)) + copySource := pathEscape(fs.Join(fs.config.Bucket, source)) - if fi.Size() > 500*1024*1024 { - fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy", - source, fi.Size()) - err = fs.doMultipartCopy(copySource, target, contentType, fi.Size()) - } else { - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) - defer cancelFn() + if fi.Size() > 500*1024*1024 { + fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy", + source, fi.Size()) + err = fs.doMultipartCopy(copySource, target, contentType, fi.Size()) + } else { + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) + defer cancelFn() - _, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{ - Bucket: aws.String(fs.config.Bucket), - CopySource: aws.String(copySource), - Key: aws.String(target), - StorageClass: types.StorageClass(fs.config.StorageClass), - ACL: types.ObjectCannedACL(fs.config.ACL), - ContentType: util.NilIfEmpty(contentType), - }) - } - if err != nil { + _, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(fs.config.Bucket), + CopySource: aws.String(copySource), + Key: aws.String(target), + StorageClass: types.StorageClass(fs.config.StorageClass), + ACL: types.ObjectCannedACL(fs.config.ACL), + ContentType: util.NilIfEmpty(contentType), + }) + } + if err != nil { + metric.S3CopyObjectCompleted(err) + return err + } + + waiter := s3.NewObjectExistsWaiter(fs.svc) + err = waiter.Wait(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String(fs.config.Bucket), + Key: aws.String(target), + }, 10*time.Second) metric.S3CopyObjectCompleted(err) - return err - } - - waiter := s3.NewObjectExistsWaiter(fs.svc) - err = waiter.Wait(context.Background(), &s3.HeadObjectInput{ - Bucket: aws.String(fs.config.Bucket), - Key: aws.String(target), - }, 10*time.Second) - metric.S3CopyObjectCompleted(err) - if err != nil { - return err - } - if plugin.Handler.HasMetadater() { - if !fi.IsDir() { + if err != nil { + return err + } + if plugin.Handler.HasMetadater() { err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target), util.GetTimeAsMsSinceEpoch(fi.ModTime())) if err != nil { @@ -399,14 +389,7 @@ func (fs *S3Fs) Mkdir(name string) error { if !fs.IsNotExist(err) { return err } - if !strings.HasSuffix(name, "/") { - name += "/" - } - _, w, _, err := fs.Create(name, -1) - if err != nil { - return err - } - return w.Close() + return fs.mkdirInternal(name) } // Symlink creates source as a symbolic link to target. @@ -784,6 +767,17 @@ func (fs *S3Fs) setConfigDefaults() { } } +func (fs *S3Fs) mkdirInternal(name string) error { + if !strings.HasSuffix(name, "/") { + name += "/" + } + _, w, _, err := fs.Create(name, -1) + if err != nil { + return err + } + return w.Close() +} + func (fs *S3Fs) hasContents(name string) (bool, error) { prefix := fs.getPrefix(name) paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{