models.py 62 KB


  1. import enum
  2. import random
  3. import uuid
  4. from email.utils import formataddr
  5. from typing import List, Tuple, Optional
  6. import arrow
  7. import bcrypt
  8. from arrow import Arrow
  9. from flask import url_for
  10. from flask_login import UserMixin
  11. from sqlalchemy import text, desc, CheckConstraint
  12. from sqlalchemy_utils import ArrowType
  13. from app import s3
  14. from app.config import (
  15. MAX_NB_EMAIL_FREE_PLAN,
  16. URL,
  17. AVATAR_URL_EXPIRATION,
  18. JOB_ONBOARDING_1,
  19. JOB_ONBOARDING_2,
  20. JOB_ONBOARDING_4,
  21. LANDING_PAGE_URL,
  22. FIRST_ALIAS_DOMAIN,
  23. DISABLE_ONBOARDING,
  24. UNSUBSCRIBER,
  25. )
  26. from app.errors import AliasInTrashError
  27. from app.extensions import db
  28. from app.log import LOG
  29. from app.oauth_models import Scope
  30. from app.utils import convert_to_id, random_string, random_words, random_word
  31. class ModelMixin(object):
  32. id = db.Column(db.Integer, primary_key=True, autoincrement=True)
  33. created_at = db.Column(ArrowType, default=arrow.utcnow, nullable=False)
  34. updated_at = db.Column(ArrowType, default=None, onupdate=arrow.utcnow)
  35. _repr_hide = ["created_at", "updated_at"]
  36. @classmethod
  37. def query(cls):
  38. return db.session.query(cls)
  39. @classmethod
  40. def get(cls, id):
  41. return cls.query.get(id)
  42. @classmethod
  43. def get_by(cls, **kw):
  44. return cls.query.filter_by(**kw).first()
  45. @classmethod
  46. def filter_by(cls, **kw):
  47. return cls.query.filter_by(**kw)
  48. @classmethod
  49. def get_or_create(cls, **kw):
  50. r = cls.get_by(**kw)
  51. if not r:
  52. r = cls(**kw)
  53. db.session.add(r)
  54. return r
  55. @classmethod
  56. def create(cls, **kw):
  57. # whether should call db.session.commit
  58. commit = kw.pop("commit", False)
  59. r = cls(**kw)
  60. db.session.add(r)
  61. if commit:
  62. db.session.commit()
  63. return r
  64. def save(self):
  65. db.session.add(self)
  66. @classmethod
  67. def delete(cls, obj_id):
  68. cls.query.filter(cls.id == obj_id).delete()
  69. @classmethod
  70. def first(cls):
  71. return cls.query.first()
  72. def __repr__(self):
  73. values = ", ".join(
  74. "%s=%r" % (n, getattr(self, n))
  75. for n in self.__table__.c.keys()
  76. if n not in self._repr_hide
  77. )
  78. return "%s(%s)" % (self.__class__.__name__, values)
class File(db.Model, ModelMixin):
    """A stored file, identified by its unique ``path``."""

    # unique path, passed as-is to s3.get_url()
    path = db.Column(db.String(128), unique=True, nullable=False)
    # optional owner; rows are removed when the referenced user is deleted
    user_id = db.Column(db.ForeignKey("users.id", ondelete="cascade"), nullable=True)

    def get_url(self, expires_in=3600):
        """Return ``s3.get_url(self.path, expires_in)``.

        NOTE(review): ``expires_in`` presumably is a lifetime in seconds -
        confirm against ``app.s3``.
        """
        return s3.get_url(self.path, expires_in)

    def __repr__(self):
        return f"<File {self.path}>"
  86. class EnumE(enum.Enum):
  87. @classmethod
  88. def has_value(cls, value: int) -> bool:
  89. return value in set(item.value for item in cls)
class PlanEnum(EnumE):
    """Plan identifiers: ``monthly`` (2) and ``yearly`` (3)."""

    monthly = 2
    yearly = 3
  93. # Specify the format for sender address
class SenderFormatEnum(EnumE):
    """How the original sender's address is rendered; each member's comment
    shows an example for sender "John Wick <john@wick.com>"."""

    AT = 0  # John Wick - john at wick.com
    VIA = 1  # john@wick.com via SimpleLogin
    A = 2  # John Wick - john(a)wick.com
    FULL = 3  # John Wick - john@wick.com
class AliasGeneratorEnum(EnumE):
    """Scheme used to build the local part of a random alias."""

    word = 1  # aliases are generated based on random words
    uuid = 2  # aliases are generated based on uuid
