Important note about SSL VPN compatibility for 20.0 MR1 with EoL SFOS versions and UTM9 OS. Learn more in the release notes.

Sophos Firewall: Importing User definitions into Sophos Firewall after v18.0 MR3 and v17.5 MR14

Disclaimer: This information is provided as-is for the benefit of the Community. Please contact Sophos Professional Services if you require assistance with your specific environment.


Overview

In recent MR releases there have been several changes to improve the security of Sophos Firewall and to make it more difficult for attackers to get hold of sensitive information if something does go wrong. 

One of the changes was to remove the ability to export and import user accounts using csv files. There were several security concerns about this feature, in particular the added risk of exposure of password material that it encourages. Many customers now manage users by connecting their firewalls to an external directory service, such as Active Directory, which overall creates a much more secure and manageable environment.

However some customers have established processes that rely on importing lists of users from time to time. What can they do?

SFOS has an XML API that provides a way to automatically manage most objects and features that can be controlled via the Webadmin user interface, and User objects are no exception.

Using the XML API is a great alternative to the old CSV import feature for bulk adding of on-device user accounts.

But how? Well, here's an example. The attached Python program can read basic user data from a csv file and post it directly to your firewall's XML API. It basically replicates the old CSV import functionality.

*NOTE* If Python is not your thing, please check out the reply below in this thread where I added a Powershell version of this script. This can be run directly in Powershell on Windows without installing any other software.

If you have a process where, from time to time, you create a csv file with new user information in it, you can now use this script instead to import it directly instead of navigating through the WebAdmin UI.

PLEASE NOTE: The attached script does not currently work on SFOS installations where the Default Configuration Language is not English. I am working on an update and will post it here soon.

Here's how to do it. Not all steps will be required for everyone - if you already use Python, or if you use a Mac that has Python built-in, you should be pretty much set to go. 

Enabling XML API on your firewall

  1. Log in to your Sophos Firewall as an Administrator account
  2. Navigate to Backup & firmware > API
  3. Under API configuration, check the 'Enabled' box
  4. Under Allowed IP address, enter the IP address of the computer where you are going to run this program

Installing Python 3 and preparing dependencies

  1. Install Python 3 on your computer.
    1. Python 3 comes built in to some Unix-based systems, including MacOS.
    2. For Windows systems, you can download and install the latest Python installer from https://www.python.org
    3. For Linux distributions, your system's default package manager will almost certainly have a suitable package ready to install.
  2. When installing Python on Windows, make sure to select the option to "Add Python to PATH" on the first screen of the installer. This will allow you to run python scripts from a Windows command prompt.
  3. After installation completes, open a new Command Prompt window.
  4. Install additional libraries that are required to run this program - run
    C:\Users\John> pip install requests python-certifi-win32
  5. If you have enabled TLS decryption on your Sophos Firewall and get a certificate error running this command, you can either create a decryption exclusion for the domain pypi.org or get a copy of your firewall's root CA certificate in a file and rerun the command as follows:
    C:\Users\John> pip install requests python-certifi-win32 --cert mycacert.pem

Prepare your CSV file

This script expects you to provide a csv file, with fields separated by commas. Users with regional preferences that create csv files with semicolons or other characters may find that it works, or may need to adjust the script accordingly.

The file must have the following columns - Name, Username, Password, Email Address, Group:

Example file:

Name,Username,Password,Email Address,Group
Nigel Brown,nbrown,Pa5s!w0rd19,nigel.brown@example.com,Open Group
Gina Lopez,glopez,e1Azjr8q9^21,gina.lopez@example.com,Open Group

Downloading and running the script

Download this zip file and extract it to a directory on your computer. The rest of these instructions assume you saved it in 'Downloads', and that the csv file containing the users you want to add is in 'Documents' and called users.csv

Run the following command, substituting your firewall's hostname or IP address, admin username and password. If you're using an account with multi-factor authentication (OTP) then don't forget to append the authentication code to the end of the password:

C:\Users\John> python Downloads\UserImport.py -f myfirewall.example.com -i Documents\users.csv -u admin -p A1B2c3d4!!E5 -a

If you see exception messages related to certificate trust issues, try running the same command again, but add '-n' as an additional command-line qualifier. The argument '-n' tells Python to ignore any certificate validation errors when connecting to the firewall - python does not automatically pick up any additional root CAs from your Windows installation:

C:\Users\John> python Downloads\UserImport.py -f myfirewall.example.com -i Documents\users.csv -u admin -p A1B2c3d4!!E5 -a -n

