Skip to content

Commit

Permalink
Thread safety and testing
Browse files Browse the repository at this point in the history
  • Loading branch information
asmacdo committed May 5, 2024
1 parent 92a153e commit 153ef0b
Show file tree
Hide file tree
Showing 7 changed files with 11,214 additions and 31 deletions.
69 changes: 38 additions & 31 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,41 +215,46 @@ def create_and_parse_args():
class TeeStream:
"""TeeStream simultaneously streams to standard output (stdout) and a specified file."""

listener_fd: int
writer_fd: int
write_fd: int
read_fd: int
file: TextIO

def __init__(self, file_path: str) -> None:
CHUNK_SIZE = 1024

def __init__(self, file_path: str, stop_event: threading.Event):
self.stop_event = stop_event
self.file = open(file_path, "w")
(
self.listener_fd,
self.writer_fd,
) = os.openpty() # Use pseudo-terminal to simulate terminal behavior
self.read_fd, self.write_fd = os.openpty()

def fileno(self) -> int:
def fileno(self):
"""Return the file descriptor to be used by subprocess as stdout/stderr."""
return self.listener_fd
return self.write_fd

def start(self):
"""Start a thread to read from the main_fd and write to stdout and the file."""
thread = threading.Thread(target=self._redirect_output, daemon=True)
thread.start()
self.thread = threading.Thread(target=self._redirect_output, daemon=True)
self.thread.start()

def _redirect_output(self):
with os.fdopen(self.listener_fd, "rb", buffering=0) as stream:
while True:
data = stream.read(1024)
if not data: # Still open, but no new data to write
break
# sys.stdout = codecs.getwriter('utf8')(sys.stdout.buffer)
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
try:
with os.fdopen(self.read_fd, "rb", buffering=0) as stream:
while not self.stop_event.is_set():
chunk = stream.read(self.CHUNK_SIZE)
if not chunk:
break
sys.stdout.buffer.write(chunk)
sys.stdout.buffer.flush()
self.file.write(chunk.decode())
self.file.flush()
except Exception as e:
print(f"DEBUG: Error in _redirect_output: {e}", file=sys.stderr)

def close(self):
"""Close the slave fd and the file when done."""
os.close(self.writer_fd)
os.close(self.listener_fd)
self.file.close()
os.close(self.write_fd) # Close write end of the pipe first
if self.file and not self.file.closed:
self.file.close()
self.stop_event.set()
self.thread.join()


def monitor_process(report, process, report_interval, sample_interval, output_prefix):
Expand Down Expand Up @@ -281,7 +286,8 @@ def prepare_outputs(

# Code remains the same
if capture_outputs in ["all", "stdout"] and outputs in ["all", "stdout"]:
stdout = TeeStream(f"{output_prefix}stdout")
out_stop_event = threading.Event()
stdout = TeeStream(f"{output_prefix}stdout", out_stop_event)
stdout.start() # type: ignore
elif capture_outputs in ["all", "stdout"] and outputs in ["none", "stderr"]:
stdout = open(f"{output_prefix}stdout", "w")
Expand All @@ -291,7 +297,8 @@ def prepare_outputs(
stdout = subprocess.DEVNULL

if capture_outputs in ["all", "stderr"] and outputs in ["all", "stderr"]:
stderr = TeeStream(f"{output_prefix}stderr")
err_stop_event = threading.Event()
stderr = TeeStream(f"{output_prefix}stderr", err_stop_event)
stderr.start() # type: ignore
elif capture_outputs in ["all", "stderr"] and outputs in ["none", "stdout"]:
stderr = open(f"{output_prefix}stderr", "w")
Expand Down Expand Up @@ -364,8 +371,12 @@ def main():
monitoring_thread.start()
monitoring_thread.join()

outs, err = process.communicate()
print(outs.decode("utf-8", "replace"))
if stdout == subprocess.PIPE:
outs, err = process.communicate()
print(outs.decode("utf-8", "replace"))
elif isinstance(stdout, TeeStream):
process.wait()
stdout.close()
# process.wait()

if args.record_types in ["all", "system-summary"]:
Expand All @@ -378,10 +389,6 @@ def main():
report.get_system_info()
system_logs.write(str(report))

if hasattr(stdout, "close"):
stdout.close()
if hasattr(stderr, "close"):
stderr.close()
pprint.pprint(report, width=120)


Expand Down
Empty file added test/e2e/empty-file
Empty file.
100 changes: 100 additions & 0 deletions test/e2e/hundred-lines
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
10 changes: 10 additions & 0 deletions test/e2e/ten-lines
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0
1
2
3
4
5
6
7
8
9
Loading

0 comments on commit 153ef0b

Please sign in to comment.