subscribers.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package core
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "github.com/gofrs/uuid"
  10. "github.com/knadh/listmonk/models"
  11. "github.com/labstack/echo/v4"
  12. "github.com/lib/pq"
  13. )
  14. var (
  15. subQuerySortFields = []string{"email", "name", "created_at", "updated_at"}
  16. )
  17. // GetSubscriber fetches a subscriber by one of the given params.
  18. func (c *Core) GetSubscriber(id int, uuid, email string) (models.Subscriber, error) {
  19. var uu interface{}
  20. if uuid != "" {
  21. uu = uuid
  22. }
  23. var out models.Subscribers
  24. if err := c.q.GetSubscriber.Select(&out, id, uu, email); err != nil {
  25. c.log.Printf("error fetching subscriber: %v", err)
  26. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  27. c.i18n.Ts("globals.messages.errorFetching",
  28. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  29. }
  30. if len(out) == 0 {
  31. return models.Subscriber{}, echo.NewHTTPError(http.StatusBadRequest,
  32. c.i18n.Ts("globals.messages.notFound", "name", "{globals.terms.subscriber}"))
  33. }
  34. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  35. c.log.Printf("error loading subscriber lists: %v", err)
  36. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  37. c.i18n.Ts("globals.messages.errorFetching",
  38. "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
  39. }
  40. return out[0], nil
  41. }
  42. // GetSubscribersByEmail fetches a subscriber by one of the given params.
  43. func (c *Core) GetSubscribersByEmail(emails []string) (models.Subscribers, error) {
  44. var out models.Subscribers
  45. if err := c.q.GetSubscribersByEmails.Select(&out, pq.Array(emails)); err != nil {
  46. c.log.Printf("error fetching subscriber: %v", err)
  47. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  48. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  49. }
  50. if len(out) == 0 {
  51. return nil, echo.NewHTTPError(http.StatusBadRequest, c.i18n.T("campaigns.noKnownSubsToTest"))
  52. }
  53. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  54. c.log.Printf("error loading subscriber lists: %v", err)
  55. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  56. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
  57. }
  58. return out, nil
  59. }
  60. // QuerySubscribers queries and returns paginated subscrribers based on the given params including the total count.
  61. func (c *Core) QuerySubscribers(query string, listIDs []int, order, orderBy string, offset, limit int) (models.Subscribers, int, error) {
  62. // There's an arbitrary query condition.
  63. cond := ""
  64. if query != "" {
  65. cond = " AND " + query
  66. }
  67. // Sort params.
  68. if !strSliceContains(orderBy, subQuerySortFields) {
  69. orderBy = "subscribers.id"
  70. }
  71. if order != SortAsc && order != SortDesc {
  72. order = SortDesc
  73. }
  74. // Required for pq.Array()
  75. if listIDs == nil {
  76. listIDs = []int{}
  77. }
  78. // Create a readonly transaction that just does COUNT() to obtain the count of results
  79. // and to ensure that the arbitrary query is indeed readonly.
  80. stmt := fmt.Sprintf(c.q.QuerySubscribersCount, cond)
  81. tx, err := c.db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
  82. if err != nil {
  83. c.log.Printf("error preparing subscriber query: %v", err)
  84. return nil, 0, echo.NewHTTPError(http.StatusBadRequest, c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  85. }
  86. defer tx.Rollback()
  87. // Execute the readonly query and get the count of results.
  88. total := 0
  89. if err := tx.Get(&total, stmt, pq.Array(listIDs)); err != nil {
  90. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  91. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  92. }
  93. // No results.
  94. if total == 0 {
  95. return models.Subscribers{}, 0, nil
  96. }
  97. // Run the query again and fetch the actual data. stmt is the raw SQL query.
  98. var out models.Subscribers
  99. stmt = fmt.Sprintf(c.q.QuerySubscribers, cond, orderBy, order)
  100. if err := tx.Select(&out, stmt, pq.Array(listIDs), offset, limit); err != nil {
  101. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  102. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  103. }
  104. // Lazy load lists for each subscriber.
  105. if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
  106. c.log.Printf("error fetching subscriber lists: %v", err)
  107. return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
  108. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  109. }
  110. return out, total, nil
  111. }
  112. // GetSubscriberLists returns a subscriber's lists based on the given conditions.
  113. func (c *Core) GetSubscriberLists(subID int, uuid string, listIDs []int, listUUIDs []string, subStatus string, listType string) ([]models.List, error) {
  114. if listIDs == nil {
  115. listIDs = []int{}
  116. }
  117. if listUUIDs == nil {
  118. listUUIDs = []string{}
  119. }
  120. var uu interface{}
  121. if uuid != "" {
  122. uu = uuid
  123. }
  124. // Fetch double opt-in lists from the given list IDs.
  125. // Get the list of subscription lists where the subscriber hasn't confirmed.
  126. out := []models.List{}
  127. if err := c.q.GetSubscriberLists.Select(&out, subID, uu, pq.Array(listIDs), pq.Array(listUUIDs), subStatus, listType); err != nil {
  128. c.log.Printf("error fetching lists for opt-in: %s", pqErrMsg(err))
  129. return nil, err
  130. }
  131. return out, nil
  132. }
  133. // GetSubscriberProfileForExport returns the subscriber's profile data as a JSON exportable.
  134. // Get the subscriber's data. A single query that gets the profile, list subscriptions, campaign views,
  135. // and link clicks. Names of private lists are replaced with "Private list".
  136. func (c *Core) GetSubscriberProfileForExport(id int, uuid string) (models.SubscriberExportProfile, error) {
  137. var uu interface{}
  138. if uuid != "" {
  139. uu = uuid
  140. }
  141. var out models.SubscriberExportProfile
  142. if err := c.q.ExportSubscriberData.Get(&out, id, uu); err != nil {
  143. c.log.Printf("error fetching subscriber export data: %v", err)
  144. return models.SubscriberExportProfile{}, echo.NewHTTPError(http.StatusInternalServerError,
  145. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", err.Error()))
  146. }
  147. return out, nil
  148. }
  149. // ExportSubscribers returns an iterator function that provides lists of subscribers based
  150. // on the given criteria in an exportable form. The iterator function returned can be called
  151. // repeatedly until there are nil subscribers. It's an iterator because exports can be extremely
  152. // large and may have to be fetched in batches from the DB and streamed somewhere.
  153. func (c *Core) ExportSubscribers(query string, subIDs, listIDs []int, batchSize int) (func() ([]models.SubscriberExport, error), error) {
  154. // There's an arbitrary query condition.
  155. cond := ""
  156. if query != "" {
  157. cond = " AND " + query
  158. }
  159. stmt := fmt.Sprintf(c.q.QuerySubscribersForExport, cond)
  160. // Verify that the arbitrary SQL search expression is read only.
  161. if cond != "" {
  162. tx, err := c.db.Unsafe().BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
  163. if err != nil {
  164. c.log.Printf("error preparing subscriber query: %v", err)
  165. return nil, echo.NewHTTPError(http.StatusBadRequest,
  166. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  167. }
  168. defer tx.Rollback()
  169. if _, err := tx.Query(stmt, nil, 0, nil, 1); err != nil {
  170. return nil, echo.NewHTTPError(http.StatusBadRequest,
  171. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  172. }
  173. }
  174. if subIDs == nil {
  175. subIDs = []int{}
  176. }
  177. if listIDs == nil {
  178. listIDs = []int{}
  179. }
  180. // Prepare the actual query statement.
  181. tx, err := c.db.Preparex(stmt)
  182. if err != nil {
  183. c.log.Printf("error preparing subscriber query: %v", err)
  184. return nil, echo.NewHTTPError(http.StatusBadRequest,
  185. c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
  186. }
  187. id := 0
  188. return func() ([]models.SubscriberExport, error) {
  189. var out []models.SubscriberExport
  190. if err := tx.Select(&out, pq.Array(listIDs), id, pq.Array(subIDs), batchSize); err != nil {
  191. c.log.Printf("error exporting subscribers by query: %v", err)
  192. return nil, echo.NewHTTPError(http.StatusInternalServerError,
  193. c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  194. }
  195. if len(out) == 0 {
  196. return nil, nil
  197. }
  198. id = out[len(out)-1].ID
  199. return out, nil
  200. }, nil
  201. }
  202. // insertSubscriber inserts a subscriber and returns the ID. The first bool indicates if
  203. // it was a new subscriber, and the second bool indicates if the subscriber was sent an optin confirmation.
  204. // 1st bool = isNew?, 2nd bool = optinSent?
  205. func (c *Core) CreateSubscriber(sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm bool) (models.Subscriber, bool, bool, error) {
  206. uu, err := uuid.NewV4()
  207. if err != nil {
  208. c.log.Printf("error generating UUID: %v", err)
  209. return models.Subscriber{}, false, false, echo.NewHTTPError(http.StatusInternalServerError,
  210. c.i18n.Ts("globals.messages.errorUUID", "error", err.Error()))
  211. }
  212. sub.UUID = uu.String()
  213. var (
  214. isNew = true
  215. subStatus = models.SubscriptionStatusUnconfirmed
  216. )
  217. if preconfirm {
  218. subStatus = models.SubscriptionStatusConfirmed
  219. }
  220. if sub.Status == "" {
  221. sub.Status = models.UserStatusEnabled
  222. }
  223. // For pq.Array()
  224. if listIDs == nil {
  225. listIDs = []int{}
  226. }
  227. if listUUIDs == nil {
  228. listUUIDs = []string{}
  229. }
  230. if err = c.q.InsertSubscriber.Get(&sub.ID,
  231. sub.UUID,
  232. sub.Email,
  233. strings.TrimSpace(sub.Name),
  234. sub.Status,
  235. sub.Attribs,
  236. pq.Array(listIDs),
  237. pq.Array(listUUIDs),
  238. subStatus); err != nil {
  239. if pqErr, ok := err.(*pq.Error); ok && pqErr.Constraint == "subscribers_email_key" {
  240. isNew = false
  241. } else {
  242. // return sub.Subscriber, errSubscriberExists
  243. c.log.Printf("error inserting subscriber: %v", err)
  244. return models.Subscriber{}, false, false, echo.NewHTTPError(http.StatusInternalServerError,
  245. c.i18n.Ts("globals.messages.errorCreating",
  246. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  247. }
  248. }
  249. // Fetch the subscriber'out full data. If the subscriber already existed and wasn't
  250. // created, the id will be empty. Fetch the details by e-mail then.
  251. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  252. if err != nil {
  253. return models.Subscriber{}, false, false, err
  254. }
  255. hasOptin := false
  256. if !preconfirm && c.constants.SendOptinConfirmation {
  257. // Send a confirmation e-mail (if there are any double opt-in lists).
  258. num, _ := c.h.SendOptinConfirmation(out, listIDs)
  259. hasOptin = num > 0
  260. }
  261. return out, isNew, hasOptin, nil
  262. }
  263. // UpdateSubscriber updates a subscriber's properties.
  264. func (c *Core) UpdateSubscriber(id int, sub models.Subscriber, listIDs []int, preconfirm bool) (models.Subscriber, error) {
  265. subStatus := models.SubscriptionStatusUnconfirmed
  266. if preconfirm {
  267. subStatus = models.SubscriptionStatusConfirmed
  268. }
  269. // Format raw JSON attributes.
  270. attribs := []byte("{}")
  271. if len(sub.Attribs) > 0 {
  272. if b, err := json.Marshal(sub.Attribs); err != nil {
  273. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  274. c.i18n.Ts("globals.messages.errorUpdating",
  275. "name", "{globals.terms.subscriber}", "error", err.Error()))
  276. } else {
  277. attribs = b
  278. }
  279. }
  280. _, err := c.q.UpdateSubscriber.Exec(id,
  281. sub.Email,
  282. strings.TrimSpace(sub.Name),
  283. sub.Status,
  284. json.RawMessage(attribs),
  285. pq.Array(listIDs),
  286. subStatus)
  287. if err != nil {
  288. c.log.Printf("error updating subscriber: %v", err)
  289. return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
  290. c.i18n.Ts("globals.messages.errorUpdating",
  291. "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
  292. }
  293. out, err := c.GetSubscriber(sub.ID, "", sub.Email)
  294. if err != nil {
  295. return models.Subscriber{}, err
  296. }
  297. return out, nil
  298. }
  299. // BlocklistSubscribers blocklists the given list of subscribers.
  300. func (c *Core) BlocklistSubscribers(subIDs []int) error {
  301. if _, err := c.q.BlocklistSubscribers.Exec(pq.Array(subIDs)); err != nil {
  302. c.log.Printf("error blocklisting subscribers: %v", err)
  303. return echo.NewHTTPError(http.StatusInternalServerError,
  304. c.i18n.Ts("subscribers.errorBlocklisting", "error", err.Error()))
  305. }
  306. return nil
  307. }
  308. // BlocklistSubscribersByQuery blocklists the given list of subscribers.
  309. func (c *Core) BlocklistSubscribersByQuery(query string, listIDs []int) error {
  310. if err := c.q.ExecSubscriberQueryTpl(sanitizeSQLExp(query), c.q.BlocklistSubscribersByQuery, listIDs, c.db); err != nil {
  311. c.log.Printf("error blocklisting subscribers: %v", err)
  312. return echo.NewHTTPError(http.StatusInternalServerError,
  313. c.i18n.Ts("subscribers.errorBlocklisting", "error", pqErrMsg(err)))
  314. }
  315. return nil
  316. }
  317. // DeleteSubscribers deletes the given list of subscribers.
  318. func (c *Core) DeleteSubscribers(subIDs []int, subUUIDs []string) error {
  319. if subIDs == nil {
  320. subIDs = []int{}
  321. }
  322. if subUUIDs == nil {
  323. subUUIDs = []string{}
  324. }
  325. if _, err := c.q.DeleteSubscribers.Exec(pq.Array(subIDs), pq.Array(subUUIDs)); err != nil {
  326. c.log.Printf("error deleting subscribers: %v", err)
  327. return echo.NewHTTPError(http.StatusInternalServerError,
  328. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  329. }
  330. return nil
  331. }
  332. // DeleteSubscribersByQuery deletes subscribers by a given arbitrary query expression.
  333. func (c *Core) DeleteSubscribersByQuery(query string, listIDs []int) error {
  334. err := c.q.ExecSubscriberQueryTpl(sanitizeSQLExp(query), c.q.DeleteSubscribersByQuery, listIDs, c.db)
  335. if err != nil {
  336. c.log.Printf("error deleting subscribers: %v", err)
  337. return echo.NewHTTPError(http.StatusInternalServerError,
  338. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  339. }
  340. return err
  341. }
  342. // UnsubscribeByCampaign unsubscibers a given subscriber from lists in a given campaign.
  343. func (c *Core) UnsubscribeByCampaign(subUUID, campUUID string, blocklist bool) error {
  344. if _, err := c.q.Unsubscribe.Exec(campUUID, subUUID, blocklist); err != nil {
  345. c.log.Printf("error unsubscribing: %v", err)
  346. return echo.NewHTTPError(http.StatusInternalServerError,
  347. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  348. }
  349. return nil
  350. }
  351. // ConfirmOptionSubscription confirms a subscriber's optin subscription.
  352. func (c *Core) ConfirmOptionSubscription(subUUID string, listUUIDs []string) error {
  353. if _, err := c.q.ConfirmSubscriptionOptin.Exec(subUUID, pq.Array(listUUIDs)); err != nil {
  354. c.log.Printf("error confirming subscription: %v", err)
  355. return echo.NewHTTPError(http.StatusInternalServerError,
  356. c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
  357. }
  358. return nil
  359. }
  360. // DeleteSubscriberBounces deletes the given list of subscribers.
  361. func (c *Core) DeleteSubscriberBounces(id int, uuid string) error {
  362. var uu interface{}
  363. if uuid != "" {
  364. uu = uuid
  365. }
  366. if _, err := c.q.DeleteBouncesBySubscriber.Exec(id, uu); err != nil {
  367. c.log.Printf("error deleting bounces: %v", err)
  368. return echo.NewHTTPError(http.StatusInternalServerError,
  369. c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.bounces}", "error", pqErrMsg(err)))
  370. }
  371. return nil
  372. }