#!/usr/local/lib/mailinabox/env/bin/python3
#
# The API can be accessed on the command line, e.g. use `curl` like so:
#    curl --user $(
#
# NOTE(review): the remainder of this header comment and everything between it
# and the custom-DNS routes below (imports, app setup, earlier views) is not
# present in this view of the file. Only the tail of the first route decorator
# survived; restore the missing part from version control.


# NOTE(review): the URL variables in the route rules below were restored from
# the view's parameter names (qname, rtype) -- confirm against version control.
@app.route('/dns/custom/<qname>', methods=['GET', 'POST', 'PUT', 'DELETE'])
@app.route('/dns/custom/<qname>/<rtype>',
           methods=['GET', 'POST', 'PUT', 'DELETE'])
@authorized_personnel_only
def dns_set_record(qname, rtype="A"):
    """Read, add, replace or remove custom DNS records for qname/rtype.

    GET returns the existing records, POST adds a record, PUT replaces all
    records for the qname-rtype pair and DELETE removes one record (when a
    value is given) or all of them (when the value is empty).

    Returns "OK" when nothing changed, the output of do_dns_update() when
    records changed, or a (message, 400) tuple on invalid input.
    """
    from dns_update import do_dns_update, set_custom_dns_record
    try:
        # Normalize.
        rtype = rtype.upper()

        # Read the record value from the request BODY, which must be
        # ASCII-only. Not used with GET.
        rec = request.form
        value = ""
        ttl = None
        if isinstance(rec, dict):
            value = rec.get("value", "")
            ttl = rec.get("ttl", None)
        else:
            value = request.stream.read().decode("ascii", "ignore").strip()

        # A TTL that is not an integer is silently ignored.
        if ttl is not None:
            try:
                ttl = int(ttl)
            except (TypeError, ValueError):
                ttl = None

        if request.method == "GET":
            # Get the existing records matching the qname and rtype.
            return dns_get_records(qname, rtype)

        elif request.method in ("POST", "PUT"):
            # There is a default value for A/AAAA records.
            if rtype in ("A", "AAAA") and value == "":
                # normally REMOTE_ADDR but we're behind nginx as a reverse proxy
                value = request.environ.get("HTTP_X_FORWARDED_FOR")

            # Cannot add empty records. The forwarded-for header may be
            # absent, in which case value is None rather than ''.
            if not value:
                return ("No value for the record provided.", 400)

            if request.method == "POST":
                # Add a new record (in addition to any existing records
                # for this qname-rtype pair).
                action = "add"
            elif request.method == "PUT":
                # In REST, PUT is supposed to be idempotent, so we'll
                # make this action set (replace all records for this
                # qname-rtype pair) rather than add (add a new record).
                action = "set"

        elif request.method == "DELETE":
            if value == '':
                # Delete all records for this qname-type pair.
                value = None
            else:
                # Delete just the qname-rtype-value record exactly.
                pass
            action = "remove"

        if set_custom_dns_record(qname, rtype, value, action, env, ttl=ttl):
            return do_dns_update(env) or "Something isn't right."
        return "OK"

    except ValueError as e:
        return (str(e), 400)


@app.route('/dns/dump')
@authorized_personnel_only
def dns_get_dump():
    """Return the result of build_recommended_dns() as JSON."""
    from dns_update import build_recommended_dns
    return json_response(build_recommended_dns(env))


# NOTE(review): <zone> restored from the view's parameter name.
@app.route('/dns/zonefile/<zone>')
@authorized_personnel_only
def dns_get_zonefile(zone):
    """Return the zone file for `zone` as a text/plain response."""
    from dns_update import get_dns_zonefile
    return Response(get_dns_zonefile(zone, env),
                    status=200,
                    mimetype='text/plain')


# SSL


