object_cleanup.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. package controller
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. stime "time"
  9. "github.com/ente-io/museum/pkg/controller/lock"
  10. "github.com/ente-io/stacktrace"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/prometheus/client_golang/prometheus/promauto"
  13. "github.com/spf13/viper"
  14. "github.com/aws/aws-sdk-go/service/s3"
  15. "github.com/ente-io/museum/ente"
  16. "github.com/ente-io/museum/pkg/repo"
  17. "github.com/ente-io/museum/pkg/utils/s3config"
  18. enteString "github.com/ente-io/museum/pkg/utils/string"
  19. "github.com/ente-io/museum/pkg/utils/time"
  20. log "github.com/sirupsen/logrus"
  21. )
  22. // ObjectCleanupController exposes functions to remove orphan and stale entries
  23. // from object storage.
  24. //
  25. // There are 3 main types of orphans that can end up in our object storage:
  26. //
  27. // 1. We create presigned URLs for clients to upload their objects to. It might
  28. // happen that the client is able to successfully upload to these URLs, but
  29. // not tell museum about the successful upload.
  30. //
  31. // 2. During replication, we might have half-done multipart uploads.
  32. //
  33. // 3. When an existing object is updated (e.g. the user edits the file on iOS),
  34. // then the file entry in our DB is updated to point to the new object, and
  35. // the old object is now meant to be discarded.
  36. //
  37. // ObjectCleanupController is meant to manage all these scenarios over time.
