s3fs.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !nos3
  15. // +build !nos3
  16. package vfs
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "mime"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "path"
  27. "path/filepath"
  28. "sort"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/aws/aws-sdk-go-v2/aws"
  34. awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
  35. "github.com/aws/aws-sdk-go-v2/config"
  36. "github.com/aws/aws-sdk-go-v2/credentials"
  37. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  38. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  39. "github.com/aws/aws-sdk-go-v2/service/s3"
  40. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  41. "github.com/aws/aws-sdk-go-v2/service/sts"
  42. "github.com/eikenb/pipeat"
  43. "github.com/pkg/sftp"
  44. "github.com/drakkan/sftpgo/v2/internal/logger"
  45. "github.com/drakkan/sftpgo/v2/internal/metric"
  46. "github.com/drakkan/sftpgo/v2/internal/plugin"
  47. "github.com/drakkan/sftpgo/v2/internal/util"
  48. "github.com/drakkan/sftpgo/v2/internal/version"
  49. )
const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType = "application/x-directory"
	// buffer size used when streaming transfers
	s3TransferBufferSize = 256 * 1024
)

var (
	// content types that common S3 providers use to mark "directory" placeholder objects
	s3DirMimeTypes = []string{s3DirMimeType, "httpd/unix-directory"}
)
// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	// local directory used as backing storage for the transfer pipes
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath string
	config    *S3FsConfig
	svc       *s3.Client
	// default deadline applied to individual S3 API calls
	ctxTimeout time.Duration
}
// init advertises S3 support in the build feature list.
func init() {
	version.AddFeature("+s3")
}
// NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		// fall back to the globally configured temp path, then to the OS default
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(getAWSHTTPClient(0, 30*time.Second)))
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		// static credentials, if set, take precedence over the default credential chain
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	if fs.config.Endpoint != "" {
		// custom endpoint for S3 compatible providers (MinIO, SeaweedFS, ...)
		endpointResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
			return aws.Endpoint{
				URL:               fs.config.Endpoint,
				HostnameImmutable: fs.config.ForcePathStyle,
				PartitionID:       "aws",
				SigningRegion:     fs.config.Region,
				Source:            aws.EndpointSourceCustom,
			}, nil
		})
		awsConfig.EndpointResolverWithOptions = endpointResolver
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		// assume the configured role; this overrides any credentials set above
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.UsePathStyle = fs.config.ForcePathStyle
	})
	return fs, nil
}
  130. // Name returns the name for the Fs implementation
  131. func (fs *S3Fs) Name() string {
  132. return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
  133. }
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}
  138. // Stat returns a FileInfo describing the named file
  139. func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
  140. var result *FileInfo
  141. if name == "" || name == "/" || name == "." {
  142. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  143. }
  144. if fs.config.KeyPrefix == name+"/" {
  145. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  146. }
  147. obj, err := fs.headObject(name)
  148. if err == nil {
  149. // Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
  150. // So we check some common content types to detect if this is a "directory".
  151. isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
  152. if obj.ContentLength == 0 && !isDir {
  153. _, err = fs.headObject(name + "/")
  154. isDir = err == nil
  155. }
  156. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, obj.ContentLength,
  157. util.GetTimeFromPointer(obj.LastModified), false))
  158. }
  159. if !fs.IsNotExist(err) {
  160. return result, err
  161. }
  162. // now check if this is a prefix (virtual directory)
  163. hasContents, err := fs.hasContents(name)
  164. if err == nil && hasContents {
  165. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  166. } else if err != nil {
  167. return nil, err
  168. }
  169. // the requested file may still be a directory as a zero bytes key
  170. // with a trailing forward slash (created using mkdir).
  171. // S3 doesn't return content type when listing objects, so we have
  172. // create "dirs" adding a trailing "/" to the key
  173. return fs.getStatForDir(name)
  174. }
  175. func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
  176. var result *FileInfo
  177. obj, err := fs.headObject(name + "/")
  178. if err != nil {
  179. return result, err
  180. }
  181. return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, obj.ContentLength,
  182. util.GetTimeFromPointer(obj.LastModified), false))
  183. }
