models.py 29 KB


  1. import enum
  2. import random
  3. import uuid
  4. import arrow
  5. import bcrypt
  6. from flask import url_for
  7. from flask_login import UserMixin
  8. from sqlalchemy import text, desc, CheckConstraint
  9. from sqlalchemy_utils import ArrowType
  10. from app import s3
  11. from app.config import (
  12. EMAIL_DOMAIN,
  13. MAX_NB_EMAIL_FREE_PLAN,
  14. URL,
  15. AVATAR_URL_EXPIRATION,
  16. JOB_ONBOARDING_1,
  17. )
  18. from app.extensions import db
  19. from app.log import LOG
  20. from app.oauth_models import Scope
  21. from app.utils import convert_to_id, random_string, random_words, random_word
  22. class ModelMixin(object):
  23. id = db.Column(db.Integer, primary_key=True, autoincrement=True)
  24. created_at = db.Column(ArrowType, default=arrow.utcnow, nullable=False)
  25. updated_at = db.Column(ArrowType, default=None, onupdate=arrow.utcnow)
  26. _repr_hide = ["created_at", "updated_at"]
  27. @classmethod
  28. def query(cls):
  29. return db.session.query(cls)
  30. @classmethod
  31. def get(cls, id):
  32. return cls.query.get(id)
  33. @classmethod
  34. def get_by(cls, **kw):
  35. return cls.query.filter_by(**kw).first()
  36. @classmethod
  37. def filter_by(cls, **kw):
  38. return cls.query.filter_by(**kw)
  39. @classmethod
  40. def get_or_create(cls, **kw):
  41. r = cls.get_by(**kw)
  42. if not r:
  43. r = cls(**kw)
  44. db.session.add(r)
  45. return r
  46. @classmethod
  47. def create(cls, **kw):
  48. r = cls(**kw)
  49. db.session.add(r)
  50. return r
  51. def save(self):
  52. db.session.add(self)
  53. @classmethod
  54. def delete(cls, obj_id):
  55. cls.query.filter(cls.id == obj_id).delete()
  56. def __repr__(self):
  57. values = ", ".join(
  58. "%s=%r" % (n, getattr(self, n))
  59. for n in self.__table__.c.keys()
  60. if n not in self._repr_hide
  61. )
  62. return "%s(%s)" % (self.__class__.__name__, values)
  63. class File(db.Model, ModelMixin):
  64. path = db.Column(db.String(128), unique=True, nullable=False)
  65. def get_url(self, expires_in=3600):
  66. return s3.get_url(self.path, expires_in)
  67. class PlanEnum(enum.Enum):
  68. monthly = 2
  69. yearly = 3
  70. class AliasGeneratorEnum(enum.Enum):
  71. word = 1 # aliases are generated based on random words
  72. uuid = 2 # aliases are generated based on uuid
  73. @classmethod
  74. def has_value(cls, value: int) -> bool:
  75. return value in set(item.value for item in cls)
  76. class User(db.Model, ModelMixin, UserMixin):
  77. __tablename__ = "users"
  78. email = db.Column(db.String(256), unique=True, nullable=False)
  79. salt = db.Column(db.String(128), nullable=True)
  80. password = db.Column(db.String(128), nullable=True)
  81. name = db.Column(db.String(128), nullable=False)
  82. is_admin = db.Column(db.Boolean, nullable=False, default=False)
  83. alias_generator = db.Column(
  84. db.Integer,
  85. nullable=False,
  86. default=AliasGeneratorEnum.word.value,
  87. server_default=str(AliasGeneratorEnum.word.value),
  88. )
  89. notification = db.Column(
  90. db.Boolean, default=True, nullable=False, server_default="1"
  91. )
  92. activated = db.Column(db.Boolean, default=False, nullable=False)
  93. profile_picture_id = db.Column(db.ForeignKey(File.id), nullable=True)
  94. otp_secret = db.Column(db.String(16), nullable=True)
  95. enable_otp = db.Column(
  96. db.Boolean, nullable=False, default=False, server_default="0"
  97. )
  98. # some users could have lifetime premium
  99. lifetime = db.Column(db.Boolean, default=False, nullable=False, server_default="0")
  100. # user can use all premium features until this date
  101. trial_end = db.Column(
  102. ArrowType, default=lambda: arrow.now().shift(days=7, hours=1), nullable=True
  103. )
  104. # the mailbox used when create random alias
  105. # this field is nullable but in practice, it's always set
  106. # it cannot be set to non-nullable though
  107. # as this will create foreign key cycle between User and Mailbox
  108. default_mailbox_id = db.Column(
  109. db.ForeignKey("mailbox.id"), nullable=True, default=None
  110. )
  111. profile_picture = db.relationship(File)
  112. @classmethod
  113. def create(cls, email, name, password=None, **kwargs):
  114. user: User = super(User, cls).create(email=email, name=name, **kwargs)
  115. if password:
  116. user.set_password(password)
  117. db.session.flush()
  118. mb = Mailbox.create(user_id=user.id, email=user.email, verified=True)
  119. db.session.flush()
  120. user.default_mailbox_id = mb.id
  121. # create a first alias mail to show user how to use when they login
  122. GenEmail.create_new(user, prefix="my-first-alias", mailbox_id=mb.id)
  123. db.session.flush()
  124. # Schedule onboarding emails
  125. Job.create(
  126. name=JOB_ONBOARDING_1,
  127. payload={"user_id": user.id},
  128. run_at=arrow.now().shift(days=1),
  129. )
  130. db.session.flush()
  131. return user
  132. def lifetime_or_active_subscription(self) -> bool:
  133. """True if user has lifetime licence or active subscription"""
  134. if self.lifetime:
  135. return True
  136. sub: Subscription = self.get_subscription()
  137. if sub:
  138. return True
  139. manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
  140. if manual_sub and manual_sub.end_at > arrow.now():
  141. return True
  142. return False
  143. def in_trial(self):
  144. """return True if user does not have lifetime licence or an active subscription AND is in trial period"""
  145. if self.lifetime_or_active_subscription():
  146. return False
  147. if self.trial_end and arrow.now() < self.trial_end:
  148. return True
  149. return False
  150. def should_upgrade(self):
  151. if self.lifetime_or_active_subscription():
  152. # user who has canceled can also re-subscribe
  153. sub: Subscription = self.get_subscription()
  154. if sub and sub.cancelled:
  155. return True
  156. return False
  157. return True
  158. def next_bill_date(self) -> str:
  159. sub: Subscription = self.get_subscription()
  160. if sub:
  161. return sub.next_bill_date.strftime("%Y-%m-%d")
  162. LOG.error(
  163. f"next_bill_date() should be called only on user with active subscription. User {self}"
  164. )
  165. return ""
  166. def is_cancel(self) -> bool:
  167. """User has canceled their subscription but the subscription is still active,
  168. i.e. next_bill_date > now"""
  169. sub: Subscription = self.get_subscription()
  170. if sub and sub.cancelled:
  171. return True
  172. return False
  173. def is_premium(self) -> bool:
  174. """
  175. user is premium if they:
  176. - have a lifetime deal or
  177. - in trial period or
  178. - active subscription
  179. """
  180. if self.lifetime_or_active_subscription():
  181. return True
  182. if self.trial_end and arrow.now() < self.trial_end:
  183. return True
  184. return False
  185. def can_create_new_alias(self) -> bool:
  186. if self.is_premium():
  187. return True
  188. return GenEmail.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN
  189. def set_password(self, password):
  190. salt = bcrypt.gensalt()
  191. password_hash = bcrypt.hashpw(password.encode(), salt).decode()
  192. self.salt = salt.decode()
  193. self.password = password_hash
  194. def check_password(self, password) -> bool:
  195. if not self.password:
  196. return False
  197. password_hash = bcrypt.hashpw(password.encode(), self.salt.encode())
  198. return self.password.encode() == password_hash
  199. def profile_picture_url(self):
  200. if self.profile_picture_id:
  201. return self.profile_picture.get_url()
  202. else:
  203. return url_for("static", filename="default-avatar.png")
  204. def suggested_emails(self, website_name) -> (str, [str]):
  205. """return suggested email and other email choices """
  206. website_name = convert_to_id(website_name)
  207. all_gen_emails = [ge.email for ge in GenEmail.filter_by(user_id=self.id)]
  208. if self.can_create_new_alias():
  209. suggested_gen_email = GenEmail.create_new(self, prefix=website_name).email
  210. else:
  211. # pick an email from the list of gen emails
  212. suggested_gen_email = random.choice(all_gen_emails)
  213. return (
  214. suggested_gen_email,
  215. list(set(all_gen_emails).difference({suggested_gen_email})),
  216. )
  217. def suggested_names(self) -> (str, [str]):
  218. """return suggested name and other name choices """
  219. other_name = convert_to_id(self.name)
  220. return self.name, [other_name, "Anonymous", "whoami"]
  221. def get_name_initial(self) -> str:
  222. names = self.name.split(" ")
  223. return "".join([n[0].upper() for n in names if n])
  224. def get_subscription(self):
  225. """return *active* subscription
  226. TODO: support user unsubscribe and re-subscribe
  227. """
  228. sub = Subscription.get_by(user_id=self.id)
  229. if sub and sub.cancelled:
  230. # sub is active until the next billing_date + 1
  231. if sub.next_bill_date >= arrow.now().shift(days=-1).date():
  232. return sub
  233. # past subscription, user is considered not having a subscription = free plan
  234. else:
  235. return None
  236. else:
  237. return sub
  238. def verified_custom_domains(self):
  239. return CustomDomain.query.filter_by(user_id=self.id, verified=True).all()
  240. def mailboxes(self) -> [str]:
  241. """list of mailbox emails that user own"""
  242. mailboxes = []
  243. for mailbox in Mailbox.query.filter_by(user_id=self.id, verified=True):
  244. mailboxes.append(mailbox.email)
  245. return mailboxes
  246. def __repr__(self):
  247. return f"<User {self.id} {self.name} {self.email}>"
  248. def _expiration_1h():
  249. return arrow.now().shift(hours=1)
  250. def _expiration_12h():
  251. return arrow.now().shift(hours=12)
  252. def _expiration_5m():
  253. return arrow.now().shift(minutes=5)
  254. class ActivationCode(db.Model, ModelMixin):
  255. """For activate user account"""
  256. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  257. code = db.Column(db.String(128), unique=True, nullable=False)
  258. user = db.relationship(User)
  259. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  260. def is_expired(self):
  261. return self.expired < arrow.now()
  262. class ResetPasswordCode(db.Model, ModelMixin):
  263. """For resetting password"""
  264. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  265. code = db.Column(db.String(128), unique=True, nullable=False)
  266. user = db.relationship(User)
  267. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  268. def is_expired(self):
  269. return self.expired < arrow.now()
  270. class SocialAuth(db.Model, ModelMixin):
  271. """Store how user authenticates with social login"""
  272. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  273. # name of the social login used, could be facebook, google or github
  274. social = db.Column(db.String(128), nullable=False)
  275. __table_args__ = (db.UniqueConstraint("user_id", "social", name="uq_social_auth"),)
  276. # <<< OAUTH models >>>
  277. def generate_oauth_client_id(client_name) -> str:
  278. oauth_client_id = convert_to_id(client_name) + "-" + random_string()
  279. # check that the client does not exist yet
  280. if not Client.get_by(oauth_client_id=oauth_client_id):
  281. LOG.debug("generate oauth_client_id %s", oauth_client_id)
  282. return oauth_client_id
  283. # Rerun the function
  284. LOG.warning(
  285. "client_id %s already exists, generate a new client_id", oauth_client_id
  286. )
  287. return generate_oauth_client_id(client_name)
  288. class Client(db.Model, ModelMixin):
  289. oauth_client_id = db.Column(db.String(128), unique=True, nullable=False)
  290. oauth_client_secret = db.Column(db.String(128), nullable=False)
  291. name = db.Column(db.String(128), nullable=False)
  292. home_url = db.Column(db.String(1024))
  293. published = db.Column(db.Boolean, default=False, nullable=False)
  294. # user who created this client
  295. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  296. icon_id = db.Column(db.ForeignKey(File.id), nullable=True)
  297. icon = db.relationship(File)
  298. def nb_user(self):
  299. return ClientUser.filter_by(client_id=self.id).count()
  300. def get_scopes(self) -> [Scope]:
  301. # todo: client can choose which scopes they want to have access
  302. return [Scope.NAME, Scope.EMAIL, Scope.AVATAR_URL]
  303. @classmethod
  304. def create_new(cls, name, user_id) -> "Client":
  305. # generate a client-id
  306. oauth_client_id = generate_oauth_client_id(name)
  307. oauth_client_secret = random_string(40)
  308. client = Client.create(
  309. name=name,
  310. oauth_client_id=oauth_client_id,
  311. oauth_client_secret=oauth_client_secret,
  312. user_id=user_id,
  313. )
  314. return client
  315. def get_icon_url(self):
  316. if self.icon_id:
  317. return self.icon.get_url()
  318. else:
  319. return URL + "/static/default-icon.svg"
  320. def last_user_login(self) -> "ClientUser":
  321. client_user = (
  322. ClientUser.query.filter(ClientUser.client_id == self.id)
  323. .order_by(ClientUser.updated_at)
  324. .first()
  325. )
  326. if client_user:
  327. return client_user
  328. return None
  329. class RedirectUri(db.Model, ModelMixin):
  330. """Valid redirect uris for a client"""
  331. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  332. uri = db.Column(db.String(1024), nullable=False)
  333. client = db.relationship(Client, backref="redirect_uris")
  334. class AuthorizationCode(db.Model, ModelMixin):
  335. code = db.Column(db.String(128), unique=True, nullable=False)
  336. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  337. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  338. scope = db.Column(db.String(128))
  339. redirect_uri = db.Column(db.String(1024))
  340. # what is the input response_type, e.g. "code", "code,id_token", ...
  341. response_type = db.Column(db.String(128))
  342. user = db.relationship(User, lazy=False)
  343. client = db.relationship(Client, lazy=False)
  344. expired = db.Column(ArrowType, nullable=False, default=_expiration_5m)
  345. def is_expired(self):
  346. return self.expired < arrow.now()
  347. class OauthToken(db.Model, ModelMixin):
  348. access_token = db.Column(db.String(128), unique=True)
  349. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  350. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  351. scope = db.Column(db.String(128))
  352. redirect_uri = db.Column(db.String(1024))
  353. # what is the input response_type, e.g. "token", "token,id_token", ...
  354. response_type = db.Column(db.String(128))
  355. user = db.relationship(User)
  356. client = db.relationship(Client)
  357. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  358. def is_expired(self):
  359. return self.expired < arrow.now()
  360. def generate_email(
  361. scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False
  362. ) -> str:
  363. """generate an email address that does not exist before
  364. :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
  365. :type in_hex: bool, if the generate scheme is uuid, is hex favorable?
  366. """
  367. if scheme == AliasGeneratorEnum.uuid.value:
  368. name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__()
  369. random_email = name + "@" + EMAIL_DOMAIN
  370. else:
  371. random_email = random_words() + "@" + EMAIL_DOMAIN
  372. # check that the client does not exist yet
  373. if not GenEmail.get_by(email=random_email) and not DeletedAlias.get_by(
  374. email=random_email
  375. ):
  376. LOG.debug("generate email %s", random_email)
  377. return random_email
  378. # Rerun the function
  379. LOG.warning("email %s already exists, generate a new email", random_email)
  380. return generate_email(scheme=scheme, in_hex=in_hex)
  381. class GenEmail(db.Model, ModelMixin):
  382. """Generated email"""
  383. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  384. email = db.Column(db.String(128), unique=True, nullable=False)
  385. enabled = db.Column(db.Boolean(), default=True, nullable=False)
  386. custom_domain_id = db.Column(
  387. db.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=True
  388. )
  389. # To know whether an alias is created "on the fly", i.e. via the custom domain catch-all feature
  390. automatic_creation = db.Column(
  391. db.Boolean, nullable=False, default=False, server_default="0"
  392. )
  393. # to know whether an alias belongs to a directory
  394. directory_id = db.Column(
  395. db.ForeignKey("directory.id", ondelete="cascade"), nullable=True
  396. )
  397. note = db.Column(db.Text, default=None, nullable=True)
  398. # an alias can be owned by another mailbox
  399. mailbox_id = db.Column(
  400. db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=False
  401. )
  402. user = db.relationship(User)
  403. mailbox = db.relationship("Mailbox")
  404. @classmethod
  405. def create_new(cls, user, prefix, note=None, mailbox_id=None):
  406. if not prefix:
  407. raise Exception("alias prefix cannot be empty")
  408. # find the right suffix - avoid infinite loop by running this at max 1000 times
  409. for i in range(1000):
  410. suffix = random_word()
  411. email = f"{prefix}.{suffix}@{EMAIL_DOMAIN}"
  412. if not cls.get_by(email=email):
  413. break
  414. return GenEmail.create(
  415. user_id=user.id,
  416. email=email,
  417. note=note,
  418. mailbox_id=mailbox_id or user.default_mailbox_id,
  419. )
  420. @classmethod
  421. def create_new_random(
  422. cls, user, scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False
  423. ):
  424. """create a new random alias"""
  425. random_email = generate_email(scheme=scheme, in_hex=in_hex)
  426. return GenEmail.create(
  427. user_id=user.id, email=random_email, mailbox_id=user.default_mailbox_id
  428. )
  429. def mailbox_email(self):
  430. if self.mailbox_id:
  431. return self.mailbox.email
  432. else:
  433. return self.user.email
  434. def __repr__(self):
  435. return f"<GenEmail {self.id} {self.email}>"
  436. class ClientUser(db.Model, ModelMixin):
  437. __table_args__ = (
  438. db.UniqueConstraint("user_id", "client_id", name="uq_client_user"),
  439. )
  440. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  441. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  442. # Null means client has access to user original email
  443. gen_email_id = db.Column(
  444. db.ForeignKey(GenEmail.id, ondelete="cascade"), nullable=True
  445. )
  446. # user can decide to send to client another name
  447. name = db.Column(
  448. db.String(128), nullable=True, default=None, server_default=text("NULL")
  449. )
  450. # user can decide to send to client a default avatar
  451. default_avatar = db.Column(
  452. db.Boolean, nullable=False, default=False, server_default="0"
  453. )
  454. gen_email = db.relationship(GenEmail, backref="client_users")
  455. user = db.relationship(User)
  456. client = db.relationship(Client)
  457. def get_email(self):
  458. return self.gen_email.email if self.gen_email_id else self.user.email
  459. def get_user_name(self):
  460. if self.name:
  461. return self.name
  462. else:
  463. return self.user.name
  464. def get_user_info(self) -> dict:
  465. """return user info according to client scope
  466. Return dict with key being scope name. For now all the fields are the same for all clients:
  467. {
  468. "client": "Demo",
  469. "email": "test-avk5l@mail-tester.com",
  470. "email_verified": true,
  471. "id": 1,
  472. "name": "Son GM",
  473. "avatar_url": "http://s3..."
  474. }
  475. """
  476. res = {
  477. "id": self.id,
  478. "client": self.client.name,
  479. "email_verified": True,
  480. "sub": str(self.id),
  481. }
  482. for scope in self.client.get_scopes():
  483. if scope == Scope.NAME:
  484. if self.name:
  485. res[Scope.NAME.value] = self.name
  486. else:
  487. res[Scope.NAME.value] = self.user.name
  488. elif scope == Scope.AVATAR_URL:
  489. if self.user.profile_picture_id:
  490. if self.default_avatar:
  491. res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png"
  492. else:
  493. res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url(
  494. AVATAR_URL_EXPIRATION
  495. )
  496. else:
  497. res[Scope.AVATAR_URL.value] = None
  498. elif scope == Scope.EMAIL:
  499. # Use generated email
  500. if self.gen_email_id:
  501. LOG.debug(
  502. "Use gen email for user %s, client %s", self.user, self.client
  503. )
  504. res[Scope.EMAIL.value] = self.gen_email.email
  505. # Use user original email
  506. else:
  507. res[Scope.EMAIL.value] = self.user.email
  508. return res
  509. class ForwardEmail(db.Model, ModelMixin):
  510. """
  511. Emails that are forwarded through SL: email that is sent by website to user via SL alias
  512. """
  513. __table_args__ = (
  514. db.UniqueConstraint("gen_email_id", "website_email", name="uq_forward_email"),
  515. )
  516. gen_email_id = db.Column(
  517. db.ForeignKey(GenEmail.id, ondelete="cascade"), nullable=False
  518. )
  519. # used to be envelope header, should be mail header from instead
  520. website_email = db.Column(db.String(256), nullable=False)
  521. # the email from header, e.g. AB CD <ab@cd.com>
  522. # nullable as this field is added after website_email
  523. website_from = db.Column(db.String(256), nullable=True)
  524. # when user clicks on "reply", they will reply to this address.
  525. # This address allows to hide user personal email
  526. # this reply email is created every time a website sends an email to user
  527. # it has the prefix "reply+" to distinguish with other email
  528. reply_email = db.Column(db.String(256), nullable=False)
  529. gen_email = db.relationship(GenEmail, backref="forward_emails")
  530. def website_send_to(self):
  531. """return the email address with name.
  532. to use when user wants to send an email from the alias"""
  533. from app.email_utils import get_email_name
  534. if self.website_from:
  535. name = get_email_name(self.website_from)
  536. if name:
  537. return name + " " + self.website_email + f" <{self.reply_email}>"
  538. return self.website_email.replace("@", " at ") + f" <{self.reply_email}>"
  539. def last_reply(self) -> "ForwardEmailLog":
  540. """return the most recent reply"""
  541. return (
  542. ForwardEmailLog.query.filter_by(forward_id=self.id, is_reply=True)
  543. .order_by(desc(ForwardEmailLog.created_at))
  544. .first()
  545. )
  546. class ForwardEmailLog(db.Model, ModelMixin):
  547. forward_id = db.Column(
  548. db.ForeignKey(ForwardEmail.id, ondelete="cascade"), nullable=False
  549. )
  550. # whether this is a reply
  551. is_reply = db.Column(db.Boolean, nullable=False, default=False)
  552. # for ex if alias is disabled, this forwarding is blocked
  553. blocked = db.Column(db.Boolean, nullable=False, default=False)
  554. # can happen when user email service refuses the forwarded email
  555. # usually because the forwarded email is too spammy
  556. bounced = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  557. class Subscription(db.Model, ModelMixin):
  558. # Come from Paddle
  559. cancel_url = db.Column(db.String(1024), nullable=False)
  560. update_url = db.Column(db.String(1024), nullable=False)
  561. subscription_id = db.Column(db.String(1024), nullable=False, unique=True)
  562. event_time = db.Column(ArrowType, nullable=False)
  563. next_bill_date = db.Column(db.Date, nullable=False)
  564. cancelled = db.Column(db.Boolean, nullable=False, default=False)
  565. plan = db.Column(db.Enum(PlanEnum), nullable=False)
  566. user_id = db.Column(
  567. db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
  568. )
  569. user = db.relationship(User)
  570. def plan_name(self):
  571. if self.plan == PlanEnum.monthly:
  572. return "Monthly ($2.99/month)"
  573. else:
  574. return "Yearly ($29.99/year)"
  575. class ManualSubscription(db.Model, ModelMixin):
  576. """
  577. For users who use other forms of payment and therefore not pass by Paddle
  578. """
  579. user_id = db.Column(
  580. db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
  581. )
  582. # an reminder is sent several days before the subscription ends
  583. end_at = db.Column(ArrowType, nullable=False)
  584. # for storing note about this subscription
  585. comment = db.Column(db.Text, nullable=True)
  586. user = db.relationship(User)
  587. class DeletedAlias(db.Model, ModelMixin):
  588. """Store all deleted alias to make sure they are NOT reused"""
  589. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  590. email = db.Column(db.String(256), unique=True, nullable=False)
  591. class EmailChange(db.Model, ModelMixin):
  592. """Used when user wants to update their email"""
  593. user_id = db.Column(
  594. db.ForeignKey(User.id, ondelete="cascade"),
  595. nullable=False,
  596. unique=True,
  597. index=True,
  598. )
  599. new_email = db.Column(db.String(256), unique=True, nullable=False)
  600. code = db.Column(db.String(128), unique=True, nullable=False)
  601. expired = db.Column(ArrowType, nullable=False, default=_expiration_12h)
  602. user = db.relationship(User)
  603. def is_expired(self):
  604. return self.expired < arrow.now()
  605. class AliasUsedOn(db.Model, ModelMixin):
  606. """Used to know where an alias is created"""
  607. __table_args__ = (
  608. db.UniqueConstraint("gen_email_id", "hostname", name="uq_alias_used"),
  609. )
  610. gen_email_id = db.Column(
  611. db.ForeignKey(GenEmail.id, ondelete="cascade"), nullable=False
  612. )
  613. hostname = db.Column(db.String(1024), nullable=False)
  614. class ApiKey(db.Model, ModelMixin):
  615. """used in browser extension to identify user"""
  616. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  617. code = db.Column(db.String(128), unique=True, nullable=False)
  618. name = db.Column(db.String(128), nullable=False)
  619. last_used = db.Column(ArrowType, default=None)
  620. times = db.Column(db.Integer, default=0, nullable=False)
  621. user = db.relationship(User)
  622. @classmethod
  623. def create(cls, user_id, name):
  624. # generate unique code
  625. found = False
  626. while not found:
  627. code = random_string(60)
  628. if not cls.get_by(code=code):
  629. found = True
  630. a = cls(user_id=user_id, code=code, name=name)
  631. db.session.add(a)
  632. return a
  633. class CustomDomain(db.Model, ModelMixin):
  634. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  635. domain = db.Column(db.String(128), unique=True, nullable=False)
  636. verified = db.Column(db.Boolean, nullable=False, default=False)
  637. dkim_verified = db.Column(
  638. db.Boolean, nullable=False, default=False, server_default="0"
  639. )
  640. spf_verified = db.Column(
  641. db.Boolean, nullable=False, default=False, server_default="0"
  642. )
  643. # an alias is created automatically the first time it receives an email
  644. catch_all = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  645. user = db.relationship(User)
  646. def nb_alias(self):
  647. return GenEmail.filter_by(custom_domain_id=self.id).count()
  648. def __repr__(self):
  649. return f"<Custom Domain {self.domain}>"
  650. class LifetimeCoupon(db.Model, ModelMixin):
  651. code = db.Column(db.String(128), nullable=False, unique=True)
  652. nb_used = db.Column(db.Integer, nullable=False)
  653. class Directory(db.Model, ModelMixin):
  654. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  655. name = db.Column(db.String(128), unique=True, nullable=False)
  656. user = db.relationship(User)
  657. def nb_alias(self):
  658. return GenEmail.filter_by(directory_id=self.id).count()
  659. def __repr__(self):
  660. return f"<Directory {self.name}>"
  661. class Job(db.Model, ModelMixin):
  662. """Used to schedule one-time job in the future"""
  663. name = db.Column(db.String(128), nullable=False)
  664. payload = db.Column(db.JSON)
  665. # whether the job has been taken by the job runner
  666. taken = db.Column(db.Boolean, default=False, nullable=False)
  667. run_at = db.Column(ArrowType)
  668. def __repr__(self):
  669. return f"<Job {self.id} {self.name} {self.payload}>"
  670. class Mailbox(db.Model, ModelMixin):
  671. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  672. email = db.Column(db.String(256), unique=True, nullable=False)
  673. verified = db.Column(db.Boolean, default=False, nullable=False)
  674. # used when user wants to update mailbox email
  675. new_email = db.Column(db.String(256), unique=True)
  676. def nb_alias(self):
  677. return GenEmail.filter_by(mailbox_id=self.id).count()
  678. def __repr__(self):
  679. return f"<Mailbox {self.email}>"
  680. class AccountActivation(db.Model, ModelMixin):
  681. """contains code to activate the user account when they sign up on mobile"""
  682. user_id = db.Column(
  683. db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
  684. )
  685. # the activation code is usually 6 digits
  686. code = db.Column(db.String(10), nullable=False)
  687. # nb tries decrements each time user enters wrong code
  688. tries = db.Column(db.Integer, default=3, nullable=False)
  689. __table_args__ = (
  690. CheckConstraint(tries >= 0, name="account_activation_tries_positive"),
  691. {},
  692. )