Exploring what you can do with the new XDR Detections table (xdr_ti_data)

With the release of XDR Detections we now have a Sophos Central view of the detections that have been classified. The data lake includes a new table called xdr_ti_data and we can use that table to perform PIVOTS.  One thing that every threat hunter wants to know when they see a detection is HOW did this happen and is there MORE to the detection.  To help with that I have added a query below that allows you to PIVOT from the SophosPID available in most detections to generate a process tree that will include the detection information for each member of the tree.

So going from this view where we have a suspect powershell downloader we can pivot to the tree for other activity performed by the parent process 'splunkd.exe'

PIVOT to the query we added - Process Tree Data Lake (Windows)

and see that the splunkd.exe process was the parent of lots of other powershells that also have detections associated with them.  We found our RAT(Remote Access Tool)

HERE is the query, I will have dev make sure it is efficient as I am sure what I have here can be improved.

-- This will take a few steps.  First lets narrow down the time range

----------------------
-- DETERMINE THE LOWER AND UPPER TIME LIMITS FOR THE SOPHOS_PID 
----------------------
WITH 
   -- Get the system boot time for the target process This will be the lower bound below which we do not search
   System_boot(boot_time) AS (
      SELECT MAX(meta_boot_time) 
      FROM xdr_data 
      WHERE query_name IN ( 'running_processes_windows_sophos') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND meta_boot_time <= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start 
         AND sophos_pid = '$$SophosPID$$'  -- The desired PID is in the list
      ),
   -- Get the next boot time or 'NOW' as a max upper bound to search  
   Next_boot_time (max_time) AS (
      SELECT CASE WHEN MIN(meta_boot_time) > 0 THEN MIN(meta_boot_time) ELSE CAST(to_unixtime(now()) AS BIGINT) END 
      FROM xdr_data 
      WHERE query_name IN ( 'running_processes_windows_sophos') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND meta_boot_time >= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start 
   ),
   -- Determine if the PID was recycled during the boot session if so it will be the upper bound
   PID_Recycled(pid_time) AS (
      SELECT MIN(time) pid_time
      FROM xdr_data LEFT JOIN Next_boot_time ON CAST(1 AS BOOLEAN)
      WHERE query_name IN ( 'running_processes_windows_sophos') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND sophos_pid = '$$SophosPID$$'  -- The desired PID is in the list
         AND time > CAST(SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT)
         AND time <= max_time
      ),
   -- Set the lower and upper bound as a table for easy access
   Time_Limits(lower_bound, upper_bound) AS (
      SELECT boot_time, CASE WHEN pid_time > 0 THEN pid_time ELSE Next_boot_time.max_time END
      FROM Next_boot_time LEFT JOIN System_Boot ON CAST(1 AS BOOLEAN) LEFT JOIN PID_Recycled ON pid_time < Next_boot_time.max_time
      ),
   -- NOW BUILD A SINGLE TABLE WITH the Device, Time, Name, CmdLine, SophosPID, ParentSophosPID for each process (We have to flatten the groupings)

   All_Data AS (
   SELECT 
      X1.meta_hostname, 
      X1.time, 
      X1.name, 
      rtrim(array_join(array_agg(distinct X1.cmdline||','), chr(10)),',') as cmdline,
      X1.Sophos_pid SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      X1.Parent_Sophos_PID Parent_SophosPID,
      X1.path,
      X1.sha256
   FROM xdr_data X1, Time_Limits TL_1
   WHERE X1.query_name IN ( 'running_processes_windows_sophos') 
      AND LOWER(X1.meta_hostname) LIKE LOWER('%%$$device_name$$%')
      AND X1.time >= TL_1.lower_bound
      AND X1.time <= TL_1.upper_bound
   GROUP BY X1.meta_hostname, X1.time, X1.name, X1.time, X1.path, X1.sha256, X1.pids, x1.sophos_pid, X1.parent_sophos_pid
   ),

