models.py 26 KB


  1. import enum
  2. import random
  3. import uuid
  4. import arrow
  5. import bcrypt
  6. from flask import url_for
  7. from flask_login import UserMixin
  8. from sqlalchemy import text, desc
  9. from sqlalchemy_utils import ArrowType
  10. from app import s3
  11. from app.config import (
  12. EMAIL_DOMAIN,
  13. MAX_NB_EMAIL_FREE_PLAN,
  14. URL,
  15. AVATAR_URL_EXPIRATION,
  16. JOB_ONBOARDING_1,
  17. )
  18. from app.extensions import db
  19. from app.log import LOG
  20. from app.oauth_models import Scope
  21. from app.utils import convert_to_id, random_string, random_words, random_word
  22. class ModelMixin(object):
  23. id = db.Column(db.Integer, primary_key=True, autoincrement=True)
  24. created_at = db.Column(ArrowType, default=arrow.utcnow, nullable=False)
  25. updated_at = db.Column(ArrowType, default=None, onupdate=arrow.utcnow)
  26. _repr_hide = ["created_at", "updated_at"]
  27. @classmethod
  28. def query(cls):
  29. return db.session.query(cls)
  30. @classmethod
  31. def get(cls, id):
  32. return cls.query.get(id)
  33. @classmethod
  34. def get_by(cls, **kw):
  35. return cls.query.filter_by(**kw).first()
  36. @classmethod
  37. def filter_by(cls, **kw):
  38. return cls.query.filter_by(**kw)
  39. @classmethod
  40. def get_or_create(cls, **kw):
  41. r = cls.get_by(**kw)
  42. if not r:
  43. r = cls(**kw)
  44. db.session.add(r)
  45. return r
  46. @classmethod
  47. def create(cls, **kw):
  48. r = cls(**kw)
  49. db.session.add(r)
  50. return r
  51. def save(self):
  52. db.session.add(self)
  53. @classmethod
  54. def delete(cls, obj_id):
  55. cls.query.filter(cls.id == obj_id).delete()
  56. def __repr__(self):
  57. values = ", ".join(
  58. "%s=%r" % (n, getattr(self, n))
  59. for n in self.__table__.c.keys()
  60. if n not in self._repr_hide
  61. )
  62. return "%s(%s)" % (self.__class__.__name__, values)
class File(db.Model, ModelMixin):
    """A stored file, identified by its unique ``path``."""

    # path handed to ``s3.get_url``; unique across all File rows
    path = db.Column(db.String(128), unique=True, nullable=False)

    def get_url(self, expires_in=3600):
        """Return the URL produced by ``s3.get_url`` for this file's path.

        :param expires_in: forwarded unchanged to ``s3.get_url``; presumably a
            lifetime in seconds -- TODO confirm against ``app.s3``.
        """
        return s3.get_url(self.path, expires_in)
