Skip to content

Commit

Permalink
Fiddling with placement
Browse files Browse the repository at this point in the history
  • Loading branch information
asmacdo committed May 5, 2024
1 parent 9a623b3 commit 92a153e
Showing 1 changed file with 28 additions and 27 deletions.
55 changes: 28 additions & 27 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
import errno
import json
import os
import pprint
Expand Down Expand Up @@ -239,39 +238,22 @@ def start(self):
def _redirect_output(self):
with os.fdopen(self.listener_fd, "rb", buffering=0) as stream:
while True:
try:
data = stream.read(1024)
except OSError as e:
if e.errno == errno.EIO: # The file has been closed
break
else:
raise
data = stream.read(1024)
if not data: # Still open, but no new data to write
break
# sys.stdout = codecs.getwriter('utf8')(sys.stdout.buffer)
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
self.file.write(
data.decode("utf-8", "replace")
) # Handling decoding errors
self.file.flush()

def close(self):
"""Close the slave fd and the file when done."""
os.close(self.writer_fd)
os.close(self.listener_fd)
self.file.close()


def monitor_process(
stdout, stderr, report, process, report_interval, sample_interval, output_prefix
):
def monitor_process(report, process, report_interval, sample_interval, output_prefix):
while True:
if process.poll() is not None: # the passthrough command has finished
if hasattr(stdout, "close"):
stdout.close()
if hasattr(stderr, "close"):
stderr.close()
break

elapsed_time = time.time() - report.start_time
resource_stats_log_path = "{output_prefix}usage.json"
if elapsed_time >= (report.number + 1) * report_interval:
Expand All @@ -286,6 +268,8 @@ def monitor_process(
pinfo["elapsed_time"] = elapsed_time
resource_statistics_log.write(json.dumps(aggregated))
report.number += 1
if process.poll() is not None: # the passthrough command has finished
break
time.sleep(sample_interval)


Expand Down Expand Up @@ -315,6 +299,17 @@ def prepare_outputs(
stderr = subprocess.PIPE
else:
stderr = subprocess.DEVNULL

print(
f"""
REMINDERS:
devnull = {subprocess.DEVNULL},
pipe = {subprocess.PIPE}"""
)
print(f"USING TYPE STDOUT: {type(stdout)}")
print(f"USING STDOUT: {str(stdout)}")
print(f"USING TYPE STDERR: {type(stderr)}")
print(f"USING STDerr: {str(stderr)}")
return stdout, stderr


Expand Down Expand Up @@ -355,13 +350,8 @@ def main():
)
session_id = os.getsid(process.pid) # Get session ID of the new process
report = Report(args.command, session_id)
report.collect_environment()
report.get_system_info()

if args.record_types in ["all", "processes-samples"]:
monitoring_args = [
stdout,
stderr,
report,
process,
args.report_interval,
Expand All @@ -374,13 +364,24 @@ def main():
monitoring_thread.start()
monitoring_thread.join()

outs, err = process.communicate()
print(outs.decode("utf-8", "replace"))
# process.wait()

if args.record_types in ["all", "system-summary"]:
report.collect_environment()
report.get_system_info()
system_info_path = f"{formatted_output_prefix}info.json"
with open(system_info_path, "a") as system_logs:
report.end_time = time.time()
report.run_time_seconds = f"{report.end_time - report.start_time}"
report.get_system_info()
system_logs.write(str(report))

if hasattr(stdout, "close"):
stdout.close()
if hasattr(stderr, "close"):
stderr.close()
pprint.pprint(report, width=120)


Expand Down

0 comments on commit 92a153e

Please sign in to comment.