power-mailinabox/management/wkd.py
2021-03-07 23:39:31 +00:00

202 lines
6.2 KiB
Python

#!/usr/local/lib/mailinabox/env/bin/python
# WDK (Web Key Directory) Manager: Facilitates discovery of keys by third-parties
# Current relevant documents: https://tools.ietf.org/id/draft-koch-openpgp-webkey-service-11.html
import pgp, utils, rtyaml, mailconfig, copy, shutil, os
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
env = utils.load_environment()
wkdpath = f"{env['GNUPGHOME']}/.wkdlist.yml"
class WKDError(Exception):
"""
Errors specifically related to WKD.
"""
def __init__(self, msg):
self.message = msg
def __str__(self):
return self.message
def sha1(message):
h = hashes.Hash(hashes.SHA1(), default_backend())
h.update(message)
return h.finalize()
def zbase32(digest):
# Crudely check if all quintets are complete
if len(digest) % 5 != 0:
raise ValueError("Digest cannot have incomplete chunks of 40 bits!")
base = "ybndrfg8ejkmcpqxot1uwisza345h769"
encoded = ""
for i in range(0, len(digest), 5):
chunk = int.from_bytes(digest[i:i+5], byteorder="big")
for j in range(35, -5, -5):
encoded += base[(chunk >> j) & 31]
return encoded
# Strips and exports a key so that only the provided UID index(es) remain.
# This is to comply with the following requirement, set forth in section 5 of the draft:
#
# The mail provider MUST make sure to publish a key in a way
# that only the mail address belonging to the requested user
# is part of the User ID packets included in the returned key.
# Other User ID packets and their associated binding signatures
# MUST be removed before publication.
#
# Update: I have no idea how this works anymore
@pgp.fork_context
def strip_and_export(fpr, except_uid_indexes, context=None):
context.armor = False # We need to disable armor output for this key
k = pgp.get_key(fpr, context)
if k is None:
return None
for i in except_uid_indexes:
if i > len(k.uids):
raise ValueError(f"UID index {i} out of bounds")
switch = [(f"uid {i+1}" if i + 1 not in except_uid_indexes else "") for i in range(len(k.uids))] + ["deluid", "save"]
stage = [-1] # Horrible hack: Because it's a reference (aka pointer), we can pass these around
def interaction(request, prompt):
# print(f"{request}/{prompt}")
if request in ["GOT_IT", "KEY_CONSIDERED", ""]:
return 0
elif request == "GET_BOOL":
# No way to confirm interactively, so we just say yes
return "y" # Yeah, I'd also rather just return True but that doesn't work
elif request == "GET_LINE" and prompt == "keyedit.prompt":
stage[0] += 1
return switch[stage[0]]
else:
raise Exception("No idea of what to do!")
context.interact(k, interaction)
return pgp.export_key(fpr, context)
def email_compatible_with_key(email, fingerprint):
# 1. Does the user exist?
if not email in mailconfig.get_all_mail_addresses(env):
raise ValueError(f"User or alias {email} not found!")
if fingerprint is not None:
key = pgp.get_key(fingerprint)
# 2. Does the key exist?
if key is None:
raise ValueError(f"The key \"{fingerprint}\" does not exist!")
# 3. Does the key have a user id with the email of the user?
if email not in [u.email for u in key.uids]:
raise WKDError(f"The key \"{fingerprint}\" has no such UID with the email \"{email}\"!")
return key
return None
# Gets a table with all the keys that can be served for each user and/or alias
def get_user_fpr_maps():
uk_maps = {}
for email in mailconfig.get_all_mail_addresses(env):
uk_maps[email] = set()
for key in pgp.get_imported_keys() + [pgp.get_daemon_key()]:
for userid in key.uids:
try:
uk_maps[userid.email].add(key.fpr)
except:
# We don't host this email address, so ignore
pass
return uk_maps
# Gets the current WKD configuration
def get_wkd_config():
# Test
try:
with open(wkdpath, "x"):
pass
except:
pass
with open(wkdpath, "r") as wkdfile:
try:
config = rtyaml.load(wkdfile)
if (type(config) != dict):
return {}
return config
except:
return {}
# Sets the WKD configuration. Takes a dictionary {email: fingerprint}.
# email: An user or alias on this box. e.g. "administrator@example.com"
# fingerprint: The fingerprint of the key we want to bind it to. e.g "0123456789ABCDEF0123456789ABCDEF01234567"
def update_wkd_config(config_sample):
config = dict(config_sample)
for email, fingerprint in config_sample.items():
try:
if fingerprint is None or fingerprint == "":
config.pop(email)
else:
email_compatible_with_key(email, fingerprint)
except Exception as err:
raise err
# All conditions met, do the necessary modifications
with open(wkdpath, "w") as wkdfile:
wkdfile.write(rtyaml.dump(config))
# Looks for incompatible email/key pairs on the WKD configuration file
# and returns the uid indexes for compatible email/key pairs
def parse_wkd_list():
removed = []
uidlist = []
with open(wkdpath, "a+") as wkdfile:
wkdfile.seek(0)
config = {}
try:
config = rtyaml.load(wkdfile)
if (type(config) != dict):
config = {}
except:
config = {}
writeable = copy.deepcopy(config)
for u, k in config.items():
try:
key = email_compatible_with_key(u, k)
# Key is compatible
index = []
writeable[u] = key.fpr # Swap with the full-length fingerprint (if somehow this was changed by hand)
for i in range(0, len(key.uids)):
if key.uids[i].email == u:
index.append(i)
uidlist.append((u, key.fpr, index))
except:
writeable.pop(u)
removed.append((u, k))
# Shove the updated configuration back in the file
wkdfile.truncate(0)
wkdfile.write(rtyaml.dump(writeable))
return (removed, uidlist)
WKD_LOCATION = "/var/lib/mailinabox/wkd/"
def build_wkd():
# Clean everything
try:
shutil.rmtree(WKD_LOCATION)
except FileNotFoundError:
pass
os.mkdir(WKD_LOCATION, mode=0o755)
# We serve WKD for all our emails and aliases (even if there are no keys)
for domain in mailconfig.get_mail_domains(env, users_only=False):
os.mkdir(f"{WKD_LOCATION}/{domain}/", mode=0o755)
for email, fpr, indexes in parse_wkd_list()[1]:
local, domain = email.split("@", 1)
localhash = zbase32(sha1(local.lower().encode()))
with open(f"{WKD_LOCATION}/{domain}/{localhash}", "wb") as k:
k.write(strip_and_export(fpr, indexes))