13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
15
15
16
+ import logging
17
+ import re
18
+ import select
16
19
import subprocess
17
20
import time
18
21
from dataclasses import dataclass
24
27
25
28
# Connection tuning values.
_KEEP_ALIVE_TIMEOUT_IN_SECONDS = 120
_MAX_MESSAGE_SIZE_IN_BYTES = 10 * 1024 * 1024  # 10 MB

# Default number of attempts when connecting the client to the server.
_CONNECT_MAX_RETRIES_DEFAULT = 4

# Text looked for in the server subprocess output to detect that it is ready.
_WEBSOCKET_SERVER_MESSAGE = '== WebSocket Server Ready'
# How long to wait for the ready message.
_WEBSOCKET_SERVER_MESSAGE_TIMEOUT = 60  # seconds
# How long to wait after SIGTERM before killing the subprocess.
_WEBSOCKET_SERVER_TERMINATE_TIMEOUT = 10  # seconds
27
34
28
35
29
36
@dataclass
@@ -54,7 +61,7 @@ def is_connected(self) -> bool:
54
61
return self ._client .state == websockets .protocol .State .OPEN
55
62
56
63
async def start(self):
    """Start the server first, then connect the client to it.

    The server is started from ``self._server_startup_command`` and is told
    the connection URL; the client then connects to the same URL. The
    results are stored in ``self._server`` and ``self._client``.
    """
    server = await self._start_server(
        self._server_startup_command, self._server_connection_url)
    self._server = server

    client = await self._start_client(self._server_connection_url)
    self._client = client
59
66
60
67
async def stop (self ):
@@ -70,7 +77,7 @@ async def execute(self, request):
70
77
return await instance .recv ()
71
78
return None
72
79
73
- async def _start_client (self , url , max_retries = 5 , interval_between_retries = 1 ):
80
+ async def _start_client (self , url , max_retries = _CONNECT_MAX_RETRIES_DEFAULT , interval_between_retries = 1 ):
74
81
if max_retries :
75
82
start = time .time ()
76
83
try :
@@ -93,15 +100,48 @@ async def _stop_client(self, instance):
93
100
if instance :
94
101
await instance .close ()
95
102
96
async def _start_server(self, command, url):
    """Launch the server subprocess and wait until it reports readiness.

    The command is run through ``stdbuf -o0 -e0`` so its output is not
    buffered, with stderr merged into stdout. The output is watched until
    ``_WEBSOCKET_SERVER_MESSAGE`` appears.

    Args:
        command: Argument list used to start the server. When falsy, no
            process is started.
        url: Connection URL; used in the error message and passed to
            ``self._hooks.abort`` on failure.

    Returns:
        The ``subprocess.Popen`` instance, or ``None`` when ``command`` is
        falsy.

    Raises:
        Exception: If the ready message is not seen within
            ``_WEBSOCKET_SERVER_MESSAGE_TIMEOUT`` seconds, or if the server
            closes its output before printing it. The captured output is
            printed and the subprocess is stopped before raising.
    """
    if not command:
        return None

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + _WEBSOCKET_SERVER_MESSAGE_TIMEOUT
    ready_marker = _WEBSOCKET_SERVER_MESSAGE.encode('utf-8')

    instance = subprocess.Popen(
        ['stdbuf', '-o0', '-e0', *command],  # disable buffering
        text=False, bufsize=0,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # NOTE(review): select() blocks the event loop for up to one second per
    # iteration; acceptable only if nothing else must run during startup.
    captured = bytearray()
    failure = None
    while ready_marker not in captured:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            failure = 'WebSocket startup has not been detected.'
            break

        readable, _, _ = select.select(
            [instance.stdout], [], [], min(1, remaining))
        if not readable:
            continue

        # With bufsize=0 the pipe is a raw stream, so read() is a single
        # system call that returns whatever is available. Unlike readline()
        # it cannot stall waiting for a newline that never arrives.
        chunk = instance.stdout.read(4096)
        if not chunk:
            # End of output: the server exited or closed its stdout. Fail
            # now instead of spinning on an always-readable pipe until the
            # timeout expires.
            failure = ('WebSocket server closed its output (exit code: '
                       f'{instance.poll()}) before startup was detected.')
            break
        captured += chunk

    if failure is not None:
        # Show what the server printed to help diagnose the failure.
        print(captured.decode('utf-8', errors='replace'), end='')
        self._hooks.abort(url)
        try:
            await self._stop_server(instance)
        finally:
            instance.stdout.close()
        raise Exception(f'Connecting to {url} failed. {failure}')

    # NOTE(review): once the read end is closed, later writes by the server
    # to stdout/stderr fail with SIGPIPE/EPIPE - confirm the server copes.
    instance.stdout.close()
    return instance
101
135
102
136
async def _stop_server (self , instance ):
103
137
if instance :
104
- instance .kill ()
138
+ instance .terminate () # sends SIGTERM
139
+ try :
140
+ instance .wait (_WEBSOCKET_SERVER_TERMINATE_TIMEOUT )
141
+ except subprocess .TimeoutExpired :
142
+ logging .debug (
143
+ 'Subprocess did not terminate on SIGTERM, killing it now' )
144
+ instance .kill ()
105
145
106
146
def _make_server_connection_url (self , address : str , port : int ):
107
147
return 'ws://' + address + ':' + str (port )
0 commit comments