
A Data Processing Design Pattern for Intermittent Input Data

30 Jun 2020 | CPOL | 4 min read
Data Processing with RAM and CPU optimization
To process continuous data input, RAM and CPU utilization have to be optimized. If multiple threads collect and submit data for processing, you have two options: create as many processing threads as there are input threads, or store the input data in memory and process it one item at a time. Creating a large number of threads chokes the CPU, and holding everything in memory exhausts the RAM. We need a balanced solution.

Introduction

If your data is intermittent (non-continuous), we can leverage the time gaps between arrivals to optimize CPU and RAM utilization. The idea is to finish processing one batch of data before the next batch arrives. Let’s say that you receive N data items every T seconds, each item is of size d, and one item takes P seconds to process. With a single thread, the total processing time for a batch is N x P seconds. If N x P < T, there is no issue however you program it. However, if N x P > T, i.e., when the time needed to process the input is greater than the time between two consecutive batches, you need multiple threads. If we introduce another variable c for the number of processing threads, the requirement becomes [ (N x P) / c ] < T.

The next constraint is how many threads you can create, which limits the factor c. If c is too high, the threads consume a lot of CPU. Here, we bring in RAM utilization. As data comes in, we first store it in memory and then use c threads to process it. Hence, at any time, there will be up to c items in the hands of the c active threads and the remaining items (N - c for a freshly arrived batch) pending in the queue. Let us say that r batches can be held in memory, where one batch is the set of c items that the c threads process at a time, so one batch has size c x d. Now we can boil it down to:

  • [ (N x P) / (r x c) ] < T
  • r = Affordable RAM / (c x d)
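
As a rough illustration of the arithmetic above, here is a minimal sketch that computes the smallest c satisfying [ (N x P) / c ] < T and the number of batches r that fit in a given RAM budget. The class and method names (Sizing, MinThreads, BatchesInMemory) are hypothetical helpers introduced for this example and are not part of the article's code.

```csharp
using System;

public static class Sizing
{
    // Smallest thread count c such that (N x P) / c < T.
    // n: items per batch, p: seconds to process one item, t: seconds between batches.
    public static int MinThreads(int n, double p, double t)
    {
        // c must be strictly greater than (N x P) / T;
        // floor(x) + 1 is the smallest integer strictly greater than x.
        return (int)Math.Floor(n * p / t) + 1;
    }

    // r = Affordable RAM / (c x d), rounded down to whole batches.
    // ramBytes: memory budget, c: thread count, d: size of one item in bytes.
    public static long BatchesInMemory(long ramBytes, int c, long d)
    {
        return ramBytes / (c * d);
    }
}
```

For example, with N = 100 items, P = 0.5 seconds and T = 10 seconds, a single thread needs 50 seconds per batch, so MinThreads returns 6. With a 64 MB budget and 1 KB items, BatchesInMemory(64L * 1024 * 1024, 6, 1024) gives 10922 batches. Treat these as starting values to be refined with measurements.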

Background

This scenario applies mostly to polling-based systems, where you collect data at a specific frequency. Hence, the assumption is that the data flow is intermittent and arrives at intervals. You can leverage the time gaps between data collections to utilize CPU and RAM optimally.

Using the Code

We need an investigative approach to data processing as one size does not fit all. Many parameters like N, d and P are not known beforehand. Hence, we need the design to also supply statistical information so that we can know about N, d and P and adjust CPU and RAM demands accordingly.

As a rough guideline, we need a way to ingest all data submitted via threads. Then, either start processing them immediately or line them up in a queue and process them in multiple threads.

.NET provides blocking and bounding capabilities for thread-safe collections through BlockingCollection<T>. This is an interesting feature which can be used to optimize CPU and memory for high-workload applications. The pattern can be further stacked and interconnected to build directed graphs of data routing. It is used extensively in Apache NiFi processors.

Before diving further into the pattern, let us understand what blocking and bounding are. What problems do they solve?

When multiple threads are trying to take data from a container, we want the threads to wait until more data is available. This is called “blocking”.

When multiple threads are writing data, we want them to wait until there is room to accommodate the new data, so that the container never grows beyond a fixed capacity. This is called “bounding”.
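
The two behaviours can be observed in a small sketch. It uses TryAdd and TryTake with a short timeout instead of Add and Take, so that the demo returns instead of waiting forever; BoundingDemo and Run are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundingDemo
{
    public static (bool thirdAdded, bool takeOnEmpty) Run()
    {
        // Capacity of 2: the container refuses a third item until one is taken.
        using (var box = new BlockingCollection<string>(boundedCapacity: 2))
        {
            box.Add("a");
            box.Add("b");

            // Bounding: a plain Add("c") would wait here; TryAdd gives up after 50 ms.
            bool thirdAdded = box.TryAdd("c", TimeSpan.FromMilliseconds(50));

            box.Take();
            box.Take();

            // Blocking: a plain Take() would wait here; TryTake gives up after 50 ms.
            bool takeOnEmpty = box.TryTake(out string item, TimeSpan.FromMilliseconds(50));

            return (thirdAdded, takeOnEmpty);
        }
    }
}
```

Both flags come back false: the third add is rejected because the container is full, and the final take finds nothing because the container is empty.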

Hence, we can use a blocking collection as the underlying data container.

C#
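// Add() waits while MaxContainerSize items are queued; Take() waits while it is empty.
// ConcurrentBag is unordered: use ConcurrentQueue if arrival order matters.
BlockingCollection<string> DataContainer = new BlockingCollection<string>(
    new ConcurrentBag<string>(),
    this.MaxContainerSize);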

For the thread pool, you can use the built-in .NET thread pool, but I am using a simple array of threads for the sake of simplicity. In fact, I don’t like someone else “managing my threads” 😊.

C#
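Thread[] Workers = new Thread[this.MaxWorkerThreads];

for (int i = 0; i < Workers.Length; i++)
{
    // Each worker still has to be started with Thread.Start(object), passing the
    // CancellationToken that ThreadFunction expects as its argument.
    Thread newThread = new Thread(new ParameterizedThreadStart(ThreadFunction));
    Workers[i] = newThread;
}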

Each of these threads runs a function that blocks until new data arrives. Here is a basic skeleton of this function.

C#
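private void ThreadFunction(object threadContext)
{
    CancellationToken token = (CancellationToken)threadContext;
    try
    {
        while (!token.IsCancellationRequested)
        {
            // Blocks until an item is available or cancellation is requested.
            string Data = DataContainer.Take(token);
            ProcessData(Data);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation was requested while waiting in Take(): let the worker exit.
    }
}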

On the input side, the container blocks any thread that tries to add new data while the container is full.

C#
public void Add(string data)
{
    DataContainer.Add(data);
}

That’s the simple recipe.
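
Putting the pieces together, a minimal self-contained sketch could look like the following. It is one possible arrangement, not the article's downloadable source: the DataProcessor class, its Complete method and the processData callback are assumptions made for this example, and ConcurrentQueue is swapped in for ConcurrentBag so that items are taken in arrival order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class DataProcessor : IDisposable
{
    private readonly BlockingCollection<string> _container;
    private readonly Thread[] _workers;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Action<string> _processData;

    public DataProcessor(int maxWorkerThreads, int maxContainerSize, Action<string> processData)
    {
        _processData = processData;
        // Bounded container: Add() waits once maxContainerSize items are queued.
        _container = new BlockingCollection<string>(new ConcurrentQueue<string>(), maxContainerSize);

        _workers = new Thread[maxWorkerThreads];
        for (int i = 0; i < _workers.Length; i++)
        {
            _workers[i] = new Thread(Work) { IsBackground = true };
            _workers[i].Start();
        }
    }

    // Called by the input threads; blocks while the container is full.
    public void Add(string data) => _container.Add(data);

    private void Work()
    {
        try
        {
            // Blocks while the container is empty; ends after CompleteAdding()
            // once all remaining items have been consumed.
            foreach (string item in _container.GetConsumingEnumerable(_cts.Token))
            {
                _processData(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Dispose() cancelled the token: stop without draining.
        }
    }

    // Graceful shutdown: refuse new input, drain what is queued, wait for workers.
    public void Complete()
    {
        _container.CompleteAdding();
        foreach (Thread worker in _workers) worker.Join();
    }

    // Immediate shutdown: cancel waiting workers and release resources.
    public void Dispose()
    {
        _cts.Cancel();
        foreach (Thread worker in _workers) worker.Join();
        _container.Dispose();
        _cts.Dispose();
    }
}
```

A caller would create the processor with its chosen MaxWorkerThreads and MaxContainerSize values, call Add from the collecting threads, and call Complete when no more input is expected. The sketch does not handle exceptions thrown by the callback; an unhandled exception on a worker thread would terminate the process.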

Points of Interest

Now to optimize and adjust RAM and CPU utilization, you need to adjust MaxWorkerThreads and MaxContainerSize. We need to collect a few statistics to understand the data flow pattern.

  1. Rate of input or how much data comes per second?
  2. Rate of output or how much data is processed per second?
  3. Average active threads
  4. Average container size

These metrics help in the following way:

  1. If Input Rate > Output Rate, then either the container grows forever or more and more threads block at the input; either way, the program eventually crashes or stalls.
    • So keep Input Rate < Output Rate.
  2. If the active threads are mostly at the maximum limit but the container size is near zero, then you can save CPU by using some RAM: lower MaxWorkerThreads and let the container buffer more items.
  3. If the average container size is always at the maximum limit, then more worker threads have to be created.
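
One way to gather these numbers is a small set of thread-safe counters that the input and worker threads update. The sketch below is an assumption about how this could be done; FlowStats and its members are hypothetical names. The current container size does not need a counter of its own, since BlockingCollection<T> exposes it through its Count property.

```csharp
using System;
using System.Threading;

public sealed class FlowStats
{
    private long _itemsIn;
    private long _itemsOut;
    private int _activeThreads;

    // Call after each successful Add() to the container.
    public void OnAdded() => Interlocked.Increment(ref _itemsIn);

    // Call in a worker just before it starts processing an item.
    public void OnProcessingStarted() => Interlocked.Increment(ref _activeThreads);

    // Call in a worker right after it finishes processing an item.
    public void OnProcessingFinished()
    {
        Interlocked.Decrement(ref _activeThreads);
        Interlocked.Increment(ref _itemsOut);
    }

    public long ItemsIn => Interlocked.Read(ref _itemsIn);
    public long ItemsOut => Interlocked.Read(ref _itemsOut);
    public int ActiveThreads => Volatile.Read(ref _activeThreads);

    // Items accepted but not yet fully processed (queued or in progress).
    public long Backlog => ItemsIn - ItemsOut;

    // Converts a counter sampled over an interval into items per second.
    public static double RatePerSecond(long count, TimeSpan elapsed)
    {
        return count / elapsed.TotalSeconds;
    }
}
```

Sampling ItemsIn, ItemsOut, ActiveThreads and the container's Count on a timer, then averaging over several polling intervals, gives the four metrics listed above.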

History

  • 29th June, 2020 - First version published
  • 30th June, 2020 - Formatting changes applied

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