// Lstat returns a FileInfo describing the named file.
// S3 has no symbolic links, so this is identical to Stat.
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	// the download is streamed through a pipe backed by a file in localTempDir
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			// apply a per-part timeout; only done for full downloads (offset 0)
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		// resume the read from the requested offset via an HTTP Range header
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		// closing the writer with the download error unblocks the reader side
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		// ensure the parent "directory" exists before accepting the upload
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	// the upload is streamed through a pipe backed by a file in localTempDir
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			// apply a per-part timeout to the upload requests
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		// flag -1 is used internally to create directory placeholder objects
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		// closing the reader with the upload error unblocks the writer side
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
  268. // Rename renames (moves) source to target.
  269. func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
  270. if source == target {
  271. return -1, -1, nil
  272. }
  273. _, err := fs.Stat(path.Dir(target))
  274. if err != nil {
  275. return -1, -1, err
  276. }
  277. fi, err := fs.Stat(source)
  278. if err != nil {
  279. return -1, -1, err
  280. }
  281. return fs.renameInternal(source, target, fi)
  282. }
// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %q", name)
		}
		// directories are stored as zero byte keys with a trailing "/"
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		// best effort: failing to remove plugin metadata does not fail the remove
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %q: %+v", name, errMetadata)
		}
	}
	return err
}
  311. // Mkdir creates a new directory with the specified name and default permissions
  312. func (fs *S3Fs) Mkdir(name string) error {
  313. _, err := fs.Stat(name)
  314. if !fs.IsNotExist(err) {
  315. return err
  316. }
  317. return fs.mkdirInternal(name)
  318. }
// Symlink creates source as a symbolic link to target.
// Symbolic links are not supported on S3.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link.
// Symbolic links are not supported on S3.
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
// Ownership is not supported on S3.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
// Permission bits are not supported on S3.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}
// Chtimes changes the access and modification times of the named file.
// S3 objects cannot be touched in place, so the modification time is
// tracked through the metadata plugin when one is configured.
func (fs *S3Fs) Chtimes(name string, _, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		// outside an upload, only existing regular files are supported
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}
  358. // ReadDir reads the directory named by dirname and returns
  359. // a list of directory entries.
  360. func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
  361. var result []os.FileInfo
  362. // dirname must be already cleaned
  363. prefix := fs.getPrefix(dirname)
  364. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  365. if err != nil {
  366. return result, err
  367. }
  368. prefixes := make(map[string]bool)
  369. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  370. Bucket: aws.String(fs.config.Bucket),
  371. Prefix: aws.String(prefix),
  372. Delimiter: aws.String("/"),
  373. })
  374. for paginator.HasMorePages() {
  375. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  376. defer cancelFn()
  377. page, err := paginator.NextPage(ctx)
  378. if err != nil {
  379. metric.S3ListObjectsCompleted(err)
  380. return result, err
  381. }
  382. for _, p := range page.CommonPrefixes {
  383. // prefixes have a trailing slash
  384. name, _ := fs.resolve(p.Prefix, prefix)
  385. if name == "" {
  386. continue
  387. }
  388. if _, ok := prefixes[name]; ok {
  389. continue
  390. }
  391. result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
  392. prefixes[name] = true
  393. }
  394. for _, fileObject := range page.Contents {
  395. objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
  396. name, isDir := fs.resolve(fileObject.Key, prefix)
  397. if name == "" || name == "/" {
  398. continue
  399. }
  400. if isDir {
  401. if _, ok := prefixes[name]; ok {
  402. continue
  403. }
  404. prefixes[name] = true
  405. }
  406. if t, ok := modTimes[name]; ok {
  407. objectModTime = util.GetTimeFromMsecSinceEpoch(t)
  408. }
  409. result = append(result, NewFileInfo(name, (isDir && fileObject.Size == 0), fileObject.Size,
  410. objectModTime, false))
  411. }
  412. }
  413. metric.S3ListObjectsCompleted(nil)
  414. return result, nil
  415. }
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}
  427. // IsNotExist returns a boolean indicating whether the error is known to
  428. // report that a file or directory does not exist
  429. func (*S3Fs) IsNotExist(err error) bool {
  430. if err == nil {
  431. return false
  432. }
  433. var re *awshttp.ResponseError
  434. if errors.As(err, &re) {
  435. if re.Response != nil {
  436. return re.Response.StatusCode == http.StatusNotFound
  437. }
  438. }
  439. return false
  440. }
  441. // IsPermission returns a boolean indicating whether the error is known to
  442. // report that permission is denied.
  443. func (*S3Fs) IsPermission(err error) bool {
  444. if err == nil {
  445. return false
  446. }
  447. var re *awshttp.ResponseError
  448. if errors.As(err, &re) {
  449. if re.Response != nil {
  450. return re.Response.StatusCode == http.StatusForbidden ||
  451. re.Response.StatusCode == http.StatusUnauthorized
  452. }
  453. }
  454. return false
  455. }
  456. // IsNotSupported returns true if the error indicate an unsupported operation
  457. func (*S3Fs) IsNotSupported(err error) bool {
  458. if err == nil {
  459. return false
  460. }
  461. return err == ErrVfsUnsupported
  462. }
  463. // CheckRootPath creates the specified local root directory if it does not exists
  464. func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
  465. // we need a local directory for temporary files
  466. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
  467. return osFs.CheckRootPath(username, uid, gid)
  468. }
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size, scanning from the configured key prefix.
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}
  474. func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  475. fileNames := make(map[string]bool)
  476. prefix := ""
  477. if fsPrefix != "/" {
  478. prefix = strings.TrimPrefix(fsPrefix, "/")
  479. }
  480. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  481. Bucket: aws.String(fs.config.Bucket),
  482. Prefix: aws.String(prefix),
  483. Delimiter: aws.String("/"),
  484. })
  485. for paginator.HasMorePages() {
  486. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  487. defer cancelFn()
  488. page, err := paginator.NextPage(ctx)
  489. if err != nil {
  490. metric.S3ListObjectsCompleted(err)
  491. if err != nil {
  492. fsLog(fs, logger.LevelError, "unable to get content for prefix %q: %+v", prefix, err)
  493. return nil, err
  494. }
  495. return fileNames, err
  496. }
  497. for _, fileObject := range page.Contents {
  498. name, isDir := fs.resolve(fileObject.Key, prefix)
  499. if name != "" && !isDir {
  500. fileNames[name] = true
  501. }
  502. }
  503. }
  504. metric.S3ListObjectsCompleted(nil)
  505. return fileNames, nil
  506. }
