4
4
import java .util .concurrent .*;
5
5
import java .util .concurrent .atomic .AtomicReference ;
6
6
7
+ /**
8
+ * The {@code Async} class provides utilities to execute code asynchronously using virtual threads.
9
+ * It allows running {@link ThrowingRunnable} and {@link Callable} tasks asynchronously, handling exceptions,
10
+ * and returning results in a synchronous manner. It leverages the new virtual threads feature introduced in Java
11
+ * to provide lightweight concurrency.
12
+ * <p>
13
+ * This class offers methods to:
14
+ * <ul>
15
+ * <li>Run a block of code asynchronously and wait for its completion.</li>
16
+ * <li>Handle both checked and unchecked exceptions in asynchronous tasks.</li>
17
+ * <li>Retrieve results from {@link Future} and {@link CompletableFuture} objects.</li>
18
+ * </ul>
19
+ */
7
20
public class Async {
8
21
9
22
private static final Thread .Builder virtualThreadBuilder = Thread .ofVirtual ()
10
23
.name ("async-virtual-" , 0 );
11
24
12
- public static void await (ThrowingRunnable block ) {
25
+ private Async () {
26
+ // hide public constructor
27
+ }
28
+
29
+ /**
30
+ * Executes a block of code asynchronously and waits for its completion.
31
+ *
32
+ * @param block The code to be executed asynchronously
33
+ * @param millis The maximum time to wait for the block to complete, in milliseconds
34
+ * @throws CompletionException if the virtual thread is interrupted
35
+ * @throws RuntimeException if the block throws an exception
36
+ * @throws Error if the block throws an Error
37
+ * @throws IllegalStateException if an unexpected Throwable is encountered
38
+ */
39
+ @ SuppressWarnings ("java:S1181" )
40
+ public static void await (ThrowingRunnable block , long millis ) {
13
41
Objects .requireNonNull (block , "Block should not be null" );
14
42
try {
15
43
final var failureHolder = new AtomicReference <Throwable >();
@@ -18,44 +46,71 @@ public static void await(ThrowingRunnable block) {
18
46
ThrowingRunnable
19
47
.toRunnable (block )
20
48
.run ();
21
- } catch (RuntimeException | Error e ) {
49
+ } catch (Error e ) {
22
50
failureHolder .set (e );
51
+ } catch (Exception e ) {
52
+ failureHolder .set (toRuntimeException (e ));
23
53
}
24
54
})
25
- .join ();
55
+ .join (millis );
26
56
final Throwable throwable = failureHolder .get ();
27
- if (throwable instanceof Error e ) {
28
- throw e ;
29
- } else if (throwable instanceof RuntimeException re ) {
30
- throw re ;
31
- } else {
32
- throw new IllegalStateException ("Unexpected Throwable: " + throwable , throwable );
57
+ switch (throwable ) {
58
+ case null -> {
59
+ // success
60
+ }
61
+ case Error e -> throw e ;
62
+ case RuntimeException re -> throw re ;
63
+ default -> throw new IllegalStateException ("Unexpected Throwable: " + throwable , throwable );
33
64
}
34
65
} catch (InterruptedException e ) {
35
66
Thread .currentThread ().interrupt ();
36
- throw new RuntimeException ("Interrupted virtual thread" , e );
67
+ throw new CompletionException ("Interrupted virtual thread" , e );
37
68
}
38
69
}
39
70
40
- public static <T > T await (Callable <T > block ) throws RuntimeException {
71
+ /**
72
+ * Executes a block of code asynchronously and waits indefinitely for its completion.
73
+ *
74
+ * @param block The code to be executed asynchronously
75
+ * @throws CompletionException if the virtual thread is interrupted
76
+ * @throws RuntimeException if the block throws an exception
77
+ * @throws Error if the block throws an Error
78
+ * @throws IllegalStateException if an unexpected Throwable is encountered
79
+ */
80
+ public static void await (ThrowingRunnable block ) {
81
+ await (block , 0 );
82
+ }
83
+
84
+ /**
85
+ * Executes a callable block asynchronously and returns its result.
86
+ *
87
+ * @param <T> The type of the result
88
+ * @param block The callable block to be executed asynchronously. <strong>Block should not execute code
89
+ * that contains synchronized blocks or invokes synchronized methods to avoid scalability issues.</strong>
90
+ * @param millis The maximum time to wait for the callable block to complete, in milliseconds
91
+ * @return The result of the callable block
92
+ * @throws RuntimeException if the virtual thread is interrupted or if the block throws an exception
93
+ * @throws Error if the block throws an Error
94
+ * @throws IllegalStateException if an unexpected throwable is encountered in the call result
95
+ */
96
+ public static <T > T await (Callable <T > block , long millis ) {
41
97
Objects .requireNonNull (block , "Callable should not be null" );
42
98
try {
43
99
final var resultHolder = new AtomicReference <Result <T >>();
44
100
virtualThreadBuilder .start (() -> {
45
101
final Result <T > result = callWithErrorHandling (block );
46
102
resultHolder .set (result );
47
- }).join ();
103
+ }).join (millis );
48
104
final Result <T > result = resultHolder .get ();
49
105
if (result .isSuccess ()) {
50
- return result .getOrThrow ();
106
+ return result .getOrNull ();
51
107
} else {
52
108
final Throwable failure = result .failure ();
53
- if (failure instanceof RuntimeException re ) {
54
- throw re ;
55
- } else if (failure instanceof Error e ) {
56
- throw e ;
57
- } else {
58
- throw new IllegalStateException ("Unexpected throwable in call Result:" + failure , failure );
109
+ switch (failure ) {
110
+ case RuntimeException re -> throw re ;
111
+ case Error e -> throw e ;
112
+ case null , default ->
113
+ throw new IllegalStateException ("Unexpected throwable in call Result:" + failure , failure );
59
114
}
60
115
}
61
116
} catch (InterruptedException e ) {
@@ -64,6 +119,62 @@ public static <T> T await(Callable<T> block) throws RuntimeException {
64
119
}
65
120
}
66
121
122
+ /**
123
+ * Executes a callable block asynchronously and returns its result.
124
+ *
125
+ * @param <T> The type of the result
126
+ * @param block The callable block to be executed asynchronously
127
+ * @return The result of the callable block
128
+ * @throws RuntimeException if the virtual thread is interrupted or if the block throws an exception
129
+ * @throws Error if the block throws an Error
130
+ * @throws IllegalStateException if an unexpected throwable is encountered in the call result
131
+ */
132
+ public static <T > T await (Callable <T > block ) {
133
+ return await (block , 0 );
134
+ }
135
+
136
+ /**
137
+ * Waits for the completion of a Future and returns its result.
138
+ *
139
+ * @param <T> The type of the result
140
+ * @param future The Future to await
141
+ * @return The result of the Future
142
+ * @throws RuntimeException if the Future completes exceptionally
143
+ */
144
+ public static <T > T await (Future <T > future ) {
145
+ if (shortCircuitDoneFuture (future )) return future .resultNow ();
146
+ return await (() -> future .get ());
147
+ }
148
+
149
+
150
+ /**
151
+ * Waits for the completion of a Future and returns its result.
152
+ *
153
+ * @param <T> The type of the result
154
+ * @param future The Future to await
155
+ * @param millis The maximum time to wait for the future to complete, in milliseconds
156
+ * @return The result of the Future
157
+ * @throws RuntimeException if the Future completes exceptionally
158
+ */
159
+ public static <T > T await (Future <T > future , long millis ) {
160
+ if (shortCircuitDoneFuture (future )) return future .resultNow ();
161
+ return await (() -> future .get (millis , TimeUnit .MILLISECONDS ));
162
+ }
163
+
164
+ /**
165
+ * Waits for the completion of a CompletableFuture and returns its result.
166
+ *
167
+ * @param <T> The type of the result
168
+ * @param completableFuture The CompletableFuture to await
169
+ * @return The result of the CompletableFuture
170
+ * @throws RuntimeException if the CompletableFuture completes exceptionally
171
+ */
172
+ public static <T > T await (CompletableFuture <T > completableFuture ) {
173
+ if (shortCircuitDoneFuture (completableFuture )) return completableFuture .resultNow ();
174
+ return await (completableFuture ::join );
175
+ }
176
+
177
+
67
178
/**
68
179
* Executes a block of code with error handling.
69
180
* <p>
@@ -79,38 +190,49 @@ public static <T> T await(Callable<T> block) throws RuntimeException {
79
190
* @throws RuntimeException if an {@link InterruptedException},
80
191
* {@link ExecutionException}, or any other exception occurs
81
192
*/
193
+ @ SuppressWarnings ("java:S1181" )
82
194
private static <T > Result <T > callWithErrorHandling (Callable <T > block ) {
83
195
try {
84
196
return Result .success (block .call ());
85
197
} catch (InterruptedException e ) {
86
198
Thread .currentThread ().interrupt (); // Restore the interrupted status
87
199
return Result .failure (
88
- new RuntimeException ("Can't execute async task: interrupted" , e )
200
+ new CompletionException ("Can't execute async task: interrupted" , e )
89
201
);
90
- } catch (ExecutionException | CompletionException e ) {
202
+ } catch (Error e ) {
203
+ return Result .failure (e );
204
+ } catch (CompletionException | ExecutionException e ) {
91
205
final Throwable cause = e .getCause ();
92
- if (cause instanceof RuntimeException re ) {
93
- // re-throw RuntimeException as it is
94
- return Result .failure (re );
95
- } else if (cause instanceof Error error ) {
96
- // re-throw Error as it is
97
- return Result .failure (error );
206
+ if (cause instanceof Error ) {
207
+ return Result .failure (cause );
98
208
} else {
99
- return Result .failure (new RuntimeException ( "Can't execute async task: exception" , cause ));
209
+ return Result .failure (toRuntimeException ( cause ));
100
210
}
101
- } catch (RuntimeException | Error e ) {
102
- return Result .failure (e );
103
- } catch (Exception e ) {
104
- return Result .failure (new RuntimeException ("Can't execute async task: exception" , e ));
211
+ } catch (Throwable e ) {
212
+ return Result .failure (toRuntimeException (e ));
105
213
}
106
214
}
107
215
108
- public static <T > T await (Future <T > future ) {
109
- return await (() -> future .get ());
216
+ private static <T > boolean shortCircuitDoneFuture (Future <T > future ) {
217
+ if (future .state () == Future .State .SUCCESS ) {
218
+ return true ;
219
+ } else if (future .state () == Future .State .FAILED ) {
220
+ Throwable throwable = future .exceptionNow ();
221
+ if (throwable instanceof Error e ) {
222
+ throw e ;
223
+ }
224
+ throw toRuntimeException (throwable );
225
+ }
226
+ return false ;
110
227
}
111
228
112
- public static <T > T await (CompletableFuture <T > completableFuture ) {
113
- return await (completableFuture ::join );
229
+ private static RuntimeException toRuntimeException (Throwable cause ) {
230
+ if (cause instanceof RuntimeException re ) {
231
+ // re-throw RuntimeException as it is
232
+ return re ;
233
+ } else {
234
+ return new CompletionException ("Can't execute async task: exception" , cause );
235
+ }
114
236
}
115
237
116
238
}
0 commit comments