Convey.MessageBrokers.RabbitMQ
Comprehensive RabbitMQ integration for building event-driven microservices with automatic message routing, retry mechanisms, dead letter handling, and extensive configuration options.
Installation
dotnet add package Convey.MessageBrokers.RabbitMQ
Overview
Convey.MessageBrokers.RabbitMQ provides:
- Message publishing - Reliable message publishing with confirmations
- Message subscription - Automatic message handling and routing
- Convention-based routing - Automatic exchange and queue configuration
- Retry mechanisms - Built-in retry policies for failed messages
- Dead letter handling - Automatic dead letter queue setup
- Context propagation - Correlation and tracing context passing
- SSL/TLS support - Secure connections with certificate validation
- Plugin architecture - Extensible message processing pipeline
Configuration
Basic Setup
// Composition root: register Convey together with RabbitMQ and the CQRS handlers.
var builder = WebApplication.CreateBuilder(args);

builder.Services
    .AddConvey()
    .AddRabbitMq()
    .AddCommandHandlers() // Register command handlers (from Convey.CQRS.Commands)
    .AddEventHandlers();  // Register event handlers (from Convey.CQRS.Events)

// Build the application, hook RabbitMQ into it, then start it.
var app = builder.Build();
app.UseRabbitMq();
app.Run();
RabbitMQ Options
Configure in appsettings.json:
{
"rabbitmq": {
"connectionName": "my-service",
"hostNames": ["localhost"],
"port": 5672,
"virtualHost": "/",
"username": "guest",
"password": "guest",
"requestedHeartbeat": "00:01:00",
"requestedConnectionTimeout": "00:00:30",
"messageProcessingTimeout": "00:05:00",
"retries": 3,
"retryInterval": 1000,
"messagesPersisted": true,
"requeueFailedMessages": false,
"conventions": {
"casing": "snakeCase"
},
"exchange": {
"declare": true,
"durable": true,
"autoDelete": false,
"type": "topic"
},
"queue": {
"declare": true,
"durable": true,
"exclusive": false,
"autoDelete": false
},
"qos": {
"prefetchSize": 0,
"prefetchCount": 10,
"global": false
},
"deadLetter": {
"enabled": true,
"prefix": "dlx",
"ttl": 86400000
}
}
}
Key Features
1. Event Publishing
/// <summary>
/// Event describing a newly created user. Every property is get-only and is
/// assigned exactly once, in the constructor.
/// </summary>
public class UserCreatedEvent : IEvent
{
    /// <summary>
    /// Creates the event and stamps <see cref="CreatedAt"/> with the current UTC time.
    /// </summary>
    /// <param name="userId">Identifier of the created user.</param>
    /// <param name="name">Name of the created user.</param>
    /// <param name="email">Email address of the created user.</param>
    public UserCreatedEvent(Guid userId, string name, string email)
    {
        (UserId, Name, Email) = (userId, name, email);
        CreatedAt = DateTime.UtcNow;
    }

    public Guid UserId { get; }

    public string Name { get; }

    public string Email { get; }

    /// <summary>UTC time at which this event instance was constructed.</summary>
    public DateTime CreatedAt { get; }
}
/// <summary>
/// Application service that persists a new user and then publishes a
/// <see cref="UserCreatedEvent"/> through the message bus.
/// </summary>
public class UserService
{
    private readonly IUserRepository _userRepository;
    private readonly IBusPublisher _publisher;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="userRepository">Repository the new user is added to.</param>
    /// <param name="publisher">Bus publisher used to emit the created event.</param>
    /// <exception cref="ArgumentNullException">Either dependency is <c>null</c>.</exception>
    public UserService(IUserRepository userRepository, IBusPublisher publisher)
    {
        _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
        _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
    }

