vfs.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strings"
  26. "time"
  27. "github.com/eikenb/pipeat"
  28. "github.com/pkg/sftp"
  29. "github.com/sftpgo/sdk"
  30. "github.com/sftpgo/sdk/plugin/metadata"
  31. "github.com/drakkan/sftpgo/v2/internal/kms"
  32. "github.com/drakkan/sftpgo/v2/internal/logger"
  33. "github.com/drakkan/sftpgo/v2/internal/plugin"
  34. "github.com/drakkan/sftpgo/v2/internal/util"
  35. )
  // dirMimeType is the MIME type string used for directories.
  const dirMimeType = "inode/directory"

  // Names of the cloud storage backends. Callers in this file match them
  // as prefixes of Fs.Name().
  const (
  	s3fsName     = "S3Fs"
  	gcsfsName    = "GCSFs"
  	azBlobFsName = "AzureBlobFs"
  )
  // Sentinel errors exposed by the package.
  var (
  	// ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
  	ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
  	// ErrVfsUnsupported defines the error for an unsupported VFS operation
  	ErrVfsUnsupported = errors.New("not supported")
  )

  // Package-wide state. The unexported settings are written by the Set*
  // functions below; no locking is performed here.
  var (
  	// validAzAccessTier lists the accepted Azure Blob access tiers,
  	// the empty string included.
  	validAzAccessTier    = []string{"", "Archive", "Hot", "Cool"}
  	tempPath             string
  	sftpFingerprints     []string
  	allowSelfConnections int
  	renameMode           int
  )

  // SetAllowSelfConnections sets the desired behaviour for self connections
  func SetAllowSelfConnections(value int) {
  	allowSelfConnections = value
  }

  // SetTempPath sets the path for temporary files
  func SetTempPath(fsPath string) {
  	tempPath = fsPath
  }

  // GetTempPath returns the path for temporary files, as last stored by SetTempPath
  func GetTempPath() string {
  	return tempPath
  }

  // SetSFTPFingerprints sets the SFTP host key fingerprints.
  // The slice is stored as is, it is not copied.
  func SetSFTPFingerprints(fp []string) {
  	sftpFingerprints = fp
  }

  // SetRenameMode sets the rename mode
  func SetRenameMode(val int) {
  	renameMode = val
  }
  73. // Fs defines the interface for filesystem backends
  74. type Fs interface {
  75. Name() string
  76. ConnectionID() string
  77. Stat(name string) (os.FileInfo, error)
  78. Lstat(name string) (os.FileInfo, error)
  79. Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error)
  80. Create(name string, flag int) (File, *PipeWriter, func(), error)
  81. Rename(source, target string) (int, int64, error)
  82. Remove(name string, isDir bool) error
  83. Mkdir(name string) error
  84. Symlink(source, target string) error
  85. Chown(name string, uid int, gid int) error
  86. Chmod(name string, mode os.FileMode) error
  87. Chtimes(name string, atime, mtime time.Time, isUploading bool) error
  88. Truncate(name string, size int64) error
  89. ReadDir(dirname string) ([]os.FileInfo, error)
  90. Readlink(name string) (string, error)
  91. IsUploadResumeSupported() bool
  92. IsAtomicUploadSupported() bool
  93. CheckRootPath(username string, uid int, gid int) bool
  94. ResolvePath(virtualPath string) (string, error)
  95. IsNotExist(err error) bool
  96. IsPermission(err error) bool
  97. IsNotSupported(err error) bool
  98. ScanRootDirContents() (int, int64, error)
  99. GetDirSize(dirname string) (int, int64, error)
  100. GetAtomicUploadPath(name string) string
  101. GetRelativePath(name string) string
  102. Walk(root string, walkFn filepath.WalkFunc) error
  103. Join(elem ...string) string
  104. HasVirtualFolders() bool
  105. GetMimeType(name string) (string, error)
  106. GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
  107. CheckMetadata() error
  108. Close() error
  109. }
  110. // FsRealPather is a Fs that implements the RealPath method.
  111. type FsRealPather interface {
  112. Fs
  113. RealPath(p string) (string, error)
  114. }
  115. // fsMetadataChecker is a Fs that implements the getFileNamesInPrefix method.
  116. // This interface is used to abstract metadata consistency checks
  117. type fsMetadataChecker interface {
  118. Fs
  119. getFileNamesInPrefix(fsPrefix string) (map[string]bool, error)
  120. }
  121. // FsFileCopier is a Fs that implements the CopyFile method.
  122. type FsFileCopier interface {
  123. Fs
  124. CopyFile(source, target string, srcSize int64) error
  125. }
  // File defines an interface representing a SFTPGo file.
  // The method set is sequential and positional read/write, seek and close,
  // plus Stat, Name and Truncate.
  type File interface {
  	io.ReadWriteSeeker
  	io.Closer
  	io.ReaderAt
  	io.WriterAt
  	Stat() (os.FileInfo, error)
  	Name() string
  	Truncate(size int64) error
  }
  // QuotaCheckResult defines the result for a quota check
  type QuotaCheckResult struct {
  	HasSpace     bool
  	AllowedSize  int64
  	AllowedFiles int
  	UsedSize     int64
  	UsedFiles    int
  	QuotaSize    int64
  	QuotaFiles   int
  }

  // GetRemainingSize returns the remaining allowed size.
  // It returns 0 when QuotaSize is not positive; the result can be
  // negative if UsedSize exceeds QuotaSize.
  func (q *QuotaCheckResult) GetRemainingSize() int64 {
  	if q.QuotaSize <= 0 {
  		return 0
  	}
  	return q.QuotaSize - q.UsedSize
  }

  // GetRemainingFiles returns the remaining allowed files.
  // It returns 0 when QuotaFiles is not positive; the result can be
  // negative if UsedFiles exceeds QuotaFiles.
  func (q *QuotaCheckResult) GetRemainingFiles() int {
  	if q.QuotaFiles <= 0 {
  		return 0
  	}
  	return q.QuotaFiles - q.UsedFiles
  }
  162. // S3FsConfig defines the configuration for S3 based filesystem
  163. type S3FsConfig struct {
  164. sdk.BaseS3FsConfig
  165. AccessSecret *kms.Secret `json:"access_secret,omitempty"`
  166. }
  167. // HideConfidentialData hides confidential data
  168. func (c *S3FsConfig) HideConfidentialData() {
  169. if c.AccessSecret != nil {
  170. c.AccessSecret.Hide()
  171. }
  172. }
  173. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  174. if c.Bucket != other.Bucket {
  175. return false
  176. }
  177. if c.KeyPrefix != other.KeyPrefix {
  178. return false
  179. }
  180. if c.Region != other.Region {
  181. return false
  182. }
  183. if c.AccessKey != other.AccessKey {
  184. return false
  185. }
  186. if c.RoleARN != other.RoleARN {
  187. return false
  188. }
  189. if c.Endpoint != other.Endpoint {
  190. return false
  191. }
  192. if c.StorageClass != other.StorageClass {
  193. return false
  194. }
  195. if c.ACL != other.ACL {
  196. return false
  197. }
  198. if !c.areMultipartFieldsEqual(other) {
  199. return false
  200. }
  201. if c.ForcePathStyle != other.ForcePathStyle {
  202. return false
  203. }
  204. return c.isSecretEqual(other)
  205. }
  206. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  207. if c.UploadPartSize != other.UploadPartSize {
  208. return false
  209. }
  210. if c.UploadConcurrency != other.UploadConcurrency {
  211. return false
  212. }
  213. if c.DownloadConcurrency != other.DownloadConcurrency {
  214. return false
  215. }
  216. if c.DownloadPartSize != other.DownloadPartSize {
  217. return false
  218. }
  219. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  220. return false
  221. }
  222. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  223. return false
  224. }
  225. return true
  226. }
  227. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  228. if c.AccessSecret == nil {
  229. c.AccessSecret = kms.NewEmptySecret()
  230. }
  231. if other.AccessSecret == nil {
  232. other.AccessSecret = kms.NewEmptySecret()
  233. }
  234. return c.AccessSecret.IsEqual(other.AccessSecret)
  235. }
  236. func (c *S3FsConfig) checkCredentials() error {
  237. if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
  238. return errors.New("access_key cannot be empty with access_secret not empty")
  239. }
  240. if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
  241. return errors.New("access_secret cannot be empty with access_key not empty")
  242. }
  243. if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
  244. return errors.New("invalid encrypted access_secret")
  245. }
  246. if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
  247. return errors.New("invalid access_secret")
  248. }
  249. return nil
  250. }
  251. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  252. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  253. if err := c.validate(); err != nil {
  254. return util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  255. }
  256. if c.AccessSecret.IsPlain() {
  257. c.AccessSecret.SetAdditionalData(additionalData)
  258. err := c.AccessSecret.Encrypt()
  259. if err != nil {
  260. return util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err))
  261. }
  262. }
  263. return nil
  264. }
  265. func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
  266. if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
  267. return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  268. }
  269. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  270. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  271. }
  272. if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
  273. return errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
  274. }
  275. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  276. return fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency)
  277. }
  278. return nil
  279. }
  280. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  281. if c.Bucket != other.Bucket {
  282. return false
  283. }
  284. if c.Endpoint != other.Endpoint {
  285. return false
  286. }
  287. return c.Region == other.Region
  288. }
  289. // validate returns an error if the configuration is not valid
  290. func (c *S3FsConfig) validate() error {
  291. if c.AccessSecret == nil {
  292. c.AccessSecret = kms.NewEmptySecret()
  293. }
  294. if c.Bucket == "" {
  295. return errors.New("bucket cannot be empty")
  296. }
  297. // the region may be embedded within the endpoint for some S3 compatible
  298. // object storage, for example B2
  299. if c.Endpoint == "" && c.Region == "" {
  300. return errors.New("region cannot be empty")
  301. }
  302. if err := c.checkCredentials(); err != nil {
  303. return err
  304. }
  305. if c.KeyPrefix != "" {
  306. if strings.HasPrefix(c.KeyPrefix, "/") {
  307. return errors.New("key_prefix cannot start with /")
  308. }
  309. c.KeyPrefix = path.Clean(c.KeyPrefix)
  310. if !strings.HasSuffix(c.KeyPrefix, "/") {
  311. c.KeyPrefix += "/"
  312. }
  313. }
  314. c.StorageClass = strings.TrimSpace(c.StorageClass)
  315. c.ACL = strings.TrimSpace(c.ACL)
  316. return c.checkPartSizeAndConcurrency()
  317. }
  318. // GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
  319. type GCSFsConfig struct {
  320. sdk.BaseGCSFsConfig
  321. Credentials *kms.Secret `json:"credentials,omitempty"`
  322. }
  323. // HideConfidentialData hides confidential data
  324. func (c *GCSFsConfig) HideConfidentialData() {
  325. if c.Credentials != nil {
  326. c.Credentials.Hide()
  327. }
  328. }
  329. // ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
  330. func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  331. if err := c.validate(); err != nil {
  332. return util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
  333. }
  334. if c.Credentials.IsPlain() {
  335. c.Credentials.SetAdditionalData(additionalData)
  336. err := c.Credentials.Encrypt()
  337. if err != nil {
  338. return util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err))
  339. }
  340. }
  341. return nil
  342. }
  343. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  344. if c.Bucket != other.Bucket {
  345. return false
  346. }
  347. if c.KeyPrefix != other.KeyPrefix {
  348. return false
  349. }
  350. if c.AutomaticCredentials != other.AutomaticCredentials {
  351. return false
  352. }
  353. if c.StorageClass != other.StorageClass {
  354. return false
  355. }
  356. if c.ACL != other.ACL {
  357. return false
  358. }
  359. if c.UploadPartSize != other.UploadPartSize {
  360. return false
  361. }
  362. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  363. return false
  364. }
  365. if c.Credentials == nil {
  366. c.Credentials = kms.NewEmptySecret()
  367. }
  368. if other.Credentials == nil {
  369. other.Credentials = kms.NewEmptySecret()
  370. }
  371. return c.Credentials.IsEqual(other.Credentials)
  372. }
  373. func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  374. return c.Bucket == other.Bucket
  375. }
  376. // validate returns an error if the configuration is not valid
  377. func (c *GCSFsConfig) validate() error {
  378. if c.Credentials == nil || c.AutomaticCredentials == 1 {
  379. c.Credentials = kms.NewEmptySecret()
  380. }
  381. if c.Bucket == "" {
  382. return errors.New("bucket cannot be empty")
  383. }
  384. if c.KeyPrefix != "" {
  385. if strings.HasPrefix(c.KeyPrefix, "/") {
  386. return errors.New("key_prefix cannot start with /")
  387. }
  388. c.KeyPrefix = path.Clean(c.KeyPrefix)
  389. if !strings.HasSuffix(c.KeyPrefix, "/") {
  390. c.KeyPrefix += "/"
  391. }
  392. }
  393. if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
  394. return errors.New("invalid encrypted credentials")
  395. }
  396. if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
  397. return errors.New("invalid credentials")
  398. }
  399. c.StorageClass = strings.TrimSpace(c.StorageClass)
  400. c.ACL = strings.TrimSpace(c.ACL)
  401. if c.UploadPartSize < 0 {
  402. c.UploadPartSize = 0
  403. }
  404. if c.UploadPartMaxTime < 0 {
  405. c.UploadPartMaxTime = 0
  406. }
  407. return nil
  408. }
  409. // AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
  410. type AzBlobFsConfig struct {
  411. sdk.BaseAzBlobFsConfig
  412. // Storage Account Key leave blank to use SAS URL.
  413. // The access key is stored encrypted based on the kms configuration
  414. AccountKey *kms.Secret `json:"account_key,omitempty"`
  415. // Shared access signature URL, leave blank if using account/key
  416. SASURL *kms.Secret `json:"sas_url,omitempty"`
  417. }
  418. // HideConfidentialData hides confidential data
  419. func (c *AzBlobFsConfig) HideConfidentialData() {
  420. if c.AccountKey != nil {
  421. c.AccountKey.Hide()
  422. }
  423. if c.SASURL != nil {
  424. c.SASURL.Hide()
  425. }
  426. }
  427. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  428. if c.Container != other.Container {
  429. return false
  430. }
  431. if c.AccountName != other.AccountName {
  432. return false
  433. }
  434. if c.Endpoint != other.Endpoint {
  435. return false
  436. }
  437. if c.SASURL.IsEmpty() {
  438. c.SASURL = kms.NewEmptySecret()
  439. }
  440. if other.SASURL.IsEmpty() {
  441. other.SASURL = kms.NewEmptySecret()
  442. }
  443. if !c.SASURL.IsEqual(other.SASURL) {
  444. return false
  445. }
  446. if c.KeyPrefix != other.KeyPrefix {
  447. return false
  448. }
  449. if c.UploadPartSize != other.UploadPartSize {
  450. return false
  451. }
  452. if c.UploadConcurrency != other.UploadConcurrency {
  453. return false
  454. }
  455. if c.DownloadPartSize != other.DownloadPartSize {
  456. return false
  457. }
  458. if c.DownloadConcurrency != other.DownloadConcurrency {
  459. return false
  460. }
  461. if c.UseEmulator != other.UseEmulator {
  462. return false
  463. }
  464. if c.AccessTier != other.AccessTier {
  465. return false
  466. }
  467. return c.isSecretEqual(other)
  468. }
  469. func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
  470. if c.AccountKey == nil {
  471. c.AccountKey = kms.NewEmptySecret()
  472. }
  473. if other.AccountKey == nil {
  474. other.AccountKey = kms.NewEmptySecret()
  475. }
  476. return c.AccountKey.IsEqual(other.AccountKey)
  477. }
  478. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  479. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  480. if err := c.validate(); err != nil {
  481. return util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  482. }
  483. if c.AccountKey.IsPlain() {
  484. c.AccountKey.SetAdditionalData(additionalData)
  485. if err := c.AccountKey.Encrypt(); err != nil {
  486. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err))
  487. }
  488. }
  489. if c.SASURL.IsPlain() {
  490. c.SASURL.SetAdditionalData(additionalData)
  491. if err := c.SASURL.Encrypt(); err != nil {
  492. return util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err))
  493. }
  494. }
  495. return nil
  496. }
  497. func (c *AzBlobFsConfig) checkCredentials() error {
  498. if c.SASURL.IsPlain() {
  499. _, err := url.Parse(c.SASURL.GetPayload())
  500. return err
  501. }
  502. if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
  503. return errors.New("invalid encrypted sas_url")
  504. }
  505. if !c.SASURL.IsEmpty() {
  506. return nil
  507. }
  508. if c.AccountName == "" || !c.AccountKey.IsValidInput() {
  509. return errors.New("credentials cannot be empty or invalid")
  510. }
  511. if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
  512. return errors.New("invalid encrypted account_key")
  513. }
  514. return nil
  515. }
  516. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  517. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  518. return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize)
  519. }
  520. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  521. return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
  522. }
  523. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  524. return fmt.Errorf("invalid download part size: %v", c.DownloadPartSize)
  525. }
  526. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  527. return fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency)
  528. }
  529. return nil
  530. }
  531. func (c *AzBlobFsConfig) tryDecrypt() error {
  532. if err := c.AccountKey.TryDecrypt(); err != nil {
  533. return fmt.Errorf("unable to decrypt account key: %w", err)
  534. }
  535. if err := c.SASURL.TryDecrypt(); err != nil {
  536. return fmt.Errorf("unable to decrypt SAS URL: %w", err)
  537. }
  538. return nil
  539. }
  540. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  541. if c.AccountName != other.AccountName {
  542. return false
  543. }
  544. if c.Endpoint != other.Endpoint {
  545. return false
  546. }
  547. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  548. }
  549. // validate returns an error if the configuration is not valid
  550. func (c *AzBlobFsConfig) validate() error {
  551. if c.AccountKey == nil {
  552. c.AccountKey = kms.NewEmptySecret()
  553. }
  554. if c.SASURL == nil {
  555. c.SASURL = kms.NewEmptySecret()
  556. }
  557. // container could be embedded within SAS URL we check this at runtime
  558. if c.SASURL.IsEmpty() && c.Container == "" {
  559. return errors.New("container cannot be empty")
  560. }
  561. if err := c.checkCredentials(); err != nil {
  562. return err
  563. }
  564. if c.KeyPrefix != "" {
  565. if strings.HasPrefix(c.KeyPrefix, "/") {
  566. return errors.New("key_prefix cannot start with /")
  567. }
  568. c.KeyPrefix = path.Clean(c.KeyPrefix)
  569. if !strings.HasSuffix(c.KeyPrefix, "/") {
  570. c.KeyPrefix += "/"
  571. }
  572. }
  573. if err := c.checkPartSizeAndConcurrency(); err != nil {
  574. return err
  575. }
  576. if !util.Contains(validAzAccessTier, c.AccessTier) {
  577. return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
  578. }
  579. return nil
  580. }
  581. // CryptFsConfig defines the configuration to store local files as encrypted
  582. type CryptFsConfig struct {
  583. Passphrase *kms.Secret `json:"passphrase,omitempty"`
  584. }
  585. // HideConfidentialData hides confidential data
  586. func (c *CryptFsConfig) HideConfidentialData() {
  587. if c.Passphrase != nil {
  588. c.Passphrase.Hide()
  589. }
  590. }
  591. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  592. if c.Passphrase == nil {
  593. c.Passphrase = kms.NewEmptySecret()
  594. }
  595. if other.Passphrase == nil {
  596. other.Passphrase = kms.NewEmptySecret()
  597. }
  598. return c.Passphrase.IsEqual(other.Passphrase)
  599. }
  600. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  601. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  602. if err := c.validate(); err != nil {
  603. return util.NewValidationError(fmt.Sprintf("could not validate Crypt fs config: %v", err))
  604. }
  605. if c.Passphrase.IsPlain() {
  606. c.Passphrase.SetAdditionalData(additionalData)
  607. if err := c.Passphrase.Encrypt(); err != nil {
  608. return util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err))
  609. }
  610. }
  611. return nil
  612. }
  613. func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
  614. return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
  615. }
  616. // validate returns an error if the configuration is not valid
  617. func (c *CryptFsConfig) validate() error {
  618. if c.Passphrase == nil || c.Passphrase.IsEmpty() {
  619. return errors.New("invalid passphrase")
  620. }
  621. if !c.Passphrase.IsValidInput() {
  622. return errors.New("passphrase cannot be empty or invalid")
  623. }
  624. if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
  625. return errors.New("invalid encrypted passphrase")
  626. }
  627. return nil
  628. }
  629. // PipeWriter defines a wrapper for pipeat.PipeWriterAt.
  630. type PipeWriter struct {
  631. writer *pipeat.PipeWriterAt
  632. err error
  633. done chan bool
  634. }
  635. // NewPipeWriter initializes a new PipeWriter
  636. func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
  637. return &PipeWriter{
  638. writer: w,
  639. err: nil,
  640. done: make(chan bool),
  641. }
  642. }
  643. // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
  644. func (p *PipeWriter) Close() error {
  645. p.writer.Close() //nolint:errcheck // the returned error is always null
  646. <-p.done
  647. return p.err
  648. }
  649. // Done unlocks other goroutines waiting on Close().
  650. // It must be called when the upload ends
  651. func (p *PipeWriter) Done(err error) {
  652. p.err = err
  653. p.done <- true
  654. }
  655. // WriteAt is a wrapper for pipeat WriteAt
  656. func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) {
  657. return p.writer.WriteAt(data, off)
  658. }
  659. // Write is a wrapper for pipeat Write
  660. func (p *PipeWriter) Write(data []byte) (int, error) {
  661. return p.writer.Write(data)
  662. }
  // isEqualityCheckModeValid reports whether mode is a supported equality
  // check mode, that is 0 or 1.
  func isEqualityCheckModeValid(mode int) bool {
  	// both bounds must hold: with "||" every integer was accepted
  	return mode >= 0 && mode <= 1
  }
  666. // isDirectory checks if a path exists and is a directory
  667. func isDirectory(fs Fs, path string) (bool, error) {
  668. fileInfo, err := fs.Stat(path)
  669. if err != nil {
  670. return false, err
  671. }
  672. return fileInfo.IsDir(), err
  673. }
  674. // IsLocalOsFs returns true if fs is a local filesystem implementation
  675. func IsLocalOsFs(fs Fs) bool {
  676. return fs.Name() == osFsName
  677. }
  678. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  679. func IsCryptOsFs(fs Fs) bool {
  680. return fs.Name() == cryptFsName
  681. }
  682. // IsSFTPFs returns true if fs is an SFTP filesystem
  683. func IsSFTPFs(fs Fs) bool {
  684. return strings.HasPrefix(fs.Name(), sftpFsName)
  685. }
  686. // IsHTTPFs returns true if fs is an HTTP filesystem
  687. func IsHTTPFs(fs Fs) bool {
  688. return strings.HasPrefix(fs.Name(), httpFsName)
  689. }
  690. // IsBufferedSFTPFs returns true if this is a buffered SFTP filesystem
  691. func IsBufferedSFTPFs(fs Fs) bool {
  692. if !IsSFTPFs(fs) {
  693. return false
  694. }
  695. return !fs.IsUploadResumeSupported()
  696. }
  697. // IsLocalOrUnbufferedSFTPFs returns true if fs is local or SFTP with no buffer
  698. func IsLocalOrUnbufferedSFTPFs(fs Fs) bool {
  699. if IsLocalOsFs(fs) {
  700. return true
  701. }
  702. if IsSFTPFs(fs) {
  703. return fs.IsUploadResumeSupported()
  704. }
  705. return false
  706. }
  707. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  708. func IsLocalOrSFTPFs(fs Fs) bool {
  709. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  710. }
  711. // HasTruncateSupport returns true if the fs supports truncate files
  712. func HasTruncateSupport(fs Fs) bool {
  713. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  714. }
  715. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  716. func HasImplicitAtomicUploads(fs Fs) bool {
  717. if strings.HasPrefix(fs.Name(), s3fsName) {
  718. return true
  719. }
  720. if strings.HasPrefix(fs.Name(), gcsfsName) {
  721. return true
  722. }
  723. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  724. return true
  725. }
  726. return false
  727. }
  728. // HasOpenRWSupport returns true if the fs can open a file
  729. // for reading and writing at the same time
  730. func HasOpenRWSupport(fs Fs) bool {
  731. if IsLocalOsFs(fs) {
  732. return true
  733. }
  734. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  735. return true
  736. }
  737. return false
  738. }
  739. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  740. func IsLocalOrCryptoFs(fs Fs) bool {
  741. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  742. }
  743. // SetPathPermissions calls fs.Chown.
  744. // It does nothing for local filesystem on windows
  745. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  746. if uid == -1 && gid == -1 {
  747. return
  748. }
  749. if IsLocalOsFs(fs) {
  750. if runtime.GOOS == "windows" {
  751. return
  752. }
  753. }
  754. if err := fs.Chown(path, uid, gid); err != nil {
  755. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  756. }
  757. }
  758. func updateFileInfoModTime(storageID, objectPath string, info *FileInfo) (*FileInfo, error) {
  759. if !plugin.Handler.HasMetadater() {
  760. return info, nil
  761. }
  762. if info.IsDir() {
  763. return info, nil
  764. }
  765. mTime, err := plugin.Handler.GetModificationTime(storageID, ensureAbsPath(objectPath), info.IsDir())
  766. if errors.Is(err, metadata.ErrNoSuchObject) {
  767. return info, nil
  768. }
  769. if err != nil {
  770. return info, err
  771. }
  772. info.modTime = util.GetTimeFromMsecSinceEpoch(mTime)
  773. return info, nil
  774. }
  775. func getFolderModTimes(storageID, dirName string) (map[string]int64, error) {
  776. var err error
  777. modTimes := make(map[string]int64)
  778. if plugin.Handler.HasMetadater() {
  779. modTimes, err = plugin.Handler.GetModificationTimes(storageID, ensureAbsPath(dirName))
  780. if err != nil && !errors.Is(err, metadata.ErrNoSuchObject) {
  781. return modTimes, err
  782. }
  783. }
  784. return modTimes, nil
  785. }
  // ensureAbsPath returns name unchanged if it is already an absolute
  // slash-separated path, otherwise it joins name to "/" (which also cleans
  // the result).
  func ensureAbsPath(name string) string {
  	if !path.IsAbs(name) {
  		return path.Join("/", name)
  	}
  	return name
  }
  792. func fsMetadataCheck(fs fsMetadataChecker, storageID, keyPrefix string) error {
  793. if !plugin.Handler.HasMetadater() {
  794. return nil
  795. }
  796. limit := 100
  797. from := ""
  798. for {
  799. metadataFolders, err := plugin.Handler.GetMetadataFolders(storageID, from, limit)
  800. if err != nil {
  801. fsLog(fs, logger.LevelError, "unable to get folders: %v", err)
  802. return err
  803. }
  804. for _, folder := range metadataFolders {
  805. from = folder
  806. fsPrefix := folder
  807. if !strings.HasSuffix(folder, "/") {
  808. fsPrefix += "/"
  809. }
  810. if keyPrefix != "" {
  811. if !strings.HasPrefix(fsPrefix, "/"+keyPrefix) {
  812. fsLog(fs, logger.LevelDebug, "skip metadata check for folder %q outside prefix %q",
  813. folder, keyPrefix)
  814. continue
  815. }
  816. }
  817. fsLog(fs, logger.LevelDebug, "check metadata for folder %q", folder)
  818. metadataValues, err := plugin.Handler.GetModificationTimes(storageID, folder)
  819. if err != nil {
  820. fsLog(fs, logger.LevelError, "unable to get modification times for folder %q: %v", folder, err)
  821. return err
  822. }
  823. if len(metadataValues) == 0 {
  824. fsLog(fs, logger.LevelDebug, "no metadata for folder %q", folder)
  825. continue
  826. }
  827. fileNames, err := fs.getFileNamesInPrefix(fsPrefix)
  828. if err != nil {
  829. fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %v", fsPrefix, err)
  830. return err
  831. }
  832. // now check if we have metadata for a missing object
  833. for k := range metadataValues {
  834. if _, ok := fileNames[k]; !ok {
  835. filePath := ensureAbsPath(path.Join(folder, k))
  836. if err = plugin.Handler.RemoveMetadata(storageID, filePath); err != nil {
  837. fsLog(fs, logger.LevelError, "unable to remove metadata for missing file %q: %v", filePath, err)
  838. } else {
  839. fsLog(fs, logger.LevelDebug, "metadata removed for missing file %q", filePath)
  840. }
  841. }
  842. }
  843. }
  844. if len(metadataFolders) < limit {
  845. return nil
  846. }
  847. }
  848. }
  // getMountPath maps the root mount path "/" to the empty string and
  // returns any other value unchanged.
  func getMountPath(mountPath string) string {
  	if mountPath != "/" {
  		return mountPath
  	}
  	return ""
  }
  855. func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
  856. logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
  857. }