Skip to content

Commit

Permalink
Merge pull request #62 from ddbnl/master
Browse files Browse the repository at this point in the history
Add Azure Log Analytics interface
  • Loading branch information
ddbnl authored Mar 12, 2024
2 parents 4223ea6 + a1117b4 commit 42c21bf
Show file tree
Hide file tree
Showing 17 changed files with 298 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Release/Linux/OfficeAuditLogCollector filter=lfs diff=lfs merge=lfs -text
Release/Windows/OfficeAuditLogCollector.exe filter=lfs diff=lfs merge=lfs -text
4 changes: 4 additions & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# These are supported funding model platforms


buy_me_a_coffee: ddbnl
114 changes: 111 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ serde_derive = "1.0.136"
clap = { version = "4.5.2", features = ["derive"] }
csv = "1.3.0"
poston = "0.7.8"
base64 = "0.22.0"
hmac = "0.12.1"
sha2 = "0.10.8"
async-trait = "0.1.77"
14 changes: 14 additions & 0 deletions Release/ConfigExamples/logAnalytics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
collect:
contentTypes:
Audit.General: True
Audit.AzureActiveDirectory: True
Audit.Exchange: True
Audit.SharePoint: True
DLP.All: True
output:
azureLogAnalytics:
workspaceId: 11111111-1111-1111-1111-1111111111111
# Get shared key through AZ CLI:
# az monitor log-analytics workspace get-shared-keys --resource-group my-rg --workspace-name my-oms --query "primarySharedKey"
# Then run collector with:
# OfficeAuditLogCollector [...] --oms-key '12345'
Binary file modified Release/Linux/OfficeAuditLogCollector
Binary file not shown.
Binary file added Release/Windows/OfficeAuditLogCollector.exe
Binary file not shown.
34 changes: 19 additions & 15 deletions src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::api_connection;
use crate::api_connection::ApiConnection;
use crate::config::{Config, ContentTypesSubConfig};
use crate::data_structures::{ArbitraryJson, Caches, CliArgs, ContentToRetrieve, JsonList};
use crate::interfaces::azure_oms_interface::OmsInterface;
use crate::interfaces::interface::Interface;
use crate::interfaces::file_interface::FileInterface;
use crate::interfaces::fluentd_interface::FluentdInterface;
Expand Down Expand Up @@ -55,6 +56,9 @@ impl Collector {
if config.output.graylog.is_some() {
interfaces.push(Box::new(GraylogInterface::new(config.clone())));
}
if config.output.oms.is_some() {
interfaces.push(Box::new(OmsInterface::new(config.clone(), args.oms_key.clone())));
}

// Initialize collector threads
let api = api_connection::get_api_connection(
Expand Down Expand Up @@ -94,7 +98,7 @@ impl Collector {

/// Monitor all started content retrieval threads, processing results and terminating
/// when all content has been retrieved (signalled by a final run stats message).
pub fn monitor(&mut self) {
pub async fn monitor(&mut self) {

let start = Instant::now();
loop {
Expand All @@ -106,12 +110,12 @@ impl Collector {
}
// Run stats are only returned when all content has been retrieved,
// therefore this signals the end of the run.
if self.check_stats() {
if self.check_stats().await {
break
}

// Check if a log came in.
self.check_results();
self.check_results().await;
}
self.end_run();
}
Expand All @@ -120,25 +124,25 @@ impl Collector {
self.config.save_known_blobs(&self.known_blobs);
}

fn check_results(&mut self) {
async fn check_results(&mut self) {

if let Ok(Some((msg, content))) = self.result_rx.try_next() {
self.handle_content(msg, content);
self.handle_content(msg, content).await;
}
}

fn handle_content(&mut self, msg: String, content: ContentToRetrieve) {
async fn handle_content(&mut self, msg: String, content: ContentToRetrieve) {
self.known_blobs.insert(content.content_id.clone(), content.expiration.clone());
if let Ok(logs) = serde_json::from_str::<JsonList>(&msg) {
for log in logs {
self.handle_log(log, &content);
self.handle_log(log, &content).await;
}
} else {
warn!("Skipped log that could not be parsed: {}", content.content_id)
}
}

fn handle_log(&mut self, mut log: ArbitraryJson, content: &ContentToRetrieve) {
async fn handle_log(&mut self, mut log: ArbitraryJson, content: &ContentToRetrieve) {

if let Some(filters) = self.filters.get(&content.content_type) {
for (k, v) in filters.iter() {
Expand All @@ -154,17 +158,17 @@ impl Collector {
self.cache.insert(log, &content.content_type);
self.saved += 1;
if self.cache.full() {
self.output();
self.output().await;
}
}
fn check_stats(&mut self) -> bool {
async fn check_stats(&mut self) -> bool {

if let Ok(Some((found,
successful,
retried,
failed))) = self.stats_rx.try_next() {

self.output();
self.output().await;
let output = self.get_output_string(
found,
successful,
Expand All @@ -180,15 +184,15 @@ impl Collector {
}
}

fn output(&mut self) {
async fn output(&mut self) {

let mut cache = Caches::new(self.cache.size);
swap(&mut self.cache, &mut cache);
if self.interfaces.len() == 1 {
self.interfaces.get_mut(0).unwrap().send_logs(cache);
self.interfaces.get_mut(0).unwrap().send_logs(cache).await;
} else {
for interface in self.interfaces.iter_mut() {
interface.send_logs(cache.clone());
interface.send_logs(cache.clone()).await;
}
}
}
Expand Down Expand Up @@ -287,7 +291,7 @@ fn initialize_channels(
retries: config.collect.retries.unwrap_or(3),
kill_rx,
};
return (blob_config, content_config, message_loop_config, blobs_rx, content_rx, result_rx,
(blob_config, content_config, message_loop_config, blobs_rx, content_rx, result_rx,
stats_rx, kill_tx)
}

Expand Down
Loading

0 comments on commit 42c21bf

Please sign in to comment.