MassTransit is a free and open-source library that helps you build message-based applications and services in .NET. At the time of writing, it supports RabbitMQ, Azure Service Bus, and Amazon SQS.
MassTransit provides a framework that makes it easy to send, publish, and receive messages. If you’ve ever had to integrate a messaging system such as RabbitMQ or Azure Service Bus, you may have found yourself, like me, writing wrappers or abstractions around it. MassTransit does some of that work for you, so you can focus on sending and consuming messages.
It also offers other features, such as monitoring messages as they are received, consumed, sent, and published. MassTransit refers to this as observing messages. It also supports message scheduling, for which it uses Quartz underneath.
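To give a feel for observing, here is a minimal sketch of a publish observer that writes to the console. The class names ConsolePublishObserver and ObserverSetup are hypothetical and not part of the example application; the sketch assumes the IPublishObserver interface and the ConnectPublishObserver method that MassTransit exposes on the bus.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical observer that logs every message published through the bus.
public class ConsolePublishObserver : IPublishObserver
{
    // Called just before the message is handed to the transport.
    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        Console.WriteLine($"Publishing {typeof(T).Name} ({context.MessageId})");
        return Task.CompletedTask;
    }

    // Called after the transport has accepted the message.
    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        Console.WriteLine($"Published {typeof(T).Name} ({context.MessageId})");
        return Task.CompletedTask;
    }

    // Called when publishing throws.
    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        Console.WriteLine($"Publish of {typeof(T).Name} failed: {exception.Message}");
        return Task.CompletedTask;
    }
}

public static class ObserverSetup
{
    // Attaches the observer to a bus instance (hypothetical helper).
    // ConnectPublishObserver returns a handle that can later be used to disconnect.
    public static void Attach(IBus bus)
    {
        bus.ConnectPublishObserver(new ConsolePublishObserver());
    }
}
```

Similar observer interfaces exist for sent, received, and consumed messages.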
From my understanding, there are three ways that you can send messages using MassTransit.
- Send. A message is delivered to a specific endpoint, i.e. a queue, and a single consumer listens on that queue.
- Publish. When using publish, the message is broadcast to many consumers.
- Request and Response. MassTransit also offers a Request and Response mechanism where you can send a request and receive a response based on that request.
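Send and Publish are covered in the rest of this post, so here is a rough sketch of the third option only. It is not part of the flight example: the CheckFlightStatus and FlightStatusResult contracts, the consumer, and the flight-status queue are all hypothetical. To keep it free of any Azure dependency, it assumes MassTransit's in-memory transport and its loopback address scheme, and a MassTransit version that provides IRequestClient<T>.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical request and response contracts.
public class CheckFlightStatus
{
    public Guid FlightId { get; set; }
}

public class FlightStatusResult
{
    public Guid FlightId { get; set; }
    public string Status { get; set; }
}

// The consumer answers the request by responding on the consume context.
public class CheckFlightStatusConsumer : IConsumer<CheckFlightStatus>
{
    public Task Consume(ConsumeContext<CheckFlightStatus> context)
    {
        return context.RespondAsync<FlightStatusResult>(new FlightStatusResult
        {
            FlightId = context.Message.FlightId,
            Status = "Confirmed"
        });
    }
}

public static class Program
{
    public static async Task Main()
    {
        // In-memory transport (assumption, for illustration only).
        var bus = Bus.Factory.CreateUsingInMemory(cfg =>
        {
            cfg.ReceiveEndpoint("flight-status", e => e.Consumer<CheckFlightStatusConsumer>());
        });

        await bus.StartAsync();
        try
        {
            // The request client targets the queue that the consumer listens on.
            var client = bus.CreateRequestClient<CheckFlightStatus>(
                new Uri("loopback://localhost/flight-status"));

            var response = await client.GetResponse<FlightStatusResult>(
                new CheckFlightStatus { FlightId = Guid.NewGuid() });

            Console.WriteLine($"Status: {response.Message.Status}");
        }
        finally
        {
            await bus.StopAsync();
        }
    }
}
```

With Azure Service Bus the shape should be the same; only the bus factory and the queue address would differ.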
MassTransit can be set up with a web or console application without too much trouble. It’s all in the configuration and once it’s all set up, you can focus on the core part, which is sending, receiving and processing messages.
Getting started
We will be using Azure Service Bus to build a simple .NET Core web app that demonstrates how to send messages (via queues) and publish messages (via topics). We will also create a simple console app that receives messages from the queue and the topic.
The application example is based on a contrived example of flight orders and flight cancellations. Flight orders will be broadcast and can be consumed by multiple consumers. Flight cancellations will be sent to a queue where they will be processed by one consumer.
MassTransit Packages
To start, the following NuGet packages are required:
Create a service bus on Azure
Create a new service bus on Azure if you don’t have one already. Note that Azure Service Bus topics require the “Standard” pricing tier. If you are just playing around and only need queues, the “Basic” pricing tier is sufficient.
Once the bus is created, you don’t have to create the queues and topics at this stage. You can either let MassTransit create them for you using its naming conventions, or you can specify the names as part of the MassTransit configuration, which is what we are going to do here.
Be sure to make a note of the connection string, which is listed under the namespace’s Shared access policies in the Azure portal.
Create Messages to be sent
In this example we will send two types of messages, one via a topic and one via a queue. Here are the two message classes.
public class FlightOrder
{
    public Guid FlightId { get; set; }
    public int OrderId { get; set; }
}
The FlightOrder will be sent to the topic flight-orders.
public class FlightCancellation
{
    public Guid FlightId { get; set; }
    public int CancellationId { get; set; }
}
The FlightCancellation will be sent to the queue flight-cancellation.
Create the publisher
For this example, we will create a publisher within a .NET Core web application. In other words, the web application will send and publish messages.
Configuration
In Startup.cs, an Azure Service Bus needs to be created in the ConfigureServices method as shown below.
public void ConfigureServices(IServiceCollection services)
{
    string connectionString = "Endpoint=sb://your-service-bus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz";
    string flightOrdersTopic = "flight-orders";
    // create the bus using Azure Service Bus
    var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
    {
        // specify the message FlightOrder to be sent to a specific topic
        busFactoryConfig.Message<FlightOrder>(configTopology =>
        {
            configTopology.SetEntityName(flightOrdersTopic);
        });
        var host = busFactoryConfig.Host(connectionString, hostConfig =>
        {
            // This is optional, but you can specify the protocol to use.
            hostConfig.TransportType = TransportType.AmqpWebSockets;
        });
    });
    ...
}
Here, we use the static Bus.Factory.CreateUsingAzureServiceBus method to create an Azure Service Bus instance. We also set up the FlightOrder message to be sent to the Azure topic flight-orders using busFactoryConfig.Message<FlightOrder>(configTopology => { configTopology.SetEntityName(flightOrdersTopic); });
Then, just use the provided extension method AddMassTransit to add MassTransit.
// Add MassTransit
services.AddMassTransit(config =>
{
    config.AddBus(provider => azureServiceBus);
});
To send or publish messages, we need to register IPublishEndpoint and ISendEndpointProvider. IPublishEndpoint is used to publish messages and ISendEndpointProvider is used to send messages onto a queue. Registering IBus is optional.
// register MassTransit's IPublishEndpoint, ISendEndpointProvider and IBus which can be used to send and publish messages
services.AddSingleton<IPublishEndpoint>(azureServiceBus);
services.AddSingleton<ISendEndpointProvider>(azureServiceBus);
services.AddSingleton<IBus>(azureServiceBus);
Sending and publishing
We will create two buttons on a web page. One will trigger a publish and the other will trigger a send.
To publish a message onto an Azure topic, it will look something like this.
await _publishEndpoint.Publish<FlightOrder>(new FlightOrder { FlightId = Guid.NewGuid(), OrderId = _random.Next(1, 999) });
To send a message onto an Azure queue, we need to use the GetSendEndpoint method to explicitly specify the queue path. In this case, the queue name is flight-cancellation.
var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("sb://masstransit-demo-xyz123.servicebus.windows.net/flight-cancellation"));
await sendEndpoint.Send<FlightCancellation>(new FlightCancellation { FlightId = Guid.NewGuid(), CancellationId = _random.Next(1, 999)});
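To show where _publishEndpoint and _sendEndpointProvider come from, here is one way the two button handlers could be wired up. This is only a sketch: the FlightsController name, its routes, and the QueueAddress field are hypothetical, and the message classes are repeated so the snippet stands on its own. It assumes the IPublishEndpoint and ISendEndpointProvider registrations shown above.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

// Message contracts, repeated from earlier in the post.
public class FlightOrder
{
    public Guid FlightId { get; set; }
    public int OrderId { get; set; }
}

public class FlightCancellation
{
    public Guid FlightId { get; set; }
    public int CancellationId { get; set; }
}

// Hypothetical controller; each action backs one of the two buttons.
[Route("flights")]
public class FlightsController : ControllerBase
{
    // Queue address on the Service Bus namespace (placeholder host name).
    private static readonly Uri QueueAddress =
        new Uri("sb://masstransit-demo-xyz123.servicebus.windows.net/flight-cancellation");

    private readonly Random _random = new Random();
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ISendEndpointProvider _sendEndpointProvider;

    // Both dependencies are resolved from the container.
    public FlightsController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
    {
        _publishEndpoint = publishEndpoint;
        _sendEndpointProvider = sendEndpointProvider;
    }

    // Broadcasts a FlightOrder to the flight-orders topic.
    [HttpPost("order")]
    public async Task<IActionResult> PublishOrder()
    {
        await _publishEndpoint.Publish(new FlightOrder
        {
            FlightId = Guid.NewGuid(),
            OrderId = _random.Next(1, 999)
        });
        return Ok();
    }

    // Sends a FlightCancellation to the flight-cancellation queue.
    [HttpPost("cancel")]
    public async Task<IActionResult> SendCancellation()
    {
        var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(QueueAddress);
        await sendEndpoint.Send(new FlightCancellation
        {
            FlightId = Guid.NewGuid(),
            CancellationId = _random.Next(1, 999)
        });
        return Ok();
    }
}
```

Note that the host name in the queue address has to match the Service Bus namespace in the connection string.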
Create the subscriber
Create a regular console app and register MassTransit in much the same way as in the publisher.
To receive messages, MassTransit makes it easy with the use of the IConsumer interface.
Here’s an example of how to receive and consume the FlightOrder message.
public class FlightPurchasedConsumer : IConsumer<FlightOrder>
{
    private readonly ILogger<FlightPurchasedConsumer> _logger;
    public FlightPurchasedConsumer(ILogger<FlightPurchasedConsumer> logger)
    {
        _logger = logger;
    }
    public Task Consume(ConsumeContext<FlightOrder> context)
    {
        _logger.LogInformation($"Order processed: FlightId:{context.Message.FlightId} - OrderId:{context.Message.OrderId}");
        return Task.CompletedTask;
    }
}
Here’s an example of how to consume the FlightCancellation message.
public class FlightCancellationConsumer : IConsumer<FlightCancellation>
{
    private readonly ILogger<FlightCancellationConsumer> _logger;
    public FlightCancellationConsumer(ILogger<FlightCancellationConsumer> logger)
    {
        _logger = logger;
    }
    public Task Consume(ConsumeContext<FlightCancellation> context)
    {
        _logger.LogInformation($"Flight dequeued: FlightId:{context.Message.FlightId} - CancellationId:{context.Message.CancellationId}");
        return Task.CompletedTask;
    }
}
Now we need to register MassTransit and configure the SubscriptionEndpoint to listen for messages on the topic and use the ReceiveEndpoint to listen for messages on the queue.
string connectionString =
    "Endpoint=sb://masstransit-demo-xyz123.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz";
string flightOrdersTopic = "flight-orders";
string subscriptionName = "flight-subscriber";
// must match the queue name the publisher sends to
string queueName = "flight-cancellation";
// provider is assumed to be the application's IServiceProvider, used to resolve the consumers
var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
{
    busFactoryConfig.Message<FlightOrder>(m => { m.SetEntityName(flightOrdersTopic); });
    var host = busFactoryConfig.Host(connectionString, hostConfig =>
    {
        // this is optional
        hostConfig.TransportType = TransportType.AmqpWebSockets;
    });
    // setup Azure topic consumer
    busFactoryConfig.SubscriptionEndpoint<FlightOrder>(host, subscriptionName, configurator =>
    {
        configurator.Consumer<FlightPurchasedConsumer>(provider);
    });
    // setup Azure queue consumer
    busFactoryConfig.ReceiveEndpoint(host, queueName, configurator =>
    {
        configurator.Consumer<FlightCancellationConsumer>(provider);
    });
});
Now, whenever a message is placed onto the Azure topic or the Azure queue, MassTransit instantiates the relevant consumer, which can then handle the message.
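One detail worth spelling out is that a bus only begins receiving once it has been started. A minimal sketch of how the console app might start and stop it is shown below; the SubscriberHost class and RunAsync method are hypothetical names, and the bus passed in is assumed to be the instance created with Bus.Factory.CreateUsingAzureServiceBus.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public static class SubscriberHost
{
    // Starts the bus, keeps the process alive until Enter is pressed,
    // then stops the bus so that connections are closed cleanly.
    public static async Task RunAsync(IBusControl bus)
    {
        await bus.StartAsync();
        try
        {
            Console.WriteLine("Listening for messages. Press Enter to exit.");
            Console.ReadLine();
        }
        finally
        {
            await bus.StopAsync();
        }
    }
}
```

From the console app's entry point this could be called as await SubscriberHost.RunAsync(azureServiceBus);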
Summary
This was a brief intro on how to use and configure MassTransit to implement message-based applications. Some basic code examples were also provided to demonstrate publishing messages to a topic, sending messages to a queue, and receiving both in a console application.
Full code can be found on my GitHub repository