Approved

NOTE: Please read through this walkthrough.

Live Discover MITRE ATT&CK Classification and Hunting

Hi folks an experimental query to perform MITRE ATT&CK classifications with data from an external repository (GIT)

While we build out the backend to allow us to run with thousands of classification heuristics and richer more complex machine learning classifiers I wanted to experiment with some of the tools for virtual table creation that I wrote.  

Often enough we want to process a large volume of data that may exist outside of osquery. perhaps it is from an external threat intelligence site, some internal file share or from a log file that resides on the device.   To get that information into a virtual table so you can perform joins I wrote this little query.  It will load the data and use GREP and SPLIT to convert it into a table.

LOAD A CSV FROM A REMOTE LOCATION

-- LOAD CSV from GIT LOCATION
-- VARIABLE $$URL$$        URL
WITH
    Remote_CSV_file(Line, str) AS (
        SELECT '', (SELECT result from curl where url = '$$URL$$') ||char(10)
        UNION ALL
        SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM Remote_CSV_file WHERE str!=''
    ),
    -- Create Table for Remote_CSV_file
    Remote_Loaded_Table (Col1, Col2, Col3, Col4, Col5, Col6) AS (
        SELECT SPLIT(Line,',',0) Col1, SPLIT(Line,',',1) Col2, SPLIT(Line,',',2) Col3, SPLIT(Line,',',3) Col4, SPLIT(Line,',',4) Col2, SPLIT(Line,',',5) Col6
        FROM Remote_CSV_file WHERE Line != ''
        LIMIT 5
    )
SELECT * FROM Remote_Loaded_Table

LOAD A CSV FROM LOCAL FILE SYSTEM

-- LOAD CSV from LOCAL SYSTEM
-- VARIABLE $$File Path$$  STRING
WITH
    -- LOAD CSV from LOCAL SYSTEM
    Local_CSV_file AS (
        SELECT line FROM grep WHERE pattern = ',' AND path = '$$File Path$$'
    ),
    -- Create Table for Local_CSV_file
    Local_Loaded_Table AS (
        SELECT SPLIT(Line,',',0) Col1, SPLIT(Line,',',1) Col2, SPLIT(Line,',',2) Col3, SPLIT(Line,',',3) Col4, SPLIT(Line,',',4) Col2, SPLIT(Line,',',5) Col6
        FROM Local_CSV_file WHERE Line != ''
        LIMIT 5
    )
SELECT * FROM Local_Loaded_Table

With the ability to load virtual tables from the system or from a remote location I then proceeded to consolidate over 600 IOC mapping rules into two files.  The first contains the full MITRE MAP INFORMATION the second contains the RULES I want to process.

MITRE MAP

https://github.com/karlrackerman/Query_Library/raw/main/MITRE_ID_MAP.CSV

CMDLINE IOC DETECTION RULES

https://github.com/karlrackerman/Query_Library/raw/main/TTP_Rules.CSV

Wit those in place I can now use a query that creates the virtual table to bring that data down to the device when I run a query and I can then process the rules and compare it to cmdlines I am seeing from other tables.  You get a nice MITRE Classification whenever you perform a generic search for process information.

-- Generic Search
-- VARIABLE:   $$Begin Search on date$$                  DATE
-- VARIABLE:   $$Hours to Search$$                       STRING
-- VARIABLE:   $$command line$$                          STRING
-- VARIABLE:   $$process name$$                          STRING
-- VARIABLE:   $$parent process name$$                   STRING
-- VARIABLE:   $$user name$$                             STRING

-- In order to avoid the watchdog on the device, we will query the journals in in 4 hour chunks (3600 seconds)
WITH RECURSIVE
	Time_Interval(x) AS (
		VALUES ( CAST($$Begin Search on date$$ AS INT) )
		UNION ALL
		SELECT x+14400 FROM Time_Interval WHERE x < CAST($$Begin Search on date$$ AS INT) + CAST( $$Hours to Search$$ * 3600 AS INT)
	),
-- Get map from GIT LOCATION
mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS (
 WITH mitre_map_file(Line, str) AS (
  SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10)
  UNION ALL
  SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!=''
 )
SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique
FROM mitre_map_file WHERE Line != '' 
),
-- Get detection rules from GIT LOCATION
mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS (
 WITH mitre_rule_file(Line, str) AS (
  SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10)
  UNION ALL
  SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!=''
 )
SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator
FROM mitre_rule_file WHERE Line != ''
),

-- Map the methods to the description
Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator
FROM mitre_map mm
JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT)  
)

SELECT 
   CAST( replace(datetime(spj.time,'unixepoch'),' ','T') AS TEXT)Date_Time, -- add the T to help excel understand this is a date and time
   CAST( users.username AS TEXT) User_Name,
   CAST( (SELECT processname FROM sophos_process_journal spj2 WHERE spj2.sophosPID = spj.parentSophosPID) AS TEXT) Parent_Process_Name,
   CAST( spj.processname AS TEXT) Process_Name,
   CAST( spj.cmdline AS TEXT) CmdLine,
   CAST((SELECT '('||COUNT(info)||') '||CHAR(10)||GROUP_CONCAT(info, CHAR(10)) FROM Rule_Map WHERE spj.processname LIKE '%'||process AND spj.cmdline LIKE indicator) AS TEXT) Mitre_Info,  
   
   -- SHOW a pretty bar whre the size depends on the execution duration  Duration bar is a sqrt function based on execution time
   CASE CAST( (CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END)/15 AS INT)
      WHEN 0 THEN '│'
      ELSE printf('%.' || CAST(CAST(SQRT(CAST( CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END AS INT)/15) AS INT) AS TEXT) ||'c', '█')
   END Execution_Duration,
   
   CAST( CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END AS TEXT) Seconds,
   CAST( spj.sophosPid AS TEXT) SophosPID,
   CAST( spj.parentSophosPID AS TEXT) Parent_SophosPID,
   CAST( spj.SHA256 AS TEXT) SHA256,
   CAST( CASE spj.eventType WHEN 0 THEN 'Process Running' ELSE 'Process Stopped' END AS TEXT)ProcessStatus,
   CAST( spj.pathname AS TEXT) PathName
FROM Time_Interval t 
   LEFT JOIN sophos_process_journal spj ON  spj.time > t.x AND spj.time < t.x+14400
   LEFT JOIN users ON uuid LIKE sid
WHERE 
   -- SEARCH AND FILTER CRITERIA
   LOWER(spj.cmdline) LIKE LOWER('%$$command line$$%') AND 
   LOWER(users.username) LIKE LOWER('%$$user name$$%') AND
   LOWER(spj.processname) LIKE LOWER('%$$process name$$%') AND
   LOWER(Parent_Process_Name) LIKE LOWER('%$$parent process name$$%')
GROUP BY SophosPID

You can also add that into a more exhaustive classifier for all activity.

-- MITRE_ATT&CK Cmdline mappings
-- Get map from GIT LOCATION
WITH mitre_map_file(Line, str) AS (
  SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10)
  UNION ALL
  SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!=''
 ),
-- Create Table for Mitre_MAP
mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS (
   SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique
   FROM mitre_map_file WHERE Line != '' 
),
-- Get detection rules from GIT LOCATION
mitre_rule_file(Line, str) AS (
  SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10)
  UNION ALL
  SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!=''
),
-- Create Table for Detection_Rules
mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS (
   SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator
   FROM mitre_rule_file WHERE Line != ''
),

-- Map the methods to the description
Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator
FROM mitre_map mm
JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT)  
)

-- Perform the evaluations
SELECT
  REPLACE(DATETIME(spj.time,'unixepoch'),' ','T') date_time,
  Rule_Map.Info,
  (SELECT username FROM users WHERE users.uuid = spj.sid) AS username,
  spj.processname AS process_name,
  spj.cmdline AS cmd_line,
  spj.sophosPID AS sophos_pid,
  spj.pathname AS path_name,
  (SELECT spj2.processname FROM sophos_process_journal spj2 WHERE spj2.sophospid = spj.parentSophosPID AND spj2.eventtype = 0) parent_name
