package controller

import (
	"database/sql"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/ente-io/museum/pkg/controller/discord"
	"github.com/ente-io/museum/pkg/repo"
	"github.com/ente-io/museum/pkg/utils/file"
	"github.com/ente-io/museum/pkg/utils/s3config"
	"github.com/ente-io/stacktrace"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// ReplicationController3 oversees version 3 of our object replication.
//
// The user's encrypted data starts off in 1 hot storage (Backblaze "b2"). This
// controller then takes over and replicates it to the other two replicas. It
// keeps state in the object_copies table.
//
// Both v2 and v3 of object replication use the same hot storage (b2), but they
// replicate to different buckets thereafter.
//
// The current implementation only works if the hot storage is b2. This is not
// an inherent limitation; however, the code has not yet been tested in other
// scenarios, so there is a safety check preventing the replication from
// happening if the current hot storage is not b2.
type ReplicationController3 struct {
	S3Config          *s3config.S3Config
	ObjectRepo        *repo.ObjectRepository
	ObjectCopiesRepo  *repo.ObjectCopiesRepository
	DiscordController *discord.DiscordController
	// URL of the Cloudflare worker to use for downloading the source object
	workerURL string
	// Base directory for temporary storage
	tempStorage string
	// Prometheus metrics
	mUploadSuccess *prometheus.CounterVec
	mUploadFailure *prometheus.CounterVec
	// Cached S3 clients etc
	b2Client   *s3.S3
	b2Bucket   *string
	wasabiDest *UploadDestination
	scwDest    *UploadDestination
}

type UploadDestination struct {
	DC       string
	Client   *s3.S3
	Uploader *s3manager.Uploader
	Bucket   *string
	// The label to use for reporting metrics for uploads to this destination
	Label string
	// If true, we should ignore Wasabi 403 errors. See "Reuploads".
	HasComplianceHold bool
	// If true, the object is uploaded to the GLACIER class.
	IsGlacier bool
}

// StartReplication starts the background replication process.
//
// This method returns synchronously. ReplicationController3 will create a
// suitable number of goroutines to parallelize and perform the replication
// asynchronously, as and when it notices new files that have not yet been
// replicated (it does this by querying the object_copies table).
func (c *ReplicationController3) StartReplication() error {
	// As a safety check, ensure that the current hot storage bucket is in b2.
	// This is because the replication v3 code has not yet been tested for other
	// scenarios (it'll likely work though, probably with minor modifications).
	hotDC := c.S3Config.GetHotDataCenter()
	if hotDC != c.S3Config.GetHotBackblazeDC() {
		return fmt.Errorf("v3 replication can currently only run when the primary hot data center is Backblaze. Instead, it was %s", hotDC)
	}

	workerURL := viper.GetString("replication.worker-url")
	if workerURL == "" {
		return fmt.Errorf("replication.worker-url was not defined")
	}
	c.workerURL = workerURL
	log.Infof("Worker URL to download objects for replication v3 is: %s", workerURL)

	c.createMetrics()

	err := c.createTemporaryStorage()
	if err != nil {
		return err
	}

	c.createDestinations()

	workerCount := viper.GetInt("replication.worker-count")
	if workerCount == 0 {
		workerCount = 6
	}
	go c.startWorkers(workerCount)

	return nil
}

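// For reference, an illustrative sketch of the replication-related
// configuration read via viper in this file. The key names come from the code
// above and below; the worker URL shown is a made-up example, while
// "tmp/replication" and 6 are the fallbacks used when the keys are unset:
//
//	replication:
//	    worker-url: https://replication-worker.example.workers.dev
//	    tmp-storage: tmp/replication
//	    worker-count: 6
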
func (c *ReplicationController3) startWorkers(n int) {
	log.Infof("Starting %d workers for replication v3", n)

	for i := 0; i < n; i++ {
		go c.replicate(i)
		// Stagger the workers
		time.Sleep(time.Duration(2*i+1) * time.Second)
	}
}

func (c *ReplicationController3) createMetrics() {
	c.mUploadSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "museum_replication_upload_success_total",
		Help: "Number of successful uploads during replication (each replica is counted separately)",
	}, []string{"destination"})
	c.mUploadFailure = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "museum_replication_upload_failure_total",
		Help: "Number of failed uploads during replication (each replica is counted separately)",
	}, []string{"destination"})
}

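// As a usage sketch (a hypothetical dashboard query, not something museum
// itself runs), the per-destination failure rate of these counters can be
// watched in Prometheus with:
//
//	sum by (destination) (rate(museum_replication_upload_failure_total[5m]))
//
// where "destination" is the label populated from dest.Label ("wasabi" or
// "scaleway").
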
func (c *ReplicationController3) createTemporaryStorage() error {
	tempStorage := viper.GetString("replication.tmp-storage")
	if tempStorage == "" {
		tempStorage = "tmp/replication"
	}
	log.Infof("Temporary storage for replication v3 is: %s", tempStorage)

	err := file.DeleteAllFilesInDirectory(tempStorage)
	if err != nil {
		return stacktrace.Propagate(err, "Failed to delete old files from %s", tempStorage)
	}

	err = file.MakeDirectoryIfNotExists(tempStorage)
	if err != nil {
		return stacktrace.Propagate(err, "Failed to create temporary storage %s", tempStorage)
	}

	c.tempStorage = tempStorage

	return nil
}

func (c *ReplicationController3) createDestinations() {
	// The s3manager.Uploader objects are safe for concurrent use. From the
	// AWS docs:
	//
	// > The Uploader structure that calls Upload(). It is safe to call Upload()
	// on this structure for multiple objects and across concurrent goroutines.
	// Mutating the Uploader's properties is not safe to be done concurrently.
	config := c.S3Config

	b2DC := config.GetHotBackblazeDC()
	b2Client := config.GetS3Client(b2DC)
	c.b2Client = &b2Client
	c.b2Bucket = config.GetBucket(b2DC)

	wasabiDC := config.GetHotWasabiDC()
	wasabiClient := config.GetS3Client(wasabiDC)
	c.wasabiDest = &UploadDestination{
		DC:                wasabiDC,
		Client:            &wasabiClient,
		Uploader:          s3manager.NewUploaderWithClient(&wasabiClient),
		Bucket:            config.GetBucket(wasabiDC),
		Label:             "wasabi",
		HasComplianceHold: config.WasabiComplianceDC() == wasabiDC,
	}

	scwDC := config.GetColdScalewayDC()
	scwClient := config.GetS3Client(scwDC)
	c.scwDest = &UploadDestination{
		DC:       scwDC,
		Client:   &scwClient,
		Uploader: s3manager.NewUploaderWithClient(&scwClient),
		Bucket:   config.GetBucket(scwDC),
		Label:    "scaleway",
		// Should be true, except when running in a local cluster (since minio
		// doesn't support specifying the GLACIER storage class).
		IsGlacier: !config.AreLocalBuckets(),
	}
}

// Entry point for the replication worker (goroutine).
//
// i is an arbitrary index of the current routine.
func (c *ReplicationController3) replicate(i int) {
	// This is just
	//
	//	while (true) { replicate() }
	//
	// but with an extra sleep for a bit if nothing got replicated - both when
	// something's wrong, and when there's nothing to do.
	for {
		err := c.tryReplicate()
		if err != nil {
			// Sleep in proportion to the (arbitrary) index to space out the
			// workers further.
			time.Sleep(time.Duration(i+1) * time.Minute)
		}
	}
}

// Try to replicate an object.
//
// Return nil if something was replicated, otherwise return the error.
//
// A common and expected error is `sql.ErrNoRows`, which occurs if there are no
// objects left to replicate currently.
func (c *ReplicationController3) tryReplicate() error {
	// Fetch an object to replicate
	tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			log.Errorf("Could not fetch an object to replicate: %s", err)
		}
		return stacktrace.Propagate(err, "")
	}

	objectKey := copies.ObjectKey

	logger := log.WithFields(log.Fields{
		"task":       "replication",
		"object_key": objectKey,
	})

	commit := func(err error) error {
		// We don't rollback the transaction even in the case of errors, and
		// instead try to commit it after setting the last_attempt timestamp.
		//
		// This avoids the replication getting stuck in a loop trying (and
		// failing) to replicate the same object. The error would still need to
		// be resolved, but at least the replication would meanwhile move
		// forward, ignoring this row.
		if err != nil {
			logger.Error(err)
		}

		aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey)
		if aerr != nil {
			aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt")
			logger.Error(aerr)
		}

		cerr := tx.Commit()
		if cerr != nil {
			cerr = stacktrace.Propagate(cerr, "Failed to commit transaction")
			logger.Error(cerr)
		}

		if err == nil {
			err = aerr
		}
		if err == nil {
			err = cerr
		}

		if err == nil {
			logger.Info("Replication attempt succeeded")
		} else {
			logger.Info("Replication attempt failed")
		}

		return err
	}

	logger.Info("Replication attempt start")

	if copies.B2 == nil {
		err := errors.New("expected B2 copy to be in place before we start replication")
		return commit(stacktrace.Propagate(err, "Sanity check failed"))
	}

	if !copies.WantWasabi && !copies.WantSCW {
		err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate")
		return commit(stacktrace.Propagate(err, "Sanity check failed"))
	}

	ob, err := c.ObjectRepo.GetObjectState(tx, objectKey)
	if err != nil {
		return commit(stacktrace.Propagate(err, "Failed to fetch file's deleted status"))
	}

	if ob.IsFileDeleted || ob.IsUserDeleted {
		// Update object_copies to mark this object as not requiring further
		// replication. The row in object_copies will get deleted when the next
		// scheduled object deletion runs.
		err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey)
		if err != nil {
			return commit(stacktrace.Propagate(err, "Failed to mark an object as not requiring further replication"))
		}

		logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)",
			ob.IsFileDeleted, ob.IsUserDeleted)
		return commit(nil)
	}

	err = ensureSufficientSpace(ob.Size)
	if err != nil {
		// We don't have free space right now, maybe because other big files are
		// being downloaded simultaneously, but we might get space later, so mark
		// a failed attempt that'll get retried later.
		//
		// Log this error though, so that it gets noticed if it happens too
		// frequently (the instance might need a bigger disk).
		return commit(stacktrace.Propagate(err, ""))
	}

	filePath, file, err := c.createTemporaryFile(objectKey)
	if err != nil {
		return commit(stacktrace.Propagate(err, "Failed to create temporary file"))
	}
	defer os.Remove(filePath)
	defer file.Close()

	size, err := c.downloadFromB2ViaWorker(objectKey, file, logger)
	if err != nil {
		return commit(stacktrace.Propagate(err, "Failed to download object from B2"))
	}
	logger.Infof("Downloaded %d bytes to %s", size, filePath)

	in := &UploadInput{
		File:         file,
		ObjectKey:    objectKey,
		ExpectedSize: size,
		Logger:       logger,
	}

	err = nil

	if copies.WantWasabi && copies.Wasabi == nil {
		werr := c.replicateFile(in, c.wasabiDest, func() error {
			return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey)
		})
		err = werr
	}

	if copies.WantSCW && copies.SCW == nil {
		serr := c.replicateFile(in, c.scwDest, func() error {
			return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey)
		})
		if err == nil {
			err = serr
		}
	}

	return commit(err)
}

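// For orientation, a rough sketch of the per-object replication state that
// tryReplicate locks and updates. The names below are those of the Go fields
// and repository methods used above; the exact SQL schema lives in the
// migrations and is not spelled out here:
//
//   - WantWasabi / WantSCW: does this object still need that replica?
//   - B2 / Wasabi / SCW: non-nil once a copy exists in that replica.
//   - last_attempt: bumped by RegisterReplicationAttempt on every try (success
//     or failure), so a single problematic object cannot wedge the queue.
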
// Return an error if we risk running out of disk space if we try to download
// and write a file of the given size.
//
// This function keeps a buffer of 2 GB of free space in its calculations.
func ensureSufficientSpace(size int64) error {
	free, err := file.FreeSpace("/")
	if err != nil {
		return stacktrace.Propagate(err, "Failed to fetch free space")
	}

	gb := uint64(1024) * 1024 * 1024
	need := uint64(size) + (2 * gb)
	if free < need {
		return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", need, free)
	}

	return nil
}

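// Purely illustrative numbers: replicating a 500 MiB object on a disk with
// 2.2 GiB free fails this check (need = 500 MiB + 2 GiB ≈ 2.5 GiB > 2.2 GiB);
// the attempt is logged and retried later, once space frees up again.
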
// Create a temporary file for storing objectKey. Return both the path to the
// file, and the handle to the file.
//
// The caller must Close() the returned file if it is not nil.
func (c *ReplicationController3) createTemporaryFile(objectKey string) (string, *os.File, error) {
	fileName := strings.ReplaceAll(objectKey, "/", "_")
	filePath := c.tempStorage + "/" + fileName
	f, err := os.Create(filePath)
	if err != nil {
		return "", nil, stacktrace.Propagate(err, "Could not create temporary file at '%s' to download object", filePath)
	}
	return filePath, f, nil
}

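// For instance, with the default temporary storage an object key like
// "123/456/xyz" (a made-up key, purely for illustration) would be written to
// "tmp/replication/123_456_xyz".
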
// Download the object for objectKey from B2 hot storage, writing it into file.
//
// Return the size of the downloaded file.
func (c *ReplicationController3) downloadFromB2ViaWorker(objectKey string, file *os.File, logger *log.Entry) (int64, error) {
	presignedURL, err := c.getPresignedB2URL(objectKey)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Could not create presigned URL for downloading object")
	}

	presignedEncodedURL := base64.StdEncoding.EncodeToString([]byte(presignedURL))

	client := &http.Client{}
	request, err := http.NewRequest("GET", c.workerURL, nil)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Could not create request for worker %s", c.workerURL)
	}

	q := request.URL.Query()
	q.Add("src", presignedEncodedURL)
	request.URL.RawQuery = q.Encode()

	if c.S3Config.AreLocalBuckets() {
		originalURL := request.URL
		request, err = http.NewRequest("GET", presignedURL, nil)
		if err != nil {
			return 0, stacktrace.Propagate(err, "Could not create request for URL %s", presignedURL)
		}
		logger.Infof("Bypassing workerURL %s and instead directly GETting %s", originalURL, presignedURL)
	}

	response, err := client.Do(request)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Call to CF worker failed for object %s", objectKey)
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		if response.StatusCode == http.StatusNotFound {
			c.notifyDiscord("🔥 Could not find object in HotStorage: " + objectKey)
		}
		err = fmt.Errorf("CF Worker GET for object %s failed with HTTP status %s", objectKey, response.Status)
		return 0, stacktrace.Propagate(err, "")
	}

	n, err := io.Copy(file, response.Body)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Failed to write HTTP response to file")
	}

	return n, nil
}

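// Putting the pieces together, the request sent to the worker looks like
// (hypothetical worker host, for illustration only):
//
//	GET https://replication-worker.example.workers.dev/?src=<base64(presigned B2 GET URL)>
//
// The worker is expected to fetch the presigned URL and stream the object's
// bytes back with a 200, which we then copy into the temporary file.
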
// Get a presigned URL to download the object with objectKey from the B2 bucket.
func (c *ReplicationController3) getPresignedB2URL(objectKey string) (string, error) {
	r, _ := c.b2Client.GetObjectRequest(&s3.GetObjectInput{
		Bucket: c.b2Bucket,
		Key:    &objectKey,
	})
	return r.Presign(PreSignedRequestValidityDuration)
}

func (c *ReplicationController3) notifyDiscord(message string) {
	c.DiscordController.Notify(message)
}

type UploadInput struct {
	File         *os.File
	ObjectKey    string
	ExpectedSize int64
	Logger       *log.Entry
}

// Upload, verify and then update the DB to mark replication to dest.
func (c *ReplicationController3) replicateFile(in *UploadInput, dest *UploadDestination, dbUpdateCopies func() error) error {
	logger := in.Logger.WithFields(log.Fields{
		"destination": dest.Label,
		"bucket":      *dest.Bucket,
	})

	failure := func(err error) error {
		c.mUploadFailure.WithLabelValues(dest.Label).Inc()
		logger.Error(err)
		return err
	}

	err := c.uploadFile(in, dest)
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to upload object"))
	}

	err = c.verifyUploadedFileSize(in, dest)
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to verify upload"))
	}

	// The update of object_keys is not done in the transaction where the
	// other updates to the object_copies table are made. This is so that the
	// object_keys table (which is what'll be used to delete objects) is
	// (almost) always updated if the file gets uploaded successfully.
	//
	// The only time the update wouldn't happen is if museum gets restarted
	// between the successful completion of the upload to the bucket and this
	// query getting executed.
	//
	// While possible, that is a much smaller window as compared to the
	// transaction for updating object_copies, which could easily span minutes
	// as the transaction ends only after the object has been uploaded to all
	// replicas.
	rowsAffected, err := c.ObjectRepo.MarkObjectReplicated(in.ObjectKey, dest.DC)
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to update object_keys to mark replication as completed"))
	}

	if rowsAffected != 1 {
		// It is possible that this row was updated earlier, after an upload
		// that got completed but before the object_copies table could be
		// updated in the transaction (see "Reuploads").
		//
		// So do not treat this as an error.
		logger.Warnf("Expected 1 row to be updated, but got %d", rowsAffected)
	}

	err = dbUpdateCopies()
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to update object_copies to mark replication as complete"))
	}

	c.mUploadSuccess.WithLabelValues(dest.Label).Inc()
	return nil
}

// Upload the given file using the uploader for the given destination bucket.
//
// # Reuploads
//
// It is possible that the object might already exist on remote. The known
// scenario where this might happen is if museum gets restarted after having
// completed the upload but before it got around to modifying the DB.
//
// The behaviour in this case is remote dependent.
//
//   - Uploading an object with the same key on Scaleway would work normally.
//
//   - But trying to add an object with the same key on the compliance locked
//     Wasabi would return an HTTP 403.
//
// We intercept the Wasabi 403 in this case and move ahead. The subsequent
// object verification using the HEAD request will act as a sanity check for
// the object.
func (c *ReplicationController3) uploadFile(in *UploadInput, dest *UploadDestination) error {
	// Rewind the file pointer back to the start for the next upload.
	in.File.Seek(0, io.SeekStart)

	up := s3manager.UploadInput{
		Bucket: dest.Bucket,
		Key:    &in.ObjectKey,
		Body:   in.File,
	}
	if dest.IsGlacier {
		up.StorageClass = aws.String(s3.ObjectStorageClassGlacier)
	}

	result, err := dest.Uploader.Upload(&up)
	if err != nil && dest.HasComplianceHold && c.isRequestFailureAccessDenied(err) {
		in.Logger.Infof("Ignoring object that already exists on remote (we'll verify it using a HEAD check): %s", err)
		return nil
	}
	if err != nil {
		return stacktrace.Propagate(err, "Upload to bucket %s failed", *dest.Bucket)
	}

	in.Logger.Infof("Uploaded to bucket %s: %s", *dest.Bucket, result.Location)
	return nil
}

// Return true if the given error is because of an HTTP 403.
//
// See "Reuploads" for the scenario where these errors can arise.
//
// Specifically, this is an example of the HTTP 403 response we get when
// trying to add an object to a Wasabi bucket that already has a compliance
// locked object with the same key.
//
//	HTTP/1.1 403 Forbidden
//	Content-Type: application/xml
//	Date: Tue, 20 Dec 2022 10:23:33 GMT
//	Server: WasabiS3/7.10.1193-2022-11-23-84c72037e8 (head2)
//
//	<?xml version="1.0" encoding="UTF-8"?>
//	<Error>
//	    <Code>AccessDenied</Code>
//	    <Message>Access Denied</Message>
//	    <RequestId>yyy</RequestId>
//	    <HostId>zzz</HostId>
//	</Error>
//
// Printing the error type and details produces this:
//
//	type: *s3err.RequestFailure
//	AccessDenied: Access Denied
//	    status code: 403, request id: yyy, host id: zzz
func (c *ReplicationController3) isRequestFailureAccessDenied(err error) bool {
	if reqerr, ok := err.(s3.RequestFailure); ok {
		if reqerr.Code() == "AccessDenied" {
			return true
		}
	}
	return false
}

// Verify the uploaded file by doing a HEAD check and comparing sizes.
func (c *ReplicationController3) verifyUploadedFileSize(in *UploadInput, dest *UploadDestination) error {
	res, err := dest.Client.HeadObject(&s3.HeadObjectInput{
		Bucket: dest.Bucket,
		Key:    &in.ObjectKey,
	})
	if err != nil {
		return stacktrace.Propagate(err, "Fetching object info from bucket %s failed", *dest.Bucket)
	}

	if *res.ContentLength != in.ExpectedSize {
		err = fmt.Errorf("size of the uploaded file (%d) does not match the expected size (%d) in bucket %s",
			*res.ContentLength, in.ExpectedSize, *dest.Bucket)
		c.notifyDiscord(fmt.Sprint(err))
		return stacktrace.Propagate(err, "")
	}

	return nil
}