azblobfs.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189
  1. // Copyright (C) 2019-2022 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. //go:build !noazblob
  15. // +build !noazblob
  16. package vfs
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/base64"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "mime"
  25. "net/http"
  26. "os"
  27. "path"
  28. "path/filepath"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  34. "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
  35. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  36. "github.com/eikenb/pipeat"
  37. "github.com/google/uuid"
  38. "github.com/pkg/sftp"
  39. "github.com/drakkan/sftpgo/v2/logger"
  40. "github.com/drakkan/sftpgo/v2/metric"
  41. "github.com/drakkan/sftpgo/v2/plugin"
  42. "github.com/drakkan/sftpgo/v2/util"
  43. "github.com/drakkan/sftpgo/v2/version"
  44. )
const (
	// azureDefaultEndpoint is the public Azure Blob service endpoint suffix,
	// used when no custom endpoint is configured.
	azureDefaultEndpoint = "blob.core.windows.net"
)
// AzureBlobFs is a Fs implementation for Azure Blob storage.
type AzureBlobFs struct {
	// connection this filesystem instance belongs to
	connectionID string
	// local directory used for temporary files (pipes, atomic uploads)
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath          string
	config             *AzBlobFsConfig
	// true when we authenticate with account credentials and can query
	// container properties; false with a SAS URL
	hasContainerAccess bool
	containerClient    *azblob.ContainerClient
	// timeouts for short and long running requests respectively
	ctxTimeout     time.Duration
	ctxLongTimeout time.Duration
}
func init() {
	// advertise Azure Blob support in the binary feature list
	version.AddFeature("+azblob")
}
// NewAzBlobFs returns an AzBlobFs object that allows to interact with Azure Blob storage
func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsConfig) (Fs, error) {
	// fall back to the global temp path or the OS temp dir for local scratch files
	if localTempDir == "" {
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &AzureBlobFs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		mountPath:      getMountPath(mountPath),
		config:         &config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 90 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	if err := fs.config.tryDecrypt(); err != nil {
		return fs, err
	}
	fs.setConfigDefaults()
	version := version.Get()
	// identify SFTPGo in the User-Agent telemetry sent to Azure
	clientOptions := &azblob.ClientOptions{
		Telemetry: policy.TelemetryOptions{
			ApplicationID: fmt.Sprintf("SFTPGo-%v_%v", version.Version, version.CommitHash),
		},
	}
	// SAS URL authentication: the URL itself carries the credentials and may
	// already be scoped to a container
	if fs.config.SASURL.GetPayload() != "" {
		parts, err := azblob.NewBlobURLParts(fs.config.SASURL.GetPayload())
		if err != nil {
			return fs, fmt.Errorf("invalid SAS URL: %w", err)
		}
		svc, err := azblob.NewServiceClientWithNoCredential(fs.config.SASURL.GetPayload(), clientOptions)
		if err != nil {
			return fs, fmt.Errorf("invalid credentials: %v", err)
		}
		if parts.ContainerName != "" {
			// container-scoped SAS: a configured container name, if any, must match
			if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
				return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
					parts.ContainerName, fs.config.Container)
			}
			fs.config.Container = parts.ContainerName
			// the SAS URL already points at the container, so pass an empty name
			fs.containerClient, err = svc.NewContainerClient("")
		} else {
			if fs.config.Container == "" {
				return fs, errors.New("container is required with this SAS URL")
			}
			fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
		}
		// with a SAS URL we cannot assume permission to read container properties
		fs.hasContainerAccess = false
		return fs, err
	}
	// shared key authentication
	credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload())
	if err != nil {
		return fs, fmt.Errorf("invalid credentials: %v", err)
	}
	var endpoint string
	if fs.config.UseEmulator {
		// emulators such as Azurite expect the account name in the path
		endpoint = fmt.Sprintf("%s/%s", fs.config.Endpoint, fs.config.AccountName)
	} else {
		endpoint = fmt.Sprintf("https://%s.%s/", fs.config.AccountName, fs.config.Endpoint)
	}
	svc, err := azblob.NewServiceClientWithSharedKey(endpoint, credential, clientOptions)
	if err != nil {
		return fs, fmt.Errorf("invalid credentials: %v", err)
	}
	fs.hasContainerAccess = true
	fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
	return fs, err
}
  136. // Name returns the name for the Fs implementation
  137. func (fs *AzureBlobFs) Name() string {
  138. if !fs.config.SASURL.IsEmpty() {
  139. return fmt.Sprintf("Azure Blob with SAS URL, container %#v", fs.config.Container)
  140. }
  141. return fmt.Sprintf("Azure Blob container %#v", fs.config.Container)
  142. }
// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *AzureBlobFs) ConnectionID() string {
	return fs.connectionID
}
// Stat returns a FileInfo describing the named file.
// The root and the configured key prefix are always reported as directories;
// other names are resolved first as blobs and then as virtual directories
// (prefixes with at least one blob under them).
func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
	if name == "" || name == "." {
		// root: optionally verify the container is reachable, then synthesize
		// a directory entry
		if fs.hasContainerAccess {
			err := fs.checkIfBucketExists()
			if err != nil {
				return nil, err
			}
		}
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	// the key prefix itself is the user's virtual root directory
	if fs.config.KeyPrefix == name+"/" {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	attrs, err := fs.headObject(name)
	if err == nil {
		// a blob with the directory MIME type is a directory placeholder
		contentType := util.GetStringFromPointer(attrs.ContentType)
		isDir := contentType == dirMimeType
		metric.AZListObjectsCompleted(nil)
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir,
			util.GetIntFromPointer(attrs.ContentLength),
			util.GetTimeFromPointer(attrs.LastModified), false))
	}
	if !fs.IsNotExist(err) {
		return nil, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err != nil {
		return nil, err
	}
	if hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	return nil, os.ErrNotExist
}
// Lstat returns a FileInfo describing the named file.
// Symlinks are not supported on Azure Blob, so Lstat is identical to Stat.
func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
// Open opens the named file for reading.
// The download runs in a background goroutine that writes into a pipe; the
// caller reads from the returned *pipeat.PipeReaderAt and can abort the
// transfer via the returned cancel function.
func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		// release both pipe ends, the transfer never started
		r.Close()
		w.Close()
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	go func() {
		defer cancelFn()
		err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
		// propagate the download outcome to the reader side
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err)
		metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing.
// The upload runs in a background goroutine fed by a pipe; the caller writes
// to the returned *PipeWriter. flag == -1 marks a directory placeholder blob.
func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		// release both pipe ends, the transfer never started
		r.Close()
		w.Close()
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	p := NewPipeWriter(w)
	headers := azblob.BlobHTTPHeaders{}
	var contentType string
	if flag == -1 {
		// directory placeholder: mark it with the directory MIME type
		contentType = dirMimeType
	} else {
		contentType = mime.TypeByExtension(path.Ext(name))
	}
	if contentType != "" {
		headers.BlobContentType = &contentType
	}
	go func() {
		defer cancelFn()
		err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers)
		// propagate the upload outcome to the writer side
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %+v", name, r.GetReadedBytes(), err)
		metric.AZTransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a StartCopyFromURL call.
// The rename is implemented as server-side copy + delete of the source.
func (fs *AzureBlobFs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %#v", source)
		}
	}
	dstBlob, err := fs.containerClient.NewBlockBlobClient(target)
	if err != nil {
		return err
	}
	srcBlob, err := fs.containerClient.NewBlockBlobClient(source)
	if err != nil {
		return err
	}
	// copies can be slow, use the long timeout for the whole copy+poll sequence
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()
	resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions())
	if err != nil {
		metric.AZCopyObjectCompleted(err)
		return err
	}
	copyStatus := azblob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
	nErrors := 0
	for copyStatus == azblob.CopyStatusTypePending {
		// Poll until the copy is complete.
		time.Sleep(500 * time.Millisecond)
		// note: resp and err deliberately shadow the outer variables here
		resp, err := dstBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
			BlobAccessConditions: &azblob.BlobAccessConditions{},
		})
		if err != nil {
			// A GetProperties failure may be transient, so allow a couple
			// of them before giving up.
			nErrors++
			if ctx.Err() != nil || nErrors == 3 {
				metric.AZCopyObjectCompleted(err)
				return err
			}
		} else {
			copyStatus = azblob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
		}
	}
	if copyStatus != azblob.CopyStatusTypeSuccess {
		err := fmt.Errorf("copy failed with status: %s", copyStatus)
		metric.AZCopyObjectCompleted(err)
		return err
	}
	metric.AZCopyObjectCompleted(nil)
	// carry the plugin-stored modification time over to the new name
	fs.preserveModificationTime(source, target, fi)
	return fs.Remove(source, fi.IsDir())
}
// Remove removes the named file or (empty) directory.
func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
	if isDir {
		// refuse to delete a directory that still has children
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
	}
	blobBlock, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	// delete snapshots too, otherwise the delete would fail for blobs with snapshots
	_, err = blobBlock.Delete(ctx, &azblob.BlobDeleteOptions{
		DeleteSnapshots: azblob.DeleteSnapshotsOptionTypeInclude.ToPtr(),
	})
	metric.AZDeleteObjectCompleted(err)
	// drop any plugin-stored modification time for the removed file;
	// failures here are logged but do not fail the removal
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %+v", name, errMetadata)
		}
	}
	return err
}
  337. // Mkdir creates a new directory with the specified name and default permissions
  338. func (fs *AzureBlobFs) Mkdir(name string) error {
  339. _, err := fs.Stat(name)
  340. if !fs.IsNotExist(err) {
  341. return err
  342. }
  343. _, w, _, err := fs.Create(name, -1)
  344. if err != nil {
  345. return err
  346. }
  347. return w.Close()
  348. }
