package controller

import (
	"database/sql"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/ente-io/museum/pkg/controller/discord"
	"github.com/ente-io/museum/pkg/repo"
	"github.com/ente-io/museum/pkg/utils/file"
	"github.com/ente-io/museum/pkg/utils/s3config"
	"github.com/ente-io/stacktrace"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// ReplicationController3 oversees version 3 of our object replication.
//
// The user's encrypted data starts off in 1 hot storage (Backblaze "b2"). This
// controller then takes over and replicates it the other two replicas. It keeps
// state in the object_copies table.
//
// Both v2 and v3 of object replication use the same hot storage (b2), but they
// replicate to different buckets thereafter.
//
// The current implementation only works if the hot storage is b2. This is not
// an inherent limitation, however the code has not yet been tested in other
// scenarios, so there is a safety check preventing the replication from
// happening if the current hot storage is not b2.
type ReplicationController3 struct {
	S3Config          *s3config.S3Config
	ObjectRepo        *repo.ObjectRepository
	ObjectCopiesRepo  *repo.ObjectCopiesRepository
	DiscordController *discord.DiscordController
	// URL of the Cloudflare worker to use for downloading the source object
	workerURL string
	// Base directory for temporary storage
	tempStorage string
	// Prometheus Metrics
	mUploadSuccess *prometheus.CounterVec
	mUploadFailure *prometheus.CounterVec
	// Cached S3 clients etc
	b2Client   *s3.S3
	b2Bucket   *string
	wasabiDest *UploadDestination
	scwDest    *UploadDestination
}

type UploadDestination struct {
	DC       string
	Client   *s3.S3
	Uploader *s3manager.Uploader
	Bucket   *string
	// The label to use for reporting metrics for uploads to this destination
	Label string
	// If true, we should ignore Wasabi 403 errors. See "Reuploads".
	HasComplianceHold bool
	// If true, the object is uploaded to the GLACIER class.
	IsGlacier bool
}

// StartReplication starts the background replication process.
//
// This method returns synchronously. ReplicationController3 will create
// suitable number of goroutines to parallelize and perform the replication
// asynchronously, as and when it notices new files that have not yet been
// replicated (it does this by querying the object_copies table).
func (c *ReplicationController3) StartReplication() error {
	// As a safety check, ensure that the current hot storage bucket is in b2.
	// This is because the replication v3 code has not yet been tested for other
	// scenarios (it'll likely work though, probably with minor modifications).
	hotDC := c.S3Config.GetHotDataCenter()
	if hotDC != c.S3Config.GetHotBackblazeDC() {
		return fmt.Errorf("v3 replication can currently only run when the primary hot data center is Backblaze. Instead, it was %s", hotDC)
	}

	workerURL := viper.GetString("replication.worker-url")
	if workerURL == "" {
		return fmt.Errorf("replication.worker-url was not defined")
	}
	c.workerURL = workerURL
	log.Infof("Worker URL to download objects for replication v3 is: %s", workerURL)

	c.createMetrics()
	err := c.createTemporaryStorage()
	if err != nil {
		return err
	}
	c.createDestinations()

	workerCount := viper.GetInt("replication.worker-count")
	if workerCount == 0 {
		workerCount = 6
	}

	go c.startWorkers(workerCount)

	return nil
}

func (c *ReplicationController3) startWorkers(n int) {
	log.Infof("Starting %d workers for replication v3", n)

	for i := 0; i < n; i++ {
		go c.replicate(i)
		// Stagger the workers
		time.Sleep(time.Duration(2*i+1) * time.Second)
	}
}

func (c *ReplicationController3) createMetrics() {
	c.mUploadSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "museum_replication_upload_success_total",
		Help: "Number of successful uploads during replication (each replica is counted separately)",
	}, []string{"destination"})
	c.mUploadFailure = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "museum_replication_upload_failure_total",
		Help: "Number of failed uploads during replication (each replica is counted separately)",
	}, []string{"destination"})
}

