object.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package controller
  2. import (
  3. "fmt"
  4. "github.com/aws/aws-sdk-go/aws"
  5. "github.com/aws/aws-sdk-go/service/s3"
  6. "github.com/ente-io/museum/pkg/controller/lock"
  7. "github.com/ente-io/museum/pkg/external/wasabi"
  8. "github.com/ente-io/museum/pkg/repo"
  9. "github.com/ente-io/museum/pkg/utils/file"
  10. "github.com/ente-io/museum/pkg/utils/s3config"
  11. "github.com/ente-io/museum/pkg/utils/time"
  12. "github.com/ente-io/stacktrace"
  13. log "github.com/sirupsen/logrus"
  14. )
  15. // ObjectController manages various operations specific to object storage,
  16. // including dealing with the special cases for individual replicas.
  17. //
  18. // The user's encrypted data is replicated to three places - 2 hot storage data
  19. // centers, and 1 cold storage. All three of them provide S3 compatible APIs
  20. // that we use to add and remove objects. However, there are still some specific
  21. // (and intentional) differences in the way the three replicas work. e.g.
  22. // objects stored in Wasabi are also placed under a special compliance mode,
  23. // which is a Wasabi specific feature.
  24. type ObjectController struct {
  25. S3Config *s3config.S3Config
  26. ObjectRepo *repo.ObjectRepository
  27. QueueRepo *repo.QueueRepository
  28. LockController *lock.LockController
  29. complianceCronRunning bool
  30. }
  31. const (
  32. RemoveComplianceHoldsLock = "remove_compliance_holds_lock"
  33. )
  34. // RemoveComplianceHolds removes the Wasabi compliance hold from objects in
  35. // Wasabi for files which have been deleted.
  36. //
  37. // Removing the compliance hold will allow these files to then be deleted when
  38. // we subsequently attempt to delete the objects from Wasabi after
  39. // DeleteObjectQueue delay (x days currently).
  40. func (c *ObjectController) RemoveComplianceHolds() {
  41. if c.S3Config.WasabiComplianceDC() == "" {
  42. // Wasabi compliance is currently disabled in config, nothing to do.
  43. return
  44. }
  45. if c.complianceCronRunning {
  46. log.Info("Skipping RemoveComplianceHolds cron run as another instance is still running")
  47. return
  48. }
  49. c.complianceCronRunning = true
  50. defer func() {
  51. c.complianceCronRunning = false
  52. }()
  53. lockStatus := c.LockController.TryLock(RemoveComplianceHoldsLock, time.MicrosecondsAfterHours(24))
  54. if !lockStatus {
  55. log.Warning(fmt.Sprintf("Failed to acquire lock %s", RemoveComplianceHoldsLock))
  56. return
  57. }
  58. defer func() {
  59. c.LockController.ReleaseLock(RemoveComplianceHoldsLock)
  60. }()
  61. items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.RemoveComplianceHoldQueue, 1000)
  62. if err != nil {
  63. log.WithError(err).Error("Failed to fetch items from queue")
  64. return
  65. }
  66. log.Infof("Removing compliance holds on %d deleted files", len(items))
  67. for _, i := range items {
  68. c.removeComplianceHold(i)
  69. }
  70. log.Infof("Removed compliance holds on %d deleted files", len(items))
  71. }
  72. func (c *ObjectController) removeComplianceHold(qItem repo.QueueItem) {
  73. logger := log.WithFields(log.Fields{
  74. "item": qItem.Item,
  75. "queue_id": qItem.Id,
  76. })
  77. objectKey := qItem.Item
  78. lockName := file.GetLockNameForObject(objectKey)
  79. if !c.LockController.TryLock(lockName, time.MicrosecondsAfterHours(1)) {
  80. logger.Info("Unable to acquire lock")
  81. return
  82. }
  83. defer c.LockController.ReleaseLock(lockName)
  84. dcs, err := c.ObjectRepo.GetDataCentersForObject(objectKey)
  85. if err != nil {
  86. logger.Error("Could not fetch datacenters", err)
  87. return
  88. }
  89. config := c.S3Config
  90. complianceDC := config.WasabiComplianceDC()
  91. s3Client := config.GetS3Client(complianceDC)
  92. bucket := *config.GetBucket(complianceDC)
  93. for _, dc := range dcs {
  94. if dc == complianceDC {
  95. logger.Info("Removing compliance hold")
  96. err = c.DisableObjectConditionalHold(&s3Client, bucket, objectKey)
  97. if err != nil {
  98. logger.WithError(err).Errorf("Failed to remove compliance hold (dc: %s, bucket: %s)", dc, bucket)
  99. return
  100. }
  101. logger.Infof("Removed compliance hold for %s/%s", bucket, objectKey)
  102. break
  103. }
  104. }
  105. err = c.QueueRepo.DeleteItem(repo.RemoveComplianceHoldQueue, qItem.Item)
  106. if err != nil {
  107. logger.WithError(err).Error("Failed to remove item from the queue")
  108. return
  109. }
  110. }
  111. // DisableObjectConditionalHold disables the Wasabi compliance conditional hold
  112. // that has been placed on object. This way, we can enable these objects to be
  113. // cleaned up when the user permanently deletes them.
  114. func (c *ObjectController) DisableObjectConditionalHold(s3Client *s3.S3, bucket string, objectKey string) error {
  115. _, err := wasabi.PutObjectCompliance(s3Client, &wasabi.PutObjectComplianceInput{
  116. Bucket: aws.String(bucket),
  117. Key: aws.String(objectKey),
  118. ObjectComplianceConfiguration: &wasabi.ObjectComplianceConfiguration{
  119. ConditionalHold: aws.Bool(false),
  120. },
  121. })
  122. return stacktrace.Propagate(err, "Failed to update ObjectCompliance for %s/%s", bucket, objectKey)
  123. }