@@ -3,34 +3,25 @@ package blue.starry.saya.endpoints
3
3
import blue.starry.saya.common.createSayaLogger
4
4
import blue.starry.saya.common.rejectWs
5
5
import blue.starry.saya.common.respondOr404
6
- import blue.starry.saya.models.Comment
7
6
import blue.starry.saya.models.CommentSource
8
7
import blue.starry.saya.models.TimeshiftCommentControl
9
- import blue.starry.saya.services.CommentChannelManager
10
- import blue.starry.saya.services.SayaMiyouTVApi
11
- import blue.starry.saya.services.miyoutv.toSayaComment
8
+ import blue.starry.saya.services.comments.CommentChannelManager
12
9
import blue.starry.saya.services.nicojk.NicoJkApi
13
- import blue.starry.saya.services.nicolive.toSayaComment
14
10
import io.ktor.application.*
15
11
import io.ktor.http.*
16
12
import io.ktor.http.cio.websocket.*
17
13
import io.ktor.response.*
18
14
import io.ktor.routing.*
19
15
import io.ktor.util.*
20
16
import io.ktor.websocket.*
21
- import kotlinx.coroutines.channels.Channel
22
17
import kotlinx.coroutines.channels.consumeEach
23
- import kotlinx.coroutines.delay
24
- import kotlinx.coroutines.flow.collect
25
18
import kotlinx.coroutines.flow.consumeAsFlow
26
19
import kotlinx.coroutines.flow.filterIsInstance
27
- import kotlinx.coroutines.launch
20
+ import kotlinx.coroutines.flow.mapNotNull
28
21
import kotlinx.serialization.decodeFromString
29
22
import kotlinx.serialization.encodeToString
30
23
import kotlinx.serialization.json.Json
31
24
import mu.KotlinLogging
32
- import java.util.concurrent.atomic.AtomicLong
33
- import kotlin.math.roundToLong
34
25
35
26
private val logger = KotlinLogging .createSayaLogger(" saya.endpoints" )
36
27
private val jsonWithDefault = Json {
@@ -45,7 +36,6 @@ fun Route.wsLiveCommentsByTarget() {
45
36
val channel = CommentChannelManager .findByTarget(target) ? : return @webSocket rejectWs { " Parameter target is invalid." }
46
37
47
38
CommentChannelManager .subscribeLiveComments(channel, sources).consumeEach {
48
-
49
39
send(jsonWithDefault.encodeToString(it))
50
40
}
51
41
}
@@ -54,136 +44,22 @@ fun Route.wsLiveCommentsByTarget() {
54
44
fun Route.wsTimeshiftCommentsByTarget () {
55
45
webSocket {
56
46
val target: String by call.parameters
57
-
58
- // エポック秒
59
47
val startAt: Long by call.parameters
60
48
val endAt: Long by call.parameters
61
- val duration = (endAt - startAt).toInt()
62
-
63
49
val sources = CommentSource .from(call.parameters[" sources" ])
64
50
65
51
val channel = CommentChannelManager .findByTarget(target) ? : return @webSocket rejectWs { " Parameter target is invalid." }
66
- val timeMs = AtomicLong (startAt * 1000 )
67
- var pause = true
68
-
69
- // unit 秒ずつ分割して取得
70
- val unit = 600
71
-
72
- // コメント配信ループ
73
- suspend fun Channel<Comment>.sendLoop () {
74
- consumeEach { comment ->
75
- val currentMs = comment.time * 1000 + comment.timeMs
76
- val waitMs = currentMs - timeMs.get()
77
- if (waitMs < - 10000 ) {
78
- logger.trace { " 破棄 ($waitMs ) : $comment " }
79
- return @consumeEach
80
- }
81
-
82
- delay(waitMs)
83
-
84
- while (pause) {
85
- delay(100 )
86
- }
87
- send(jsonWithDefault.encodeToString(comment))
88
- logger.trace { " 配信: $comment " }
89
-
90
- timeMs.getAndUpdate { prev ->
91
- maxOf(prev, currentMs)
92
- }
93
- }
94
- }
95
-
96
- if (CommentSource .Gochan in sources && channel.miyoutvId != null ) {
97
- launch {
98
- val client = SayaMiyouTVApi ? : return @launch
99
- val queue = Channel <Comment >(Channel .UNLIMITED )
100
-
101
- launch {
102
- repeat(duration / unit) { i ->
103
- client.getComments(
104
- channel.miyoutvId,
105
- (startAt + unit * i) * 1000 ,
106
- minOf(startAt + unit * (i + 1 ), endAt) * 1000
107
- ).data.comments.map {
108
- it.toSayaComment()
109
- }.forEach {
110
- queue.send(it)
111
- }
112
- }
113
- }
114
-
115
- queue.sendLoop()
116
- }
117
- }
118
-
119
- if (CommentSource .Nicolive in sources && channel.nicojkId != null ) {
120
- launch {
121
- val queue = Channel <Comment >(Channel .UNLIMITED )
122
-
123
- launch {
124
- repeat(duration / unit) { i ->
125
- NicoJkApi .getComments(
126
- " jk${channel.nicojkId} " ,
127
- startAt + unit * i,
128
- minOf(startAt + unit * (i + 1 ), endAt)
129
- ).packets.map {
130
- it.chat.toSayaComment(
131
- source = " ニコニコ実況過去ログAPI [jk${channel.nicojkId} ]" ,
132
- sourceUrl = " https://jikkyo.tsukumijima.net/api/kakolog/jk${channel.nicojkId} ?starttime=${it.chat.date} &endtime=${it.chat.date + 1 } &format=json"
133
- )
134
- }.forEach {
135
- queue.send(it)
136
- }
137
- }
138
- }
139
-
140
- queue.sendLoop()
141
- }
142
- }
143
-
144
- // WS コントロール処理ループ
145
- incoming.consumeAsFlow().filterIsInstance<Frame .Text >().collect {
146
- val control = try {
52
+ val controls = incoming.consumeAsFlow().filterIsInstance<Frame .Text >().mapNotNull {
53
+ try {
147
54
Json .decodeFromString<TimeshiftCommentControl >(it.readText())
148
55
} catch (t: Throwable ) {
149
- logger.error(t) { " WS コントロール命令のパースに失敗しました。" }
150
- return @collect
151
- }
152
-
153
- when (control.action) {
154
- /* *
155
- * クライアントの準備ができ, コメントの配信を開始する命令
156
- * {"action": "Ready"}
157
- *
158
- * コメントの配信を再開する命令
159
- * {"action": "Resume"}
160
- */
161
- TimeshiftCommentControl .Action .Ready ,
162
- TimeshiftCommentControl .Action .Resume -> {
163
- pause = false
164
- }
165
-
166
- /* *
167
- * コメントの配信を一時停止する命令
168
- * {"action": "Pause"}
169
- */
170
- //
171
- TimeshiftCommentControl .Action .Pause -> {
172
- pause = true
173
- }
174
-
175
- /* *
176
- * コメントの位置を同期する命令
177
- * {"action": "Sync", "seconds": 10.0}
178
- */
179
- TimeshiftCommentControl .Action .Sync -> {
180
- pause = false
181
-
182
- timeMs.set(((startAt + control.seconds) * 1000 ).roundToLong())
183
- }
56
+ logger.error(t) { " TimeshiftCommentControl のパースに失敗しました: $it " }
57
+ null
184
58
}
59
+ }
185
60
186
- logger.debug { " クライアントの命令: $control " }
61
+ CommentChannelManager .subscribeTimeshiftComments(channel, sources, controls, startAt, endAt).consumeEach {
62
+ send(jsonWithDefault.encodeToString(it))
187
63
}
188
64
}
189
65
}
0 commit comments