gcsfs: backport from main

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2022-12-18 10:29:57 +01:00
parent 08e29d4ee0
commit 5f35b8b704
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF

View file

@ -170,6 +170,19 @@ func (fs *GCSFs) Create(name string, flag int) (File, *PipeWriter, func(), error
p := NewPipeWriter(w)
bkt := fs.svc.Bucket(fs.config.Bucket)
obj := bkt.Object(name)
if flag == -1 {
obj = obj.If(storage.Conditions{DoesNotExist: true})
} else {
attrs, statErr := fs.headObject(name)
if statErr == nil {
obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else if fs.IsNotExist(statErr) {
obj = obj.If(storage.Conditions{DoesNotExist: true})
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr)
}
}
ctx, cancelFn := context.WithCancel(context.Background())
objectWriter := obj.NewWriter(ctx)
var contentType string
@ -231,6 +244,16 @@ func (fs *GCSFs) Rename(source, target string) error {
} else {
src := fs.svc.Bucket(fs.config.Bucket).Object(realSourceName)
dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
attrs, statErr := fs.headObject(target)
if statErr == nil {
dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else if fs.IsNotExist(statErr) {
dst = dst.If(storage.Conditions{DoesNotExist: true})
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for rename, target %q, stat err: %v",
target, statErr)
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
@ -276,11 +299,20 @@ func (fs *GCSFs) Remove(name string, isDir bool) error {
name += "/"
}
}
obj := fs.svc.Bucket(fs.config.Bucket).Object(name)
attrs, statErr := fs.headObject(name)
if statErr == nil {
obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for deleting %q, stat err: %v",
name, statErr)
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
if fs.IsNotExist(err) && isDir {
err := obj.Delete(ctx)
if isDir && fs.IsNotExist(err) {
// we can have directories without a trailing "/" (created using v2.1.0 and before)
err = fs.svc.Bucket(fs.config.Bucket).Object(strings.TrimSuffix(name, "/")).Delete(ctx)
}