@@ -23,6 +23,7 @@ import io.reactivex.processors.UnicastProcessor
23
23
import io.rsocket.kotlin.*
24
24
import io.rsocket.kotlin.exceptions.ApplicationException
25
25
import io.rsocket.kotlin.exceptions.ChannelRequestException
26
+ import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN
26
27
import org.reactivestreams.Publisher
27
28
import org.reactivestreams.Subscriber
28
29
import org.reactivestreams.Subscription
@@ -121,10 +122,11 @@ internal class RSocketRequester(
121
122
val streamId = streamIds.nextStreamId(receivers)
122
123
val receiver = StreamReceiver .create()
123
124
receivers[streamId] = receiver
124
- val reqN = Cond ()
125
+ var isFirstRequestN = true
125
126
126
127
receiver.doOnRequestIfActive { requestN ->
127
- val frame = if (reqN.first()) {
128
+ val frame = if (isFirstRequestN) {
129
+ isFirstRequestN = false
128
130
Frame .Request .from(
129
131
streamId,
130
132
FrameType .REQUEST_STREAM ,
@@ -148,35 +150,37 @@ internal class RSocketRequester(
148
150
return Flowable .defer {
149
151
val receiver = StreamReceiver .create()
150
152
val streamId = streamIds.nextStreamId(receivers)
151
- val reqN = Cond ()
153
+ var firstReqN = true
154
+ var firstReqPayload = true
152
155
153
156
receiver.doOnRequestIfActive { requestN ->
154
157
155
- if (reqN.first()) {
156
- val wrappedRequest = request.compose {
157
- val sender = RequestingPublisher .wrap(it)
158
- sender.request(1 )
159
- senders[streamId] = sender
160
- receivers[streamId] = receiver
161
- sender
162
- }.publish().autoConnect(2 )
158
+ if (firstReqN) {
159
+ firstReqN = false
163
160
164
- val first = wrappedRequest.take( 1 )
165
- .map { payload ->
166
- Frame . Request .from(
167
- streamId,
168
- FrameType . REQUEST_CHANNEL ,
169
- payload,
170
- requestN)
161
+ val requestFrames = request
162
+ .compose {
163
+ val sender = RequestingPublisher .wrap(it)
164
+ sender.request( 1 )
165
+ senders[streamId] = sender
166
+ receivers[streamId] = receiver
167
+ sender
171
168
}
172
- val rest = wrappedRequest.skip(1 )
173
169
.map { payload ->
174
- Frame .PayloadFrame .from(
175
- streamId,
176
- FrameType .NEXT ,
177
- payload)
170
+ if (firstReqPayload) {
171
+ firstReqPayload = false
172
+ Frame .Request .from(
173
+ streamId,
174
+ FrameType .REQUEST_CHANNEL ,
175
+ payload,
176
+ requestN)
177
+ } else {
178
+ Frame .PayloadFrame .from(
179
+ streamId,
180
+ FrameType .NEXT ,
181
+ payload)
182
+ }
178
183
}
179
- val requestFrames = Flowable .concatArrayEager(first, rest)
180
184
requestFrames.subscribe(
181
185
ChannelRequestSubscriber (
182
186
{ payload -> frameSender.send(payload) },
@@ -248,10 +252,7 @@ internal class RSocketRequester(
248
252
FrameType .NEXT -> receiver.onNext(DefaultPayload (frame))
249
253
FrameType .REQUEST_N -> {
250
254
val sender = senders[streamId]
251
- sender?.let {
252
- val n = Frame .RequestN .requestN(frame).toLong()
253
- it.request(n)
254
- }
255
+ sender?.request(reactiveStreamsRequestN(Frame .RequestN .requestN(frame)))
255
256
}
256
257
FrameType .COMPLETE -> {
257
258
receiver.onComplete()
@@ -320,18 +321,6 @@ internal class RSocketRequester(
320
321
}
321
322
}
322
323
323
- private class Cond {
324
- private var first = true
325
-
326
- fun first (): Boolean =
327
- if (first) {
328
- first = false
329
- true
330
- } else {
331
- false
332
- }
333
- }
334
-
335
324
private class ChannelRequestSubscriber (private val next : (Frame ) -> Unit ,
336
325
private val error : (Throwable ) -> Unit ,
337
326
private val complete : (Boolean ) -> Unit )
0 commit comments