|
@@ -0,0 +1,167 @@
|
|
|
+#!/usr/bin/python3
|
|
|
+# this file is formatted using black
|
|
|
+import argparse
|
|
|
+import os
|
|
|
+import sys
|
|
|
+from typing import List, Dict
|
|
|
+
|
|
|
+authorized_keys_file = "/home/git/.ssh/authorized_keys"
|
|
|
+host_public_key_file = "/etc/ssh/keys/ssh_host_ed25519_key.pub"
|
|
|
+usage = """auth <command> [<args>]
|
|
|
+
|
|
|
+Available commands are:
|
|
|
+ id Shows server key formatted for client's ~/.ssh/known_hosts
|
|
|
+ add <proto> <key> <label> Adds a client key to the list of authorized keys.
|
|
|
+ <proto> <key> <label> is typically the content of the public key file of the client.
|
|
|
+ The label must be unique; usage of existing labels will overwrite the existing key
|
|
|
+ without warning.
|
|
|
+ rm <label> Removes client key with given label from the list of authorized keys
|
|
|
+ ls [-v] Lists all authorized keys.
|
|
|
+ cat Print contents of the authorized_keys file.
|
|
|
+"""
|
|
|
+
|
|
|
+
|
|
|
+class AuthorizedKeysException(Exception):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+class AuthorizedKeys(list):
|
|
|
+ """ Provides management of an SSH "authorized_keys" file, restricted to a subset of functionality. """
|
|
|
+
|
|
|
+ def __init__(self, location: str = authorized_keys_file) -> None:
|
|
|
+ super().__init__()
|
|
|
+ self.location = location
|
|
|
+ self.db = {}
|
|
|
+ try:
|
|
|
+ with open(location) as f:
|
|
|
+ for line in f:
|
|
|
+ if not line.strip() or line.startswith("#"):
|
|
|
+ continue
|
|
|
+ proto, key, label = line.strip().split(" ", maxsplit=2)
|
|
|
+ self.db[label] = proto + " " + key
|
|
|
+ except FileNotFoundError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def add(self, proto: str, key: str, label: str) -> None:
|
|
|
+ """ Adds an authorized key. """
|
|
|
+ if label in self.db:
|
|
|
+ raise AuthorizedKeysException(f'Key with label "{label}" already exists.')
|
|
|
+ self.db[label] = proto + " " + key
|
|
|
+ self._save()
|
|
|
+
|
|
|
+ def rm(self, label: str) -> None:
|
|
|
+ """ Removes an authorized key, identified by its label. Raises if key with given label cannot be found. """
|
|
|
+ try:
|
|
|
+ del self.db[label]
|
|
|
+ except KeyError:
|
|
|
+ raise AuthorizedKeysException(
|
|
|
+ f'Could not find authorized key with label "{label}".'
|
|
|
+ )
|
|
|
+ self._save()
|
|
|
+
|
|
|
+ def ls(self) -> Dict[str, str]:
|
|
|
+ """ Returns dictionary of authorized keys, identified by their labels. """
|
|
|
+ return self.db.copy()
|
|
|
+
|
|
|
+ def _save(self) -> None:
|
|
|
+ real_location = os.path.realpath(self.location)
|
|
|
+ with open(real_location + '~', "w") as f:
|
|
|
+ for label, proto_key in self.db.items():
|
|
|
+ f.write(proto_key + " " + label + "\n")
|
|
|
+ os.rename(real_location + '~', real_location)
|
|
|
+
|
|
|
+
|
|
|
+def main(args: List[str]) -> None:
|
|
|
+ """ Command line application main entry point. """
|
|
|
+ parser = argparse.ArgumentParser(
|
|
|
+ usage=usage,
|
|
|
+ )
|
|
|
+ parser.add_argument("command", help="Subcommand to run")
|
|
|
+ parsed = parser.parse_args(args[0:1])
|
|
|
+
|
|
|
+ cmd = {
|
|
|
+ "id": cmd_id,
|
|
|
+ "add": cmd_add,
|
|
|
+ "rm": cmd_rm,
|
|
|
+ "ls": cmd_ls,
|
|
|
+ "cat": cmd_cat,
|
|
|
+ }.get(parsed.command, None)
|
|
|
+ if not callable(cmd):
|
|
|
+ print(f'Unrecognized command "{parsed.command}"')
|
|
|
+ parser.print_usage()
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmd(args[1:])
|
|
|
+ except AuthorizedKeysException as e:
|
|
|
+ sys.stderr.write(str(e) + "\n")
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+
|
|
|
+def cmd_id(args: List[str]) -> None:
|
|
|
+ """ Entrypoint for CLI command that shows this hosts SSH public key. """
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.parse_args(args)
|
|
|
+ with open(host_public_key_file, "r") as f:
|
|
|
+ print(f'desec.{os.environ["DESECSTACK_DOMAIN"]} {f.readline().strip()}')
|
|
|
+
|
|
|
+
|
|
|
+def cmd_add(args: List[str]) -> None:
|
|
|
+ """ Entrypoint for CLI command that adds an authorized SSH key. """
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.add_argument("proto", help="Protocol used with given key.")
|
|
|
+ parser.add_argument("key", help="Public key of authorized key pair.")
|
|
|
+ parser.add_argument("label", help="Label under which the key is stored.")
|
|
|
+ parsed = parser.parse_args(args)
|
|
|
+ AuthorizedKeys().add(parsed.proto, parsed.key, parsed.label)
|
|
|
+
|
|
|
+
|
|
|
+def cmd_rm(args: List[str]) -> None:
|
|
|
+ """ Entrypoint for CLI command that removes an authorized SSH key. """
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.add_argument("label", help="The key with this label will be removed.")
|
|
|
+ parsed = parser.parse_args(args)
|
|
|
+ AuthorizedKeys().rm(parsed.label)
|
|
|
+
|
|
|
+
|
|
|
+def cmd_ls(args: List[str]) -> None:
|
|
|
+ """ Entrypoint for CLI command that shows all authorized SSH keys. """
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.add_argument(
|
|
|
+ "-v",
|
|
|
+ "--verbose",
|
|
|
+ action="store_true",
|
|
|
+ help="Lists all authorized keys.",
|
|
|
+ )
|
|
|
+ parsed = parser.parse_args(args)
|
|
|
+ keys = AuthorizedKeys().ls()
|
|
|
+
|
|
|
+ if not keys:
|
|
|
+ return
|
|
|
+
|
|
|
+ if parsed.verbose:
|
|
|
+ label_length = max(len(label) for label in keys) + 2
|
|
|
+ print(
|
|
|
+ "\n".join(
|
|
|
+ f"{label:{label_length}s} {proto_key}"
|
|
|
+ for label, proto_key in keys.items()
|
|
|
+ )
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ print("\n".join(keys))
|
|
|
+
|
|
|
+
|
|
|
+def cmd_cat(args: List[str]) -> None:
|
|
|
+ """ Entrypoint for CLI command that outputs an exact copy of the SSH authorized keys file. """
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.parse_args(args)
|
|
|
+ try:
|
|
|
+ with open(authorized_keys_file, "r") as f:
|
|
|
+ print(f.read())
|
|
|
+ except FileNotFoundError:
|
|
|
+ sys.stderr.write(f'Authorized keys file not found at "{authorized_keys_file}".')
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main(sys.argv[1:])
|