UgenTec.MassTransit QuickStart
The UgenTec.MassTransit block contains all UgenTec opinionated logic and infrastructure for using a MassTransit message bus in applications.
Installation
Use the internal UgenTec Nuget-2 feed to install the UgenTec.MassTransit library:
install-package UgenTec.MassTransit
Creating a message consumer
Classes that act as message consumers should implement the IMessageConsumer<T> interface. T can be any simple class that serves as a message.
public class TestMessageConsumer : IMessageConsumer<TestMessage>
{
    public async Task Consume(TestMessage message)
    {
        // ...
    }
}
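For illustration, a message type can be a plain class with public properties. The sketch below is only an assumption about what TestMessage might look like; the property names are hypothetical and not taken from the library.

```csharp
using System;

// Hypothetical message contract: a plain class with no base type.
// The property names are illustrative only.
public class TestMessage
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; } = string.Empty;
}
```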
Startup.cs : Register MassTransit usage
Make sure all consumers are registered with the dependency injection system, either explicitly or through Scrutor assembly scanning.
Note: Consumers should be registered 'AsSelf', not as their implemented interface, because the UgenTec MassTransit infrastructure wraps the consumers defined here in an adapter.
services.AddScoped<TestMessageConsumer>();
or
services.Scan(x => x.FromAssembliesOf(typeof(Bootstrapper))
.AddClasses(c => c.AssignableTo(typeof(IMessageConsumer<>)))
.AsSelf()
.WithScopedLifetime());
Next, wire up the MassTransit infrastructure using the appropriate ServiceCollectionExtension method.
Two variants exist: AddInMemoryMassTransitBus and AddAzureServiceBusMassTransit.
The in-memory variant is used for local development, while the Azure Service Bus backed variant should be used for all production scenarios as it offers resilience and load distribution capabilities.
In Memory Example
services.AddInMemoryMassTransitBus(
    new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer)},
            QueueName = "masstransit_test_commands"
        }
    },
    new[] {(typeof(TestMessage), "masstransit_test_commands")});
To configure the MassTransit infrastructure we need to supply it with both endpoint configurations and endpoint mappings.
EndpointConfiguration
EndpointConfiguration describes a logical endpoint and consists of a queue name and one or more consumers. It is perfectly fine to add consumers for different message types to the same endpoint. Queue names should be unique: registering multiple endpoints on the same queue is not supported by MassTransit.
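Because duplicate queue names are not supported, it can help to check the endpoint list before registering it. The following is a minimal sketch, not part of the library: QueueNameCheckSketch and FindDuplicates are hypothetical names, and comparing names case-insensitively is a conservative assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QueueNameCheckSketch
{
    // Returns every queue name that appears more than once in the given sequence.
    // Names are compared case-insensitively (an assumption, chosen to be conservative).
    public static IReadOnlyList<string> FindDuplicates(IEnumerable<string> queueNames) =>
        queueNames
            .GroupBy(name => name, StringComparer.OrdinalIgnoreCase)
            .Where(group => group.Count() > 1)
            .Select(group => group.Key)
            .ToList();
}
```

It could be called with the queue names of the endpoint configurations, for example FindDuplicates(endpoints.Select(e => e.QueueName)), failing startup when the result is not empty.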
EndpointMapping
EndpointMapping describes which messages are sent to which queue when sending messages to the service bus.
Azure Service Bus Example
var massTransitConfiguration = new MassTransitServiceBusConfigurationOptions();
configuration.GetSection(MassTransitServiceBusConfigurationOptions.SectionName).Bind(massTransitConfiguration);
services.AddAzureServiceBusMassTransit(
    new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer)},
            QueueName = "masstransit_test_commands"
        }
    },
    new[]
    {
        (typeof(TestMessage), "masstransit_test_commands"),
        (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration
);
To configure the MassTransit infrastructure we need to supply it with endpoint configurations, endpoint mappings and Azure Service Bus specific configuration.
EndpointConfiguration
EndpointConfiguration describes a logical endpoint and consists of a queue name and one or more consumers. It is perfectly fine to add consumers for different message types to the same endpoint. Queue names should be unique: registering multiple endpoints on the same queue is not supported by MassTransit.
EndpointMapping
EndpointMapping describes which messages are sent to which queue when sending messages to the service bus.
Azure Service Bus specific configuration
The Service Bus configuration options require:
MessageLockDuration: The duration of the message lock. Default value is 30 seconds.
MessageLockRenewalTimeout: The message lock renewal timeout.
ConnectionString: The Azure Service Bus connection string.
EnvironmentSpecificNamingPrefix: This prefix is prepended to the logical queue name when creating the physical queues in Azure Service Bus.
EnvironmentSpecificNamingSuffix: This suffix is appended to the logical queue name when creating the physical queues in Azure Service Bus. This setting should only be used for local development cases. The {{hostname}} placeholder can be used here to append the name of the executing host to the queue name.
These values should be configured through environment variables.
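As a sketch of how such environment variables reach an options object: the standard environment variable configuration provider (Microsoft.Extensions.Configuration.EnvironmentVariables, together with the Binder package) turns "__" in a variable name into the ":" section separator, and configuration keys are case-insensitive. ServiceBusOptionsSketch below is a hypothetical stand-in for the library's options class, and the section name "MassTransit:AzureServiceBus" is inferred from the variable names used in this document, not confirmed by the library.

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Hypothetical stand-in for the library's options class; only the property names
// are taken from the option list above.
public class ServiceBusOptionsSketch
{
    public string ConnectionString { get; set; } = "";
    public TimeSpan MessageLockDuration { get; set; } = TimeSpan.FromSeconds(30);
    public TimeSpan MessageLockRenewalTimeout { get; set; }
    public string EnvironmentSpecificNamingPrefix { get; set; } = "";
    public string EnvironmentSpecificNamingSuffix { get; set; } = "";
}

public static class ServiceBusOptionsLoader
{
    // Assumed section name; the real one is whatever
    // MassTransitServiceBusConfigurationOptions.SectionName holds.
    public const string AssumedSectionName = "MassTransit:AzureServiceBus";

    public static ServiceBusOptionsSketch Load()
    {
        IConfiguration configuration = new ConfigurationBuilder()
            .AddEnvironmentVariables() // "__" in variable names becomes ":" in configuration keys
            .Build();

        var options = new ServiceBusOptionsSketch();
        configuration.GetSection(AssumedSectionName).Bind(options); // "00:00:30" binds to a TimeSpan
        return options;
    }
}
```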
Local development setup example for running from IDE (launchsettings.json)
{
"environmentVariables": {
...
"MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING": "<<AZURE SERVICEBUS CONNECTIONSTRING>>",
"MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKDURATION": "00:00:30",
"MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKRENEWALTIMEOUT": "00:05:00",
"MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX": "weu-d-fi-masstransit",
"MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGSUFFIX": "{{hostname}}",
...
With this configuration, logical queue and topic names are prefixed with weu-d-fi-masstransit and suffixed with the hostname of the machine the development environment runs on, e.g. weu-d-fi-masstransit-fastfinder_archive_commands-ugen-123456789.
This ensures isolation between development environments of different developers.
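The naming scheme can be pictured with the following sketch. It is an illustration only, not the library's implementation: QueueNameSketch and Compose are hypothetical, and joining the parts with a single "-" is inferred from the example names in this document.

```csharp
using System;
using System.Linq;

public static class QueueNameSketch
{
    // Builds a physical queue name from prefix, logical name and suffix.
    // The {{hostname}} placeholder in the suffix is replaced by hostName.
    // Empty parts are skipped; stray leading or trailing dashes are trimmed
    // so that exactly one "-" separates the parts (an assumption).
    public static string Compose(string prefix, string logicalName, string suffix, string hostName)
    {
        var resolvedSuffix = (suffix ?? string.Empty).Replace("{{hostname}}", hostName ?? string.Empty);
        var parts = new[] { prefix, logicalName, resolvedSuffix }
            .Select(part => (part ?? string.Empty).Trim('-'))
            .Where(part => part.Length > 0);
        return string.Join("-", parts);
    }
}
```

Under these assumptions, Compose("weu-d-fi-masstransit", "fastfinder_archive_commands", "{{hostname}}", "ugen-123456789") yields the example name shown above.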
Local development setup example for running from docker (docker-compose)
environment:
- MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING=<<AZURE SERVICE BUS CONNECTIONSTRING>>
- MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKDURATION=00:00:30
- MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKRENEWALTIMEOUT=00:05:00
- MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX=weu-d-fi-masstransit-
- MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGSUFFIX=${COMPUTERNAME}
With this configuration, logical queue and topic names are prefixed with weu-d-fi-masstransit and suffixed with the hostname of the host that runs the Docker instance.
This is achieved by the Docker environment setup script, which replaces the ${COMPUTERNAME} token with the name of the host when starting Docker instances.
We cannot rely on the {{hostname}} placeholder in this case, as it would append the hostname of the Docker container to the queue name, which changes with every new container that is started.
Devops release configuration (k8s yaml and devops pipeline variables)
env:
- name: MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING
value: #{<applicationidentifier>_MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING}#
- name: MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX
value: #{<applicationidentifier>_MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX}#
In Kubernetes environments only the environment prefix is used, as there is no need to isolate queues and topics within release ring iterations. Separating release ring iterations is handled by using specific prefixes.
e.g. weu-t-rp01-rr02-2304101-fi-masstransit-genotyper_interpretation_commands
Managing concurrency
Limiting consumer concurrency
Sometimes it is necessary to limit the number of instances of a specific consumer type that can be active concurrently, e.g. when the consumer requires a lot of system resources and causes issues with application stability.
Other examples include scenarios where multiple messages for the same entities arrive at the same time, causing optimistic concurrency issues during processing due to parallel actions on the same entities.
For this purpose the concept of the 'ConsumerConfiguration' was introduced.
Creating a message consumer configuration class
Classes that provide additional configuration for consumers should implement the IMessageConsumerConfiguration<TMessage,TConsumer> interface.
public class TestMessageConsumerWithConfigurationConfiguration : IMessageConsumerConfiguration<TestMessage, TestMessageConsumerWithConfiguration>
{
    public int? ConcurrencyLimit => 1;
}
Registering consumer configuration classes
Consumer configuration classes should also be registered with the dependency injection system, either explicitly or through the use of Scrutor assembly scanning.
Note: Consumer configuration classes should be registered as IMessageConsumerConfiguration<TMessage,TConsumer> and as singleton in order to be resolvable in the MassTransit adapter layer.
services.Scan(x => x.FromAssemblyOf<Startup>()
.AddClasses(c => c.AssignableTo(typeof(UgenTec.Framework.Core.Messaging.IMessageConsumerConfiguration<,>)))
.AsImplementedInterfaces()
.WithSingletonLifetime());
or
services.AddSingleton<Framework.Core.Messaging.IMessageConsumerConfiguration<TestMessage,TestMessageConsumerWithConfiguration>, TestMessageConsumerWithConfigurationConfiguration>();
Limiting message concurrency
In cases where multiple consumers act on a single message, it might be necessary to limit the number of messages that are processed concurrently. This is a more drastic approach that has a much higher impact on the overall throughput of the system than limiting a single consumer.
Message concurrency can be configured on EndpointConfiguration:
var massTransitConfiguration = new MassTransitServiceBusConfigurationOptions();
configuration.GetSection(MassTransitServiceBusConfigurationOptions.SectionName).Bind(massTransitConfiguration);
services.AddAzureServiceBusMassTransit(
    new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer)},
            QueueName = "masstransit_test_commands",
            ConcurrencyLimit = 1 // Limits the number of messages processed concurrently
        }
    },
    new[]
    {
        (typeof(TestMessage), "masstransit_test_commands"),
        (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration
);
Message concurrency limit vs Consumer concurrency limit
Consider a scenario where 2 consumers act on an endpoint: one consumer that is prone to concurrency issues (let's call it the X consumer) and one without any restrictions (the N consumer). Now let's queue 10 messages for the endpoint.
With consumer limiting, we can restrict only the X consumer to handle messages one at a time. MassTransit will acquire locks on all the messages (or as many as is determined by the CPU count of the host) and pass each message to all consumers. This will in turn spin up 10 N consumers (no limit) and 1 X consumer. The N consumers will all process the workload in parallel, while the X consumer will be invoked sequentially for each message. It is important to understand that the messages stay locked on the underlying service bus for as long as it takes for both consumers to act on the message.
With message concurrency limiting, we limit the number of messages that MassTransit processes concurrently. So if we limit message concurrency to 1 concurrent message, MassTransit will lock 1 message, present it to all consumers and, only after all consumers finish, release that message and move on to the next one.
For endpoints that have only 1 consumer for a specific message type, both mechanisms have the same effect.
The mechanisms behave differently only when multiple consumers act upon a message, and which one to use depends on the specific use case.
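The difference between the two limits can be modelled without MassTransit. The sketch below is a simplified simulation under stated assumptions, not the library's mechanism: SemaphoreSlim gates stand in for the limits, Task.Delay stands in for consumer work, and ConcurrencySketch is a hypothetical name.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencySketch
{
    // Pushes messageCount simulated messages through two handlers, N (unlimited) and X,
    // and reports the highest number of simultaneously active handlers of each kind.
    // Assumes messageCount is greater than zero.
    public static async Task<(int MaxN, int MaxX)> RunAsync(
        int messageCount, int? messageLimit, int? consumerXLimit)
    {
        using var messageGate = new SemaphoreSlim(messageLimit ?? messageCount);
        using var xGate = new SemaphoreSlim(consumerXLimit ?? messageCount);
        var sync = new object();
        int activeN = 0, activeX = 0, maxN = 0, maxX = 0;

        async Task HandleN()
        {
            lock (sync) { maxN = Math.Max(maxN, ++activeN); }
            await Task.Delay(50); // simulated work
            lock (sync) { activeN--; }
        }

        async Task HandleX()
        {
            await xGate.WaitAsync(); // consumer-level limit
            try
            {
                lock (sync) { maxX = Math.Max(maxX, ++activeX); }
                await Task.Delay(50); // simulated work
                lock (sync) { activeX--; }
            }
            finally { xGate.Release(); }
        }

        async Task ProcessMessage()
        {
            await messageGate.WaitAsync(); // message-level limit
            try { await Task.WhenAll(HandleN(), HandleX()); } // message is held until both finish
            finally { messageGate.Release(); }
        }

        await Task.WhenAll(Enumerable.Range(0, messageCount).Select(_ => ProcessMessage()));
        return (maxN, maxX);
    }
}
```

Calling RunAsync(10, null, 1) models consumer limiting: X never runs more than once at a time while several N handlers overlap. Calling RunAsync(10, 1, null) models message limiting: both handlers are capped at one active instance because only one message is in flight.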
Adding resilience
In-memory retry (first level retry)
Some exceptions may be caused by a transient condition, such as a network failure, a busy service bus namespace, storage account, or some similar type of situation which usually clears up on a second attempt. With these exception types, it is often desirable to retry the message delivery to the consumer, allowing the consumer to try the operation again.
Endpoints can be configured with an exponential backoff retry strategy. The retry happens in memory and maintains a lock on the message. As such, it should only be used to handle short, transient error conditions.
It's important to align the retry policy with the MessageLockDuration so that, even when the maximum number of retries occurs, processing still completes (or finally fails) before the message lock expires and the message is received by another consumer again.
To configure first level retry, set it on the endpoint configuration:
services.AddAzureServiceBusMassTransit(
    new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer), typeof(TestMessageConsumerWithConfiguration)},
            ConsumerConfigurationTypes = new[] {typeof(TestMessageConsumerWithConfigurationConfiguration)},
            QueueName = "masstransit_test_commands",
            ConcurrencyLimit = 1,
            FirstLevelRetryConfiguration = new ExponentialRetryConfiguration(4, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)),
        }
    },
    // Endpoint mappings and Azure Service Bus options, as in the Azure Service Bus example
    new[]
    {
        (typeof(TestMessage), "masstransit_test_commands"),
        (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration
);
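As a rough way to check the alignment with MessageLockDuration, the sketch below computes a conservative upper bound on how long a message may stay locked during first level retries. It assumes the ExponentialRetryConfiguration arguments are, in order, retry limit, minimum interval, maximum interval and interval delta (the order used by MassTransit's own Exponential retry policy); RetryBudgetSketch and the handler time estimate are hypothetical.

```csharp
using System;

public static class RetryBudgetSketch
{
    // Upper bound: every wait is capped at maxInterval, and the handler runs once
    // for the initial attempt plus once per retry.
    public static TimeSpan WorstCase(int retryLimit, TimeSpan maxInterval, TimeSpan handlerTime) =>
        TimeSpan.FromTicks(maxInterval.Ticks * retryLimit + handlerTime.Ticks * (retryLimit + 1));

    // True when the worst case still finishes before the message lock expires.
    public static bool FitsWithinLock(
        int retryLimit, TimeSpan maxInterval, TimeSpan handlerTime, TimeSpan lockDuration) =>
        WorstCase(retryLimit, maxInterval, handlerTime) < lockDuration;
}
```

With the values from the example (4 retries, 10 second cap) and an assumed handler time of 2 seconds, the bound is 4 × 10 + 5 × 2 = 50 seconds, which is longer than the default 30 second MessageLockDuration. The bound ignores any lock renewal governed by MessageLockRenewalTimeout, and actual waits are usually shorter than the cap, so treat the result as a prompt to review the settings rather than a verdict.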
Redelivery (second level retry)
Some errors take a while to resolve, say when a remote service is down or Elasticsearch is under stress. In these situations, it's better to postpone processing of the message to a later point in time. Redelivery is a form of retry where the message is removed from the queue and then redelivered to the queue at a future time. Endpoints can be configured with an exponential backoff redelivery strategy.
To configure second level retry, set it on the endpoint configuration:
services.AddAzureServiceBusMassTransit(
    new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer), typeof(TestMessageConsumerWithConfiguration)},
            ConsumerConfigurationTypes = new[] {typeof(TestMessageConsumerWithConfigurationConfiguration)},
            QueueName = "masstransit_test_commands",
            ConcurrencyLimit = 1,
            SecondLevelRetryConfiguration = new ExponentialRetryConfiguration(3, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15))
        }
    },
    // Endpoint mappings and Azure Service Bus options, as in the Azure Service Bus example
    new[]
    {
        (typeof(TestMessage), "masstransit_test_commands"),
        (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration
);