diff --git a/src/duct.py b/src/duct.py index 15986d50..9b5d442d 100755 --- a/src/duct.py +++ b/src/duct.py @@ -3,7 +3,6 @@ from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime -import errno import json import os import pprint @@ -239,39 +238,22 @@ def start(self): def _redirect_output(self): with os.fdopen(self.listener_fd, "rb", buffering=0) as stream: while True: - try: - data = stream.read(1024) - except OSError as e: - if e.errno == errno.EIO: # The file has been closed - break - else: - raise + data = stream.read(1024) if not data: # Still open, but no new data to write break + # sys.stdout = codecs.getwriter('utf8')(sys.stdout.buffer) sys.stdout.buffer.write(data) sys.stdout.buffer.flush() - self.file.write( - data.decode("utf-8", "replace") - ) # Handling decoding errors - self.file.flush() def close(self): """Close the slave fd and the file when done.""" os.close(self.writer_fd) + os.close(self.listener_fd) self.file.close() -def monitor_process( - stdout, stderr, report, process, report_interval, sample_interval, output_prefix -): +def monitor_process(report, process, report_interval, sample_interval, output_prefix): while True: - if process.poll() is not None: # the passthrough command has finished - if hasattr(stdout, "close"): - stdout.close() - if hasattr(stderr, "close"): - stderr.close() - break - elapsed_time = time.time() - report.start_time resource_stats_log_path = "{output_prefix}usage.json" if elapsed_time >= (report.number + 1) * report_interval: @@ -286,6 +268,8 @@ def monitor_process( pinfo["elapsed_time"] = elapsed_time resource_statistics_log.write(json.dumps(aggregated)) report.number += 1 + if process.poll() is not None: # the passthrough command has finished + break time.sleep(sample_interval) @@ -315,6 +299,17 @@ def prepare_outputs( stderr = subprocess.PIPE else: stderr = subprocess.DEVNULL + + print( + f""" +REMINDERS: +devnull = {subprocess.DEVNULL}, +pipe = {subprocess.PIPE}""" + ) + print(f"USING TYPE STDOUT: {type(stdout)}") + print(f"USING STDOUT: {str(stdout)}") + print(f"USING TYPE STDERR: {type(stderr)}") + print(f"USING STDerr: {str(stderr)}") return stdout, stderr @@ -355,13 +350,8 @@ def main(): ) session_id = os.getsid(process.pid) # Get session ID of the new process report = Report(args.command, session_id) - report.collect_environment() - report.get_system_info() - if args.record_types in ["all", "processes-samples"]: monitoring_args = [ - stdout, - stderr, report, process, args.report_interval, @@ -374,13 +364,24 @@ def main(): monitoring_thread.start() monitoring_thread.join() + outs, err = process.communicate() + print(outs.decode("utf-8", "replace")) + # process.wait() + if args.record_types in ["all", "system-summary"]: + report.collect_environment() + report.get_system_info() system_info_path = f"{formatted_output_prefix}info.json" with open(system_info_path, "a") as system_logs: report.end_time = time.time() report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() system_logs.write(str(report)) + + if hasattr(stdout, "close"): + stdout.close() + if hasattr(stderr, "close"): + stderr.close() pprint.pprint(report, width=120)