Skip to content

Commit fb02f85

Browse files
committed
Introduce small delay to avoid race between publisher and subscriber
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
1 parent 3f6e9a0 commit fb02f85

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

csp/tests/adapters/test_kafka.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,18 @@ def curtime(x: ts[object]) -> ts[datetime]:
127127
return csp.now()
128128

129129
def graph(symbols: list, count: int):
130-
b = csp.merge(
131-
csp.timer(timedelta(seconds=0.2), True),
132-
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
130+
delay = timedelta(seconds=1)
131+
b = csp.delay(
132+
csp.merge(
133+
csp.timer(timedelta(seconds=0.2), True),
134+
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
135+
),
136+
delay=delay,
133137
)
134-
i = csp.count(csp.timer(timedelta(seconds=0.15)))
135-
d = csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0
136-
s = csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING"))
138+
139+
i = csp.delay(csp.count(csp.timer(timedelta(seconds=0.15))), delay=delay)
140+
d = csp.delay(csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0, delay=delay)
141+
s = csp.delay(csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING")), delay=delay)
137142
dt = curtime(b)
138143
struct = MyData.collectts(b=b, i=i, d=d, s=s, dt=dt)
139144

@@ -157,18 +162,13 @@ def graph(symbols: list, count: int):
157162
)
158163
csp.add_graph_output(f"pall_{symbol}", pub_data)
159164

160-
# csp.print('status', kafkaadapter.status())
161-
162165
sub_data = kafkaadapter.subscribe(
163166
ts_type=SubData,
164167
msg_mapper=msg_mapper,
165168
topic=topic,
166169
key=symbol,
167170
push_mode=csp.PushMode.NON_COLLAPSING,
168171
)
169-
170-
sub_data = csp.firstN(sub_data, count)
171-
172172
csp.add_graph_output(f"sall_{symbol}", sub_data)
173173

174174
done_flag = csp.count(sub_data) == count
@@ -190,8 +190,12 @@ def graph(symbols: list, count: int):
190190
pub = results[f"pall_{symbol}"]
191191
sub = results[f"sall_{symbol}"]
192192

193+
# limit by the last `count`
194+
sub = sub[-1*count:]
195+
pub = pub[-1*count:]
196+
193197
assert len(sub) == count
194-
assert [v[1] for v in sub] == [v[1] for v in pub[:count]]
198+
assert [v[1] for v in sub] == [v[1] for v in pub[-1*count:]]
195199

196200
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
197201
def test_start_offsets(self, kafkaadapter, kafkabroker):

0 commit comments

Comments
 (0)