Skip to content

Commit e5a7f4d

Browse files
committed
[Fabric-Sync] Do not use stdout with non-blocking asyncio
Wrapping stdout with StreamWriter leads to setting stdout stream to non-blocking. In consequence that parent process and all child processes have stdio in non-blocking mode, which is not a standard setup. This leads to random failures (e.g. Python's print might trow BlockinIOError exception).
1 parent f89d5b9 commit e5a7f4d

File tree

1 file changed

+23
-40
lines changed

1 file changed

+23
-40
lines changed

examples/fabric-admin/scripts/fabric-sync-app.py

+23-40
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,13 @@
2020
import shutil
2121
import signal
2222
import sys
23+
import typing
2324
from argparse import ArgumentParser
2425
from tempfile import TemporaryDirectory
2526

2627

27-
async def asyncio_stdin() -> asyncio.StreamReader:
28-
"""Wrap sys.stdin in an asyncio StreamReader."""
29-
loop = asyncio.get_event_loop()
30-
reader = asyncio.StreamReader()
31-
protocol = asyncio.StreamReaderProtocol(reader)
32-
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
33-
return reader
34-
35-
36-
async def asyncio_stdout(file=sys.stdout) -> asyncio.StreamWriter:
37-
"""Wrap an IO stream in an asyncio StreamWriter."""
38-
loop = asyncio.get_event_loop()
39-
transport, protocol = await loop.connect_write_pipe(
40-
lambda: asyncio.streams.FlowControlMixin(loop=loop),
41-
os.fdopen(file.fileno(), 'wb'))
42-
return asyncio.streams.StreamWriter(transport, protocol, None, loop)
43-
44-
4528
async def forward_f(prefix: bytes, f_in: asyncio.StreamReader,
46-
f_out: asyncio.StreamWriter, cb=None):
29+
f_out: typing.BinaryIO, cb=None):
4730
"""Forward f_in to f_out with a prefix attached.
4831
4932
This function can optionally feed received lines to a callback function.
@@ -54,9 +37,9 @@ async def forward_f(prefix: bytes, f_in: asyncio.StreamReader,
5437
break
5538
if cb is not None:
5639
cb(line)
57-
f_out.write(prefix)
58-
f_out.write(line)
59-
await f_out.drain()
40+
f_out.buffer.write(prefix)
41+
f_out.buffer.write(line)
42+
f_out.flush()
6043

6144

6245
async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter):
@@ -72,6 +55,7 @@ async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter):
7255
data = os.read(fd, 1024)
7356
if data:
7457
f_out.write(data)
58+
await f_out.drain()
7559
if not data:
7660
await asyncio.sleep(0.1)
7761
except BlockingIOError:
@@ -80,13 +64,17 @@ async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter):
8064

8165
async def forward_stdin(f_out: asyncio.StreamWriter):
8266
"""Forward stdin to f_out."""
83-
reader = await asyncio_stdin()
67+
loop = asyncio.get_event_loop()
68+
reader = asyncio.StreamReader()
69+
protocol = asyncio.StreamReaderProtocol(reader)
70+
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
8471
while True:
8572
line = await reader.readline()
8673
if not line:
8774
# Exit on Ctrl-D (EOF).
8875
sys.exit(0)
8976
f_out.write(line)
77+
await f_out.drain()
9078

9179

9280
class Subprocess:
@@ -109,15 +97,9 @@ async def run(self):
10997
stdout=asyncio.subprocess.PIPE,
11098
stderr=asyncio.subprocess.PIPE)
11199
# Add the stdout and stderr processing to the event loop.
112-
asyncio.create_task(forward_f(
113-
self.tag,
114-
self.p.stderr,
115-
await asyncio_stdout(sys.stderr)))
116-
asyncio.create_task(forward_f(
117-
self.tag,
118-
self.p.stdout,
119-
await asyncio_stdout(sys.stdout),
120-
cb=self._check_output))
100+
asyncio.create_task(forward_f(self.tag, self.p.stderr, sys.stderr))
101+
asyncio.create_task(forward_f(self.tag, self.p.stdout, sys.stdout,
102+
cb=self._check_output))
121103

122104
async def send(self, message: str, expected_output: str = None, timeout: float = None):
123105
"""Send a message to a process and optionally wait for a response."""
@@ -206,14 +188,6 @@ async def main(args):
206188
if pipe and not os.path.exists(pipe):
207189
os.mkfifo(pipe)
208190

209-
def terminate(signum, frame):
210-
admin.terminate()
211-
bridge.terminate()
212-
sys.exit(0)
213-
214-
signal.signal(signal.SIGINT, terminate)
215-
signal.signal(signal.SIGTERM, terminate)
216-
217191
admin, bridge = await asyncio.gather(
218192
run_admin(
219193
args.app_admin,
@@ -235,6 +209,15 @@ def terminate(signum, frame):
235209
passcode=args.passcode,
236210
))
237211

212+
def terminate():
213+
admin.terminate()
214+
bridge.terminate()
215+
sys.exit(0)
216+
217+
loop = asyncio.get_event_loop()
218+
loop.add_signal_handler(signal.SIGINT, terminate)
219+
loop.add_signal_handler(signal.SIGTERM, terminate)
220+
238221
# Wait a bit for apps to start.
239222
await asyncio.sleep(1)
240223

0 commit comments

Comments
 (0)