package controller import ( "database/sql" "encoding/base64" "errors" "fmt" "io" "net/http" "os" "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/ente-io/museum/pkg/controller/discord" "github.com/ente-io/museum/pkg/repo" "github.com/ente-io/museum/pkg/utils/file" "github.com/ente-io/museum/pkg/utils/s3config" "github.com/ente-io/stacktrace" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" "github.com/spf13/viper" ) // ReplicationController3 oversees version 3 of our object replication. // // The user's encrypted data starts off in 1 hot storage (Backblaze "b2"). This // controller then takes over and replicates it the other two replicas. It keeps // state in the object_copies table. // // Both v2 and v3 of object replication use the same hot storage (b2), but they // replicate to different buckets thereafter. // // The current implementation only works if the hot storage is b2. This is not // an inherent limitation, however the code has not yet been tested in other // scenarios, so there is a safety check preventing the replication from // happening if the current hot storage is not b2. type ReplicationController3 struct { S3Config *s3config.S3Config ObjectRepo *repo.ObjectRepository ObjectCopiesRepo *repo.ObjectCopiesRepository DiscordController *discord.DiscordController // URL of the Cloudflare worker to use for downloading the source object workerURL string // Base directory for temporary storage tempStorage string // Prometheus Metrics mUploadSuccess *prometheus.CounterVec mUploadFailure *prometheus.CounterVec // Cached S3 clients etc b2Client *s3.S3 b2Bucket *string wasabiDest *UploadDestination scwDest *UploadDestination } type UploadDestination struct { DC string Client *s3.S3 Uploader *s3manager.Uploader Bucket *string // The label to use for reporting metrics for uploads to this destination Label string // If true, we should ignore Wasabi 403 errors. See "Reuploads". HasComplianceHold bool // If true, the object is uploaded to the GLACIER class. IsGlacier bool } // StartReplication starts the background replication process. // // This method returns synchronously. ReplicationController3 will create // suitable number of goroutines to parallelize and perform the replication // asynchronously, as and when it notices new files that have not yet been // replicated (it does this by querying the object_copies table). func (c *ReplicationController3) StartReplication() error { // As a safety check, ensure that the current hot storage bucket is in b2. // This is because the replication v3 code has not yet been tested for other // scenarios (it'll likely work though, probably with minor modifications). hotDC := c.S3Config.GetHotDataCenter() if hotDC != c.S3Config.GetHotBackblazeDC() { return fmt.Errorf("v3 replication can currently only run when the primary hot data center is Backblaze. Instead, it was %s", hotDC) } workerURL := viper.GetString("replication.worker-url") if workerURL == "" { return fmt.Errorf("replication.worker-url was not defined") } c.workerURL = workerURL log.Infof("Worker URL to download objects for replication v3 is: %s", workerURL) c.createMetrics() err := c.createTemporaryStorage() if err != nil { return err } c.createDestinations() workerCount := viper.GetInt("replication.worker-count") if workerCount == 0 { workerCount = 6 } go c.startWorkers(workerCount) return nil } func (c *ReplicationController3) startWorkers(n int) { log.Infof("Starting %d workers for replication v3", n) for i := 0; i < n; i++ { go c.replicate(i) // Stagger the workers time.Sleep(time.Duration(2*i+1) * time.Second) } } func (c *ReplicationController3) createMetrics() { c.mUploadSuccess = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "museum_replication_upload_success_total", Help: "Number of successful uploads during replication (each replica is counted separately)", }, []string{"destination"}) c.mUploadFailure = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "museum_replication_upload_failure_total", Help: "Number of failed uploads during replication (each replica is counted separately)", }, []string{"destination"}) } func (c *ReplicationController3) createTemporaryStorage() error { tempStorage := viper.GetString("replication.tmp-storage") if tempStorage == "" { tempStorage = "tmp/replication" } log.Infof("Temporary storage for replication v3 is: %s", tempStorage) err := file.DeleteAllFilesInDirectory(tempStorage) if err != nil { return stacktrace.Propagate(err, "Failed to deleting old files from %s", tempStorage) } err = file.MakeDirectoryIfNotExists(tempStorage) if err != nil { return stacktrace.Propagate(err, "Failed to create temporary storage %s", tempStorage) } c.tempStorage = tempStorage return nil } func (c *ReplicationController3) createDestinations() { // The s3manager.Uploader objects are safe for use concurrently. From the // AWS docs: // // > The Uploader structure that calls Upload(). It is safe to call Upload() // on this structure for multiple objects and across concurrent goroutines. // Mutating the Uploader's properties is not safe to be done concurrently. config := c.S3Config b2DC := config.GetHotBackblazeDC() b2Client := config.GetS3Client(b2DC) c.b2Client = &b2Client c.b2Bucket = config.GetBucket(b2DC) wasabiDC := config.GetHotWasabiDC() wasabiClient := config.GetS3Client(wasabiDC) c.wasabiDest = &UploadDestination{ DC: wasabiDC, Client: &wasabiClient, Uploader: s3manager.NewUploaderWithClient(&wasabiClient), Bucket: config.GetBucket(wasabiDC), Label: "wasabi", HasComplianceHold: config.WasabiComplianceDC() == wasabiDC, } scwDC := config.GetColdScalewayDC() scwClient := config.GetS3Client(scwDC) c.scwDest = &UploadDestination{ DC: scwDC, Client: &scwClient, Uploader: s3manager.NewUploaderWithClient(&scwClient), Bucket: config.GetBucket(scwDC), Label: "scaleway", // should be true, except when running in a local cluster (since minio doesn't // support specifying the GLACIER storage class). IsGlacier: !config.AreLocalBuckets(), } } // Entry point for the replication worker (goroutine) // // i is an arbitrary index of the current routine. func (c *ReplicationController3) replicate(i int) { // This is just // // while (true) { replicate() } // // but with an extra sleep for a bit if nothing got replicated - both when // something's wrong, or there's nothing to do. for { err := c.tryReplicate() if err != nil { // Sleep in proportion to the (arbitrary) index to space out the // workers further. time.Sleep(time.Duration(i+1) * time.Minute) } } } // Try to replicate an object. // // Return nil if something was replicated, otherwise return the error. // // A common and expected error is `sql.ErrNoRows`, which occurs if there are no // objects left to replicate currently. func (c *ReplicationController3) tryReplicate() error { // Fetch an object to replicate tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject() if err != nil { if !errors.Is(err, sql.ErrNoRows) { log.Errorf("Could not fetch an object to replicate: %s", err) } return stacktrace.Propagate(err, "") } objectKey := copies.ObjectKey logger := log.WithFields(log.Fields{ "task": "replication", "object_key": objectKey, }) commit := func(err error) error { // We don't rollback the transaction even in the case of errors, and // instead try to commit it after setting the last_attempt timestamp. // // This avoids the replication getting stuck in a loop trying (and // failing) to replicate the same object. The error would still need to // be resolved, but at least the replication would meanwhile move // forward, ignoring this row. if err != nil { logger.Error(err) } aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey) if aerr != nil { aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt") logger.Error(aerr) } cerr := tx.Commit() if cerr != nil { cerr = stacktrace.Propagate(err, "Failed to commit transaction") logger.Error(cerr) } if err == nil { err = aerr } if err == nil { err = cerr } if err == nil { logger.Info("Replication attempt succeeded") } else { logger.Info("Replication attempt failed") } return err } logger.Info("Replication attempt start") if copies.B2 == nil { err := errors.New("expected B2 copy to be in place before we start replication") return commit(stacktrace.Propagate(err, "Sanity check failed")) } if !copies.WantWasabi && !copies.WantSCW { err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate") return commit(stacktrace.Propagate(err, "Sanity check failed")) } ob, err := c.ObjectRepo.GetObjectState(tx, objectKey) if err != nil { return commit(stacktrace.Propagate(err, "Failed to fetch file's deleted status")) } if ob.IsFileDeleted || ob.IsUserDeleted { // Update the object_copies to mark this object as not requiring further // replication. The row in object_copies will get deleted when the next // scheduled object deletion runs. err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey) if err != nil { return commit(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication")) } logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)", ob.IsFileDeleted, ob.IsUserDeleted) return commit(nil) } err = ensureSufficientSpace(ob.Size) if err != nil { // We don't have free space right now, maybe because other big files are // being downloaded simultanously, but we might get space later, so mark // a failed attempt that'll get retried later. // // Log this error though, so that it gets noticed if it happens too // frequently (the instance might need a bigger disk). return commit(stacktrace.Propagate(err, "")) } filePath, file, err := c.createTemporaryFile(objectKey) if err != nil { return commit(stacktrace.Propagate(err, "Failed to create temporary file")) } defer os.Remove(filePath) defer file.Close() size, err := c.downloadFromB2ViaWorker(objectKey, file, logger) if err != nil { return commit(stacktrace.Propagate(err, "Failed to download object from B2")) } logger.Infof("Downloaded %d bytes to %s", size, filePath) in := &UploadInput{ File: file, ObjectKey: objectKey, ExpectedSize: size, Logger: logger, } err = nil if copies.WantWasabi && copies.Wasabi == nil { werr := c.replicateFile(in, c.wasabiDest, func() error { return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey) }) err = werr } if copies.WantSCW && copies.SCW == nil { serr := c.replicateFile(in, c.scwDest, func() error { return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey) }) if err == nil { err = serr } } return commit(err) } // Return an error if we risk running out of disk space if we try to download // and write a file of size. // // This function keeps a buffer of 1 GB free space in its calculations. func ensureSufficientSpace(size int64) error { free, err := file.FreeSpace("/") if err != nil { return stacktrace.Propagate(err, "Failed to fetch free space") } gb := uint64(1024) * 1024 * 1024 need := uint64(size) + (2 * gb) if free < need { return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", size, free) } return nil } // Create a temporary file for storing objectKey. Return both the path to the // file, and the handle to the file. // // The caller must Close() the returned file if it is not nil. func (c *ReplicationController3) createTemporaryFile(objectKey string) (string, *os.File, error) { fileName := strings.ReplaceAll(objectKey, "/", "_") filePath := c.tempStorage + "/" + fileName f, err := os.Create(filePath) if err != nil { return "", nil, stacktrace.Propagate(err, "Could not create temporary file at '%s' to download object", filePath) } return filePath, f, nil } // Download the object for objectKey from B2 hot storage, writing it into file. // // Return the size of the downloaded file. func (c *ReplicationController3) downloadFromB2ViaWorker(objectKey string, file *os.File, logger *log.Entry) (int64, error) { presignedURL, err := c.getPresignedB2URL(objectKey) if err != nil { return 0, stacktrace.Propagate(err, "Could not create create presigned URL for downloading object") } presignedEncodedURL := base64.StdEncoding.EncodeToString([]byte(presignedURL)) client := &http.Client{} request, err := http.NewRequest("GET", c.workerURL, nil) if err != nil { return 0, stacktrace.Propagate(err, "Could not create request for worker %s", c.workerURL) } q := request.URL.Query() q.Add("src", presignedEncodedURL) request.URL.RawQuery = q.Encode() if c.S3Config.AreLocalBuckets() { originalURL := request.URL request, err = http.NewRequest("GET", presignedURL, nil) if err != nil { return 0, stacktrace.Propagate(err, "Could not create request for URL %s", presignedURL) } logger.Infof("Bypassing workerURL %s and instead directly GETting %s", originalURL, presignedURL) } response, err := client.Do(request) if err != nil { return 0, stacktrace.Propagate(err, "Call to CF worker failed for object %s", objectKey) } defer response.Body.Close() if response.StatusCode != http.StatusOK { if response.StatusCode == http.StatusNotFound { c.notifyDiscord("🔥 Could not find object in HotStorage: " + objectKey) } err = fmt.Errorf("CF Worker GET for object %s failed with HTTP status %s", objectKey, response.Status) return 0, stacktrace.Propagate(err, "") } n, err := io.Copy(file, response.Body) if err != nil { return 0, stacktrace.Propagate(err, "Failed to write HTTP response to file") } return n, nil } // Get a presigned URL to download the object with objectKey from the B2 bucket. func (c *ReplicationController3) getPresignedB2URL(objectKey string) (string, error) { r, _ := c.b2Client.GetObjectRequest(&s3.GetObjectInput{ Bucket: c.b2Bucket, Key: &objectKey, }) return r.Presign(PreSignedRequestValidityDuration) } func (c *ReplicationController3) notifyDiscord(message string) { c.DiscordController.Notify(message) } type UploadInput struct { File *os.File ObjectKey string ExpectedSize int64 Logger *log.Entry } // Upload, verify and then update the DB to mark replication to dest. func (c *ReplicationController3) replicateFile(in *UploadInput, dest *UploadDestination, dbUpdateCopies func() error) error { logger := in.Logger.WithFields(log.Fields{ "destination": dest.Label, "bucket": *dest.Bucket, }) failure := func(err error) error { c.mUploadFailure.WithLabelValues(dest.Label).Inc() logger.Error(err) return err } err := c.uploadFile(in, dest) if err != nil { return failure(stacktrace.Propagate(err, "Failed to upload object")) } err = c.verifyUploadedFileSize(in, dest) if err != nil { return failure(stacktrace.Propagate(err, "Failed to verify upload")) } // The update of the object_keys is not done in the transaction where the // other updates to object_copies table are made. This is so that the // object_keys table (which is what'll be used to delete objects) is // (almost) always updated if the file gets uploaded successfully. // // The only time the update wouldn't happen is if museum gets restarted // between the successful completion of the upload to the bucket and this // query getting executed. // // While possible, that is a much smaller window as compared to the // transaction for updating object_copies, which could easily span minutes // as the transaction ends only after the object has been uploaded to all // replicas. rowsAffected, err := c.ObjectRepo.MarkObjectReplicated(in.ObjectKey, dest.DC) if err != nil { return failure(stacktrace.Propagate(err, "Failed to update object_keys to mark replication as completed")) } if rowsAffected != 1 { // It is possible that this row was updated earlier, after an upload // that got completed but before object_copies table could be updated in // the transaction (See "Reuploads"). // // So do not treat this as an error. logger.Warnf("Expected 1 row to be updated, but got %d", rowsAffected) } err = dbUpdateCopies() if err != nil { return failure(stacktrace.Propagate(err, "Failed to update object_copies to mark replication as complete")) } c.mUploadSuccess.WithLabelValues(dest.Label).Inc() return nil } // Upload the given file to using uploader to the given bucket. // // # Reuploads // // It is possible that the object might already exist on remote. The known // scenario where this might happen is if museum gets restarted after having // completed the upload but before it got around to modifying the DB. // // The behaviour in this case is remote dependent. // // - Uploading an object with the same key on Scaleway would work normally. // // - But trying to add an object with the same key on the compliance locked // Wasabi would return an HTTP 403. // // We intercept the Wasabi 403 in this case and move ahead. The subsequent // object verification using the HEAD request will act as a sanity check for // the object. func (c *ReplicationController3) uploadFile(in *UploadInput, dest *UploadDestination) error { // Rewind the file pointer back to the start for the next upload. in.File.Seek(0, io.SeekStart) up := s3manager.UploadInput{ Bucket: dest.Bucket, Key: &in.ObjectKey, Body: in.File, } if dest.IsGlacier { up.StorageClass = aws.String(s3.ObjectStorageClassGlacier) } result, err := dest.Uploader.Upload(&up) if err != nil && dest.HasComplianceHold && c.isRequestFailureAccessDenied(err) { in.Logger.Infof("Ignoring object that already exists on remote (we'll verify it using a HEAD check): %s", err) return nil } if err != nil { return stacktrace.Propagate(err, "Upload to bucket %s failed", *dest.Bucket) } in.Logger.Infof("Uploaded to bucket %s: %s", *dest.Bucket, result.Location) return nil } // Return true if the given error is because of an HTTP 403. // // See "Reuploads" for the scenario where these errors can arise. // // Specifically, this in an example of the HTTP 403 response we get when // trying to add an object to a Wasabi bucket that already has a compliance // locked object with the same key. // // HTTP/1.1 403 Forbidden // Content-Type: application/xml // Date: Tue, 20 Dec 2022 10:23:33 GMT // Server: WasabiS3/7.10.1193-2022-11-23-84c72037e8 (head2) // // // // AccessDenied // Access Denied // yyy // zzz // // // Printing the error type and details produces this: // // type: *s3err.RequestFailure // AccessDenied: Access Denied // status code: 403, request id: yyy, host id: zzz func (c *ReplicationController3) isRequestFailureAccessDenied(err error) bool { if reqerr, ok := err.(s3.RequestFailure); ok { if reqerr.Code() == "AccessDenied" { return true } } return false } // Verify the uploaded file by doing a HEAD check and comparing sizes func (c *ReplicationController3) verifyUploadedFileSize(in *UploadInput, dest *UploadDestination) error { res, err := dest.Client.HeadObject(&s3.HeadObjectInput{ Bucket: dest.Bucket, Key: &in.ObjectKey, }) if err != nil { return stacktrace.Propagate(err, "Fetching object info from bucket %s failed", *dest.Bucket) } if *res.ContentLength != in.ExpectedSize { err = fmt.Errorf("size of the uploaded file (%d) does not match the expected size (%d) in bucket %s", *res.ContentLength, in.ExpectedSize, *dest.Bucket) c.notifyDiscord(fmt.Sprint(err)) return stacktrace.Propagate(err, "") } return nil }