push.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package controller
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strconv"
  8. firebase "firebase.google.com/go"
  9. "firebase.google.com/go/messaging"
  10. "github.com/ente-io/museum/ente"
  11. "github.com/ente-io/museum/pkg/repo"
  12. "github.com/ente-io/museum/pkg/utils/config"
  13. "github.com/ente-io/museum/pkg/utils/time"
  14. "github.com/ente-io/stacktrace"
  15. log "github.com/sirupsen/logrus"
  16. "github.com/spf13/viper"
  17. "google.golang.org/api/option"
  18. )
  19. // PushController controls all push related operations
  20. type PushController struct {
  21. PushRepo *repo.PushTokenRepository
  22. TaskLockRepo *repo.TaskLockRepository
  23. HostName string
  24. FirebaseClient *messaging.Client
  25. }
  26. type PushToken struct {
  27. UserID int64
  28. FCMToken *string
  29. APNSToken *string
  30. CreatedAt int64
  31. UpdatedAt int64
  32. LastNotifiedAt int64
  33. }
  34. // Interval before which the last push was sent
  35. const pushIntervalInMinutes = 60
  36. // Limit defined by FirebaseClient.SendAll(...)
  37. const concurrentPushesInOneShot = 500
  38. const taskLockName = "fcm-push-lock"
  39. const taskLockDurationInMinutes = 5
  40. // As proposed by https://firebase.google.com/docs/cloud-messaging/manage-tokens#ensuring-registration-token-freshness
  41. const tokenExpiryDurationInDays = 61
  42. func NewPushController(pushRepo *repo.PushTokenRepository, taskLockRepo *repo.TaskLockRepository, hostName string) *PushController {
  43. client, err := newFirebaseClient()
  44. if err != nil {
  45. log.Error(fmt.Errorf("error creating Firebase client: %v", err))
  46. }
  47. return &PushController{PushRepo: pushRepo, TaskLockRepo: taskLockRepo, HostName: hostName, FirebaseClient: client}
  48. }
  49. func newFirebaseClient() (*messaging.Client, error) {
  50. firebaseCredentialsFile, err := config.CredentialFilePath("fcm-service-account.json")
  51. if err != nil {
  52. return nil, stacktrace.Propagate(err, "")
  53. }
  54. if firebaseCredentialsFile == "" {
  55. // Can happen when running locally
  56. return nil, nil
  57. }
  58. opt := option.WithCredentialsFile(firebaseCredentialsFile)
  59. app, err := firebase.NewApp(context.Background(), nil, opt)
  60. if err != nil {
  61. return nil, stacktrace.Propagate(err, "")
  62. }
  63. client, err := app.Messaging(context.Background())
  64. if err != nil {
  65. return nil, stacktrace.Propagate(err, "")
  66. }
  67. return client, nil
  68. }
  69. func (c *PushController) AddToken(userID int64, token ente.PushTokenRequest) error {
  70. return stacktrace.Propagate(c.PushRepo.AddToken(userID, token), "")
  71. }
  72. func (c *PushController) RemoveTokensForUser(userID int64) error {
  73. return stacktrace.Propagate(c.PushRepo.RemoveTokensForUser(userID), "")
  74. }
  75. func (c *PushController) SendPushes() {
  76. lockStatus, err := c.TaskLockRepo.AcquireLock(taskLockName,
  77. time.MicrosecondsAfterMinutes(taskLockDurationInMinutes), c.HostName)
  78. if err != nil {
  79. log.Error("Unable to acquire lock to send pushes", err)
  80. return
  81. }
  82. if !lockStatus {
  83. log.Info("Skipping sending pushes since there is an existing lock to send pushes")
  84. return
  85. }
  86. defer c.releaseTaskLock()
  87. tokens, err := c.PushRepo.GetTokensToBeNotified(time.MicrosecondsBeforeMinutes(pushIntervalInMinutes),
  88. concurrentPushesInOneShot)
  89. if err != nil {
  90. log.Error(fmt.Errorf("error fetching tokens to be notified: %v", err))
  91. return
  92. }
  93. err = c.sendFCMPushes(tokens, map[string]string{"action": "sync"})
  94. if err != nil {
  95. log.Error(fmt.Errorf("error sending pushes: %v", err))
  96. return
  97. }
  98. c.updateLastNotificationTime(tokens)
  99. }
  100. func (c *PushController) ClearExpiredTokens() {
  101. err := c.PushRepo.RemoveTokensOlderThan(time.NDaysFromNow(-1 * tokenExpiryDurationInDays))
  102. if err != nil {
  103. log.Errorf("Error while removing older tokens %s", err)
  104. } else {
  105. log.Info("Cleared expired FCM tokens")
  106. }
  107. }
  108. func (c *PushController) releaseTaskLock() {
  109. err := c.TaskLockRepo.ReleaseLock(taskLockName)
  110. if err != nil {
  111. log.Errorf("Error while releasing lock %s", err)
  112. }
  113. }
  114. func (c *PushController) updateLastNotificationTime(pushTokens []ente.PushToken) {
  115. err := c.PushRepo.SetLastNotificationTimeToNow(pushTokens)
  116. if err != nil {
  117. log.Error(fmt.Errorf("error updating last notified at times: %v", err))
  118. }
  119. }
  120. func (c *PushController) sendFCMPushes(pushTokens []ente.PushToken, payload map[string]string) error {
  121. firebaseClient := c.FirebaseClient
  122. silent := viper.GetBool("internal.silent")
  123. if silent || firebaseClient == nil {
  124. if len(pushTokens) > 0 {
  125. log.Info("Skipping sending pushes to " + strconv.Itoa(len(pushTokens)) + " devices")
  126. }
  127. return nil
  128. }
  129. log.Info("Sending pushes to " + strconv.Itoa(len(pushTokens)) + " devices")
  130. if len(pushTokens) == 0 {
  131. return nil
  132. }
  133. if len(pushTokens) > concurrentPushesInOneShot {
  134. return errors.New("cannot send these many pushes in one shot")
  135. }
  136. marshal, _ := json.Marshal(pushTokens)
  137. log.WithField("devices", string(marshal)).Info("push to following devices")
  138. fcmTokens := make([]string, 0)
  139. for _, pushTokenData := range pushTokens {
  140. fcmTokens = append(fcmTokens, pushTokenData.FCMToken)
  141. }
  142. message := &messaging.MulticastMessage{
  143. Tokens: fcmTokens,
  144. Data: payload,
  145. Android: &messaging.AndroidConfig{Priority: "high"},
  146. APNS: &messaging.APNSConfig{
  147. Headers: map[string]string{
  148. "apns-push-type": "background",
  149. "apns-priority": "5", // Must be `5` when `contentAvailable` is set to true.
  150. "apns-topic": "io.ente.frame", // bundle identifier
  151. },
  152. Payload: &messaging.APNSPayload{Aps: &messaging.Aps{ContentAvailable: true}},
  153. },
  154. }
  155. result, err := firebaseClient.SendMulticast(context.Background(), message)
  156. if err != nil {
  157. return stacktrace.Propagate(err, "Error sending pushes")
  158. } else {
  159. log.Info("Send push result: success count: " + strconv.Itoa(result.SuccessCount) +
  160. ", failure count: " + strconv.Itoa(result.FailureCount))
  161. return nil
  162. }
  163. }