From d5185a6e7f6168a63fe925cab518cdad158ba825 Mon Sep 17 00:00:00 2001 From: ddbnl Date: Thu, 14 Apr 2022 16:29:03 +0200 Subject: [PATCH 1/3] Change file output to CSV Change file output to CSV with a config parameter for separator --- ConfigExamples/fileOutput.yaml | 4 +++- ConfigExamples/fullConfig.yaml | 6 +++-- Source/AuditLogCollector.py | 27 +++++++++++------------ Source/Interfaces/AzureOMSInterface.py | 2 +- Source/Interfaces/FileInterface.py | 29 +++++++++++++++++++++++++ Source/requirements.txt | Bin 976 -> 1064 bytes 6 files changed, 50 insertions(+), 18 deletions(-) create mode 100644 Source/Interfaces/FileInterface.py diff --git a/ConfigExamples/fileOutput.yaml b/ConfigExamples/fileOutput.yaml index c8ec0b2..77b2adf 100644 --- a/ConfigExamples/fileOutput.yaml +++ b/ConfigExamples/fileOutput.yaml @@ -10,4 +10,6 @@ collect: output: file: enabled: True - path: 'output.txt' \ No newline at end of file + path: 'output.txt' + separateByContentType: True + separator: ';' \ No newline at end of file diff --git a/ConfigExamples/fullConfig.yaml b/ConfigExamples/fullConfig.yaml index 1a9faf0..c1c600b 100644 --- a/ConfigExamples/fullConfig.yaml +++ b/ConfigExamples/fullConfig.yaml @@ -22,9 +22,11 @@ filter: # Only logs that match ALL filters for a content type are collected. Le Audit.SharePoint: DLP.All: output: - file: + file: # CSV output enabled: False - path: 'output.txt' + separateByContentType: True # Creates a separate CSV file for each content type, appends content name to path + path: 'output.csv' + separator: ';' azureLogAnalytics: enabled: False workspaceId: diff --git a/Source/AuditLogCollector.py b/Source/AuditLogCollector.py index b3f269c..42af0e6 100644 --- a/Source/AuditLogCollector.py +++ b/Source/AuditLogCollector.py @@ -1,4 +1,4 @@ -from Interfaces import AzureOMSInterface, GraylogInterface, PRTGInterface +from Interfaces import AzureOMSInterface, GraylogInterface, PRTGInterface, FileInterface import AuditLogSubscriber import ApiConnection import os @@ -51,7 +51,7 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno self.filters = {} self.file_output = file_output - self.output_path = output_path + self.file_interface = FileInterface.FileInterface(**kwargs) self.azure_oms_output = azure_oms_output self.azure_oms_interface = AzureOMSInterface.AzureOMSInterface(**kwargs) self.graylog_output = graylog_output @@ -151,7 +151,11 @@ def _load_file_output_config(self, config): if 'enabled' in config['output']['file']: self.file_output = config['output']['file']['enabled'] if 'path' in config['output']['file']: - self.output_path = config['output']['file']['path'] + self.file_interface.output_path = config['output']['file']['path'] + if 'separateByContentType' in config['output']['file']: + self.file_interface.separate_by_content_type = config['output']['file']['separateByContentType'] + if 'separator' in config['output']['file']: + self.file_interface.separator = config['output']['file']['separator'] def _load_azure_log_analytics_output_config(self, config): """ @@ -367,6 +371,8 @@ def _get_available_content(self, content_type, start_time): def _start_interfaces(self): + if self.file_output: + self.file_interface.start() if self.azure_oms_output: self.azure_oms_interface.start() if self.prtg_output: @@ -376,6 +382,8 @@ def _start_interfaces(self): def _stop_interfaces(self): + if self.file_output: + self.file_interface.stop() if self.azure_oms_output: self.azure_oms_interface.stop() if self.prtg_output: @@ -470,7 +478,7 @@ def _output_results(self, results, content_type): :param results: list of JSON """ if self.file_output: - self._output_results_to_file(*results) + self.file_interface.send_messages(*results, content_type=content_type) if self.prtg_output: self.prtg_interface.send_messages(*results, content_type=content_type) if self.graylog_output: @@ -490,15 +498,6 @@ def _check_filters(self, log, content_type): return False return True - def _output_results_to_file(self, *results): - """ - Dump received JSON messages to a file. - :param results: retrieved JSON (dict) - """ - for result in results: - with open(self.output_path, 'a') as ofile: - ofile.write("{}\n".format(json.dumps(obj=result))) - def _add_known_log(self): """ Add a content ID to the known content file to avoid saving messages more than once. @@ -677,7 +676,7 @@ def known_content(self): tenant_id=argsdict['tenant_id'], secret_key=argsdict['secret_key'], client_key=argsdict['client_key'], content_types=content_types, publisher_id=argsdict['publisher_id'], resume=argsdict['resume'], fallback_time=fallback_time, skip_known_logs=argsdict['skip_known_logs'], log_path=argsdict['log_path'], - file_output=argsdict['file'], output_path=argsdict['output_path'], debug=argsdict['debug_logging'], + file_output=argsdict['file'], path=argsdict['output_path'], debug=argsdict['debug_logging'], prtg_output=argsdict['prtg'], azure_oms_output=argsdict['azure'], workspace_id=argsdict['azure_workspace'], shared_key=argsdict['azure_key'], diff --git a/Source/Interfaces/AzureOMSInterface.py b/Source/Interfaces/AzureOMSInterface.py index 62710b3..c53fafb 100644 --- a/Source/Interfaces/AzureOMSInterface.py +++ b/Source/Interfaces/AzureOMSInterface.py @@ -118,6 +118,6 @@ def _post_data(self, body, log_type, time_generated): response.close() if 200 <= status_code <= 299: - logging.info('Accepted payload:' + body) + logging.debug('Accepted payload:' + body) else: raise RuntimeError("Unable to send to OMS with {}: {} ".format(status_code, json_output)) \ No newline at end of file diff --git a/Source/Interfaces/FileInterface.py b/Source/Interfaces/FileInterface.py new file mode 100644 index 0000000..4c3b088 --- /dev/null +++ b/Source/Interfaces/FileInterface.py @@ -0,0 +1,29 @@ +import logging +import os +from . import _Interface +import collections +import pandas + + +class FileInterface(_Interface.Interface): + + def __init__(self, path='output', separate_by_content_type=True, separator=';', **kwargs): + """ + Interface to send logs to an Azure Log Analytics Workspace. + """ + super().__init__(**kwargs) + self.path = path + self.paths = {} + self.separate_by_content_type = separate_by_content_type + self.separator = separator + + def _send_message(self, msg, content_type, **kwargs): + + if content_type not in self.paths: + self.paths[content_type] = "{}_{}.csv".format(self.path, content_type.replace('.', '')) \ + if self.separate_by_content_type else self.path + df = pandas.json_normalize(msg) + df.to_csv(self.paths[content_type], index=False, sep=self.separator, mode='a', + header=not os.path.exists(self.paths[content_type])) + + diff --git a/Source/requirements.txt b/Source/requirements.txt index 67dee77d174832782e6b76ee24edf78eba195f9f..23709cdf452a431cd487bc24126b0211d27ff458 100644 GIT binary patch delta 86 zcmcb>zJg=J1xCX>hEj%Hh609423sIBWYA+U0zy3oV+LLZE}(cKLmop4kSvDDZ$8TC W%_v;LPz4q+VlaSeFr3WJd=dctWDr9D delta 16 YcmZ3%ae;lq1;))hOb(2bGnfwm05pXK$p8QV From db8e04b5d478301c196ddae116840f7a2ebd0593 Mon Sep 17 00:00:00 2001 From: ddbnl Date: Fri, 15 Apr 2022 21:53:00 +0200 Subject: [PATCH 2/3] Add SQL output --- .../{fileOutput.yaml => CsvOutput.yaml} | 2 +- ...FileOutput.yaml => filteredCsvOutput.yaml} | 0 ConfigExamples/fullConfig.yaml | 4 + ConfigExamples/sql.yaml | 12 + README.md | 24 +- Source/AuditLogCollector.py | 87 +++---- Source/Interfaces/FileInterface.py | 52 ++++- Source/Interfaces/PRTGInterface.py | 5 + Source/Interfaces/SqlInterface.py | 221 ++++++++++++++++++ Source/Interfaces/_Interface.py | 8 +- 10 files changed, 363 insertions(+), 52 deletions(-) rename ConfigExamples/{fileOutput.yaml => CsvOutput.yaml} (92%) rename ConfigExamples/{filteredFileOutput.yaml => filteredCsvOutput.yaml} (100%) create mode 100644 ConfigExamples/sql.yaml create mode 100644 Source/Interfaces/SqlInterface.py diff --git a/ConfigExamples/fileOutput.yaml b/ConfigExamples/CsvOutput.yaml similarity index 92% rename from ConfigExamples/fileOutput.yaml rename to ConfigExamples/CsvOutput.yaml index 77b2adf..f84809c 100644 --- a/ConfigExamples/fileOutput.yaml +++ b/ConfigExamples/CsvOutput.yaml @@ -10,6 +10,6 @@ collect: output: file: enabled: True - path: 'output.txt' + path: 'output' separateByContentType: True separator: ';' \ No newline at end of file diff --git a/ConfigExamples/filteredFileOutput.yaml b/ConfigExamples/filteredCsvOutput.yaml similarity index 100% rename from ConfigExamples/filteredFileOutput.yaml rename to ConfigExamples/filteredCsvOutput.yaml diff --git a/ConfigExamples/fullConfig.yaml b/ConfigExamples/fullConfig.yaml index c1c600b..cfba813 100644 --- a/ConfigExamples/fullConfig.yaml +++ b/ConfigExamples/fullConfig.yaml @@ -31,6 +31,10 @@ output: enabled: False workspaceId: sharedKey: + sql: + enabled: False + cacheSize: 500000 # Amount of logs to cache until each SQL commit, larger=faster but eats more memory + chunkSize: 2000 # Amount of rows to write simultaneously to SQL, in most cases just set it as high as your DB allows. COUNT errors = too high graylog: enabled: False address: diff --git a/ConfigExamples/sql.yaml b/ConfigExamples/sql.yaml new file mode 100644 index 0000000..fdb5603 --- /dev/null +++ b/ConfigExamples/sql.yaml @@ -0,0 +1,12 @@ +collect: + contentTypes: + Audit.General: True + Audit.AzureActiveDirectory: True + Audit.Exchange: True + Audit.SharePoint: True + DLP.All: True +output: + sql: + enabled: True + cacheSize: 500000 + chunkSize: 2000 \ No newline at end of file diff --git a/README.md b/README.md index 9f26865..a1fed85 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,10 @@ If you have any issues or questions, feel free to create an issue in this repo. - The following outputs are supported: - Azure Analytics Workspace (OMS) - PRTG Network Monitor + - ( Azure ) SQL server - Graylog (or any other source that accepts a simple socket connection) - - Local file + - CSV Local file + - Power BI (indirectly through SQL or CSV) Simply download the executable you need from the Windows or Linux folder and copy a config file from the ConfigExamples folder that suits your need: - Windows: @@ -40,11 +42,14 @@ See the following link for more info on the management APIs: https://msdn.micros ## Roadmap: +- Add AzureBlob and AzureTable outputs - Automate onboarding as much as possible to make it easier to use - Make a container that runs this script - Create a tutorial for automatic onboarding + docker container for the easiest way to run this ## Latest changes: +- Added SQL output for Power BI +- Changed file to CSV output - Added PRTG output - Added filters - Added YAML config file @@ -63,14 +68,15 @@ See the following link for more info on the management APIs: https://msdn.micros ## Use cases: - Ad-lib log retrieval; -- Scheduling regular execution to retrieve the full audit trail. +- Scheduling regular execution to retrieve the full audit trail in SIEM - Output to PRTG for alerts on audit logs +- Output to (Azure) SQL / CSV for Power BI ## Features: - Subscribe to the audit logs of your choice through the --interactive-subscriber switch, or automatically when collecting logs; - Collect General, Exchange, Sharepoint, Azure active directory and/or DLP audit logs through the collector script; -- Output to file, PRTG, Azure Log Analytics or to a Graylog input (i.e. send the logs over a network socket). +- Output to CSV, PRTG, Azure Log Analytics, SQL or to a Graylog input (i.e. send the logs over a network socket). ## Requirements: - Office365 tenant; @@ -141,6 +147,18 @@ Probably at least 300 seconds. Run the script manually first to check how long i - Match the interval of the sensor to the amount of hours of logs to retrieve. If your interval is 1 hour, hoursToCollect in the config file should also be set to one hour. +### (optional) Using ( Azure ) SQL + +If you are running this script to get audit events in an SQL database you will need an ODBC driver and a connection string +- The collector uses PYODBC, which needs an ODBC driver, examples on how to install this: + - On windows: pip install pywin; pywin install pyodbc + - On Linux: apt get install unixodbc-dev +- Connection string might look like this: "Driver={ODBC Driver 17 for SQL Server};Server=tcp:mydatabase.com,1433;Database=mydatabase;Uid=myuser;Pwd=mypassword;Encrypt +=yes;TrustServerCertificate=no;Connection Timeout=30;" +- Use SQL example config and pass --sql-string parameter when running the collector with your connection string + + + ### (optional) Creating a Graylog input If you are running this script to get audit events in Graylog you will need to create a Graylog input. diff --git a/Source/AuditLogCollector.py b/Source/AuditLogCollector.py index 42af0e6..1e868bc 100644 --- a/Source/AuditLogCollector.py +++ b/Source/AuditLogCollector.py @@ -1,4 +1,4 @@ -from Interfaces import AzureOMSInterface, GraylogInterface, PRTGInterface, FileInterface +from Interfaces import AzureOMSInterface, SqlInterface, GraylogInterface, PRTGInterface, FileInterface import AuditLogSubscriber import ApiConnection import os @@ -17,7 +17,7 @@ class AuditLogCollector(ApiConnection.ApiConnection): def __init__(self, content_types=None, resume=True, fallback_time=None, skip_known_logs=True, log_path='collector.log', debug=False, auto_subscribe=False, max_threads=20, retries=3, - retry_cooldown=3, file_output=False, output_path=None, graylog_output=False, azure_oms_output=False, + retry_cooldown=3, file_output=False, sql_output=False, graylog_output=False, azure_oms_output=False, prtg_output=False, **kwargs): """ Object that can retrieve all available content blobs for a list of content types and then retrieve those @@ -32,7 +32,6 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno :param log_path: path of file to log to (str) :param debug: enable debug logging (Bool) :param auto_subscribe: automatically subscribe to audit log feeds for which content is retrieved (Bool) - :param output_path: path to output retrieved logs to (None=no file output) (string) :param graylog_output: Enable graylog Interface (Bool) :param azure_oms_output: Enable Azure workspace analytics OMS Interface (Bool) :param prtg_output: Enable PRTG output (Bool) @@ -44,6 +43,7 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno hours=23) self.retries = retries self.retry_cooldown = retry_cooldown + self.max_threads = max_threads self.skip_known_logs = skip_known_logs self.log_path = log_path self.debug = debug @@ -54,11 +54,12 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno self.file_interface = FileInterface.FileInterface(**kwargs) self.azure_oms_output = azure_oms_output self.azure_oms_interface = AzureOMSInterface.AzureOMSInterface(**kwargs) + self.sql_output = sql_output + self.sql_interface = SqlInterface.SqlInterface(**kwargs) self.graylog_output = graylog_output self.graylog_interface = GraylogInterface.GraylogInterface(**kwargs) self.prtg_output = prtg_output self.prtg_interface = PRTGInterface.PRTGInterface(**kwargs) - self.max_threads = max_threads self._last_run_times = {} self._known_content = {} @@ -72,6 +73,18 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno self.logs_retrieved = 0 self.errors_retrieving = 0 + @property + def all_interfaces(self): + + return {self.file_interface: self.file_output, self.azure_oms_interface: self.azure_oms_output, + self.sql_interface: self.sql_output, self.graylog_interface: self.graylog_output, + self.prtg_interface: self.prtg_output} + + @property + def all_enabled_interfaces(self): + + return [interface for interface, enabled in self.all_interfaces.items() if enabled] + @property def all_content_types(self): """ @@ -140,6 +153,7 @@ def _load_output_config(self, config): if 'output' in config: self._load_file_output_config(config=config) self._load_azure_log_analytics_output_config(config=config) + self._load_sql_output_config(config=config) self._load_graylog_output_config(config=config) self._load_prtg_output_config(config=config) @@ -169,6 +183,18 @@ def _load_azure_log_analytics_output_config(self, config): if 'sharedKey' in config['output']['azureLogAnalytics']: self.azure_oms_interface.shared_key = config['output']['azureLogAnalytics']['sharedKey'] + def _load_sql_output_config(self, config): + """ + :param config: str + """ + if 'sql' in config['output']: + if 'enabled' in config['output']['sql']: + self.sql_output = config['output']['sql']['enabled'] + if 'cacheSize' in config['output']['sql']: + self.sql_interface.cache_size = config['output']['sql']['cacheSize'] + if 'chunkSize' in config['output']['sql']: + self.sql_interface.chunk_size = config['output']['sql']['chunkSize'] + def _load_graylog_output_config(self, config): """ :param config: str @@ -217,10 +243,9 @@ def _prepare_to_run(self): self._clean_known_content() self._clean_known_logs() self.logs_retrieved = 0 - self.graylog_interface.successfully_sent = 0 - self.graylog_interface.unsuccessfully_sent = 0 - self.azure_oms_interface.successfully_sent = 0 - self.azure_oms_interface.unsuccessfully_sent = 0 + for interface in self.all_enabled_interfaces: + interface.successfully_sent = 0 + interface.unsuccessfully_sent = 0 self.run_started = datetime.datetime.now() def run_once(self, start_time=None): @@ -246,8 +271,6 @@ def _finish_run(self): if self.resume and self._last_run_times: with open('last_run_times', 'w') as ofile: json.dump(fp=ofile, obj=self._last_run_times) - if self.prtg_output: - self.prtg_interface.output() self._log_statistics() def _log_statistics(self): @@ -256,12 +279,9 @@ def _log_statistics(self): """ logging.info("Finished. Total logs retrieved: {}. Total logs with errors: {}. Run time: {}.".format( self.logs_retrieved, self.errors_retrieving, datetime.datetime.now() - self.run_started)) - if self.azure_oms_output: - logging.info("Azure OMS output report: {} successfully sent, {} errors".format( - self.azure_oms_interface.successfully_sent, self.azure_oms_interface.unsuccessfully_sent)) - if self.graylog_output: - logging.info("Graylog output report: {} successfully sent, {} errors".format( - self.graylog_interface.successfully_sent, self.graylog_interface.unsuccessfully_sent)) + for interface in self.all_enabled_interfaces: + logging.info("{} reports: {} successfully sent, {} errors".format( + interface.__class__.__name__, interface.successfully_sent, interface.unsuccessfully_sent)) def _get_last_run_times(self): """ @@ -371,25 +391,13 @@ def _get_available_content(self, content_type, start_time): def _start_interfaces(self): - if self.file_output: - self.file_interface.start() - if self.azure_oms_output: - self.azure_oms_interface.start() - if self.prtg_output: - self.prtg_interface.start() - if self.graylog_output: - self.graylog_interface.start() + for interface in self.all_enabled_interfaces: + interface.start() def _stop_interfaces(self): - if self.file_output: - self.file_interface.stop() - if self.azure_oms_output: - self.azure_oms_interface.stop() - if self.prtg_output: - self.prtg_interface.stop() - if self.graylog_output: - self.graylog_interface.stop() + for interface in self.all_enabled_interfaces: + interface.stop() def _monitor_blobs_to_collect(self): """ @@ -477,14 +485,8 @@ def _output_results(self, results, content_type): :param content_type: Type of API being retrieved for, e.g. 'Audit.Exchange' (str) :param results: list of JSON """ - if self.file_output: - self.file_interface.send_messages(*results, content_type=content_type) - if self.prtg_output: - self.prtg_interface.send_messages(*results, content_type=content_type) - if self.graylog_output: - self.graylog_interface.send_messages(*results, content_type=content_type) - if self.azure_oms_output: - self.azure_oms_interface.send_messages(*results, content_type=content_type) + for interface in self.all_enabled_interfaces: + interface.send_messages(*results, content_type=content_type) def _check_filters(self, log, content_type): """ @@ -610,6 +612,8 @@ def known_content(self): parser.add_argument('secret_key', type=str, help='Secret key generated by Azure application', action='store') parser.add_argument('--config', metavar='config', type=str, help='Path to YAML config file', action='store', dest='config') + parser.add_argument('--sql-string', metavar='sql_string', type=str, + help='Connection string for SQL output interface', action='store', dest='sql_string') parser.add_argument('--interactive-subscriber', action='store_true', help='Manually (un)subscribe to audit log feeds', dest='interactive_subscriber') parser.add_argument('--general', action='store_true', help='Retrieve General content', dest='general') @@ -681,7 +685,8 @@ def known_content(self): azure_oms_output=argsdict['azure'], workspace_id=argsdict['azure_workspace'], shared_key=argsdict['azure_key'], gl_address=argsdict['graylog_addr'], gl_port=argsdict['graylog_port'], - graylog_output=argsdict['graylog']) + graylog_output=argsdict['graylog'], + sql_connection_string=argsdict['sql_string']) if argsdict['config']: collector.load_config(path=argsdict['config']) collector.init_logging() diff --git a/Source/Interfaces/FileInterface.py b/Source/Interfaces/FileInterface.py index 4c3b088..f6c1d42 100644 --- a/Source/Interfaces/FileInterface.py +++ b/Source/Interfaces/FileInterface.py @@ -3,27 +3,67 @@ from . import _Interface import collections import pandas +import threading +import time class FileInterface(_Interface.Interface): - def __init__(self, path='output', separate_by_content_type=True, separator=';', **kwargs): + def __init__(self, path='output', separate_by_content_type=True, separator=';', cache_size=500000, **kwargs): """ - Interface to send logs to an Azure Log Analytics Workspace. + Interface to send logs to CSV file(s). Not every audit log has every possible column, so columns in the CSV + file might change over time. Therefore, the CSV file is recreated each time the cache_size is hit to insure + integrity, taking the performance hit. """ super().__init__(**kwargs) self.path = path self.paths = {} self.separate_by_content_type = separate_by_content_type self.separator = separator + self.cache_size = cache_size + self.results_cache = collections.defaultdict(collections.deque) - def _send_message(self, msg, content_type, **kwargs): + @property + def total_cache_length(self): + + return sum([len(self.results_cache[k]) for k in self.results_cache.keys()]) + + def _path_for(self, content_type): if content_type not in self.paths: self.paths[content_type] = "{}_{}.csv".format(self.path, content_type.replace('.', '')) \ if self.separate_by_content_type else self.path - df = pandas.json_normalize(msg) - df.to_csv(self.paths[content_type], index=False, sep=self.separator, mode='a', - header=not os.path.exists(self.paths[content_type])) + return self.paths[content_type] + + def _send_message(self, msg, content_type, **kwargs): + + self.results_cache[content_type].append(msg) + if self.total_cache_length >= self.cache_size: + self._process_caches() + + def exit_callback(self): + + self._process_caches() + + def _process_caches(self): + + for content_type in self.results_cache.keys(): + self._process_cache(content_type=content_type) + def _process_cache(self, content_type): + amount = len(self.results_cache[content_type]) + try: + df = pandas.DataFrame(self.results_cache[content_type]) + self.results_cache[content_type].clear() + if os.path.exists(self._path_for(content_type=content_type)): + existing_df = pandas.read_csv(self._path_for(content_type=content_type), sep=self.separator) + df = pandas.concat([existing_df, df]) + logging.info("Writing {} logs of type {} to {}".format(amount, content_type, self._path_for(content_type))) + df.to_csv(self._path_for(content_type=content_type), index=False, sep=self.separator, mode='w', + header=not os.path.exists(self.paths[content_type])) + except Exception as e: + self.unsuccessfully_sent += amount + raise e + else: + self.successfully_sent += amount \ No newline at end of file diff --git a/Source/Interfaces/PRTGInterface.py b/Source/Interfaces/PRTGInterface.py index 463b488..7d3156e 100644 --- a/Source/Interfaces/PRTGInterface.py +++ b/Source/Interfaces/PRTGInterface.py @@ -43,3 +43,8 @@ def output(self): csr.error = "Python Script execution error: %s" % str(e) print(csr.json_result) + def exit_callback(self): + + super().exit_callback() + self.output() + diff --git a/Source/Interfaces/SqlInterface.py b/Source/Interfaces/SqlInterface.py new file mode 100644 index 0000000..f9350ce --- /dev/null +++ b/Source/Interfaces/SqlInterface.py @@ -0,0 +1,221 @@ +from . import _Interface +from sqlalchemy import create_engine, inspect +import time +import urllib +import logging +import threading +import collections +import pandas + + +class SqlInterface(_Interface.Interface): + + def __init__(self, sql_connection_string, cache_size=500000, chunk_size=2000, **kwargs): + """ + Interface to send logs to an SQL database. Caches logs in memory until the cache size is hit, then writes them + to database. When the cache is too small too many SQL writes are made taking ages to complete. Too + large and the collector will eat too much memory. + """ + super().__init__(**kwargs) + self._connection_string = sql_connection_string + self.cache_size = cache_size + self.chunk_size = chunk_size + self.results_cache = collections.defaultdict(collections.deque) + self._existing_columns = {} + self._threads = collections.deque() + self._engine = None + + @property + def engine(self): + """ + DB Engine for use in main thread. A separate one is creation for each sub thread. + :return: sqlalchemy.Engine + """ + if not self._engine: + self._engine = create_engine(self.connection_string) + return self._engine + + @staticmethod + def _table_name_for(content_type): + """ + Create a table name for a content type (remove periods). + :param content_type: str + :return: str + """ + return content_type.replace('.', '') + + def _existing_columns_for(self, content_type, engine): + """ + Cache columns currently existing for a table. Used to check if new incoming logs have columns that currently + don't exist in the database. + :param content_type: str + :return: list of str + """ + if content_type not in self._existing_columns.keys(): + self._existing_columns[content_type] = \ + pandas.read_sql_query(f"SELECT TOP (1) * FROM {self._table_name_for(content_type)};", + con=engine).columns.tolist() + return self._existing_columns[content_type] + + @property + def total_cache_length(self): + + return sum([len(self.results_cache[k]) for k in self.results_cache.keys()]) + + @property + def connection_string(self): + + params = urllib.parse.quote_plus(self._connection_string) + return 'mssql+pyodbc:///?autocommit=true&odbc_connect={}'.format(params) + + @staticmethod + def _validate_column_names(df): + """ + Audit logs tend to have periods (.) in their column names. Take those out. If a log column has the same name + as an existing column in the database, but the capitalization doesn't match, rename the column to the existing + one. Otherwise SQL will throw an error for duplicate column names. + :param df: pandas.DataFrame. + :return: pandas.DataFrame. + """ + to_rename = {} + for column in df: + if '.' in column: + to_rename[column] = column.replace('.', '') + return df.rename(columns=to_rename) + + @staticmethod + def _validate_column_value(df): + """ + Flatten columns that a list as value. E.g. column "ColumnA: [1,2,3]" becomes: + "ColumnA_0: 1, ColumnA_1: 2, ColumnA_2: 3". + Logs are processed individually as they come in, so only the first value of a column needs to be checked. + :param df: pandas.DataFrame. + :return: pandas.DataFrame + """ + for column in df.columns.tolist(): + for i, value in enumerate(df[column]): + if type(df[column][i]) in [list, dict]: + df[column][i] = str(df[column][i]) + return df + + def _validate_existing_columns(self, df, content_type, engine): + """ + Not all audit logs have all available columns. There columns in the database might change as logs come in. + Check whether all columns in a log already exist in the current table. + :return: Bool + """ + if inspect(engine).has_table(self._table_name_for(content_type=content_type)): + new_cols = df.columns.tolist() + missing_cols = set(new_cols) - set(self._existing_columns_for(content_type, engine=engine)) + return not missing_cols + return True + + @staticmethod + def _deduplicate_columns(df): + """ + Different logs sometimes have identical columns names but with different capitalization (for some reason); + merge these columns. + :param df: + :return: + """ + to_check = df.columns.tolist() + leading_columns = [] + to_merge = collections.defaultdict(collections.deque) + for column in to_check: + for leading_column in leading_columns: + if column.lower() == leading_column.lower() and column != leading_column: + to_merge[leading_column].append(column) + break + else: + leading_columns.append(column) + for leading_column, columns_to_merge in to_merge.items(): + new_column = df[leading_column] + for column_to_merge in columns_to_merge: + new_column = new_column.combine_first(df[column_to_merge]) + del df[column_to_merge] + del df[leading_column] + df[leading_column] = new_column + return df + + def _remake_table(self, new_data, content_type, engine): + """ + If a new log is coming in that has columns that don't exist in the current table, replace it instead of + appending. + :param new_data: pandas.DataFrame + :param content_type: str + """ + table_name = self._table_name_for(content_type=content_type) + existing_table = pandas.read_sql_table(con=engine, table_name=table_name) + df = pandas.concat([new_data, existing_table]) + self._existing_columns[content_type] = df.columns.tolist() + logging.info("Recommitting {} records of type {} to table {}".format( + len(df), content_type, table_name)) + df = df.loc[:, ~df.columns.duplicated()] # Remove any duplicate columns + df = self._deduplicate_columns(df=df) + df.to_sql(name=table_name, con=engine, index=False, if_exists='replace', + chunksize=int(self.chunk_size / len(df.columns)), method='multi') + + def _send_message(self, msg, content_type, **kwargs): + """ + Write logs to cache. Process cache if cache size is hit. + :param msg: JSON + :param content_type: str + """ + self.results_cache[content_type].append(msg) + if self.total_cache_length >= self.cache_size: + self._wait_threads() + self._threads.clear() + self._process_caches() + + def _process_caches(self): + """ + Write all cached logs to database. + """ + for content_type in self.results_cache.copy().keys(): + if not self.results_cache[content_type]: + continue + thread = threading.Thread(target=self._process_cache, kwargs={'content_type': content_type}, daemon=True) + thread.start() + self._threads.append(thread) + + def _wait_threads(self, timeout=600): + + while True in [thread.is_alive() for thread in self._threads]: + if not timeout: + raise RuntimeError("Timeout while committing to database") + timeout -= 1 + time.sleep(1) + + def _process_cache(self, content_type): + """ + Write cached logs to database for a content type. + :param content_type: str + """ + df = pandas.DataFrame(self.results_cache[content_type]) + df = self._validate_column_names(df=df) + df = self._validate_column_value(df=df) + + table_name = self._table_name_for(content_type=content_type) + engine = create_engine(self.connection_string) + with engine.connect(): + try: + if not self._validate_existing_columns(df=df, content_type=content_type, engine=engine): + self._remake_table(new_data=df, content_type=content_type, engine=engine) + else: + logging.info("Committing {} records of type {} to table {}".format( + len(df), content_type, table_name)) + df = df.loc[:, ~df.columns.duplicated()] # Remove any duplicate columns + df = self._deduplicate_columns(df=df) + df.to_sql(name=table_name, con=engine, index=False, if_exists='append', + chunksize=int(self.chunk_size / len(df.columns)), method='multi') + except Exception as e: + self.unsuccessfully_sent += len(df) + raise e + else: + self.successfully_sent += len(df) + + def exit_callback(self): + + super().exit_callback() + self._process_caches() + self._wait_threads() diff --git a/Source/Interfaces/_Interface.py b/Source/Interfaces/_Interface.py index 3c3d687..1789723 100644 --- a/Source/Interfaces/_Interface.py +++ b/Source/Interfaces/_Interface.py @@ -38,10 +38,16 @@ def monitor_queue(self): if self.queue: msg, content_type = self.queue.popleft() if msg == 'stop monitor thread': - return + return self.exit_callback() else: self._send_message(msg=msg, content_type=content_type) + def exit_callback(self): + """ + Called right before the interface is stopped. + """ + pass + def send_messages(self, *messages, content_type): """ Send message(s) to this interface. They will be handled asynchronously. From bd16298f216b59ed24eab5e129ea643473bbf9f0 Mon Sep 17 00:00:00 2001 From: ddbnl Date: Tue, 19 Apr 2022 11:31:46 +0200 Subject: [PATCH 3/3] Fix import error --- Source/Interfaces/SqlInterface.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Source/Interfaces/SqlInterface.py b/Source/Interfaces/SqlInterface.py index f9350ce..0b99d3f 100644 --- a/Source/Interfaces/SqlInterface.py +++ b/Source/Interfaces/SqlInterface.py @@ -1,5 +1,6 @@ from . import _Interface from sqlalchemy import create_engine, inspect +import pyodbc import time import urllib import logging