subscribers.go 17 KB


  1. package core
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "github.com/gofrs/uuid"
  10. "github.com/knadh/listmonk/models"
  11. "github.com/labstack/echo/v4"
  12. "github.com/lib/pq"
  13. )
  14. var (
  15. subQuerySortFields = []string{"email", "name", "created_at", "updated_at"}
  16. )
  17. // GetSubscriber fetches a subscriber by one of the given params.
  18. func (c *Core) GetSubscriber(id int, uuid, email string) (models.Subscriber, error) {
  19. var uu interface{}
  20. if uuid != "" {
  21. uu = uuid
  22. }
  23. var out models.Subscribers
  24. if err := c.q.GetSubscriber.Select(&out, id, uu, email); err != nil {
  25. c.log.Printf("error fetching subscriber: %v", err)
  26. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  27. c.i18n.Ts("globals.messages.errorFetching",
  28. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  29. }
  30. if len(out) == 0 {
  31. return models.Subscriber{}, echo.NewHTTPError(http.StatusBadRequest,
  32. c.i18n.Ts("globals.messages.notFound", "name", "{globals.terms.subscriber}"))
  33. }
  34. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  35. c.log.Printf("error loading subscriber lists: %v", err)
  36. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  37. c.i18n.Ts("globals.messages.errorFetching",
  38. "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
  39. }
  40. return out[0], nil
  41. }
  42. // GetSubscribersByEmail fetches a subscriber by one of the given params.
  43. func (c *Core) GetSubscribersByEmail(emails []string) (models.Subscribers, error) {
  44. var out models.Subscribers
  45. if err := c.q.GetSubscribersByEmails.Select(&out, pq.Array(emails)); err != nil {
  46. c.log.Printf("error fetching subscriber: %v", err)
  47. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  48. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  49. }
  50. if len(out) == 0 {
  51. return nil, echo.NewHTTPError(http.StatusBadRequest, c.i18n.T("campaigns.noKnownSubsToTest"))
  52. }
  53. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  54. c.log.Printf("error loading subscriber lists: %v", err)
  55. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  56. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
  57. }
  58. return out, nil
  59. }
  60. // QuerySubscribers queries and returns paginated subscrribers based on the given params including the total count.
  61. func (c *Core) QuerySubscribers(query string, listIDs []int, order, orderBy string, offset, limit int) (models.Subscribers, int, error) {
  62. // There's an arbitrary query condition.
  63. cond := ""
  64. if query != "" {
  65. cond = " AND " + query
  66. }
  67. // Sort params.
  68. if !strSliceContains(orderBy, subQuerySortFields) {
  69. orderBy = "subscribers.id"
  70. }
  71. if order != SortAsc && order != SortDesc {
  72. order = SortDesc
  73. }
  74. // Required for pq.Array()
  75. if listIDs == nil {
  76. listIDs = []int{}
  77. }
  78. // Create a readonly transaction that just does COUNT() to obtain the count of results
  79. // and to ensure that the arbitrary query is indeed readonly.
  80. stmt := fmt.Sprintf(c.q.QuerySubscribersCount, cond)
  81. tx, err := c.db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
  82. if err != nil {
  83. c.log.Printf("error preparing subscriber query: %v", err)
  84. return nil, 0, echo.NewHTTPError(http.StatusBadRequest, c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  85. }
  86. defer tx.Rollback()
  87. // Execute the readonly query and get the count of results.
  88. total := 0
  89. if err := tx.Get(&total, stmt, pq.Array(listIDs)); err != nil {
  90. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  91. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  92. }
  93. // No results.
  94. if total == 0 {
  95. return models.Subscribers{}, 0, nil
  96. }
  97. // Run the query again and fetch the actual data. stmt is the raw SQL query.
  98. var out models.Subscribers
  99. stmt = strings.ReplaceAll(c.q.QuerySubscribers, "%query%", cond)
  100. stmt = strings.ReplaceAll(stmt, "%order%", orderBy+" "+order)
  101. if err := tx.Select(&out, stmt, pq.Array(listIDs), offset, limit); err != nil {
  102. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  103. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  104. }
  105. // Lazy load lists for each subscriber.
  106. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  107. c.log.Printf("error fetching subscriber lists: %v", err)
  108. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  109. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  110. }
  111. return out, total, nil
  112. }
  113. // GetSubscriberLists returns a subscriber's lists based on the given conditions.
  114. func (c *Core) GetSubscriberLists(subID int, uuid string, listIDs []int, listUUIDs []string, subStatus string, listType string) ([]models.List, error) {
  115. if listIDs == nil {
  116. listIDs = []int{}
  117. }
  118. if listUUIDs == nil {
  119. listUUIDs = []string{}
  120. }
  121. var uu interface{}
  122. if uuid != "" {
  123. uu = uuid
  124. }
  125. // Fetch double opt-in lists from the given list IDs.
  126. // Get the list of subscription lists where the subscriber hasn't confirmed.
  127. out := []models.List{}
  128. if err := c.q.GetSubscriberLists.Select(&out, subID, uu, pq.Array(listIDs), pq.Array(listUUIDs), subStatus, listType); err != nil {
  129. c.log.Printf("error fetching lists for opt-in: %s", pqErrMsg(err))
  130. return nil, err
  131. }
  132. return out, nil
  133. }
  134. // GetSubscriberProfileForExport returns the subscriber's profile data as a JSON exportable.
  135. // Get the subscriber's data. A single query that gets the profile, list subscriptions, campaign views,
  136. // and link clicks. Names of private lists are replaced with "Private list".
  137. func (c *Core) GetSubscriberProfileForExport(id int, uuid string) (models.SubscriberExportProfile, error) {
  138. var uu interface{}
  139. if uuid != "" {
  140. uu = uuid
  141. }
  142. var out models.SubscriberExportProfile
  143. if err := c.q.ExportSubscriberData.Get(&out, id, uu); err != nil {
  144. c.log.Printf("error fetching subscriber export data: %v", err)
  145. return models.SubscriberExportProfile{}, echo.NewHTTPError(http.StatusInternalServerError,
  146. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", err.Error()))
  147. }
  148. return out, nil
  149. }
  150. // ExportSubscribers returns an iterator function that provides lists of subscribers based
  151. // on the given criteria in an exportable form. The iterator function returned can be called
  152. // repeatedly until there are nil subscribers. It's an iterator because exports can be extremely
  153. // large and may have to be fetched in batches from the DB and streamed somewhere.
  154. func (c *Core) ExportSubscribers(query string, subIDs, listIDs []int, batchSize int) (func() ([]models.SubscriberExport, error), error) {
  155. // There's an arbitrary query condition.
  156. cond := ""
  157. if query != "" {
  158. cond = " AND " + query
  159. }
  160. stmt := fmt.Sprintf(c.q.QuerySubscribersForExport, cond)
  161. stmt = strings.ReplaceAll(c.q.QuerySubscribersForExport, "%query%", cond)
  162. // Verify that the arbitrary SQL search expression is read only.
  163. if cond != "" {
  164. tx, err := c.db.Unsafe().BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
  165. if err != nil {
  166. c.log.Printf("error preparing subscriber query: %v", err)
  167. return nil, echo.NewHTTPError(http.StatusBadRequest,
  168. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  169. }
  170. defer tx.Rollback()
  171. if _, err := tx.Query(stmt, nil, 0, nil, 1); err != nil {
  172. return nil, echo.NewHTTPError(http.StatusBadRequest,
  173. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  174. }
  175. }
  176. if subIDs == nil {
  177. subIDs = []int{}
  178. }
  179. if listIDs == nil {
  180. listIDs = []int{}
  181. }
  182. // Prepare the actual query statement.
  183. tx, err := c.db.Preparex(stmt)
  184. if err != nil {
  185. c.log.Printf("error preparing subscriber query: %v", err)
  186. return nil, echo.NewHTTPError(http.StatusBadRequest,
  187. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  188. }
  189. id := 0
  190. return func() ([]models.SubscriberExport, error) {
  191. var out []models.SubscriberExport
  192. if err := tx.Select(&out, pq.Array(listIDs), id, pq.Array(subIDs), batchSize); err != nil {
  193. c.log.Printf("error exporting subscribers by query: %v", err)
  194. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  195. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  196. }
  197. if len(out) == 0 {
  198. return nil, nil
  199. }
  200. id = out[len(out)-1].ID
  201. return out, nil
  202. }, nil
  203. }
  204. // InsertSubscriber inserts a subscriber and returns the ID. The first bool indicates if
  205. // it was a new subscriber, and the second bool indicates if the subscriber was sent an optin confirmation.
  206. // bool = optinSent?
  207. func (c *Core) InsertSubscriber(sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm bool) (models.Subscriber, bool, error) {
  208. uu, err := uuid.NewV4()
  209. if err != nil {
  210. c.log.Printf("error generating UUID: %v", err)
  211. return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
  212. c.i18n.Ts("globals.messages.errorUUID", "error", err.Error()))
  213. }
  214. sub.UUID = uu.String()
  215. subStatus := models.SubscriptionStatusUnconfirmed
  216. if preconfirm {
  217. subStatus = models.SubscriptionStatusConfirmed
  218. }
  219. if sub.Status == "" {
  220. sub.Status = models.UserStatusEnabled
  221. }
  222. // For pq.Array()
  223. if listIDs == nil {
  224. listIDs = []int{}
  225. }
  226. if listUUIDs == nil {
  227. listUUIDs = []string{}
  228. }
  229. if err = c.q.InsertSubscriber.Get(&sub.ID,
  230. sub.UUID,
  231. sub.Email,
  232. strings.TrimSpace(sub.Name),
  233. sub.Status,
  234. sub.Attribs,
  235. pq.Array(listIDs),
  236. pq.Array(listUUIDs),
  237. subStatus); err != nil {
  238. if pqErr, ok := err.(*pq.Error); ok && pqErr.Constraint == "subscribers_email_key" {
  239. return models.Subscriber{}, false, echo.NewHTTPError(http.StatusConflict,
  240. c.i18n.T("subscribers.emailExists"))
  241. } else {
  242. // return sub.Subscriber, errSubscriberExists
  243. c.log.Printf("error inserting subscriber: %v", err)
  244. return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
  245. c.i18n.Ts("globals.messages.errorCreating",
  246. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  247. }
  248. }
  249. // Fetch the subscriber'out full data. If the subscriber already existed and wasn't
  250. // created, the id will be empty. Fetch the details by e-mail then.
  251. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  252. if err != nil {
  253. return models.Subscriber{}, false, err
  254. }
  255. hasOptin := false
  256. if !preconfirm && c.constants.SendOptinConfirmation {
  257. // Send a confirmation e-mail (if there are any double opt-in lists).
  258. num, _ := c.h.SendOptinConfirmation(out, listIDs)
  259. hasOptin = num > 0
  260. }
  261. return out, hasOptin, nil
  262. }
  263. // UpdateSubscriber updates a subscriber's properties.
  264. func (c *Core) UpdateSubscriber(id int, sub models.Subscriber) (models.Subscriber, error) {
  265. // Format raw JSON attributes.
  266. attribs := []byte("{}")
  267. if len(sub.Attribs) > 0 {
  268. if b, err := json.Marshal(sub.Attribs); err != nil {
  269. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  270. c.i18n.Ts("globals.messages.errorUpdating",
  271. "name", "{globals.terms.subscriber}", "error", err.Error()))
  272. } else {
  273. attribs = b
  274. }
  275. }
  276. _, err := c.q.UpdateSubscriber.Exec(id,
  277. sub.Email,
  278. strings.TrimSpace(sub.Name),
  279. sub.Status,
  280. json.RawMessage(attribs),
  281. )
  282. if err != nil {
  283. c.log.Printf("error updating subscriber: %v", err)
  284. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  285. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  286. }
  287. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  288. if err != nil {
  289. return models.Subscriber{}, err
  290. }
  291. return out, nil
  292. }
  293. // UpdateSubscriberWithLists updates a subscriber's properties.
  294. // If deleteLists is set to true, all existing subscriptions are deleted and only
  295. // the ones provided are added or retained.
  296. func (c *Core) UpdateSubscriberWithLists(id int, sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm, deleteLists bool) (models.Subscriber, error) {
  297. subStatus := models.SubscriptionStatusUnconfirmed
  298. if preconfirm {
  299. subStatus = models.SubscriptionStatusConfirmed
  300. }
  301. // Format raw JSON attributes.
  302. attribs := []byte("{}")
  303. if len(sub.Attribs) > 0 {
  304. if b, err := json.Marshal(sub.Attribs); err != nil {
  305. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  306. c.i18n.Ts("globals.messages.errorUpdating",
  307. "name", "{globals.terms.subscriber}", "error", err.Error()))
  308. } else {
  309. attribs = b
  310. }
  311. }
  312. _, err := c.q.UpdateSubscriberWithLists.Exec(id,
  313. sub.Email,
  314. strings.TrimSpace(sub.Name),
  315. sub.Status,
  316. json.RawMessage(attribs),
  317. pq.Array(listIDs),
  318. pq.Array(listUUIDs),
  319. subStatus,
  320. deleteLists)
  321. if err != nil {
  322. c.log.Printf("error updating subscriber: %v", err)
  323. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  324. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  325. }
  326. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  327. if err != nil {
  328. return models.Subscriber{}, err
  329. }
  330. return out, nil
  331. }
  332. // BlocklistSubscribers blocklists the given list of subscribers.
  333. func (c *Core) BlocklistSubscribers(subIDs []int) error {
  334. if _, err := c.q.BlocklistSubscribers.Exec(pq.Array(subIDs)); err != nil {
  335. c.log.Printf("error blocklisting subscribers: %v", err)
  336. return echo.NewHTTPError(http.StatusInternalServerError,
  337. c.i18n.Ts("subscribers.errorBlocklisting", "error", err.Error()))
  338. }
  339. return nil
  340. }
  341. // BlocklistSubscribersByQuery blocklists the given list of subscribers.
  342. func (c *Core) BlocklistSubscribersByQuery(query string, listIDs []int) error {
  343. if err := c.q.ExecSubQueryTpl(sanitizeSQLExp(query), c.q.BlocklistSubscribersByQuery, listIDs, c.db); err != nil {
  344. c.log.Printf("error blocklisting subscribers: %v", err)
  345. return echo.NewHTTPError(http.StatusInternalServerError,
  346. c.i18n.Ts("subscribers.errorBlocklisting", "error", pqErrMsg(err)))
  347. }
  348. return nil
  349. }
  350. // DeleteSubscribers deletes the given list of subscribers.
  351. func (c *Core) DeleteSubscribers(subIDs []int, subUUIDs []string) error {
  352. if subIDs == nil {
  353. subIDs = []int{}
  354. }
  355. if subUUIDs == nil {
  356. subUUIDs = []string{}
  357. }
  358. if _, err := c.q.DeleteSubscribers.Exec(pq.Array(subIDs), pq.Array(subUUIDs)); err != nil {
  359. c.log.Printf("error deleting subscribers: %v", err)
  360. return echo.NewHTTPError(http.StatusInternalServerError,
  361. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  362. }
  363. return nil
  364. }
  365. // DeleteSubscribersByQuery deletes subscribers by a given arbitrary query expression.
  366. func (c *Core) DeleteSubscribersByQuery(query string, listIDs []int) error {
  367. err := c.q.ExecSubQueryTpl(sanitizeSQLExp(query), c.q.DeleteSubscribersByQuery, listIDs, c.db)
  368. if err != nil {
  369. c.log.Printf("error deleting subscribers: %v", err)
  370. return echo.NewHTTPError(http.StatusInternalServerError,
  371. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  372. }
  373. return err
  374. }
  375. // UnsubscribeByCampaign unsubscibers a given subscriber from lists in a given campaign.
  376. func (c *Core) UnsubscribeByCampaign(subUUID, campUUID string, blocklist bool) error {
  377. if _, err := c.q.UnsubscribeByCampaign.Exec(campUUID, subUUID, blocklist); err != nil {
  378. c.log.Printf("error unsubscribing: %v", err)
  379. return echo.NewHTTPError(http.StatusInternalServerError,
  380. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  381. }
  382. return nil
  383. }
  384. // ConfirmOptionSubscription confirms a subscriber's optin subscription.
  385. func (c *Core) ConfirmOptionSubscription(subUUID string, listUUIDs []string) error {
  386. if _, err := c.q.ConfirmSubscriptionOptin.Exec(subUUID, pq.Array(listUUIDs)); err != nil {
  387. c.log.Printf("error confirming subscription: %v", err)
  388. return echo.NewHTTPError(http.StatusInternalServerError,
  389. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  390. }
  391. return nil
  392. }
  393. // DeleteSubscriberBounces deletes the given list of subscribers.
  394. func (c *Core) DeleteSubscriberBounces(id int, uuid string) error {
  395. var uu interface{}
  396. if uuid != "" {
  397. uu = uuid
  398. }
  399. if _, err := c.q.DeleteBouncesBySubscriber.Exec(id, uu); err != nil {
  400. c.log.Printf("error deleting bounces: %v", err)
  401. return echo.NewHTTPError(http.StatusInternalServerError,
  402. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.bounces}", "error", pqErrMsg(err)))
  403. }
  404. return nil
  405. }
  406. // DeleteOrphanSubscribers deletes orphan subscriber records (subscribers without lists).
  407. func (c *Core) DeleteOrphanSubscribers() (int, error) {
  408. res, err := c.q.DeleteOrphanSubscribers.Exec()
  409. if err != nil {
  410. c.log.Printf("error deleting orphan subscribers: %v", err)
  411. return 0, echo.NewHTTPError(http.StatusInternalServerError,
  412. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  413. }
  414. n, _ := res.RowsAffected()
  415. return int(n), nil
  416. }
  417. // DeleteBlocklistedSubscribers deletes blocklisted subscribers.
  418. func (c *Core) DeleteBlocklistedSubscribers() (int, error) {
  419. res, err := c.q.DeleteBlocklistedSubscribers.Exec()
  420. if err != nil {
  421. c.log.Printf("error deleting blocklisted subscribers: %v", err)
  422. return 0, echo.NewHTTPError(http.StatusInternalServerError,
  423. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  424. }
  425. n, _ := res.RowsAffected()
  426. return int(n), nil
  427. }