manager_store.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package main
  2. import (
  3. "github.com/gofrs/uuid"
  4. "github.com/knadh/listmonk/models"
  5. "github.com/lib/pq"
  6. )
  7. // runnerDB implements runner.DataSource over the primary
  8. // database.
  9. type runnerDB struct {
  10. queries *Queries
  11. }
  12. func newManagerStore(q *Queries) *runnerDB {
  13. return &runnerDB{
  14. queries: q,
  15. }
  16. }
  17. // NextCampaigns retrieves active campaigns ready to be processed.
  18. func (r *runnerDB) NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) {
  19. var out []*models.Campaign
  20. err := r.queries.NextCampaigns.Select(&out, pq.Int64Array(excludeIDs))
  21. return out, err
  22. }
  23. // NextSubscribers retrieves a subset of subscribers of a given campaign.
  24. // Since batches are processed sequentially, the retrieval is ordered by ID,
  25. // and every batch takes the last ID of the last batch and fetches the next
  26. // batch above that.
  27. func (r *runnerDB) NextSubscribers(campID, limit int) ([]models.Subscriber, error) {
  28. var out []models.Subscriber
  29. err := r.queries.NextCampaignSubscribers.Select(&out, campID, limit)
  30. return out, err
  31. }
  32. // GetCampaign fetches a campaign from the database.
  33. func (r *runnerDB) GetCampaign(campID int) (*models.Campaign, error) {
  34. var out = &models.Campaign{}
  35. err := r.queries.GetCampaign.Get(out, campID, nil)
  36. return out, err
  37. }
  38. // UpdateCampaignStatus updates a campaign's status.
  39. func (r *runnerDB) UpdateCampaignStatus(campID int, status string) error {
  40. _, err := r.queries.UpdateCampaignStatus.Exec(campID, status)
  41. return err
  42. }
  43. // CreateLink registers a URL with a UUID for tracking clicks and returns the UUID.
  44. func (r *runnerDB) CreateLink(url string) (string, error) {
  45. // Create a new UUID for the URL. If the URL already exists in the DB
  46. // the UUID in the database is returned.
  47. uu, err := uuid.NewV4()
  48. if err != nil {
  49. return "", err
  50. }
  51. var out string
  52. if err := r.queries.CreateLink.Get(&out, uu, url); err != nil {
  53. return "", err
  54. }
  55. return out, nil
  56. }
  57. // RecordBounce records a bounce event and returns the bounce count.
  58. func (r *runnerDB) RecordBounce(b models.Bounce) (int64, int, error) {
  59. var res = struct {
  60. SubscriberID int64 `db:"subscriber_id"`
  61. Num int `db:"num"`
  62. }{}
  63. err := r.queries.UpdateCampaignStatus.Select(&res,
  64. b.SubscriberUUID,
  65. b.Email,
  66. b.CampaignUUID,
  67. b.Type,
  68. b.Source,
  69. b.Meta)
  70. return res.SubscriberID, res.Num, err
  71. }
  72. func (r *runnerDB) BlocklistSubscriber(id int64) error {
  73. _, err := r.queries.BlocklistSubscribers.Exec(pq.Int64Array{id})
  74. return err
  75. }
  76. func (r *runnerDB) DeleteSubscriber(id int64) error {
  77. _, err := r.queries.DeleteSubscribers.Exec(pq.Int64Array{id})
  78. return err
  79. }