1
1
package cryptomessenger .desktop .infrastructure .client .websocket ;
2
2
3
3
import cryptomessenger .desktop .infrastructure .client .websocket .handler .MessageHandler ;
4
+ import jakarta .annotation .PostConstruct ;
4
5
import lombok .RequiredArgsConstructor ;
6
+ import lombok .SneakyThrows ;
5
7
import lombok .extern .slf4j .Slf4j ;
6
8
import org .springframework .beans .factory .annotation .Value ;
7
9
import org .springframework .boot .context .event .ApplicationReadyEvent ;
18
20
19
21
import java .lang .reflect .Type ;
20
22
import java .nio .charset .StandardCharsets ;
23
+ import java .util .Optional ;
21
24
import java .util .Set ;
22
25
import java .util .function .Consumer ;
23
26
@@ -31,42 +34,60 @@ public class WebSocketManager {
31
34
32
35
private final Set <MessageHandler > messageHandlers ;
33
36
37
+ private final WebSocketStompClient client = new WebSocketStompClient (new StandardWebSocketClient ());
38
+ private StompSession session = null ;
39
+ private Set <Subscription > subscriptions = emptySet ();
40
+
34
41
@ Value ("${app.server-base-url}" )
35
42
private String serverBaseUrl ;
36
43
37
- private StompSession session ;
38
- private Set <Subscription > subscriptions = emptySet ();
39
-
40
- @ EventListener (ApplicationReadyEvent .class )
41
- private void connect () {
42
- var client = new WebSocketStompClient (new StandardWebSocketClient ());
44
+ @ PostConstruct
45
+ private void initialize () {
43
46
client .setMessageConverter (new StringMessageConverter (StandardCharsets .UTF_8 ));
44
- client .connectAsync (getConnectionUrl (), new SessionHandler ())
45
- .thenAccept (establishedSession -> {
46
- log .info ("Connection established" );
47
- session = establishedSession ;
48
- refreshSubscriptions ();
49
- });
50
47
}
51
48
52
- private String getConnectionUrl () {
53
- return serverBaseUrl .replaceFirst ("http" , "ws" ) + "/ws" ;
49
+ @ EventListener (ApplicationReadyEvent .class )
50
+ private void connect () {
51
+ var url = serverBaseUrl .replaceFirst ("http" , "ws" ) + "/ws" ;
52
+ client .connectAsync (url , new SessionHandler (this ::connect )).thenAccept (establishedSession -> {
53
+ log .info ("Session established" );
54
+ session = establishedSession ;
55
+ refreshSubscriptions ();
56
+ });
54
57
}
55
58
56
59
public void refreshSubscriptions () {
57
60
subscriptions .forEach (Subscription ::unsubscribe );
58
- subscriptions = messageHandlers .stream ().map (handler -> {
61
+ subscriptions = messageHandlers .stream ()
62
+ .map (this ::subscribe )
63
+ .flatMap (Optional ::stream )
64
+ .collect (toSet ());
65
+ }
66
+
67
+ private Optional <Subscription > subscribe (MessageHandler handler ) {
68
+ var handlerName = handler .getClass ().getSimpleName ();
69
+ try {
59
70
var destination = handler .getDestination ();
60
71
var subscription = session .subscribe (destination , new FrameHandler (handler .getHandler ()));
61
- log .info ("Subscribed to {}" , destination );
62
- return subscription ;
63
- }).collect (toSet ());
72
+ log .info ("Subscribed {} to {}" , handlerName , destination );
73
+ return Optional .of (subscription );
74
+ } catch (Exception e ) {
75
+ log .warn ("Failed to subscribe {}: {}" , handlerName , e .getMessage ());
76
+ return Optional .empty ();
77
+ }
64
78
}
65
79
80
+ @ RequiredArgsConstructor
66
81
private static class SessionHandler extends StompSessionHandlerAdapter {
82
+
83
+ private final Runnable onDisconnected ;
84
+
67
85
@ Override
86
+ @ SneakyThrows
68
87
public void handleTransportError (StompSession session , Throwable exception ) {
69
88
log .warn ("Transport error: {}" , exception .getMessage ());
89
+ Thread .sleep (5000 );
90
+ onDisconnected .run ();
70
91
}
71
92
}
72
93
0 commit comments