class Fido(db.Model, ModelMixin):
    """A registered credential row in the ``fido`` table, linked to a user
    through ``users.fido_uuid``."""

    __tablename__ = "fido"

    # unique and indexed credential identifier
    credential_id = db.Column(db.String(), nullable=False, unique=True, index=True)
    # references users.fido_uuid (not users.id); not unique, so several rows
    # can point to the same user. Rows are deleted with the referenced user.
    uuid = db.Column(
        db.ForeignKey("users.fido_uuid", ondelete="cascade"),
        unique=False,
        nullable=False,
    )
    public_key = db.Column(db.String(), nullable=False, unique=True)
    sign_count = db.Column(db.Integer(), nullable=False)
    # label for this credential; not required to be unique
    name = db.Column(db.String(128), nullable=False, unique=False)
  113. class User(db.Model, ModelMixin, UserMixin):
  114. __tablename__ = "users"
  115. email = db.Column(db.String(256), unique=True, nullable=False)
  116. salt = db.Column(db.String(128), nullable=True)
  117. password = db.Column(db.String(128), nullable=True)
  118. name = db.Column(db.String(128), nullable=False)
  119. is_admin = db.Column(db.Boolean, nullable=False, default=False)
  120. alias_generator = db.Column(
  121. db.Integer,
  122. nullable=False,
  123. default=AliasGeneratorEnum.word.value,
  124. server_default=str(AliasGeneratorEnum.word.value),
  125. )
  126. notification = db.Column(
  127. db.Boolean, default=True, nullable=False, server_default="1"
  128. )
  129. activated = db.Column(db.Boolean, default=False, nullable=False)
  130. # an account can be disabled if having harmful behavior
  131. disabled = db.Column(db.Boolean, default=False, nullable=False, server_default="0")
  132. profile_picture_id = db.Column(db.ForeignKey(File.id), nullable=True)
  133. otp_secret = db.Column(db.String(16), nullable=True)
  134. enable_otp = db.Column(
  135. db.Boolean, nullable=False, default=False, server_default="0"
  136. )
  137. last_otp = db.Column(db.String(12), nullable=True, default=False)
  138. # Fields for WebAuthn
  139. fido_uuid = db.Column(db.String(), nullable=True, unique=True)
  140. # the default domain that's used when user creates a new random alias
  141. # default_random_alias_domain_id XOR default_random_alias_public_domain_id
  142. default_random_alias_domain_id = db.Column(
  143. db.ForeignKey("custom_domain.id", ondelete="SET NULL"),
  144. nullable=True,
  145. default=None,
  146. )
  147. default_random_alias_public_domain_id = db.Column(
  148. db.ForeignKey("public_domain.id", ondelete="SET NULL"),
  149. nullable=True,
  150. default=None,
  151. )
  152. # some users could have lifetime premium
  153. lifetime = db.Column(db.Boolean, default=False, nullable=False, server_default="0")
  154. paid_lifetime = db.Column(
  155. db.Boolean, default=False, nullable=False, server_default="0"
  156. )
  157. # user can use all premium features until this date
  158. trial_end = db.Column(
  159. ArrowType, default=lambda: arrow.now().shift(days=7, hours=1), nullable=True
  160. )
  161. # the mailbox used when create random alias
  162. # this field is nullable but in practice, it's always set
  163. # it cannot be set to non-nullable though
  164. # as this will create foreign key cycle between User and Mailbox
  165. default_mailbox_id = db.Column(
  166. db.ForeignKey("mailbox.id"), nullable=True, default=None
  167. )
  168. profile_picture = db.relationship(File, foreign_keys=[profile_picture_id])
  169. # Specify the format for sender address
  170. # John Wick - john at wick.com -> 0
  171. # john@wick.com via SimpleLogin -> 1
  172. # John Wick - john@wick.com -> 2
  173. # John Wick - john@wick.com -> 3
  174. sender_format = db.Column(
  175. db.Integer, default="1", nullable=False, server_default="1"
  176. )
  177. replace_reverse_alias = db.Column(
  178. db.Boolean, default=False, nullable=False, server_default="0"
  179. )
  180. referral_id = db.Column(
  181. db.ForeignKey("referral.id", ondelete="SET NULL"), nullable=True, default=None
  182. )
  183. referral = db.relationship("Referral", foreign_keys=[referral_id])
  184. # whether intro has been shown to user
  185. intro_shown = db.Column(
  186. db.Boolean, default=False, nullable=False, server_default="0"
  187. )
  188. default_mailbox = db.relationship("Mailbox", foreign_keys=[default_mailbox_id])
  189. # user can set a more strict max_spam score to block spams more aggressively
  190. max_spam_score = db.Column(db.Integer, nullable=True)
  191. # newsletter is sent to this address
  192. newsletter_alias_id = db.Column(
  193. db.ForeignKey("alias.id", ondelete="SET NULL"), nullable=True, default=None
  194. )
  195. @classmethod
  196. def create(cls, email, name, password=None, **kwargs):
  197. user: User = super(User, cls).create(email=email, name=name, **kwargs)
  198. if password:
  199. user.set_password(password)
  200. db.session.flush()
  201. mb = Mailbox.create(user_id=user.id, email=user.email, verified=True)
  202. db.session.flush()
  203. user.default_mailbox_id = mb.id
  204. # create a first alias mail to show user how to use when they login
  205. alias = Alias.create_new(
  206. user,
  207. prefix="simplelogin-newsletter",
  208. mailbox_id=mb.id,
  209. note="This is your first alias. It's used to receive SimpleLogin communications "
  210. "like new features announcements, newsletters.",
  211. )
  212. db.session.flush()
  213. user.newsletter_alias_id = alias.id
  214. db.session.flush()
  215. if DISABLE_ONBOARDING:
  216. LOG.d("Disable onboarding emails")
  217. return user
  218. # Schedule onboarding emails
  219. Job.create(
  220. name=JOB_ONBOARDING_1,
  221. payload={"user_id": user.id},
  222. run_at=arrow.now().shift(days=1),
  223. )
  224. Job.create(
  225. name=JOB_ONBOARDING_2,
  226. payload={"user_id": user.id},
  227. run_at=arrow.now().shift(days=2),
  228. )
  229. Job.create(
  230. name=JOB_ONBOARDING_4,
  231. payload={"user_id": user.id},
  232. run_at=arrow.now().shift(days=3),
  233. )
  234. db.session.flush()
  235. return user
  236. def _lifetime_or_active_subscription(self) -> bool:
  237. """True if user has lifetime licence or active subscription"""
  238. if self.lifetime:
  239. return True
  240. sub: Subscription = self.get_subscription()
  241. if sub:
  242. return True
  243. apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
  244. if apple_sub and apple_sub.is_valid():
  245. return True
  246. manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
  247. if manual_sub and manual_sub.end_at > arrow.now():
  248. return True
  249. return False
  250. def is_paid(self) -> bool:
  251. """same as _lifetime_or_active_subscription but not include free manual subscription"""
  252. sub: Subscription = self.get_subscription()
  253. if sub:
  254. return True
  255. apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
  256. if apple_sub and apple_sub.is_valid():
  257. return True
  258. manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
  259. if (
  260. manual_sub
  261. and not manual_sub.is_giveaway
  262. and manual_sub.end_at > arrow.now()
  263. ):
  264. return True
  265. return False
  266. def in_trial(self):
  267. """return True if user does not have lifetime licence or an active subscription AND is in trial period"""
  268. if self._lifetime_or_active_subscription():
  269. return False
  270. if self.trial_end and arrow.now() < self.trial_end:
  271. return True
  272. return False
  273. def should_show_upgrade_button(self):
  274. if self._lifetime_or_active_subscription():
  275. # user who has canceled can also re-subscribe
  276. sub: Subscription = self.get_subscription()
  277. if sub and sub.cancelled:
  278. return True
  279. return False
  280. return True
  281. def can_upgrade(self):
  282. """User who has lifetime licence or giveaway manual subscriptions can decide to upgrade to a paid plan"""
  283. sub: Subscription = self.get_subscription()
  284. # user who has canceled can also re-subscribe
  285. if sub and not sub.cancelled:
  286. return False
  287. apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
  288. if apple_sub and apple_sub.is_valid():
  289. return False
  290. manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
  291. # user who has giveaway premium can decide to upgrade
  292. if (
  293. manual_sub
  294. and manual_sub.end_at > arrow.now()
  295. and not manual_sub.is_giveaway
  296. ):
  297. return False
  298. return True
  299. def is_premium(self) -> bool:
  300. """
  301. user is premium if they:
  302. - have a lifetime deal or
  303. - in trial period or
  304. - active subscription
  305. """
  306. if self._lifetime_or_active_subscription():
  307. return True
  308. if self.trial_end and arrow.now() < self.trial_end:
  309. return True
  310. return False
  311. def can_create_new_alias(self) -> bool:
  312. if self.is_premium():
  313. return True
  314. return Alias.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN
  315. def set_password(self, password):
  316. salt = bcrypt.gensalt()
  317. password_hash = bcrypt.hashpw(password.encode(), salt).decode()
  318. self.salt = salt.decode()
  319. self.password = password_hash
  320. def check_password(self, password) -> bool:
  321. if not self.password:
  322. return False
  323. password_hash = bcrypt.hashpw(password.encode(), self.salt.encode())
  324. return self.password.encode() == password_hash
  325. def profile_picture_url(self):
  326. if self.profile_picture_id:
  327. return self.profile_picture.get_url()
  328. else:
  329. return url_for("static", filename="default-avatar.png")
  330. def suggested_emails(self, website_name) -> (str, [str]):
  331. """return suggested email and other email choices """
  332. website_name = convert_to_id(website_name)
  333. all_aliases = [
  334. ge.email for ge in Alias.filter_by(user_id=self.id, enabled=True)
  335. ]
  336. if self.can_create_new_alias():
  337. suggested_alias = Alias.create_new(self, prefix=website_name).email
  338. else:
  339. # pick an email from the list of gen emails
  340. suggested_alias = random.choice(all_aliases)
  341. return (
  342. suggested_alias,
  343. list(set(all_aliases).difference({suggested_alias})),
  344. )
  345. def suggested_names(self) -> (str, [str]):
  346. """return suggested name and other name choices """
  347. other_name = convert_to_id(self.name)
  348. return self.name, [other_name, "Anonymous", "whoami"]
  349. def get_name_initial(self) -> str:
  350. names = self.name.split(" ")
  351. return "".join([n[0].upper() for n in names if n])
  352. def get_subscription(self) -> Optional["Subscription"]:
  353. """return *active* subscription
  354. TODO: support user unsubscribe and re-subscribe
  355. """
  356. sub = Subscription.get_by(user_id=self.id)
  357. if sub:
  358. # sub is active until the next billing_date + 1
  359. if sub.next_bill_date >= arrow.now().shift(days=-1).date():
  360. return sub
  361. # past subscription, user is considered not having a subscription = free plan
  362. else:
  363. return None
  364. else:
  365. return sub
  366. def verified_custom_domains(self) -> ["CustomDomain"]:
  367. return CustomDomain.query.filter_by(user_id=self.id, verified=True).all()
  368. def mailboxes(self) -> List["Mailbox"]:
  369. """list of mailbox that user own"""
  370. mailboxes = []
  371. for mailbox in Mailbox.query.filter_by(user_id=self.id, verified=True):
  372. mailboxes.append(mailbox)
  373. return mailboxes
  374. def nb_directory(self):
  375. return Directory.query.filter_by(user_id=self.id).count()
  376. def has_custom_domain(self):
  377. return CustomDomain.filter_by(user_id=self.id, verified=True).count() > 0
  378. def custom_domains(self):
  379. return CustomDomain.filter_by(user_id=self.id, verified=True).all()
  380. def available_domains_for_random_alias(self) -> List[Tuple[bool, str]]:
  381. """Return available domains for user to create random aliases
  382. Each result record contains:
  383. - whether the domain belongs to SimpleLogin
  384. - the domain
  385. """
  386. res = []
  387. for domain in self.available_sl_domains():
  388. res.append((True, domain))
  389. for custom_domain in self.verified_custom_domains():
  390. res.append((False, custom_domain.domain))
  391. return res
  392. def default_random_alias_domain(self) -> str:
  393. """return the domain used for the random alias"""
  394. if self.default_random_alias_domain_id:
  395. custom_domain = CustomDomain.get(self.default_random_alias_domain_id)
  396. # sanity check
  397. if (
  398. not custom_domain
  399. or not custom_domain.verified
  400. or custom_domain.user_id != self.id
  401. ):
  402. LOG.warning("Problem with %s default random alias domain", self)
  403. return FIRST_ALIAS_DOMAIN
  404. return custom_domain.domain
  405. if self.default_random_alias_public_domain_id:
  406. sl_domain = SLDomain.get(self.default_random_alias_public_domain_id)
  407. # sanity check
  408. if not sl_domain:
  409. LOG.exception("Problem with %s public random alias domain", self)
  410. return FIRST_ALIAS_DOMAIN
  411. if sl_domain.premium_only and not self.is_premium():
  412. LOG.warning(
  413. "%s is not premium and cannot use %s. Reset default random alias domain setting",
  414. self,
  415. sl_domain,
  416. )
  417. self.default_random_alias_domain_id = None
  418. self.default_random_alias_public_domain_id = None
  419. db.session.commit()
  420. return FIRST_ALIAS_DOMAIN
  421. return sl_domain.domain
  422. return FIRST_ALIAS_DOMAIN
  423. def fido_enabled(self) -> bool:
  424. if self.fido_uuid is not None:
  425. return True
  426. return False
  427. def two_factor_authentication_enabled(self) -> bool:
  428. return self.enable_otp or self.fido_enabled()
  429. def get_communication_email(self) -> (Optional[str], str, bool):
  430. """
  431. Return
  432. - the email that user uses to receive email communication. None if user unsubscribes from newsletter
  433. - the unsubscribe URL
  434. - whether the unsubscribe method is via sending email (mailto:) or Http POST
  435. """
  436. if self.notification and self.activated:
  437. if self.newsletter_alias_id:
  438. alias = Alias.get(self.newsletter_alias_id)
  439. if alias.enabled:
  440. unsubscribe_link, via_email = alias.unsubscribe_link()
  441. return alias.email, unsubscribe_link, via_email
  442. # alias disabled -> user doesn't want to receive newsletter
  443. else:
  444. return None, None, False
  445. else:
  446. # do not handle http POST unsubscribe
  447. if UNSUBSCRIBER:
  448. # use * as suffix instead of = as for alias unsubscribe
  449. return self.email, f"mailto:{UNSUBSCRIBER}?subject={self.id}*", True
  450. return None, None, False
  451. def available_sl_domains(self) -> [str]:
  452. """
  453. Return all SimpleLogin domains that user can use when creating a new alias, including:
  454. - SimpleLogin public domains, available for all users (ALIAS_DOMAIN)
  455. - SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN)
  456. """
  457. return [sl_domain.domain for sl_domain in self.get_sl_domains()]
  458. def get_sl_domains(self) -> ["SLDomain"]:
  459. if self.is_premium():
  460. query = SLDomain.query
  461. else:
  462. query = SLDomain.filter_by(premium_only=False)
  463. return query.all()
  464. def available_alias_domains(self) -> [str]:
  465. """return all domains that user can use when creating a new alias, including:
  466. - SimpleLogin public domains, available for all users (ALIAS_DOMAIN)
  467. - SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN)
  468. - Verified custom domains
  469. """
  470. domains = self.available_sl_domains()
  471. for custom_domain in self.verified_custom_domains():
  472. domains.append(custom_domain.domain)
  473. # can have duplicate where a "root" user has a domain that's also listed in SL domains
  474. return list(set(domains))
  475. def __repr__(self):
  476. return f"<User {self.id} {self.name} {self.email}>"
  477. def _expiration_1h():
  478. return arrow.now().shift(hours=1)
  479. def _expiration_12h():
  480. return arrow.now().shift(hours=12)
  481. def _expiration_5m():
  482. return arrow.now().shift(minutes=5)
  483. def _expiration_7d():
  484. return arrow.now().shift(days=7)
  485. class ActivationCode(db.Model, ModelMixin):
  486. """For activate user account"""
  487. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  488. code = db.Column(db.String(128), unique=True, nullable=False)
  489. user = db.relationship(User)
  490. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  491. def is_expired(self):
  492. return self.expired < arrow.now()
  493. class ResetPasswordCode(db.Model, ModelMixin):
  494. """For resetting password"""
  495. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  496. code = db.Column(db.String(128), unique=True, nullable=False)
  497. user = db.relationship(User)
  498. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  499. def is_expired(self):
  500. return self.expired < arrow.now()
