import argparse import hashlib import requests import subprocess import traceback from datetime import date from pyasn1.codec.der.decoder import decode from pyasn1.codec.der.encoder import encode from pyasn1_modules import rfc5280 from cryptography import x509 def calc_tbs(pem: bytes) -> str: cert = x509.load_pem_x509_certificate(pem) tbs, _leftover = decode( cert.tbs_certificate_bytes, asn1Spec=rfc5280.TBSCertificate() ) precert_exts = [ v.dotted_string for k, v in x509.ExtensionOID.__dict__.items() if k.startswith("PRECERT_") ] exts = [ext for ext in tbs["extensions"] if str(ext["extnID"]) not in precert_exts] tbs["extensions"].clear() tbs["extensions"].extend(exts) return hashlib.sha256(encode(tbs)).hexdigest() def main() -> None: parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(required=True) spot = subparsers.add_parser("spot") spot.set_defaults(func=spotter) spot.add_argument("--sendmail", "-s", type=str, required=True) spot.add_argument("--from", "-f", type=str, required=False, default="Certificate Monitoring") spot.add_argument("--to", "-t", type=str, required=False) spot.add_argument("--domain", "-d", type=str, required=True) spot.add_argument("--cache_file", "-c", type=str, required=True) spot.add_argument("certs", type=str, nargs="*") hash = subparsers.add_parser("hash") hash.set_defaults(func=print_hash) hash.add_argument("path", type=str) args = parser.parse_args() args.func(args) def print_hash(args) -> None: with open(args.path, "rb") as f: print(calc_tbs(f.read())) def send_mail( sendmail: str, from_: str | None, to: str | None, subject: str, text: str ): proc = subprocess.Popen( [sendmail, "-i"] + (["-F", from_] if from_ else []) + (['--', to] if to else []), stdin=subprocess.PIPE, ) assert proc.stdin is not None proc.stdin.write(f"Subject: {subject}\n\n".encode("utf-8")) proc.stdin.write((text + "\n").encode("utf-8")) proc.stdin.close() proc.wait() assert proc.returncode == 0 def spotter(args) -> None: try: spotter1(args) except Exception: subject = "Certificate monitoring failure" text = traceback.format_exc() send_mail(args.sendmail, args.__dict__["from"], args.to, subject, text) def spotter1(args) -> None: url = f"https://crt.sh/?CN={args.domain}&dir=^&sort=1&group=none" try: with open(args.cache_file, "rt") as f: lastid = int(f.read()) except FileNotFoundError: lastid = 0 body = requests.get(url).text def parse_row(row: str, tag: str) -> list[str]: ret = [] for col in row.split(f"{tag}>")[:-1]: if "" in col: col = col.split("")[-2].split(">")[-1] else: col = col.split(">")[-1] ret.append(col) return ret cols: list[str] = [] rows: list[dict[str, str]] = [] for s_row in body.split("