replication3.go

package controller

import (
    "database/sql"
    "encoding/base64"
    "errors"
    "fmt"
    "io"
    "net/http"
    "os"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
    "github.com/ente-io/museum/pkg/controller/discord"
    "github.com/ente-io/museum/pkg/repo"
    "github.com/ente-io/museum/pkg/utils/file"
    "github.com/ente-io/museum/pkg/utils/s3config"
    "github.com/ente-io/stacktrace"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    log "github.com/sirupsen/logrus"
    "github.com/spf13/viper"
)

// ReplicationController3 oversees version 3 of our object replication.
//
// The user's encrypted data starts off in 1 hot storage (Backblaze "b2"). This
// controller then takes over and replicates it to the other two replicas. It
// keeps state in the object_copies table.
//
// Both v2 and v3 of object replication use the same hot storage (b2), but they
// replicate to different buckets thereafter.
//
// The current implementation only works if the hot storage is b2. This is not
// an inherent limitation; however, the code has not yet been tested in other
// scenarios, so there is a safety check preventing the replication from
// happening if the current hot storage is not b2.
type ReplicationController3 struct {
    S3Config          *s3config.S3Config
    ObjectRepo        *repo.ObjectRepository
    ObjectCopiesRepo  *repo.ObjectCopiesRepository
    DiscordController *discord.DiscordController
    // URL of the Cloudflare worker to use for downloading the source object
    workerURL string
    // Base directory for temporary storage
    tempStorage string
    // Prometheus metrics
    mUploadSuccess *prometheus.CounterVec
    mUploadFailure *prometheus.CounterVec
    // Cached S3 clients etc
    b2Client   *s3.S3
    b2Bucket   *string
    wasabiDest *UploadDestination
    scwDest    *UploadDestination
}

type UploadDestination struct {
    DC       string
    Client   *s3.S3
    Uploader *s3manager.Uploader
    Bucket   *string
    // The label to use for reporting metrics for uploads to this destination
    Label string
    // If true, we should ignore Wasabi 403 errors. See "Reuploads".
    HasComplianceHold bool
    // If true, the object is uploaded to the GLACIER storage class.
    IsGlacier bool
}

// StartReplication starts the background replication process.
//
// This method returns synchronously. ReplicationController3 will create a
// suitable number of goroutines to parallelize and perform the replication
// asynchronously, as and when it notices new files that have not yet been
// replicated (it does this by querying the object_copies table).
func (c *ReplicationController3) StartReplication() error {
    // As a safety check, ensure that the current hot storage is b2. This is
    // because the replication v3 code has not yet been tested for other
    // scenarios (it'll likely work though, probably with minor modifications).
    hotDC := c.S3Config.GetHotDataCenter()
    if hotDC != c.S3Config.GetHotBackblazeDC() {
        return fmt.Errorf("v3 replication can currently only run when the primary hot data center is Backblaze. Instead, it was %s", hotDC)
    }

    workerURL := viper.GetString("replication.worker-url")
    if workerURL == "" {
        return fmt.Errorf("replication.worker-url was not defined")
    }
    c.workerURL = workerURL
    log.Infof("Worker URL to download objects for replication v3 is: %s", workerURL)

    c.createMetrics()
    err := c.createTemporaryStorage()
    if err != nil {
        return err
    }
    c.createDestinations()

    workerCount := viper.GetInt("replication.worker-count")
    if workerCount == 0 {
        workerCount = 6
    }
    go c.startWorkers(workerCount)

    return nil
}

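// For reference, the configuration read above could be supplied with a
// snippet along these lines (the worker URL is an illustrative placeholder;
// the other values shown are the in-code fallbacks):
//
//    replication:
//        worker-url: https://replication-worker.example.com
//        worker-count: 6
//        tmp-storage: tmp/replication

// startWorkers spawns n worker goroutines, staggering their start times so
// that they do not all poll the database at the same instant.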
func (c *ReplicationController3) startWorkers(n int) {
    log.Infof("Starting %d workers for replication v3", n)
    for i := 0; i < n; i++ {
        go c.replicate(i)
        // Stagger the workers
        time.Sleep(time.Duration(2*i+1) * time.Second)
    }
}

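// createMetrics registers the Prometheus counters used to track replication
// uploads, labeled by destination.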
func (c *ReplicationController3) createMetrics() {
    c.mUploadSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "museum_replication_upload_success_total",
        Help: "Number of successful uploads during replication (each replica is counted separately)",
    }, []string{"destination"})
    c.mUploadFailure = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "museum_replication_upload_failure_total",
        Help: "Number of failed uploads during replication (each replica is counted separately)",
    }, []string{"destination"})
}

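// createTemporaryStorage prepares the scratch directory used to hold objects
// while they are being replicated, clearing out any stale files left over
// from a previous run.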
func (c *ReplicationController3) createTemporaryStorage() error {
    tempStorage := viper.GetString("replication.tmp-storage")
    if tempStorage == "" {
        tempStorage = "tmp/replication"
    }
    log.Infof("Temporary storage for replication v3 is: %s", tempStorage)

    err := file.DeleteAllFilesInDirectory(tempStorage)
    if err != nil {
        return stacktrace.Propagate(err, "Failed to delete old files from %s", tempStorage)
    }

    err = file.MakeDirectoryIfNotExists(tempStorage)
    if err != nil {
        return stacktrace.Propagate(err, "Failed to create temporary storage %s", tempStorage)
    }

    c.tempStorage = tempStorage
    return nil
}

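// createDestinations initializes and caches the S3 clients, buckets, and
// uploaders for the b2 source and for the Wasabi and Scaleway replicas.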
func (c *ReplicationController3) createDestinations() {
    // The s3manager.Uploader objects are safe for use concurrently. From the
    // AWS docs:
    //
    // > The Uploader structure that calls Upload(). It is safe to call Upload()
    // > on this structure for multiple objects and across concurrent goroutines.
    // > Mutating the Uploader's properties is not safe to be done concurrently.
    config := c.S3Config

    b2DC := config.GetHotBackblazeDC()
    b2Client := config.GetS3Client(b2DC)
    c.b2Client = &b2Client
    c.b2Bucket = config.GetBucket(b2DC)

    wasabiDC := config.GetHotWasabiDC()
    wasabiClient := config.GetS3Client(wasabiDC)
    c.wasabiDest = &UploadDestination{
        DC:                wasabiDC,
        Client:            &wasabiClient,
        Uploader:          s3manager.NewUploaderWithClient(&wasabiClient),
        Bucket:            config.GetBucket(wasabiDC),
        Label:             "wasabi",
        HasComplianceHold: config.WasabiComplianceDC() == wasabiDC,
    }

    scwDC := config.GetColdScalewayDC()
    scwClient := config.GetS3Client(scwDC)
    c.scwDest = &UploadDestination{
        DC:       scwDC,
        Client:   &scwClient,
        Uploader: s3manager.NewUploaderWithClient(&scwClient),
        Bucket:   config.GetBucket(scwDC),
        Label:    "scaleway",
        // Should be true, except when running in a local cluster (since minio
        // doesn't support specifying the GLACIER storage class).
        IsGlacier: !config.AreLocalBuckets(),
    }
}

// Entry point for the replication worker (goroutine)
//
// i is an arbitrary index of the current routine.
func (c *ReplicationController3) replicate(i int) {
    // This is just
    //
    //    while (true) { replicate() }
    //
    // but with an extra sleep for a bit if nothing got replicated - both when
    // something's wrong, or when there's nothing to do.
    for {
        err := c.tryReplicate()
        if err != nil {
            // Sleep in proportion to the (arbitrary) index to space out the
            // workers further.
            time.Sleep(time.Duration(i+1) * time.Minute)
        }
    }
}

// Try to replicate an object.
//
// Return nil if something was replicated, otherwise return the error.
//
// A common and expected error is `sql.ErrNoRows`, which occurs if there are no
// objects left to replicate currently.
func (c *ReplicationController3) tryReplicate() error {
    // Fetch an object to replicate
    tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
    if err != nil {
        if !errors.Is(err, sql.ErrNoRows) {
            log.Errorf("Could not fetch an object to replicate: %s", err)
        }
        return stacktrace.Propagate(err, "")
    }

    objectKey := copies.ObjectKey
    logger := log.WithFields(log.Fields{
        "task":       "replication",
        "object_key": objectKey,
    })

    commit := func(err error) error {
        // We don't rollback the transaction even in the case of errors, and
        // instead try to commit it after setting the last_attempt timestamp.
        //
        // This avoids the replication getting stuck in a loop trying (and
        // failing) to replicate the same object. The error would still need to
        // be resolved, but at least the replication would meanwhile move
        // forward, ignoring this row.
        if err != nil {
            logger.Error(err)
        }

        aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey)
        if aerr != nil {
            aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt")
            logger.Error(aerr)
        }

        cerr := tx.Commit()
        if cerr != nil {
            cerr = stacktrace.Propagate(cerr, "Failed to commit transaction")
            logger.Error(cerr)
        }

        if err == nil {
            err = aerr
        }
        if err == nil {
            err = cerr
        }

        if err == nil {
            logger.Info("Replication attempt succeeded")
        } else {
            logger.Info("Replication attempt failed")
        }
        return err
    }

    logger.Info("Replication attempt start")

    if copies.B2 == nil {
        err := errors.New("expected B2 copy to be in place before we start replication")
        return commit(stacktrace.Propagate(err, "Sanity check failed"))
    }
    if !copies.WantWasabi && !copies.WantSCW {
        err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate")
        return commit(stacktrace.Propagate(err, "Sanity check failed"))
    }

    ob, err := c.ObjectRepo.GetObjectState(tx, objectKey)
    if err != nil {
        return commit(stacktrace.Propagate(err, "Failed to fetch the file's deleted status"))
    }
    if ob.IsFileDeleted || ob.IsUserDeleted {
        // Update object_copies to mark this object as not requiring further
        // replication. The row in object_copies will get deleted when the next
        // scheduled object deletion runs.
        err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey)
        if err != nil {
            return commit(stacktrace.Propagate(err, "Failed to mark an object as not requiring further replication"))
        }
        logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)",
            ob.IsFileDeleted, ob.IsUserDeleted)
        return commit(nil)
    }

    err = ensureSufficientSpace(ob.Size)
    if err != nil {
        // We don't have free space right now, maybe because other big files
        // are being downloaded simultaneously, but we might get space later,
        // so mark a failed attempt that'll get retried later.
        //
        // Log this error though, so that it gets noticed if it happens too
        // frequently (the instance might need a bigger disk).
        return commit(stacktrace.Propagate(err, ""))
    }

    filePath, file, err := c.createTemporaryFile(objectKey)
    if err != nil {
        return commit(stacktrace.Propagate(err, "Failed to create temporary file"))
    }
    defer os.Remove(filePath)
    defer file.Close()

    size, err := c.downloadFromB2ViaWorker(objectKey, file, logger)
    if err != nil {
        return commit(stacktrace.Propagate(err, "Failed to download object from B2"))
    }
    logger.Infof("Downloaded %d bytes to %s", size, filePath)

    in := &UploadInput{
        File:         file,
        ObjectKey:    objectKey,
        ExpectedSize: size,
        Logger:       logger,
    }

    err = nil
    if copies.WantWasabi && copies.Wasabi == nil {
        werr := c.replicateFile(in, c.wasabiDest, func() error {
            return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey)
        })
        err = werr
    }
    if copies.WantSCW && copies.SCW == nil {
        serr := c.replicateFile(in, c.scwDest, func() error {
            return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey)
        })
        if err == nil {
            err = serr
        }
    }

    return commit(err)
}

// Return an error if we risk running out of disk space if we try to download
// and write a file of the given size.
//
// This function keeps a buffer of 2 GB of free space in its calculations; for
// example, downloading a 500 MB object requires at least ~2.5 GB to be free.
func ensureSufficientSpace(size int64) error {
    free, err := file.FreeSpace("/")
    if err != nil {
        return stacktrace.Propagate(err, "Failed to fetch free space")
    }

    gb := uint64(1024) * 1024 * 1024
    need := uint64(size) + (2 * gb)
    if free < need {
        return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", need, free)
    }

    return nil
}

// Create a temporary file for storing objectKey. Return both the path to the
// file, and the handle to the file.
//
// The caller must Close() the returned file if it is not nil.
func (c *ReplicationController3) createTemporaryFile(objectKey string) (string, *os.File, error) {
    fileName := strings.ReplaceAll(objectKey, "/", "_")
    filePath := c.tempStorage + "/" + fileName
    f, err := os.Create(filePath)
    if err != nil {
        return "", nil, stacktrace.Propagate(err, "Could not create temporary file at '%s' to download object", filePath)
    }
    return filePath, f, nil
}

// Download the object for objectKey from B2 hot storage, writing it into file.
//
// Return the size of the downloaded file.
func (c *ReplicationController3) downloadFromB2ViaWorker(objectKey string, file *os.File, logger *log.Entry) (int64, error) {
    presignedURL, err := c.getPresignedB2URL(objectKey)
    if err != nil {
        return 0, stacktrace.Propagate(err, "Could not create presigned URL for downloading object")
    }

    presignedEncodedURL := base64.StdEncoding.EncodeToString([]byte(presignedURL))

    client := &http.Client{}
    request, err := http.NewRequest("GET", c.workerURL, nil)
    if err != nil {
        return 0, stacktrace.Propagate(err, "Could not create request for worker %s", c.workerURL)
    }
    // The worker expects the base64 encoded presigned URL in the "src" query
    // parameter.
    q := request.URL.Query()
    q.Add("src", presignedEncodedURL)
    request.URL.RawQuery = q.Encode()

    if c.S3Config.AreLocalBuckets() {
        originalURL := request.URL
        request, err = http.NewRequest("GET", presignedURL, nil)
        if err != nil {
            return 0, stacktrace.Propagate(err, "Could not create request for URL %s", presignedURL)
        }
        logger.Infof("Bypassing workerURL %s and instead directly GETting %s", originalURL, presignedURL)
    }

    response, err := client.Do(request)
    if err != nil {
        return 0, stacktrace.Propagate(err, "Call to CF worker failed for object %s", objectKey)
    }
    defer response.Body.Close()

    if response.StatusCode != http.StatusOK {
        if response.StatusCode == http.StatusNotFound {
            c.notifyDiscord("🔥 Could not find object in HotStorage: " + objectKey)
        }
        err = fmt.Errorf("CF Worker GET for object %s failed with HTTP status %s", objectKey, response.Status)
        return 0, stacktrace.Propagate(err, "")
    }

    n, err := io.Copy(file, response.Body)
    if err != nil {
        return 0, stacktrace.Propagate(err, "Failed to write HTTP response to file")
    }

    return n, nil
}

// Get a presigned URL to download the object with objectKey from the B2 bucket.
func (c *ReplicationController3) getPresignedB2URL(objectKey string) (string, error) {
    r, _ := c.b2Client.GetObjectRequest(&s3.GetObjectInput{
        Bucket: c.b2Bucket,
        Key:    &objectKey,
    })
    return r.Presign(PreSignedRequestValidityDuration)
}

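// notifyDiscord relays an alert to the configured Discord channel via the
// DiscordController.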
func (c *ReplicationController3) notifyDiscord(message string) {
    c.DiscordController.Notify(message)
}

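// UploadInput bundles what replicateFile needs: an open handle to the
// downloaded object, its key, the size the remote copy is expected to report,
// and a contextual logger.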
type UploadInput struct {
    File         *os.File
    ObjectKey    string
    ExpectedSize int64
    Logger       *log.Entry
}

// Upload, verify and then update the DB to mark replication to dest.
func (c *ReplicationController3) replicateFile(in *UploadInput, dest *UploadDestination, dbUpdateCopies func() error) error {
    logger := in.Logger.WithFields(log.Fields{
        "destination": dest.Label,
        "bucket":      *dest.Bucket,
    })

    failure := func(err error) error {
        c.mUploadFailure.WithLabelValues(dest.Label).Inc()
        logger.Error(err)
        return err
    }

    err := c.uploadFile(in, dest)
    if err != nil {
        return failure(stacktrace.Propagate(err, "Failed to upload object"))
    }

    err = c.verifyUploadedFileSize(in, dest)
    if err != nil {
        return failure(stacktrace.Propagate(err, "Failed to verify upload"))
    }

    // The update of object_keys is not done in the transaction where the
    // other updates to the object_copies table are made. This is so that the
    // object_keys table (which is what'll be used to delete objects) is
    // (almost) always updated if the file gets uploaded successfully.
    //
    // The only time the update wouldn't happen is if museum gets restarted
    // between the successful completion of the upload to the bucket and this
    // query getting executed.
    //
    // While possible, that is a much smaller window as compared to the
    // transaction for updating object_copies, which could easily span minutes
    // as the transaction ends only after the object has been uploaded to all
    // replicas.
    rowsAffected, err := c.ObjectRepo.MarkObjectReplicated(in.ObjectKey, dest.DC)
    if err != nil {
        return failure(stacktrace.Propagate(err, "Failed to update object_keys to mark replication as completed"))
    }

    if rowsAffected != 1 {
        // It is possible that this row was updated earlier, after an upload
        // that got completed but before the object_copies table could be
        // updated in the transaction (see "Reuploads").
        //
        // So do not treat this as an error.
        logger.Warnf("Expected 1 row to be updated, but got %d", rowsAffected)
    }

    err = dbUpdateCopies()
    if err != nil {
        return failure(stacktrace.Propagate(err, "Failed to update object_copies to mark replication as complete"))
    }

    c.mUploadSuccess.WithLabelValues(dest.Label).Inc()
    return nil
}

// Upload the given file using the destination's uploader to the destination
// bucket.
//
// # Reuploads
//
// It is possible that the object might already exist on the remote. The known
// scenario where this might happen is if museum gets restarted after having
// completed the upload but before it got around to modifying the DB.
//
// The behaviour in this case is remote dependent.
//
//   - Uploading an object with the same key on Scaleway would work normally.
//
//   - But trying to add an object with the same key on the compliance locked
//     Wasabi would return an HTTP 403.
//
// We intercept the Wasabi 403 in this case and move ahead. The subsequent
// object verification using the HEAD request will act as a sanity check for
// the object.
func (c *ReplicationController3) uploadFile(in *UploadInput, dest *UploadDestination) error {
    // Rewind the file pointer back to the start for the next upload.
    _, err := in.File.Seek(0, io.SeekStart)
    if err != nil {
        return stacktrace.Propagate(err, "Failed to rewind file %s before upload", in.File.Name())
    }

    up := s3manager.UploadInput{
        Bucket: dest.Bucket,
        Key:    &in.ObjectKey,
        Body:   in.File,
    }
    if dest.IsGlacier {
        up.StorageClass = aws.String(s3.ObjectStorageClassGlacier)
    }

    result, err := dest.Uploader.Upload(&up)
    if err != nil && dest.HasComplianceHold && c.isRequestFailureAccessDenied(err) {
        in.Logger.Infof("Ignoring object that already exists on remote (we'll verify it using a HEAD check): %s", err)
        return nil
    }
    if err != nil {
        return stacktrace.Propagate(err, "Upload to bucket %s failed", *dest.Bucket)
    }

    in.Logger.Infof("Uploaded to bucket %s: %s", *dest.Bucket, result.Location)
    return nil
}

// Return true if the given error is because of an HTTP 403.
//
// See "Reuploads" for the scenario where these errors can arise.
//
// Specifically, this is an example of the HTTP 403 response we get when
// trying to add an object to a Wasabi bucket that already has a compliance
// locked object with the same key.
//
//    HTTP/1.1 403 Forbidden
//    Content-Type: application/xml
//    Date: Tue, 20 Dec 2022 10:23:33 GMT
//    Server: WasabiS3/7.10.1193-2022-11-23-84c72037e8 (head2)
//
//    <?xml version="1.0" encoding="UTF-8"?>
//    <Error>
//        <Code>AccessDenied</Code>
//        <Message>Access Denied</Message>
//        <RequestId>yyy</RequestId>
//        <HostId>zzz</HostId>
//    </Error>
//
// Printing the error type and details produces this:
//
//    type: *s3err.RequestFailure
//    AccessDenied: Access Denied
//    status code: 403, request id: yyy, host id: zzz
func (c *ReplicationController3) isRequestFailureAccessDenied(err error) bool {
    if reqerr, ok := err.(s3.RequestFailure); ok {
        if reqerr.Code() == "AccessDenied" {
            return true
        }
    }
    return false
}

// Verify the uploaded file by doing a HEAD check and comparing sizes
func (c *ReplicationController3) verifyUploadedFileSize(in *UploadInput, dest *UploadDestination) error {
    res, err := dest.Client.HeadObject(&s3.HeadObjectInput{
        Bucket: dest.Bucket,
        Key:    &in.ObjectKey,
    })
    if err != nil {
        return stacktrace.Propagate(err, "Fetching object info from bucket %s failed", *dest.Bucket)
    }

    if *res.ContentLength != in.ExpectedSize {
        err = fmt.Errorf("size of the uploaded file (%d) does not match the expected size (%d) in bucket %s",
            *res.ContentLength, in.ExpectedSize, *dest.Bucket)
        c.notifyDiscord(fmt.Sprint(err))
        return stacktrace.Propagate(err, "")
    }

    return nil
}
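
// A minimal wiring sketch (assumptions: the s3Config, repository, and discord
// values are constructed elsewhere in the application's setup code; only the
// exported fields used by this file are shown):
//
//    c := &ReplicationController3{
//        S3Config:          s3Config,
//        ObjectRepo:        objectRepo,
//        ObjectCopiesRepo:  objectCopiesRepo,
//        DiscordController: discordController,
//    }
//    if err := c.StartReplication(); err != nil {
//        log.Fatalf("Could not start replication v3: %v", err)
//    }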