object_cleanup.go

package controller

import (
    "database/sql"
    "errors"
    "fmt"
    "strings"
    "sync"
    stime "time"

    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/ente-io/museum/ente"
    "github.com/ente-io/museum/pkg/controller/lock"
    "github.com/ente-io/museum/pkg/repo"
    "github.com/ente-io/museum/pkg/utils/s3config"
    enteString "github.com/ente-io/museum/pkg/utils/string"
    "github.com/ente-io/museum/pkg/utils/time"
    "github.com/ente-io/stacktrace"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    log "github.com/sirupsen/logrus"
    "github.com/spf13/viper"
)

// ObjectCleanupController exposes functions to remove orphan and stale entries
// from object storage.
//
// There are 3 main types of orphans that can end up in our object storage:
//
// 1. We create presigned URLs for clients to upload their objects to. It might
//    happen that the client is able to successfully upload to these URLs, but
//    not tell museum about the successful upload.
//
// 2. During replication, we might have half-done multipart uploads.
//
// 3. When an existing object is updated (e.g. the user edits the file on iOS),
//    then the file entry in our DB is updated to point to the new object, and
//    the old object is now meant to be discarded.
//
// ObjectCleanupController is meant to manage all these scenarios over time.
type ObjectCleanupController struct {
    Repo             *repo.ObjectCleanupRepository
    ObjectRepo       *repo.ObjectRepository
    LockController   *lock.LockController
    ObjectController *ObjectController
    S3Config         *s3config.S3Config
    // Prometheus metrics
    mOrphanObjectsDeleted *prometheus.CounterVec
}

// PreSignedRequestValidityDuration is the lifetime of a pre-signed URL.
const PreSignedRequestValidityDuration = 7 * 24 * stime.Hour

// PreSignedPartUploadRequestDuration is the lifetime of a pre-signed multipart URL.
const PreSignedPartUploadRequestDuration = 7 * 24 * stime.Hour

// clearOrphanObjectsCheckInterval is the interval after which we check if the
// ClearOrphanObjects job needs to be re-run.
//
// See also, clearOrphanObjectsMinimumJobInterval.
const clearOrphanObjectsCheckInterval = 1 * 24 * stime.Hour

// clearOrphanObjectsMinimumJobInterval is the minimum interval that must pass
// before we run another instance of the ClearOrphanObjects job.
//
// This interval is enforced across museum instances.
const clearOrphanObjectsMinimumJobInterval = 2 * 24 * stime.Hour

// NewObjectCleanupController returns a new instance of ObjectCleanupController.
func NewObjectCleanupController(
    objectCleanupRepo *repo.ObjectCleanupRepository,
    objectRepo *repo.ObjectRepository,
    lockController *lock.LockController,
    objectController *ObjectController,
    s3Config *s3config.S3Config,
) *ObjectCleanupController {
    mOrphanObjectsDeleted := promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "museum_orphan_objects_deleted_total",
        Help: "Number of objects successfully deleted when clearing orphan objects",
    }, []string{"dc"})
    return &ObjectCleanupController{
        Repo:                  objectCleanupRepo,
        ObjectRepo:            objectRepo,
        LockController:        lockController,
        ObjectController:      objectController,
        S3Config:              s3Config,
        mOrphanObjectsDeleted: mOrphanObjectsDeleted,
    }
}
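
// A minimal wiring sketch, for illustration only (construction of the
// repositories, lock controller, and S3 config is elided and assumed to
// follow the surrounding server setup):
//
//    ctrl := NewObjectCleanupController(objectCleanupRepo, objectRepo,
//        lockController, objectController, s3Config)
//    ctrl.StartRemovingUnreportedObjects()
//    ctrl.StartClearingOrphanObjects()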

// StartRemovingUnreportedObjects starts goroutines that clean up objects
// which were possibly uploaded but never reported to the database.
func (c *ObjectCleanupController) StartRemovingUnreportedObjects() {
    // TODO: object_cleanup: This code is currently only tested for B2
    if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
        log.Info("Skipping RemovingUnreportedObjects since the Hot DC is not B2")
        return
    }
    workerCount := viper.GetInt("jobs.remove-unreported-objects.worker-count")
    if workerCount == 0 {
        workerCount = 1
    }
    log.Infof("Starting %d workers to remove-unreported-objects", workerCount)
    for i := 0; i < workerCount; i++ {
        go c.removeUnreportedObjectsWorker(i)
    }
}
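
// A configuration sketch for the knob read above. The key name is taken from
// the viper call in this file; the YAML layout itself is an assumption about
// how museum's configuration is structured:
//
//    jobs:
//        remove-unreported-objects:
//            worker-count: 4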

// removeUnreportedObjectsWorker is the entry point for a worker goroutine
// that cleans up unreported objects.
//
// i is an arbitrary index for the current goroutine; it is used to stagger
// the polling interval of the workers.
func (c *ObjectCleanupController) removeUnreportedObjectsWorker(i int) {
    for {
        count := c.removeUnreportedObjects()
        if count == 0 {
            stime.Sleep(stime.Duration(5+i) * stime.Minute)
        } else {
            stime.Sleep(stime.Second)
        }
    }
}

func (c *ObjectCleanupController) removeUnreportedObjects() int {
    logger := log.WithFields(log.Fields{
        "task": "remove-unreported-objects",
    })
    logger.Info("Removing unreported objects")
    count := 0
    tx, tempObjects, err := c.Repo.GetAndLockExpiredObjects()
    if err != nil {
        if !errors.Is(err, sql.ErrNoRows) {
            logger.Error(err)
        }
        return count
    }
    for _, tempObject := range tempObjects {
        err = c.removeUnreportedObject(tx, tempObject)
        if err != nil {
            continue
        }
        count += 1
    }
    logger.Infof("Removed %d objects", count)
    // We always commit the transaction, even on errors for individual rows.
    // To avoid objects getting stuck in a loop, we increase their expiry
    // times instead (see the skip closure in removeUnreportedObject).
    cerr := tx.Commit()
    if cerr != nil {
        cerr = stacktrace.Propagate(cerr, "Failed to commit transaction")
        logger.Error(cerr)
    }
    return count
}

func (c *ObjectCleanupController) removeUnreportedObject(tx *sql.Tx, t ente.TempObject) error {
    // TODO: object_cleanup
    // This should use the DC from TempObject (once we start persisting it)
    // dc := t.DataCenter
    dc := c.S3Config.GetHotDataCenter()
    logger := log.WithFields(log.Fields{
        "task":        "remove-unreported-objects",
        "object_key":  t.ObjectKey,
        "data_center": dc,
        "upload_id":   t.UploadID,
    })
    // skip logs the error, pushes the temp object's expiry a day into the
    // future (so that we don't retry it in a tight loop), and returns the
    // original error.
    skip := func(err error) error {
        logger.Errorf("Clearing tempObject failed: %v", err)
        newExpiry := time.MicrosecondsAfterDays(1)
        serr := c.Repo.SetExpiryForTempObject(tx, t, newExpiry)
        if serr != nil {
            logger.Errorf("Updating expiry for failed temp object failed: %v", serr)
        }
        return err
    }
    logger.Info("Clearing tempObject")
    exists, err := c.ObjectRepo.DoesObjectExist(tx, t.ObjectKey)
    if err != nil {
        return skip(stacktrace.Propagate(err, ""))
    }
    if exists {
        err := errors.New("aborting attempt to delete an object which has a DB entry")
        return skip(stacktrace.Propagate(err, ""))
    }
    if t.IsMultipart {
        err = c.abortMultipartUpload(t.ObjectKey, t.UploadID, dc)
    } else {
        err = c.DeleteObjectFromDataCenter(t.ObjectKey, dc)
    }
    if err != nil {
        return skip(err)
    }
    err = c.Repo.RemoveTempObject(tx, t)
    if err != nil {
        return skip(err)
    }
    return nil
}

// AddTempObjectKey creates a new temporary object entry.
//
// It persists a given object key as having been provided to a client for
// uploading. If a client does not successfully mark this object's upload as
// having completed within PreSignedRequestValidityDuration, this temp object
// will be cleaned up.
func (c *ObjectCleanupController) AddTempObjectKey(objectKey string, dc string) error {
    // Twice the pre-signed URL validity, to leave a buffer before cleanup.
    expiry := time.Microseconds() + (2 * PreSignedRequestValidityDuration.Microseconds())
    return c.addCleanupEntryForObjectKey(objectKey, dc, expiry)
}
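
// For illustration, a hypothetical upload handler would pair the pre-signed
// URL it hands out with a temp object entry (makePresignedUploadURL is a
// made-up helper, not part of this package):
//
//    uploadURL, objectKey := makePresignedUploadURL(dc)
//    if err := c.AddTempObjectKey(objectKey, dc); err != nil {
//        return err
//    }
//    // If the client reports a successful upload, the object gets a DB entry
//    // and the temp entry is removed; otherwise the cleanup workers delete
//    // the orphan object once the entry expires.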

// addCleanupEntryForObjectKey adds the object to a queue of "temporary"
// objects that are deleted (if they exist) if this entry is not removed from
// the queue by expirationTime.
func (c *ObjectCleanupController) addCleanupEntryForObjectKey(objectKey string, dc string, expirationTime int64) error {
    err := c.Repo.AddTempObject(ente.TempObject{
        ObjectKey:   objectKey,
        IsMultipart: false,
        DataCenter:  dc,
    }, expirationTime)
    return stacktrace.Propagate(err, "")
}

// AddMultipartTempObjectKey creates a new temporary object entry for a
// multipart upload.
//
// See AddTempObjectKey for more details.
func (c *ObjectCleanupController) AddMultipartTempObjectKey(objectKey string, uploadID string, dc string) error {
    expiry := time.Microseconds() + (2 * PreSignedPartUploadRequestDuration.Microseconds())
    err := c.Repo.AddTempObject(ente.TempObject{
        ObjectKey:   objectKey,
        IsMultipart: true,
        UploadID:    uploadID,
        DataCenter:  dc,
    }, expiry)
    return stacktrace.Propagate(err, "")
}

func (c *ObjectCleanupController) DeleteAllObjectsWithPrefix(prefix string, dc string) error {
    s3Client := c.S3Config.GetS3Client(dc)
    bucket := c.S3Config.GetBucket(dc)
    // Note: a single ListObjectsV2 call returns at most 1000 keys, so this
    // assumes the prefix matches fewer objects than that.
    output, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
        Bucket: bucket,
        Prefix: &prefix,
    })
    if err != nil {
        log.Error(err)
        return stacktrace.Propagate(err, "")
    }
    var keys []string
    for _, obj := range output.Contents {
        keys = append(keys, *obj.Key)
    }
    for _, key := range keys {
        err = c.DeleteObjectFromDataCenter(key, dc)
        if err != nil {
            log.Error(err)
            return stacktrace.Propagate(err, "")
        }
    }
    return nil
}

func (c *ObjectCleanupController) DeleteObjectFromDataCenter(objectKey string, dc string) error {
    log.Info("Deleting " + objectKey + " from " + dc)
    s3Client := c.S3Config.GetS3Client(dc)
    bucket := c.S3Config.GetBucket(dc)
    _, err := s3Client.DeleteObject(&s3.DeleteObjectInput{
        Bucket: bucket,
        Key:    &objectKey,
    })
    if err != nil {
        return stacktrace.Propagate(err, "")
    }
    err = s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{
        Bucket: bucket,
        Key:    &objectKey,
    })
    if err != nil {
        return stacktrace.Propagate(err, "")
    }
    return nil
}

func (c *ObjectCleanupController) abortMultipartUpload(objectKey string, uploadID string, dc string) error {
    s3Client := c.S3Config.GetS3Client(dc)
    bucket := c.S3Config.GetBucket(dc)
    _, err := s3Client.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
        Bucket:   bucket,
        Key:      &objectKey,
        UploadId: &uploadID,
    })
    if err != nil {
        if isUnknownUploadError(err) {
            log.Info("Could not find upload for " + objectKey)
            return nil
        }
        return stacktrace.Propagate(err, "")
    }
    // Verify the abort by listing the upload's parts; a successfully aborted
    // upload should not have any left.
    r, err := s3Client.ListParts(&s3.ListPartsInput{
        Bucket:   bucket,
        Key:      &objectKey,
        UploadId: &uploadID,
    })
    if err != nil {
        if isUnknownUploadError(err) {
            // This is expected now, since we just aborted the upload
            return nil
        }
        return stacktrace.Propagate(err, "")
    }
    if len(r.Parts) > 0 {
        return stacktrace.NewError("abort failed: upload still has parts")
    }
    return nil
}

// The original code here checked for NoSuchUpload, presumably because that is
// the error that B2 returns.
//
// Wasabi returns something similar:
//
//    <Error>
//    <Code>NoSuchUpload</Code>
//    <Message>The specified upload does not exist. The upload ID may be invalid,
//    or the upload may have been aborted or completed.</Message>
//    ...
//
// However, Scaleway returns a different error, NoSuchKey:
//
//    <Error>
//    <Code>NoSuchKey</Code>
//    <Message>The specified key does not exist.</Message>
//    ...
//
// This method returns true if either of these occur.
func isUnknownUploadError(err error) bool {
    // B2, Wasabi
    if strings.Contains(err.Error(), "NoSuchUpload") {
        return true
    }
    // Scaleway
    if strings.Contains(err.Error(), "NoSuchKey") {
        return true
    }
    return false
}

// StartClearingOrphanObjects is the entry point for the job that goes through
// all the objects in the given datacenter, and deletes orphan objects for
// which we do not have DB entries.
//
// Such orphan objects are expected to have been created because the code for
// updating the DB entries when a file gets updated did not clean up the
// corresponding objects from object storage. Once we start keeping track of
// such objects in a separate queue, this cron job won't be needed.
func (c *ObjectCleanupController) StartClearingOrphanObjects() {
    // TODO: object_cleanup: This code is currently only tested for B2
    if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
        log.Info("Skipping ClearingOrphanObjects since the Hot DC is not B2")
        return
    }
    isJobEnabled := viper.GetBool("jobs.clear-orphan-objects.enabled")
    if !isJobEnabled {
        return
    }
    prefix := viper.GetString("jobs.clear-orphan-objects.prefix")
    log.Infof("Starting workers to clear-orphan-objects (prefix %s)", prefix)
    // TODO: object_cleanup: start workers for other DCs once the temp_objects
    // table supports specifying a DC
    go c.clearOrphanObjectsWorker(c.S3Config.GetHotBackblazeDC(), prefix)
}
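
// A configuration sketch for this job. Key names are taken from the viper
// calls above; the YAML layout is an assumption about how museum's
// configuration is structured:
//
//    jobs:
//        clear-orphan-objects:
//            enabled: true
//            # Only objects whose keys start with this prefix are considered.
//            prefix: ""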

// clearOrphanObjectsWorker is the entry point for the worker goroutine that
// cleans up objects in a particular DC.
func (c *ObjectCleanupController) clearOrphanObjectsWorker(dc string, prefix string) {
    for {
        c.ClearOrphanObjects(dc, prefix, false)
        stime.Sleep(clearOrphanObjectsCheckInterval)
    }
}

// IsValidClearOrphanObjectsDC verifies that the given DC is valid for use as
// the target of an orphan object cleanup.
func (c *ObjectCleanupController) IsValidClearOrphanObjectsDC(dc string) bool {
    if dc != c.S3Config.GetHotBackblazeDC() {
        return false
    }
    // TODO: object_cleanup: This code is currently only tested for B2
    if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
        return false
    }
    return true
}

func (c *ObjectCleanupController) ClearOrphanObjects(dc string, prefix string, forceTaskLock bool) {
    logger := log.WithFields(log.Fields{
        "task":        "clear-orphan-objects",
        "data_center": dc,
    })
    if !c.IsValidClearOrphanObjectsDC(dc) {
        logger.Errorf("Unsupported DC %s", dc)
        return
    }
    lockName := clearOrphanObjectsLockName(dc)
    if forceTaskLock {
        logger.Infof("Forcefully removing task lock %s", lockName)
        err := c.LockController.TaskLockingRepo.ReleaseLock(lockName)
        if err != nil {
            logger.Error(stacktrace.Propagate(err, ""))
            return
        }
    }
    if !c.LockController.TryLock(lockName, clearOrphanObjectsNextLockUntil()) {
        logger.Infof("Skipping since a lock could not be obtained")
        return
    }
    // The lock is intentionally not released.
    //
    // By keeping the stale entry for the unheld lock in the DB, we will be able
    // to retain the timestamp when this job last ran. This is a kludgy way to
    // guarantee that clearOrphanObjectsMinimumJobInterval is enforced across
    // all museum instances (without introducing a new DB table).
    //
    // defer c.LockController.ReleaseLock(lockName)
    s3Config := c.S3Config
    dest := &CleanupOrphanObjectsDestination{
        DC:                dc,
        Client:            s3Config.GetS3Client(dc),
        Bucket:            s3Config.GetBucket(dc),
        HasComplianceHold: s3Config.WasabiComplianceDC() == dc,
    }
    logger.Infof("Clearing orphan objects from bucket %s (hasComplianceHold %v)",
        *dest.Bucket, dest.HasComplianceHold)
    // Each directory listing of an S3 bucket returns a maximum of 1000
    // objects, and an optional continuation token. As long as there are more
    // objects (indicated by the presence of the continuation token), keep
    // fetching directory listings.
    //
    // For each directory listing, spawn goroutines to process its entries in
    // small concurrent batches (see clearOrphanObjectsPage).
    //
    // Refresh the lock's acquisition time during each iteration since this job
    // can span hours, and we don't want a different instance to start another
    // run just because it was only considering the start time of the job.
    err := dest.Client.ListObjectVersionsPages(&s3.ListObjectVersionsInput{
        Bucket: dest.Bucket,
        Prefix: &prefix,
    },
        func(page *s3.ListObjectVersionsOutput, lastPage bool) bool {
            c.clearOrphanObjectsPage(page, dest, logger)
            lerr := c.LockController.ExtendLock(lockName, clearOrphanObjectsNextLockUntil())
            if lerr != nil {
                logger.Error(lerr)
                return false
            }
            return true
        })
    if err != nil {
        logger.Error(stacktrace.Propagate(err, ""))
        return
    }
    logger.Info("Cleared orphan objects")
}

func clearOrphanObjectsLockName(dc string) string {
    return fmt.Sprintf("clear-orphan-objects:%s", dc)
}

func clearOrphanObjectsNextLockUntil() int64 {
    return time.Microseconds() + clearOrphanObjectsMinimumJobInterval.Microseconds()
}

// CleanupOrphanObjectsDestination describes the DC and bucket from which
// orphan objects are to be cleared.
type CleanupOrphanObjectsDestination struct {
    DC     string
    Client s3.S3
    Bucket *string
    // If true, this bucket has a compliance hold on objects that needs to be
    // removed first before they can be deleted.
    HasComplianceHold bool
}

// ObjectVersionOrDeleteMarker is an abstraction to allow us to reuse the same
// code to delete both object versions and delete markers.
type ObjectVersionOrDeleteMarker struct {
    ObjectVersion *s3.ObjectVersion
    DeleteMarker  *s3.DeleteMarkerEntry
}

func (od ObjectVersionOrDeleteMarker) GetKey() *string {
    if od.ObjectVersion != nil {
        return od.ObjectVersion.Key
    }
    return od.DeleteMarker.Key
}

func (od ObjectVersionOrDeleteMarker) GetLastModified() *stime.Time {
    if od.ObjectVersion != nil {
        return od.ObjectVersion.LastModified
    }
    return od.DeleteMarker.LastModified
}

func (od ObjectVersionOrDeleteMarker) GetVersionId() *string {
    if od.ObjectVersion != nil {
        return od.ObjectVersion.VersionId
    }
    return od.DeleteMarker.VersionId
}

func (c *ObjectCleanupController) clearOrphanObjectsPage(page *s3.ListObjectVersionsOutput, dest *CleanupOrphanObjectsDestination, logger *log.Entry) error {
    // A page contains at most MaxKeys (1000) entries. Break it into batches,
    // and process each batch in a separate goroutine.
    batchSize := 10
    versions := page.Versions
    nv := len(versions)
    deleteMarkers := page.DeleteMarkers
    nd := len(deleteMarkers)
    n := nv + nd
    logger.Infof("Processing page containing %d values (%d object versions, %d delete markers)", n, nv, nd)
    // Gather both object versions and delete markers into a single slice.
    ods := make([]ObjectVersionOrDeleteMarker, n)
    for i := 0; i < nv; i++ {
        ods[i] = ObjectVersionOrDeleteMarker{ObjectVersion: versions[i]}
    }
    for i := 0; i < nd; i++ {
        ods[nv+i] = ObjectVersionOrDeleteMarker{DeleteMarker: deleteMarkers[i]}
    }
    var wg sync.WaitGroup
    for i := 0; i < n; i += batchSize {
        end := i + batchSize
        if end > n {
            end = n
        }
        wg.Add(1)
        // Pass the batch as an argument to avoid capturing the loop variable.
        go func(batch []ObjectVersionOrDeleteMarker) {
            defer wg.Done()
            c.clearOrphanObjectsVersionOrDeleteMarkers(batch, dest, logger)
        }(ods[i:end])
    }
    wg.Wait()
    return nil
}

func (c *ObjectCleanupController) clearOrphanObjectsVersionOrDeleteMarkers(ods []ObjectVersionOrDeleteMarker, dest *CleanupOrphanObjectsDestination, logger *log.Entry) {
    for _, od := range ods {
        c.clearOrphanObjectsVersionOrDeleteMarker(od, dest, logger)
    }
}

func (c *ObjectCleanupController) clearOrphanObjectsVersionOrDeleteMarker(od ObjectVersionOrDeleteMarker, dest *CleanupOrphanObjectsDestination, logger *log.Entry) {
    if od.GetKey() == nil || od.GetLastModified() == nil {
        logger.Errorf("Ignoring object with missing fields: %v %v", od.GetKey(), od.GetLastModified())
        return
    }
    objectKey := *od.GetKey()
    lastModified := *od.GetLastModified()
    logger = logger.WithFields(log.Fields{
        "object_key":    objectKey,
        "last_modified": lastModified,
    })
    exists, err := c.ObjectRepo.DoesObjectOrTempObjectExist(objectKey)
    if err != nil {
        logger.Error(stacktrace.Propagate(err, "Failed to determine if object already exists in DB"))
        return
    }
    if exists {
        return
    }
    // As a safety check, ignore very recent objects from cleanup: anything
    // modified in the last 2 days is not considered an orphan yet.
    cutoff := stime.Now().AddDate(0, 0, -2)
    if lastModified.After(cutoff) {
        logger.Warnf("Ignoring too-recent orphan object since it was modified after %v", cutoff)
        return
    }
    logger.Infof("Found orphan object %v", od)
    if dest.HasComplianceHold {
        // Remove the compliance hold.
        err := c.ObjectController.DisableObjectConditionalHold(&dest.Client, *dest.Bucket, objectKey)
        if err != nil {
            logger.Error(stacktrace.Propagate(err, "Failed to disable conditional hold on object"))
            return
        }
        // Add the object to the cleanup queue with an expiry time that is
        // after the compliance hold would've passed. Add 2 days of buffer too.
        expiryDays := s3config.WasabiObjectConditionalHoldDays + 2
        expiryTime := time.MicrosecondsAfterDays(expiryDays)
        err = c.addCleanupEntryForObjectKey(objectKey, dest.DC, expiryTime)
        if err != nil {
            logger.Error(stacktrace.Propagate(err, "Failed to add cleanup entry for object"))
            return
        }
        logger.Infof("Disabled compliance hold and added an entry to cleanup orphan object after %v", expiryTime)
    } else {
        // Delete it right away.
        versionID := od.GetVersionId()
        logger.Infof("Deleting version '%s'", enteString.EmptyIfNil(versionID))
        err := c.DeleteObjectVersion(objectKey, versionID, dest)
        if err != nil {
            logger.Error(stacktrace.Propagate(err, "Failed to delete object"))
            return
        }
        c.mOrphanObjectsDeleted.WithLabelValues(dest.DC).Inc()
    }
}

// DeleteObjectVersion can be used to delete objects from versioned buckets.
//
// If we delete an object in a versioning enabled bucket, deletion does not
// actually remove the object and instead creates a delete marker:
//
//   - When we delete a file, it creates a delete marker
//   - The delete marker becomes the latest version
//   - The old version of the file still remains
//
// If we explicitly pass a version ID in the delete call, then the delete
// marker won't get created.
//
// > To delete versioned objects permanently, use `DELETE Object versionId`
//
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeletingObjectVersions.html
func (c *ObjectCleanupController) DeleteObjectVersion(objectKey string, versionID *string, dest *CleanupOrphanObjectsDestination) error {
    _, err := dest.Client.DeleteObject(&s3.DeleteObjectInput{
        Bucket:    dest.Bucket,
        Key:       &objectKey,
        VersionId: versionID,
    })
    return stacktrace.Propagate(err, "")
}
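
// For illustration, the difference described above, as two hypothetical calls
// against a versioned bucket (the bucket, key, and version values are made up):
//
//    // Without VersionId: creates a delete marker; older versions remain.
//    client.DeleteObject(&s3.DeleteObjectInput{Bucket: bucket, Key: key})
//
//    // With VersionId: permanently deletes that specific version, and no
//    // delete marker is created.
//    client.DeleteObject(&s3.DeleteObjectInput{Bucket: bucket, Key: key, VersionId: versionID})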