mirror of
https://github.com/alex289/CleanArchitecture.git
synced 2025-06-29 18:21:08 +00:00
feat: MassTransit (#94)
This commit is contained in:
commit
884aa80a21
@ -14,16 +14,18 @@
|
||||
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
||||
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="9.0.0" />
|
||||
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
||||
<PackageReference Include="Grpc.AspNetCore.Server.Reflection" Version="2.67.0" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
|
||||
<PackageReference Include="Grpc.AspNetCore.Server.Reflection" Version="2.70.0" />
|
||||
<PackageReference Include="MassTransit.Newtonsoft" Version="8.3.7" />
|
||||
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.7" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="9.0.3" />
|
||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.1" />
|
||||
<PackageReference Include="Swashbuckle.AspNetCore.Annotations" Version="7.3.1" />
|
||||
</ItemGroup>
|
||||
|
@ -1,5 +1,5 @@
|
||||
using System;
|
||||
using CleanArchitecture.Domain.Rabbitmq;
|
||||
using CleanArchitecture.Domain.Settings;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
namespace CleanArchitecture.Api.Extensions;
|
||||
@ -11,7 +11,6 @@ public static class ConfigurationExtensions
|
||||
{
|
||||
var isAspire = configuration["ASPIRE_ENABLED"] == "true";
|
||||
|
||||
var rabbitEnabled = configuration["RabbitMQ:Enabled"];
|
||||
var rabbitHost = configuration["RabbitMQ:Host"];
|
||||
var rabbitPort = configuration["RabbitMQ:Port"];
|
||||
var rabbitUser = configuration["RabbitMQ:Username"];
|
||||
@ -19,7 +18,6 @@ public static class ConfigurationExtensions
|
||||
|
||||
if (isAspire)
|
||||
{
|
||||
rabbitEnabled = "true";
|
||||
var connectionString = configuration["ConnectionStrings:RabbitMq"];
|
||||
|
||||
var rabbitUri = new Uri(connectionString!);
|
||||
@ -33,7 +31,6 @@ public static class ConfigurationExtensions
|
||||
{
|
||||
Host = rabbitHost ?? "",
|
||||
Port = int.Parse(rabbitPort ?? "0"),
|
||||
Enabled = bool.Parse(rabbitEnabled ?? "false"),
|
||||
Username = rabbitUser ?? "",
|
||||
Password = rabbitPass ?? ""
|
||||
};
|
||||
|
@ -4,19 +4,21 @@ using CleanArchitecture.Api.BackgroundServices;
|
||||
using CleanArchitecture.Api.Extensions;
|
||||
using CleanArchitecture.Application.Extensions;
|
||||
using CleanArchitecture.Application.gRPC;
|
||||
using CleanArchitecture.Domain.Consumers;
|
||||
using CleanArchitecture.Domain.Extensions;
|
||||
using CleanArchitecture.Domain.Rabbitmq.Extensions;
|
||||
using CleanArchitecture.Infrastructure.Database;
|
||||
using CleanArchitecture.Infrastructure.Extensions;
|
||||
using CleanArchitecture.ServiceDefaults;
|
||||
using HealthChecks.ApplicationStatus.DependencyInjection;
|
||||
using HealthChecks.UI.Client;
|
||||
using MassTransit;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
@ -33,11 +35,6 @@ if (builder.Environment.IsProduction())
|
||||
builder.Services.AddZenFirewall();
|
||||
}
|
||||
|
||||
builder.Services
|
||||
.AddHealthChecks()
|
||||
.AddDbContextCheck<ApplicationDbContext>()
|
||||
.AddApplicationStatus();
|
||||
|
||||
var isAspire = builder.Configuration["ASPIRE_ENABLED"] == "true";
|
||||
|
||||
var rabbitConfiguration = builder.Configuration.GetRabbitMqConfiguration();
|
||||
@ -47,23 +44,22 @@ var dbConnectionString = isAspire
|
||||
? builder.Configuration["ConnectionStrings:Database"]
|
||||
: builder.Configuration["ConnectionStrings:DefaultConnection"];
|
||||
|
||||
if (builder.Environment.IsProduction())
|
||||
{
|
||||
builder.Services
|
||||
.AddHealthChecks()
|
||||
.AddSqlServer(dbConnectionString!)
|
||||
.AddRedis(redisConnectionString!, "Redis")
|
||||
.AddRabbitMQ(
|
||||
async _ =>
|
||||
builder.Services
|
||||
.AddHealthChecks()
|
||||
.AddDbContextCheck<ApplicationDbContext>()
|
||||
.AddApplicationStatus()
|
||||
.AddSqlServer(dbConnectionString!)
|
||||
.AddRedis(redisConnectionString!, "Redis")
|
||||
.AddRabbitMQ(
|
||||
async _ =>
|
||||
{
|
||||
var factory = new ConnectionFactory
|
||||
{
|
||||
var factory = new ConnectionFactory
|
||||
{
|
||||
Uri = new Uri(rabbitConfiguration.ConnectionString),
|
||||
};
|
||||
return await factory.CreateConnectionAsync();
|
||||
},
|
||||
name: "RabbitMQ");
|
||||
}
|
||||
Uri = new Uri(rabbitConfiguration.ConnectionString),
|
||||
};
|
||||
return await factory.CreateConnectionAsync();
|
||||
},
|
||||
name: "RabbitMQ");
|
||||
|
||||
builder.Services.AddDbContext<ApplicationDbContext>(options =>
|
||||
{
|
||||
@ -82,7 +78,48 @@ builder.Services.AddCommandHandlers();
|
||||
builder.Services.AddNotificationHandlers();
|
||||
builder.Services.AddApiUser();
|
||||
|
||||
builder.Services.AddRabbitMqHandler(rabbitConfiguration);
|
||||
builder.Services.AddMassTransit(x =>
|
||||
{
|
||||
x.AddConsumer<FanoutEventConsumer>();
|
||||
x.AddConsumer<TenantUpdatedEventConsumer>();
|
||||
|
||||
x.UsingRabbitMq((context, cfg) =>
|
||||
{
|
||||
cfg.ConfigureNewtonsoftJsonSerializer(settings =>
|
||||
{
|
||||
settings.TypeNameHandling = TypeNameHandling.Objects;
|
||||
settings.NullValueHandling = NullValueHandling.Ignore;
|
||||
return settings;
|
||||
});
|
||||
cfg.UseNewtonsoftJsonSerializer();
|
||||
cfg.ConfigureNewtonsoftJsonDeserializer(settings =>
|
||||
{
|
||||
settings.TypeNameHandling = TypeNameHandling.Objects;
|
||||
settings.NullValueHandling = NullValueHandling.Ignore;
|
||||
return settings;
|
||||
});
|
||||
|
||||
cfg.Host(rabbitConfiguration.Host, (ushort)rabbitConfiguration.Port, "/", h => {
|
||||
h.Username(rabbitConfiguration.Username);
|
||||
h.Password(rabbitConfiguration.Password);
|
||||
});
|
||||
|
||||
// Every instance of the service will receive the message
|
||||
cfg.ReceiveEndpoint("clean-architecture-fanout-event-" + Guid.NewGuid(), e =>
|
||||
{
|
||||
e.Durable = false;
|
||||
e.AutoDelete = true;
|
||||
e.ConfigureConsumer<FanoutEventConsumer>(context);
|
||||
e.DiscardSkippedMessages();
|
||||
});
|
||||
cfg.ReceiveEndpoint("clean-architecture-fanout-events", e =>
|
||||
{
|
||||
e.ConfigureConsumer<TenantUpdatedEventConsumer>(context);
|
||||
e.DiscardSkippedMessages();
|
||||
});
|
||||
cfg.ConfigureEndpoints(context);
|
||||
});
|
||||
});
|
||||
|
||||
builder.Services.AddHostedService<SetInactiveUsersService>();
|
||||
|
||||
@ -148,7 +185,7 @@ app.MapControllers();
|
||||
app.MapGrpcService<UsersApiImplementation>();
|
||||
app.MapGrpcService<TenantsApiImplementation>();
|
||||
|
||||
app.Run();
|
||||
await app.RunAsync();
|
||||
|
||||
// Needed for integration tests web application factory
|
||||
public partial class Program
|
||||
|
@ -15,7 +15,7 @@
|
||||
<PackageReference Include="Aspire.Hosting.AppHost" Version="9.1.0" />
|
||||
<PackageReference Include="Aspire.Hosting.RabbitMQ" Version="9.1.0" />
|
||||
<PackageReference Include="Aspire.Hosting.Redis" Version="9.1.0" />
|
||||
<PackageReference Include="Aspire.Hosting.SqlServer" Version="9.0.0" />
|
||||
<PackageReference Include="Aspire.Hosting.SqlServer" Version="9.1.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
@ -6,7 +6,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
@ -8,11 +8,11 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3" />
|
||||
<PackageReference Include="FluentValidation" Version="11.11.0" />
|
||||
<PackageReference Include="MassTransit" Version="8.3.7" />
|
||||
<PackageReference Include="MediatR" Version="12.4.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.3" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
||||
<PackageReference Include="RabbitMQ.Client" Version="7.1.1" />
|
||||
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.6.0" />
|
||||
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.6.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
@ -1,6 +0,0 @@
|
||||
namespace CleanArchitecture.Domain.Constants;
|
||||
|
||||
public sealed class Messaging
|
||||
{
|
||||
public const string ExchangeNameNotifications = "exchange-notifications";
|
||||
}
|
22
CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs
Normal file
22
CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs
Normal file
@ -0,0 +1,22 @@
|
||||
using System.Threading.Tasks;
|
||||
using CleanArchitecture.Shared.Events;
|
||||
using MassTransit;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace CleanArchitecture.Domain.Consumers;
|
||||
|
||||
public sealed class FanoutEventConsumer : IConsumer<FanoutDomainEvent>
|
||||
{
|
||||
private readonly ILogger<FanoutEventConsumer> _logger;
|
||||
|
||||
public FanoutEventConsumer(ILogger<FanoutEventConsumer> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task Consume(ConsumeContext<FanoutDomainEvent> context)
|
||||
{
|
||||
_logger.LogInformation("FanoutDomainEventConsumer: {FanoutDomainEvent}", context.Message);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
using System.Threading.Tasks;
|
||||
using CleanArchitecture.Shared.Events.Tenant;
|
||||
using MassTransit;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace CleanArchitecture.Domain.Consumers;
|
||||
|
||||
public sealed class TenantUpdatedEventConsumer : IConsumer<TenantUpdatedEvent>
|
||||
{
|
||||
private readonly ILogger<TenantUpdatedEventConsumer> _logger;
|
||||
|
||||
public TenantUpdatedEventConsumer(ILogger<TenantUpdatedEventConsumer> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task Consume(ConsumeContext<TenantUpdatedEvent> context)
|
||||
{
|
||||
_logger.LogInformation("TenantUpdatedEventConsumer: {TenantId}", context.Message.AggregateId);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
@ -1,27 +1,33 @@
|
||||
using System.Threading.Tasks;
|
||||
using CleanArchitecture.Domain.Constants;
|
||||
using CleanArchitecture.Domain.Rabbitmq;
|
||||
using CleanArchitecture.Domain.Interfaces;
|
||||
using CleanArchitecture.Shared.Events;
|
||||
using MassTransit;
|
||||
|
||||
namespace CleanArchitecture.Domain.EventHandler.Fanout;
|
||||
|
||||
public sealed class FanoutEventHandler : IFanoutEventHandler
|
||||
{
|
||||
private readonly RabbitMqHandler _rabbitMqHandler;
|
||||
private readonly IPublishEndpoint _massTransit;
|
||||
private readonly IUser _user;
|
||||
|
||||
public FanoutEventHandler(
|
||||
RabbitMqHandler rabbitMqHandler)
|
||||
IPublishEndpoint massTransit, IUser user)
|
||||
{
|
||||
_rabbitMqHandler = rabbitMqHandler;
|
||||
_rabbitMqHandler.InitializeExchange(Messaging.ExchangeNameNotifications);
|
||||
_massTransit = massTransit;
|
||||
_user = user;
|
||||
}
|
||||
|
||||
public Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event)
|
||||
public async Task<T> HandleDomainEventAsync<T>(T @event) where T : DomainEvent
|
||||
{
|
||||
_rabbitMqHandler.EnqueueExchangeMessage(
|
||||
Messaging.ExchangeNameNotifications,
|
||||
@event);
|
||||
var fanoutDomainEvent =
|
||||
new FanoutDomainEvent(
|
||||
@event.AggregateId,
|
||||
@event,
|
||||
_user.GetUserId());
|
||||
|
||||
await _massTransit.Publish(fanoutDomainEvent);
|
||||
await _massTransit.Publish(@event);
|
||||
|
||||
return Task.FromResult(@event);
|
||||
return @event;
|
||||
}
|
||||
}
|
@ -5,5 +5,5 @@ namespace CleanArchitecture.Domain.EventHandler.Fanout;
|
||||
|
||||
public interface IFanoutEventHandler
|
||||
{
|
||||
Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event);
|
||||
Task<T> HandleDomainEventAsync<T>(T @event) where T : DomainEvent;
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public sealed class BindQueueToExchange : IRabbitMqAction
|
||||
{
|
||||
private readonly string _exchangeName;
|
||||
private readonly string _queueName;
|
||||
private readonly string _routingKey;
|
||||
|
||||
public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
|
||||
{
|
||||
_exchangeName = exchangeName;
|
||||
_routingKey = routingKey;
|
||||
_queueName = queueName;
|
||||
}
|
||||
|
||||
public async Task Perform(IChannel channel)
|
||||
{
|
||||
await channel.QueueBindAsync(_queueName, _exchangeName, _routingKey);
|
||||
}
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public sealed class CreateExchange : IRabbitMqAction
|
||||
{
|
||||
private readonly string _name;
|
||||
private readonly string _type;
|
||||
|
||||
public CreateExchange(string name, string type)
|
||||
{
|
||||
_name = name;
|
||||
_type = type;
|
||||
}
|
||||
|
||||
public async Task Perform(IChannel channel)
|
||||
{
|
||||
await channel.ExchangeDeclareAsync(_name, _type);
|
||||
}
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public sealed class CreateQueue : IRabbitMqAction
|
||||
{
|
||||
public string QueueName { get; }
|
||||
|
||||
public CreateQueue(string queueName)
|
||||
{
|
||||
QueueName = queueName;
|
||||
}
|
||||
|
||||
public async Task Perform(IChannel channel)
|
||||
{
|
||||
await channel.QueueDeclareAsync(
|
||||
QueueName,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
null);
|
||||
}
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public interface IRabbitMqAction
|
||||
{
|
||||
Task Perform(IChannel channel);
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public sealed class RegisterConsumer : IRabbitMqAction
|
||||
{
|
||||
private readonly Func<string, string, string, ConsumeEventHandler, Task> _addConsumer;
|
||||
private readonly ConsumeEventHandler _consumer;
|
||||
private readonly string _exchange;
|
||||
private readonly string _queue;
|
||||
private readonly string _routingKey;
|
||||
|
||||
public RegisterConsumer(
|
||||
string exchange,
|
||||
string queue,
|
||||
string routingKey,
|
||||
ConsumeEventHandler consumer,
|
||||
Func<string, string, string, ConsumeEventHandler, Task> addConsumer)
|
||||
{
|
||||
_exchange = exchange;
|
||||
_queue = queue;
|
||||
_routingKey = routingKey;
|
||||
_consumer = consumer;
|
||||
_addConsumer = addConsumer;
|
||||
}
|
||||
|
||||
public async Task Perform(IChannel channel)
|
||||
{
|
||||
await _addConsumer(_exchange, _queue, _routingKey, _consumer);
|
||||
}
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public sealed class SendAcknowledgement : IRabbitMqAction
|
||||
{
|
||||
public ulong DeliveryTag { get; }
|
||||
|
||||
public SendAcknowledgement(ulong deliveryTag)
|
||||
{
|
||||
DeliveryTag = deliveryTag;
|
||||
}
|
||||
|
||||
public async Task Perform(IChannel channel)
|
||||
{
|
||||
await channel.BasicAckAsync(DeliveryTag, false);
|
||||
}
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Newtonsoft.Json;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
|
||||
public sealed class SendMessage : IRabbitMqAction
|
||||
{
|
||||
private static readonly JsonSerializerSettings s_serializerSettings =
|
||||
new() { TypeNameHandling = TypeNameHandling.Objects };
|
||||
|
||||
private readonly string _exchange;
|
||||
private readonly object _message;
|
||||
|
||||
private readonly string _routingKey;
|
||||
|
||||
|
||||
/// <param name="routingKey">If exchange is empty, this is the name of the queue</param>
|
||||
public SendMessage(string routingKey, string exchange, object message)
|
||||
{
|
||||
_routingKey = routingKey;
|
||||
_exchange = exchange;
|
||||
_message = message;
|
||||
}
|
||||
|
||||
public async Task Perform(IChannel channel)
|
||||
{
|
||||
var json = JsonConvert.SerializeObject(_message, s_serializerSettings);
|
||||
|
||||
var content = Encoding.UTF8.GetBytes(json);
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
_exchange,
|
||||
_routingKey,
|
||||
content);
|
||||
}
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq;
|
||||
|
||||
public delegate Task<bool> ConsumeEventHandler(ReadOnlyMemory<byte> content);
|
@ -1,18 +0,0 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq.Extensions;
|
||||
|
||||
public static class ServiceCollectionExtensions
|
||||
{
|
||||
public static IServiceCollection AddRabbitMqHandler(
|
||||
this IServiceCollection services,
|
||||
RabbitMqConfiguration configuration)
|
||||
{
|
||||
services.AddSingleton(configuration);
|
||||
|
||||
services.AddSingleton<RabbitMqHandler>();
|
||||
services.AddHostedService(serviceProvider => serviceProvider.GetService<RabbitMqHandler>()!);
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
@ -1,240 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using CleanArchitecture.Domain.Rabbitmq.Actions;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace CleanArchitecture.Domain.Rabbitmq;
|
||||
|
||||
public sealed class RabbitMqHandler : BackgroundService
|
||||
{
|
||||
private readonly RabbitMqConfiguration _configuration;
|
||||
|
||||
private readonly ConcurrentDictionary<string, List<ConsumeEventHandler>> _consumers = new();
|
||||
|
||||
private readonly ILogger<RabbitMqHandler> _logger;
|
||||
|
||||
private readonly ConcurrentQueue<IRabbitMqAction> _pendingActions = new();
|
||||
private IChannel? _channel;
|
||||
|
||||
public RabbitMqHandler(
|
||||
RabbitMqConfiguration configuration,
|
||||
ILogger<RabbitMqHandler> logger)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Connection will not be established");
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("Starting RabbitMQ connection");
|
||||
|
||||
var factory = new ConnectionFactory
|
||||
{
|
||||
AutomaticRecoveryEnabled = true,
|
||||
HostName = _configuration.Host,
|
||||
Port = _configuration.Port,
|
||||
UserName = _configuration.Username,
|
||||
Password = _configuration.Password
|
||||
};
|
||||
|
||||
var connection = await factory.CreateConnectionAsync(cancellationToken);
|
||||
_channel = await connection.CreateChannelAsync(null, cancellationToken);
|
||||
|
||||
await base.StartAsync(cancellationToken);
|
||||
}
|
||||
|
||||
|
||||
public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.",
|
||||
exchangeName);
|
||||
return;
|
||||
}
|
||||
|
||||
_pendingActions.Enqueue(new CreateExchange(exchangeName, type));
|
||||
}
|
||||
|
||||
public void InitializeQueues(params string[] queueNames)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues.");
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var queue in queueNames)
|
||||
{
|
||||
_pendingActions.Enqueue(new CreateQueue(queue));
|
||||
}
|
||||
}
|
||||
|
||||
public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange.");
|
||||
return;
|
||||
}
|
||||
|
||||
_pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey));
|
||||
}
|
||||
|
||||
public void AddConsumer(string queueName, ConsumeEventHandler consumer)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer.");
|
||||
return;
|
||||
}
|
||||
|
||||
// routingKey is set to queueName to mimic rabbitMQ
|
||||
_pendingActions.Enqueue(
|
||||
new RegisterConsumer(
|
||||
string.Empty,
|
||||
queueName,
|
||||
queueName,
|
||||
consumer,
|
||||
AddEventConsumer));
|
||||
}
|
||||
|
||||
public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer.");
|
||||
return;
|
||||
}
|
||||
|
||||
_pendingActions.Enqueue(
|
||||
new RegisterConsumer(
|
||||
exchange,
|
||||
queue,
|
||||
routingKey,
|
||||
consumer,
|
||||
AddEventConsumer));
|
||||
}
|
||||
|
||||
public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer)
|
||||
{
|
||||
AddExchangeConsumer(exchange, string.Empty, queue, consumer);
|
||||
}
|
||||
|
||||
private async Task AddEventConsumer(string exchange, string queueName, string routingKey,
|
||||
ConsumeEventHandler consumer)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Event consumer will not be added.");
|
||||
return;
|
||||
}
|
||||
|
||||
var key = $"{exchange}-{routingKey}";
|
||||
|
||||
if (!_consumers.TryGetValue(key, out var consumers))
|
||||
{
|
||||
consumers = new List<ConsumeEventHandler>();
|
||||
_consumers.TryAdd(key, consumers);
|
||||
|
||||
var eventHandler = new AsyncEventingBasicConsumer(_channel!);
|
||||
eventHandler.ReceivedAsync += CallEventConsumersAsync;
|
||||
|
||||
await _channel!.BasicConsumeAsync(queueName, false, eventHandler);
|
||||
}
|
||||
|
||||
consumers.Add(consumer);
|
||||
}
|
||||
|
||||
private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea)
|
||||
{
|
||||
var key = $"{ea.Exchange}-{ea.RoutingKey}";
|
||||
|
||||
if (!_consumers.TryGetValue(key, out var consumers))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var consumer in consumers)
|
||||
{
|
||||
try
|
||||
{
|
||||
await consumer(ea.Body);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error while handling event in queue {RoutingKey}", ea.RoutingKey);
|
||||
}
|
||||
}
|
||||
|
||||
_pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag));
|
||||
}
|
||||
|
||||
|
||||
public void EnqueueMessage(string queueName, object message)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
|
||||
return;
|
||||
}
|
||||
|
||||
_pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message));
|
||||
}
|
||||
|
||||
public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "")
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
|
||||
return;
|
||||
}
|
||||
|
||||
_pendingActions.Enqueue(new SendMessage(routingKey, exchange, message));
|
||||
}
|
||||
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if (!_configuration.Enabled)
|
||||
{
|
||||
_logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started");
|
||||
return;
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
await HandleEnqueuedActions();
|
||||
|
||||
await Task.Delay(1000, stoppingToken);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleEnqueuedActions()
|
||||
{
|
||||
while (_pendingActions.TryDequeue(out var action))
|
||||
{
|
||||
try
|
||||
{
|
||||
await action.Perform(_channel!);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error while trying to send a rabbitmq message");
|
||||
_pendingActions.Enqueue(action);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,10 +1,9 @@
|
||||
namespace CleanArchitecture.Domain.Rabbitmq;
|
||||
namespace CleanArchitecture.Domain.Settings;
|
||||
|
||||
public sealed class RabbitMqConfiguration
|
||||
{
|
||||
public string Host { get; set; } = string.Empty;
|
||||
public int Port { get; set; }
|
||||
public bool Enabled { get; set; }
|
||||
public string Username { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
|
@ -12,10 +12,10 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="MediatR" Version="12.4.1" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.2">
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.3">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
|
@ -13,8 +13,8 @@
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.2" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.3" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
|
||||
<PackageReference Include="NUnit" Version="4.3.2" />
|
||||
<PackageReference Include="NUnit.Analyzers" Version="4.6.0">
|
||||
|
@ -66,7 +66,7 @@ internal class GlobalSetupFixture
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Creation of the respawner can fail if the database has not been created yet
|
||||
TestContext.WriteLine($"Failed to create respawner: {ex.Message}");
|
||||
await TestContext.Out.WriteLineAsync($"Failed to create respawner: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,9 @@ public sealed class HealthChecksTests
|
||||
[Test, Order(0)]
|
||||
public async Task Should_Return_Healthy()
|
||||
{
|
||||
// Wait some time to let the services get healthy
|
||||
await Task.Delay(2000);
|
||||
|
||||
var response = await _fixture.ServerClient.GetAsync("/healthz");
|
||||
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
||||
|
||||
|
@ -13,9 +13,9 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Google.Protobuf" Version="3.29.3" />
|
||||
<PackageReference Include="Google.Protobuf.Tools" Version="3.29.3" />
|
||||
<PackageReference Include="Grpc.AspNetCore" Version="2.67.0" />
|
||||
<PackageReference Include="Google.Protobuf" Version="3.30.1" />
|
||||
<PackageReference Include="Google.Protobuf.Tools" Version="3.30.1" />
|
||||
<PackageReference Include="Grpc.AspNetCore" Version="2.70.0" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
@ -10,15 +10,15 @@
|
||||
<ItemGroup>
|
||||
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
||||
|
||||
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.2.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.3.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="9.1.0" />
|
||||
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.1" />
|
||||
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.1" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.0" />
|
||||
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.2" />
|
||||
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.2" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.1" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.EntityFrameworkCore" Version="1.0.0-beta.12" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.9.0-beta.1" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.0" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.1" />
|
||||
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.1" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="MassTransit" Version="8.3.7" />
|
||||
<PackageReference Include="MediatR" Version="12.4.1" />
|
||||
</ItemGroup>
|
||||
|
||||
|
@ -1,8 +1,10 @@
|
||||
using System;
|
||||
using MassTransit;
|
||||
using MediatR;
|
||||
|
||||
namespace CleanArchitecture.Shared.Events;
|
||||
|
||||
[ExcludeFromTopology]
|
||||
public abstract class DomainEvent : Message, INotification
|
||||
{
|
||||
public DateTime Timestamp { get; private set; }
|
||||
|
18
CleanArchitecture.Shared/Events/FanoutDomainEvent.cs
Normal file
18
CleanArchitecture.Shared/Events/FanoutDomainEvent.cs
Normal file
@ -0,0 +1,18 @@
|
||||
using System;
|
||||
|
||||
namespace CleanArchitecture.Shared.Events;
|
||||
|
||||
public class FanoutDomainEvent : DomainEvent
|
||||
{
|
||||
public DomainEvent DomainEvent { get; }
|
||||
public Guid? UserId { get; }
|
||||
|
||||
public FanoutDomainEvent(
|
||||
Guid aggregateId,
|
||||
DomainEvent domainEvent,
|
||||
Guid? userId) : base(aggregateId)
|
||||
{
|
||||
DomainEvent = domainEvent;
|
||||
UserId = userId;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user