class SocialAuth(db.Model, ModelMixin):
    """Store how user authenticates with social login"""

    # rows are removed together with the user
    user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)

    # name of the social login used, could be facebook, google or github
    social = db.Column(db.String(128), nullable=False)

    # at most one row per (user, social login) pair
    __table_args__ = (db.UniqueConstraint("user_id", "social", name="uq_social_auth"),)
  507. # <<< OAUTH models >>>
  508. def generate_oauth_client_id(client_name) -> str:
  509. oauth_client_id = convert_to_id(client_name) + "-" + random_string()
  510. # check that the client does not exist yet
  511. if not Client.get_by(oauth_client_id=oauth_client_id):
  512. LOG.debug("generate oauth_client_id %s", oauth_client_id)
  513. return oauth_client_id
  514. # Rerun the function
  515. LOG.warning(
  516. "client_id %s already exists, generate a new client_id", oauth_client_id
  517. )
  518. return generate_oauth_client_id(client_name)
  519. class MfaBrowser(db.Model, ModelMixin):
  520. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  521. token = db.Column(db.String(64), default=False, unique=True, nullable=False)
  522. expires = db.Column(ArrowType, default=False, nullable=False)
  523. user = db.relationship(User)
  524. @classmethod
  525. def create_new(cls, user, token_length=64) -> "MfaBrowser":
  526. found = False
  527. while not found:
  528. token = random_string(token_length)
  529. if not cls.get_by(token=token):
  530. found = True
  531. return MfaBrowser.create(
  532. user_id=user.id,
  533. token=token,
  534. expires=arrow.now().shift(days=30),
  535. )
  536. @classmethod
  537. def delete(cls, token):
  538. cls.query.filter(cls.token == token).delete()
  539. db.session.commit()
  540. @classmethod
  541. def delete_expired(cls):
  542. cls.query.filter(cls.expires < arrow.now()).delete()
  543. db.session.commit()
  544. def is_expired(self):
  545. return self.expires < arrow.now()
  546. def reset_expire(self):
  547. self.expires = arrow.now().shift(days=30)
  548. class Client(db.Model, ModelMixin):
  549. oauth_client_id = db.Column(db.String(128), unique=True, nullable=False)
  550. oauth_client_secret = db.Column(db.String(128), nullable=False)
  551. name = db.Column(db.String(128), nullable=False)
  552. home_url = db.Column(db.String(1024))
  553. published = db.Column(db.Boolean, default=False, nullable=False)
  554. # user who created this client
  555. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  556. icon_id = db.Column(db.ForeignKey(File.id), nullable=True)
  557. icon = db.relationship(File)
  558. def nb_user(self):
  559. return ClientUser.filter_by(client_id=self.id).count()
  560. def get_scopes(self) -> [Scope]:
  561. # todo: client can choose which scopes they want to have access
  562. return [Scope.NAME, Scope.EMAIL, Scope.AVATAR_URL]
  563. @classmethod
  564. def create_new(cls, name, user_id) -> "Client":
  565. # generate a client-id
  566. oauth_client_id = generate_oauth_client_id(name)
  567. oauth_client_secret = random_string(40)
  568. client = Client.create(
  569. name=name,
  570. oauth_client_id=oauth_client_id,
  571. oauth_client_secret=oauth_client_secret,
  572. user_id=user_id,
  573. )
  574. return client
  575. def get_icon_url(self):
  576. if self.icon_id:
  577. return self.icon.get_url()
  578. else:
  579. return URL + "/static/default-icon.svg"
  580. def last_user_login(self) -> "ClientUser":
  581. client_user = (
  582. ClientUser.query.filter(ClientUser.client_id == self.id)
  583. .order_by(ClientUser.updated_at)
  584. .first()
  585. )
  586. if client_user:
  587. return client_user
  588. return None