FROM sophos_process_journal spj
JOIN Rule_Map ON spj.processname LIKE Rule_Map.process AND spj.cmdline LIKE Rule_Map.Indicator
WHERE
  spj.eventType = 0
  AND spj.time > $$Begin search on$$ 
  AND spj.time < $$Begin search on$$ + $$Number of hours to search$$ * 3600

And now that we have PIVOTS you can go from one of the Sophos PIDS to a process three that includes the MITRE enrichments as well

-- Process Tree with MITRE enrichment for CMD LINE
-- VARIABLE $$sophosPID$$      SophosPID

-- Generate Process Tree
WITH RECURSIVE
get_ancestors(sophosPID, level, startTime, cmdLine, procName,    parentSophosPID, sha256, sid) AS (
    SELECT sophosPID, 0, processStartTime, cmdLine, processName, parentSophosPID, sha256, sid
    FROM sophos_process_journal
    WHERE sophosPID = '$$sophosPID$$' AND eventtype = 0

    UNION ALL

    SELECT p.sophosPID, get_ancestors.level - 1, p.processStartTime, p.cmdLine, p.processName, p.parentSophosPID, p.sha256, p.sid
    FROM sophos_process_journal p
    JOIN get_ancestors ON p.sophosPID = get_ancestors.parentSophosPID 
    ORDER BY 2 ASC
),
ancestor_tree AS (
    SELECT DISTINCT 
        startTime,
        printf('%.' || ABS(level) || 'c', '<') || ' ' || procName processBranch,
        sophosPID,
        cmdLine,
        sha256,
        sid
    FROM get_ancestors
    WHERE level < 0
    ORDER BY level ASC
),
get_children(sophosPID, level, startTime, cmdLine, procName, targetsophosPID, sha256, sid) AS (
    SELECT p.sophosPID, 0, p.processStartTime, p.cmdLine, p.processName,spa.targetsophosPID, p.sha256, p.sid
    FROM sophos_process_journal p
    LEFT JOIN sophos_process_activity spa ON spa.sophosPID = p.sophosPID AND subject = 'Process' 
    WHERE p.sophosPID = '$$sophosPID$$'

    UNION ALL

    SELECT p.sophosPID, get_children.level + 1, p.processStartTime, p.cmdLine, p.processName, spa.targetsophosPID, p.sha256, p.sid
    FROM sophos_process_journal p
    JOIN get_children ON p.sophosPID = get_children.targetsophosPID LEFT JOIN sophos_process_activity spa ON spa.sophosPID = p.sophosPID AND subject = 'Process'
    ORDER BY 2 DESC
),
child_tree AS (
    SELECT
        DISTINCT startTime,
        CASE
            WHEN level = 0 THEN procName
            ELSE printf('%.' || level || 'c', '>') || ' ' || procName
        END as processBranch,
        sophosPID,
        cmdLine,
        sha256,
        sid
    FROM get_children
),
Full_Tree AS (
    SELECT * FROM ancestor_tree
    UNION ALL
    SELECT * FROM child_tree
),

-- Get map from GIT LOCATION
mitre_map_file(Line, str) AS (
  SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10)
  UNION ALL
  SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!=''
 ),
-- Create Table for Mitre_MAP
mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS (
   SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique
   FROM mitre_map_file WHERE Line != '' 
),
-- Get detection rules from GIT LOCATION
mitre_rule_file(Line, str) AS (
  SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10)
  UNION ALL
  SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!=''
),
-- Create Table for Detection_Rules
mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS (
   SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator
   FROM mitre_rule_file WHERE Line != ''
),

-- Map the methods to the description
Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator
FROM mitre_map mm
JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT)  
)

-- USED TO DEBUG
   -- SELECT * FROM mitre_map
   -- SELECT CAST(method AS TEXT) Method, CAST(noise_level AS TEXT) Noise_Level, CAST(id AS TEXT) ID, CAST(subid AS TEXT) SubID, CAST(process AS TEXT) Process, CAST(indicator AS TEXT) Indicator FROM mitre_detection_rules
   -- SELECT * FROM Rule_Map ORDER BY info
   -- SELECT * FROM mitre_map UNION ALL SELECT * FROM mitre_detection_rules