// CheckMetadata checks the metadata consistency
func (fs *S3Fs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}
  511. // GetDirSize returns the number of files and the size for a folder
  512. // including any subfolders
  513. func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
  514. prefix := fs.getPrefix(dirname)
  515. numFiles := 0
  516. size := int64(0)
  517. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  518. Bucket: aws.String(fs.config.Bucket),
  519. Prefix: aws.String(prefix),
  520. })
  521. for paginator.HasMorePages() {
  522. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  523. defer cancelFn()
  524. page, err := paginator.NextPage(ctx)
  525. if err != nil {
  526. metric.S3ListObjectsCompleted(err)
  527. return numFiles, size, err
  528. }
  529. for _, fileObject := range page.Contents {
  530. isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
  531. if isDir && fileObject.Size == 0 {
  532. continue
  533. }
  534. numFiles++
  535. size += fileObject.Size
  536. if numFiles%1000 == 0 {
  537. fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
  538. }
  539. }
  540. }
  541. metric.S3ListObjectsCompleted(nil)
  542. return numFiles, size, nil
  543. }
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}
  549. // GetRelativePath returns the path for a file relative to the user's home dir.
  550. // This is the path as seen by SFTPGo users
  551. func (fs *S3Fs) GetRelativePath(name string) string {
  552. rel := path.Clean(name)
  553. if rel == "." {
  554. rel = ""
  555. }
  556. if !path.IsAbs(rel) {
  557. rel = "/" + rel
  558. }
  559. if fs.config.KeyPrefix != "" {
  560. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  561. rel = "/"
  562. }
  563. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  564. }
  565. if fs.mountPath != "" {
  566. rel = path.Join(fs.mountPath, rel)
  567. }
  568. return rel
  569. }
  570. // Walk walks the file tree rooted at root, calling walkFn for each file or
  571. // directory in the tree, including root. The result are unordered
  572. func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
  573. prefix := fs.getPrefix(root)
  574. paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
  575. Bucket: aws.String(fs.config.Bucket),
  576. Prefix: aws.String(prefix),
  577. })
  578. for paginator.HasMorePages() {
  579. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  580. defer cancelFn()
  581. page, err := paginator.NextPage(ctx)
  582. if err != nil {
  583. metric.S3ListObjectsCompleted(err)
  584. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
  585. return err
  586. }
  587. for _, fileObject := range page.Contents {
  588. name, isDir := fs.resolve(fileObject.Key, prefix)
  589. if name == "" {
  590. continue
  591. }
  592. err := walkFn(util.GetStringFromPointer(fileObject.Key),
  593. NewFileInfo(name, isDir, fileObject.Size, util.GetTimeFromPointer(fileObject.LastModified), false), nil)
  594. if err != nil {
  595. return err
  596. }
  597. }
  598. }
  599. metric.S3ListObjectsCompleted(nil)
  600. walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
  601. return nil
  602. }
  603. // Join joins any number of path elements into a single path
  604. func (*S3Fs) Join(elem ...string) string {
  605. return strings.TrimPrefix(path.Join(elem...), "/")
  606. }