class RedirectUri(db.Model, ModelMixin):
    """Valid redirect uris for a client"""

    # rows are removed together with the client
    client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
    uri = db.Column(db.String(1024), nullable=False)

    # also exposes the uris on the client as ``client.redirect_uris``
    client = db.relationship(Client, backref="redirect_uris")
  594. class AuthorizationCode(db.Model, ModelMixin):
  595. code = db.Column(db.String(128), unique=True, nullable=False)
  596. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  597. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  598. scope = db.Column(db.String(128))
  599. redirect_uri = db.Column(db.String(1024))
  600. # what is the input response_type, e.g. "code", "code,id_token", ...
  601. response_type = db.Column(db.String(128))
  602. user = db.relationship(User, lazy=False)
  603. client = db.relationship(Client, lazy=False)
  604. expired = db.Column(ArrowType, nullable=False, default=_expiration_5m)
  605. def is_expired(self):
  606. return self.expired < arrow.now()
  607. class OauthToken(db.Model, ModelMixin):
  608. access_token = db.Column(db.String(128), unique=True)
  609. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  610. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  611. scope = db.Column(db.String(128))
  612. redirect_uri = db.Column(db.String(1024))
  613. # what is the input response_type, e.g. "token", "token,id_token", ...
  614. response_type = db.Column(db.String(128))
  615. user = db.relationship(User)
  616. client = db.relationship(Client)
  617. expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
  618. def is_expired(self):
  619. return self.expired < arrow.now()
  620. def generate_email(
  621. scheme: int = AliasGeneratorEnum.word.value,
  622. in_hex: bool = False,
  623. alias_domain=FIRST_ALIAS_DOMAIN,
  624. ) -> str:
  625. """generate an email address that does not exist before
  626. :param alias_domain: the domain used to generate the alias.
  627. :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
  628. :type in_hex: bool, if the generate scheme is uuid, is hex favorable?
  629. """
  630. if scheme == AliasGeneratorEnum.uuid.value:
  631. name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__()
  632. random_email = name + "@" + alias_domain
  633. else:
  634. random_email = random_words() + "@" + alias_domain
  635. random_email = random_email.lower().strip()
  636. # check that the client does not exist yet
  637. if not Alias.get_by(email=random_email) and not DeletedAlias.get_by(
  638. email=random_email
  639. ):
  640. LOG.debug("generate email %s", random_email)
  641. return random_email
  642. # Rerun the function
  643. LOG.warning("email %s already exists, generate a new email", random_email)
  644. return generate_email(scheme=scheme, in_hex=in_hex)
  645. class Alias(db.Model, ModelMixin):
  646. """Alias"""
  647. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  648. email = db.Column(db.String(128), unique=True, nullable=False)
  649. # the name to use when user replies/sends from alias
  650. name = db.Column(db.String(128), nullable=True, default=None)
  651. enabled = db.Column(db.Boolean(), default=True, nullable=False)
  652. custom_domain_id = db.Column(
  653. db.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=True
  654. )
  655. custom_domain = db.relationship("CustomDomain", foreign_keys=[custom_domain_id])
  656. # To know whether an alias is created "on the fly", i.e. via the custom domain catch-all feature
  657. automatic_creation = db.Column(
  658. db.Boolean, nullable=False, default=False, server_default="0"
  659. )
  660. # to know whether an alias belongs to a directory
  661. directory_id = db.Column(
  662. db.ForeignKey("directory.id", ondelete="cascade"), nullable=True
  663. )
  664. note = db.Column(db.Text, default=None, nullable=True)
  665. # an alias can be owned by another mailbox
  666. mailbox_id = db.Column(
  667. db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=False
  668. )
  669. # prefix _ to avoid this object being used accidentally.
  670. # To have the list of all mailboxes, should use AliasInfo instead
  671. _mailboxes = db.relationship("Mailbox", secondary="alias_mailbox", lazy="joined")
  672. # If the mailbox has PGP-enabled, user can choose disable the PGP on the alias
  673. # this is useful when some senders already support PGP
  674. disable_pgp = db.Column(
  675. db.Boolean, nullable=False, default=False, server_default="0"
  676. )
  677. # a way to bypass the bounce automatic disable mechanism
  678. cannot_be_disabled = db.Column(
  679. db.Boolean, nullable=False, default=False, server_default="0"
  680. )
  681. # when a mailbox wants to send an email on behalf of the alias via the reverse-alias
  682. # several checks are performed to avoid email spoofing
  683. # this option allow disabling these checks
  684. disable_email_spoofing_check = db.Column(
  685. db.Boolean, nullable=False, default=False, server_default="0"
  686. )
  687. # to know whether an alias is added using a batch import
  688. batch_import_id = db.Column(
  689. db.ForeignKey("batch_import.id", ondelete="SET NULL"),
  690. nullable=True,
  691. default=None,
  692. )
  693. user = db.relationship(User, foreign_keys=[user_id])
  694. mailbox = db.relationship("Mailbox", lazy="joined")
  695. @property
  696. def mailboxes(self):
  697. ret = [self.mailbox]
  698. for m in self._mailboxes:
  699. ret.append(m)
  700. ret = [mb for mb in ret if mb.verified]
  701. ret = sorted(ret, key=lambda mb: mb.email)
  702. return ret
  703. def mailbox_support_pgp(self) -> bool:
  704. """return True of one of the mailboxes support PGP"""
  705. for mb in self.mailboxes:
  706. if mb.pgp_finger_print:
  707. return True
  708. return False
  709. def pgp_enabled(self) -> bool:
  710. if self.mailbox_support_pgp() and not self.disable_pgp:
  711. return True
  712. return False
  713. @classmethod
  714. def create(cls, **kw):
  715. # whether should call db.session.commit
  716. commit = kw.pop("commit", False)
  717. r = cls(**kw)
  718. email = kw["email"]
  719. # make sure email is lowercase and doesn't have any whitespace
  720. email = email.lower().strip().replace(" ", "")
  721. # make sure alias is not in global trash, i.e. DeletedAlias table
  722. if DeletedAlias.get_by(email=email):
  723. raise AliasInTrashError
  724. if DomainDeletedAlias.get_by(email=email):
  725. raise AliasInTrashError
  726. db.session.add(r)
  727. if commit:
  728. db.session.commit()
  729. return r
  730. @classmethod
  731. def create_new(cls, user, prefix, note=None, mailbox_id=None):
  732. prefix = prefix.lower().strip().replace(" ", "")
  733. if not prefix:
  734. raise Exception("alias prefix cannot be empty")
  735. # find the right suffix - avoid infinite loop by running this at max 1000 times
  736. for i in range(1000):
  737. suffix = random_word()
  738. email = f"{prefix}.{suffix}@{FIRST_ALIAS_DOMAIN}"
  739. if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email):
  740. break
  741. return Alias.create(
  742. user_id=user.id,
  743. email=email,
  744. note=note,
  745. mailbox_id=mailbox_id or user.default_mailbox_id,
  746. )
  747. @classmethod
  748. def delete(cls, obj_id):
  749. raise Exception("should use delete_alias(alias,user) instead")
  750. @classmethod
  751. def create_new_random(
  752. cls,
  753. user,
  754. scheme: int = AliasGeneratorEnum.word.value,
  755. in_hex: bool = False,
  756. note: str = None,
  757. ):
  758. """create a new random alias"""
  759. custom_domain = None
  760. random_email = None
  761. if user.default_random_alias_domain_id:
  762. custom_domain = CustomDomain.get(user.default_random_alias_domain_id)
  763. random_email = generate_email(
  764. scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain
  765. )
  766. elif user.default_random_alias_public_domain_id:
  767. sl_domain: SLDomain = SLDomain.get(
  768. user.default_random_alias_public_domain_id
  769. )
  770. if sl_domain.premium_only and not user.is_premium():
  771. LOG.exception("%s not premium, cannot use %s", user, sl_domain)
  772. else:
  773. random_email = generate_email(
  774. scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain
  775. )
  776. if not random_email:
  777. random_email = generate_email(scheme=scheme, in_hex=in_hex)
  778. alias = Alias.create(
  779. user_id=user.id,
  780. email=random_email,
  781. mailbox_id=user.default_mailbox_id,
  782. note=note,
  783. )
  784. if custom_domain:
  785. alias.custom_domain_id = custom_domain.id
  786. return alias
  787. def mailbox_email(self):
  788. if self.mailbox_id:
  789. return self.mailbox.email
  790. else:
  791. return self.user.email
  792. def unsubscribe_link(self) -> (str, bool):
  793. """return the unsubscribe link along with whether this is via email (mailto:) or Http POST
  794. The mailto: method is preferred
  795. """
  796. if UNSUBSCRIBER:
  797. return f"mailto:{UNSUBSCRIBER}?subject={self.id}=", True
  798. else:
  799. return f"{URL}/dashboard/unsubscribe/{self.id}", False
  800. def __repr__(self):
  801. return f"<Alias {self.id} {self.email}>"
  802. class ClientUser(db.Model, ModelMixin):
  803. __table_args__ = (
  804. db.UniqueConstraint("user_id", "client_id", name="uq_client_user"),
  805. )
  806. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  807. client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
  808. # Null means client has access to user original email
  809. alias_id = db.Column(db.ForeignKey(Alias.id, ondelete="cascade"), nullable=True)
  810. # user can decide to send to client another name
  811. name = db.Column(
  812. db.String(128), nullable=True, default=None, server_default=text("NULL")
  813. )
  814. # user can decide to send to client a default avatar
  815. default_avatar = db.Column(
  816. db.Boolean, nullable=False, default=False, server_default="0"
  817. )
  818. alias = db.relationship(Alias, backref="client_users")
  819. user = db.relationship(User)
  820. client = db.relationship(Client)
  821. def get_email(self):
  822. return self.alias.email if self.alias_id else self.user.email
  823. def get_user_name(self):
  824. if self.name:
  825. return self.name
  826. else:
  827. return self.user.name
  828. def get_user_info(self) -> dict:
  829. """return user info according to client scope
  830. Return dict with key being scope name. For now all the fields are the same for all clients:
  831. {
  832. "client": "Demo",
  833. "email": "test-avk5l@mail-tester.com",
  834. "email_verified": true,
  835. "id": 1,
  836. "name": "Son GM",
  837. "avatar_url": "http://s3..."
  838. }
  839. """
  840. res = {
  841. "id": self.id,
  842. "client": self.client.name,
  843. "email_verified": True,
  844. "sub": str(self.id),
  845. }
  846. for scope in self.client.get_scopes():
  847. if scope == Scope.NAME:
  848. if self.name:
  849. res[Scope.NAME.value] = self.name
  850. else:
  851. res[Scope.NAME.value] = self.user.name
  852. elif scope == Scope.AVATAR_URL:
  853. if self.user.profile_picture_id:
  854. if self.default_avatar:
  855. res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png"
  856. else:
  857. res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url(
  858. AVATAR_URL_EXPIRATION
  859. )
  860. else:
  861. res[Scope.AVATAR_URL.value] = None
  862. elif scope == Scope.EMAIL:
  863. # Use generated email
  864. if self.alias_id:
  865. LOG.debug(
  866. "Use gen email for user %s, client %s", self.user, self.client
  867. )
  868. res[Scope.EMAIL.value] = self.alias.email
  869. # Use user original email
  870. else:
  871. res[Scope.EMAIL.value] = self.user.email
  872. return res
  873. class Contact(db.Model, ModelMixin):
  874. """
  875. Store configuration of sender (website-email) and alias.
  876. """
  877. __table_args__ = (
  878. db.UniqueConstraint("alias_id", "website_email", name="uq_contact"),
  879. )
  880. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  881. alias_id = db.Column(db.ForeignKey(Alias.id, ondelete="cascade"), nullable=False)
  882. name = db.Column(
  883. db.String(512), nullable=True, default=None, server_default=text("NULL")
  884. )
  885. website_email = db.Column(db.String(512), nullable=False)
  886. # the email from header, e.g. AB CD <ab@cd.com>
  887. # nullable as this field is added after website_email
  888. website_from = db.Column(db.String(1024), nullable=True)
  889. # when user clicks on "reply", they will reply to this address.
  890. # This address allows to hide user personal email
  891. # this reply email is created every time a website sends an email to user
  892. # it has the prefix "reply+" to distinguish with other email
  893. reply_email = db.Column(db.String(512), nullable=False)
  894. # whether a contact is created via CC
  895. is_cc = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  896. pgp_public_key = db.Column(db.Text, nullable=True)
  897. pgp_finger_print = db.Column(db.String(512), nullable=True)
  898. alias = db.relationship(Alias, backref="contacts")
  899. user = db.relationship(User)
  900. # the latest reply sent to this contact
  901. latest_reply: Optional[Arrow] = None
  902. # to investigate why the website_email is sometimes not correctly parsed
  903. # the envelope mail_from
  904. mail_from = db.Column(db.Text, nullable=True, default=None)
  905. # the message["From"] header
  906. from_header = db.Column(db.Text, nullable=True, default=None)
  907. @property
  908. def email(self):
  909. return self.website_email
  910. def website_send_to(self):
  911. """return the email address with name.
  912. to use when user wants to send an email from the alias
  913. Return
  914. "First Last | email at example.com" <ra+random_string@SL>
  915. """
  916. # Prefer using contact name if possible
  917. user = self.user
  918. name = self.name
  919. email = self.website_email
  920. if (
  921. not user
  922. or not SenderFormatEnum.has_value(user.sender_format)
  923. or user.sender_format == SenderFormatEnum.AT.value
  924. ):
  925. email = email.replace("@", " at ")
  926. elif user.sender_format == SenderFormatEnum.A.value:
  927. email = email.replace("@", "(a)")
  928. # if no name, try to parse it from website_from
  929. if not name and self.website_from:
  930. try:
  931. from app.email_utils import parseaddr_unicode
  932. name, _ = parseaddr_unicode(self.website_from)
  933. except Exception:
  934. # Skip if website_from is wrongly formatted
  935. LOG.warning(
  936. "Cannot parse contact %s website_from %s", self, self.website_from
  937. )
  938. name = ""
  939. # remove all double quote
  940. if name:
  941. name = name.replace('"', "")
  942. if name:
  943. name = name + " | " + email
  944. else:
  945. name = email
  946. # cannot use formataddr here as this field is for email client, not for MTA
  947. return f'"{name}" <{self.reply_email}>'
  948. def new_addr(self):
  949. """
  950. Replace original email by reply_email. Possible formats:
  951. - first@example.com via SimpleLogin <reply_email> OR
  952. - First Last - first at example.com <reply_email> OR
  953. - First Last - first(a)example.com <reply_email> OR
  954. - First Last - first@example.com <reply_email> OR
  955. And return new address with RFC 2047 format
  956. `new_email` is a special reply address
  957. """
  958. user = self.user
  959. if (
  960. not user
  961. or not SenderFormatEnum.has_value(user.sender_format)
  962. or user.sender_format == SenderFormatEnum.VIA.value
  963. ):
  964. new_name = f"{self.website_email} via SimpleLogin"
  965. else:
  966. if user.sender_format == SenderFormatEnum.AT.value:
  967. formatted_email = self.website_email.replace("@", " at ").strip()
  968. elif user.sender_format == SenderFormatEnum.A.value:
  969. formatted_email = self.website_email.replace("@", "(a)").strip()
  970. elif user.sender_format == SenderFormatEnum.FULL.value:
  971. formatted_email = self.website_email.strip()
  972. # Prefix name to formatted email if available
  973. new_name = (
  974. (self.name + " - " + formatted_email)
  975. if self.name and self.name != self.website_email.strip()
  976. else formatted_email
  977. )
  978. new_addr = formataddr((new_name, self.reply_email)).strip()
  979. return new_addr.strip()
  980. def last_reply(self) -> "EmailLog":
  981. """return the most recent reply"""
  982. return (
  983. EmailLog.query.filter_by(contact_id=self.id, is_reply=True)
  984. .order_by(desc(EmailLog.created_at))
  985. .first()
  986. )
  987. def __repr__(self):
  988. return f"<Contact {self.id} {self.website_email} {self.alias_id}>"
  989. class EmailLog(db.Model, ModelMixin):
  990. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  991. contact_id = db.Column(
  992. db.ForeignKey(Contact.id, ondelete="cascade"), nullable=False
  993. )
  994. # whether this is a reply
  995. is_reply = db.Column(db.Boolean, nullable=False, default=False)
  996. # for ex if alias is disabled, this forwarding is blocked
  997. blocked = db.Column(db.Boolean, nullable=False, default=False)
  998. # can happen when user mailbox refuses the forwarded email
  999. # usually because the forwarded email is too spammy
  1000. bounced = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  1001. # SpamAssassin result
  1002. is_spam = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  1003. spam_score = db.Column(db.Float, nullable=True)
  1004. spam_status = db.Column(db.Text, nullable=True, default=None)
  1005. # Point to the email that has been refused
  1006. refused_email_id = db.Column(
  1007. db.ForeignKey("refused_email.id", ondelete="SET NULL"), nullable=True
  1008. )
  1009. # in case of bounce, record on what mailbox the email has been bounced
  1010. # useful when an alias has several mailboxes
  1011. bounced_mailbox_id = db.Column(
  1012. db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=True
  1013. )
  1014. refused_email = db.relationship("RefusedEmail")
  1015. forward = db.relationship(Contact)
  1016. contact = db.relationship(Contact, backref="email_logs")
  1017. def bounced_mailbox(self) -> str:
  1018. if self.bounced_mailbox_id:
  1019. return Mailbox.get(self.bounced_mailbox_id).email
  1020. # retro-compatibility
  1021. return self.contact.alias.mailboxes[0].email
  1022. def get_action(self) -> str:
  1023. """return the action name: forward|reply|block|bounced"""
  1024. if self.is_reply:
  1025. return "reply"
  1026. elif self.bounced:
  1027. return "bounced"
  1028. elif self.blocked:
  1029. return "block"
  1030. else:
  1031. return "forward"
  1032. class Subscription(db.Model, ModelMixin):
  1033. """Paddle subscription"""
  1034. # Come from Paddle
  1035. cancel_url = db.Column(db.String(1024), nullable=False)
  1036. update_url = db.Column(db.String(1024), nullable=False)
  1037. subscription_id = db.Column(db.String(1024), nullable=False, unique=True)
  1038. event_time = db.Column(ArrowType, nullable=False)
  1039. next_bill_date = db.Column(db.Date, nullable=False)
  1040. cancelled = db.Column(db.Boolean, nullable=False, default=False)
  1041. plan = db.Column(db.Enum(PlanEnum), nullable=False)
  1042. user_id = db.Column(
  1043. db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
  1044. )
  1045. user = db.relationship(User)
  1046. def plan_name(self):
  1047. if self.plan == PlanEnum.monthly:
  1048. return "Monthly ($2.99/month)"
  1049. else:
  1050. return "Yearly ($29.99/year)"
  1051. def __repr__(self):
  1052. return f"<Subscription {self.plan} {self.next_bill_date}>"
