From ba9fe38b8b775c1d5be4c64a673f40fd0848e653 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Thu, 24 Nov 2022 18:14:24 +0100 Subject: [PATCH] azblob: handle dirs metadata Signed-off-by: Nicola Murino --- go.mod | 16 ++-- go.sum | 32 +++---- internal/vfs/azblobfs.go | 178 +++++++++++++++++++++++++++------------ internal/vfs/gcsfs.go | 74 ++++++++-------- 4 files changed, 182 insertions(+), 118 deletions(-) diff --git a/go.mod b/go.mod index 9ccd1c71..5131a668 100644 --- a/go.mod +++ b/go.mod @@ -9,15 +9,15 @@ require ( github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 github.com/alexedwards/argon2id v0.0.0-20211130144151-3585854a6387 github.com/aws/aws-sdk-go-v2 v1.17.1 - github.com/aws/aws-sdk-go-v2/config v1.18.2 - github.com/aws/aws-sdk-go-v2/credentials v1.13.2 + github.com/aws/aws-sdk-go-v2/config v1.18.3 + github.com/aws/aws-sdk-go-v2/credentials v1.13.3 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19 - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.41 - github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.23 - github.com/aws/aws-sdk-go-v2/service/s3 v1.29.3 - github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.7 - github.com/aws/aws-sdk-go-v2/service/sts v1.17.4 - github.com/cockroachdb/cockroach-go/v2 v2.2.18 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.42 + github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.24 + github.com/aws/aws-sdk-go-v2/service/s3 v1.29.4 + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.8 + github.com/aws/aws-sdk-go-v2/service/sts v1.17.5 + github.com/cockroachdb/cockroach-go/v2 v2.2.19 github.com/coreos/go-oidc/v3 v3.4.0 github.com/drakkan/webdav v0.0.0-20221101181759-17ed21f9337b github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001 diff --git a/go.sum b/go.sum index eeed7de5..66b5474d 100644 --- a/go.sum +++ b/go.sum @@ -233,17 +233,17 @@ github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3/go.mod h1:gNsR5CaXK github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9 h1:RKci2D7tMwpvGpDNZnGQw9wk6v7o/xSwFcUAuNPoB8k= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9/go.mod h1:vCmV1q1VK8eoQJ5+aYE7PkK1K6v41qJ5pJdK3ggCDvg= github.com/aws/aws-sdk-go-v2/config v1.15.15/go.mod h1:A1Lzyy/o21I5/s2FbyX5AevQfSVXpvvIDCoVFD0BC4E= -github.com/aws/aws-sdk-go-v2/config v1.18.2 h1:tRhTb3xMZsB0gW0sXWpqs9FeIP8iQp5SvnvwiPXzHwo= -github.com/aws/aws-sdk-go-v2/config v1.18.2/go.mod h1:9XVoZTdD8ICjrgI5ddb8j918q6lEZkFYpb7uohgvU6c= +github.com/aws/aws-sdk-go-v2/config v1.18.3 h1:3kfBKcX3votFX84dm00U8RGA1sCCh3eRMOGzg5dCWfU= +github.com/aws/aws-sdk-go-v2/config v1.18.3/go.mod h1:BYdrbeCse3ZnOD5+2/VE/nATOK8fEUpBtmPMdKSyhMU= github.com/aws/aws-sdk-go-v2/credentials v1.12.10/go.mod h1:g5eIM5XRs/OzIIK81QMBl+dAuDyoLN0VYaLP+tBqEOk= -github.com/aws/aws-sdk-go-v2/credentials v1.13.2 h1:F/v1w0XcFDZjL0bCdi9XWJenoPKjGbzljBhDKcryzEQ= -github.com/aws/aws-sdk-go-v2/credentials v1.13.2/go.mod h1:eAT5aj/WJ2UDIA0IVNFc2byQLeD89SDEi4cjzH/MKoQ= +github.com/aws/aws-sdk-go-v2/credentials v1.13.3 h1:ur+FHdp4NbVIv/49bUjBW+FE7e57HOo03ELodttmagk= +github.com/aws/aws-sdk-go-v2/credentials v1.13.3/go.mod h1:/rOMmqYBcFfNbRPU0iN9IgGqD5+V2yp3iWNmIlz0wI4= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.9/go.mod h1:KDCCm4ONIdHtUloDcFvK2+vshZvx4Zmj7UMDfusuz5s= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19 h1:E3PXZSI3F2bzyj6XxUXdTIfvp425HHhwKsFvmzBwHgs= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19/go.mod h1:VihW95zQpeKQWVPGkwT+2+WJNQV8UXFfMTWdU6VErL8= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.21/go.mod h1:iIYPrQ2rYfZiB/iADYlhj9HHZ9TTi6PqKQPAqygohbE= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.41 h1:ssgdsNm11dvFtO7F/AeiW4dAO3eGsDeg5fwpag/JP/I= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.41/go.mod h1:CS+AbDFAaPU9TQOo7U6mVV23YvqCOElnqmh0XQjgJ1g= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.42 h1:bxgBYvvBh+W1RnNYP4ROXEB8N+HSSucDszfE7Rb+kfU= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.42/go.mod h1:LHOsygMiW/14CkFxdXxvzKyMh3jbk/QfZVaDtCbLkl8= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.15/go.mod h1:pWrr2OoHlT7M/Pd2y4HV3gJyPb3qj5qMmnPkKSNPYK4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25 h1:nBO/RFxeq/IS5G9Of+ZrgucRciie2qpLy++3UGZ+q2E= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25/go.mod h1:Zb29PYkf42vVYQY6pvSyJCJcFHlPIiY+YKdPtwnvMkY= @@ -269,14 +269,14 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.9/go.mod h1:Rc5+wn2 github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.19 h1:piDBAaWkaxkkVV3xJJbTehXCZRXYs49kvpi/LG6LR2o= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.19/go.mod h1:BmQWRVkLTmyNzYPFAZgon53qKLWBNSvonugD1MrSWUs= github.com/aws/aws-sdk-go-v2/service/kms v1.18.1/go.mod h1:4PZMUkc9rXHWGVB5J9vKaZy3D7Nai79ORworQ3ASMiM= -github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.23 h1:NZKyAfM6J2OPehuSZI8woNGTxYi5jMkTDWGEFwR6/FA= -github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.23/go.mod h1:mRGY+k3s1yt7yQA3AfzJhnr68OCs1xDfQfIABFUk+ek= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.24 h1:DYr+X4xrRzcthq2OLJzsiS/uSJhZ/HHxXG0yUgGZceU= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.24/go.mod h1:mRGY+k3s1yt7yQA3AfzJhnr68OCs1xDfQfIABFUk+ek= github.com/aws/aws-sdk-go-v2/service/s3 v1.27.2/go.mod h1:u+566cosFI+d+motIz3USXEh6sN8Nq4GrNXSg2RXVMo= -github.com/aws/aws-sdk-go-v2/service/s3 v1.29.3 h1:F6wgg8aHGNyhaAy2ONnWBThiPdLa386qNA0j33FIuSM= -github.com/aws/aws-sdk-go-v2/service/s3 v1.29.3/go.mod h1:/NHbqPRiwxSPVOB2Xr+StDEH+GWV/64WwnUjv4KYzV0= +github.com/aws/aws-sdk-go-v2/service/s3 v1.29.4 h1:QgmmWifaYZZcpaw3y1+ccRlgH6jAvLm4K/MBGUc7cNM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.29.4/go.mod h1:/NHbqPRiwxSPVOB2Xr+StDEH+GWV/64WwnUjv4KYzV0= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.15.14/go.mod h1:xakbH8KMsQQKqzX87uyyzTHshc/0/Df8bsTneTS5pFU= -github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.7 h1:bfC2Q8ABNbYYm9mh3NfPy5kvnWOPtiqS018NBGDwPl8= -github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.7/go.mod h1:k6CPuxyzO247nYEM1baEwHH1kRtosRCvgahAepaaShw= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.8 h1:Zw48FHykP40fKMxPmagkuzklpEuDPLhvUjKP8Ygrds0= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.8/go.mod h1:k6CPuxyzO247nYEM1baEwHH1kRtosRCvgahAepaaShw= github.com/aws/aws-sdk-go-v2/service/sns v1.17.10/go.mod h1:uITsRNVMeCB3MkWpXxXw0eDz8pW4TYLzj+eyQtbhSxM= github.com/aws/aws-sdk-go-v2/service/sqs v1.19.1/go.mod h1:A94o564Gj+Yn+7QO1eLFeI7UVv3riy/YBFOfICVqFvU= github.com/aws/aws-sdk-go-v2/service/ssm v1.27.6/go.mod h1:fiFzQgj4xNOg4/wqmAiPvzgDMXPD+cUEplX/CYn+0j0= @@ -286,8 +286,8 @@ github.com/aws/aws-sdk-go-v2/service/sso v1.11.25/go.mod h1:IARHuzTXmj1C0KS35vbo github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8 h1:jcw6kKZrtNfBPJkaHrscDOZoe5gvi9wjudnxvozYFJo= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8/go.mod h1:er2JHN+kBY6FcMfcBBKNGCT3CarImmdFzishsqBmSRI= github.com/aws/aws-sdk-go-v2/service/sts v1.16.10/go.mod h1:cftkHYN6tCDNfkSasAmclSfl4l7cySoay8vz7p/ce0E= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.4 h1:YNncBj5dVYd05i4ZQ+YicOotSXo0ufc9P8kTioi13EM= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.4/go.mod h1:bXcN3koeVYiJcdDU89n3kCYILob7Y34AeLopUbZgLT4= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.5 h1:60SJ4lhvn///8ygCzYy2l53bFW/Q15bVfyjyAWo6zuw= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.5/go.mod h1:bXcN3koeVYiJcdDU89n3kCYILob7Y34AeLopUbZgLT4= github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.13.4 h1:/RN2z1txIJWeXeOkzX+Hk/4Uuvv7dWtCjbmVJcrskyk= github.com/aws/smithy-go v1.13.4/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= @@ -359,8 +359,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/cockroachdb/cockroach-go/v2 v2.2.18 h1:bRNZWqzRSZesVYFjDAl1jgFb1jhIFIEreWIC2kPbcdY= -github.com/cockroachdb/cockroach-go/v2 v2.2.18/go.mod h1:mzlIDDBALQfEjv/7DU12fb2AfQ/MUYTlychcMpWp9QI= +github.com/cockroachdb/cockroach-go/v2 v2.2.19 h1:YIHyz17jZumBeXPuoZKq/0nrITsqDoDD8/KQt3/xiyc= +github.com/cockroachdb/cockroach-go/v2 v2.2.19/go.mod h1:mzlIDDBALQfEjv/7DU12fb2AfQ/MUYTlychcMpWp9QI= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go index e17d2e8e..5adfc09b 100644 --- a/internal/vfs/azblobfs.go +++ b/internal/vfs/azblobfs.go @@ -55,6 +55,7 @@ import ( const ( azureDefaultEndpoint = "blob.core.windows.net" azBlobFsName = "AzureBlobFs" + azFolderKey = "hdi_isfolder" ) // AzureBlobFs is a Fs implementation for Azure Blob storage. @@ -182,6 +183,14 @@ func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) { if err == nil { contentType := util.GetStringFromPointer(attrs.ContentType) isDir := contentType == dirMimeType + if !isDir { + for k, v := range attrs.Metadata { + if strings.ToLower(k) == azFolderKey { + isDir = (v == "true") + break + } + } + } metric.AZListObjectsCompleted(nil) return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, util.GetIntFromPointer(attrs.ContentLength), @@ -217,7 +226,7 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader go func() { defer cancelFn() - blockBlob := fs.containerClient.NewBlockBlobClient(name) + blockBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(name)) err := fs.handleMultipartDownload(ctx, blockBlob, offset, w) w.CloseWithError(err) //nolint:errcheck fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err) @@ -238,8 +247,12 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(), p := NewPipeWriter(w) headers := blob.HTTPHeaders{} var contentType string + var metadata map[string]string if flag == -1 { contentType = dirMimeType + metadata = map[string]string{ + azFolderKey: "true", + } } else { contentType = mime.TypeByExtension(path.Ext(name)) } @@ -250,8 +263,8 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(), go func() { defer cancelFn() - blockBlob := fs.containerClient.NewBlockBlobClient(name) - err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers) + blockBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(name)) + err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers, metadata) r.CloseWithError(err) //nolint:errcheck p.Done(err) fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %+v", name, r.GetReadedBytes(), err) @@ -282,43 +295,47 @@ func (fs *AzureBlobFs) Rename(source, target string) error { if hasContents { return fmt.Errorf("cannot rename non empty directory: %#v", source) } - } - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout)) - defer cancelFn() - - srcBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(source)) - dstBlob := fs.containerClient.NewBlockBlobClient(target) - resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions()) - if err != nil { - metric.AZCopyObjectCompleted(err) - return err - } - copyStatus := blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus))) - nErrors := 0 - for copyStatus == blob.CopyStatusTypePending { - // Poll until the copy is complete. - time.Sleep(500 * time.Millisecond) - resp, err := dstBlob.GetProperties(ctx, &blob.GetPropertiesOptions{}) - if err != nil { - // A GetProperties failure may be transient, so allow a couple - // of them before giving up. - nErrors++ - if ctx.Err() != nil || nErrors == 3 { - metric.AZCopyObjectCompleted(err) - return err - } - } else { - copyStatus = blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus))) + if err := fs.mkdirInternal(target); err != nil { + return err } - } - if copyStatus != blob.CopyStatusTypeSuccess { - err := fmt.Errorf("copy failed with status: %s", copyStatus) - metric.AZCopyObjectCompleted(err) - return err - } + } else { + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout)) + defer cancelFn() - metric.AZCopyObjectCompleted(nil) - fs.preserveModificationTime(source, target, fi) + srcBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(source)) + dstBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(target)) + resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions()) + if err != nil { + metric.AZCopyObjectCompleted(err) + return err + } + copyStatus := blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus))) + nErrors := 0 + for copyStatus == blob.CopyStatusTypePending { + // Poll until the copy is complete. + time.Sleep(500 * time.Millisecond) + resp, err := dstBlob.GetProperties(ctx, &blob.GetPropertiesOptions{}) + if err != nil { + // A GetProperties failure may be transient, so allow a couple + // of them before giving up. + nErrors++ + if ctx.Err() != nil || nErrors == 3 { + metric.AZCopyObjectCompleted(err) + return err + } + } else { + copyStatus = blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus))) + } + } + if copyStatus != blob.CopyStatusTypeSuccess { + err := fmt.Errorf("copy failed with status: %s", copyStatus) + metric.AZCopyObjectCompleted(err) + return err + } + + metric.AZCopyObjectCompleted(nil) + fs.preserveModificationTime(source, target, fi) + } return fs.Remove(source, fi.IsDir()) } @@ -337,11 +354,22 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error { ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() - blobBlock := fs.containerClient.NewBlockBlobClient(name) - deletSnapshots := blob.DeleteSnapshotsOptionTypeInclude + blobBlock := fs.containerClient.NewBlockBlobClient(url.PathEscape(name)) + var deletSnapshots blob.DeleteSnapshotsOptionType + if !isDir { + deletSnapshots = blob.DeleteSnapshotsOptionTypeInclude + } _, err := blobBlock.Delete(ctx, &blob.DeleteOptions{ DeleteSnapshots: &deletSnapshots, }) + if err != nil && isDir { + if fs.isBadRequestError(err) { + deletSnapshots = blob.DeleteSnapshotsOptionTypeInclude + _, err = blobBlock.Delete(ctx, &blob.DeleteOptions{ + DeleteSnapshots: &deletSnapshots, + }) + } + } metric.AZDeleteObjectCompleted(err) if plugin.Handler.HasMetadater() && err == nil && !isDir { if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil { @@ -357,11 +385,7 @@ func (fs *AzureBlobFs) Mkdir(name string) error { if !fs.IsNotExist(err) { return err } - _, w, _, err := fs.Create(name, -1) - if err != nil { - return err - } - return w.Close() + return fs.mkdirInternal(name) } // Symlink creates source as a symbolic link to target. @@ -424,8 +448,10 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) { prefixes := make(map[string]bool) pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ - Include: container.ListBlobsInclude{}, - Prefix: &prefix, + Include: container.ListBlobsInclude{ + //Metadata: true, + }, + Prefix: &prefix, }) for pager.More() { @@ -462,7 +488,7 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) { size = util.GetIntFromPointer(blobItem.Properties.ContentLength) modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified) contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) - isDir = (contentType == dirMimeType) + isDir = checkDirectoryMarkers(contentType, blobItem.Metadata) if isDir { // check if the dir is already included, it will be sent as blob prefix if it contains at least one item if _, ok := prefixes[name]; ok { @@ -530,6 +556,17 @@ func (*AzureBlobFs) IsNotSupported(err error) bool { return err == ErrVfsUnsupported } +func (*AzureBlobFs) isBadRequestError(err error) bool { + if err == nil { + return false + } + var respErr *azcore.ResponseError + if errors.As(err, &respErr) { + return respErr.StatusCode == http.StatusBadRequest + } + return false +} + // CheckRootPath creates the specified local root directory if it does not exists func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool { // we need a local directory for temporary files @@ -544,6 +581,9 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) { size := int64(0) pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Include: container.ListBlobsInclude{ + Metadata: true, + }, Prefix: &fs.config.KeyPrefix, }) @@ -559,7 +599,7 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) { for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems { if blobItem.Properties != nil { contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) - isDir := (contentType == dirMimeType) + isDir := checkDirectoryMarkers(contentType, blobItem.Metadata) blobSize := util.GetIntFromPointer(blobItem.Properties.ContentLength) if isDir && blobSize == 0 { continue @@ -585,8 +625,10 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e } pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ - Include: container.ListBlobsInclude{}, - Prefix: &prefix, + Include: container.ListBlobsInclude{ + //Metadata: true, + }, + Prefix: &prefix, }) for pager.More() { @@ -603,7 +645,7 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e name = strings.TrimPrefix(name, prefix) if blobItem.Properties != nil { contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) - isDir := (contentType == dirMimeType) + isDir := checkDirectoryMarkers(contentType, blobItem.Metadata) if isDir { continue } @@ -660,7 +702,10 @@ func (fs *AzureBlobFs) GetRelativePath(name string) string { func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { prefix := fs.getPrefix(root) pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ - Prefix: &fs.config.KeyPrefix, + Include: container.ListBlobsInclude{ + Metadata: true, + }, + Prefix: &prefix, }) for pager.More() { @@ -682,7 +727,7 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { isDir := false if blobItem.Properties != nil { contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) - isDir = (contentType == dirMimeType) + isDir = checkDirectoryMarkers(contentType, blobItem.Metadata) blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength) lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified) } @@ -722,7 +767,7 @@ func (fs *AzureBlobFs) headObject(name string) (blob.GetPropertiesResponse, erro ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() - resp, err := fs.containerClient.NewBlockBlobClient(name).GetProperties(ctx, &blob.GetPropertiesOptions{}) + resp, err := fs.containerClient.NewBlockBlobClient(url.PathEscape(name)).GetProperties(ctx, &blob.GetPropertiesOptions{}) metric.AZHeadObjectCompleted(err) return resp, err @@ -795,6 +840,14 @@ func (fs *AzureBlobFs) setConfigDefaults() { } } +func (fs *AzureBlobFs) mkdirInternal(name string) error { + _, w, _, err := fs.Create(name, -1) + if err != nil { + return err + } + return w.Close() +} + func (fs *AzureBlobFs) hasContents(name string) (bool, error) { result := false prefix := fs.getPrefix(name) @@ -931,7 +984,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *b } func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, - blockBlob *blockblob.Client, httpHeaders *blob.HTTPHeaders, + blockBlob *blockblob.Client, httpHeaders *blob.HTTPHeaders, metadata map[string]string, ) error { partSize := fs.config.UploadPartSize guard := make(chan struct{}, fs.config.UploadConcurrency) @@ -1021,6 +1074,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read commitOptions := blockblob.CommitBlockListOptions{ HTTPHeaders: httpHeaders, + Metadata: metadata, } if fs.config.AccessTier != "" { commitOptions.Tier = (*blob.AccessTier)(&fs.config.AccessTier) @@ -1083,6 +1137,18 @@ func (fs *AzureBlobFs) getStorageID() string { return fmt.Sprintf("azblob://%v", fs.config.Container) } +func checkDirectoryMarkers(contentType string, metadata map[string]*string) bool { + if contentType == dirMimeType { + return true + } + for k, v := range metadata { + if strings.ToLower(k) == azFolderKey { + return util.GetStringFromPointer(v) == "true" + } + } + return false +} + func getAzContainerClientOptions() *container.ClientOptions { version := version.Get() return &container.ClientOptions{ diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go index fe4f6c4c..3508b82b 100644 --- a/internal/vfs/gcsfs.go +++ b/internal/vfs/gcsfs.go @@ -226,38 +226,32 @@ func (fs *GCSFs) Rename(source, target string) error { if hasContents { return fmt.Errorf("cannot rename non empty directory: %#v", source) } - if !strings.HasSuffix(target, "/") { - target += "/" + if err := fs.mkdirInternal(target); err != nil { + return err } - } - src := fs.svc.Bucket(fs.config.Bucket).Object(realSourceName) - dst := fs.svc.Bucket(fs.config.Bucket).Object(target) - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) - defer cancelFn() - - copier := dst.CopierFrom(src) - if fs.config.StorageClass != "" { - copier.StorageClass = fs.config.StorageClass - } - if fs.config.ACL != "" { - copier.PredefinedACL = fs.config.ACL - } - var contentType string - if fi.IsDir() { - contentType = dirMimeType } else { - contentType = mime.TypeByExtension(path.Ext(source)) - } - if contentType != "" { - copier.ContentType = contentType - } - _, err = copier.Run(ctx) - metric.GCSCopyObjectCompleted(err) - if err != nil { - return err - } - if plugin.Handler.HasMetadater() { - if !fi.IsDir() { + src := fs.svc.Bucket(fs.config.Bucket).Object(realSourceName) + dst := fs.svc.Bucket(fs.config.Bucket).Object(target) + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) + defer cancelFn() + + copier := dst.CopierFrom(src) + if fs.config.StorageClass != "" { + copier.StorageClass = fs.config.StorageClass + } + if fs.config.ACL != "" { + copier.PredefinedACL = fs.config.ACL + } + contentType := mime.TypeByExtension(path.Ext(source)) + if contentType != "" { + copier.ContentType = contentType + } + _, err = copier.Run(ctx) + metric.GCSCopyObjectCompleted(err) + if err != nil { + return err + } + if plugin.Handler.HasMetadater() { err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target), util.GetTimeAsMsSinceEpoch(fi.ModTime())) if err != nil { @@ -306,14 +300,7 @@ func (fs *GCSFs) Mkdir(name string) error { if !fs.IsNotExist(err) { return err } - if !strings.HasSuffix(name, "/") { - name += "/" - } - _, w, _, err := fs.Create(name, -1) - if err != nil { - return err - } - return w.Close() + return fs.mkdirInternal(name) } // Symlink creates source as a symbolic link to target. @@ -762,6 +749,17 @@ func (fs *GCSFs) getObjectStat(name string) (string, os.FileInfo, error) { return name + "/", info, err } +func (fs *GCSFs) mkdirInternal(name string) error { + if !strings.HasSuffix(name, "/") { + name += "/" + } + _, w, _, err := fs.Create(name, -1) + if err != nil { + return err + } + return w.Close() +} + func (fs *GCSFs) hasContents(name string) (bool, error) { result := false prefix := fs.getPrefix(name)