// Symlink creates source as a symbolic link to target.
// Symbolic links are not supported on Azure Blob storage.
func (*AzureBlobFs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}
// Readlink returns the destination of the named symbolic link.
// Symbolic links are not supported on Azure Blob storage.
func (*AzureBlobFs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}
// Chown changes the numeric uid and gid of the named file.
// Ownership is not supported on Azure Blob storage.
func (*AzureBlobFs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}
// Chmod changes the mode of the named file to mode.
// File modes are not supported on Azure Blob storage.
func (*AzureBlobFs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}
  365. // Chtimes changes the access and modification times of the named file.
  366. func (fs *AzureBlobFs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
  367. if !plugin.Handler.HasMetadater() {
  368. return ErrVfsUnsupported
  369. }
  370. if !isUploading {
  371. info, err := fs.Stat(name)
  372. if err != nil {
  373. return err
  374. }
  375. if info.IsDir() {
  376. return ErrVfsUnsupported
  377. }
  378. }
  379. return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
  380. util.GetTimeAsMsSinceEpoch(mtime))
  381. }
// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*AzureBlobFs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}
  388. // ReadDir reads the directory named by dirname and returns
  389. // a list of directory entries.
  390. func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
  391. var result []os.FileInfo
  392. // dirname must be already cleaned
  393. prefix := fs.getPrefix(dirname)
  394. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  395. if err != nil {
  396. return result, err
  397. }
  398. prefixes := make(map[string]bool)
  399. pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{
  400. Include: []azblob.ListBlobsIncludeItem{},
  401. Prefix: &prefix,
  402. })
  403. hasNext := true
  404. for hasNext {
  405. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  406. defer cancelFn()
  407. if hasNext = pager.NextPage(ctx); hasNext {
  408. resp := pager.PageResponse()
  409. for _, blobPrefix := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobPrefixes {
  410. name := util.GetStringFromPointer(blobPrefix.Name)
  411. // we don't support prefixes == "/" this will be sent if a key starts with "/"
  412. if name == "" || name == "/" {
  413. continue
  414. }
  415. // sometime we have duplicate prefixes, maybe an Azurite bug
  416. name = strings.TrimPrefix(name, prefix)
  417. if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok {
  418. continue
  419. }
  420. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  421. prefixes[strings.TrimSuffix(name, "/")] = true
  422. }
  423. for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
  424. name := util.GetStringFromPointer(blobItem.Name)
  425. name = strings.TrimPrefix(name, prefix)
  426. size := int64(0)
  427. isDir := false
  428. modTime := time.Now()
  429. if blobItem.Properties != nil {
  430. size = util.GetIntFromPointer(blobItem.Properties.ContentLength)
  431. modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified)
  432. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  433. isDir = (contentType == dirMimeType)
  434. if isDir {
  435. // check if the dir is already included, it will be sent as blob prefix if it contains at least one item
  436. if _, ok := prefixes[name]; ok {
  437. continue
  438. }
  439. prefixes[name] = true
  440. }
  441. }
  442. if t, ok := modTimes[name]; ok {
  443. modTime = util.GetTimeFromMsecSinceEpoch(t)
  444. }
  445. result = append(result, NewFileInfo(name, isDir, size, modTime, false))
  446. }
  447. }
  448. }
  449. err = pager.Err()
  450. metric.AZListObjectsCompleted(err)
  451. return result, err
  452. }
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on Azure Blob
func (*AzureBlobFs) IsUploadResumeSupported() bool {
	return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// Azure Blob uploads are already atomic, we don't need to upload to a temporary
// file
func (*AzureBlobFs) IsAtomicUploadSupported() bool {
	return false
}
  464. // IsNotExist returns a boolean indicating whether the error is known to
  465. // report that a file or directory does not exist
  466. func (*AzureBlobFs) IsNotExist(err error) bool {
  467. if err == nil {
  468. return false
  469. }
  470. var errStorage *azblob.StorageError
  471. if errors.As(err, &errStorage) {
  472. return errStorage.StatusCode() == http.StatusNotFound
  473. }
  474. var errResp *azcore.ResponseError
  475. if errors.As(err, &errResp) {
  476. return errResp.StatusCode == http.StatusNotFound
  477. }
  478. // os.ErrNotExist can be returned internally by fs.Stat
  479. return errors.Is(err, os.ErrNotExist)
  480. }
  481. // IsPermission returns a boolean indicating whether the error is known to
  482. // report that permission is denied.
  483. func (*AzureBlobFs) IsPermission(err error) bool {
  484. if err == nil {
  485. return false
  486. }
  487. var errStorage *azblob.StorageError
  488. if errors.As(err, &errStorage) {
  489. statusCode := errStorage.StatusCode()
  490. return statusCode == http.StatusForbidden || statusCode == http.StatusUnauthorized
  491. }
  492. var errResp *azcore.ResponseError
  493. if errors.As(err, &errResp) {
  494. return errResp.StatusCode == http.StatusForbidden || errResp.StatusCode == http.StatusUnauthorized
  495. }
  496. return false
  497. }
  498. // IsNotSupported returns true if the error indicate an unsupported operation
  499. func (*AzureBlobFs) IsNotSupported(err error) bool {
  500. if err == nil {
  501. return false
  502. }
  503. return err == ErrVfsUnsupported
  504. }
  505. // CheckRootPath creates the specified local root directory if it does not exists
  506. func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
  507. // we need a local directory for temporary files
  508. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
  509. return osFs.CheckRootPath(username, uid, gid)
  510. }
  511. // ScanRootDirContents returns the number of files contained in the bucket,
  512. // and their size
  513. func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
  514. numFiles := 0
  515. size := int64(0)
  516. pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
  517. Prefix: &fs.config.KeyPrefix,
  518. })
  519. hasNext := true
  520. for hasNext {
  521. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  522. defer cancelFn()
  523. if hasNext = pager.NextPage(ctx); hasNext {
  524. resp := pager.PageResponse()
  525. for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
  526. if blobItem.Properties != nil {
  527. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  528. isDir := (contentType == dirMimeType)
  529. blobSize := util.GetIntFromPointer(blobItem.Properties.ContentLength)
  530. if isDir && blobSize == 0 {
  531. continue
  532. }
  533. numFiles++
  534. size += blobSize
  535. }
  536. }
  537. }
  538. }
  539. err := pager.Err()
  540. metric.AZListObjectsCompleted(err)
  541. return numFiles, size, err
  542. }
  543. func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  544. fileNames := make(map[string]bool)
  545. prefix := ""
  546. if fsPrefix != "/" {
  547. prefix = strings.TrimPrefix(fsPrefix, "/")
  548. }
  549. pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{
  550. Include: []azblob.ListBlobsIncludeItem{},
  551. Prefix: &prefix,
  552. })
  553. hasNext := true
  554. for hasNext {
  555. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  556. defer cancelFn()
  557. if hasNext = pager.NextPage(ctx); hasNext {
  558. resp := pager.PageResponse()
  559. for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
  560. name := util.GetStringFromPointer(blobItem.Name)
  561. name = strings.TrimPrefix(name, prefix)
  562. if blobItem.Properties != nil {
  563. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  564. isDir := (contentType == dirMimeType)
  565. if isDir {
  566. continue
  567. }
  568. fileNames[name] = true
  569. }
  570. }
  571. }
  572. }
  573. err := pager.Err()
  574. metric.AZListObjectsCompleted(err)
  575. return fileNames, err
  576. }