class ManualSubscription(db.Model, ModelMixin):
    """
    For users who pay by other means and therefore do not go through Paddle.
    At most one row per user (user_id is unique).
    """

    user_id = db.Column(
        db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
    )

    # a reminder is sent several days before the subscription ends
    end_at = db.Column(ArrowType, nullable=False)

    # for storing note about this subscription
    comment = db.Column(db.Text, nullable=True)

    # manual subscriptions are also used for Premium giveaways
    is_giveaway = db.Column(
        db.Boolean, default=False, nullable=False, server_default="0"
    )

    user = db.relationship(User)
  1069. # https://help.apple.com/app-store-connect/#/dev58bda3212
  1070. _APPLE_GRACE_PERIOD_DAYS = 16
  1071. class AppleSubscription(db.Model, ModelMixin):
  1072. """
  1073. For users who have subscribed via Apple in-app payment
  1074. """
  1075. user_id = db.Column(
  1076. db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
  1077. )
  1078. expires_date = db.Column(ArrowType, nullable=False)
  1079. # to avoid using "Restore Purchase" on another account
  1080. original_transaction_id = db.Column(db.String(256), nullable=False, unique=True)
  1081. receipt_data = db.Column(db.Text(), nullable=False)
  1082. plan = db.Column(db.Enum(PlanEnum), nullable=False)
  1083. user = db.relationship(User)
  1084. def is_valid(self):
  1085. # Todo: take into account grace period?
  1086. return self.expires_date > arrow.now().shift(days=-_APPLE_GRACE_PERIOD_DAYS)
  1087. class DeletedAlias(db.Model, ModelMixin):
  1088. """Store all deleted alias to make sure they are NOT reused"""
  1089. email = db.Column(db.String(256), unique=True, nullable=False)
  1090. @classmethod
  1091. def create(cls, **kw):
  1092. raise Exception("should use delete_alias(alias,user) instead")
  1093. def __repr__(self):
  1094. return f"<Deleted Alias {self.email}>"
  1095. class EmailChange(db.Model, ModelMixin):
  1096. """Used when user wants to update their email"""
  1097. user_id = db.Column(
  1098. db.ForeignKey(User.id, ondelete="cascade"),
  1099. nullable=False,
  1100. unique=True,
  1101. index=True,
  1102. )
  1103. new_email = db.Column(db.String(256), unique=True, nullable=False)
  1104. code = db.Column(db.String(128), unique=True, nullable=False)
  1105. expired = db.Column(ArrowType, nullable=False, default=_expiration_12h)
  1106. user = db.relationship(User)
  1107. def is_expired(self):
  1108. return self.expired < arrow.now()
  1109. def __repr__(self):
  1110. return f"<EmailChange {self.id} {self.new_email} {self.user_id}>"
