diff --git a/ktor-plugins/ktor-tests/src/commonTest/kotlin/io/rsocket/kotlin/ktor/tests/WebSocketConnectionTest.kt b/ktor-plugins/ktor-tests/src/commonTest/kotlin/io/rsocket/kotlin/ktor/tests/WebSocketConnectionTest.kt index 6f7d92c6..361c75bb 100644 --- a/ktor-plugins/ktor-tests/src/commonTest/kotlin/io/rsocket/kotlin/ktor/tests/WebSocketConnectionTest.kt +++ b/ktor-plugins/ktor-tests/src/commonTest/kotlin/io/rsocket/kotlin/ktor/tests/WebSocketConnectionTest.kt @@ -65,7 +65,7 @@ class WebSocketConnectionTest : SuspendTest { flow { var i = 0 while (true) { - emitOrClose(buildPayload { data((++i).toString()) }) + emit(buildPayload { data((++i).toString()) }) delay(1000) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt index 1c84c050..9052ac5e 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,8 @@ package io.rsocket.kotlin -import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.io.* @@ -57,6 +55,11 @@ private fun notImplemented(operation: String): Nothing = throw NotImplementedErr * Tries to emit [value], if emit failed, f.e. due cancellation, calls [Closeable.close] on [value]. * Better to use it instead of [FlowCollector.emit] with [Payload] or [ByteReadPacket] to avoid leaks of dropped elements. */ +@Deprecated( + message = "Will be removed in next release", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("emit(value)") +) public suspend fun FlowCollector.emitOrClose(value: C) { try { return emit(value) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index 6771a99f..138b8957 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,14 +95,14 @@ abstract class RSocketTest( requestResponse { it } requestStream { it.close() - flow { repeat(10) { emitOrClose(payload("server got -> [$it]")) } } + flow { repeat(10) { emit(payload("server got -> [$it]")) } } } requestChannel { init, payloads -> init.close() flow { coroutineScope { payloads.onEach { it.close() }.launchIn(this) - repeat(10) { emitOrClose(payload("server got -> [$it]")) } + repeat(10) { emit(payload("server got -> [$it]")) } } } } diff --git a/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt index 8ee8a2a4..3b66c7bb 100644 --- a/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt +++ b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt @@ -194,7 +194,7 @@ abstract class TransportTest : SuspendTest { @Test fun requestChannel500NoLeak() = test { val request = flow { - repeat(10_000) { emitOrClose(payload(3)) } + repeat(10_000) { emit(payload(3)) } } val count = client @@ -317,13 +317,13 @@ abstract class TransportTest : SuspendTest { override fun requestStream(payload: Payload): Flow = flow { payload.close() repeat(8192) { - emitOrClose(Payload(packet(responderData), packet(responderMetadata))) + emit(Payload(packet(responderData), packet(responderMetadata))) } } override fun requestChannel(initPayload: Payload, payloads: Flow): Flow = flow { initPayload.close() - payloads.collect { emitOrClose(it) } + payloads.collect { emit(it) } } }