-------
-- NOW TO get each generation of the TREE
-------
   Target_Process AS (SELECT meta_hostname, time, name, rtrim(array_join(array_agg(distinct cmdline||','), chr(10)),',') as cmdline, SophosPID, Parent_SophosPID, path, sha256, 3 Sort_Order 
      FROM All_Data WHERE SophosPID = '$$SophosPID$$' GROUP BY meta_hostname, time, name, SophosPID, Parent_SophosPID, path, sha256 ),
   Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 2 Sort_Order 
      FROM All_Data A JOIN Target_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
   Grand_Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ ◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 1 Sort_Order 
      FROM All_Data A JOIN Parent_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
   Great_Grand_Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ ◄ ◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 0 Sort_Order 
      FROM All_Data A JOIN Grand_Parent_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
   Generation_One AS (SELECT A.meta_hostname, A.time, '► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, T.Sort_Order + 1000000 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
      FROM All_Data A JOIN Target_Process T ON A.Parent_SophosPID = T.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, T.Sort_Order ),
   Generation_Two AS (SELECT A.meta_hostname, A.time, '► ► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order + 10000 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
      FROM All_Data A JOIN Generation_One G ON A.Parent_SophosPID = G.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order ),
--   Generation_Three AS (SELECT A.meta_hostname, A.time, '► ► ► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order + 100 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
--      FROM All_Data A JOIN Generation_Two G ON A.Parent_SophosPID = G.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order ),

-- Assemble the Tree from each generation
   Tree AS (
      SELECT * FROM Great_Grand_Parent_Process
      UNION ALL
      SELECT * FROM Grand_Parent_Process
      UNION ALL
      SELECT * FROM Parent_Process
      UNION ALL
      SELECT * FROM Target_Process
      UNION ALL
      SELECT * FROM Generation_One
      UNION ALL
      SELECT * FROM Generation_Two
 --     UNION ALL
--      SELECT * FROM Generation_Three
   )
SELECT 
   T.meta_hostname, 
   date_format(from_unixtime(T.time), '%y-%m-%dt%h:%i:%sz') as date_time, 
   T.name, 
-- Identify any detections for each member of the tree
   array_join(array_agg(CASE JSON_ARRAY_LENGTH(ioc_detection_mitre_attack) 
      WHEN 1 THEN
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR)
      WHEN 2 THEN 
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR)
      WHEN 3 THEN 
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].name') AS VARCHAR)
      WHEN 4 THEN 
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].name') AS VARCHAR)
      ELSE 
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,0),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,1),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,2),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,3),'$.tactic.techniques[0].name') AS VARCHAR) || CHR(10) ||

         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,4),'$.tactic.techniques[0].id') AS VARCHAR) || ' - ' ||
         CAST(JSON_EXTRACT(JSON_ARRAY_GET(ioc_detection_mitre_attack,4),'$.tactic.name') AS VARCHAR) || ' - ' ||
         CAST( JSON_EXTRACT_SCALAR(JSON_ARRAY_GET(ioc_detection_mitre_attack,4),'$.tactic.techniques[0].name') AS VARCHAR)
      END ),CHR(10))
   MITRE,
   ARRAY_JOIN(ARRAY_AGG(ioc_detection_id),CHR(10)) Sources,
   T.cmdline, 
   T.SophosPID, 
   T.Parent_SophosPID, 
   T.path, 
   T.sha256 
FROM Tree T LEFT JOIN xdr_ti_data XDR_TI ON T.SophosPID = XDR_TI.sophos_pid
GROUP BY T.meta_hostname, T.time, T.name, T.cmdline, T.SophosPID, T.Parent_SophosPID, T.path, T.sha256 ,T.Sort_Order
ORDER BY CAST(T.Sort_Order AS INT) ASC



edits
[edited by: Karl_Ackerman at 6:12 PM (GMT -8) on 19 Nov 2021]