From 5f35b8b7044d9873efb7a3539f1a6a6624386064 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Sun, 18 Dec 2022 10:29:57 +0100 Subject: [PATCH] gcsfs: backport from main Signed-off-by: Nicola Murino --- internal/vfs/gcsfs.go | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go index 2ac4d760..217ddbd0 100644 --- a/internal/vfs/gcsfs.go +++ b/internal/vfs/gcsfs.go @@ -170,6 +170,19 @@ func (fs *GCSFs) Create(name string, flag int) (File, *PipeWriter, func(), error p := NewPipeWriter(w) bkt := fs.svc.Bucket(fs.config.Bucket) obj := bkt.Object(name) + if flag == -1 { + obj = obj.If(storage.Conditions{DoesNotExist: true}) + } else { + attrs, statErr := fs.headObject(name) + if statErr == nil { + obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation}) + } else if fs.IsNotExist(statErr) { + obj = obj.If(storage.Conditions{DoesNotExist: true}) + } else { + fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr) + } + } + ctx, cancelFn := context.WithCancel(context.Background()) objectWriter := obj.NewWriter(ctx) var contentType string @@ -231,6 +244,16 @@ func (fs *GCSFs) Rename(source, target string) error { } else { src := fs.svc.Bucket(fs.config.Bucket).Object(realSourceName) dst := fs.svc.Bucket(fs.config.Bucket).Object(target) + attrs, statErr := fs.headObject(target) + if statErr == nil { + dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation}) + } else if fs.IsNotExist(statErr) { + dst = dst.If(storage.Conditions{DoesNotExist: true}) + } else { + fsLog(fs, logger.LevelWarn, "unable to set precondition for rename, target %q, stat err: %v", + target, statErr) + } + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() @@ -276,11 +299,20 @@ func (fs *GCSFs) Remove(name string, isDir bool) error { name += "/" } } + obj := fs.svc.Bucket(fs.config.Bucket).Object(name) + attrs, statErr := fs.headObject(name) + if statErr == nil { + obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation}) + } else { + fsLog(fs, logger.LevelWarn, "unable to set precondition for deleting %q, stat err: %v", + name, statErr) + } + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() - err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx) - if fs.IsNotExist(err) && isDir { + err := obj.Delete(ctx) + if isDir && fs.IsNotExist(err) { // we can have directories without a trailing "/" (created using v2.1.0 and before) err = fs.svc.Bucket(fs.config.Bucket).Object(strings.TrimSuffix(name, "/")).Delete(ctx) }