@app.route('/ssl/status')
@authorized_personnel_only
def ssl_get_status():
    """Report the TLS certificate status of every web domain as JSON.

    The response has two keys: "can_provision" (sorted domains for which a
    certificate can be provisioned) and "status" (one entry per domain).
    """
    from ssl_certificates import get_certificates_to_provision
    from web_update import get_web_domains_info, get_web_domains

    # What domains can we provision certificates for? What unexpected problems do we have?
    provision, cant_provision = get_certificates_to_provision(
        env, show_valid_certs=False)

    # What's the current status of TLS certificates on all of the domain?
    domains_status = get_web_domains_info(env)
    domains_status = [{
        "domain": d["domain"],
        "status": d["ssl_certificate"][0],
        "text": d["ssl_certificate"][1] +
                ((" " + cant_provision[d["domain"]]
                  if d["domain"] in cant_provision else "")),
    } for d in domains_status]

    # Warn the user about domain names not hosted here because of other settings.
    hosted_elsewhere = (set(get_web_domains(env, exclude_dns_elsewhere=False))
                        - set(get_web_domains(env)))
    for domain in hosted_elsewhere:
        domains_status.append({
            "domain": domain,
            "status": "not-applicable",
            "text": "The domain's website is hosted elsewhere.",
        })

    return json_response({
        "can_provision": utils.sort_domains(provision, env),
        "status": domains_status,
    })


# NOTE(review): <domain> restored from the view's parameter name.
@app.route('/ssl/csr/<domain>', methods=['POST'])
@authorized_personnel_only
def ssl_get_csr(domain):
    """Create a CSR for `domain` using the box's SSL private key."""
    from ssl_certificates import create_csr
    ssl_private_key = os.path.join(env["STORAGE_ROOT"], 'ssl',
                                   'ssl_private_key.pem')
    return create_csr(domain, ssl_private_key,
                      request.form.get('countrycode', ''), env)


@app.route('/ssl/install', methods=['POST'])
@authorized_personnel_only
def ssl_install_cert():
    """Install a submitted certificate (and chain) for one web domain."""
    from web_update import get_web_domains
    from ssl_certificates import install_cert
    domain = request.form.get('domain')
    ssl_cert = request.form.get('cert')
    ssl_chain = request.form.get('chain')
    if domain not in get_web_domains(env):
        return "Invalid domain name."
    return install_cert(domain, ssl_cert, ssl_chain, env)


@app.route('/ssl/provision', methods=['POST'])
@authorized_personnel_only
def ssl_provision_certs():
    """Run provision_certificates() for all domains and return its result."""
    from ssl_certificates import provision_certificates
    requests = provision_certificates(env, limit_domains=None)
    return json_response({"requests": requests})


# multi-factor auth


@app.route('/mfa/status', methods=['POST'])
@authorized_personnel_only
def mfa_get_status():
    """Return the MFA state of a user, plus TOTP provisioning info for self."""
    # Anyone accessing this route is an admin, and we permit them to
    # see the MFA status for any user if they submit a 'user' form
    # field. But we don't include provisioning info since a user can
    # only provision for themselves.
    # user field if given, otherwise the user making the request
    email = request.form.get('user', request.user_email)
    try:
        resp = {"enabled_mfa": get_public_mfa_state(email, env)}
        if email == request.user_email:
            resp.update({"new_mfa": {"totp": provision_totp(email, env)}})
    except ValueError as e:
        return (str(e), 400)
    return json_response(resp)


@app.route('/mfa/totp/enable', methods=['POST'])
@authorized_personnel_only
def totp_post_enable():
    """Enable TOTP for the requesting user from the submitted secret/token."""
    secret = request.form.get('secret')
    token = request.form.get('token')
    label = request.form.get('label')
    # A missing 'token' field yields None, which is rejected here.
    if not isinstance(token, str):
        return ("Bad Input", 400)
    try:
        validate_totp_secret(secret)
        enable_mfa(request.user_email, "totp", secret, token, label, env)
    except ValueError as e:
        return (str(e), 400)
    return "OK"


