#!/usr/local/lib/mailinabox/env/bin/python # WDK (Web Key Directory) Manager: Facilitates discovery of keys by third-parties # Current relevant documents: https://tools.ietf.org/id/draft-koch-openpgp-webkey-service-11.html import pgp import utils import rtyaml import mailconfig import copy import shutil import os import re from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend env = utils.load_environment() wkdpath = f"{env['GNUPGHOME']}/.wkdlist.yml" class WKDError(Exception): """ Errors specifically related to WKD. """ def __init__(self, msg): self.message = msg def __str__(self): return self.message def sha1(message): h = hashes.Hash(hashes.SHA1(), default_backend()) h.update(message) return h.finalize() def zbase32(digest): # Crudely check if all quintets are complete if len(digest) % 5 != 0: raise ValueError("Digest cannot have incomplete chunks of 40 bits!") base = "ybndrfg8ejkmcpqxot1uwisza345h769" encoded = "" for i in range(0, len(digest), 5): chunk = int.from_bytes(digest[i:i + 5], byteorder="big") for j in range(35, -5, -5): encoded += base[(chunk >> j) & 31] return encoded # Strips and exports a key so that only the UID's with the provided email remain. # This is to comply with the following requirement, set forth in section 5 of the draft: # # The mail provider MUST make sure to publish a key in a way # that only the mail address belonging to the requested user # is part of the User ID packets included in the returned key. # Other User ID packets and their associated binding signatures # MUST be removed before publication. # # The arguments "buffer" and "context" are automatically added # by the pgp.fork_context() decorator. @pgp.fork_context def strip_and_export(fpr, target_email, buffer=None, context=None): context.armor = False # We need to disable armor output for this key k = pgp.get_key(fpr, context) if k is None: return None # Horrible hack: Because it's a reference (aka pointer), we can pass these around the functions statusref = {"seq_read": False, "sequence": [], "seq_number": -1} def parse_key_dump(dump): UID_REGEX = r".*:.* <(.*)>:.*:([0-9]),.*" at_least_one_not_deleted = False lines = dump.decode().split("\n") for line in lines: if line[0:3] != "uid": continue # It's a uid, find the email and the "tag" m = re.search(UID_REGEX, line) if m.group(1) != target_email: statusref["sequence"].append(f"uid {m.group(2)}") else: at_least_one_not_deleted = True if not at_least_one_not_deleted: raise WKDError("All UID's in this key would have been deleted!") statusref["sequence"] += ["deluid", "save"] statusref["seq_read"] = True def interaction(request, prompt): if request in ["GOT_IT", "KEY_CONSIDERED", "KEYEXPIRED", ""]: return 0 elif request == "GET_BOOL": # No way to confirm interactively, so we just say yes return "y" # Yeah, I'd also rather just return True but that doesn't work elif request == "GET_LINE" and prompt == "keyedit.prompt": if not statusref["seq_read"]: buffer.seek(0, os.SEEK_SET) parse_key_dump(buffer.read()) statusref["seq_number"] += 1 seqnum = statusref["seq_number"] return statusref["sequence"][seqnum] else: raise Exception("No idea of what to do!") buffer.seek(0, os.SEEK_SET) buffer.write(b'') context.interact(k, interaction, sink=buffer) return pgp.export_key(fpr, context) def email_compatible_with_key(email, fingerprint): # 1. Does the user exist? if not email in mailconfig.get_all_mail_addresses(env): raise ValueError(f"User or alias {email} not found!") if fingerprint is not None: key = pgp.get_key(fingerprint) # 2. Does the key exist? if key is None: raise ValueError(f"The key \"{fingerprint}\" does not exist!") # 3. Does the key have a user id with the email of the user? if email not in [u.email for u in key.uids]: raise WKDError( f"The key \"{fingerprint}\" has no such UID with the email \"{email}\"!" ) return key return None # Gets a table with all the keys that can be served for each user and/or alias def get_user_fpr_maps(): uk_maps = {} for email in mailconfig.get_all_mail_addresses(env): uk_maps[email] = set() for key in pgp.get_imported_keys() + [pgp.get_daemon_key()]: for userid in key.uids: try: uk_maps[userid.email].add(key.fpr) except: # We don't host this email address, so ignore pass return uk_maps # Gets the current WKD configuration def get_wkd_config(): # Test try: with open(wkdpath, "x"): pass except: pass with open(wkdpath, "r") as wkdfile: try: config = rtyaml.load(wkdfile) if (type(config) != dict): return {} return config except: return {} # Sets the WKD configuration. Takes a dictionary {email: fingerprint}. # email: An user or alias on this box. e.g. "administrator@example.com" # fingerprint: The fingerprint of the key we want to bind it to. e.g "0123456789ABCDEF0123456789ABCDEF01234567" def update_wkd_config(config_sample): config = dict(config_sample) for email, fingerprint in config_sample.items(): try: if fingerprint is None or fingerprint == "": config.pop(email) else: email_compatible_with_key(email, fingerprint) except Exception as err: raise err # All conditions met, do the necessary modifications with open(wkdpath, "w") as wkdfile: wkdfile.write(rtyaml.dump(config)) # Looks for incompatible email/key pairs on the WKD configuration file # and returns the uid indexes for compatible email/key pairs def parse_wkd_list(): removed = [] uidlist = [] with open(wkdpath, "a+") as wkdfile: wkdfile.seek(0) config = {} try: config = rtyaml.load(wkdfile) if (type(config) != dict): config = {} except: config = {} writeable = copy.deepcopy(config) for u, k in config.items(): try: key = email_compatible_with_key(u, k) # Key is compatible # Swap with the full-length fingerprint (if somehow this was changed by hand) writeable[u] = key.fpr uidlist.append((u, key.fpr)) except: writeable.pop(u) removed.append((u, k)) # Shove the updated configuration back in the file wkdfile.truncate(0) wkdfile.write(rtyaml.dump(writeable)) return (removed, uidlist) WKD_LOCATION = "/var/lib/mailinabox/wkd/" def build_wkd(): # Clean everything try: shutil.rmtree(WKD_LOCATION) except FileNotFoundError: pass os.mkdir(WKD_LOCATION, mode=0o755) # We serve WKD for all our emails and aliases (even if there are no keys) for domain in mailconfig.get_mail_domains(env, users_only=False): os.mkdir(f"{WKD_LOCATION}/{domain}/", mode=0o755) for email, fpr in parse_wkd_list()[1]: local, domain = email.split("@", 1) localhash = zbase32(sha1(local.lower().encode())) with open(f"{WKD_LOCATION}/{domain}/{localhash}", "wb") as k: k.write(strip_and_export(fpr, email))