gcsfs.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751
  1. // +build !nogcs
  2. package vfs
  3. import (
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "mime"
  11. "net/http"
  12. "os"
  13. "path"
  14. "path/filepath"
  15. "strings"
  16. "time"
  17. "cloud.google.com/go/storage"
  18. "github.com/eikenb/pipeat"
  19. "google.golang.org/api/googleapi"
  20. "google.golang.org/api/iterator"
  21. "google.golang.org/api/option"
  22. "github.com/drakkan/sftpgo/kms"
  23. "github.com/drakkan/sftpgo/logger"
  24. "github.com/drakkan/sftpgo/metrics"
  25. "github.com/drakkan/sftpgo/version"
  26. )
  27. var (
  28. gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType"}
  29. )
  30. // GCSFs is a Fs implementation for Google Cloud Storage.
  31. type GCSFs struct {
  32. connectionID string
  33. localTempDir string
  34. config GCSFsConfig
  35. svc *storage.Client
  36. ctxTimeout time.Duration
  37. ctxLongTimeout time.Duration
  38. }
  39. func init() {
  40. version.AddFeature("+gcs")
  41. }
  42. // NewGCSFs returns an GCSFs object that allows to interact with Google Cloud Storage
  43. func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error) {
  44. var err error
  45. fs := &GCSFs{
  46. connectionID: connectionID,
  47. localTempDir: localTempDir,
  48. config: config,
  49. ctxTimeout: 30 * time.Second,
  50. ctxLongTimeout: 300 * time.Second,
  51. }
  52. if err = ValidateGCSFsConfig(&fs.config, fs.config.CredentialFile); err != nil {
  53. return fs, err
  54. }
  55. ctx := context.Background()
  56. if fs.config.AutomaticCredentials > 0 {
  57. fs.svc, err = storage.NewClient(ctx)
  58. } else if !fs.config.Credentials.IsEmpty() {
  59. if fs.config.Credentials.IsEncrypted() {
  60. err = fs.config.Credentials.Decrypt()
  61. if err != nil {
  62. return fs, err
  63. }
  64. }
  65. fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload())))
  66. } else {
  67. var creds []byte
  68. creds, err = ioutil.ReadFile(fs.config.CredentialFile)
  69. if err != nil {
  70. return fs, err
  71. }
  72. secret := kms.NewEmptySecret()
  73. err = json.Unmarshal(creds, secret)
  74. if err != nil {
  75. return fs, err
  76. }
  77. err = secret.Decrypt()
  78. if err != nil {
  79. return fs, err
  80. }
  81. fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(secret.GetPayload())))
  82. }
  83. return fs, err
  84. }
  85. // Name returns the name for the Fs implementation
  86. func (fs *GCSFs) Name() string {
  87. return fmt.Sprintf("GCSFs bucket %#v", fs.config.Bucket)
  88. }
  89. // ConnectionID returns the connection ID associated to this Fs implementation
  90. func (fs *GCSFs) ConnectionID() string {
  91. return fs.connectionID
  92. }
  93. // Stat returns a FileInfo describing the named file
  94. func (fs *GCSFs) Stat(name string) (os.FileInfo, error) {
  95. var result FileInfo
  96. var err error
  97. if name == "" || name == "." {
  98. err := fs.checkIfBucketExists()
  99. if err != nil {
  100. return result, err
  101. }
  102. return NewFileInfo(name, true, 0, time.Now(), false), nil
  103. }
  104. if fs.config.KeyPrefix == name+"/" {
  105. return NewFileInfo(name, true, 0, time.Now(), false), nil
  106. }
  107. attrs, err := fs.headObject(name)
  108. if err == nil {
  109. objSize := attrs.Size
  110. objectModTime := attrs.Updated
  111. isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/")
  112. return NewFileInfo(name, isDir, objSize, objectModTime, false), nil
  113. }
  114. if !fs.IsNotExist(err) {
  115. return result, err
  116. }
  117. // now check if this is a prefix (virtual directory)
  118. hasContents, err := fs.hasContents(name)
  119. if err == nil && hasContents {
  120. return NewFileInfo(name, true, 0, time.Now(), false), nil
  121. } else if err != nil {
  122. return nil, err
  123. }
  124. // search a dir ending with "/" for backward compatibility
  125. return fs.getStatCompat(name)
  126. }
  127. func (fs *GCSFs) getStatCompat(name string) (os.FileInfo, error) {
  128. var result FileInfo
  129. prefix := fs.getPrefixForStat(name)
  130. query := &storage.Query{Prefix: prefix, Delimiter: "/"}
  131. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  132. if err != nil {
  133. return nil, err
  134. }
  135. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  136. defer cancelFn()
  137. bkt := fs.svc.Bucket(fs.config.Bucket)
  138. it := bkt.Objects(ctx, query)
  139. for {
  140. attrs, err := it.Next()
  141. if err == iterator.Done {
  142. break
  143. }
  144. if err != nil {
  145. metrics.GCSListObjectsCompleted(err)
  146. return result, err
  147. }
  148. if attrs.Prefix != "" {
  149. if fs.isEqual(attrs.Prefix, name) {
  150. result = NewFileInfo(name, true, 0, time.Now(), false)
  151. break
  152. }
  153. } else {
  154. if !attrs.Deleted.IsZero() {
  155. continue
  156. }
  157. if fs.isEqual(attrs.Name, name) {
  158. isDir := strings.HasSuffix(attrs.Name, "/")
  159. result = NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false)
  160. break
  161. }
  162. }
  163. }
  164. metrics.GCSListObjectsCompleted(nil)
  165. if result.Name() == "" {
  166. err = errors.New("404 no such file or directory")
  167. }
  168. return result, err
  169. }
  170. // Lstat returns a FileInfo describing the named file
  171. func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) {
  172. return fs.Stat(name)
  173. }
  174. // Open opens the named file for reading
  175. func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
  176. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  177. if err != nil {
  178. return nil, nil, nil, err
  179. }
  180. bkt := fs.svc.Bucket(fs.config.Bucket)
  181. obj := bkt.Object(name)
  182. ctx, cancelFn := context.WithCancel(context.Background())
  183. objectReader, err := obj.NewRangeReader(ctx, offset, -1)
  184. if err == nil && offset > 0 && objectReader.Attrs.ContentEncoding == "gzip" {
  185. err = fmt.Errorf("Range request is not possible for gzip content encoding, requested offset %v", offset)
  186. objectReader.Close()
  187. }
  188. if err != nil {
  189. r.Close()
  190. w.Close()
  191. cancelFn()
  192. return nil, nil, nil, err
  193. }
  194. go func() {
  195. defer cancelFn()
  196. defer objectReader.Close()
  197. n, err := io.Copy(w, objectReader)
  198. w.CloseWithError(err) //nolint:errcheck
  199. fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
  200. metrics.GCSTransferCompleted(n, 1, err)
  201. }()
  202. return nil, r, cancelFn, nil
  203. }
  204. // Create creates or opens the named file for writing
  205. func (fs *GCSFs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
  206. r, w, err := pipeat.PipeInDir(fs.localTempDir)
  207. if err != nil {
  208. return nil, nil, nil, err
  209. }
  210. p := NewPipeWriter(w)
  211. bkt := fs.svc.Bucket(fs.config.Bucket)
  212. obj := bkt.Object(name)
  213. ctx, cancelFn := context.WithCancel(context.Background())
  214. objectWriter := obj.NewWriter(ctx)
  215. var contentType string
  216. if flag == -1 {
  217. contentType = dirMimeType
  218. } else {
  219. contentType = mime.TypeByExtension(path.Ext(name))
  220. }
  221. if contentType != "" {
  222. objectWriter.ObjectAttrs.ContentType = contentType
  223. }
  224. if fs.config.StorageClass != "" {
  225. objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
  226. }
  227. go func() {
  228. defer cancelFn()
  229. n, err := io.Copy(objectWriter, r)
  230. closeErr := objectWriter.Close()
  231. if err == nil {
  232. err = closeErr
  233. }
  234. r.CloseWithError(err) //nolint:errcheck
  235. p.Done(err)
  236. fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, n, err)
  237. metrics.GCSTransferCompleted(n, 0, err)
  238. }()
  239. return nil, p, cancelFn, nil
  240. }
  241. // Rename renames (moves) source to target.
  242. // We don't support renaming non empty directories since we should
  243. // rename all the contents too and this could take long time: think
  244. // about directories with thousands of files, for each file we should
  245. // execute a CopyObject call.
  246. func (fs *GCSFs) Rename(source, target string) error {
  247. if source == target {
  248. return nil
  249. }
  250. fi, err := fs.Stat(source)
  251. if err != nil {
  252. return err
  253. }
  254. if fi.IsDir() {
  255. hasContents, err := fs.hasContents(source)
  256. if err != nil {
  257. return err
  258. }
  259. if hasContents {
  260. return fmt.Errorf("Cannot rename non empty directory: %#v", source)
  261. }
  262. }
  263. src := fs.svc.Bucket(fs.config.Bucket).Object(source)
  264. dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
  265. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  266. defer cancelFn()
  267. copier := dst.CopierFrom(src)
  268. if fs.config.StorageClass != "" {
  269. copier.StorageClass = fs.config.StorageClass
  270. }
  271. var contentType string
  272. if fi.IsDir() {
  273. contentType = dirMimeType
  274. } else {
  275. contentType = mime.TypeByExtension(path.Ext(source))
  276. }
  277. if contentType != "" {
  278. copier.ContentType = contentType
  279. }
  280. _, err = copier.Run(ctx)
  281. metrics.GCSCopyObjectCompleted(err)
  282. if err != nil {
  283. return err
  284. }
  285. return fs.Remove(source, fi.IsDir())
  286. }
  287. // Remove removes the named file or (empty) directory.
  288. func (fs *GCSFs) Remove(name string, isDir bool) error {
  289. if isDir {
  290. hasContents, err := fs.hasContents(name)
  291. if err != nil {
  292. return err
  293. }
  294. if hasContents {
  295. return fmt.Errorf("Cannot remove non empty directory: %#v", name)
  296. }
  297. }
  298. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  299. defer cancelFn()
  300. err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
  301. metrics.GCSDeleteObjectCompleted(err)
  302. if fs.IsNotExist(err) && isDir {
  303. name = name + "/"
  304. err = fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
  305. metrics.GCSDeleteObjectCompleted(err)
  306. }
  307. return err
  308. }
  309. // Mkdir creates a new directory with the specified name and default permissions
  310. func (fs *GCSFs) Mkdir(name string) error {
  311. _, err := fs.Stat(name)
  312. if !fs.IsNotExist(err) {
  313. return err
  314. }
  315. _, w, _, err := fs.Create(name, -1)
  316. if err != nil {
  317. return err
  318. }
  319. return w.Close()
  320. }
  321. // Symlink creates source as a symbolic link to target.
  322. func (*GCSFs) Symlink(source, target string) error {
  323. return ErrVfsUnsupported
  324. }
  325. // Readlink returns the destination of the named symbolic link
  326. func (*GCSFs) Readlink(name string) (string, error) {
  327. return "", ErrVfsUnsupported
  328. }
  329. // Chown changes the numeric uid and gid of the named file.
  330. func (*GCSFs) Chown(name string, uid int, gid int) error {
  331. return ErrVfsUnsupported
  332. }
  333. // Chmod changes the mode of the named file to mode.
  334. func (*GCSFs) Chmod(name string, mode os.FileMode) error {
  335. return ErrVfsUnsupported
  336. }
  337. // Chtimes changes the access and modification times of the named file.
  338. func (*GCSFs) Chtimes(name string, atime, mtime time.Time) error {
  339. return ErrVfsUnsupported
  340. }
  341. // Truncate changes the size of the named file.
  342. // Truncate by path is not supported, while truncating an opened
  343. // file is handled inside base transfer
  344. func (*GCSFs) Truncate(name string, size int64) error {
  345. return ErrVfsUnsupported
  346. }
  347. // ReadDir reads the directory named by dirname and returns
  348. // a list of directory entries.
  349. func (fs *GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
  350. var result []os.FileInfo
  351. // dirname must be already cleaned
  352. prefix := fs.getPrefix(dirname)
  353. query := &storage.Query{Prefix: prefix, Delimiter: "/"}
  354. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  355. if err != nil {
  356. return nil, err
  357. }
  358. prefixes := make(map[string]bool)
  359. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  360. defer cancelFn()
  361. bkt := fs.svc.Bucket(fs.config.Bucket)
  362. it := bkt.Objects(ctx, query)
  363. for {
  364. attrs, err := it.Next()
  365. if err == iterator.Done {
  366. break
  367. }
  368. if err != nil {
  369. metrics.GCSListObjectsCompleted(err)
  370. return result, err
  371. }
  372. if attrs.Prefix != "" {
  373. name, _ := fs.resolve(attrs.Prefix, prefix)
  374. if name == "" {
  375. continue
  376. }
  377. if _, ok := prefixes[name]; ok {
  378. continue
  379. }
  380. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  381. prefixes[name] = true
  382. } else {
  383. name, isDir := fs.resolve(attrs.Name, prefix)
  384. if name == "" {
  385. continue
  386. }
  387. if !attrs.Deleted.IsZero() {
  388. continue
  389. }
  390. if attrs.ContentType == dirMimeType {
  391. isDir = true
  392. }
  393. if isDir {
  394. // check if the dir is already included, it will be sent as blob prefix if it contains at least one item
  395. if _, ok := prefixes[name]; ok {
  396. continue
  397. }
  398. prefixes[name] = true
  399. }
  400. fi := NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false)
  401. result = append(result, fi)
  402. }
  403. }
  404. metrics.GCSListObjectsCompleted(nil)
  405. return result, nil
  406. }
  407. // IsUploadResumeSupported returns true if upload resume is supported.
  408. // SFTP Resume is not supported on S3
  409. func (*GCSFs) IsUploadResumeSupported() bool {
  410. return false
  411. }
  412. // IsAtomicUploadSupported returns true if atomic upload is supported.
  413. // S3 uploads are already atomic, we don't need to upload to a temporary
  414. // file
  415. func (*GCSFs) IsAtomicUploadSupported() bool {
  416. return false
  417. }
  418. // IsNotExist returns a boolean indicating whether the error is known to
  419. // report that a file or directory does not exist
  420. func (*GCSFs) IsNotExist(err error) bool {
  421. if err == nil {
  422. return false
  423. }
  424. if err == storage.ErrObjectNotExist || err == storage.ErrBucketNotExist {
  425. return true
  426. }
  427. if e, ok := err.(*googleapi.Error); ok {
  428. if e.Code == http.StatusNotFound {
  429. return true
  430. }
  431. }
  432. return strings.Contains(err.Error(), "404")
  433. }
  434. // IsPermission returns a boolean indicating whether the error is known to
  435. // report that permission is denied.
  436. func (*GCSFs) IsPermission(err error) bool {
  437. if err == nil {
  438. return false
  439. }
  440. if e, ok := err.(*googleapi.Error); ok {
  441. if e.Code == http.StatusForbidden || e.Code == http.StatusUnauthorized {
  442. return true
  443. }
  444. }
  445. return strings.Contains(err.Error(), "403")
  446. }
  447. // IsNotSupported returns true if the error indicate an unsupported operation
  448. func (*GCSFs) IsNotSupported(err error) bool {
  449. if err == nil {
  450. return false
  451. }
  452. return err == ErrVfsUnsupported
  453. }
  454. // CheckRootPath creates the specified local root directory if it does not exists
  455. func (fs *GCSFs) CheckRootPath(username string, uid int, gid int) bool {
  456. // we need a local directory for temporary files
  457. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
  458. return osFs.CheckRootPath(username, uid, gid)
  459. }
  460. // ScanRootDirContents returns the number of files contained in the bucket,
  461. // and their size
  462. func (fs *GCSFs) ScanRootDirContents() (int, int64, error) {
  463. numFiles := 0
  464. size := int64(0)
  465. query := &storage.Query{Prefix: fs.config.KeyPrefix}
  466. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  467. if err != nil {
  468. return numFiles, size, err
  469. }
  470. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  471. defer cancelFn()
  472. bkt := fs.svc.Bucket(fs.config.Bucket)
  473. it := bkt.Objects(ctx, query)
  474. for {
  475. attrs, err := it.Next()
  476. if err == iterator.Done {
  477. break
  478. }
  479. if err != nil {
  480. metrics.GCSListObjectsCompleted(err)
  481. return numFiles, size, err
  482. }
  483. if !attrs.Deleted.IsZero() {
  484. continue
  485. }
  486. isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
  487. if isDir && attrs.Size == 0 {
  488. continue
  489. }
  490. numFiles++
  491. size += attrs.Size
  492. }
  493. metrics.GCSListObjectsCompleted(nil)
  494. return numFiles, size, err
  495. }
  496. // GetDirSize returns the number of files and the size for a folder
  497. // including any subfolders
  498. func (*GCSFs) GetDirSize(dirname string) (int, int64, error) {
  499. return 0, 0, ErrVfsUnsupported
  500. }
  501. // GetAtomicUploadPath returns the path to use for an atomic upload.
  502. // GCS uploads are already atomic, we never call this method for GCS
  503. func (*GCSFs) GetAtomicUploadPath(name string) string {
  504. return ""
  505. }
  506. // GetRelativePath returns the path for a file relative to the user's home dir.
  507. // This is the path as seen by SFTPGo users
  508. func (fs *GCSFs) GetRelativePath(name string) string {
  509. rel := path.Clean(name)
  510. if rel == "." {
  511. rel = ""
  512. }
  513. if !path.IsAbs(rel) {
  514. rel = "/" + rel
  515. }
  516. if fs.config.KeyPrefix != "" {
  517. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  518. rel = "/"
  519. }
  520. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  521. }
  522. return rel
  523. }
  524. // Walk walks the file tree rooted at root, calling walkFn for each file or
  525. // directory in the tree, including root
  526. func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
  527. prefix := ""
  528. if root != "" && root != "." {
  529. prefix = strings.TrimPrefix(root, "/")
  530. if !strings.HasSuffix(prefix, "/") {
  531. prefix += "/"
  532. }
  533. }
  534. query := &storage.Query{Prefix: prefix}
  535. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  536. if err != nil {
  537. walkFn(root, nil, err) //nolint:errcheck
  538. return err
  539. }
  540. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  541. defer cancelFn()
  542. bkt := fs.svc.Bucket(fs.config.Bucket)
  543. it := bkt.Objects(ctx, query)
  544. for {
  545. attrs, err := it.Next()
  546. if err == iterator.Done {
  547. break
  548. }
  549. if err != nil {
  550. walkFn(root, nil, err) //nolint:errcheck
  551. metrics.GCSListObjectsCompleted(err)
  552. return err
  553. }
  554. if !attrs.Deleted.IsZero() {
  555. continue
  556. }
  557. name, isDir := fs.resolve(attrs.Name, prefix)
  558. if name == "" {
  559. continue
  560. }
  561. if attrs.ContentType == dirMimeType {
  562. isDir = true
  563. }
  564. err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
  565. if err != nil {
  566. return err
  567. }
  568. }
  569. walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
  570. metrics.GCSListObjectsCompleted(err)
  571. return err
  572. }
  573. // Join joins any number of path elements into a single path
  574. func (*GCSFs) Join(elem ...string) string {
  575. return strings.TrimPrefix(path.Join(elem...), "/")
  576. }
  577. // HasVirtualFolders returns true if folders are emulated
  578. func (GCSFs) HasVirtualFolders() bool {
  579. return true
  580. }
  581. // ResolvePath returns the matching filesystem path for the specified virtual path
  582. func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) {
  583. if !path.IsAbs(virtualPath) {
  584. virtualPath = path.Clean("/" + virtualPath)
  585. }
  586. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  587. }
  588. func (fs *GCSFs) resolve(name string, prefix string) (string, bool) {
  589. result := strings.TrimPrefix(name, prefix)
  590. isDir := strings.HasSuffix(result, "/")
  591. if isDir {
  592. result = strings.TrimSuffix(result, "/")
  593. }
  594. return result, isDir
  595. }
  596. func (fs *GCSFs) isEqual(key string, virtualName string) bool {
  597. if key == virtualName {
  598. return true
  599. }
  600. if key == virtualName+"/" {
  601. return true
  602. }
  603. if key+"/" == virtualName {
  604. return true
  605. }
  606. return false
  607. }
  608. func (fs *GCSFs) checkIfBucketExists() error {
  609. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  610. defer cancelFn()
  611. bkt := fs.svc.Bucket(fs.config.Bucket)
  612. _, err := bkt.Attrs(ctx)
  613. metrics.GCSHeadBucketCompleted(err)
  614. return err
  615. }
  616. func (fs *GCSFs) hasContents(name string) (bool, error) {
  617. result := false
  618. prefix := ""
  619. if name != "" && name != "." {
  620. prefix = strings.TrimPrefix(name, "/")
  621. if !strings.HasSuffix(prefix, "/") {
  622. prefix += "/"
  623. }
  624. }
  625. query := &storage.Query{Prefix: prefix}
  626. err := query.SetAttrSelection(gcsDefaultFieldsSelection)
  627. if err != nil {
  628. return result, err
  629. }
  630. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
  631. defer cancelFn()
  632. bkt := fs.svc.Bucket(fs.config.Bucket)
  633. it := bkt.Objects(ctx, query)
  634. // if we have a dir object with a trailing slash it will be returned so we set the size to 2
  635. it.PageInfo().MaxSize = 2
  636. for {
  637. attrs, err := it.Next()
  638. if err == iterator.Done {
  639. break
  640. }
  641. if err != nil {
  642. metrics.GCSListObjectsCompleted(err)
  643. return result, err
  644. }
  645. name, _ := fs.resolve(attrs.Name, prefix)
  646. // a dir object with a trailing slash will result in an empty name
  647. if name == "/" || name == "" {
  648. continue
  649. }
  650. result = true
  651. break
  652. }
  653. metrics.GCSListObjectsCompleted(err)
  654. return result, nil
  655. }
  656. func (fs *GCSFs) getPrefix(name string) string {
  657. prefix := ""
  658. if name != "" && name != "." && name != "/" {
  659. prefix = strings.TrimPrefix(name, "/")
  660. if !strings.HasSuffix(prefix, "/") {
  661. prefix += "/"
  662. }
  663. }
  664. return prefix
  665. }
  666. func (fs *GCSFs) getPrefixForStat(name string) string {
  667. prefix := path.Dir(name)
  668. if prefix == "/" || prefix == "." || prefix == "" {
  669. prefix = ""
  670. } else {
  671. prefix = strings.TrimPrefix(prefix, "/")
  672. if !strings.HasSuffix(prefix, "/") {
  673. prefix += "/"
  674. }
  675. }
  676. return prefix
  677. }
  678. func (fs *GCSFs) headObject(name string) (*storage.ObjectAttrs, error) {
  679. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  680. defer cancelFn()
  681. bkt := fs.svc.Bucket(fs.config.Bucket)
  682. obj := bkt.Object(name)
  683. attrs, err := obj.Attrs(ctx)
  684. metrics.GCSHeadObjectCompleted(err)
  685. return attrs, err
  686. }
  687. // GetMimeType returns the content type
  688. func (fs *GCSFs) GetMimeType(name string) (string, error) {
  689. attrs, err := fs.headObject(name)
  690. if err != nil {
  691. return "", err
  692. }
  693. return attrs.ContentType, nil
  694. }