// HasVirtualFolders returns true if folders are emulated.
// S3 has no real directories, they are emulated using key prefixes.
func (*S3Fs) HasVirtualFolders() bool {
	return true
}
  611. // ResolvePath returns the matching filesystem path for the specified virtual path
  612. func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
  613. if fs.mountPath != "" {
  614. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  615. }
  616. if !path.IsAbs(virtualPath) {
  617. virtualPath = path.Clean("/" + virtualPath)
  618. }
  619. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  620. }
// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) error {
	return fs.copyFileInternal(source, target, srcSize)
}
  625. func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
  626. result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
  627. isDir := strings.HasSuffix(result, "/")
  628. if isDir {
  629. result = strings.TrimSuffix(result, "/")
  630. }
  631. return result, isDir
  632. }
  633. func (fs *S3Fs) setConfigDefaults() {
  634. if fs.config.UploadPartSize == 0 {
  635. fs.config.UploadPartSize = manager.DefaultUploadPartSize
  636. } else {
  637. if fs.config.UploadPartSize < 1024*1024 {
  638. fs.config.UploadPartSize *= 1024 * 1024
  639. }
  640. }
  641. if fs.config.UploadConcurrency == 0 {
  642. fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
  643. }
  644. if fs.config.DownloadPartSize == 0 {
  645. fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
  646. } else {
  647. if fs.config.DownloadPartSize < 1024*1024 {
  648. fs.config.DownloadPartSize *= 1024 * 1024
  649. }
  650. }
  651. if fs.config.DownloadConcurrency == 0 {
  652. fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
  653. }
  654. }
