あらゆる営業、設計、積算、納品、製作、搬入、据付、現地施工、保守メンテナンス、雑用に対応します!
DNS・SSL伝播を自動監視!Pythonで作る「反映完了通知スクリプト」

DNS・SSL伝播を自動監視!Pythonで作る「反映完了通知スクリプト」

サイト移行後の「反映した?まだ?」を、Pythonで自動監視して完了時に通知します。A/WWW/MXのDNS到達確認、SSL証明書のCN/SAN一致・有効期限チェックまで一括で行い、OKになった瞬間にメールやLINEで知らせます。

できること

  • Aレコード / www サブドメイン / MX(メール)の到達確認
  • HTTPS(SSL)の有効性・証明書名一致(CN/SAN)・期限日残数
  • 反映完了時に メール or LINE で1回だけ通知
  • 複数ドメインを TARGETS に列挙して同時監視

スクリプト(保存名:watch_dns_ssl.py

Python 3 環境で動作します。必要に応じて TARGETSNOTIFY を調整してください。

# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket, ssl, smtplib, subprocess, datetime, json, pathlib, re
from email.mime.text import MIMEText

# ===== 設定 =====
TARGETS = [
    {
        "domain": "example.com",
        "expect_ipv4": ["93.184.216.34"],   # 想定Aレコード(空でもOK)
        "check_www": True,
        "check_mx": True,
        "check_https": True,
    },
    # 例: {"domain": "kowa-frp.com", "expect_ipv4": [], "check_www": True, "check_mx": True, "check_https": True},
]

NOTIFY = {
    "email": {
        "enabled": True,
        "to": "you@example.com",
    },
    "line": {
        "enabled": False,
        "token": "PUT_YOUR_LINE_NOTIFY_TOKEN",
    }
}

STATE_FILE = pathlib.Path("./.dns_ssl_watch_state.json")
TIMEOUT = 5  # 秒
# ==============

def dig(name, rr="A"):
    try:
        out = subprocess.check_output(["dig", "+short", name, rr], timeout=TIMEOUT).decode().strip()
        return [x for x in out.split("\n") if x]
    except Exception:
        return []

def resolve_a(name):
    try:
        return list({sock[4][0] for sock in socket.getaddrinfo(name, None, proto=socket.IPPROTO_TCP)})
    except Exception:
        return []

def check_https_cert(host, port=443):
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((host, port), timeout=TIMEOUT) as sock:
            with ctx.wrap_socket(sock, server_hostname=host) as ssock:
                cert = ssock.getpeercert()
                not_after = cert.get('notAfter')
                subject = dict(x[0] for x in cert.get('subject', []))
                cn = subject.get('commonName', '')
                alt = [t[1] for t in cert.get('subjectAltName', []) if t[0] == 'DNS']
                exp = datetime.datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
                name_ok = host == cn or any(match_hostname(host, a) for a in alt)
                return {"ok": True, "cn": cn, "sans": alt, "expires": exp, "name_ok": name_ok}
    except Exception as e:
        return {"ok": False, "error": str(e)}

def match_hostname(host, pattern):
    return host.endswith(pattern[1:]) if pattern.startswith("*.") else host == pattern

def send_email(subject, body):
    if not NOTIFY["email"]["enabled"]:
        return
    msg = MIMEText(body, _charset="utf-8")
    msg["Subject"] = subject
    msg["From"] = "watcher@localhost"
    msg["To"] = NOTIFY["email"]["to"]
    try:
        subprocess.Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=subprocess.PIPE).communicate(msg.as_bytes())
    except Exception:
        with smtplib.SMTP("localhost") as s:
            s.send_message(msg)

def send_line(body):
    if not NOTIFY["line"]["enabled"]:
        return
    try:
        import urllib.request
        req = urllib.request.Request(
            "https://notify-api.line.me/api/notify",
            data=("message="+body).encode("utf-8"),
            headers={"Authorization": "Bearer " + NOTIFY["line"]["token"],
                     "Content-Type": "application/x-www-form-urlencoded"}
        )
        urllib.request.urlopen(req, timeout=10).read()
    except Exception:
        pass

def load_state():
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {}

def save_state(s):
    STATE_FILE.write_text(json.dumps(s, default=str, ensure_ascii=False, indent=2))

def main():
    state = load_state()

    for t in TARGETS:
        dom = t["domain"]
        want = t.get("expect_ipv4", [])
        passed, failed = [], []

        a_now = resolve_a(dom)
        if want and not any(ip in a_now for ip in want):
            failed.append(f"A {dom} -> {a_now} (期待: {want})")
        else:
            passed.append(f"A {dom} -> {a_now}")

        if t.get("check_www"):
            a_www = resolve_a("www."+dom)
            if want and not any(ip in a_www for ip in want):
                failed.append(f"A www.{dom} -> {a_www} (期待: {want})")
            else:
                passed.append(f"A www.{dom} -> {a_www}")

        if t.get("check_mx"):
            mx = dig(dom, "MX")
            (passed if mx else failed).append(f"MX {dom} -> {mx or 'なし'}")

        if t.get("check_https"):
            cert = check_https_cert(dom)
            if cert["ok"] and cert["name_ok"]:
                days = (cert["expires"] - datetime.datetime.utcnow()).days
                passed.append(f"HTTPS OK / 残り{days}日 / CN={cert['cn']}")
            else:
                failed.append(f"HTTPS NG: {cert.get('error','name mismatch')}")

        all_ok = len(failed) == 0
        already = state.get(dom, {}).get("notified_ok", False)
        if all_ok and not already:
            body = "【DNS/SSL 伝播完了】\n" + "\n".join(passed)
            send_email(f"[OK] {dom} 伝播完了", body)
            send_line(body)
            state[dom] = {"notified_ok": True, "last_ok": datetime.datetime.utcnow().isoformat()}
        else:
            state[dom] = {"notified_ok": all_ok, "failed": failed, "passed": passed}

    save_state(state)

if __name__ == "__main__":
    main()

使い方のポイント

  • TARGETSdomain に監視したいドメインを列挙
  • expect_ipv4 に新サーバーのAレコードを入れると切替完了が判定しやすい
  • 通知はまずメール(ローカル送信)を有効化。LINE通知はトークンを入れて enabled: true

定期実行(cron設定)

5分ごとに監視する例。Pythonのパスは環境に合わせて変更してください。

crontab -e
*/5 * * * * /usr/bin/python3 /path/to/watch_dns_ssl.py

よくある拡張

  • SPF/DKIM/DMARCdig TXT を追加して期待値と一致確認
  • 外部SMTPで送信smtplib.SMTP('smtp.example.com', 587) へ差し替え
  • Slack/Discord通知:WebhookへPOSTを追加
  • マルチドメインTARGETS に複数列挙

まとめ

DNS/SSLの伝播確認は自動化がラク。サーバー移行やドメイン切替の待ち時間をゼロにし、切替完了の瞬間を確実に捉えましょう。

おすすめの記事