class PlanEnum(enum.Enum):
    """Billing plans; used as the type of the ``Subscription.plan`` column."""

    # NOTE(review): values start at 2; the reason is not visible in this file
    monthly = 2
    yearly = 3
  70. class AliasGeneratorEnum(enum.Enum):
  71. word = 1 # aliases are generated based on random words
  72. uuid = 2 # aliases are generated based on uuid
  73. @classmethod
  74. def has_value(cls, value: int) -> bool:
  75. return value in set(item.value for item in cls)
  76. class User(db.Model, ModelMixin, UserMixin):
  77. __tablename__ = "users"
  78. email = db.Column(db.String(256), unique=True, nullable=False)
  79. salt = db.Column(db.String(128), nullable=False)
  80. password = db.Column(db.String(128), nullable=False)
  81. name = db.Column(db.String(128), nullable=False)
  82. is_admin = db.Column(db.Boolean, nullable=False, default=False)
  83. alias_generator = db.Column(
  84. db.Integer,
  85. nullable=False,
  86. default=AliasGeneratorEnum.word.value,
  87. server_default=str(AliasGeneratorEnum.word.value),
  88. )
  89. notification = db.Column(
  90. db.Boolean, default=True, nullable=False, server_default="1"
  91. )
  92. activated = db.Column(db.Boolean, default=False, nullable=False)
  93. profile_picture_id = db.Column(db.ForeignKey(File.id), nullable=True)
  94. otp_secret = db.Column(db.String(16), nullable=True)
  95. enable_otp = db.Column(
  96. db.Boolean, nullable=False, default=False, server_default="0"
  97. )
  98. # some users could have lifetime premium
  99. lifetime = db.Column(db.Boolean, default=False, nullable=False, server_default="0")
  100. # user can use all premium features until this date
  101. trial_end = db.Column(
  102. ArrowType, default=lambda: arrow.now().shift(days=7, hours=1), nullable=True
  103. )
  104. profile_picture = db.relationship(File)
  105. @classmethod
  106. def create(cls, email, name, password=None, **kwargs):
  107. user: User = super(User, cls).create(email=email, name=name, **kwargs)
  108. if not password:
  109. # set a random password
  110. password = random_string(20)
  111. user.set_password(password)
  112. db.session.flush()
  113. # create a first alias mail to show user how to use when they login
  114. GenEmail.create_new(user.id, prefix="my-first-alias")
  115. db.session.flush()
  116. # Schedule onboarding emails
  117. Job.create(
  118. name=JOB_ONBOARDING_1,
  119. payload={"user_id": user.id},
  120. run_at=arrow.now().shift(days=1),
  121. )
  122. db.session.flush()
  123. return user
  124. def lifetime_or_active_subscription(self) -> bool:
  125. """True if user has lifetime licence or active subscription"""
  126. if self.lifetime:
  127. return True
  128. sub: Subscription = self.get_subscription()
  129. if sub:
  130. return True
  131. return False
  132. def in_trial(self):
  133. """return True if user does not have lifetime licence or an active subscription AND is in trial period"""
  134. if self.lifetime_or_active_subscription():
  135. return False
  136. if self.trial_end and arrow.now() < self.trial_end:
  137. return True
  138. return False
  139. def should_upgrade(self):
  140. return not self.lifetime_or_active_subscription()
  141. def is_premium(self) -> bool:
  142. """
  143. user is premium if they:
  144. - have a lifetime deal or
  145. - in trial period or
  146. - active subscription
  147. """
  148. if self.lifetime_or_active_subscription():
  149. return True
  150. if self.trial_end and arrow.now() < self.trial_end:
  151. return True
  152. return False
  153. def can_create_new_alias(self) -> bool:
  154. if self.is_premium():
  155. return True
  156. return GenEmail.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN
  157. def set_password(self, password):
  158. salt = bcrypt.gensalt()
  159. password_hash = bcrypt.hashpw(password.encode(), salt).decode()
  160. self.salt = salt.decode()
  161. self.password = password_hash
  162. def check_password(self, password) -> bool:
  163. password_hash = bcrypt.hashpw(password.encode(), self.salt.encode())
  164. return self.password.encode() == password_hash
  165. def profile_picture_url(self):
  166. if self.profile_picture_id:
  167. return self.profile_picture.get_url()
  168. else:
  169. return url_for("static", filename="default-avatar.png")
  170. def suggested_emails(self, website_name) -> (str, [str]):
  171. """return suggested email and other email choices """
  172. website_name = convert_to_id(website_name)
  173. all_gen_emails = [ge.email for ge in GenEmail.filter_by(user_id=self.id)]
  174. if self.can_create_new_alias():
  175. suggested_gen_email = GenEmail.create_new(
  176. self.id, prefix=website_name
  177. ).email
  178. else:
  179. # pick an email from the list of gen emails
  180. suggested_gen_email = random.choice(all_gen_emails)
  181. return (
  182. suggested_gen_email,
  183. list(set(all_gen_emails).difference({suggested_gen_email})),
  184. )
  185. def suggested_names(self) -> (str, [str]):
  186. """return suggested name and other name choices """
  187. other_name = convert_to_id(self.name)
  188. return self.name, [other_name, "Anonymous", "whoami"]
  189. def get_name_initial(self) -> str:
  190. names = self.name.split(" ")
  191. return "".join([n[0].upper() for n in names if n])
  192. def get_subscription(self):
  193. """return *active* subscription
  194. TODO: support user unsubscribe and re-subscribe
  195. """
  196. sub = Subscription.get_by(user_id=self.id)
  197. if sub and sub.cancelled:
  198. # sub is active until the next billing_date + 1
  199. if sub.next_bill_date >= arrow.now().shift(days=-1).date():
  200. return sub
  201. else: # past subscription, user is considered not having a subscription
  202. return None
  203. else:
  204. return sub
  205. def verified_custom_domains(self):
  206. return CustomDomain.query.filter_by(user_id=self.id, verified=True).all()
  207. def __repr__(self):
  208. return f"<User {self.id} {self.name} {self.email}>"
  209. def _expiration_1h():
  210. return arrow.now().shift(hours=1)
  211. def _expiration_12h():
  212. return arrow.now().shift(hours=12)
  213. def _expiration_5m():
  214. return arrow.now().shift(minutes=5)
  215. class ActivationCode(db.Model, ModelMixin):
  216. """For activate user account"""
  217. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  218. code = db.Column(db.String(128), unique=True, nullable=False)
  219. user = db.relationship(User)
  220. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  221. def is_expired(self):
  222. return self.expired < arrow.now()
  223. class ResetPasswordCode(db.Model, ModelMixin):
  224. """For resetting password"""
  225. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  226. code = db.Column(db.String(128), unique=True, nullable=False)
  227. user = db.relationship(User)
  228. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  229. def is_expired(self):
  230. return self.expired < arrow.now()
  231. # <<< OAUTH models >>>
  232. def generate_oauth_client_id(client_name) -> str:
  233. oauth_client_id = convert_to_id(client_name) + "-" + random_string()
  234. # check that the client does not exist yet
  235. if not Client.get_by(oauth_client_id=oauth_client_id):
  236. LOG.debug("generate oauth_client_id %s", oauth_client_id)
  237. return oauth_client_id
  238. # Rerun the function
  239. LOG.warning(
  240. "client_id %s already exists, generate a new client_id", oauth_client_id
  241. )
  242. return generate_oauth_client_id(client_name)
  243. class Client(db.Model, ModelMixin):
  244. oauth_client_id = db.Column(db.String(128), unique=True, nullable=False)
  245. oauth_client_secret = db.Column(db.String(128), nullable=False)
  246. name = db.Column(db.String(128), nullable=False)
  247. home_url = db.Column(db.String(1024))
  248. published = db.Column(db.Boolean, default=False, nullable=False)
  249. # user who created this client
  250. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  251. icon_id = db.Column(db.ForeignKey(File.id), nullable=True)
  252. icon = db.relationship(File)
  253. def nb_user(self):
  254. return ClientUser.filter_by(client_id=self.id).count()
  255. def get_scopes(self) -> [Scope]:
  256. # todo: client can choose which scopes they want to have access
  257. return [Scope.NAME, Scope.EMAIL, Scope.AVATAR_URL]
  258. @classmethod
  259. def create_new(cls, name, user_id) -> "Client":
  260. # generate a client-id
  261. oauth_client_id = generate_oauth_client_id(name)
  262. oauth_client_secret = random_string(40)
  263. client = Client.create(
  264. name=name,
  265. oauth_client_id=oauth_client_id,
  266. oauth_client_secret=oauth_client_secret,
  267. user_id=user_id,
  268. )
  269. return client
  270. def get_icon_url(self):
  271. if self.icon_id:
  272. return self.icon.get_url()
  273. else:
  274. return URL + "/static/default-icon.svg"
  275. def last_user_login(self) -> "ClientUser":
  276. client_user = (
  277. ClientUser.query.filter(ClientUser.client_id == self.id)
  278. .order_by(ClientUser.updated_at)
  279. .first()
  280. )
  281. if client_user:
  282. return client_user
  283. return None