type ObjectCleanupController struct {
	// Repo provides DB access to the temp-object cleanup queue.
	Repo *repo.ObjectCleanupRepository
	// ObjectRepo is used to check whether an object key has a live DB entry
	// before we delete it from object storage.
	ObjectRepo *repo.ObjectRepository
	// LockController coordinates task locks across museum instances.
	LockController *lock.LockController
	// ObjectController is used to lift compliance holds before deletion.
	ObjectController *ObjectController
	// S3Config supplies per-DC S3 clients and bucket names.
	S3Config *s3config.S3Config
	// Prometheus Metrics
	mOrphanObjectsDeleted *prometheus.CounterVec
}
  47. // PreSignedRequestValidityDuration is the lifetime of a pre-signed URL
  48. const PreSignedRequestValidityDuration = 7 * 24 * stime.Hour
  49. // PreSignedPartUploadRequestDuration is the lifetime of a pre-signed multipart URL
  50. const PreSignedPartUploadRequestDuration = 7 * 24 * stime.Hour
  51. // clearOrphanObjectsCheckInterval is the interval after which we check if the
  52. // ClearOrphanObjects job needs to be re-run.
  53. //
  54. // See also, clearOrphanObjectsMinimumJobInterval.
  55. const clearOrphanObjectsCheckInterval = 1 * 24 * stime.Hour
  56. // ClearOrphanObjectsMinimumJobInterval is the minimum interval that must pass
  57. // before we run another instance of the ClearOrphanObjects job.
  58. //
  59. // This interval is enforced across museum instances.
  60. const clearOrphanObjectsMinimumJobInterval = 2 * 24 * stime.Hour
  61. // Return a new instance of ObjectCleanupController
  62. func NewObjectCleanupController(
  63. objectCleanupRepo *repo.ObjectCleanupRepository,
  64. objectRepo *repo.ObjectRepository,
  65. lockController *lock.LockController,
  66. objectController *ObjectController,
  67. s3Config *s3config.S3Config,
  68. ) *ObjectCleanupController {
  69. mOrphanObjectsDeleted := promauto.NewCounterVec(prometheus.CounterOpts{
  70. Name: "museum_orphan_objects_deleted_total",
  71. Help: "Number of objects successfully deleted when clearing orphan objects",
  72. }, []string{"dc"})
  73. return &ObjectCleanupController{
  74. Repo: objectCleanupRepo,
  75. ObjectRepo: objectRepo,
  76. LockController: lockController,
  77. ObjectController: objectController,
  78. S3Config: s3Config,
  79. mOrphanObjectsDeleted: mOrphanObjectsDeleted,
  80. }
  81. }
  82. // StartRemovingUnreportedObjects starts goroutines to cleanup deletes those
  83. // objects that were possibly uploaded but not reported to the database
  84. func (c *ObjectCleanupController) StartRemovingUnreportedObjects() {
  85. // TODO: object_cleanup: This code is only currently tested for B2
  86. if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
  87. log.Info("Skipping RemovingUnreportedObjects since the Hot DC is not B2")
  88. return
  89. }
  90. workerCount := viper.GetInt("jobs.remove-unreported-objects.worker-count")
  91. if workerCount == 0 {
  92. workerCount = 1
  93. }
  94. log.Infof("Starting %d workers to remove-unreported-objects", workerCount)
  95. for i := 0; i < workerCount; i++ {
  96. go c.removeUnreportedObjectsWorker(i)
  97. }
  98. }
  99. // Entry point for the worker goroutine to cleanup unreported objects.
  100. //
  101. // i is an arbitrary index for the current goroutine.
  102. func (c *ObjectCleanupController) removeUnreportedObjectsWorker(i int) {
  103. for {
  104. count := c.removeUnreportedObjects()
  105. if count == 0 {
  106. stime.Sleep(stime.Duration(5+i) * stime.Minute)
  107. } else {
  108. stime.Sleep(stime.Second)
  109. }
  110. }
  111. }
  112. func (c *ObjectCleanupController) removeUnreportedObjects() int {
  113. logger := log.WithFields(log.Fields{
  114. "task": "remove-unreported-objects",
  115. })
  116. logger.Info("Removing unreported objects")
  117. count := 0
  118. tx, tempObjects, err := c.Repo.GetAndLockExpiredObjects()
  119. if err != nil {
  120. if !errors.Is(err, sql.ErrNoRows) {
  121. logger.Error(err)
  122. }
  123. return count
  124. }
  125. for _, tempObject := range tempObjects {
  126. err = c.removeUnreportedObject(tx, tempObject)
  127. if err != nil {
  128. continue
  129. }
  130. count += 1
  131. }
  132. logger.Infof("Removed %d objects", count)
  133. // We always commit the transaction, even on errors for individual rows. To
  134. // avoid object getting stuck in a loop, we increase their expiry times.
  135. cerr := tx.Commit()
  136. if cerr != nil {
  137. cerr = stacktrace.Propagate(err, "Failed to commit transaction")
  138. logger.Error(cerr)
  139. }
  140. return count
  141. }
  142. func (c *ObjectCleanupController) removeUnreportedObject(tx *sql.Tx, t ente.TempObject) error {
  143. // TODO: object_cleanup
  144. // This should use the DC from TempObject (once we start persisting it)
  145. // dc := t.DataCenter
  146. dc := c.S3Config.GetHotDataCenter()
  147. logger := log.WithFields(log.Fields{
  148. "task": "remove-unreported-objects",
  149. "object_key": t.ObjectKey,
  150. "data_center": dc,
  151. "upload_id": t.UploadID,
  152. })
  153. skip := func(err error) error {
  154. logger.Errorf("Clearing tempObject failed: %v", err)
  155. newExpiry := time.MicrosecondsAfterDays(1)
  156. serr := c.Repo.SetExpiryForTempObject(tx, t, newExpiry)
  157. if serr != nil {
  158. logger.Errorf("Updating expiry for failed temp object failed: %v", serr)
  159. }
  160. return err
  161. }
  162. logger.Info("Clearing tempObject")
  163. exists, err := c.ObjectRepo.DoesObjectExist(tx, t.ObjectKey)
  164. if err != nil {
  165. return skip(stacktrace.Propagate(err, ""))
  166. }
  167. if exists {
  168. err := errors.New("aborting attempt to delete an object which has a DB entry")
  169. return skip(stacktrace.Propagate(err, ""))
  170. }
  171. if t.IsMultipart {
  172. err = c.abortMultipartUpload(t.ObjectKey, t.UploadID, dc)
  173. } else {
  174. err = c.DeleteObjectFromDataCenter(t.ObjectKey, dc)
  175. }
  176. if err != nil {
  177. return skip(err)
  178. }
  179. err = c.Repo.RemoveTempObject(tx, t)
  180. if err != nil {
  181. return skip(err)
  182. }
  183. return nil
  184. }
  185. // AddTempObjectKey creates a new temporary object entry.
  186. //
  187. // It persists a given object key as having been provided to a client for
  188. // uploading. If a client does not successfully mark this object's upload as
  189. // having completed within PreSignedRequestValidityDuration, this temp object
  190. // will be cleaned up.
  191. func (c *ObjectCleanupController) AddTempObjectKey(objectKey string, dc string) error {
  192. expiry := time.Microseconds() + (2 * PreSignedRequestValidityDuration.Microseconds())
  193. return c.addCleanupEntryForObjectKey(objectKey, dc, expiry)
  194. }
  195. // Add the object to a queue of "temporary" objects that are deleted (if they
  196. // exist) if this entry is not removed from the queue by expirationTime.
  197. func (c *ObjectCleanupController) addCleanupEntryForObjectKey(objectKey string, dc string, expirationTime int64) error {
  198. err := c.Repo.AddTempObject(ente.TempObject{
  199. ObjectKey: objectKey,
  200. IsMultipart: false,
  201. DataCenter: dc,
  202. }, expirationTime)
  203. return stacktrace.Propagate(err, "")
  204. }