@app.route('/mfa/disable', methods=['POST'])
@authorized_personnel_only
def totp_post_disable():
    """Disable one MFA method (or all, when no 'mfa-id' is given) for a user."""
    # Anyone accessing this route is an admin, and we permit them to
    # disable the MFA status for any user if they submit a 'user' form
    # field.
    # user field if given, otherwise the user making the request
    email = request.form.get('user', request.user_email)
    try:
        # convert empty string to None
        result = disable_mfa(email, request.form.get('mfa-id') or None, env)
    except ValueError as e:
        return (str(e), 400)
    if result:  # success
        return "OK"
    else:  # error
        return ("Invalid user or MFA id.", 400)


# WEB


@app.route('/web/domains')
@authorized_personnel_only
def web_get_domains():
    """Return get_web_domains_info() as JSON."""
    from web_update import get_web_domains_info
    return json_response(get_web_domains_info(env))


@app.route('/web/update', methods=['POST'])
@authorized_personnel_only
def web_update():
    """Run do_web_update(); any failure is reported as a 500 with its text."""
    from web_update import do_web_update
    try:
        return do_web_update(env)
    except Exception as e:
        return (str(e), 500)


# System


@app.route('/system/version', methods=["GET"])
@authorized_personnel_only
def system_version():
    """Return the installed version, or a 500 with the error text."""
    from status_checks import what_version_is_this
    try:
        return what_version_is_this(env)
    except Exception as e:
        return (str(e), 500)


@app.route('/system/latest-upstream-version', methods=["POST"])
@authorized_personnel_only
def system_latest_upstream_version():
    """Return the latest upstream version, or a 500 with the error text."""
    from status_checks import get_latest_miab_version
    try:
        return get_latest_miab_version()
    except Exception as e:
        return (str(e), 500)


@app.route('/system/status', methods=["POST"])
@authorized_personnel_only
def system_status():
    """Run the status checks and return their output as a JSON list."""
    from status_checks import run_checks

    class WebOutput:
        """Collects status-check output as a list of JSON-ready dicts."""

        def __init__(self):
            self.items = []

        def _add(self, kind, text):
            self.items.append({"type": kind, "text": text, "extra": []})

        def add_heading(self, heading):
            self._add("heading", heading)

        def print_ok(self, message):
            self._add("ok", message)

        def print_error(self, message):
            self._add("error", message)

        def print_warning(self, message):
            self._add("warning", message)

        def print_na(self, message):
            self._add("na", message)

        def print_line(self, message, monospace=False):
            # Extra lines attach to the most recently added item.
            self.items[-1]["extra"].append({
                "text": message,
                "monospace": monospace
            })

    output = WebOutput()
    # Create a temporary pool of processes for the status checks
    with multiprocessing.pool.Pool(processes=5) as pool:
        run_checks(False, env, output, pool)
    return json_response(output.items)


@app.route('/system/updates')
@authorized_personnel_only
def show_updates():
    """List pending apt updates, one "package (version)" per line."""
    from status_checks import list_apt_updates
    return "".join(f"{p['package']} ({p['version']})\n"
                   for p in list_apt_updates())


@app.route('/system/update-packages', methods=["POST"])
@authorized_personnel_only
def do_updates():
    """Run `apt-get update` then `apt-get -y upgrade`; return the output."""
    utils.shell("check_call", ["/usr/bin/apt-get", "-qq", "update"])
    return utils.shell("check_output", ["/usr/bin/apt-get", "-y", "upgrade"],
                       env={"DEBIAN_FRONTEND": "noninteractive"})


@app.route('/system/reboot', methods=["GET"])
@authorized_personnel_only
def needs_reboot():
    """Return JSON true/false: is a reboot needed after package installs?"""
    from status_checks import is_reboot_needed_due_to_package_installation
    return json_response(bool(is_reboot_needed_due_to_package_installation()))
@app.route('/system/reboot', methods=["POST"])
@authorized_personnel_only
def do_reboot():
    """Reboot the machine, but only if a package installation requires it."""
    # To keep the attack surface low, we don't allow a remote reboot if one isn't necessary.
    from status_checks import is_reboot_needed_due_to_package_installation
    if is_reboot_needed_due_to_package_installation():
        return utils.shell("check_output", ["/sbin/shutdown", "-r", "now"],
                           capture_stderr=True)
    else:
        return "No reboot is required, so it is not allowed."


