vfs.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. // Package vfs provides local and remote filesystems support
  15. package vfs
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/url"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "runtime"
  25. "strings"
  26. "sync"
  27. "time"
  28. "github.com/eikenb/pipeat"
  29. "github.com/pkg/sftp"
  30. "github.com/sftpgo/sdk"
  31. "github.com/sftpgo/sdk/plugin/metadata"
  32. "github.com/drakkan/sftpgo/v2/internal/kms"
  33. "github.com/drakkan/sftpgo/v2/internal/logger"
  34. "github.com/drakkan/sftpgo/v2/internal/plugin"
  35. "github.com/drakkan/sftpgo/v2/internal/util"
  36. )
  // Unexported constants used by this package.
  const (
  	// dirMimeType is the MIME type string "inode/directory".
  	// NOTE(review): presumably reported for directories — confirm with callers.
  	dirMimeType = "inode/directory"
  	// Name strings for the S3, GCS and Azure Blob backends.
  	s3fsName = "S3Fs"
  	gcsfsName = "GCSFs"
  	azBlobFsName = "AzureBlobFs"
  	// preResumeTimeout is a 90 second duration.
  	// NOTE(review): presumably bounds the work done before resuming an upload — confirm.
  	preResumeTimeout = 90 * time.Second
  )
  // Additional checks for files.
  // NOTE(review): presumably these are the values accepted by the "checks"
  // argument of Fs.Create; 1 and 2 look like combinable bit flags — confirm
  // with the callers.
  const (
  	CheckParentDir = 1
  	CheckResume = 2
  )
  var (
  	// validAzAccessTier lists the values accepted for AzBlobFsConfig.AccessTier
  	// by AzBlobFsConfig.validate; the empty string is accepted too.
  	validAzAccessTier = []string{"", "Archive", "Hot", "Cool"}
  	// ErrStorageSizeUnavailable is returned if the storage backend does not support getting the size
  	ErrStorageSizeUnavailable = errors.New("unable to get available size for this storage backend")
  	// ErrVfsUnsupported defines the error for an unsupported VFS operation
  	ErrVfsUnsupported = errors.New("not supported")
  	// Package-level settings. Each one is written by the matching Set*
  	// function defined in this file; no synchronization is applied here.
  	tempPath string
  	sftpFingerprints []string
  	allowSelfConnections int
  	renameMode int
  	readMetadata int
  	resumeMaxSize int64
  	uploadMode int
  )
  // SetAllowSelfConnections sets the desired behaviour for self connections.
  // The value is stored as is in the package-level allowSelfConnections variable.
  func SetAllowSelfConnections(value int) {
  	allowSelfConnections = value
  }
  // SetTempPath sets the path for temporary files.
  // The path is stored as is, without any validation.
  func SetTempPath(fsPath string) {
  	tempPath = fsPath
  }
  // GetTempPath returns the path for temporary files, as last set using
  // SetTempPath (empty if it was never set).
  func GetTempPath() string {
  	return tempPath
  }
  // SetSFTPFingerprints sets the SFTP host key fingerprints.
  // The slice is stored without copying, so later changes made by the caller
  // to its elements are visible through the package-level variable.
  func SetSFTPFingerprints(fp []string) {
  	sftpFingerprints = fp
  }
  // SetRenameMode sets the rename mode.
  // The value is stored as is in the package-level renameMode variable.
  func SetRenameMode(val int) {
  	renameMode = val
  }
  // SetReadMetadataMode sets the read metadata mode.
  // The value is stored as is in the package-level readMetadata variable.
  func SetReadMetadataMode(val int) {
  	readMetadata = val
  }
  // SetResumeMaxSize sets the max size allowed for resuming uploads for backends
  // with immutable objects.
  // NOTE(review): the unit is presumably bytes — confirm with the callers.
  func SetResumeMaxSize(val int64) {
  	resumeMaxSize = val
  }
  // SetUploadMode sets the upload mode.
  // The value is stored as is in the package-level uploadMode variable.
  func SetUploadMode(val int) {
  	uploadMode = val
  }
  // Fs defines the interface for filesystem backends.
  // It groups file and directory operations, capability queries, path
  // helpers, error classification and size accounting methods.
  type Fs interface {
  	// Identification.
  	Name() string
  	ConnectionID() string
  	// File and directory operations.
  	Stat(name string) (os.FileInfo, error)
  	Lstat(name string) (os.FileInfo, error)
  	// Open and Create return a File, a pipe and a func() in addition to the error.
  	// NOTE(review): presumably a backend sets either the File or the pipe, and
  	// the func() cancels the pending transfer — confirm with the implementations.
  	Open(name string, offset int64) (File, PipeReader, func(), error)
  	Create(name string, flag, checks int) (File, PipeWriter, func(), error)
  	// NOTE(review): the int and int64 results presumably are a number of files
  	// and a size — confirm with the implementations.
  	Rename(source, target string) (int, int64, error)
  	Remove(name string, isDir bool) error
  	Mkdir(name string) error
  	Symlink(source, target string) error
  	Chown(name string, uid int, gid int) error
  	Chmod(name string, mode os.FileMode) error
  	Chtimes(name string, atime, mtime time.Time, isUploading bool) error
  	Truncate(name string, size int64) error
  	ReadDir(dirname string) ([]os.FileInfo, error)
  	Readlink(name string) (string, error)
  	// Capability queries.
  	IsUploadResumeSupported() bool
  	IsConditionalUploadResumeSupported(size int64) bool
  	IsAtomicUploadSupported() bool
  	CheckRootPath(username string, uid int, gid int) bool
  	ResolvePath(virtualPath string) (string, error)
  	// Error classification.
  	IsNotExist(err error) bool
  	IsPermission(err error) bool
  	IsNotSupported(err error) bool
  	// Size accounting.
  	// NOTE(review): the (int, int64) pairs presumably are files count and
  	// total size — confirm with the implementations.
  	ScanRootDirContents() (int, int64, error)
  	GetDirSize(dirname string) (int, int64, error)
  	// Path helpers and traversal.
  	GetAtomicUploadPath(name string) string
  	GetRelativePath(name string) string
  	Walk(root string, walkFn filepath.WalkFunc) error
  	Join(elem ...string) string
  	HasVirtualFolders() bool
  	GetMimeType(name string) (string, error)
  	GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
  	CheckMetadata() error
  	Close() error
  }
  // FsRealPather is a Fs that implements the RealPath method.
  // RealPath takes a path and returns a path or an error.
  // NOTE(review): presumably it resolves p to its real/canonical form — confirm
  // with the implementations.
  type FsRealPather interface {
  	Fs
  	RealPath(p string) (string, error)
  }
  // fsMetadataChecker is a Fs that implements the getFileNamesInPrefix method.
  // This interface is used to abstract metadata consistency checks.
  // getFileNamesInPrefix returns a set-like map keyed by string for the given prefix.
  // NOTE(review): the meaning of the bool values is not visible here — confirm
  // with the implementations.
  type fsMetadataChecker interface {
  	Fs
  	getFileNamesInPrefix(fsPrefix string) (map[string]bool, error)
  }
  // FsFileCopier is a Fs that implements the CopyFile method.
  // CopyFile receives the source and target names and the source size.
  // NOTE(review): srcSize is presumably expressed in bytes — confirm.
  type FsFileCopier interface {
  	Fs
  	CopyFile(source, target string, srcSize int64) error
  }
  // File defines an interface representing a SFTPGo file.
  // It combines the standard io read, write, seek and close interfaces
  // (including the positional ReaderAt/WriterAt variants) with Stat, Name and
  // Truncate.
  // NOTE(review): the method set appears to match what *os.File provides —
  // confirm before relying on it.
  type File interface {
  	io.Reader
  	io.Writer
  	io.Closer
  	io.ReaderAt
  	io.WriterAt
  	io.Seeker
  	Stat() (os.FileInfo, error)
  	Name() string
  	Truncate(size int64) error
  }
  // PipeWriter defines an interface representing a SFTPGo pipe writer.
  // Besides the io write/close methods it exposes Done, used to report the
  // final error of the transfer, and GetWrittenBytes.
  type PipeWriter interface {
  	io.Writer
  	io.WriterAt
  	io.Closer
  	Done(err error)
  	GetWrittenBytes() int64
  }
  // PipeReader defines an interface representing a SFTPGo pipe reader.
  // Besides the io read/close methods it carries string metadata: the two
  // unexported setters can only be implemented inside this package, while
  // Metadata exposes the stored values.
  type PipeReader interface {
  	io.Reader
  	io.ReaderAt
  	io.Closer
  	setMetadata(value map[string]string)
  	setMetadataFromPointerVal(value map[string]*string)
  	Metadata() map[string]string
  }
  // Metadater defines an interface to implement to return metadata for a file.
  // Metadata returns the metadata as a map of string keys to string values.
  type Metadater interface {
  	Metadata() map[string]string
  }
  // QuotaCheckResult defines the result for a quota check
  type QuotaCheckResult struct {
  	HasSpace bool
  	AllowedSize int64
  	AllowedFiles int
  	UsedSize int64
  	UsedFiles int
  	QuotaSize int64
  	QuotaFiles int
  }

  // GetRemainingSize returns the remaining allowed size, computed as
  // QuotaSize minus UsedSize. It returns 0 when QuotaSize is not positive.
  // The result is negative if the used size exceeds the quota.
  func (q *QuotaCheckResult) GetRemainingSize() int64 {
  	if q.QuotaSize <= 0 {
  		return 0
  	}
  	remaining := q.QuotaSize - q.UsedSize
  	return remaining
  }

  // GetRemainingFiles returns the remaining allowed files, computed as
  // QuotaFiles minus UsedFiles. It returns 0 when QuotaFiles is not positive.
  // The result is negative if the used files exceed the quota.
  func (q *QuotaCheckResult) GetRemainingFiles() int {
  	if q.QuotaFiles <= 0 {
  		return 0
  	}
  	remaining := q.QuotaFiles - q.UsedFiles
  	return remaining
  }
  // S3FsConfig defines the configuration for S3 based filesystem.
  // It embeds sdk.BaseS3FsConfig and adds the access secret as a kms.Secret,
  // serialized using the "access_secret" JSON key and omitted when nil.
  type S3FsConfig struct {
  	sdk.BaseS3FsConfig
  	AccessSecret *kms.Secret `json:"access_secret,omitempty"`
  }
  212. // HideConfidentialData hides confidential data
  213. func (c *S3FsConfig) HideConfidentialData() {
  214. if c.AccessSecret != nil {
  215. c.AccessSecret.Hide()
  216. }
  217. }
  218. func (c *S3FsConfig) isEqual(other S3FsConfig) bool {
  219. if c.Bucket != other.Bucket {
  220. return false
  221. }
  222. if c.KeyPrefix != other.KeyPrefix {
  223. return false
  224. }
  225. if c.Region != other.Region {
  226. return false
  227. }
  228. if c.AccessKey != other.AccessKey {
  229. return false
  230. }
  231. if c.RoleARN != other.RoleARN {
  232. return false
  233. }
  234. if c.Endpoint != other.Endpoint {
  235. return false
  236. }
  237. if c.StorageClass != other.StorageClass {
  238. return false
  239. }
  240. if c.ACL != other.ACL {
  241. return false
  242. }
  243. if !c.areMultipartFieldsEqual(other) {
  244. return false
  245. }
  246. if c.ForcePathStyle != other.ForcePathStyle {
  247. return false
  248. }
  249. if c.SkipTLSVerify != other.SkipTLSVerify {
  250. return false
  251. }
  252. return c.isSecretEqual(other)
  253. }
  254. func (c *S3FsConfig) areMultipartFieldsEqual(other S3FsConfig) bool {
  255. if c.UploadPartSize != other.UploadPartSize {
  256. return false
  257. }
  258. if c.UploadConcurrency != other.UploadConcurrency {
  259. return false
  260. }
  261. if c.DownloadConcurrency != other.DownloadConcurrency {
  262. return false
  263. }
  264. if c.DownloadPartSize != other.DownloadPartSize {
  265. return false
  266. }
  267. if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
  268. return false
  269. }
  270. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  271. return false
  272. }
  273. return true
  274. }
  275. func (c *S3FsConfig) isSecretEqual(other S3FsConfig) bool {
  276. if c.AccessSecret == nil {
  277. c.AccessSecret = kms.NewEmptySecret()
  278. }
  279. if other.AccessSecret == nil {
  280. other.AccessSecret = kms.NewEmptySecret()
  281. }
  282. return c.AccessSecret.IsEqual(other.AccessSecret)
  283. }
  284. func (c *S3FsConfig) checkCredentials() error {
  285. if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
  286. return util.NewI18nError(
  287. errors.New("access_key cannot be empty with access_secret not empty"),
  288. util.I18nErrorAccessKeyRequired,
  289. )
  290. }
  291. if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
  292. return util.NewI18nError(
  293. errors.New("access_secret cannot be empty with access_key not empty"),
  294. util.I18nErrorAccessSecretRequired,
  295. )
  296. }
  297. if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
  298. return errors.New("invalid encrypted access_secret")
  299. }
  300. if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
  301. return errors.New("invalid access_secret")
  302. }
  303. return nil
  304. }
  305. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  306. func (c *S3FsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  307. if err := c.validate(); err != nil {
  308. var errI18n *util.I18nError
  309. errValidation := util.NewValidationError(fmt.Sprintf("could not validate s3config: %v", err))
  310. if errors.As(err, &errI18n) {
  311. return util.NewI18nError(errValidation, errI18n.Message)
  312. }
  313. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  314. }
  315. if c.AccessSecret.IsPlain() {
  316. c.AccessSecret.SetAdditionalData(additionalData)
  317. err := c.AccessSecret.Encrypt()
  318. if err != nil {
  319. return util.NewI18nError(
  320. util.NewValidationError(fmt.Sprintf("could not encrypt s3 access secret: %v", err)),
  321. util.I18nErrorFsValidation,
  322. )
  323. }
  324. }
  325. return nil
  326. }
  327. func (c *S3FsConfig) checkPartSizeAndConcurrency() error {
  328. if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
  329. return util.NewI18nError(
  330. errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
  331. util.I18nErrorULPartSizeInvalid,
  332. )
  333. }
  334. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  335. return util.NewI18nError(
  336. fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
  337. util.I18nErrorULConcurrencyInvalid,
  338. )
  339. }
  340. if c.DownloadPartSize != 0 && (c.DownloadPartSize < 5 || c.DownloadPartSize > 5000) {
  341. return util.NewI18nError(
  342. errors.New("download_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)"),
  343. util.I18nErrorDLPartSizeInvalid,
  344. )
  345. }
  346. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  347. return util.NewI18nError(
  348. fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency),
  349. util.I18nErrorDLConcurrencyInvalid,
  350. )
  351. }
  352. return nil
  353. }
  354. func (c *S3FsConfig) isSameResource(other S3FsConfig) bool {
  355. if c.Bucket != other.Bucket {
  356. return false
  357. }
  358. if c.Endpoint != other.Endpoint {
  359. return false
  360. }
  361. return c.Region == other.Region
  362. }
  363. // validate returns an error if the configuration is not valid
  364. func (c *S3FsConfig) validate() error {
  365. if c.AccessSecret == nil {
  366. c.AccessSecret = kms.NewEmptySecret()
  367. }
  368. if c.Bucket == "" {
  369. return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
  370. }
  371. // the region may be embedded within the endpoint for some S3 compatible
  372. // object storage, for example B2
  373. if c.Endpoint == "" && c.Region == "" {
  374. return util.NewI18nError(errors.New("region cannot be empty"), util.I18nErrorRegionRequired)
  375. }
  376. if err := c.checkCredentials(); err != nil {
  377. return err
  378. }
  379. if c.KeyPrefix != "" {
  380. if strings.HasPrefix(c.KeyPrefix, "/") {
  381. return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
  382. }
  383. c.KeyPrefix = path.Clean(c.KeyPrefix)
  384. if !strings.HasSuffix(c.KeyPrefix, "/") {
  385. c.KeyPrefix += "/"
  386. }
  387. }
  388. c.StorageClass = strings.TrimSpace(c.StorageClass)
  389. c.ACL = strings.TrimSpace(c.ACL)
  390. return c.checkPartSizeAndConcurrency()
  391. }
  // GCSFsConfig defines the configuration for Google Cloud Storage based filesystem.
  // It embeds sdk.BaseGCSFsConfig and adds the credentials as a kms.Secret,
  // serialized using the "credentials" JSON key and omitted when nil.
  type GCSFsConfig struct {
  	sdk.BaseGCSFsConfig
  	Credentials *kms.Secret `json:"credentials,omitempty"`
  }
  397. // HideConfidentialData hides confidential data
  398. func (c *GCSFsConfig) HideConfidentialData() {
  399. if c.Credentials != nil {
  400. c.Credentials.Hide()
  401. }
  402. }
  403. // ValidateAndEncryptCredentials validates the configuration and encrypts credentials if they are in plain text
  404. func (c *GCSFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  405. if err := c.validate(); err != nil {
  406. var errI18n *util.I18nError
  407. errValidation := util.NewValidationError(fmt.Sprintf("could not validate GCS config: %v", err))
  408. if errors.As(err, &errI18n) {
  409. return util.NewI18nError(errValidation, errI18n.Message)
  410. }
  411. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  412. }
  413. if c.Credentials.IsPlain() {
  414. c.Credentials.SetAdditionalData(additionalData)
  415. err := c.Credentials.Encrypt()
  416. if err != nil {
  417. return util.NewI18nError(
  418. util.NewValidationError(fmt.Sprintf("could not encrypt GCS credentials: %v", err)),
  419. util.I18nErrorFsValidation,
  420. )
  421. }
  422. }
  423. return nil
  424. }
  425. func (c *GCSFsConfig) isEqual(other GCSFsConfig) bool {
  426. if c.Bucket != other.Bucket {
  427. return false
  428. }
  429. if c.KeyPrefix != other.KeyPrefix {
  430. return false
  431. }
  432. if c.AutomaticCredentials != other.AutomaticCredentials {
  433. return false
  434. }
  435. if c.StorageClass != other.StorageClass {
  436. return false
  437. }
  438. if c.ACL != other.ACL {
  439. return false
  440. }
  441. if c.UploadPartSize != other.UploadPartSize {
  442. return false
  443. }
  444. if c.UploadPartMaxTime != other.UploadPartMaxTime {
  445. return false
  446. }
  447. if c.Credentials == nil {
  448. c.Credentials = kms.NewEmptySecret()
  449. }
  450. if other.Credentials == nil {
  451. other.Credentials = kms.NewEmptySecret()
  452. }
  453. return c.Credentials.IsEqual(other.Credentials)
  454. }
  // isSameResource reports whether c and other refer to the same bucket.
  // Only the bucket name is compared.
  func (c *GCSFsConfig) isSameResource(other GCSFsConfig) bool {
  	return c.Bucket == other.Bucket
  }
  458. // validate returns an error if the configuration is not valid
  459. func (c *GCSFsConfig) validate() error {
  460. if c.Credentials == nil || c.AutomaticCredentials == 1 {
  461. c.Credentials = kms.NewEmptySecret()
  462. }
  463. if c.Bucket == "" {
  464. return util.NewI18nError(errors.New("bucket cannot be empty"), util.I18nErrorBucketRequired)
  465. }
  466. if c.KeyPrefix != "" {
  467. if strings.HasPrefix(c.KeyPrefix, "/") {
  468. return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
  469. }
  470. c.KeyPrefix = path.Clean(c.KeyPrefix)
  471. if !strings.HasSuffix(c.KeyPrefix, "/") {
  472. c.KeyPrefix += "/"
  473. }
  474. }
  475. if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
  476. return errors.New("invalid encrypted credentials")
  477. }
  478. if c.AutomaticCredentials == 0 && !c.Credentials.IsValidInput() {
  479. return util.NewI18nError(errors.New("invalid credentials"), util.I18nErrorFsCredentialsRequired)
  480. }
  481. c.StorageClass = strings.TrimSpace(c.StorageClass)
  482. c.ACL = strings.TrimSpace(c.ACL)
  483. if c.UploadPartSize < 0 {
  484. c.UploadPartSize = 0
  485. }
  486. if c.UploadPartMaxTime < 0 {
  487. c.UploadPartMaxTime = 0
  488. }
  489. return nil
  490. }
  // AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem.
  // It embeds sdk.BaseAzBlobFsConfig and adds the two secrets below; both are
  // omitted from the JSON output when nil.
  type AzBlobFsConfig struct {
  	sdk.BaseAzBlobFsConfig
  	// Storage Account Key leave blank to use SAS URL.
  	// The access key is stored encrypted based on the kms configuration
  	AccountKey *kms.Secret `json:"account_key,omitempty"`
  	// Shared access signature URL, leave blank if using account/key
  	SASURL *kms.Secret `json:"sas_url,omitempty"`
  }
  500. // HideConfidentialData hides confidential data
  501. func (c *AzBlobFsConfig) HideConfidentialData() {
  502. if c.AccountKey != nil {
  503. c.AccountKey.Hide()
  504. }
  505. if c.SASURL != nil {
  506. c.SASURL.Hide()
  507. }
  508. }
  509. func (c *AzBlobFsConfig) isEqual(other AzBlobFsConfig) bool {
  510. if c.Container != other.Container {
  511. return false
  512. }
  513. if c.AccountName != other.AccountName {
  514. return false
  515. }
  516. if c.Endpoint != other.Endpoint {
  517. return false
  518. }
  519. if c.SASURL.IsEmpty() {
  520. c.SASURL = kms.NewEmptySecret()
  521. }
  522. if other.SASURL.IsEmpty() {
  523. other.SASURL = kms.NewEmptySecret()
  524. }
  525. if !c.SASURL.IsEqual(other.SASURL) {
  526. return false
  527. }
  528. if c.KeyPrefix != other.KeyPrefix {
  529. return false
  530. }
  531. if c.UploadPartSize != other.UploadPartSize {
  532. return false
  533. }
  534. if c.UploadConcurrency != other.UploadConcurrency {
  535. return false
  536. }
  537. if c.DownloadPartSize != other.DownloadPartSize {
  538. return false
  539. }
  540. if c.DownloadConcurrency != other.DownloadConcurrency {
  541. return false
  542. }
  543. if c.UseEmulator != other.UseEmulator {
  544. return false
  545. }
  546. if c.AccessTier != other.AccessTier {
  547. return false
  548. }
  549. return c.isSecretEqual(other)
  550. }
  551. func (c *AzBlobFsConfig) isSecretEqual(other AzBlobFsConfig) bool {
  552. if c.AccountKey == nil {
  553. c.AccountKey = kms.NewEmptySecret()
  554. }
  555. if other.AccountKey == nil {
  556. other.AccountKey = kms.NewEmptySecret()
  557. }
  558. return c.AccountKey.IsEqual(other.AccountKey)
  559. }
  560. // ValidateAndEncryptCredentials validates the configuration and encrypts access secret if it is in plain text
  561. func (c *AzBlobFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  562. if err := c.validate(); err != nil {
  563. var errI18n *util.I18nError
  564. errValidation := util.NewValidationError(fmt.Sprintf("could not validate Azure Blob config: %v", err))
  565. if errors.As(err, &errI18n) {
  566. return util.NewI18nError(errValidation, errI18n.Message)
  567. }
  568. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  569. }
  570. if c.AccountKey.IsPlain() {
  571. c.AccountKey.SetAdditionalData(additionalData)
  572. if err := c.AccountKey.Encrypt(); err != nil {
  573. return util.NewI18nError(
  574. util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob account key: %v", err)),
  575. util.I18nErrorFsValidation,
  576. )
  577. }
  578. }
  579. if c.SASURL.IsPlain() {
  580. c.SASURL.SetAdditionalData(additionalData)
  581. if err := c.SASURL.Encrypt(); err != nil {
  582. return util.NewI18nError(
  583. util.NewValidationError(fmt.Sprintf("could not encrypt Azure blob SAS URL: %v", err)),
  584. util.I18nErrorFsValidation,
  585. )
  586. }
  587. }
  588. return nil
  589. }
  590. func (c *AzBlobFsConfig) checkCredentials() error {
  591. if c.SASURL.IsPlain() {
  592. _, err := url.Parse(c.SASURL.GetPayload())
  593. if err != nil {
  594. return util.NewI18nError(err, util.I18nErrorSASURLInvalid)
  595. }
  596. return nil
  597. }
  598. if c.SASURL.IsEncrypted() && !c.SASURL.IsValid() {
  599. return errors.New("invalid encrypted sas_url")
  600. }
  601. if !c.SASURL.IsEmpty() {
  602. return nil
  603. }
  604. if c.AccountName == "" || !c.AccountKey.IsValidInput() {
  605. return util.NewI18nError(errors.New("credentials cannot be empty or invalid"), util.I18nErrorAccountNameRequired)
  606. }
  607. if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
  608. return errors.New("invalid encrypted account_key")
  609. }
  610. return nil
  611. }
  612. func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error {
  613. if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
  614. return util.NewI18nError(
  615. fmt.Errorf("invalid upload part size: %v", c.UploadPartSize),
  616. util.I18nErrorULPartSizeInvalid,
  617. )
  618. }
  619. if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
  620. return util.NewI18nError(
  621. fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency),
  622. util.I18nErrorULConcurrencyInvalid,
  623. )
  624. }
  625. if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 {
  626. return util.NewI18nError(
  627. fmt.Errorf("invalid download part size: %v", c.DownloadPartSize),
  628. util.I18nErrorDLPartSizeInvalid,
  629. )
  630. }
  631. if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 {
  632. return util.NewI18nError(
  633. fmt.Errorf("invalid upload concurrency: %v", c.DownloadConcurrency),
  634. util.I18nErrorDLConcurrencyInvalid,
  635. )
  636. }
  637. return nil
  638. }
  639. func (c *AzBlobFsConfig) tryDecrypt() error {
  640. if err := c.AccountKey.TryDecrypt(); err != nil {
  641. return fmt.Errorf("unable to decrypt account key: %w", err)
  642. }
  643. if err := c.SASURL.TryDecrypt(); err != nil {
  644. return fmt.Errorf("unable to decrypt SAS URL: %w", err)
  645. }
  646. return nil
  647. }
  648. func (c *AzBlobFsConfig) isSameResource(other AzBlobFsConfig) bool {
  649. if c.AccountName != other.AccountName {
  650. return false
  651. }
  652. if c.Endpoint != other.Endpoint {
  653. return false
  654. }
  655. return c.SASURL.GetPayload() == other.SASURL.GetPayload()
  656. }
  657. // validate returns an error if the configuration is not valid
  658. func (c *AzBlobFsConfig) validate() error {
  659. if c.AccountKey == nil {
  660. c.AccountKey = kms.NewEmptySecret()
  661. }
  662. if c.SASURL == nil {
  663. c.SASURL = kms.NewEmptySecret()
  664. }
  665. // container could be embedded within SAS URL we check this at runtime
  666. if c.SASURL.IsEmpty() && c.Container == "" {
  667. return util.NewI18nError(errors.New("container cannot be empty"), util.I18nErrorContainerRequired)
  668. }
  669. if err := c.checkCredentials(); err != nil {
  670. return err
  671. }
  672. if c.KeyPrefix != "" {
  673. if strings.HasPrefix(c.KeyPrefix, "/") {
  674. return util.NewI18nError(errors.New("key_prefix cannot start with /"), util.I18nErrorKeyPrefixInvalid)
  675. }
  676. c.KeyPrefix = path.Clean(c.KeyPrefix)
  677. if !strings.HasSuffix(c.KeyPrefix, "/") {
  678. c.KeyPrefix += "/"
  679. }
  680. }
  681. if err := c.checkPartSizeAndConcurrency(); err != nil {
  682. return err
  683. }
  684. if !util.Contains(validAzAccessTier, c.AccessTier) {
  685. return fmt.Errorf("invalid access tier %q, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
  686. }
  687. return nil
  688. }
  // CryptFsConfig defines the configuration to store local files as encrypted.
  // It embeds sdk.OSFsConfig and adds the passphrase as a kms.Secret,
  // serialized using the "passphrase" JSON key and omitted when nil.
  type CryptFsConfig struct {
  	sdk.OSFsConfig
  	Passphrase *kms.Secret `json:"passphrase,omitempty"`
  }
  694. // HideConfidentialData hides confidential data
  695. func (c *CryptFsConfig) HideConfidentialData() {
  696. if c.Passphrase != nil {
  697. c.Passphrase.Hide()
  698. }
  699. }
  700. func (c *CryptFsConfig) isEqual(other CryptFsConfig) bool {
  701. if c.Passphrase == nil {
  702. c.Passphrase = kms.NewEmptySecret()
  703. }
  704. if other.Passphrase == nil {
  705. other.Passphrase = kms.NewEmptySecret()
  706. }
  707. return c.Passphrase.IsEqual(other.Passphrase)
  708. }
  709. // ValidateAndEncryptCredentials validates the configuration and encrypts the passphrase if it is in plain text
  710. func (c *CryptFsConfig) ValidateAndEncryptCredentials(additionalData string) error {
  711. if err := c.validate(); err != nil {
  712. var errI18n *util.I18nError
  713. errValidation := util.NewValidationError(fmt.Sprintf("could not validate crypt fs config: %v", err))
  714. if errors.As(err, &errI18n) {
  715. return util.NewI18nError(errValidation, errI18n.Message)
  716. }
  717. return util.NewI18nError(errValidation, util.I18nErrorFsValidation)
  718. }
  719. if c.Passphrase.IsPlain() {
  720. c.Passphrase.SetAdditionalData(additionalData)
  721. if err := c.Passphrase.Encrypt(); err != nil {
  722. return util.NewI18nError(
  723. util.NewValidationError(fmt.Sprintf("could not encrypt Crypt fs passphrase: %v", err)),
  724. util.I18nErrorFsValidation,
  725. )
  726. }
  727. }
  728. return nil
  729. }
  // isSameResource reports whether the passphrase payloads of c and other
  // are equal.
  // NOTE(review): no nil check is done on the passphrases here; presumably the
  // callers only pass validated configurations — confirm.
  func (c *CryptFsConfig) isSameResource(other CryptFsConfig) bool {
  	return c.Passphrase.GetPayload() == other.Passphrase.GetPayload()
  }
  733. // validate returns an error if the configuration is not valid
  734. func (c *CryptFsConfig) validate() error {
  735. if c.Passphrase == nil || c.Passphrase.IsEmpty() {
  736. return util.NewI18nError(errors.New("invalid passphrase"), util.I18nErrorPassphraseRequired)
  737. }
  738. if !c.Passphrase.IsValidInput() {
  739. return util.NewI18nError(errors.New("passphrase cannot be empty or invalid"), util.I18nErrorPassphraseRequired)
  740. }
  741. if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
  742. return errors.New("invalid encrypted passphrase")
  743. }
  744. return nil
  745. }
  // pipeWriter defines a wrapper for pipeat.PipeWriterAt.
  // It adds the error reported using Done and the channel Close waits on.
  type pipeWriter struct {
  	*pipeat.PipeWriterAt
  	// err is the error set by Done and returned by Close
  	err error
  	// done receives a value from Done when the upload ends
  	done chan bool
  }
  752. // NewPipeWriter initializes a new PipeWriter
  753. func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter {
  754. return &pipeWriter{
  755. PipeWriterAt: w,
  756. err: nil,
  757. done: make(chan bool),
  758. }
  759. }
  // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
  // The underlying writer is closed first, then Close blocks until Done is
  // called and returns the error passed to Done.
  func (p *pipeWriter) Close() error {
  	p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
  	// blocks until Done sends on the channel
  	<-p.done
  	return p.err
  }
  // Done unlocks other goroutines waiting on Close().
  // It must be called when the upload ends.
  // The error is stored before signalling so that Close returns it. The send
  // on the done channel blocks until a receiver is ready: the constructors in
  // this file create the channel unbuffered.
  func (p *pipeWriter) Done(err error) {
  	p.err = err
  	p.done <- true
  }
  772. func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
  773. return &pipeWriterAtOffset{
  774. pipeWriter: &pipeWriter{
  775. PipeWriterAt: w,
  776. err: nil,
  777. done: make(chan bool),
  778. },
  779. offset: offset,
  780. writeOffset: offset,
  781. }
  782. }
  783. type pipeWriterAtOffset struct {
  784. *pipeWriter
  785. offset int64
  786. writeOffset int64
  787. }
  788. func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
  789. if off < p.offset {
  790. return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
  791. }
  792. return p.pipeWriter.WriteAt(buf, off-p.offset)
  793. }
  794. func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
  795. n, err := p.WriteAt(buf, p.writeOffset)
  796. p.writeOffset += int64(n)
  797. return n, err
  798. }
  799. // NewPipeReader initializes a new PipeReader
  800. func NewPipeReader(r *pipeat.PipeReaderAt) PipeReader {
  801. return &pipeReader{
  802. PipeReaderAt: r,
  803. }
  804. }
  805. // pipeReader defines a wrapper for pipeat.PipeReaderAt.
  806. type pipeReader struct {
  807. *pipeat.PipeReaderAt
  808. mu sync.RWMutex
  809. metadata map[string]string
  810. }
  811. func (p *pipeReader) setMetadata(value map[string]string) {
  812. p.mu.Lock()
  813. defer p.mu.Unlock()
  814. p.metadata = value
  815. }
  816. func (p *pipeReader) setMetadataFromPointerVal(value map[string]*string) {
  817. p.mu.Lock()
  818. defer p.mu.Unlock()
  819. if len(value) == 0 {
  820. p.metadata = nil
  821. return
  822. }
  823. p.metadata = map[string]string{}
  824. for k, v := range value {
  825. val := util.GetStringFromPointer(v)
  826. if val != "" {
  827. p.metadata[k] = val
  828. }
  829. }
  830. }
  831. // Metadata implements the Metadater interface
  832. func (p *pipeReader) Metadata() map[string]string {
  833. p.mu.RLock()
  834. defer p.mu.RUnlock()
  835. if len(p.metadata) == 0 {
  836. return nil
  837. }
  838. result := make(map[string]string)
  839. for k, v := range p.metadata {
  840. result[k] = v
  841. }
  842. return result
  843. }
  844. func isEqualityCheckModeValid(mode int) bool {
  845. return mode >= 0 || mode <= 1
  846. }
  847. // isDirectory checks if a path exists and is a directory
  848. func isDirectory(fs Fs, path string) (bool, error) {
  849. fileInfo, err := fs.Stat(path)
  850. if err != nil {
  851. return false, err
  852. }
  853. return fileInfo.IsDir(), err
  854. }
  855. // IsLocalOsFs returns true if fs is a local filesystem implementation
  856. func IsLocalOsFs(fs Fs) bool {
  857. return fs.Name() == osFsName
  858. }
  859. // IsCryptOsFs returns true if fs is an encrypted local filesystem implementation
  860. func IsCryptOsFs(fs Fs) bool {
  861. return fs.Name() == cryptFsName
  862. }
  863. // IsSFTPFs returns true if fs is an SFTP filesystem
  864. func IsSFTPFs(fs Fs) bool {
  865. return strings.HasPrefix(fs.Name(), sftpFsName)
  866. }
  867. // IsHTTPFs returns true if fs is an HTTP filesystem
  868. func IsHTTPFs(fs Fs) bool {
  869. return strings.HasPrefix(fs.Name(), httpFsName)
  870. }
  871. // IsBufferedLocalOrSFTPFs returns true if this is a buffered SFTP or local filesystem
  872. func IsBufferedLocalOrSFTPFs(fs Fs) bool {
  873. if osFs, ok := fs.(*OsFs); ok {
  874. return osFs.writeBufferSize > 0
  875. }
  876. if !IsSFTPFs(fs) {
  877. return false
  878. }
  879. return !fs.IsUploadResumeSupported()
  880. }
  881. // FsOpenReturnsFile returns true if fs.Open returns a *os.File handle
  882. func FsOpenReturnsFile(fs Fs) bool {
  883. if osFs, ok := fs.(*OsFs); ok {
  884. return osFs.readBufferSize == 0
  885. }
  886. if sftpFs, ok := fs.(*SFTPFs); ok {
  887. return sftpFs.config.BufferSize == 0
  888. }
  889. return false
  890. }
  891. // IsLocalOrSFTPFs returns true if fs is local or SFTP
  892. func IsLocalOrSFTPFs(fs Fs) bool {
  893. return IsLocalOsFs(fs) || IsSFTPFs(fs)
  894. }
  895. // HasTruncateSupport returns true if the fs supports truncate files
  896. func HasTruncateSupport(fs Fs) bool {
  897. return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
  898. }
  899. // IsRenameAtomic returns true if renaming a directory is supposed to be atomic
  900. func IsRenameAtomic(fs Fs) bool {
  901. if strings.HasPrefix(fs.Name(), s3fsName) {
  902. return false
  903. }
  904. if strings.HasPrefix(fs.Name(), gcsfsName) {
  905. return false
  906. }
  907. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  908. return false
  909. }
  910. return true
  911. }
  912. // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
  913. func HasImplicitAtomicUploads(fs Fs) bool {
  914. if strings.HasPrefix(fs.Name(), s3fsName) {
  915. return uploadMode&4 == 0
  916. }
  917. if strings.HasPrefix(fs.Name(), gcsfsName) {
  918. return uploadMode&8 == 0
  919. }
  920. if strings.HasPrefix(fs.Name(), azBlobFsName) {
  921. return uploadMode&16 == 0
  922. }
  923. return false
  924. }
  925. // HasOpenRWSupport returns true if the fs can open a file
  926. // for reading and writing at the same time
  927. func HasOpenRWSupport(fs Fs) bool {
  928. if IsLocalOsFs(fs) {
  929. return true
  930. }
  931. if IsSFTPFs(fs) && fs.IsUploadResumeSupported() {
  932. return true
  933. }
  934. return false
  935. }
  936. // IsLocalOrCryptoFs returns true if fs is local or local encrypted
  937. func IsLocalOrCryptoFs(fs Fs) bool {
  938. return IsLocalOsFs(fs) || IsCryptOsFs(fs)
  939. }
  940. // SetPathPermissions calls fs.Chown.
  941. // It does nothing for local filesystem on windows
  942. func SetPathPermissions(fs Fs, path string, uid int, gid int) {
  943. if uid == -1 && gid == -1 {
  944. return
  945. }
  946. if IsLocalOsFs(fs) {
  947. if runtime.GOOS == "windows" {
  948. return
  949. }
  950. }
  951. if err := fs.Chown(path, uid, gid); err != nil {
  952. fsLog(fs, logger.LevelWarn, "error chowning path %v: %v", path, err)
  953. }
  954. }
  955. // IsUploadResumeSupported returns true if resuming uploads is supported
  956. func IsUploadResumeSupported(fs Fs, size int64) bool {
  957. if fs.IsUploadResumeSupported() {
  958. return true
  959. }
  960. return fs.IsConditionalUploadResumeSupported(size)
  961. }
  962. func updateFileInfoModTime(storageID, objectPath string, info *FileInfo) (*FileInfo, error) {
  963. if !plugin.Handler.HasMetadater() {
  964. return info, nil
  965. }
  966. if info.IsDir() {
  967. return info, nil
  968. }
  969. mTime, err := plugin.Handler.GetModificationTime(storageID, ensureAbsPath(objectPath), info.IsDir())
  970. if errors.Is(err, metadata.ErrNoSuchObject) {
  971. return info, nil
  972. }
  973. if err != nil {
  974. return info, err
  975. }
  976. info.modTime = util.GetTimeFromMsecSinceEpoch(mTime)
  977. return info, nil
  978. }
  979. func getFolderModTimes(storageID, dirName string) (map[string]int64, error) {
  980. var err error
  981. modTimes := make(map[string]int64)
  982. if plugin.Handler.HasMetadater() {
  983. modTimes, err = plugin.Handler.GetModificationTimes(storageID, ensureAbsPath(dirName))
  984. if err != nil && !errors.Is(err, metadata.ErrNoSuchObject) {
  985. return modTimes, err
  986. }
  987. }
  988. return modTimes, nil
  989. }
  990. func ensureAbsPath(name string) string {
  991. if path.IsAbs(name) {
  992. return name
  993. }
  994. return path.Join("/", name)
  995. }
  996. func fsMetadataCheck(fs fsMetadataChecker, storageID, keyPrefix string) error {
  997. if !plugin.Handler.HasMetadater() {
  998. return nil
  999. }
  1000. limit := 100
  1001. from := ""
  1002. for {
  1003. metadataFolders, err := plugin.Handler.GetMetadataFolders(storageID, from, limit)
  1004. if err != nil {
  1005. fsLog(fs, logger.LevelError, "unable to get folders: %v", err)
  1006. return err
  1007. }
  1008. for _, folder := range metadataFolders {
  1009. from = folder
  1010. fsPrefix := folder
  1011. if !strings.HasSuffix(folder, "/") {
  1012. fsPrefix += "/"
  1013. }
  1014. if keyPrefix != "" {
  1015. if !strings.HasPrefix(fsPrefix, "/"+keyPrefix) {
  1016. fsLog(fs, logger.LevelDebug, "skip metadata check for folder %q outside prefix %q",
  1017. folder, keyPrefix)
  1018. continue
  1019. }
  1020. }
  1021. fsLog(fs, logger.LevelDebug, "check metadata for folder %q", folder)
  1022. metadataValues, err := plugin.Handler.GetModificationTimes(storageID, folder)
  1023. if err != nil {
  1024. fsLog(fs, logger.LevelError, "unable to get modification times for folder %q: %v", folder, err)
  1025. return err
  1026. }
  1027. if len(metadataValues) == 0 {
  1028. fsLog(fs, logger.LevelDebug, "no metadata for folder %q", folder)
  1029. continue
  1030. }
  1031. fileNames, err := fs.getFileNamesInPrefix(fsPrefix)
  1032. if err != nil {
  1033. fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %v", fsPrefix, err)
  1034. return err
  1035. }
  1036. // now check if we have metadata for a missing object
  1037. for k := range metadataValues {
  1038. if _, ok := fileNames[k]; !ok {
  1039. filePath := ensureAbsPath(path.Join(folder, k))
  1040. if err = plugin.Handler.RemoveMetadata(storageID, filePath); err != nil {
  1041. fsLog(fs, logger.LevelError, "unable to remove metadata for missing file %q: %v", filePath, err)
  1042. } else {
  1043. fsLog(fs, logger.LevelDebug, "metadata removed for missing file %q", filePath)
  1044. }
  1045. }
  1046. }
  1047. }
  1048. if len(metadataFolders) < limit {
  1049. return nil
  1050. }
  1051. }
  1052. }
  1053. func validateOSFsConfig(config *sdk.OSFsConfig) error {
  1054. if config.ReadBufferSize < 0 || config.ReadBufferSize > 10 {
  1055. return fmt.Errorf("invalid read buffer size must be between 0 and 10 MB")
  1056. }
  1057. if config.WriteBufferSize < 0 || config.WriteBufferSize > 10 {
  1058. return fmt.Errorf("invalid write buffer size must be between 0 and 10 MB")
  1059. }
  1060. return nil
  1061. }
  1062. func doCopy(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
  1063. if buf == nil {
  1064. buf = make([]byte, 32768)
  1065. }
  1066. for {
  1067. nr, er := src.Read(buf)
  1068. if nr > 0 {
  1069. nw, ew := dst.Write(buf[0:nr])
  1070. if nw < 0 || nr < nw {
  1071. nw = 0
  1072. if ew == nil {
  1073. ew = errors.New("invalid write")
  1074. }
  1075. }
  1076. written += int64(nw)
  1077. if ew != nil {
  1078. err = ew
  1079. break
  1080. }
  1081. if nr != nw {
  1082. err = io.ErrShortWrite
  1083. break
  1084. }
  1085. }
  1086. if er != nil {
  1087. if er != io.EOF {
  1088. err = er
  1089. }
  1090. break
  1091. }
  1092. }
  1093. return written, err
  1094. }
  1095. func getMountPath(mountPath string) string {
  1096. if mountPath == "/" {
  1097. return ""
  1098. }
  1099. return mountPath
  1100. }
  1101. func getLocalTempDir() string {
  1102. if tempPath != "" {
  1103. return tempPath
  1104. }
  1105. return filepath.Clean(os.TempDir())
  1106. }
  1107. func fsLog(fs Fs, level logger.LogLevel, format string, v ...any) {
  1108. logger.Log(level, fs.Name(), fs.ConnectionID(), format, v...)
  1109. }