file_copy.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package file_copy
  2. import (
  3. "fmt"
  4. "github.com/aws/aws-sdk-go/service/s3"
  5. "github.com/ente-io/museum/ente"
  6. "github.com/ente-io/museum/pkg/controller"
  7. "github.com/ente-io/museum/pkg/repo"
  8. "github.com/ente-io/museum/pkg/utils/auth"
  9. "github.com/ente-io/museum/pkg/utils/s3config"
  10. enteTime "github.com/ente-io/museum/pkg/utils/time"
  11. "github.com/gin-contrib/requestid"
  12. "github.com/gin-gonic/gin"
  13. "github.com/sirupsen/logrus"
  14. "golang.org/x/sync/errgroup"
  15. "sync"
  16. "time"
  17. )
  18. const ()
  19. type FileCopyController struct {
  20. S3Config *s3config.S3Config
  21. FileController *controller.FileController
  22. FileRepo *repo.FileRepository
  23. CollectionCtrl *controller.CollectionController
  24. ObjectRepo *repo.ObjectRepository
  25. }
  // copyS3ObjectReq describes a single server-side S3 copy: the existing object
  // to read from and the key that the duplicate is written to. copyS3Object uses
  // the same bucket for source and destination.
  type copyS3ObjectReq struct {
  	// SourceS3Object is the object being copied. copyS3Object reads its
  	// ObjectKey, Type and FileSize.
  	SourceS3Object ente.S3ObjectKey
  	// DestObjectKey is the key the copy is written to. CopyFiles takes it from
  	// an upload URL obtained via FileController.GetUploadURLs.
  	DestObjectKey string
  }
  30. type fileCopyInternal struct {
  31. SourceFile ente.File
  32. DestCollectionID int64
  33. // The FileKey is encrypted with the destination collection's key
  34. EncryptedFileKey string
  35. EncryptedFileKeyNonce string
  36. FileCopyReq *copyS3ObjectReq
  37. ThumbCopyReq *copyS3ObjectReq
  38. }
  39. func (fci fileCopyInternal) newFile(ownedID int64) ente.File {
  40. newFileAttributes := fci.SourceFile.File
  41. newFileAttributes.ObjectKey = fci.FileCopyReq.DestObjectKey
  42. newThumbAttributes := fci.SourceFile.Thumbnail
  43. newThumbAttributes.ObjectKey = fci.ThumbCopyReq.DestObjectKey
  44. return ente.File{
  45. OwnerID: ownedID,
  46. CollectionID: fci.DestCollectionID,
  47. EncryptedKey: fci.EncryptedFileKey,
  48. KeyDecryptionNonce: fci.EncryptedFileKeyNonce,
  49. File: newFileAttributes,
  50. Thumbnail: newThumbAttributes,
  51. Metadata: fci.SourceFile.Metadata,
  52. UpdationTime: enteTime.Microseconds(),
  53. IsDeleted: false,
  54. }
  55. }
  56. func (fc *FileCopyController) CopyFiles(c *gin.Context, req ente.CopyFileSyncRequest) (*ente.CopyResponse, error) {
  57. userID := auth.GetUserID(c.Request.Header)
  58. app := auth.GetApp(c)
  59. logger := logrus.WithFields(logrus.Fields{"req_id": requestid.Get(c), "user_id": userID})
  60. err := fc.CollectionCtrl.IsCopyAllowed(c, userID, req)
  61. if err != nil {
  62. return nil, err
  63. }
  64. fileIDs := make([]int64, 0, len(req.CollectionFileItems))
  65. fileToCollectionFileMap := make(map[int64]*ente.CollectionFileItem, len(req.CollectionFileItems))
  66. for i := range req.CollectionFileItems {
  67. item := &req.CollectionFileItems[i]
  68. fileToCollectionFileMap[item.ID] = item
  69. fileIDs = append(fileIDs, item.ID)
  70. }
  71. s3ObjectsToCopy, err := fc.ObjectRepo.GetObjectsForFileIDs(fileIDs)
  72. if err != nil {
  73. return nil, err
  74. }
  75. // note: this assumes that preview existingFilesToCopy for videos are not tracked inside the object_keys table
  76. if len(s3ObjectsToCopy) != 2*len(fileIDs) {
  77. return nil, ente.NewInternalError(fmt.Sprintf("expected %d objects, got %d", 2*len(fileIDs), len(s3ObjectsToCopy)))
  78. }
  79. // todo:(neeraj) if the total size is greater than 1GB, do an early check if the user can upload the existingFilesToCopy
  80. var totalSize int64
  81. for _, obj := range s3ObjectsToCopy {
  82. totalSize += obj.FileSize
  83. }
  84. logger.WithField("totalSize", totalSize).Info("total size of existingFilesToCopy to copy")
  85. // request the uploadUrls using existing method. This is to ensure that orphan objects are automatically cleaned up
  86. // todo:(neeraj) optimize this method by removing the need for getting a signed url for each object
  87. uploadUrls, err := fc.FileController.GetUploadURLs(c, userID, len(s3ObjectsToCopy), app)
  88. if err != nil {
  89. return nil, err
  90. }
  91. existingFilesToCopy, err := fc.FileRepo.GetFileAttributesForCopy(fileIDs)
  92. if err != nil {
  93. return nil, err
  94. }
  95. if len(existingFilesToCopy) != len(fileIDs) {
  96. return nil, ente.NewInternalError(fmt.Sprintf("expected %d existingFilesToCopy, got %d", len(fileIDs), len(existingFilesToCopy)))
  97. }
  98. fileOGS3Object := make(map[int64]*copyS3ObjectReq)
  99. fileThumbS3Object := make(map[int64]*copyS3ObjectReq)
  100. for i, s3Obj := range s3ObjectsToCopy {
  101. if s3Obj.Type == ente.FILE {
  102. fileOGS3Object[s3Obj.FileID] = &copyS3ObjectReq{
  103. SourceS3Object: s3Obj,
  104. DestObjectKey: uploadUrls[i].ObjectKey,
  105. }
  106. } else if s3Obj.Type == ente.THUMBNAIL {
  107. fileThumbS3Object[s3Obj.FileID] = &copyS3ObjectReq{
  108. SourceS3Object: s3Obj,
  109. DestObjectKey: uploadUrls[i].ObjectKey,
  110. }
  111. } else {
  112. return nil, ente.NewInternalError(fmt.Sprintf("unexpected object type %s", s3Obj.Type))
  113. }
  114. }
  115. fileCopyList := make([]fileCopyInternal, 0, len(existingFilesToCopy))
  116. for i := range existingFilesToCopy {
  117. file := existingFilesToCopy[i]
  118. collectionItem := fileToCollectionFileMap[file.ID]
  119. if collectionItem.ID != file.ID {
  120. return nil, ente.NewInternalError(fmt.Sprintf("expected collectionItem.ID %d, got %d", file.ID, collectionItem.ID))
  121. }
  122. fileCopy := fileCopyInternal{
  123. SourceFile: file,
  124. DestCollectionID: req.DstCollection,
  125. EncryptedFileKey: fileToCollectionFileMap[file.ID].EncryptedKey,
  126. EncryptedFileKeyNonce: fileToCollectionFileMap[file.ID].KeyDecryptionNonce,
  127. FileCopyReq: fileOGS3Object[file.ID],
  128. ThumbCopyReq: fileThumbS3Object[file.ID],
  129. }
  130. fileCopyList = append(fileCopyList, fileCopy)
  131. }
  132. oldToNewFileIDMap := make(map[int64]int64)
  133. var wg sync.WaitGroup
  134. errChan := make(chan error, len(fileCopyList))
  135. for _, fileCopy := range fileCopyList {
  136. wg.Add(1)
  137. go func(fileCopy fileCopyInternal) {
  138. defer wg.Done()
  139. newFile, err := fc.createCopy(c, fileCopy, userID, app)
  140. if err != nil {
  141. errChan <- err
  142. return
  143. }
  144. oldToNewFileIDMap[fileCopy.SourceFile.ID] = newFile.ID
  145. }(fileCopy)
  146. }
  147. // Wait for all goroutines to finish
  148. wg.Wait()
  149. // Close the error channel and check if there were any errors
  150. close(errChan)
  151. if err, ok := <-errChan; ok {
  152. return nil, err
  153. }
  154. return &ente.CopyResponse{OldToNewFileIDMap: oldToNewFileIDMap}, nil
  155. }
  156. func (fc *FileCopyController) createCopy(c *gin.Context, fcInternal fileCopyInternal, userID int64, app ente.App) (*ente.File, error) {
  157. // using HotS3Client copy the File and Thumbnail
  158. s3Client := fc.S3Config.GetHotS3Client()
  159. hotBucket := fc.S3Config.GetHotBucket()
  160. g := new(errgroup.Group)
  161. g.Go(func() error {
  162. return copyS3Object(s3Client, hotBucket, fcInternal.FileCopyReq)
  163. })
  164. g.Go(func() error {
  165. return copyS3Object(s3Client, hotBucket, fcInternal.ThumbCopyReq)
  166. })
  167. if err := g.Wait(); err != nil {
  168. return nil, err
  169. }
  170. file := fcInternal.newFile(userID)
  171. newFile, err := fc.FileController.Create(c, userID, file, "", app)
  172. if err != nil {
  173. return nil, err
  174. }
  175. return &newFile, nil
  176. }
  177. // Helper function for S3 object copying.
  178. func copyS3Object(s3Client *s3.S3, bucket *string, req *copyS3ObjectReq) error {
  179. copySource := fmt.Sprintf("%s/%s", *bucket, req.SourceS3Object.ObjectKey)
  180. copyInput := &s3.CopyObjectInput{
  181. Bucket: bucket,
  182. CopySource: &copySource,
  183. Key: &req.DestObjectKey,
  184. }
  185. start := time.Now()
  186. _, err := s3Client.CopyObject(copyInput)
  187. elapsed := time.Since(start)
  188. if err != nil {
  189. return fmt.Errorf("failed to copy (%s) from %s to %s: %w", req.SourceS3Object.Type, copySource, req.DestObjectKey, err)
  190. }
  191. logrus.WithField("duration", elapsed).WithField("size", req.SourceS3Object.FileSize).Infof("copied (%s) from %s to %s", req.SourceS3Object.Type, copySource, req.DestObjectKey)
  192. return nil
  193. }