As long as the details of all the users are correct, the import should proceed successfully. If it does not, check the message responses for clues as to what went wrong.

Troubleshooting

  • Error code 534: API operations are not allowed from the requester IP address.
    • You need to add the IP address of your computer to the 'Allowed IP' list on your Sophos Firewall at Backup & firmware > API
  • Login status: Authentication Failure
    • The username or password were incorrect.
    • The user specified is not an Administrator.
    • If the user is configured to use multi-factor authentication using one-time passwords, you'll need to add the current authentication code at the end of the user's password at the command line.
  • Error code 532: You need to enable the API Configuration.
    • You need to enable API configuration as shown in the steps above, under "Enabling XML API on your firewall"
  • Status 599, Message: Not having privilege to add/modify configuration.
    • The admin account specified is configured with a Device access profile that does not allow 'Write' operations for User objects.
  • Status 502, Message: A user with the same name already exists
    • You are in add mode (with the command line argument '-a') and there is already a user matching the name specified in your csv
    • If you want to update existing user records with the same name, omit the '-a' command line argument. This tells the script to send an 'update' command instead of an 'add' command to the XML API.
  • Status 511, Message: Operation failed. Please contact Support.
    • Run the following command to check applog.log:

      grep "common-password db" /log/applog.log

      Example:

      XGS107_SN01_SFOS 19.0.1 MR-1-Build365# grep "common-password db" /log/applog.log
      Nov 30 18:11:38Z Given password is found in common-password db, Try with different password

    • If this log line matches with the recent timestamp, then try using a different password in the csv file.

Python code

If you don't want to download the code as a zip file, you can copy and paste it from here:

# UserImport
#
# Takes basic User definitions from a CSV file and posts
# them to an XG Firewall using the XML API

import xml.etree.ElementTree as ET
import argparse
import requests
import sys
import configparser
import csv

serial = 0

UserAPIAddResults = {
    "200": (True, "User registered successfully"),
    "500": (False, "User couldn't be registered"),
    "502": (False, "A user with the same name already exists"),
    "503": (False, "A user with the same L2TP/PPTP IP already exists"),
    "510": (False, "Invalid password - doesn't meet complexity requirements")
}

UserAPIUpdateResults = {
    "200": (True, "User registered successfully"),
    "500": (False, "User couldn't be updated"),
    "502": (False, "A user with the same name already exists"),
    "503": (False, "A user with the same L2TP/PPTP IP already exists"),
    "510": (False, "Invalid password - doesn't meet complexity requirements"),
    "541": (False, "There must be at least one Administrator"),
    "542": (False,
            "There must be at least one user with Administrator profile")
}


def eprint(*args, **kwargs):
    # Print to stderr instead of stdout
    print(*args, file=sys.stderr, **kwargs)


def getArguments():
    parser = argparse.ArgumentParser(
        description='Grab a list of video IDs from a YouTube playlist')

    parser.add_argument(
        '-f', '--firewall',
        help=("To call the API directly, specify a firewall hostname or IP. "
              "Without this, an XML API document will be output to stdout."),
        default=argparse.SUPPRESS)
    parser.add_argument(
        '-i', '--input', metavar="FILENAME",
        help=("Name of CSV file containing these columns:"
              " Name, Username, Password, Email Address, Group"),
        default=argparse.SUPPRESS)
    parser.add_argument(
        '-u', '--fwuser', metavar="ADMIN-USERNAME",
        help='Admin username for the XG firewall',
        default='admin')
    parser.add_argument(
        '-p', '--fwpassword', metavar="ADMIN-PASSWORD",
        help='Password for the XG user - defaults to "admin"',
        default='password')
    parser.add_argument(
        '-a', '--add',
        help=('Call API in "Add" mode - use first time only (otherwise'
              ' Update will be used)'),
        action='store_true')
    parser.add_argument(
        '-n', '--insecure',
        help="Don't validate the Firewall's HTTPS certificate",
        action='store_false')
    parser.add_argument(
        '-1', '--oneshot',
        help=("Create a single enormous XMLAPI transaction instead "
              " of multiple smaller ones. Only used when outputting "
              "to stdout "),
        action='store_true')
    parser.add_argument(
        '-x', '--useimpex',
        help=("Create an Entities.xml-style file for inclusion in an "
              " Import tarball"),
        action='store_true')

    return parser.parse_args()