class RedirectUri(db.Model, ModelMixin):
    """Valid redirect uris for a client"""

    # owning client; rows are removed with the client (ondelete="cascade")
    client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
    uri = db.Column(db.String(1024), nullable=False)

    # also exposes ``Client.redirect_uris`` through the backref
    client = db.relationship(Client, backref="redirect_uris")
  289. class AuthorizationCode(db.Model, ModelMixin):
  290. code = db.Column(db.String(128), unique=True, nullable=False)
  291. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  292. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  293. scope = db.Column(db.String(128))
  294. redirect_uri = db.Column(db.String(1024))
  295. # what is the input response_type, e.g. "code", "code,id_token", ...
  296. response_type = db.Column(db.String(128))
  297. user = db.relationship(User, lazy=False)
  298. client = db.relationship(Client, lazy=False)
  299. expired = db.Column(ArrowType, nullable=False, default=_expiration_5m)
  300. def is_expired(self):
  301. return self.expired < arrow.now()
  302. class OauthToken(db.Model, ModelMixin):
  303. access_token = db.Column(db.String(128), unique=True)
  304. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  305. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  306. scope = db.Column(db.String(128))
  307. redirect_uri = db.Column(db.String(1024))
  308. # what is the input response_type, e.g. "token", "token,id_token", ...
  309. response_type = db.Column(db.String(128))
  310. user = db.relationship(User)
  311. client = db.relationship(Client)
  312. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  313. def is_expired(self):
  314. return self.expired < arrow.now()
  315. def generate_email(
  316. scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False
  317. ) -> str:
  318. """generate an email address that does not exist before
  319. :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
  320. :type in_hex: bool, if the generate scheme is uuid, is hex favorable?
  321. """
  322. if scheme == AliasGeneratorEnum.uuid.value:
  323. name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__()
  324. random_email = name + "@" + EMAIL_DOMAIN
  325. else:
  326. random_email = random_words() + "@" + EMAIL_DOMAIN
  327. # check that the client does not exist yet
  328. if not GenEmail.get_by(email=random_email) and not DeletedAlias.get_by(
  329. email=random_email
  330. ):
  331. LOG.debug("generate email %s", random_email)
  332. return random_email
  333. # Rerun the function
  334. LOG.warning("email %s already exists, generate a new email", random_email)
  335. return generate_email(scheme=scheme, in_hex=in_hex)
  336. class GenEmail(db.Model, ModelMixin):
  337. """Generated email"""
  338. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  339. email = db.Column(db.String(128), unique=True, nullable=False)
  340. enabled = db.Column(db.Boolean(), default=True, nullable=False)
  341. custom_domain_id = db.Column(
  342. db.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=True
  343. )
  344. # To know whether an alias is created "on the fly", i.e. via the custom domain catch-all feature
  345. automatic_creation = db.Column(
  346. db.Boolean, nullable=False, default=False, server_default="0"
  347. )
  348. # to know whether an alias belongs to a directory
  349. directory_id = db.Column(
  350. db.ForeignKey("directory.id", ondelete="cascade"), nullable=True
  351. )
  352. note = db.Column(db.Text, default=None, nullable=True)
  353. # an alias can be owned by another mailbox
  354. mailbox_id = db.Column(
  355. db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=True, default=None
  356. )
  357. user = db.relationship(User)
  358. mailbox = db.relationship('Mailbox')
  359. @classmethod
  360. def create_new(cls, user_id, prefix, note=None):
  361. if not prefix:
  362. raise Exception("alias prefix cannot be empty")
  363. # find the right suffix - avoid infinite loop by running this at max 1000 times
  364. for i in range(1000):
  365. suffix = random_word()
  366. email = f"{prefix}.{suffix}@{EMAIL_DOMAIN}"
  367. if not cls.get_by(email=email):
  368. break
  369. return GenEmail.create(user_id=user_id, email=email, note=note)
  370. @classmethod
  371. def create_new_random(
  372. cls, user_id, scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False
  373. ):
  374. """create a new random alias"""
  375. random_email = generate_email(scheme=scheme, in_hex=in_hex)
  376. return GenEmail.create(user_id=user_id, email=random_email)
  377. def __repr__(self):
  378. return f"<GenEmail {self.id} {self.email}>"
  379. class ClientUser(db.Model, ModelMixin):
  380. __table_args__ = (
  381. db.UniqueConstraint("user_id", "client_id", name="uq_client_user"),
  382. )
  383. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  384. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  385. # Null means client has access to user original email
  386. gen_email_id = db.Column(
  387. db.ForeignKey(GenEmail.id, ondelete="cascade"), nullable=True
  388. )
  389. # user can decide to send to client another name
  390. name = db.Column(
  391. db.String(128), nullable=True, default=None, server_default=text("NULL")
  392. )
  393. # user can decide to send to client a default avatar
  394. default_avatar = db.Column(
  395. db.Boolean, nullable=False, default=False, server_default="0"
  396. )
  397. gen_email = db.relationship(GenEmail, backref="client_users")
  398. user = db.relationship(User)
  399. client = db.relationship(Client)
  400. def get_email(self):
  401. return self.gen_email.email if self.gen_email_id else self.user.email
  402. def get_user_name(self):
  403. if self.name:
  404. return self.name
  405. else:
  406. return self.user.name
  407. def get_user_info(self) -> dict:
  408. """return user info according to client scope
  409. Return dict with key being scope name. For now all the fields are the same for all clients:
  410. {
  411. "client": "Demo",
  412. "email": "test-avk5l@mail-tester.com",
  413. "email_verified": true,
  414. "id": 1,
  415. "name": "Son GM",
  416. "avatar_url": "http://s3..."
  417. }
  418. """
  419. res = {
  420. "id": self.id,
  421. "client": self.client.name,
  422. "email_verified": True,
  423. "sub": str(self.id),
  424. }
  425. for scope in self.client.get_scopes():
  426. if scope == Scope.NAME:
  427. if self.name:
  428. res[Scope.NAME.value] = self.name
  429. else:
  430. res[Scope.NAME.value] = self.user.name
  431. elif scope == Scope.AVATAR_URL:
  432. if self.user.profile_picture_id:
  433. if self.default_avatar:
  434. res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png"
  435. else:
  436. res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url(
  437. AVATAR_URL_EXPIRATION
  438. )
  439. else:
  440. res[Scope.AVATAR_URL.value] = None
  441. elif scope == Scope.EMAIL:
  442. # Use generated email
  443. if self.gen_email_id:
  444. LOG.debug(
  445. "Use gen email for user %s, client %s", self.user, self.client
  446. )
  447. res[Scope.EMAIL.value] = self.gen_email.email
  448. # Use user original email
  449. else:
  450. res[Scope.EMAIL.value] = self.user.email
  451. return res
  452. class ForwardEmail(db.Model, ModelMixin):
  453. """
  454. Emails that are forwarded through SL: email that is sent by website to user via SL alias
  455. """
  456. __table_args__ = (
  457. db.UniqueConstraint("gen_email_id", "website_email", name="uq_forward_email"),
  458. )
  459. gen_email_id = db.Column(
  460. db.ForeignKey(GenEmail.id, ondelete="cascade"), nullable=False
  461. )
  462. # used to be envelope header, should be mail header from instead
  463. website_email = db.Column(db.String(256), nullable=False)
  464. # the email from header, e.g. AB CD <ab@cd.com>
  465. # nullable as this field is added after website_email
  466. website_from = db.Column(db.String(256), nullable=True)
  467. # when user clicks on "reply", they will reply to this address.
  468. # This address allows to hide user personal email
  469. # this reply email is created every time a website sends an email to user
  470. # it has the prefix "reply+" to distinguish with other email
  471. reply_email = db.Column(db.String(256), nullable=False)
  472. gen_email = db.relationship(GenEmail, backref="forward_emails")
  473. def website_send_to(self):
  474. """return the email address with name.
  475. to use when user wants to send an email from the alias"""
  476. from app.email_utils import get_email_name
  477. if self.website_from:
  478. name = get_email_name(self.website_from)
  479. if name:
  480. return name + " " + self.website_email + f" <{self.reply_email}>"
  481. return self.website_email.replace("@", " at ") + f" <{self.reply_email}>"
  482. def last_reply(self) -> "ForwardEmailLog":
  483. """return the most recent reply"""
  484. return (
  485. ForwardEmailLog.query.filter_by(forward_id=self.id, is_reply=True)
  486. .order_by(desc(ForwardEmailLog.created_at))
  487. .first()
  488. )