func (c *ReplicationController3) createTemporaryStorage() error {
	tempStorage := viper.GetString("replication.tmp-storage")
	if tempStorage == "" {
		tempStorage = "tmp/replication"
	}

	log.Infof("Temporary storage for replication v3 is: %s", tempStorage)

	err := file.DeleteAllFilesInDirectory(tempStorage)
	if err != nil {
		return stacktrace.Propagate(err, "Failed to deleting old files from %s", tempStorage)
	}

	err = file.MakeDirectoryIfNotExists(tempStorage)
	if err != nil {
		return stacktrace.Propagate(err, "Failed to create temporary storage %s", tempStorage)
	}

	c.tempStorage = tempStorage

	return nil
}

func (c *ReplicationController3) createDestinations() {
	// The s3manager.Uploader objects are safe for use concurrently. From the
	// AWS docs:
	//
	// > The Uploader structure that calls Upload(). It is safe to call Upload()
	//   on this structure for multiple objects and across concurrent goroutines.
	//   Mutating the Uploader's properties is not safe to be done concurrently.

	config := c.S3Config

	b2DC := config.GetHotBackblazeDC()
	b2Client := config.GetS3Client(b2DC)
	c.b2Client = &b2Client
	c.b2Bucket = config.GetBucket(b2DC)

	wasabiDC := config.GetHotWasabiDC()
	wasabiClient := config.GetS3Client(wasabiDC)
	c.wasabiDest = &UploadDestination{
		DC:                wasabiDC,
		Client:            &wasabiClient,
		Uploader:          s3manager.NewUploaderWithClient(&wasabiClient),
		Bucket:            config.GetBucket(wasabiDC),
		Label:             "wasabi",
		HasComplianceHold: config.WasabiComplianceDC() == wasabiDC,
	}

	scwDC := config.GetColdScalewayDC()
	scwClient := config.GetS3Client(scwDC)
	c.scwDest = &UploadDestination{
		DC:       scwDC,
		Client:   &scwClient,
		Uploader: s3manager.NewUploaderWithClient(&scwClient),
		Bucket:   config.GetBucket(scwDC),
		Label:    "scaleway",
		// should be true, except when running in a local cluster (since minio doesn't
		// support specifying the GLACIER storage class).
		IsGlacier: !config.AreLocalBuckets(),
	}
}

// Entry point for the replication worker (goroutine)
//
// i is an arbitrary index of the current routine.
func (c *ReplicationController3) replicate(i int) {
	// This is just
	//
	//    while (true) { replicate() }
	//
	// but with an extra sleep for a bit if nothing got replicated - both when
	// something's wrong, or there's nothing to do.
	for {
		err := c.tryReplicate()
		if err != nil {
			// Sleep in proportion to the (arbitrary) index to space out the
			// workers further.
			time.Sleep(time.Duration(i+1) * time.Minute)
		}
	}
}

