The Armat.Threading library is an alternative implementation of the .NET Task Parallel Library (TPL). It enables using multiple thread pools within a single process, allows carrying user-defined data (context) across a hierarchy of asynchronous operations, is compatible with the C# async / await notation, and is extensible enough to meet your own application demands.
Introduction
Asynchronous code execution mechanisms have always been one of the most complex areas in programming. With the introduction of TPL in the .NET Framework, parallel programming techniques became available to a much broader range of developers, making .NET a framework of choice for many of us.
As a generic asynchronous programming framework, TPL solves most real-life problems. It can be used to
- run user-initiated operations asynchronously, thus providing a smooth user experience,
- trigger parallel code execution, allowing to fully utilize the hardware resources,
- build a graph of inter-dependent tasks, significantly reducing code complexity,
- and probably more scenarios that I won't bother to cover here.
While the cases mentioned above may suffice for most applications, I occasionally encounter limitations that are difficult to overcome with the standard .NET tooling. While working on my C# project, I chose to take a step back and offer a generic solution for some of them. In particular, I needed
- a way to create multiple thread pools within a process,
- a mechanism to carry user context across asynchronous method invocations,
- logging / traceability of asynchronous code execution,
- and most importantly, better understanding of TPL internals.
This article will delve into the Armat.Threading library, providing practical examples of its use. Additionally, it will shed light on the implementation details of asynchronous programming in .NET, an area that is often poorly documented, specifically the TPL internals.
Armat.Threading Library
The article is divided into two parts. The first part focuses on the usage of the Armat.Threading library with some real-life examples, while the second part aims to uncover the asynchronous programming mechanisms in .NET that support the async / await notation.
Usage of Armat.Threading for Parallel Code Execution
The Job class
The class Armat.Threading.Job corresponds to System.Threading.Tasks.Task in TPL. It has an interface similar to the Task class and serves as the core asynchronous execution unit of the Armat.Threading library. The class Armat.Threading.Job<T> corresponds to System.Threading.Tasks.Task<T> in TPL. It derives from the non-generic Job class, has an interface similar to Task<T>, and is used to execute asynchronous methods which eventually return a result of type T.
Below are some examples of executing code asynchronously using the Armat.Threading library:
await Job.Run(
    () =>
    {
        Console.WriteLine("Running job asynchronously");
    }).ConfigureAwait(false);

await Job.Run(
    (double number) =>
    {
        Console.WriteLine("Running job asynchronously with arg {0}", number);
    }, 0.1248).ConfigureAwait(false);

T result = await Job<T>.Run(
    () =>
    {
        Console.WriteLine("Running job asynchronously");
        return default(T);
    }).ConfigureAwait(false);

T result = await Job<T>.Run(
    (double number) =>
    {
        Console.WriteLine("Running job asynchronously with arg {0}", number);
        return default(T); // construct and return a result of type T based on the argument here
    }, 0.1248).ConfigureAwait(false);
There are multiple ways of running a Job. The simplest one is to use the static Run method (as shown above). It's also possible to create an instance of a Job object using one of the constructors. Jobs created in this way must be executed later by invoking the appropriate overload of the Run method as shown below:
public Int32 Sum(Int32[] array)
{
    Job<Int32> asyncJob = new Job<Int32>(SumFn, array);
    asyncJob.Run();

    return asyncJob.Result;
}

private Int32 SumFn(Object? args)
{
    Int32[] array = (Int32[])args!;
    return array.Sum();
}
Each instance of a Job has a unique auto-incrementing Id of Int64 type. Those can be used for debugging, logging or tracing purposes. The decision to use Int64 identifiers is based on the following:
- It has a relatively small size and does not require heap memory allocation.
- Generating new identifiers (incrementing Int64 numbers) should not cause any performance impact.
- It should not overflow even in case of long-running applications. Assuming Jobs are created within a process at a rate of 1 Job/millisecond, it will take about 300,000,000 years to overflow.
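As a back-of-the-envelope check of that estimate (plain arithmetic, not part of the library):

// One Job per millisecond until an Int64 identifier overflows
Double milliseconds = Int64.MaxValue;                    // ~9.22e18 identifiers, one per millisecond
Double years = milliseconds / 1000 / 60 / 60 / 24 / 365; // ms -> seconds -> minutes -> hours -> days -> years
Console.WriteLine("{0:E2} years until overflow", years); // prints ~2.92E+008, i.e. roughly 300 million years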
It's possible to identify the currently running instance of a Job using the static property Armat.Threading.Job.Current (it will be null if the current code is not running in the context of any Armat.Threading threads). This way the executing code can access any Job properties (like AsyncState, CancellationToken, Initiator) to define its own behavior.
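For illustration, below is a minimal sketch (a hypothetical helper, based only on the properties listed above) of a nested method checking the cancellation token of the currently running job:

private void NestedWorkItem()
{
    // Job.Current is non-null only when this code executes in the context of a Job
    Job? currentJob = Job.Current;
    if (currentJob != null && currentJob.CancellationToken.IsCancellationRequested)
        return; // stop the work early, without having the token passed in explicitly

    // ... the actual work ...
}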
Note: I won't insist that exposing the currently running Job instance through a static property is the best design choice. Although it simplifies the code (for instance, you do not have to explicitly pass the CancellationToken to nested methods to support cancellation), it also allows any method running in the context of a Job to read information it probably shouldn't have access to, and it may obscure the code due to the implicit way of accessing the data. Anyway, it solves a lot of problems and creates wide opportunities for further extending the Armat.Threading library. I would still urge consumers to avoid this accessor as much as possible to keep the code simple and readable.
The property Initiator can be used to identify the Job that has triggered the current one. By recursively iterating through the initiator jobs, one can navigate the hierarchy of jobs all the way up to the Root.
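As a small illustration, the following sketch (a hypothetical helper; it assumes Initiator is null for a top-level job) walks that chain up to the root:

private static Job? FindRootJob()
{
    // start from the currently running job and follow the Initiator chain upwards
    Job? job = Job.Current;
    while (job?.Initiator != null)
        job = job.Initiator;

    return job; // the top-most (root) job, or null when not running inside a Job
}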
I will not detail the entire interface of the Armat.Threading.Job class, considering it largely corresponds to System.Threading.Tasks.Task from the .NET CLR. Also, the class property and method names are quite descriptive.
JobScheduler
IJobScheduler (the abstraction)
The interface Armat.Threading.IJobScheduler defines the behavior of job schedulers. Below is the declaration of the job scheduler interface:
public interface IJobScheduler : IDisposable
{
    static IJobScheduler Default { get; }
    static IJobScheduler Current { get; }

    void Enqueue(Job job);
    Boolean Cancel(Job job);

    Int32 PendingJobsCount { get; }

    JobSchedulerScope EnterScope();
}
The interface is minimalistic and self-descriptive.
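For a quick illustration, a Job can be enqueued into the default scheduler directly through this interface. The following is a minimal sketch reusing only the members shown above and a Job constructor accepting a delegate (as in the examples elsewhere in this article):

private async Job RunInDefaultScheduler()
{
    // enqueue a job into the default scheduler and await its completion
    Job job = new Job(() => Console.WriteLine("Running in the default scheduler"));
    IJobScheduler.Default.Enqueue(job);

    await job.ConfigureAwait(false);
}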
One way of using a given instance of IJobScheduler for running asynchronous Jobs is the IJobScheduler.EnterScope method. It allows overriding the default Job scheduler within the scope of the calling method. The following example illustrates how it can be achieved:
private async Job<Int64> JobExecutionInCustomScheduler(IJobScheduler otherScheduler)
{
    // make otherScheduler the current one within the scope of this method
    using var otherScope = otherScheduler.EnterScope();

    Job<Int32> asyncJob = new Job<Int32>(SumFn, new Int32[] { 1, 2, 3 });
    asyncJob.Run();

    return await asyncJob.ConfigureAwait(false);
}

private Int32 SumFn(Object? args)
{
    Int32[] array = (Int32[])args!;
    return array.Sum();
}
JobSchedulerBase (the building blocks)
The abstract class JobSchedulerBase implements the IJobScheduler interface. It is designed to be the base class for all possible implementations of a job scheduler. JobSchedulerBase provides means of setting the Default property of the IJobScheduler interface, as well as protected Job execution APIs for derived classes.
public abstract class JobSchedulerBase : IJobScheduler
{
    public static IJobScheduler Default { get; protected set; }

    protected Boolean UpdateJobStatus(Job job, JobStatus newStatus, JobStatus prevStatus);
    protected JobStatus ExecuteJobProcedure(Job job);
    protected Int32 ExecuteJobContinuations(Job job);
}
Note: The decision to provide an abstract base class for all JobSchedulers seems to contradict best design practices. In particular, it makes it impossible to provide any other implementation of IJobScheduler without deriving it from JobSchedulerBase. I chose this solution to protect certain Job methods with the internal access modifier, which are then made accessible through the corresponding JobSchedulerBase methods. It ensures that Job execution methods are only accessible within the same assembly, thus providing an extra layer of security and control over how they are used. It also guarantees setting & resetting the IJobScheduler.Current property during Job execution.
JobScheduler (the implementation)
The class JobScheduler derives from JobSchedulerBase and provides the default implementation of the asynchronous job scheduling mechanism. It manages the thread pool(s) and the queue of committed Jobs, and also provides some statistics about Job executions.
Using the appropriate overload of the constructor, it's possible to configure a JobScheduler with a name, to use a certain min/max number of threads, and to limit the size of the pending Jobs queue(s). JobScheduler manages regular and long-running jobs in separate queues and executes them using separate thread pools, thus ensuring that long-running jobs do not block the smaller ones.
Below are two equivalent examples of explicitly using the JobScheduler for running jobs:
public class AsyncExecutor
{
    private JobScheduler myScheduler = new("Scheduler of ProcA");

    public async void RunInOwnScheduler_Example1()
    {
        Job job = new(ProcA);
        job.Run(myScheduler);

        await job.ConfigureAwait(false);
    }

    public async void RunInOwnScheduler_Example2()
    {
        Job job = new(ProcA);
        myScheduler.Enqueue(job);

        await job.ConfigureAwait(false);
    }

    private void ProcA()
    {
        // the actual work to be executed within myScheduler
    }
}
JobRuntimeScope
Definition of the scope
The class Armat.Threading.JobRuntimeScope represents the scope of an asynchronous operation. The scope begins at the moment of instantiation of a JobRuntimeScope object and ends with its disposal (generally after completing the asynchronous operation). It represents a pair of a String Key and an Object Value. JobRuntimeScope objects can be retrieved via static accessors during asynchronous code execution (irrespective of the number & depth of asynchronous calls).
Using JobRuntimeScope one can deliver parameters to the nested asynchronous methods, thus providing contextual information about the invocation. Some good examples of using JobRuntimeScope are:
- identifying correlation of Jobs,
- tracing asynchronous code execution,
- delivering contextual information to the nested methods.
Armat.Threading.JobRuntimeScope consists of the following members:
- public static JobRuntimeScope Enter(String key, Func<Object> factory)
Instantiates an object of JobRuntimeScope type with the given key and uses the factory method to initialize the Value property.
Note: In case the key is already defined in the current scope, the existing instance of JobRuntimeScope is returned, and the factory method is never invoked to create a new one.
- public static JobRuntimeScope Enter<T>(String key, Func<T> factory)
An overloaded generic version of the JobRuntimeScope.Enter method which can store a Value of type T.
- public static JobRuntimeScope Enter<T>(Func<T> factory)
An overloaded generic version of the JobRuntimeScope.Enter method which uses the type T as a key for creating the scope.
- public static JobRuntimeScope EnterNew(String key, Func<Object> factory)
Instantiates an object of JobRuntimeScope type with the given key and uses the factory method to initialize the Value property.
Note: In case the key is already defined in the current scope, JobRuntimeScope.Null is returned to indicate a failure result, and the factory method is never invoked to create a new one.
- public static JobRuntimeScope EnterNew<T>(String key, Func<T> factory)
An overloaded generic version of the JobRuntimeScope.EnterNew method which can store a Value of type T.
- public static JobRuntimeScope EnterNew<T>(Func<T> factory)
An overloaded generic version of the JobRuntimeScope.EnterNew method which uses the type T as a key for creating the scope.
- public static Object? GetValue(String key)
Returns the value for the given key in the current scope. Returns null if a scope for the key is not found.
- public static T? GetValue<T>(String key)
Returns the value for the given key in the current scope, and null if a scope for the key is not found. It throws an exception if the value is not assignable to type T.
- public static T? GetValue<T>()
An overloaded generic version of the JobRuntimeScope.GetValue method which uses the type T as a key.
- public void Leave()
Leaves the current scope by removing the appropriate key.
- public void Dispose()
Leaves the current scope as described for the Leave() method. It is designed to be invoked with the using keyword to ensure proper disposal when exiting the method scope.
- public Boolean IsNull
Returns true for null (invalid) scope instances. Null scopes can be returned from the EnterNew methods described above in case of a failure to enter the given scope.
- public String Key { get; }
Returns the Key of the JobRuntimeScope object.
- public Object Value { get; }
Returns the Value of the JobRuntimeScope object.
I won't try to describe the implementation details of the JobRuntimeScope class in this article because they should be clear from reading the source code itself. The main idea lies in storing the JobRuntimeScope objects in a thread-local cache and ensuring their propagation via the jobs instantiated within that thread.
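The snippet below is not the library's code; it is just a simplified illustration of that idea: a thread-local map of scope values that would be captured when a Job is instantiated and restored on the worker thread before the job body runs.

using System;
using System.Collections.Generic;

// Simplified illustration only; the real JobRuntimeScope / JobScheduler code is more involved.
public static class ScopePropagationSketch
{
    // per-thread map of scope key -> value
    [ThreadStatic]
    private static Dictionary<String, Object>? _scopes;

    public static void Set(String key, Object value)
    {
        _scopes ??= new Dictionary<String, Object>();
        _scopes[key] = value;
    }

    public static Object? Get(String key)
    {
        return _scopes != null && _scopes.TryGetValue(key, out var value) ? value : null;
    }

    // captured on the initiating thread when a job is created...
    public static Dictionary<String, Object>? Capture()
    {
        return _scopes == null ? null : new Dictionary<String, Object>(_scopes);
    }

    // ...and restored on the pool thread right before the job procedure runs
    public static void Restore(Dictionary<String, Object>? captured)
    {
        _scopes = captured;
    }
}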
Some examples of using JobRuntimeScope are available as unit tests here. The following code illustrates how to use it:
private async Job DoSomething()
{
    // run an operation which defines a UserData scope within it
    await RunUserScopedOperation().ConfigureAwait(false);
    // the scope has already been left, so userData is null here
    UserData? userData = JobRuntimeScope.GetValue<UserData>();
}

private async Job RunUserScopedOperation()
{
    // enter the UserData scope
    using var scope = JobRuntimeScope.Enter<UserData>(() => new UserData("abc", "123"));

    // userData1 refers to the UserData instance created above
    UserData? userData1 = JobRuntimeScope.GetValue<UserData>();

    // the scope remains available within the nested asynchronous calls
    await AsyncOperation().ConfigureAwait(false);

    // userData2 still refers to the same UserData instance
    UserData? userData2 = JobRuntimeScope.GetValue<UserData>();
}

private async Job AsyncOperation()
{
    // the scope entered by the caller is accessible here as well
    UserData? userData3 = JobRuntimeScope.GetValue<UserData>();

    await Job.Yield();

    // ...and remains accessible after switching threads
    UserData? userData4 = JobRuntimeScope.GetValue<UserData>();
}
Thus, JobRuntimeScope provides a generic mechanism to deliver user-defined contextual information to the code running in the context of a Job.
CorrelationIdScope
The class Armat.Threading.CorrelationIdScope is one possible value type for JobRuntimeScope. It generates auto-incrementing 64-bit IDs to be used for correlation across asynchronous operations (for logging, tracing or any other needs). It also provides convenient factory methods for instantiating a JobRuntimeScope with a new CorrelationIdScope value, as shown in the example below (the appropriate unit test is available here):
private async Job RunCorrelationIDTest(Int32 testNum)
{
    using var scope = CorrelationIdScope.Create();

    await Job.Yield();

    CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
    Output.WriteLine("RunCorrelationIDTest: Correlation ID for test {0} is {1}",
        testNum,
        currentCorrId.CorrelationID);

    await NestedAsyncMethodCall(testNum, CorrelationIdScope.Current()!.CorrelationID, 1).ConfigureAwait(false);
}

private async Job NestedAsyncMethodCall(Int32 testNum, Int64 expectedCorrID, Int32 depth)
{
    await Job.Yield();

    CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
    Output.WriteLine("NestedAsyncMethodCall<{0}>: Correlation ID for test {1} is {2}",
        depth,
        testNum,
        currentCorrId.CorrelationID);

    if (depth < 3)
        await NestedAsyncMethodCall(testNum, expectedCorrID, depth + 1).ConfigureAwait(false);
}
Note: Only one instance of CorrelationIdScope is created and used until the completion of the asynchronous operation. Every subsequent call to CorrelationIdScope.Create() in the context of the same operation will return the already existing instance of CorrelationIdScope. Thus, an identifier is generated only on the first call and remains accessible during the whole operation. This way it's possible to trace and easily filter the execution of a single operation in application log files.
Parallel Code Execution Mechanisms in C#
In this section I'll try to shed some light on the async / await implementation details in .NET. It won't be very detailed, but rather a high-level breakdown of the execution flow. Hopefully, with the help of the Armat.Threading library sources, it should be easier to debug & understand the magic behind TPL.
When an async method is invoked with the await keyword, it starts asynchronous code execution using the appropriate MethodBuilder. To do so, the method is broken into segments which are partitioned by the await statements as shown below:
In the Armat.Threading library the Job class is decorated with the [System.Runtime.CompilerServices.AsyncMethodBuilder(typeof(JobMethodBuilder))] attribute, which indicates the type of MethodBuilder to be used for asynchronous methods with a return value of type Job.
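In practice this means an ordinary async method can declare Job as its return type, and the C# compiler will drive its execution through JobMethodBuilder, exactly as the async Job methods in the earlier examples do. A minimal sketch:

private async Job PrintAsync()
{
    // the compiler-generated state machine for this method is driven by JobMethodBuilder
    await Job.Yield();

    Console.WriteLine("Executed as a Job-returning async method");
}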
The following sequence is initiated by the .NET runtime during asynchronous method execution (a small hand-written example illustrating these steps follows the list):
- The .NET runtime creates an instance of the method builder object to orchestrate the method execution. See the JobMethodBuilder.Create() implementation for the reference.
- Next, MethodBuilder.Start(ref TStateMachine stateMachine) is called to begin the function execution. The delegate stateMachine.MoveNext() points to the first segment of the executing method. The implementation must ensure that the ExecutionContext and SynchronizationContext of the calling thread are restored before the method completes. For that I've reused AsyncTaskMethodBuilder.Start(stateMachine), which represents the method builder for the TPL Task (see void JobMethodBuilder.Start<TStateMachine>(ref TStateMachine stateMachine)).
Note: In the diagram above, this corresponds to calling the "Segment 1" delegate. It does not run asynchronously and blocks the calling thread until the first await statement.
- Once the code execution reaches the first await statement, an Awaiter object is created. To do so, the .NET runtime calls the GetAwaiter() method on the object on the right side of the await statement. The awaiter object is intended to invoke the continuation (the next segment) asynchronously. It's also the responsibility of the awaiter to capture the ExecutionContext of the calling thread and optionally restore it after the continuation method returns.
Note: In the diagram above, the awaiter of the Job (or Task) returned from "Segment 1" is used to trigger the "Segment 2" continuation asynchronously.
- After creating the awaiter object, one of MethodBuilder.AwaitOnCompleted or MethodBuilder.AwaitUnsafeOnCompleted is called by the .NET runtime. The awaiter is passed to this method together with the appropriate stateMachine. At this stage the delegate stateMachine.MoveNext() points to the next section of the executing method. It is the responsibility of the awaiter to invoke that delegate asynchronously (on one of the threads from the pool).
Note: In case of loops within a segment (as with "Segment 3"), the same continuation method is called multiple times with the appropriate state of the stack. It is also important to note that this breakdown of a method into segments is a simplification and does not precisely reflect reality. In fact, the execution continues until reaching the next await statement, and it can cross the "segment boundaries". For instance, in the example provided, the execution of "Segment 3" could loop back to execute the code within "Segment 2" inside the for loop.
- Once the async method execution completes, one of MethodBuilder.SetResult or MethodBuilder.SetException is called by the .NET runtime to report the method completion result. This result is set on the resulting Job exposed by the MethodBuilder.Task property.
Note: Returning a value or throwing an exception from the middle of the method will break the execution and will set the appropriate values on the resulting Job using the appropriate setter of the MethodBuilder.
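To make the "segment" terminology more concrete, here is a small hand-written example (not taken from the library) annotated with where the pieces described above come into play; it uses only the Job.Run, Job.Yield and ConfigureAwait members shown in the earlier examples:

private async Job ProcessAsync()
{
    // Segment 1: executed synchronously on the calling thread when the method builder starts the state machine
    Console.WriteLine("preparing");

    await Job.Yield(); // an awaiter is created here, and the continuation is scheduled through the method builder

    // Segment 2: invoked asynchronously by the awaiter on a pool thread
    Console.WriteLine("processing");

    await Job.Run(() => Console.WriteLine("nested work")).ConfigureAwait(false);

    // Segment 3: runs after the nested job completes; reaching the end of the method
    // causes MethodBuilder.SetResult to mark the resulting Job as completed
    Console.WriteLine("done");
}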
There are numerous details in each of the outlined steps, and recalling all of them at once may not be feasible. Understanding the .NET source code backing TPL is quite challenging; exploring a few classes from the Armat.Threading library might be an easier approach if you'd like to delve deeper.
Summary
I hope the Armat.Threading library will inspire you to incorporate it into your projects. I have one, and it works quite well for me. Note that I created this library a few years ago, but I didn't have the opportunity or time to publish it until now. I've made some improvements to it while writing this article, but it still requires additional polishing to become the "ideal" one :).
History
- Version 1: The first version.
- Version 2: Fixed the article tags and GitHub repo URL.
- Version 3: Minor correction of method builder execution flow.