-- NAME: NDR - Devices generating most network traffic (BARS)
-- CATEGORY: NDR
-- DESCRIPTION: Detection for identifying the top 100 talkers on a network.
--   Fields are ip: the private IP address of the machine,
--   total_bytes: the total number of bytes sent or received by that ip,
--   avg_pcr_payload: producer/consumer value, -1 = pure download, 1 = pure upload.
-- SOURCE: Data Lake
-- VARIABLE $$IP Address$$ IP ADDRESS
-- VARIABLE $$MAC Address$$ STRING
-- VARIABLE $$meta_hostname$$ DEVICE_NAME

WITH NDR_Talkers AS (
    -- Unpack every 'topTalkers' report: one output row per element of its
    -- detection_context array, addressed by the generated index A.x.
    SELECT
        CAST(JSON_EXTRACT(raw, '$.ingest_date') AS VARCHAR) AS Day,
        CAST(JSON_EXTRACT(raw, '$.name') AS VARCHAR) AS Report_Name,
        CAST(JSON_EXTRACT(raw, '$.description') AS VARCHAR) AS Description,
        -- Detection Context
        CAST(JSON_EXTRACT(raw, '$.detection_context[' || CAST(A.x AS VARCHAR) || '].avg_pcr_payload') AS VARCHAR) AS avg_pcr_payload,
        CAST(JSON_EXTRACT(raw, '$.detection_context[' || CAST(A.x AS VARCHAR) || '].ip') AS VARCHAR) AS ip,
        CAST(JSON_EXTRACT(raw, '$.detection_context[' || CAST(A.x AS VARCHAR) || '].mac') AS VARCHAR) AS mac,
        CAST(JSON_EXTRACT(raw, '$.detection_context[' || CAST(A.x AS VARCHAR) || '].total_bytes') AS DOUBLE) AS Total_Bytes
    FROM mdr_ioc_all
    CROSS JOIN UNNEST(
        SEQUENCE(0, JSON_ARRAY_LENGTH(JSON_EXTRACT(raw, '$.detection_context')) - 1)
    ) AS A(x)
    WHERE ioc_worker_id = 'worker_ndr'
        AND CAST(JSON_EXTRACT(raw, '$.name') AS VARCHAR) = 'topTalkers'
        -- SEQUENCE(0, -1) counts downwards and returns [0, -1], so a report with an
        -- empty detection_context would otherwise unnest two non-existent entries.
        AND JSON_ARRAY_LENGTH(JSON_EXTRACT(raw, '$.detection_context')) > 0
),

NDR_Data AS (
    -- Add the text bar chart column. Total_Bytes is extracted once in NDR_Talkers
    -- and reused here instead of repeating the JSON extraction in every branch.
    SELECT
        Day,
        Report_Name,
        Description,
        avg_pcr_payload,
        ip,
        mac,
        Total_Bytes,
        CASE
            -- No traffic: no bar at all.
            WHEN CAST(Total_Bytes AS BIGINT) = 0 THEN ''
            -- Some traffic, but SQRT(MB) still converts to 0: show a thin marker.
            WHEN CAST(SQRT(Total_Bytes / (1024 * 1024)) AS BIGINT) = 0 THEN '|'
            -- Bar length is SQRT applied twice to the size in MB, as a whole number,
            -- which keeps very large talkers from producing unreadably long bars.
            ELSE RPAD('', CAST(SQRT(SQRT(Total_Bytes / (1024 * 1024))) AS BIGINT), '█')
        END
        || FORMAT(' %,.2f', Total_Bytes / (1024 * 1024)) AS "Total (MB)"
    FROM NDR_Talkers
)

SELECT
    NDR.Day,
    -- Optional columns; enabling them shifts the positions used in GROUP BY below.
    -- NDR.Report_Name,
    -- NDR.Description,
    FORMAT('%.2f', CAST(NDR.avg_pcr_payload AS DOUBLE)) AS Avg_PCR_payload,
    NDR.ip,
    NDR.mac,
    NDR."Total (MB)",
    -- XDR data
    CASE
        WHEN XDR.meta_hostname > '' THEN 'XDR_Installed'
        WHEN NDR.mac > '' THEN 'NOT Managed'
        ELSE ''
    END AS XDR_Enabled,
    -- Several XDR rows can match one MAC/day; list the distinct values one per line.
    ARRAY_JOIN(ARRAY_AGG(DISTINCT XDR.meta_hostname), CHR(10)) AS meta_hostname,
    ARRAY_JOIN(ARRAY_AGG(DISTINCT XDR.meta_os_platform), CHR(10)) AS os_platform,
    ARRAY_JOIN(ARRAY_AGG(DISTINCT XDR.meta_endpoint_type), CHR(10)) AS endpoint_type,
    ARRAY_JOIN(ARRAY_AGG(DISTINCT XDR.meta_username), CHR(10)) AS username
FROM NDR_Data AS NDR
LEFT JOIN xdr_data AS XDR
    ON LOWER(XDR.meta_mac_address) = LOWER(NDR.mac)
    -- Include the Stream_Ingest_date match to the NDR.Day info to determine if XDR
    -- was installed at the time of the NDR data or not
    AND XDR.stream_ingest_date = NDR.Day
WHERE
    -- IP filter: applied only when the row has an IP; rows without one always pass.
    CASE
        WHEN NDR.ip > '' THEN LOWER(NDR.ip) LIKE LOWER('%$$IP Address$$%')
        ELSE TRUE
    END
    -- MAC filter: applied only when the row has a MAC; rows without one always pass.
    AND CASE
        WHEN NDR.mac > '' THEN LOWER(NDR.mac) LIKE LOWER('%$$MAC Address$$%')
        ELSE TRUE
    END
    -- Hostname filter: rows with no matching XDR hostname are kept only when the
    -- hostname variable is exactly the wildcard '%'.
    AND CASE
        WHEN XDR.meta_hostname > '' THEN LOWER(XDR.meta_hostname) LIKE LOWER('%$$meta_hostname$$%')
        ELSE '$$meta_hostname$$' = '%'
    END
-- Positions 1-6 are Day, Avg_PCR_payload, ip, mac, "Total (MB)" and XDR_Enabled.
-- NDR.Total_Bytes is grouped as well so it can be used for sorting.
GROUP BY 1, 2, 3, 4, 5, 6, NDR.Total_Bytes
ORDER BY Day DESC, XDR_Enabled DESC, Total_Bytes DESC
Sample output