Getting Started In Live Discover - From Beginner to Advanced Query Creation

Disclaimer: This information is provided as-is for the benefit of the Community. Please contact Sophos Professional Services if you require assistance with your specific environment.

Background

Sophos Central comes stocked with canned queries for XDR, and our community Live Discover & Response Forum hosts a range of content created by Sophos staff and users. If you want to contribute or create queries specific to your organization, this guide shows you how to build queries from starter to advanced, how to filter and join data in SQL queries, and where to find the resources for your discovery.


Prerequisites 

  • Basic SQL Knowledge 
  • Know Your Table Data (referred to as "Schema")
    • Sophos schema viewer
    • osquery Schema
      • Click the "Endpoint/osquery" tab on the Sophos schema viewer landing page
      • This shows the schema for the latest osquery version the engine has
        • osquery maintains its own repository, which is often newer than the version XDR ships with
  • Available Functions

Guide

Intro

When you build your first few queries, start broad and then filter the information down to solve a problem, or relate it to other tables through a shared field. Here is a start-to-finish example of how we can use XDR to search our entire fleet for devices that interacted with a specific IP address, and return the command line used, the time, and the threat indication level.

Basics

SELECT * FROM sophos_ip_journal

The sophos_ip_journal table returns one row per event, with every field filled in. Each field holds a particular type of data that functions can work on. Out of this table, I am only concerned with the following:

time, pid, sophosPID, processStartTime, source, sourcePort, destination, & destinationPort

SELECT time, pid, sophosPID, processStartTime, source, sourcePort, destination, destinationPort
FROM sophos_ip_journal

This returns only the fields you requested. Next, I need the times in a human-readable format. Let's use the datetime function.

SELECT datetime(time, 'unixepoch', 'localtime'), 
pid, sophosPID, 
datetime(processStartTime, 'unixepoch', 'localtime'), 
source, sourcePort, destination, destinationPort
FROM sophos_ip_journal

The time and processStartTime fields are now human readable. To make the results look nicer, we can add AS after each datetime function to give the column a friendlier name.
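To see what datetime and AS do in isolation, here is a minimal sketch that runs the same SQLite functions from Python against a throwaway in-memory table. The table is only a stand-in for sophos_ip_journal, and the row in it is made up for illustration. The 'localtime' modifier is left out so the result does not depend on the time zone of the machine running it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in for sophos_ip_journal with one hypothetical row (not real Sophos data).
conn.execute("CREATE TABLE sophos_ip_journal (time INTEGER, destination TEXT)")
conn.execute("INSERT INTO sophos_ip_journal VALUES (1680732660, '203.0.113.10')")

# 'unixepoch' tells datetime() that the number is seconds since 1970-01-01 UTC.
# AS renames the output column so the report header is readable.
cursor = conn.execute(
    "SELECT datetime(time, 'unixepoch') AS ip_timestamp, destination "
    "FROM sophos_ip_journal"
)
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()

print(column_names)  # ['ip_timestamp', 'destination']
print(rows)          # [('2023-04-05 22:11:00', '203.0.113.10')]
```

Without the AS alias, the column header would be the full datetime(...) expression, which is harder to read in a report.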

This lets me see a full report of the last 90 days of IP addresses my endpoint has interacted with and the ports they are connecting on.

If I have this information, I can use it to trace more relationships of events. Let's explore the sophos_process_journal next. Before we combine two tables and have too much data to parse, review the fields in this new table and what you will want to return.

From this table, I am going to use the following fields:

sophosPID (anchor), parentPID, fileSize, pathName, processName, cmdLine, sha256, & sha1

When you introduce a second table into your query, you need to prefix each field with its table name so the tool knows which table each field comes from.

SELECT datetime(sophos_ip_journal.time, 'unixepoch', 'localtime') AS ip_timestamp, 
sophos_ip_journal.pid,
sophos_ip_journal.sophosPID, 
datetime(sophos_ip_journal.processStartTime, 'unixepoch', 'localtime') AS processStart_timestamp, 
sophos_ip_journal.source, 
sophos_ip_journal.sourcePort, 
sophos_ip_journal.destination, 
sophos_ip_journal.destinationPort,
sophos_process_journal.sophosPID,
sophos_process_journal.parentPID,
sophos_process_journal.fileSize,
sophos_process_journal.pathName,
sophos_process_journal.processName,
sophos_process_journal.cmdLine,
sophos_process_journal.sha256,
sophos_process_journal.sha1
FROM sophos_ip_journal
JOIN sophos_process_journal
WHERE sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID

Now I can correlate IP addresses with the file size, pathname, process name, command used, and see their hash values. 
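If you want to convince yourself how the anchor field behaves, here is a small sketch using Python's built-in sqlite3 module. The two tables are cut-down stand-ins for the real journals and the rows are invented. It shows that matching on sophosPID in a WHERE clause and matching in an ON clause return the same rows, and that rows without a partner in the other table are dropped.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE sophos_ip_journal (sophosPID TEXT, destination TEXT);
CREATE TABLE sophos_process_journal (sophosPID TEXT, processName TEXT);
-- Hypothetical rows: only sophosPID '100:1' exists in both tables.
INSERT INTO sophos_ip_journal VALUES ('100:1', '203.0.113.10'), ('200:2', '198.51.100.7');
INSERT INTO sophos_process_journal VALUES ('100:1', 'powershell.exe'), ('300:3', 'notepad.exe');
""")

# Anchor expressed in the WHERE clause.
where_style = conn.execute("""
    SELECT sophos_ip_journal.destination, sophos_process_journal.processName
    FROM sophos_ip_journal
    JOIN sophos_process_journal
    WHERE sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID
""").fetchall()

# Same anchor expressed in an ON clause.
on_style = conn.execute("""
    SELECT sophos_ip_journal.destination, sophos_process_journal.processName
    FROM sophos_ip_journal
    JOIN sophos_process_journal
    ON sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID
""").fetchall()

print(where_style)  # [('203.0.113.10', 'powershell.exe')]
print(on_style)     # [('203.0.113.10', 'powershell.exe')]
```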

Let's use this data to do some of the activity discussed in the EDR Threat Hunting Framework.

Now incorporate the sophos_file_properties table, using the sha256 field as the anchor that links it to sophos_process_journal. I will switch to JOIN ... ON clauses to combine multiple tables.

SELECT datetime(sophos_ip_journal.time, 'unixepoch', 'localtime'), 
sophos_ip_journal.pid,
sophos_ip_journal.sophosPID, 
datetime(sophos_ip_journal.processStartTime, 'unixepoch', 'localtime'), 
sophos_ip_journal.source, 
sophos_ip_journal.sourcePort, 
sophos_ip_journal.destination, 
sophos_ip_journal.destinationPort,
sophos_process_journal.sophosPID,
sophos_process_journal.parentPID,
sophos_process_journal.fileSize,
sophos_process_journal.pathName,
sophos_process_journal.processName,
sophos_process_journal.cmdLine,
sophos_process_journal.sha256,
sophos_process_journal.sha1,
sophos_file_properties.sha256,
sophos_file_properties.mlScoreData,
sophos_file_properties.mlScore,
sophos_file_properties.puaScore,
sophos_file_properties.globalRepData,
sophos_file_properties.globalRep,
sophos_file_properties.localRepData,
sophos_file_properties.localRep,
sophos_file_properties.coreFileInfo
FROM sophos_ip_journal
JOIN sophos_process_journal
ON sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID
JOIN sophos_file_properties 
ON sophos_process_journal.sha256 = sophos_file_properties.sha256
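One thing worth knowing about chaining joins like this: a plain JOIN only keeps rows that match on every anchor. The sketch below uses Python's sqlite3 with invented stand-in tables to show a process with no entry in sophos_file_properties disappearing from the results. The LEFT JOIN variant is not part of the query above. It is shown only as one option if you would rather keep those rows with empty scores.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE sophos_ip_journal (sophosPID TEXT, destination TEXT);
CREATE TABLE sophos_process_journal (sophosPID TEXT, processName TEXT, sha256 TEXT);
CREATE TABLE sophos_file_properties (sha256 TEXT, mlScore INTEGER);
-- Hypothetical rows: the hash 'bbb' has no file properties entry.
INSERT INTO sophos_ip_journal VALUES ('100:1', '203.0.113.10'), ('200:2', '198.51.100.7');
INSERT INTO sophos_process_journal VALUES
    ('100:1', 'powershell.exe', 'aaa'), ('200:2', 'curl.exe', 'bbb');
INSERT INTO sophos_file_properties VALUES ('aaa', 28);
""")

QUERY = """
SELECT sophos_ip_journal.destination,
       sophos_process_journal.processName,
       sophos_file_properties.mlScore
FROM sophos_ip_journal
JOIN sophos_process_journal
  ON sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID
{join_kind} sophos_file_properties
  ON sophos_process_journal.sha256 = sophos_file_properties.sha256
ORDER BY sophos_ip_journal.destination
"""

inner_rows = conn.execute(QUERY.format(join_kind="JOIN")).fetchall()
left_rows = conn.execute(QUERY.format(join_kind="LEFT JOIN")).fetchall()

print(inner_rows)  # [('203.0.113.10', 'powershell.exe', 28)]
print(left_rows)   # [('198.51.100.7', 'curl.exe', None), ('203.0.113.10', 'powershell.exe', 28)]
```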

I'm also going to add a WHERE clause to leave out traffic I don't want to see, in this case loopback addresses. I'll also trim and reorder the fields returned.

SELECT datetime(sophos_ip_journal.processStartTime, 'unixepoch', 'localtime') AS process_timestamp,
sophos_ip_journal.sophosPID, 
sophos_process_journal.pathName,
sophos_process_journal.processName,
sophos_process_journal.cmdLine,
datetime(sophos_ip_journal.time, 'unixepoch', 'localtime') AS ip_timestamp,
sophos_ip_journal.source, 
sophos_ip_journal.sourcePort, 
sophos_ip_journal.destination, 
sophos_ip_journal.destinationPort,
sophos_file_properties.sha256,
sophos_file_properties.mlScore,
sophos_file_properties.puaScore,
sophos_file_properties.globalRep,
sophos_file_properties.localRep,
sophos_file_properties.localRepData
FROM sophos_ip_journal
JOIN sophos_process_journal
ON sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID
JOIN sophos_file_properties 
ON sophos_process_journal.sha256 = sophos_file_properties.sha256
WHERE sophos_ip_journal.source NOT IN ('127.0.0.1', '::1')
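A note on excluding more than one value: in SQLite, a quoted string that follows OR on its own is evaluated as a condition by itself instead of being compared against the field, so every value needs its own comparison or a NOT IN list. The sketch below demonstrates this with Python's sqlite3 on a made-up stand-in table. The helper name sources is just for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in table with three hypothetical source addresses.
conn.execute("CREATE TABLE sophos_ip_journal (source TEXT)")
conn.executemany(
    "INSERT INTO sophos_ip_journal VALUES (?)",
    [("127.0.0.1",), ("::1",), ("10.0.0.5",)],
)

def sources(where_clause):
    """Return the source values that survive the given WHERE clause."""
    sql = f"SELECT source FROM sophos_ip_journal WHERE {where_clause} ORDER BY source"
    return [row[0] for row in conn.execute(sql)]

# The bare string '::1' is treated as a boolean (false), so it filters nothing.
bare_or = sources("source <> '127.0.0.1' OR '::1'")

# Listing both loopback addresses explicitly excludes both.
not_in = sources("source NOT IN ('127.0.0.1', '::1')")

print(bare_or)  # ['10.0.0.5', '::1']
print(not_in)   # ['10.0.0.5']
```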


Advanced

The three areas I want to focus on first will help produce case statements. Often, your team can respond faster to data that is already "interpreted." To accomplish this, we will use case statements.

 

SELECT datetime(sophos_ip_journal.processStartTime, 'unixepoch', 'localtime') AS process_timestamp,
sophos_process_journal.PID AS ProcessID, 
sophos_process_journal.pathName,
sophos_process_journal.processName,
sophos_process_journal.cmdLine AS CLI_Log,
datetime(sophos_ip_journal.time, 'unixepoch', 'localtime') AS ip_timestamp,
sophos_ip_journal.source, 
sophos_ip_journal.destination, 
sophos_ip_journal.destinationPort,
CASE
    WHEN (sophos_file_properties.mlScore >= 27 AND sophos_file_properties.mlScore < 30) THEN 'High'
    WHEN (sophos_file_properties.mlScore >= 24 AND sophos_file_properties.mlScore < 27) THEN 'Medium'
    WHEN (sophos_file_properties.mlScore >= 20 AND sophos_file_properties.mlScore < 24) THEN 'Low'
    WHEN (sophos_file_properties.mlScore > 30 AND sophos_file_properties.puaScore > 20) THEN 'High (likely PUA)'
    ELSE 'N/A'
END AS threatIndicationLevel,
CASE
    WHEN (sophos_file_properties.globalRep >= 65 OR sophos_file_properties.localRep >= 65) THEN 'Safe'
    WHEN (sophos_file_properties.globalRep >= 30 AND sophos_file_properties.globalRep <= 64 OR sophos_file_properties.localRep >= 30 AND sophos_file_properties.localRep <= 64) THEN 'Potential'
    ELSE 'Bad'
END AS reputationLevel
FROM sophos_ip_journal
JOIN sophos_process_journal
ON sophos_ip_journal.sophosPID = sophos_process_journal.sophosPID
JOIN sophos_file_properties 
ON sophos_process_journal.sha256 = sophos_file_properties.sha256
WHERE sophos_ip_journal.source NOT IN ('127.0.0.1', '::1')
AND sophos_ip_journal.destination = '$$IPaddress$$'
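When a WHERE clause mixes OR and AND, SQLite evaluates AND first, so parentheses decide which rows the destination filter actually applies to. Here is a minimal sketch with Python's sqlite3 and invented rows. The TARGET constant stands in for the $$IPaddress$$ variable, and the helper name sources is only for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in table with hypothetical traffic; only two rows go to the target IP.
conn.execute("CREATE TABLE sophos_ip_journal (source TEXT, destination TEXT)")
conn.executemany(
    "INSERT INTO sophos_ip_journal VALUES (?, ?)",
    [
        ("10.0.0.5", "198.51.100.7"),
        ("10.0.0.6", "203.0.113.10"),
        ("10.0.0.7", "203.0.113.10"),
    ],
)

TARGET = "203.0.113.10"  # stands in for the $$IPaddress$$ variable

def sources(where_clause):
    """Return the source values that survive the given WHERE clause."""
    sql = f"SELECT source FROM sophos_ip_journal WHERE {where_clause} ORDER BY source"
    return [row[0] for row in conn.execute(sql, {"ip": TARGET})]

# Parsed as: source = '10.0.0.5' OR (source = '10.0.0.6' AND destination = :ip)
ungrouped = sources("source = '10.0.0.5' OR source = '10.0.0.6' AND destination = :ip")

# Parentheses make the destination filter apply to both sources.
grouped = sources("(source = '10.0.0.5' OR source = '10.0.0.6') AND destination = :ip")

print(ungrouped)  # ['10.0.0.5', '10.0.0.6']
print(grouped)    # ['10.0.0.6']
```

In the ungrouped version, 10.0.0.5 is returned even though it never talked to the target address.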

Let's take a closer look at how I came to these case statements:

As you start to work with this data, it's important to understand what each value actually means. In the sophos_file_properties fields, the schema breaks down how the JSON structure is written for each scoring level. To see what I mean, look at the "globalRepData" field.

The global reputation is a JSON structure containing:

version - int - the version of the structure
reputation - int - -1 is unknown, 0-100 is the local reputation
lookupType - enum - Lookup type used for the Local Reputation score unknown(0), sha256(1), sha1(2)
expireTime - int64 - Time ML scores expire (zero is never)
sampleRate - int - Telemetry Sample Rate. 0 means no samples, all values indicate sample 1 every N occurrences (average randomized occurrence)
reputationData - JSON string - the intermediate results of a Local Reputation Analysis

Reading this, if lookupType returns 0 (unknown), we then have to rely on the "localRepData" field to get a reputation score. I will now write this into my case statements.
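As a rough sketch of that fallback idea, the example below reads lookupType out of the JSON with SQLite's json_extract function and switches to the local score when the lookup type is 0. Several things here are assumptions for illustration: the JSON values are invented, the column alias effectiveRep is hypothetical, localRep is used as the fallback score, and I have not confirmed that json_extract is available in every Live Discover version. It runs in Python's sqlite3 on builds that include the JSON functions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in for sophos_file_properties with hypothetical JSON and scores.
conn.execute(
    "CREATE TABLE sophos_file_properties "
    "(sha256 TEXT, globalRepData TEXT, globalRep INTEGER, localRep INTEGER)"
)
conn.executemany(
    "INSERT INTO sophos_file_properties VALUES (?, ?, ?, ?)",
    [
        ("aaa", '{"version": 1, "reputation": 80, "lookupType": 1}', 80, 50),
        ("bbb", '{"version": 1, "reputation": -1, "lookupType": 0}', -1, 40),
    ],
)

# When the global lookup type is unknown (0), fall back to the local score.
effective = dict(conn.execute("""
    SELECT sha256,
           CASE
               WHEN json_extract(globalRepData, '$.lookupType') = 0 THEN localRep
               ELSE globalRep
           END AS effectiveRep
    FROM sophos_file_properties
"""))

print(effective)  # {'aaa': 80, 'bbb': 40}
```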

If you recall, an early version of EDR populated fields that flagged applications that may have been threats. Here is the snippet showing the scoring in the case statement:

Threat Indication Level

CASE
    WHEN (sophos_file_properties.mlScore >= 27 AND sophos_file_properties.mlScore < 30) THEN 'High'
    WHEN (sophos_file_properties.mlScore >= 24 AND sophos_file_properties.mlScore < 27) THEN 'Medium'
    WHEN (sophos_file_properties.mlScore >= 20 AND sophos_file_properties.mlScore < 24) THEN 'Low'
    WHEN (sophos_file_properties.mlScore > 30 AND sophos_file_properties.puaScore > 20) THEN 'High (likely PUA)'
    ELSE 'N/A'
END AS threatIndicationLevel
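A quick way to sanity-check a CASE like this is to feed it a handful of boundary values. The sketch below does that with Python's sqlite3 and made-up scores in a stand-in table. CASE returns the first WHEN that matches and falls through to ELSE otherwise, so with the thresholds exactly as written, an mlScore of exactly 30 lands on 'N/A'. Adjust the ranges if that is not what you want.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in table; the sha256 values and scores are invented test points.
conn.execute(
    "CREATE TABLE sophos_file_properties (sha256 TEXT, mlScore INTEGER, puaScore INTEGER)"
)
conn.executemany(
    "INSERT INTO sophos_file_properties VALUES (?, ?, ?)",
    [("a", 28, 0), ("b", 25, 0), ("c", 21, 0), ("d", 35, 40), ("e", 30, 40), ("f", 5, 0)],
)

levels = dict(conn.execute("""
    SELECT sha256,
           CASE
               WHEN (mlScore >= 27 AND mlScore < 30) THEN 'High'
               WHEN (mlScore >= 24 AND mlScore < 27) THEN 'Medium'
               WHEN (mlScore >= 20 AND mlScore < 24) THEN 'Low'
               WHEN (mlScore > 30 AND puaScore > 20) THEN 'High (likely PUA)'
               ELSE 'N/A'
           END AS threatIndicationLevel
    FROM sophos_file_properties
"""))

for sha, level in sorted(levels.items()):
    print(sha, level)
# a High
# b Medium
# c Low
# d High (likely PUA)
# e N/A   (mlScore of exactly 30 matches no WHEN)
# f N/A
```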

I made a case statement for the reputation levels I want to monitor, but you could define your own.

Global Reputation Level

CASE
    WHEN (sophos_file_properties.globalRep >= 65 OR sophos_file_properties.localRep >= 65) THEN 'Safe'
    WHEN (sophos_file_properties.globalRep >= 30 AND sophos_file_properties.globalRep <= 64 OR sophos_file_properties.localRep >= 30 AND sophos_file_properties.localRep <= 64) THEN 'Potential'
    ELSE 'Bad'
END AS reputationLevel
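The same kind of spot check works for the reputation levels. In this sketch (Python's sqlite3, invented scores, stand-in table) the second WHEN has explicit parentheses around each range. That does not change the result, because AND is already evaluated before OR, but it makes the grouping easier to read. Note that a file whose reputation is unknown in both fields (-1) ends up as 'Bad' through the ELSE branch, so you may want a separate label for it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in table; -1 means the reputation is unknown.
conn.execute(
    "CREATE TABLE sophos_file_properties (sha256 TEXT, globalRep INTEGER, localRep INTEGER)"
)
conn.executemany(
    "INSERT INTO sophos_file_properties VALUES (?, ?, ?)",
    [("a", 80, -1), ("b", 40, -1), ("c", -1, -1), ("d", 10, 70), ("e", -1, 40)],
)

reputation = dict(conn.execute("""
    SELECT sha256,
           CASE
               WHEN (globalRep >= 65 OR localRep >= 65) THEN 'Safe'
               WHEN ((globalRep >= 30 AND globalRep <= 64)
                     OR (localRep >= 30 AND localRep <= 64)) THEN 'Potential'
               ELSE 'Bad'
           END AS reputationLevel
    FROM sophos_file_properties
"""))

for sha, level in sorted(reputation.items()):
    print(sha, level)
# a Safe
# b Potential
# c Bad        (both scores unknown)
# d Safe       (local score alone is enough)
# e Potential
```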

Interpreting the Results

You will be able to sift through these logs quickly from now on because you've defined a process and attached terms to the scoring levels. This can now be a scheduled query for your XDR collection. Your Helpdesk team could review the results and alert the security team when something suspicious is found.

Instead of leaving the numbers as raw data points, write the results as human-readable names. In the examples above, globalRep and globalRepData are related: globalRep is the raw score and globalRepData is the description used to interpret it. We can use CASE to return information that drives our threat hunts or any specific query.

Hope this helps someone get started and please remember to share in our Live Discover & Response Forum when you write something new or useful!

-jk



Added Disclaimer
[edited by: GlennSen at 3:11 PM (GMT -7) on 5 Apr 2023]