From f25ab7dc69a6be059342380eb5d1e6605ebcf482 Mon Sep 17 00:00:00 2001 From: Federico Alterio Date: Sun, 24 Aug 2025 14:38:09 +0200 Subject: [PATCH] AsyncObseravableBase: normalized synchronous exceptions thrown by observer methods --- .../AsyncObservableBase.cs | 7 +-- .../AsyncObserverEnsureAsyncHelpers.cs | 44 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs diff --git a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs index 08d0a62a1..dd0766e9a 100644 --- a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. +using System.Reactive.Internal; using System.Threading.Tasks; namespace System.Reactive @@ -124,7 +125,7 @@ protected override async ValueTask OnCompletedAsyncCore() return; } - _task = _observer.OnCompletedAsync(); + _task = _observer.OnCompletedAsync_EnsureAsync(); } try @@ -146,7 +147,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) return; } - _task = _observer.OnErrorAsync(error); + _task = _observer.OnErrorAsync_EnsureAsync(error); } try @@ -168,7 +169,7 @@ protected override async ValueTask OnNextAsyncCore(T value) return; } - _task = _observer.OnNextAsync(value); + _task = _observer.OnNextAsync_EnsureAsync(value); } try diff --git a/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs b/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs new file mode 100644 index 000000000..62a5d82fc --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs @@ -0,0 +1,44 @@ +using System.Threading.Tasks; + +namespace System.Reactive.Internal; + +// Helpers methods that ensure that calls to IAsyncObserver methods don't throw synchronously. +// Those methods will always return a ValueTask, and any exception will be propagated through that ValueTask. +internal static class AsyncObserverEnsureAsyncHelpers +{ + public static ValueTask OnNextAsync_EnsureAsync(this IAsyncObserver source, T value) + { + try + { + return source.OnNextAsync(value); + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } + + public static ValueTask OnErrorAsync_EnsureAsync(this IAsyncObserver source, Exception error) + { + try + { + return source.OnErrorAsync(error); + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } + + public static ValueTask OnCompletedAsync_EnsureAsync(this IAsyncObserver source) + { + try + { + return source.OnCompletedAsync(); + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } +}