Skip to content

Commit b0a3736

Browse files
fedeAlterioFedericoAlterio
authored andcommitted
AsyncObseravableBase: normalized synchronous exceptions thrown by observer methods
1 parent de5749f commit b0a3736

File tree

2 files changed

+48
-3
lines changed

2 files changed

+48
-3
lines changed

AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Reactive.Internal;
56
using System.Threading.Tasks;
67

78
namespace System.Reactive
@@ -124,7 +125,7 @@ protected override async ValueTask OnCompletedAsyncCore()
124125
return;
125126
}
126127

127-
_task = _observer.OnCompletedAsync();
128+
_task = _observer.OnCompletedAsync_EnsureAsync();
128129
}
129130

130131
try
@@ -146,7 +147,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error)
146147
return;
147148
}
148149

149-
_task = _observer.OnErrorAsync(error);
150+
_task = _observer.OnErrorAsync_EnsureAsync(error);
150151
}
151152

152153
try
@@ -168,7 +169,7 @@ protected override async ValueTask OnNextAsyncCore(T value)
168169
return;
169170
}
170171

171-
_task = _observer.OnNextAsync(value);
172+
_task = _observer.OnNextAsync_EnsureAsync(value);
172173
}
173174

174175
try
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System.Threading.Tasks;
2+
3+
namespace System.Reactive.Internal;
4+
5+
// Helpers methods that ensure that calls to IAsyncObserver methods don't throw synchronously.
6+
// Those methods will always return a ValueTask, and any exception will be propagated through that ValueTask.
7+
internal static class AsyncObserverEnsureAsyncHelpers
8+
{
9+
public static ValueTask OnNextAsync_EnsureAsync<T>(this IAsyncObserver<T> source, T value)
10+
{
11+
try
12+
{
13+
return source.OnNextAsync(value);
14+
}
15+
catch (Exception e)
16+
{
17+
return new ValueTask(Task.FromException(e));
18+
}
19+
}
20+
21+
public static ValueTask OnErrorAsync_EnsureAsync<T>(this IAsyncObserver<T> source, Exception error)
22+
{
23+
try
24+
{
25+
return source.OnErrorAsync(error);
26+
}
27+
catch (Exception e)
28+
{
29+
return new ValueTask(Task.FromException(e));
30+
}
31+
}
32+
33+
public static ValueTask OnCompletedAsync_EnsureAsync<T>(this IAsyncObserver<T> source)
34+
{
35+
try
36+
{
37+
return source.OnCompletedAsync();
38+
}
39+
catch (Exception e)
40+
{
41+
return new ValueTask(Task.FromException(e));
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)