    /// <summary>
    /// Creates a user from <paramref name="command"/>, stores it, and publishes
    /// a <see cref="UserCreatedEvent"/> for it.
    /// </summary>
    /// <param name="command">Command carrying the new user's name and email.</param>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is <c>null</c>.</exception>
    public async Task CreateUserAsync(CreateUserCommand command)
    {
        if (command is null)
        {
            throw new ArgumentNullException(nameof(command));
        }

        // Create user logic
        var user = new User(command.Name, command.Email);
        await _userRepository.AddAsync(user);

        // Publish event only after the user has been stored.
        var @event = new UserCreatedEvent(user.Id, user.Name, user.Email);
        await _publisher.PublishAsync(@event);
    }
}
2. Event Handling
/// <summary>
/// Sends a welcome email whenever a <see cref="UserCreatedEvent"/> is handled.
/// A failure is logged and rethrown to the caller.
/// </summary>
public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<UserCreatedEventHandler> _logger;

    public UserCreatedEventHandler(IEmailService emailService, ILogger<UserCreatedEventHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    /// <summary>
    /// Logs the event, sends the welcome email, and logs the outcome.
    /// </summary>
    /// <param name="event">The user-created event to process.</param>
    /// <param name="cancellationToken">Cancellation token (not used by this handler).</param>
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Handling UserCreatedEvent for user {UserId}", @event.UserId);

        try
        {
            await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);
            _logger.LogInformation("Welcome email sent to {Email}", @event.Email);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send welcome email to {Email}", @event.Email);

            // This will trigger retry mechanism
            throw;
        }
    }
}
// Multiple handlers for the same event

/// <summary>
/// Second handler for <see cref="UserCreatedEvent"/>: forwards the user id and
/// creation time to the analytics service.
/// </summary>
public class UserAnalyticsEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IAnalyticsService _analyticsService;

    public UserAnalyticsEventHandler(IAnalyticsService analyticsService)
        => _analyticsService = analyticsService;

    /// <summary>Tracks the registration described by <paramref name="event"/>.</summary>
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
        => await _analyticsService.TrackUserRegistrationAsync(@event.UserId, @event.CreatedAt);
}
3. Message Routing Conventions
// Automatic routing based on message type

/// <summary>Event raised when an order is created; routed purely by convention.</summary>
public class OrderCreatedEvent : IEvent
{
    /// <summary>Identifier of the created order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Identifier of the customer who placed the order.</summary>
    public Guid CustomerId { get; set; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }
}

// Will automatically route to:
//   Exchange:    order_created_event (based on class name)
//   Queue:       order_service.order_created_event (based on service name + event name)
//   Routing Key: order_created_event
// Custom routing with attributes

/// <summary>
/// Event whose routing is set explicitly through the <c>[Message]</c> attribute
/// instead of being derived by convention.
/// </summary>
[Message("orders", "order.created", "order-processing-queue")]
public class CustomOrderEvent : IEvent
{
    /// <summary>Identifier of the order this event refers to.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Status of the order.</summary>
    public string Status { get; set; }
}
4. Advanced Message Publishing
/// <summary>
/// Demonstrates three publishing variants: custom routing key, explicit message
/// properties and headers, and delayed delivery.
/// </summary>
public class OrderService
{
    private readonly IBusPublisher _publisher;

    public OrderService(IBusPublisher publisher) => _publisher = publisher;

