Skip to content

Commit 909e365

Browse files
close connection on RequestHandler termination (#74)
1 parent 13fbc76 commit 909e365

File tree

5 files changed

+27
-4
lines changed

5 files changed

+27
-4
lines changed

build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ subprojects {
4141
api 'io.netty:netty-buffer'
4242
api 'io.reactivex.rxjava2:rxjava'
4343
api 'org.jetbrains.kotlin:kotlin-stdlib-jdk7'
44+
api 'org.slf4j:slf4j-api'
45+
4446
compileOnly 'com.google.code.findbugs:jsr305'
4547

46-
testImplementation 'org.slf4j:slf4j-api'
4748
testImplementation 'junit:junit'
4849
testImplementation 'org.mockito:mockito-core'
4950
testImplementation 'org.hamcrest:hamcrest-library'

rsocket-core/gradle/dependency-locks/compileClasspath.lockfile

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.3.50
1010
org.jetbrains.kotlin:kotlin-stdlib:1.3.50
1111
org.jetbrains:annotations:13.0
1212
org.reactivestreams:reactive-streams:1.0.2
13+
org.slf4j:slf4j-api:1.7.28

rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt

+10
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ internal class RSocketResponder(
6161
connection
6262
.onClose()
6363
.subscribe({ completion.complete() }, errorConsumer)
64+
65+
requestHandler
66+
.onClose()
67+
.subscribe({ completion.complete() }, errorConsumer)
6468
}
6569

6670
override fun fireAndForget(payload: Payload): Completable {
@@ -291,7 +295,13 @@ internal class RSocketResponder(
291295

292296
private fun completeOnce(err: Throwable) {
293297
if (completed.compareAndSet(false, true)) {
298+
294299
receiveDisposable.dispose()
300+
301+
connection
302+
.close()
303+
.subscribe({}, errorConsumer)
304+
295305
requestHandler
296306
.close()
297307
.subscribe({}, errorConsumer)

rsocket-core/src/test/kotlin/io/rsocket/kotlin/RSocketResponderTest.kt

+13-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ class RSocketResponderTest {
3939
@get:Rule
4040
val rule = ServerSocketRule()
4141

42+
@Test(timeout = 2000)
43+
fun testRequestHandlerCloseTerminatesRSocket() {
44+
rule.acceptingSocket.close()
45+
.andThen(
46+
rule.rsocket.onClose()
47+
.mergeWith(rule.conn.onClose())
48+
).blockingAwait()
49+
}
50+
4251
@Test(timeout = 2000)
4352
@Throws(Exception::class)
4453
fun testHandleResponseFrameNoError() {
@@ -110,7 +119,7 @@ class RSocketResponderTest {
110119

111120
lateinit var sender: PublishProcessor<Frame>
112121
lateinit var receiver: PublishProcessor<Frame>
113-
private lateinit var conn: LocalDuplexConnection
122+
lateinit var conn: LocalDuplexConnection
114123
lateinit var errors: MutableList<Throwable>
115124
internal lateinit var rsocket: RSocketResponder
116125

@@ -137,9 +146,10 @@ class RSocketResponderTest {
137146
}
138147

139148
fun setAccSocket(acceptingSocket: RSocket) {
149+
val cur = this.acceptingSocket
150+
cur.close().subscribe()
151+
cur.onClose().blockingAwait()
140152
this.acceptingSocket = acceptingSocket
141-
acceptingSocket.close().subscribe()
142-
acceptingSocket.onClose().blockingAwait()
143153
init()
144154
}
145155

rsocket-transport-okhttp/gradle/dependency-locks/compileClasspath.lockfile

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.3.50
1212
org.jetbrains.kotlin:kotlin-stdlib:1.3.50
1313
org.jetbrains:annotations:13.0
1414
org.reactivestreams:reactive-streams:1.0.2
15+
org.slf4j:slf4j-api:1.7.28

0 commit comments

Comments
 (0)