Skip to content

Commit a82317e

Browse files
whdalsrntImMin5
authored andcommitted
refactor: add a timeout settings to grpc client
Signed-off-by: Jongmin Kim <whdalsrnt@megazone.com>
1 parent d53edb4 commit a82317e

File tree

3 files changed

+56
-8
lines changed

3 files changed

+56
-8
lines changed

src/spaceone/core/connector/space_connector.py

+3
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ def __init__(
2525
endpoint: str = None,
2626
token: str = None,
2727
return_type: str = "dict",
28+
timeout: int = None,
2829
**kwargs,
2930
):
3031
super().__init__(*args, **kwargs)
3132
self._service = service
3233
self._endpoint = endpoint
3334
self._token = token
3435
self._return_type = return_type
36+
self._timeout = timeout
3537

3638
self._client = None
3739
self._endpoints: dict = self.config.get("endpoints", {})
@@ -95,6 +97,7 @@ def _init_client(self) -> None:
9597
endpoint=e["endpoint"],
9698
ssl_enabled=e["ssl_enabled"],
9799
max_message_length=1024 * 1024 * 256,
100+
timeout=self._timeout,
98101
)
99102

100103
@staticmethod

src/spaceone/core/error.py

+4
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ class ERROR_GRPC_CONNECTION(ERROR_UNAVAILAVBLE):
250250
_message = "Server is unavailable. (channel = {channel}, message = {message})"
251251

252252

253+
class ERROR_GRPC_TIMEOUT(ERROR_GRPC_CONNECTION):
254+
_message = "gRPC Timeout."
255+
256+
253257
class ERROR_GRPC_TLS_HANDSHAKE(ERROR_GRPC_CONNECTION):
254258
_message = "TLS handshake failed. (reason = {reason})"
255259

src/spaceone/core/pygrpc/client.py

+49-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import types
33
import grpc
4+
from grpc import ClientCallDetails
45
from google.protobuf.json_format import ParseDict
56
from google.protobuf.message_factory import MessageFactory # , GetMessageClass
67
from google.protobuf.descriptor_pool import DescriptorPool
@@ -15,16 +16,28 @@
1516
_LOGGER = logging.getLogger(__name__)
1617

1718

19+
class _ClientCallDetails(ClientCallDetails):
20+
def __init__(self, method, timeout, metadata, credentials, wait_for_ready):
21+
self.method = method
22+
self.timeout = timeout
23+
self.metadata = metadata
24+
self.credentials = credentials
25+
self.wait_for_ready = wait_for_ready
26+
27+
1828
class _ClientInterceptor(
1929
grpc.UnaryUnaryClientInterceptor,
2030
grpc.UnaryStreamClientInterceptor,
2131
grpc.StreamUnaryClientInterceptor,
2232
grpc.StreamStreamClientInterceptor,
2333
):
24-
def __init__(self, options: dict, channel_key: str, request_map: dict):
34+
def __init__(
35+
self, options: dict, channel_key: str, request_map: dict, timeout: int = None
36+
):
2537
self._request_map = request_map
2638
self._channel_key = channel_key
2739
self.metadata = options.get("metadata", {})
40+
self.timeout = timeout or 60
2841

2942
def _check_message(self, client_call_details, request_or_iterator, is_stream):
3043
if client_call_details.method in self._request_map:
@@ -123,18 +136,27 @@ def _retry_call(
123136
return response_or_iterator
124137

125138
except Exception as e:
126-
if e.error_code == "ERROR_GRPC_CONNECTION":
139+
if not isinstance(e, ERROR_BASE):
140+
e = ERROR_UNKNOWN(message=str(e))
141+
142+
if (
143+
e.error_code == "ERROR_GRPC_CONNECTION"
144+
or e.status_code == "DEADLINE_EXCEEDED"
145+
):
127146
if retries >= _MAX_RETRIES:
128147
channel = e.meta.get("channel")
129148
if channel in _GRPC_CHANNEL:
130149
_LOGGER.error(
131150
f"Disconnect gRPC Endpoint. (channel = {channel})"
132151
)
133152
del _GRPC_CHANNEL[channel]
153+
154+
if e.status_code == "DEADLINE_EXCEEDED":
155+
raise ERROR_GRPC_TIMEOUT()
134156
raise e
135157
else:
136158
_LOGGER.debug(
137-
f"Retry gRPC Call: reason = {e.message}, retry = {retries + 1}"
159+
f"Retry gRPC Call: method = {client_call_details.method}, reason = {e.message}, retry = {retries + 1}"
138160
)
139161
else:
140162
raise e
@@ -160,9 +182,20 @@ def _intercept_call(
160182
is_response_stream,
161183
)
162184

185+
def _create_new_call_details(self, client_call_details):
186+
return _ClientCallDetails(
187+
method=client_call_details.method,
188+
timeout=self.timeout,
189+
metadata=client_call_details.metadata,
190+
credentials=client_call_details.credentials,
191+
wait_for_ready=client_call_details.wait_for_ready,
192+
)
193+
163194
def intercept_unary_unary(self, continuation, client_call_details, request):
195+
new_call_details = self._create_new_call_details(client_call_details)
196+
164197
return self._intercept_call(
165-
continuation, client_call_details, request, False, False
198+
continuation, new_call_details, request, False, False
166199
)
167200

168201
def intercept_unary_stream(self, continuation, client_call_details, request):
@@ -263,7 +296,7 @@ def _bind_grpc_method(
263296

264297

265298
class GRPCClient(object):
266-
def __init__(self, channel, options, channel_key):
299+
def __init__(self, channel, options, channel_key, timeout=None):
267300
self._request_map = {}
268301
self._api_resources = {}
269302

@@ -272,7 +305,7 @@ def __init__(self, channel, options, channel_key):
272305
self._init_grpc_reflection()
273306

274307
_client_interceptor = _ClientInterceptor(
275-
options, channel_key, self._request_map
308+
options, channel_key, self._request_map, timeout
276309
)
277310
_intercept_channel = grpc.intercept_channel(channel, _client_interceptor)
278311
self._bind_grpc_stub(_intercept_channel)
@@ -326,7 +359,13 @@ def _create_insecure_channel(endpoint, options):
326359
return grpc.insecure_channel(endpoint, options=options)
327360

328361

329-
def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_opts):
362+
def client(
363+
endpoint=None,
364+
ssl_enabled=False,
365+
max_message_length=None,
366+
timeout=None,
367+
**client_opts,
368+
):
330369
if endpoint is None:
331370
raise Exception("Client's endpoint is undefined.")
332371

@@ -350,7 +389,9 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
350389
)
351390

352391
try:
353-
_GRPC_CHANNEL[endpoint] = GRPCClient(channel, client_opts, endpoint)
392+
_GRPC_CHANNEL[endpoint] = GRPCClient(
393+
channel, client_opts, endpoint, timeout
394+
)
354395
except Exception as e:
355396
if hasattr(e, "details"):
356397
raise ERROR_GRPC_CONNECTION(channel=endpoint, message=e.details())

0 commit comments

Comments
 (0)