class ForwardEmailLog(db.Model, ModelMixin):
    """One log row per email handled for a ``ForwardEmail``."""

    # the ForwardEmail this log entry belongs to
    forward_id = db.Column(
        db.ForeignKey(ForwardEmail.id, ondelete="cascade"), nullable=False
    )

    # whether this is a reply
    is_reply = db.Column(db.Boolean, nullable=False, default=False)

    # for ex if alias is disabled, this forwarding is blocked
    blocked = db.Column(db.Boolean, nullable=False, default=False)
  497. class Subscription(db.Model, ModelMixin):
  498. # Come from Paddle
  499. cancel_url = db.Column(db.String(1024), nullable=False)
  500. update_url = db.Column(db.String(1024), nullable=False)
  501. subscription_id = db.Column(db.String(1024), nullable=False, unique=True)
  502. event_time = db.Column(ArrowType, nullable=False)
  503. next_bill_date = db.Column(db.Date, nullable=False)
  504. cancelled = db.Column(db.Boolean, nullable=False, default=False)
  505. plan = db.Column(db.Enum(PlanEnum), nullable=False)
  506. user_id = db.Column(
  507. db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
  508. )
  509. user = db.relationship(User)
  510. def plan_name(self):
  511. if self.plan == PlanEnum.monthly:
  512. return "Monthly ($2.99/month)"
  513. else:
  514. return "Yearly ($29.99/year)"
