Hi,

Before you type tl;dr, read the cliff-notes ^_^

Some of you will be a bit familiar with my project.

Basically: I read messages from the db. When I do, I set a flag on the record called 'Started' to true. That way I won't read them again.

I put the messages into a SortedSet and read from the set like a queue.

So - Here are some snippets. This will probably be quite a code dump - apologies:

Figure 1: [SELECT_WORKFLOW_MESSAGES_DUE] Stored Proc:
SQL
ALTER PROCEDURE [wfe].[SELECT_WORKFLOW_MESSAGES_DUE](
  @Date DATETIME,
  @SetStarted BIT = 1
  )
AS
BEGIN

	DECLARE @Results TABLE (workflow_message_id INT)
	DECLARE @Return TABLE(workflow_message_id INT PRIMARY KEY,
           workflow_message_fid INT,
           workflow_message_serialized_data XML	,
           workflow_message_due DATETIME,
           workflow_message_started BIT,
           workflow_message_complete BIT,
           workflow_message_workflowid INT,
           workflow_message_messageinfoid INT)

	--TEST ITEMS
	DECLARE @read INT,@set INT

	SET @read = 0
	SET @set = 0

	INSERT INTO @Results
	        ( workflow_message_id 
	        )
	SELECT m.workflow_message_id 
	FROM wfe.workflow_messages m
	WHERE m.workflow_message_started = 0 AND m.workflow_message_due < @Date


	IF(@SetStarted=1)
	BEGIN
		UPDATE m
		SET workflow_message_started = 1
		FROM wfe.workflow_messages m
		INNER JOIN @Results r ON m.workflow_message_id = r.workflow_message_id
		
		SET @set = @@ROWCOUNT
	END
	
	INSERT INTO @Return
	        ( workflow_message_id ,
	          workflow_message_fid ,
	          workflow_message_serialized_data ,
	          workflow_message_due ,
	          workflow_message_started ,
	          workflow_message_complete ,
	          workflow_message_workflowid ,
	          workflow_message_messageinfoid
	        )
	SELECT m.workflow_message_id ,
	          m.workflow_message_fid ,
	          m.workflow_message_serialized_data ,
	          m.workflow_message_due ,
	          m.workflow_message_started ,
	          m.workflow_message_complete ,
	          m.workflow_message_workflowid ,
	          m.workflow_message_messageinfoid
	FROM wfe.workflow_messages m
		INNER JOIN @Results r ON m.workflow_message_id = r.workflow_message_id

		SET @read = @@ROWCOUNT

		IF(@read > 0 OR @set > 0)
		BEGIN
			INSERT INTO wfe.test
				(test_set, test_read)
			VALUES
				(@set,@read)
		END

		SELECT * from @Return
END

I have also included my test items in this. I wanted to make sure the duplicates were not creeping in here. @read and @set are always identical.

I set the items to "started" and fetch them in a single SP, so the rows that are returned are guaranteed to be the same rows that were marked. I'm using EF for db calls, so this is a lot more effective than doing it in C#.

The SortedSet-based priority queue:
C#
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Extensions.Linq;
using WorkflowDataAccess.Projections;

namespace Tools
{
    public class OrderedQueue<T>
    {

        private readonly SortedSet<T> _queue;

        private bool _cancel;

        public OrderedQueue(params  Func<T, IComparable>[] orderByFuncs)
        {
            _queue = new SortedSet<T>(orderByFuncs.ToComparer());
        }

        private readonly object _lockObj = new object();

        private SortedSet<T> Queue
        {
            get
            {
                lock (_lockObj)
                {
                    return _queue;
                }
            }
        }

        public T Next
        {
            get
            {
                Console.WriteLine("Ins and Outs count now at {0} in and {1} out",_inInt,_outInt);
                if (WaitForItems())
                    return Dequeue();
                return default(T);
            }
        }

        private int _inInt, _outInt;

        private void AddItem(T item)
        {
            _inInt ++;
            Queue.Add(item);
            var message = item as Message;
            if (message != null)
                Console.WriteLine(@"1. Message Id: {0}", message.Id);
            _waitForItems.Set();
        }

        public void Enqueue(T item)
        {
            lock (_lockObj)
            {
                AddItem(item);
            }
        }

        public void Enqueue(IEnumerable<T> items)
        {
            // Check for null before materialising; calling ToArray() on null would throw.
            if (items == null)
                return;
            var enumerable = items as T[] ?? items.ToArray();
            lock (_lockObj)
            {
                foreach (T item in enumerable)
                {
                    AddItem(item);
                }
            }
        }

        public T Dequeue()
        {
            T result;
            lock (_lockObj)
            {
                result = Peek();

                var message = result as Message;
                if (message != null)
                    Console.WriteLine(@"2. Message Id: {0}", message.Id);

                Queue.Remove(result);
                _outInt++;
            }
            return result;
        }

        public T Peek()
        {
            lock (_lockObj)
            {
                return Queue.FirstOrDefault();
            }
        }

        ManualResetEvent _waitForItems = new ManualResetEvent(false);

        private bool WaitForItems()
        {
            bool hasItems;
            lock (_lockObj)
            {
                hasItems = Queue.Any();
                if (!hasItems)
                {
                    _waitForItems = new ManualResetEvent(false);
                }
            }

            if (!hasItems)
            {
                Console.WriteLine("Waiting for items");
                _waitForItems.WaitOne();
            }
            lock (_lockObj)
            {
                hasItems = Queue.Any();
            }
            return hasItems;
        }

        public IEnumerable<T> StreamQueue()
        {
            while (!_cancel)
                while (WaitForItems())
                {
                    if (!_cancel)
                        yield return Dequeue();
                }
        }


        public void StopStream()
        {
            _cancel = true;
            _waitForItems.Set();
        }
    }
}

This is not the original version, but it should still be thread safe. I want it to be generic, but right now it includes some debugging code that casts the T item to a Message.

I instantiate this class as follows:
C#
private readonly OrderedQueue<WorkflowDataAccess.Projections.Message>
           _messageQueue = new OrderedQueue<WorkflowDataAccess.Projections.Message>(m => -m.Urgency, m => m.Due, m=>m.Id);

The first two functions are the sort key. The last is used as a unique identifier; otherwise items with the same urgency and due date get merged.
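To illustrate why the Id key matters, here is a minimal sketch. `Msg` and `KeyComparer` are hypothetical stand-ins: the real `ToComparer()` extension is not shown in this post, so the comparer below is only an assumption about what it does. The point is that `SortedSet<T>` treats items that compare as equal as the same element and silently drops the second one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the real Message projection.
public class Msg
{
    public int Id;
    public int Urgency;
    public DateTime Due;
}

// Hypothetical comparer: compares by each key function in order,
// which is an assumption about what orderByFuncs.ToComparer() does.
public class KeyComparer : IComparer<Msg>
{
    private readonly Func<Msg, IComparable>[] _keys;

    public KeyComparer(params Func<Msg, IComparable>[] keys) { _keys = keys; }

    public int Compare(Msg x, Msg y)
    {
        foreach (var key in _keys)
        {
            int c = key(x).CompareTo(key(y));
            if (c != 0) return c;
        }
        return 0; // all keys equal: SortedSet treats the items as duplicates
    }
}

public static class SortedSetDemo
{
    // Adds two messages with the same urgency and due date but different Ids,
    // then reports how many the set kept.
    public static int CountWith(params Func<Msg, IComparable>[] keys)
    {
        var due = new DateTime(2015, 6, 19);
        var set = new SortedSet<Msg>(new KeyComparer(keys));
        set.Add(new Msg { Id = 1, Urgency = 0, Due = due });
        set.Add(new Msg { Id = 2, Urgency = 0, Due = due });
        return set.Count;
    }

    public static void Main()
    {
        // Without the Id key the second message is dropped: prints 1
        Console.WriteLine(CountWith(m => -m.Urgency, m => m.Due));
        // With the Id key both messages are kept: prints 2
        Console.WriteLine(CountWith(m => -m.Urgency, m => m.Due, m => m.Id));
    }
}
```

The same behaviour means that if one message Id were enqueued twice while the first copy was still in the set, the second copy would be dropped rather than duplicated.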

I run a separate BackgroundWorker to read into the queue. This is the method run in that worker:
C#
public void GatherMessages(BackgroundWorker bgw)
{
    while (!bgw.CancellationPending)
    {
        _messageQueue.Enqueue(WorkflowDataAccess.Projections.Message.SelectDueMessage());
                //Thread.Sleep(100);
    }
    _messageQueue.StopStream();
}



Finally, my message parse method. This also runs within a BackgroundWorker:
C#
public void ParseMessageQueue()
{
    //We need to make sure that we do not load the same workflow twice at once
    List<int> liveWorkflowIds = new List<int>();

    //This lock is for the list above
    object lockObject = new object();

    WorkflowDataAccess.Projections.Message message;
    
    //get the next message.  This will only be null if the _messageQueue.StopStream(); is called
    while ((message = _messageQueue.Next) != null)
    {
        //Debug Info
        Console.WriteLine(@"3. Message Id: {0}", message.Id);

        var m = message;
        ThreadPool.QueueUserWorkItem(state =>
        {
            IMessage messageInstance = MessageLoader.LoadMessage(m);
            IWorkflow workflow = GetWorkflow(messageInstance);

            if (workflow == null)
            {
                //Some messages can be discarded; others cannot.  This exception has not been thrown for weeks (thank goodness)
                if (!messageInstance.SkipIfNotInWorkflowState)
                    throw new Exception(string.Format("Workflow not found for message id {0}", m.Id));
            }
            else
            {
                //The following must be run in sequence so I use lock to ensure that the list is safe
                bool canContinue;
                lock (lockObject)
                {
                    canContinue = !liveWorkflowIds.Contains(workflow.WorkflowId);
                    if (canContinue)
                        liveWorkflowIds.Add(workflow.WorkflowId);
                }

                if (canContinue)
                {
                    //This takes the most time.  It can create new messages but never one of the same type as the one being parsed.  In some cases it can also requeue the message, but the current message should always be closed at the end of this thread.
                    workflow.ParseMessage(messageInstance);
                    lock (lockObject)
                        liveWorkflowIds.Remove(workflow.WorkflowId);
                }
                else
                {
                    Console.WriteLine(@"3.5. Message Id: {0}",m.Id);
                    //Create a new instance with a later due date and save it to the database
                    messageInstance.Requeue();
                }
            }
            //Close off this message.  When I am confident all works, this would be a good time to delete the historic message
            messageInstance.MarkComplete();
            Console.WriteLine(@"4. Message Id: {0}",m.Id);
        });
    }
}




The GatherMessages method had a delay of 1 second, and I still saw a few duplicates. I removed the delay to see what would happen at full throttle and I got LOADS of duplicate messages in the database. It was not just one duplicate per message: any type of message could have none, one or up to 3000 duplicates. There is no ceiling to the number of duplicates. I just stopped the process as soon as I got in this morning (don't panic - just local testing ATM).

Is anyone able to pick apart my code, here? I know it's complicated but this is the simplest I can break it down to :S

Please help! I cannot see where the duplicates are spawning from.

Can you think of a way that I can catch the first duplicate with a breakpoint?
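One possible way to do that, sketched under assumptions: record every Id the first time it is seen and break into the debugger when the same Id turns up again. `DuplicateTrap` and `Check` are hypothetical names, not part of the project. The call would go wherever a message enters the pipeline, for example in AddItem or just after Next returns.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

// Hypothetical helper, not part of the original project.
public static class DuplicateTrap
{
    // Maps message Id -> number of times it has been seen so far.
    private static readonly ConcurrentDictionary<int, int> Seen =
        new ConcurrentDictionary<int, int>();

    // Returns true if this Id has been seen before. Safe to call from several threads.
    public static bool Check(int messageId)
    {
        int count = Seen.AddOrUpdate(messageId, 1, (id, old) => old + 1);
        if (count == 1)
            return false;

        Console.WriteLine("Duplicate: message {0} seen {1} times", messageId, count);

        // Only break when a debugger is attached, so normal runs are unaffected.
        if (Debugger.IsAttached)
            Debugger.Break();
        return true;
    }

    public static void Main()
    {
        Console.WriteLine(Check(3332)); // False: first sighting
        Console.WriteLine(Check(3333)); // False: different Id
        Console.WriteLine(Check(3332)); // True: duplicate
    }
}
```

Because Requeue() saves the new copy with a fresh Id, the same Id arriving twice should only happen when a record really has been read from the database more than once.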

I'm banging my head on the desk. I hope this is not tl;dr (too long; didn't read) ^_^

Thanks


EDIT: By request, here is messageInstance.Requeue():

C#
public void Requeue()
{
    int id = Id;
    int? workflowId = WorkflowId;
    bool started = Started;
    bool complete = Complete;
    DateTime due = Due;

    Id = 0;
    WorkflowId = null;
    Started = false;
    Complete = false;
    Due = DateTime.Now+TimeSpan.FromSeconds(10);
    Save();

    Id = id;
    WorkflowId = workflowId;
    Started = started;
    Complete = complete;
    Due = due;

}

I am still in two minds about this. This method exists in the abstract BaseMessage class. I didn't know how else I could duplicate the message, as the derived class will have some properties that BaseMessage doesn't have.
The Save() method will insert a new item if Id is 0.



EDIT2: Update - I am getting closer:
Line 1223: 1. Message Id: 3332
Line 1366: 2. Message Id: 3332
Line 1367: 3. Message Id: 3332
Line 3104: 4. Message Id: 3332
Line 3107: 1. Message Id: 3332
Line 3111: 2. Message Id: 3332
Line 3112: 3. Message Id: 3332
Line 4199: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
Line 5061: 1. Message Id: 3332
Line 5065: 2. Message Id: 3332
Line 5066: 3. Message Id: 3332
Line 5069: 4. Message Id: 3332
Line 5272: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
Line 6707: 1. Message Id: 3332
Line 6710: 4. Message Id: 3332
Line 6746: 2. Message Id: 3332
Line 6747: 3. Message Id: 3332
Line 8116: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
Line 8219: 3.5. Message Id: 3332
Line 8221: 1. Message Id: 3332
Line 8238: 4. Message Id: 3332
Line 8342: 2. Message Id: 3332
Line 8344: 3. Message Id: 3332
Line 9148: 4. Message Id: 3332
Line 9416: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)


So the message was read more than once, and there was some significant time in between.

I won't mark this as solved just yet, but I might just get myself there.

EDIT: Solution.

Turns out it was nothing to do with the threading. My BaseMessage has the Started property but IMessage did not. The Deserialize method did not set Started, so as soon as the message was saved back to the database it would lose this value >_<
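A minimal sketch of how that kind of bug plays out. The types here (`Row`, `LoadedMessage`) are hypothetical and heavily simplified, since the real Deserialize and Save methods are not shown in this post. The assumption is that Save() writes every property back, so a Started value that was never copied in goes back out as false.

```csharp
using System;

// Hypothetical stand-in for the database record.
public class Row
{
    public int Id;
    public bool Started;
    public string Data;
}

// Hypothetical stand-in for a deserialized message instance.
public class LoadedMessage
{
    public int Id;
    public bool Started; // defaults to false
    public string Data;

    // Buggy version: Started is never copied from the record.
    public static LoadedMessage DeserializeBuggy(Row row)
    {
        return new LoadedMessage { Id = row.Id, Data = row.Data };
    }

    // Fixed version: Started is carried across.
    public static LoadedMessage DeserializeFixed(Row row)
    {
        return new LoadedMessage { Id = row.Id, Data = row.Data, Started = row.Started };
    }

    // Assumed behaviour: Save writes every property back, including Started.
    public void Save(Row row)
    {
        row.Started = Started;
        row.Data = Data;
    }
}

public static class StartedFlagDemo
{
    // Loads a record that is already marked as started, saves it again,
    // and reports what the Started flag looks like afterwards.
    public static bool RoundTrip(Func<Row, LoadedMessage> deserialize)
    {
        var row = new Row { Id = 3332, Started = true, Data = "payload" };
        LoadedMessage message = deserialize(row);
        message.Save(row);
        return row.Started; // false means the stored proc would select the record again
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip(LoadedMessage.DeserializeBuggy)); // False
        Console.WriteLine(RoundTrip(LoadedMessage.DeserializeFixed)); // True
    }
}
```

With the flag reset, the record again matches the `workflow_message_started = 0` filter in the stored procedure, which would explain the same Id being read repeatedly in the log above.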
Posted
Updated 19-Jun-15 21:11pm
v4
Comments
virusstorm 19-Jun-15 7:37am    
Looking through your code, I think your issue might be at the database level. I wonder if you are running into a race condition where two calls are made at the exact same time, so the stored procedure call ends up working with the exact same data set. Try wrapping your SQL in a transaction. This would cause a table lock making sure each execution of the stored procedure is working a clean data set.
Andy Lanng 19-Jun-15 7:45am    
The call is only run in the loop (not threaded) and the C# calls the SP within a TransactionScope wrapper. This is true for everything in my DataAccess project, but I didn't include that detail here.

If I received the same message twice then the sorted set would merge the duplicates, as the sort key would be identical.

Thanks for taking a look, though ^_^
Tomas Takac 19-Jun-15 7:49am    
As far as I can see only one thread is reading to the queue and another is writing to it. Moreover the queue seems to be thread safe.

What exactly does messageInstance.Requeue() do?
Andy Lanng 19-Jun-15 7:57am    
I was worried about this method. I've played around with it a bit. I've added it to the end of the question.
Thanks for taking a look ^_^
Tomas Takac 20-Jun-15 2:47am    
How's it going, solved already? Are the messages duplicated when read from the database, or are they fetched multiple times from the queue?

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