// AddMultipartTempObjectKey creates a new temporary object entry for a
// multipart upload.
//
// See AddTempObjectKey for more details.
func (c *ObjectCleanupController) AddMultipartTempObjectKey(objectKey string, uploadID string, dc string) error {
	// Give clients twice the presigned URL validity before the multipart
	// upload is considered abandoned and eligible for cleanup.
	expiry := time.Microseconds() + (2 * PreSignedPartUploadRequestDuration.Microseconds())
	err := c.Repo.AddTempObject(ente.TempObject{
		ObjectKey:   objectKey,
		IsMultipart: true,
		UploadID:    uploadID,
		DataCenter:  dc,
	}, expiry)
	return stacktrace.Propagate(err, "")
}
  219. func (c *ObjectCleanupController) DeleteAllObjectsWithPrefix(prefix string, dc string) error {
  220. s3Client := c.S3Config.GetS3Client(dc)
  221. bucket := c.S3Config.GetBucket(dc)
  222. output, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
  223. Bucket: bucket,
  224. Prefix: &prefix,
  225. })
  226. if err != nil {
  227. log.WithFields(log.Fields{
  228. "prefix": prefix,
  229. "dc": dc,
  230. }).WithError(err).Error("Failed to list objects")
  231. return stacktrace.Propagate(err, "")
  232. }
  233. var keys []string
  234. for _, obj := range output.Contents {
  235. keys = append(keys, *obj.Key)
  236. }
  237. for _, key := range keys {
  238. err = c.DeleteObjectFromDataCenter(key, dc)
  239. if err != nil {
  240. log.WithFields(log.Fields{
  241. "object_key": key,
  242. "dc": dc,
  243. }).WithError(err).Error("Failed to delete object")
  244. return stacktrace.Propagate(err, "")
  245. }
  246. }
  247. return nil
  248. }
  249. func (c *ObjectCleanupController) DeleteObjectFromDataCenter(objectKey string, dc string) error {
  250. log.Info("Deleting " + objectKey + " from " + dc)
  251. var s3Client = c.S3Config.GetS3Client(dc)
  252. bucket := c.S3Config.GetBucket(dc)
  253. _, err := s3Client.DeleteObject(&s3.DeleteObjectInput{
  254. Bucket: bucket,
  255. Key: &objectKey,
  256. })
  257. if err != nil {
  258. return stacktrace.Propagate(err, "")
  259. }
  260. err = s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{
  261. Bucket: bucket,
  262. Key: &objectKey,
  263. })
  264. if err != nil {
  265. return stacktrace.Propagate(err, "")
  266. }
  267. return nil
  268. }
  269. func (c *ObjectCleanupController) abortMultipartUpload(objectKey string, uploadID string, dc string) error {
  270. s3Client := c.S3Config.GetS3Client(dc)
  271. bucket := c.S3Config.GetBucket(dc)
  272. _, err := s3Client.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
  273. Bucket: bucket,
  274. Key: &objectKey,
  275. UploadId: &uploadID,
  276. })
  277. if err != nil {
  278. if isUnknownUploadError(err) {
  279. log.Info("Could not find upload for " + objectKey)
  280. return nil
  281. }
  282. return stacktrace.Propagate(err, "")
  283. }
  284. r, err := s3Client.ListParts(&s3.ListPartsInput{
  285. Bucket: bucket,
  286. Key: &objectKey,
  287. UploadId: &uploadID,
  288. })
  289. if err != nil {
  290. if isUnknownUploadError(err) {
  291. // This is expected now, since we just aborted the upload
  292. return nil
  293. }
  294. return stacktrace.Propagate(err, "")
  295. }
  296. if len(r.Parts) > 0 {
  297. return stacktrace.NewError("abort Failed")
  298. }
  299. return nil
  300. }
  301. // The original code here checked for NoSuchUpload, presumably because that is
  302. // the error that B2 returns.
  303. //
  304. // Wasabi returns something similar:
  305. //
  306. // <Error>
  307. // <Code>NoSuchUpload</Code>
  308. // <Message>The specified upload does not exist. The upload ID may be invalid,
  309. // or the upload may have been aborted or completed.</Message>
  310. // ...
  311. //
  312. // However, Scaleway returns a different error, NoSuchKey
  313. //
  314. // <Error>
  315. // <Code>NoSuchKey</Code>
  316. // <Message>The specified key does not exist.</Message>
  317. // ...
  318. //
  319. // This method returns true if either of these occur.
  320. func isUnknownUploadError(err error) bool {
  321. // B2, Wasabi
  322. if strings.Contains(err.Error(), "NoSuchUpload") {
  323. return true
  324. }
  325. // Scaleway
  326. if strings.Contains(err.Error(), "NoSuchKey") {
  327. return true
  328. }
  329. return false
  330. }
  331. // StartClearingOrphanObjects is the entry point for the job that goes through
  332. // all the objects in the given datacenter, and deletes orphan objects for which
  333. // we do not have DB entries.
  334. //
  335. // Such orphan objects are expected to have been created because the code for
  336. // updating the DB entries when a file gets updated did not cleanup the
  337. // corresponding objects from object storage. Once we start keeping track of
  338. // such objects in a separate queue, this cron job won't be needed.
  339. func (c *ObjectCleanupController) StartClearingOrphanObjects() {
  340. // TODO: object_cleanup: This code is only currently tested for B2
  341. if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
  342. log.Info("Skipping ClearingOrphanObjects since the Hot DC is not B2")
  343. return
  344. }
  345. isJobEnabled := viper.GetBool("jobs.clear-orphan-objects.enabled")
  346. if !isJobEnabled {
  347. return
  348. }
  349. prefix := viper.GetString("jobs.clear-orphan-objects.prefix")
  350. log.Infof("Starting workers to clear-orphan-objects (prefix %s)", prefix)
  351. // TODO: object_cleanup: start workers for other DCs once the temp_objects
  352. // table supports specifying a DC
  353. go c.clearOrphanObjectsWorker(c.S3Config.GetHotBackblazeDC(), prefix)
  354. }
  355. // clearOrphanObjectsWorker is the entry point for the worker goroutine to
  356. // cleanup objects in a particular DC.
  357. func (c *ObjectCleanupController) clearOrphanObjectsWorker(dc string, prefix string) {
  358. for {
  359. c.ClearOrphanObjects(dc, prefix, false)
  360. stime.Sleep(clearOrphanObjectsCheckInterval)
  361. }
  362. }
  363. // IsValidClearOrphanObjectsDC verifies that the given DC is valid for use as
  364. // the target of an orphan object cleanup.
  365. func (c *ObjectCleanupController) IsValidClearOrphanObjectsDC(dc string) bool {
  366. if dc != c.S3Config.GetHotBackblazeDC() {
  367. return false
  368. }
  369. // TODO: object_cleanup: This code is only currently tested for B2
  370. if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
  371. return false
  372. }
  373. return true
  374. }
