|
@@ -55,6 +55,7 @@ import (
|
|
|
const (
|
|
|
azureDefaultEndpoint = "blob.core.windows.net"
|
|
|
azBlobFsName = "AzureBlobFs"
|
|
|
+ azFolderKey = "hdi_isfolder"
|
|
|
)
|
|
|
|
|
|
// AzureBlobFs is a Fs implementation for Azure Blob storage.
|
|
@@ -182,6 +183,14 @@ func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
|
|
|
if err == nil {
|
|
|
contentType := util.GetStringFromPointer(attrs.ContentType)
|
|
|
isDir := contentType == dirMimeType
|
|
|
+ if !isDir {
|
|
|
+ for k, v := range attrs.Metadata {
|
|
|
+ if strings.ToLower(k) == azFolderKey {
|
|
|
+ isDir = (v == "true")
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
metric.AZListObjectsCompleted(nil)
|
|
|
return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir,
|
|
|
util.GetIntFromPointer(attrs.ContentLength),
|
|
@@ -217,7 +226,7 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader
|
|
|
go func() {
|
|
|
defer cancelFn()
|
|
|
|
|
|
- blockBlob := fs.containerClient.NewBlockBlobClient(name)
|
|
|
+ blockBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(name))
|
|
|
err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
|
|
|
w.CloseWithError(err) //nolint:errcheck
|
|
|
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err)
|
|
@@ -238,8 +247,12 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
|
|
|
p := NewPipeWriter(w)
|
|
|
headers := blob.HTTPHeaders{}
|
|
|
var contentType string
|
|
|
+ var metadata map[string]string
|
|
|
if flag == -1 {
|
|
|
contentType = dirMimeType
|
|
|
+ metadata = map[string]string{
|
|
|
+ azFolderKey: "true",
|
|
|
+ }
|
|
|
} else {
|
|
|
contentType = mime.TypeByExtension(path.Ext(name))
|
|
|
}
|
|
@@ -250,8 +263,8 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
|
|
|
go func() {
|
|
|
defer cancelFn()
|
|
|
|
|
|
- blockBlob := fs.containerClient.NewBlockBlobClient(name)
|
|
|
- err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers)
|
|
|
+ blockBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(name))
|
|
|
+ err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers, metadata)
|
|
|
r.CloseWithError(err) //nolint:errcheck
|
|
|
p.Done(err)
|
|
|
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %+v", name, r.GetReadedBytes(), err)
|
|
@@ -282,43 +295,47 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
|
|
|
if hasContents {
|
|
|
return fmt.Errorf("cannot rename non empty directory: %#v", source)
|
|
|
}
|
|
|
- }
|
|
|
- ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
|
|
- defer cancelFn()
|
|
|
+ if err := fs.mkdirInternal(target); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
|
|
+ defer cancelFn()
|
|
|
|
|
|
- srcBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(source))
|
|
|
- dstBlob := fs.containerClient.NewBlockBlobClient(target)
|
|
|
- resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions())
|
|
|
- if err != nil {
|
|
|
- metric.AZCopyObjectCompleted(err)
|
|
|
- return err
|
|
|
- }
|
|
|
- copyStatus := blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
|
|
|
- nErrors := 0
|
|
|
- for copyStatus == blob.CopyStatusTypePending {
|
|
|
- // Poll until the copy is complete.
|
|
|
- time.Sleep(500 * time.Millisecond)
|
|
|
- resp, err := dstBlob.GetProperties(ctx, &blob.GetPropertiesOptions{})
|
|
|
+ srcBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(source))
|
|
|
+ dstBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(target))
|
|
|
+ resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions())
|
|
|
if err != nil {
|
|
|
- // A GetProperties failure may be transient, so allow a couple
|
|
|
- // of them before giving up.
|
|
|
- nErrors++
|
|
|
- if ctx.Err() != nil || nErrors == 3 {
|
|
|
- metric.AZCopyObjectCompleted(err)
|
|
|
- return err
|
|
|
+ metric.AZCopyObjectCompleted(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ copyStatus := blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
|
|
|
+ nErrors := 0
|
|
|
+ for copyStatus == blob.CopyStatusTypePending {
|
|
|
+ // Poll until the copy is complete.
|
|
|
+ time.Sleep(500 * time.Millisecond)
|
|
|
+ resp, err := dstBlob.GetProperties(ctx, &blob.GetPropertiesOptions{})
|
|
|
+ if err != nil {
|
|
|
+ // A GetProperties failure may be transient, so allow a couple
|
|
|
+ // of them before giving up.
|
|
|
+ nErrors++
|
|
|
+ if ctx.Err() != nil || nErrors == 3 {
|
|
|
+ metric.AZCopyObjectCompleted(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ copyStatus = blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
|
|
|
}
|
|
|
- } else {
|
|
|
- copyStatus = blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
|
|
|
}
|
|
|
- }
|
|
|
- if copyStatus != blob.CopyStatusTypeSuccess {
|
|
|
- err := fmt.Errorf("copy failed with status: %s", copyStatus)
|
|
|
- metric.AZCopyObjectCompleted(err)
|
|
|
- return err
|
|
|
- }
|
|
|
+ if copyStatus != blob.CopyStatusTypeSuccess {
|
|
|
+ err := fmt.Errorf("copy failed with status: %s", copyStatus)
|
|
|
+ metric.AZCopyObjectCompleted(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- metric.AZCopyObjectCompleted(nil)
|
|
|
- fs.preserveModificationTime(source, target, fi)
|
|
|
+ metric.AZCopyObjectCompleted(nil)
|
|
|
+ fs.preserveModificationTime(source, target, fi)
|
|
|
+ }
|
|
|
return fs.Remove(source, fi.IsDir())
|
|
|
}
|
|
|
|
|
@@ -337,11 +354,22 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
|
|
|
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
|
|
defer cancelFn()
|
|
|
|
|
|
- blobBlock := fs.containerClient.NewBlockBlobClient(name)
|
|
|
- deletSnapshots := blob.DeleteSnapshotsOptionTypeInclude
|
|
|
+ blobBlock := fs.containerClient.NewBlockBlobClient(url.PathEscape(name))
|
|
|
+ var deletSnapshots blob.DeleteSnapshotsOptionType
|
|
|
+ if !isDir {
|
|
|
+ deletSnapshots = blob.DeleteSnapshotsOptionTypeInclude
|
|
|
+ }
|
|
|
_, err := blobBlock.Delete(ctx, &blob.DeleteOptions{
|
|
|
DeleteSnapshots: &deletSnapshots,
|
|
|
})
|
|
|
+ if err != nil && isDir {
|
|
|
+ if fs.isBadRequestError(err) {
|
|
|
+ deletSnapshots = blob.DeleteSnapshotsOptionTypeInclude
|
|
|
+ _, err = blobBlock.Delete(ctx, &blob.DeleteOptions{
|
|
|
+ DeleteSnapshots: &deletSnapshots,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
metric.AZDeleteObjectCompleted(err)
|
|
|
if plugin.Handler.HasMetadater() && err == nil && !isDir {
|
|
|
if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
|
|
@@ -357,11 +385,7 @@ func (fs *AzureBlobFs) Mkdir(name string) error {
|
|
|
if !fs.IsNotExist(err) {
|
|
|
return err
|
|
|
}
|
|
|
- _, w, _, err := fs.Create(name, -1)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return w.Close()
|
|
|
+ return fs.mkdirInternal(name)
|
|
|
}
|
|
|
|
|
|
// Symlink creates source as a symbolic link to target.
|
|
@@ -424,8 +448,10 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
|
|
prefixes := make(map[string]bool)
|
|
|
|
|
|
pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
|
|
|
- Include: container.ListBlobsInclude{},
|
|
|
- Prefix: &prefix,
|
|
|
+ Include: container.ListBlobsInclude{
|
|
|
+ //Metadata: true,
|
|
|
+ },
|
|
|
+ Prefix: &prefix,
|
|
|
})
|
|
|
|
|
|
for pager.More() {
|
|
@@ -462,7 +488,7 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
|
|
size = util.GetIntFromPointer(blobItem.Properties.ContentLength)
|
|
|
modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified)
|
|
|
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
|
|
|
- isDir = (contentType == dirMimeType)
|
|
|
+ isDir = checkDirectoryMarkers(contentType, blobItem.Metadata)
|
|
|
if isDir {
|
|
|
// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
|
|
|
if _, ok := prefixes[name]; ok {
|
|
@@ -530,6 +556,17 @@ func (*AzureBlobFs) IsNotSupported(err error) bool {
|
|
|
return err == ErrVfsUnsupported
|
|
|
}
|
|
|
|
|
|
+func (*AzureBlobFs) isBadRequestError(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ var respErr *azcore.ResponseError
|
|
|
+ if errors.As(err, &respErr) {
|
|
|
+ return respErr.StatusCode == http.StatusBadRequest
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
// CheckRootPath creates the specified local root directory if it does not exists
|
|
|
func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
|
|
|
// we need a local directory for temporary files
|
|
@@ -544,6 +581,9 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
|
|
|
size := int64(0)
|
|
|
|
|
|
pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
|
|
|
+ Include: container.ListBlobsInclude{
|
|
|
+ Metadata: true,
|
|
|
+ },
|
|
|
Prefix: &fs.config.KeyPrefix,
|
|
|
})
|
|
|
|
|
@@ -559,7 +599,7 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
|
|
|
for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
|
|
|
if blobItem.Properties != nil {
|
|
|
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
|
|
|
- isDir := (contentType == dirMimeType)
|
|
|
+ isDir := checkDirectoryMarkers(contentType, blobItem.Metadata)
|
|
|
blobSize := util.GetIntFromPointer(blobItem.Properties.ContentLength)
|
|
|
if isDir && blobSize == 0 {
|
|
|
continue
|
|
@@ -585,8 +625,10 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
|
|
|
}
|
|
|
|
|
|
pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
|
|
|
- Include: container.ListBlobsInclude{},
|
|
|
- Prefix: &prefix,
|
|
|
+ Include: container.ListBlobsInclude{
|
|
|
+ //Metadata: true,
|
|
|
+ },
|
|
|
+ Prefix: &prefix,
|
|
|
})
|
|
|
|
|
|
for pager.More() {
|
|
@@ -603,7 +645,7 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
|
|
|
name = strings.TrimPrefix(name, prefix)
|
|
|
if blobItem.Properties != nil {
|
|
|
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
|
|
|
- isDir := (contentType == dirMimeType)
|
|
|
+ isDir := checkDirectoryMarkers(contentType, blobItem.Metadata)
|
|
|
if isDir {
|
|
|
continue
|
|
|
}
|
|
@@ -660,7 +702,10 @@ func (fs *AzureBlobFs) GetRelativePath(name string) string {
|
|
|
func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
|
|
|
prefix := fs.getPrefix(root)
|
|
|
pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
|
|
|
- Prefix: &fs.config.KeyPrefix,
|
|
|
+ Include: container.ListBlobsInclude{
|
|
|
+ Metadata: true,
|
|
|
+ },
|
|
|
+ Prefix: &prefix,
|
|
|
})
|
|
|
|
|
|
for pager.More() {
|
|
@@ -682,7 +727,7 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
|
|
|
isDir := false
|
|
|
if blobItem.Properties != nil {
|
|
|
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
|
|
|
- isDir = (contentType == dirMimeType)
|
|
|
+ isDir = checkDirectoryMarkers(contentType, blobItem.Metadata)
|
|
|
blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength)
|
|
|
lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified)
|
|
|
}
|
|
@@ -722,7 +767,7 @@ func (fs *AzureBlobFs) headObject(name string) (blob.GetPropertiesResponse, erro
|
|
|
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
|
|
defer cancelFn()
|
|
|
|
|
|
- resp, err := fs.containerClient.NewBlockBlobClient(name).GetProperties(ctx, &blob.GetPropertiesOptions{})
|
|
|
+ resp, err := fs.containerClient.NewBlockBlobClient(url.PathEscape(name)).GetProperties(ctx, &blob.GetPropertiesOptions{})
|
|
|
|
|
|
metric.AZHeadObjectCompleted(err)
|
|
|
return resp, err
|
|
@@ -795,6 +840,14 @@ func (fs *AzureBlobFs) setConfigDefaults() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (fs *AzureBlobFs) mkdirInternal(name string) error {
|
|
|
+ _, w, _, err := fs.Create(name, -1)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return w.Close()
|
|
|
+}
|
|
|
+
|
|
|
func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
|
|
|
result := false
|
|
|
prefix := fs.getPrefix(name)
|
|
@@ -931,7 +984,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *b
|
|
|
}
|
|
|
|
|
|
func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader,
|
|
|
- blockBlob *blockblob.Client, httpHeaders *blob.HTTPHeaders,
|
|
|
+ blockBlob *blockblob.Client, httpHeaders *blob.HTTPHeaders, metadata map[string]string,
|
|
|
) error {
|
|
|
partSize := fs.config.UploadPartSize
|
|
|
guard := make(chan struct{}, fs.config.UploadConcurrency)
|
|
@@ -1021,6 +1074,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
|
|
|
|
|
|
commitOptions := blockblob.CommitBlockListOptions{
|
|
|
HTTPHeaders: httpHeaders,
|
|
|
+ Metadata: metadata,
|
|
|
}
|
|
|
if fs.config.AccessTier != "" {
|
|
|
commitOptions.Tier = (*blob.AccessTier)(&fs.config.AccessTier)
|
|
@@ -1083,6 +1137,18 @@ func (fs *AzureBlobFs) getStorageID() string {
|
|
|
return fmt.Sprintf("azblob://%v", fs.config.Container)
|
|
|
}
|
|
|
|
|
|
+func checkDirectoryMarkers(contentType string, metadata map[string]*string) bool {
|
|
|
+ if contentType == dirMimeType {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ for k, v := range metadata {
|
|
|
+ if strings.ToLower(k) == azFolderKey {
|
|
|
+ return util.GetStringFromPointer(v) == "true"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
func getAzContainerClientOptions() *container.ClientOptions {
|
|
|
version := version.Get()
|
|
|
return &container.ClientOptions{
|