Skip to content

Commit

Permalink
Consumer service refactorig
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcos Gomes committed Feb 20, 2023
1 parent 011225d commit 43c8ca3
Show file tree
Hide file tree
Showing 17 changed files with 289 additions and 134 deletions.
2 changes: 2 additions & 0 deletions src/Marquitos.Events.RabbitMQ/Consumers/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Marquitos.Events.Consumers;
using System.Threading;
using System.Threading.Tasks;

namespace Marquitos.Events.RabbitMQ.Consumers
{
Expand Down
14 changes: 8 additions & 6 deletions src/Marquitos.Events.RabbitMQ/Consumers/EventConsumerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
namespace Marquitos.Events.RabbitMQ.Consumers
using System;

namespace Marquitos.Events.RabbitMQ.Consumers
{
/// <summary>
/// Event consumer options
/// </summary>
public class EventConsumerOptions
{
/// <summary>
/// The queue name
/// </summary>
public string QueueName { get; set; } = "";

/// <summary>
/// Array of delay in minutes of retries atempts that the consumer will try to consume the message before raises a failed exception.
/// </summary>
Expand Down Expand Up @@ -40,6 +37,11 @@ public class EventConsumerOptions
/// </summary>
public int Priority { get; set; } = 0;

/// <summary>
/// Configures the Queue max priority
/// </summary>
public int? MaxPriority { get; set; } = null;

/// <summary>
/// Indicates if the consumer is active and should consume messages.
/// </summary>
Expand Down
11 changes: 2 additions & 9 deletions src/Marquitos.Events.RabbitMQ/Events/ManagementEvent.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
using Marquitos.Events.Consumers;
using Marquitos.Events.RabbitMQ.Enums;
using Marquitos.Events.RabbitMQ.Enums;

namespace Marquitos.Events
{
/// <summary>
/// Management event fired by management service
/// </summary>
internal class ManagementEvent<TConsumer> where TConsumer : class, IEventConsumer
internal class ManagementEvent
{
public ManagementEventActionType Action { get; set; }

/// <summary>
/// Gets the unique identifier key for this event
/// </summary>
public static string Key => $"{typeof(TConsumer).FullName}";

}
}
57 changes: 0 additions & 57 deletions src/Marquitos.Events.RabbitMQ/Events/NotifyEvent.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Marquitos.Events.RabbitMQ.Consumers;
using System;
using System.Threading.Tasks;

namespace Marquitos.Events.RabbitMQ.Extensions.Configuration
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using EasyNetQ;
using EasyNetQ.DI;
using Marquitos.Events.RabbitMQ.Consumers;
using Marquitos.Events.RabbitMQ.Services;
using Marquitos.Events.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace Marquitos.Events.RabbitMQ.Extensions.Configuration
{
Expand All @@ -18,8 +21,42 @@ public static class RabbitServiceCollectionExtension
/// </summary>
/// <param name="services">This Service Collection</param>
/// <param name="connectionString">RabbitMQ connection string.</param>
/// <param name="registerServices">Action to configure EasyNetQ parameters</param>
/// <returns></returns>
public static IServiceCollection AddRabbitMQConnection(this IServiceCollection services, string connectionString)
public static IServiceCollection AddRabbitMQConnection(this IServiceCollection services, string connectionString, Action<IServiceRegister> registerServices)
{
// Register EasyNetQ
services.RegisterEasyNetQ(connectionString, registerServices);

return services;
}

#if NET6_0_OR_GREATER || NETSTANDARD2_0_OR_GREATER

/// <summary>
/// Register the Connection to RabbitMQ using EasyNetQ with Newtosoft serialization
/// </summary>
/// <param name="services">This Service Collection</param>
/// <param name="connectionString">RabbitMQ connection string.</param>
/// <returns></returns>
public static IServiceCollection AddRabbitMQConnectionWithNewtonsoftJson(this IServiceCollection services, string connectionString)
{
// Register EasyNetQ
services.RegisterEasyNetQ(connectionString, o =>
{
o.EnableNewtonsoftJson();
});

return services;
}

/// <summary>
/// Register the Connection to RabbitMQ using EasyNetQ with Microsoft serialization
/// </summary>
/// <param name="services">This Service Collection</param>
/// <param name="connectionString">RabbitMQ connection string.</param>
/// <returns></returns>
public static IServiceCollection AddRabbitMQConnectionWithSystemTextJson(this IServiceCollection services, string connectionString)
{
// Register EasyNetQ
services.RegisterEasyNetQ(connectionString, o =>
Expand All @@ -30,6 +67,8 @@ public static IServiceCollection AddRabbitMQConnection(this IServiceCollection s
return services;
}

#endif

/// <summary>
/// Register the Event Publishing service
/// </summary>
Expand Down Expand Up @@ -67,11 +106,16 @@ public static IServiceCollection AddRabbitMQEventConsumer<TConsumer, TMessage>(t
services.AddScoped<TConsumer>();
services.AddSingleton<IConsumerService, EventConsumerService<TConsumer, TMessage>>((serviceProvider) =>
{
#if NETCOREAPP2_1
var hostEnvironment = serviceProvider.GetRequiredService<IHostingEnvironment>();
#else
var hostEnvironment = serviceProvider.GetRequiredService<IHostEnvironment>();
#endif
var bus = serviceProvider.GetRequiredService<IBus>();
var conventions = serviceProvider.GetRequiredService<IConventions>();
var logger = serviceProvider.GetRequiredService<ILogger<EventConsumerService<TConsumer, TMessage>>>();

var result = new EventConsumerService<TConsumer, TMessage>(serviceProvider, hostEnvironment, bus, logger)
var result = new EventConsumerService<TConsumer, TMessage>(serviceProvider, hostEnvironment, bus, conventions,logger)
{
ConfigureOptions = async (sp, st) =>
{
Expand Down Expand Up @@ -102,11 +146,16 @@ public static IServiceCollection AddRabbitMQEventConsumer<TConsumer, TMessage>(t
services.AddScoped<TConsumer>();
services.AddSingleton<IConsumerService, EventConsumerService<TConsumer, TMessage>>((serviceProvider) =>
{
#if NETCOREAPP2_1
var hostEnvironment = serviceProvider.GetRequiredService<IHostingEnvironment>();
#else
var hostEnvironment = serviceProvider.GetRequiredService<IHostEnvironment>();
#endif
var bus = serviceProvider.GetRequiredService<IBus>();
var conventions = serviceProvider.GetRequiredService<IConventions>();
var logger = serviceProvider.GetRequiredService<ILogger<EventConsumerService<TConsumer, TMessage>>>();

var result = new EventConsumerService<TConsumer, TMessage>(serviceProvider, hostEnvironment, bus, logger)
var result = new EventConsumerService<TConsumer, TMessage>(serviceProvider, hostEnvironment, bus, conventions, logger)
{
ConfigureOptions = async (sp, st) =>
{
Expand Down Expand Up @@ -135,11 +184,16 @@ public static IServiceCollection AddRabbitMQEventConsumer<TConsumer, TMessage>(t
services.AddScoped<TConsumer>();
services.AddSingleton<IConsumerService, EventConsumerService<TConsumer, TMessage>>((serviceProvider) =>
{
#if NETCOREAPP2_1
var hostEnvironment = serviceProvider.GetRequiredService<IHostingEnvironment>();
#else
var hostEnvironment = serviceProvider.GetRequiredService<IHostEnvironment>();
#endif
var bus = serviceProvider.GetRequiredService<IBus>();
var conventions = serviceProvider.GetRequiredService<IConventions>();
var logger = serviceProvider.GetRequiredService<ILogger<EventConsumerService<TConsumer, TMessage>>>();

var result = new EventConsumerService<TConsumer, TMessage>(serviceProvider, hostEnvironment, bus, logger)
var result = new EventConsumerService<TConsumer, TMessage>(serviceProvider, hostEnvironment, bus, conventions, logger)
{
ConfigureOptions = async (sp, st) =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Marquitos.Events.RabbitMQ.Consumers;
using System;

namespace Marquitos.Events.RabbitMQ.Extensions
{
Expand All @@ -22,7 +23,7 @@ public static void UpdateFrom(this EventConsumerOptions options, EventConsumerOp

options.PrefetchCount = sourceOptions.PrefetchCount;
options.Priority = sourceOptions.Priority;
options.QueueName = sourceOptions.QueueName;
options.MaxPriority= sourceOptions.MaxPriority;
options.Retries = sourceOptions.Retries;
options.SingleActiveConsumer = sourceOptions.SingleActiveConsumer;
options.AutoDelete = sourceOptions.AutoDelete;
Expand Down
41 changes: 34 additions & 7 deletions src/Marquitos.Events.RabbitMQ/Marquitos.Events.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<TargetFrameworks>netstandard2.0;netcoreapp2.1;netcoreapp3.1;net6.0</TargetFrameworks>
<RootNamespace>Marquitos.Events.RabbitMQ</RootNamespace>
<Title>Marquitos Events RabbitMQ Implementation</Title>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
Expand Down Expand Up @@ -31,10 +29,39 @@
</None>
</ItemGroup>

<ItemGroup>
<PackageReference Include="EasyNetQ" Version="7.4.0" />
<PackageReference Include="EasyNetQ.DI.Microsoft" Version="7.4.0" />
<PackageReference Include="EasyNetQ.Serialization.SystemTextJson" Version="7.4.0" />
<!-- Target .NET 6.0 -->
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<PackageReference Include="EasyNetQ" Version="7.4.3" />
<PackageReference Include="EasyNetQ.DI.Microsoft" Version="7.4.3" />
<PackageReference Include="EasyNetQ.Serialization.NewtonsoftJson" Version="7.4.3" />
<PackageReference Include="EasyNetQ.Serialization.SystemTextJson" Version="7.4.3" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
</ItemGroup>

<!-- Target .NET Core 3.1 -->
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
<PackageReference Include="EasyNetQ" Version="6.5.2" />
<PackageReference Include="EasyNetQ.DI.Microsoft" Version="6.5.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.21" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.21" />
</ItemGroup>

<!--Target .NET Core 2.1 -->
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp2.1'">
<PackageReference Include="EasyNetQ" Version="6.5.2" />
<PackageReference Include="EasyNetQ.DI.Microsoft" Version="6.5.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.1.1" />
</ItemGroup>

<!--Target .NET Standard 2.0 -->
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="EasyNetQ" Version="7.4.3" />
<PackageReference Include="EasyNetQ.DI.Microsoft" Version="7.4.3" />
<PackageReference Include="EasyNetQ.Serialization.NewtonsoftJson" Version="7.4.3" />
<PackageReference Include="EasyNetQ.Serialization.SystemTextJson" Version="7.4.3" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
Expand Down
16 changes: 10 additions & 6 deletions src/Marquitos.Events.RabbitMQ/Services/EventConsumerManager.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
using EasyNetQ;
using Marquitos.Events.RabbitMQ.Consumers;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Marquitos.Events.RabbitMQ.Services
{
internal class EventConsumerManager<T, TMessage> : IEventConsumerManager<T> where T : EventConsumer<TMessage> where TMessage : class, IEvent
internal class EventConsumerManager<TConsumer, TMessage> : IEventConsumerManager<TConsumer> where TConsumer : EventConsumer<TMessage> where TMessage : class, IEvent
{
private readonly IConsumerService _service;
private readonly IBus _Bus;

public EventConsumerManager(IEnumerable<IConsumerService> services, IBus bus)
{
_service = services.First(e => e is EventConsumerService<T, TMessage>);
_service = services.First(e => e is EventConsumerService<TConsumer, TMessage>);
_Bus = bus;
}

Expand All @@ -19,22 +23,22 @@ public EventConsumerManager(IEnumerable<IConsumerService> services, IBus bus)

public async Task StartAsync(CancellationToken cancellationToken = default)
{
var notifyEvent = new ManagementEvent<T>
var notifyEvent = new ManagementEvent
{
Action = Enums.ManagementEventActionType.Start
};

await _Bus.PubSub.PublishAsync(notifyEvent, c => c.WithTopic(ManagementEvent<T>.Key), cancellationToken);
await _Bus.PubSub.PublishAsync(notifyEvent, c => c.WithTopic(typeof(TConsumer).FullName), cancellationToken);
}

public async Task StopAsync(CancellationToken cancellationToken = default)
{
var notifyEvent = new ManagementEvent<T>
var notifyEvent = new ManagementEvent
{
Action = Enums.ManagementEventActionType.Stop
};

await _Bus.PubSub.PublishAsync(notifyEvent, c => c.WithTopic(ManagementEvent<T>.Key), cancellationToken);
await _Bus.PubSub.PublishAsync(notifyEvent, c => c.WithTopic(typeof(TConsumer).FullName), cancellationToken);
}
}
}
Loading

0 comments on commit 43c8ca3

Please sign in to comment.