소스 검색

cloud backends: stat and other performance improvements

Nicola Murino 4 년 전
부모
커밋
6ad4cc317c
7개의 변경된 파일434개의 추가작업 그리고 255개의 파일을 삭제
  1. 1 1
      dataprovider/dataprovider.go
  2. 1 1
      httpd/api_quota.go
  3. 1 1
      sftpd/sftpd_test.go
  4. 107 107
      vfs/azblobfs.go
  5. 165 64
      vfs/gcsfs.go
  6. 30 30
      vfs/osfs.go
  7. 129 51
      vfs/s3fs.go

+ 1 - 1
dataprovider/dataprovider.go

@@ -1974,7 +1974,7 @@ func executeAction(operation string, user User) {
 
 // after migrating database to v4 we have to update the quota for the imported folders
 func updateVFoldersQuotaAfterRestore(foldersToScan []string) {
-	fs := vfs.NewOsFs("", "", nil).(vfs.OsFs)
+	fs := vfs.NewOsFs("", "", nil).(*vfs.OsFs)
 	for _, folder := range foldersToScan {
 		providerLog(logger.LevelDebug, "starting quota scan after migration for folder %#v", folder)
 		vfolder, err := provider.getFolderByPath(folder)

+ 1 - 1
httpd/api_quota.go

@@ -171,7 +171,7 @@ func doQuotaScan(user dataprovider.User) error {
 
 func doFolderQuotaScan(folder vfs.BaseVirtualFolder) error {
 	defer common.QuotaScans.RemoveVFolderQuotaScan(folder.MappedPath)
-	fs := vfs.NewOsFs("", "", nil).(vfs.OsFs)
+	fs := vfs.NewOsFs("", "", nil).(*vfs.OsFs)
 	numFiles, size, err := fs.GetDirSize(folder.MappedPath)
 	if err != nil {
 		logger.Warn(logSender, "", "error scanning folder %#v: %v", folder.MappedPath, err)

+ 1 - 1
sftpd/sftpd_test.go

@@ -5688,7 +5688,7 @@ func TestResolveVirtualPaths(t *testing.T) {
 	})
 	err := os.MkdirAll(mappedPath, os.ModePerm)
 	assert.NoError(t, err)
-	osFs := vfs.NewOsFs("", user.GetHomeDir(), user.VirtualFolders).(vfs.OsFs)
+	osFs := vfs.NewOsFs("", user.GetHomeDir(), user.VirtualFolders).(*vfs.OsFs)
 	b, f := osFs.GetFsPaths("/vdir/a.txt")
 	assert.Equal(t, mappedPath, b)
 	assert.Equal(t, filepath.Join(mappedPath, "a.txt"), f)

+ 107 - 107
vfs/azblobfs.go

@@ -51,7 +51,7 @@ func init() {
 
 // NewAzBlobFs returns an AzBlobFs object that allows to interact with Azure Blob storage
 func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, error) {
-	fs := AzureBlobFs{
+	fs := &AzureBlobFs{
 		connectionID:   connectionID,
 		localTempDir:   localTempDir,
 		config:         config,
@@ -68,7 +68,10 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs,
 		}
 		fs.config.AccountKey = accountKey
 	}
-	setConfigDefaults(&fs)
+	fs.setConfigDefaults()
+
+	version := version.Get()
+	telemetryValue := fmt.Sprintf("SFTPGo-%v_%v", version.Version, version.CommitHash)
 
 	if fs.config.SASURL != "" {
 		u, err := url.Parse(fs.config.SASURL)
@@ -80,7 +83,7 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs,
 				TryTimeout: maxTryTimeout,
 			},
 			Telemetry: azblob.TelemetryOptions{
-				Value: "SFTPGo",
+				Value: telemetryValue,
 			},
 		})
 		// Check if we have container level SAS or account level SAS
@@ -123,7 +126,7 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs,
 			TryTimeout: maxTryTimeout,
 		},
 		Telemetry: azblob.TelemetryOptions{
-			Value: "SFTPGo",
+			Value: telemetryValue,
 		},
 	})
 	serviceURL := azblob.NewServiceURL(*u, pipeline)
@@ -132,24 +135,8 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs,
 	return fs, nil
 }
 
-func setConfigDefaults(fs *AzureBlobFs) {
-	if fs.config.Endpoint == "" {
-		fs.config.Endpoint = azureDefaultEndpoint
-	}
-	if fs.config.UploadPartSize == 0 {
-		fs.config.UploadPartSize = 4
-	}
-	fs.config.UploadPartSize *= 1024 * 1024
-	if fs.config.UploadConcurrency == 0 {
-		fs.config.UploadConcurrency = 2
-	}
-	if fs.config.AccessTier == "" {
-		fs.config.AccessTier = string(azblob.AccessTierNone)
-	}
-}
-
 // Name returns the name for the Fs implementation