// ClearOrphanObjects scans the bucket of the given DC (restricted to keys
// starting with prefix) and deletes objects that have no corresponding DB
// entry. forceTaskLock releases any existing task lock before starting,
// allowing an operator to force a run.
//
// Errors are logged, not returned: this is a background job.
func (c *ObjectCleanupController) ClearOrphanObjects(dc string, prefix string, forceTaskLock bool) {
	logger := log.WithFields(log.Fields{
		"task":        "clear-orphan-objects",
		"data_center": dc,
	})
	if !c.IsValidClearOrphanObjectsDC(dc) {
		logger.Errorf("Unsupported DC %s", dc)
		return
	}
	lockName := clearOrphanObjectsLockName(dc)
	if forceTaskLock {
		logger.Infof("Forcefully removing task lock %s", lockName)
		err := c.LockController.TaskLockingRepo.ReleaseLock(lockName)
		if err != nil {
			logger.Error(stacktrace.Propagate(err, ""))
			return
		}
	}
	// The lock both prevents concurrent runs and (because it is never
	// released, see below) enforces the minimum interval between runs.
	if !c.LockController.TryLock(lockName, clearOrphanObjectsNextLockUntil()) {
		logger.Infof("Skipping since a lock could not be obtained")
		return
	}
	// The lock is not released intentionally
	//
	// By keeping the stale entry for the unheld lock in the DB, we will be able
	// to retain the timestamp when this job last ran. This is a kludgy way to
	// guarantee that clearOrphanObjectsMinimumJobInterval is enforced across
	// all museum instances (without introducing a new DB table).
	//
	// defer c.LockController.ReleaseLock(lockName)
	s3Config := c.S3Config
	dest := &CleanupOrphanObjectsDestination{
		DC:                dc,
		Client:            s3Config.GetS3Client(dc),
		Bucket:            s3Config.GetBucket(dc),
		HasComplianceHold: s3Config.WasabiComplianceDC() == dc,
	}
	logger.Infof("Clearing orphan objects from bucket %s (hasComplianceHold %v)",
		*dest.Bucket, dest.HasComplianceHold)
	// Each directory listing of an S3 bucket returns a maximum of 1000 objects,
	// and an optional continuation token. Until there are more objects
	// (indicated by the presence of the continuation token), keep fetching
	// directory listings.
	//
	// For each directory listing, spawn 10 goroutines to go through chunks of
	// 100 each to clear orphan objects.
	//
	// Refresh the lock's acquisition time during each iteration since this job
	// can span hours, and we don't want a different instance to start another
	// run just because it was only considering the start time of the job.
	err := dest.Client.ListObjectVersionsPages(&s3.ListObjectVersionsInput{
		Bucket: dest.Bucket,
		Prefix: &prefix,
	},
		func(page *s3.ListObjectVersionsOutput, lastPage bool) bool {
			c.clearOrphanObjectsPage(page, dest, logger)
			// Extend the lock; abort the scan if we cannot (another instance
			// may otherwise start a concurrent run).
			lerr := c.LockController.ExtendLock(lockName, clearOrphanObjectsNextLockUntil())
			if lerr != nil {
				logger.Error(lerr)
				return false
			}
			return true
		})
	if err != nil {
		logger.Error(stacktrace.Propagate(err, ""))
		return
	}
	logger.Info("Cleared orphan objects")
}
  444. func clearOrphanObjectsLockName(dc string) string {
  445. return fmt.Sprintf("clear-orphan-objects:%s", dc)
  446. }
  447. func clearOrphanObjectsNextLockUntil() int64 {
  448. return time.Microseconds() + clearOrphanObjectsMinimumJobInterval.Microseconds()
  449. }
