12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749 |
- import base64
- import json
- from urllib.parse import urlparse, parse_qs
- from flask import url_for
- from app.db import Session
- from app.jose_utils import verify_id_token, decode_id_token
- from app.models import Client, User, ClientUser, RedirectUri
- from app.oauth.views.authorize import (
- get_host_name_and_scheme,
- generate_access_token,
- construct_url,
- )
- from tests.utils import login, random_domain, random_string, random_email
def generate_random_uri() -> str:
    """Return an HTTPS ``/callback`` URI hosted on a freshly generated random domain."""
    host = random_domain()
    return f"https://{host}/callback"
def test_get_host_name_and_scheme():
    """get_host_name_and_scheme yields a ``(hostname, scheme)`` pair for a URL."""
    # Port and query string must not end up in the host name.
    local_result = get_host_name_and_scheme("http://localhost:8000?a=b")
    assert local_result == ("localhost", "http")

    # Path and fragment must be ignored as well.
    article_url = (
        "https://www.bubblecode.net/en/2016/01/22/understanding-oauth2/#Implicit_Grant"
    )
    article_result = get_host_name_and_scheme(article_url)
    assert article_result == ("www.bubblecode.net", "https")
def test_generate_access_token(flask_client):
    """A freshly generated access token has length 40."""
    token = generate_access_token()
    assert len(token) == 40
def test_construct_url():
    """construct_url appends the params as a query string, encoding a space as ``%20``."""
    params = {"x": "1 2"}
    built = construct_url("http://ab.cd", params)
    assert built == "http://ab.cd?x=1%202"
def test_authorize_page_non_login_user(flask_client):
    """A visitor who is not authenticated gets the sign-in prompt on the authorize page."""
    # Create a client owned by a user who is never logged in through flask_client.
    owner = User.create(random_email(), random_string())
    Session.commit()
    client = Client.create_new(random_string(), owner.id)
    Session.commit()

    callback = generate_random_uri()
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="code",
    )
    response = flask_client.get(authorize_url)
    page = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "Sign in to accept sharing data with" in page
def test_authorize_page_login_user_non_supported_flow(flask_client):
    """The authorize page answers 400 when the requested flow is not supported."""
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    # Query parameters shared by both requests below.
    shared_args = {
        "client_id": client.oauth_client_id,
        "state": "teststate",
        "redirect_uri": "http://localhost",
    }

    # Case 1: no flow given, i.e. the response_type parameter is absent.
    response = flask_client.get(url_for("oauth.authorize", **shared_args))
    page = response.get_data(as_text=True)
    assert response.status_code == 400
    assert "SimpleLogin only support the following OIDC flows" in page

    # Case 2: a flow combination that SL does not support.
    unsupported_url = url_for(
        "oauth.authorize",
        **shared_args,
        response_type="code token id_token",
    )
    response = flask_client.get(unsupported_url)
    page = response.get_data(as_text=True)
    assert response.status_code == 400
    assert "SimpleLogin only support the following OIDC flows" in page
def test_authorize_page_login_user(flask_client):
    """An authenticated user is shown the authorization page listing their personal email."""
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    callback = generate_random_uri()
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="code",
    )
    response = flask_client.get(authorize_url)
    page = response.get_data(as_text=True)

    assert response.status_code == 200
    assert f"{user.email} (Personal Email)" in page
def test_authorize_code_flow_no_openid_scope(flask_client):
    """Exercise the *Code Flow* when the *openid* scope is absent.

    With response_type=code and no scope, /authorize must redirect back to the
    client with ``state`` and ``code`` in the query string, and the token
    endpoint must exchange that code for an access token plus the user info.
    """
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    domain = random_domain()
    callback = f"https://{domain}/callback"
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    # The user allows the client on the authorization page. Redirects are not
    # followed so that the redirect target itself can be asserted on.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="code",
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    response = flask_client.post(authorize_url, data=consent_form)
    assert response.status_code == 302  # user gets redirected back to client page

    # The location should look like <callback>?state=teststate&code=knuyjepwvg
    target = urlparse(response.location)
    assert target.netloc == domain
    assert not target.fragment

    # Parsed query, e.g. {'state': ['teststate'], 'code': ['knuyjepwvg']}
    params = parse_qs(target.query)
    assert len(params) == 2
    assert params["state"] == ["teststate"]
    assert len(params["code"]) == 1

    # Exchange the code for an access token, authenticating with HTTP Basic.
    credentials = f"{client.oauth_client_id}:{client.oauth_client_secret}"
    encoded = base64.b64encode(credentials.encode()).decode("utf-8")
    response = flask_client.post(
        url_for("oauth.token"),
        headers={"Authorization": "Basic " + encoded},
        data={"grant_type": "authorization_code", "code": params["code"][0]},
    )

    # The JSON body should have this shape:
    # {
    #   'access_token': 'avmhluhonsouhcwwailydwvhankspptgidoggcbu',
    #   'expires_in': 3600,
    #   'scope': '',
    #   'token_type': 'Bearer',
    #   'user': {
    #     'avatar_url': None,
    #     'client': 'test client',
    #     'email': 'x@y.z',
    #     'email_verified': True,
    #     'id': 1,
    #     'name': 'AB CD',
    #     'sub': '1'
    #   }
    # }
    assert response.status_code == 200
    token_info = response.json
    assert token_info["access_token"]
    assert token_info["expires_in"] == 3600
    assert not token_info["scope"]
    assert token_info["token_type"] == "Bearer"

    client_user = ClientUser.get_by(client_id=client.id)
    expected_user = {
        "avatar_url": None,
        "client": "test client",
        "email": "x@y.z",
        "email_verified": True,
        "id": client_user.id,
        "name": "AB CD",
        "sub": str(client_user.id),
    }
    assert token_info["user"] == expected_user
