14
14
15
15
import logging
16
16
import queue
17
- import select
18
17
import subprocess
19
18
import threading
20
- import time
21
19
from typing import List
22
20
23
21
@@ -48,43 +46,48 @@ def __init__(self, command: List[str], output_path: str):
48
46
self .output_path = output_path
49
47
self .output_lines = queue .Queue ()
50
48
self .process = None
51
- self .io_thread = None
49
+ self .stdout_thread = None
50
+ self .stderr_thread = None
51
+ self .lock = threading .Lock ()
52
52
self .done = False
53
53
54
- def _io_thread (self ):
55
- """Reads process lines and writes them to an output file.
54
+ def _write_to_file (self , line : str , is_error_line = False ):
55
+ """Writes a line to an output file in a thread-safe manner.
56
+
57
+ Opens file in append-text mode ('at') with line buffering
58
+ for immediate output to ensure all writes are appended.
59
+ """
60
+ with self .lock :
61
+ with open (self .output_path , "at" , buffering = 1 ) as f :
62
+ if is_error_line :
63
+ f .write (f"!!STDERR!! : { line } " )
64
+ else :
65
+ f .write (line )
66
+ f .flush ()
67
+
68
+ def _stdout_thread (self ):
69
+ """Reads stdout process lines and writes them to an output file.
56
70
57
71
It also sends the output lines to `self.output_lines` for later
58
- reading
72
+ reading.
59
73
"""
60
- out_wait = select .poll ()
61
- out_wait .register (self .process .stdout , select .POLLIN | select .POLLHUP )
62
-
63
- err_wait = select .poll ()
64
- err_wait .register (self .process .stderr , select .POLLIN | select .POLLHUP )
65
-
66
- with open (self .output_path , "wt" , buffering = 1 ) as f : # Enable line buffering for immediate output
67
- f .write ("PROCESS START: %s\n " % time .ctime ())
68
- f .flush ()
69
- while not self .done :
70
- out_changes = out_wait .poll (0.1 )
71
- if self .process .stdout .fileno () in [fd for (fd , _ ) in out_changes ]:
72
- while True :
73
- out_line = self .process .stdout .readline ()
74
- if not out_line :
75
- break
76
- f .write (out_line )
77
- f .flush ()
78
- self .output_lines .put (out_line )
79
-
80
- err_changes = err_wait .poll (0 )
81
- if self .process .stderr .fileno () in [fd for (fd , _ ) in err_changes ]:
82
- err_line = self .process .stderr .readline ()
83
- if err_line :
84
- f .write (f"!!STDERR!! : { err_line } " )
85
- f .flush ()
86
- f .write ("PROCESS END: %s\n " % time .ctime ())
87
- f .flush ()
74
+ while not self .done :
75
+ out_line = self .process .stdout .readline ()
76
+ if not out_line :
77
+ break
78
+ self ._write_to_file (out_line )
79
+ self .output_lines .put (out_line )
80
+
81
+ def _stderr_thread (self ):
82
+ """Reads stderr process lines and writes them to an output file.
83
+
84
+ The lines are marked as error lines when written to the file.
85
+ """
86
+ while not self .done :
87
+ err_line = self .process .stderr .readline ()
88
+ if not err_line :
89
+ break
90
+ self ._write_to_file (err_line , is_error_line = True )
88
91
89
92
def __enter__ (self ):
90
93
self .done = False
@@ -96,8 +99,10 @@ def __enter__(self):
96
99
text = True ,
97
100
bufsize = 1 , # Enable line buffering for immediate output from subprocess
98
101
)
99
- self .io_thread = threading .Thread (target = self ._io_thread )
100
- self .io_thread .start ()
102
+ self .stdout_thread = threading .Thread (target = self ._stdout_thread )
103
+ self .stderr_thread = threading .Thread (target = self ._stderr_thread )
104
+ self .stdout_thread .start ()
105
+ self .stderr_thread .start ()
101
106
return self
102
107
103
108
def __exit__ (self , exception_type , exception_value , traceback ):
@@ -106,8 +111,11 @@ def __exit__(self, exception_type, exception_value, traceback):
106
111
self .process .terminate ()
107
112
self .process .wait ()
108
113
109
- if self .io_thread :
110
- self .io_thread .join ()
114
+ if self .stdout_thread :
115
+ self .stdout_thread .join ()
116
+
117
+ if self .stderr_thread :
118
+ self .stderr_thread .join ()
111
119
112
120
if exception_value :
113
121
# When we fail because of an exception, report the entire log content
0 commit comments