s3: don't paginate to find zero-byte-keyed dirs (#277)

Fixes #275
This commit is contained in:
Giorgio Pellero 2021-01-14 12:01:25 +01:00 committed by Nicola Murino
parent 5d4f758c47
commit d42fcc3786
No known key found for this signature in database
GPG key ID: 2F1FB59433D5A8CB

View file

@ -4,7 +4,6 @@ package vfs
import (
"context"
"errors"
"fmt"
"mime"
"net/url"
@ -122,6 +121,7 @@ func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
}
obj, err := fs.headObject(name)
if err == nil {
// a "dir" has a trailing "/" so we cannot have a directory here
objSize := *obj.ContentLength
objectModTime := *obj.LastModified
return NewFileInfo(name, false, objSize, objectModTime, false), nil
@ -136,53 +136,22 @@ func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
} else if err != nil {
return nil, err
}
// the requested file could still be a directory as a zero bytes key
// with a forwarding slash.
// As last resort we do a list dir to find it
return fs.getStatCompat(name)
// the requested file may still be a directory as a zero bytes key
// with a trailing forward slash (created using mkdir).
// S3 doesn't return content type when listing objects, so we have
// create "dirs" adding a trailing "/" to the key
return fs.getStatForDir(name)
}
func (fs *S3Fs) getStatCompat(name string) (os.FileInfo, error) {
func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
var result FileInfo
prefix := path.Dir(name)
if prefix == "/" || prefix == "." {
prefix = ""
} else {
prefix = strings.TrimPrefix(prefix, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
obj, err := fs.headObject(name + "/")
if err != nil {
return result, err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(fs.config.Bucket),
Prefix: aws.String(prefix),
Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, p := range page.CommonPrefixes {
if fs.isEqual(p.Prefix, name) {
result = NewFileInfo(name, true, 0, time.Now(), false)
return false
}
}
for _, fileObject := range page.Contents {
if fs.isEqual(fileObject.Key, name) {
objectSize := *fileObject.Size
objectModTime := *fileObject.LastModified
isDir := strings.HasSuffix(*fileObject.Key, "/") && objectSize == 0
result = NewFileInfo(name, isDir, objectSize, objectModTime, false)
return false
}
}
return true
})
metrics.S3ListObjectsCompleted(err)
if err == nil && result.Name() == "" {
err = errors.New("404 no such file or directory")
}
return result, err
objSize := *obj.ContentLength
objectModTime := *obj.LastModified
return NewFileInfo(name, true, objSize, objectModTime, false), nil
}
// Lstat returns a FileInfo describing the named file
@ -624,19 +593,6 @@ func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
return result, isDir
}
func (fs *S3Fs) isEqual(s3Key *string, virtualName string) bool {
if *s3Key == virtualName {
return true
}
if "/"+*s3Key == virtualName {
return true
}
if "/"+*s3Key == virtualName+"/" {
return true
}
return false
}
func (fs *S3Fs) checkIfBucketExists() error {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()