// Try to replicate an object.
//
// Return nil if something was replicated, otherwise return the error.
//
// A common and expected error is `sql.ErrNoRows`, which occurs if there are no
// objects left to replicate currently.
func (c *ReplicationController3) tryReplicate() error {
	// Fetch an object to replicate
	tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			log.Errorf("Could not fetch an object to replicate: %s", err)
		}
		return stacktrace.Propagate(err, "")
	}

	objectKey := copies.ObjectKey

	logger := log.WithFields(log.Fields{
		"task":       "replication",
		"object_key": objectKey,
	})

	commit := func(err error) error {
		// We don't rollback the transaction even in the case of errors, and
		// instead try to commit it after setting the last_attempt timestamp.
		//
		// This avoids the replication getting stuck in a loop trying (and
		// failing) to replicate the same object. The error would still need to
		// be resolved, but at least the replication would meanwhile move
		// forward, ignoring this row.

		if err != nil {
			logger.Error(err)
		}

		aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey)
		if aerr != nil {
			aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt")
			logger.Error(aerr)
		}

		cerr := tx.Commit()
		if cerr != nil {
			cerr = stacktrace.Propagate(err, "Failed to commit transaction")
			logger.Error(cerr)
		}

		if err == nil {
			err = aerr
		}
		if err == nil {
			err = cerr
		}

		if err == nil {
			logger.Info("Replication attempt succeeded")
		} else {
			logger.Info("Replication attempt failed")
		}

		return err
	}

	logger.Info("Replication attempt start")

	if copies.B2 == nil {
		err := errors.New("expected B2 copy to be in place before we start replication")
		return commit(stacktrace.Propagate(err, "Sanity check failed"))
	}

	if !copies.WantWasabi && !copies.WantSCW {
		err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate")
		return commit(stacktrace.Propagate(err, "Sanity check failed"))
	}

	ob, err := c.ObjectRepo.GetObjectState(tx, objectKey)
	if err != nil {
		return commit(stacktrace.Propagate(err, "Failed to fetch file's deleted status"))
	}

	if ob.IsFileDeleted || ob.IsUserDeleted {
		// Update the object_copies to mark this object as not requiring further
		// replication. The row in object_copies will get deleted when the next
		// scheduled object deletion runs.
		err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey)
		if err != nil {
			return commit(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication"))
		}
		logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)",
			ob.IsFileDeleted, ob.IsUserDeleted)
		return commit(nil)
	}

	err = ensureSufficientSpace(ob.Size)
	if err != nil {
		// We don't have free space right now, maybe because other big files are
		// being downloaded simultanously, but we might get space later, so mark
		// a failed attempt that'll get retried later.
		//
		// Log this error though, so that it gets noticed if it happens too
		// frequently (the instance might need a bigger disk).
		return commit(stacktrace.Propagate(err, ""))
	}

	filePath, file, err := c.createTemporaryFile(objectKey)
	if err != nil {
		return commit(stacktrace.Propagate(err, "Failed to create temporary file"))
	}
	defer os.Remove(filePath)
	defer file.Close()

	size, err := c.downloadFromB2ViaWorker(objectKey, file, logger)
	if err != nil {
		return commit(stacktrace.Propagate(err, "Failed to download object from B2"))
	}
	logger.Infof("Downloaded %d bytes to %s", size, filePath)

	in := &UploadInput{
		File:         file,
		ObjectKey:    objectKey,
		ExpectedSize: size,
		Logger:       logger,
	}

	err = nil

	if copies.WantWasabi && copies.Wasabi == nil {
		werr := c.replicateFile(in, c.wasabiDest, func() error {
			return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey)
		})
		err = werr
	}

	if copies.WantSCW && copies.SCW == nil {
		serr := c.replicateFile(in, c.scwDest, func() error {
			return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey)
		})
		if err == nil {
			err = serr
		}
	}

	return commit(err)
}

// Return an error if we risk running out of disk space if we try to download
// and write a file of size.
//
// This function keeps a buffer of 1 GB free space in its calculations.
func ensureSufficientSpace(size int64) error {
	free, err := file.FreeSpace("/")
	if err != nil {
		return stacktrace.Propagate(err, "Failed to fetch free space")
	}

	gb := uint64(1024) * 1024 * 1024
	need := uint64(size) + (2 * gb)
	if free < need {
		return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", size, free)
	}

	return nil
}

// Create a temporary file for storing objectKey. Return both the path to the
// file, and the handle to the file.
//
// The caller must Close() the returned file if it is not nil.
func (c *ReplicationController3) createTemporaryFile(objectKey string) (string, *os.File, error) {
	fileName := strings.ReplaceAll(objectKey, "/", "_")
	filePath := c.tempStorage + "/" + fileName
	f, err := os.Create(filePath)
	if err != nil {
		return "", nil, stacktrace.Propagate(err, "Could not create temporary file at '%s' to download object", filePath)
	}
	return filePath, f, nil
}

