gcsfs: use pagers when listing bucket objects
Hopefully fixes #746 Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
parent
056daaddfc
commit
df828b6021
4 changed files with 161 additions and 108 deletions
10
go.mod
10
go.mod
|
@ -4,11 +4,11 @@ go 1.17
|
|||
|
||||
require (
|
||||
cloud.google.com/go/storage v1.21.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.22.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0
|
||||
github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962
|
||||
github.com/alexedwards/argon2id v0.0.0-20211130144151-3585854a6387
|
||||
github.com/aws/aws-sdk-go v1.43.10
|
||||
github.com/aws/aws-sdk-go v1.43.11
|
||||
github.com/cockroachdb/cockroach-go/v2 v2.2.8
|
||||
github.com/coreos/go-oidc/v3 v3.1.0
|
||||
github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001
|
||||
|
@ -26,7 +26,7 @@ require (
|
|||
github.com/hashicorp/go-plugin v1.4.3
|
||||
github.com/hashicorp/go-retryablehttp v0.7.0
|
||||
github.com/jlaffaye/ftp v0.0.0-20201112195030-9aae4d151126
|
||||
github.com/klauspost/compress v1.14.4
|
||||
github.com/klauspost/compress v1.15.0
|
||||
github.com/lestrrat-go/jwx v1.2.20
|
||||
github.com/lib/pq v1.10.4
|
||||
github.com/lithammer/shortuuid/v3 v3.0.7
|
||||
|
@ -81,7 +81,7 @@ require (
|
|||
github.com/fsnotify/fsnotify v1.5.1 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/go-test/deep v1.0.8 // indirect
|
||||
github.com/goccy/go-json v0.9.4 // indirect
|
||||
github.com/goccy/go-json v0.9.5 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/go-cmp v0.5.7 // indirect
|
||||
|
@ -130,7 +130,7 @@ require (
|
|||
golang.org/x/tools v0.1.9 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20220303160752-862486edd9cc // indirect
|
||||
google.golang.org/genproto v0.0.0-20220304144024-325a89244dc8 // indirect
|
||||
google.golang.org/grpc v1.44.0 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/ini.v1 v1.66.4 // indirect
|
||||
|
|
18
go.sum
18
go.sum
|
@ -86,8 +86,9 @@ github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl
|
|||
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v57.0.0+incompatible h1:isVki3PbIFrwKvKdVP1byxo73/pt+Nn174YxW1k4PNw=
|
||||
github.com/Azure/azure-sdk-for-go v57.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 h1:qoVeMsc9/fh/yhxVaA0obYjVH/oI/ihrOoMwsLS9KSA=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.22.0 h1:zBJcBJwte0x6PcPK7XaWDMvK2o2ZM2f1sMaqNNavQ5g=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.22.0/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.1 h1:sLZ/Y+P/5RRtsXWylBjB5lkgixYfm0MQPiwrSX//JSo=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.1/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
|
||||
|
@ -144,8 +145,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI
|
|||
github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
|
||||
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
|
||||
github.com/aws/aws-sdk-go v1.40.34/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
|
||||
github.com/aws/aws-sdk-go v1.43.10 h1:lFX6gzTBltYBnlJBjd2DWRCmqn2CbTcs6PW99/Dme7k=
|
||||
github.com/aws/aws-sdk-go v1.43.10/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
|
||||
github.com/aws/aws-sdk-go v1.43.11 h1:NebCNJ2QvsFCnsKT1ei98bfwTPEoO2qwtWT42tJ3N3Q=
|
||||
github.com/aws/aws-sdk-go v1.43.11/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.7.0/go.mod h1:w9+nMZ7soXCe5nT46Ri354SNhXDQ6v+V5wqDjnZE+GY=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.4.0/go.mod h1:dgGR+Qq7Wjcd4AOAW5Rf5Tnv3+x7ed6kETXyS9WCuAY=
|
||||
|
@ -293,8 +294,9 @@ github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22
|
|||
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
|
||||
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
|
||||
github.com/goccy/go-json v0.7.6/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/goccy/go-json v0.9.4 h1:L8MLKG2mvVXiQu07qB6hmfqeSYQdOnqPot2GhsIwIaI=
|
||||
github.com/goccy/go-json v0.9.4/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/goccy/go-json v0.9.5 h1:ooSMW526ZjK+EaL5elrSyN2EzIfi/3V0m4+HJEDYLik=
|
||||
github.com/goccy/go-json v0.9.5/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
|
||||
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
|
||||
|
@ -516,8 +518,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
|
|||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
|
||||
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U=
|
||||
github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.0.11 h1:i2lw1Pm7Yi/4O6XCSyJWqEHI2MDw2FzUK6o/D21xn2A=
|
||||
github.com/klauspost/cpuid/v2 v2.0.11/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
|
||||
|
@ -1190,8 +1192,8 @@ google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2
|
|||
google.golang.org/genproto v0.0.0-20220216160803-4663080d8bc8/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
|
||||
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
|
||||
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
|
||||
google.golang.org/genproto v0.0.0-20220303160752-862486edd9cc h1:fb/ViRpv3ln/LvbqZtTpoOd1YQDNH12gaGZreoSFovE=
|
||||
google.golang.org/genproto v0.0.0-20220303160752-862486edd9cc/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
|
||||
google.golang.org/genproto v0.0.0-20220304144024-325a89244dc8 h1:U9V52f6rAgINH7kT+musA1qF8kWyVOxzF8eYuOVuFwQ=
|
||||
google.golang.org/genproto v0.0.0-20220304144024-325a89244dc8/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
|
||||
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
||||
|
|
224
vfs/gcsfs.go
224
vfs/gcsfs.go
|
@ -31,6 +31,10 @@ import (
|
|||
"github.com/drakkan/sftpgo/v2/version"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultPageSize = 5000
|
||||
)
|
||||
|
||||
var (
|
||||
gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType"}
|
||||
)
|
||||
|
@ -156,6 +160,7 @@ func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fu
|
|||
go func() {
|
||||
defer cancelFn()
|
||||
defer objectReader.Close()
|
||||
|
||||
n, err := io.Copy(w, objectReader)
|
||||
w.CloseWithError(err) //nolint:errcheck
|
||||
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
|
||||
|
@ -236,6 +241,7 @@ func (fs *GCSFs) Rename(source, target string) error {
|
|||
dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
copier := dst.CopierFrom(src)
|
||||
if fs.config.StorageClass != "" {
|
||||
copier.StorageClass = fs.config.StorageClass
|
||||
|
@ -382,55 +388,61 @@ func (fs *GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
|||
}
|
||||
|
||||
prefixes := make(map[string]bool)
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
it := bkt.Objects(ctx, query)
|
||||
pager := iterator.NewPager(it, defaultPageSize, "")
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
}
|
||||
var objects []*storage.ObjectAttrs
|
||||
pageToken, err := pager.NextPage(&objects)
|
||||
if err != nil {
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
return result, err
|
||||
}
|
||||
if attrs.Prefix != "" {
|
||||
name, _ := fs.resolve(attrs.Prefix, prefix)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := prefixes[name]; ok {
|
||||
continue
|
||||
}
|
||||
result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
|
||||
prefixes[name] = true
|
||||
} else {
|
||||
name, isDir := fs.resolve(attrs.Name, prefix)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
}
|
||||
if attrs.ContentType == dirMimeType {
|
||||
isDir = true
|
||||
}
|
||||
if isDir {
|
||||
// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
|
||||
|
||||
for _, attrs := range objects {
|
||||
if attrs.Prefix != "" {
|
||||
name, _ := fs.resolve(attrs.Prefix, prefix, attrs.ContentType)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := prefixes[name]; ok {
|
||||
continue
|
||||
}
|
||||
result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
|
||||
prefixes[name] = true
|
||||
} else {
|
||||
name, isDir := fs.resolve(attrs.Name, prefix, attrs.ContentType)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
}
|
||||
if isDir {
|
||||
// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
|
||||
if _, ok := prefixes[name]; ok {
|
||||
continue
|
||||
}
|
||||
prefixes[name] = true
|
||||
}
|
||||
modTime := attrs.Updated
|
||||
if t, ok := modTimes[name]; ok {
|
||||
modTime = util.GetTimeFromMsecSinceEpoch(t)
|
||||
}
|
||||
result = append(result, NewFileInfo(name, isDir, attrs.Size, modTime, false))
|
||||
}
|
||||
modTime := attrs.Updated
|
||||
if t, ok := modTimes[name]; ok {
|
||||
modTime = util.GetTimeFromMsecSinceEpoch(t)
|
||||
}
|
||||
result = append(result, NewFileInfo(name, isDir, attrs.Size, modTime, false))
|
||||
}
|
||||
|
||||
objects = nil
|
||||
if pageToken == "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
metric.GCSListObjectsCompleted(nil)
|
||||
return result, nil
|
||||
}
|
||||
|
@ -506,27 +518,37 @@ func (fs *GCSFs) ScanRootDirContents() (int, int64, error) {
|
|||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
it := bkt.Objects(ctx, query)
|
||||
pager := iterator.NewPager(it, defaultPageSize, "")
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
}
|
||||
var objects []*storage.ObjectAttrs
|
||||
pageToken, err := pager.NextPage(&objects)
|
||||
if err != nil {
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
return numFiles, size, err
|
||||
}
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
|
||||
for _, attrs := range objects {
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
}
|
||||
isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
|
||||
if isDir && attrs.Size == 0 {
|
||||
continue
|
||||
}
|
||||
numFiles++
|
||||
size += attrs.Size
|
||||
}
|
||||
isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
|
||||
if isDir && attrs.Size == 0 {
|
||||
continue
|
||||
|
||||
objects = nil
|
||||
if pageToken == "" {
|
||||
break
|
||||
}
|
||||
numFiles++
|
||||
size += attrs.Size
|
||||
}
|
||||
|
||||
metric.GCSListObjectsCompleted(nil)
|
||||
return numFiles, size, err
|
||||
}
|
||||
|
@ -546,34 +568,43 @@ func (fs *GCSFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error)
|
|||
if err != nil {
|
||||
return fileNames, err
|
||||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
it := bkt.Objects(ctx, query)
|
||||
pager := iterator.NewPager(it, defaultPageSize, "")
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
}
|
||||
var objects []*storage.ObjectAttrs
|
||||
pageToken, err := pager.NextPage(&objects)
|
||||
if err != nil {
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
return fileNames, err
|
||||
}
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
|
||||
for _, attrs := range objects {
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
}
|
||||
if attrs.Prefix == "" {
|
||||
name, isDir := fs.resolve(attrs.Name, prefix, attrs.ContentType)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if isDir {
|
||||
continue
|
||||
}
|
||||
fileNames[name] = true
|
||||
}
|
||||
}
|
||||
if attrs.Prefix == "" {
|
||||
name, isDir := fs.resolve(attrs.Name, prefix)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if isDir || attrs.ContentType == dirMimeType {
|
||||
continue
|
||||
}
|
||||
fileNames[name] = true
|
||||
|
||||
objects = nil
|
||||
if pageToken == "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
metric.GCSListObjectsCompleted(nil)
|
||||
return fileNames, nil
|
||||
}
|
||||
|
@ -635,33 +666,39 @@ func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
|
|||
return err
|
||||
}
|
||||
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
it := bkt.Objects(ctx, query)
|
||||
pager := iterator.NewPager(it, defaultPageSize, "")
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
}
|
||||
var objects []*storage.ObjectAttrs
|
||||
pageToken, err := pager.NextPage(&objects)
|
||||
if err != nil {
|
||||
walkFn(root, nil, err) //nolint:errcheck
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
return err
|
||||
}
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
|
||||
for _, attrs := range objects {
|
||||
if !attrs.Deleted.IsZero() {
|
||||
continue
|
||||
}
|
||||
name, isDir := fs.resolve(attrs.Name, prefix, attrs.ContentType)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
name, isDir := fs.resolve(attrs.Name, prefix)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if attrs.ContentType == dirMimeType {
|
||||
isDir = true
|
||||
}
|
||||
err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
objects = nil
|
||||
if pageToken == "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -691,12 +728,15 @@ func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) {
|
|||
return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
|
||||
}
|
||||
|
||||
func (fs *GCSFs) resolve(name string, prefix string) (string, bool) {
|
||||
func (fs *GCSFs) resolve(name, prefix, contentType string) (string, bool) {
|
||||
result := strings.TrimPrefix(name, prefix)
|
||||
isDir := strings.HasSuffix(result, "/")
|
||||
if isDir {
|
||||
result = strings.TrimSuffix(result, "/")
|
||||
}
|
||||
if contentType == dirMimeType {
|
||||
isDir = true
|
||||
}
|
||||
return result, isDir
|
||||
}
|
||||
|
||||
|
@ -735,6 +775,7 @@ func (fs *GCSFs) getObjectStat(name string) (string, os.FileInfo, error) {
|
|||
func (fs *GCSFs) checkIfBucketExists() error {
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
_, err := bkt.Attrs(ctx)
|
||||
metric.GCSHeadBucketCompleted(err)
|
||||
|
@ -755,22 +796,23 @@ func (fs *GCSFs) hasContents(name string) (bool, error) {
|
|||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
it := bkt.Objects(ctx, query)
|
||||
// if we have a dir object with a trailing slash it will be returned so we set the size to 2
|
||||
it.PageInfo().MaxSize = 2
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
return result, err
|
||||
}
|
||||
name, _ := fs.resolve(attrs.Name, prefix)
|
||||
pager := iterator.NewPager(it, 2, "")
|
||||
|
||||
var objects []*storage.ObjectAttrs
|
||||
_, err = pager.NextPage(&objects)
|
||||
if err != nil {
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
for _, attrs := range objects {
|
||||
name, _ := fs.resolve(attrs.Name, prefix, attrs.ContentType)
|
||||
// a dir object with a trailing slash will result in an empty name
|
||||
if name == "/" || name == "" {
|
||||
continue
|
||||
|
@ -779,7 +821,7 @@ func (fs *GCSFs) hasContents(name string) (bool, error) {
|
|||
break
|
||||
}
|
||||
|
||||
metric.GCSListObjectsCompleted(err)
|
||||
metric.GCSListObjectsCompleted(nil)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
|
17
vfs/s3fs.go
17
vfs/s3fs.go
|
@ -199,6 +199,7 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
|
|||
|
||||
go func() {
|
||||
defer cancelFn()
|
||||
|
||||
n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(name),
|
||||
|
@ -236,6 +237,7 @@ func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error)
|
|||
}
|
||||
go func() {
|
||||
defer cancelFn()
|
||||
|
||||
key := name
|
||||
var contentType string
|
||||
if flag == -1 {
|
||||
|
@ -305,6 +307,7 @@ func (fs *S3Fs) Rename(source, target string) error {
|
|||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
CopySource: aws.String(pathEscape(copySource)),
|
||||
|
@ -354,6 +357,7 @@ func (fs *S3Fs) Remove(name string, isDir bool) error {
|
|||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(name),
|
||||
|
@ -446,9 +450,9 @@ func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
|||
return result, err
|
||||
}
|
||||
prefixes := make(map[string]bool)
|
||||
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
err = fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
|
@ -559,6 +563,7 @@ func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
|
|||
size := int64(0)
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Prefix: aws.String(fs.config.KeyPrefix),
|
||||
|
@ -583,7 +588,7 @@ func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
|
|||
if fsPrefix != "/" {
|
||||
prefix = strings.TrimPrefix(fsPrefix, "/")
|
||||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
|
||||
|
@ -656,8 +661,9 @@ func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
|
|||
prefix += "/"
|
||||
}
|
||||
}
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
|
@ -721,6 +727,7 @@ func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
|
|||
func (fs *S3Fs) checkIfBucketExists() error {
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
})
|
||||
|
@ -762,6 +769,7 @@ func (fs *S3Fs) hasContents(name string) (bool, error) {
|
|||
maxResults := int64(2)
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
|
@ -786,6 +794,7 @@ func (fs *S3Fs) hasContents(name string) (bool, error) {
|
|||
func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(name),
|
||||
|
|
Loading…
Reference in a new issue