def test_authorize_code_flow_with_openid_scope(flask_client):
    """Exercise the *Code Flow* when the *openid* scope is requested.

    With response_type=code and scope=openid, the authorize endpoint still
    answers with a *code*; the token endpoint must additionally return an
    id_token next to the access_token.
    """
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    domain = random_domain()
    callback = f"https://{domain}/callback"
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    # The user allows the client on the authorization page. Redirects are not
    # followed so that the redirect target itself can be asserted on.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="code",
        scope="openid",  # openid is in scope
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    response = flask_client.post(authorize_url, data=consent_form)
    assert response.status_code == 302  # user gets redirected back to client page

    # The location carries state, code and scope in its query string.
    target = urlparse(response.location)
    assert target.netloc == domain
    assert not target.fragment

    # Parsed query, e.g.
    # {'state': ['teststate'], 'code': ['knuyjepwvg'], 'scope': ["openid"]}
    params = parse_qs(target.query)
    assert len(params) == 3
    assert params["state"] == ["teststate"]
    assert len(params["code"]) == 1

    # Exchange the code for an access token, authenticating with HTTP Basic.
    credentials = f"{client.oauth_client_id}:{client.oauth_client_secret}"
    encoded = base64.b64encode(credentials.encode()).decode("utf-8")
    response = flask_client.post(
        url_for("oauth.token"),
        headers={"Authorization": "Basic " + encoded},
        data={"grant_type": "authorization_code", "code": params["code"][0]},
    )

    # The JSON body should have this shape (plus an 'id_token' entry):
    # {
    #   'access_token': 'avmhluhonsouhcwwailydwvhankspptgidoggcbu',
    #   'expires_in': 3600,
    #   'scope': 'openid',
    #   'token_type': 'Bearer',
    #   'user': {
    #     'avatar_url': None,
    #     'client': 'test client',
    #     'email': 'x@y.z',
    #     'email_verified': True,
    #     'id': 1,
    #     'name': 'AB CD',
    #     'sub': '1'
    #   }
    # }
    assert response.status_code == 200
    token_info = response.json
    assert token_info["access_token"]
    assert token_info["expires_in"] == 3600
    assert token_info["scope"] == "openid"
    assert token_info["token_type"] == "Bearer"

    client_user = ClientUser.get_by(client_id=client.id)
    expected_user = {
        "avatar_url": None,
        "client": "test client",
        "email": "x@y.z",
        "email_verified": True,
        "id": client_user.id,
        "name": "AB CD",
        "sub": str(client_user.id),
    }
    assert token_info["user"] == expected_user

    # id_token must be returned
    assert token_info["id_token"]

    # ... and it must be a valid, correctly signed JWT
    assert verify_id_token(token_info["id_token"])
def test_authorize_token_flow(flask_client):
    """Exercise the *Token Flow*, i.e. response_type=token.

    The /authorize endpoint must redirect back to the client with ``state``
    and ``access_token`` carried in the URL fragment.
    """
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    domain = random_domain()
    callback = f"https://{domain}/callback"
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    # The user allows the client on the authorization page. Redirects are not
    # followed so that the redirect target itself can be asserted on.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="token",  # token flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    response = flask_client.post(authorize_url, data=consent_form)
    assert response.status_code == 302  # user gets redirected back to client page

    target = urlparse(response.location)
    assert target.netloc == domain

    # In the token flow the access_token travels in the fragment, not the query.
    assert target.fragment
    assert not target.query

    # Parsed fragment, e.g. {'state': ['teststate'], 'access_token': ['knuyjepwvg']}
    params = parse_qs(target.fragment)
    assert len(params) == 2
    assert params["state"] == ["teststate"]

    # access_token must be returned
    assert len(params["access_token"]) == 1
