Hi folks an experimental query to perform MITRE ATT&CK classifications with data from an external repository (GIT)
While we build out the backend to allow us to run with thousands of classification heuristics and richer more complex machine learning classifiers I wanted to experiment with some of the tools for virtual table creation that I wrote.
Often enough we want to process a large volume of data that may exist outside of osquery. perhaps it is from an external threat intelligence site, some internal file share or from a log file that resides on the device. To get that information into a virtual table so you can perform joins I wrote this little query. It will load the data and use GREP and SPLIT to convert it into a table.
LOAD A CSV FROM A REMOTE LOCATION
-- LOAD CSV from GIT LOCATION -- VARIABLE $$URL$$ URL WITH Remote_CSV_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = '$$URL$$') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM Remote_CSV_file WHERE str!='' ), -- Create Table for Remote_CSV_file Remote_Loaded_Table (Col1, Col2, Col3, Col4, Col5, Col6) AS ( SELECT SPLIT(Line,',',0) Col1, SPLIT(Line,',',1) Col2, SPLIT(Line,',',2) Col3, SPLIT(Line,',',3) Col4, SPLIT(Line,',',4) Col2, SPLIT(Line,',',5) Col6 FROM Remote_CSV_file WHERE Line != '' LIMIT 5 ) SELECT * FROM Remote_Loaded_Table
LOAD A CSV FROM LOCAL FILE SYSTEM
-- LOAD CSV from LOCAL SYSTEM -- VARIABLE $$File Path$$ STRING WITH -- LOAD CSV from LOCAL SYSTEM Local_CSV_file AS ( SELECT line FROM grep WHERE pattern = ',' AND path = '$$File Path$$' ), -- Create Table for Local_CSV_file Local_Loaded_Table AS ( SELECT SPLIT(Line,',',0) Col1, SPLIT(Line,',',1) Col2, SPLIT(Line,',',2) Col3, SPLIT(Line,',',3) Col4, SPLIT(Line,',',4) Col2, SPLIT(Line,',',5) Col6 FROM Local_CSV_file WHERE Line != '' LIMIT 5 ) SELECT * FROM Local_Loaded_Table
With the ability to load virtual tables from the system or from a remote location I then proceeded to consolidate over 600 IOC mapping rules into two files. The first contains the full MITRE MAP INFORMATION the second contains the RULES I want to process.
MITRE MAP
https://github.com/karlrackerman/Query_Library/raw/main/MITRE_ID_MAP.CSV
CMDLINE IOC DETECTION RULES
https://github.com/karlrackerman/Query_Library/raw/main/TTP_Rules.CSV
Wit those in place I can now use a query that creates the virtual table to bring that data down to the device when I run a query and I can then process the rules and compare it to cmdlines I am seeing from other tables. You get a nice MITRE Classification whenever you perform a generic search for process information.
-- Generic Search -- VARIABLE: $$Begin Search on date$$ DATE -- VARIABLE: $$Hours to Search$$ STRING -- VARIABLE: $$command line$$ STRING -- VARIABLE: $$process name$$ STRING -- VARIABLE: $$parent process name$$ STRING -- VARIABLE: $$user name$$ STRING -- In order to avoid the watchdog on the device, we will query the journals in in 4 hour chunks (3600 seconds) WITH RECURSIVE Time_Interval(x) AS ( VALUES ( CAST($$Begin Search on date$$ AS INT) ) UNION ALL SELECT x+14400 FROM Time_Interval WHERE x < CAST($$Begin Search on date$$ AS INT) + CAST( $$Hours to Search$$ * 3600 AS INT) ), -- Get map from GIT LOCATION mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS ( WITH mitre_map_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!='' ) SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique FROM mitre_map_file WHERE Line != '' ), -- Get detection rules from GIT LOCATION mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS ( WITH mitre_rule_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!='' ) SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator FROM mitre_rule_file WHERE Line != '' ), -- Map the methods to the description Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator FROM mitre_map mm JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT) ) SELECT CAST( replace(datetime(spj.time,'unixepoch'),' ','T') AS TEXT)Date_Time, -- add the T to help excel understand this is a date and time CAST( users.username AS TEXT) User_Name, CAST( (SELECT processname FROM sophos_process_journal spj2 WHERE spj2.sophosPID = spj.parentSophosPID) AS TEXT) Parent_Process_Name, CAST( spj.processname AS TEXT) Process_Name, CAST( spj.cmdline AS TEXT) CmdLine, CAST((SELECT '('||COUNT(info)||') '||CHAR(10)||GROUP_CONCAT(info, CHAR(10)) FROM Rule_Map WHERE spj.processname LIKE '%'||process AND spj.cmdline LIKE indicator) AS TEXT) Mitre_Info, -- SHOW a pretty bar whre the size depends on the execution duration Duration bar is a sqrt function based on execution time CASE CAST( (CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END)/15 AS INT) WHEN 0 THEN '│' ELSE printf('%.' || CAST(CAST(SQRT(CAST( CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END AS INT)/15) AS INT) AS TEXT) ||'c', '█') END Execution_Duration, CAST( CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END AS TEXT) Seconds, CAST( spj.sophosPid AS TEXT) SophosPID, CAST( spj.parentSophosPID AS TEXT) Parent_SophosPID, CAST( spj.SHA256 AS TEXT) SHA256, CAST( CASE spj.eventType WHEN 0 THEN 'Process Running' ELSE 'Process Stopped' END AS TEXT)ProcessStatus, CAST( spj.pathname AS TEXT) PathName FROM Time_Interval t LEFT JOIN sophos_process_journal spj ON spj.time > t.x AND spj.time < t.x+14400 LEFT JOIN users ON uuid LIKE sid WHERE -- SEARCH AND FILTER CRITERIA LOWER(spj.cmdline) LIKE LOWER('%$$command line$$%') AND LOWER(users.username) LIKE LOWER('%$$user name$$%') AND LOWER(spj.processname) LIKE LOWER('%$$process name$$%') AND LOWER(Parent_Process_Name) LIKE LOWER('%$$parent process name$$%') GROUP BY SophosPID
You can also add that into a more exhaustive classifier for all activity.
-- MITRE_ATT&CK Cmdline mappings -- Get map from GIT LOCATION WITH mitre_map_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!='' ), -- Create Table for Mitre_MAP mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS ( SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique FROM mitre_map_file WHERE Line != '' ), -- Get detection rules from GIT LOCATION mitre_rule_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!='' ), -- Create Table for Detection_Rules mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS ( SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator FROM mitre_rule_file WHERE Line != '' ), -- Map the methods to the description Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator FROM mitre_map mm JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT) ) -- Perform the evaluations SELECT REPLACE(DATETIME(spj.time,'unixepoch'),' ','T') date_time, Rule_Map.Info, (SELECT username FROM users WHERE users.uuid = spj.sid) AS username, spj.processname AS process_name, spj.cmdline AS cmd_line, spj.sophosPID AS sophos_pid, spj.pathname AS path_name, (SELECT spj2.processname FROM sophos_process_journal spj2 WHERE spj2.sophospid = spj.parentSophosPID AND spj2.eventtype = 0) parent_name FROM sophos_process_journal spj JOIN Rule_Map ON spj.processname LIKE Rule_Map.process AND spj.cmdline LIKE Rule_Map.Indicator WHERE spj.eventType = 0 AND spj.time > $$Begin search on$$ AND spj.time < $$Begin search on$$ + $$Number of hours to search$$ * 3600
And now that we have PIVOTS you can go from one of the Sophos PIDS to a process three that includes the MITRE enrichments as well
-- Process Tree with MITRE enrichment for CMD LINE -- VARIABLE $$sophosPID$$ SophosPID -- Generate Process Tree WITH RECURSIVE get_ancestors(sophosPID, level, startTime, cmdLine, procName, parentSophosPID, sha256, sid) AS ( SELECT sophosPID, 0, processStartTime, cmdLine, processName, parentSophosPID, sha256, sid FROM sophos_process_journal WHERE sophosPID = '$$sophosPID$$' AND eventtype = 0 UNION ALL SELECT p.sophosPID, get_ancestors.level - 1, p.processStartTime, p.cmdLine, p.processName, p.parentSophosPID, p.sha256, p.sid FROM sophos_process_journal p JOIN get_ancestors ON p.sophosPID = get_ancestors.parentSophosPID ORDER BY 2 ASC ), ancestor_tree AS ( SELECT DISTINCT startTime, printf('%.' || ABS(level) || 'c', '<') || ' ' || procName processBranch, sophosPID, cmdLine, sha256, sid FROM get_ancestors WHERE level < 0 ORDER BY level ASC ), get_children(sophosPID, level, startTime, cmdLine, procName, targetsophosPID, sha256, sid) AS ( SELECT p.sophosPID, 0, p.processStartTime, p.cmdLine, p.processName,spa.targetsophosPID, p.sha256, p.sid FROM sophos_process_journal p LEFT JOIN sophos_process_activity spa ON spa.sophosPID = p.sophosPID AND subject = 'Process' WHERE p.sophosPID = '$$sophosPID$$' UNION ALL SELECT p.sophosPID, get_children.level + 1, p.processStartTime, p.cmdLine, p.processName, spa.targetsophosPID, p.sha256, p.sid FROM sophos_process_journal p JOIN get_children ON p.sophosPID = get_children.targetsophosPID LEFT JOIN sophos_process_activity spa ON spa.sophosPID = p.sophosPID AND subject = 'Process' ORDER BY 2 DESC ), child_tree AS ( SELECT DISTINCT startTime, CASE WHEN level = 0 THEN procName ELSE printf('%.' || level || 'c', '>') || ' ' || procName END as processBranch, sophosPID, cmdLine, sha256, sid FROM get_children ), Full_Tree AS ( SELECT * FROM ancestor_tree UNION ALL SELECT * FROM child_tree ), -- Get map from GIT LOCATION mitre_map_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!='' ), -- Create Table for Mitre_MAP mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS ( SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique FROM mitre_map_file WHERE Line != '' ), -- Get detection rules from GIT LOCATION mitre_rule_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!='' ), -- Create Table for Detection_Rules mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS ( SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator FROM mitre_rule_file WHERE Line != '' ), -- Map the methods to the description Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator FROM mitre_map mm JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT) ) -- USED TO DEBUG -- SELECT * FROM mitre_map -- SELECT CAST(method AS TEXT) Method, CAST(noise_level AS TEXT) Noise_Level, CAST(id AS TEXT) ID, CAST(subid AS TEXT) SubID, CAST(process AS TEXT) Process, CAST(indicator AS TEXT) Indicator FROM mitre_detection_rules -- SELECT * FROM Rule_Map ORDER BY info -- SELECT * FROM mitre_map UNION ALL SELECT * FROM mitre_detection_rules -- Display the Tree and enrich the CMDLINE SELECT DateTime(starttime,'unixepoch') Date_Time, processBranch, cmdline, -- NOTE GROUP_CONCAT incase a cmdline has multiple TTP hits CAST((SELECT '('||COUNT(info)||') '||CHAR(10)||GROUP_CONCAT(info, CHAR(10)) FROM Rule_Map WHERE processBranch LIKE '%'||process AND Cmdline LIKE indicator) AS TEXT) Mitre_Info, (SELECT username FROM users WHERE uuid = sid) username, sophosPID, (SELECT pathname FROM sophos_process_journal WHERE sophosPID = Full_Tree.sophosPID AND eventtype = 0) pathname, CAST(sha256 AS TEXT) sha256, sid, ROW_NUMBER() OVER() Row_number FROM Full_Tree
Oh and if you find a particular process you really want to examine, run the extended process tree query on it
-- Extended Process Tree for a SophosPID -- VARIABLE $$SophosPID$$ sophosPID -- NOTE THE PROCESS OR ANCESTORS MAY STILL BE RUNNING SO HANDLE ENDTIME CORRECTLY by changing endtime for running processes to now + 10 min WITH RECURSIVE -- GET A LIST OF ALL ANCESTORS OF A SOPHOS PID Ancestors(SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end) AS ( -- Define the SEED Row as the information on the SophosPID provided SELECT sophosPID, 0, ParentSophosPID, processname, pathname, cmdline, sha256, sid, time, CASE WHEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) > 0 THEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) ELSE strftime('%s','now','+10 minutes') END FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) = 0 UNION ALL -- Recursvly identify all decendents SELECT spj.SophosPID, Level - 1, spj.ParentSophosPID, spj.processname, spj.pathname, spj.cmdline, spj.sha256, spj.sid, spj.time, CASE WHEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) > 0 THEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) ELSE strftime('%s','now','+10 minutes') END FROM Ancestors JOIN Sophos_Process_Journal spj ON spj.SophosPID = Ancestors.parent AND CAST(spj.endtime AS INT) = 0 -- Perform a Depth First Search ASC would perform a Breadth First Search ORDER BY 2 DESC ), -- Add Row Numbers to the Ancestor List so we order the tree corretly -- EXCLUDE the line for the specified SophosPID so that when we show the tree we do not have a duplicate row Orderd_Ancestors AS (SELECT SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end, -1 * ROW_Number() OVER () Row FROM Ancestors WHERE SophosPID NOT IN ('$$SophosPID$$') ), -- GET A LIST OF ALL DECENDENTS OF A SOPHOS PID -- NOTE THE PROCESS OR CHILDREN MAY STILL BE RUNNING SO HANDLE ENDTIME CORRECTLY by changing endtime for running processes to now + 10 min Children(SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end) AS ( -- Define the SEED Row as the information on the SophosPID provided SELECT sophosPID, 0, ParentSophosPID, processname, pathname, cmdline, sha256, sid, time, CASE WHEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) > 0 THEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) ELSE strftime('%s','now','+10 minutes') END FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) = 0 UNION ALL -- Recursvly identify all decendents SELECT spj.SophosPID, Level +1, spj.ParentSophosPID, spj.processname, spj.pathname, spj.cmdline, spj.sha256, spj.sid, spj.time, CASE WHEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) > 0 THEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) ELSE strftime('%s','now','+10 minutes') END FROM Children JOIN Sophos_Process_Journal spj ON spj.ParentSophosPID = Children.SophosPID AND CAST(spj.endtime AS INT) = 0 AND spj.time > Children.start -5 and spj.time < Children.end +3600 -- Perform a Depth First Search ASC would perform a Breadth First Search ORDER BY 2 DESC ), -- Add Row Numbers to the Decendent List so we order the tree corretly Orderd_Descendants AS (SELECT SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end, ROW_Number() OVER () Row FROM Children), -- Now collect the activity for all descendents and the selected sophosPID using a UNION to list the decendent then the file activity it had File_Activity AS ( -- FOR ANCESTORS WE WILL ONLY SHOW THE PROCESS TREE INFO (No activity will be collected) SELECT REPLACE(DATETIME(A.start,'unixepoch'), ' ','T') Date_Time, CASE A.SophosPID WHEN '$$SophosPID$$' THEN A.ProcessName ELSE substr('< < < < < < < < < < < < < < < < < < < < ', 1, A.Level * -2) || A.processName END Process_Tree, '-----------' Subject, '-----------' Action, '-----------' Object, CAST(A.cmdline AS TEXT) Cmd_Line, A.SophosPID SophosPID, A.pathname Process_Pathname, A.sha256 Process_SHA256, A.SID Process_SID, A.Level Level, A.Row Row, 0 Sub_Row, a.start time FROM Orderd_Ancestors A UNION ALL -- SHOW THE PROCESS TREE INFO FOR DESCENDENTS SELECT REPLACE(DATETIME(D.start,'unixepoch'), ' ','T') Date_Time, CASE D.SophosPID WHEN '$$SophosPID$$' THEN D.ProcessName ELSE substr('> > > > > > > > > > > > > > > > > > > > > > ', 1, D.Level * 2) || D.processName END Process_Tree, '-----------' Subject, '-----------' Action, '-----------' Object, CAST(D.cmdline AS TEXT) Cmd_Line, D.SophosPID SophosPID, D.pathname Process_Pathname, D.sha256 Process_SHA256, D.SID Process_SID, D.Level Level, D.Row Row, 0 Sub_Row, D.start time FROM Orderd_Descendants D UNION ALL -- ADD THE PROCESS ACTIVITY FOR EACH DESCENDENT SELECT REPLACE(DATETIME(MIN(spa.time),'unixepoch'),' ','T') Date_Time, CASE D.SophosPID WHEN '$$SophosPID$$' THEN '( '||D.processname||' ) ACTIVITY' ELSE substr('~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~', 1, D.Level * 2) || '( '||D.processname||' ) ACTIVITY' END Process_Tree, spa.subject Subject, CAST(GROUP_CONCAT(DISTINCT spa.action)||' ('||COUNT(spa.action)||')' AS TEXT) Action, spa.object, CAST(D.cmdline AS TEXT) Cmd_Line, D.SophosPID SophosPID, D.pathname Process_Pathname, D.sha256 Process_SHA256, D.SID Process_SID, '' Level, D.Row Row, 1 Sub_Row, spa.time time -- NOTE The details for each process does not include 'FileDataReads', 'FileOtherReads', 'FileBinaryReads', 'Image', 'Thread' FROM Orderd_Descendants D LEFT JOIN Sophos_Process_Activity spa ON -- spa.subject IN ('Thread','DirectoryChanges','Dns','FileBinaryChanges','FileDataChanges','FileOtherChanges','Http','Ip','Network','Url','Registry','Process') -- AND spa.SophosPID = D.SophosPID AND spa.time > D.start-1 AND spa.time < D.end+1 WHERE spa.subject > '' GROUP BY spa.subject, spa.action, spa.object, spa.SophosPID, D.processname, D.SophosPID, D.pathname, D.sha256, D.SID ) -- Now that we have all activity for each descendent, we need to provide the pretty list showing the Process Tree and File activity for each process in the tree SELECT Date_Time, Process_Tree, Subject, Action, Object, Cmd_Line, SophosPID, Process_Pathname, Process_SHA256, Process_SID, row, sub_row, time,ROW_NUMBER() OVER( ORDER BY Row, Sub_Row, Time) SORT_ORDER_FOR_EXCEL FROM File_Activity ORDER By SORT_ORDER_FOR_EXCEL