class AliasUsedOn(db.Model, ModelMixin):
    """Used to know where an alias is created"""

    # a hostname is recorded at most once per alias
    __table_args__ = (
        db.UniqueConstraint("alias_id", "hostname", name="uq_alias_used"),
    )

    alias_id = db.Column(db.ForeignKey(Alias.id, ondelete="cascade"), nullable=False)
    user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)

    alias = db.relationship(Alias)

    hostname = db.Column(db.String(1024), nullable=False)
  1120. class ApiKey(db.Model, ModelMixin):
  1121. """used in browser extension to identify user"""
  1122. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1123. code = db.Column(db.String(128), unique=True, nullable=False)
  1124. name = db.Column(db.String(128), nullable=False)
  1125. last_used = db.Column(ArrowType, default=None)
  1126. times = db.Column(db.Integer, default=0, nullable=False)
  1127. user = db.relationship(User)
  1128. @classmethod
  1129. def create(cls, user_id, name):
  1130. # generate unique code
  1131. found = False
  1132. while not found:
  1133. code = random_string(60)
  1134. if not cls.get_by(code=code):
  1135. found = True
  1136. a = cls(user_id=user_id, code=code, name=name)
  1137. db.session.add(a)
  1138. return a
  1139. class CustomDomain(db.Model, ModelMixin):
  1140. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1141. domain = db.Column(db.String(128), unique=True, nullable=False)
  1142. # default name to use when user replies/sends from alias
  1143. name = db.Column(db.String(128), nullable=True, default=None)
  1144. verified = db.Column(db.Boolean, nullable=False, default=False)
  1145. dkim_verified = db.Column(
  1146. db.Boolean, nullable=False, default=False, server_default="0"
  1147. )
  1148. spf_verified = db.Column(
  1149. db.Boolean, nullable=False, default=False, server_default="0"
  1150. )
  1151. dmarc_verified = db.Column(
  1152. db.Boolean, nullable=False, default=False, server_default="0"
  1153. )
  1154. _mailboxes = db.relationship("Mailbox", secondary="domain_mailbox", lazy="joined")
  1155. # an alias is created automatically the first time it receives an email
  1156. catch_all = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  1157. # option to generate random prefix version automatically
  1158. random_prefix_generation = db.Column(
  1159. db.Boolean, nullable=False, default=False, server_default="0"
  1160. )
  1161. # incremented when a check is failed on the domain
  1162. # alert when the number exceeds a threshold
  1163. # used in check_custom_domain()
  1164. nb_failed_checks = db.Column(
  1165. db.Integer, default=0, server_default="0", nullable=False
  1166. )
  1167. user = db.relationship(User, foreign_keys=[user_id])
  1168. @property
  1169. def mailboxes(self):
  1170. if self._mailboxes:
  1171. return self._mailboxes
  1172. else:
  1173. return [self.user.default_mailbox]
  1174. def nb_alias(self):
  1175. return Alias.filter_by(custom_domain_id=self.id).count()
  1176. def __repr__(self):
  1177. return f"<Custom Domain {self.domain}>"
  1178. class DomainDeletedAlias(db.Model, ModelMixin):
  1179. """Store all deleted alias for a domain"""
  1180. __table_args__ = (
  1181. db.UniqueConstraint("domain_id", "email", name="uq_domain_trash"),
  1182. )
  1183. email = db.Column(db.String(256), nullable=False)
  1184. domain_id = db.Column(
  1185. db.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=False
  1186. )
  1187. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1188. domain = db.relationship(CustomDomain)
  1189. @classmethod
  1190. def create(cls, **kw):
  1191. raise Exception("should use delete_alias(alias,user) instead")
  1192. def __repr__(self):
  1193. return f"<DomainDeletedAlias {self.id} {self.email}>"