// Download the object for objectKey from B2 hot storage, writing it into file.
//
// Return the size of the downloaded file.
func (c *ReplicationController3) downloadFromB2ViaWorker(objectKey string, file *os.File, logger *log.Entry) (int64, error) {
	presignedURL, err := c.getPresignedB2URL(objectKey)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Could not create create presigned URL for downloading object")
	}

	presignedEncodedURL := base64.StdEncoding.EncodeToString([]byte(presignedURL))

	client := &http.Client{}

	request, err := http.NewRequest("GET", c.workerURL, nil)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Could not create request for worker %s", c.workerURL)
	}

	q := request.URL.Query()
	q.Add("src", presignedEncodedURL)
	request.URL.RawQuery = q.Encode()

	if c.S3Config.AreLocalBuckets() {
		originalURL := request.URL
		request, err = http.NewRequest("GET", presignedURL, nil)
		if err != nil {
			return 0, stacktrace.Propagate(err, "Could not create request for URL %s", presignedURL)
		}
		logger.Infof("Bypassing workerURL %s and instead directly GETting %s", originalURL, presignedURL)
	}

	response, err := client.Do(request)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Call to CF worker failed for object %s", objectKey)
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		if response.StatusCode == http.StatusNotFound {
			c.notifyDiscord("🔥 Could not find object in HotStorage: " + objectKey)
		}
		err = fmt.Errorf("CF Worker GET for object %s failed with HTTP status %s", objectKey, response.Status)
		return 0, stacktrace.Propagate(err, "")
	}

	n, err := io.Copy(file, response.Body)
	if err != nil {
		return 0, stacktrace.Propagate(err, "Failed to write HTTP response to file")
	}

	return n, nil
}

// Get a presigned URL to download the object with objectKey from the B2 bucket.
func (c *ReplicationController3) getPresignedB2URL(objectKey string) (string, error) {
	r, _ := c.b2Client.GetObjectRequest(&s3.GetObjectInput{
		Bucket: c.b2Bucket,
		Key:    &objectKey,
	})
	return r.Presign(PreSignedRequestValidityDuration)
}

func (c *ReplicationController3) notifyDiscord(message string) {
	c.DiscordController.Notify(message)
}

type UploadInput struct {
	File         *os.File
	ObjectKey    string
	ExpectedSize int64
	Logger       *log.Entry
}

// Upload, verify and then update the DB to mark replication to dest.
func (c *ReplicationController3) replicateFile(in *UploadInput, dest *UploadDestination, dbUpdateCopies func() error) error {
	logger := in.Logger.WithFields(log.Fields{
		"destination": dest.Label,
		"bucket":      *dest.Bucket,
	})

	failure := func(err error) error {
		c.mUploadFailure.WithLabelValues(dest.Label).Inc()
		logger.Error(err)
		return err
	}

	err := c.uploadFile(in, dest)
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to upload object"))
	}

	err = c.verifyUploadedFileSize(in, dest)
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to verify upload"))
	}

	// The update of the object_keys is not done in the transaction where the
	// other updates to object_copies table are made. This is so that the
	// object_keys table (which is what'll be used to delete objects) is
	// (almost) always updated if the file gets uploaded successfully.
	//
	// The only time the update wouldn't happen is if museum gets restarted
	// between the successful completion of the upload to the bucket and this
	// query getting executed.
	//
	// While possible, that is a much smaller window as compared to the
	// transaction for updating object_copies, which could easily span minutes
	// as the transaction ends only after the object has been uploaded to all
	// replicas.
	rowsAffected, err := c.ObjectRepo.MarkObjectReplicated(in.ObjectKey, dest.DC)
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to update object_keys to mark replication as completed"))
	}

	if rowsAffected != 1 {
		// It is possible that this row was updated earlier, after an upload
		// that got completed but before object_copies table could be updated in
		// the transaction (See "Reuploads").
		//
		// So do not treat this as an error.
		logger.Warnf("Expected 1 row to be updated, but got %d", rowsAffected)
	}

	err = dbUpdateCopies()
	if err != nil {
		return failure(stacktrace.Propagate(err, "Failed to update object_copies to mark replication as complete"))
	}

	c.mUploadSuccess.WithLabelValues(dest.Label).Inc()
	return nil
}

