From d41377506088f8ad2ab43fafeb16a273338c905e Mon Sep 17 00:00:00 2001
From: Nicola Murino
Date: Sun, 18 Feb 2024 10:12:53 +0100
Subject: [PATCH] vfs: log progress after each page iteration

Signed-off-by: Nicola Murino
---
 internal/vfs/azblobfs.go |  7 +++--
 internal/vfs/gcsfs.go    | 66 +++++++++++++++++++++++++++++++++++++---------------
 internal/vfs/s3fs.go     |  4 +--
 3 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go
index f541ef1d..a091eaf8 100644
--- a/internal/vfs/azblobfs.go
+++ b/internal/vfs/azblobfs.go
@@ -543,11 +543,9 @@ func (fs *AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
 				}
 				numFiles++
 				size += blobSize
-				if numFiles%1000 == 0 {
-					fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
-				}
 			}
 		}
+		fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
 	}
 
 	metric.AZListObjectsCompleted(nil)
@@ -616,6 +614,9 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
 				isDir = checkDirectoryMarkers(contentType, blobItem.Metadata)
 				blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength)
 				lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified)
+				if val := getAzureLastModified(blobItem.Metadata); val > 0 {
+					lastModified = util.GetTimeFromMsecSinceEpoch(val)
+				}
 			}
 			err := walkFn(name, NewFileInfo(name, isDir, blobSize, lastModified, false), nil)
 			if err != nil {
diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go
index f71ed0ca..5dd1874d 100644
--- a/internal/vfs/gcsfs.go
+++ b/internal/vfs/gcsfs.go
@@ -476,21 +476,20 @@ func (fs *GCSFs) GetDirSize(dirname string) (int, int64, error) {
 	if err != nil {
 		return numFiles, size, err
 	}
 
-	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
-	defer cancelFn()
-	bkt := fs.svc.Bucket(fs.config.Bucket)
-	it := bkt.Objects(ctx, query)
-	pager := iterator.NewPager(it, defaultGCSPageSize, "")
+	iteratePage := func(nextPageToken string) (string, error) {
+		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+		defer cancelFn()
+
+		bkt := fs.svc.Bucket(fs.config.Bucket)
+		it := bkt.Objects(ctx, query)
+		pager := iterator.NewPager(it, defaultGCSPageSize, nextPageToken)
 
-	for {
 		var objects []*storage.ObjectAttrs
 		pageToken, err := pager.NextPage(&objects)
 		if err != nil {
-			metric.GCSListObjectsCompleted(err)
-			return numFiles, size, err
+			return pageToken, err
 		}
-
 		for _, attrs := range objects {
 			if !attrs.Deleted.IsZero() {
 				continue
@@ -501,12 +500,18 @@ func (fs *GCSFs) GetDirSize(dirname string) (int, int64, error) {
 			}
 			numFiles++
 			size += attrs.Size
-			if numFiles%1000 == 0 {
-				fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
-			}
 		}
+		return pageToken, nil
+	}
 
-		objects = nil
+	pageToken := ""
+	for {
+		pageToken, err = iteratePage(pageToken)
+		if err != nil {
+			metric.GCSListObjectsCompleted(err)
+			return numFiles, size, err
+		}
+		fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
 		if pageToken == "" {
 			break
 		}
@@ -556,22 +561,20 @@ func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
 		return err
 	}
 
-	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
-	defer cancelFn()
+	iteratePage := func(nextPageToken string) (string, error) {
+		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+		defer cancelFn()
 
-	bkt := fs.svc.Bucket(fs.config.Bucket)
-	it := bkt.Objects(ctx, query)
-	pager := iterator.NewPager(it, defaultGCSPageSize, "")
+		bkt := fs.svc.Bucket(fs.config.Bucket)
+		it := bkt.Objects(ctx, query)
+		pager := iterator.NewPager(it, defaultGCSPageSize, nextPageToken)
 
-	for {
 		var objects []*storage.ObjectAttrs
 		pageToken, err := pager.NextPage(&objects)
 		if err != nil {
 			walkFn(root, nil, err) //nolint:errcheck
-			metric.GCSListObjectsCompleted(err)
-			return err
+			return pageToken, err
 		}
-
 		for _, attrs := range objects {
 			if !attrs.Deleted.IsZero() {
 				continue
@@ -580,13 +583,26 @@ func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
 			if name == "" {
 				continue
 			}
-			err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
+			objectModTime := attrs.Updated
+			if val := getLastModified(attrs.Metadata); val > 0 {
+				objectModTime = util.GetTimeFromMsecSinceEpoch(val)
+			}
+			err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, objectModTime, false), nil)
 			if err != nil {
-				return err
+				return pageToken, err
 			}
 		}
 
-		objects = nil
+		return pageToken, nil
+	}
+
+	pageToken := ""
+	for {
+		pageToken, err = iteratePage(pageToken)
+		if err != nil {
+			metric.GCSListObjectsCompleted(err)
+			return err
+		}
 		if pageToken == "" {
 			break
 		}
diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go
index 2cbdcce0..ece051a3 100644
--- a/internal/vfs/s3fs.go
+++ b/internal/vfs/s3fs.go
@@ -528,10 +528,8 @@ func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
 			}
 			numFiles++
 			size += objectSize
-			if numFiles%1000 == 0 {
-				fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
-			}
 		}
+		fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
 	}
 
 	metric.S3ListObjectsCompleted(nil)
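
The GCS hunks above restructure GetDirSize and Walk around a small closure that fetches and processes exactly one page, so the per-page context created with context.WithDeadline is released by defer as soon as that page is done, and the driver loop can log progress once per page instead of every 1000 entries. What follows is a minimal, self-contained sketch of that pattern, not code from the repository: fetchPage and the fixed 30-second deadline are hypothetical stand-ins for the storage SDK pager and fs.ctxTimeout used in the patch.

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchPage is a hypothetical pager: it returns the object sizes of one page
// and the token of the next page ("" when there are no more pages).
func fetchPage(_ context.Context, token string) ([]int64, string, error) {
	if token == "" {
		return []int64{100, 200, 300}, "page-2", nil
	}
	return []int64{400}, "", nil
}

// scan mirrors the structure of the patched GetDirSize implementations:
// a closure handles exactly one page, the driver loop logs after each call.
func scan() (int, int64, error) {
	numFiles := 0
	size := int64(0)

	iteratePage := func(nextPageToken string) (string, error) {
		// Per-page deadline, cancelled as soon as this closure returns.
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
		defer cancelFn()

		sizes, pageToken, err := fetchPage(ctx, nextPageToken)
		if err != nil {
			return pageToken, err
		}
		for _, objectSize := range sizes {
			numFiles++
			size += objectSize
		}
		return pageToken, nil
	}

	pageToken := ""
	for {
		var err error
		pageToken, err = iteratePage(pageToken)
		if err != nil {
			return numFiles, size, err
		}
		// Progress is reported once per page iteration, as in the patch.
		fmt.Printf("scan in progress, files: %d, size: %d\n", numFiles, size)
		if pageToken == "" {
			break
		}
	}
	return numFiles, size, nil
}

func main() {
	numFiles, size, err := scan()
	fmt.Println(numFiles, size, err)
}

The point of the closure is that defer cancelFn() now runs at the end of each page rather than at the end of the whole listing, and the progress log no longer depends on a numFiles%1000 counter inside the per-object loop.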