fix SeaweedFS rename compatibility
Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
parent
0a8edcd811
commit
a66d207291
1 changed files with 47 additions and 53 deletions
|
@ -295,63 +295,53 @@ func (fs *S3Fs) Rename(source, target string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
copySource := fs.Join(fs.config.Bucket, source)
|
||||
if fi.IsDir() {
|
||||
hasContents, err := fs.hasContents(source)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if hasContents {
|
||||
return fmt.Errorf("cannot rename non empty directory: %#v", source)
|
||||
return fmt.Errorf("cannot rename non empty directory: %q", source)
|
||||
}
|
||||
if !strings.HasSuffix(copySource, "/") {
|
||||
copySource += "/"
|
||||
if err := fs.mkdirInternal(target); err != nil {
|
||||
return err
|
||||
}
|
||||
if !strings.HasSuffix(target, "/") {
|
||||
target += "/"
|
||||
}
|
||||
}
|
||||
var contentType string
|
||||
if fi.IsDir() {
|
||||
contentType = s3DirMimeType
|
||||
} else {
|
||||
contentType = mime.TypeByExtension(path.Ext(source))
|
||||
}
|
||||
copySource = pathEscape(copySource)
|
||||
contentType := mime.TypeByExtension(path.Ext(source))
|
||||
copySource := pathEscape(fs.Join(fs.config.Bucket, source))
|
||||
|
||||
if fi.Size() > 500*1024*1024 {
|
||||
fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
|
||||
source, fi.Size())
|
||||
err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
|
||||
} else {
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
if fi.Size() > 500*1024*1024 {
|
||||
fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
|
||||
source, fi.Size())
|
||||
err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
|
||||
} else {
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
CopySource: aws.String(copySource),
|
||||
Key: aws.String(target),
|
||||
StorageClass: types.StorageClass(fs.config.StorageClass),
|
||||
ACL: types.ObjectCannedACL(fs.config.ACL),
|
||||
ContentType: util.NilIfEmpty(contentType),
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
CopySource: aws.String(copySource),
|
||||
Key: aws.String(target),
|
||||
StorageClass: types.StorageClass(fs.config.StorageClass),
|
||||
ACL: types.ObjectCannedACL(fs.config.ACL),
|
||||
ContentType: util.NilIfEmpty(contentType),
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
metric.S3CopyObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
|
||||
waiter := s3.NewObjectExistsWaiter(fs.svc)
|
||||
err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(target),
|
||||
}, 10*time.Second)
|
||||
metric.S3CopyObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
|
||||
waiter := s3.NewObjectExistsWaiter(fs.svc)
|
||||
err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(target),
|
||||
}, 10*time.Second)
|
||||
metric.S3CopyObjectCompleted(err)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if plugin.Handler.HasMetadater() {
|
||||
if !fi.IsDir() {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if plugin.Handler.HasMetadater() {
|
||||
err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
|
||||
util.GetTimeAsMsSinceEpoch(fi.ModTime()))
|
||||
if err != nil {
|
||||
|
@ -399,14 +389,7 @@ func (fs *S3Fs) Mkdir(name string) error {
|
|||
if !fs.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
if !strings.HasSuffix(name, "/") {
|
||||
name += "/"
|
||||
}
|
||||
_, w, _, err := fs.Create(name, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return w.Close()
|
||||
return fs.mkdirInternal(name)
|
||||
}
|
||||
|
||||
// Symlink creates source as a symbolic link to target.
|
||||
|
@ -784,6 +767,17 @@ func (fs *S3Fs) setConfigDefaults() {
|
|||
}
|
||||
}
|
||||
|
||||
func (fs *S3Fs) mkdirInternal(name string) error {
|
||||
if !strings.HasSuffix(name, "/") {
|
||||
name += "/"
|
||||
}
|
||||
_, w, _, err := fs.Create(name, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return w.Close()
|
||||
}
|
||||
|
||||
func (fs *S3Fs) hasContents(name string) (bool, error) {
|
||||
prefix := fs.getPrefix(name)
|
||||
paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
|
||||
|
|
Loading…
Reference in a new issue