class DeletedAlias(db.Model, ModelMixin):
    """Store all deleted alias to make sure they are NOT reused"""

    # user the alias belonged to
    user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
    # address of the deleted alias; ``generate_email`` refuses candidates found here
    email = db.Column(db.String(256), unique=True, nullable=False)
  519. class EmailChange(db.Model, ModelMixin):
  520. """Used when user wants to update their email"""
  521. user_id = db.Column(
  522. db.ForeignKey(User.id, ondelete="cascade"),
  523. nullable=False,
  524. unique=True,
  525. index=True,
  526. )
  527. new_email = db.Column(db.String(256), unique=True, nullable=False)
  528. code = db.Column(db.String(128), unique=True, nullable=False)
  529. expired = db.Column(ArrowType, nullable=False, default=_expiration_12h)
  530. user = db.relationship(User)
  531. def is_expired(self):
  532. return self.expired < arrow.now()
class AliasUsedOn(db.Model, ModelMixin):
    """Used to know where an alias is created"""

    # a given (alias, hostname) pair is recorded at most once
    __table_args__ = (
        db.UniqueConstraint("gen_email_id", "hostname", name="uq_alias_used"),
    )

    gen_email_id = db.Column(
        db.ForeignKey(GenEmail.id, ondelete="cascade"), nullable=False
    )

    hostname = db.Column(db.String(1024), nullable=False)
  542. class ApiKey(db.Model, ModelMixin):
  543. """used in browser extension to identify user"""
  544. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  545. code = db.Column(db.String(128), unique=True, nullable=False)
  546. name = db.Column(db.String(128), nullable=False)
  547. last_used = db.Column(ArrowType, default=None)
  548. times = db.Column(db.Integer, default=0, nullable=False)
  549. user = db.relationship(User)
  550. @classmethod
  551. def create(cls, user_id, name):
  552. # generate unique code
  553. found = False
  554. while not found:
  555. code = random_string(60)
  556. if not cls.get_by(code=code):
  557. found = True
  558. a = cls(user_id=user_id, code=code, name=name)
  559. db.session.add(a)
  560. return a
  561. class CustomDomain(db.Model, ModelMixin):
  562. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  563. domain = db.Column(db.String(128), unique=True, nullable=False)
  564. verified = db.Column(db.Boolean, nullable=False, default=False)
  565. dkim_verified = db.Column(
  566. db.Boolean, nullable=False, default=False, server_default="0"
  567. )
  568. spf_verified = db.Column(
  569. db.Boolean, nullable=False, default=False, server_default="0"
  570. )
  571. # an alias is created automatically the first time it receives an email
  572. catch_all = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  573. user = db.relationship(User)
  574. def nb_alias(self):
  575. return GenEmail.filter_by(custom_domain_id=self.id).count()
  576. def __repr__(self):
  577. return f"<Custom Domain {self.domain}>"