// Upload the given file to using uploader to the given bucket.
//
// # Reuploads
//
// It is possible that the object might already exist on remote. The known
// scenario where this might happen is if museum gets restarted after having
// completed the upload but before it got around to modifying the DB.
//
// The behaviour in this case is remote dependent.
//
//   - Uploading an object with the same key on Scaleway would work normally.
//
//   - But trying to add an object with the same key on the compliance locked
//     Wasabi would return an HTTP 403.
//
// We intercept the Wasabi 403 in this case and move ahead. The subsequent
// object verification using the HEAD request will act as a sanity check for
// the object.
func (c *ReplicationController3) uploadFile(in *UploadInput, dest *UploadDestination) error {
	// Rewind the file pointer back to the start for the next upload.
	in.File.Seek(0, io.SeekStart)

	up := s3manager.UploadInput{
		Bucket: dest.Bucket,
		Key:    &in.ObjectKey,
		Body:   in.File,
	}
	if dest.IsGlacier {
		up.StorageClass = aws.String(s3.ObjectStorageClassGlacier)
	}

	result, err := dest.Uploader.Upload(&up)
	if err != nil && dest.HasComplianceHold && c.isRequestFailureAccessDenied(err) {
		in.Logger.Infof("Ignoring object that already exists on remote (we'll verify it using a HEAD check): %s", err)
		return nil
	}
	if err != nil {
		return stacktrace.Propagate(err, "Upload to bucket %s failed", *dest.Bucket)
	}

	in.Logger.Infof("Uploaded to bucket %s: %s", *dest.Bucket, result.Location)

	return nil
}

// Return true if the given error is because of an HTTP 403.
//
// See "Reuploads" for the scenario where these errors can arise.
//
// Specifically, this in an example of the HTTP 403 response we get when
// trying to add an object to a Wasabi bucket that already has a compliance
// locked object with the same key.
//
//		HTTP/1.1 403 Forbidden
//		Content-Type: application/xml
//		Date: Tue, 20 Dec 2022 10:23:33 GMT
//		Server: WasabiS3/7.10.1193-2022-11-23-84c72037e8 (head2)
//
//		<?xml version="1.0" encoding="UTF-8"?>
//		<Error>
//		 	<Code>AccessDenied</Code>
//		 	<Message>Access Denied</Message>
//		 	<RequestId>yyy</RequestId>
//		 	<HostId>zzz</HostId>
//	    </Error>
//
// Printing the error type and details produces this:
//
//	type: *s3err.RequestFailure
//	AccessDenied: Access Denied
//	  status code: 403, request id: yyy, host id: zzz
func (c *ReplicationController3) isRequestFailureAccessDenied(err error) bool {
	if reqerr, ok := err.(s3.RequestFailure); ok {
		if reqerr.Code() == "AccessDenied" {
			return true
		}
	}
	return false
}

// Verify the uploaded file by doing a HEAD check and comparing sizes
func (c *ReplicationController3) verifyUploadedFileSize(in *UploadInput, dest *UploadDestination) error {
	res, err := dest.Client.HeadObject(&s3.HeadObjectInput{
		Bucket: dest.Bucket,
		Key:    &in.ObjectKey,
	})
	if err != nil {
		return stacktrace.Propagate(err, "Fetching object info from bucket %s failed", *dest.Bucket)
	}

	if *res.ContentLength != in.ExpectedSize {
		err = fmt.Errorf("size of the uploaded file (%d) does not match the expected size (%d) in bucket %s",
			*res.ContentLength, in.ExpectedSize, *dest.Bucket)
		c.notifyDiscord(fmt.Sprint(err))
		return stacktrace.Propagate(err, "")
	}

	return nil
}