def test_authorize_id_token_flow(flask_client):
    """Exercise the *ID-Token Flow*, i.e. response_type=id_token.

    The /authorize endpoint must redirect back to the client with ``state``
    and a verifiable ``id_token``; this test expects them in the query string.
    """
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    domain = random_domain()
    callback = f"https://{domain}/callback"
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    # The user allows the client on the authorization page. Redirects are not
    # followed so that the redirect target itself can be asserted on.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="id_token",  # id_token flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    response = flask_client.post(authorize_url, data=consent_form)
    assert response.status_code == 302  # user gets redirected back to client page

    target = urlparse(response.location)
    assert target.netloc == domain

    # The result is asserted to be in the query string, with an empty fragment.
    assert not target.fragment
    assert target.query

    # Parsed query, e.g. {'state': ['teststate'], 'id_token': ['knuyjepwvg']}
    params = parse_qs(target.query)
    assert len(params) == 2
    assert params["state"] == ["teststate"]

    # id_token must be returned
    assert len(params["id_token"]) == 1

    # ... and it must be a valid, correctly signed JWT
    assert verify_id_token(params["id_token"][0])
def test_authorize_token_id_token_flow(flask_client):
    """Exercise the *ID-Token Token Flow*, i.e. response_type="id_token token".

    The /authorize endpoint must return both an id_token and an access_token
    in the URL fragment, and the decoded id_token payload must contain
    *at_hash*.
    """
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    domain = random_domain()
    callback = f"https://{domain}/callback"
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    # The user allows the client on the authorization page. Redirects are not
    # followed so that the redirect target itself can be asserted on.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="id_token token",  # id_token,token flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    response = flask_client.post(authorize_url, data=consent_form)
    assert response.status_code == 302  # user gets redirected back to client page

    target = urlparse(response.location)
    assert target.netloc == domain

    # Both tokens travel in the fragment; the query string stays empty.
    assert target.fragment
    assert not target.query

    # Parsed fragment, e.g.
    # {'state': ['teststate'], 'id_token': ['ab.cd.xy'], 'access_token': ['knuyjepwvg']}
    params = parse_qs(target.fragment)
    assert len(params) == 3
    assert params["state"] == ["teststate"]

    # id_token and access_token must both be returned
    assert len(params["id_token"]) == 1
    assert len(params["access_token"]) == 1

    # id_token must be a valid, correctly signed JWT
    id_token = params["id_token"][0]
    assert verify_id_token(id_token)

    # Make sure the JWT has all the necessary fields. The payload should have
    # this format:
    # {
    #   'at_hash': 'jLDmoGpuOIHwxeyFEe9SKw',
    #   'aud': 'testclient-sywcpwsyua',
    #   'auth_time': 1565450736,
    #   'avatar_url': None,
    #   'client': 'test client',
    #   'email': 'x@y.z',
    #   'email_verified': True,
    #   'exp': 1565454336,
    #   'iat': 1565450736,
    #   'id': 1,
    #   'iss': 'http://localhost',
    #   'name': 'AB CD',
    #   'sub': '1'
    # }
    decoded = decode_id_token(id_token)
    payload = json.loads(decoded.claims)

    # at_hash MUST be present when the flow is id_token,token; it is checked
    # first, followed by the remaining claims.
    required_claims = (
        "at_hash",
        "aud",
        "auth_time",
        "avatar_url",
        "client",
        "email",
        "email_verified",
        "exp",
        "iat",
        "id",
        "iss",
        "name",
        "sub",
    )
    for claim in required_claims:
        assert claim in payload
