-
-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathcircuitbreaker.py
356 lines (293 loc) · 11.8 KB
/
circuitbreaker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
from asyncio import iscoroutinefunction
from datetime import datetime, timedelta, timezone
from functools import wraps
from inspect import isgeneratorfunction, isasyncgenfunction, isclass
from math import ceil, floor
from time import monotonic
from typing import AnyStr, Iterable
STRING_TYPES = (bytes, str)
STATE_CLOSED = 'closed'
STATE_OPEN = 'open'
STATE_HALF_OPEN = 'half_open'
def in_exception_list(*exc_types):
"""Build a predicate function that checks if an exception is a subtype from a list"""
def matches_types(thrown_type, _):
return issubclass(thrown_type, exc_types)
return matches_types
def build_failure_predicate(expected_exception):
""" Build a failure predicate_function.
The returned function has the signature (Type[Exception], Exception) -> bool.
Return value True indicates a failure in the underlying function.
:param expected_exception: either an type of Exception, iterable of Exception types, or a predicate function.
If an Exception type or iterable of Exception types, the failure predicate will return True when a thrown
exception type matches one of the provided types.
If a predicate function, it will just be returned as is.
:return: callable (Type[Exception], Exception) -> bool
"""
if isclass(expected_exception) and issubclass(expected_exception, Exception):
failure_predicate = in_exception_list(expected_exception)
else:
try:
# Check for an iterable of Exception types
iter(expected_exception)
# guard against a surprise later
if isinstance(expected_exception, STRING_TYPES):
raise ValueError("expected_exception cannot be a string. Did you mean name?")
failure_predicate = in_exception_list(*expected_exception)
except TypeError:
# not iterable. guess that it's a predicate function
if not callable(expected_exception) or isclass(expected_exception):
raise ValueError("expected_exception does not look like a predicate")
failure_predicate = expected_exception
return failure_predicate
class CircuitBreaker(object):
FAILURE_THRESHOLD = 5
RECOVERY_TIMEOUT = 30
EXPECTED_EXCEPTION = Exception
FALLBACK_FUNCTION = None
def __init__(self,
failure_threshold=None,
recovery_timeout=None,
expected_exception=None,
name=None,
fallback_function=None
):
"""
Construct a circuit breaker.
:param failure_threshold: break open after this many failures
:param recovery_timeout: close after this many seconds
:param expected_exception: either an type of Exception, iterable of Exception types, or a predicate function.
:param name: name for this circuitbreaker
:param fallback_function: called when the circuit is opened
:return: Circuitbreaker instance
:rtype: Circuitbreaker
"""
self._last_failure = None
self._failure_count = 0
self._failure_threshold = failure_threshold or self.FAILURE_THRESHOLD
self._recovery_timeout = recovery_timeout or self.RECOVERY_TIMEOUT
# Build the failure predicate. In order of precedence, prefer the
# * the constructor argument
# * the subclass attribute EXPECTED_EXCEPTION
# * the CircuitBreaker attribute EXPECTED_EXCEPTION
if not expected_exception:
try:
# Introspect our final type, then grab the value via __dict__ to avoid python Descriptor magic
# in the case where it's a callable function.
expected_exception = type(self).__dict__["EXPECTED_EXCEPTION"]
except KeyError:
expected_exception = CircuitBreaker.EXPECTED_EXCEPTION
self.is_failure = build_failure_predicate(expected_exception)
self._fallback_function = fallback_function or self.FALLBACK_FUNCTION
self._name = name
self._state = STATE_CLOSED
self._opened = monotonic()
def __call__(self, wrapped):
return self.decorate(wrapped)
def __enter__(self):
return None
def __exit__(self, exc_type, exc_value, _traceback):
if exc_type and self.is_failure(exc_type, exc_value):
# exception was raised and is our concern
self._last_failure = exc_value
self.__call_failed()
else:
self.__call_succeeded()
return False # return False to raise exception if any
def decorate(self, function):
"""
Applies the circuit breaker to a function
"""
if self._name is None:
try:
self._name = function.__qualname__
except AttributeError:
self._name = function.__name__
CircuitBreakerMonitor.register(self)
if iscoroutinefunction(function) or isasyncgenfunction(function):
return self._decorate_async(function)
return self._decorate_sync(function)
def _decorate_sync(self, function):
@wraps(function)
def wrapper(*args, **kwargs):
if self.opened:
if self.fallback_function:
return self.fallback_function(*args, **kwargs)
raise CircuitBreakerError(self)
return self.call(function, *args, **kwargs)
@wraps(function)
def gen_wrapper(*args, **kwargs):
if self.opened:
if self.fallback_function:
yield from self.fallback_function(*args, **kwargs)
return
raise CircuitBreakerError(self)
yield from self.call_generator(function, *args, **kwargs)
return gen_wrapper if isgeneratorfunction(function) else wrapper
def _decorate_async(self, function):
@wraps(function)
async def awrapper(*args, **kwargs):
if self.opened:
if self.fallback_function:
return await self.fallback_function(*args, **kwargs)
raise CircuitBreakerError(self)
return await self.call_async(function, *args, **kwargs)
@wraps(function)
async def gen_awrapper(*args, **kwargs):
if self.opened:
if self.fallback_function:
async for el in self.fallback_function(*args, **kwargs):
yield el
return
raise CircuitBreakerError(self)
async for el in self.call_async_generator(function, *args, **kwargs):
yield el
return gen_awrapper if isasyncgenfunction(function) else awrapper
def call(self, func, *args, **kwargs):
"""
Calls the decorated function and applies the circuit breaker
rules on success or failure
:param func: Decorated function
"""
with self:
return func(*args, **kwargs)
def call_generator(self, func, *args, **kwargs):
"""
Calls the decorated generator function and applies the circuit breaker
rules on success or failure
:param func: Decorated generator function
"""
with self:
for el in func(*args, **kwargs):
yield el
async def call_async(self, func, *args, **kwargs):
"""
Calls the decorated async function and applies the circuit breaker
rules on success or failure
:param func: Decorated async function
"""
with self:
return await func(*args, **kwargs)
async def call_async_generator(self, func, *args, **kwargs):
"""
Calls the decorated async generator function and applies the circuit breaker
rules on success or failure
:param func: Decorated async generator function
"""
with self:
async for el in func(*args, **kwargs):
yield el
def __call_succeeded(self):
"""
Close circuit after successful execution and reset failure count
"""
self._state = STATE_CLOSED
self._last_failure = None
self._failure_count = 0
def __call_failed(self):
"""
Count failure and open circuit, if threshold has been reached
"""
self._failure_count += 1
if self._failure_count >= self._failure_threshold:
self._state = STATE_OPEN
self._opened = monotonic()
@property
def state(self):
if self._state == STATE_OPEN and self.open_remaining <= 0:
return STATE_HALF_OPEN
return self._state
@property
def open_until(self):
"""
The approximate datetime when the circuit breaker will try to recover
:return: datetime
"""
return datetime.now(timezone.utc) + timedelta(seconds=self.open_remaining)
@property
def open_remaining(self):
"""
Number of seconds remaining, the circuit breaker stays in OPEN state
:return: int
"""
remain = (self._opened + self._recovery_timeout) - monotonic()
return ceil(remain) if remain > 0 else floor(remain)
@property
def failure_count(self):
return self._failure_count
@property
def closed(self):
return self.state == STATE_CLOSED
@property
def opened(self):
return self.state == STATE_OPEN
@property
def name(self):
return self._name
@property
def last_failure(self):
return self._last_failure
@property
def fallback_function(self):
return self._fallback_function
def __str__(self, *args, **kwargs):
return self._name
class CircuitBreakerError(Exception):
def __init__(self, circuit_breaker, *args, **kwargs):
"""
:param circuit_breaker:
:param args:
:param kwargs:
:return:
"""
super(CircuitBreakerError, self).__init__(*args, **kwargs)
self._circuit_breaker = circuit_breaker
def __str__(self, *args, **kwargs):
return 'Circuit "%s" OPEN until %s (%d failures, %d sec remaining) (last_failure: %r)' % (
self._circuit_breaker.name,
self._circuit_breaker.open_until,
self._circuit_breaker.failure_count,
round(self._circuit_breaker.open_remaining),
self._circuit_breaker.last_failure,
)
class CircuitBreakerMonitor(object):
circuit_breakers = {}
@classmethod
def register(cls, circuit_breaker):
cls.circuit_breakers[circuit_breaker.name] = circuit_breaker
@classmethod
def all_closed(cls) -> bool:
return len(list(cls.get_open())) == 0
@classmethod
def get_circuits(cls) -> Iterable[CircuitBreaker]:
return cls.circuit_breakers.values()
@classmethod
def get(cls, name: AnyStr) -> CircuitBreaker:
return cls.circuit_breakers.get(name)
@classmethod
def get_open(cls) -> Iterable[CircuitBreaker]:
for circuit in cls.get_circuits():
if circuit.opened:
yield circuit
@classmethod
def get_closed(cls) -> Iterable[CircuitBreaker]:
for circuit in cls.get_circuits():
if circuit.closed:
yield circuit
def circuit(failure_threshold=None,
recovery_timeout=None,
expected_exception=None,
name=None,
fallback_function=None,
cls=CircuitBreaker):
# if the decorator is used without parameters, the
# wrapped function is provided as first argument
if callable(failure_threshold):
return cls().decorate(failure_threshold)
else:
return cls(
failure_threshold=failure_threshold,
recovery_timeout=recovery_timeout,
expected_exception=expected_exception,
name=name,
fallback_function=fallback_function)