Disclaimer: This information is provided as-is for the benefit of the Community. Please contact Sophos Professional Services if you require assistance with your specific environment.
With the release of XDR Detections we now have a Sophos Central view of the detections that have been classified. The data lake includes a new table called xdr_ti_data and we can use that table to perform PIVOTS. One thing that every threat hunter wants to know when they see a detection is HOW did this happen and is there MORE to the detection. To help with that I have added a query below that allows you to PIVOT from the SophosPID available in most detections to generate a process tree that will include the detection information for each member of the tree.
So going from this view where we have a suspect powershell downloader we can pivot to the tree for other activity performed by the parent process 'splunkd.exe'
PIVOT to the query we added - Process Tree Data Lake (Windows)
and see that the splunkd.exe process was the parent of lots of other powershells that also have detections associated with them. We found our RAT(Remote Access Tool)
HERE is the query, I will have dev make sure it is efficient as I am sure what I have here can be improved.
-- This will take a few steps. First lets narrow down the time range
----------------------
-- DETERMINE THE LOWER AND UPPER TIME LIMITS FOR THE SOPHOS_PID
----------------------
WITH
-- Get the system boot time for the target process This will be the lower bound below which we do not search
System_boot(boot_time) AS (
SELECT MAX(meta_boot_time)
FROM xdr_data
WHERE query_name IN ( 'running_processes_windows_sophos')
AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
AND meta_boot_time <= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start
AND sophos_pid = '$$SophosPID$$' -- The desired PID is in the list
),
-- Get the next boot time or 'NOW' as a max upper bound to search
Next_boot_time (max_time) AS (
SELECT CASE WHEN MIN(meta_boot_time) > 0 THEN MIN(meta_boot_time) ELSE CAST(to_unixtime(now()) AS BIGINT) END
FROM xdr_data
WHERE query_name IN ( 'running_processes_windows_sophos')
AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
AND meta_boot_time >= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start
),
-- Determine if the PID was recycled during the boot session if so it will be the upper bound
PID_Recycled(pid_time) AS (
SELECT MIN(time) pid_time
FROM xdr_data LEFT JOIN Next_boot_time ON CAST(1 AS BOOLEAN)
WHERE query_name IN ( 'running_processes_windows_sophos')
AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
AND sophos_pid = '$$SophosPID$$' -- The desired PID is in the list
AND time > CAST(SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT)
AND time <= max_time
),
-- Set the lower and upper bound as a table for easy access
Time_Limits(lower_bound, upper_bound) AS (
SELECT boot_time, CASE WHEN pid_time > 0 THEN pid_time ELSE Next_boot_time.max_time END
FROM Next_boot_time LEFT JOIN System_Boot ON CAST(1 AS BOOLEAN) LEFT JOIN PID_Recycled ON pid_time < Next_boot_time.max_time
),
-- NOW BUILD A SINGLE TABLE WITH the Device, Time, Name, CmdLine, SophosPID, ParentSophosPID for each process (We have to flatten the groupings)
All_Data AS (
SELECT
X1.meta_hostname,
X1.time,
X1.name,
rtrim(array_join(array_agg(distinct X1.cmdline||','), chr(10)),',') as cmdline,
X1.Sophos_pid SophosPID,
-- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
X1.Parent_Sophos_PID Parent_SophosPID,
X1.path,
X1.sha256
FROM xdr_data X1, Time_Limits TL_1
WHERE X1.query_name IN ( 'running_processes_windows_sophos')
AND LOWER(X1.meta_hostname) LIKE LOWER('%%$$device_name$$%')
AND X1.time >= TL_1.lower_bound
AND X1.time <= TL_1.upper_bound
GROUP BY X1.meta_hostname, X1.time, X1.name, X1.time, X1.path, X1.sha256, X1.pids, x1.sophos_pid, X1.parent_sophos_pid
),
-------
-- NOW TO get each generation of the TREE
-------
Target_Process AS (SELECT meta_hostname, time, name, rtrim(array_join(array_agg(distinct cmdline||','), chr(10)),',') as cmdline, SophosPID, Parent_SophosPID, path, sha256, 3 Sort_Order
FROM All_Data WHERE SophosPID = '$$SophosPID$$' GROUP BY meta_hostname, time, name, SophosPID, Parent_SophosPID, path, sha256 ),
Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 2 Sort_Order
FROM All_Data A JOIN Target_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
Grand_Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ ◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 1 Sort_Order
FROM All_Data A JOIN Parent_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
Great_Grand_Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ ◄ ◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 0 Sort_Order
FROM All_Data A JOIN Grand_Parent_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
Generation_One AS (SELECT A.meta_hostname, A.time, '► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, T.Sort_Order + 1000000 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
FROM All_Data A JOIN Target_Process T ON A.Parent_SophosPID = T.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, T.Sort_Order ),
Generation_Two AS (SELECT A.meta_hostname, A.time, '► ► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order + 10000 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
FROM All_Data A JOIN Generation_One G ON A.Parent_SophosPID = G.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order ),
-- Generation_Three AS (SELECT A.meta_hostname, A.time, '► ► ► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order + 100 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
-- FROM All_Data A JOIN Generation_Two G ON A.Parent_SophosPID = G.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order ),
-- Assemble the Tree from each generation
Tree AS (
SELECT * FROM Great_Grand_Parent_Process
UNION ALL
SELECT * FROM Grand_Parent_Process
UNION ALL
SELECT * FROM Parent_Process
UNION ALL
SELECT * FROM Target_Process
UNION ALL
SELECT * FROM Generation_One
UNION ALL
SELECT * FROM Generation_Two
-- UNION ALL
-- SELECT * FROM Generation_Three
)
SELECT
T.meta_hostname,
date_format(from_unixtime(T.time), '%y-%m-%dt%h:%i:%sz') as date_time,
T.name,
-- Identify any detections for each member of the tree
array_join(array_agg(CASE JSON_ARRAY_LENGTH(ioc_detection_mitre_attack)
WHEN 1 THEN
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR)
WHEN 2 THEN
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR)
WHEN 3 THEN
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].name') AS VARCHAR)
WHEN 4 THEN
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].name') AS VARCHAR)
ELSE
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,4),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,4),'$.tactic.name') AS VARCHAR) || ' - ' ||
CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,4),'$.tactic.techniques[0].name') AS VARCHAR)
END ),CHR(10))
MITRE,
ARRAY_JOIN(ARRAY_AGG(ioc_detection_id),CHR(10)) Sources,
T.cmdline,
T.SophosPID,
T.Parent_SophosPID,
T.path,
T.sha256
FROM Tree T LEFT JOIN xdr_ti_data XDR_TI ON T.SophosPID = XDR_TI.sophos_pid
GROUP BY T.meta_hostname, T.time, T.name, T.cmdline, T.SophosPID, T.Parent_SophosPID, T.path, T.sha256 ,T.Sort_Order
ORDER BY CAST(T.Sort_Order AS INT) ASC
Updated disclaimer
[edited by: Qoosh at 9:59 PM (GMT -7) on 31 Mar 2023]