1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- """
- Verify incoming webhook from Paddle
- Code inspired from https://developer.paddle.com/webhook-reference/verifying-webhooks
- """
- import base64
- import collections
- # PHPSerialize can be found at https://pypi.python.org/pypi/phpserialize
- import phpserialize
- import requests
- from Crypto.Hash import SHA1
- # Crypto can be found at https://pypi.org/project/pycryptodome/
- from Crypto.PublicKey import RSA
- from Crypto.Signature import PKCS1_v1_5
- from app.config import PADDLE_PUBLIC_KEY_PATH, PADDLE_VENDOR_ID, PADDLE_AUTH_CODE
- # Your Paddle public key.
- from app.log import LOG
- with open(PADDLE_PUBLIC_KEY_PATH) as f:
- public_key = f.read()
- # Convert key from PEM to DER - Strip the first and last lines and newlines, and decode
- public_key_encoded = public_key[26:-25].replace("\n", "")
- public_key_der = base64.b64decode(public_key_encoded)
- def verify_incoming_request(form_data: dict) -> bool:
- """verify the incoming form_data"""
- # copy form data
- input_data = form_data.copy()
- signature = input_data["p_signature"]
- # Remove the p_signature parameter
- del input_data["p_signature"]
- # Ensure all the data fields are strings
- for field in input_data:
- input_data[field] = str(input_data[field])
- # Sort the data
- sorted_data = collections.OrderedDict(sorted(input_data.items()))
- # and serialize the fields
- serialized_data = phpserialize.dumps(sorted_data)
- # verify the data
- key = RSA.importKey(public_key_der)
- digest = SHA1.new()
- digest.update(serialized_data)
- verifier = PKCS1_v1_5.new(key)
- signature = base64.b64decode(signature)
- if verifier.verify(digest, signature):
- return True
- return False
- def cancel_subscription(subscription_id: int) -> bool:
- r = requests.post(
- "https://vendors.paddle.com/api/2.0/subscription/users_cancel",
- data={
- "vendor_id": PADDLE_VENDOR_ID,
- "vendor_auth_code": PADDLE_AUTH_CODE,
- "subscription_id": subscription_id,
- },
- )
- res = r.json()
- if not res["success"]:
- LOG.error(
- f"cannot cancel subscription {subscription_id}, paddle response: {res}"
- )
- return res["success"]
- def change_plan(subscription_id: int, plan_id) -> bool:
- r = requests.post(
- "https://vendors.paddle.com/api/2.0/subscription/users/update",
- data={
- "vendor_id": PADDLE_VENDOR_ID,
- "vendor_auth_code": PADDLE_AUTH_CODE,
- "subscription_id": subscription_id,
- "plan_id": plan_id,
- },
- )
- res = r.json()
- if not res["success"]:
- LOG.error(
- f"cannot change subscription {subscription_id} to {plan_id}, paddle response: {res}"
- )
- return res["success"]
|