// CheckMetadata checks the metadata consistency
func (fs *AzureBlobFs) CheckMetadata() error {
	return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
}
// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (*AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
// Azure Blob Storage uploads are already atomic, we never call this method
func (*AzureBlobFs) GetAtomicUploadPath(name string) string {
	return ""
}
  591. // GetRelativePath returns the path for a file relative to the user's home dir.
  592. // This is the path as seen by SFTPGo users
  593. func (fs *AzureBlobFs) GetRelativePath(name string) string {
  594. rel := path.Clean(name)
  595. if rel == "." {
  596. rel = ""
  597. }
  598. if !path.IsAbs(rel) {
  599. rel = "/" + rel
  600. }
  601. if fs.config.KeyPrefix != "" {
  602. if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
  603. rel = "/"
  604. }
  605. rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
  606. }
  607. if fs.mountPath != "" {
  608. rel = path.Join(fs.mountPath, rel)
  609. }
  610. return rel
  611. }
  612. // Walk walks the file tree rooted at root, calling walkFn for each file or
  613. // directory in the tree, including root
  614. func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
  615. prefix := fs.getPrefix(root)
  616. pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
  617. Prefix: &prefix,
  618. })
  619. hasNext := true
  620. for hasNext {
  621. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  622. defer cancelFn()
  623. if hasNext = pager.NextPage(ctx); hasNext {
  624. resp := pager.PageResponse()
  625. for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
  626. name := util.GetStringFromPointer(blobItem.Name)
  627. if fs.isEqual(name, prefix) {
  628. continue
  629. }
  630. blobSize := int64(0)
  631. lastModified := time.Now()
  632. isDir := false
  633. if blobItem.Properties != nil {
  634. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  635. isDir = (contentType == dirMimeType)
  636. blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength)
  637. lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified)
  638. }
  639. err := walkFn(name, NewFileInfo(name, isDir, blobSize, lastModified, false), nil)
  640. if err != nil {
  641. return err
  642. }
  643. }
  644. }
  645. }
  646. err := pager.Err()
  647. if err != nil {
  648. metric.AZListObjectsCompleted(err)
  649. return err
  650. }
  651. metric.AZListObjectsCompleted(nil)
  652. return walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), nil)
  653. }
  654. // Join joins any number of path elements into a single path
  655. func (*AzureBlobFs) Join(elem ...string) string {
  656. return strings.TrimPrefix(path.Join(elem...), "/")
  657. }
