From b41ca57cb3de5f463319a7bf8eb4c00dc4b091cb Mon Sep 17 00:00:00 2001 From: chayleaf Date: Wed, 5 Jun 2024 19:34:29 +0700 Subject: [PATCH] server: switch from certspotter to a crt.sh-based monitor --- system/hosts/server/certspotter.nix | 55 +++++----- system/hosts/server/certspotter.py | 158 ++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 24 deletions(-) create mode 100644 system/hosts/server/certspotter.py diff --git a/system/hosts/server/certspotter.nix b/system/hosts/server/certspotter.nix index 0d3efee..2c4a5fc 100644 --- a/system/hosts/server/certspotter.nix +++ b/system/hosts/server/certspotter.nix @@ -5,33 +5,19 @@ let cfg = config.server; + python = pkgs.python3.withPackages (p: with p; [ cryptography pyasn1 pyasn1-modules requests ]); + tool = pkgs.writeScript "certspotter.py" '' + #!${python}/bin/python3 + ${builtins.readFile ./certspotter.py} + ''; in { - security.acme.certs = lib.flip builtins.mapAttrs (lib.filterAttrs (k: v: v.enableACME) config.services.nginx.virtualHosts) (k: v: { - postRun = let - python = pkgs.python3.withPackages (p: with p; [ cryptography pyasn1 pyasn1-modules ]); - tbs-hash = pkgs.writeScript "tbs-hash.py" '' - #!${python}/bin/python3 - import hashlib - from pyasn1.codec.der.decoder import decode - from pyasn1.codec.der.encoder import encode - from pyasn1_modules import rfc5280 - from cryptography import x509 - - with open('full.pem', 'rb') as f: - cert = x509.load_pem_x509_certificate(f.read()) - tbs, _leftover = decode(cert.tbs_certificate_bytes, asn1Spec=rfc5280.TBSCertificate()) - precert_exts = [v.dotted_string for k, v in x509.ExtensionOID.__dict__.items() if k.startswith('PRECERT_')] - exts = [ext for ext in tbs["extensions"] if str(ext["extnID"]) not in precert_exts] - tbs["extensions"].clear() - tbs["extensions"].extend(exts) - print(hashlib.sha256(encode(tbs)).hexdigest()) - ''; - in '' - ${tbs-hash} > "/var/lib/certspotter/tbs-hashes/${k}" + security.acme.certs = lib.mkIf config.services.certspotter.enable (lib.flip builtins.mapAttrs (lib.filterAttrs (k: v: v.enableACME) config.services.nginx.virtualHosts) (k: v: { + postRun = '' + ${tool} tbs full.pem > "/var/lib/certspotter/tbs-hashes/${k}" ''; - }); + })); services.certspotter = { - enable = true; + enable = false; extraFlags = [ ]; watchlist = [ ".${cfg.domainName}" ]; hooks = lib.toList (pkgs.writeShellScript "certspotter-hook" '' @@ -41,4 +27,25 @@ in { (echo "Subject: $SUMMARY" && echo && cat "$TEXT_FILENAME") | /run/wrappers/bin/sendmail -i webmaster-certspotter@${cfg.domainName} ''); }; + systemd.services.certspotter-lite = { + script = '' + exec ${tool} spot \ + -c /var/lib/acme/certspotter-lite.txt \ + -d ${cfg.domainName} \ + -t webmaster-certspotter@${cfg.domainName} \ + -s /run/wrappers/bin/sendmail \ + /var/lib/acme/*/full.pem + ''; + serviceConfig = { + User = "acme"; + Group = "acme"; + Type = "oneshot"; + }; + }; + systemd.timers.certspotter-lite = { + wantedBy = [ "timers.target" ]; + partOf = [ "certspotter-lite.service" ]; + timerConfig.OnCalendar = [ "*-*-* 00:00:00" ]; # every day + timerConfig.RandomizedDelaySec = 43200; # execute at random time in the first 12 hours + }; } diff --git a/system/hosts/server/certspotter.py b/system/hosts/server/certspotter.py new file mode 100644 index 0000000..d170533 --- /dev/null +++ b/system/hosts/server/certspotter.py @@ -0,0 +1,158 @@ +import argparse +import hashlib +import requests +import subprocess +import traceback +from datetime import date +from pyasn1.codec.der.decoder import decode +from pyasn1.codec.der.encoder import encode +from pyasn1_modules import rfc5280 +from cryptography import x509 + + +def calc_tbs(pem: bytes) -> str: + cert = x509.load_pem_x509_certificate(pem) + tbs, _leftover = decode( + cert.tbs_certificate_bytes, asn1Spec=rfc5280.TBSCertificate() + ) + precert_exts = [ + v.dotted_string + for k, v in x509.ExtensionOID.__dict__.items() + if k.startswith("PRECERT_") + ] + exts = [ext for ext in tbs["extensions"] if str(ext["extnID"]) not in precert_exts] + tbs["extensions"].clear() + tbs["extensions"].extend(exts) + return hashlib.sha256(encode(tbs)).hexdigest() + + +def main() -> None: + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(required=True) + spot = subparsers.add_parser("spot") + spot.set_defaults(func=spotter) + spot.add_argument("--sendmail", "-s", type=str, required=True) + spot.add_argument("--from", "-f", type=str, required=False, default="Certificate Monitoring") + spot.add_argument("--to", "-t", type=str, required=False) + spot.add_argument("--domain", "-d", type=str, required=True) + spot.add_argument("--cache_file", "-c", type=str, required=True) + spot.add_argument("certs", type=str, nargs="*") + tbs = subparsers.add_parser("tbs") + tbs.set_defaults(func=print_tbs) + tbs.add_argument("path", type=str) + args = parser.parse_args() + args.func(args) + + +def print_tbs(args) -> None: + with open(args.path, "rb") as f: + print(calc_tbs(f.read())) + + +def send_mail( + sendmail: str, from_: str | None, to: str | None, subject: str, text: str +): + proc = subprocess.Popen( + [sendmail, "-i"] + (["-F", from_] if from_ else []) + (['--', to] if to else []), + stdin=subprocess.PIPE, + ) + assert proc.stdin is not None + proc.stdin.write(f"Subject: {subject}\n\n".encode("utf-8")) + proc.stdin.write((text + "\n").encode("utf-8")) + proc.stdin.close() + proc.wait() + assert proc.returncode == 0 + + +def spotter(args) -> None: + try: + spotter1(args) + except Exception: + subject = "Certificate monitoring failure" + text = traceback.format_exc() + send_mail(args.sendmail, args.__dict__["from"], args.to, subject, text) + + +def spotter1(args) -> None: + url = f"https://crt.sh/?CN={args.domain}&dir=^&sort=1&group=none" + + try: + with open(args.cache_file, "rt") as f: + lastid = int(f.read()) + except FileNotFoundError: + lastid = 0 + + body = requests.get(url).text + + def parse_row(row: str, tag: str) -> list[str]: + ret = [] + for col in row.split(f"")[:-1]: + if "" in col: + col = col.split("")[-2].split(">")[-1] + else: + col = col.split(">")[-1] + ret.append(col) + return ret + + cols: list[str] = [] + rows: list[dict[str, str]] = [] + for s_row in body.split("")[2:]: + s_row = s_row.split("")[0] + if " 30: + continue + pem_urls[crtid] = f"https://crt.sh/?d={crtid}" + issuers[crtid] = row.get("Issuer Name", "") + cns[crtid] = row.get("Matching Identities", "") + + if not pem_urls: + return + + valid_hashes: set[str] = set() + for path in args.certs: + with open(path, "rb") as f1: + valid_hashes.add(calc_tbs(f1.read())) + + pems: dict[int, bytes] = {} + + for id, pem_url in pem_urls.items(): + lastid = max(id, lastid) + pems[id] = requests.get(pem_url).content + + invalid_ids: set[int] = set() + + for id, pem in pems.items(): + if calc_tbs(pem) not in valid_hashes: + invalid_ids.add(id) + + if invalid_ids: + subject = f"{len(invalid_ids)} invalid certs discovered!" + text = "\n".join( + f"https://crt.sh/?id={id} ({cns[id]}, {issuers[id]})" + for id in sorted(invalid_ids) + ) + send_mail(args.sendmail, args.__dict__["from"], args.to, subject, text) + + with open(args.cache_file, "wt") as f: + f.write(str(lastid)) + + +if __name__ == "__main__": + main()