
A Look At NServiceBus Sagas

23 Feb 2014, CPOL, 19 min read
A small article on how to use NServiceBus to create workflows
There is a small demo app which is available right here

Introduction

This article is all about using an inter-process service bus to perform potentially long running workflows. I have chosen to use NServiceBus (NSB hereafter) to demonstrate this, as it offers the following attributes, which I personally consider to be very important for any messaging layer:

  • Transactional
  • Durable
  • Reliable
  • Distributed
  • Scalable

In this article we will not be touching on all of those points, but we will cover a few of them. I will also do a walk through of creating a potentially long running workflow (saga in NSB speak), and talk about how the workflow state is persisted and managed between states.

I should also mention that NSB is not the only inter-process bus that could do this. You could probably use others, such as:

  • Mass Transit
  • Rabbit MQ
  • Azure service bus (though I have no experience of that so may be speaking out of turn)

Prerequisites

NServiceBus supports various mechanisms for its persistence of data, such as

  • MSMQ
  • Raven DB
  • SQL Server, via the use of NHibernate

I have chosen to use SQL Server for this article as it is something that I wanted to try. I have however also included a RavenDB version of the process that requires storage, should you wish to download the RavenDB server and try it out.

Since I am using SQL Server you will need to have access to that (which I guess if you are reading this article you might just have). You will then need to do the following 2 things:

  1. Create a new database called "NServiceBusPersistence"
  2. Change the connection string in the App.config of the "SQLSubscriber" project to point to your own database

 

NServiceBus Introduction

NServiceBus is really a distributed messaging layer. It allows for all sorts of communication between processes. You can choose from:

  • Peer to peer
  • Pub/sub
  • Send to self
  • Broadcast to all

However, at the end of the day what makes this all possible is the central backbone of NServiceBus. These days NServiceBus also has a cloud based offering. In this article I will, however, be focusing on the regular NServiceBus installation (well, I think it is more common at any rate), which is NServiceBus used within an organization over MSMQ.

As previously stated, NServiceBus provides the following attributes:

Transactional

This uses the standard Distributed Transaction Coordinator to allow inter-process transactions to occur. NServiceBus will only commit a transaction when a message (or complete Saga) is completed. If an exception is thrown within a message handler, NServiceBus rolls back the transaction and can retry the message a configurable number of times (you must configure this). Once the retries are exhausted, the message is forwarded to the error queue.
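
The retry behaviour described above can be sketched in plain C#. This is only an illustration of the idea and not NServiceBus code: RetrySketch, TryHandle and the failing handler are names I have made up, and no real transaction is involved.

```csharp
using System;

public static class RetrySketch
{
    // Runs the handler up to maxAttempts times.
    // Returns true if an attempt succeeded, false if every attempt threw
    // (the point at which a real bus would give up on the message).
    public static bool TryHandle(Action handler, int maxAttempts, out int attemptsMade)
    {
        attemptsMade = 0;
        while (attemptsMade < maxAttempts)
        {
            attemptsMade++;
            try
            {
                handler();   // a real bus would run this inside a transaction
                return true; // success: the transaction would be committed
            }
            catch (Exception)
            {
                // failure: the transaction would be rolled back, then we retry
            }
        }
        return false;
    }
}

public static class RetryDemo
{
    public static void Main()
    {
        int calls = 0;
        int attempts;

        // Hypothetical handler that fails twice and then succeeds.
        bool ok = RetrySketch.TryHandle(() =>
        {
            calls++;
            if (calls < 3) throw new InvalidOperationException("transient failure");
        }, 5, out attempts);

        Console.WriteLine("Succeeded: {0}, attempts: {1}", ok, attempts);
    }
}
```

With the handler above the loop stops on the third attempt. A handler that always throws would use all five attempts and return false.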

Durable

As NServiceBus uses MSMQ as a transport (or Azure, but as I stated we are not discussing that in this article), all messages are safe and will be maintained even after a loss of power. The usual MSMQ benefits are available.

Reliable

This is partly achieved thanks to the use of MSMQ, and also thanks to the fact that NServiceBus persists things at various stages in its messaging pipeline. As previously stated this could use MSMQ, RavenDB or SQL storage. This article will talk about the use of SQL storage for persistence.

Distributed

The fact that we can distribute handlers/publishers across an entire network allows us to completely distribute our messaging layer.

Scalable

NServiceBus provides a load balancer called "The Distributor". I will not be covering that in this article, but if you think you would like to load balance your messaging components, "The Distributor" may be for you.

 

Demo App Setup

As previously stated, NServiceBus provides a common messaging layer. For the demo app we will be looking at using NServiceBus to accept a Command and then use a Message. This diagram may help to illustrate this further.

It can be seen that NServiceBus provides the common messaging layer backbone (for want of a better word) and that each of the processes that deals with the messages (known as an endpoint in NSB speak) has its own MSMQ queue.

 

Image 1

It is important to note that this is only one possible configuration of how to use NServiceBus. You really can configure it to suit your needs. The above diagram is just how I chose to do it for the demo app.

 

Messages

It is fair to say that NServiceBus is all about the messages. If you want to send a command or an event to an NServiceBus endpoint, it will take the form of a message.

So what exactly is a message? Put simply, it is a shared contract that all parties know how to deal with. The messages are serialized by NServiceBus into the serialization format you choose (there are many choices here: Xml, Json, Bson, Binary etc), and deserialized by the endpoint(s) that deal with the messages (via the use of configuration and the expected handlers).

If you come from a WCF world, you can think of messages as the [DataContract] objects that are shared between client and server.

There is a distinction between "Command" messages and "Message" messages, but we will discuss this later. For now let's just see an example of the code for both.

Command

C#
namespace Messages.Commands
{
    public class CreatePurchaseOrderCommand
    {
        public int PurchaseOrderId { get; set; }
        public string Description { get; set; }
    }
}
VB.NET
Namespace Commands
    Public Class CreatePurchaseOrderCommand

        Public Property PurchaseOrderId() As Integer
        Public Property Description() As String

    End Class
End Namespace

Message

C#
namespace Messages.Mess
{
    public class CheckPurchaseOrderStatusMessage
    {
        public int PurchaseOrderId { get; set; }
        public string ConfirmationDescription { get; set; }
    }
}
VB.NET
Namespace Mess
    Public Class CheckPurchaseOrderStatusMessage

        Public Property PurchaseOrderId() As Integer
        Public Property ConfirmationDescription() As String

    End Class
End Namespace

As you can see the Command and Message classes are just standard .NET classes, nothing special about them at all. We will get to the bit where we discuss the important differences between Commands and Messages, but for now just note that these messages are just regular .NET classes that allow you to add whatever data you want to them. You can be reassured that NServiceBus will make sure that the values for these messages will be stored when needed.
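
To show what "serialized into the format you choose" means for such a plain class, here is a minimal sketch that round-trips the command through XML. Note the assumption: it uses the stock .NET XmlSerializer as a stand-in, not NServiceBus's own serializer, so the exact XML NSB puts on the wire will differ. MessageRoundTrip is a made-up helper name.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Same shape as the demo command shown above.
public class CreatePurchaseOrderCommand
{
    public int PurchaseOrderId { get; set; }
    public string Description { get; set; }
}

public static class MessageRoundTrip
{
    // Turns the message into an XML string (stand-in for what a bus does before sending).
    public static string ToXml(CreatePurchaseOrderCommand command)
    {
        var serializer = new XmlSerializer(typeof(CreatePurchaseOrderCommand));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, command);
            return writer.ToString();
        }
    }

    // Rebuilds the message from XML (stand-in for what the receiving endpoint does).
    public static CreatePurchaseOrderCommand FromXml(string xml)
    {
        var serializer = new XmlSerializer(typeof(CreatePurchaseOrderCommand));
        using (var reader = new StringReader(xml))
        {
            return (CreatePurchaseOrderCommand)serializer.Deserialize(reader);
        }
    }

    public static void Main()
    {
        var sent = new CreatePurchaseOrderCommand { PurchaseOrderId = 1001, Description = "Books" };
        string xml = ToXml(sent);
        CreatePurchaseOrderCommand received = FromXml(xml);

        Console.WriteLine(xml);
        Console.WriteLine("{0} / {1}", received.PurchaseOrderId, received.Description);
    }
}
```

The point is simply that both sides only need to share the class definition. Everything else is plumbing that the bus does for you.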

 

Commands Vs Messages

Ok, so what exactly is the difference between a command and a message? Well, to understand that a bit further, let's consider the following scenario.

We wish to design an ordering system, something like Amazon, where we allow the user to place an order. When an order is placed by the customer it should be dispatched, and an email sent to the client notifying them of the purchase.

Now that is a pretty simple workflow, but how do "Commands" and "Messages" fit into that?

Well if we break it down we can imagine something like the following:

Action | Command Or Message?
Place order | Command to start the workflow
Dispatch order | Message that could advance the workflow
Email client order confirmation | Message that could advance the workflow
 

Still not clear?

 

Ok, so let's try some words. I find it helps to think of things as follows:

Command : Something that is about to happen. Typically this would be a 1..1 type of message.

Message : Something that may continue the current long running process. This would typically be sent to the current endpoint, and would more than likely be a 1..1 type of message.

 

NOTE : The more eagle eyed amongst you may think that rather than having one big workflow we could possibly have multiple endpoints, one for DispatchOrder and another for EmailConfirmationToClient. The difference is that if you chose to go down this route, you would need to send a message to a new endpoint rather than locally.

NServiceBus also supports the idea of publishing events. This would typically be used when you want more than one endpoint to act on some event. In fact it should come as no surprise that NServiceBus distinguishes this type of message as an "Event" that is broadcast (via Bus.Publish(...)) to all interested endpoints. This is not done in the demo app, but rest assured it is completely possible, and is easily achieved using NServiceBus. At the end of the day this is an architectural decision that only you can make.

More information on general messaging type distinctions can be found here and here. Although they do not cover the NServiceBus message concept exactly, these links are still a worthy read.
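
The 1..1 versus 1..n distinction above can be made concrete with a toy in-memory bus. This is a sketch only: ToyBus, PlaceOrder and OrderPlaced are names invented for the illustration, nothing here touches MSMQ, and it is not how NServiceBus is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message types, invented for this sketch.
public class PlaceOrder { public int OrderId { get; set; } }
public class OrderPlaced { public int OrderId { get; set; } }

public class ToyBus
{
    private readonly Dictionary<Type, Action<object>> commandHandlers =
        new Dictionary<Type, Action<object>>();
    private readonly Dictionary<Type, List<Action<object>>> eventSubscribers =
        new Dictionary<Type, List<Action<object>>>();

    // A command has exactly one handler: Add throws if one is already registered.
    public void RegisterCommandHandler<T>(Action<T> handler)
    {
        commandHandlers.Add(typeof(T), message => handler((T)message));
    }

    // An event may have any number of subscribers.
    public void Subscribe<T>(Action<T> subscriber)
    {
        List<Action<object>> list;
        if (!eventSubscribers.TryGetValue(typeof(T), out list))
        {
            list = new List<Action<object>>();
            eventSubscribers[typeof(T)] = list;
        }
        list.Add(message => subscriber((T)message));
    }

    // 1..1: delivered to the single registered handler.
    public void Send<T>(T command)
    {
        commandHandlers[typeof(T)](command);
    }

    // 1..n: delivered to every subscriber; returns how many were notified.
    public int Publish<T>(T evt)
    {
        List<Action<object>> list;
        if (!eventSubscribers.TryGetValue(typeof(T), out list)) return 0;
        foreach (var subscriber in list) subscriber(evt);
        return list.Count;
    }
}

public static class ToyBusDemo
{
    public static void Main()
    {
        var bus = new ToyBus();
        bus.RegisterCommandHandler<PlaceOrder>(c => Console.WriteLine("Handling order " + c.OrderId));
        bus.Subscribe<OrderPlaced>(e => Console.WriteLine("Dispatch saw order " + e.OrderId));
        bus.Subscribe<OrderPlaced>(e => Console.WriteLine("Email saw order " + e.OrderId));

        bus.Send(new PlaceOrder { OrderId = 42 });
        int notified = bus.Publish(new OrderPlaced { OrderId = 42 });
        Console.WriteLine("Subscribers notified: " + notified);
    }
}
```

Registering a second handler for PlaceOrder would throw, which is the "one owner per command" rule in miniature, whereas OrderPlaced happily fans out to both subscribers.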

Hosting The Bus 

NServiceBus offers a few hosting solutions. You may choose from:

  • Running the standalone NServiceBus.Host.exe executable
  • Hosting NServiceBus inside a Windows service, which just runs the standalone NServiceBus.Host.exe executable
  • Or go completely self hosted.

The code in this article uses the last option, where we self host the bus.

The hosting code for the demo app is slightly different depending on which endpoint you are talking about.

Command Sender

C#
private static IBus CreateBusFactory()
{
    Configure.Transactions.Enable();
    Configure.Serialization.Xml();

    //Disable Sagas to stop NSB from trying to use RavenDB in this app, which we don't need since
    //we are using MSMQ subscription storage in this app
    //See : http://support.nservicebus.com/customer/portal/articles/860436-publish-subscribe-configuration
    Configure.Features.Disable<Sagas>();

    var bus = Configure.With()
        .DefaultBuilder()
        .Log4Net()
        .MsmqSubscriptionStorage()
        .DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands"))
        .UseTransport<Msmq>()
        .UnicastBus()
        //See : http://stackoverflow.com/questions/18344962/nservicebus-4-0-3-how-to-disable-ravendb
        .DisableTimeoutManager()
        .SendOnly();

    return bus;
}
VB.NET
Private Shared Function CreateBusFactory() As IBus
    Configure.Transactions.Enable()
    Configure.Serialization.Xml()

    'Disable Sagas to stop NSB from trying to use RavenDB in this app, which we don't need since
    'we are using MSMQ subscription storage in this app
    'See : http://support.nservicebus.com/customer/portal/articles/860436-publish-subscribe-configuration
    Configure.Features.Disable(Of Sagas)()

    'See : http://stackoverflow.com/questions/18344962/nservicebus-4-0-3-how-to-disable-ravendb
    Dim bus = Configure.[With]() _
              .DefaultBuilder() _
              .Log4Net() _
              .MsmqSubscriptionStorage() _
              .DefiningCommandsAs(Function(t) t.[Namespace] IsNot Nothing _
                                      AndAlso t.[Namespace].StartsWith("Messages.Commands")) _
              .UseTransport(Of NServiceBus.Msmq)() _
              .UnicastBus() _
              .DisableTimeoutManager() _
              .SendOnly()

    Return bus
End Function

Let's break that down a bit, shall we? (Note this section will only show code in C#, sorry)

Fluent API Part | Description
Configure.Transactions.Enable(); | Enable transactions
Configure.Serialization.Xml(); | Use Xml serialization
Configure.Features.Disable<Sagas>(); | Disable the Saga feature (the sender doesn't need to run long running workflows)
Configure.With() | Start the standard configurator fluent API
.DefaultBuilder() | Use the default IOC container (Autofac at the time of writing this article)
.Log4Net() | Use Log4Net logging
.MsmqSubscriptionStorage() | Use MSMQ subscription storage
.DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands")) | Define commands as those in the namespace shown (You could also use special NSB attributes, but I feel this fluent API is nicer)
.UseTransport<Msmq>() | Use MSMQ transport
.UnicastBus() | Create the unicast bus
.DisableTimeoutManager() | Disable the timeout manager. NSB uses RavenDB as the default timeout storage, which we do not want to set up for the command sender. So by disabling the timeout manager, we are effectively also stopping RavenDB too
.SendOnly(); | Set this endpoint up as a "Send Only" endpoint. This is fine as this endpoint is only "Sending" a command
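
The DefiningCommandsAs(...) convention is nothing more than a predicate over System.Type, so it can be tried out on its own. The sketch below copies the predicate from the hosting code and applies it to two stand-in classes declared in matching namespaces. ConventionDemo and IsCommand are names I made up, and the empty classes are placeholders for the real demo messages.

```csharp
using System;

namespace Messages.Commands
{
    // Placeholder with the same name and namespace as the demo command.
    public class CreatePurchaseOrderCommand { }
}

namespace Messages.Mess
{
    // Placeholder with the same name and namespace as the demo message.
    public class CheckPurchaseOrderStatusMessage { }
}

public static class ConventionDemo
{
    // Same predicate as passed to DefiningCommandsAs(...) in the hosting code.
    public static readonly Func<Type, bool> IsCommand =
        t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands");

    public static void Main()
    {
        // Lives in Messages.Commands, so the convention treats it as a command.
        Console.WriteLine(IsCommand(typeof(Messages.Commands.CreatePurchaseOrderCommand)));  // True

        // Lives in Messages.Mess, so it is not a command.
        Console.WriteLine(IsCommand(typeof(Messages.Mess.CheckPurchaseOrderStatusMessage))); // False

        // Framework types live in System, so they are ignored as well.
        Console.WriteLine(IsCommand(typeof(string)));                                        // False
    }
}
```

The null check matters because a type declared outside any namespace has a null Namespace, and calling StartsWith on that would throw.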

Subscriber

C#
namespace SQLSubscriber.IOC
{
    public class NServiceBusInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {

            Configure.Transactions.Enable();
            Configure.Serialization.Xml();
            Configure.Features.Enable<Sagas>();
          

            Configure.With()
                     .Log4Net()
                     .CastleWindsorBuilder(container)
                     .DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands"))
                     .DefiningMessagesAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Mess"))
                     .UseTransport<Msmq>()
                        .PurgeOnStartup(false)
                     .UnicastBus()
                     .LoadMessageHandlers()
                     .UseNHibernateSubscriptionPersister() 
                     .UseNHibernateTimeoutPersister() 
                     .UseNHibernateSagaPersister() 
                     .UseNHibernateGatewayPersister()
                     .CreateBus()
                     .Start(() => Configure.Instance.ForInstallationOn<Windows>().Install());
        }
    }
}
VB.NET
Namespace IOC
    Public Class NServiceBusInstaller
        Implements IWindsorInstaller

        Public Sub IWindsorInstaller_Install(ByVal container As IWindsorContainer, ByVal store As IConfigurationStore) Implements IWindsorInstaller.Install
            Configure.Transactions.Enable()
            Configure.Serialization.Xml()
            Configure.Features.Enable(Of Sagas)()


            Configure.[With]() _
                .Log4Net() _
                .CastleWindsorBuilder(container) _
                .DefiningCommandsAs(Function(t) t.[Namespace] _
                                        IsNot Nothing AndAlso t.[Namespace].StartsWith("Messages.Commands")) _
                .DefiningMessagesAs(Function(t) t.[Namespace] _
                                        IsNot Nothing AndAlso t.[Namespace].StartsWith("Messages.Mess")) _
                .UseTransport(Of NServiceBus.Msmq)() _
                .PurgeOnStartup(False) _
                .UnicastBus() _
                .LoadMessageHandlers() _
                .UseNHibernateSubscriptionPersister() _
                .UseNHibernateTimeoutPersister() _
                .UseNHibernateSagaPersister() _
                .UseNHibernateGatewayPersister() _
                .CreateBus() _
                .Start(Function() InstallForWindows())
        End Sub

        Private Function InstallForWindows()
            Configure.Instance.ForInstallationOn(Of Windows)().Install()
            ' return dummy value to get around VB.NET (expression does not return a value) in lambda
            ' see : http://stackoverflow.com/questions/2786753/vb-net-action-delegate-problem
            Return True
        End Function


    End Class
End Namespace

Let's break that down a bit, shall we? (Note this section will only show code in C#, sorry)

Fluent API Part | Description
Configure.Transactions.Enable(); | Enable transactions
Configure.Serialization.Xml(); | Use Xml serialization
Configure.Features.Enable<Sagas>(); | Enable the saga feature, as this endpoint wants to be able to run long running workflows
Configure.With() | Start the standard configurator fluent API
.Log4Net() | Use Log4Net logging
.CastleWindsorBuilder(container) | Use a specific Castle Windsor container
.DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands")) | Define commands as those in the namespace shown (You could also use special NSB attributes, but I feel this fluent API is nicer)
.DefiningMessagesAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Mess")) | Define messages as those in the namespace shown (You could also use special NSB attributes, but I feel this fluent API is nicer)
.UseTransport<Msmq>() .PurgeOnStartup(false) | Use MSMQ transport, and do not purge the queue on startup
.UnicastBus() | Create the unicast bus
.LoadMessageHandlers() | Load ALL the message handlers
.UseNHibernateSubscriptionPersister() | Use SQL subscription storage
.UseNHibernateTimeoutPersister() | Use SQL timeout storage
.UseNHibernateSagaPersister() | Use SQL saga storage
.UseNHibernateGatewayPersister() | Use SQL gateway storage
.CreateBus() | Create the bus
.Start(() => Configure.Instance.ForInstallationOn<Windows>().Install()); | Start the bus and install it for Windows

As you can see the setup of NServiceBus is done using a fluent API, which is quite similar in both these cases.

 

 

 

Configuring NServiceBus

This section will discuss how to configure NServiceBus for the demo app requirements outlined above. You will most definitely need to put in some work of your own if your requirements differ from those of the demo app (which they 100% will, so get ready for some self learning).

Demo App Command Sender

Here is the config (App.Config) for the command sender

XML
<?xml version="1.0" encoding="utf-8"?>
<configuration>

  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" 
	type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" 
	type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    <section name="Logging" 
	type="NServiceBus.Config.Logging, NServiceBus.Core" />
    <section name="log4net" 
	type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
  </configSections>


  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />

  <UnicastBusConfig ForwardReceivedMessagesTo="audit">
    <MessageEndpointMappings>
      <!-- If you want a remote machine endpoint for messages, 
	use syntax like Endpoint="C1951@Messages.Commands.CreatePurchaseOrderCommand.SQLSubscriber" -->
      <!--<add Assembly="Messages" 
	Type="Messages.Commands.CreatePurchaseOrderCommand" 
	Endpoint="RavenSubscriber" />-->
      <add Assembly="Messages" 
	Type="Messages.Commands.CreatePurchaseOrderCommand" 
	Endpoint="SQLSubscriber" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

  <Logging Threshold="ERROR" />


</configuration>

One thing that should be quite obvious based on everything we have gone through thus far is that a command is a 1..1 type of message. As such, when we configure this NSB endpoint, we need to tell it where the messages will end up. This is done using the <MessageEndpointMappings> element that you can see above, where we specify the Endpoint of "SQLSubscriber".

This tells NSB that the current Endpoint (Commander) will be able to send commands to the configured endpoints, which in this case is only SQLSubscriber.
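
Conceptually the mapping is just a lookup from message type to destination endpoint. The sketch below shows that idea in isolation. RoutingSketch and UnmappedCommand are invented for the illustration, the mapping is hard coded rather than read from App.config, and throwing on an unmapped type is my own choice for the sketch rather than a statement about how NSB reacts.

```csharp
using System;
using System.Collections.Generic;

// Placeholders standing in for message types.
public class CreatePurchaseOrderCommand { }
public class UnmappedCommand { }

public static class RoutingSketch
{
    // Mirrors the single <add ... Endpoint="SQLSubscriber" /> entry in the config above.
    private static readonly Dictionary<Type, string> Mappings = new Dictionary<Type, string>
    {
        { typeof(CreatePurchaseOrderCommand), "SQLSubscriber" }
    };

    // Returns the endpoint a message type is mapped to, or throws if there is no mapping.
    public static string ResolveEndpoint(Type messageType)
    {
        string endpoint;
        if (Mappings.TryGetValue(messageType, out endpoint)) return endpoint;
        throw new InvalidOperationException("No endpoint mapping for " + messageType.Name);
    }

    public static void Main()
    {
        Console.WriteLine(ResolveEndpoint(typeof(CreatePurchaseOrderCommand)));

        try
        {
            ResolveEndpoint(typeof(UnmappedCommand));
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```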

There are some other configuration sections at play here, which are described below.

MessageForwardingInCaseOfFaultConfig

This section has one value, which is what the error queue should be called. In my case this is a very generic "error" value (you like that? So do I, cool isn't it). You can call this what you like.

Logging

This section sets the logging threshold for NSB. Threshold is the only value (AFAIK)

 

 

Demo App Subscriber

Here is the config (App.Config) for the subscriber

XML
<?xml version="1.0"?>
<configuration>

  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" 
	type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core"/>
    <section name="UnicastBusConfig" 
	type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
    <section name="Logging" 
	type="NServiceBus.Config.Logging, NServiceBus.Core"/>
    <section name="log4net" 
	type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
  </configSections>

  <connectionStrings>
    <add name="NServiceBus/Persistence" connectionString="......." providerName="System.Data.SqlClient"/>
  </connectionStrings>

  <!-- specify the other needed NHibernate settings like below in appSettings:-->
  <appSettings>
    <!-- dialect is defaulted to MsSql2008Dialect, if needed change accordingly -->
    <add key="NServiceBus/Persistence/NHibernate/dialect" 
	value="NHibernate.Dialect.MsSql2008Dialect" />
    <!-- other optional settings examples -->
    <add key="NServiceBus/Persistence/NHibernate/connection.provider" 
	value="NHibernate.Connection.DriverConnectionProvider" />
    <add key="NServiceBus/Persistence/NHibernate/connection.driver_class" 
	value="NHibernate.Driver.Sql2008ClientDriver" />
  </appSettings>
  

  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error"/>

  <UnicastBusConfig ForwardReceivedMessagesTo="audit">
    <MessageEndpointMappings>
    </MessageEndpointMappings>
  </UnicastBusConfig>


  <Logging Threshold="ERROR"/>

 
</configuration>

We can see that the SQLSubscriber's App.Config is quite a bit different from the Commander. Why is this?

Well, for a start this article is mainly geared around how to create long running workflows (Sagas) in NSB, and I have chosen to have the workflow execute in the same Endpoint (as previously stated this is a design decision). As such I do not need to send any NSB Messages/Commands/Events outside of the current Endpoint, which explains why you do not see any values for the <MessageEndpointMappings>.

The other interesting thing to note here is that since this endpoint is the one that will deal with the long running workflow (NSB saga), we need to configure a couple of MANDATORY (when using NHibernate SQL storage at any rate) settings, to tell NSB about the SQL Server instance it can use for storing the long running workflow (NSB saga) in.

However, once you get past these 2 differences, there are no other differences between the Commander and SQLSubscriber projects.

 

Running The Demo App

In order to run the demo app attached you will need to do the following:

  1. Make sure you have done the prerequisites stuff
  2. Launch the SQLSubscriber project in debug mode (or the VB.SQLSubscriber)
  3. Launch the Commander project in debug mode (or the VB.Commander)
  4. Enter some text in the Commander TextBox
  5. Click the button, which should start the Saga. You can confirm this by putting a break point in the Saga (SQLSubscriber) Handle(..) methods.

 

NServiceBus Infrastructure

If you have gotten this far, and have started to think about everything we have talked about so far, you may be one of those people that goes:

"Mmmm, great but what about all the queues that need to be setup per environment (where I work we have 6 environments). You know someone has to create all those queues don't they. I don't want to do it!"

Well yes, you are spot on, someone needs to do it for sure. Thankfully NSB does all this on your behalf, and when you run the demo app you should see something like this:

Image 2

I certainly did not set this up. What actually happens is that NSB reads your configuration, does the heavy lifting, and is nice enough to create your queues for you. Neato. Thanks NSB.

 

Sagas? What Are These?

Ok, so now we get to the point (finally) of this article. I wanted to talk you through creating potentially long running workflows using NServiceBus. So what is a "Saga"? Well in NServiceBus, a "Saga" is a long running workflow that has these basic characteristics:

  • It is potentially (though not always) long running
  • Is made up of a number of different steps (these are encoded as message handlers in NServiceBus)
  • May be persisted to some backing store (RavenDB / MSMQ / SQL Server). This ensures that Sagas are durable
  • Sagas are reliable thanks to the MSMQ underlying transport usage
  • May be restarted after being stopped
  • May be completed

So that is essentially what a saga is.

 

A Walk Through Of A Simple Saga

For the demo app I have fabricated a simple (potentially) long running workflow. I say potentially, as the demo app obviously doesn't do too much since I wanted to keep it simple, but it does show you the ideas/concepts, which are always the most important things to learn anyway.

Here is a diagram of the Saga that the demo app creates:

Image 3

And here is the complete code for the Saga:

C#
using System.Threading;
using Messages.Commands;
using Messages.Mess;
using NServiceBus;
using NServiceBus.Saga;
using Services;

namespace SQLSubscriber.Handlers
{
    public class MySaga : Saga<CreatePurchaseOrderSagaData>,
       IAmStartedByMessages<CreatePurchaseOrderCommand>,
       IHandleMessages<CheckPurchaseOrderStatusMessage>
    {
        public IPurchaseService PurchaseService { get; set; }

        public override void ConfigureHowToFindSaga()
        {
            ConfigureMapping<CreatePurchaseOrderCommand>
                (message => message.PurchaseOrderId)
                    .ToSaga(saga => saga.PurchaseOrderId);
            ConfigureMapping<CheckPurchaseOrderStatusMessage>
                (message => message.PurchaseOrderId)
                    .ToSaga(saga => saga.PurchaseOrderId);
        }

        public void Handle(CreatePurchaseOrderCommand message)
        {
            this.Data.TransactionId = PurchaseService.Initialise();
            this.Data.PurchaseOrderId = message.PurchaseOrderId;
            PurchaseService.Purchase(this.Data.TransactionId, 
                string.Format("1 * {0}", message.PurchaseOrderId));
            Bus.SendLocal(new CheckPurchaseOrderStatusMessage()
                          {
                              PurchaseOrderId = this.Data.PurchaseOrderId
                          });
        }


        public void Handle(CheckPurchaseOrderStatusMessage message)
        {
            if (this.Data.Retries < 3)
            {
                bool isCompleted = PurchaseService.IsCompleted(this.Data.TransactionId);
                if (isCompleted)
                {
                    base.MarkAsComplete();
                }
                else
                {
                    Thread.Sleep(2000);
                    this.Data.Retries++;
                    Bus.SendLocal(new CheckPurchaseOrderStatusMessage()
                                  {
                                      PurchaseOrderId = this.Data.PurchaseOrderId
                                  });
                }
            }
            else
            {
                base.MarkAsComplete();
            }
        }
    }
}
VB.NET
Imports System.Threading
Imports VB.Services.VB.Services
Imports VB.Messages.Mess
Imports VB.Messages.Commands
Imports NServiceBus
Imports NServiceBus.Saga

Namespace Handlers

    Public Class MySaga
        Inherits Saga(Of CreatePurchaseOrderSagaData)
        Implements IAmStartedByMessages(Of CreatePurchaseOrderCommand)
        Implements IHandleMessages(Of CheckPurchaseOrderStatusMessage)

        Public Overrides Sub ConfigureHowToFindSaga()

            ConfigureMapping(Of CreatePurchaseOrderCommand) _
                (Function(message) message.PurchaseOrderId) _
                .ToSaga(Function(saga) saga.PurchaseOrderId)
            ConfigureMapping(Of CheckPurchaseOrderStatusMessage) _
                (Function(message) message.PurchaseOrderId) _
                .ToSaga(Function(saga) saga.PurchaseOrderId)

        End Sub


        Public Property PurchaseService() As IPurchaseService

        Public Sub Handle(ByVal message As CreatePurchaseOrderCommand) _
            Implements IHandleMessages(Of CreatePurchaseOrderCommand).Handle

            Me.Data.TransactionId = PurchaseService.Initialise()
            Me.Data.PurchaseOrderId = message.PurchaseOrderId
            PurchaseService.Purchase(Me.Data.TransactionId, String.Format("1 * {0}", message.PurchaseOrderId))
            Dim checkPurchaseOrderStatusMessage = New CheckPurchaseOrderStatusMessage()
            checkPurchaseOrderStatusMessage.PurchaseOrderId = Me.Data.PurchaseOrderId
            Bus.SendLocal(checkPurchaseOrderStatusMessage)

        End Sub

        Public Sub Handle(ByVal message As CheckPurchaseOrderStatusMessage) _
            Implements IHandleMessages(Of CheckPurchaseOrderStatusMessage).Handle

            If Me.Data.Retries < 3 Then
                Dim isCompleted As Boolean = PurchaseService.IsCompleted(Me.Data.TransactionId)
                If isCompleted Then
                    MyBase.MarkAsComplete()
                Else
                    Thread.Sleep(2000)
                    Me.Data.Retries += 1
                    Dim checkPurchaseOrderStatusMessage = New CheckPurchaseOrderStatusMessage()
                    checkPurchaseOrderStatusMessage.PurchaseOrderId = Me.Data.PurchaseOrderId
                    Bus.SendLocal(checkPurchaseOrderStatusMessage)
                End If
            Else
                MyBase.MarkAsComplete()
            End If

        End Sub


    End Class
End Namespace

Surprisingly there are only a few concepts to learn here.

 

Concept 1 : How Do We Create A Saga

That is dead easy: we just need to inherit from the Saga<T> class (or Saga(Of CreatePurchaseOrderSagaData) for VB.NET users) within NServiceBus, where the generic argument is the state data type associated with the Saga (more on this later).

 

Concept 2 : How Are Sagas Started

For a Saga to be started we simply need to implement the NServiceBus IAmStartedByMessages<T> interface (or IAmStartedByMessages(Of CreatePurchaseOrderCommand) for VB.NET users). This means whenever NServiceBus sees a message of type T on the bus, it will start the Saga that the type T is associated with.

When you implement the IAmStartedByMessages<T> interface (or IAmStartedByMessages(Of CreatePurchaseOrderCommand)) you will also get a Handle(..) method that you must implement. This is the message handler code for the message type that started the Saga.

 

Concept 3 : How Are Sagas Continued

For a Saga to be continued we simply need to implement the NServiceBus IHandleMessages<T> interface (or IHandleMessages(Of CheckPurchaseOrderStatusMessage) for VB.NET users). This means whenever NServiceBus sees a message of type T on the bus, it will pass it to the existing Saga that the message is associated with (found via the mappings set up in ConfigureHowToFindSaga()).

When you implement the IHandleMessages<T> interface (or IHandleMessages(Of CheckPurchaseOrderStatusMessage)) you will also get a Handle(..) method that you must implement. This is the message handler code for the message type that continued the Saga.

 

Concept 4 : How Do We Store State For A Saga

We will see much more on this later. But as I have already stated, Sagas and NServiceBus offer durable messaging, so it is not unreasonable that we would want our Saga state stored too. This has been thought of, and is as simple as writing to the state object associated with the current Saga:

C#
this.Data.PurchaseOrderId = message.PurchaseOrderId;
VB.NET
Me.Data.PurchaseOrderId = message.PurchaseOrderId
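
To get a feel for what "state associated with the current Saga" means, here is a small stand-alone sketch of a saga store keyed by PurchaseOrderId, the same property the demo maps on in ConfigureHowToFindSaga(). It is an assumption-laden toy: InMemorySagaStore and SagaState are invented names, and the state lives in a dictionary rather than in SQL Server via NHibernate.

```csharp
using System;
using System.Collections.Generic;

// Cut-down stand-in for the demo's saga data class.
public class SagaState
{
    public Guid Id { get; set; }
    public int PurchaseOrderId { get; set; }
    public int Retries { get; set; }
}

public class InMemorySagaStore
{
    private readonly Dictionary<int, SagaState> states = new Dictionary<int, SagaState>();

    // Finds the state for a purchase order, creating it the first time it is seen.
    public SagaState FindOrCreate(int purchaseOrderId)
    {
        SagaState state;
        if (!states.TryGetValue(purchaseOrderId, out state))
        {
            state = new SagaState { Id = Guid.NewGuid(), PurchaseOrderId = purchaseOrderId };
            states.Add(purchaseOrderId, state);
        }
        return state;
    }

    // Removes the state once the workflow is finished; returns false if there was none.
    public bool Complete(int purchaseOrderId)
    {
        return states.Remove(purchaseOrderId);
    }

    public int ActiveCount
    {
        get { return states.Count; }
    }
}

public static class SagaStoreDemo
{
    public static void Main()
    {
        var store = new InMemorySagaStore();

        // Two messages carrying the same PurchaseOrderId land on the same state object.
        store.FindOrCreate(1001).Retries++;
        store.FindOrCreate(1001).Retries++;

        // A different PurchaseOrderId gets its own, independent state.
        store.FindOrCreate(2002);

        Console.WriteLine(store.FindOrCreate(1001).Retries); // 2
        Console.WriteLine(store.ActiveCount);                // 2

        store.Complete(1001);
        Console.WriteLine(store.ActiveCount);                // 1
    }
}
```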

 

Concept 5 : How Are Sagas Completed

Eventually you will want to complete the Saga, so how do we do that?

Luckily this too is an easy operation. We just do this:

C#
base.MarkAsComplete();
VB.NET
MyBase.MarkAsComplete()
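
Putting the start, continue and complete concepts together, the control flow of the demo Saga can be simulated without NServiceBus at all. In this sketch a Queue<string> stands in for Bus.SendLocal(...), FakePurchaseService is a made-up replacement for IPurchaseService, and the Thread.Sleep from the real handler is left out. It mirrors the "retry up to 3 times, then complete anyway" logic, nothing more.

```csharp
using System;
using System.Collections.Generic;

// Made-up stand-in for the purchase service: reports completion on the Nth check.
public class FakePurchaseService
{
    private readonly int checksUntilDone;
    private int checks;

    public FakePurchaseService(int checksUntilDone)
    {
        this.checksUntilDone = checksUntilDone;
    }

    public bool IsCompleted()
    {
        checks++;
        return checks >= checksUntilDone;
    }
}

public static class SagaLoopSketch
{
    // Runs the workflow to the end and returns the number of retries recorded.
    // purchaseConfirmed tells us whether the service ever reported completion.
    public static int Run(FakePurchaseService service, out bool purchaseConfirmed)
    {
        var localQueue = new Queue<string>(); // stands in for Bus.SendLocal(...)
        int retries = 0;                      // stands in for Data.Retries
        bool sagaComplete = false;            // set where MarkAsComplete() would be called
        purchaseConfirmed = false;

        // The first handler ends by sending the status check message to itself.
        localQueue.Enqueue("CheckPurchaseOrderStatusMessage");

        while (!sagaComplete && localQueue.Count > 0)
        {
            localQueue.Dequeue();
            if (retries < 3)
            {
                if (service.IsCompleted())
                {
                    purchaseConfirmed = true;
                    sagaComplete = true;
                }
                else
                {
                    retries++;
                    localQueue.Enqueue("CheckPurchaseOrderStatusMessage");
                }
            }
            else
            {
                sagaComplete = true; // out of retries: complete without confirmation
            }
        }
        return retries;
    }

    public static void Main()
    {
        bool confirmed;

        int retries = Run(new FakePurchaseService(2), out confirmed);
        Console.WriteLine("Retries: {0}, confirmed: {1}", retries, confirmed);

        retries = Run(new FakePurchaseService(100), out confirmed);
        Console.WriteLine("Retries: {0}, confirmed: {1}", retries, confirmed);
    }
}
```

A service that completes on its second check finishes after one retry, while one that never completes runs out of retries after three and the workflow is completed anyway, just like the else branch in the real Saga.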

 

Saga State Storage

One of the things you may be asking yourself is how NServiceBus stores the state associated with a Saga. Well, the secret to that lies in the use of a simple property bag type class. For the demo app it looks something like this:

C#
using System;
using NServiceBus.Saga;

namespace SQLSubscriber.Handlers
{
    public class CreatePurchaseOrderSagaData : IContainSagaData
    {
        // the following properties are mandatory
        public virtual Guid Id { get; set; }
        public virtual string Originator { get; set; }
        public virtual string OriginalMessageId { get; set; }

        // all other properties you want persisted - remember to make them virtual

        public virtual int PurchaseOrderId { get; set; }
        public virtual string Description { get; set; }
        public virtual string ConfirmationDescription { get; set; }
        public virtual int Retries { get; set; }
        public virtual Guid TransactionId { get; set; }
    }
}
VB.NET
Imports NServiceBus.Saga

Namespace Handlers
    Public Class CreatePurchaseOrderSagaData
        Implements IContainSagaData

        ' the following properties are mandatory
        Public Overridable Property Id() As Guid Implements IContainSagaData.Id
        Public Overridable Property Originator() As String Implements IContainSagaData.Originator
        Public Overridable Property OriginalMessageId() As String Implements IContainSagaData.OriginalMessageId

        ' all other properties you want persisted - remember to make them Overridable (virtual)
        Public Overridable Property PurchaseOrderId() As Int32
        Public Overridable Property Description() As String
        Public Overridable Property ConfirmationDescription() As String
        Public Overridable Property Retries() As Int32
        Public Overridable Property TransactionId() As Guid

    End Class
End Namespace

It can be seen that there are a number of mandatory properties that you must supply:

  • public virtual Guid Id { get; set; }
  • public virtual string Originator { get; set; }
  • public virtual string OriginalMessageId { get; set; }

These are integral to the way NServiceBus works with Saga data. The other properties are whatever you need to satisfy your Saga requirements. For me those were these properties:

  • public virtual int PurchaseOrderId { get; set; }
  • public virtual string Description { get; set; }
  • public virtual string ConfirmationDescription { get; set; }
  • public virtual int Retries { get; set; }
  • public virtual Guid TransactionId { get; set; }

So once you have this state object you may write to it within your saga code something like this:

C#
this.Data.TransactionId = PurchaseService.Initialise();
VB.NET
Me.Data.TransactionId = PurchaseService.Initialise()

So that is cool, nice and easy. But what exactly is going on here behind the scenes? How does this state get used by NServiceBus?

Quite simply, NServiceBus will take this state information and automatically persist it to your backing persistence store of choice (be that RavenDB, SQL Server etc.) whenever the saga needs to persist its data. I repeat, this is done automatically for you by NServiceBus; you do not have to manage this yourself.
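The load, handle, save cycle described above can be sketched roughly as follows. This is not how NServiceBus is implemented internally; SketchSagaData, SketchPersister and SketchDispatcher are hypothetical names, and a Dictionary stands in for the real persistence store. The sketch assumes that state is saved after each handler call and removed once the saga reports that it is complete.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical saga state (a cut-down cousin of CreatePurchaseOrderSagaData).
public class SketchSagaData
{
    public Guid Id { get; set; }
    public int PurchaseOrderId { get; set; }
    public Guid TransactionId { get; set; }
}

// Hypothetical persister; the dictionary plays the role of the database table.
public class SketchPersister
{
    private readonly Dictionary<Guid, SketchSagaData> table =
        new Dictionary<Guid, SketchSagaData>();

    // Returns a copy of the stored row, or null when nothing is stored.
    public SketchSagaData Load(Guid id)
    {
        SketchSagaData row;
        return table.TryGetValue(id, out row) ? Copy(row) : null;
    }

    // Stores a snapshot of the state, replacing any previous row.
    public void Save(SketchSagaData data)
    {
        table[data.Id] = Copy(data);
    }

    // Removes the row once the saga has finished.
    public void Complete(Guid id)
    {
        table.Remove(id);
    }

    private static SketchSagaData Copy(SketchSagaData d)
    {
        return new SketchSagaData
        {
            Id = d.Id,
            PurchaseOrderId = d.PurchaseOrderId,
            TransactionId = d.TransactionId
        };
    }
}

public static class SketchDispatcher
{
    // Runs one handler call: load (or create) the state, let the handler
    // change it, then save it, or delete it if the handler returns true.
    public static void Dispatch(SketchPersister persister, Guid sagaId,
                                Func<SketchSagaData, bool> handler)
    {
        var data = persister.Load(sagaId) ?? new SketchSagaData { Id = sagaId };
        bool completed = handler(data);
        if (completed)
        {
            persister.Complete(sagaId);
        }
        else
        {
            persister.Save(data);
        }
    }
}
```

The point of the sketch is that the handler only ever touches the state object; the surrounding infrastructure decides when to read and write the store, which is the part NServiceBus takes off your hands.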

 

SQL Server Saga State Storage

That said if you choose to go down the SQL Server route you do need to have done 2 things before you get into the nitty gritty of your Saga code.

Step 1 : Create A Database For NServiceBus Persistence

Create a new SQL Server database. For the demo app this is called "NServiceBusPersistence". You will need to create your own, and if you decide to call it something different, remember to change the App.Config settings in step 2, shown below.

 

Step 2 : App.Config

Make sure you have specified the SQL Server stuff in the App.Config. This is done as follows:

XML
<?xml version="1.0"?>
<configuration>

  <connectionStrings>
    <add name="NServiceBus/Persistence" 
	connectionString="Data Source=YOUR_SERVER_NAME;
		Initial Catalog=NServiceBusPersistence;Integrated Security=True;
		Timeout=180;MultipleActiveResultSets=true;" 
	providerName="System.Data.SqlClient"/>
  </connectionStrings>

  <!-- specify the other needed NHibernate settings like below in appSettings:-->
  <appSettings>
    <!-- dialect is defaulted to MsSql2008Dialect, if needed change accordingly -->
    <add key="NServiceBus/Persistence/NHibernate/dialect" 
	value="NHibernate.Dialect.MsSql2008Dialect" />
    <!-- other optional settings examples -->
    <add key="NServiceBus/Persistence/NHibernate/connection.provider" 
	value="NHibernate.Connection.DriverConnectionProvider" />
    <add key="NServiceBus/Persistence/NHibernate/connection.driver_class" 
	value="NHibernate.Driver.Sql2008ClientDriver" />
  </appSettings>
  <!-- ...... -->
</configuration>

Once you have those 2 things in place, NServiceBus will automatically create the database tables it needs to manage the Saga state storage, and it will also populate the table as it sees fit with the relevant data.

For example, this is what the demo app looks like when it is running. See how there is a "CreatePurchaseOrderSagaData" table. NServiceBus created that for us.

Image 4

We can also see that this table is filled with data while the Saga is running. Again, this is all handled by NServiceBus.

Image 5

 

Finding The Right Saga For The Stored Data

OK great, we have now managed to save some saga data. But just how does NServiceBus know which of all the running Sagas this data belongs to? Well, the table that is automatically created by NServiceBus lets it know the type of the Saga, so that is cool. But we could have many, many instances of the same Saga all running at the same time, so how do we know which one the stored data is for? Mmmm, sounds like a problem, but if you recall we had to store some mandatory and some custom data within our saga state object. We can use any of that to tell NServiceBus how to find the correct Saga. This is done in code as follows:

C#
public override void ConfigureHowToFindSaga()
{
    ConfigureMapping<CreatePurchaseOrderCommand>
	(message => message.PurchaseOrderId)
            .ToSaga(saga => saga.PurchaseOrderId);
    ConfigureMapping<CheckPurchaseOrderStatusMessage>
	(message => message.PurchaseOrderId)
            .ToSaga(saga => saga.PurchaseOrderId);
}
VB.NET
Public Overrides Sub ConfigureHowToFindSaga()

    ConfigureMapping(Of CreatePurchaseOrderCommand) _
        (Function(message) message.PurchaseOrderId) _
            .ToSaga(Function(saga) saga.PurchaseOrderId)
    ConfigureMapping(Of CheckPurchaseOrderStatusMessage) _
        (Function(message) message.PurchaseOrderId) _
            .ToSaga(Function(saga) saga.PurchaseOrderId)

End Sub

It can be seen that I am using one of the custom state values I stored, namely "PurchaseOrderId". This is a unique value for my demo app. So that will easily find the correct Saga.
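Conceptually, each mapping pairs a property on the incoming message with a property on the stored saga state, and the saga whose value matches is the one that gets the message. Here is a small sketch of that idea, assuming an in-memory list in place of the database. SketchMapping, SketchStatusMessage and SketchOrderState are hypothetical types written for illustration, not NServiceBus classes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message carrying the correlation value.
public class SketchStatusMessage
{
    public int PurchaseOrderId { get; set; }
}

// Hypothetical stored saga state.
public class SketchOrderState
{
    public Guid Id { get; set; }
    public int PurchaseOrderId { get; set; }
}

// Pairs a message property with a saga state property, in the spirit of
// ConfigureMapping(...).ToSaga(...).
public class SketchMapping<TMessage>
{
    private readonly Func<TMessage, int> messageKey;
    private readonly Func<SketchOrderState, int> sagaKey;

    public SketchMapping(Func<TMessage, int> messageKey,
                         Func<SketchOrderState, int> sagaKey)
    {
        this.messageKey = messageKey;
        this.sagaKey = sagaKey;
    }

    // Returns the stored state whose key equals the message's key,
    // or null when no running saga matches.
    public SketchOrderState Find(TMessage message,
                                 IEnumerable<SketchOrderState> stored)
    {
        int key = messageKey(message);
        return stored.FirstOrDefault(state => sagaKey(state) == key);
    }
}
```

A mapping built with (m => m.PurchaseOrderId, s => s.PurchaseOrderId) would pick out the one stored state with the same PurchaseOrderId as the message, which is why the chosen property needs to be unique per saga instance.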

Jobs a goodun!!!!

 

What About More Complex Storage?

If you want to store more complex saga data, where you may have many interlinked tables that you really can't be bothered to flesh out, you may want to switch to RavenDB saga storage, as it will allow the storage of arbitrary objects. That said, even if you use a relational database such as SQL Server, you do have choices. XML would be a good choice. In fact someone has done just that and written a pretty good write-up on this subject, which you can find here:

http://www.make-awesome.com/2010/09/implementing-an-nservicebus-saga-persister/

I quite liked that linked article and felt it was well written, and very useful if you need to use a relational database.
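To give a feel for the XML idea, here is a minimal sketch that turns a nested state object into a single XML string, which could then live in one column of a relational table. It uses the standard .NET XmlSerializer and is not the implementation from the linked article. ComplexSagaState, OrderLine and SagaXml are hypothetical names made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Hypothetical child object that would otherwise need its own table.
public class OrderLine
{
    public string Sku { get; set; }
    public int Quantity { get; set; }
}

// Hypothetical saga state containing a nested collection.
public class ComplexSagaState
{
    public Guid Id { get; set; }
    public int PurchaseOrderId { get; set; }
    public List<OrderLine> Lines { get; set; }
}

public static class SagaXml
{
    // Turns the whole object graph into one XML string.
    public static string Serialize(ComplexSagaState state)
    {
        var serializer = new XmlSerializer(typeof(ComplexSagaState));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, state);
            return writer.ToString();
        }
    }

    // Rebuilds the object graph from the stored XML string.
    public static ComplexSagaState Deserialize(string xml)
    {
        var serializer = new XmlSerializer(typeof(ComplexSagaState));
        using (var reader = new StringReader(xml))
        {
            return (ComplexSagaState)serializer.Deserialize(reader);
        }
    }
}
```

The trade-off with this approach is that the nested data is no longer easy to query with SQL, so it suits state that is only ever loaded and saved as a whole.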

 

That's All

Anyway, that is all I wanted to say for now. I do plan to write a small article on using Durandal (just so I can compare that experience to using Knockout.js and Angular.js), but other than that I shall be mainly trying to learn F# and blog about that from a beginner's standpoint. So if you enjoyed this article, and feel like you would like to give it a vote/comment, that would be most welcome. Thanks for reading it. Cheerio

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)