    /// <summary>
    /// Publishes the processed, shipped, and reminder events for <paramref name="order"/>, in that order.
    /// </summary>
    /// <param name="order">Order whose id, status, tracking number, and correlation id are read.</param>
    public async Task ProcessOrderAsync(Order order)
    {
        // Publish with custom routing
        var processed = new OrderProcessedEvent(order.Id, order.Status);
        await _publisher.PublishAsync(processed, routingKey: "order.processed.high-priority");

        // Publish with message properties
        var shipped = new OrderShippedEvent(order.Id, order.TrackingNumber);
        var messageId = Guid.NewGuid().ToString();
        var correlationId = order.CorrelationId;
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
        var headers = new Dictionary<string, object>
        {
            ["source"] = "order-service",
            ["version"] = "1.0",
            ["priority"] = "high"
        };
        await _publisher.PublishAsync(
            shipped,
            messageId: messageId,
            correlationId: correlationId,
            timestamp: timestamp,
            headers: headers);

        // Delayed publishing (if DelayedExchange plugin is enabled)
        var reminder = new OrderReminderEvent(order.Id);
        await _publisher.PublishAsync(reminder, delay: TimeSpan.FromHours(24));
    }
}
5. Error Handling and Retries
/// <summary>
/// Updates the payment status for a <see cref="PaymentProcessedEvent"/> and
/// separates transient failures (rethrown as-is) from permanent ones
/// (wrapped in a <see cref="RejectException"/>).
/// </summary>
public class PaymentEventHandler : IEventHandler<PaymentProcessedEvent>
{
    private readonly IPaymentService _paymentService;
    private readonly ILogger<PaymentEventHandler> _logger;

    public PaymentEventHandler(IPaymentService paymentService, ILogger<PaymentEventHandler> logger)
    {
        _paymentService = paymentService;
        _logger = logger;
    }

    /// <summary>Applies the status carried by <paramref name="event"/> to its payment.</summary>
    /// <exception cref="RejectException">A <see cref="PermanentException"/> occurred while updating.</exception>
    public async Task HandleAsync(PaymentProcessedEvent @event, CancellationToken cancellationToken = default)
    {
        var paymentId = @event.PaymentId;

        try
        {
            await _paymentService.UpdatePaymentStatusAsync(paymentId, @event.Status);
        }
        catch (TransientException ex)
        {
            _logger.LogWarning(ex, "Transient error processing payment {PaymentId}, will retry", paymentId);

            // Will be retried according to retry policy
            throw;
        }
        catch (PermanentException ex)
        {
            _logger.LogError(ex, "Permanent error processing payment {PaymentId}, moving to DLQ", paymentId);

            // Will be moved to dead letter queue
            throw new RejectException("Permanent failure", ex);
        }
    }
}
// Custom exception handling

/// <summary>
/// Maps a handler exception and the message that caused it to a <see cref="FailedMessage"/>.
/// </summary>
public class CustomExceptionToMessageMapper : IExceptionToMessageMapper
{
    /// <summary>
    /// Builds a <see cref="FailedMessage"/> from the exception details. <c>CanRetry</c> is
    /// <c>false</c> only when the exception is a <see cref="PermanentException"/>.
    /// </summary>
    /// <param name="exception">Exception that was thrown.</param>
    /// <param name="message">Message being processed when the exception was thrown.</param>
    public object Map(Exception exception, object message) => new FailedMessage
    {
        Message = message,
        Error = exception.Message,
        StackTrace = exception.StackTrace,
        Timestamp = DateTime.UtcNow,
        CanRetry = exception is not PermanentException
    };
}
Advanced Configuration
1. SSL/TLS Configuration
{
"rabbitmq": {
"ssl": {
"enabled": true,
"serverName": "rabbitmq.example.com",
"certificatePath": "/path/to/certificate.pfx",
"certificatePassword": "password",
"acceptablePolicyErrors": ["RemoteCertificateNotAvailable"],
"checkCertificateRevocation": true
}
}
}
2. High Availability Setup
{
"rabbitmq": {
"hostNames": ["rabbit1.example.com", "rabbit2.example.com", "rabbit3.example.com"],
"port": 5672,
"networkRecoveryInterval": "00:00:05",
"requestedHeartbeat": "00:01:00",
"requestedConnectionTimeout": "00:00:30"
}
}
3. Performance Tuning
{
"rabbitmq": {
"qos": {
"prefetchCount": 50,
"prefetchSize": 0,
"global": false
},
"maxProducerChannels": 10,
"messageProcessingTimeout": "00:02:00"
}
}
4. Plugin Integration
// Register RabbitMQ with three plugins, added in this order.
builder.Services
    .AddConvey()
    .AddRabbitMq(plugins: registry =>
        registry
            .Add<MessageTracingPlugin>()
            .Add<MessageDeduplicationPlugin>()
            .Add<MessageCompressionPlugin>());