def readConfig():
    # Read in the temp config file (if it exists)
    global config
    global workingPath

    config = configparser.ConfigParser()
    config.read(workingPath)


def getserial():
    # Returns an incrementing serial number. Used to reference individual
    # XMLAPI transactions.
    global serial
    serial = serial + 1
    return str(serial)


def xgAPIStartUser(username, name, password, email,
                   group="Open Group", usertype="User",
                   surfquota="Unlimited Internet Access",
                   accesstime="Allowed all the time",
                   datatransferpolicy="", qospolicy="", sslvpnpolicy="",
                   clientlesspolicy="", status="Active", l2tp="Disable",
                   pptp="Disable", cisco="Disable",
                   quarantinedigest="Disable", macbinding="Disable",
                   loginrestriction="UserGroupNode",
                   accessschedule="All The Time", loginrestrictionapp="",
                   isencryptcert="Disable", simultaneouslogins="Enable"):
    # Returns a complete XML User record with some sane defaults which can
    # be modified later, if you want.

    userblock = ET.Element('User', transactionid=getserial())
    ET.SubElement(userblock, 'Username').text = username
    ET.SubElement(userblock, 'Name').text = name
    ET.SubElement(userblock, 'Password').text = password
    ET.SubElement(ET.SubElement(userblock, 'EmailList'),
                  'EmailID').text = email
    ET.SubElement(userblock, 'Group').text = group
    ET.SubElement(userblock, 'SurfingQuotaPolicy').text = surfquota
    ET.SubElement(userblock, 'AccessTimePolicy').text = accesstime
    ET.SubElement(userblock, 'DataTransferPolicy').text = datatransferpolicy
    ET.SubElement(userblock, 'QoSPolicy').text = qospolicy
    ET.SubElement(userblock, 'SSLVPNPolicy').text = sslvpnpolicy
    ET.SubElement(userblock, 'ClientlessPolicy').text = clientlesspolicy
    ET.SubElement(userblock, 'Status').text = status
    ET.SubElement(userblock, 'L2TP').text = l2tp
    ET.SubElement(userblock, 'PPTP').text = pptp
    ET.SubElement(userblock, 'CISCO').text = cisco
    ET.SubElement(userblock, 'QuarantineDigest').text = quarantinedigest
    ET.SubElement(userblock, 'MACBinding').text = macbinding
    ET.SubElement(userblock, 'LoginRestriction').text = loginrestriction
    ET.SubElement(
        userblock, 'ScheduleForApplianceAccess').text = accessschedule
    ET.SubElement(
        userblock, 'LoginRestrictionForAppliance').text = loginrestrictionapp
    ET.SubElement(userblock, 'IsEncryptCert').text = isencryptcert
    ET.SubElement(
        userblock, 'SimultaneousLoginsGlobal').text = simultaneouslogins

    return userblock


def xgAPILogin(fwuser, fwpassword):
    # Returns a root Login element for an XMLAPI call

    requestroot = ET.Element('Request')
    login = ET.SubElement(requestroot, 'Login')
    ET.SubElement(login, 'Username').text = fwuser
#  ET.SubElement(login, 'Password', passwordform = 'encrypt').text = fwpassword
    ET.SubElement(login, 'Password').text = fwpassword

    return requestroot


def xgImpExBegin():
    return ET.Element('Configuration', APIVersion="1702.1", IPS_CAT_VER="1")


def xgAPIPost(requestroot):
    # Posts an XMLAPI document to the firewall specified in command line -f arg
    # If there is no -f arg, it prints the document to stdout
    # Parameter 'requestroot' provides the root element of the XML document

    postdata = {
        'reqxml': ET.tostring(requestroot, 'unicode')
    }
    result = 0

    try:
        callurl = ('https://' + stuff.firewall +
                   ':4444/webconsole/APIController')
        eprint("Sending XMLAPI request to %s" % callurl)

        r = requests.post(callurl, data=postdata, verify=stuff.insecure)
#        eprint(r)
#        eprint(r.text)
        result = 1
    except AttributeError:
        print(ET.tostring(requestroot, 'unicode'))
        result = 0
    return (result, r)

# Fun starts here


# Check command line
stuff = getArguments()

try:
    eprint("Reading from file: %s" % stuff.input)
except AttributeError:
    eprint("No filename provided. Run this command with --help for more info.")
    sys.exit()

# If there's a firewall address set, make sure we're in oneshot mode
try:
    eprint("Config will be posted to " + stuff.firewall)
    stuff.oneshot = True
except AttributeError:
    eprint("No firewall set")