@app.route('/system/backup/status')
@authorized_personnel_only
def backup_status():
    """Return the backup status as JSON, or {"error": ...} on failure."""
    from backup import backup_status
    try:
        return json_response(backup_status(env))
    except Exception as e:
        return json_response({"error": str(e)})


@app.route('/system/backup/config', methods=["GET"])
@authorized_personnel_only
def backup_get_custom():
    """Return the backup configuration (UI flavour) as JSON."""
    from backup import get_backup_config
    return json_response(get_backup_config(env, for_ui=True))


@app.route('/system/backup/config', methods=["POST"])
@authorized_personnel_only
def backup_set_custom():
    """Store the backup configuration submitted in the form fields."""
    from backup import backup_set_custom
    form = request.form
    return json_response(
        backup_set_custom(env, form.get('target', ''),
                          form.get('target_user', ''),
                          form.get('target_pass', ''),
                          form.get('target_rsync_port', ''),
                          form.get('min_age', '')))


@app.route('/system/backup/new', methods=["POST"])
@authorized_personnel_only
def backup_new():
    """Start a backup now; a full one when the 'full' field is 'true'."""
    from backup import perform_backup, get_backup_config

    # If backups are disabled, don't perform the backup
    config = get_backup_config(env)
    if config["target"] == "off":
        return "Backups are disabled in this machine. Nothing was done."

    msg = perform_backup(request.form.get('full', False) == 'true', True)
    return "OK" if msg is None else msg


@app.route('/system/privacy', methods=["GET"])
@authorized_personnel_only
def privacy_status_get():
    """Return the "privacy" setting as JSON (defaults to true)."""
    config = utils.load_settings(env)
    return json_response(config.get("privacy", True))


@app.route('/system/privacy', methods=["POST"])
@authorized_personnel_only
def privacy_status_set():
    """Set "privacy" to true iff the 'value' form field equals "private"."""
    config = utils.load_settings(env)
    config["privacy"] = (request.form.get('value') == "private")
    utils.write_settings(config, env)
    return "OK"


@app.route('/system/smtp/relay', methods=["GET"])
@authorized_personnel_only
def smtp_relay_get():
    """Return the stored SMTP relay settings (never the password).

    The stored DKIM record components are rendered back into a single
    "tag=value; ...; p=<key>" string under "dkim_rr".

    Raises ValueError if a DKIM record is stored without a "p" component.
    """
    config = utils.load_settings(env)

    dkim_rrtxt = ""
    rr = config.get("SMTP_RELAY_DKIM_RR", None)
    if rr is not None:
        if rr.get("p") is None:
            raise ValueError("Key doesn't exist!")
        # (tag, default) pairs; tags without a value and default are omitted.
        for c, d in (("v", "DKIM1"), ("h", None), ("k", "rsa"), ("n", None),
                     ("s", None), ("t", None)):
            txt = rr.get(c, d)
            if txt is None:
                continue
            dkim_rrtxt += f"{c}={txt}; "
        # The public key always goes last.
        dkim_rrtxt += f"p={rr.get('p')}"

    return {
        "enabled": config.get("SMTP_RELAY_ENABLED", False),
        "host": config.get("SMTP_RELAY_HOST", ""),
        "port": config.get("SMTP_RELAY_PORT", None),
        "user": config.get("SMTP_RELAY_USER", ""),
        "authorized_servers": config.get("SMTP_RELAY_AUTHORIZED_SERVERS", []),
        "spf_record": config.get("SMTP_RELAY_SPF_RECORD", None),
        "dkim_selector": config.get("SMTP_RELAY_DKIM_SELECTOR", None),
        "dkim_rr": dkim_rrtxt
    }