class LifetimeCoupon(db.Model, ModelMixin):
    """A coupon identified by a unique code."""

    code = db.Column(db.String(128), nullable=False, unique=True)
    # NOTE(review): the name suggests a usage counter, but whether it counts
    # uses made or uses remaining is not visible here - confirm with callers
    nb_used = db.Column(db.Integer, nullable=False)
    # presumably whether the coupon was paid for; verify against callers
    paid = db.Column(db.Boolean, default=False, server_default="0", nullable=False)
  1198. class Directory(db.Model, ModelMixin):
  1199. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1200. name = db.Column(db.String(128), unique=True, nullable=False)
  1201. user = db.relationship(User)
  1202. _mailboxes = db.relationship(
  1203. "Mailbox", secondary="directory_mailbox", lazy="joined"
  1204. )
  1205. @property
  1206. def mailboxes(self):
  1207. if self._mailboxes:
  1208. return self._mailboxes
  1209. else:
  1210. return [self.user.default_mailbox]
  1211. def nb_alias(self):
  1212. return Alias.filter_by(directory_id=self.id).count()
  1213. @classmethod
  1214. def delete(cls, obj_id):
  1215. obj: Directory = cls.get(obj_id)
  1216. user = obj.user
  1217. # Put all aliases belonging to this directory to global or domain trash
  1218. for alias in Alias.query.filter_by(directory_id=obj_id):
  1219. from app import alias_utils
  1220. alias_utils.delete_alias(alias, user)
  1221. cls.query.filter(cls.id == obj_id).delete()
  1222. db.session.commit()
  1223. def __repr__(self):
  1224. return f"<Directory {self.name}>"
  1225. class Job(db.Model, ModelMixin):
  1226. """Used to schedule one-time job in the future"""
  1227. name = db.Column(db.String(128), nullable=False)
  1228. payload = db.Column(db.JSON)
  1229. # whether the job has been taken by the job runner
  1230. taken = db.Column(db.Boolean, default=False, nullable=False)
  1231. run_at = db.Column(ArrowType)
  1232. def __repr__(self):
  1233. return f"<Job {self.id} {self.name} {self.payload}>"
class Mailbox(db.Model, ModelMixin):
    """A mailbox owned by a user, unique per (user_id, email).

    Aliases reference a mailbox either through ``Alias.mailbox_id`` or
    through ``AliasMailbox`` rows.
    """

    user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
    email = db.Column(db.String(256), nullable=False)
    verified = db.Column(db.Boolean, default=False, nullable=False)
    force_spf = db.Column(db.Boolean, default=True, server_default="1", nullable=False)

    # used when user wants to update mailbox email
    new_email = db.Column(db.String(256), unique=True)

    pgp_public_key = db.Column(db.Text, nullable=True)
    pgp_finger_print = db.Column(db.String(512), nullable=True)

    # incremented when a check is failed on the mailbox
    # alert when the number exceeds a threshold
    # used in sanity_check()
    nb_failed_checks = db.Column(
        db.Integer, default=0, server_default="0", nullable=False
    )

    # a mailbox can be disabled if it can't be reached
    disabled = db.Column(db.Boolean, default=False, nullable=False, server_default="0")

    __table_args__ = (db.UniqueConstraint("user_id", "email", name="uq_mailbox_user"),)

    user = db.relationship(User, foreign_keys=[user_id])

    def nb_alias(self):
        """Return the number of AliasMailbox rows plus the number of aliases
        whose mailbox_id is this mailbox."""
        return (
            AliasMailbox.filter_by(mailbox_id=self.id).count()
            + Alias.filter_by(mailbox_id=self.id).count()
        )

    @classmethod
    def delete(cls, obj_id):
        """Delete the mailbox ``obj_id``.

        Aliases whose mailbox_id is this mailbox are first either re-pointed
        to another of their mailboxes or removed through
        ``alias_utils.delete_alias``.
        """
        mailbox: Mailbox = cls.get(obj_id)
        user = mailbox.user

        # Put all aliases belonging to this mailbox to global or domain trash
        for alias in Alias.query.filter_by(mailbox_id=obj_id):
            # special handling for alias that has several mailboxes and has mailbox_id=obj_id
            if len(alias.mailboxes) > 1:
                # use the first mailbox found in alias._mailboxes
                first_mb = alias._mailboxes[0]
                alias.mailbox_id = first_mb.id
                # it is now the main mailbox, so drop it from the extra ones
                alias._mailboxes.remove(first_mb)
            else:
                from app import alias_utils

                # only put aliases that have mailbox as a single mailbox into trash
                alias_utils.delete_alias(alias, user)
            # NOTE(review): assumed to run once per alias (inside the loop) -
            # confirm against the original indentation
            db.session.commit()

        cls.query.filter(cls.id == obj_id).delete()
        db.session.commit()

    @property
    def aliases(self) -> List[Alias]:
        """Return the aliases whose mailbox_id is this mailbox, followed by
        the aliases linked through AliasMailbox."""
        ret = Alias.filter_by(mailbox_id=self.id).all()

        for am in AliasMailbox.filter_by(mailbox_id=self.id):
            ret.append(am.alias)

        return ret

    def __repr__(self):
        return f"<Mailbox {self.email}>"
