Skip to content

Commit fe2f8bf

Browse files
committed
Drop or rename TODOs (issues will be created) and use logging for unexpected frame
1 parent cf292b6 commit fe2f8bf

File tree

27 files changed

+60
-64
lines changed

27 files changed

+60
-64
lines changed

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ internal class ConnectionInbound(
5454
}
5555

5656
private fun receiveError(cause: Throwable) {
57-
throw cause // TODO?
57+
throw cause
5858
}
5959

6060
fun createOperation(type: FrameType, requestJob: Job): ResponderOperation = when (type) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInitializer.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.core.*
2121
import io.rsocket.kotlin.frame.*
2222
import io.rsocket.kotlin.internal.io.*
23+
import io.rsocket.kotlin.logging.*
2324
import io.rsocket.kotlin.transport.*
2425
import kotlinx.coroutines.*
2526

27+
@RSocketLoggingApi
2628
@RSocketTransportApi
2729
internal abstract class ConnectionInitializer(
2830
private val isClient: Boolean,
2931
private val frameCodec: FrameCodec,
32+
private val frameLogger: Logger,
3033
private val connectionAcceptor: ConnectionAcceptor,
3134
private val interceptors: Interceptors,
3235
) {
@@ -42,11 +45,11 @@ internal abstract class ConnectionInitializer(
4245
else -> connection.acceptStream() ?: error("Initial stream should be received")
4346
}
4447
initialStream.setSendPriority(0)
45-
MultiplexedConnection(isClient, frameCodec, connection, initialStream, requestsScope)
48+
MultiplexedConnection(isClient, frameCodec, frameLogger, connection, initialStream, requestsScope)
4649
}
4750

4851
is RSocketSequentialConnection -> {
49-
SequentialConnection(isClient, frameCodec, connection, requestsScope)
52+
SequentialConnection(isClient, frameCodec, frameLogger, connection, requestsScope)
5053
}
5154
}
5255

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,22 @@
1616

1717
package io.rsocket.kotlin.connection
1818

19+
import io.rsocket.kotlin.*
1920
import io.rsocket.kotlin.frame.*
2021
import io.rsocket.kotlin.internal.*
22+
import io.rsocket.kotlin.logging.*
2123
import io.rsocket.kotlin.operation.*
2224
import io.rsocket.kotlin.payload.*
2325
import io.rsocket.kotlin.transport.*
2426
import kotlinx.coroutines.*
2527
import kotlinx.io.*
2628