@app.route('/system/smtp/relay', methods=["POST"])
@authorized_personnel_only
def smtp_relay_set():
    """Validate and apply SMTP relay settings from the submitted form.

    Steps: validate the optional DKIM selector/record, probe the relay for
    implicit TLS (when enabled), persist the settings, rewrite the Postfix
    relay configuration and SASL password file, regenerate DNS and reload
    Postfix.

    Returns the output of `postfix reload` on success, or a (message, 400)
    tuple on invalid input or on any failure while applying the settings.
    """
    from editconf import edit_conf
    from os import chmod
    import re
    import socket
    import ssl

    config = utils.load_settings(env)
    newconf = request.form

    # Is DKIM configured? Normalize the selector once so that the value that
    # is validated is also the value that is stored and compared below.
    sel = (newconf.get("dkim_selector") or "").strip()
    if sel == "":
        config["SMTP_RELAY_DKIM_SELECTOR"] = None
        config["SMTP_RELAY_DKIM_RR"] = None
    elif re.fullmatch(r"[a-z\d\._]+", sel) is None:
        return ("The DKIM selector is invalid!", 400)
    else:
        # DKIM selector looks good, try processing the RR
        rr = newconf.get("dkim_rr", "")
        if rr.strip() == "":
            return ("Cannot publish a selector with an empty key!", 400)

        components = {}
        for r in re.split(r"[;\s]+", rr.strip()):
            if r == "":
                # Produced by a trailing ';' -- not an error.
                continue
            # Split on the first '=' only: base64 values may end in '='.
            tag, eq, tag_value = r.partition("=")
            if eq == "":
                return ("DKIM public key RR is malformed!", 400)
            components[tag] = tag_value

        if not components.get("p"):
            return ("The DKIM public key doesn't exist!", 400)

        config["SMTP_RELAY_DKIM_SELECTOR"] = sel
        config["SMTP_RELAY_DKIM_RR"] = components

    relay_on = False
    implicit_tls = False

    if newconf.get("enabled") == "true":
        relay_on = True

        try:
            relay_port = int(newconf.get("port"))
        except (TypeError, ValueError):
            return ("The port number is invalid!", 400)

        # Try negotiating TLS directly. We need to know this because we need to configure Postfix
        # to be aware of this detail.
        try:
            ctx = ssl.create_default_context()
            with socket.create_connection((newconf.get("host"), relay_port),
                                          5) as sock:
                with ctx.wrap_socket(sock,
                                     server_hostname=newconf.get("host")):
                    implicit_tls = True
        except ssl.SSLError as sle:
            # Couldn't connect via TLS, configure Postfix to send via STARTTLS
            print(sle.reason)
        except (socket.herror, socket.gaierror) as he:
            return (
                f"Unable to resolve hostname (it probably is incorrect): {he.strerror}",
                400)
        except (socket.timeout, ConnectionRefusedError):
            return (
                "We couldn't connect to the server. Is it down or did you write the wrong port number?",
                400)

    pw_file = "/etc/postfix/sasl_passwd"
    modify_password = True
    # Check that if the provided password is empty, that there was a password saved before
    if newconf.get("key", "") == "":
        if os.path.isfile(pw_file):
            modify_password = False
        else:
            return (
                "Please provide a password/key (there is no existing password to retain).",
                400)

    try:
        # Write on daemon settings
        config["local_dkim_selector"] = ("mailorigin" if relay_on
                                         and sel == "mail" else "mail")
        config["SMTP_RELAY_ENABLED"] = relay_on
        config["SMTP_RELAY_HOST"] = newconf.get("host")
        config["SMTP_RELAY_PORT"] = int(newconf.get("port"))
        config["SMTP_RELAY_USER"] = newconf.get("user")
        config["SMTP_RELAY_AUTHORIZED_SERVERS"] = [
            s.strip() for s in re.split(
                r"[, ]+",
                newconf.get("authorized_servers", []) or "")
            if s.strip() != ""
        ]
        config["SMTP_RELAY_SPF_RECORD"] = newconf.get("spf_record")
        utils.write_settings(config, env)

        # Write on Postfix configs
        relay_target = f"[{config['SMTP_RELAY_HOST']}]:{config['SMTP_RELAY_PORT']}"
        edit_conf("/etc/postfix/main.cf", [
            "relayhost=" +
            (relay_target if config["SMTP_RELAY_ENABLED"] else ""),
            f"smtp_tls_wrappermode={'yes' if implicit_tls else 'no'}"
        ],
                  delimiter_re=r"\s*=\s*",
                  delimiter="=",
                  comment_char="#")

        # Edit the sasl password (still will edit the file, but keep the pw)
        with open(pw_file, "a+") as f:
            f.seek(0)
            pwm = re.match(r"\[.+\]\:[0-9]+\s.+\:(.*)", f.readline())
            if (pwm is None
                    or len(pwm.groups()) != 1) and not modify_password:
                # The file exists but holds no password we could retain.
                return (
                    "Please provide a password/key (there is no existing password to retain).",
                    400)
            f.truncate(0)
            f.write(
                f"[{config['SMTP_RELAY_HOST']}]:{config['SMTP_RELAY_PORT']} {config['SMTP_RELAY_USER']}:{newconf.get('key') if modify_password else pwm[1]}\n"
            )
        chmod(pw_file, 0o600)
        utils.shell("check_output", ["/usr/sbin/postmap", pw_file],
                    capture_stderr=True)

        # Regenerate DNS (to apply whatever changes need to be made)
        from dns_update import do_dns_update
        do_dns_update(env)

        # Restart Postfix
        return utils.shell("check_output", ["/usr/sbin/postfix", "reload"],
                           capture_stderr=True)
    except Exception as e:
        return (str(e), 400)


