subscribers.go 18 KB


  1. package core
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "github.com/gofrs/uuid"
  10. "github.com/knadh/listmonk/models"
  11. "github.com/labstack/echo/v4"
  12. "github.com/lib/pq"
  13. )
  14. var (
  15. subQuerySortFields = []string{"email", "name", "created_at", "updated_at"}
  16. )
  17. // GetSubscriber fetches a subscriber by one of the given params.
  18. func (c *Core) GetSubscriber(id int, uuid, email string) (models.Subscriber, error) {
  19. var uu interface{}
  20. if uuid != "" {
  21. uu = uuid
  22. }
  23. var out models.Subscribers
  24. if err := c.q.GetSubscriber.Select(&out, id, uu, email); err != nil {
  25. c.log.Printf("error fetching subscriber: %v", err)
  26. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  27. c.i18n.Ts("globals.messages.errorFetching",
  28. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  29. }
  30. if len(out) == 0 {
  31. return models.Subscriber{}, echo.NewHTTPError(http.StatusBadRequest,
  32. c.i18n.Ts("globals.messages.notFound", "name",
  33. fmt.Sprintf("{globals.terms.subscriber} (%d: %s%s)", id, uuid, email)))
  34. }
  35. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  36. c.log.Printf("error loading subscriber lists: %v", err)
  37. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  38. c.i18n.Ts("globals.messages.errorFetching",
  39. "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
  40. }
  41. return out[0], nil
  42. }
  43. // GetSubscribersByEmail fetches a subscriber by one of the given params.
  44. func (c *Core) GetSubscribersByEmail(emails []string) (models.Subscribers, error) {
  45. var out models.Subscribers
  46. if err := c.q.GetSubscribersByEmails.Select(&out, pq.Array(emails)); err != nil {
  47. c.log.Printf("error fetching subscriber: %v", err)
  48. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  49. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  50. }
  51. if len(out) == 0 {
  52. return nil, echo.NewHTTPError(http.StatusBadRequest, c.i18n.T("campaigns.noKnownSubsToTest"))
  53. }
  54. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  55. c.log.Printf("error loading subscriber lists: %v", err)
  56. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  57. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
  58. }
  59. return out, nil
  60. }
  61. // QuerySubscribers queries and returns paginated subscrribers based on the given params including the total count.
  62. func (c *Core) QuerySubscribers(query string, listIDs []int, order, orderBy string, offset, limit int) (models.Subscribers, int, error) {
  63. // There's an arbitrary query condition.
  64. cond := ""
  65. if query != "" {
  66. cond = " AND " + query
  67. }
  68. // Sort params.
  69. if !strSliceContains(orderBy, subQuerySortFields) {
  70. orderBy = "subscribers.id"
  71. }
  72. if order != SortAsc && order != SortDesc {
  73. order = SortDesc
  74. }
  75. // Required for pq.Array()
  76. if listIDs == nil {
  77. listIDs = []int{}
  78. }
  79. // Create a readonly transaction that just does COUNT() to obtain the count of results
  80. // and to ensure that the arbitrary query is indeed readonly.
  81. stmt := fmt.Sprintf(c.q.QuerySubscribersCount, cond)
  82. tx, err := c.db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
  83. if err != nil {
  84. c.log.Printf("error preparing subscriber query: %v", err)
  85. return nil, 0, echo.NewHTTPError(http.StatusBadRequest, c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  86. }
  87. defer tx.Rollback()
  88. // Execute the readonly query and get the count of results.
  89. total := 0
  90. if err := tx.Get(&total, stmt, pq.Array(listIDs)); err != nil {
  91. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  92. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  93. }
  94. // No results.
  95. if total == 0 {
  96. return models.Subscribers{}, 0, nil
  97. }
  98. // Run the query again and fetch the actual data. stmt is the raw SQL query.
  99. var out models.Subscribers
  100. stmt = strings.ReplaceAll(c.q.QuerySubscribers, "%query%", cond)
  101. stmt = strings.ReplaceAll(stmt, "%order%", orderBy+" "+order)
  102. if err := tx.Select(&out, stmt, pq.Array(listIDs), offset, limit); err != nil {
  103. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  104. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  105. }
  106. // Lazy load lists for each subscriber.
  107. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  108. c.log.Printf("error fetching subscriber lists: %v", err)
  109. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  110. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  111. }
  112. return out, total, nil
  113. }
  114. // GetSubscriberLists returns a subscriber's lists based on the given conditions.
  115. func (c *Core) GetSubscriberLists(subID int, uuid string, listIDs []int, listUUIDs []string, subStatus string, listType string) ([]models.List, error) {
  116. if listIDs == nil {
  117. listIDs = []int{}
  118. }
  119. if listUUIDs == nil {
  120. listUUIDs = []string{}
  121. }
  122. var uu interface{}
  123. if uuid != "" {
  124. uu = uuid
  125. }
  126. // Fetch double opt-in lists from the given list IDs.
  127. // Get the list of subscription lists where the subscriber hasn't confirmed.
  128. out := []models.List{}
  129. if err := c.q.GetSubscriberLists.Select(&out, subID, uu, pq.Array(listIDs), pq.Array(listUUIDs), subStatus, listType); err != nil {
  130. c.log.Printf("error fetching lists for opt-in: %s", pqErrMsg(err))
  131. return nil, err
  132. }
  133. return out, nil
  134. }
  135. // GetSubscriberProfileForExport returns the subscriber's profile data as a JSON exportable.
  136. // Get the subscriber's data. A single query that gets the profile, list subscriptions, campaign views,
  137. // and link clicks. Names of private lists are replaced with "Private list".
  138. func (c *Core) GetSubscriberProfileForExport(id int, uuid string) (models.SubscriberExportProfile, error) {
  139. var uu interface{}
  140. if uuid != "" {
  141. uu = uuid
  142. }
  143. var out models.SubscriberExportProfile
  144. if err := c.q.ExportSubscriberData.Get(&out, id, uu); err != nil {
  145. c.log.Printf("error fetching subscriber export data: %v", err)
  146. return models.SubscriberExportProfile{}, echo.NewHTTPError(http.StatusInternalServerError,
  147. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", err.Error()))
  148. }
  149. return out, nil
  150. }
  151. // ExportSubscribers returns an iterator function that provides lists of subscribers based
  152. // on the given criteria in an exportable form. The iterator function returned can be called
  153. // repeatedly until there are nil subscribers. It's an iterator because exports can be extremely
  154. // large and may have to be fetched in batches from the DB and streamed somewhere.
  155. func (c *Core) ExportSubscribers(query string, subIDs, listIDs []int, batchSize int) (func() ([]models.SubscriberExport, error), error) {
  156. // There's an arbitrary query condition.
  157. cond := ""
  158. if query != "" {
  159. cond = " AND " + query
  160. }
  161. stmt := fmt.Sprintf(c.q.QuerySubscribersForExport, cond)
  162. stmt = strings.ReplaceAll(c.q.QuerySubscribersForExport, "%query%", cond)
  163. // Verify that the arbitrary SQL search expression is read only.
  164. if cond != "" {
  165. tx, err := c.db.Unsafe().BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
  166. if err != nil {
  167. c.log.Printf("error preparing subscriber query: %v", err)
  168. return nil, echo.NewHTTPError(http.StatusBadRequest,
  169. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  170. }
  171. defer tx.Rollback()
  172. if _, err := tx.Query(stmt, nil, 0, nil, 1); err != nil {
  173. return nil, echo.NewHTTPError(http.StatusBadRequest,
  174. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  175. }
  176. }
  177. if subIDs == nil {
  178. subIDs = []int{}
  179. }
  180. if listIDs == nil {
  181. listIDs = []int{}
  182. }
  183. // Prepare the actual query statement.
  184. tx, err := c.db.Preparex(stmt)
  185. if err != nil {
  186. c.log.Printf("error preparing subscriber query: %v", err)
  187. return nil, echo.NewHTTPError(http.StatusBadRequest,
  188. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  189. }
  190. id := 0
  191. return func() ([]models.SubscriberExport, error) {
  192. var out []models.SubscriberExport
  193. if err := tx.Select(&out, pq.Array(listIDs), id, pq.Array(subIDs), batchSize); err != nil {
  194. c.log.Printf("error exporting subscribers by query: %v", err)
  195. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  196. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  197. }
  198. if len(out) == 0 {
  199. return nil, nil
  200. }
  201. id = out[len(out)-1].ID
  202. return out, nil
  203. }, nil
  204. }
  205. // InsertSubscriber inserts a subscriber and returns the ID. The first bool indicates if
  206. // it was a new subscriber, and the second bool indicates if the subscriber was sent an optin confirmation.
  207. // bool = optinSent?
  208. func (c *Core) InsertSubscriber(sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm bool) (models.Subscriber, bool, error) {
  209. uu, err := uuid.NewV4()
  210. if err != nil {
  211. c.log.Printf("error generating UUID: %v", err)
  212. return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
  213. c.i18n.Ts("globals.messages.errorUUID", "error", err.Error()))
  214. }
  215. sub.UUID = uu.String()
  216. subStatus := models.SubscriptionStatusUnconfirmed
  217. if preconfirm {
  218. subStatus = models.SubscriptionStatusConfirmed
  219. }
  220. if sub.Status == "" {
  221. sub.Status = models.UserStatusEnabled
  222. }
  223. // For pq.Array()
  224. if listIDs == nil {
  225. listIDs = []int{}
  226. }
  227. if listUUIDs == nil {
  228. listUUIDs = []string{}
  229. }
  230. if err = c.q.InsertSubscriber.Get(&sub.ID,
  231. sub.UUID,
  232. sub.Email,
  233. strings.TrimSpace(sub.Name),
  234. sub.Status,
  235. sub.Attribs,
  236. pq.Array(listIDs),
  237. pq.Array(listUUIDs),
  238. subStatus); err != nil {
  239. if pqErr, ok := err.(*pq.Error); ok && pqErr.Constraint == "subscribers_email_key" {
  240. return models.Subscriber{}, false, echo.NewHTTPError(http.StatusConflict,
  241. c.i18n.T("subscribers.emailExists"))
  242. } else {
  243. // return sub.Subscriber, errSubscriberExists
  244. c.log.Printf("error inserting subscriber: %v", err)
  245. return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
  246. c.i18n.Ts("globals.messages.errorCreating",
  247. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  248. }
  249. }
  250. // Fetch the subscriber's full data. If the subscriber already existed and wasn't
  251. // created, the id will be empty. Fetch the details by e-mail then.
  252. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  253. if err != nil {
  254. return models.Subscriber{}, false, err
  255. }
  256. hasOptin := false
  257. if !preconfirm && c.constants.SendOptinConfirmation {
  258. // Send a confirmation e-mail (if there are any double opt-in lists).
  259. num, _ := c.h.SendOptinConfirmation(out, listIDs)
  260. hasOptin = num > 0
  261. }
  262. return out, hasOptin, nil
  263. }
  264. // UpdateSubscriber updates a subscriber's properties.
  265. func (c *Core) UpdateSubscriber(id int, sub models.Subscriber) (models.Subscriber, error) {
  266. // Format raw JSON attributes.
  267. attribs := []byte("{}")
  268. if len(sub.Attribs) > 0 {
  269. if b, err := json.Marshal(sub.Attribs); err != nil {
  270. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  271. c.i18n.Ts("globals.messages.errorUpdating",
  272. "name", "{globals.terms.subscriber}", "error", err.Error()))
  273. } else {
  274. attribs = b
  275. }
  276. }
  277. _, err := c.q.UpdateSubscriber.Exec(id,
  278. sub.Email,
  279. strings.TrimSpace(sub.Name),
  280. sub.Status,
  281. json.RawMessage(attribs),
  282. )
  283. if err != nil {
  284. c.log.Printf("error updating subscriber: %v", err)
  285. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  286. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  287. }
  288. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  289. if err != nil {
  290. return models.Subscriber{}, err
  291. }
  292. return out, nil
  293. }
  294. // UpdateSubscriberWithLists updates a subscriber's properties.
  295. // If deleteLists is set to true, all existing subscriptions are deleted and only
  296. // the ones provided are added or retained.
  297. func (c *Core) UpdateSubscriberWithLists(id int, sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm, deleteLists bool) (models.Subscriber, error) {
  298. subStatus := models.SubscriptionStatusUnconfirmed
  299. if preconfirm {
  300. subStatus = models.SubscriptionStatusConfirmed
  301. }
  302. // Format raw JSON attributes.
  303. attribs := []byte("{}")
  304. if len(sub.Attribs) > 0 {
  305. if b, err := json.Marshal(sub.Attribs); err != nil {
  306. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  307. c.i18n.Ts("globals.messages.errorUpdating",
  308. "name", "{globals.terms.subscriber}", "error", err.Error()))
  309. } else {
  310. attribs = b
  311. }
  312. }
  313. _, err := c.q.UpdateSubscriberWithLists.Exec(id,
  314. sub.Email,
  315. strings.TrimSpace(sub.Name),
  316. sub.Status,
  317. json.RawMessage(attribs),
  318. pq.Array(listIDs),
  319. pq.Array(listUUIDs),
  320. subStatus,
  321. deleteLists)
  322. if err != nil {
  323. c.log.Printf("error updating subscriber: %v", err)
  324. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  325. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  326. }
  327. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  328. if err != nil {
  329. return models.Subscriber{}, err
  330. }
  331. if !preconfirm && c.constants.SendOptinConfirmation {
  332. // Send a confirmation e-mail (if there are any double opt-in lists).
  333. c.h.SendOptinConfirmation(out, listIDs)
  334. }
  335. return out, nil
  336. }
  337. // BlocklistSubscribers blocklists the given list of subscribers.
  338. func (c *Core) BlocklistSubscribers(subIDs []int) error {
  339. if _, err := c.q.BlocklistSubscribers.Exec(pq.Array(subIDs)); err != nil {
  340. c.log.Printf("error blocklisting subscribers: %v", err)
  341. return echo.NewHTTPError(http.StatusInternalServerError,
  342. c.i18n.Ts("subscribers.errorBlocklisting", "error", err.Error()))
  343. }
  344. return nil
  345. }
  346. // BlocklistSubscribersByQuery blocklists the given list of subscribers.
  347. func (c *Core) BlocklistSubscribersByQuery(query string, listIDs []int) error {
  348. if err := c.q.ExecSubQueryTpl(sanitizeSQLExp(query), c.q.BlocklistSubscribersByQuery, listIDs, c.db); err != nil {
  349. c.log.Printf("error blocklisting subscribers: %v", err)
  350. return echo.NewHTTPError(http.StatusInternalServerError,
  351. c.i18n.Ts("subscribers.errorBlocklisting", "error", pqErrMsg(err)))
  352. }
  353. return nil
  354. }
  355. // DeleteSubscribers deletes the given list of subscribers.
  356. func (c *Core) DeleteSubscribers(subIDs []int, subUUIDs []string) error {
  357. if subIDs == nil {
  358. subIDs = []int{}
  359. }
  360. if subUUIDs == nil {
  361. subUUIDs = []string{}
  362. }
  363. if _, err := c.q.DeleteSubscribers.Exec(pq.Array(subIDs), pq.Array(subUUIDs)); err != nil {
  364. c.log.Printf("error deleting subscribers: %v", err)
  365. return echo.NewHTTPError(http.StatusInternalServerError,
  366. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  367. }
  368. return nil
  369. }
  370. // DeleteSubscribersByQuery deletes subscribers by a given arbitrary query expression.
  371. func (c *Core) DeleteSubscribersByQuery(query string, listIDs []int) error {
  372. err := c.q.ExecSubQueryTpl(sanitizeSQLExp(query), c.q.DeleteSubscribersByQuery, listIDs, c.db)
  373. if err != nil {
  374. c.log.Printf("error deleting subscribers: %v", err)
  375. return echo.NewHTTPError(http.StatusInternalServerError,
  376. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  377. }
  378. return err
  379. }
  380. // UnsubscribeByCampaign unsubscribes a given subscriber from lists in a given campaign.
  381. func (c *Core) UnsubscribeByCampaign(subUUID, campUUID string, blocklist bool) error {
  382. if _, err := c.q.UnsubscribeByCampaign.Exec(campUUID, subUUID, blocklist); err != nil {
  383. c.log.Printf("error unsubscribing: %v", err)
  384. return echo.NewHTTPError(http.StatusInternalServerError,
  385. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  386. }
  387. return nil
  388. }
  389. // ConfirmOptionSubscription confirms a subscriber's optin subscription.
  390. func (c *Core) ConfirmOptionSubscription(subUUID string, listUUIDs []string, meta models.JSON) error {
  391. if meta == nil {
  392. meta = models.JSON{}
  393. }
  394. if _, err := c.q.ConfirmSubscriptionOptin.Exec(subUUID, pq.Array(listUUIDs), meta); err != nil {
  395. c.log.Printf("error confirming subscription: %v", err)
  396. return echo.NewHTTPError(http.StatusInternalServerError,
  397. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  398. }
  399. return nil
  400. }
  401. // DeleteSubscriberBounces deletes the given list of subscribers.
  402. func (c *Core) DeleteSubscriberBounces(id int, uuid string) error {
  403. var uu interface{}
  404. if uuid != "" {
  405. uu = uuid
  406. }
  407. if _, err := c.q.DeleteBouncesBySubscriber.Exec(id, uu); err != nil {
  408. c.log.Printf("error deleting bounces: %v", err)
  409. return echo.NewHTTPError(http.StatusInternalServerError,
  410. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.bounces}", "error", pqErrMsg(err)))
  411. }
  412. return nil
  413. }
  414. // DeleteOrphanSubscribers deletes orphan subscriber records (subscribers without lists).
  415. func (c *Core) DeleteOrphanSubscribers() (int, error) {
  416. res, err := c.q.DeleteOrphanSubscribers.Exec()
  417. if err != nil {
  418. c.log.Printf("error deleting orphan subscribers: %v", err)
  419. return 0, echo.NewHTTPError(http.StatusInternalServerError,
  420. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  421. }
  422. n, _ := res.RowsAffected()
  423. return int(n), nil
  424. }
  425. // DeleteBlocklistedSubscribers deletes blocklisted subscribers.
  426. func (c *Core) DeleteBlocklistedSubscribers() (int, error) {
  427. res, err := c.q.DeleteBlocklistedSubscribers.Exec()
  428. if err != nil {
  429. c.log.Printf("error deleting blocklisted subscribers: %v", err)
  430. return 0, echo.NewHTTPError(http.StatusInternalServerError,
  431. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  432. }
  433. n, _ := res.RowsAffected()
  434. return int(n), nil
  435. }