// CleanupOrphanObjectsDestination describes the object storage bucket that an
// orphan object cleanup run operates on.
type CleanupOrphanObjectsDestination struct {
	// DC is the data center this destination refers to.
	DC string
	// Client is the S3 client used to talk to the bucket.
	Client s3.S3
	// Bucket is the bucket scanned for orphan objects.
	Bucket *string
	// If true, this bucket has a compliance hold on objects that needs to be
	// removed first before they can be deleted.
	HasComplianceHold bool
}
// ObjectVersionOrDeleteMarker is an abstraction to allow us to reuse the same
// code to delete both object versions and delete markers.
//
// Exactly one of the two fields is expected to be set.
type ObjectVersionOrDeleteMarker struct {
	ObjectVersion *s3.ObjectVersion
	DeleteMarker  *s3.DeleteMarkerEntry
}
  464. func (od ObjectVersionOrDeleteMarker) GetKey() *string {
  465. if od.ObjectVersion != nil {
  466. return od.ObjectVersion.Key
  467. }
  468. return od.DeleteMarker.Key
  469. }
  470. func (od ObjectVersionOrDeleteMarker) GetLastModified() *stime.Time {
  471. if od.ObjectVersion != nil {
  472. return od.ObjectVersion.LastModified
  473. }
  474. return od.DeleteMarker.LastModified
  475. }
  476. func (od ObjectVersionOrDeleteMarker) GetVersionId() *string {
  477. if od.ObjectVersion != nil {
  478. return od.ObjectVersion.VersionId
  479. }
  480. return od.DeleteMarker.VersionId
  481. }
  482. func (c *ObjectCleanupController) clearOrphanObjectsPage(page *s3.ListObjectVersionsOutput, dest *CleanupOrphanObjectsDestination, logger *log.Entry) error {
  483. // MaxKeys is 1000. Until we can, break it into batches and create a
  484. // separate goroutine to process each batch.
  485. batchSize := 10
  486. versions := page.Versions
  487. nv := len(versions)
  488. deleteMarkers := page.DeleteMarkers
  489. nd := len(deleteMarkers)
  490. n := nv + nd
  491. logger.Infof("Processing page containing %d values (%d object versions, %d delete markers)", n, nv, nd)
  492. ods := make([]ObjectVersionOrDeleteMarker, n)
  493. for i := 0; i < nv; i++ {
  494. ods[i] = ObjectVersionOrDeleteMarker{ObjectVersion: versions[i]}
  495. }
  496. for i := 0; i < nd; i++ {
  497. ods[nv+i] = ObjectVersionOrDeleteMarker{DeleteMarker: deleteMarkers[i]}
  498. }
  499. var wg sync.WaitGroup
  500. for i := 0; i < n; i++ {
  501. end := i + batchSize
  502. if end > n {
  503. end = n
  504. }
  505. if i >= end {
  506. // Nothing left
  507. break
  508. }
  509. wg.Add(1)
  510. go func(i int, end int) {
  511. defer wg.Done()
  512. batch := ods[i:end]
  513. c.clearOrphanObjectsVersionOrDeleteMarkers(batch, dest, logger)
  514. }(i, end)
  515. i = end
  516. }
  517. wg.Wait()
  518. return nil
  519. }
  520. func (c *ObjectCleanupController) clearOrphanObjectsVersionOrDeleteMarkers(ods []ObjectVersionOrDeleteMarker, dest *CleanupOrphanObjectsDestination, logger *log.Entry) {
  521. for _, od := range ods {
  522. c.clearOrphanObjectsVersionOrDeleteMarker(od, dest, logger)
  523. }
  524. }
  525. func (c *ObjectCleanupController) clearOrphanObjectsVersionOrDeleteMarker(od ObjectVersionOrDeleteMarker, dest *CleanupOrphanObjectsDestination, logger *log.Entry) {
  526. if od.GetKey() == nil || od.GetLastModified() == nil {
  527. logger.Errorf("Ignoring object with missing fields: %v %v", od.GetKey(), od.GetLastModified())
  528. return
  529. }
  530. objectKey := *od.GetKey()
  531. lastModified := *od.GetLastModified()
  532. logger = logger.WithFields(log.Fields{
  533. "object_key": objectKey,
  534. "last_modified": lastModified,
  535. })
  536. exists, err := c.ObjectRepo.DoesObjectOrTempObjectExist(objectKey)
  537. if err != nil {
  538. logger.Error(stacktrace.Propagate(err, "Failed to determine if object already exists in DB"))
  539. return
  540. }
  541. if exists {
  542. return
  543. }
  544. // 2 days ago
  545. cutoff := stime.Now().AddDate(0, 0, -2)
  546. // As a safety check, ignore very recent objects from cleanup
  547. if lastModified.After(cutoff) {
  548. logger.Warnf("Ignoring too-recent orphan object since it was modified after %v", cutoff)
  549. return
  550. }
  551. logger.Infof("Found orphan object %v", od)
  552. if dest.HasComplianceHold {
  553. // Remove compliance hold.
  554. err := c.ObjectController.DisableObjectConditionalHold(&dest.Client, *dest.Bucket, objectKey)
  555. if err != nil {
  556. logger.Error(stacktrace.Propagate(err, "Failed to disable conditional hold on object"))
  557. return
  558. }
  559. // Add the object to the cleanup queue with an expiry time that is after
  560. // the compliance hold would've passed. Add 2 days of buffer too.
  561. expiryDays := s3config.WasabiObjectConditionalHoldDays + 2
  562. expiryTime := time.MicrosecondsAfterDays(expiryDays)
  563. c.addCleanupEntryForObjectKey(objectKey, dest.DC, expiryTime)
  564. logger.Infof("Disabled compliance hold and added an entry to cleanup orphan object after %v", expiryTime)
  565. } else {
  566. // Delete it right away.
  567. versionID := od.GetVersionId()
  568. logger.Infof("Deleting version '%s'", enteString.EmptyIfNil(versionID))
  569. err := c.DeleteObjectVersion(objectKey, versionID, dest)
  570. if err != nil {
  571. logger.Error(stacktrace.Propagate(err, "Failed to delete object"))
  572. }
  573. c.mOrphanObjectsDeleted.WithLabelValues(dest.DC).Inc()
  574. }
  575. }
  576. // DeleteObjectVersion can be used to delete objects from versioned buckets.
  577. //
  578. // If we delete an object in a versioning enabled bucket, deletion does not
  579. // actually remove the object and instead creates a delete marker:
  580. //
  581. // - When we delete a file, it creates a delete marker
  582. // - The delete marker becomes the latest version
  583. // - The old version of the file still remains
  584. //
  585. // If we explicitly pass a version ID in the delete call, then the delete marker
  586. // won't get created.
  587. //
  588. // > To delete versioned objects permanently, use `DELETE Object versionId`
  589. //
  590. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeletingObjectVersions.html
  591. func (c *ObjectCleanupController) DeleteObjectVersion(objectKey string, versionID *string, dest *CleanupOrphanObjectsDestination) error {
  592. _, err := dest.Client.DeleteObject(&s3.DeleteObjectInput{
  593. Bucket: dest.Bucket,
  594. Key: &objectKey,
  595. VersionId: versionID,
  596. })
  597. return stacktrace.Propagate(err, "")
  598. }