# PGP


@app.route('/system/pgp/', methods=["GET"])
@authorized_personnel_only
def get_keys():
    """Return the daemon key and all imported keys."""
    from pgp import get_daemon_key, get_imported_keys, key_representation
    return {
        "daemon": key_representation(get_daemon_key()),
        "imported": list(map(key_representation, get_imported_keys()))
    }


# NOTE(review): <fpr> in the three rules below was restored from the views'
# parameter name -- confirm against version control.
@app.route('/system/pgp/<fpr>', methods=["GET"])
@authorized_personnel_only
def get_key(fpr):
    """Return the key with fingerprint `fpr`, or 404 if there is none."""
    from pgp import get_key, key_representation
    k = get_key(fpr)
    if k is None:
        abort(404)
    return key_representation(k)


@app.route('/system/pgp/<fpr>', methods=["DELETE"])
@authorized_personnel_only
def delete_key(fpr):
    """Delete the key `fpr`, rebuild the WKD and return parse_wkd_list()[0]'s first fields."""
    from pgp import delete_key
    from wkd import parse_wkd_list, build_wkd
    try:
        if delete_key(fpr) is None:
            abort(404)
        removed = parse_wkd_list()[0]
        build_wkd()
        return json_response([e[0] for e in removed])
    except ValueError as e:
        return (str(e), 400)


@app.route('/system/pgp/<fpr>/export', methods=["GET"])
@authorized_personnel_only
def export_key(fpr):
    """Return the export of key `fpr`, or 404 if there is none."""
    from pgp import export_key
    exp = export_key(fpr)
    if exp is None:
        abort(404)
    return exp


@app.route('/system/pgp/import', methods=["POST"])
@authorized_personnel_only
def import_key():
    """Import the key in the 'key' form field and report import counters."""
    from pgp import import_key
    from wkd import build_wkd

    k = request.form.get('key')
    try:
        result = import_key(k)
        build_wkd()  # Rebuild the WKD
        return {
            "keys_read": result.considered,
            "keys_added": result.imported,
            "keys_unchanged": result.unchanged,
            "uids_added": result.new_user_ids,
            "sigs_added": result.new_signatures,
            "revs_added": result.new_revocations
        }
    except ValueError as e:
        return (str(e), 400)


# Web Key Directory