29+
@RSocketLoggingApi
2730
@RSocketTransportApi
2831
internal class MultiplexedConnection(
2932
isClient: Boolean,
3033
frameCodec: FrameCodec,
34+
private val frameLogger: Logger,
3135
private val connection: RSocketMultiplexedConnection,
3236
private val initialStream: RSocketMultiplexedConnection.Stream,
3337
private val requestsScope: CoroutineScope,
@@ -146,7 +150,7 @@ internal class MultiplexedConnection(
146150
// request is cancelled during fragmentation
147151
is CancelFrame -> error("Request was cancelled by remote party")
148152
is RequestFrame -> {
149-
// TODO: extract assembly logic?
153+
// TODO[fragmentation]: extract assembly logic?
150154
when {
151155
// for RC, it could contain the complete flag
152156
// complete+follows=complete, "complete" overrides "follows" flag
@@ -194,7 +198,7 @@ internal class MultiplexedConnection(
194198
): Unit = coroutineScope {
195199
val outbound = Outbound(streamId, stream)
196200
val receiveJob = launch {
197-
val handler = OperationFrameHandler(operation)
201+
val handler = OperationFrameHandler(operation, frameLogger)
198202
try {
199203
while (true) {
200204
val frame = frameCodec.decodeFrame(

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package io.rsocket.kotlin.connection
1818

19+
import io.rsocket.kotlin.*
1920
import io.rsocket.kotlin.frame.*
21+
import io.rsocket.kotlin.logging.*
2022
import io.rsocket.kotlin.operation.*
2123
import io.rsocket.kotlin.payload.*
2224
import io.rsocket.kotlin.transport.*
2325
import kotlinx.coroutines.*
2426
import kotlinx.io.*
2527

28+
@RSocketLoggingApi
2629
@RSocketTransportApi
2730
internal class SequentialConnection(
2831
isClient: Boolean,
2932
frameCodec: FrameCodec,
33+
private val frameLogger: Logger,
3034
private val connection: RSocketSequentialConnection,
3135
private val requestsScope: CoroutineScope,
3236
) : ConnectionOutbound(frameCodec) {
@@ -60,7 +64,7 @@ internal class SequentialConnection(
6064
): Job = requestsScope.launch(start = CoroutineStart.ATOMIC) {
6165
operation.handleExecutionFailure(requestPayload) {
6266
ensureActive() // because of atomic start
63-
val streamId = storage.createStream(OperationFrameHandler(operation))
67+
val streamId = storage.createStream(OperationFrameHandler(operation, frameLogger))
6468
try {
6569
operation.execute(Outbound(streamId), requestPayload)
6670
} finally {
@@ -116,7 +120,8 @@ internal class SequentialConnection(
116120
when {
117121
frame.follows -> ResponderInboundWrapper(connectionInbound, operationData)
118122
else -> acceptRequest(connectionInbound, operationData)
119-
}
123+
},
124+
frameLogger
120125
)
121126
if (storage.acceptStream(streamId, handler)) {
122127
// for fragmentation
@@ -159,7 +164,7 @@ internal class SequentialConnection(
159164
)
160165
)
161166
// close old handler
162-
storage.replaceStream(operationData.streamId, OperationFrameHandler(operation))?.close()
167+
storage.replaceStream(operationData.streamId, OperationFrameHandler(operation, frameLogger))?.close()
163168
} else {
164169
// should not happen really
165170
storage.removeStream(operationData.streamId)?.close()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class RSocketConnector internal constructor(
6666
private inner class SetupConnection() : ConnectionInitializer(
6767
isClient = true,
6868
frameCodec = FrameCodec(maxFragmentSize),
69+
frameLogger = frameLogger,
6970
connectionAcceptor = acceptor,
7071
interceptors = interceptors
7172
) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class RSocketServer internal constructor(
6666
private inner class AcceptConnection(acceptor: ConnectionAcceptor) : ConnectionInitializer(
6767
isClient = false,
6868
frameCodec = FrameCodec(maxFragmentSize),
69+
frameLogger = frameLogger,
6970
connectionAcceptor = acceptor,
7071
interceptors = interceptors
7172
) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,8 @@ internal sealed class Frame : AutoCloseable {
4141
}
4242

4343
internal fun dump(length: Long): String = buildString {
44-
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId).append(" Length: ").append(length)
44+
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId)
45+
if (length != -1L) append(" Length: ").append(length)
4546
append("\nFlags: 0b").append(flags.toBinaryString()).append(" (").apply { appendFlags() }.append(")")
4647
appendSelf()
4748
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameCodec.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,5 +32,5 @@ internal class FrameCodec(
3232

3333
fun encodeFrame(frame: Frame): Buffer = frame.toBuffer()
3434

35-
// TODO: move fragmentation logic here or into separate class?
35+
// TODO[fragmentation]: move fragmentation logic here or into separate class?
3636
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,7 +78,7 @@ internal fun Source.readRequest(
7878
return RequestFrame(type, streamId, fragmentFollows, complete, next, initialRequest, payload)
7979
}
8080

81-
//TODO rename or remove on fragmentation implementation
81+
//TODO[fragmentation] rename or remove on fragmentation implementation
8282
internal fun RequestFireAndForgetFrame(streamId: Int, payload: Payload): RequestFrame =
8383
RequestFrame(FrameType.RequestFnF, streamId, false, false, false, 0, payload)
8484

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ private const val HonorLeaseFlag = 64
2525
private const val ResumeEnabledFlag = 128
2626

2727
internal class SetupFrame(
28-
val version: Version, //TODO check
28+
val version: Version,
2929
val honorLease: Boolean,
3030
val keepAlive: KeepAlive,
3131
val resumeToken: Buffer?,
@@ -104,7 +104,7 @@ private fun Source.readStringMimeType(): String {
104104
}
105105

106106
private fun Sink.writeStringMimeType(mimeType: String) {
107-
val bytes = mimeType.encodeToByteArray() //TODO check
107+
val bytes = mimeType.encodeToByteArray()
108108
writeByte(bytes.size.toByte())
109109
write(bytes)
110110
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,11 +24,6 @@ private fun safeFindNextPositivePowerOfTwo(value: Int): Int = when {
2424
else -> 1 shl 32 - (value - 1).countLeadingZeroBits()
2525
}
2626

27-
// TODO: may be move to `internal-io` (and rename to just `rsocket-internal`)
28-
// and use in prioritization queue to support more granular prioritization for streams
29-
//
30-
// TODO decide, is it needed, or can be replaced by simple map, or concurrent map on JVM?
31-
// do benchmarks
3227
/**
3328
* IntMap implementation based on Netty IntObjectHashMap.
3429
*/

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadAssembler.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,9 +20,8 @@ import io.rsocket.kotlin.frame.io.*
2020
import io.rsocket.kotlin.payload.*
2121
import kotlinx.io.*
2222

23-
// TODO: make metadata should be fully transmitted before data
23+
// TODO[fragmentation]: make metadata should be fully transmitted before data
2424
internal class PayloadAssembler : AutoCloseable {
25-
// TODO: better name
2625
val hasPayload: Boolean
2726
get() = data != null
2827

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/PayloadChannel.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@ import kotlinx.coroutines.channels.*
2424
import kotlinx.coroutines.flow.*
2525

2626
internal class PayloadChannel {
27-
// TODO: capacity should be configurable
2827
private val payloads = channelForCloseable<Payload>(Channel.UNLIMITED)
2928
private val requestNs = Channel<Int>(Channel.UNLIMITED)
3029

@@ -39,7 +38,6 @@ internal class PayloadChannel {
3938

4039
@ExperimentalStreamsApi
4140
suspend fun consumeInto(collector: FlowCollector<Payload>, strategy: RequestStrategy.Element): Throwable? {
42-
// TODO: requestNs should be cancelled on success path?
4341
payloads.consume {
4442
while (true) {
4543
payloads

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationInbound.kt

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,10 @@
1616

1717
package io.rsocket.kotlin.operation
1818

19+
import io.rsocket.kotlin.*
1920
import io.rsocket.kotlin.frame.*
2021
import io.rsocket.kotlin.internal.*
22+
import io.rsocket.kotlin.logging.*
2123
import io.rsocket.kotlin.payload.*
2224

2325
internal interface OperationInbound {
@@ -33,8 +35,11 @@ internal interface OperationInbound {
3335
fun receiveDone() {}
3436
}
3537

36-
// TODO: merge into OperationInbound?
37-
internal class OperationFrameHandler(private val inbound: OperationInbound) {
38+
@RSocketLoggingApi
39+
internal class OperationFrameHandler(
40+
private val inbound: OperationInbound,
41+
private val frameLogger: Logger,
42+
) {
3843
private val assembler = PayloadAssembler()
3944

4045
fun close() {
@@ -47,8 +52,7 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) {
4752

4853
fun handleFrame(frame: Frame) {
4954
if (!inbound.shouldReceiveFrame(frame.type)) {
50-
// TODO: replace with logging
51-
println("unexpected frame: $frame")
55+
frameLogger.debug { "Received unexpected frame: ${frame.dump(-1)}" }
5256
return frame.close()
5357
}
5458

@@ -57,14 +61,13 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) {
5761
is ErrorFrame -> inbound.receiveErrorFrame(frame.throwable)
5862
is RequestNFrame -> inbound.receiveRequestNFrame(frame.requestN)
5963
is RequestFrame -> {
60-
// TODO: split frames
6164
if (frame.initialRequest != 0) inbound.receiveRequestNFrame(frame.initialRequest)
6265

6366
val payload = when {
6467
// complete+follows=complete
6568
frame.complete -> when {
6669
frame.next -> assembler.assemblePayload(frame.payload)
67-
// TODO - what if we previously received fragment?
70+
// TODO[fragmentation] - what if we previously received fragment?
6871
else -> {
6972
check(!assembler.hasPayload) { "wrong combination of frames" }
7073
null
@@ -85,11 +88,9 @@ internal class OperationFrameHandler(private val inbound: OperationInbound) {
8588
}
8689

8790
inbound.receivePayloadFrame(payload, frame.complete)
88-
//
89-
//
90-
// // TODO: recheck notes
91-
// // TODO: if there are no fragments saved and there are no following - we can ignore going through buffer
92-
// // TODO: really, fragment could be NULL when `complete` is true, but `next` is false
91+
92+
// // TODO[fragmentation]: if there are no fragments saved and there are no following - we can ignore going through buffer
93+
// // TODO[fragmentation]: really, fragment could be NULL when `complete` is true, but `next` is false
9394
// if (frame.next || frame.type.isRequestType) appendFragment(frame.payload)
9495
// if (frame.complete) inbound.receivePayloadFrame(assemblePayload(), complete = true)
9596
// else if (!frame.follows) inbound.receivePayloadFrame(assemblePayload(), complete = false)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/OperationOutbound.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ internal abstract class OperationOutbound(
3131
protected val streamId: Int,
3232
private val frameCodec: FrameCodec,
3333
) {
34-
// TODO: decide on it
34+
// TODO[fragmentation]: decide on it
3535
// private var firstRequestFrameSent: Boolean = false
3636

3737
protected abstract suspend fun sendFrame(frame: Buffer)
@@ -71,8 +71,7 @@ internal abstract class OperationOutbound(
7171
return sendRequestPayload(type, payload, complete, initialRequest)
7272
}
7373

74-
// TODO rework/simplify later
75-
// TODO release on fail ?
74+
// TODO[fragmentation] rework/simplify later, release on fail ?
7675
private suspend fun sendRequestPayload(type: FrameType, payload: Payload, complete: Boolean, initialRequest: Int) {
7776
if (!payload.isFragmentable(type.hasInitialRequest)) {
7877
return sendFrame(RequestFrame(type, streamId, false, complete, true, initialRequest, payload))

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ internal class RequesterRequestChannelOperation(
7777
responsePayloads.isActive -> frameType == FrameType.Payload || frameType == FrameType.Error
7878
else -> false
7979
} || when {
80-
// TODO: handle cancel, when `senderJob` is not started
8180
senderJob == null || senderJob?.isActive == true -> frameType == FrameType.RequestN || frameType == FrameType.Cancel
8281
else -> false
8382
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ internal class RequesterRequestResponseOperation(
3535
)
3636
responseDeferred.join()
3737
} catch (cause: Throwable) {
38-
// TODO: we don't need to send cancel if we have sent no frames
3938
nonCancellable { outbound.sendCancel() }
4039
throw cause
4140
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/ResponderRequestChannelOperation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal class ResponderRequestChannelOperation(
7272
frameType === FrameType.Cancel || when {
7373
requestPayloads.isActive -> frameType === FrameType.Payload || frameType === FrameType.Error
7474
else -> false
75-
} || frameType === FrameType.RequestN // TODO
75+
} || frameType === FrameType.RequestN
7676

7777
override fun receiveRequestNFrame(requestN: Int) {
7878
limiter.updateRequests(requestN)

0 commit comments

Comments
 (0)