@@ -48,43 +48,48 @@ def __init__(self, command: List[str], output_path: str):
48
48
self .output_path = output_path
49
49
self .output_lines = queue .Queue ()
50
50
self .process = None
51
- self .io_thread = None
51
+ self .stdout_thread = None
52
+ self .stderr_thread = None
53
+ self .lock = threading .Lock ()
52
54
self .done = False
53
55
54
- def _io_thread (self ):
55
- """Reads process lines and writes them to an output file.
56
+ def _write_to_file (self , line : str , is_error_line = False ):
57
+ """Writes a line to an output file in a thread-safe manner.
58
+
59
+ Opens file in append-text mode ('at') with line buffering
60
+ for immediate output to ensure all writes are appended.
61
+ """
62
+ with self .lock :
63
+ with open (self .output_path , "at" , buffering = 1 ) as f :
64
+ if is_error_line :
65
+ f .write (f"!!STDERR!! : { line } " )
66
+ else :
67
+ f .write (line )
68
+ f .flush ()
69
+
70
+ def _stdout_thread (self ):
71
+ """Reads stdout process lines and writes them to an output file.
56
72
57
73
It also sends the output lines to `self.output_lines` for later
58
- reading
74
+ reading.
59
75
"""
60
- out_wait = select .poll ()
61
- out_wait .register (self .process .stdout , select .POLLIN | select .POLLHUP )
62
-
63
- err_wait = select .poll ()
64
- err_wait .register (self .process .stderr , select .POLLIN | select .POLLHUP )
65
-
66
- with open (self .output_path , "wt" , buffering = 1 ) as f : # Enable line buffering for immediate output
67
- f .write ("PROCESS START: %s\n " % time .ctime ())
68
- f .flush ()
69
- while not self .done :
70
- out_changes = out_wait .poll (0.1 )
71
- if self .process .stdout .fileno () in [fd for (fd , _ ) in out_changes ]:
72
- while True :
73
- out_line = self .process .stdout .readline ()
74
- if not out_line :
75
- break
76
- f .write (out_line )
77
- f .flush ()
78
- self .output_lines .put (out_line )
79
-
80
- err_changes = err_wait .poll (0 )
81
- if self .process .stderr .fileno () in [fd for (fd , _ ) in err_changes ]:
82
- err_line = self .process .stderr .readline ()
83
- if err_line :
84
- f .write (f"!!STDERR!! : { err_line } " )
85
- f .flush ()
86
- f .write ("PROCESS END: %s\n " % time .ctime ())
87
- f .flush ()
76
+ while not self .done :
77
+ out_line = self .process .stdout .readline ()
78
+ if not out_line :
79
+ break
80
+ self ._write_to_file (out_line )
81
+ self .output_lines .put (out_line )
82
+
83
+ def _stderr_thread (self ):
84
+ """Reads stderr process lines and writes them to an output file.
85
+
86
+ The lines are marked as error lines when written to the file.
87
+ """
88
+ while not self .done :
89
+ err_line = self .process .stderr .readline ()
90
+ if not err_line :
91
+ break
92
+ self ._write_to_file (err_line , is_error_line = True )
88
93
89
94
def __enter__ (self ):
90
95
self .done = False
@@ -96,8 +101,10 @@ def __enter__(self):
96
101
text = True ,
97
102
bufsize = 1 , # Enable line buffering for immediate output from subprocess
98
103
)
99
- self .io_thread = threading .Thread (target = self ._io_thread )
100
- self .io_thread .start ()
104
+ self .stdout_thread = threading .Thread (target = self ._stdout_thread )
105
+ self .stderr_thread = threading .Thread (target = self ._stderr_thread )
106
+ self .stdout_thread .start ()
107
+ self .stderr_thread .start ()
101
108
return self
102
109
103
110
def __exit__ (self , exception_type , exception_value , traceback ):
@@ -106,8 +113,11 @@ def __exit__(self, exception_type, exception_value, traceback):
106
113
self .process .terminate ()
107
114
self .process .wait ()
108
115
109
- if self .io_thread :
110
- self .io_thread .join ()
116
+ if self .stdout_thread :
117
+ self .stdout_thread .join ()
118
+
119
+ if self .stderr_thread :
120
+ self .stderr_thread .join ()
111
121
112
122
if exception_value :
113
123
# When we fail because of an exception, report the entire log content
0 commit comments