@app.route('/system/pgp/wkd', methods=["GET"])
@authorized_personnel_only
def get_wkd_status():
    """Return all known keys and the per-domain WKD options/selection."""
    from pgp import get_daemon_key, get_imported_keys, key_representation
    from wkd import get_user_fpr_maps, get_wkd_config
    from mailconfig import get_domain

    options = get_user_fpr_maps()
    chosen = get_wkd_config()

    # Group the per-address entries by the address's domain.
    wkd = {}
    for address in options:
        wkd.setdefault(get_domain(address), {})[address] = {
            "options": list(options.get(address)),
            "selected": chosen.get(address)
        }

    return {
        "keys": {
            x.get("master_fpr"): x
            for x in map(key_representation,
                         [get_daemon_key()] + get_imported_keys())
        },
        "wkd": wkd
    }


@app.route('/system/pgp/wkd', methods=["POST"])
@authorized_personnel_only
def update_wkd():
    """Store the submitted WKD configuration and rebuild the WKD."""
    from wkd import update_wkd_config, build_wkd
    update_wkd_config(request.form)
    build_wkd()
    return "OK"


@app.route('/system/default-quota', methods=["GET"])
@authorized_personnel_only
def default_quota_get():
    """Return the default quota: plain when 'text' is set, JSON otherwise."""
    if request.values.get('text'):
        return get_default_quota(env)
    else:
        return json_response({
            "default-quota": get_default_quota(env),
        })


@app.route('/system/default-quota', methods=["POST"])
@authorized_personnel_only
def default_quota_set():
    """Validate and store the 'default_quota' request value."""
    config = utils.load_settings(env)
    try:
        config["default-quota"] = validate_quota(
            request.values.get('default_quota'))
        utils.write_settings(config, env)
    except ValueError as e:
        return ("ERROR: %s" % str(e), 400)
    return "OK"


# MUNIN


@app.route('/munin/')
@authorized_personnel_only
def munin_start():
    """Set the session cookie that authorizes the other munin routes."""
    # Munin pages, static images, and dynamically generated images are served
    # outside of the AJAX API. We'll start with a 'start' API that sets a cookie
    # that subsequent requests will read for authorization. (We don't use cookies
    # for the API to avoid CSRF vulnerabilities.)
    response = make_response("OK")
    response.set_cookie("session",
                        auth_service.create_session_key(request.user_email,
                                                        env,
                                                        type='cookie'),
                        max_age=60 * 30,  # 30 minute duration
                        secure=True,
                        httponly=True,
                        samesite="Strict")
    return response


def check_request_cookie_for_admin_access():
    """Return True iff the request's session cookie belongs to an admin."""
    session = auth_service.get_session(None,
                                       request.cookies.get("session", ""),
                                       "cookie", env)
    if not session:
        return False
    privs = get_mail_user_privileges(session["email"], env)
    if not isinstance(privs, list):
        return False
    return "admin" in privs


def authorized_personnel_only_via_cookie(f):
    """Decorator: answer 403 unless the session cookie grants admin access."""

    @wraps(f)
    def g(*args, **kwargs):
        if not check_request_cookie_for_admin_access():
            return Response("Unauthorized",
                            status=403,
                            mimetype='text/plain',
                            headers={})
        return f(*args, **kwargs)

    return g


# NOTE(review): the URL variable was restored from the view's parameter name;
# the `path:` converter is assumed because munin file names contain slashes --
# confirm against version control.
@app.route('/munin/<path:filename>')
@authorized_personnel_only_via_cookie
def munin_static_file(filename=""):
    """Serve a static munin file (index.html when no name is given)."""
    # Proxy the request to static files.
    if filename == "":
        filename = "index.html"
    return send_from_directory("/var/cache/munin/www", filename)