/// <summary>
/// Plugin that wraps the rest of the pipeline in an activity named after the
/// message type and marks that activity as failed when the pipeline throws.
/// </summary>
public class MessageTracingPlugin : RabbitMqPlugin
{
    /// <summary>
    /// Starts an activity, runs <paramref name="next"/>, and records an error status on failure.
    /// </summary>
    /// <param name="message">Message being processed; its type name becomes the activity name.</param>
    /// <param name="correlationContext">Correlation context (not used by this plugin).</param>
    /// <param name="next">Delegate that runs the remainder of the pipeline.</param>
    public override async Task HandleAsync(object message, object correlationContext, Func<Task> next)
    {
        var activityName = message.GetType().Name;
        using var span = StartActivity(activityName);

        try
        {
            await next();
        }
        catch (Exception ex)
        {
            // The activity may be null, so only set the status when one exists.
            span?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}
Message Patterns
1. Request-Response Pattern
// Request

/// <summary>Query asking for the <see cref="UserDto"/> of a single user.</summary>
public class GetUserQuery : IRequest<UserDto>
{
    /// <summary>Identifier of the user to fetch.</summary>
    public Guid UserId { get; set; }
}
// Response

/// <summary>Response payload returned for a <c>GetUserQuery</c>.</summary>
public class UserDto
{
    /// <summary>Identifier of the user.</summary>
    public Guid Id { get; set; }

    /// <summary>Name of the user.</summary>
    public string Name { get; set; }

    /// <summary>Email address of the user.</summary>
    public string Email { get; set; }
}
// Handler

/// <summary>
/// Handles <see cref="GetUserQuery"/> by loading the user from the repository
/// and projecting it onto a <see cref="UserDto"/>.
/// </summary>
public class GetUserQueryHandler : IRequestHandler<GetUserQuery, UserDto>
{
    private readonly IUserRepository _userRepository;

    public GetUserQueryHandler(IUserRepository userRepository)
        => _userRepository = userRepository;

    /// <summary>Loads the user identified by <c>query.UserId</c> and maps it to a DTO.</summary>
    /// <param name="query">Query carrying the user id.</param>
    /// <param name="cancellationToken">Cancellation token (not used by this handler).</param>
    /// <returns>A DTO holding the user's id, name, and email.</returns>
    public async Task<UserDto> HandleAsync(GetUserQuery query, CancellationToken cancellationToken = default)
    {
        var found = await _userRepository.GetAsync(query.UserId);

        var dto = new UserDto
        {
            Id = found.Id,
            Name = found.Name,
            Email = found.Email
        };

        return dto;
    }
}
// Usage

/// <summary>HTTP endpoint that answers user lookups by sending a <see cref="GetUserQuery"/> over the bus.</summary>
public class UserController : ControllerBase
{
    private readonly IBusPublisher _publisher;

    public UserController(IBusPublisher publisher) => _publisher = publisher;

    /// <summary>Sends a <see cref="GetUserQuery"/> for <paramref name="id"/> and returns the reply with HTTP 200.</summary>
    /// <param name="id">User identifier taken from the route.</param>
    [HttpGet("{id:guid}")]
    public async Task<ActionResult<UserDto>> GetUser(Guid id)
    {
        var response = await _publisher.SendAsync<GetUserQuery, UserDto>(new GetUserQuery { UserId = id });

        return Ok(response);
    }
}
2. Saga Pattern
/// <summary>
/// Saga coordinating an order across order creation, payment, and inventory events.
/// Its state is stored through <see cref="ISagaRepository"/>; follow-up messages
/// are published through <see cref="IBusPublisher"/>.
/// </summary>
public class OrderSaga : ISaga<OrderCreatedEvent>,
    ISaga<PaymentProcessedEvent>,
    ISaga<InventoryReservedEvent>
{
    private readonly IBusPublisher _publisher;
    private readonly ISagaRepository _repository;

    public OrderSaga(IBusPublisher publisher, ISagaRepository repository)
    {
        _publisher = publisher;
        _repository = repository;
    }

    /// <summary>
    /// Starts the saga: saves the initial state as <c>PaymentPending</c>, then
    /// publishes a <see cref="ProcessPaymentCommand"/>.
    /// </summary>
    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        var initialState = new OrderSagaData
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            TotalAmount = @event.TotalAmount,
            Status = OrderSagaStatus.PaymentPending
        };
        await _repository.SaveAsync(initialState);

        // Start payment process
        var payment = new ProcessPaymentCommand
        {
            OrderId = @event.OrderId,
            Amount = @event.TotalAmount,
            CustomerId = @event.CustomerId
        };
        await _publisher.PublishAsync(payment);
    }