// HasVirtualFolders returns true if folders are emulated
func (*AzureBlobFs) HasVirtualFolders() bool {
	return true
}
  662. // ResolvePath returns the matching filesystem path for the specified sftp path
  663. func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
  664. if fs.mountPath != "" {
  665. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  666. }
  667. if !path.IsAbs(virtualPath) {
  668. virtualPath = path.Clean("/" + virtualPath)
  669. }
  670. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  671. }
  672. func (fs *AzureBlobFs) headObject(name string) (azblob.BlobGetPropertiesResponse, error) {
  673. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  674. defer cancelFn()
  675. blobClient, err := fs.containerClient.NewBlockBlobClient(name)
  676. if err != nil {
  677. return azblob.BlobGetPropertiesResponse{}, err
  678. }
  679. resp, err := blobClient.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
  680. BlobAccessConditions: &azblob.BlobAccessConditions{},
  681. })
  682. metric.AZHeadObjectCompleted(err)
  683. return resp, err
  684. }
  685. // GetMimeType returns the content type
  686. func (fs *AzureBlobFs) GetMimeType(name string) (string, error) {
  687. response, err := fs.headObject(name)
  688. if err != nil {
  689. return "", err
  690. }
  691. return util.GetStringFromPointer(response.ContentType), nil
  692. }
