Approved

NOTE: Please follow the steps in the walkthrough as it requires multiple queries.

LINUX Process Tree for Data Lake (SHORT)

-- FIXED PID RECYCLE PROBLEM

With the Data lake and LINUX we have some challenges creating a Sophos PID.  The issue is around time from the Linux Process Events Journal in OSQuery.  It does not have accurate enough process start time information so we are doing the best we can to avoid collisions when a PID is recycled during a single boot session.

This query takes a device name and Sophos PID as variables. Ensure you set the sophosPID to the variable type SophosPID so that pivots will work.

The Code :) (This was more complex than I expected see unnest function to flatten the list of pids and parents in the data lake)

-- This will take a few steps.  First lets narrow down the time range

----------------------
-- DETERMINE THE LOWER AND UPPER TIME LIMITS FOR THE SOPHOS_PID 
----------------------
WITH 
   -- Get the system boot time for the target process This will be the lower bound below which we do not search
   System_boot(boot_time) AS (
      SELECT MAX(meta_boot_time) 
      FROM xdr_data 
      WHERE query_name IN ( 'running_processes_linux_events') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND meta_boot_time <= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start 
         AND CONTAINS(SPLIT(pids,','), SPLIT_PART('$$SophosPID$$',':',1))  -- The desired PID is in the list
      ),
   -- Get the next boot time or 'NOW' as a max upper bound to search  
   Next_boot_time (max_time) AS (
      SELECT CASE WHEN MIN(meta_boot_time) > 0 THEN MIN(meta_boot_time) ELSE CAST(to_unixtime(now()) AS BIGINT) END 
      FROM xdr_data 
      WHERE query_name IN ( 'running_processes_linux_events') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND meta_boot_time >= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start 
   ),
   -- Determine if the PID was recycled during the boot session if so it will be the upper bound
   PID_Recycled(pid_time) AS (
      SELECT MIN(time) pid_time
      FROM xdr_data LEFT JOIN Next_boot_time ON CAST(1 AS BOOLEAN)
      WHERE query_name IN ( 'running_processes_linux_events') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND CONTAINS(SPLIT(pids,','), SPLIT_PART('$$SophosPID$$',':',1))  -- The desired PID is in the list
         AND time > CAST(SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT)
         AND time <= max_time
      ),
   -- Set the lower and upper bound as a table for easy access
   Time_Limits(lower_bound, upper_bound) AS (
      SELECT boot_time, CASE WHEN pid_time > 0 THEN pid_time ELSE Next_boot_time.max_time END
      FROM Next_boot_time LEFT JOIN System_Boot ON CAST(1 AS BOOLEAN) LEFT JOIN PID_Recycled ON pid_time < Next_boot_time.max_time
      ),
   -- NOW BUILD A SINGLE TABLE WITH the Device, Time, Name, CmdLine, SophosPID, ParentSophosPID for each process (We have to flatten the groupings)
   All_Data AS (
   SELECT 
      X1.meta_hostname, 
      X1.time, 
      X1.name, 
      rtrim(array_join(array_agg(distinct X1.cmdline||','), chr(10)),',') as cmdline,
      S1.Single_PID||':'||CAST(X1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      T1.Single_Parent||':'||CAST(
         (SELECT MAX(X2.time) 
          FROM xdr_data X2 CROSS JOIN UNNEST(SPLIT(X2.pids,',')) AS S2 (Single_PID) CROSS JOIN UNNEST(SPLIT(X2.parents,',')) AS T2 (Single_Parent), Time_Limits TL_2
          WHERE X2.query_name IN ( 'running_processes_linux_events') 
             AND LOWER(X2.meta_hostname) LIKE LOWER('%%$$device_name$$%')
             AND S2.Single_PID = T1.Single_Parent
             AND X2.time >= TL_2.lower_bound
             AND X2.time <= X1.time
         ) AS VARCHAR) Parent_SophosPID,
      X1.path,
      X1.sha256
   FROM xdr_data X1 CROSS JOIN UNNEST(SPLIT(X1.pids,',')) AS S1 (Single_PID) CROSS JOIN UNNEST(SPLIT(X1.parents,',')) AS T1 (Single_Parent), Time_Limits TL_1
   WHERE X1.query_name IN ( 'running_processes_linux_events') 
      AND LOWER(X1.meta_hostname) LIKE LOWER('%%$$device_name$$%')
      AND X1.time >= TL_1.lower_bound
      AND X1.time <= TL_1.upper_bound
   GROUP BY X1.meta_hostname, X1.time, X1.name, X1.time, X1.path, X1.sha256, X1.pids, S1.Single_PID, X1.parents, T1.Single_Parent
   ),
-------
-- NOW TO get each generation of the TREE
-------
   Target_Process AS (SELECT meta_hostname, time, name, rtrim(array_join(array_agg(distinct cmdline||','), chr(10)),',') as cmdline, SophosPID, Parent_SophosPID, path, sha256, 2 Sort_Order 
      FROM All_Data WHERE SophosPID = '$$SophosPID$$' GROUP BY meta_hostname, time, name, SophosPID, Parent_SophosPID, path, sha256 ),
   Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 1 Sort_Order 
      FROM All_Data A JOIN Target_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
--   Grand_Parent_Process AS (SELECT A.meta_hostname, A.time, '◄ ◄ '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, 0 Sort_Order 
--      FROM All_Data A JOIN Parent_Process T ON A.SophosPID = T.Parent_SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256 ),
   Generation_One AS (SELECT A.meta_hostname, A.time, '► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, T.Sort_Order + 1000000 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
      FROM All_Data A JOIN Target_Process T ON A.Parent_SophosPID = T.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, T.Sort_Order ),
   Generation_Two AS (SELECT A.meta_hostname, A.time, '► ► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order + 10000 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
      FROM All_Data A JOIN Generation_One G ON A.Parent_SophosPID = G.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order ),
--   Generation_Three AS (SELECT A.meta_hostname, A.time, '► ► ► '||A.name AS name, rtrim(array_join(array_agg(distinct A.cmdline||','), chr(10)),',') as cmdline, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order + 100 * ROW_NUMBER() OVER( ORDER BY A.time, A.SophosPID ASC) Sort_Order
--      FROM All_Data A JOIN Generation_Two G ON A.Parent_SophosPID = G.SophosPID GROUP BY A.meta_hostname, A.time, A.name, A.SophosPID, A.Parent_SophosPID, A.path, A.sha256, G.Sort_Order ),

-- Assemble the Tree from each generation
   Tree AS (
--      SELECT * FROM Grand_Parent_Process
--      UNION ALL
      SELECT * FROM Parent_Process
      UNION ALL
      SELECT * FROM Target_Process
      UNION ALL
      SELECT * FROM Generation_One
      UNION ALL
      SELECT * FROM Generation_Two
--      UNION ALL
--      SELECT * FROM Generation_Three
   )
SELECT meta_hostname, date_format(from_unixtime(time), '%y-%m-%dt%h:%i:%sz') as date_time, name, cmdline, SophosPID, Parent_SophosPID, path, sha256 FROM Tree ORDER BY CAST(Sort_Order AS INT) ASC



-- DEBUG
-- SELECT * FROM System_boot
-- SELECT * FROM Next_boot_time
-- SELECT * FROM PID_Recycled
-- SELECT * FROM Time_Limits
-- SELECT * FROM ALL_Data WHERE Parent_SophosPID = '$$SophosPID$$'

And the same from Live Discover

-- This will take a few steps.  First lets narrow down the time range

----------------------
-- DETERMINE THE LOWER AND UPPER TIME LIMITS FOR THE SOPHOS_PID 
----------------------
WITH 
   -- Get the system boot time for the target process This will be the lower bound below which we do not search
   System_boot(boot_time) AS (
      SELECT MAX(time-uptime)  
      FROM process_events 
      WHERE time-uptime <= SPLIT('$$SophosPID$$',':',1) -- System boot must have been before process start 
         AND pid = SPLIT('$$SophosPID$$',':',0)  -- The desired PID is in the list
      ),
   -- Get the next boot time or 'NOW' as a max upper bound to search  
   Next_boot_time (max_time) AS (
      SELECT CASE WHEN MIN(strftime('%s','now')-uptime) > 0 THEN MIN(strftime('%s','now')-uptime) ELSE strftime('%s','now') END 
      FROM process_events 
      WHERE strftime('%s','now')-uptime >= SPLIT('$$SophosPID$$',':',1) -- System boot must have been before process start 
   ),
   -- Determine if the PID was recycled during the boot session if so it will be the upper bound
   PID_Recycled(pid_time) AS (
      SELECT MIN(time) pid_time
      FROM process_events LEFT JOIN Next_boot_time ON 1
      WHERE pid = SPLIT('$$SophosPID$$',':',0)  -- The desired PID is in the list
         AND time > SPLIT('$$SophosPID$$',':',1)
         AND time <= max_time
      ),
   -- Set the lower and upper bound as a table for easy access
   Time_Limits(lower_bound, upper_bound) AS (
      SELECT boot_time, CASE WHEN pid_time > 0 THEN pid_time ELSE max_time END
      FROM Next_boot_time LEFT JOIN System_Boot ON 1 LEFT JOIN PID_Recycled ON pid_time < max_time
      ),

Target_Process AS (SELECT P1.time, REPLACE(P1.path, RTRIM(P1.path, REPLACE(P1.path, '/', '')), '') name, CAST(GROUP_CONCAT(P1.cmdline,CHAR(10)) AS TEXT) Cmd_Line, P1.pid||':'||CAST(P1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      P1.parent||':'||CAST((SELECT CASE WHEN MAX(P2.time) > 0 THEN MAX(p2.time) ELSE '' END FROM process_events P2, Time_Limits TL_2 where P2.pid = P1.Parent AND P2.time > TL_2.lower_bound AND P2.time <= P1.Time) AS VARCHAR)Parent_SophosPID,
      P1.path, 2 Sort_Order
   FROM process_events P1, Time_Limits TL_1
   WHERE pid = SPLIT('$$SophosPID$$',':',0) AND P1.time >= TL_1.lower_bound AND P1.time <= TL_1.upper_bound
   GROUP BY time, path, pid, parent
   ),
Parent_Process AS (SELECT P1.time, '◄ '||REPLACE(P1.path, RTRIM(P1.path, REPLACE(P1.path, '/', '')), '') name, CAST(GROUP_CONCAT(P1.cmdline,CHAR(10)) AS TEXT) Cmd_Line, P1.pid||':'||CAST(P1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      P1.parent||':'||CAST((SELECT CASE WHEN MAX(P2.time) > 0 THEN MAX(p2.time) ELSE '' END FROM process_events P2, Time_Limits TL_2 where P2.pid = P1.Parent AND P2.time > TL_2.lower_bound AND P2.time <= P1.Time) AS VARCHAR)Parent_SophosPID,
      P1.path, 1 Sort_Order
   FROM process_events P1, Target_Process T1, Time_Limits TL_1
   WHERE P1.pid = SPLIT(T1.Parent_SophosPID,':',0) AND P1.time >= TL_1.lower_bound AND P1.time <= TL_1.upper_bound
   GROUP BY P1.time, P1.path, P1.pid, P1.parent
   ),
Grand_Parent_Process AS (SELECT P1.time, '◄ ◄ '||REPLACE(P1.path, RTRIM(P1.path, REPLACE(P1.path, '/', '')), '') name, CAST(GROUP_CONCAT(P1.cmdline,CHAR(10)) AS TEXT) Cmd_Line, P1.pid||':'||CAST(P1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      P1.parent||':'||CAST((SELECT CASE WHEN MAX(P2.time) > 0 THEN MAX(p2.time) ELSE '' END FROM process_events P2, Time_Limits TL_2 where P2.pid = P1.Parent AND P2.time > TL_2.lower_bound AND P2.time <= P1.Time) AS VARCHAR)Parent_SophosPID,
      P1.path, 0 Sort_Order
   FROM process_events P1, Parent_Process T1, Time_Limits TL_1
   WHERE P1.pid = SPLIT(T1.Parent_SophosPID,':',0) AND P1.time >= TL_1.lower_bound AND P1.time <= TL_1.upper_bound
   GROUP BY P1.time, P1.path, P1.pid, P1.parent
   ),
Generation_One AS (SELECT P1.time, '► '||REPLACE(P1.path, RTRIM(P1.path, REPLACE(P1.path, '/', '')), '') name, CAST(GROUP_CONCAT(P1.cmdline,CHAR(10)) AS TEXT) Cmd_Line, P1.pid||':'||CAST(P1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      P1.parent||':'||CAST((SELECT CASE WHEN MAX(P2.time) > 0 THEN MAX(p2.time) ELSE '' END FROM process_events P2, Time_Limits TL_2 where P2.pid = P1.Parent AND P2.time > TL_2.lower_bound AND P2.time <= P1.Time) AS VARCHAR)Parent_SophosPID,
      P1.path, T1.Sort_Order + 1000000 * ROW_NUMBER() OVER( ORDER BY T1.time, T1.SophosPID ASC) Sort_Order
   FROM process_events P1, Target_Process T1, Time_Limits TL_1
   WHERE P1.parent = SPLIT(T1.SophosPID,':',0) AND P1.time >= TL_1.lower_bound AND P1.time <= TL_1.upper_bound
   GROUP BY P1.time, P1.path, P1.pid, P1.parent
   ),
Generation_Two AS (SELECT P1.time, '► ► '||REPLACE(P1.path, RTRIM(P1.path, REPLACE(P1.path, '/', '')), '') name, CAST(GROUP_CONCAT(P1.cmdline,CHAR(10)) AS TEXT) Cmd_Line, P1.pid||':'||CAST(P1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      P1.parent||':'||CAST((SELECT CASE WHEN MAX(P2.time) > 0 THEN MAX(p2.time) ELSE '' END FROM process_events P2, Time_Limits TL_2 where P2.pid = P1.Parent AND P2.time > TL_2.lower_bound AND P2.time <= P1.Time) AS VARCHAR)Parent_SophosPID,
      P1.path, T1.Sort_Order + 10000 * ROW_NUMBER() OVER( ORDER BY T1.time, T1.SophosPID ASC) Sort_Order
   FROM process_events P1, Generation_One T1, Time_Limits TL_1
   WHERE P1.parent = SPLIT(T1.SophosPID,':',0) AND P1.time >= TL_1.lower_bound AND P1.time <= TL_1.upper_bound
   GROUP BY P1.time, P1.path, P1.pid, P1.parent
   ),
Generation_Three AS (SELECT P1.time, '► ► ► '||REPLACE(P1.path, RTRIM(P1.path, REPLACE(P1.path, '/', '')), '') name, CAST(GROUP_CONCAT(P1.cmdline,CHAR(10)) AS TEXT) Cmd_Line, P1.pid||':'||CAST(P1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      P1.parent||':'||CAST((SELECT CASE WHEN MAX(P2.time) > 0 THEN MAX(p2.time) ELSE '' END FROM process_events P2, Time_Limits TL_2 where P2.pid = P1.Parent AND P2.time > TL_2.lower_bound AND P2.time <= P1.Time) AS VARCHAR)Parent_SophosPID,
      P1.path, T1.Sort_Order + 100 * ROW_NUMBER() OVER( ORDER BY T1.time, T1.SophosPID ASC) Sort_Order
   FROM process_events P1, Generation_Two T1, Time_Limits TL_1
   WHERE P1.parent = SPLIT(T1.SophosPID,':',0) AND P1.time >= TL_1.lower_bound AND P1.time <= TL_1.upper_bound
   GROUP BY P1.time, P1.path, P1.pid, P1.parent
   ),

   Tree AS (
      SELECT * FROM Grand_Parent_Process
      UNION ALL
      SELECT * FROM Parent_Process
      UNION ALL
      SELECT * FROM Target_Process
      UNION ALL
      SELECT * FROM Generation_One
      UNION ALL
      SELECT * FROM Generation_Two
      UNION ALL
      SELECT * FROM Generation_Three
   )
SELECT STRFTIME('%Y-%m-%dT%H:%M:%SZ', DATETIME(time,'unixepoch')) AS date_time, name, Cmd_Line, SophosPID, Parent_SophosPID, CAST(path AS TEXT) FROM Tree ORDER BY CAST(Sort_Order AS INT) ASC

AND if you just want a LIST of ALL processes in a BOOT period that a given SophosPID ran use this.

-- This will take a few steps.  First lets narrow down the time range

----------------------
-- DETERMINE THE LOWER AND UPPER TIME LIMITS FOR THE SOPHOS_PID 
----------------------
WITH 
   -- Get the system boot time for the target process This will be the lower bound below which we do not search
   System_boot(boot_time) AS (
      SELECT MAX(meta_boot_time) 
      FROM xdr_data 
      WHERE query_name IN ( 'running_processes_linux_events') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND meta_boot_time <= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start 
         AND CONTAINS(SPLIT(pids,','), SPLIT_PART('$$SophosPID$$',':',1))  -- The desired PID is in the list
      ),
   -- Get the next boot time or 'NOW' as a max upper bound to search  
   Next_boot_time (max_time) AS (
      SELECT CASE WHEN MIN(meta_boot_time) > 0 THEN MIN(meta_boot_time) ELSE CAST(to_unixtime(now()) AS BIGINT) END 
      FROM xdr_data 
      WHERE query_name IN ( 'running_processes_linux_events') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND meta_boot_time >= CAST( SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT) -- System boot must have been before process start 
   ),
   -- Determine if the PID was recycled during the boot session if so it will be the upper bound
   PID_Recycled(pid_time) AS (
      SELECT MIN(time) pid_time
      FROM xdr_data LEFT JOIN Next_boot_time ON CAST(1 AS BOOLEAN)
      WHERE query_name IN ( 'running_processes_linux_events') 
         AND LOWER(meta_hostname) LIKE LOWER('%%$$device_name$$%')
         AND CONTAINS(SPLIT(pids,','), SPLIT_PART('$$SophosPID$$',':',1))  -- The desired PID is in the list
         AND time > CAST(SPLIT_PART('$$SophosPID$$',':',2) AS BIGINT)
         AND time <= max_time
      ),
   -- Set the lower and upper bound as a table for easy access
   Time_Limits(lower_bound, upper_bound) AS (
      SELECT boot_time, CASE WHEN pid_time > 0 THEN pid_time ELSE Next_boot_time.max_time END
      FROM Next_boot_time LEFT JOIN System_Boot ON CAST(1 AS BOOLEAN) LEFT JOIN PID_Recycled ON pid_time < Next_boot_time.max_time
      ),
   -- NOW BUILD A SINGLE TABLE WITH the Device, Time, Name, CmdLine, SophosPID, ParentSophosPID for each process (We have to flatten the groupings)
   All_Data AS (
   SELECT 
      X1.meta_hostname, 
      X1.time, 
      X1.name, 
      rtrim(array_join(array_agg(distinct X1.cmdline||','), chr(10)),',') as cmdline,
      S1.Single_PID||':'||CAST(X1.time AS VARCHAR) SophosPID,
      -- Address PID REUSE with a sub-select to identify the correct time for the parent if multiple PIDs match the ParentPID --
      T1.Single_Parent||':'||CAST(
         (SELECT MAX(X2.time) 
          FROM xdr_data X2 CROSS JOIN UNNEST(SPLIT(X2.pids,',')) AS S2 (Single_PID) CROSS JOIN UNNEST(SPLIT(X2.parents,',')) AS T2 (Single_Parent), Time_Limits TL_2
          WHERE X2.query_name IN ( 'running_processes_linux_events') 
             AND LOWER(X2.meta_hostname) LIKE LOWER('%%$$device_name$$%')
             AND S2.Single_PID = T1.Single_Parent
             AND X2.time >= TL_2.lower_bound
             AND X2.time <= X1.time
         ) AS VARCHAR) Parent_SophosPID,
      X1.path,
      X1.sha256
   FROM xdr_data X1 CROSS JOIN UNNEST(SPLIT(X1.pids,',')) AS S1 (Single_PID) CROSS JOIN UNNEST(SPLIT(X1.parents,',')) AS T1 (Single_Parent), Time_Limits TL_1
   WHERE X1.query_name IN ( 'running_processes_linux_events') 
      AND LOWER(X1.meta_hostname) LIKE LOWER('%%$$device_name$$%')
      AND X1.time >= TL_1.lower_bound
      AND X1.time <= TL_1.upper_bound
   GROUP BY X1.meta_hostname, X1.time, X1.name, X1.time, X1.path, X1.sha256, X1.pids, S1.Single_PID, X1.parents, T1.Single_Parent
   )

SELECT meta_hostname, date_format(from_unixtime(time), '%y-%m-%dt%h:%i:%sz') as date_time, name, cmdline, SophosPID, Parent_SophosPID, path, sha256 FROM ALL_Data ORDER BY time ASC, SophosPID ASC