if stuff.add:
    method = 'add'
else:
    method = 'update'

# Start building the XMLAPI Request
if stuff.useimpex:
    fwRequestroot = xgImpExBegin()
    fwRequestSet = fwRequestroot
else:
    fwRequestroot = xgAPILogin(stuff.fwuser, stuff.fwpassword)
    fwRequestSet = ET.SubElement(fwRequestroot, 'Set', operation=method)

usernames = []

# Open the csv for reading
with open(stuff.input, encoding="utf-8-sig") as input_csv:
    csv_parser = csv.DictReader(input_csv, delimiter=',')
    count = 0
    for row in csv_parser:
        #       eprint(row)

        fwRequestSet.append(xgAPIStartUser(row["Username"], row["Name"],
                                           row["Password"],
                                           row["Email Address"],
                                           group=row["Group"]))
        usernames.append(row["Username"])
        count = count + 1

    eprint("Read %d users from file %s\n" % (count, stuff.input))

if stuff.oneshot or stuff.useimpex:
    result, r = xgAPIPost(fwRequestroot)

if result == 1:
    resultcontent = ET.fromstring(r.text)
    for child in resultcontent:
        if child.tag == "Login":
            status = child.find('status').text
            if status == '200':
                eprint("API login successful")
            else:
                eprint("Login status: %s" % child.find('status').text)

        if child.tag == "User":
            Status = child.find('Status')
            eprint("Line %s (%s), Status %s" % (
                child.attrib["transactionid"], usernames[int(
                    child.attrib["transactionid"]) - 1],
                Status.attrib["code"]))
            try:
                if stuff.add:
                    eprint("     %s" %
                           UserAPIAddResults[Status.attrib["code"]][1])
                else:
                    eprint("     %s" %
                           UserAPIUpdateResults[Status.attrib["code"]][1])
            except KeyError:
                eprint("Message: %s" % Status.text)
                continue

        if child.tag == "Status":
            eprint("Error code %s: %s" % (child.attrib['code'], child.text))

# Take the xml output from this program and send it to your firewall with curl,
# for example:
# $ curl -k https://<firewall ip>:4444/webconsole/APIController -F "reqxml=<foo.xml"

# To create an API Import file, use '-x', write the XML output to a file
# 'Entities.xml' and create a tarball with the following (group/owner options 
# to make it anonymous):
# $ tar --group=a:1000 --owner=a:1000 --numeric-owner -cvf ../API-O365.tar Entities.xml

______________________________________________________________________________________________________________________________________



