s3fs.go

// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/internal/logger"
	"github.com/drakkan/sftpgo/v2/internal/metric"
	"github.com/drakkan/sftpgo/v2/internal/util"
	"github.com/drakkan/sftpgo/v2/internal/version"
)

const (
	// using this mime type for directories improves compatibility with s3fs-fuse
	s3DirMimeType         = "application/x-directory"
	s3TransferBufferSize  = 256 * 1024
	s3CopyObjectThreshold = 500 * 1024 * 1024
)

var (
	s3DirMimeTypes    = []string{s3DirMimeType, "httpd/unix-directory"}
	s3DefaultPageSize = int32(5000)
)

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath  string
	config     *S3FsConfig
	svc        *s3.Client
	ctxTimeout time.Duration
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows interacting with an S3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, s3Config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		localTempDir = getLocalTempDir()
	}
	fs := &S3Fs{
		connectionID: connectionID,
		localTempDir: localTempDir,
		mountPath:    getMountPath(mountPath),
		config:       &s3Config,
		ctxTimeout:   30 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	awsConfig, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(
		getAWSHTTPClient(0, 30*time.Second, fs.config.SkipTLSVerify)),
	)
	if err != nil {
		return fs, fmt.Errorf("unable to get AWS config: %w", err)
	}
	if fs.config.Region != "" {
		awsConfig.Region = fs.config.Region
	}
	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = aws.NewCredentialsCache(
			credentials.NewStaticCredentialsProvider(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), ""))
	}
	fs.setConfigDefaults()
	if fs.config.RoleARN != "" {
		client := sts.NewFromConfig(awsConfig)
		creds := stscreds.NewAssumeRoleProvider(client, fs.config.RoleARN)
		awsConfig.Credentials = creds
	}
	fs.svc = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
		o.AppID = fmt.Sprintf("SFTPGo-%s", version.Get().CommitHash)
		o.UsePathStyle = fs.config.ForcePathStyle
		if fs.config.Endpoint != "" {
			o.BaseEndpoint = aws.String(fs.config.Endpoint)
		}
	})
	return fs, nil
}

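// Illustrative construction (a sketch; S3FsConfig is defined elsewhere in
// this package, so the fields to populate below are only indicative):
//
//	cfg := S3FsConfig{} // set bucket, region, credentials, endpoint...
//	s3fs, err := NewS3Fs("connID", "", "", cfg)
//	if err != nil {
//		// config validation or AWS setup failed
//	}
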
// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("%s bucket %q", s3fsName, fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "" || name == "/" || name == "." {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	if fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// Some S3 providers (like SeaweedFS) remove the trailing '/' from object keys.
		// So we check some common content types to detect if this is a "directory".
		isDir := util.Contains(s3DirMimeTypes, util.GetStringFromPointer(obj.ContentType))
		if util.GetIntFromPointer(obj.ContentLength) == 0 && !isDir {
			_, err = fs.headObject(name + "/")
			isDir = err == nil
		}
		return NewFileInfo(name, isDir, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return NewFileInfo(name, true, 0, time.Unix(0, 0), false), nil
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return the content type when listing objects, so we have
	// to create "dirs" adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

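// Stat resolution is a three-step probe: a HEAD on the exact key (a regular
// object, or a "directory" whose trailing slash the provider stripped), then
// a bounded LIST to see whether the name is a non-empty prefix, and finally
// a HEAD on "name/" to find an explicit zero-byte directory marker.
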
func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	return NewFileInfo(name, true, util.GetIntFromPointer(obj.ContentLength), util.GetTimeFromPointer(obj.LastModified), false), nil
}

// Lstat returns a FileInfo describing the named file
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeReader(r)
	if readMetadata > 0 {
		attrs, err := fs.headObject(name)
		if err != nil {
			r.Close()
			w.Close()
			return nil, nil, nil, err
		}
		p.setMetadata(attrs.Metadata)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}
	go func() {
		defer cancelFn()
		n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		})
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, p, cancelFn, nil
}

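// The pipe decouples the SDK transfer manager from the protocol layer: the
// download manager writes parts concurrently into the pipeat writer (backed
// by a temporary file in localTempDir, so out-of-order WriteAt calls are
// fine), the returned PipeReader streams bytes to the client, and cancelFn
// aborts the transfer by cancelling the download context.
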
// Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
	if checks&CheckParentDir != 0 {
		_, err := fs.Stat(path.Dir(name))
		if err != nil {
			return nil, nil, nil, err
		}
	}
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	var p PipeWriter
	if checks&CheckResume != 0 {
		p = newPipeWriterAtOffset(w, 0)
	} else {
		p = NewPipeWriter(w)
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
		u.Concurrency = fs.config.UploadConcurrency
		u.PartSize = fs.config.UploadPartSize
		if fs.config.UploadPartMaxTime > 0 {
			u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.UploadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	go func() {
		defer cancelFn()
		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(name),
			Body:         r,
			ACL:          types.ObjectCannedACL(fs.config.ACL),
			StorageClass: types.StorageClass(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, read bytes: %d, err: %+v",
			name, fs.config.ACL, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	if checks&CheckResume != 0 {
		readCh := make(chan error, 1)
		go func() {
			n, err := fs.downloadToWriter(name, p)
			pw := p.(*pipeWriterAtOffset)
			pw.offset = 0
			pw.writeOffset = n
			readCh <- err
		}()
		err = <-readCh
		if err != nil {
			cancelFn()
			p.Close()
			fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
			return nil, nil, nil, err
		}
	}
	if uploadMode&4 != 0 {
		return nil, p, nil, nil
	}
	return nil, p, cancelFn, nil
}

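// Resume note: S3 objects cannot be appended to, so CheckResume rewrites the
// whole object. downloadToWriter replays the existing object into the pipe,
// pipeWriterAtOffset is then adjusted so the client's new data lands at the
// byte count already replayed (writeOffset), and the uploader goroutine
// streams the concatenation as a brand new object.
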
// Rename renames (moves) source to target.
func (fs *S3Fs) Rename(source, target string) (int, int64, error) {
	if source == target {
		return -1, -1, nil
	}
	_, err := fs.Stat(path.Dir(target))
	if err != nil {
		return -1, -1, err
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return -1, -1, err
	}
	return fs.renameInternal(source, target, fi, 0)
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non-empty directory: %q", name)
		}
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	return err
}

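// Directories exist only as zero-byte marker keys with a trailing "/", so
// removing an empty directory means deleting that marker key; the
// hasContents check prevents orphaning objects that share the prefix.
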
// Mkdir creates a new directory with the specified name and default permissions
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	return fs.mkdirInternal(name)
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(_, _ string) error {
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(_ string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(_ string, _ int, _ int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(_ string, _ os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (fs *S3Fs) Chtimes(_ string, _, _ time.Time, _ bool) error {
	return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(_ string, _ int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *S3Fs) ReadDir(dirname string) (DirLister, error) {
	// dirname must be already cleaned
	prefix := fs.getPrefix(dirname)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
		MaxKeys:   &s3DefaultPageSize,
	})
	return &s3DirLister{
		paginator: paginator,
		timeout:   fs.ctxTimeout,
		prefix:    prefix,
		prefixes:  make(map[string]bool),
	}, nil
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
	return size <= resumeMaxSize
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}

// IsNotExist returns a boolean indicating whether the error is known to
// report that a file or directory does not exist
func (*S3Fs) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusNotFound
		}
	}
	return false
}

// IsPermission returns a boolean indicating whether the error is known to
// report that permission is denied.
func (*S3Fs) IsPermission(err error) bool {
	if err == nil {
		return false
	}
	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		if re.Response != nil {
			return re.Response.StatusCode == http.StatusForbidden ||
				re.Response.StatusCode == http.StatusUnauthorized
		}
	}
	return false
}

// IsNotSupported returns true if the error indicates an unsupported operation
func (*S3Fs) IsNotSupported(err error) bool {
	if err == nil {
		return false
	}
	return errors.Is(err, ErrVfsUnsupported)
}

// CheckRootPath creates the specified local root directory if it does not exist
func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
	// we need a local directory for temporary files
	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "", nil)
	return osFs.CheckRootPath(username, uid, gid)
}

// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
	return fs.GetDirSize(fs.config.KeyPrefix)
}

// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
	prefix := fs.getPrefix(dirname)
	numFiles := 0
	size := int64(0)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			return numFiles, size, err
		}
		for _, fileObject := range page.Contents {
			isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
			objectSize := util.GetIntFromPointer(fileObject.Size)
			if isDir && objectSize == 0 {
				continue
			}
			numFiles++
			size += objectSize
		}
		fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
	}
	metric.S3ListObjectsCompleted(nil)
	return numFiles, size, nil
}

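// The scan lists the prefix without a delimiter, so each page returns keys
// from the whole subtree as one flat stream; zero-byte keys ending in "/"
// are the emulated directory markers and are excluded from both counters.
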
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3
func (*S3Fs) GetAtomicUploadPath(_ string) string {
	return ""
}

// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users
func (fs *S3Fs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}

// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. The results are unordered
func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	prefix := fs.getPrefix(root)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &s3DefaultPageSize,
	})
	for paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		if err != nil {
			metric.S3ListObjectsCompleted(err)
			walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), err) //nolint:errcheck
			return err
		}
		for _, fileObject := range page.Contents {
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			err := walkFn(util.GetStringFromPointer(fileObject.Key),
				NewFileInfo(name, isDir, util.GetIntFromPointer(fileObject.Size),
					util.GetTimeFromPointer(fileObject.LastModified), false), nil)
			if err != nil {
				return err
			}
		}
	}
	metric.S3ListObjectsCompleted(nil)
	walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) //nolint:errcheck
	return nil
}

// Join joins any number of path elements into a single path
func (*S3Fs) Join(elem ...string) string {
	return strings.TrimPrefix(path.Join(elem...), "/")
}

// HasVirtualFolders returns true if folders are emulated
func (*S3Fs) HasVirtualFolders() bool {
	return true
}

// ResolvePath returns the matching filesystem path for the specified virtual path
func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
	if fs.mountPath != "" {
		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
	}
	if !path.IsAbs(virtualPath) {
		virtualPath = path.Clean("/" + virtualPath)
	}
	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
}

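// For example, with an empty mountPath and KeyPrefix "users/alice/", the
// virtual path "/docs/report.txt" resolves to the object key
// "users/alice/docs/report.txt"; GetRelativePath performs the inverse
// mapping.
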
// CopyFile implements the FsFileCopier interface
func (fs *S3Fs) CopyFile(source, target string, srcSize int64) (int, int64, error) {
	numFiles := 1
	sizeDiff := srcSize
	attrs, err := fs.headObject(target)
	if err == nil {
		sizeDiff -= util.GetIntFromPointer(attrs.ContentLength)
		numFiles = 0
	} else {
		if !fs.IsNotExist(err) {
			return 0, 0, err
		}
	}
	if err := fs.copyFileInternal(source, target, srcSize); err != nil {
		return 0, 0, err
	}
	return numFiles, sizeDiff, nil
}

func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

func (fs *S3Fs) setConfigDefaults() {
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = manager.DefaultUploadPartSize
	} else if fs.config.UploadPartSize < 1024*1024 {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = manager.DefaultDownloadPartSize
	} else if fs.config.DownloadPartSize < 1024*1024 {
		fs.config.DownloadPartSize *= 1024 * 1024
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = manager.DefaultDownloadConcurrency
	}
}

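// Defaults come from the SDK transfer manager; configured part sizes below
// 1024*1024 are treated as MiB counts and scaled up, so a configured
// UploadPartSize of 16 becomes 16 MiB.
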
func (fs *S3Fs) copyFileInternal(source, target string, fileSize int64) error {
	contentType := mime.TypeByExtension(path.Ext(source))
	copySource := pathEscape(fs.Join(fs.config.Bucket, source))
	if fileSize > s3CopyObjectThreshold {
		fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
			source, fileSize)
		err := fs.doMultipartCopy(copySource, target, contentType, fileSize)
		metric.S3CopyObjectCompleted(err)
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(copySource),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	metric.S3CopyObjectCompleted(err)
	return err
}

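// Objects larger than s3CopyObjectThreshold (500 MB) are copied with
// doMultipartCopy, which copies byte ranges in parallel via UploadPartCopy;
// note that a single server-side CopyObject call is limited to 5 GiB on AWS.
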
func (fs *S3Fs) renameInternal(source, target string, fi os.FileInfo, recursion int) (int, int64, error) {
	var numFiles int
	var filesSize int64
	if fi.IsDir() {
		if renameMode == 0 {
			hasContents, err := fs.hasContents(source)
			if err != nil {
				return numFiles, filesSize, err
			}
			if hasContents {
				return numFiles, filesSize, fmt.Errorf("%w: cannot rename non-empty directory: %q", ErrVfsUnsupported, source)
			}
		}
		if err := fs.mkdirInternal(target); err != nil {
			return numFiles, filesSize, err
		}
		if renameMode == 1 {
			files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion)
			numFiles += files
			filesSize += size
			if err != nil {
				return numFiles, filesSize, err
			}
		}
	} else {
		if err := fs.copyFileInternal(source, target, fi.Size()); err != nil {
			return numFiles, filesSize, err
		}
		numFiles++
		filesSize += fi.Size()
	}
	err := fs.Remove(source, fi.IsDir())
	if fs.IsNotExist(err) {
		err = nil
	}
	return numFiles, filesSize, err
}

func (fs *S3Fs) mkdirInternal(name string) error {
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	_, w, _, err := fs.Create(name, -1, 0)
	if err != nil {
		return err
	}
	return w.Close()
}

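// Passing flag -1 to Create makes the upload goroutine tag the zero-byte
// "name/" key with s3DirMimeType, so the marker is later recognized as a
// directory by Stat and by s3fs-fuse compatible clients.
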
func (fs *S3Fs) hasContents(name string) (bool, error) {
	prefix := fs.getPrefix(name)
	maxKeys := int32(2)
	paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
		Bucket:  aws.String(fs.config.Bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: &maxKeys,
	})
	if paginator.HasMorePages() {
		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
		defer cancelFn()
		page, err := paginator.NextPage(ctx)
		metric.S3ListObjectsCompleted(err)
		if err != nil {
			return false, err
		}
		for _, obj := range page.Contents {
			name, _ := fs.resolve(obj.Key, prefix)
			if name == "" || name == "/" {
				continue
			}
			return true, nil
		}
		return false, nil
	}
	metric.S3ListObjectsCompleted(nil)
	return false, nil
}

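// MaxKeys is 2 because the page may contain the directory marker key itself,
// which resolves to an empty name and is skipped; any other key under the
// prefix proves the directory is not empty.
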
func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int64) error {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	res, err := fs.svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:       aws.String(fs.config.Bucket),
		Key:          aws.String(target),
		StorageClass: types.StorageClass(fs.config.StorageClass),
		ACL:          types.ObjectCannedACL(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		return fmt.Errorf("unable to create multipart copy request: %w", err)
	}
	uploadID := util.GetStringFromPointer(res.UploadId)
	if uploadID == "" {
		return errors.New("unable to get multipart copy upload ID")
	}
	// We use a 32 MB part size and copy 10 parts in parallel.
	// These values are arbitrary. We don't want to start too many goroutines
	maxPartSize := int64(32 * 1024 * 1024)
	if fileSize > int64(100*1024*1024*1024) {
		maxPartSize = int64(500 * 1024 * 1024)
	}
	guard := make(chan struct{}, 10)
	finished := false
	var completedParts []types.CompletedPart
	var partMutex sync.Mutex
	var wg sync.WaitGroup
	var hasError atomic.Bool
	var errOnce sync.Once
	var copyError error
	var partNumber int32
	var offset int64

	opCtx, opCancel := context.WithCancel(context.Background())
	defer opCancel()

	for partNumber = 1; !finished; partNumber++ {
		start := offset
		end := offset + maxPartSize
		if end >= fileSize {
			end = fileSize
			finished = true
		}
		offset = end
		guard <- struct{}{}
		if hasError.Load() {
			fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
			break
		}
		wg.Add(1)
		go func(partNum int32, partStart, partEnd int64) {
			defer func() {
				<-guard
				wg.Done()
			}()
			innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
			defer innerCancelFn()
			partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
				Bucket:          aws.String(fs.config.Bucket),
				CopySource:      aws.String(source),
				Key:             aws.String(target),
				PartNumber:      &partNum,
				UploadId:        aws.String(uploadID),
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
			})
			if err != nil {
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
					hasError.Store(true)
					copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
					opCancel()

					abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
					defer abortCancelFn()
					_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
						Bucket:   aws.String(fs.config.Bucket),
						Key:      aws.String(target),
						UploadId: aws.String(uploadID),
					})
					if errAbort != nil {
						fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
					}
				})
				return
			}
			partMutex.Lock()
			completedParts = append(completedParts, types.CompletedPart{
				ETag:       partResp.CopyPartResult.ETag,
				PartNumber: &partNum,
			})
			partMutex.Unlock()
		}(partNumber, start, end)
	}
	wg.Wait()
	close(guard)

	if copyError != nil {
		return copyError
	}
	sort.Slice(completedParts, func(i, j int) bool {
		getPartNumber := func(number *int32) int32 {
			if number == nil {
				return 0
			}
			return *number
		}
		return getPartNumber(completedParts[i].PartNumber) < getPartNumber(completedParts[j].PartNumber)
	})

	completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer completeCancelFn()
	_, err = fs.svc.CompleteMultipartUpload(completeCtx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(fs.config.Bucket),
		Key:      aws.String(target),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}

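// Concurrency above is bounded with a counting-semaphore channel: the loop
// blocks sending into guard (capacity 10) before spawning each
// UploadPartCopy goroutine. errOnce ensures only the first failure records
// copyError and aborts the multipart upload, and completed parts are sorted
// by part number before completion because goroutines append them out of
// order.
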
func (fs *S3Fs) getPrefix(name string) string {
	prefix := ""
	if name != "" && name != "." && name != "/" {
		prefix = strings.TrimPrefix(name, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	return prefix
}

func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	obj, err := fs.svc.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3HeadObjectCompleted(err)
	return obj, err
}

// GetMimeType returns the content type
func (fs *S3Fs) GetMimeType(name string) (string, error) {
	obj, err := fs.headObject(name)
	if err != nil {
		return "", err
	}
	return util.GetStringFromPointer(obj.ContentType), nil
}

// Close closes the fs
func (*S3Fs) Close() error {
	return nil
}

// GetAvailableDiskSize returns the available size for the specified path
func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}

func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
	defer cancelFn()
	downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
		d.Concurrency = fs.config.DownloadConcurrency
		d.PartSize = fs.config.DownloadPartSize
		if fs.config.DownloadPartMaxTime > 0 {
			d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
				o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond,
					fs.config.SkipTLSVerify)
			})
		}
	})
	n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
		name, n, err)
	metric.S3TransferCompleted(n, 1, err)
	return n, err
}

type s3DirLister struct {
	baseDirLister
	paginator     *s3.ListObjectsV2Paginator
	timeout       time.Duration
	prefix        string
	prefixes      map[string]bool
	metricUpdated bool
}

func (l *s3DirLister) resolve(name *string) (string, bool) {
	result := strings.TrimPrefix(util.GetStringFromPointer(name), l.prefix)
	isDir := strings.HasSuffix(result, "/")
	if isDir {
		result = strings.TrimSuffix(result, "/")
	}
	return result, isDir
}

func (l *s3DirLister) Next(limit int) ([]os.FileInfo, error) {
	if limit <= 0 {
		return nil, errInvalidDirListerLimit
	}
	if len(l.cache) >= limit {
		return l.returnFromCache(limit), nil
	}
	if !l.paginator.HasMorePages() {
		if !l.metricUpdated {
			l.metricUpdated = true
			metric.S3ListObjectsCompleted(nil)
		}
		return l.returnFromCache(limit), io.EOF
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(l.timeout))
	defer cancelFn()
	page, err := l.paginator.NextPage(ctx)
	if err != nil {
		metric.S3ListObjectsCompleted(err)
		return l.cache, err
	}
	for _, p := range page.CommonPrefixes {
		// prefixes have a trailing slash
		name, _ := l.resolve(p.Prefix)
		if name == "" {
			continue
		}
		if _, ok := l.prefixes[name]; ok {
			continue
		}
		l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
		l.prefixes[name] = true
	}
	for _, fileObject := range page.Contents {
		objectModTime := util.GetTimeFromPointer(fileObject.LastModified)
		objectSize := util.GetIntFromPointer(fileObject.Size)
		name, isDir := l.resolve(fileObject.Key)
		if name == "" || name == "/" {
			continue
		}
		if isDir {
			if _, ok := l.prefixes[name]; ok {
				continue
			}
			l.prefixes[name] = true
		}
		l.cache = append(l.cache, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
	}
	return l.returnFromCache(limit), nil
}

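// Each Next call drains at most one LIST page into the embedded cache and
// returns up to limit entries. The prefixes map deduplicates directories
// that appear both as CommonPrefixes (thanks to the "/" delimiter) and as
// explicit zero-byte marker keys in Contents.
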
func (l *s3DirLister) Close() error {
	return l.baseDirLister.Close()
}

func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration, skipTLSVerify bool) *awshttp.BuildableClient {
	c := awshttp.NewBuildableClient().
		WithDialerOptions(func(d *net.Dialer) {
			d.Timeout = 8 * time.Second
		}).
		WithTransportOptions(func(tr *http.Transport) {
			tr.IdleConnTimeout = idleConnectionTimeout
			tr.WriteBufferSize = s3TransferBufferSize
			tr.ReadBufferSize = s3TransferBufferSize
			if skipTLSVerify {
				if tr.TLSClientConfig != nil {
					tr.TLSClientConfig.InsecureSkipVerify = skipTLSVerify
				} else {
					tr.TLSClientConfig = &tls.Config{
						MinVersion:         awshttp.DefaultHTTPTransportTLSMinVersion,
						InsecureSkipVerify: skipTLSVerify,
					}
				}
			}
		})
	if timeout > 0 {
		c = c.WithTimeout(time.Duration(timeout) * time.Second)
	}
	return c
}

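// A timeout of 0 produces a client with no overall request deadline, as used
// for the shared client in NewS3Fs; the per-part clients built in Open,
// Create and downloadToWriter pass DownloadPartMaxTime/UploadPartMaxTime (in
// seconds) so a stalled part fails fast instead of blocking the whole
// transfer.
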
// ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483, the code below is copied from rclone
func pathEscape(in string) string {
	var u url.URL
	u.Path = in
	return strings.ReplaceAll(u.String(), "+", "%2B")
}

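// Setting only Path on a url.URL makes String() apply Go's path escaping
// (space becomes %20 and so on) while leaving "+" alone; the extra
// ReplaceAll encodes it, so, for example, pathEscape("bucket/a b+c") returns
// "bucket/a%20b%2Bc".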