
Powerful Alternative to .NET TPL

5 Aug 2024, MIT license, 12 min read
Armat.Threading is an asynchronous code execution library based on .NET TPL mechanisms
The Armat.Threading library is an alternative implementation of the .NET Task Parallel Library (TPL). It enables using multiple thread pools within a single process, allows carrying user-defined data (context) across a hierarchy of asynchronous operations, is compatible with the C# async / await notation, and is extensible enough to meet your own application demands.

Introduction

Asynchronous code execution has always been one of the most complex areas of programming. With the introduction of TPL in the .NET Framework, parallel programming techniques became available to a much broader range of developers, which also made .NET the framework of choice for many of us.

As a generic asynchronous programming framework, TPL solves most of the real-life problems. It can be used to

  • run user-initiated operations asynchronously - thus providing a smooth user experience,
  • trigger parallel code execution - making full use of hardware resources,
  • build a graph of inter-dependent tasks - significantly reducing code complexity,
  • and cover other scenarios that are beyond the scope of this article.

While the cases mentioned above are enough for most applications, I occasionally encounter limitations that are difficult to overcome with the standard .NET tooling. While working on my C# project, I chose to take a step back and build a generic solution for some of them. In particular, I needed

  • a way to create multiple thread pools within a process,
  • a mechanism to carry user context across asynchronous method invocations,
  • logging / traceability of asynchronous code execution,
  • and most importantly, a better understanding of TPL internals.

This article will delve into the Armat.Threading library, providing practical examples of its use. Additionally, it will shed light on the implementation details of asynchronous programming in .NET, an area often lacking documentation, specifically related to TPL internals.

Armat.Threading Library

The article is divided into two parts. The first part focuses on using the Armat.Threading library with some real-life examples, while the second part aims to uncover the asynchronous programming mechanisms in .NET that support the async / await notation.

Usage of Armat.Threading for Parallel Code Execution

The Job class

The class Armat.Threading.Job corresponds to System.Threading.Tasks.Task in TPL. It has a similar interface to the Task class and serves as the core asynchronous execution unit of the Armat.Threading library. The class Armat.Threading.Job<T> corresponds to System.Threading.Tasks.Task<T>. It derives from the non-generic Job class and has a similar interface to Task<T>. Job<T> inherits most of its functionality from Job and is used to execute asynchronous methods which eventually return a result of type T.

Below are some examples of executing code asynchronously using the Armat.Threading library:

C#
    // The snippets below are independent of each other;
    // T stands for any concrete result type.

    // Run a Job with no args and no return value
    await Job.Run(
        () =>
        {
            Console.WriteLine("Running job asynchronously");
        }).ConfigureAwait(false);

    // Run a Job with args but no return value
    await Job.Run(
        (double number) =>
        {
            Console.WriteLine("Running job asynchronously with arg {0}", number);
        }, 0.1248).ConfigureAwait(false);

    // Run a Job with no args and returning T
    T result = await Job<T>.Run(
        () =>
        {
            Console.WriteLine("Running job asynchronously");
            return default(T);
        }).ConfigureAwait(false);

    // Run a Job with args and returning T
    T result = await Job<T>.Run(
        (double number) =>
        {
            Console.WriteLine("Running job asynchronously with arg {0}", number);
            return new T(0.1248);
        }, 0.1248).ConfigureAwait(false);

There are multiple ways of running a Job. The simplest is to use the static Run method (as shown above). It's also possible to create a Job instance using one of the constructors. Jobs created this way must be executed later by invoking the appropriate overload of the Run method, as shown below:

C#
    public Int32 Sum(Int32[] array)
    {
        // Create the Job
        Job<Int32> asyncJob = new Job<Int32>(SumFn, array);

        // Run asynchronously
        asyncJob.Run();

        // Wait for it to finish and return the result
        return asyncJob.Result;
    }

    private Int32 SumFn(Object? args)
    {
        Int32[] array = (Int32[])args;

        return array.Sum();
    }

Each instance of a Job has a unique auto-incrementing Id of Int64 type. Those can be used for debugging, logging or tracing purposes. The decision of using Int64 identifiers is based on the following:

  1. It has a relatively small size and does not require heap memory allocation
  2. Generating new identifiers (incrementing Int64 numbers) should not cause performance impact
  3. It should not overflow even in case of long-running applications. Assuming 1 Job/millisecond is the rate of creating Jobs within a process, it will take about 300,000,000 years to overflow.
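
The overflow estimate in the last point can be checked with a few lines of arithmetic. The helper below is only an illustration (the class and method names are made up for this sketch and are not part of Armat.Threading); it assumes a constant creation rate and a year of 365.25 days.

```csharp
using System;

public static class JobIdOverflowEstimate
{
    // Number of years until an Int64 counter starting at zero overflows,
    // assuming a constant rate of increments per millisecond.
    public static double YearsToOverflow(double jobsPerMillisecond)
    {
        const double MillisecondsPerYear = 365.25 * 24 * 60 * 60 * 1000;
        return Int64.MaxValue / jobsPerMillisecond / MillisecondsPerYear;
    }
}
```

Calling JobIdOverflowEstimate.YearsToOverflow(1) gives roughly 2.9 x 10^8, which matches the "about 300,000,000 years" figure above.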

It's possible to identify the currently running instance of a Job using the static property Armat.Threading.Job.Current (it will be null if the current code is not running in context of any Armat.Threading threads). This way the executing code can access any Job properties (like AsyncState, CancellationToken, Initiator) to define its own behavior.

Note: I won't insist that exposing the currently running Job instance through a static property is the best design choice. It simplifies the code (for instance, you do not have to explicitly pass the CancellationToken to nested methods to support cancellation), but it also allows any method running in the context of a Job to read information it probably shouldn't have access to. It can also obscure the code because the data is accessed implicitly. Still, it solves a lot of problems and creates wide opportunities for further extending the Armat.Threading library. I would urge consumers to avoid this accessor wherever possible to keep the code simple and readable.

The property Initiator can be used to identify the Job that has triggered the current one. By recursively iterating through initiator jobs, one can navigate the hierarchy of jobs all the way up to the Root.
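
The walk up the initiator chain can be sketched as follows. To keep the example self-contained it uses a hypothetical stand-in class, JobNode, which models only an Id and an Initiator reference; with the real library the same loop would read the Initiator property of a Job, assuming it is null for a root job.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a job; it models only the Id and Initiator members.
public sealed class JobNode
{
    public JobNode(long id, JobNode? initiator = null)
    {
        Id = id;
        Initiator = initiator;
    }

    public long Id { get; }
    public JobNode? Initiator { get; }
}

public static class InitiatorChain
{
    // Returns the ids from the given job up to the root (the job without an initiator).
    public static List<long> PathToRoot(JobNode job)
    {
        var path = new List<long>();
        for (JobNode? current = job; current != null; current = current.Initiator)
            path.Add(current.Id);
        return path;
    }
}
```

A path like this is handy in log messages, where it shows which top-level operation eventually caused a given job to run.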

I will not detail the entire interface of the Armat.Threading.Job class, considering it largely corresponds to the System.Threading.Tasks.Task from .NET CLR. Also, the class property and method names seem to be quite descriptive.

JobScheduler

IJobScheduler (the abstraction)

The interface Armat.Threading.IJobScheduler defines the behavior of job schedulers. The declaration of the interface follows:

C#
public interface IJobScheduler : IDisposable
{
    // Static property returning the default instance of IJobScheduler.
    // This is the one to be used for running the Jobs by default.
    static IJobScheduler Default { get; }

    // Static property returning the current instance of IJobScheduler to be used for Jobs execution
    // If not null, it will be used for running Jobs instead of IJobScheduler.Default
    static IJobScheduler Current { get; }

    // Enqueues a Job in a scheduler.
    // To successfully enqueue a Job in a JobScheduler one must have Job.Status = JobStatus.Created (never run before).
    void Enqueue(Job job);
    // Cancels Job execution in the JobScheduler before it begins.
    // The method will fail (will return false) if the Job is already running or is finished.
    Boolean Cancel(Job job);
    // The property returns number of jobs currently waiting in the queue.
    // It may be used to monitor the current load on the JobScheduler.
    Int32 PendingJobsCount { get; }

    // Makes IJobScheduler.Current refer to this instance for the executing thread.
    // IJobScheduler.Current is reset to the previous value once the returned JobSchedulerScope is Disposed.
    JobSchedulerScope EnterScope();
}

The interface is minimalistic and self-descriptive.

One way of running asynchronous Jobs on a given instance of IJobScheduler is to use the IJobScheduler.EnterScope method. It allows overriding the Default job scheduler within the scope of the calling method. The following example illustrates how it can be achieved:

C#
    private async Job<Int64> JobExecutionInCustomScheduler(IJobScheduler otherScheduler)
    {
        // After this line all Jobs will be executed by the otherScheduler (unless overridden by another one)
        // This will make IJobScheduler.Current to refer to the otherScheduler
        // Disposing the otherScope will result in restoring the previous value of IJobScheduler.Current
        using var otherScope = otherScheduler.EnterScope();

        // create the Job
        Job<Int32> asyncJob = new Job<Int32>(SumFn, new Int32[] { 1, 2, 3 });

        // Run asynchronously
        asyncJob.Run();

        // async wait for it to finish and return the result
        return await asyncJob.ConfigureAwait(false);
    }

    private Int32 SumFn(Object? args)
    {
        Int32[] array = (Int32[])args;

        return array.Sum();
    }
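
The enter-and-restore behavior of EnterScope can be illustrated with a small stand-alone sketch. DemoScheduler and its nested Scope class are hypothetical names used only here, and this is not how Armat.Threading implements it: the sketch tracks the current scheduler per thread with a [ThreadStatic] field and does nothing to follow an async method whose continuation resumes on another thread.

```csharp
using System;

// Hypothetical stand-in for a scheduler; not the Armat.Threading implementation.
public sealed class DemoScheduler
{
    [ThreadStatic] private static DemoScheduler? _current;

    public DemoScheduler(string name) { Name = name; }

    public string Name { get; }

    // The scheduler entered on the calling thread, or null if none was entered.
    public static DemoScheduler? Current => _current;

    // Makes this instance current for the calling thread until the scope is disposed.
    public Scope EnterScope()
    {
        var scope = new Scope(_current);
        _current = this;
        return scope;
    }

    public sealed class Scope : IDisposable
    {
        private readonly DemoScheduler? _previous;
        private bool _disposed;

        internal Scope(DemoScheduler? previous) { _previous = previous; }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            _current = _previous; // restore whatever was current before EnterScope
        }
    }
}
```

Because each scope remembers the previous value, scopes nest naturally: leaving an inner scope brings back the outer scheduler, and leaving the outermost one brings back null.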

JobSchedulerBase (the building blocks)

The abstract class JobSchedulerBase implements IJobScheduler interface. It is designed to be the base class for all possible implementations of job scheduler. JobSchedulerBase provides means of setting the Default property of IJobScheduler interface, as well as provides protected Job execution APIs for derived classes.

C#
public abstract class JobSchedulerBase : IJobScheduler
{
    // Gets or sets the default IJobScheduler. If not set, JobScheduler.Default is returned.
    // Note: To prevent arbitrary code from setting the default JobScheduler, the setter is protected, thus requiring a public setter in a derived class.
    // Note: JobScheduler.Default can be set only once during the process lifetime (do this at the application initialization phase if necessary).
    public static IJobScheduler Default { get; protected set; }

    // Updates the status of a Job.
    // Can be used in derived scheduler implementations to control Job status (Job.UpdateStatus is internal to the Armat.Threading library).
    protected Boolean UpdateJobStatus(Job job, JobStatus newStatus, JobStatus prevStatus);

    // This method must be used when executing jobs in a scheduler.
    // It sets the IJobScheduler.Current property to this instance during the job execution.
    // Can be used in derived scheduler implementations to run the Job procedure (Job.ExecuteProcedure is internal to the Armat.Threading library).
    protected JobStatus ExecuteJobProcedure(Job job);

    // This method must be used when executing job continuations in a scheduler.
    // It sets the IJobScheduler.Current property to this instance during the job execution.
    // Can be used in derived scheduler implementations to run Job continuations (Job.ExecuteJobContinuations is internal to the Armat.Threading library).
    protected Int32 ExecuteJobContinuations(Job job);
}

Note: Providing an abstract base class for all job schedulers may seem to contradict good design practice. In particular, it makes it impossible to provide another implementation of IJobScheduler without deriving from JobSchedulerBase. I chose this solution to protect certain Job methods with the internal access modifier; they are then made accessible through the corresponding protected JobSchedulerBase methods. This ensures that the Job execution methods can be called directly only within the library assembly, providing an extra layer of security and control over how they are used. It also guarantees that the IJobScheduler.Current property is set and reset during Job execution.

JobScheduler (the implementation)

The class JobScheduler derives from JobSchedulerBase and provides the default implementation of asynchronous jobs scheduling mechanism. It manages the thread pool(s) and queue of committed Jobs, as well as provides some statistics about Job executions.

Using the appropriate overload of the constructor, it's possible to configure a JobScheduler with a name, a min/max number of threads, and a limit on the number of pending Jobs in the queue(s). JobScheduler manages regular and long-running jobs in separate queues and executes them using separate thread pools, so that long-running jobs do not block shorter ones.

Below are two equivalent examples of explicitly using the JobScheduler for running jobs:

C#
public class AsyncExecutor
{
    private JobScheduler myScheduler = new("Scheduler of ProcA");

    public async Job RunInOwnScheduler_Example1()
    {
        // create a job
        Job job = new(ProcA);

        // run it within own scheduler
        job.Run(myScheduler);

        // wait for the job to complete
        await job.ConfigureAwait(false);
    }

    public async Job RunInOwnScheduler_Example2()
    {
        // create a job
        Job job = new(ProcA);

        // enqueue the job in own scheduler
        myScheduler.Enqueue(job);

        // wait for the job to complete
        await job.ConfigureAwait(false);
    }

    private void ProcA()
    {
        // do something
    }
}
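
The idea of keeping regular and long-running work in separate queues, each drained by its own threads, can be sketched with standard .NET types. TwoLaneExecutor is a hypothetical name and a heavily simplified model: it has fixed thread counts, no job status tracking and no queue size limit, so it should not be read as the JobScheduler implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: two independent queues, each drained by dedicated threads.
public sealed class TwoLaneExecutor : IDisposable
{
    private readonly BlockingCollection<Action> _regular = new();
    private readonly BlockingCollection<Action> _longRunning = new();
    private readonly Thread[] _threads;

    public TwoLaneExecutor(int regularThreads, int longRunningThreads)
    {
        _threads = new Thread[regularThreads + longRunningThreads];
        for (int i = 0; i < _threads.Length; i++)
        {
            var queue = i < regularThreads ? _regular : _longRunning;
            _threads[i] = new Thread(() => Drain(queue)) { IsBackground = true };
            _threads[i].Start();
        }
    }

    // Number of work items waiting in both queues.
    public int PendingCount => _regular.Count + _longRunning.Count;

    public void Enqueue(Action work, bool longRunning = false)
        => (longRunning ? _longRunning : _regular).Add(work);

    private static void Drain(BlockingCollection<Action> queue)
    {
        // Blocks until an item arrives; the loop ends after CompleteAdding is called.
        foreach (var work in queue.GetConsumingEnumerable())
        {
            try { work(); }
            catch (Exception) { /* a real scheduler would record the failure on the job */ }
        }
    }

    public void Dispose()
    {
        _regular.CompleteAdding();
        _longRunning.CompleteAdding();
        foreach (var thread in _threads) thread.Join();
        _regular.Dispose();
        _longRunning.Dispose();
    }
}
```

With one thread per lane, a work item enqueued with longRunning: true can block for as long as it likes while items in the regular lane keep completing, which is the property the separate pools are meant to provide.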

JobRuntimeScope

Definition of the scope

The class Armat.Threading.JobRuntimeScope represents a scope of an asynchronous operation. The scope begins at the moment of instantiation of JobRuntimeScope object and ends with its disposal (generally after completing the asynchronous operation). It represents a pair of String Key and an Object Value. JobRuntimeScope objects can be retrieved via static accessors during asynchronous code execution (irrespective of the number & depth of asynchronous calls).

Using JobRuntimeScope one can deliver parameters to the nested asynchronous methods, thus providing contextual information about the invocation. Some good examples of using JobRuntimeScope are

  • identifying correlation of Jobs,
  • tracing asynchronous code execution,
  • delivering contextual information to the nested methods.

Armat.Threading.JobRuntimeScope consists of the following members:

  • public static JobRuntimeScope Enter(String key, Func<Object> factory)
    Instantiates an object of JobRuntimeScope type with the given key and uses the factory method to initialize the Value property.
    Note: If the key is already defined in the current scope, the existing instance of JobRuntimeScope is returned, and the factory method is never invoked to create a new one.
  • public static JobRuntimeScope Enter<T>(String key, Func<T> factory)
    Represents an overloaded generic version of JobRuntimeScope.Enter method which can store a Value of type T.
  • public static JobRuntimeScope Enter<T>(Func<T> factory)
    Represents an overloaded generic version of JobRuntimeScope.Enter method which uses T type as a key for creating the scope.
  • public static JobRuntimeScope EnterNew(String key, Func<Object> factory)
    Instantiates an object of JobRuntimeScope type with the given key and uses the factory method to initialize the Value property.
    Note: If the key is already defined in the current scope, JobRuntimeScope.Null is returned to indicate a failure, and the factory method is never invoked to create a new one.
  • public static JobRuntimeScope EnterNew<T>(String key, Func<T> factory)
    Represents an overloaded generic version of JobRuntimeScope.EnterNew method which can store a Value of type T.
  • public static JobRuntimeScope EnterNew<T>(Func<T> factory)
    Represents an overloaded generic version of the JobRuntimeScope.EnterNew method which uses the type T as the key for creating the scope.
  • public static Object? GetValue(String key)
    Returns the value for the given key in the current scope. Will return null if a scope for the key is not found.
  • public static T? GetValue<T>(String key)
    Returns the value for the given key in the current scope, and null if a scope for the key is not found. It throws an exception if the value is not assignable to type T.
  • public static T? GetValue<T>()
    Represents an overloaded generic version of the JobRuntimeScope.GetValue method which uses the type T as the key.
  • public void Leave()
    Leaves the current scope by removing the appropriate key.
  • public void Dispose()
    Leaves the current scope as described in the Leave() method. It is designed to be invoked with the using keyword to ensure proper disposal when exiting the method scope.
  • public Boolean IsNull
    Returns true for null (invalid) scope instances. Null scopes can be returned from EnterNew method described above in case of a failure to enter the given scope.
  • public String Key { get; }
    Returns the Key of JobRuntimeScope object.
  • public Object Value { get; }
    Returns the Value of JobRuntimeScope object.

I won't describe the implementation details of the JobRuntimeScope class in this article because they are clear from the source code itself. The main idea is to store the JobRuntimeScope objects in a ThreadLocal cache and to propagate them to the jobs instantiated within that thread.

Some examples of using JobRuntimeScope are available as unit tests here. The following code illustrates how to use it:

C#
    private async Job DoSomething()
    {
        // run some user scoped operation
        await RunUserScopedOperation().ConfigureAwait(false);

        // user data is null here because the JobRuntimeScope object
        //   was created and disposed within the RunUserScopedOperation method
        UserData? userData = JobRuntimeScope.GetValue<UserData>();
    }
    private async Job RunUserScopedOperation()
    {
        // create the scope with some UserData information
        // 'using' keyword guarantees to have the scope Disposed when exiting the method
        using var scope = JobRuntimeScope.Enter<UserData>(() => new UserData("abc", "123"));

        // user data refers to the one declared above
        UserData? userData1 = JobRuntimeScope.GetValue<UserData>();

        // run any asynchronous operation
        // UserData will be accessible in all inner synchronous or asynchronous methods
        //   irrespective of the thread running the method
        await AsyncOperation().ConfigureAwait(false);

        // user data remains the same as above
        UserData? userData2 = JobRuntimeScope.GetValue<UserData>();
    }
    private async Job AsyncOperation()
    {
        // user data remains the same as created in the caller method
        UserData? userData3 = JobRuntimeScope.GetValue<UserData>();

        // running some asynchronous operations
        await Job.Yield();

        // user data remains the same as created in the caller method
        UserData? userData4 = JobRuntimeScope.GetValue<UserData>();
    }

Thus, JobRuntimeScope provides a generic mechanism to deliver user-defined contextual information to the code running in context of a Job.
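
The general technique of keeping scope values in thread-local storage and handing them over to work that runs on another thread can be sketched as below. DemoScope and its members are hypothetical names for this illustration only. The sketch simplifies several things: Enter overwrites an existing key instead of returning the existing scope, and the capture step is an explicit Capture call, whereas a job-based library would do it when a job is created.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of key/value scopes that follow work onto another thread.
public static class DemoScope
{
    // Per-thread snapshot of the active scopes; snapshots are replaced, never mutated.
    [ThreadStatic] private static Dictionary<string, object>? _values;

    public static IDisposable Enter(string key, object value)
    {
        var previous = _values;
        var next = previous == null
            ? new Dictionary<string, object>()
            : new Dictionary<string, object>(previous);
        next[key] = value;
        _values = next;
        return new Restorer(previous);
    }

    public static object? GetValue(string key)
        => _values != null && _values.TryGetValue(key, out var value) ? value : null;

    // Captures the caller's scopes now and installs them around the work
    // on whichever thread the returned delegate is later invoked.
    public static Action Capture(Action work)
    {
        var captured = _values;
        return () =>
        {
            var saved = _values;
            _values = captured;
            try { work(); }
            finally { _values = saved; }
        };
    }

    private sealed class Restorer : IDisposable
    {
        private readonly Dictionary<string, object>? _previous;

        public Restorer(Dictionary<string, object>? previous) { _previous = previous; }

        public void Dispose() => _values = _previous;
    }
}
```

A delegate returned by Capture sees the values that were active when it was captured, even if it runs on a thread that never entered the scope, and the worker thread's own values are put back once the work finishes.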

CorrelationIdScope

The class Armat.Threading.CorrelationIdScope is one of possible value types for JobRuntimeScope. It generates auto-incrementing 64-bit IDs to be used for correlation across asynchronous operations (for logging, tracing or any other needs). It also provides convenient factory methods for instantiating JobRuntimeScope with a new CorrelationIdScope value as shown in the example below (the appropriate unit test is available here):

C#
    private async Job RunCorrelationIDTest(Int32 testNum)
    {
        // this will create correlation ID and the appropriate scope
        using var scope = CorrelationIdScope.Create();

        // any asynchronous code execution
        await Job.Yield();

        // correlation ID is available here (after async method invocation)
        CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
        Output.WriteLine("RunCorrelationIDTest: Correlation ID for test {0} is {1}",
            testNum,
            currentCorrId.CorrelationID);

        // nested async method invocations
        await NestedAsyncMethodCall(testNum, CorrelationIdScope.Current()!.CorrelationID, 1).ConfigureAwait(false);
    }

    private async Job NestedAsyncMethodCall(Int32 testNum, Int64 expectedCorrID, Int32 depth)
    {
        // any asynchronous code execution
        await Job.Yield();

        // correlation ID remains the same as in the caller method above
        CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
        Output.WriteLine("NestedAsyncMethodCall<{0}>: Correlation ID for test {1} is {2}",
            depth,
            testNum,
            currentCorrId.CorrelationID);

        // go even deeper
        if (depth < 3)
            await NestedAsyncMethodCall(testNum, expectedCorrID, depth + 1).ConfigureAwait(false);
    }

Note: Only one instance of CorrelationIdScope is created and used until the asynchronous operation completes. Every subsequent call to CorrelationIdScope.Create() in the context of the same operation returns the already existing instance. Thus, an identifier is generated only on the first call and remains accessible during the whole operation. This makes it possible to trace and easily filter the execution of a single operation in application log files.
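
The "generate once, then reuse" behavior described in the note can be modeled in a few lines. DemoCorrelation is a hypothetical helper written for this illustration; it keeps the active id per thread only, while the library ties it to a JobRuntimeScope so that it also follows asynchronous calls.

```csharp
using System;
using System.Threading;

// Hypothetical sketch; these names are not part of Armat.Threading.
public static class DemoCorrelation
{
    private static long _lastId;
    [ThreadStatic] private static long? _currentId;

    // Returns the id already active on this thread, or generates the next one.
    public static long GetOrCreate()
    {
        _currentId ??= Interlocked.Increment(ref _lastId);
        return _currentId.Value;
    }

    // Ends the current operation so that the next call generates a fresh id.
    public static void Clear() => _currentId = null;
}
```

Interlocked.Increment keeps id generation safe when several threads start operations at the same time, without taking a lock.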

Parallel Code Execution Mechanisms in C#

In this section I'll try to shed some light on the async / await implementation details in .NET. It is a high-level breakdown of the execution flow rather than a detailed description. Hopefully, with the help of the Armat.Threading library sources, it will be easier to debug & understand the magic behind TPL.

When an async method is invoked with the await keyword, it starts asynchronous code execution using the appropriate MethodBuilder. To do so, the method is broken into segments which are partitioned by await statements, as shown below:

Image 1

In Armat.Threading library the Job class is decorated with [System.Runtime.CompilerServices.AsyncMethodBuilder(typeof(JobMethodBuilder))] attribute which indicates the type of MethodBuilder to be used for asynchronous methods with a return value of type Job.

The code that the C# compiler generates for an async method performs the following sequence at run time:

  1. The generated code creates an instance of the method builder to orchestrate method execution. See the JobMethodBuilder.Create() implementation for reference.
  2. Next, MethodBuilder.Start(ref TStateMachine stateMachine) is called to begin the function execution. The delegate stateMachine.MoveNext() points to the first segment of the executing method. The implementation must ensure that the ExecutionContext and SynchronizationContext of the calling thread are restored before the method completes.
    For that I've reused AsyncTaskMethodBuilder.Start(stateMachine), which belongs to the method builder for the TPL Task (see void JobMethodBuilder.Start<TStateMachine>(ref TStateMachine stateMachine)).
    Note: In the diagram above, it corresponds to calling the "Segment 1" delegate. It does not run asynchronously and blocks the calling thread until the first await statement.
  3. Once the code execution reaches the first await statement, it obtains an Awaiter object. To do so, the generated code calls the GetAwaiter() method of the object on the right side of the await statement. The Awaiter object is intended to invoke the continuation (or the next segment) asynchronously. It's also the responsibility of the awaiter to capture the ExecutionContext of the calling thread and optionally to restore it after the continuation method returns.
    Note: In the diagram above, the awaiter of the Job (or Task) returned from "Segment 1" is used to trigger the "Segment 2" continuation asynchronously.
  4. After obtaining the awaiter object, the generated code calls either MethodBuilder.AwaitOnCompleted or MethodBuilder.AwaitUnsafeOnCompleted. The awaiter is passed to this method together with the appropriate stateMachine. At this stage the delegate stateMachine.MoveNext() points to the next section of the executing method. It is the responsibility of the awaiter to invoke that delegate asynchronously (in one of the threads from the pool).
    Note: In case of loops within a segment (like in "Segment 3"), the same continuation method is called multiple times with the appropriate state of the stack. It is also important to note that the breakdown of a method into segments is a simplification and does not precisely reflect reality. In fact, the execution continues until it reaches the next await statement, and it can cross the "segment boundaries". For instance, in the example provided, the execution of "Segment 3" could loop back to execute the code within "Segment 2" inside the for loop.
  5. Once the async method execution completes, the generated code calls either MethodBuilder.SetResult or MethodBuilder.SetException to report the method completion result. This result is set on the resulting Job exposed by the MethodBuilder.Task property.
    Note: Returning a value or throwing an exception from the middle of the method stops the execution and sets the corresponding result on the resulting Job using the appropriate setter of the MethodBuilder.
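
The steps above can be made concrete by writing a state machine by hand. The sketch below is roughly equivalent to `async Task<int> AddAsync(int a, int b) { await Task.Yield(); return a + b; }`. It uses the standard AsyncTaskMethodBuilder<int> of TPL rather than JobMethodBuilder, and the names AddStateMachine and HandWrittenAsync are made up for this illustration; real compiler output differs in details.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public struct AddStateMachine : IAsyncStateMachine
{
    public int State;                          // -1 = running, 0 = suspended at the await, -2 = finished
    public AsyncTaskMethodBuilder<int> Builder;
    public int A, B;
    private YieldAwaitable.YieldAwaiter _awaiter;

    public void MoveNext()
    {
        try
        {
            if (State == -1)
            {
                // Segment 1: runs synchronously on the caller's thread (step 2).
                var awaiter = Task.Yield().GetAwaiter();           // step 3
                _awaiter = awaiter;
                if (!awaiter.IsCompleted)
                {
                    State = 0;
                    // Step 4: ask the awaiter to call MoveNext again later.
                    Builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                    return;
                }
            }

            // Segment 2: runs when the awaiter invokes the continuation.
            _awaiter.GetResult();
            State = -2;
            Builder.SetResult(A + B);                              // step 5
        }
        catch (Exception ex)
        {
            State = -2;
            Builder.SetException(ex);                              // step 5, failure path
        }
    }

    public void SetStateMachine(IAsyncStateMachine stateMachine)
        => Builder.SetStateMachine(stateMachine);
}

public static class HandWrittenAsync
{
    public static Task<int> AddAsync(int a, int b)
    {
        var stateMachine = new AddStateMachine
        {
            Builder = AsyncTaskMethodBuilder<int>.Create(),        // step 1
            State = -1,
            A = a,
            B = b
        };
        stateMachine.Builder.Start(ref stateMachine);              // step 2
        return stateMachine.Builder.Task;
    }
}
```

Awaiting HandWrittenAsync.AddAsync(2, 3) yields 5. Stepping through MoveNext in a debugger shows the first call arriving from Start on the calling thread and the second one arriving from the awaiter.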

There are numerous details in each of the outlined steps, and recalling all of them at once may not be feasible. Understanding the .NET source code backing TPL is quite challenging; however, exploring a few classes from the Armat.Threading library might be a better approach if you'd like to delve deeper.

Summary

I hope the Armat.Threading library will inspire you to incorporate it into your projects. I use it in one of mine, and it works quite well for me. Note that I created this library a few years ago, but I didn't have the opportunity or time to publish it until now. I've made some improvements to it while writing this article, but it still requires additional polishing to become the "ideal" one :).

History

  • Version 1: The first version.
  • Version 2: Fixed the article tags and GitHub repo URL.
  • Version 3: Minor correction of method builder execution flow.

License

This article, along with any associated source code and files, is licensed under The MIT License