Hetzner Cloud Firewall Updater (Python API Script)

wen man bei einer Hetzner Cloud Firewall in den Regel eine IP anpassen möchte, z.b. hat man eine Dynamische IP, die Regeln von dieser Firewall sind auf diese Dynamische IP gesetzt, jetzt ändert sich die IP und man möchte das diese auch direkt in der Hetzner Cloud Firewall gesetzt werden, das geht über die API und ein kleines Python Script

das Script liest die angegebene Firewall aus, prüft die aktuelle IP und ändert diese nur bei Abweichung in allen regeln dieser Firewall, es wird nur die IP aktualisiert, die Ports bleiben unberührt, es werden nur eingehende Regeln berücksichtigt, ein logfile wird geschrieben.

benötigt wird Python und eine requests-Bibliothek

pip install requests

3 Zeilen müssen angepasst werden

HETZNER_API_TOKEN = "api-token"            # Deinen Hetzner-API-Token hier einfügen
FIREWALL_NAME = "firewall-name"            # Name der Firewall
LOG_FILE = "/var/log/firewall_update.log"  # Pfad zur Log-Datei

hetzner_cloud_firewall_updater.py

import requests
import json
from datetime import datetime

# Deine Hetzner API-Zugangsdaten
HETZNER_API_TOKEN = "api-token"            # Deinen Hetzner-API-Token hier einfügen
FIREWALL_NAME = "firewall-name"            # Name der Firewall
LOG_FILE = "/var/log/firewall_update.log"  # Pfad zur Log-Datei

# Log-Funktion
def write_log(message):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"
    with open(LOG_FILE, "a") as log_file:
        log_file.write(log_entry)
    print(message)  # Ausgabe zusätzlich im Terminal

# Öffentliche IP abrufen
def get_current_ip():
    try:
        response = requests.get("https://api64.ipify.org?format=json")
        response.raise_for_status()
        current_ip = response.json()["ip"]
        return current_ip
    except requests.RequestException as e:
        write_log(f"Fehler beim Abrufen der aktuellen IP: {e}")
        return None

# Firewall-ID holen
def get_firewall_id(firewall_name):
    url = "https://api.hetzner.cloud/v1/firewalls"
    headers = {
        "Authorization": f"Bearer {HETZNER_API_TOKEN}",
        "Content-Type": "application/json"
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        firewalls = response.json()["firewalls"]
        for firewall in firewalls:
            if firewall["name"] == firewall_name:
                write_log(f"Firewall-ID für '{firewall_name}' gefunden: {firewall['id']}")
                return firewall["id"]
        write_log(f"Keine Firewall mit dem Namen '{firewall_name}' gefunden.")
    except requests.RequestException as e:
        write_log(f"Fehler beim Abrufen der Firewalls: {e}")
    return None

# Firewall-Regeln abrufen
def get_firewall_rules(firewall_id):
    url = f"https://api.hetzner.cloud/v1/firewalls/{firewall_id}"
    headers = {
        "Authorization": f"Bearer {HETZNER_API_TOKEN}",
        "Content-Type": "application/json"
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        rules = response.json()["firewall"]["rules"]
        write_log(f"{len(rules)} Firewall-Regeln abgerufen.")
        return rules
    except requests.RequestException as e:
        write_log(f"Fehler beim Abrufen der Firewall-Regeln: {e}")
        return []

# Firewall-Regeln aktualisieren
def update_firewall_rules(firewall_id, rules, new_ip):
    url = f"https://api.hetzner.cloud/v1/firewalls/{firewall_id}/actions/set_rules"
    headers = {
        "Authorization": f"Bearer {HETZNER_API_TOKEN}",
        "Content-Type": "application/json"
    }

    # Passe die Regeln mit der neuen IP an
    updated_rules = []
    new_ip_cidr = f"{new_ip}/32"  # CIDR-Format für eine Einzel-IP
    for rule in rules:
        if rule["direction"] == "in" and "source_ips" in rule:
            rule["source_ips"] = [new_ip_cidr]  # Ersetze die IP in jeder Regel
            updated_rules.append(rule)

    payload = {
        "rules": updated_rules
    }

    write_log(f"Zu sendender Payload: {json.dumps(payload, indent=4)}")

    try:
        response = requests.post(url, headers=headers, data=json.dumps(payload))
        response.raise_for_status()
        write_log("Firewall-Regeln erfolgreich aktualisiert.")
    except requests.RequestException as e:
        write_log(f"Fehler beim Aktualisieren der Firewall-Regeln: {e}")
        write_log(f"Antwort von Hetzner: {response.text}")

# Hauptfunktion
def main():
    write_log("Skript gestartet.")
    
    # Hole die Firewall-ID basierend auf dem Namen
    firewall_id = get_firewall_id(FIREWALL_NAME)
    if not firewall_id:
        write_log(f"Firewall mit dem Namen '{FIREWALL_NAME}' nicht gefunden.")
        return

    # Hole die aktuelle öffentliche IP
    current_ip = get_current_ip()
    if not current_ip:
        write_log("Fehler beim Abrufen der aktuellen IP. Abbruch.")
        return
    write_log(f"Aktuelle öffentliche IP: {current_ip}")

    # Hole die aktuellen Regeln der Firewall
    firewall_rules = get_firewall_rules(firewall_id)
    if not firewall_rules:
        write_log("Keine Firewall-Regeln gefunden. Abbruch.")
        return

    # Prüfe, ob eine Aktualisierung notwendig ist
    new_ip_cidr = f"{current_ip}/32"
    ip_is_up_to_date = all(new_ip_cidr in rule.get("source_ips", []) for rule in firewall_rules if rule["direction"] == "in")

    if not ip_is_up_to_date:
        write_log("IP hat sich geändert. Aktualisiere die Firewall-Regeln...")
        update_firewall_rules(firewall_id, firewall_rules, current_ip)
    else:
        write_log("IP hat sich nicht geändert. Keine Aktion erforderlich.")

    write_log("Skript beendet.")

if __name__ == "__main__":
    main()

ausgeführt wird es so

python3 hetzner_cloud_firewall_updater.py

als cronjob hinzufügen, wird alle 30 Minuten ausgeführt

*/30 * * * * user python3 /pfad/zu/hetzner_cloud_firewall_updater.py