refactor metadata support

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
Nicola Murino 2024-02-17 19:49:34 +01:00
parent e2ff12c589
commit db0a467d33
GPG key ID: 935D2952DEC4EECF
4 changed files with 106 additions and 17 deletions


@@ -500,7 +500,7 @@ func (t *BaseTransfer) getUploadedFiles() int {
 func (t *BaseTransfer) updateTimes() {
 	if !t.aTime.IsZero() && !t.mTime.IsZero() {
-		err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
+		err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, false)
 		t.Connection.Log(logger.LevelDebug, "set times for file %q, atime: %v, mtime: %v, err: %v",
 			t.fsPath, t.aTime, t.mTime, err)
 	}
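
The only functional change here is the last argument of Fs.Chtimes, flipped from true to false. With the metadata-backed implementations added below, a cloud backend skips the update while an upload is still in flight, so the transfer layer must report that no upload is in progress for the times to actually be persisted. A minimal, self-contained sketch of that contract, using a hypothetical mockFs that keeps metadata in memory instead of calling a storage API:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// mockFs is a hypothetical stand-in for a cloud Fs implementation; it is not
// part of this commit and exists only to illustrate the isUploading contract.
type mockFs struct {
	metadata map[string]string
}

// Chtimes mirrors the shape of the new cloud implementations: do nothing while
// an upload is in flight, otherwise persist the mtime in object metadata as
// milliseconds since the Unix epoch.
func (fs *mockFs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
	if isUploading {
		return nil
	}
	fs.metadata["sftpgo_last_modified"] = strconv.FormatInt(mtime.UnixMilli(), 10)
	return nil
}

func main() {
	fs := &mockFs{metadata: make(map[string]string)}
	mtime := time.Date(2024, 2, 17, 19, 49, 34, 0, time.UTC)
	_ = fs.Chtimes("file.txt", mtime, mtime, false)
	fmt.Println(fs.metadata["sftpgo_last_modified"]) // 1708199374000
}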


@@ -29,6 +29,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -37,6 +38,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
@@ -180,8 +182,11 @@ func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
 	if err == nil {
 		contentType := util.GetStringFromPointer(attrs.ContentType)
 		isDir := checkDirectoryMarkers(contentType, attrs.Metadata)
-		metric.AZListObjectsCompleted(nil)
-		return NewFileInfo(name, isDir, util.GetIntFromPointer(attrs.ContentLength), util.GetTimeFromPointer(attrs.LastModified), false), nil
+		lastModified := util.GetTimeFromPointer(attrs.LastModified)
+		if val := getAzureLastModified(attrs.Metadata); val > 0 {
+			lastModified = util.GetTimeFromMsecSinceEpoch(val)
+		}
+		return NewFileInfo(name, isDir, util.GetIntFromPointer(attrs.ContentLength), lastModified, false), nil
 	}
 	if !fs.IsNotExist(err) {
 		return nil, err
@@ -377,8 +382,25 @@ func (*AzureBlobFs) Chmod(_ string, _ os.FileMode) error {
 }
 
 // Chtimes changes the access and modification times of the named file.
-func (fs *AzureBlobFs) Chtimes(_ string, _, _ time.Time, _ bool) error {
-	return ErrVfsUnsupported
+func (fs *AzureBlobFs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
+	if isUploading {
+		return nil
+	}
+	props, err := fs.headObject(name)
+	if err != nil {
+		return err
+	}
+	metadata := props.Metadata
+	if metadata == nil {
+		metadata = make(map[string]*string)
+	}
+	metadata[lastModifiedField] = to.Ptr(strconv.FormatInt(mtime.UnixMilli(), 10))
+	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+	defer cancelFn()
+	_, err = fs.containerClient.NewBlockBlobClient(name).SetMetadata(ctx, metadata, &blob.SetMetadataOptions{})
+	return err
 }
 
 // Truncate changes the size of the named file.
@@ -395,7 +417,7 @@ func (fs *AzureBlobFs) ReadDir(dirname string) (DirLister, error) {
 	prefix := fs.getPrefix(dirname)
 	pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
 		Include: container.ListBlobsInclude{
-			//Metadata: true,
+			Metadata: true,
 		},
 		Prefix:     &prefix,
 		MaxResults: &azureBlobDefaultPageSize,
@@ -1089,7 +1111,7 @@ func checkDirectoryMarkers(contentType string, metadata map[string]*string) bool
 	}
 	for k, v := range metadata {
 		if strings.ToLower(k) == azFolderKey {
-			return util.GetStringFromPointer(v) == "true"
+			return strings.ToLower(util.GetStringFromPointer(v)) == "true"
 		}
 	}
 	return false
@@ -1231,6 +1253,9 @@ func (l *azureBlobDirLister) Next(limit int) ([]os.FileInfo, error) {
 				}
 				l.prefixes[name] = true
 			}
+			if val := getAzureLastModified(blobItem.Metadata); val > 0 {
+				modTime = util.GetTimeFromMsecSinceEpoch(val)
+			}
 		}
 		l.cache = append(l.cache, NewFileInfo(name, isDir, size, modTime, false))
 	}
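
The Azure Chtimes above and the read paths (Stat and the directory lister) agree on a single encoding: the modification time lives in blob metadata under the key defined by lastModifiedField, as a base-10 string of milliseconds since the Unix epoch. A self-contained round-trip sketch, assuming util.GetTimeFromMsecSinceEpoch (not shown in this diff) is equivalent to time.UnixMilli:

package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	mtime := time.Date(2024, 2, 17, 19, 49, 34, 0, time.UTC)

	// Encode: what the new Chtimes stores via SetMetadata.
	stored := strconv.FormatInt(mtime.UnixMilli(), 10)

	// Decode: what getAzureLastModified parses and Stat converts back to a time.
	msec, err := strconv.ParseInt(stored, 10, 64)
	if err != nil {
		panic(err)
	}
	restored := time.UnixMilli(msec).UTC()
	fmt.Println(stored, restored.Equal(mtime)) // 1708199374000 true
}

Note that the Set Blob Metadata operation replaces the blob's entire user metadata map, which is why the implementation reads the existing properties first and mutates props.Metadata rather than sending only the new key.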


@@ -26,6 +26,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"time"
@@ -39,6 +40,7 @@ import (
 	"github.com/drakkan/sftpgo/v2/internal/logger"
 	"github.com/drakkan/sftpgo/v2/internal/metric"
+	"github.com/drakkan/sftpgo/v2/internal/util"
 	"github.com/drakkan/sftpgo/v2/internal/version"
 )
@@ -47,7 +49,7 @@ const (
 )
 
 var (
-	gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType"}
+	gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType", "Metadata"}
 )
 
 // GCSFs is a Fs implementation for Google Cloud Storage.
@@ -335,8 +337,32 @@ func (*GCSFs) Chmod(_ string, _ os.FileMode) error {
 }
 
 // Chtimes changes the access and modification times of the named file.
-func (fs *GCSFs) Chtimes(_ string, _, _ time.Time, _ bool) error {
-	return ErrVfsUnsupported
+func (fs *GCSFs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
+	if isUploading {
+		return nil
+	}
+	obj := fs.svc.Bucket(fs.config.Bucket).Object(name)
+	attrs, err := fs.headObject(name)
+	if err != nil {
+		return err
+	}
+	obj = obj.If(storage.Conditions{MetagenerationMatch: attrs.Metageneration})
+	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+	defer cancelFn()
+	metadata := attrs.Metadata
+	if metadata == nil {
+		metadata = make(map[string]string)
+	}
+	metadata[lastModifiedField] = strconv.FormatInt(mtime.UnixMilli(), 10)
+	objectAttrsToUpdate := storage.ObjectAttrsToUpdate{
+		Metadata: metadata,
+	}
+	_, err = obj.Update(ctx, objectAttrsToUpdate)
+	return err
 }
 
 // Truncate changes the size of the named file.
@@ -615,6 +641,9 @@ func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) {
 	if err == nil {
 		objSize := attrs.Size
 		objectModTime := attrs.Updated
+		if val := getLastModified(attrs.Metadata); val > 0 {
+			objectModTime = util.GetTimeFromMsecSinceEpoch(val)
+		}
 		isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/")
 		return NewFileInfo(name, isDir, objSize, objectModTime, false), nil
 	}
@@ -634,7 +663,11 @@ func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) {
 	if err != nil {
 		return nil, err
 	}
-	return NewFileInfo(name, true, attrs.Size, attrs.Updated, false), nil
+	objectModTime := attrs.Updated
+	if val := getLastModified(attrs.Metadata); val > 0 {
+		objectModTime = util.GetTimeFromMsecSinceEpoch(val)
+	}
+	return NewFileInfo(name, true, attrs.Size, objectModTime, false), nil
 }
 
 func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, flag int, name string) {
@@ -927,7 +960,11 @@ func (l *gcsDirLister) Next(limit int) ([]os.FileInfo, error) {
 				}
 				l.prefixes[name] = true
 			}
-			l.cache = append(l.cache, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false))
+			modTime := attrs.Updated
+			if val := getLastModified(attrs.Metadata); val > 0 {
+				modTime = util.GetTimeFromMsecSinceEpoch(val)
+			}
+			l.cache = append(l.cache, NewFileInfo(name, isDir, attrs.Size, modTime, false))
 		}
 	}
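
Two details in the GCS changes are easy to miss. First, the Update call is guarded by a MetagenerationMatch precondition, so a concurrent metadata write makes the request fail instead of being silently overwritten. Second, "Metadata" is added to gcsDefaultFieldsSelection: with a field selection that omits it, listed objects carry a nil Metadata map and the stored timestamp would be invisible to the lister. A minimal sketch of applying such a selection with the cloud.google.com/go/storage client (bucket name, prefix, and the loop are illustrative, not from the patch):

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	query := &storage.Query{Prefix: "some/prefix/"}
	// Restrict the listing to the fields we actually read; "Metadata" must be
	// included or attrs.Metadata will be nil for every object.
	if err := query.SetAttrSelection([]string{"Name", "Size", "Updated", "ContentType", "Metadata"}); err != nil {
		log.Fatal(err)
	}

	it := client.Bucket("example-bucket").Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(attrs.Name, attrs.Metadata["sftpgo_last_modified"])
	}
}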


@@ -24,6 +24,7 @@ import (
 	"path"
 	"path/filepath"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -38,11 +39,12 @@ import (
 )
 
 const (
 	dirMimeType  = "inode/directory"
 	s3fsName     = "S3Fs"
 	gcsfsName    = "GCSFs"
 	azBlobFsName = "AzureBlobFs"
-	preResumeTimeout = 90 * time.Second
+	lastModifiedField = "sftpgo_last_modified"
+	preResumeTimeout  = 90 * time.Second
 	// ListerBatchSize defines the default limit for DirLister implementations
 	ListerBatchSize = 1000
 )
@@ -1080,6 +1082,31 @@ func IsUploadResumeSupported(fs Fs, size int64) bool {
 	return fs.IsConditionalUploadResumeSupported(size)
 }
 
+func getLastModified(metadata map[string]string) int64 {
+	if val, ok := metadata[lastModifiedField]; ok {
+		lastModified, err := strconv.ParseInt(val, 10, 64)
+		if err == nil {
+			return lastModified
+		}
+	}
+	return 0
+}
+
+func getAzureLastModified(metadata map[string]*string) int64 {
+	for k, v := range metadata {
+		if strings.ToLower(k) == lastModifiedField {
+			if val := util.GetStringFromPointer(v); val != "" {
+				lastModified, err := strconv.ParseInt(val, 10, 64)
+				if err == nil {
+					return lastModified
+				}
+			}
+			return 0
+		}
+	}
+	return 0
+}
+
 func validateOSFsConfig(config *sdk.OSFsConfig) error {
 	if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
 		return fmt.Errorf("invalid read buffer size must be between 0 and 10 MB")
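
The two lookup helpers above differ deliberately: getAzureLastModified compares keys case-insensitively, since the Azure service and SDK may report user metadata keys with different casing, while the plain variant used for GCS looks the key up verbatim. Both return 0 for a missing or unparsable value, so callers keep the provider's own LastModified/Updated timestamp as a fallback. A hypothetical in-package test sketch (not part of the commit) exercising that behavior:

package vfs

import "testing"

// strPtr builds a *string locally, standing in for the SDK's to.Ptr helper
// used elsewhere in this commit.
func strPtr(s string) *string { return &s }

func TestGetLastModifiedHelpers(t *testing.T) {
	// A valid msec-since-epoch value is returned as-is.
	if got := getLastModified(map[string]string{lastModifiedField: "1708199374000"}); got != 1708199374000 {
		t.Errorf("unexpected value: %d", got)
	}
	// Unparsable values yield 0, so callers fall back to the provider timestamp.
	if got := getLastModified(map[string]string{lastModifiedField: "not-a-number"}); got != 0 {
		t.Errorf("expected 0, got %d", got)
	}
	// The Azure variant matches the metadata key case-insensitively.
	if got := getAzureLastModified(map[string]*string{"Sftpgo_Last_Modified": strPtr("1708199374000")}); got != 1708199374000 {
		t.Errorf("unexpected value: %d", got)
	}
	// An empty value also yields 0.
	if got := getAzureLastModified(map[string]*string{lastModifiedField: strPtr("")}); got != 0 {
		t.Errorf("expected 0, got %d", got)
	}
}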