-- Display the Tree and enrich the CMDLINE
SELECT 
   DateTime(starttime,'unixepoch') Date_Time, 
   processBranch, 
   cmdline, 
   -- NOTE GROUP_CONCAT incase a cmdline has multiple TTP hits
   CAST((SELECT '('||COUNT(info)||') '||CHAR(10)||GROUP_CONCAT(info, CHAR(10)) FROM Rule_Map WHERE processBranch LIKE '%'||process AND Cmdline LIKE indicator) AS TEXT) Mitre_Info,  
   (SELECT username FROM users WHERE uuid = sid) username, 
   sophosPID, 
   (SELECT pathname FROM sophos_process_journal WHERE sophosPID = Full_Tree.sophosPID AND eventtype = 0) pathname,
   CAST(sha256 AS TEXT) sha256, 
   sid,
   ROW_NUMBER() OVER() Row_number
FROM Full_Tree

Oh and if you find a particular process you really want to examine, run the extended process tree query on it Slight smile

-- Extended Process Tree for a SophosPID

-- VARIABLE $$SophosPID$$ sophosPID

-- NOTE THE PROCESS OR ANCESTORS MAY STILL BE RUNNING SO HANDLE ENDTIME CORRECTLY by changing endtime for running processes to now + 10 min
WITH RECURSIVE 
-- GET A LIST OF ALL ANCESTORS OF A SOPHOS PID
Ancestors(SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end) AS (
   -- Define the SEED Row as the information on the SophosPID provided
   SELECT sophosPID, 0, ParentSophosPID, processname, pathname, cmdline, sha256, sid, time,
     CASE WHEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) > 0 
        THEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0)
        ELSE strftime('%s','now','+10 minutes') END
    FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) = 0
   
   UNION ALL
   
   -- Recursvly identify all decendents
   SELECT spj.SophosPID, Level - 1, spj.ParentSophosPID, spj.processname, spj.pathname, spj.cmdline, spj.sha256, spj.sid, spj.time, 
      CASE WHEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) > 0
        THEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0)
        ELSE strftime('%s','now','+10 minutes') END
   FROM Ancestors JOIN Sophos_Process_Journal spj ON spj.SophosPID = Ancestors.parent AND CAST(spj.endtime AS INT) = 0 
   -- Perform a Depth First Search ASC would perform a Breadth First Search
   ORDER BY 2 DESC
   ),
-- Add Row Numbers to the Ancestor List so we order the tree corretly
-- EXCLUDE the line for the specified SophosPID so that when we show the tree we do not have a duplicate row
Orderd_Ancestors AS (SELECT SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end, -1 * ROW_Number() OVER () Row FROM Ancestors WHERE SophosPID NOT IN ('$$SophosPID$$') ),