// Close closes the fs. There is nothing to release for Azure Blob.
func (*AzureBlobFs) Close() error {
	return nil
}
// GetAvailableDiskSize returns the available size for the specified path.
// Azure Blob does not expose quota information.
func (*AzureBlobFs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}
  701. func (*AzureBlobFs) getPrefix(name string) string {
  702. prefix := ""
  703. if name != "" && name != "." {
  704. prefix = strings.TrimPrefix(name, "/")
  705. if !strings.HasSuffix(prefix, "/") {
  706. prefix += "/"
  707. }
  708. }
  709. return prefix
  710. }
  711. func (fs *AzureBlobFs) isEqual(key string, virtualName string) bool {
  712. if key == virtualName {
  713. return true
  714. }
  715. if key == virtualName+"/" {
  716. return true
  717. }
  718. if key+"/" == virtualName {
  719. return true
  720. }
  721. return false
  722. }
  723. func (fs *AzureBlobFs) setConfigDefaults() {
  724. if fs.config.Endpoint == "" {
  725. fs.config.Endpoint = azureDefaultEndpoint
  726. }
  727. if fs.config.UploadPartSize == 0 {
  728. fs.config.UploadPartSize = 5
  729. }
  730. if fs.config.UploadPartSize < 1024*1024 {
  731. fs.config.UploadPartSize *= 1024 * 1024
  732. }
  733. if fs.config.UploadConcurrency == 0 {
  734. fs.config.UploadConcurrency = 5
  735. }
  736. if fs.config.DownloadPartSize == 0 {
  737. fs.config.DownloadPartSize = 5
  738. }
  739. if fs.config.DownloadPartSize < 1024*1024 {
  740. fs.config.DownloadPartSize *= 1024 * 1024
  741. }
  742. if fs.config.DownloadConcurrency == 0 {
  743. fs.config.DownloadConcurrency = 5
  744. }
  745. }
  746. func (fs *AzureBlobFs) checkIfBucketExists() error {
  747. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  748. defer cancelFn()
  749. _, err := fs.containerClient.GetProperties(ctx, &azblob.ContainerGetPropertiesOptions{})
  750. metric.AZHeadContainerCompleted(err)
  751. return err
  752. }
  753. func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
  754. result := false
  755. prefix := fs.getPrefix(name)
  756. maxResults := int32(1)
  757. pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
  758. MaxResults: &maxResults,
  759. Prefix: &prefix,
  760. })
  761. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  762. defer cancelFn()
  763. if pager.NextPage(ctx) {
  764. resp := pager.PageResponse()
  765. result = len(resp.ListBlobsFlatSegmentResponse.Segment.BlobItems) > 0
  766. }
  767. err := pager.Err()
  768. metric.AZListObjectsCompleted(err)
  769. return result, err
  770. }
  771. func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *azblob.BlockBlobClient, buf []byte,
  772. w io.WriterAt, offset, count, writeOffset int64,
  773. ) error {
  774. if count == 0 {
  775. return nil
  776. }
  777. resp, err := blockBlob.Download(ctx, &azblob.BlobDownloadOptions{
  778. Offset: &offset,
  779. Count: &count,
  780. })
  781. if err != nil {
  782. return err
  783. }
  784. body := resp.Body(&azblob.RetryReaderOptions{MaxRetryRequests: 2})
  785. defer body.Close()
  786. _, err = io.ReadAtLeast(body, buf, int(count))
  787. if err != nil {
  788. return err
  789. }
  790. _, err = fs.writeAtFull(w, buf, writeOffset, int(count))
  791. return err
  792. }
