diff --git a/CleanArchitecture.Api/Program.cs b/CleanArchitecture.Api/Program.cs index 4036e29..e63c833 100644 --- a/CleanArchitecture.Api/Program.cs +++ b/CleanArchitecture.Api/Program.cs @@ -3,6 +3,7 @@ using CleanArchitecture.Api.Extensions; using CleanArchitecture.Application.Extensions; using CleanArchitecture.Application.gRPC; using CleanArchitecture.Domain.Extensions; +using CleanArchitecture.Domain.Rabbitmq.Extensions; using CleanArchitecture.Infrastructure.Database; using CleanArchitecture.Infrastructure.Extensions; using HealthChecks.ApplicationStatus.DependencyInjection; @@ -51,6 +52,8 @@ builder.Services.AddCommandHandlers(); builder.Services.AddNotificationHandlers(); builder.Services.AddApiUser(); +builder.Services.AddRabbitMqHandler(builder.Configuration, "RabbitMQ"); + builder.Services.AddHostedService(); builder.Services.AddMediatR(cfg => { cfg.RegisterServicesFromAssemblies(typeof(Program).Assembly); }); diff --git a/CleanArchitecture.Api/appsettings.Development.json b/CleanArchitecture.Api/appsettings.Development.json index 436f0c9..3506cff 100644 --- a/CleanArchitecture.Api/appsettings.Development.json +++ b/CleanArchitecture.Api/appsettings.Development.json @@ -12,5 +12,11 @@ "Issuer": "CleanArchitectureServer", "Audience": "CleanArchitectureClient", "Secret": "sD3v061gf8BxXgmxcHssasjdlkasjd87439284)@#(*" + }, + "RabbitMQ": { + "Host": "localhost", + "Username": "guest", + "Password": "guest", + "Enabled": "True" } } diff --git a/CleanArchitecture.Api/appsettings.Integration.json b/CleanArchitecture.Api/appsettings.Integration.json new file mode 100644 index 0000000..1913b40 --- /dev/null +++ b/CleanArchitecture.Api/appsettings.Integration.json @@ -0,0 +1,14 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "RabbitMQ": { + "Host": "localhost", + "Username": "guest", + "Password": "guest", + "Enabled": "False" + } +} diff --git a/CleanArchitecture.Api/appsettings.json b/CleanArchitecture.Api/appsettings.json index 6ea892f..ea50523 100644 --- a/CleanArchitecture.Api/appsettings.json +++ b/CleanArchitecture.Api/appsettings.json @@ -14,5 +14,11 @@ "Audience": "CleanArchitectureClient", "Secret": "sD3v061gf8BxXgmxcHssasjdlkasjd87439284)@#(*" }, - "RedisHostName": "redis" + "RedisHostName": "redis", + "RabbitMQ": { + "Host": "rabbitmq", + "Username": "guest", + "Password": "guest", + "Enabled": "True" + } } diff --git a/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj b/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj index e192c77..8a6b041 100644 --- a/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj +++ b/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj @@ -11,6 +11,7 @@ + diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs new file mode 100644 index 0000000..72d3f81 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs @@ -0,0 +1,22 @@ +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public sealed class BindQueueToExchange : IRabbitMqAction +{ + private readonly string _exchangeName; + private readonly string _queueName; + private readonly string _routingKey; + + public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "") + { + _exchangeName = exchangeName; + _routingKey = routingKey; + _queueName = queueName; + } + + public void Perform(IModel channel) + { + channel.QueueBind(_queueName, _exchangeName, _routingKey); + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs new file mode 100644 index 0000000..ee28a42 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs @@ -0,0 +1,20 @@ +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public sealed class CreateExchange : IRabbitMqAction +{ + private readonly string _name; + private readonly string _type; + + public CreateExchange(string name, string type) + { + _name = name; + _type = type; + } + + public void Perform(IModel channel) + { + channel.ExchangeDeclare(_name, _type); + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs new file mode 100644 index 0000000..dba3883 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs @@ -0,0 +1,23 @@ +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public sealed class CreateQueue : IRabbitMqAction +{ + public string QueueName { get; } + + public CreateQueue(string queueName) + { + QueueName = queueName; + } + + public void Perform(IModel channel) + { + channel.QueueDeclare( + QueueName, + false, + false, + false, + null); + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs new file mode 100644 index 0000000..4570bf4 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs @@ -0,0 +1,8 @@ +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public interface IRabbitMqAction +{ + void Perform(IModel channel); +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs new file mode 100644 index 0000000..52fbf6c --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs @@ -0,0 +1,32 @@ +using System; +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public sealed class RegisterConsumer : IRabbitMqAction +{ + private readonly Action _addConsumer; + private readonly ConsumeEventHandler _consumer; + private readonly string _exchange; + private readonly string _queue; + private readonly string _routingKey; + + public RegisterConsumer( + string exchange, + string queue, + string routingKey, + ConsumeEventHandler consumer, + Action addConsumer) + { + _exchange = exchange; + _queue = queue; + _routingKey = routingKey; + _consumer = consumer; + _addConsumer = addConsumer; + } + + public void Perform(IModel channel) + { + _addConsumer(_exchange, _queue, _routingKey, _consumer); + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs new file mode 100644 index 0000000..01e3e11 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs @@ -0,0 +1,18 @@ +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public sealed class SendAcknowledgement : IRabbitMqAction +{ + public ulong DeliveryTag { get; } + + public SendAcknowledgement(ulong deliveryTag) + { + DeliveryTag = deliveryTag; + } + + public void Perform(IModel channel) + { + channel.BasicAck(DeliveryTag, false); + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs new file mode 100644 index 0000000..213bff4 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs @@ -0,0 +1,38 @@ +using System.Text; +using Newtonsoft.Json; +using RabbitMQ.Client; + +namespace CleanArchitecture.Domain.Rabbitmq.Actions; + +public sealed class SendMessage : IRabbitMqAction +{ + private static readonly JsonSerializerSettings s_serializerSettings = + new() { TypeNameHandling = TypeNameHandling.Objects }; + + private readonly string _exchange; + private readonly object _message; + + private readonly string _routingKey; + + + /// If exchange is empty, this is the name of the queue + public SendMessage(string routingKey, string exchange, object message) + { + _routingKey = routingKey; + _exchange = exchange; + _message = message; + } + + public void Perform(IModel channel) + { + var json = JsonConvert.SerializeObject(_message, s_serializerSettings); + + var content = Encoding.UTF8.GetBytes(json); + + channel.BasicPublish( + _exchange, + _routingKey, + null, + content); + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Delegates.cs b/CleanArchitecture.Domain/Rabbitmq/Delegates.cs new file mode 100644 index 0000000..70e87c7 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Delegates.cs @@ -0,0 +1,6 @@ +using System; +using System.Threading.Tasks; + +namespace CleanArchitecture.Domain.Rabbitmq; + +public delegate Task ConsumeEventHandler(ReadOnlyMemory content); \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs b/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..42ba1ca --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs @@ -0,0 +1,22 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace CleanArchitecture.Domain.Rabbitmq.Extensions; + +public static class ServiceCollectionExtensions +{ + public static IServiceCollection AddRabbitMqHandler( + this IServiceCollection services, + IConfiguration configuration, + string rabbitMqConfigSection) + { + var rabbitMq = new RabbitMqConfiguration(); + configuration.Bind(rabbitMqConfigSection, rabbitMq); + services.AddSingleton(rabbitMq); + + services.AddSingleton(); + services.AddHostedService(serviceProvider => serviceProvider.GetService()!); + + return services; + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs b/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs new file mode 100644 index 0000000..fc9ec38 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs @@ -0,0 +1,9 @@ +namespace CleanArchitecture.Domain.Rabbitmq; + +public sealed class RabbitMqConfiguration +{ + public string Host { get; set; } = string.Empty; + public bool Enabled { get; set; } + public string Username { get; set; } = string.Empty; + public string Password { get; set; } = string.Empty; +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs b/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs new file mode 100644 index 0000000..facc475 --- /dev/null +++ b/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs @@ -0,0 +1,228 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using CleanArchitecture.Domain.Rabbitmq.Actions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace CleanArchitecture.Domain.Rabbitmq; + +public sealed class RabbitMqHandler : BackgroundService +{ + private readonly RabbitMqConfiguration _configuration; + + private readonly ConcurrentDictionary> _consumers = new(); + + private readonly ILogger _logger; + + private readonly ConcurrentQueue _pendingActions = new(); + + private readonly IModel? _channel; + + private readonly IConnection? _connection; + + + public RabbitMqHandler( + RabbitMqConfiguration configuration, + ILogger logger) + { + _configuration = configuration; + _logger = logger; + + if (!configuration.Enabled) + { + logger.LogInformation("RabbitMQ is disabled. Connection will not be established"); + return; + } + + var factory = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + HostName = configuration.Host, + UserName = configuration.Username, + Password = configuration.Password, + DispatchConsumersAsync = true + }; + + _connection = factory.CreateConnection(); + _channel = _connection.CreateModel(); + } + + public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout) + { + if (!_configuration.Enabled) + { + _logger.LogInformation($"RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}."); + return; + } + + _pendingActions.Enqueue(new CreateExchange(exchangeName, type)); + } + + public void InitializeQueues(params string[] queueNames) + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues."); + return; + } + + foreach (var queue in queueNames) + { + _pendingActions.Enqueue(new CreateQueue(queue)); + } + } + + public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "") + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange."); + return; + } + + _pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey)); + } + + public void AddConsumer(string queueName, ConsumeEventHandler consumer) + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer."); + return; + } + + // routingKey is set to queueName to mimic rabbitMQ + _pendingActions.Enqueue( + new RegisterConsumer( + string.Empty, + queueName, + queueName, + consumer, + AddEventConsumer)); + } + + public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer) + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer."); + return; + } + + _pendingActions.Enqueue( + new RegisterConsumer( + exchange, + queue, + routingKey, + consumer, + AddEventConsumer)); + } + + public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer) + { + AddExchangeConsumer(exchange, string.Empty, queue, consumer); + } + + private void AddEventConsumer(string exchange, string queueName, string routingKey, ConsumeEventHandler consumer) + { + var key = $"{exchange}-{routingKey}"; + + if (!_consumers.TryGetValue(key, out var consumers)) + { + consumers = new List(); + _consumers.TryAdd(key, consumers); + + var eventHandler = new AsyncEventingBasicConsumer(_channel); + eventHandler.Received += CallEventConsumersAsync; + + _channel!.BasicConsume(queueName, false, eventHandler); + } + + consumers.Add(consumer); + } + + private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea) + { + var key = $"{ea.Exchange}-{ea.RoutingKey}"; + + if (!_consumers.TryGetValue(key, out var consumers)) + { + return; + } + + foreach (var consumer in consumers) + { + try + { + await consumer(ea.Body); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error while handling event in queue {ea.RoutingKey}"); + } + } + + _pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag)); + } + + + public void EnqueueMessage(string queueName, object message) + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message"); + return; + } + + _pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message)); + } + + public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "") + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message"); + return; + } + + _pendingActions.Enqueue(new SendMessage(routingKey, exchange, message)); + } + + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!_configuration.Enabled) + { + _logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started"); + return; + } + + while (true) + { + HandleEnqueuedActions(); + + await Task.Delay(1000, stoppingToken); + } + } + + private void HandleEnqueuedActions() + { + while (_pendingActions.TryDequeue(out var action)) + { + try + { + action.Perform(_channel!); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error while trying to send a rabbitmq message"); + _pendingActions.Enqueue(action); + } + } + } +} \ No newline at end of file