Click here to Skip to main content
15,881,089 members
Articles / High Performance Computing / Parallelization

Processing Data Through A Pipeline Using DataFlowLite

Rate me:
Please Sign up or sign in to vote.
4.50/5 (3 votes)
18 Apr 2018CPOL8 min read 9.2K   140   11   1
A custom framework that will parallelize data processing through a pipeline framework

Introduction

First, let me state right up front that I know there is a Dataflow Framework that is part of the Task Parallel Library. I drew some inspiration from it while creating my own framework that I am going to present.

Background

I stumbled on the Dataflow Framework one day and decided to do a proof of concept. I did this to understand what it is, how to use it and what the pros and cons were in order to determine if I might be able to use it in my current project.

The Dataflow Framework documentation states the following:

  • "This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks."
  • "These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available."

This pipeline can be thought of as a enhancement to the Builder Pattern. Individual steps are sequenced in order to create a pipeline which will produce an output or perform overall operation.

As an example, the pipeline I've created using my own framework will:

  • Input several lists of numbers that range from 1 to 200
  • Transform each list of numbers into individual numbers
  • Determine if the number is Even/Odd, convert it to a string and filter to an appropriate output
  • All the Even strings are batched together and all the Odd strings are batched together into a list of strings. After collecting 9 in the list, the list is then passed on
  • The batched Even or Odd outputs are reformed into a single pipeline
  • The final step in the process is to output the list of text as a single block

While this pipeline is simplistic and rather trivial, it shows what you can do with the framework. Both frameworks will take care of the overall functionality of moving the data around and handling multithreading each step. This allows you to focus on writing only that which you really care about. What you choose to do in each of your steps is up to you.

I should point out that while I did draw inspiration from the original framework, they are completely different. In some instances, I used exact or similar name for classes and other times, completely different. So the real question is why create a framework when one already exists?

  • Size - There are 2 different Nuget packages to choose from. The second one is more recently supported as indicated by the date, however it comes with a hefty install and has replacements for some of the common DLLs.
    • Microsoft.Tpl.Dataflow (3.01M) v4.5.24 12/10/2014
    • System.Threading.Tasks.Dataflow (12.4M) v4.8.0 08/11/2017
  • Company directive - Some companies and industries will not allow 3rd party code or Nuget packages regardless of who made it. So you need to roll your own.
  • Sometimes simpler is better.
  • Satisfaction - I like taking things apart and putting them back together. This helps me understand how things work.

Using My Framework Code

Building Blocks

Base Class

  • ExecutionBlock - This is an abstract class in which the other blocks are derived from.
    • Exceptions - storage area if generated
    • Execution State - Enumeration
    • Source - Incoming data storage that the method can operate on
    • Execution Task - A TPL Task that can be accessed for a variety of reasons
    • Miscellaneous operations on code completion, cancelling and waiting
    • Parallel Options - options on how to parallelize your code if requested

Each block has an incoming data collection. If you don't provide your own custom one, I will provide one for you. The internal collection is based on the BlockingCollection which has several features around producer/consumer concurrency control as well as features that limit the amount of data collected before blocking as well as don't accept any more data. I've provided a custom implementation (AnotherSource) which is still based on the BlockingCollection but shows everything that is needed to make it work. It is up to the reader to understand and create their own version if required.

Blocks

The following pre-defined blocks for the most part have a reference to the next step in the pipeline and a method/function that you provide. In addition, there is a section of code which defines how to get data from the source, pass it to the method, conditionally output it to the next step in the pipeline and handle cancelling of the job or any exceptions that are generated. The next stop in the pipeline will be referred to as the Target going forward.

  • TransformBlock - This can transform data from one type to another or could be used to do something like if the data passes some validation routine, it goes on to the target.
    • Function - Single input and single output - You define both
  • TransformToManyBlock - This is the same as the Transform block with the one difference that it will potentially output multiple objects. In my example, I have an input of List<int> and output (n)umber of individual integers
    • Method - 2 inputs and no output - The inputs are your data type and a reference to the target (ITarget<T>) which is the next step in the pipeline.
  • FilterBlock - Optionally transform incoming data and send to the target. In my example, I send even numbers to one pipeline and odd to another. This block allows you to add as many filters as you would like. I've made the restriction that all of the targets must have the same parameter type to keep the implementation simple and also made the conscious decision to pass all data through all filters. This was intentional as another filter could have been added for prime numbers in which some numbers would be both odd and prime. This could be a performance issue if you have a lot of filters and only need to filter it to one location. If this is your case, you might want to create a ChainOfResponsibilityBlock and provide the logic you are looking for in the Execute method.
    • Pairs of Method and Reference to the target - Both the code and target can be different for each filter
  • BatchInputBlock - This is the opposite of TransformToManyBlock. This will collect individual items into a list of items and then pass it on. You get to decide on the number of items to hold into the list. This could be used to collect the data before you bulk load data into a database.
     
  • SourceJoinBlock - In the event that you would like to bring 2 or more pipelines back into a single pipeline moving forward, you will need to use this class. This properly handles the moving of data from all of the source block into the single target. You can think of this as a funnel. This is very necessary in order to prevent the target from closing its input stream prematurely which will lead to exceptions being thrown.
    • Source - (N)umber of pipelines that will be sending data.
  • TerminatorBlock - This is the block at the end of the pipeline. This block is special because the data will not be passed on to any more blocks.