-func (fs AzureBlobFs) Name() string {
+func (fs *AzureBlobFs) Name() string {
 	if fs.config.SASURL != "" {
 		return fmt.Sprintf("Azure Blob SAS URL %#v", fs.config.Container)
 	}
@@ -157,12 +144,12 @@ func (fs AzureBlobFs) Name() string {
 }
 
 // ConnectionID returns the connection ID associated to this Fs implementation
-func (fs AzureBlobFs) ConnectionID() string {
+func (fs *AzureBlobFs) ConnectionID() string {
 	return fs.connectionID
 }
 
 // Stat returns a FileInfo describing the named file
-func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) {
+func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
 	if name == "" || name == "." {
 		err := fs.checkIfBucketExists()
 		if err != nil {
@@ -173,59 +160,34 @@ func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) {
 	if fs.config.KeyPrefix == name+"/" {
 		return NewFileInfo(name, true, 0, time.Now(), false), nil
 	}
-	prefix := fs.getPrefixForStat(name)
-	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
-	defer cancelFn()
 
-	for marker := (azblob.Marker{}); marker.NotDone(); {
-		listBlob, err := fs.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
-			Details: azblob.BlobListingDetails{
-				Copy:             false,
-				Metadata:         false,
-				Snapshots:        false,
-				UncommittedBlobs: false,
-				Deleted:          false,
-			},
-			Prefix: prefix,
-		})
-		if err != nil {
-			metrics.AZListObjectsCompleted(err)
-			return nil, err
-		}
-		marker = listBlob.NextMarker
-		for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
-			if fs.isEqual(blobPrefix.Name, name) {
-				metrics.AZListObjectsCompleted(nil)
-				return NewFileInfo(name, true, 0, time.Now(), false), nil
-			}
-		}
-		for _, blobInfo := range listBlob.Segment.BlobItems {
-			if fs.isEqual(blobInfo.Name, name) {
-				isDir := false
-				if blobInfo.Properties.ContentType != nil {
-					isDir = (*blobInfo.Properties.ContentType == dirMimeType)
-				}
-				size := int64(0)
-				if blobInfo.Properties.ContentLength != nil {
-					size = *blobInfo.Properties.ContentLength
-				}
-				metrics.AZListObjectsCompleted(nil)
-				return NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false), nil
-			}
-		}
+	attrs, err := fs.headObject(name)
+	if err == nil {
+		isDir := (attrs.ContentType() == dirMimeType)
+		metrics.AZListObjectsCompleted(nil)
+		return NewFileInfo(name, isDir, attrs.ContentLength(), attrs.LastModified(), false), nil
+	}
+	if !fs.IsNotExist(err) {
+		return nil, err
+	}
+	// now check if this is a prefix (virtual directory)
+	hasContents, err := fs.hasContents(name)
+	if err != nil {
+		return nil, err
+	}
+	if hasContents {
+		return NewFileInfo(name, true, 0, time.Now(), false), nil
 	}
-
-	metrics.AZListObjectsCompleted(nil)
 	return nil, errors.New("404 no such file or directory")
 }
 
 // Lstat returns a FileInfo describing the named file
-func (fs AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
+func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
 	return fs.Stat(name)
 }
 
 // Open opens the named file for reading
-func (fs AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
+func (fs *AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
 	r, w, err := pipeat.PipeInDir(fs.localTempDir)
 	if err != nil {
 		return nil, nil, nil, err
@@ -257,7 +219,7 @@ func (fs AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeRea
 }
 
 // Create creates or opens the named file for writing
-func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
+func (fs *AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
 	r, w, err := pipeat.PipeInDir(fs.localTempDir)
 	if err != nil {
 		return nil, nil, nil, err
@@ -304,7 +266,7 @@ func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func
 // rename all the contents too and this could take long time: think
 // about directories with thousands of files, for each file we should
 // execute a StartCopyFromURL call.
-func (fs AzureBlobFs) Rename(source, target string) error {
+func (fs *AzureBlobFs) Rename(source, target string) error {
 	if source == target {
 		return nil
 	}
@@ -313,11 +275,11 @@ func (fs AzureBlobFs) Rename(source, target string) error {
 		return err
 	}
 	if fi.IsDir() {
-		contents, err := fs.ReadDir(source)
+		hasContents, err := fs.hasContents(source)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
 		}
 	}
@@ -363,13 +325,13 @@ func (fs AzureBlobFs) Rename(source, target string) error {
 }
 
 // Remove removes the named file or (empty) directory.
-func (fs AzureBlobFs) Remove(name string, isDir bool) error {
+func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
 	if isDir {
-		contents, err := fs.ReadDir(name)
+		hasContents, err := fs.hasContents(name)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
 		}
 	}
@@ -383,7 +345,7 @@ func (fs AzureBlobFs) Remove(name string, isDir bool) error {
 }
 
 // Mkdir creates a new directory with the specified name and default permissions
-func (fs AzureBlobFs) Mkdir(name string) error {
+func (fs *AzureBlobFs) Mkdir(name string) error {
 	_, err := fs.Stat(name)
 	if !fs.IsNotExist(err) {
 		return err
@@ -396,43 +358,43 @@ func (fs AzureBlobFs) Mkdir(name string) error {
 }
 
 // Symlink creates source as a symbolic link to target.
-func (AzureBlobFs) Symlink(source, target string) error {
+func (*AzureBlobFs) Symlink(source, target string) error {
 	return errors.New("403 symlinks are not supported")
 }
 
 // Readlink returns the destination of the named symbolic link
-func (AzureBlobFs) Readlink(name string) (string, error) {
+func (*AzureBlobFs) Readlink(name string) (string, error) {
 	return "", errors.New("403 readlink is not supported")
 }
 
 // Chown changes the numeric uid and gid of the named file.
 // Silently ignored.
-func (AzureBlobFs) Chown(name string, uid int, gid int) error {
+func (*AzureBlobFs) Chown(name string, uid int, gid int) error {
 	return nil
 }
 
 // Chmod changes the mode of the named file to mode.
 // Silently ignored.
-func (AzureBlobFs) Chmod(name string, mode os.FileMode) error {
+func (*AzureBlobFs) Chmod(name string, mode os.FileMode) error {
 	return nil
 }
 
 // Chtimes changes the access and modification times of the named file.
 // Silently ignored.
-func (AzureBlobFs) Chtimes(name string, atime, mtime time.Time) error {
+func (*AzureBlobFs) Chtimes(name string, atime, mtime time.Time) error {
 	return errors.New("403 chtimes is not supported")
 }
 
 // Truncate changes the size of the named file.
 // Truncate by path is not supported, while truncating an opened
 // file is handled inside base transfer
-func (AzureBlobFs) Truncate(name string, size int64) error {
+func (*AzureBlobFs) Truncate(name string, size int64) error {
 	return errors.New("403 truncate is not supported")
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
+func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	var result []os.FileInfo
 	// dirname must be already cleaned
 	prefix := ""
@@ -504,20 +466,20 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 
 // IsUploadResumeSupported returns true if upload resume is supported.
 // Upload Resume is not supported on Azure Blob
-func (AzureBlobFs) IsUploadResumeSupported() bool {
+func (*AzureBlobFs) IsUploadResumeSupported() bool {
 	return false
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported.
 // Azure Blob uploads are already atomic, we don't need to upload to a temporary
 // file
-func (AzureBlobFs) IsAtomicUploadSupported() bool {
+func (*AzureBlobFs) IsAtomicUploadSupported() bool {
 	return false
 }
 
 // IsNotExist returns a boolean indicating whether the error is known to
 // report that a file or directory does not exist
-func (AzureBlobFs) IsNotExist(err error) bool {
+func (*AzureBlobFs) IsNotExist(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -537,7 +499,7 @@ func (AzureBlobFs) IsNotExist(err error) bool {
 
 // IsPermission returns a boolean indicating whether the error is known to
 // report that permission is denied.
-func (AzureBlobFs) IsPermission(err error) bool {
+func (*AzureBlobFs) IsPermission(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -556,7 +518,7 @@ func (AzureBlobFs) IsPermission(err error) bool {
 }
 
 // CheckRootPath creates the specified local root directory if it does not exists
-func (fs AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
+func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
 	// we need a local directory for temporary files
 	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
 	return osFs.CheckRootPath(username, uid, gid)
@@ -564,7 +526,7 @@ func (fs AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
 
 // ScanRootDirContents returns the number of files contained in the bucket,
 // and their size
-func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
+func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
 
@@ -610,19 +572,19 @@ func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
 
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
-func (AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
+func (*AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
 	return 0, 0, errUnsupported
 }
 
 // GetAtomicUploadPath returns the path to use for an atomic upload.
 // Azure Blob Storage uploads are already atomic, we never call this method
-func (AzureBlobFs) GetAtomicUploadPath(name string) string {
+func (*AzureBlobFs) GetAtomicUploadPath(name string) string {
 	return ""
 }
 
 // GetRelativePath returns the path for a file relative to the user's home dir.
 // This is the path as seen by SFTPGo users
-func (fs AzureBlobFs) GetRelativePath(name string) string {
+func (fs *AzureBlobFs) GetRelativePath(name string) string {
 	rel := path.Clean(name)
 	if rel == "." {
 		rel = ""
@@ -630,7 +592,7 @@ func (fs AzureBlobFs) GetRelativePath(name string) string {
 	if !path.IsAbs(rel) {
 		rel = "/" + rel
 	}
-	if len(fs.config.KeyPrefix) > 0 {
+	if fs.config.KeyPrefix != "" {
 		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
 			rel = "/"
 		}
@@ -641,7 +603,7 @@ func (fs AzureBlobFs) GetRelativePath(name string) string {
 
 // Walk walks the file tree rooted at root, calling walkFn for each file or
 // directory in the tree, including root
-func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
+func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
 	prefix := ""
 	if root != "" && root != "." {
 		prefix = strings.TrimPrefix(root, "/")
@@ -673,15 +635,14 @@ func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
 			if blobInfo.Properties.ContentType != nil {
 				isDir = (*blobInfo.Properties.ContentType == dirMimeType)
 			}
-			name := path.Clean(blobInfo.Name)
-			if len(name) == 0 {
+			if fs.isEqual(blobInfo.Name, prefix) {
 				continue
 			}
 			blobSize := int64(0)
 			if blobInfo.Properties.ContentLength != nil {
 				blobSize = *blobInfo.Properties.ContentLength
 			}
-			err = walkFn(blobInfo.Name, NewFileInfo(name, isDir, blobSize, blobInfo.Properties.LastModified, false), nil)
+			err = walkFn(blobInfo.Name, NewFileInfo(blobInfo.Name, isDir, blobSize, blobInfo.Properties.LastModified, false), nil)
 			if err != nil {
 				return err
 			}
@@ -693,31 +654,36 @@ func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
 }
 
 // Join joins any number of path elements into a single path
-func (AzureBlobFs) Join(elem ...string) string {
+func (*AzureBlobFs) Join(elem ...string) string {
 	return strings.TrimPrefix(path.Join(elem...), "/")
 }
 
 // HasVirtualFolders returns true if folders are emulated
-func (AzureBlobFs) HasVirtualFolders() bool {
+func (*AzureBlobFs) HasVirtualFolders() bool {
 	return true
 }
 
 // ResolvePath returns the matching filesystem path for the specified sftp path
-func (fs AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
+func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
 	if !path.IsAbs(virtualPath) {
 		virtualPath = path.Clean("/" + virtualPath)
 	}
 	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
 }
 
-// GetMimeType implements MimeTyper interface
-func (fs AzureBlobFs) GetMimeType(name string) (string, error) {
+func (fs *AzureBlobFs) headObject(name string) (*azblob.BlobGetPropertiesResponse, error) {
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 
 	blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
 	response, err := blobBlockURL.GetProperties(ctx, azblob.BlobAccessConditions{})
 	metrics.AZHeadObjectCompleted(err)
+	return response, err
+}
+
+// GetMimeType implements MimeTyper interface
+func (fs *AzureBlobFs) GetMimeType(name string) (string, error) {
+	response, err := fs.headObject(name)
 	if err != nil {
 		return "", err
 	}
@@ -737,6 +703,22 @@ func (fs *AzureBlobFs) isEqual(key string, virtualName string) bool {
 	return false
 }
 
+func (fs *AzureBlobFs) setConfigDefaults() {
+	if fs.config.Endpoint == "" {
+		fs.config.Endpoint = azureDefaultEndpoint
+	}
+	if fs.config.UploadPartSize == 0 {
+		fs.config.UploadPartSize = 4
+	}
+	fs.config.UploadPartSize *= 1024 * 1024
+	if fs.config.UploadConcurrency == 0 {
+		fs.config.UploadConcurrency = 2
+	}
+	if fs.config.AccessTier == "" {
+		fs.config.AccessTier = string(azblob.AccessTierNone)
+	}
+}
+
 func (fs *AzureBlobFs) checkIfBucketExists() error {
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
@@ -746,17 +728,35 @@ func (fs *AzureBlobFs) checkIfBucketExists() error {
 	return err
 }
 
-func (fs *AzureBlobFs) getPrefixForStat(name string) string {
-	prefix := path.Dir(name)
-	if prefix == "/" || prefix == "." || prefix == "" {
-		prefix = ""
-	} else {
-		prefix = strings.TrimPrefix(prefix, "/")
+func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
+	result := false
+	prefix := ""
+	if name != "" && name != "." {
+		prefix = strings.TrimPrefix(name, "/")
 		if !strings.HasSuffix(prefix, "/") {
 			prefix += "/"
 		}
 	}
-	return prefix
+	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+	defer cancelFn()
+
+	listBlob, err := fs.containerURL.ListBlobsFlatSegment(ctx, azblob.Marker{}, azblob.ListBlobsSegmentOptions{
+		Details: azblob.BlobListingDetails{
+			Copy:             false,
+			Metadata:         false,
+			Snapshots:        false,
+			UncommittedBlobs: false,
+			Deleted:          false,
+		},
+		Prefix:     prefix,
+		MaxResults: 1,
+	})
+	metrics.AZListObjectsCompleted(err)
+	if err != nil {
+		return result, err
+	}
+	result = len(listBlob.Segment.BlobItems) > 0
+	return result, err
 }
 
 func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, blockBlobURL azblob.BlockBlobURL,

+ 165 - 64
vfs/gcsfs.go

@@ -27,7 +27,7 @@ import (
 )
 
 var (
-	gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated"}
+	gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType"}
 )
 
 // GCSFs is a Fs implementation for Google Cloud Storage.
@@ -47,7 +47,7 @@ func init() {
 // NewGCSFs returns an GCSFs object that allows to interact with Google Cloud Storage
 func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error) {
 	var err error
-	fs := GCSFs{
+	fs := &GCSFs{
 		connectionID:   connectionID,
 		localTempDir:   localTempDir,
 		config:         config,
@@ -69,17 +69,17 @@ func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error)
 }
 
 // Name returns the name for the Fs implementation
-func (fs GCSFs) Name() string {
+func (fs *GCSFs) Name() string {
 	return fmt.Sprintf("GCSFs bucket %#v", fs.config.Bucket)
 }
 
 // ConnectionID returns the connection ID associated to this Fs implementation
-func (fs GCSFs) ConnectionID() string {
+func (fs *GCSFs) ConnectionID() string {
 	return fs.connectionID
 }
 
 // Stat returns a FileInfo describing the named file
-func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
+func (fs *GCSFs) Stat(name string) (os.FileInfo, error) {
 	var result FileInfo
 	var err error
 	if name == "" || name == "." {
@@ -92,9 +92,32 @@ func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
 	if fs.config.KeyPrefix == name+"/" {
 		return NewFileInfo(name, true, 0, time.Now(), false), nil
 	}
+	attrs, err := fs.headObject(name)
+	if err == nil {
+		objSize := attrs.Size
+		objectModTime := attrs.Updated
+		isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/")
+		return NewFileInfo(name, isDir, objSize, objectModTime, false), nil
+	}
+	if !fs.IsNotExist(err) {
+		return result, err
+	}
+	// now check if this is a prefix (virtual directory)
+	hasContents, err := fs.hasContents(name)
+	if err == nil && hasContents {
+		return NewFileInfo(name, true, 0, time.Now(), false), nil
+	} else if err != nil {
+		return nil, err
+	}
+	// search a dir ending with "/" for backward compatibility
+	return fs.getStatCompat(name)
+}
+
+func (fs *GCSFs) getStatCompat(name string) (os.FileInfo, error) {
+	var result FileInfo
 	prefix := fs.getPrefixForStat(name)
 	query := &storage.Query{Prefix: prefix, Delimiter: "/"}
-	err = query.SetAttrSelection(gcsDefaultFieldsSelection)
+	err := query.SetAttrSelection(gcsDefaultFieldsSelection)
 	if err != nil {
 		return nil, err
 	}
@@ -135,12 +158,12 @@ func (fs GCSFs) Stat(name string) (os.FileInfo, error) {
 }
 
 // Lstat returns a FileInfo describing the named file
-func (fs GCSFs) Lstat(name string) (os.FileInfo, error) {
+func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) {
 	return fs.Stat(name)
 }
 
 // Open opens the named file for reading
-func (fs GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
+func (fs *GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
 	r, w, err := pipeat.PipeInDir(fs.localTempDir)
 	if err != nil {
 		return nil, nil, nil, err
@@ -171,7 +194,7 @@ func (fs GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt,
 }
 
 // Create creates or opens the named file for writing
-func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
+func (fs *GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
 	r, w, err := pipeat.PipeInDir(fs.localTempDir)
 	if err != nil {
 		return nil, nil, nil, err
@@ -190,7 +213,7 @@ func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), er
 	if contentType != "" {
 		objectWriter.ObjectAttrs.ContentType = contentType
 	}
-	if len(fs.config.StorageClass) > 0 {
+	if fs.config.StorageClass != "" {
 		objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
 	}
 	go func() {
@@ -210,7 +233,7 @@ func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), er
 // rename all the contents too and this could take long time: think
 // about directories with thousands of files, for each file we should
 // execute a CopyObject call.
-func (fs GCSFs) Rename(source, target string) error {
+func (fs *GCSFs) Rename(source, target string) error {
 	if source == target {
 		return nil
 	}
@@ -219,28 +242,31 @@ func (fs GCSFs) Rename(source, target string) error {
 		return err
 	}
 	if fi.IsDir() {
-		contents, err := fs.ReadDir(source)
+		hasContents, err := fs.hasContents(source)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
 		}
-		if !strings.HasSuffix(source, "/") {
-			source += "/"
-		}
-		if !strings.HasSuffix(target, "/") {
-			target += "/"
-		}
 	}
 	src := fs.svc.Bucket(fs.config.Bucket).Object(source)
 	dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 	copier := dst.CopierFrom(src)
-	if len(fs.config.StorageClass) > 0 {
+	if fs.config.StorageClass != "" {
 		copier.StorageClass = fs.config.StorageClass
 	}
+	var contentType string
+	if fi.IsDir() {
+		contentType = dirMimeType
+	} else {
+		contentType = mime.TypeByExtension(path.Ext(source))
+	}
+	if contentType != "" {
+		copier.ContentType = contentType
+	}
 	_, err = copier.Run(ctx)
 	metrics.GCSCopyObjectCompleted(err)
 	if err != nil {
@@ -250,35 +276,35 @@ func (fs GCSFs) Rename(source, target string) error {
 }
 
 // Remove removes the named file or (empty) directory.
-func (fs GCSFs) Remove(name string, isDir bool) error {
+func (fs *GCSFs) Remove(name string, isDir bool) error {
 	if isDir {
-		contents, err := fs.ReadDir(name)
+		hasContents, err := fs.hasContents(name)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
 		}
-		if !strings.HasSuffix(name, "/") {
-			name += "/"
-		}
 	}
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
+
 	err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
 	metrics.GCSDeleteObjectCompleted(err)
+	if fs.IsNotExist(err) && isDir {
+		name = name + "/"
+		err = fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
+		metrics.GCSDeleteObjectCompleted(err)
+	}
 	return err
 }
 
 // Mkdir creates a new directory with the specified name and default permissions
-func (fs GCSFs) Mkdir(name string) error {
+func (fs *GCSFs) Mkdir(name string) error {
 	_, err := fs.Stat(name)
 	if !fs.IsNotExist(err) {
 		return err
 	}
-	if !strings.HasSuffix(name, "/") {
-		name += "/"
-	}
 	_, w, _, err := fs.Create(name, -1)
 	if err != nil {
 		return err
@@ -287,59 +313,57 @@ func (fs GCSFs) Mkdir(name string) error {
 }
 
 // Symlink creates source as a symbolic link to target.
-func (GCSFs) Symlink(source, target string) error {
+func (*GCSFs) Symlink(source, target string) error {
 	return errors.New("403 symlinks are not supported")
 }
 
 // Readlink returns the destination of the named symbolic link
-func (GCSFs) Readlink(name string) (string, error) {
+func (*GCSFs) Readlink(name string) (string, error) {
 	return "", errors.New("403 readlink is not supported")
 }
 
 // Chown changes the numeric uid and gid of the named file.
 // Silently ignored.
-func (GCSFs) Chown(name string, uid int, gid int) error {
+func (*GCSFs) Chown(name string, uid int, gid int) error {
 	return nil
 }
 
 // Chmod changes the mode of the named file to mode.
 // Silently ignored.
-func (GCSFs) Chmod(name string, mode os.FileMode) error {
+func (*GCSFs) Chmod(name string, mode os.FileMode) error {
 	return nil
 }
 
 // Chtimes changes the access and modification times of the named file.
 // Silently ignored.
-func (GCSFs) Chtimes(name string, atime, mtime time.Time) error {
+func (*GCSFs) Chtimes(name string, atime, mtime time.Time) error {
 	return errors.New("403 chtimes is not supported")
 }
 
 // Truncate changes the size of the named file.
 // Truncate by path is not supported, while truncating an opened
 // file is handled inside base transfer
-func (GCSFs) Truncate(name string, size int64) error {
+func (*GCSFs) Truncate(name string, size int64) error {
 	return errors.New("403 truncate is not supported")
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
+func (fs *GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	var result []os.FileInfo
 	// dirname must be already cleaned
-	prefix := ""
-	if dirname != "" && dirname != "." {
-		prefix = strings.TrimPrefix(dirname, "/")
-		if !strings.HasSuffix(prefix, "/") {
-			prefix += "/"
-		}
-	}
+	prefix := fs.getPrefix(dirname)
+
 	query := &storage.Query{Prefix: prefix, Delimiter: "/"}
 	err := query.SetAttrSelection(gcsDefaultFieldsSelection)
 	if err != nil {
 		return nil, err
 	}
+
+	prefixes := make(map[string]bool)
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
+
 	bkt := fs.svc.Bucket(fs.config.Bucket)
 	it := bkt.Objects(ctx, query)
 	for {
@@ -353,15 +377,32 @@ func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 		}
 		if attrs.Prefix != "" {
 			name, _ := fs.resolve(attrs.Prefix, prefix)
+			if name == "" {
+				continue
+			}
+			if _, ok := prefixes[name]; ok {
+				continue
+			}
 			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
+			prefixes[name] = true
 		} else {
 			name, isDir := fs.resolve(attrs.Name, prefix)
-			if len(name) == 0 {
+			if name == "" {
 				continue
 			}
 			if !attrs.Deleted.IsZero() {
 				continue
 			}
+			if attrs.ContentType == dirMimeType {
+				isDir = true
+			}
+			if isDir {
+				// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
+				if _, ok := prefixes[name]; ok {
+					continue
+				}
+				prefixes[name] = true
+			}
 			fi := NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false)
 			result = append(result, fi)
 		}
@@ -372,20 +413,20 @@ func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 
 // IsUploadResumeSupported returns true if upload resume is supported.
 // SFTP Resume is not supported on S3
-func (GCSFs) IsUploadResumeSupported() bool {
+func (*GCSFs) IsUploadResumeSupported() bool {
 	return false
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported.
 // S3 uploads are already atomic, we don't need to upload to a temporary
 // file
-func (GCSFs) IsAtomicUploadSupported() bool {
+func (*GCSFs) IsAtomicUploadSupported() bool {
 	return false
 }
 
 // IsNotExist returns a boolean indicating whether the error is known to
 // report that a file or directory does not exist
-func (GCSFs) IsNotExist(err error) bool {
+func (*GCSFs) IsNotExist(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -402,7 +443,7 @@ func (GCSFs) IsNotExist(err error) bool {
 
 // IsPermission returns a boolean indicating whether the error is known to
 // report that permission is denied.
-func (GCSFs) IsPermission(err error) bool {
+func (*GCSFs) IsPermission(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -415,7 +456,7 @@ func (GCSFs) IsPermission(err error) bool {
 }
 
 // CheckRootPath creates the specified local root directory if it does not exists
-func (fs GCSFs) CheckRootPath(username string, uid int, gid int) bool {
+func (fs *GCSFs) CheckRootPath(username string, uid int, gid int) bool {
 	// we need a local directory for temporary files
 	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
 	return osFs.CheckRootPath(username, uid, gid)
@@ -423,7 +464,7 @@ func (fs GCSFs) CheckRootPath(username string, uid int, gid int) bool {
 
 // ScanRootDirContents returns the number of files contained in the bucket,
 // and their size
-func (fs GCSFs) ScanRootDirContents() (int, int64, error) {
+func (fs *GCSFs) ScanRootDirContents() (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
 	query := &storage.Query{Prefix: fs.config.KeyPrefix}
@@ -447,7 +488,7 @@ func (fs GCSFs) ScanRootDirContents() (int, int64, error) {
 		if !attrs.Deleted.IsZero() {
 			continue
 		}
-		isDir := strings.HasSuffix(attrs.Name, "/")
+		isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
 		if isDir && attrs.Size == 0 {
 			continue
 		}
@@ -460,19 +501,19 @@ func (fs GCSFs) ScanRootDirContents() (int, int64, error) {
 
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
-func (GCSFs) GetDirSize(dirname string) (int, int64, error) {
+func (*GCSFs) GetDirSize(dirname string) (int, int64, error) {
 	return 0, 0, errUnsupported
 }
 
 // GetAtomicUploadPath returns the path to use for an atomic upload.
 // GCS uploads are already atomic, we never call this method for GCS
-func (GCSFs) GetAtomicUploadPath(name string) string {
+func (*GCSFs) GetAtomicUploadPath(name string) string {
 	return ""
 }
 
 // GetRelativePath returns the path for a file relative to the user's home dir.
 // This is the path as seen by SFTPGo users
-func (fs GCSFs) GetRelativePath(name string) string {
+func (fs *GCSFs) GetRelativePath(name string) string {
 	rel := path.Clean(name)
 	if rel == "." {
 		rel = ""
@@ -480,7 +521,7 @@ func (fs GCSFs) GetRelativePath(name string) string {
 	if !path.IsAbs(rel) {
 		rel = "/" + rel
 	}
-	if len(fs.config.KeyPrefix) > 0 {
+	if fs.config.KeyPrefix != "" {
 		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
 			rel = "/"
 		}
@@ -491,7 +532,7 @@ func (fs GCSFs) GetRelativePath(name string) string {
 
 // Walk walks the file tree rooted at root, calling walkFn for each file or
 // directory in the tree, including root
-func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
+func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
 	prefix := ""
 	if root != "" && root != "." {
 		prefix = strings.TrimPrefix(root, "/")
@@ -524,11 +565,13 @@ func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
 		if !attrs.Deleted.IsZero() {
 			continue
 		}
-		isDir := strings.HasSuffix(attrs.Name, "/")
-		name := path.Clean(attrs.Name)
-		if len(name) == 0 {
+		name, isDir := fs.resolve(attrs.Name, prefix)
+		if name == "" {
 			continue
 		}
+		if attrs.ContentType == dirMimeType {
+			isDir = true
+		}
 		err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
 		if err != nil {
 			return err
@@ -541,7 +584,7 @@ func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
 }
 
 // Join joins any number of path elements into a single path
-func (GCSFs) Join(elem ...string) string {
+func (*GCSFs) Join(elem ...string) string {
 	return strings.TrimPrefix(path.Join(elem...), "/")
 }
 
@@ -551,7 +594,7 @@ func (GCSFs) HasVirtualFolders() bool {
 }
 
 // ResolvePath returns the matching filesystem path for the specified virtual path
-func (fs GCSFs) ResolvePath(virtualPath string) (string, error) {
+func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) {
 	if !path.IsAbs(virtualPath) {
 		virtualPath = path.Clean("/" + virtualPath)
 	}
@@ -589,6 +632,59 @@ func (fs *GCSFs) checkIfBucketExists() error {
 	return err
 }
 
+func (fs *GCSFs) hasContents(name string) (bool, error) {
+	result := false
+	prefix := ""
+	if name != "" && name != "." {
+		prefix = strings.TrimPrefix(name, "/")
+		if !strings.HasSuffix(prefix, "/") {
+			prefix += "/"
+		}
+	}
+	query := &storage.Query{Prefix: prefix}
+	err := query.SetAttrSelection(gcsDefaultFieldsSelection)
+	if err != nil {
+		return result, err
+	}
+	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
+	defer cancelFn()
+	bkt := fs.svc.Bucket(fs.config.Bucket)
+	it := bkt.Objects(ctx, query)
+	// if we have a dir object with a trailing slash it will be returned so we set the size to 2
+	it.PageInfo().MaxSize = 2
+	for {
+		attrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			metrics.GCSListObjectsCompleted(err)
+			return result, err
+		}
+		name, _ := fs.resolve(attrs.Name, prefix)
+		// a dir object with a trailing slash will result in an empty name
+		if name == "/" || name == "" {
+			continue
+		}
+		result = true
+		break
+	}
+
+	metrics.GCSListObjectsCompleted(err)
+	return result, nil
+}
+
+func (fs *GCSFs) getPrefix(name string) string {
+	prefix := ""
+	if name != "" && name != "." && name != "/" {
+		prefix = strings.TrimPrefix(name, "/")
+		if !strings.HasSuffix(prefix, "/") {
+			prefix += "/"
+		}
+	}
+	return prefix
+}
+
 func (fs *GCSFs) getPrefixForStat(name string) string {
 	prefix := path.Dir(name)
 	if prefix == "/" || prefix == "." || prefix == "" {
@@ -602,8 +698,7 @@ func (fs *GCSFs) getPrefixForStat(name string) string {
 	return prefix
 }
 
-// GetMimeType implements MimeTyper interface
-func (fs GCSFs) GetMimeType(name string) (string, error) {
+func (fs *GCSFs) headObject(name string) (*storage.ObjectAttrs, error) {
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 
@@ -611,6 +706,12 @@ func (fs GCSFs) GetMimeType(name string) (string, error) {
 	obj := bkt.Object(name)
 	attrs, err := obj.Attrs(ctx)
 	metrics.GCSHeadObjectCompleted(err)
+	return attrs, err
+}
+
+// GetMimeType implements MimeTyper interface
+func (fs *GCSFs) GetMimeType(name string) (string, error) {
+	attrs, err := fs.headObject(name)
 	if err != nil {
 		return "", err
 	}

+ 30 - 30
vfs/osfs.go

@@ -30,7 +30,7 @@ type OsFs struct {
 
 // NewOsFs returns an OsFs object that allows to interact with local Os filesystem
 func NewOsFs(connectionID, rootDir string, virtualFolders []VirtualFolder) Fs {
-	return OsFs{
+	return &OsFs{
 		name:           osFsName,
 		connectionID:   connectionID,
 		rootDir:        rootDir,
@@ -39,17 +39,17 @@ func NewOsFs(connectionID, rootDir string, virtualFolders []VirtualFolder) Fs {
 }
 
 // Name returns the name for the Fs implementation
-func (fs OsFs) Name() string {
+func (fs *OsFs) Name() string {
 	return fs.name
 }
 
 // ConnectionID returns the SSH connection ID associated to this Fs implementation
-func (fs OsFs) ConnectionID() string {
+func (fs *OsFs) ConnectionID() string {
 	return fs.connectionID
 }
 
 // Stat returns a FileInfo describing the named file
-func (fs OsFs) Stat(name string) (os.FileInfo, error) {
+func (fs *OsFs) Stat(name string) (os.FileInfo, error) {
 	fi, err := os.Stat(name)
 	if err != nil {
 		return fi, err
@@ -64,7 +64,7 @@ func (fs OsFs) Stat(name string) (os.FileInfo, error) {
 }
 
 // Lstat returns a FileInfo describing the named file
-func (fs OsFs) Lstat(name string) (os.FileInfo, error) {
+func (fs *OsFs) Lstat(name string) (os.FileInfo, error) {
 	fi, err := os.Lstat(name)
 	if err != nil {
 		return fi, err
@@ -79,13 +79,13 @@ func (fs OsFs) Lstat(name string) (os.FileInfo, error) {
 }
 
 // Open opens the named file for reading
-func (OsFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
+func (*OsFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
 	f, err := os.Open(name)
 	return f, nil, nil, err
 }
 
 // Create creates or opens the named file for writing
-func (OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
+func (*OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
 	var err error
 	var f *os.File
 	if flag == 0 {
@@ -97,28 +97,28 @@ func (OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error)
 }
 
 // Rename renames (moves) source to target
-func (OsFs) Rename(source, target string) error {
+func (*OsFs) Rename(source, target string) error {
 	return os.Rename(source, target)
 }
 
 // Remove removes the named file or (empty) directory.
-func (OsFs) Remove(name string, isDir bool) error {
+func (*OsFs) Remove(name string, isDir bool) error {
 	return os.Remove(name)
 }
 
 // Mkdir creates a new directory with the specified name and default permissions
-func (OsFs) Mkdir(name string) error {
+func (*OsFs) Mkdir(name string) error {
 	return os.Mkdir(name, os.ModePerm)
 }
 
 // Symlink creates source as a symbolic link to target.
-func (OsFs) Symlink(source, target string) error {
+func (*OsFs) Symlink(source, target string) error {
 	return os.Symlink(source, target)
 }
 
 // Readlink returns the destination of the named symbolic link
 // as absolute virtual path
-func (fs OsFs) Readlink(name string) (string, error) {
+func (fs *OsFs) Readlink(name string) (string, error) {
 	p, err := os.Readlink(name)
 	if err != nil {
 		return p, err
@@ -127,28 +127,28 @@ func (fs OsFs) Readlink(name string) (string, error) {
 }
 
 // Chown changes the numeric uid and gid of the named file.
-func (OsFs) Chown(name string, uid int, gid int) error {
+func (*OsFs) Chown(name string, uid int, gid int) error {
 	return os.Chown(name, uid, gid)
 }
 
 // Chmod changes the mode of the named file to mode
-func (OsFs) Chmod(name string, mode os.FileMode) error {
+func (*OsFs) Chmod(name string, mode os.FileMode) error {
 	return os.Chmod(name, mode)
 }
 
 // Chtimes changes the access and modification times of the named file
-func (OsFs) Chtimes(name string, atime, mtime time.Time) error {
+func (*OsFs) Chtimes(name string, atime, mtime time.Time) error {
 	return os.Chtimes(name, atime, mtime)
 }
 
 // Truncate changes the size of the named file
-func (OsFs) Truncate(name string, size int64) error {
+func (*OsFs) Truncate(name string, size int64) error {
 	return os.Truncate(name, size)
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (OsFs) ReadDir(dirname string) ([]os.FileInfo, error) {
+func (*OsFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	f, err := os.Open(dirname)
 	if err != nil {
 		return nil, err
@@ -162,29 +162,29 @@ func (OsFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 }
 
 // IsUploadResumeSupported returns true if upload resume is supported
-func (OsFs) IsUploadResumeSupported() bool {
+func (*OsFs) IsUploadResumeSupported() bool {
 	return true
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported
-func (OsFs) IsAtomicUploadSupported() bool {
+func (*OsFs) IsAtomicUploadSupported() bool {
 	return true
 }
 
 // IsNotExist returns a boolean indicating whether the error is known to
 // report that a file or directory does not exist
-func (OsFs) IsNotExist(err error) bool {
+func (*OsFs) IsNotExist(err error) bool {
 	return os.IsNotExist(err)
 }
 
 // IsPermission returns a boolean indicating whether the error is known to
 // report that permission is denied.
-func (OsFs) IsPermission(err error) bool {
+func (*OsFs) IsPermission(err error) bool {
 	return os.IsPermission(err)
 }
 
 // CheckRootPath creates the root directory if it does not exists
-func (fs OsFs) CheckRootPath(username string, uid int, gid int) bool {
+func (fs *OsFs) CheckRootPath(username string, uid int, gid int) bool {
 	var err error
 	if _, err = fs.Stat(fs.rootDir); fs.IsNotExist(err) {
 		err = os.MkdirAll(fs.rootDir, os.ModePerm)
@@ -207,7 +207,7 @@ func (fs OsFs) CheckRootPath(username string, uid int, gid int) bool {
 
 // ScanRootDirContents returns the number of files contained in a directory and
 // their size
-func (fs OsFs) ScanRootDirContents() (int, int64, error) {
+func (fs *OsFs) ScanRootDirContents() (int, int64, error) {
 	numFiles, size, err := fs.GetDirSize(fs.rootDir)
 	for _, v := range fs.virtualFolders {
 		if !v.IsIncludedInUserQuota() {
@@ -228,7 +228,7 @@ func (fs OsFs) ScanRootDirContents() (int, int64, error) {
 }
 
 // GetAtomicUploadPath returns the path to use for an atomic upload
-func (OsFs) GetAtomicUploadPath(name string) string {
+func (*OsFs) GetAtomicUploadPath(name string) string {
 	dir := filepath.Dir(name)
 	guid := xid.New().String()
 	return filepath.Join(dir, ".sftpgo-upload."+guid+"."+filepath.Base(name))
@@ -236,7 +236,7 @@ func (OsFs) GetAtomicUploadPath(name string) string {
 
 // GetRelativePath returns the path for a file relative to the user's home dir.
 // This is the path as seen by SFTP users
-func (fs OsFs) GetRelativePath(name string) string {
+func (fs *OsFs) GetRelativePath(name string) string {
 	basePath := fs.rootDir
 	virtualPath := "/"
 	for _, v := range fs.virtualFolders {
@@ -258,17 +258,17 @@ func (fs OsFs) GetRelativePath(name string) string {
 
 // Walk walks the file tree rooted at root, calling walkFn for each file or
 // directory in the tree, including root
-func (OsFs) Walk(root string, walkFn filepath.WalkFunc) error {
+func (*OsFs) Walk(root string, walkFn filepath.WalkFunc) error {
 	return filepath.Walk(root, walkFn)
 }
 
 // Join joins any number of path elements into a single path
-func (OsFs) Join(elem ...string) string {
+func (*OsFs) Join(elem ...string) string {
 	return filepath.Join(elem...)
 }
 
 // ResolvePath returns the matching filesystem path for the specified sftp path
-func (fs OsFs) ResolvePath(sftpPath string) (string, error) {
+func (fs *OsFs) ResolvePath(sftpPath string) (string, error) {
 	if !filepath.IsAbs(fs.rootDir) {
 		return "", fmt.Errorf("Invalid root path: %v", fs.rootDir)
 	}
@@ -295,7 +295,7 @@ func (fs OsFs) ResolvePath(sftpPath string) (string, error) {
 
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
-func (fs OsFs) GetDirSize(dirname string) (int, int64, error) {
+func (fs *OsFs) GetDirSize(dirname string) (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
 	isDir, err := IsDirectory(fs, dirname)
@@ -315,7 +315,7 @@ func (fs OsFs) GetDirSize(dirname string) (int, int64, error) {
 }
 
 // HasVirtualFolders returns true if folders are emulated
-func (OsFs) HasVirtualFolders() bool {
+func (*OsFs) HasVirtualFolders() bool {
 	return false
 }
 

+ 129 - 51
vfs/s3fs.go

@@ -44,7 +44,7 @@ func init() {
 // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
 // object storage
 func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
-	fs := S3Fs{
+	fs := &S3Fs{
 		connectionID:   connectionID,
 		localTempDir:   localTempDir,
 		config:         config,
@@ -96,17 +96,17 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
 }
 
 // Name returns the name for the Fs implementation
-func (fs S3Fs) Name() string {
+func (fs *S3Fs) Name() string {
 	return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
 }
 
 // ConnectionID returns the connection ID associated to this Fs implementation
-func (fs S3Fs) ConnectionID() string {
+func (fs *S3Fs) ConnectionID() string {
 	return fs.connectionID
 }
 
 // Stat returns a FileInfo describing the named file
-func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
+func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
 	var result FileInfo
 	if name == "/" || name == "." {
 		err := fs.checkIfBucketExists()
@@ -118,6 +118,30 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
 	if "/"+fs.config.KeyPrefix == name+"/" {
 		return NewFileInfo(name, true, 0, time.Now(), false), nil
 	}
+	obj, err := fs.headObject(name)
+	if err == nil {
+		objSize := *obj.ContentLength
+		objectModTime := *obj.LastModified
+		return NewFileInfo(name, false, objSize, objectModTime, false), nil
+	}
+	if !fs.IsNotExist(err) {
+		return result, err
+	}
+	// now check if this is a prefix (virtual directory)
+	hasContents, err := fs.hasContents(name)
+	if err == nil && hasContents {
+		return NewFileInfo(name, true, 0, time.Now(), false), nil
+	} else if err != nil {
+		return nil, err
+	}
+	// the requested file could still be a directory as a zero bytes key
+	// with a forwarding slash.
+	// As last resort we do a list dir to find it
+	return fs.getStatCompat(name)
+}
+
+func (fs *S3Fs) getStatCompat(name string) (os.FileInfo, error) {
+	var result FileInfo
 	prefix := path.Dir(name)
 	if prefix == "/" || prefix == "." {
 		prefix = ""
@@ -129,6 +153,7 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
 	}
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
+
 	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
 		Bucket:    aws.String(fs.config.Bucket),
 		Prefix:    aws.String(prefix),
@@ -144,7 +169,7 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
 			if fs.isEqual(fileObject.Key, name) {
 				objectSize := *fileObject.Size
 				objectModTime := *fileObject.LastModified
-				isDir := strings.HasSuffix(*fileObject.Key, "/")
+				isDir := strings.HasSuffix(*fileObject.Key, "/") && objectSize == 0
 				result = NewFileInfo(name, isDir, objectSize, objectModTime, false)
 				return false
 			}
@@ -159,12 +184,12 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
 }
 
 // Lstat returns a FileInfo describing the named file
-func (fs S3Fs) Lstat(name string) (os.FileInfo, error) {
+func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
 	return fs.Stat(name)
 }
 
 // Open opens the named file for reading
-func (fs S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
+func (fs *S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
 	r, w, err := pipeat.PipeInDir(fs.localTempDir)
 	if err != nil {
 		return nil, nil, nil, err
@@ -191,7 +216,7 @@ func (fs S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt,
 }
 
 // Create creates or opens the named file for writing
-func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
+func (fs *S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
 	r, w, err := pipeat.PipeInDir(fs.localTempDir)
 	if err != nil {
 		return nil, nil, nil, err
@@ -209,11 +234,11 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), err
 			contentType = mime.TypeByExtension(path.Ext(name))
 		}
 		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
-			Bucket:          aws.String(fs.config.Bucket),
-			Key:             aws.String(key),
-			Body:            r,
-			StorageClass:    utils.NilIfEmpty(fs.config.StorageClass),
-			ContentEncoding: utils.NilIfEmpty(contentType),
+			Bucket:       aws.String(fs.config.Bucket),
+			Key:          aws.String(key),
+			Body:         r,
+			StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
+			ContentType:  utils.NilIfEmpty(contentType),
 		}, func(u *s3manager.Uploader) {
 			u.Concurrency = fs.config.UploadConcurrency
 			u.PartSize = fs.config.UploadPartSize
@@ -237,7 +262,7 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), err
 //
 // https://github.com/aws/aws-sdk-go/pull/2653
 //
-func (fs S3Fs) Rename(source, target string) error {
+func (fs *S3Fs) Rename(source, target string) error {
 	if source == target {
 		return nil
 	}
@@ -247,11 +272,11 @@ func (fs S3Fs) Rename(source, target string) error {
 	}
 	copySource := fs.Join(fs.config.Bucket, source)
 	if fi.IsDir() {
-		contents, err := fs.ReadDir(source)
+		hasContents, err := fs.hasContents(source)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
 		}
 		if !strings.HasSuffix(copySource, "/") {
@@ -261,12 +286,20 @@ func (fs S3Fs) Rename(source, target string) error {
 			target += "/"
 		}
 	}
+	var contentType string
+	if fi.IsDir() {
+		contentType = dirMimeType
+	} else {
+		contentType = mime.TypeByExtension(path.Ext(source))
+	}
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
-		Bucket:     aws.String(fs.config.Bucket),
-		CopySource: aws.String(copySource),
-		Key:        aws.String(target),
+		Bucket:       aws.String(fs.config.Bucket),
+		CopySource:   aws.String(copySource),
+		Key:          aws.String(target),
+		StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
+		ContentType:  utils.NilIfEmpty(contentType),
 	})
 	metrics.S3CopyObjectCompleted(err)
 	if err != nil {
@@ -276,13 +309,13 @@ func (fs S3Fs) Rename(source, target string) error {
 }
 
 // Remove removes the named file or (empty) directory.
-func (fs S3Fs) Remove(name string, isDir bool) error {
+func (fs *S3Fs) Remove(name string, isDir bool) error {
 	if isDir {
-		contents, err := fs.ReadDir(name)
+		hasContents, err := fs.hasContents(name)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
 		}
 		if !strings.HasSuffix(name, "/") {
@@ -300,7 +333,7 @@ func (fs S3Fs) Remove(name string, isDir bool) error {
 }
 
 // Mkdir creates a new directory with the specified name and default permissions
-func (fs S3Fs) Mkdir(name string) error {
+func (fs *S3Fs) Mkdir(name string) error {
 	_, err := fs.Stat(name)
 	if !fs.IsNotExist(err) {
 		return err
@@ -316,43 +349,43 @@ func (fs S3Fs) Mkdir(name string) error {
 }
 
 // Symlink creates source as a symbolic link to target.
-func (S3Fs) Symlink(source, target string) error {
+func (*S3Fs) Symlink(source, target string) error {
 	return errors.New("403 symlinks are not supported")
 }
 
 // Readlink returns the destination of the named symbolic link
-func (S3Fs) Readlink(name string) (string, error) {
+func (*S3Fs) Readlink(name string) (string, error) {
 	return "", errors.New("403 readlink is not supported")
 }
 
 // Chown changes the numeric uid and gid of the named file.
 // Silently ignored.
-func (S3Fs) Chown(name string, uid int, gid int) error {
+func (*S3Fs) Chown(name string, uid int, gid int) error {
 	return nil
 }
 
 // Chmod changes the mode of the named file to mode.
 // Silently ignored.
-func (S3Fs) Chmod(name string, mode os.FileMode) error {
+func (*S3Fs) Chmod(name string, mode os.FileMode) error {
 	return nil
 }
 
 // Chtimes changes the access and modification times of the named file.
 // Silently ignored.
-func (S3Fs) Chtimes(name string, atime, mtime time.Time) error {
+func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
 	return errors.New("403 chtimes is not supported")
 }
 
 // Truncate changes the size of the named file.
 // Truncate by path is not supported, while truncating an opened
 // file is handled inside base transfer
-func (S3Fs) Truncate(name string, size int64) error {
+func (*S3Fs) Truncate(name string, size int64) error {
 	return errors.New("403 truncate is not supported")
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
+func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	var result []os.FileInfo
 	// dirname must be already cleaned
 	prefix := ""
@@ -362,6 +395,9 @@ func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
 			prefix += "/"
 		}
 	}
+
+	prefixes := make(map[string]bool)
+
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
@@ -370,17 +406,31 @@ func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
 		Delimiter: aws.String("/"),
 	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
 		for _, p := range page.CommonPrefixes {
-			name, isDir := fs.resolve(p.Prefix, prefix)
-			result = append(result, NewFileInfo(name, isDir, 0, time.Now(), false))
+			// prefixes have a trailing slash
+			name, _ := fs.resolve(p.Prefix, prefix)
+			if name == "" {
+				continue
+			}
+			if _, ok := prefixes[name]; ok {
+				continue
+			}
+			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
+			prefixes[name] = true
 		}
 		for _, fileObject := range page.Contents {
 			objectSize := *fileObject.Size
 			objectModTime := *fileObject.LastModified
 			name, isDir := fs.resolve(fileObject.Key, prefix)
-			if len(name) == 0 {
+			if name == "" {
 				continue
 			}
-			result = append(result, NewFileInfo(name, isDir, objectSize, objectModTime, false))
+			if isDir {
+				if _, ok := prefixes[name]; ok {
+					continue
+				}
+				prefixes[name] = true
+			}
+			result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
 		}
 		return true
 	})
@@ -390,20 +440,20 @@ func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
 
 // IsUploadResumeSupported returns true if upload resume is supported.
 // SFTP Resume is not supported on S3
-func (S3Fs) IsUploadResumeSupported() bool {
+func (*S3Fs) IsUploadResumeSupported() bool {
 	return false
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported.
 // S3 uploads are already atomic, we don't need to upload to a temporary
 // file
-func (S3Fs) IsAtomicUploadSupported() bool {
+func (*S3Fs) IsAtomicUploadSupported() bool {
 	return false
 }
 
 // IsNotExist returns a boolean indicating whether the error is known to
 // report that a file or directory does not exist
-func (S3Fs) IsNotExist(err error) bool {
+func (*S3Fs) IsNotExist(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -428,7 +478,7 @@ func (S3Fs) IsNotExist(err error) bool {
 
 // IsPermission returns a boolean indicating whether the error is known to
 // report that permission is denied.
-func (S3Fs) IsPermission(err error) bool {
+func (*S3Fs) IsPermission(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -436,7 +486,7 @@ func (S3Fs) IsPermission(err error) bool {
 }
 
 // CheckRootPath creates the specified local root directory if it does not exists
-func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool {
+func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
 	// we need a local directory for temporary files
 	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
 	return osFs.CheckRootPath(username, uid, gid)
@@ -444,7 +494,7 @@ func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool {
 
 // ScanRootDirContents returns the number of files contained in the bucket,
 // and their size
-func (fs S3Fs) ScanRootDirContents() (int, int64, error) {
+func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
@@ -469,19 +519,19 @@ func (fs S3Fs) ScanRootDirContents() (int, int64, error) {
 
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
-func (S3Fs) GetDirSize(dirname string) (int, int64, error) {
+func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
 	return 0, 0, errUnsupported
 }
 
 // GetAtomicUploadPath returns the path to use for an atomic upload.
 // S3 uploads are already atomic, we never call this method for S3
-func (S3Fs) GetAtomicUploadPath(name string) string {
+func (*S3Fs) GetAtomicUploadPath(name string) string {
 	return ""
 }
 
 // GetRelativePath returns the path for a file relative to the user's home dir.
 // This is the path as seen by SFTPGo users
-func (fs S3Fs) GetRelativePath(name string) string {
+func (fs *S3Fs) GetRelativePath(name string) string {
 	rel := path.Clean(name)
 	if rel == "." {
 		rel = ""
@@ -489,7 +539,7 @@ func (fs S3Fs) GetRelativePath(name string) string {
 	if !strings.HasPrefix(rel, "/") {
 		return "/" + rel
 	}
-	if len(fs.config.KeyPrefix) > 0 {
+	if fs.config.KeyPrefix != "" {
 		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
 			rel = "/"
 		}
@@ -500,7 +550,7 @@ func (fs S3Fs) GetRelativePath(name string) string {
 
 // Walk walks the file tree rooted at root, calling walkFn for each file or
 // directory in the tree, including root. The result are unordered
-func (fs S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
+func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
 	prefix := ""
 	if root != "/" && root != "." {
 		prefix = strings.TrimPrefix(root, "/")
@@ -519,7 +569,7 @@ func (fs S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
 			objectModTime := *fileObject.LastModified
 			isDir := strings.HasSuffix(*fileObject.Key, "/")
 			name := path.Clean(*fileObject.Key)
-			if len(name) == 0 {
+			if name == "/" || name == "." {
 				continue
 			}
 			err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
@@ -536,17 +586,17 @@ func (fs S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
 }
 
 // Join joins any number of path elements into a single path
-func (S3Fs) Join(elem ...string) string {
+func (*S3Fs) Join(elem ...string) string {
 	return path.Join(elem...)
 }
 
 // HasVirtualFolders returns true if folders are emulated
-func (S3Fs) HasVirtualFolders() bool {
+func (*S3Fs) HasVirtualFolders() bool {
 	return true
 }
 
 // ResolvePath returns the matching filesystem path for the specified virtual path
-func (fs S3Fs) ResolvePath(virtualPath string) (string, error) {
+func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
 	if !path.IsAbs(virtualPath) {
 		virtualPath = path.Clean("/" + virtualPath)
 	}
@@ -590,8 +640,30 @@ func (fs *S3Fs) checkIfBucketExists() error {
 	return err
 }
 
-// GetMimeType implements MimeTyper interface
-func (fs S3Fs) GetMimeType(name string) (string, error) {
+func (fs *S3Fs) hasContents(name string) (bool, error) {
+	prefix := ""
+	if name != "/" && name != "." {
+		prefix = strings.TrimPrefix(name, "/")
+		if !strings.HasSuffix(prefix, "/") {
+			prefix += "/"
+		}
+	}
+	maxResults := int64(1)
+	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+	defer cancelFn()
+	results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
+		Bucket:  aws.String(fs.config.Bucket),
+		Prefix:  aws.String(prefix),
+		MaxKeys: &maxResults,
+	})
+	metrics.S3ListObjectsCompleted(err)
+	if err != nil {
+		return false, err
+	}
+	return len(results.Contents) > 0, nil
+}
+
+func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 	obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
@@ -599,6 +671,12 @@ func (fs S3Fs) GetMimeType(name string) (string, error) {
 		Key:    aws.String(name),
 	})
 	metrics.S3HeadObjectCompleted(err)
+	return obj, err
+}
+
+// GetMimeType implements MimeTyper interface
+func (fs *S3Fs) GetMimeType(name string) (string, error) {
+	obj, err := fs.headObject(name)
 	if err != nil {
 		return "", err
 	}