
Sophos Firewall: Automating Microsoft Endpoint Objects in Firewall Using a Python Script on Windows

Disclaimer: This information is provided as-is for the benefit of the Community. Please contact Sophos Professional Services if you require assistance with your specific environment.


Overview

This article provides step-by-step instructions to set up and run a firewall automation script. The script fetches Microsoft endpoints (IPv4, IPv6, and FQDNs) from the Microsoft-published URL, creates the corresponding Host and HostGroup objects on the firewall, and keeps them up to date. It includes a scheduler that runs every 24 hours to refresh the HostGroups automatically. You can use these HostGroups in firewall rules, SD-WAN policies, and other configurations, so those configurations stay current as Microsoft changes its endpoints.
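As a rough illustration of the first step, the sketch below splits endpoint entries into IPv4 networks, IPv6 networks, and FQDNs, skipping duplicates. It reads the same "ips" and "urls" fields that the script reads. The sample entries and the function name classify_endpoints are hypothetical and only stand in for the real Microsoft data.

```python
import ipaddress

# Hypothetical sample shaped like the "ips" / "urls" fields the script reads;
# the values are illustrative, not real Microsoft data.
SAMPLE_ENTRIES = [
    {"id": 1, "ips": ["192.0.2.0/24", "2001:db8::/32"], "urls": ["example.com", "*.example.net"]},
    {"id": 2, "ips": ["192.0.2.0/24"], "urls": ["example.com"]},
    {"id": 3},  # an entry that carries neither field
]


def classify_endpoints(entries):
    """Split entries into de-duplicated IPv4 networks, IPv6 networks, and FQDNs."""
    ipv4, ipv6, fqdns = [], [], []
    seen = set()
    for entry in entries:
        for ip in entry.get("ips", []):
            if ip in seen:
                continue
            seen.add(ip)
            network = ipaddress.ip_network(ip, strict=False)
            (ipv4 if network.version == 4 else ipv6).append(ip)
        for fqdn in entry.get("urls", []):
            if fqdn in seen:
                continue
            seen.add(fqdn)
            fqdns.append(fqdn)
    return ipv4, ipv6, fqdns


ipv4, ipv6, fqdns = classify_endpoints(SAMPLE_ENTRIES)
print("IPv4:", ipv4)
print("IPv6:", ipv6)
print("FQDNs:", fqdns)
```

Each of the three lists corresponds to one of the host groups that the script creates on the firewall.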

Prerequisites

  1. Windows 10 or 11 with administrative access.
  2. Basic internet connection.

Steps

Install Windows Subsystem for Linux (WSL)

  1. Open PowerShell as Administrator:
    • Press Win + X, then select Windows PowerShell (Admin). On Windows 11 the entry may be labeled Terminal (Admin).
  2. Enable WSL by running the following command:
    • wsl --install
    • This installs WSL and sets up Ubuntu by default.
    • If you get any prompt during the installation, click Accept.
    • Restart your PC when prompted.
  3. After the reboot, open Ubuntu from the Start menu.
    • Follow the on-screen instructions to create a username and password for Ubuntu.

Install Python on WSL

  1. Open the Ubuntu terminal (WSL window).
  2. Update the package manager:
    • sudo apt update && sudo apt upgrade -y
  3. Install Python 3.10+ and pip (Python's package manager):
    • sudo apt install python3 python3-pip -y
  4. Verify the installation:
    • python3 --version
    • pip3 --version
    • Both commands should print a version number, which confirms that Python and pip are installed correctly.
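If you prefer to check the version from Python itself, a minimal sketch could look like the following. The function name python_is_recent_enough is hypothetical. The 3.10 minimum is the one this article asks for.

```python
import sys

MINIMUM_VERSION = (3, 10)  # the version this article asks for


def python_is_recent_enough(version_info=sys.version_info, minimum=MINIMUM_VERSION):
    """Return True when the (major, minor) version meets the minimum."""
    return tuple(version_info[:2]) >= minimum


status = "OK" if python_is_recent_enough() else "too old"
print("Python", sys.version.split()[0], status)
```

Save it as a file of your choice and run it with python3 in the Ubuntu terminal.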

Install Required Dependencies

  1. Install requests and schedule using pip:
    • pip3 install requests schedule
  2. Verify successful installation:
    • pip3 list
    • Confirm that requests and schedule appear in the list.
  3. If you get an error stating that requests and schedule cannot be installed, create a virtual environment with these steps:
    1. python3 -m venv myenv
    2. source myenv/bin/activate
    3. pip install requests schedule
    • If the first command reports that ensurepip is not available, install the venv package with sudo apt install python3-venv -y and try again.
    • The virtual environment is only active in the terminal where you ran the source command. Activate it again in any new terminal before running the script.
  4. To confirm that schedule and requests were installed in the virtual environment, run:
    • pip3 list
    • The output should look similar to this:
        Package            Version
        ------------------ ----------
        certifi            2024.12.14
        charset-normalizer 3.4.0
        idna               3.10
        pip                24.0
        requests           2.32.3
        schedule           1.2.2
        urllib3            2.2.3
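As an alternative to reading the pip3 list output, the following sketch asks Python directly whether the two modules can be found. It uses only the standard library. The helper name missing_modules is hypothetical.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_modules(["requests", "schedule"])
print("Missing:", ", ".join(missing) if missing else "none")
```

Run it with the same interpreter (and the same virtual environment, if you created one) that you will use for the script, because each environment has its own set of installed packages.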

Prepare Your Script Files

  1. On your Windows machine, download the Python script (firewall_automation.py) and the configuration file (config.py) to a folder, e.g., C:\FirewallAutomation.
    • firewall_automation.py:
      import requests
      import xml.etree.ElementTree as ET
      from xml.dom.minidom import parseString
      import uuid
      import ipaddress
      import time
      import schedule
      import subprocess
      import logging
      from datetime import datetime
      import os
      import re
      from config import (
          FIREWALL_API_URL,
          MS_ENDPOINT_URL,
          CLIENT_REQUEST_ID,
          FIREWALL_USERNAME,
          FIREWALL_PASSWORD,
      )
      
      
      # Setup logging
      os.makedirs("logs", exist_ok=True)
      logging.basicConfig(
          filename=os.path.join("logs", "firewall_automation.log"),
          level=logging.INFO,
          format='%(asctime)s - %(levelname)s - %(message)s'
      )
      
      
      CIDR_TO_SUBNET = {
          "32": "255.255.255.255",
          "31": "255.255.255.254",
          "30": "255.255.255.252",
          "29": "255.255.255.248",
          "28": "255.255.255.240",
          "27": "255.255.255.224",
          "26": "255.255.255.192",
          "25": "255.255.255.128",
          "24": "255.255.255.0",
          "23": "255.255.254.0",
          "22": "255.255.252.0",
          "21": "255.255.248.0",
          "20": "255.255.240.0",
          "19": "255.255.224.0",
          "18": "255.255.192.0",
          "17": "255.255.128.0",
          "16": "255.255.0.0",
          "15": "255.254.0.0",
          "14": "255.252.0.0",
          "13": "255.248.0.0",
          "12": "255.240.0.0",
          "11": "255.224.0.0",
          "10": "255.192.0.0",
          "9": "255.128.0.0",
          "8": "255.0.0.0"
      }
      
      
      def fetch_data(url):
          """Fetch data from a URL with error handling."""
          try:
              response = requests.get(url, timeout=30)  # Fail instead of hanging if the endpoint is unreachable
              response.raise_for_status()
              return response.json()
          except requests.exceptions.RequestException as e:
              logging.error(f"Failed to fetch data: {str(e)}")
              raise
      
      def generate_transaction_id():
          """Generate a random 10-character transaction ID."""
          return uuid.uuid4().hex[:10]
      
      def validate_ip(ip_str):
          """Validate IP address and return IP version."""
          try:
              ip = ipaddress.ip_network(ip_str, strict=False)
              return "IPv6" if isinstance(ip, ipaddress.IPv6Network) else "IPv4", str(ip.network_address), str(ip.prefixlen)
          except ValueError as e:
              raise ValueError(f"Invalid IP address format: {ip_str}") from e
      
      def create_ip_host(name, ip_str, host_type="Network"):
          """Create an IPHost element with proper IP handling."""
          transaction_id = generate_transaction_id()
          ip_host = ET.Element("IPHost", {"transactionid": transaction_id})
          ET.SubElement(ip_host, "Name").text = name
          ET.SubElement(ip_host, "HostType").text = host_type
      
          ip_family, ip_address, cidr = validate_ip(ip_str)
          ET.SubElement(ip_host, "IPFamily").text = ip_family
      
          if ip_family == "IPv4":
              # Fall back to a computed netmask for prefixes missing from the table (/0 to /7)
              subnet = CIDR_TO_SUBNET.get(cidr) or str(ipaddress.ip_network(ip_str, strict=False).netmask)
              ET.SubElement(ip_host, "IPAddress").text = ip_address
              ET.SubElement(ip_host, "Subnet").text = subnet
          else:  # IPv6
              ET.SubElement(ip_host, "IPAddress").text = ip_address
              ET.SubElement(ip_host, "Subnet").text = cidr
      
          return ip_host
      
      def create_ip_host_group(name, ip_family, hosts):
          """Create an IPHostGroup element."""
          transaction_id = generate_transaction_id()
          group = ET.Element("IPHostGroup", {"transactionid": transaction_id})
          ET.SubElement(group, "Name").text = name
          ET.SubElement(group, "Description").text = f"Microsoft {ip_family} Host Group"
          host_list = ET.SubElement(group, "HostList")
          for host_name in hosts:
              ET.SubElement(host_list, "Host").text = host_name
          ET.SubElement(group, "IPFamily").text = ip_family
          return group
      
      def create_fqdn_host(name, fqdn):
          """Create an FQDNHost element."""
          transaction_id = generate_transaction_id()
          fqdn_host = ET.Element("FQDNHost", {"transactionid": transaction_id})
          ET.SubElement(fqdn_host, "Name").text = name
          ET.SubElement(fqdn_host, "FQDN").text = fqdn
          return fqdn_host
      
      def create_fqdn_host_group(name, fqdn_hosts):
          """Create an FQDNHostGroup element."""
          transaction_id = generate_transaction_id()
          group = ET.Element("FQDNHostGroup", {"transactionid": transaction_id})
          ET.SubElement(group, "Name").text = name
          ET.SubElement(group, "Description").text = "Microsoft FQDN Host Group"
          fqdn_list = ET.SubElement(group, "FQDNHostList")
          for fqdn_host_name in fqdn_hosts:
              ET.SubElement(fqdn_list, "FQDNHost").text = fqdn_host_name
          return group
      
      
      def build_xml(data, operation="add"):
          """Build the XML structure with specified operation, avoiding duplicates."""
          request = ET.Element("Request")
          login = ET.SubElement(request, "Login")
          ET.SubElement(login, "Username").text = FIREWALL_USERNAME
          ET.SubElement(login, "Password").text = FIREWALL_PASSWORD
      
          ip_host_set = ET.SubElement(request, "Set", {"operation": operation})
      
          ip_v4_hosts = []
          ip_v6_hosts = []
          fqdn_hosts = []
      
          processed_ips = set()  # To track processed IPs
          processed_fqdns = set()  # To track processed FQDNs
      
          fqdn_pattern = re.compile(r"^(?:\*|[a-zA-Z0-9-]+)(\.[a-zA-Z0-9-]+)+$")
      
      
          for entry in data:
              # Process IPs
              for ip in entry.get("ips", []):
                  try:
                      if ip in processed_ips:
                          logging.info(f"Skipping duplicate IP: {ip}")
                          continue
      
                      ip_family, _, _ = validate_ip(ip)
                      name = f"{ip_family}_{ip.replace('/', '_').replace(':', '_')}"
                      ip_host = create_ip_host(name, ip)
                      ip_host_set.append(ip_host)
                      processed_ips.add(ip)  # Mark IP as processed
      
                      if ip_family == "IPv4":
                          ip_v4_hosts.append(name)
                      else:
                          ip_v6_hosts.append(name)
                  except ValueError as e:
                      logging.warning(f"Skipping invalid IP {ip}: {str(e)}")
      
              # Process FQDNs
              for fqdn in entry.get("urls", []):
                  if fqdn in processed_fqdns:
                      logging.info(f"Skipping duplicate FQDN: {fqdn}")
                      continue
      
                  if not fqdn_pattern.match(fqdn):
                      logging.info(f"Skipping unsupported FQDN: {fqdn}")
                      continue  # Skip FQDNs that do not match the desired pattern
      
                  name = fqdn.replace(".", "_")
                  fqdn_host = create_fqdn_host(name, fqdn)
                  ip_host_set.append(fqdn_host)
                  fqdn_hosts.append(name)
                  processed_fqdns.add(fqdn)  # Mark FQDN as processed
      
          # Create IPHost and FQDN groups
          if ip_v4_hosts:
              ip_host_set.append(create_ip_host_group("AutomationMicrosoftHostGroupv4", "IPv4", ip_v4_hosts))
          if ip_v6_hosts:
              ip_host_set.append(create_ip_host_group("AutomationMicrosoftHostGroupv6", "IPv6", ip_v6_hosts))
          if fqdn_hosts:
              ip_host_set.append(create_fqdn_host_group("AutomationMicrosoftFQDNGroup", fqdn_hosts))
      
          # Convert XML to a string and remove the XML declaration line
          dom = parseString(ET.tostring(request))
          xml_str = dom.toprettyxml()
          xml_str = '\n'.join(xml_str.split('\n')[1:])
          return xml_str
      
      def make_firewall_request(xml_content, request_file, response_file):
          """Make API request to firewall and save request and response to files."""
          try:
              # Save XML content to the request file
              with open(request_file, "w") as f:
                  f.write(xml_content)
      
              # Construct and execute curl command
              curl_command = [
                  "curl", "-k",
                  FIREWALL_API_URL,
                  "-F", f"reqxml=<{request_file}",
                  "-o", response_file
              ]
              
              result = subprocess.run(curl_command, capture_output=True, text=True)
              
              # Check if the command was successful
              if result.returncode == 0:
                  logging.info(f"Successfully sent request to firewall. Response saved to {response_file}")
                  return True
              else:
                  logging.error(f"Failed to send request to firewall: {result.stderr}")
                  return False
          except Exception as e:
              logging.error(f"Error making firewall request: {str(e)}")
              return False
      
      def build_delete_xml(data):
          """Build XML for deleting existing FQDN hosts."""
          request = ET.Element("Request")
          login = ET.SubElement(request, "Login")
          ET.SubElement(login, "Username").text = FIREWALL_USERNAME
          ET.SubElement(login, "Password").text = FIREWALL_PASSWORD
      
          # Create a Remove section for FQDN hosts
          transaction_id = generate_transaction_id()
          remove_set = ET.SubElement(request, "Remove", {"transactionid": transaction_id})
      
      
          fqdn_pattern = re.compile(r"^(?:\*|[a-zA-Z0-9-]+)(\.[a-zA-Z0-9-]+)+$")
          
          for entry in data:
              for fqdn in entry.get("urls", []):
                  if not fqdn_pattern.match(fqdn):
                      logging.info(f"Skipping unsupported FQDN for deletion: {fqdn}")
                      continue
      
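                  # Create FQDNHost remove element. The name must match the object
                  # name that build_xml() assigns (dots replaced by underscores).
                  fqdn_host_remove = ET.Element("FQDNHost")
                  ET.SubElement(fqdn_host_remove, "Name").text = fqdn.replace(".", "_")
                  remove_set.append(fqdn_host_remove)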
      
          # Convert XML to a string and remove the XML declaration line
          dom = parseString(ET.tostring(request))
          xml_str = dom.toprettyxml()
          xml_str = '\n'.join(xml_str.split('\n')[1:])
          return xml_str
      
      def update_firewall_rules():
          """Main function to update Objects with FQDN deletion."""
          try:
              timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
              logging.info("Starting Objects update process")
      
              # Fetch Microsoft endpoints data
              data = fetch_data(f"{MS_ENDPOINT_URL}?clientrequestid={CLIENT_REQUEST_ID}")
      
              # Deletion of existing FQDN hosts
              logging.info("Attempting 'delete' operation for existing FQDN hosts")
              xml_data_delete = build_delete_xml(data)
              request_file_delete = os.path.join("requests", f"firewall_request_delete_{timestamp}.xml")
              response_file_delete = os.path.join("responses", f"firewall_response_delete_{timestamp}.xml")
      
              # Proceed with delete operation
              if make_firewall_request(xml_data_delete, request_file_delete, response_file_delete):
                  logging.info("Existing FQDN hosts deleted successfully.")
      
                  # Attempt 'add' operation
                  logging.info("Attempting 'add' operation")
                  xml_data_add = build_xml(data, operation="add")
                  request_file_add = os.path.join("requests", f"firewall_request_add_{timestamp}.xml")
                  response_file_add = os.path.join("responses", f"firewall_response_add_{timestamp}.xml")
      
                  # Proceed only if 'add' operation is successful
                  if make_firewall_request(xml_data_add, request_file_add, response_file_add):
                      logging.info("Objects added successfully.")
      
                      # Attempt 'update' operation after 'add' completes
                      logging.info("Attempting 'update' operation")
                      xml_data_update = build_xml(data, operation="update")
                      request_file_update = os.path.join("requests", f"firewall_request_update_{timestamp}.xml")
                      response_file_update = os.path.join("responses", f"firewall_response_update_{timestamp}.xml")
      
                      if make_firewall_request(xml_data_update, request_file_update, response_file_update):
                          logging.info("Objects updated successfully.")
                      else:
                          logging.warning("Objects 'update' operation failed.")
                  else:
                      logging.error("Objects 'add' operation failed. 'Update' operation skipped.")
              else:
                  logging.error("FQDN deletion operation failed. Subsequent operations skipped.")
      
          except Exception as e:
              logging.error(f"Error during Object update: {str(e)}")
      
      
      def run_scheduler():
          """Function to run the scheduler"""
          logging.info("Starting scheduler")
          schedule.every(24).hours.do(update_firewall_rules)
          
          try:
              while True:
                  schedule.run_pending()
                  time.sleep(60)  # Check every minute instead of every second to reduce CPU usage
          except KeyboardInterrupt:
              logging.info("Scheduler stopped by user")
          except Exception as e:
              logging.error(f"Scheduler error: {str(e)}")
      
      def main():
          """Main function to initialize and run the firewall automation"""
          try:
              logging.info("Starting Firewall Automation Script")
              
              # Create necessary directories if they don't exist
              os.makedirs("logs", exist_ok=True)
              os.makedirs("requests", exist_ok=True)
              os.makedirs("responses", exist_ok=True)
              
              # Run initial update
              logging.info("Running initial Objects update")
              update_firewall_rules()
              
              # Start the scheduler
              logging.info("Starting scheduled updates")
              run_scheduler()
              
          except Exception as e:
              logging.error(f"Main function error: {str(e)}")
              raise
      
      if __name__ == "__main__":
          main()
       
    • config.py:
      # config.py
      
      FIREWALL_API_URL = "https://172.16.16.16:4444/webconsole/APIController"  # Replace with your firewall IP and admin port
      FIREWALL_USERNAME = "admin"  # API user name or admin user
      FIREWALL_PASSWORD = "Admin@12345"  # Password for the user above
      
      MS_ENDPOINT_URL = "https://endpoints.office.com/endpoints/worldwide"  # No change required
      CLIENT_REQUEST_ID = "b10c5ed1-bad1-445f-b386-b919946339a7"  # No change required; sent as the clientrequestid URL parameter
       
  2. We recommend creating a folder called FirewallAutomation and placing the two downloaded files in it.
  3. Access the Windows folder inside WSL:
    • In Ubuntu, navigate to your folder location. In this example, FirewallAutomation is in the Desktop folder of the Windows user Emma. Replace Emma with your own Windows username:
    • cd /mnt/c/Users/Emma/Desktop/FirewallAutomation
  4. Confirm your script files exist:
    • ls
    • You should see your script (firewall_automation.py) and config.py.
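The script turns each IPv4 CIDR prefix into a dotted subnet mask through its CIDR_TO_SUBNET table, while IPv6 entries keep the prefix length. The sketch below shows how the same values can be derived with Python's ipaddress module. It is not part of the downloaded script, and the function name split_network is hypothetical.

```python
import ipaddress


def split_network(cidr):
    """Return (family, network address, mask) for a CIDR string.

    IPv4 returns a dotted netmask; IPv6 returns the prefix length,
    mirroring what the script writes into the Subnet element.
    """
    network = ipaddress.ip_network(cidr, strict=False)
    if network.version == 4:
        return "IPv4", str(network.network_address), str(network.netmask)
    return "IPv6", str(network.network_address), str(network.prefixlen)


print(split_network("192.0.2.0/24"))
print(split_network("2001:db8::/32"))
```

Because strict=False is used, an address with host bits set (for example 192.0.2.5/24) is accepted and reduced to its network address.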

Edit Configuration File

  1. Open the config.py file in a text editor, such as Notepad++.
  2. Update the following details:
    • FIREWALL_API_URL = "https://<Your Firewall IP Address>:4444/webconsole/APIController"
    • FIREWALL_USERNAME = "Your Firewall Username"
    • FIREWALL_PASSWORD = "Your Firewall Password"
    • Keep the port from the example config.py (4444) unless your firewall's admin console uses a different one.
  3. Save and close the file:
    • If you are using a terminal editor such as nano, press Ctrl + O to save, then Ctrl + X to exit.
    • If you are using Notepad++, save using the floppy disk icon.
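A common mistake at this point is leaving a placeholder in the URL or dropping part of the path. The sketch below is one possible sanity check for the FIREWALL_API_URL value, using only the standard library. The function name check_api_url and the wording of the messages are hypothetical. The expected path is the one shown in config.py.

```python
from urllib.parse import urlparse

EXPECTED_PATH = "/webconsole/APIController"  # path used in config.py


def check_api_url(url):
    """Return a list of problems found in a FIREWALL_API_URL-style value."""
    problems = []
    parsed = urlparse(url)
    if parsed.scheme != "https":
        problems.append("URL should start with https://")
    if not parsed.hostname or "<" in url or ">" in url:
        problems.append("replace the placeholder with the firewall's IP address or hostname")
    if parsed.path != EXPECTED_PATH:
        problems.append("path should be " + EXPECTED_PATH)
    return problems


example_url = "https://172.16.16.16:4444/webconsole/APIController"
print(check_api_url(example_url) or "URL looks well-formed")
```

This only checks the shape of the URL. It does not contact the firewall, so a well-formed URL can still point at the wrong address or port.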

Enable API and Whitelist Source IP

  1. In the Sophos Firewall, go to System > Backup & Firmware > API and set:
    • API configuration = Enabled
    • Allowed IP Address = the IP address of the computer that will run the script

Run the Script

  1. Run the script manually to test:
    • python3 firewall_automation.py
    • Note: You will not see any output in the console. The script writes to logs/firewall_automation.log and keeps running until you stop it.
    • If you installed the dependencies in a virtual environment, activate it before running the script.
  2. Check that the script ran successfully by checking the folder:
    • Look for the log file in the logs folder.
    • Look for files with “update” in the name, for example firewall_response_update_<date-time>.xml.
    • Make sure there are no errors in the response files under responses/*.xml.
    • Each line item should state: “Configuration applied successfully.”
  3. The script should:
    • Fetch Microsoft endpoints.
    • Add the following:
      1. IPv4 hosts and the IPv4 host group named “AutomationMicrosoftHostGroupv4”
      2. IPv6 hosts and the IPv6 host group named “AutomationMicrosoftHostGroupv6”
      3. FQDN hosts and the FQDN host group named “AutomationMicrosoftFQDNGroup”
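Reading large response files by eye is tedious, so a small helper can list the items that did not report success. The sketch below is a sketch under assumptions: it assumes each object in the response carries a Status child element whose text is the message, which is an assumption about the response layout and not something this article confirms. The sample XML, its failure message, and the function name failed_items are hypothetical. Adjust the element names to match your own response files.

```python
import xml.etree.ElementTree as ET

SUCCESS_TEXT = "Configuration applied successfully."

# Hypothetical response, shaped on the assumption described above.
SAMPLE_RESPONSE = """<Response>
  <Login><status>Authentication Successful</status></Login>
  <IPHost transactionid="abc"><Status code="200">Configuration applied successfully.</Status></IPHost>
  <FQDNHost transactionid="def"><Status code="500">Operation could not be performed.</Status></FQDNHost>
</Response>"""


def failed_items(response_xml):
    """Return (element tag, status text) for every Status that is not a success."""
    root = ET.fromstring(response_xml)
    failures = []
    for item in root:
        status = item.find("Status")
        if status is None:
            continue  # no Status child, e.g. the Login element in this sample
        text = (status.text or "").strip()
        if text != SUCCESS_TEXT:
            failures.append((item.tag, text))
    return failures


for tag, message in failed_items(SAMPLE_RESPONSE):
    print(tag, "->", message)
```

To check a real file, read its contents with open(path).read() and pass the string to failed_items.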

Schedule the Script to Run Automatically

  1. Open the script again and confirm the scheduler is set to run every 24 hours (the schedule.every(24).hours line in run_scheduler).
  2. Keep the Ubuntu terminal open to allow the script to run continuously.
  3. If you close the terminal, rerun the script manually:
    • python3 firewall_automation.py
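The terminal has to stay open because the scheduler is a loop inside the script itself: it wakes up every minute and runs the update once 24 hours have passed. The sketch below shows that idea with the standard library only. It is not how the schedule package is implemented, and the class name IntervalJob is hypothetical. A fake clock is used in the demonstration so it finishes immediately.

```python
import time


class IntervalJob:
    """Run `func` whenever at least `interval_seconds` have passed since the last run."""

    def __init__(self, func, interval_seconds, clock=time.monotonic):
        self.func = func
        self.interval = interval_seconds
        self.clock = clock
        self.next_run = clock() + interval_seconds  # first run is one interval from now

    def run_pending(self):
        """Call the job if it is due; return True when it ran."""
        if self.clock() < self.next_run:
            return False
        self.func()
        self.next_run = self.clock() + self.interval
        return True


# Demonstration with a fake clock instead of waiting 24 real hours.
fake_now = [0.0]
runs = []
job = IntervalJob(lambda: runs.append(fake_now[0]), 24 * 60 * 60, clock=lambda: fake_now[0])

print(job.run_pending())   # not due yet
fake_now[0] = 24 * 60 * 60
print(job.run_pending())   # due after one full interval
```

In a real loop you would call run_pending() and then time.sleep(60) repeatedly, which is the pattern run_scheduler follows. When the terminal closes, the loop ends and no further updates happen.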

Troubleshooting

  1. WSL not installed?
    • Run wsl --install again in PowerShell.
  2. Python version mismatch?
    • Run python3 --version and ensure it's 3.10 or above.
  3. Missing dependencies?
    • Install them using:
    • pip3 install requests schedule
  4. Script errors?
    • Check the requests/ folder for request XML data.
    • Check the responses/ folder for response XML data. Make sure there are no errors in the response XML data for the add and update operations.
    • Check logs/ for script errors.
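To pull only the problem lines out of the log, a short filter is enough. The sketch below relies on the log format configured in the script ('%(asctime)s - %(levelname)s - %(message)s'). The function name problem_lines and the sample log lines are hypothetical, although the messages are ones the script can write.

```python
LOG_LEVELS = ("ERROR", "WARNING")

# Hypothetical sample in the script's "asctime - levelname - message" format.
SAMPLE_LOG = (
    "2024-12-23 21:06:00,123 - INFO - Starting Objects update process\n"
    "2024-12-23 21:06:05,456 - ERROR - Objects 'add' operation failed. 'Update' operation skipped.\n"
    "2024-12-23 21:06:06,789 - INFO - Starting scheduled updates\n"
)


def problem_lines(log_text, levels=LOG_LEVELS):
    """Return log lines whose level field is one of `levels`."""
    hits = []
    for line in log_text.splitlines():
        parts = line.split(" - ", 2)  # asctime, levelname, message
        if len(parts) == 3 and parts[1] in levels:
            hits.append(line)
    return hits


for line in problem_lines(SAMPLE_LOG):
    print(line)
```

For the real log, read logs/firewall_automation.log with open(...).read() and pass the text to problem_lines.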


[edited by: emmosophos at 9:06 PM (GMT -8) on 23 Dec 2024]