Skip to content

Commit

Permalink
zarusz#356 Consumer error response per transport
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
  • Loading branch information
EtherZa committed Jan 3, 2025
1 parent 15834b6 commit 0e18604
Show file tree
Hide file tree
Showing 32 changed files with 188 additions and 247 deletions.
70 changes: 30 additions & 40 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
- [Order of Execution](#order-of-execution)
- [Generic interceptors](#generic-interceptors)
- [Error Handling](#error-handling)
- [Azure Service Bus](#azure-service-bus)
- [RabbitMQ](#rabbitmq)
- [Logging](#logging)
- [Debugging](#debugging)
- [Provider specific functionality](#provider-specific-functionality)
Expand Down Expand Up @@ -1081,15 +1083,14 @@ public interface IConsumerErrorHandler<in T>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
```

The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options.

| Result | Description |
| ProcessResult | Description |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** |
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
Expand All @@ -1105,70 +1106,59 @@ services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabb
services.AddTransient(typeof(IConsumerErrorHandler<>), typeof(CustomConsumerErrorHandler<>));
```

Transport plugins provide specialized error handling interfaces. Examples include:
Transport plugins provide specialized error handling interfaces with a default implementation that includes any additional `ProcessResult` options. Examples include:

- [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs)
- [IRabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs)
- [IKafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs)
- [IRedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs)
- [INatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs)
- [IServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs)
- [IEventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs)
- [ISqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs)
| Interface | Implementation including reference to additional options (if any) |
| ---------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
| [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | [MemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) |
| [IRabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | [RabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) |
| [IKafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | [KafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) |
| [IRedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | [RedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) |
| [INatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | [NatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) |
| [IServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | [ServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) |
| [IEventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | [EventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) |
| [ISqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | [SqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) |

> The message processing pipeline will always attempt to use the transport-specific error handler (e.g., `IMemoryConsumerErrorHandler<T>`) first. If unavailable, it will then look for the generic error handler (`IConsumerErrorHandler<T>`).

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.


### Abandon
#### Azure Service Bus
The Azure Service Bus transport has full support for abandoning messages to the dead letter queue.

#### RabbitMQ
Abandon will issue a `Nack` with `requeue: false`.

#### Other transports
No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown.
| ProcessResult | Description |
| ------------- | ---------------------------------------------------------------------------------- |
| DeadLetter | Abandons further processing of the message by sending it to the dead letter queue. |

### Failure
#### RabbitMQ
While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler.
| ProcessResult | Description |
| ------------- | --------------------------------------------------------------- |
| Requeue | Return the message to the queue for re-processing <sup>1</sup>. |

Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions.

```cs
.Handle<EchoRequest, EchoResponse>(x => x
.Queue("echo-request-handler")
.ExchangeBinding("test-echo")
.DeadLetterExchange("echo-request-handler-dlq")
// requeue a message on failure
.RequeueOnFailure()
.WithHandler<EchoRequestHandler>())
```
<sup>1</sup> RabbitMQ does not have a maximum delivery count. Please use `Requeue` with caution as, if no other conditions are applied, it may result in an infinite message loop.

### Example usage
Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):
Example retry with exponential back-off and short-curcuit to dead letter exchange on non-transient exceptions (using the [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) abstract implementation):
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
public class RetryHandler<T> : RabbitMqConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (!IsTranientException(exception))
{
return Abandon();
return Failure();
}

if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
await Task.Delay(delay, consumerContext.CancellationToken);

// in process retry
return Retry();
}

return Failure();
// re-qeuue for out of process execution
return Requeue();
}

private static bool IsTransientException(Exception exception)
Expand Down
Loading

0 comments on commit 0e18604

Please sign in to comment.