
UgenTec.MassTransit QuickStart

The UgenTec.MassTransit block contains UgenTec's opinionated logic and infrastructure for using a MassTransit message bus in applications.

Installation

Install the UgenTec.MassTransit library from the internal UgenTec Nuget-2 feed:

install-package UgenTec.MassTransit

Creating a message consumer

Classes that act as message consumers should implement the IMessageConsumer<T> interface. T can be any simple class that serves as a message.

public class TestMessageConsumer : IMessageConsumer<TestMessage>
{
    public async Task Consume(TestMessage message)
    {
        // ...
    }
}
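To make the shape of a message and its consumer concrete, here is a minimal sketch. The IMessageConsumer<T> declaration below is a local stand-in so the snippet compiles on its own; in an application it comes from the UgenTec library. The Text property and the Handled list are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Local stand-in for the library interface, declared only to keep this sketch self-contained.
public interface IMessageConsumer<T>
{
    Task Consume(T message);
}

// A message is a plain class that carries data; Text is a hypothetical property.
public class TestMessage
{
    public string Text { get; set; } = string.Empty;
}

public class TestMessageConsumer : IMessageConsumer<TestMessage>
{
    // Hypothetical: records handled messages so the effect of Consume is observable here.
    public List<string> Handled { get; } = new List<string>();

    public Task Consume(TestMessage message)
    {
        Handled.Add(message.Text);
        return Task.CompletedTask;
    }
}
```

A real consumer would typically perform its work with injected dependencies instead of collecting messages in a list.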

Startup.cs : Register MassTransit usage

Make sure all consumers are registered with the dependency injection system, either explicitly or through Scrutor assembly scanning.

Note : Consumers should be registered 'AsSelf', not as their implemented interface, because the UgenTec MassTransit infrastructure wraps the consumers defined here in an adapter.

services.AddScoped<TestMessageConsumer>();

or

services.Scan(x => x.FromAssembliesOf(typeof(Bootstrapper))
    .AddClasses(c => c.AssignableTo(typeof(IMessageConsumer<>)))
    .AsSelf()
    .WithScopedLifetime());

Next, wire up the MassTransit infrastructure using the appropriate ServiceCollectionExtension method. Two variants exist: AddInMemoryMassTransitBus and AddAzureServiceBusMassTransit.

The in-memory variant is intended for local development, while the Azure Service Bus backed variant should be used for all production scenarios, as it offers resilience and load distribution capabilities.

In Memory Example

services.AddInMemoryMassTransitBus(new[]
{
    new EndpointConfiguration()
    {
        ConsumerTypes = new[] {typeof(TestMessageConsumer)},
        QueueName = "masstransit_test_commands"
    }
}, new[] {(typeof(TestMessage), "masstransit_test_commands")});

To configure the MassTransit infrastructure, supply both the endpoint configurations and the endpoint mappings.

EndpointConfiguration

EndpointConfiguration describes a logical endpoint and consists of a queue name and one or more consumers. It is perfectly fine to add consumers for different message types to the same endpoint. The queue name should be unique: registering multiple endpoints on the same queue is not supported by MassTransit.

EndpointMapping

EndpointMapping describes which message types are sent to which queue when sending messages to the service bus.
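The two rules above (unique queue names, and a message type to queue lookup) can be sketched with plain collections. EndpointChecks and its methods are hypothetical helpers written for this illustration; they are not part of UgenTec.MassTransit, and the library may validate or resolve queues differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of UgenTec.MassTransit.
public static class EndpointChecks
{
    // Returns the queue names that appear on more than one endpoint.
    public static IReadOnlyList<string> FindDuplicateQueues(IEnumerable<string> queueNames)
    {
        return queueNames
            .GroupBy(name => name)
            .Where(group => group.Count() > 1)
            .Select(group => group.Key)
            .ToList();
    }

    // Looks up the queue for a message type in a list of (type, queue) mappings.
    public static bool TryResolveQueue(
        IEnumerable<(Type MessageType, string QueueName)> mappings,
        Type messageType,
        out string queueName)
    {
        foreach (var mapping in mappings)
        {
            if (mapping.MessageType == messageType)
            {
                queueName = mapping.QueueName;
                return true;
            }
        }

        queueName = string.Empty;
        return false;
    }
}
```

Running such a check at startup, before the bus is configured, would surface a duplicated queue name as a configuration error instead of as unexpected runtime behaviour.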

Azure Service Bus Example

var massTransitConfiguration = new MassTransitServiceBusConfigurationOptions();
configuration.GetSection(MassTransitServiceBusConfigurationOptions.SectionName).Bind(massTransitConfiguration);

services.AddAzureServiceBusMassTransit(new[] {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer)},
            QueueName = "masstransit_test_commands"
        }
    }, 
    new[] {
        (typeof(TestMessage), "masstransit_test_commands"), (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration
);

To configure the MassTransit infrastructure, supply the endpoint configurations, the endpoint mappings and the Azure Service Bus specific configuration.

EndpointConfiguration

EndpointConfiguration describes a logical endpoint and consists of a queue name and one or more consumers. It is perfectly fine to add consumers for different message types to the same endpoint. The queue name should be unique: registering multiple endpoints on the same queue is not supported by MassTransit.

EndpointMapping

EndpointMapping describes which message types are sent to which queue when sending messages to the service bus.

Azure Service Bus specific configuration

The Azure Service Bus configuration options are:

  • MessageLockDuration: The duration of the message lock. Default value is 30 seconds.
  • MessageLockRenewalTimeout: The message lock renewal timeout.
  • ConnectionString: The Azure Service Bus connection string.
  • EnvironmentSpecificNamingPrefix: This prefix is prepended to the logical queue name when creating the physical queues in Azure Service Bus.
  • EnvironmentSpecificNamingSuffix: This suffix is appended to the logical queue name when creating the physical queues in Azure Service Bus. This setting should only be used for local development cases.
    {{hostname}} placeholder can be used here to append the name of the executing host to the queue name.

These values should be configured through environment variables.

Local development setup example for running from IDE (launchsettings.json)
{
      "environmentVariables": {
        ...
          "MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING": "<<AZURE SERVICEBUS CONNECTIONSTRING>>", 
          "MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKDURATION": "00:00:30", 
          "MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKRENEWALTIMEOUT": "00:05:00",
          "MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX": "weu-d-fi-masstransit",
          "MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGSUFFIX": "{{hostname}}", 
        ...

With this configuration, logical queue and topic names are prefixed with weu-d-fi-masstransit and suffixed with the hostname of the machine the development environment runs on, e.g. weu-d-fi-masstransit-fastfinder_archive_commands-ugen-123456789. This isolates the development environments of different developers from each other.
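The naming scheme can be sketched as a small function. QueueNaming is a hypothetical helper that only mimics the example name in the text; the real composition happens inside the library. In particular, joining the parts with '-' is an assumption inferred from that example name.

```csharp
using System;

// Hypothetical helper that mimics the naming scheme described in the text.
public static class QueueNaming
{
    public static string BuildPhysicalName(string prefix, string logicalName, string suffix, string hostName)
    {
        // Assumption: the {{hostname}} placeholder is replaced with the executing host's name.
        var resolvedSuffix = (suffix ?? string.Empty).Replace("{{hostname}}", hostName);

        // Assumption: non-empty parts are joined with '-', as in the example name in the text.
        var parts = new[] { prefix, logicalName, resolvedSuffix };
        return string.Join("-", Array.FindAll(parts, part => !string.IsNullOrEmpty(part)));
    }
}
```

For instance, calling BuildPhysicalName with the prefix weu-d-fi-masstransit, the logical name fastfinder_archive_commands, the suffix {{hostname}} and Environment.MachineName as host name yields a name of the same shape as the example above.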

Local development setup example for running from docker (docker-compose)
    environment:
      - MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING=<<AZURE SERVICE BUS CONNECTIONSTRING>>
      - MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKDURATION=00:00:30
      - MASSTRANSIT__AZURESERVICEBUS__MESSAGELOCKRENEWALTIMEOUT=00:05:00
      - MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX=weu-d-fi-masstransit-
      - MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGSUFFIX=${COMPUTERNAME}

With this configuration, logical queue and topic names are prefixed with weu-d-fi-masstransit and suffixed with the hostname of the host that runs the docker instance. This is achieved by the docker environment setup script, which replaces the ${COMPUTERNAME} token with the name of the host when starting docker instances.
We cannot rely on the {{hostname}} placeholder in this case, as it would append the hostname of the docker container to the queue name, and that hostname changes with every new container that is started.

Devops release configuration (k8s yaml and devops pipeline variables)
        env:
          - name: MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING
            value: #{<applicationidentifier>_MASSTRANSIT__AZURESERVICEBUS__CONNECTIONSTRING}#
          - name: MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX
            value: #{<applicationidentifier>_MASSTRANSIT__AZURESERVICEBUS__ENVIRONMENTSPECIFICNAMINGPREFIX}#

In Kubernetes environments only the environment prefix is used, as there is no need to isolate queues and topics within release ring iterations. Release ring iterations are separated by using specific prefixes,
e.g. weu-t-rp01-rr02-2304101-fi-masstransit-genotyper_interpretation_commands

Managing concurrency

Limiting consumer concurrency

Sometimes it is necessary to limit the number of instances of a specific consumer type that can be active concurrently, for example when the consumer requires so many system resources that it affects the stability of the application.
Another example is a scenario where multiple messages for the same entities arrive at the same time, causing optimistic concurrency issues because the same entities are processed in parallel.

For this purpose the concept of the 'ConsumerConfiguration' was introduced.

Creating a message consumer configuration class

Classes that provide additional configuration for consumers should implement the IMessageConsumerConfiguration<TMessage, TConsumer> interface.

public class TestMessageConsumerWithConfigurationConfiguration : IMessageConsumerConfiguration<TestMessage, TestMessageConsumerWithConfiguration>
{
    public int? ConcurrencyLimit => 1;
}

Registering consumer configuration classes

Consumer configuration classes should also be registered with the dependency injection system, either explicitly or through the use of Scrutor assembly scanning.

Note : Consumer configuration classes should be registered as IMessageConsumerConfiguration<TMessage,TConsumer> and as singletons in order to be resolvable in the MassTransit adapter layer.

    services.Scan(x => x.FromAssemblyOf<Startup>()
        .AddClasses(c => c.AssignableTo(typeof(UgenTec.Framework.Core.Messaging.IMessageConsumerConfiguration<,>)))
        .AsImplementedInterfaces()
        .WithSingletonLifetime());
                

or

    services.AddSingleton<Framework.Core.Messaging.IMessageConsumerConfiguration<TestMessage,TestMessageConsumerWithConfiguration>, TestMessageConsumerWithConfigurationConfiguration>();

Limiting message concurrency

In cases where multiple consumers act on a single message, it may be necessary to limit the number of messages that are processed concurrently. This is a more drastic approach that has a much higher impact on the overall throughput of the system than limiting a single consumer.

Message concurrency can be configured on EndpointConfiguration :

var massTransitConfiguration = new MassTransitServiceBusConfigurationOptions();
configuration.GetSection(MassTransitServiceBusConfigurationOptions.SectionName).Bind(massTransitConfiguration);

services.AddAzureServiceBusMassTransit(new[] {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer)},
            QueueName = "masstransit_test_commands",
            ConcurrencyLimit = 1 //Limits the number of messages concurrently processed
        }
    }, 
    new[] {
        (typeof(TestMessage), "masstransit_test_commands"), (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration
);

Message concurrency limit vs Consumer concurrency limit

Consider a scenario where 2 consumers act on an endpoint: one consumer that is prone to concurrency issues (let's call it the X consumer) and one without any restrictions (the N consumer). Now let's queue 10 messages for the endpoint.

With consumer limiting, we limit only the X consumer, so that it handles messages one at a time. MassTransit will acquire locks on all the messages (or as many as is determined by the CPU count of the host) and pass each message to all consumers. This in turn spins up 10 N consumers (no limit) and 1 X consumer. The N consumers all process their workload in parallel, while the X consumer is invoked sequentially for each message. It is important to understand that each message stays locked on the underlying service bus for as long as it takes both consumers to act on it.

In case of message concurrency limiting, we'll limit the number of messages that MassTransit processes concurrently. So if we limit message concurrency to 1 concurrent message, MassTransit will lock 1 message, present it to all consumers and only after all consumers finish, release that message and move on to the next one.

For endpoints that have only 1 consumer for a specific message type, both mechanisms have the same effect.
The mechanisms behave differently only when multiple consumers act upon a message; which one to use then depends on the specific use case.
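The consumer-limiting scenario can be imitated with a small simulation. This sketch does not call MassTransit or the UgenTec library: it uses a SemaphoreSlim as a stand-in for the consumer concurrency limit, and the names ConcurrencySimulation, ConsumeN and ConsumeX are hypothetical. It shows that the N consumers overlap while the X consumer never runs more than once at a time.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simulation only: SemaphoreSlim imitates a consumer concurrency limit.
public static class ConcurrencySimulation
{
    // Hands messageCount messages to an unlimited N consumer and to an X consumer
    // limited to xLimit concurrent invocations. Returns the peak concurrency per consumer.
    public static async Task<(int PeakN, int PeakX)> RunAsync(int messageCount, int xLimit)
    {
        var xGate = new SemaphoreSlim(xLimit);
        var sync = new object();
        int activeN = 0, activeX = 0, peakN = 0, peakX = 0;

        async Task ConsumeN()
        {
            lock (sync) { activeN++; peakN = Math.Max(peakN, activeN); }
            await Task.Delay(50); // simulated work
            lock (sync) { activeN--; }
        }

        async Task ConsumeX()
        {
            await xGate.WaitAsync(); // the consumer-level limit
            try
            {
                lock (sync) { activeX++; peakX = Math.Max(peakX, activeX); }
                await Task.Delay(10); // simulated work
            }
            finally
            {
                lock (sync) { activeX--; }
                xGate.Release();
            }
        }

        // Each message goes to both consumers and counts as finished
        // (its lock would be released) only when both are done.
        var messages = Enumerable.Range(0, messageCount)
            .Select(_ => Task.WhenAll(ConsumeN(), ConsumeX()));
        await Task.WhenAll(messages);

        return (peakN, peakX);
    }
}
```

Message concurrency limiting would correspond to gating the whole Task.WhenAll(ConsumeN(), ConsumeX()) pair behind the semaphore instead of only ConsumeX, which also serializes the N consumer.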

Adding resilience

In-memory retry (first level retry)

Some exceptions may be caused by a transient condition, such as a network failure, a busy service bus namespace or storage account, or a similar situation that usually clears up on a second attempt. With these exception types, it is often desirable to retry the message delivery to the consumer, allowing the consumer to try the operation again. Endpoints can be configured with an exponential backoff retry strategy. The retry happens in memory and maintains a lock on the message. As such, it should only be used to handle short, transient error conditions. It is important to align the retry policy with the MessageLockDuration, so that even when the maximum number of retries occurs, processing still completes (or finally fails) before the message lock expires and the message is received by another consumer again.

To configure first level retry, configure it on the endpoint configuration :

    services.AddAzureServiceBusMassTransit(new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer), typeof(TestMessageConsumerWithConfiguration)},
            ConsumerConfigurationTypes = new[] {typeof(TestMessageConsumerWithConfigurationConfiguration)},
            QueueName = "masstransit_test_commands",
            ConcurrencyLimit = 1,
            FirstLevelRetryConfiguration = new ExponentialRetryConfiguration(4, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)),
        }
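    },
    // Endpoint mappings and options as in the Azure Service Bus example above
    new[] {
        (typeof(TestMessage), "masstransit_test_commands"), (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration);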
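Whether a retry policy fits within the message lock can be estimated with a rough upper bound. The sketch below is a hypothetical helper, not part of the library. It assumes that no retry delay exceeds the policy's maximum interval and that the third argument of ExponentialRetryConfiguration is that maximum interval (as in MassTransit's exponential retry policy); the text does not document the parameter order. The per-attempt processing time is an input you have to estimate yourself, and any lock renewal is ignored.

```csharp
using System;

// Hypothetical helper for a back-of-the-envelope check; not part of the library.
public static class RetryBudget
{
    // Upper bound on how long a message stays locked: each of the retryLimit retries
    // waits at most maxInterval, and each of the (retryLimit + 1) attempts
    // takes at most attemptDuration.
    public static TimeSpan WorstCase(int retryLimit, TimeSpan maxInterval, TimeSpan attemptDuration)
    {
        var waiting = TimeSpan.FromTicks(maxInterval.Ticks * retryLimit);
        var processing = TimeSpan.FromTicks(attemptDuration.Ticks * (retryLimit + 1));
        return waiting + processing;
    }

    // True when the worst case completes before the lock expires.
    public static bool FitsWithinLock(TimeSpan worstCase, TimeSpan messageLockDuration)
    {
        return worstCase < messageLockDuration;
    }
}
```

With 4 retries, a 10 second maximum interval and an assumed 1 second per attempt, the bound is 45 seconds, which is above a 30 second MessageLockDuration. The actual delays of an exponential policy are usually shorter than this bound, so a failed check is a prompt to look closer, not proof of a problem.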

Redelivery (second level retry)

Some errors take a while to resolve, for example when a remote service is down or Elasticsearch is under stress. In these situations, it is better to postpone processing of the message to a later point in time. Redelivery is a form of retry where the message is removed from the queue and then redelivered to the queue at a future time. Endpoints can be configured with an exponential backoff redelivery strategy.

To configure second level retry, configure it on the endpoint configuration :

    services.AddAzureServiceBusMassTransit(new[]
    {
        new EndpointConfiguration()
        {
            ConsumerTypes = new[] {typeof(TestMessageConsumer), typeof(ThrowingTestMessageConsumer), typeof(TestMessageConsumerWithConfiguration)},
            ConsumerConfigurationTypes = new[] {typeof(TestMessageConsumerWithConfigurationConfiguration)},
            QueueName = "masstransit_test_commands",
            ConcurrencyLimit = 1,
            SecondLevelRetryConfiguration = new ExponentialRetryConfiguration(3, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15))
        }
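    },
    // Endpoint mappings and options as in the Azure Service Bus example above
    new[] {
        (typeof(TestMessage), "masstransit_test_commands"), (typeof(ThrowingTestMessage), "masstransit_test_commands")
    },
    massTransitConfiguration);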