    /// <summary>
    /// Advances the saga after payment: on success requests an inventory
    /// reservation, otherwise publishes an <see cref="OrderFailedEvent"/>.
    /// The new status is stored before the follow-up message is published.
    /// </summary>
    public async Task HandleAsync(PaymentProcessedEvent @event, CancellationToken cancellationToken = default)
    {
        var state = await _repository.GetAsync(@event.OrderId);
        var paid = @event.Success;

        state.Status = paid
            ? OrderSagaStatus.InventoryPending
            : OrderSagaStatus.PaymentFailed;
        await _repository.UpdateAsync(state);

        if (!paid)
        {
            await _publisher.PublishAsync(new OrderFailedEvent
            {
                OrderId = @event.OrderId,
                Reason = "Payment failed"
            });
            return;
        }

        await _publisher.PublishAsync(new ReserveInventoryCommand
        {
            OrderId = @event.OrderId,
            Items = state.Items
        });
    }

    // ... Handle other events
}
Testing
Unit Testing
/// <summary>Unit tests for <see cref="UserService"/> using mocked dependencies.</summary>
public class UserServiceTests
{
    private readonly Mock<IBusPublisher> _publisherMock = new();
    private readonly Mock<IUserRepository> _repositoryMock = new();
    private readonly UserService _userService;

    public UserServiceTests()
    {
        // The service needs both mocks, so it is built here rather than in a field initializer.
        _userService = new UserService(_repositoryMock.Object, _publisherMock.Object);
    }

    /// <summary>
    /// Creating a user publishes exactly one <see cref="UserCreatedEvent"/> whose
    /// name and email match the command.
    /// </summary>
    [Fact]
    public async Task CreateUserAsync_ValidUser_PublishesEvent()
    {
        // Arrange
        var command = new CreateUserCommand("John Doe", "john@example.com");

        // Act
        await _userService.CreateUserAsync(command);

        // Assert
        _publisherMock.Verify(
            x => x.PublishAsync(
                It.Is<UserCreatedEvent>(e => e.Name == command.Name && e.Email == command.Email),
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<object>(),
                It.IsAny<IDictionary<string, object>>()),
            Times.Once);
    }
}
Integration Testing
/// <summary>
/// Integration test that publishes a message through the fixture's services and
/// checks that the test handler received it.
/// </summary>
public class RabbitMqIntegrationTests : IClassFixture<RabbitMqTestFixture>
{
    private readonly RabbitMqTestFixture _fixture;
    private readonly IBusPublisher _publisher;
    private readonly TestEventHandler _handler;

    public RabbitMqIntegrationTests(RabbitMqTestFixture fixture)
    {
        _fixture = fixture;

        // Both services are resolved from the fixture's service provider.
        _publisher = fixture.GetService<IBusPublisher>();
        _handler = fixture.GetService<TestEventHandler>();
    }