// handleMultipartDownload downloads the blob in parts of
// fs.config.DownloadPartSize bytes starting at offset, writing each part
// at its final position in writer via WriteAt. At most
// fs.config.DownloadConcurrency parts are in flight at once (bounded by
// the guard channel). The first failing part cancels poolCtx so the
// remaining in-flight parts stop early, and that first error is returned.
func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *azblob.BlockBlobClient,
	offset int64, writer io.WriterAt,
) error {
	props, err := blockBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
		BlobAccessConditions: &azblob.BlobAccessConditions{},
	})
	if err != nil {
		fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err)
		return err
	}
	contentLength := util.GetIntFromPointer(props.ContentLength)
	sizeToDownload := contentLength - offset
	if sizeToDownload < 0 {
		fsLog(fs, logger.LevelError, "invalid multipart download size or offset, size: %v, offset: %v, size to download: %v",
			contentLength, offset, sizeToDownload)
		return errors.New("the requested offset exceeds the file size")
	}
	if sizeToDownload == 0 {
		fsLog(fs, logger.LevelDebug, "nothing to download, offset %v, content length %v", offset, contentLength)
		return nil
	}
	partSize := fs.config.DownloadPartSize
	guard := make(chan struct{}, fs.config.DownloadConcurrency)
	// per-part deadline scales with the part size: one minute per MiB
	blockCtxTimeout := time.Duration(fs.config.DownloadPartSize/(1024*1024)) * time.Minute
	pool := newBufferAllocator(int(partSize))
	finished := false
	var wg sync.WaitGroup
	var errOnce sync.Once
	var hasError int32  // set to 1 (atomically) after the first part failure
	var poolError error // first error; written once under errOnce, read only after wg.Wait
	poolCtx, poolCancel := context.WithCancel(ctx)
	defer poolCancel()
	for part := 0; !finished; part++ {
		start := offset
		end := offset + partSize
		if end >= contentLength {
			end = contentLength
			finished = true
		}
		// each part has a fixed destination position, independent of completion order
		writeOffset := int64(part) * partSize
		offset = end
		// acquire a concurrency slot; blocks while DownloadConcurrency parts are in flight
		guard <- struct{}{}
		if atomic.LoadInt32(&hasError) == 1 {
			fsLog(fs, logger.LevelDebug, "pool error, download for part %v not started", part)
			break
		}
		buf := pool.getBuffer()
		wg.Add(1)
		go func(start, end, writeOffset int64, buf []byte) {
			defer func() {
				pool.releaseBuffer(buf)
				<-guard // release the concurrency slot
				wg.Done()
			}()
			innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
			defer cancelFn()
			count := end - start
			err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset)
			if err != nil {
				// record only the first error and cancel the remaining parts
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "multipart download error: %+v", err)
					atomic.StoreInt32(&hasError, 1)
					poolError = fmt.Errorf("multipart download error: %w", err)
					poolCancel()
				})
			}
		}(start, end, writeOffset, buf)
	}
	wg.Wait()
	close(guard)
	pool.free()
	return poolError
}
// handleMultipartUpload reads the data from reader in chunks of
// fs.config.UploadPartSize bytes, stages each chunk as a block in
// parallel (bounded by fs.config.UploadConcurrency via the guard
// channel) and finally commits the ordered block list. The first
// failing block cancels poolCtx so in-flight uploads stop early and no
// commit is attempted; that first error is returned.
func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader,
	blockBlob *azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders,
) error {
	partSize := fs.config.UploadPartSize
	guard := make(chan struct{}, fs.config.UploadConcurrency)
	// per-block deadline scales with the part size: one minute per MiB
	blockCtxTimeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
	// sync.Pool seems to use a lot of memory so prefer our own, very simple, allocator
	// we only need to recycle few byte slices
	pool := newBufferAllocator(int(partSize))
	finished := false
	var blocks []string // block IDs in upload order, for the final CommitBlockList
	var wg sync.WaitGroup
	var errOnce sync.Once
	var hasError int32  // set to 1 (atomically) after the first block failure
	var poolError error // first error; written once under errOnce, read only after wg.Wait
	poolCtx, poolCancel := context.WithCancel(ctx)
	defer poolCancel()
	for part := 0; !finished; part++ {
		buf := pool.getBuffer()
		n, err := fs.readFill(reader, buf)
		if err == io.EOF {
			// read finished, if n > 0 we need to process the last data chunk
			if n == 0 {
				pool.releaseBuffer(buf)
				break
			}
			finished = true
		} else if err != nil {
			pool.releaseBuffer(buf)
			pool.free()
			return err
		}
		// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
		// at the same time causing CommitBlockList to get a mix of blocks from all the clients.
		generatedUUID, err := uuid.NewRandom()
		if err != nil {
			pool.releaseBuffer(buf)
			pool.free()
			return fmt.Errorf("unable to generate block ID: %w", err)
		}
		blockID := base64.StdEncoding.EncodeToString([]byte(generatedUUID.String()))
		blocks = append(blocks, blockID)
		// acquire a concurrency slot; blocks while UploadConcurrency uploads are in flight
		guard <- struct{}{}
		if atomic.LoadInt32(&hasError) == 1 {
			fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part)
			pool.releaseBuffer(buf)
			break
		}
		wg.Add(1)
		go func(blockID string, buf []byte, bufSize int) {
			defer func() {
				pool.releaseBuffer(buf)
				<-guard // release the concurrency slot
				wg.Done()
			}()
			// StageBlock wants a ReadSeekCloser; wrap the in-memory chunk
			bufferReader := &bytesReaderWrapper{
				Reader: bytes.NewReader(buf[:bufSize]),
			}
			innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
			defer cancelFn()
			_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.BlockBlobStageBlockOptions{})
			if err != nil {
				// record only the first error and cancel the remaining uploads
				errOnce.Do(func() {
					fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
					atomic.StoreInt32(&hasError, 1)
					poolError = fmt.Errorf("multipart upload error: %w", err)
					poolCancel()
				})
			}
		}(blockID, buf, n)
	}
	wg.Wait()
	close(guard)
	pool.free()
	if poolError != nil {
		return poolError
	}
	commitOptions := azblob.BlockBlobCommitBlockListOptions{
		BlobHTTPHeaders: httpHeaders,
	}
	if fs.config.AccessTier != "" {
		commitOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier)
	}
	_, err := blockBlob.CommitBlockList(ctx, blocks, &commitOptions)
	return err
}
  952. func (*AzureBlobFs) writeAtFull(w io.WriterAt, buf []byte, offset int64, count int) (int, error) {
  953. written := 0
  954. for written < count {
  955. n, err := w.WriteAt(buf[written:count], offset+int64(written))
  956. written += n
  957. if err != nil {
  958. return written, err
  959. }
  960. }
  961. return written, nil
  962. }
  963. // copied from rclone
  964. func (*AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
  965. var nn int
  966. for n < len(buf) && err == nil {
  967. nn, err = r.Read(buf[n:])
  968. n += nn
  969. }
  970. return n, err
  971. }
  972. func (fs *AzureBlobFs) preserveModificationTime(source, target string, fi os.FileInfo) {
  973. if plugin.Handler.HasMetadater() {
  974. if !fi.IsDir() {
  975. err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
  976. util.GetTimeAsMsSinceEpoch(fi.ModTime()))
  977. if err != nil {
  978. fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
  979. source, target, err)
  980. }
  981. }
  982. }
  983. }
  984. func (fs *AzureBlobFs) getCopyOptions() *azblob.BlobStartCopyOptions {
  985. copyOptions := &azblob.BlobStartCopyOptions{}
  986. if fs.config.AccessTier != "" {
  987. copyOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier)
  988. }
  989. return copyOptions
  990. }
  991. func (fs *AzureBlobFs) getStorageID() string {
  992. if fs.config.Endpoint != "" {
  993. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  994. return fmt.Sprintf("azblob://%v/%v", fs.config.Endpoint, fs.config.Container)
  995. }
  996. return fmt.Sprintf("azblob://%v%v", fs.config.Endpoint, fs.config.Container)
  997. }
  998. return fmt.Sprintf("azblob://%v", fs.config.Container)
  999. }
  1000. type bytesReaderWrapper struct {
  1001. *bytes.Reader
  1002. }
  1003. func (b *bytesReaderWrapper) Close() error {
  1004. return nil
  1005. }
  1006. type bufferAllocator struct {
  1007. sync.Mutex
  1008. available [][]byte
  1009. bufferSize int
  1010. finalized bool
  1011. }
  1012. func newBufferAllocator(size int) *bufferAllocator {
  1013. return &bufferAllocator{
  1014. bufferSize: size,
  1015. finalized: false,
  1016. }
  1017. }
  1018. func (b *bufferAllocator) getBuffer() []byte {
  1019. b.Lock()
  1020. defer b.Unlock()
  1021. if len(b.available) > 0 {
  1022. var result []byte
  1023. truncLength := len(b.available) - 1
  1024. result = b.available[truncLength]
  1025. b.available[truncLength] = nil
  1026. b.available = b.available[:truncLength]
  1027. return result
  1028. }
  1029. return make([]byte, b.bufferSize)
  1030. }
  1031. func (b *bufferAllocator) releaseBuffer(buf []byte) {
  1032. b.Lock()
  1033. defer b.Unlock()
  1034. if b.finalized || len(buf) != b.bufferSize {
  1035. return
  1036. }
  1037. b.available = append(b.available, buf)
  1038. }
  1039. func (b *bufferAllocator) free() {
  1040. b.Lock()
  1041. defer b.Unlock()
  1042. b.available = nil
  1043. b.finalized = true
  1044. }