-- GET A LIST OF ALL DECENDENTS OF A SOPHOS PID
-- NOTE THE PROCESS OR CHILDREN MAY STILL BE RUNNING SO HANDLE ENDTIME CORRECTLY by changing endtime for running processes to now + 10 min
Children(SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end) AS (
   -- Define the SEED Row as the information on the SophosPID provided
   SELECT sophosPID, 0, ParentSophosPID, processname, pathname, cmdline, sha256, sid, time,
     CASE WHEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) > 0 
        THEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0)
        ELSE strftime('%s','now','+10 minutes') END
    FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) = 0
   
   UNION ALL
   
   -- Recursvly identify all decendents
   SELECT spj.SophosPID, Level +1, spj.ParentSophosPID, spj.processname, spj.pathname, spj.cmdline, spj.sha256, spj.sid, spj.time, 
      CASE WHEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) > 0
        THEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0)
        ELSE strftime('%s','now','+10 minutes') END
   FROM Children JOIN Sophos_Process_Journal spj ON spj.ParentSophosPID = Children.SophosPID AND CAST(spj.endtime AS INT) = 0 AND spj.time > Children.start -5 and spj.time < Children.end +3600
   -- Perform a Depth First Search ASC would perform a Breadth First Search
   ORDER BY 2 DESC
   ),
   
   -- Add Row Numbers to the Decendent List so we order the tree corretly
   Orderd_Descendants AS (SELECT SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end, ROW_Number() OVER () Row FROM Children),

   -- Now collect the activity for all descendents and the selected sophosPID using a UNION to list the decendent then the file activity it had
   File_Activity AS (

     -- FOR ANCESTORS WE WILL ONLY SHOW THE PROCESS TREE INFO (No activity will be collected)
     SELECT 
      REPLACE(DATETIME(A.start,'unixepoch'), ' ','T') Date_Time,
      CASE A.SophosPID 
		   WHEN '$$SophosPID$$' THEN A.ProcessName 
		   ELSE substr('< < < < < < < < < < < < < < < < < < < < ', 1, A.Level * -2)  || A.processName 
	   END Process_Tree,
	   '-----------' Subject,
      '-----------' Action,
      '-----------' Object,
      CAST(A.cmdline AS TEXT) Cmd_Line,
      A.SophosPID SophosPID,
      A.pathname Process_Pathname,
      A.sha256 Process_SHA256,
      A.SID Process_SID,
      A.Level Level,
      A.Row Row,
      0 Sub_Row,
      a.start time
   FROM Orderd_Ancestors A 

   UNION ALL
   
   -- SHOW THE PROCESS TREE INFO FOR DESCENDENTS
   SELECT 
      REPLACE(DATETIME(D.start,'unixepoch'), ' ','T') Date_Time,
      CASE D.SophosPID 
		   WHEN '$$SophosPID$$' THEN D.ProcessName 
		   ELSE substr('> > > > > > > > > > > > > > > > > > > > > > ', 1, D.Level * 2) || D.processName 
	   END Process_Tree,
	   '-----------' Subject,
      '-----------' Action,
      '-----------' Object,
      CAST(D.cmdline AS TEXT) Cmd_Line,
      D.SophosPID SophosPID,
      D.pathname Process_Pathname,
      D.sha256 Process_SHA256,
      D.SID Process_SID,
      D.Level Level,
      D.Row Row,
      0 Sub_Row,
      D.start time
   FROM Orderd_Descendants D 

   UNION ALL

   -- ADD THE PROCESS ACTIVITY FOR EACH DESCENDENT
   SELECT 
      REPLACE(DATETIME(MIN(spa.time),'unixepoch'),' ','T') Date_Time,
      CASE D.SophosPID 
		   WHEN '$$SophosPID$$' THEN '( '||D.processname||' ) ACTIVITY' 
		   ELSE substr('~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~', 1, D.Level * 2) || '( '||D.processname||' ) ACTIVITY' 
	   END Process_Tree,
      spa.subject Subject,
      CAST(GROUP_CONCAT(DISTINCT spa.action)||' ('||COUNT(spa.action)||')' AS TEXT) Action,
      spa.object,
      CAST(D.cmdline AS TEXT) Cmd_Line,
      D.SophosPID SophosPID,
      D.pathname Process_Pathname,
      D.sha256 Process_SHA256,
      D.SID Process_SID,
      '' Level,
      D.Row Row,
      1 Sub_Row,
      spa.time time
   -- NOTE The details for each process does not include 'FileDataReads', 'FileOtherReads', 'FileBinaryReads', 'Image', 'Thread' 
   FROM Orderd_Descendants D LEFT JOIN Sophos_Process_Activity spa ON -- spa.subject IN ('Thread','DirectoryChanges','Dns','FileBinaryChanges','FileDataChanges','FileOtherChanges','Http','Ip','Network','Url','Registry','Process')
--      AND 
      spa.SophosPID = D.SophosPID 
      AND spa.time > D.start-1 
      AND spa.time < D.end+1
   WHERE spa.subject > ''
   GROUP BY spa.subject, spa.action, spa.object, spa.SophosPID, D.processname, D.SophosPID, D.pathname, D.sha256, D.SID
   )
   
-- Now that we have all activity for each descendent, we need to provide the pretty list showing the Process Tree and File activity for each process in the tree
SELECT Date_Time, Process_Tree, Subject, Action, Object, Cmd_Line, SophosPID, Process_Pathname, Process_SHA256, Process_SID, row, sub_row, time,ROW_NUMBER() OVER( ORDER BY Row, Sub_Row, Time) SORT_ORDER_FOR_EXCEL 
FROM File_Activity 
ORDER By SORT_ORDER_FOR_EXCEL