Disclaimer: This information is provided as-is for the benefit of the Community. Please contact Sophos Professional Services if you require assistance with your specific environment.
Overview
This article provides clear, step-by-step instructions to set up and run a firewall automation script. The script fetches Microsoft endpoints (IPv4, IPv6, and FQDNs) from the Microsoft-published URL, creates corresponding Host and HostGroup objects, and keeps them up to date. It includes a scheduler that runs every 24 hours to refresh the HostGroups automatically. These HostGroups can be seamlessly used in firewall rules, SD-WAN policies, and other configurations, ensuring they always remain current.
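As a rough illustration of the first step described above, the sketch below splits endpoint entries into IPv4 networks, IPv6 networks, and FQDNs. It assumes each entry may carry an "ips" list and a "urls" list, which are the two keys the script reads. The sample entries and the helper name classify_endpoints are hypothetical and use documentation-only addresses, not real Microsoft data.

```python
import ipaddress


def classify_endpoints(entries):
    """Split endpoint entries into IPv4 networks, IPv6 networks, and FQDNs."""
    ipv4, ipv6, fqdns = [], [], []
    for entry in entries:
        for ip in entry.get("ips", []):
            # strict=False accepts addresses that have host bits set
            network = ipaddress.ip_network(ip, strict=False)
            if network.version == 4:
                ipv4.append(str(network))
            else:
                ipv6.append(str(network))
        fqdns.extend(entry.get("urls", []))
    return ipv4, ipv6, fqdns


# Hypothetical sample shaped like the endpoint feed (documentation-only values)
sample = [
    {"id": 1, "ips": ["192.0.2.0/24", "2001:db8::/32"], "urls": ["mail.example.com"]},
    {"id": 2, "urls": ["*.example.net"]},
]

ipv4, ipv6, fqdns = classify_endpoints(sample)
print(ipv4, ipv6, fqdns)
```

Each of the three lists corresponds to one of the HostGroups that the script maintains on the firewall.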
Prerequisites
- Windows 10 or 11 with administrative access.
- Basic internet connection.
Steps
Install Windows Subsystem for Linux (WSL)
- Open PowerShell as Administrator:
- Press Win + X, then select Windows PowerShell (Admin). On Windows 11 this entry is labeled Terminal (Admin).
- Enable WSL by running the following command:
- wsl --install
- This will install WSL and set up Ubuntu by default.
- If you get any prompt during the installation, click Accept.
- Restart your PC when prompted.
- After reboot, open Ubuntu from the Start menu.
- Follow the on-screen instructions to create a username and password for Ubuntu.
Install Python on WSL
- Open the Ubuntu terminal (WSL window).
- Update the package manager:
- sudo apt update && sudo apt upgrade -y
- Install Python 3.10+ and pip (Python's package manager):
- sudo apt install python3 python3-pip -y
- Verify the installation:
- python3 --version
- pip3 --version
- This ensures Python and pip are installed correctly.
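If you prefer to check the version from inside Python, a minimal sketch could look like the following. The helper name meets_minimum is hypothetical; the 3.10 minimum is the one stated in this article.

```python
import sys

MINIMUM = (3, 10)  # minimum Python version stated in this article


def meets_minimum(version_info, minimum=MINIMUM):
    """Return True if the (major, minor) part of version_info is at least minimum."""
    return tuple(version_info[:2]) >= minimum


if meets_minimum(sys.version_info):
    print("Python version is sufficient:", sys.version.split()[0])
else:
    print("Python 3.10 or later is required, found:", sys.version.split()[0])
```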
Install Required Dependencies
- Install the requests and schedule packages using pip:
- pip3 install requests schedule
- Verify successful installation:
- pip3 list
- Confirm that requests and schedule appear in the list.
- If you get an error stating that requests and schedule cannot be installed (newer Ubuntu releases may report an "externally-managed-environment" error), create a virtual environment with these steps:
- python3 -m venv myenv
- source myenv/bin/activate
- pip install requests schedule
- To confirm that requests and schedule were installed, run:
- pip3 list
- The output should look similar to this:
Package            Version
------------------ ----------
certifi            2024.12.14
charset-normalizer 3.4.0
idna               3.10
pip                24.0
requests           2.32.3
schedule           1.2.2
urllib3            2.2.3
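As an alternative to reading the pip3 list output, the following sketch checks whether the two packages can be imported by the interpreter you are using (inside the virtual environment, if you created one). The helper name missing_packages is hypothetical.

```python
import importlib.util


def missing_packages(names):
    """Return the top-level module names that cannot be found by this interpreter."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_packages(["requests", "schedule"])
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("requests and schedule are both available")
```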
Prepare Your Script Files
- On your Windows machine, download the Python script (firewall_automation.py) and the configuration file (config.py) to a folder, e.g., C:\FirewallAutomation.
import requests
import xml.etree.ElementTree as ET
from xml.dom.minidom import parseString
import uuid
import ipaddress
import time
import schedule
import subprocess
import logging
from datetime import datetime
import os
import re
from config import (
    FIREWALL_API_URL,
    MS_ENDPOINT_URL,
    CLIENT_REQUEST_ID,
    FIREWALL_USERNAME,
    FIREWALL_PASSWORD,
)

# Setup logging
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=os.path.join("logs", "firewall_automation.log"),
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

CIDR_TO_SUBNET = {
    "32": "255.255.255.255",
    "31": "255.255.255.254",
    "30": "255.255.255.252",
    "29": "255.255.255.248",
    "28": "255.255.255.240",
    "27": "255.255.255.224",
    "26": "255.255.255.192",
    "25": "255.255.255.128",
    "24": "255.255.255.0",
    "23": "255.255.254.0",
    "22": "255.255.252.0",
    "21": "255.255.248.0",
    "20": "255.255.240.0",
    "19": "255.255.224.0",
    "18": "255.255.192.0",
    "17": "255.255.128.0",
    "16": "255.255.0.0",
    "15": "255.254.0.0",
    "14": "255.252.0.0",
    "13": "255.248.0.0",
    "12": "255.240.0.0",
    "11": "255.224.0.0",
    "10": "255.192.0.0",
    "9": "255.128.0.0",
    "8": "255.0.0.0"
}


def fetch_data(url):
    """Fetch data from a URL with error handling."""
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to fetch data: {str(e)}")
        raise


def generate_transaction_id():
    """Generate a random 10-character transaction ID."""
    return uuid.uuid4().hex[:10]


def validate_ip(ip_str):
    """Validate IP address and return IP version."""
    try:
        ip = ipaddress.ip_network(ip_str, strict=False)
        return "IPv6" if isinstance(ip, ipaddress.IPv6Network) else "IPv4", str(ip.network_address), str(ip.prefixlen)
    except ValueError as e:
        raise ValueError(f"Invalid IP address format: {ip_str}")


def create_ip_host(name, ip_str, host_type="Network"):
    """Create an IPHost element with proper IP handling."""
    transaction_id = generate_transaction_id()
    ip_host = ET.Element("IPHost", {"transactionid": transaction_id})
    ET.SubElement(ip_host, "Name").text = name
    ET.SubElement(ip_host, "HostType").text = host_type

    ip_family, ip_address, cidr = validate_ip(ip_str)
    ET.SubElement(ip_host, "IPFamily").text = ip_family

    if ip_family == "IPv4":
        subnet = CIDR_TO_SUBNET.get(cidr, "255.255.255.0")  # Default to /24 if not found
        ET.SubElement(ip_host, "IPAddress").text = ip_address
        ET.SubElement(ip_host, "Subnet").text = subnet
    else:  # IPv6
        ET.SubElement(ip_host, "IPAddress").text = ip_address
        ET.SubElement(ip_host, "Subnet").text = cidr

    return ip_host


def create_ip_host_group(name, ip_family, hosts):
    """Create an IPHostGroup element."""
    transaction_id = generate_transaction_id()
    group = ET.Element("IPHostGroup", {"transactionid": transaction_id})
    ET.SubElement(group, "Name").text = name
    ET.SubElement(group, "Description").text = f"Microsoft {ip_family} Host Group"
    host_list = ET.SubElement(group, "HostList")
    for host_name in hosts:
        ET.SubElement(host_list, "Host").text = host_name
    ET.SubElement(group, "IPFamily").text = ip_family
    return group


def create_fqdn_host(name, fqdn):
    """Create an FQDNHost element."""
    transaction_id = generate_transaction_id()
    fqdn_host = ET.Element("FQDNHost", {"transactionid": transaction_id})
    ET.SubElement(fqdn_host, "Name").text = name
    ET.SubElement(fqdn_host, "FQDN").text = fqdn
    return fqdn_host


def create_fqdn_host_group(name, fqdn_hosts):
    """Create an FQDNHostGroup element."""
    transaction_id = generate_transaction_id()
    group = ET.Element("FQDNHostGroup", {"transactionid": transaction_id})
    ET.SubElement(group, "Name").text = name
    ET.SubElement(group, "Description").text = "Microsoft FQDN Host Group"
    fqdn_list = ET.SubElement(group, "FQDNHostList")
    for fqdn_host_name in fqdn_hosts:
        ET.SubElement(fqdn_list, "FQDNHost").text = fqdn_host_name
    return group


def build_xml(data, operation="add"):
    """Build the XML structure with specified operation, avoiding duplicates."""
    request = ET.Element("Request")
    login = ET.SubElement(request, "Login")
    ET.SubElement(login, "Username").text = FIREWALL_USERNAME
    ET.SubElement(login, "Password").text = FIREWALL_PASSWORD

    ip_host_set = ET.SubElement(request, "Set", {"operation": operation})
    ip_v4_hosts = []
    ip_v6_hosts = []
    fqdn_hosts = []
    processed_ips = set()  # To track processed IPs
    processed_fqdns = set()  # To track processed FQDNs
    fqdn_pattern = re.compile(r"^(?:\*|[a-zA-Z0-9-]+)(\.[a-zA-Z0-9-]+)+$")

    for entry in data:
        # Process IPs
        for ip in entry.get("ips", []):
            try:
                if ip in processed_ips:
                    logging.info(f"Skipping duplicate IP: {ip}")
                    continue
                ip_family, _, _ = validate_ip(ip)
                name = f"{ip_family}_{ip.replace('/', '_').replace(':', '_')}"
                ip_host = create_ip_host(name, ip)
                ip_host_set.append(ip_host)
                processed_ips.add(ip)  # Mark IP as processed
                if ip_family == "IPv4":
                    ip_v4_hosts.append(name)
                else:
                    ip_v6_hosts.append(name)
            except ValueError as e:
                logging.warning(f"Skipping invalid IP {ip}: {str(e)}")

        # Process FQDNs
        for fqdn in entry.get("urls", []):
            if fqdn in processed_fqdns:
                logging.info(f"Skipping duplicate FQDN: {fqdn}")
                continue
            if not fqdn_pattern.match(fqdn):
                logging.info(f"Skipping unsupported FQDN: {fqdn}")
                continue  # Skip FQDNs that do not match the desired pattern
            name = fqdn.replace(".", "_")
            fqdn_host = create_fqdn_host(name, fqdn)
            ip_host_set.append(fqdn_host)
            fqdn_hosts.append(name)
            processed_fqdns.add(fqdn)  # Mark FQDN as processed

    # Create IPHost and FQDN groups
    if ip_v4_hosts:
        ip_host_set.append(create_ip_host_group("AutomationMicrosoftHostGroupv4", "IPv4", ip_v4_hosts))
    if ip_v6_hosts:
        ip_host_set.append(create_ip_host_group("AutomationMicrosoftHostGroupv6", "IPv6", ip_v6_hosts))
    if fqdn_hosts:
        ip_host_set.append(create_fqdn_host_group("AutomationMicrosoftFQDNGroup", fqdn_hosts))

    # Convert XML to a string and remove the XML declaration line
    dom = parseString(ET.tostring(request))
    xml_str = dom.toprettyxml()
    xml_str = '\n'.join(xml_str.split('\n')[1:])
    return xml_str


def make_firewall_request(xml_content, request_file, response_file):
    """Make API request to firewall and save request and response to files."""
    try:
        # Save XML content to the request file
        with open(request_file, "w") as f:
            f.write(xml_content)

        # Construct and execute curl command
        curl_command = [
            "curl", "-k", FIREWALL_API_URL,
            "-F", f"reqxml=<{request_file}",
            "-o", response_file
        ]
        result = subprocess.run(curl_command, capture_output=True, text=True)

        # Check if the command was successful
        if result.returncode == 0:
            logging.info(f"Successfully sent request to firewall. Response saved to {response_file}")
            return True
        else:
            logging.error(f"Failed to send request to firewall: {result.stderr}")
            return False
    except Exception as e:
        logging.error(f"Error making firewall request: {str(e)}")
        return False


def build_delete_xml(data):
    """Build XML for deleting existing FQDN hosts."""
    request = ET.Element("Request")
    login = ET.SubElement(request, "Login")
    ET.SubElement(login, "Username").text = FIREWALL_USERNAME
    ET.SubElement(login, "Password").text = FIREWALL_PASSWORD

    # Create a Remove section for FQDN hosts
    transaction_id = generate_transaction_id()
    remove_set = ET.SubElement(request, "Remove", {"transactionid": transaction_id})
    fqdn_pattern = re.compile(r"^(?:\*|[a-zA-Z0-9-]+)(\.[a-zA-Z0-9-]+)+$")

    for entry in data:
        for fqdn in entry.get("urls", []):
            if not fqdn_pattern.match(fqdn):
                logging.info(f"Skipping unsupported FQDN for deletion: {fqdn}")
                continue
            # Create FQDNHost remove element
            fqdn_host_remove = ET.Element("FQDNHost")
            ET.SubElement(fqdn_host_remove, "Name").text = fqdn
            remove_set.append(fqdn_host_remove)

    # Convert XML to a string and remove the XML declaration line
    dom = parseString(ET.tostring(request))
    xml_str = dom.toprettyxml()
    xml_str = '\n'.join(xml_str.split('\n')[1:])
    return xml_str


def update_firewall_rules():
    """Main function to update Objects with FQDN deletion."""
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        logging.info("Starting Objects update process")

        # Fetch Microsoft endpoints data
        data = fetch_data(f"{MS_ENDPOINT_URL}?clientrequestid={CLIENT_REQUEST_ID}")

        # Deletion of existing FQDN hosts
        logging.info("Attempting 'delete' operation for existing FQDN hosts")
        xml_data_delete = build_delete_xml(data)
        request_file_delete = os.path.join("requests", f"firewall_request_delete_{timestamp}.xml")
        response_file_delete = os.path.join("responses", f"firewall_response_delete_{timestamp}.xml")

        # Proceed with delete operation
        if make_firewall_request(xml_data_delete, request_file_delete, response_file_delete):
            logging.info("Existing FQDN hosts deleted successfully.")

            # Attempt 'add' operation
            logging.info("Attempting 'add' operation")
            xml_data_add = build_xml(data, operation="add")
            request_file_add = os.path.join("requests", f"firewall_request_add_{timestamp}.xml")
            response_file_add = os.path.join("responses", f"firewall_response_add_{timestamp}.xml")

            # Proceed only if 'add' operation is successful
            if make_firewall_request(xml_data_add, request_file_add, response_file_add):
                logging.info("Objects added successfully.")

                # Attempt 'update' operation after 'add' completes
                logging.info("Attempting 'update' operation")
                xml_data_update = build_xml(data, operation="update")
                request_file_update = os.path.join("requests", f"firewall_request_update_{timestamp}.xml")
                response_file_update = os.path.join("responses", f"firewall_response_update_{timestamp}.xml")

                if make_firewall_request(xml_data_update, request_file_update, response_file_update):
                    logging.info("Objects updated successfully.")
                else:
                    logging.warning("Objects 'update' operation failed.")
            else:
                logging.error("Objects 'add' operation failed. 'Update' operation skipped.")
        else:
            logging.error("FQDN deletion operation failed. Subsequent operations skipped.")
    except Exception as e:
        logging.error(f"Error during Object update: {str(e)}")


def run_scheduler():
    """Function to run the scheduler"""
    logging.info("Starting scheduler")
    schedule.every(24).hours.do(update_firewall_rules)

    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute instead of every second to reduce CPU usage
    except KeyboardInterrupt:
        logging.info("Scheduler stopped by user")
    except Exception as e:
        logging.error(f"Scheduler error: {str(e)}")


def main():
    """Main function to initialize and run the firewall automation"""
    try:
        logging.info("Starting Firewall Automation Script")

        # Create necessary directories if they don't exist
        os.makedirs("logs", exist_ok=True)
        os.makedirs("requests", exist_ok=True)
        os.makedirs("responses", exist_ok=True)

        # Run initial update
        logging.info("Running initial Objects update")
        update_firewall_rules()

        # Start the scheduler
        logging.info("Starting scheduled updates")
        run_scheduler()
    except Exception as e:
        logging.error(f"Main function error: {str(e)}")
        raise


if __name__ == "__main__":
    main()
# config.py
FIREWALL_API_URL = "https://172.16.16.16:4444/webconsole/APIController"  # Firewall IP
FIREWALL_USERNAME = "admin"  # API user name or admin user
FIREWALL_PASSWORD = "Admin@12345"  # admin password
MS_ENDPOINT_URL = "https://endpoints.office.com/endpoints/worldwide"  # No change required
CLIENT_REQUEST_ID = "b10c5ed1-bad1-445f-b386-b919946339a7"  # No change required; it is only part of the URL
- We recommend creating a folder called FirewallAutomation and placing both files in it.
- Access the Windows folder inside WSL:
- In Ubuntu, navigate to the folder. In this example, FirewallAutomation is located in the Desktop folder of the Windows user Emma:
- cd /mnt/c/Users/Emma/Desktop/FirewallAutomation
- Confirm your script files exist:
- ls
- You should see your script (firewall_automation.py) and config.py.
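The /mnt/c/... path used above follows the default WSL convention of mounting each Windows drive under /mnt/<drive letter>. The sketch below shows that mapping for drive-letter paths. It assumes the default mount point has not been changed, and the helper name windows_to_wsl is hypothetical.

```python
from pathlib import PureWindowsPath


def windows_to_wsl(path):
    """Map a drive-letter Windows path to its default WSL mount path."""
    win = PureWindowsPath(path)
    drive = win.drive.rstrip(":").lower()
    if len(drive) != 1 or not drive.isalpha():
        raise ValueError(f"Expected a drive-letter path, got: {path}")
    # parts[0] is the drive anchor (for example 'C:\\'); the rest are folder names
    return "/mnt/" + "/".join([drive, *win.parts[1:]])


print(windows_to_wsl(r"C:\Users\Emma\Desktop\FirewallAutomation"))
print(windows_to_wsl(r"C:\FirewallAutomation"))
```

The first call prints the same path that is passed to cd in the example above.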
Edit Configuration File
- Open the config.py file in a text editor, such as Notepad++
- Update the following details:
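- FIREWALL_API_URL = "https://<Your Firewall IP Address>:4444/webconsole/APIController" (4444 is the port used in the sample config.py; replace it if your admin console listens on a different port)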
- FIREWALL_USERNAME = "Your Firewall Username"
- FIREWALL_PASSWORD = "Your Firewall Password"
- Save and close the file:
- If you are using a terminal editor such as nano, press Ctrl + O to save, then Ctrl + X to exit.
- If you are using Notepad++, simply save using the floppy disk icon.
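Before running the script, it can help to sanity-check the URL you entered. The sketch below is one possible check, not part of the original script: it flags a leftover placeholder, a non-HTTPS scheme, and a path other than /webconsole/APIController (the path used in the sample config.py). The helper name check_api_url is hypothetical.

```python
from urllib.parse import urlparse

EXPECTED_PATH = "/webconsole/APIController"  # path used in the sample config.py


def check_api_url(url):
    """Return a list of human-readable problems found in the API URL."""
    if "<" in url or ">" in url:
        return ["URL still contains a <placeholder>"]
    problems = []
    parsed = urlparse(url)
    if parsed.scheme != "https":
        problems.append("URL should start with https://")
    if not parsed.hostname:
        problems.append("URL has no host name or IP address")
    if parsed.path != EXPECTED_PATH:
        problems.append(f"URL path should be {EXPECTED_PATH}")
    return problems


print(check_api_url("https://172.16.16.16:4444/webconsole/APIController"))
```

An empty list means none of these three checks found a problem; it does not confirm that the firewall is reachable or that the credentials are valid.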
Enable API and Whitelist Source IP
- In the Sophos Firewall, go to System > Backup & Firmware > API
API configuration = Enabled
Allowed IP Address = Enter the IP of the computer that will run the script
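To show what the allowed computer actually sends, here is a minimal sketch of a request body with the same shape the script builds: a Login block followed by a Set block containing one IPHost. The element names are taken from the script above; the user name, password, object name, and address are hypothetical placeholders, and the helper name build_request is not part of the script.

```python
import xml.etree.ElementTree as ET


def build_request(username, password, host_name, ip_address, subnet):
    """Build a request with a Login block and one IPv4 network IPHost."""
    request = ET.Element("Request")
    login = ET.SubElement(request, "Login")
    ET.SubElement(login, "Username").text = username
    ET.SubElement(login, "Password").text = password

    set_element = ET.SubElement(request, "Set", {"operation": "add"})
    host = ET.SubElement(set_element, "IPHost")
    ET.SubElement(host, "Name").text = host_name
    ET.SubElement(host, "HostType").text = "Network"
    ET.SubElement(host, "IPFamily").text = "IPv4"
    ET.SubElement(host, "IPAddress").text = ip_address
    ET.SubElement(host, "Subnet").text = subnet
    return ET.tostring(request, encoding="unicode")


# Hypothetical values for illustration only
xml_text = build_request("apiuser", "example-password", "IPv4_192.0.2.0_24",
                         "192.0.2.0", "255.255.255.0")
print(xml_text)
```

The script posts XML of this shape to FIREWALL_API_URL as the reqxml form field, which is why the source IP must be on the allowed list.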
Run the Script
- Run the script manually to test:
- python3 firewall_automation.py
Note: You will not see any output in the console
- Check the script ran successfully by checking the folder:
- Look for logs in the logs folder.
- Look for files with "update" in the name, for example firewall_response_update_<date-time>.xml.
- Make sure there are no errors in the response files under responses/*.xml.
- Each line item should state: “Configuration applied successfully.”
- The script should:
- Fetch Microsoft endpoints.
- Add the following:
- IPv4 hosts and IPv4 host group name: “AutomationMicrosoftHostGroupv4”
- IPv6 hosts and IPv6 host group name: “AutomationMicrosoftHostGroupv6”
- FQDN host and FQDN host group name: “AutomationMicrosoftFQDNGroup”
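To help locate the individual host objects on the firewall, the sketch below reproduces the naming rules used in build_xml: IP hosts are named from the address family plus the address, with "/" and ":" replaced by underscores, and FQDN hosts are named by replacing dots with underscores. The two helper names are hypothetical, and the sample values are documentation-only.

```python
def ip_object_name(ip_family, ip):
    """Name given to an IP host object, following the rule in build_xml."""
    return f"{ip_family}_{ip.replace('/', '_').replace(':', '_')}"


def fqdn_object_name(fqdn):
    """Name given to an FQDN host object, following the rule in build_xml."""
    return fqdn.replace(".", "_")


print(ip_object_name("IPv4", "192.0.2.0/24"))   # IPv4_192.0.2.0_24
print(ip_object_name("IPv6", "2001:db8::/32"))  # IPv6_2001_db8___32
print(fqdn_object_name("mail.example.com"))     # mail_example_com
```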
Schedule the Script to Run Automatically
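- The script schedules itself: after the initial run, run_scheduler() refreshes the objects every 24 hours. Open the script and confirm the line schedule.every(24).hours.do(update_firewall_rules) is present.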
- Keep the Ubuntu terminal open to allow the script to run continuously.
- If you close the terminal, rerun the script manually:
- python3 firewall_automation.py
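The timing works roughly as follows: the script runs one update at start-up, then checks once a minute whether 24 hours have passed since the scheduler was set up or last ran. The sketch below models that logic with the standard library only. It is a simplified stand-in for illustration, not the schedule package itself, and the helper names are hypothetical.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(hours=24)  # refresh interval used by the script


def next_run(last_run, interval=INTERVAL):
    """Return the time at which the next refresh becomes due."""
    return last_run + interval


def is_due(now, last_run, interval=INTERVAL):
    """Return True once at least one full interval has passed since last_run."""
    return now >= next_run(last_run, interval)


started = datetime(2024, 12, 23, 21, 0, 0)
print(next_run(started))                                # 2024-12-24 21:00:00
print(is_due(datetime(2024, 12, 24, 20, 59), started))  # False
print(is_due(datetime(2024, 12, 24, 21, 0), started))   # True
```

Because the schedule lives only in the running process, closing the terminal stops the refreshes until the script is started again.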
Troubleshooting
- WSL not installed?
- Run wsl --install again in PowerShell.
- Python version mismatch?
- Run python3 --version and ensure it's 3.10 or above.
- Missing dependencies?
- Install them using:
- pip3 install requests schedule
- Script errors?
- Check the requests/ folder for request XML data.
- Check the responses/ folder for response XML data. Make sure there are no errors in the response XML data for the add and update operations.
- Check logs/ for script errors
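Reading large response files by eye is tedious, so a small helper can list the entries that did not report success. The sketch below assumes that each object in the response carries a child element named Status whose text is “Configuration applied successfully.” on success. That layout is an assumption based on the message quoted earlier in this article, and the sample response and the helper name failed_statuses are hypothetical; adjust them to match your actual response files.

```python
import xml.etree.ElementTree as ET

EXPECTED = "Configuration applied successfully."  # success text quoted in this article


def failed_statuses(response_xml):
    """Return (object tag, status text) pairs for every non-success Status element."""
    root = ET.fromstring(response_xml)
    failures = []
    for parent in root.iter():
        # Assumption: results are reported in direct child elements named Status
        for status in parent.findall("Status"):
            text = (status.text or "").strip()
            if text != EXPECTED:
                failures.append((parent.tag, text))
    return failures


# Hypothetical response used only to exercise the helper
sample = """<Response>
  <IPHost transactionid="a1"><Status code="200">Configuration applied successfully.</Status></IPHost>
  <FQDNHost transactionid="b2"><Status code="500">Operation could not be performed.</Status></FQDNHost>
</Response>"""

print(failed_statuses(sample))
```

To use it on a real file, read the file contents from the responses folder and pass the text to failed_statuses.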
[edited by: emmosophos at 9:06 PM (GMT -8) on 23 Dec 2024]