// copyFileInternal performs a server side copy of source to target.
// Files larger than 500 MB are copied using the multipart copy API.
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	// CopySource must be the URL-escaped "bucket/key" form
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if fileSize > 500*1024*1024 {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fileSize)
		err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(copySource),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	metric.S3CopyObjectCompleted(err)
	return err
}
// renameInternal copies source to target (recursively for directories when
// renameMode is 1) and then removes the source. It returns the number of
// renamed files and their total size.
func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if fi.IsDir() {
		if renameMode == 0 {
			// legacy mode: renaming a non-empty directory is rejected
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("cannot rename non empty directory: %q", source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			// recursive mode: rename every entry below the source prefix
			entries, err := fs.ReadDir(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			for _, info := range entries {
				sourceEntry := fs.Join(source, info.Name())
				targetEntry := fs.Join(target, info.Name())
				files, size, err := fs.renameInternal(sourceEntry, targetEntry, info)
				if err != nil {
					// entries that disappeared concurrently are skipped, not fatal
					if fs.IsNotExist(err) {
						fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
						continue
					}
					return numFiles, filesSize, err
				}
				numFiles += files
				filesSize += size
			}
		}
	} else {
		// files are renamed via server side copy followed by a delete
		if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += fi.Size()
		if plugin.Handler.HasMetadater() {
			// best effort: carry over the plugin-tracked modification time
			err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
			if err != nil {
				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %q -> %q: %+v",
					source, target, err)
			}
		}
	}
	err := fs.Remove(source, fi.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}
  735. func (fs *S3Fs) mkdirInternal(name string) error {
  736. if !strings.HasSuffix(name, "/") {
  737. name += "/"
  738. }
  739. _, w, _, err := fs.Create(name, -1, 0)
  740. if err != nil {
  741. return err
  742. }
  743. return w.Close()
  744. }
// hasContents returns true if the given name, treated as a prefix, contains
// at least one object other than the directory placeholder key itself.
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket: aws.String(fs.config.Bucket),
		Prefix: aws.String(prefix),
		// 2 keys are enough: the placeholder for the prefix itself may be
		// returned and has to be skipped below
		MaxKeys: 2,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			// NOTE: this shadows the name parameter intentionally
			name, _ := fs.resolve(obj.Key, prefix)
			// skip the placeholder key that resolves to the prefix itself
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}
// doMultipartCopy copies source to target server-side using the S3
// multipart upload API: parts are copied concurrently (at most 10 at a
// time) with UploadPartCopy, and the upload is aborted on the first
// failing part. fileSize is the size of the source object, used to
// compute part boundaries.
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		// larger parts for files over 100 GB to keep the part count down
		maxPartSize = int64(500 * 1024 * 1024)
	}
	// guard is a counting semaphore: a send acquires one of 10 slots before
	// a part copy starts, the worker's deferred receive releases it
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex // protects completedParts
	var wg sync.WaitGroup
	var hasError atomic.Bool
	// only the first failing part records the error and aborts the upload
	var errOnce sync.Once
	// copyError is written inside errOnce and read after wg.Wait(), which
	// provides the necessary happens-before ordering
	var copyError error
	var partNumber int32
	var offset int64
	// opCtx cancels the remaining in-flight part copies once one fails
	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()
	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true // last part
		}
		offset = end
		guard <- struct{}{} // acquire a semaphore slot
		if hasError.Load() {
			// a previous part already failed: stop scheduling new parts.
			// The unreleased slot is harmless, guard is closed after wg.Wait
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard // release the semaphore slot
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:     aws.String(fs.config.Bucket),
				CopySource: aws.String(source),
				Key:        aws.String(target),
				PartNumber: partNum,
				UploadId:   aws.String(uploadID),
				// the byte range is inclusive, hence partEnd-1
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel() // cancel the other in-flight part copies
					// best-effort abort so the partial upload does not linger
					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)
	if copyError != nil {
		return copyError
	}
	// workers finish in arbitrary order; CompleteMultipartUpload requires
	// the parts sorted by ascending part number
	sort.Slice(completedParts, func(i, j int) bool {
		return completedParts[i].PartNumber < completedParts[j].PartNumber
	})
	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
  886. func (fs *S3Fs) getPrefix(name string) string {
  887. prefix := ""
  888. if name != "" && name != "." && name != "/" {
  889. prefix = strings.TrimPrefix(name, "/")
  890. if !strings.HasSuffix(prefix, "/") {
  891. prefix += "/"
  892. }
  893. }
  894. return prefix
  895. }
  896. func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
  897. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  898. defer cancelFn()
  899. obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
  900. Bucket: aws.String(fs.config.Bucket),
  901. Key: aws.String(name),
  902. })
  903. metric.S3HeadObjectCompleted(err)
  904. return obj, err
  905. }
  906. // GetMimeType returns the content type
  907. func (fs *S3Fs) GetMimeType(name string) (string, error) {
  908. obj, err := fs.headObject(name)
  909. if err != nil {
  910. return "", err
  911. }
  912. return util.GetStringFromPointer(obj.ContentType), nil
  913. }
// Close closes the fs. It is a no-op for S3: there is no state to release.
func (*S3Fs) Close() error {
	return nil
}
// GetAvailableDiskSize returns the available size for the specified path.
// Not supported on S3: it always returns ErrStorageSizeUnavailable.
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}
  922. func (fs *S3Fs) getStorageID() string {
  923. if fs.config.Endpoint != "" {
  924. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  925. return fmt.Sprintf("s3://%v/%v", fs.config.Endpoint, fs.config.Bucket)
  926. }
  927. return fmt.Sprintf("s3://%v%v", fs.config.Endpoint, fs.config.Bucket)
  928. }
  929. return fmt.Sprintf("s3://%v", fs.config.Bucket)
  930. }
  931. func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
  932. c := awshttp.NewBuildableClient().
  933. WithDialerOptions(func(d *net.Dialer) {
  934. d.Timeout = 8 * time.Second
  935. }).
  936. WithTransportOptions(func(tr *http.Transport) {
  937. tr.IdleConnTimeout = idleConnectionTimeout
  938. tr.WriteBufferSize = s3TransferBufferSize
  939. tr.ReadBufferSize = s3TransferBufferSize
  940. })
  941. if timeout > 0 {
  942. c = c.WithTimeout(time.Duration(timeout) * time.Second)
  943. }
  944. return c
  945. }
  946. // ideally we should simply use url.PathEscape:
  947. //
  948. // https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
  949. //
  950. // but this cause issue with some vendors, see #483, the code below is copied from rclone
  951. func pathEscape(in string) string {
  952. var u url.URL
  953. u.Path = in
  954. return strings.ReplaceAll(u.String(), "+", "%2B")
  955. }