Skip to content

Commit cf292b6

Browse files
authored
Deprecate emitOrClose (#294)
1 parent 79cd1a2 commit cf292b6

File tree

4 files changed

+13
-10
lines changed

4 files changed

+13
-10
lines changed

ktor-plugins/ktor-tests/src/commonTest/kotlin/io/rsocket/kotlin/ktor/tests/WebSocketConnectionTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class WebSocketConnectionTest : SuspendTest {
6565
flow {
6666
var i = 0
6767
while (true) {
68-
emitOrClose(buildPayload { data((++i).toString()) })
68+
emit(buildPayload { data((++i).toString()) })
6969
delay(1000)
7070
}
7171
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,8 @@
1616

1717
package io.rsocket.kotlin
1818

19-
import io.rsocket.kotlin.frame.io.*
2019
import io.rsocket.kotlin.payload.*
2120
import kotlinx.coroutines.*
22-
import kotlinx.coroutines.channels.*
2321
import kotlinx.coroutines.flow.*
2422
import kotlinx.io.*
2523

@@ -57,6 +55,11 @@ private fun notImplemented(operation: String): Nothing = throw NotImplementedErr
5755
* Tries to emit [value], if emit failed, f.e. due cancellation, calls [Closeable.close] on [value].
5856
* Better to use it instead of [FlowCollector.emit] with [Payload] or [ByteReadPacket] to avoid leaks of dropped elements.
5957
*/
58+
@Deprecated(
59+
message = "Will be removed in next release",
60+
level = DeprecationLevel.ERROR,
61+
replaceWith = ReplaceWith("emit(value)")
62+
)
6063
public suspend fun <C : AutoCloseable> FlowCollector<C>.emitOrClose(value: C) {
6164
try {
6265
return emit(value)

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -95,14 +95,14 @@ abstract class RSocketTest(
9595
requestResponse { it }
9696
requestStream {
9797
it.close()
98-
flow { repeat(10) { emitOrClose(payload("server got -> [$it]")) } }
98+
flow { repeat(10) { emit(payload("server got -> [$it]")) } }
9999
}
100100
requestChannel { init, payloads ->
101101
init.close()
102102
flow {
103103
coroutineScope {
104104
payloads.onEach { it.close() }.launchIn(this)
105-
repeat(10) { emitOrClose(payload("server got -> [$it]")) }
105+
repeat(10) { emit(payload("server got -> [$it]")) }
106106
}
107107
}
108108
}

rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ abstract class TransportTest : SuspendTest {
194194
@Test
195195
fun requestChannel500NoLeak() = test {
196196
val request = flow {
197-
repeat(10_000) { emitOrClose(payload(3)) }
197+
repeat(10_000) { emit(payload(3)) }
198198
}
199199
val count =
200200
client
@@ -317,13 +317,13 @@ abstract class TransportTest : SuspendTest {
317317
override fun requestStream(payload: Payload): Flow<Payload> = flow {
318318
payload.close()
319319
repeat(8192) {
320-
emitOrClose(Payload(packet(responderData), packet(responderMetadata)))
320+
emit(Payload(packet(responderData), packet(responderMetadata)))
321321
}
322322
}
323323

324324
override fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> = flow {
325325
initPayload.close()
326-
payloads.collect { emitOrClose(it) }
326+
payloads.collect { emit(it) }
327327
}
328328
}
329329

0 commit comments

Comments
 (0)