Hi,

I have some code that gets a list of tasks from a db message queue and yields the results. It is running quite slowly, so I want to multi-thread it. How can I best implement a multi-threaded version of this:

C#
private IEnumerable<Data> GetData()
{
  while (!_cancel)
  {
    foreach (var dueMessage in F.MessageFactory.GetDueMessages())
    {
      var item = ProcessMessage(dueMessage);
      if (item != null)
        yield return item;
    }
  }
}

Each call to F.MessageFactory.GetDueMessages() may return anywhere from zero to hundreds of messages (in some cases many processes are scheduled for the same time).

Ideally I would like each item to be yielded as soon as it is ready, but the message is (currently) only marked complete inside ProcessMessage(dueMessage). Until then, the next call to GetDueMessages() could return messages that are still being processed.

This means that I would either have to use a WaitHandle for each pass of the while loop or mark the message complete earlier in the process, for example as soon as the foreach loop reaches it. EDIT: or as soon as F.MessageFactory.GetDueMessages() returns the list.

So, my question is: how can I safely multi-thread this method while yielding the results as soon as possible?


Thanks guys ^_^
Andy
Updated 15-Jun-15 4:18am
Comments
LLLLGGGG 15-Jun-15 8:40am    
What does GetDueMessages() return? An iterator or a list? And do you have access to the source code of GetDueMessages()?
Andy Lanng 15-Jun-15 8:41am    
Yup - I wrote all of this.
GetDueMessages() returns a list of the db Message type. This means anything with a due date < now and complete == false.
LLLLGGGG 15-Jun-15 8:45am    
Hmm. If it is a list and you do not have access to its source code, I do not know how to help you, because the method first fetches the results from the DB and then returns them to you as a List. That means you have to wait until the method has finished fetching before you can iterate through the list it returns. One idea might be to start fetching the list before the method is called, maybe on another thread, so that when it is needed, i.e. when you call the method, the list is already stored... I'm sorry if I'm not much help :(
Andy Lanng 15-Jun-15 8:48am    
I do have access to the source. GetDueMessages is not slow, ProcessMessage is slow. I'm fine waiting for GetDueMessages to complete. Once it has completed, I need to yield the result of ProcessMessage asap.
LLLLGGGG 15-Jun-15 9:02am    
Sorry, I misunderstood. You can try replacing the foreach loop with a Parallel.ForEach. Things should become faster. But you cannot use a yield statement inside a Parallel.ForEach loop, so maybe have a look at this: http://stackoverflow.com/questions/8412879/parallel-foreach-yield-return
Try it and tell me how it goes...

EDIT: you could also use an external collection that the Parallel loop fills, and fire an event inside the loop whenever an item is added. In the handler for that event you can do whatever you currently do with the results of this method. Instead of an iterator you then have a simple list, or better in this case a Queue: the Parallel loop enqueues each result, the event fires, and the handler dequeues and handles it... Can this work for you?
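
The queue idea in the comment above can be sketched with a BlockingCollection, which does the signalling itself so no separate event is needed. This is only a minimal sketch under assumptions, not the code the poster ended up using: QueueSketch and ProcessAsReady are hypothetical names, `messages` stands in for the list returned by F.MessageFactory.GetDueMessages(), and `process` stands in for ProcessMessage.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original post.
public static class QueueSketch
{
    // Runs 'process' over 'messages' in parallel and yields each non-null
    // result as soon as a worker thread has finished it (order is not kept).
    public static IEnumerable<TResult> ProcessAsReady<TMessage, TResult>(
        IEnumerable<TMessage> messages, Func<TMessage, TResult> process)
        where TResult : class
    {
        var results = new BlockingCollection<TResult>();

        // Producer: the parallel loop fills the queue on thread-pool threads.
        var producer = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(messages, message =>
                {
                    var item = process(message);
                    if (item != null)
                        results.Add(item);
                });
            }
            finally
            {
                // Always signal completion so the consumer cannot block forever.
                results.CompleteAdding();
            }
        });

        // Consumer: blocks until an item arrives and ends after CompleteAdding.
        foreach (var item in results.GetConsumingEnumerable())
            yield return item;

        // Rethrows (as AggregateException) anything thrown inside the loop.
        producer.Wait();
    }
}
```

Inside the original while loop this would be consumed as `foreach (var item in QueueSketch.ProcessAsReady(F.MessageFactory.GetDueMessages(), ProcessMessage)) yield return item;`. The sketch assumes the caller enumerates to the end; if the caller stops early, the parallel loop keeps running in the background, so a real version would probably want a CancellationToken.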

1 solution

Edit: The solution I went for, including tweaks.

If you follow the comments, my issue is that there is one process that takes all the time in my ProcessMessage method. There is barely anything I can do while I'm waiting for it to do its stuff, so async was not much of an improvement.

I went for a Reactive Extensions solution instead.

Note - I need to return a method that returns an IEnumerable.
C#
private void GetFlighData()
{
    while (!_cancel)
    {
        Parallel.ForEach(F.MessageFactory.GetDueMessages(), dueMessage =>
        {

            var item = ProcessMessage(dueMessage);
            if (item != null)
            {
                //Event instead of yield
                OnGetFlightData(item);
            }
        });
    }
}

private delegate void GetAFlightDataHandler(FlightData flightData);

private event GetAFlightDataHandler FlightData;

private void OnGetFlightData(FlightData flightData)
{
    if (FlightData != null)
        FlightData(flightData);
}

BackgroundWorker _worker = new BackgroundWorker();

//This is my property that returns a method.  It no longer returns GetFlighData but creates the method from Observable.FromEvent
public override GetFlighDataMethod GetFlighDataMethod
{
    get
    {
        //Start the queue once; RunWorkerAsync throws if the worker is already busy
        if (!_worker.IsBusy)
        {
            _worker.DoWork += delegate { GetFlighData(); };
            _worker.RunWorkerAsync();
        }

        // create the method
        return () =>
        {
            var flightDataReceived = Observable.FromEvent<GetAFlightDataHandler, FlightData>(handler =>
            {
                GetAFlightDataHandler fdHandler = (e) =>
                {
                    handler(e);
                };
                return fdHandler;
            },
                fdHandler => FlightData += fdHandler,
                fdHandler => FlightData -= fdHandler
                );

            return flightDataReceived.ToEnumerable();
        };
    }
}
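
For anyone who wants to keep a plain iterator rather than events, PLINQ with the NotBuffered merge option may be worth trying, since it asks for each result to be handed over as soon as it is produced. This is a sketch under assumptions, not what I actually ran: PlinqSketch and ProcessUnbuffered are hypothetical names, `messages` stands in for F.MessageFactory.GetDueMessages() and `process` for ProcessMessage.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper class, not part of the solution above.
public static class PlinqSketch
{
    // Returns a lazy parallel query; the work starts when it is enumerated.
    // Result order is not guaranteed to match the input order.
    public static IEnumerable<TResult> ProcessUnbuffered<TMessage, TResult>(
        IEnumerable<TMessage> messages, Func<TMessage, TResult> process)
        where TResult : class
    {
        return messages
            .AsParallel()
            // NotBuffered: yield each result as it is produced instead of
            // collecting results into chunks first.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(process)
            // Same null filter as the original foreach loop.
            .Where(item => item != null);
    }
}
```

In the original GetData loop that would look like `foreach (var item in PlinqSketch.ProcessUnbuffered(F.MessageFactory.GetDueMessages(), ProcessMessage)) yield return item;`. By default PLINQ sizes its parallelism to the number of cores; because ProcessMessage mostly waits, adding WithDegreeOfParallelism to the query might help, but that would need measuring.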

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)