class AccountActivation(db.Model, ModelMixin):
    """contains code to activate the user account when they sign up on mobile"""

    # at most one activation row per user
    user_id = db.Column(
        db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
    )
    # the activation code is usually 6 digits
    code = db.Column(db.String(10), nullable=False)

    # nb tries decrements each time user enters wrong code
    tries = db.Column(db.Integer, default=3, nullable=False)

    # the database rejects a negative number of tries
    __table_args__ = (
        CheckConstraint(tries >= 0, name="account_activation_tries_positive"),
        {},
    )
  1298. class RefusedEmail(db.Model, ModelMixin):
  1299. """Store emails that have been refused, i.e. bounced or classified as spams"""
  1300. # Store the full report, including logs from Sending & Receiving MTA
  1301. full_report_path = db.Column(db.String(128), unique=True, nullable=False)
  1302. # The original email, to display to user
  1303. path = db.Column(db.String(128), unique=True, nullable=True)
  1304. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1305. # the email content will be deleted at this date
  1306. delete_at = db.Column(ArrowType, nullable=False, default=_expiration_7d)
  1307. # toggle this when email content (stored at full_report_path & path are deleted)
  1308. deleted = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
  1309. def get_url(self, expires_in=3600):
  1310. if self.path:
  1311. return s3.get_url(self.path, expires_in)
  1312. else:
  1313. return s3.get_url(self.full_report_path, expires_in)
  1314. def __repr__(self):
  1315. return f"<Refused Email {self.id} {self.path} {self.delete_at}>"
  1316. class Referral(db.Model, ModelMixin):
  1317. """Referral code so user can invite others"""
  1318. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1319. name = db.Column(db.String(512), nullable=True, default=None)
  1320. code = db.Column(db.String(128), unique=True, nullable=False)
  1321. user = db.relationship(User, foreign_keys=[user_id])
  1322. def nb_user(self) -> int:
  1323. return User.filter_by(referral_id=self.id, activated=True).count()
  1324. def nb_paid_user(self) -> int:
  1325. res = 0
  1326. for user in User.filter_by(referral_id=self.id, activated=True):
  1327. if user.is_paid():
  1328. res += 1
  1329. return res
  1330. def link(self):
  1331. return f"{LANDING_PAGE_URL}?slref={self.code}"
class SentAlert(db.Model, ModelMixin):
    """keep track of alerts sent to user.
    User can receive an alert when there's abnormal activity on their aliases such as
    - reverse-alias not used by the owning mailbox
    - SPF fails when using the reverse-alias
    - bounced email
    - ...

    Different rate controls can then be implemented based on SentAlert:
    - only once alert: an alert type should be sent only once
    - max number of sent per 24H: an alert type should not be sent more than X times in 24h
    """

    user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
    # address the alert was sent to
    to_email = db.Column(db.String(256), nullable=False)
    # free-form string identifying the kind of alert
    alert_type = db.Column(db.String(256), nullable=False)
class AliasMailbox(db.Model, ModelMixin):
    """Association row linking an alias to a mailbox."""

    # a mailbox can be linked only once to a given alias
    __table_args__ = (
        db.UniqueConstraint("alias_id", "mailbox_id", name="uq_alias_mailbox"),
    )

    alias_id = db.Column(db.ForeignKey(Alias.id, ondelete="cascade"), nullable=False)
    mailbox_id = db.Column(
        db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
    )

    alias = db.relationship(Alias)
class DirectoryMailbox(db.Model, ModelMixin):
    """Association row linking a directory to a mailbox."""

    # a mailbox can be linked only once to a given directory
    __table_args__ = (
        db.UniqueConstraint("directory_id", "mailbox_id", name="uq_directory_mailbox"),
    )

    directory_id = db.Column(
        db.ForeignKey(Directory.id, ondelete="cascade"), nullable=False
    )
    mailbox_id = db.Column(
        db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
    )
class DomainMailbox(db.Model, ModelMixin):
    """store the owning mailboxes for a domain"""

    # a mailbox can be linked only once to a given domain
    __table_args__ = (
        db.UniqueConstraint("domain_id", "mailbox_id", name="uq_domain_mailbox"),
    )

    domain_id = db.Column(
        db.ForeignKey(CustomDomain.id, ondelete="cascade"), nullable=False
    )
    mailbox_id = db.Column(
        db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
    )
  1376. _NB_RECOVERY_CODE = 8
  1377. _RECOVERY_CODE_LENGTH = 8
  1378. class RecoveryCode(db.Model, ModelMixin):
  1379. """allow user to login in case you lose any of your authenticators"""
  1380. __table_args__ = (db.UniqueConstraint("user_id", "code", name="uq_recovery_code"),)
  1381. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1382. code = db.Column(db.String(16), nullable=False)
  1383. used = db.Column(db.Boolean, nullable=False, default=False)
  1384. used_at = db.Column(ArrowType, nullable=True, default=None)
  1385. user = db.relationship(User)
  1386. @classmethod
  1387. def generate(cls, user):
  1388. """generate recovery codes for user"""
  1389. # delete all existing codes
  1390. cls.query.filter_by(user_id=user.id).delete()
  1391. db.session.flush()
  1392. nb_code = 0
  1393. while nb_code < _NB_RECOVERY_CODE:
  1394. code = random_string(_RECOVERY_CODE_LENGTH)
  1395. if not cls.get_by(user_id=user.id, code=code):
  1396. cls.create(user_id=user.id, code=code)
  1397. nb_code += 1
  1398. LOG.d("Create recovery codes for %s", user)
  1399. db.session.commit()
  1400. @classmethod
  1401. def empty(cls, user):
  1402. """Delete all recovery codes for user"""
  1403. cls.query.filter_by(user_id=user.id).delete()
  1404. db.session.commit()
class Notification(db.Model, ModelMixin):
    """A text message attached to a user, with a read/unread flag."""

    user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
    message = db.Column(db.Text, nullable=False)

    # whether user has marked the notification as read
    read = db.Column(db.Boolean, nullable=False, default=False)
  1410. class SLDomain(db.Model, ModelMixin):
  1411. """SimpleLogin domains"""
  1412. __tablename__ = "public_domain"
  1413. domain = db.Column(db.String(128), unique=True, nullable=False)
  1414. # only available for premium accounts
  1415. premium_only = db.Column(
  1416. db.Boolean, nullable=False, default=False, server_default="0"
  1417. )
  1418. def __repr__(self):
  1419. return f"<SLDomain {self.domain} {'Premium' if self.premium_only else 'Free'}"
class Monitoring(db.Model, ModelMixin):
    """
    Store different host information over the time in order to
    - alert issues in (almost) real time
    - analyze data trending
    """

    # name of the host the figures were collected on
    host = db.Column(db.String(256), nullable=False)

    # Postfix stats: one integer per queue
    incoming_queue = db.Column(db.Integer, nullable=False)
    active_queue = db.Column(db.Integer, nullable=False)
    deferred_queue = db.Column(db.Integer, nullable=False)
  1431. class BatchImport(db.Model, ModelMixin):
  1432. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1433. file_id = db.Column(db.ForeignKey(File.id, ondelete="cascade"), nullable=False)
  1434. processed = db.Column(db.Boolean, nullable=False, default=False)
  1435. summary = db.Column(db.Text, nullable=True, default=None)
  1436. file = db.relationship(File)
  1437. user = db.relationship(User)
  1438. def nb_alias(self):
  1439. return Alias.query.filter_by(batch_import_id=self.id).count()
  1440. def __repr__(self):
  1441. return f"<BatchImport {self.id}>"
  1442. class AuthorizedAddress(db.Model, ModelMixin):
  1443. """Authorize other addresses to send emails from aliases that are owned by a mailbox"""
  1444. user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False)
  1445. mailbox_id = db.Column(
  1446. db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
  1447. )
  1448. email = db.Column(db.String(256), nullable=False)
  1449. __table_args__ = (
  1450. db.UniqueConstraint("mailbox_id", "email", name="uq_authorize_address"),
  1451. )
  1452. mailbox = db.relationship(Mailbox, backref="authorized_addresses")
  1453. def __repr__(self):
  1454. return f"<AuthorizedAddress {self.id} {self.email} {self.mailbox_id}>"