fix SeaweedFS rename compatibility
Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
parent
2290137868
commit
fc1ba36ae5
1 changed files with 47 additions and 53 deletions
|
@ -295,63 +295,53 @@ func (fs *S3Fs) Rename(source, target string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
copySource := fs.Join(fs.config.Bucket, source)
|
|
||||||
if fi.IsDir() {
|
if fi.IsDir() {
|
||||||
hasContents, err := fs.hasContents(source)
|
hasContents, err := fs.hasContents(source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if hasContents {
|
if hasContents {
|
||||||
return fmt.Errorf("cannot rename non empty directory: %#v", source)
|
return fmt.Errorf("cannot rename non empty directory: %q", source)
|
||||||
}
|
}
|
||||||
if !strings.HasSuffix(copySource, "/") {
|
if err := fs.mkdirInternal(target); err != nil {
|
||||||
copySource += "/"
|
return err
|
||||||
}
|
}
|
||||||
if !strings.HasSuffix(target, "/") {
|
|
||||||
target += "/"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var contentType string
|
|
||||||
if fi.IsDir() {
|
|
||||||
contentType = s3DirMimeType
|
|
||||||
} else {
|
} else {
|
||||||
contentType = mime.TypeByExtension(path.Ext(source))
|
contentType := mime.TypeByExtension(path.Ext(source))
|
||||||
}
|
copySource := pathEscape(fs.Join(fs.config.Bucket, source))
|
||||||
copySource = pathEscape(copySource)
|
|
||||||
|
|
||||||
if fi.Size() > 500*1024*1024 {
|
if fi.Size() > 500*1024*1024 {
|
||||||
fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
|
fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
|
||||||
source, fi.Size())
|
source, fi.Size())
|
||||||
err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
|
err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
|
||||||
} else {
|
} else {
|
||||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||||
defer cancelFn()
|
defer cancelFn()
|
||||||
|
|
||||||
_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
|
_, err = fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
|
||||||
Bucket: aws.String(fs.config.Bucket),
|
Bucket: aws.String(fs.config.Bucket),
|
||||||
CopySource: aws.String(copySource),
|
CopySource: aws.String(copySource),
|
||||||
Key: aws.String(target),
|
Key: aws.String(target),
|
||||||
StorageClass: types.StorageClass(fs.config.StorageClass),
|
StorageClass: types.StorageClass(fs.config.StorageClass),
|
||||||
ACL: types.ObjectCannedACL(fs.config.ACL),
|
ACL: types.ObjectCannedACL(fs.config.ACL),
|
||||||
ContentType: util.NilIfEmpty(contentType),
|
ContentType: util.NilIfEmpty(contentType),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metric.S3CopyObjectCompleted(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
waiter := s3.NewObjectExistsWaiter(fs.svc)
|
||||||
|
err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
|
||||||
|
Bucket: aws.String(fs.config.Bucket),
|
||||||
|
Key: aws.String(target),
|
||||||
|
}, 10*time.Second)
|
||||||
metric.S3CopyObjectCompleted(err)
|
metric.S3CopyObjectCompleted(err)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
}
|
||||||
waiter := s3.NewObjectExistsWaiter(fs.svc)
|
if plugin.Handler.HasMetadater() {
|
||||||
err = waiter.Wait(context.Background(), &s3.HeadObjectInput{
|
|
||||||
Bucket: aws.String(fs.config.Bucket),
|
|
||||||
Key: aws.String(target),
|
|
||||||
}, 10*time.Second)
|
|
||||||
metric.S3CopyObjectCompleted(err)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if plugin.Handler.HasMetadater() {
|
|
||||||
if !fi.IsDir() {
|
|
||||||
err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
|
err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
|
||||||
util.GetTimeAsMsSinceEpoch(fi.ModTime()))
|
util.GetTimeAsMsSinceEpoch(fi.ModTime()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -399,14 +389,7 @@ func (fs *S3Fs) Mkdir(name string) error {
|
||||||
if !fs.IsNotExist(err) {
|
if !fs.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !strings.HasSuffix(name, "/") {
|
return fs.mkdirInternal(name)
|
||||||
name += "/"
|
|
||||||
}
|
|
||||||
_, w, _, err := fs.Create(name, -1)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return w.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Symlink creates source as a symbolic link to target.
|
// Symlink creates source as a symbolic link to target.
|
||||||
|
@ -784,6 +767,17 @@ func (fs *S3Fs) setConfigDefaults() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *S3Fs) mkdirInternal(name string) error {
|
||||||
|
if !strings.HasSuffix(name, "/") {
|
||||||
|
name += "/"
|
||||||
|
}
|
||||||
|
_, w, _, err := fs.Create(name, -1)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return w.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *S3Fs) hasContents(name string) (bool, error) {
|
func (fs *S3Fs) hasContents(name string) (bool, error) {
|
||||||
prefix := fs.getPrefix(name)
|
prefix := fs.getPrefix(name)
|
||||||
paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
|
paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
|
||||||
|
|
Loading…
Reference in a new issue