Contents
- Push Model
- Distributed Event-Stream Processing
- Based on the Azure Lease Blob
- Loosely decouple pattern
- Ready for runtime business composition, troubleshooting, monitoring,
etc.
- No Database
Windows Azure Storage Client Library 2.0 (packaged inside the
Windows Azure for .NET SDK 2.0) introduces new features for expanded
Azure Storage support of the 2012-02-12 REST API version with implementation for
Blob & Container Leases. This new implementation features enable to design an
event-driven distributed architecture, where the business processors can
establish and manage a lock on Azure Storage Blob & Container for write and
delete operations. This article describes how these storage features
can be utilized in the modern metadata driven architecture based on the WF 4.X declarative
programming.
Let's start with an example of a simple processing, where a state-full
message is processing in the pipeline of the three business processors such as
pre-processor, processor and post-processor, see the following picture:
As the above picture shows, the message is processing in the pipeline of the
state-less services (business processors), where each processor can update a
message. In other words, the message can hold a state of the business processors.
The process is state-less and the business context is processed in the sequential
manner.
Let's assume, the incoming message includes some multiple parts, that can be
processed in the parallel manner. The following picture shows this example:
As the above picture shows, the pre-processor is a splitter of the message
and the post-processor is an aggregator of the message. The Aggregator has a
responsibility to create the final message based on the states stored in the
database during the processing of each message via processors. This solution requires a
database for storing a specific business context state.
The business context complexity can grow, for instance: the business
processor can be a nested splitter, etc. The following picture shows this
example:
Basically as we can see, the services and messages are state-less, however,
we have a database with application specific
schema to hold a business context state. The business context state is persisted behind the Post-Processor in the
Aggregator, which is also a state-less service with sharing data in the
database. Of course, we can create a model where each processor will have
responsibility to publish a business context state via its processing. This
scenario requires a specific implementation of the database service where a
business context state is stored.
Based on the business context complexity and requirements for throughput and
small latency, we can use (instead of the Aggregator) the Microsoft StreamInsight Technology for Complex Event Processing (CEP), see
StreamInsight Service for Windows Azure. In this scenario, the business
processors are the sources for complex event processing to generate action such as
aggregated message. I do recommend to look at
StreamInsight Service for Windows Azure and
Testing the StreamInsight Service for Windows Azure for detail description
of event processing on the Cloud solution..
Another approach for holding a business context state in the event-driven
distributed architecture is the usage of Azure Storage Lease Blob. The following
picture shows our example with Lease Blob:
Basically, the Lease Blob is a Blob in the Azure Storage with exclusive write
operation. The content of the blob is a collection of the Events generated from
the sources within the Business context. Instead of the one central event engine
processor (like it is used in the CEP), this scenario is based on the distributed
event-processing, where each service will process a business context state on the
Event-Stream
stored in the Lease Blob.
Let's assume we have three business processors 1,2 and N like it is shown in the
above picture. They were created in the pre-processor splitter. Each
service has responsibility to update a current Event
in the Event-Stream
and
initialize the next Event(s)
if the process continues. Based on that, the
Pre-Processor/Splitter will create an Event-Stream
(if it doesn't exist already),
updating/creating Event 0 and initializing three Events such as Event 1, 2 and
N. When the processor is processing, its event in the Event-Stream
is updated
and creating the next Event
, and so on. As the above picture shows, the processor 2
was the last processor for splitter and therefore its responsibility is to
create a message for Post-Postprocessor and writing an Event N+1 into the
Event-Stream. Finally, the Post-Processor will update its event in the
Event-Stream and generating the finally aggregated message.
The Event-Stream
stored in the Lease Blob represents a runtime behavior of
the business context. This stream can be used during the runtime for
troubleshooting, monitoring and also for watchdog processing, where each delta
business process can be under the watchdog timer. In other words, if something
happens in the distributed process such as lose connectivity, etc., the
Event-Stream knows the details for the process recovery. Another feature of the
Event-Stream is to allow aborting the running business process based on the user
request.
Thanks to Lease Blob feature in the Azure Storage, which simplified
the implementation of the shared resource in the Azure Storage with exclusive write
in the distributed model, similar like to lock resource in the application domain.
This feature is built-in the blob service, therefore its consumer needs to
acquire lease for this specific blob for its operation such as write, delete,
etc.
OK, let's continue with the concept and design of the Using Azure Lease
Blob. I am assuming you have a working knowledge of the Windows Azure Platform
and WF Technologies.
The Concept of using Azure Lease Blob in the distributed event-driven
architecture is based on acquiring an exclusive ownership for write or delete
lease blob in the Azure Storage. Basically, the event-driven distributed process is
executing in parallel, where business processes are running concurrently
within the business context. To control this business context such as aggregation
of
messages, triggering events based on the context state, etc. we need to have a
small knowledge base for these business process states. This "knowledge base"
is a runtime log of the events into the lease blob and this content is called
the Event-Stream.
The following code snippet shows a generic Event
object stored in the
Event-Stream
:
[XmlRoot(ElementName = "Event")]
public class EventP
{
[XmlAttribute(AttributeName = "name")]
public string Name { get; set; }
[XmlAttribute(AttributeName = "status")]
public string Status { get; set; }
[XmlAttribute(AttributeName = "id")]
public string Id { get; set; }
[XmlAttribute(AttributeName = "topic")]
public string Topic { get; set; }
[XmlAttribute(AttributeName = "key")]
public string Key { get; set; }
[XmlAttribute(AttributeName = "msg")]
public string Msg { get; set; }
[XmlAttribute(AttributeName = "trackingId")]
public string TrackingId { get; set; }
[XmlAttribute(AttributeName = "created")]
public string Created { get; set; }
[XmlAttribute(AttributeName = "timestamp")]
public string Timestamp { get; set; }
[XmlAttribute(AttributeName = "timeout")]
public string Timeout { get; set; }
[XmlAttribute(AttributeName = "ref")]
public string Ref { get; set; }
[XmlAttribute(AttributeName = "tag")]
public string Tag { get; set; }
[XmlAttribute(AttributeName = "prev")]
public string Prev { get; set; }
[XmlAttribute(AttributeName = "next")]
public string Next { get; set; }
}
As you can see, the above Event
object (element) has many attributes to capture a
business state in the specific place of the business processing. These
attributes allow to reference a message, walking through Event-Stream, holding
watchdog timer, topic, key, etc.
Well, from the abstraction point of the view, the event-driven distributed
business model can publish events during the processing like it is shown in the
following picture:
The Events are stored in the blob in order how they are published by business processors, for instance, processor P0 at the time T1, P1/T2, etc. In the concurrent business processing, where more business processes are run concurrently,
there is no exact sequence of the events guaranteed, in other words, two or more publishers want to store
their events at the same time. To avoid this write collision at the blob storage, each publisher needs an exclusive access to the blob storage. That
is a great feature of the Azure Storage Blob Service, such as Lease Blob
.
Azure Lease Blob
From the MSDN documentation Lease Blob:
The Lease Blob operation establishes and manages a lock on a blob for write
and delete operations. The lock duration can be 15 to 60 seconds, or can be
infinite.
The Lease Blob operation can be called in one of five modes:
Acquire, to request a new lease.
Renew, to renew an existing lease.
Change, to change the ID of an existing lease.
Release, to free the lease if it is no longer needed so that another client may
immediately acquire a lease against the blob.
Break, to end the lease but ensure that another client cannot acquire a new
lease until the current lease period has expired.
Ok, back to our abstraction model. We know, that the Event-Driven distributed business model will publish (store) Events into the Lease Blob one by one. This collection of the Events is called Event-Stream
and it represents a runtime behavior of the business model processing. Basically, each incoming message (request) into the business model can have one Event-Stream
for capturing states within its business context. Based on this Event-Stream
, the business process can be controlled, monitored, recovered,
troubleshooted, etc.
Each publisher of the business processor in the Event-Driven distributed model must have an exclusive access for updating Event-Stream
in the Blob Storage. To get this write exclusivity, the publisher needs to acquire blob for lease (locking) purpose before this operation. The write/delete exclusivity blob access
operation is represented by LeaseId
which the client (publisher) must include in the blob operation. The lease is granted for the duration specified when the lease is acquired, which can be between 15 seconds and one minute, or an infinite duration. Of course, we don't want to block other publishers to publish their Event into the Event-Stream, therefore each publisher has responsibility to release ASAP active lease.
The following picture shows this Lease BLOB Scope feature in the publisher:
As the above picture shows, the Lease Blob Scope begins calling a blob request for Acquire
Lease. This request is repeated in the polling cycle until the LeaseId
is obtained by Azure Blob Service. That is the way (such as polling)
to acquire lease in the distributed internet connected model. Once the scope has
an exclusive write access to the blob, we can get an Event-Stream
to the memory, make necessary processing and writing back (Put) to the blob. After that, the scope must be released calling Release Lease operation.
Basically, the scope operation takes
a very short time (couple hundreds milliseconds), including the Get/Put operations, therefore locking the Event-Stream
in the distributed model is not critical, but in the case of crashing the active scope, all other clients (publishers) must wait for releasing Lease Blob by Azure Blob Service,
which can take maximum of 60 seconds.
As I mentioned earlier, the LeaseBlobScope
will have an exclusive access for Get/Put Event-Stream
stored in the Lease Blob. This process will take very little processing time.
It is executed in the memory, therefore after releasing scope we have an updated Event-Stream in the memory and it can be used for
other actions.
The following picture shows this scenario:
As the above picture shows, the LeaseBlobScope
needs to update
an Event-Stream stored in the Lease Blob. Basically, this operation consists of the
Get
Event-Stream, Select
CurrentEvent from the Event-Stream,
Updating
this CurrentEvent, Updating Status in the Event-Stream and
the last optional step is to add a NewEvent if we need it. Once the Event-Stream (located in the memory) has been updated, we can store it back to the Lease Blob on the Azure Storage.
The lease Blob is automatically released during the exiting a LeaseBlobScope
sequence, therefore it is availaible for another acquire request.
Next sequence after the LeaseBlobScope
can be used for more detail processing of the Event-Stream
, for instance, triggering an action like
it is shown in the above picture. Note, that the Event-Stream
is holding all states within the business context, therefore the Event-Stream
status can be scanned for last business parallel processor to fire a message for next action.
To simplify handling a Lease Blob in the Windows Azure Storage within the WF
Technology, the custom LeaseBlobScope
activity can help. The
following picture shows this custom activity which implementation is included in
this article:
And the following pictures show few examples of assigning expression
editor for handling Event-Stream using custom class extensions:
More details about the custom activities and Event-Stream Library can be
found in the Implementation section.
The Event-Stream in the Lease Blob can be shared by any enterprise component
that has an authority access to the Lease Blob. The components can use an Event-Stream for dashboard features (showing a progress status, etc.) and/or for controlling process such as user cancel, notification, etc.
The usage of these features is fully transparent and loosely decouple from the business model.
The following picture shows additional two components connected to the Event-Stream such as User and Watchdog:
Based on the user request, the User publisher can publish an event for canceling a running background business process. In this scenario, the Event-Stream is updated for cancel status and the business processes will take an action during the Event-Stream processing.
The second component from the above picture is a Watchdog service. This service has a responsibility to check an expiration time for Event-Stream. If for some reason, the business process is not executed
within the specific expiration time, the Watchdog publisher will abort this Event or/and Event-Stream.
Event-Stream
The Event-Stream is a collection of the Events published
by sources such as clients, business processors, etc. within the business
context of the distributed model. Each Event represents a state of the business
model. The business model can be composed during the runtime sequentially and/or
concurrently where more business processors run in parallel. For better structuring
and query events in the Event-Stream, a formatted id value is used with delimiter
character '.' to represent a new group (or thread) in the Event-Stream, for
instance: 01.00
The following pictures show an example of the SearchOrder Event-Stream for
few business processing steps:
Step 1. This is an initialize step, where OrderStart processor
is creating an Event-Stream and next 3 Events for further processing such as SearchStart. This is a splitter scenario to run searches in parallel manner.
As you can see, the Event-Stream has a root Event
(name=OrderStart,
id=00, status=inprocess). The OrderStart generates 3 messages for SearchStart
processors. The status of the Event-Stream for this step is inprocess-init-init-init
.
Step 2. In this step, the SearchStart process is invoked and in
the Event-Stream the Event SearchStart is updated and creating a new Event for
SearchDone. The following picture shows this scenario:
As you can see, the above picture shows more runtime business composition,
where a second thread 02.00 is processing faster. This thread has a SearchDone and
has available 3 Results. Note, the thread 03.00 failed and has been aborted,
which is the finally state for the Event.
Step 3. In this step, we have more business composition such as
completion, aborted and next nested search processing based on analyzing of
the received Results:
Note, that Event is finalized in the case of completed, aborted or canceled.
As the above picture shows, the Event-Stream is waiting for finalizing the business
processes in the business threads 01.02.00, 01.04.00 and 02.03.00.
The Event-Stream represents a small knowledge base of the business model
composition during the runtime. Based on the Events, we can see performance for
each business processors, message flow, data, etc. If the process crashes,
the business process can be restarted and continue from the crash point.
The Event-Stream is stored in the Azure Storage Lease Blob in the xml formatted
text.
OK, let's look at the first of the custom activities for handling a Lease Blob
within the xaml workflow and custom EventStream library. I am assuming you have a working
knowledge of the Windows Azure Platform and WF Technologies.
Adding the following
custom activities into the Workflow Designer Toolbox allows us to use the Lease Blob
features within the workflow orchestration.
As you can see, the above custom Activity library contains 5 basic Lease Blob
operations and one combined operation for LeaseBlobScope
. The
LeaseBlobScope
custom native
activity allows to execute a sequence of activities with exclusive
write/delete Lease Blob operations. This custom scope activity has a responsibility to
release a Lease Blob on the regular or fault exit.
Let's look at the LeaseBlobScope
custom activity from its implementation point of the view.
LeaseBlobScope custom activity
The following picture shows design and properties of the
LeaseBlobScope
custom activity.
The LeaseBlobScope
requires few properties for creating a Lease Blob such
as BlobAddress, ConfigurationName for account, etc.
What is important for LeaseBlobScope
custom activity is its polling
of the
Acquire request for Lease Blob in the case of receiving a collision error 409
from the Azure Blob Service. Note, that the infinity polling depends on
the number of clients acquired the same Lease Blob and from the quick release of the
active Lease Blob. In the distributed internet connectivity that's the way how
we can get an exclusivity access for writing/deleting a resource on the Azure
Storage Blob.
The following code snippet shows an implementation of the
Exceute
methode of the
LeaseBlobScope
custom activity:
protected override void Execute(NativeActivityContext context)
{
if (this.activities != null && this.Activities.Count > 0)
{
NoPersistHandle handle = this.noPersistHandle.Get(context);
handle.Enter(context);
CloudBlobClient client = StorageAccountHelper.GetCloudStorageAccount(this.ConfigurationName.Get(context)).CreateCloudBlobClient();
CloudBlockBlob blob = client.GetBlockBlobReference(this.BlobAddress.Get(context));
Func<CloudBlockBlob, TimeSpan, string, string> delegateFunc = new Func<CloudBlockBlob, TimeSpan, string, string>(LockingBlob);
var ar = delegateFunc.BeginInvoke(blob, this.LeaseTime.Get(context), this.ProposedLeaseId.Get(context), null, null);
ar.AsyncWaitHandle.WaitOne();
string lid = (string)delegateFunc.EndInvoke(ar);
this.LeaseId.Set(context, lid);
this.Result.Set(context, blob);
context.ScheduleActivity(this.Activities[0], this.onChildComplete, OnFaulted);
}
}
As you can see, before running the first activity in the scope, we need to
acquire a Lease Blob. This request is done asynchronously with delegate function
LockingBlob
:
private string LockingBlob(CloudBlockBlob blob, TimeSpan timeout, string proposedLeaseId)
{
int ii = 0;
DateTime startDT = DateTime.Now;
while (++ii > 0)
{
string lid = blob.TryAcquireLease(timeout, proposedLeaseId);
if (string.IsNullOrEmpty(lid))
{
var delayTime = TimeSpan.FromMilliseconds(new Random(Guid.NewGuid().GetHashCode()).Next(750, 2500));
Thread.Sleep(delayTime);
}
else
{
Trace.WriteLine(string.Format("TryAcquireLease[{0}, {1}ms]", ii, DateTime.Now - startDT));
return lid;
}
}
return null;
}
The LockingBlob function has only one responsibility such as getting a
leaseId
from the acquired Lease Blob. In the case of the null
value, it will wait for a
random time and then trying again to acquire a Lease Blob. Note, that the
Thread.Sleep
is sleeping within the delegated thread, which is different
from the workflow thread.
The following code snippet shows a handler for Fault within the activity
scope. You can see how the Lease Blob is released:
private void OnFaulted(NativeActivityFaultContext context, Exception exception, ActivityInstance faultedInstance)
{
this.Result.Get(context).ReleaseLease(AccessCondition.GenerateLeaseCondition(this.LeaseId.Get(context)));
NoPersistHandle handle = this.noPersistHandle.Get(context);
handle.Exit(context);
}
AcquireLeaseBlob custom activity
AcquireLeaseBlob
custom activity can be used to simply acquire
Lease Blob:
This is an async custom activity with only one request to acquire Lease
Blob. In the case of the collision error (code 409), we can sleep there for
the specified time and then exit it. The following is a code snippet for its
implementation:
protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
CloudBlobClient client = StorageAccountHelper.GetCloudStorageAccount(this.ConfigurationName.Get(context)).CreateCloudBlobClient();
CloudBlockBlob blob = client.GetBlockBlobReference(this.BlobAddress.Get(context));
context.UserState = blob;
return blob.BeginAcquireLease(this.LeaseTime.Get(context), this.ProposedLeaseId.Get(context), callback, state);
}
protected override string EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
CloudBlockBlob blob = (CloudBlockBlob)context.UserState;
try
{
return blob.EndAcquireLease(result);
}
catch (StorageException ex)
{
if (ex.RequestInformation.HttpStatusCode == 409)
{
Trace.WriteLine(string.Format("AcquireLease conflict - sleeping for {0}]", this.RetryTime.Get(context)));
Thread.Sleep(this.RetryTime.Get(context));
return null;
}
else
throw;
}
}
As you can see, this is a straightforward async implementation of the
Begin/End methods. For this custom activity, the workflow orchestration will
take care of the next polling for Lease Blob (using While loop, Retry time, etc.).
Note, that the workflow has a responsibility to release a Lease Blob using a
basic custom activity ReleaseLeaseBlob or library function.
As I mentioned earlier, the Event-Stream is a collection of the xml elements
(EventP
object). The Event-Stream is stored in the Lease Blob in xml formatted
text. For handling an Event-Stream using an Assign activity within the xaml
workflow, the following Library has been implemented.
EventStream Library
This is very lightweight library for basic manipulation with Events in the
Event-Stream such as their updating event property, checking status, etc. The
following picture shows a class view of Event object (EventP
), container of the
Events (EventStreamProcess
) with useful methods and extension class for
EventStreamProcess
.
The design concept of the EventStreamLibrary
is based on using
the methods and
properties within the Expression text, for their chaining in the Assign
activity. For instance: adding the events into the Event-Stream and selecting
specific Event from the Event-Stream can be done within one Assign activity:
Note, for selecting a specific Event in the Event-Stream we can also
used Select
or MustSelect
methods based on the xpath
expression text:
Once we have a specific event for the business state from the Event-Stream (esp),
we can make its update within one Assign activity, see the following example:
The following code snippet shows an implementation of the Update method from
the EventStreamProcessExtension
class:
public static EventStreamProcess Update(this EventStreamProcess esp, XElement element, string name, string value)
{
if (element == null || element.Name.LocalName != "Event")
throw new ArgumentException("element");
element.SetAttributeValue(name, value);
element.SetAttributeValue("timestamp", DateTime.UtcNow.ToString());
XElement evnt = element;
if (name == "status" && (value == "completed" || value == "aborted" || value == "canceled"))
{
while (esp.IsRoot(evnt) == false)
{
Trace.WriteLine("EventStream.UpdateStatus: {0}", evnt.ToString());
evnt = esp.EventStream.XPathSelectElement(string.Format("./Event[@id='{0}' and @status='inprocess']", evnt.Attribute("prev").Value));
if (evnt == null)
break;
List<string> next = evnt.Attribute("next").Value.Split('|').ToList().Select(s => s.Trim()).ToList();
if (next.Count == 1 || esp.IsDone(next))
{
if (element.Attribute("status") != null && element.Attribute("status").Value != "completed")
evnt.SetAttributeValue("msg", "WithError");
evnt.SetAttributeValue("status", "completed");
evnt.SetAttributeValue("timestamp", DateTime.UtcNow.ToString());
}
else
{
break;
}
}
if (esp.IsRoot(evnt))
{
List<string> next = evnt.Attribute("next").Value.Split('|').ToList().Select(s => s.Trim()).ToList();
if (next.Count == 1 || esp.IsDone(next))
{
if (element.Attribute("status") != null && element.Attribute("status").Value != "completed")
evnt.SetAttributeValue("msg", "WithError");
evnt.SetAttributeValue("status", "completed");
evnt.SetAttributeValue("timestamp", DateTime.UtcNow.ToString());
}
}
}
return esp;
}
As you can see, there is one special case in the Update method such as
updating a status property. In this case, this method will update status all
Events within the EventThread from the specific Event up to the root Event.
The Event status can have one of the following values, where the completed,
aborted and canceled are the final statuses of the Event object in the
Event-Stream.
For testing finally status of the Event-Stream, we can use a
IsDone
method like is shows in the following example:
and the following code snippet shows how the method IsDone
implemented in the EventStreamProcess class:
public bool IsDone()
{
var root = this.EventStream.Elements("Event").ElementAt(0);
if (root.Attribute("prev") != null)
throw new ArgumentException("Internal error: The root event must not have attribute 'prev'");
return root.Attribute("status").Value == "completed" || root.Attribute("status").Value == "aborted" || root.Attribute("status").Value == "canceled";
}
CloudBlockBlobExtensions
This is an extension to CloudBlockBlob
class for custom handling
a Lease Blob. The following picture shows all extension methods:
Implementation of the above extension class is very straightforward, for
example: the
following code snippet shows how we can retrieve a blob to the XElement return
value:
public static XElement ToXml(this CloudBlockBlob blob, string leaseId)
{
using (var ms = new MemoryStream())
{
blob.DownloadToStream(ms, AccessCondition.GenerateLeaseCondition(leaseId));
ms.Position = 0;
return XElement.Load(ms);
}
}
The usage of this extension in the xaml workflow declaration is by Assign
activity, see the following example for Loading and Saving
EventStreamProcess
object:
Note, that the EventStream
Library is for simplified handling a collection of
the Events in the Event-Stream. Basically, you can use any declaratively
programming technique for get
, update
and
put
Event-Stream to/from Lease Blob, for instance LINQ. Keep on mind, that
once you are in the
LeaseBlobScope
, the business processing must be done ASAP
because you Lease Blob for your exclusive write, in other word, the
LeaseBlobScope
is locking other clients in the place, where they are asking for the same Lease
Blob for acquire lease operation.
Ok, let's look at some test cases of the Lease Blob using these custom
activities and library.
First of all, the following are prerequisites:
The AzureLeaseBlob
solution
includes 2 projects and one Test solution folder
where assemblies and samples for our
testing are stored. The following picture shows a solution
after its downloading from this article:
As you can see, the solution includes a
WorkflowDesignerTester
application, which is a re-hosted Workflow Designer on the Windows Form,
see more details about this little tool in my
WF4 Custom activities for message mediation article. Another tool what
we need to have for our testing purposes is
Azure Storage Explorer. Basically, you can
use any of your favorite explorer for Azure Storage Blob.
Before our first test, we need to create one empty blob in your account. The
following picture shows this public BlockBlob resource such as temp/eventstream
.
Let's start with very simple basic example.
Test A. - Basic example
This is a basic test of the LeaseBlobScope
custom activity in
the infinity loop. Please follow these instruction steps:
Step 1. Lunch the WorkflowDesignerTester45 program
We will use the WD 4.5 version for demonstration of the AzureLeaseBlob custom
activities. The following screen snippet is the WD after opening a file
LeaseBlobScopeBasic_Test45.xaml from the solution Samples folder.
As you can see, there is (on the left side) a workflow document outline view
panel and on the toolbox side a custom library AzureLeaseBlob.ActivityLibrary
.
Our Test Sequence is in the designer workspace. This sequence is shown in the
following screen snippet:
That's all for the test loop. DoWhile
loop is looping a
LeaseBlobScope
custom activity with a random delay (time between 750-2000 milliseconds).
The delay expression is
TimeSpan.FromMilliseconds(new Random(Guid.NewGuid().GetHashCode()).Next(750, 2000))
Inside the scope is one activity such as WriteLine
to display a
LeaseID on
the console screen.
Step 2. Azure Storage account
Before running this test, we need to populate an azure storage account in the
applicationStorageAccount
variable. In the case of using another
container Blob for our test, please also change the value of the
EventStreamRef
variable. The following picture is a screen snippet of the
variables used in our test:
Step 3. Run
Pressing the button Run
, the console self-hosted process is
created and loaded the xaml document for its execution. Our test requires to
lunch two processes to see how the LeaseBlobScope
are handling a shareable Lease
Blob in concurrent manner.
Please, press the Run button twice.
The following two console programs are lunched and displaying LeaseIDs.
While executing the LeaseBlobScope
custom activity, the scope
has an exclusive access for writing to the Lease Bob (temp/eventstream
).
This Lease Blob status is represented by displaying its LeaseID. Note, that the
other LeaseBlobScope
is acquiring Lease Blob in the pooling manner.
To see an acquire delay for Lease Blob, the following utility
DebugView can
be used. The log messages shows a total acquire pooling time and counter of
pooling.
As you can see, the above picture shows a 3 cases when the
LeaseBlobScope
needs to repeat the acquire request to the Lease Blob.
That's the situation when we have two concurrently running processes. Now we
can run more processes, for instance 5 and look at the tracer for acquire
pooling values.
You can play with workflow process, adding more activities within the scope
such as writing some contents to the blob, etc.
Ok, let's make more advanced test, where LeaseBlobScope
will
work with a Lease Blob body.
Test B. Advanced Test with Lease Blob
This test demonstrates a LeaseBlobScope
and
EventStreamLibrary
for concurrently handling an
EventStream
within the scope. The test case within the loop is to find the Event object with
status=init
in the
EventStream
and updated to
status=complete
. The loop is
finished when all Events in the EventStream are updated to
status=complete
. To
repeat the test, the blob must be deleted in the container.
First of all, we need to open a new test file.
Step 1. Open the LeaseBlobScope_v45.xaml file
Click on the File/Open to select a workflow document
LeaseBlobScope_v45.xaml
file from the Sample solution folder. The
following picture shows a Test Sequence of this loop:
Basically, the Test Sequence has two parts such as DoWhile loop and Init. The
Init part has responsibility to create an EventStream
with specific number of
Events (default value is 10) with status=init
. The DoWhile loop is also simple
and straightforward.
The first activity is to get the EventStream
from the Lease Blob
esp = new EventStreamProcess(blob.ToXml(leaseId))
then selecting an Event object with status=init
curEvent = esp.Select("./Event[@status='init']")
next step is to update this curEvent
esp = esp.Update(curEvent, "key", workerID).Update(curEvent, "status", "completed")
and last step in this sequence is to store updated EventsStream
back to the
Lease Blob
esp.EventStream = blob.FromXml(esp.EventStream, leaseId)
The following screen snippet shows this sequence:
Note, that the DoWhile condition is very simple expression using an
EventStreamLibrary such as esp.IsDone=false
Before we start our test, we have to setup Azure Storage account.
Step 2. Azure Storage account
The following highlighted variables must be changed or modified:
Variable applicationStorageAccount
must be setup for your Azure Storage
account. The other variables are optional such as EventStreamRef
and
maxEvents
.
Note, that the container temp
has been created manually in our first Test.
Basically, you can create another one for this test. It is based on your needs.
The number of Events, in the EventStream
is setup for 10, but it can be maximum
of
99. It is a string value.
Step 3. Creating EventStream
This step is for creating a Lease Blob which contains of EventStream
,
therefore press the button Run
to lunch the process. The following screen
snippet shows a result of the init sequence:
As you can see, we have an EventStream
with 10 Event objects ready for our
test. If we press the Enter key on the keyboard, the DoWhile
loop will start and only this process will be updated
EventStream
which we don't want. Our test must be proven by more than one running
processes, therefore please follow the next step.
Step 4. Run Test Loop
In this step we need to perform two actions such as pressing the button Run
on the WorkflowDesigner to create the second process and back to first one to
press Enter key on the keyboard. These actions must be done quickly.
The following picture shows two running processes:
As you can see, each process updated some Event objects in the
EventStream
in the concurrently manner without any collision for sharing
a Lease Blob.
Once the EventStream
is updated for all Events, our test is finished. The
following picture shows this situation:
You can see the property key
in the Event object who was the
exclusive owner of the Lease Blob at the moment.
The above test can be repeated again after the Lease Blob is deleted, for
instance using a 3rd party tool such as
Azure Storage Explorer.
The test can be repeated for more Events and/or running processes to see how
the LeaseBlobScope
works with a Lease Blob.
That's is all for our testing, of course you can continue with your needs for
your application specific EventStream.
This article described using an
Azure Storage Leas Blob for compositions of the event-driven distributed
business processes. The concept is based on the Event-Streaming
during a runtime business composition, where each business processor (worker)
published its state (Event) to the shareable resource called as
Event-Stream for specific business context. In the distributed internet
connectivity model, all publishers and consumers of the Event-Stream are
loosely decoupled. For this scenario, the Lease Blob is the best solution
for storing an Event-Stream with an exclusive write operation. Article
includes the custom activities for Azure Storage Lease Blob and EventStream
Library for handling a Lease Blob. The described concept of
the using Event-Stream for runtime business process composition can help your
application with scalability and parallelism in the distributed cloud-driven
architecture. I hope you enjoyed it.
[1] Windows Azure
[2] Windows Azure SDK 2.0 for .Net
[3] What's New in Storage Client Library for .NET (version 2.0)
[4] Introducing Windows Azure Storage Client Library 2.0 for .NET and Windows
Runtime
[5] Lease Blob
[6] New Blob Lease Features: Infinite Leases, Smaller Lease Times, and More
[7] StreamInsight Service for Windows Azure
[8] StreamInsight in Azure
[9] Patterns and Best
Practices for Enterprise Integration
[10] What's New in Windows Azure
[11] Azure Storage Explorer
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.