Block Execution Example

This is an example of an Execute method contained within the blocks in my framework. While this section of code came from the TransformToManyBlock, each of the other blocks are equally simplistic. They contain:

  • a loop to process the incoming data
  • a way to identify when there is no more incoming data
  • a parallel statement to process the data according to your needs
  • a call to your custom code
  • exception handling
  • a statement that tells the next block there is no more data coming
C#
protected override void Execute()
{
    State = ExecutionState.Running;

    while (State == ExecutionState.Running)
    {
        try
        {
            if (Source.Count == 0 && Source.IsCompleted)
            {
                State = ExecutionState.Done;
                continue;
            }

            Parallel.ForEach(Source.GetConsumingEnumerable(), ParallelOptions, item =>
            {
                if (item == null)
                    return;

                Method(item, Target);
            });
        }
        catch (OperationCanceledException)
        {
            State = ExecutionState.Cancel;
        }
        catch (Exception ex)
        {
            State = ExecutionState.Error;
            Exceptions.Add(ex.GetBaseException());
            ParallelOptions.Cancel();
        }
    }

    Target.CompleteAdding();
}

Sample Pipeline

C#
public SimplePipeline()
{
    // Create the steps using the built-in data source 
    _blocks.Add(_step1  = new TransformToManyBlock<List<int>, int>(MethodStep1));
    _blocks.Add(_step2  = new FilterBlock<int, string>());
    _blocks.Add(_step3A = new BatchInputBlock<string>(9));
    _blocks.Add(_step3B = new BatchInputBlock<string>(9));
    _blocks.Add(_step4  = new SourceJoinBlock<List<string>>());
    _blocks.Add(_step5  = new ActionBlock<List<string>>(MethodStep6));

    // Set the number of processors per step and Cancellation Token if desired
    var cancellationSource = new CancellationTokenSource();
    _step1.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(6);
    _step2.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(3);
    _step3A.ParallelOptions.SetCancellationSource(cancellationSource);
    _step3B.ParallelOptions.SetCancellationSource(cancellationSource);
    _step4.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(1);
    _step5.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(1);

    // Link one step to the next
    _step1.LinkTo(_step2);
    _step2.LinkTo(_step3A, FilterMethod2A);
    _step2.LinkTo(_step3B, FilterMethod2B);

    _step3A.LinkTo(_step4);
    _step3B.LinkTo(_step4);

    _step4.LinkTo(_step5);
    _step4.AddSource(_step3A);
    _step4.AddSource(_step3B);
}

Custom Code for Required Steps

C#
static private void MethodStep1(List<int> list, ITarget<int> target)
{
    list?.ForEach(i => target?.TryAdd(i));
}

static private void FilterMethod2A(int value, ITarget<string> target)
{
    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random1.Next(100)).Wait();

    if (value % 2 != 0)
        return;

    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random1.Next(100)).Wait();

    target.TryAdd($"EVEN - {value}");
}

static private void FilterMethod2B(int value, ITarget<string> target)
{
    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random1.Next(100)).Wait();

    // Used to test throwing an exception
    if (value == 13131313)
    {
        Console.WriteLine($"Throwing Exception - {value} is unlucky\n");
        throw new System.IO.InvalidDataException($"{value} is unlucky");
    }

    if (value % 2 == 0)
        return;

    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random2.Next(100)).Wait();

    target.TryAdd($" ODD - {value}");
}

static private void MethodStep6(List<string> items)
{
    var len = items.Count;
    if (len <= 0)
        return;

    var sb = new StringBuilder(1000);
    sb.AppendLine("==============================");

    for (var i = 0; i < len; i++)
        sb.AppendLine($"{i + 1} - {items[i]}");

    Console.WriteLine(sb.ToString());
}

Closing Remarks

As I have stated before, this was an exercise in creating a proof of concept to understand how data flows from one step to the next. As such, I didn't do any performance testing with either framework. If you look at mine, there really isn't a lot of overhead. Most of the issues would come with the way the custom code is written or by setting the parallelism inappropriately.

I've also included a sample application using the TPL Dataflow Framework so you can compare and contrast the differences between them. I found creating the pipeline with the TPL Dataflow Framework to be frustrating at times because:

  • I couldn't always make it do exactly what I was interested in (i.e., taking a list of items and creating individual items)
  • Had trouble creating the method call
  • JoinBlock seems odd that I have to process both at the same instead of being called with item1 or item2

Take a stroll through the code and feel free to ask questions but know that I consider this the end and don't plan on adding or enhancing the code as I consider my objectives met.

The code is made fully available to you to hack up or enhance in any way that you see fit.

Enjoy!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) Webbert Solutions
United States United States
Dave is an independent consultant working in a variety of industries utilizing Microsoft .NET technologies.

Comments and Discussions

 
QuestionSystem.Threading.Tasks.Dataflow is Open Source and MIT Licensed Pin
Matthew Dennis18-Apr-18 15:55
sysadminMatthew Dennis18-Apr-18 15:55 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.