package controller

import (
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"sync"
	stime "time"

	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/ente-io/museum/ente"
	"github.com/ente-io/museum/pkg/controller/lock"
	"github.com/ente-io/museum/pkg/repo"
	"github.com/ente-io/museum/pkg/utils/s3config"
	enteString "github.com/ente-io/museum/pkg/utils/string"
	"github.com/ente-io/museum/pkg/utils/time"
	"github.com/ente-io/stacktrace"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// ObjectCleanupController exposes functions to remove orphan and stale entries
// from object storage.
//
// There are 3 main types of orphans that can end up in our object storage:
//
//  1. We create presigned URLs for clients to upload their objects to. It
//     might happen that the client is able to successfully upload to these
//     URLs, but not tell museum about the successful upload.
//
//  2. During replication, we might have half-done multipart uploads.
//
//  3. When an existing object is updated (e.g. the user edits the file on
//     iOS), then the file entry in our DB is updated to point to the new
//     object, and the old object is now meant to be discarded.
//
// ObjectCleanupController is meant to manage all these scenarios over time.
type ObjectCleanupController struct {
	Repo             *repo.ObjectCleanupRepository
	ObjectRepo       *repo.ObjectRepository
	LockController   *lock.LockController
	ObjectController *ObjectController
	S3Config         *s3config.S3Config
	// Prometheus Metrics
	mOrphanObjectsDeleted *prometheus.CounterVec
}

// PreSignedRequestValidityDuration is the lifetime of a pre-signed URL
const PreSignedRequestValidityDuration = 7 * 24 * stime.Hour

// PreSignedPartUploadRequestDuration is the lifetime of a pre-signed multipart URL
const PreSignedPartUploadRequestDuration = 7 * 24 * stime.Hour

// clearOrphanObjectsCheckInterval is the interval after which we check if the
// ClearOrphanObjects job needs to be re-run.
//
// See also, clearOrphanObjectsMinimumJobInterval.
const clearOrphanObjectsCheckInterval = 1 * 24 * stime.Hour

// clearOrphanObjectsMinimumJobInterval is the minimum interval that must pass
// before we run another instance of the ClearOrphanObjects job.
//
// This interval is enforced across museum instances.
const clearOrphanObjectsMinimumJobInterval = 2 * 24 * stime.Hour

// NewObjectCleanupController returns a new instance of ObjectCleanupController.
func NewObjectCleanupController(
	objectCleanupRepo *repo.ObjectCleanupRepository,
	objectRepo *repo.ObjectRepository,
	lockController *lock.LockController,
	objectController *ObjectController,
	s3Config *s3config.S3Config,
) *ObjectCleanupController {
	mOrphanObjectsDeleted := promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "museum_orphan_objects_deleted_total",
		Help: "Number of objects successfully deleted when clearing orphan objects",
	}, []string{"dc"})
	return &ObjectCleanupController{
		Repo:                  objectCleanupRepo,
		ObjectRepo:            objectRepo,
		LockController:        lockController,
		ObjectController:      objectController,
		S3Config:              s3Config,
		mOrphanObjectsDeleted: mOrphanObjectsDeleted,
	}
}

// StartRemovingUnreportedObjects starts goroutines that delete objects which
// were possibly uploaded but never reported to the database.
func (c *ObjectCleanupController) StartRemovingUnreportedObjects() {
	// TODO: object_cleanup: This code is only currently tested for B2
	if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
		log.Info("Skipping RemovingUnreportedObjects since the Hot DC is not B2")
		return
	}
	workerCount := viper.GetInt("jobs.remove-unreported-objects.worker-count")
	if workerCount == 0 {
		workerCount = 1
	}
	log.Infof("Starting %d workers to remove-unreported-objects", workerCount)
	for i := 0; i < workerCount; i++ {
		go c.removeUnreportedObjectsWorker(i)
	}
}
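
// The worker count above is read from configuration. As a sketch (assuming
// museum's usual YAML configuration conventions for the viper keys used
// here), enabling multiple workers might look like:
//
//	jobs:
//	    remove-unreported-objects:
//	        worker-count: 4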

// removeUnreportedObjectsWorker is the entry point for a worker goroutine
// that cleans up unreported objects.
//
// i is an arbitrary index of the current goroutine; it staggers the sleep
// durations of the individual workers.
func (c *ObjectCleanupController) removeUnreportedObjectsWorker(i int) {
	for {
		count := c.removeUnreportedObjects()
		if count == 0 {
			stime.Sleep(stime.Duration(5+i) * stime.Minute)
		} else {
			stime.Sleep(stime.Second)
		}
	}
}
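
// removeUnreportedObjects runs one cleanup iteration: it fetches and locks
// expired temp objects, deletes them from object storage, and removes their
// DB entries. It returns the number of objects that were cleaned up.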
func (c *ObjectCleanupController) removeUnreportedObjects() int {
	logger := log.WithFields(log.Fields{
		"task": "remove-unreported-objects",
	})
	logger.Info("Removing unreported objects")
	count := 0
	tx, tempObjects, err := c.Repo.GetAndLockExpiredObjects()
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			logger.Error(err)
		}
		return count
	}
	for _, tempObject := range tempObjects {
		err = c.removeUnreportedObject(tx, tempObject)
		if err != nil {
			continue
		}
		count += 1
	}
	logger.Infof("Removed %d objects", count)
	// We always commit the transaction, even on errors for individual rows. To
	// avoid objects getting stuck in a loop, we increase their expiry times.
	cerr := tx.Commit()
	if cerr != nil {
		cerr = stacktrace.Propagate(cerr, "Failed to commit transaction")
		logger.Error(cerr)
	}
	return count
}
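
// removeUnreportedObject clears a single expired temp object from object
// storage. If clearing fails, the object's expiry is pushed out by a day so
// that it is retried later instead of in a tight loop.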
func (c *ObjectCleanupController) removeUnreportedObject(tx *sql.Tx, t ente.TempObject) error {
	// TODO: object_cleanup
	// This should use the DC from TempObject (once we start persisting it)
	// dc := t.DataCenter
	dc := c.S3Config.GetHotDataCenter()
	logger := log.WithFields(log.Fields{
		"task":        "remove-unreported-objects",
		"object_key":  t.ObjectKey,
		"data_center": dc,
		"upload_id":   t.UploadID,
	})
	skip := func(err error) error {
		logger.Errorf("Clearing tempObject failed: %v", err)
		newExpiry := time.MicrosecondsAfterDays(1)
		serr := c.Repo.SetExpiryForTempObject(tx, t, newExpiry)
		if serr != nil {
			logger.Errorf("Updating expiry for failed temp object failed: %v", serr)
		}
		return err
	}
	logger.Info("Clearing tempObject")
	exists, err := c.ObjectRepo.DoesObjectExist(tx, t.ObjectKey)
	if err != nil {
		return skip(stacktrace.Propagate(err, ""))
	}
	if exists {
		err := errors.New("aborting attempt to delete an object which has a DB entry")
		return skip(stacktrace.Propagate(err, ""))
	}
	if t.IsMultipart {
		err = c.abortMultipartUpload(t.ObjectKey, t.UploadID, dc)
	} else {
		err = c.DeleteObjectFromDataCenter(t.ObjectKey, dc)
	}
	if err != nil {
		return skip(err)
	}
	err = c.Repo.RemoveTempObject(tx, t)
	if err != nil {
		return skip(err)
	}
	return nil
}

// AddTempObjectKey creates a new temporary object entry.
//
// It persists a given object key as having been provided to a client for
// uploading. If a client does not successfully mark this object's upload as
// having completed within PreSignedRequestValidityDuration, this temp object
// will be cleaned up.
func (c *ObjectCleanupController) AddTempObjectKey(objectKey string, dc string) error {
	expiry := time.Microseconds() + (2 * PreSignedRequestValidityDuration.Microseconds())
	return c.addCleanupEntryForObjectKey(objectKey, dc, expiry)
}

// addCleanupEntryForObjectKey adds the object to a queue of "temporary"
// objects that are deleted (if they exist) unless this entry is removed from
// the queue by expirationTime.
func (c *ObjectCleanupController) addCleanupEntryForObjectKey(objectKey string, dc string, expirationTime int64) error {
	err := c.Repo.AddTempObject(ente.TempObject{
		ObjectKey:   objectKey,
		IsMultipart: false,
		DataCenter:  dc,
	}, expirationTime)
	return stacktrace.Propagate(err, "")
}

// AddMultipartTempObjectKey creates a new temporary object entry for a
// multipart upload.
//
// See AddTempObjectKey for more details.
func (c *ObjectCleanupController) AddMultipartTempObjectKey(objectKey string, uploadID string, dc string) error {
	expiry := time.Microseconds() + (2 * PreSignedPartUploadRequestDuration.Microseconds())
	err := c.Repo.AddTempObject(ente.TempObject{
		ObjectKey:   objectKey,
		IsMultipart: true,
		UploadID:    uploadID,
		DataCenter:  dc,
	}, expiry)
	return stacktrace.Propagate(err, "")
}
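
// DeleteAllObjectsWithPrefix deletes all objects whose keys begin with the
// given prefix from the given data center's bucket.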
func (c *ObjectCleanupController) DeleteAllObjectsWithPrefix(prefix string, dc string) error {
	s3Client := c.S3Config.GetS3Client(dc)
	bucket := c.S3Config.GetBucket(dc)
	// Each listing returns at most 1000 keys, so paginate through all of them.
	var keys []string
	err := s3Client.ListObjectsV2Pages(&s3.ListObjectsV2Input{
		Bucket: bucket,
		Prefix: &prefix,
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, obj := range page.Contents {
			keys = append(keys, *obj.Key)
		}
		return true
	})
	if err != nil {
		log.Error(err)
		return stacktrace.Propagate(err, "")
	}
	for _, key := range keys {
		err = c.DeleteObjectFromDataCenter(key, dc)
		if err != nil {
			log.Error(err)
			return stacktrace.Propagate(err, "")
		}
	}
	return nil
}
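
// DeleteObjectFromDataCenter deletes the object with the given key from the
// given data center's bucket, and waits until the deletion takes effect.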
func (c *ObjectCleanupController) DeleteObjectFromDataCenter(objectKey string, dc string) error {
	log.Infof("Deleting %s from %s", objectKey, dc)
	s3Client := c.S3Config.GetS3Client(dc)
	bucket := c.S3Config.GetBucket(dc)
	_, err := s3Client.DeleteObject(&s3.DeleteObjectInput{
		Bucket: bucket,
		Key:    &objectKey,
	})
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	err = s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{
		Bucket: bucket,
		Key:    &objectKey,
	})
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	return nil
}
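
// abortMultipartUpload aborts an incomplete multipart upload, and verifies
// (by listing its parts) that the upload was indeed discarded. Uploads that
// the remote doesn't know about are treated as already aborted.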
func (c *ObjectCleanupController) abortMultipartUpload(objectKey string, uploadID string, dc string) error {
	s3Client := c.S3Config.GetS3Client(dc)
	bucket := c.S3Config.GetBucket(dc)
	_, err := s3Client.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
		Bucket:   bucket,
		Key:      &objectKey,
		UploadId: &uploadID,
	})
	if err != nil {
		if isUnknownUploadError(err) {
			log.Info("Could not find upload for " + objectKey)
			return nil
		}
		return stacktrace.Propagate(err, "")
	}
	r, err := s3Client.ListParts(&s3.ListPartsInput{
		Bucket:   bucket,
		Key:      &objectKey,
		UploadId: &uploadID,
	})
	if err != nil {
		if isUnknownUploadError(err) {
			// This is expected now, since we just aborted the upload
			return nil
		}
		return stacktrace.Propagate(err, "")
	}
	if len(r.Parts) > 0 {
		return stacktrace.NewError("abort failed: upload still has parts")
	}
	return nil
}

// The original code here checked for NoSuchUpload, presumably because that is
// the error that B2 returns.
//
// Wasabi returns something similar:
//
//	<Error>
//	<Code>NoSuchUpload</Code>
//	<Message>The specified upload does not exist. The upload ID may be invalid,
//	or the upload may have been aborted or completed.</Message>
//	...
//
// However, Scaleway returns a different error, NoSuchKey:
//
//	<Error>
//	<Code>NoSuchKey</Code>
//	<Message>The specified key does not exist.</Message>
//	...
//
// This function returns true if either of these occurs.
func isUnknownUploadError(err error) bool {
	// B2, Wasabi
	if strings.Contains(err.Error(), "NoSuchUpload") {
		return true
	}
	// Scaleway
	if strings.Contains(err.Error(), "NoSuchKey") {
		return true
	}
	return false
}

// StartClearingOrphanObjects is the entry point for the job that goes through
// all the objects in the given data center, and deletes orphan objects for
// which we do not have DB entries.
//
// Such orphan objects are expected to have been created because the code for
// updating the DB entries when a file gets updated did not clean up the
// corresponding objects from object storage. Once we start keeping track of
// such objects in a separate queue, this cron job won't be needed.
func (c *ObjectCleanupController) StartClearingOrphanObjects() {
	// TODO: object_cleanup: This code is only currently tested for B2
	if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
		log.Info("Skipping ClearingOrphanObjects since the Hot DC is not B2")
		return
	}
	isJobEnabled := viper.GetBool("jobs.clear-orphan-objects.enabled")
	if !isJobEnabled {
		return
	}
	prefix := viper.GetString("jobs.clear-orphan-objects.prefix")
	log.Infof("Starting workers to clear-orphan-objects (prefix %s)", prefix)
	// TODO: object_cleanup: start workers for other DCs once the temp_objects
	// table supports specifying a DC
	go c.clearOrphanObjectsWorker(c.S3Config.GetHotBackblazeDC(), prefix)
}
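
// This job is gated on configuration. As a sketch (assuming museum's usual
// YAML configuration conventions for the viper keys used above), enabling it
// might look like:
//
//	jobs:
//	    clear-orphan-objects:
//	        enabled: true
//	        prefix: ""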

// clearOrphanObjectsWorker is the entry point for the worker goroutine that
// cleans up orphan objects in a particular DC.
func (c *ObjectCleanupController) clearOrphanObjectsWorker(dc string, prefix string) {
	for {
		c.ClearOrphanObjects(dc, prefix, false)
		stime.Sleep(clearOrphanObjectsCheckInterval)
	}
}

// IsValidClearOrphanObjectsDC verifies that the given DC is valid for use as
// the target of an orphan object cleanup.
func (c *ObjectCleanupController) IsValidClearOrphanObjectsDC(dc string) bool {
	if dc != c.S3Config.GetHotBackblazeDC() {
		return false
	}
	// TODO: object_cleanup: This code is only currently tested for B2
	if c.S3Config.GetHotDataCenter() != c.S3Config.GetHotBackblazeDC() {
		return false
	}
	return true
}
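
// ClearOrphanObjects deletes objects under the given prefix in the given data
// center's bucket that have no corresponding entry in our database. A task
// lock (enforced across museum instances) rate limits runs of this job; pass
// forceTaskLock to forcibly release any existing lock first.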
func (c *ObjectCleanupController) ClearOrphanObjects(dc string, prefix string, forceTaskLock bool) {
	logger := log.WithFields(log.Fields{
		"task":        "clear-orphan-objects",
		"data_center": dc,
	})
	if !c.IsValidClearOrphanObjectsDC(dc) {
		logger.Errorf("Unsupported DC %s", dc)
		return
	}
	lockName := clearOrphanObjectsLockName(dc)
	if forceTaskLock {
		logger.Infof("Forcefully removing task lock %s", lockName)
		err := c.LockController.TaskLockingRepo.ReleaseLock(lockName)
		if err != nil {
			logger.Error(stacktrace.Propagate(err, ""))
			return
		}
	}
	if !c.LockController.TryLock(lockName, clearOrphanObjectsNextLockUntil()) {
		logger.Info("Skipping since a lock could not be obtained")
		return
	}
	// The lock is not released intentionally.
	//
	// By keeping the stale entry for the unheld lock in the DB, we will be able
	// to retain the timestamp when this job last ran. This is a kludgy way to
	// guarantee that clearOrphanObjectsMinimumJobInterval is enforced across
	// all museum instances (without introducing a new DB table).
	//
	// defer c.LockController.ReleaseLock(lockName)
	s3Config := c.S3Config
	dest := &CleanupOrphanObjectsDestination{
		DC:                dc,
		Client:            s3Config.GetS3Client(dc),
		Bucket:            s3Config.GetBucket(dc),
		HasComplianceHold: s3Config.WasabiComplianceDC() == dc,
	}
	logger.Infof("Clearing orphan objects from bucket %s (hasComplianceHold %v)",
		*dest.Bucket, dest.HasComplianceHold)
	// Each listing of an S3 bucket returns a maximum of 1000 objects, along
	// with an optional continuation token. While there are more objects
	// (indicated by the presence of the continuation token), keep fetching
	// listings.
	//
	// For each listing, spawn goroutines that go through chunks of 100
	// entries each to clear orphan objects.
	//
	// Refresh the lock's acquisition time during each iteration since this job
	// can span hours, and we don't want a different instance to start another
	// run just because it was only considering the start time of the job.
	err := dest.Client.ListObjectVersionsPages(&s3.ListObjectVersionsInput{
		Bucket: dest.Bucket,
		Prefix: &prefix,
	},
		func(page *s3.ListObjectVersionsOutput, lastPage bool) bool {
			c.clearOrphanObjectsPage(page, dest, logger)
			lerr := c.LockController.ExtendLock(lockName, clearOrphanObjectsNextLockUntil())
			if lerr != nil {
				logger.Error(lerr)
				return false
			}
			return true
		})
	if err != nil {
		logger.Error(stacktrace.Propagate(err, ""))
		return
	}
	logger.Info("Cleared orphan objects")
}
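
// clearOrphanObjectsLockName is the name of the task lock that serializes
// runs of ClearOrphanObjects for the given data center.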
func clearOrphanObjectsLockName(dc string) string {
	return fmt.Sprintf("clear-orphan-objects:%s", dc)
}
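
// clearOrphanObjectsNextLockUntil returns the time (in epoch microseconds)
// until which the ClearOrphanObjects task lock should be held, enforcing the
// minimum interval between job runs.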
func clearOrphanObjectsNextLockUntil() int64 {
	return time.Microseconds() + clearOrphanObjectsMinimumJobInterval.Microseconds()
}
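
// CleanupOrphanObjectsDestination describes the bucket that an orphan object
// cleanup run operates on.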
type CleanupOrphanObjectsDestination struct {
	DC     string
	Client s3.S3
	Bucket *string
	// If true, this bucket has a compliance hold on objects that must be
	// removed before they can be deleted.
	HasComplianceHold bool
}

// ObjectVersionOrDeleteMarker is an abstraction to allow us to reuse the same
// code to delete both object versions and delete markers.
type ObjectVersionOrDeleteMarker struct {
	ObjectVersion *s3.ObjectVersion
	DeleteMarker  *s3.DeleteMarkerEntry
}

func (od ObjectVersionOrDeleteMarker) GetKey() *string {
	if od.ObjectVersion != nil {
		return od.ObjectVersion.Key
	}
	return od.DeleteMarker.Key
}

func (od ObjectVersionOrDeleteMarker) GetLastModified() *stime.Time {
	if od.ObjectVersion != nil {
		return od.ObjectVersion.LastModified
	}
	return od.DeleteMarker.LastModified
}

func (od ObjectVersionOrDeleteMarker) GetVersionId() *string {
	if od.ObjectVersion != nil {
		return od.ObjectVersion.VersionId
	}
	return od.DeleteMarker.VersionId
}
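
// clearOrphanObjectsPage processes a single page of an object versions
// listing, handing off its entries to goroutines in batches.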
func (c *ObjectCleanupController) clearOrphanObjectsPage(page *s3.ListObjectVersionsOutput, dest *CleanupOrphanObjectsDestination, logger *log.Entry) error {
	// MaxKeys is 1000. Break the page into batches, and spawn a separate
	// goroutine to process each batch.
	batchSize := 100
	versions := page.Versions
	nv := len(versions)
	deleteMarkers := page.DeleteMarkers
	nd := len(deleteMarkers)
	n := nv + nd
	logger.Infof("Processing page containing %d values (%d object versions, %d delete markers)", n, nv, nd)
	ods := make([]ObjectVersionOrDeleteMarker, n)
	for i := 0; i < nv; i++ {
		ods[i] = ObjectVersionOrDeleteMarker{ObjectVersion: versions[i]}
	}
	for i := 0; i < nd; i++ {
		ods[nv+i] = ObjectVersionOrDeleteMarker{DeleteMarker: deleteMarkers[i]}
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i += batchSize {
		end := i + batchSize
		if end > n {
			end = n
		}
		wg.Add(1)
		go func(batch []ObjectVersionOrDeleteMarker) {
			defer wg.Done()
			c.clearOrphanObjectsVersionOrDeleteMarkers(batch, dest, logger)
		}(ods[i:end])
	}
	wg.Wait()
	return nil
}

func (c *ObjectCleanupController) clearOrphanObjectsVersionOrDeleteMarkers(ods []ObjectVersionOrDeleteMarker, dest *CleanupOrphanObjectsDestination, logger *log.Entry) {
	for _, od := range ods {
		c.clearOrphanObjectsVersionOrDeleteMarker(od, dest, logger)
	}
}
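
// clearOrphanObjectsVersionOrDeleteMarker deletes the given object version or
// delete marker if it has no entry in our database and is not too recent. For
// buckets with a compliance hold, the hold is lifted and the actual deletion
// is deferred to the temp object cleanup queue.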
func (c *ObjectCleanupController) clearOrphanObjectsVersionOrDeleteMarker(od ObjectVersionOrDeleteMarker, dest *CleanupOrphanObjectsDestination, logger *log.Entry) {
	if od.GetKey() == nil || od.GetLastModified() == nil {
		logger.Errorf("Ignoring object with missing fields: %v %v", od.GetKey(), od.GetLastModified())
		return
	}
	objectKey := *od.GetKey()
	lastModified := *od.GetLastModified()
	logger = logger.WithFields(log.Fields{
		"object_key":    objectKey,
		"last_modified": lastModified,
	})
	exists, err := c.ObjectRepo.DoesObjectOrTempObjectExist(objectKey)
	if err != nil {
		logger.Error(stacktrace.Propagate(err, "Failed to determine if object already exists in DB"))
		return
	}
	if exists {
		return
	}
	// As a safety check, ignore very recent objects (modified within the last
	// 2 days) from cleanup.
	cutoff := stime.Now().AddDate(0, 0, -2)
	if lastModified.After(cutoff) {
		logger.Warnf("Ignoring too-recent orphan object since it was modified after %v", cutoff)
		return
	}
	logger.Infof("Found orphan object %v", od)
	if dest.HasComplianceHold {
		// Remove compliance hold.
		err := c.ObjectController.DisableObjectConditionalHold(&dest.Client, *dest.Bucket, objectKey)
		if err != nil {
			logger.Error(stacktrace.Propagate(err, "Failed to disable conditional hold on object"))
			return
		}
		// Add the object to the cleanup queue with an expiry time that is
		// after the compliance hold would've passed, plus 2 days of buffer.
		expiryDays := s3config.WasabiObjectConditionalHoldDays + 2
		expiryTime := time.MicrosecondsAfterDays(expiryDays)
		err = c.addCleanupEntryForObjectKey(objectKey, dest.DC, expiryTime)
		if err != nil {
			logger.Error(stacktrace.Propagate(err, "Failed to enqueue orphan object for cleanup"))
			return
		}
		logger.Infof("Disabled compliance hold and added an entry to clean up orphan object after %v", expiryTime)
	} else {
		// Delete it right away.
		versionID := od.GetVersionId()
		logger.Infof("Deleting version '%s'", enteString.EmptyIfNil(versionID))
		err := c.DeleteObjectVersion(objectKey, versionID, dest)
		if err != nil {
			logger.Error(stacktrace.Propagate(err, "Failed to delete object"))
			return
		}
		c.mOrphanObjectsDeleted.WithLabelValues(dest.DC).Inc()
	}
}

// DeleteObjectVersion can be used to delete objects from versioned buckets.
//
// If we delete an object in a versioning enabled bucket, deletion does not
// actually remove the object and instead creates a delete marker:
//
//   - When we delete a file, it creates a delete marker
//   - The delete marker becomes the latest version
//   - The old version of the file still remains
//
// If we explicitly pass a version ID in the delete call, then the delete
// marker won't get created.
//
// > To delete versioned objects permanently, use `DELETE Object versionId`
//
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeletingObjectVersions.html
func (c *ObjectCleanupController) DeleteObjectVersion(objectKey string, versionID *string, dest *CleanupOrphanObjectsDestination) error {
	_, err := dest.Client.DeleteObject(&s3.DeleteObjectInput{
		Bucket:    dest.Bucket,
		Key:       &objectKey,
		VersionId: versionID,
	})
	return stacktrace.Propagate(err, "")
}