class LifetimeCoupon(db.Model, ModelMixin):
    """Coupon code; presumably grants the lifetime plan -- TODO confirm against callers."""

    code = db.Column(db.String(128), nullable=False, unique=True)
    # NOTE(review): the name suggests a usage counter, but whether it counts
    # uses made or uses remaining is not visible in this file
    nb_used = db.Column(db.Integer, nullable=False)
  581. class Directory(db.Model, ModelMixin):
  582. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  583. name = db.Column(db.String(128), unique=True, nullable=False)
  584. user = db.relationship(User)
  585. def nb_alias(self):
  586. return GenEmail.filter_by(directory_id=self.id).count()
  587. def __repr__(self):
  588. return f"<Directory {self.name}>"
class Job(db.Model, ModelMixin):
    """Used to schedule one-time job in the future"""

    # job type, e.g. ``JOB_ONBOARDING_1`` as scheduled by ``User.create``
    name = db.Column(db.String(128), nullable=False)
    # JSON arguments of the job, e.g. ``{"user_id": ...}``
    payload = db.Column(db.JSON)

    # whether the job has been taken by the job runner
    taken = db.Column(db.Boolean, default=False, nullable=False)
    # time the job is scheduled for
    run_at = db.Column(ArrowType)

    def __repr__(self):
        return f"<Job {self.id} {self.name} {self.payload}>"
  598. class Mailbox(db.Model, ModelMixin):
  599. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  600. email = db.Column(db.String(256), unique=True, nullable=False)
  601. verified = db.Column(db.Boolean, default=False, nullable=False)
  602. user = db.relationship(User)
  603. def nb_alias(self):
  604. return GenEmail.filter_by(mailbox_id=self.id).count()
  605. def __repr__(self):
  606. return f"<Mailbox {self.email}>"