Added TAGs
[edited by: Raphael Alganes at 5:18 AM (GMT -7) on 18 Sep 2024]
  • Does your import file include usernames that already exist? Try adding this to the command-line: "-operation update"

  • Thank you 

    I tired with a single user, that user does not exist in firewall. Still am getting the error. Below is the complete error list. The username mentioned I have replaced with placeholders.

    Posting to FWLANIP:4444/.../APIController
    Returned 200 - OK
    <?xml version="1.0" encoding="UTF-8"?>
    <Response APIVersion="1800.2" IPS_CAT_VER="1">
    <Login>
    <status>Authentication Successful</status>
    </Login>
    <User transactionid="1">
    <Status code="505">Operation failed. Deleting entity referred by another entity.</Status>
    </User>
    <User transactionid="2">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="3">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="4">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="5">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="6">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="7">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="8">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="9">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="10">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="11">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="12">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="13">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="14">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="15">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="16">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="17">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="18">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="19">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="20">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="21">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="22">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="23">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="24">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="25">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="26">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="27">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="28">
    <Status code="501">Configuration parameters validation failed.</Status>
    <InvalidParams>
    <Params>/User/EmailList/EmailID</Params>
    </InvalidParams>
    </User>
    <User transactionid="29">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="30">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="31">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="32">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="33">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="34">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="35">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="36">
    <Status code="501">Configuration parameters validation failed.</Status>
    <InvalidParams>
    <Params>/User/EmailList/EmailID</Params>
    </InvalidParams>
    </User>
    <User transactionid="37">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="38">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="39">
    <Status code="505">Unable to get status message</Status>
    </User>
    <User transactionid="40">
    <Status code="505">Unable to get status message</Status>
    </User>
    </Response>

    1 - User 1 : Operation failed. Deleting entity referred by another entity. (505)
    2 - user 2 : Unable to get status message (505)
    3 - User: Unable to get status message (505)
    4 - User: Unable to get status message (505)
    5 - User4) : Unable to get status message (505)
    6 - UserH15) : Unable to get status message (505)
    7 - UserCH16) : Unable to get status message (505)
    8 - UserH17) : Unable to get status message (505)
    9 - User: Unable to get status message (505)
    10 - Usermar19) : Unable to get status message (505)
    11 - User0) : Unable to get status message (505)
    12 - User63) : Unable to get status message (505)
    13 - User) : Unable to get status message (505)
    14 - User5) : Unable to get status message (505)
    15 - User6) : Unable to get status message (505)
    16 - User) : Unable to get status message (505)
    17 - User: Unable to get status message (505)
    18 - User69) : Unable to get status message (505)
    19 - User70) : Unable to get status message (505)
    20 - User1) : Unable to get status message (505)
    21 - User2) : Unable to get status message (505)
    22 - User73) : Unable to get status message (505)
    23 - UserA74) : Unable to get status message (505)
    24 - User5) : Unable to get status message (505)
    25 - User : Unable to get status message (505)
    26 - User7) : Unable to get status message (505)
    27 - User : Unable to get status message (505)
    28 - User : Configuration parameters validation failed. (501)
    29 - UserP80) : Unable to get status message (505)
    30 - User81) : Unable to get status message (505)
    31 - User82) : Unable to get status message (505)
    32 - UserA83) : Unable to get status message (505)
    33 - User) : Unable to get status message (505)
    34 - UserA85) : Unable to get status message (505)
    35 - UserA86) : Unable to get status message (505)
    36 - User87) : Configuration parameters validation failed. (501)
    37 - User8) : Unable to get status message (505)
    38 - UserAA89) : Unable to get status message (505)
    39 - UserA90) : Unable to get status message (505)
    40 - User91) : Unable to get status message (505)

  • Got this resolved. Issue was with user password complexity. Passwords given in the csv were not meeting the complexity for user settings. We disabled user password complexity and got it worked.
  • I don't get your point why fetching a user via AD needs a password. 

    Thats a limitation from sophos, not a general AD requirement.

    XG could simply fetch the user and create it locally with a strong random password, as the password will get "overwritten" anyways, as soon as the user authenticates agains a backend. the "local" user is then converted to "remote" and the random pw does not work anymore. 

    thats how we used to provision our technicians admin accounts to customer firewalls, which authenticate against tacacs. we pushed a list of placeholder with random password. 

    I get your point that dynamically creation of users work but as an admin, thats a pain in the neck, especially of the mapping of group/vpn does not work reliably. 

  • Not to mention Mac SSL VPN requires third party apps...  Client drops six figures on a bunch of firewalls then has to download open source clients?!?

  • HI I'm Eric, I try to work on this python but It didn't work, I don't know why, can someone help me?

    This is the error about python :

    C:\Users\Eric.chen>python Downloads\UserImport.py -f 172.30.2.180 -i Documents\users.csv -u admin -p P@ssw0rd1463 -a
    Traceback (most recent call last):
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\importlib\_common.py", line 89, in _tempfile
    os.write(fd, reader())
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\importlib\abc.py", line 371, in read_bytes
    with self.open('rb') as strm:
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\importlib\_adapters.py", line 54, in open
    raise ValueError()
    ValueError

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
    File "C:\Users\Eric.chen\Downloads\UserImport.py", line 8, in <module>
    import requests
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\requests\__init__.py", line 133, in <module>
    from . import utils
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\requests\utils.py", line 27, in <module>
    from . import certs
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\requests\certs.py", line 15, in <module>
    from certifi import where
    File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
    File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
    File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\wrapt\importer.py", line 170, in exec_module
    notify_module_loaded(module)
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\wrapt\decorators.py", line 470, in _synchronized
    return wrapped(*args, **kwargs)
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\wrapt\importer.py", line 136, in notify_module_loaded
    hook(module)
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\certifi_win32\wrapt_certifi.py", line 20, in apply_patches
    certifi_win32.wincerts.CERTIFI_PEM = certifi.where()
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\site-packages\certifi\core.py", line 37, in where
    _CACERT_PATH = str(_CACERT_CTX.__enter__())
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 135, in __enter__
    return next(self.gen)
    File "C:\Users\Eric.chen\AppData\Local\Programs\Python\Python310\lib\importlib\_common.py", line 95, in _tempfile
    os.remove(raw_path)
    PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\ERIC~1.CHE\\AppData\\Local\\Temp\\tmpzqzoqfag'

  • I really disappointed when Sophos remove this csv import/export features. You should get the community concern first. I have plenty of user that I need to export the list every quarter for HR housekeeping. Please bring back the features in ver18. 

  • You can still export the Information as XML.

    __________________________________________________________________________________________________________________

  • Yes, thanks. Jus a bit additional work where need to unzip the TAR file and convert it to xls format

  • We have updated the script to fetch the details of the group passed by user in the csv file and then pass the fetched details along with user creation details to make sure, user gets created with correct user group details.

    Please see the updated python script.

    # UserImport
    #
    # Takes basic User definitions from a CSV file and posts
    # them to an XG Firewall using the XML API
    
    import xml.etree.ElementTree as ET
    import argparse
    import requests
    import sys
    import configparser
    import csv
    import xmltodict # This needs to be installed on local machine for XML string to dictionary conversion.
    
    serial = 0
    
    UserAPIAddResults = {
        "200": (True, "User registered successfully"),
        "500": (False, "User couldn't be registered"),
        "502": (False, "A user with the same name already exists"),
        "503": (False, "A user with the same L2TP/PPTP IP already exists"),
        "510": (False, "Invalid password - doesn't meet complexity requirements")
    }
    
    UserAPIUpdateResults = {
        "200": (True, "User registered successfully"),
        "500": (False, "User couldn't be updated"),
        "502": (False, "A user with the same name already exists"),
        "503": (False, "A user with the same L2TP/PPTP IP already exists"),
        "510": (False, "Invalid password - doesn't meet complexity requirements"),
        "541": (False, "There must be at least one Administrator"),
        "542": (False,
                "There must be at least one user with Administrator profile")
    }
    
    
    def eprint(*args, **kwargs):
        # Print to stderr instead of stdout
        print(*args, file=sys.stderr, **kwargs)
    
    
    def getArguments():
        parser = argparse.ArgumentParser(
            description='Grab a list of video IDs from a YouTube playlist')
    
        parser.add_argument(
            '-f', '--firewall',
            help=("To call the API directly, specify a firewall hostname or IP. "
                  "Without this, an XML API document will be output to stdout."),
            default=argparse.SUPPRESS)
        parser.add_argument(
            '-i', '--input', metavar="FILENAME",
            help=("Name of CSV file containing these columns:"
                  " Name, Username, Password, Email Address, Group"),
            default=argparse.SUPPRESS)
        parser.add_argument(
            '-u', '--fwuser', metavar="ADMIN-USERNAME",
            help='Admin username for the XG firewall',
            default='admin')
        parser.add_argument(
            '-p', '--fwpassword', metavar="ADMIN-PASSWORD",
            help='Password for the XG user - defaults to "admin"',
            default='password')
        parser.add_argument(
            '-a', '--add',
            help=('Call API in "Add" mode - use first time only (otherwise'
                  ' Update will be used)'),
            action='store_true')
        parser.add_argument(
            '-n', '--insecure',
            help="Don't validate the Firewall's HTTPS certificate",
            action='store_false')
        parser.add_argument(
            '-1', '--oneshot',
            help=("Create a single enormous XMLAPI transaction instead "
                  " of multiple smaller ones. Only used when outputting "
                  "to stdout "),
            action='store_true')
        parser.add_argument(
            '-x', '--useimpex',
            help=("Create an Entities.xml-style file for inclusion in an "
                  " Import tarball"),
            action='store_true')
    
        return parser.parse_args()
    
    
    def readConfig():
        # Read in the temp config file (if it exists)
        global config
        global workingPath
    
        config = configparser.ConfigParser()
        config.read(workingPath)
    
    
    def getserial():
        # Returns an incrementing serial number. Used to reference individual
        # XMLAPI transactions.
        global serial
        serial = serial + 1
        return str(serial)
    
    def xgAPIStartUser(username, name, password, email,
                       group="Open Group", usertype="User",
                       surfquota="Unlimited Internet Access",
                       accesstime="Allowed all the time",
                       datatransferpolicy="", qospolicy="", sslvpnpolicy="",
                       clientlesspolicy="", status="Active", l2tp="Disable",
                       pptp="Disable", cisco="Disable",
                       quarantinedigest="Disable", macbinding="Disable",
                       loginrestriction="UserGroupNode",
                       accessschedule="All The Time", loginrestrictionapp="",
                       isencryptcert="Disable", simultaneouslogins="Enable"):
        # Returns a complete XML User record with some sane defaults which can
        # be modified later, if you want.
    
        userblock = ET.Element('User', transactionid=getserial())
        ET.SubElement(userblock, 'Username').text = username
        ET.SubElement(userblock, 'Name').text = name
        ET.SubElement(userblock, 'Password').text = password
        ET.SubElement(ET.SubElement(userblock, 'EmailList'),
                      'EmailID').text = email
        ET.SubElement(userblock, 'Group').text = group
        ET.SubElement(userblock, 'SurfingQuotaPolicy').text = surfquota
        ET.SubElement(userblock, 'AccessTimePolicy').text = accesstime
        ET.SubElement(userblock, 'DataTransferPolicy').text = datatransferpolicy
        ET.SubElement(userblock, 'QoSPolicy').text = qospolicy
        ET.SubElement(userblock, 'SSLVPNPolicy').text = sslvpnpolicy
        ET.SubElement(userblock, 'ClientlessPolicy').text = clientlesspolicy
        ET.SubElement(userblock, 'Status').text = status
        ET.SubElement(userblock, 'L2TP').text = l2tp
        ET.SubElement(userblock, 'PPTP').text = pptp
        ET.SubElement(userblock, 'CISCO').text = cisco
        ET.SubElement(userblock, 'QuarantineDigest').text = quarantinedigest
        ET.SubElement(userblock, 'MACBinding').text = macbinding
        ET.SubElement(userblock, 'LoginRestriction').text = loginrestriction
        ET.SubElement(
            userblock, 'ScheduleForApplianceAccess').text = accessschedule
        ET.SubElement(
            userblock, 'LoginRestrictionForAppliance').text = loginrestrictionapp
        ET.SubElement(userblock, 'IsEncryptCert').text = isencryptcert
        ET.SubElement(
            userblock, 'SimultaneousLoginsGlobal').text = simultaneouslogins
    
        return userblock
    
    
    def xgAPILogin(fwuser, fwpassword):
        # Returns a root Login element for an XMLAPI call
    
        requestroot = ET.Element('Request')
        login = ET.SubElement(requestroot, 'Login')
        ET.SubElement(login, 'Username').text = fwuser
    #  ET.SubElement(login, 'Password', passwordform = 'encrypt').text = fwpassword
        ET.SubElement(login, 'Password').text = fwpassword
    
        return requestroot
    
    
    def xgImpExBegin():
        return ET.Element('Configuration', APIVersion="1702.1", IPS_CAT_VER="1")
    
    def xgAPIPost(requestroot):
        # Posts an XMLAPI document to the firewall specified in command line -f arg
        # If there is no -f arg, it prints the document to stdout
        # Parameter 'requestroot' provides the root element of the XML document
    
        postdata = {
            'reqxml': ET.tostring(requestroot, 'unicode')
        }
        result = 0
    
        try:
            callurl = ('https://' + stuff.firewall +
                       ':4444/webconsole/APIController')
            eprint("Sending XMLAPI request to %s" % callurl)
    
            r = requests.post(callurl, data=postdata, verify=stuff.insecure)
    #        eprint(r.text)
            result = 1
        except AttributeError:
            print(ET.tostring(requestroot, 'unicode'))
            result = 0
        return (result, r)
    
    def xgAPIGetRequest(group):
        getrequest = ET.Element('UserGroup')
        groupdetail = ET.SubElement(getrequest, 'GroupDetail')
        filtertag = ET.SubElement(groupdetail, 'Filter')
        ET.SubElement(filtertag, 'key', name="Name", criteria="=").text = group
    
        return getrequest
    
    
    # Fun starts here
    
    
    # Check command line
    stuff = getArguments()
    
    try:
        eprint("Reading from file: %s" % stuff.input)
    except AttributeError:
        eprint("No filename provided. Run this command with --help for more info.")
        sys.exit()
    
    # If there's a firewall address set, make sure we're in oneshot mode
    try:
        eprint("Config will be posted to " + stuff.firewall)
        stuff.oneshot = True
    except AttributeError:
        eprint("No firewall set")
    
    if stuff.add:
        method = 'add'
    else:
        method = 'update'
    
    # Start building the XMLAPI Request
    if stuff.useimpex:
        fwRequestroot = xgImpExBegin()
        fwRequestSet = fwRequestroot
    else:
        fwRequestroot = xgAPILogin(stuff.fwuser, stuff.fwpassword)
        fwRequestSet = ET.SubElement(fwRequestroot, 'Set', operation=method)
    
    usernames = []
    
    # Open the csv for reading
    with open(stuff.input, encoding="utf-8-sig") as input_csv:
        prev_group = ""
        surfquota = accesstime = datatransferpolicy = qospolicy = sslvpnpolicy = clientlesspolicy = ""
        l2tp = pptp = quarantinedigest = macbinding = loginrestriction = ""
        csv_parser = csv.DictReader(input_csv, delimiter=',')
        count = 0
        for row in csv_parser:
            if prev_group != row["Group"]:
                prev_group = row["Group"]
                if stuff.useimpex:
                    fwRequestrootforget = xgImpExBegin()
                    fwRequestGet = fwRequestroot
                else:
                    fwRequestrootforget = xgAPILogin(stuff.fwuser, stuff.fwpassword)
                    fwRequestGet = ET.SubElement(fwRequestrootforget, 'Get')
    
                fwRequestGet.append(xgAPIGetRequest(row["Group"]))
                _, rget = xgAPIPost(fwRequestrootforget)
    
                my_dict = xmltodict.parse(rget.text)
                surfquota = my_dict['Response']['UserGroup']['GroupDetail']['SurfingQuotaPolicy']
                accesstime = my_dict['Response']['UserGroup']['GroupDetail']['AccessTimePolicy']
                datatransferpolicy = my_dict['Response']['UserGroup']['GroupDetail']['DataTransferPolicy']
                qospolicy = my_dict['Response']['UserGroup']['GroupDetail']['QoSPolicy']
                sslvpnpolicy = my_dict['Response']['UserGroup']['GroupDetail']['SSLVPNPolicy']
                clientlesspolicy = my_dict['Response']['UserGroup']['GroupDetail']['ClientlessPolicy']
                l2tp = my_dict['Response']['UserGroup']['GroupDetail']['L2TP']
                pptp = my_dict['Response']['UserGroup']['GroupDetail']['PPTP']
                quarantinedigest = my_dict['Response']['UserGroup']['GroupDetail']['QuarantineDigest']
                macbinding = my_dict['Response']['UserGroup']['GroupDetail']['MACBinding']
                loginrestriction = my_dict['Response']['UserGroup']['GroupDetail']['LoginRestriction']
    
            fwRequestSet.append(xgAPIStartUser(username=row["Username"], name=row["Name"], password=row["Password"],
                                               email=row["Email Address"], group=row["Group"], surfquota=surfquota,
                                               accesstime=accesstime, datatransferpolicy=datatransferpolicy,
                                               qospolicy=qospolicy, sslvpnpolicy=sslvpnpolicy, clientlesspolicy=clientlesspolicy,
                                               l2tp=l2tp, pptp=pptp, quarantinedigest=quarantinedigest, macbinding=macbinding,
                                               loginrestriction=loginrestriction))
            usernames.append(row["Username"])
            count = count + 1
    
        eprint("Read %d users from file %s\n" % (count, stuff.input))
    
    if stuff.oneshot or stuff.useimpex:
        result, r = xgAPIPost(fwRequestroot)
    
    if result == 1:
        resultcontent = ET.fromstring(r.text)
        for child in resultcontent:
            if child.tag == "Login":
                status = child.find('status').text
                if status == '200':
                    eprint("API login successful")
                else:
                    eprint("Login status: %s" % child.find('status').text)
    
            if child.tag == "User":
                Status = child.find('Status')
                eprint("Line %s (%s), Status %s" % (
                    child.attrib["transactionid"], usernames[int(
                        child.attrib["transactionid"]) - 1],
                    Status.attrib["code"]))
                try:
                    if stuff.add:
                        eprint("     %s" %
                               UserAPIAddResults[Status.attrib["code"]][1])
                    else:
                        eprint("     %s" %
                               UserAPIUpdateResults[Status.attrib["code"]][1])
                except KeyError:
                    eprint("Message: %s" % Status.text)
                    continue
    
            if child.tag == "Status":
                eprint("Error code %s: %s" % (child.attrib['code'], child.text))
    
    # Take the xml output from this program and send it to your firewall with curl,
    # for example:
    # $ curl -k https://<firewall ip>:4444/webconsole/APIController -F "reqxml=<foo.xml"
    
    # To create an API Import file, use '-x', write the XML output to a file
    # 'Entities.xml' and create a tarball with the following (group/owner options 
    # to make it anonymous):
    # $ tar --group=a:1000 --owner=a:1000 --numeric-owner -cvf ../API-O365.tar Entities.xml