DNS・SSL伝播を自動監視!Pythonで作る「反映完了通知スクリプト」
サイト移行後の「反映した?まだ?」を、Pythonで自動監視して完了時に通知します。A/WWW/MXのDNS到達確認、SSL証明書のCN/SAN一致・有効期限チェックまで一括で行い、OKになった瞬間にメールやLINEで知らせます。
できること
- Aレコード /
wwwサブドメイン / MX(メール)の到達確認 - HTTPS(SSL)の有効性・証明書名一致(CN/SAN)・期限日残数
- 反映完了時に メール or LINE で1回だけ通知
- 複数ドメインを
TARGETSに列挙して同時監視
スクリプト(保存名:watch_dns_ssl.py)
Python 3 環境で動作します。必要に応じて TARGETS と NOTIFY を調整してください。
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket, ssl, smtplib, subprocess, datetime, json, pathlib, re
from email.mime.text import MIMEText
# ===== 設定 =====
TARGETS = [
{
"domain": "example.com",
"expect_ipv4": ["93.184.216.34"], # 想定Aレコード(空でもOK)
"check_www": True,
"check_mx": True,
"check_https": True,
},
# 例: {"domain": "kowa-frp.com", "expect_ipv4": [], "check_www": True, "check_mx": True, "check_https": True},
]
NOTIFY = {
"email": {
"enabled": True,
"to": "you@example.com",
},
"line": {
"enabled": False,
"token": "PUT_YOUR_LINE_NOTIFY_TOKEN",
}
}
STATE_FILE = pathlib.Path("./.dns_ssl_watch_state.json")
TIMEOUT = 5 # 秒
# ==============
def dig(name, rr="A"):
try:
out = subprocess.check_output(["dig", "+short", name, rr], timeout=TIMEOUT).decode().strip()
return [x for x in out.split("\n") if x]
except Exception:
return []
def resolve_a(name):
try:
return list({sock[4][0] for sock in socket.getaddrinfo(name, None, proto=socket.IPPROTO_TCP)})
except Exception:
return []
def check_https_cert(host, port=443):
try:
ctx = ssl.create_default_context()
with socket.create_connection((host, port), timeout=TIMEOUT) as sock:
with ctx.wrap_socket(sock, server_hostname=host) as ssock:
cert = ssock.getpeercert()
not_after = cert.get('notAfter')
subject = dict(x[0] for x in cert.get('subject', []))
cn = subject.get('commonName', '')
alt = [t[1] for t in cert.get('subjectAltName', []) if t[0] == 'DNS']
exp = datetime.datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
name_ok = host == cn or any(match_hostname(host, a) for a in alt)
return {"ok": True, "cn": cn, "sans": alt, "expires": exp, "name_ok": name_ok}
except Exception as e:
return {"ok": False, "error": str(e)}
def match_hostname(host, pattern):
return host.endswith(pattern[1:]) if pattern.startswith("*.") else host == pattern
def send_email(subject, body):
if not NOTIFY["email"]["enabled"]:
return
msg = MIMEText(body, _charset="utf-8")
msg["Subject"] = subject
msg["From"] = "watcher@localhost"
msg["To"] = NOTIFY["email"]["to"]
try:
subprocess.Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=subprocess.PIPE).communicate(msg.as_bytes())
except Exception:
with smtplib.SMTP("localhost") as s:
s.send_message(msg)
def send_line(body):
if not NOTIFY["line"]["enabled"]:
return
try:
import urllib.request
req = urllib.request.Request(
"https://notify-api.line.me/api/notify",
data=("message="+body).encode("utf-8"),
headers={"Authorization": "Bearer " + NOTIFY["line"]["token"],
"Content-Type": "application/x-www-form-urlencoded"}
)
urllib.request.urlopen(req, timeout=10).read()
except Exception:
pass
def load_state():
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text())
return {}
def save_state(s):
STATE_FILE.write_text(json.dumps(s, default=str, ensure_ascii=False, indent=2))
def main():
state = load_state()
for t in TARGETS:
dom = t["domain"]
want = t.get("expect_ipv4", [])
passed, failed = [], []
a_now = resolve_a(dom)
if want and not any(ip in a_now for ip in want):
failed.append(f"A {dom} -> {a_now} (期待: {want})")
else:
passed.append(f"A {dom} -> {a_now}")
if t.get("check_www"):
a_www = resolve_a("www."+dom)
if want and not any(ip in a_www for ip in want):
failed.append(f"A www.{dom} -> {a_www} (期待: {want})")
else:
passed.append(f"A www.{dom} -> {a_www}")
if t.get("check_mx"):
mx = dig(dom, "MX")
(passed if mx else failed).append(f"MX {dom} -> {mx or 'なし'}")
if t.get("check_https"):
cert = check_https_cert(dom)
if cert["ok"] and cert["name_ok"]:
days = (cert["expires"] - datetime.datetime.utcnow()).days
passed.append(f"HTTPS OK / 残り{days}日 / CN={cert['cn']}")
else:
failed.append(f"HTTPS NG: {cert.get('error','name mismatch')}")
all_ok = len(failed) == 0
already = state.get(dom, {}).get("notified_ok", False)
if all_ok and not already:
body = "【DNS/SSL 伝播完了】\n" + "\n".join(passed)
send_email(f"[OK] {dom} 伝播完了", body)
send_line(body)
state[dom] = {"notified_ok": True, "last_ok": datetime.datetime.utcnow().isoformat()}
else:
state[dom] = {"notified_ok": all_ok, "failed": failed, "passed": passed}
save_state(state)
if __name__ == "__main__":
main()
使い方のポイント
TARGETSのdomainに監視したいドメインを列挙expect_ipv4に新サーバーのAレコードを入れると切替完了が判定しやすい- 通知はまずメール(ローカル送信)を有効化。LINE通知はトークンを入れて
enabled: trueに
定期実行(cron設定)
5分ごとに監視する例。Pythonのパスは環境に合わせて変更してください。
crontab -e
*/5 * * * * /usr/bin/python3 /path/to/watch_dns_ssl.py
よくある拡張
- SPF/DKIM/DMARC:
dig TXTを追加して期待値と一致確認 - 外部SMTPで送信:
smtplib.SMTP('smtp.example.com', 587)へ差し替え - Slack/Discord通知:WebhookへPOSTを追加
- マルチドメイン:
TARGETSに複数列挙
まとめ
DNS/SSLの伝播確認は自動化がラク。サーバー移行やドメイン切替の待ち時間をゼロにし、切替完了の瞬間を確実に捉えましょう。



