Skip to content

Commit 8da6a79

Browse files
committed
Introduce timeouts in the UpdateProcessingPipeline
1 parent cff9724 commit 8da6a79

File tree

5 files changed

+72
-18
lines changed

5 files changed

+72
-18
lines changed

.deploy/lambda/lib/JProfByBotStack.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,16 @@ export class JProfByBotStack extends cdk.Stack {
131131
compatibleRuntimes: [lambda.Runtime.JAVA_11],
132132
});
133133

134+
const lambdaWebhookTimeout = cdk.Duration.seconds(30);
134135
const lambdaWebhook = new lambda.Function(this, 'jprof-by-bot-lambda-webhook', {
135136
functionName: 'jprof-by-bot-lambda-webhook',
136137
runtime: lambda.Runtime.JAVA_11,
137138
layers: [
138139
layerLibGL,
139140
layerLibfontconfig,
140141
],
141-
timeout: cdk.Duration.seconds(30),
142+
timeout: lambdaWebhookTimeout,
143+
retryAttempts: 0,
142144
memorySize: 512,
143145
code: lambda.Code.fromAsset('../../launchers/lambda/build/libs/jprof_by_bot-launchers-lambda-all.jar'),
144146
handler: 'by.jprof.telegram.bot.launchers.lambda.JProf',
@@ -157,13 +159,15 @@ export class JProfByBotStack extends cdk.Stack {
157159
'STATE_MACHINE_UNPINS': stateMachineUnpin.stateMachineArn,
158160
'TOKEN_TELEGRAM_BOT': props.telegramToken,
159161
'TOKEN_YOUTUBE_API': props.youtubeToken,
162+
'TIMEOUT': lambdaWebhookTimeout.toMilliseconds().toString(),
160163
},
161164
});
162165