# NOTE(review): URL variable restored as for munin_static_file -- confirm.
@app.route('/munin/cgi-graph/<path:filename>')
@authorized_personnel_only_via_cookie
def munin_cgi(filename):
    """ Relay munin cgi dynazoom requests
    /usr/lib/munin/cgi/munin-cgi-graph is a perl cgi script in the munin package
    that is responsible for generating binary png images _and_ associated HTTP
    headers based on parameters in the requesting URL. All output is written
    to stdout which munin_cgi splits into response headers and binary response
    data.
    munin-cgi-graph reads environment variables to determine
    what it should do. It expects a path to be in the env-var PATH_INFO, and a
    querystring to be in the env-var QUERY_STRING.
    munin-cgi-graph has several failure modes. Some write HTTP Status headers and
    others return nonzero exit codes.
    Situating munin_cgi between the user-agent and munin-cgi-graph enables keeping
    the cgi script behind mailinabox's auth mechanisms and avoids additional
    support infrastructure like spawn-fcgi.
    """

    COMMAND = 'su - munin --preserve-environment --shell=/bin/bash -c /usr/lib/munin/cgi/munin-cgi-graph'
    # su changes user, we use the munin user here
    # --preserve-environment retains the environment, which is where Popen's `env` data is
    # --shell=/bin/bash ensures the shell used is bash
    # -c "/usr/lib/munin/cgi/munin-cgi-graph" passes the command to run as munin

    if filename == "":
        return ("a path must be specified", 404)

    query_str = request.query_string.decode("utf-8", 'ignore')

    # Local CGI environment; intentionally shadows the module-level `env`.
    env = {
        'PATH_INFO': '/%s/' % filename,
        'REQUEST_METHOD': 'GET',
        'QUERY_STRING': query_str
    }
    code, binout = utils.shell(
        'check_output',
        COMMAND.split(" ", 5),
        # Using a maxsplit of 5 keeps the last arguments together
        env=env,
        return_bytes=True,
        trap=True)

    if code != 0:
        # nonzero returncode indicates error
        app.logger.error(
            "munin_cgi: munin-cgi-graph returned nonzero exit code, %s", code)
        return ("error processing graph image", 500)

    # /usr/lib/munin/cgi/munin-cgi-graph returns both headers and binary png when successful.
    # A double-Windows-style-newline always indicates the end of HTTP headers.
    headers, separator, image_bytes = binout.partition(b'\r\n\r\n')
    if not separator:
        # Output without a header/body separator cannot be relayed.
        app.logger.error(
            "munin_cgi: munin-cgi-graph output has no header separator")
        return ("error processing graph image", 500)

    response = make_response(image_bytes)
    for line in headers.splitlines():
        name, colon, value = line.decode("utf8").partition(':')
        if not colon:
            # Not a "Name: value" header line; skip it instead of crashing.
            continue
        response.headers[name] = value
    if 'Status' in response.headers and '404' in response.headers['Status']:
        app.logger.warning(
            "munin_cgi: munin-cgi-graph returned 404 status code. PATH_INFO=%s",
            env['PATH_INFO'])
    return response


def log_failed_login(request):
    """Log a failed login attempt together with the client's IP address."""
    # We need to figure out the ip to list in the message, all our calls are routed
    # through nginx who will put the original ip in X-Forwarded-For.
    # During setup we call the management interface directly to determine the user
    # status. So we can't always use X-Forwarded-For because during setup that header
    # will not be present.
    forwarded_for = request.headers.getlist("X-Forwarded-For")
    if forwarded_for:
        ip = forwarded_for[0]
    else:
        ip = request.remote_addr

    # We need to add a timestamp to the log message, otherwise /dev/log will eat the "duplicate"
    # message.
    app.logger.warning(
        "Mail-in-a-Box Management Daemon: Failed login attempt from ip %s - timestamp %s",
        ip, time.time())


# APP

if __name__ == '__main__':
    if "DEBUG" in os.environ:
        # Turn on Flask debugging.
        app.debug = True

    if not app.debug:
        app.logger.addHandler(utils.create_syslog_handler())

    #app.logger.info('API key: ' + auth_service.key)

    # Start the application server. Listens on 127.0.0.1 (IPv4 only).
    app.run(port=10222)