def test_authorize_code_id_token_flow(flask_client):
    """Exercise the *ID-Token Code Flow*, i.e. response_type="id_token code".

    The /authorize endpoint must return an id_token and a code, and the
    id_token payload must contain *c_hash*. The /token endpoint must then
    return an access_token and an id_token in exchange for the code.
    """
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    Session.commit()

    domain = random_domain()
    callback = f"https://{domain}/callback"
    RedirectUri.create(client_id=client.id, uri=callback, commit=True)

    # The user allows the client on the authorization page. Redirects are not
    # followed so that the redirect target itself can be asserted on.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri=callback,
        response_type="id_token code",  # id_token,code flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    response = flask_client.post(authorize_url, data=consent_form)
    assert response.status_code == 302  # user gets redirected back to client page

    target = urlparse(response.location)
    assert target.netloc == domain

    # The result is asserted to be in the query string, with an empty fragment.
    assert not target.fragment
    assert target.query

    # Parsed query, e.g.
    # {'state': ['teststate'], 'id_token': ['knuyjepwvg'], 'code': ['longstring']}
    params = parse_qs(target.query)
    assert len(params) == 3
    assert params["state"] == ["teststate"]
    assert len(params["id_token"]) == 1
    assert len(params["code"]) == 1

    # id_token must be a valid, correctly signed JWT
    id_token = params["id_token"][0]
    assert verify_id_token(id_token)

    # Make sure the JWT has all the necessary fields. The payload should have
    # this format:
    # {
    #   'c_hash': 'jLDmoGpuOIHwxeyFEe9SKw',
    #   'aud': 'testclient-sywcpwsyua',
    #   'auth_time': 1565450736,
    #   'avatar_url': None,
    #   'client': 'test client',
    #   'email': 'x@y.z',
    #   'email_verified': True,
    #   'exp': 1565454336,
    #   'iat': 1565450736,
    #   'id': 1,
    #   'iss': 'http://localhost',
    #   'name': 'AB CD',
    #   'sub': '1'
    # }
    decoded = decode_id_token(id_token)
    payload = json.loads(decoded.claims)

    # c_hash MUST be present when the flow is id_token,code; it is checked
    # first, followed by the remaining claims.
    required_claims = (
        "c_hash",
        "aud",
        "auth_time",
        "avatar_url",
        "client",
        "email",
        "email_verified",
        "exp",
        "iat",
        "id",
        "iss",
        "name",
        "sub",
    )
    for claim in required_claims:
        assert claim in payload

    # <<< Exchange the code to get access_token >>>
    credentials = f"{client.oauth_client_id}:{client.oauth_client_secret}"
    encoded = base64.b64encode(credentials.encode()).decode("utf-8")
    response = flask_client.post(
        url_for("oauth.token"),
        headers={"Authorization": "Basic " + encoded},
        data={"grant_type": "authorization_code", "code": params["code"][0]},
    )

    # The JSON body should have this shape:
    # {
    #   'access_token': 'avmhluhonsouhcwwailydwvhankspptgidoggcbu',
    #   'id_token': 'ab.cd.xy',
    #   'expires_in': 3600,
    #   'scope': '',
    #   'token_type': 'Bearer',
    #   'user': {
    #     'avatar_url': None,
    #     'client': 'test client',
    #     'email': 'x@y.z',
    #     'email_verified': True,
    #     'id': 1,
    #     'name': 'AB CD',
    #     'sub': '1'
    #   }
    # }
    assert response.status_code == 200
    token_info = response.json
    assert token_info["access_token"]
    assert token_info["expires_in"] == 3600
    assert not token_info["scope"]
    assert token_info["token_type"] == "Bearer"

    client_user = ClientUser.get_by(client_id=client.id)
    expected_user = {
        "avatar_url": None,
        "client": "test client",
        "email": "x@y.z",
        "email_verified": True,
        "id": client_user.id,
        "name": "AB CD",
        "sub": str(client_user.id),
    }
    assert token_info["user"] == expected_user

    # id_token must be returned
    assert token_info["id_token"]

    # ... and it must be a valid, correctly signed JWT
    assert verify_id_token(token_info["id_token"])
def test_authorize_page_invalid_client_id(flask_client):
    """An unknown client_id makes the authorize page redirect (302) to ``auth.login``."""
    user = login(flask_client)
    # A real client exists, but the request below deliberately uses another id.
    Client.create_new("test client", user.id)
    Session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id="invalid_client_id",
        state="teststate",
        redirect_uri="http://localhost",
        response_type="code",
    )
    response = flask_client.get(authorize_url)

    assert response.status_code == 302
    assert response.location == url_for("auth.login")
def test_authorize_page_http_not_allowed(flask_client):
    """A plain-http redirect_uri on an approved client is redirected to with ``error=http_not_allowed``."""
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    client.approved = True
    Session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri="http://mywebsite.com",
        response_type="code",
    )
    response = flask_client.get(authorize_url)

    assert response.status_code == 302
    assert response.location == "http://mywebsite.com?error=http_not_allowed"
def test_authorize_page_unknown_redirect_uri(flask_client):
    """A redirect_uri not registered for an approved client is redirected to with ``error=unknown_redirect_uri``."""
    user = login(flask_client)
    client = Client.create_new("test client", user.id)
    client.approved = True
    Session.commit()

    # No RedirectUri is created for this client, so the URI below is unknown to it.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=client.oauth_client_id,
        state="teststate",
        redirect_uri="https://unknown.com",
        response_type="code",
    )
    response = flask_client.get(authorize_url)

    assert response.status_code == 302
    assert response.location == "https://unknown.com?error=unknown_redirect_uri"
|