authorize.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import random
  2. from typing import Dict
  3. from urllib.parse import urlparse
  4. from flask import request, render_template, redirect
  5. from flask_login import current_user
  6. from app.extensions import db
  7. from app.jose_utils import make_id_token
  8. from app.log import LOG
  9. from app.models import (
  10. Client,
  11. AuthorizationCode,
  12. ClientUser,
  13. GenEmail,
  14. RedirectUri,
  15. OauthToken,
  16. )
  17. from app.oauth.base import oauth_bp
  18. from app.oauth_models import get_response_types, ResponseType
  19. from app.utils import random_string, encode_url
  20. @oauth_bp.route("/authorize", methods=["GET", "POST"])
  21. def authorize():
  22. """
  23. Redirected from client when user clicks on "Login with Server".
  24. This is a GET request with the following field in url
  25. - client_id
  26. - (optional) state
  27. - response_type: must be code
  28. """
  29. oauth_client_id = request.args.get("client_id")
  30. state = request.args.get("state")
  31. scope = request.args.get("scope")
  32. redirect_uri = request.args.get("redirect_uri")
  33. try:
  34. response_types: [ResponseType] = get_response_types(request)
  35. except ValueError:
  36. return (
  37. "response_type must be code, token, id_token or certain combination of these."
  38. " Please see /.well-known/openid-configuration to see what response_type are supported ",
  39. 400,
  40. )
  41. if not redirect_uri:
  42. LOG.d("no redirect uri")
  43. return "redirect_uri must be set", 400
  44. client = Client.get_by(oauth_client_id=oauth_client_id)
  45. if not client:
  46. return f"no such client with oauth-client-id {oauth_client_id}", 400
  47. # check if redirect_uri is valid
  48. # allow localhost by default
  49. # todo: only allow https
  50. hostname, scheme = get_host_name_and_scheme(redirect_uri)
  51. if hostname != "localhost":
  52. if not RedirectUri.get_by(client_id=client.id, uri=redirect_uri):
  53. return f"{redirect_uri} is not authorized", 400
  54. # redirect from client website
  55. if request.method == "GET":
  56. if current_user.is_authenticated:
  57. # user has already allowed this client
  58. client_user: ClientUser = ClientUser.get_by(
  59. client_id=client.id, user_id=current_user.id
  60. )
  61. user_info = {}
  62. if client_user:
  63. LOG.debug("user %s has already allowed client %s", current_user, client)
  64. user_info = client_user.get_user_info()
  65. return render_template(
  66. "oauth/authorize.html", client=client, user_info=user_info
  67. )
  68. else:
  69. # after user logs in, redirect user back to this page
  70. return render_template(
  71. "oauth/authorize_nonlogin_user.html", client=client, next=request.url
  72. )
  73. else: # user allows or denies
  74. gen_new_email = request.form.get("gen-email") == "on"
  75. if request.form.get("button") == "deny":
  76. LOG.debug("User %s denies Client %s", current_user, client)
  77. final_redirect_uri = f"{redirect_uri}?error=deny&state={state}"
  78. return redirect(final_redirect_uri)
  79. LOG.debug("User %s allows Client %s", current_user, client)
  80. client_user = ClientUser.get_by(client_id=client.id, user_id=current_user.id)
  81. # user has already allowed this client
  82. if client_user:
  83. LOG.d("user %s has already allowed client %s", current_user, client)
  84. # User cannot choose to gen new email
  85. gen_new_email = False
  86. else:
  87. client_user = ClientUser.create(
  88. client_id=client.id, user_id=current_user.id
  89. )
  90. db.session.flush()
  91. LOG.d("create client-user for client %s, user %s", client, current_user)
  92. redirect_args = {}
  93. if state:
  94. redirect_args["state"] = state
  95. else:
  96. LOG.warning(
  97. "more security reason, state should be added. client %s", client
  98. )
  99. if scope:
  100. redirect_args["scope"] = scope
  101. for response_type in response_types:
  102. if response_type == ResponseType.CODE:
  103. # Create authorization code
  104. auth_code = AuthorizationCode.create(
  105. client_id=client.id,
  106. user_id=current_user.id,
  107. code=random_string(),
  108. scope=scope,
  109. redirect_uri=redirect_uri,
  110. )
  111. db.session.add(auth_code)
  112. redirect_args["code"] = auth_code.code
  113. elif response_type == ResponseType.TOKEN:
  114. # create access-token
  115. oauth_token = OauthToken.create(
  116. client_id=client.id,
  117. user_id=current_user.id,
  118. scope=scope,
  119. redirect_uri=redirect_uri,
  120. access_token=generate_access_token(),
  121. )
  122. db.session.add(oauth_token)
  123. redirect_args["access_token"] = oauth_token.access_token
  124. elif response_type == ResponseType.ID_TOKEN:
  125. redirect_args["id_token"] = make_id_token(client_user)
  126. if gen_new_email:
  127. client_user.gen_email_id = create_or_choose_gen_email(current_user).id
  128. db.session.commit()
  129. # construct redirect_uri with redirect_args
  130. return redirect(construct_url(redirect_uri, redirect_args))
  131. def create_or_choose_gen_email(user) -> GenEmail:
  132. can_create_new_email = user.can_create_new_email()
  133. if can_create_new_email:
  134. gen_email = GenEmail.create_new_gen_email(user_id=user.id)
  135. db.session.flush()
  136. LOG.debug("generate email %s for user %s", gen_email.email, user)
  137. else: # need to reuse one of the gen emails created
  138. LOG.d("pick a random email for gen emails for user %s", current_user)
  139. gen_emails = GenEmail.filter_by(user_id=current_user.id).all()
  140. gen_email = random.choice(gen_emails)
  141. return gen_email
  142. def construct_url(url, args: Dict[str, str]):
  143. for i, (k, v) in enumerate(args.items()):
  144. # make sure to escape v
  145. v = encode_url(v)
  146. if i == 0:
  147. url += f"?{k}={v}"
  148. else:
  149. url += f"&{k}={v}"
  150. return url
  151. def generate_access_token() -> str:
  152. """generate an access-token that does not exist before"""
  153. access_token = random_string(40)
  154. if not OauthToken.get_by(access_token=access_token):
  155. return access_token
  156. # Rerun the function
  157. LOG.warning("access token already exists, generate a new one")
  158. return generate_access_token()
  159. def get_host_name_and_scheme(url: str) -> (str, str):
  160. """http://localhost:5000?a=b -> (localhost, http) """
  161. url_comp = urlparse(url)
  162. return url_comp.hostname, url_comp.scheme