    /// <summary>A published <c>TestEvent</c> reaches the handler with the same id.</summary>
    [Fact]
    public async Task PublishEvent_ValidEvent_HandlerReceivesMessage()
    {
        // Arrange
        var published = new TestEvent { Id = Guid.NewGuid(), Message = "Test message" };

        // Act
        await _publisher.PublishAsync(published);

        // Assert
        await _fixture.WaitForMessageAsync(TimeSpan.FromSeconds(5));
        Assert.True(_handler.MessageReceived);
        Assert.Equal(published.Id, _handler.ReceivedEvent.Id);
    }
}
/// <summary>
/// Shared xUnit fixture that builds a service provider with Convey + RabbitMQ
/// configured for tests and exposes helpers for resolving services and waiting
/// for message delivery.
/// </summary>
public class RabbitMqTestFixture : IDisposable
{
    // How often WaitForMessageAsync re-checks the handler while waiting.
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);

    private readonly ServiceProvider _serviceProvider;
    private readonly CancellationTokenSource _cancellationTokenSource;

    /// <summary>
    /// Registers the services, builds the provider, and starts RabbitMQ message processing.
    /// </summary>
    public RabbitMqTestFixture()
    {
        var services = new ServiceCollection();
        services.AddConvey()
            .AddRabbitMq(options =>
            {
                // Use test-specific configuration
                options.HostNames = new[] { "localhost" };
                options.VirtualHost = "/test";
                options.Queue.AutoDelete = true;
            })
            .AddCommandHandlers() // Register command handlers
            .AddEventHandlers();  // Register event handlers
        services.AddSingleton<TestEventHandler>();

        _serviceProvider = services.BuildServiceProvider();
        _cancellationTokenSource = new CancellationTokenSource();

        // Start message processing.
        // Nothing above registers IApplicationBuilder, so resolving it from the
        // container would throw. Build one directly over the provider instead.
        IApplicationBuilder app = new ApplicationBuilder(_serviceProvider);
        app.UseRabbitMq();
    }

    /// <summary>
    /// Waits until the registered <see cref="TestEventHandler"/> reports a received
    /// message, polling every <see cref="PollInterval"/>.
    /// </summary>
    /// <param name="timeout">Maximum time to wait.</param>
    /// <exception cref="OperationCanceledException">
    /// No message was received before <paramref name="timeout"/> elapsed.
    /// </exception>
    public async Task WaitForMessageAsync(TimeSpan timeout)
    {
        var handler = GetService<TestEventHandler>();
        using var cts = new CancellationTokenSource(timeout);

        // Wait for message processing: return as soon as the handler has seen a
        // message; the token cancels the delay once the timeout is reached.
        while (!handler.MessageReceived)
        {
            await Task.Delay(PollInterval, cts.Token);
        }
    }

    /// <summary>Resolves a required service from the fixture's provider.</summary>
    public T GetService<T>() => _serviceProvider.GetRequiredService<T>();

    /// <summary>Cancels and disposes the token source, then disposes the service provider.</summary>
    public void Dispose()
    {
        _cancellationTokenSource?.Cancel();
        _cancellationTokenSource?.Dispose();
        _serviceProvider?.Dispose();
    }
}
Best Practices
- Use meaningful event names - Event names should clearly describe what happened
- Keep events immutable - Events should be immutable after creation
- Include correlation IDs - For tracing across services
- Handle failures gracefully - Implement proper retry and dead letter handling
- Use schemas - Consider using a schema registry for message evolution
- Monitor message flow - Implement proper logging and metrics
- Design for idempotency - Message handlers should be idempotent
- Version your messages - Plan for message schema evolution
Troubleshooting
Common Issues
- Connection failures
- Check RabbitMQ server status
- Verify the configured host names, port, virtual host, and credentials
- Check network connectivity and firewall rules
- Messages not being consumed
- Verify queue bindings and routing keys
- Check consumer registration
- Ensure proper exchange configuration
- High memory usage
- Reduce prefetch count
- Implement proper message acknowledgment
- Check for message accumulation in queues
- Performance issues
- Tune QoS settings
- Use connection pooling
- Optimize message serialization