163166
const lambdaDailyUrbanDictionary = new lambda.Function(this, 'jprof-by-bot-lambda-daily-urban-dictionary', {
164167
functionName: 'jprof-by-bot-lambda-daily-urban-dictionary',
165168
runtime: lambda.Runtime.JAVA_11,
166169
timeout: cdk.Duration.seconds(30),
170+
retryAttempts: 0,
167171
memorySize: 512,
168172
code: lambda.Code.fromAsset('../../english/urban-dictionary-daily/build/libs/jprof_by_bot-english-urban-dictionary-daily-all.jar'),
169173
handler: 'by.jprof.telegram.bot.english.urban_dictionary_daily.Handler',

core/src/main/kotlin/by/jprof/telegram/bot/core/UpdateProcessingPipeline.kt

+14-5
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,29 @@ import kotlinx.coroutines.joinAll
66
import kotlinx.coroutines.launch
77
import kotlinx.coroutines.runBlocking
88
import kotlinx.coroutines.supervisorScope
9+
import kotlinx.coroutines.withTimeoutOrNull
910
import org.apache.logging.log4j.LogManager
1011

1112
class UpdateProcessingPipeline(
12-
private val processors: List<UpdateProcessor>
13+
private val processors: List<UpdateProcessor>,
14+
private val timeout: Long,
1315
) {
1416
companion object {
1517
private val logger = LogManager.getLogger(UpdateProcessingPipeline::class.java)!!
1618
}
1719

1820
fun process(update: Update) = runBlocking {
19-
supervisorScope {
20-
processors
21-
.map { launch(exceptionHandler(it)) { it.process(update) } }
22-
.joinAll()
21+
withTimeoutOrNull(timeout) {
22+
supervisorScope {
23+
processors
24+
.map {
25+
launch(exceptionHandler(it)) {
26+
logger.debug("Processing update with ${it::class.simpleName}")
27+
it.process(update)
28+
}
29+
}
30+
.joinAll()
31+
}
2332
}
2433
}
2534

core/src/test/kotlin/by/jprof/telegram/bot/core/UpdateProcessingPipelineTest.kt

+44-11
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,80 @@ package by.jprof.telegram.bot.core
22

33
import dev.inmo.tgbotapi.types.update.abstracts.UnknownUpdate
44
import dev.inmo.tgbotapi.types.update.abstracts.Update
5+
import java.time.Duration
6+
import java.util.concurrent.atomic.AtomicBoolean
7+
import kotlin.random.Random
58
import kotlinx.coroutines.delay
69
import kotlinx.serialization.json.JsonNull
7-
import org.junit.jupiter.api.Assertions
10+
import org.junit.jupiter.api.Assertions.assertFalse
11+
import org.junit.jupiter.api.Assertions.assertTimeout
12+
import org.junit.jupiter.api.Assertions.assertTrue
813
import org.junit.jupiter.api.Test
9-
import java.time.Duration
10-
import java.util.concurrent.atomic.AtomicBoolean
1114

1215
internal class UpdateProcessingPipelineTest {
1316
@Test
1417
fun process() {
1518
val completionFlags = (1..5).map { AtomicBoolean(false) }
1619
val sut = UpdateProcessingPipeline(
17-
completionFlags.mapIndexed { index, atomicBoolean ->
20+
processors = completionFlags.mapIndexed { index, atomicBoolean ->
1821
Delay((index + 1) * 1000L, atomicBoolean)
19-
}
22+
},
23+
timeout = 10_000,
2024
)
2125

22-
Assertions.assertTimeout(Duration.ofMillis(6000)) {
26+
assertTimeout(Duration.ofMillis(6000)) {
2327
sut.process(UnknownUpdate(1L, "", JsonNull))
2428
}
25-
Assertions.assertTrue(completionFlags.all { it.get() })
29+
assertTrue(completionFlags.all { it.get() })
2630
}
2731

2832
@Test
2933
fun processWithBroken() {
3034
val completionFlags = (1..5).map { AtomicBoolean(false) }
3135
val sut = UpdateProcessingPipeline(
32-
completionFlags.mapIndexed { index, atomicBoolean ->
36+
processors = completionFlags.mapIndexed { index, atomicBoolean ->
3337
when (index % 2) {
3438
0 -> Delay((index + 1) * 1000L, atomicBoolean)
3539
else -> Fail()
3640
}
37-
}
41+
},
42+
timeout = 10_000,
3843
)
3944

40-
Assertions.assertTimeout(Duration.ofMillis(6000)) {
45+
assertTimeout(Duration.ofMillis(6000)) {
4146
sut.process(UnknownUpdate(1L, "", JsonNull))
4247
}
4348
completionFlags.forEachIndexed { index, atomicBoolean ->
4449
if (index % 2 == 0) {
45-
Assertions.assertTrue(atomicBoolean.get())
50+
assertTrue(atomicBoolean.get())
51+
}
52+
}
53+
}
54+
55+
@Test
56+
fun processWithTimeout() {
57+
val completionFlags = (1..6).map { AtomicBoolean(false) }
58+
val sut = UpdateProcessingPipeline(
59+
processors = completionFlags.mapIndexed { index, atomicBoolean ->
60+
when (index % 3) {
61+
0 -> Delay(Long.MAX_VALUE, atomicBoolean)
62+
1 -> Delay(Random.nextLong(0, 100), atomicBoolean)
63+
else -> Fail()
64+
}
65+
},
66+
timeout = 1000,
67+
)
68+
69+
assertTimeout(Duration.ofMillis(2000)) {
70+
sut.process(UnknownUpdate(1L, "", JsonNull))
71+
}
72+
completionFlags.forEachIndexed { index, atomicBoolean ->
73+
if (index % 3 == 1) {
74+
assertTrue(atomicBoolean.get())
75+
}
76+
when (index % 3) {
77+
1 -> assertTrue(atomicBoolean.get())
78+
else -> assertFalse(atomicBoolean.get())
4679
}
4780
}
4881
}

launchers/lambda/src/main/kotlin/by/jprof/telegram/bot/launchers/lambda/config/env.kt

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const val TABLE_TIMEZONES = "TABLE_TIMEZONES"
1616
const val TABLE_LANGUAGE_ROOMS = "TABLE_LANGUAGE_ROOMS"
1717
const val TABLE_URBAN_WORDS_OF_THE_DAY = "TABLE_URBAN_WORDS_OF_THE_DAY"
1818
const val STATE_MACHINE_UNPINS = "STATE_MACHINE_UNPINS"
19+
const val TIMEOUT = "TIMEOUT"
1920

2021
val envModule = module {
2122
listOf(
@@ -37,4 +38,8 @@ val envModule = module {
3738
System.getenv(variable)!!
3839
}
3940
}
41+
42+
single(named(TIMEOUT)) {
43+
System.getenv(TIMEOUT)!!.toLong()
44+
}
4045
}

launchers/lambda/src/main/kotlin/by/jprof/telegram/bot/launchers/lambda/config/pipeline.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ import org.koin.dsl.module
2828
@PreviewFeature
2929
val pipelineModule = module {
3030
single {
31-
UpdateProcessingPipeline(getAll())
31+
UpdateProcessingPipeline(
32+
processors = getAll(),
33+
timeout = get<Long>(named(TIMEOUT)) - 1000
34+
)
3235
}
3336

3437
single<UpdateProcessor>(named("JEPUpdateProcessor")) {

0 commit comments

Comments
 (0)