I want to read from an MSMQ queue whose messages carry byte data; about 1,500 messages are generated per minute.

When I read the queue continuously, CPU usage rises to about 30% and after some time the program stops.
I need to read the queue at this volume for up to 4 hours.

I want thread-safe reading that does not block.
I am not good at threading, so can you please help me out?

Currently I am reading in this manner:
bool ProcessStatus; // true while the receive loop should keep running
Thread _UDPthreadConsme;

private void btn_receive_Click(object sender, EventArgs e)
{
    if (MessageQueue.Exists(@".\private$\myquelocal"))
    {
        ThreadStart _processrcs = new ThreadStart(receivemessages);
        _UDPthreadConsme = new Thread(_processrcs);
        ProcessStatus = true;
        _UDPthreadConsme.Start();
    }
}

private void receivemessages()
{
    MessageBox.Show("Start");
    while (ProcessStatus)
    {
        try
        {
            // Connect to a queue on the local computer.
            MessageQueue myQueue = new MessageQueue(@".\private$\myquelocal");

            System.Messaging.Message[] myMessagecount = myQueue.GetAllMessages();

            if (myMessagecount.Length <= 0)
                return;

            myQueue.Formatter = new BinaryMessageFormatter();

            // Receive and format the message.
            System.Messaging.Message myMessage = myQueue.Receive();
            byte[] buffer = (byte[])myMessage.Body;

            // Here I convert buffer to its related structure and then insert the values into the SQL Server database.
        }
        catch (Exception)
        {
            // (error handling not shown)
        }
    }
}


Updated 10-Mar-11 21:27pm
Comments
Dalek Dave 11-Mar-11 3:28am    
Edited for Grammar and Readability.

All threads are safe; only the code may not be.
Please read the answers to these past questions:

Problem with Treeview Scanner And MD5[^]
Control.Invoke() vs. Control.BeginInvoke()[^]
Multple clients from same port Number[^]
How to pass ref parameter to the thread[^]

I did not mean to repeat my references, but in case I forgot some, I tried to give an overview here:
How to get a keydown event to operate on a different thread in vb.net[^]

The recommendations I gave on threading were for different applications and different UI libraries, but they are all applicable to your case.
Basically, you need to create a regular thread (probably not a BackgroundWorker in your case) from the very beginning and use it permanently, feeding data to it. You will very likely need to communicate between threads via a blocking queue. Pay attention to the explanation of using Invoke or BeginInvoke to work with the UI.

—SA
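As a rough illustration of the "permanent thread fed through a blocking queue" idea in the answer above, here is a minimal sketch. It assumes .NET 4.0 or later for BlockingCollection; the class name BlockingQueueSketch, the capacity of 1000 and the byte[] payloads are hypothetical stand-ins, not taken from the linked answers. In a real program the producer side would be whatever receives the MSMQ messages, and the consumer would do the structure conversion and the database insert.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingQueueSketch
{
    // Starts one long-lived consumer thread, feeds it every payload from
    // 'incoming', and returns the payload lengths in the order processed.
    public static List<int> Run(IEnumerable<byte[]> incoming)
    {
        var processed = new List<int>();
        // Bounded capacity (an arbitrary illustrative value) so a slow
        // consumer throttles the producer instead of exhausting memory.
        using (var queue = new BlockingCollection<byte[]>(1000))
        {
            var consumer = new Thread(() =>
            {
                // Waits without spinning while the queue is empty and
                // finishes once CompleteAdding() has been called.
                foreach (byte[] payload in queue.GetConsumingEnumerable())
                {
                    processed.Add(payload.Length); // placeholder for real work
                }
            });
            consumer.IsBackground = true;
            consumer.Start();

            foreach (byte[] payload in incoming)
            {
                queue.Add(payload); // blocks only when the queue is full
            }
            queue.CompleteAdding();
            consumer.Join(); // after Join, 'processed' is safe to read
        }
        return processed;
    }

    public static void Main()
    {
        List<int> lengths = Run(new[] { new byte[4], new byte[8], new byte[2] });
        Console.WriteLine(string.Join(",", lengths)); // prints 4,8,2
    }
}
```

Because the consumer blocks inside GetConsumingEnumerable() rather than polling, it uses no CPU while the queue is empty. Any UI update from the consumer would still need Invoke or BeginInvoke.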
 
Use something like this to initialize the queue:
string queuePath = GetInputQueueName();
if (!MessageQueue.Exists(queuePath))
{
  inputMessageQueue = MessageQueue.Create(queuePath);
}
else
{
  inputMessageQueue = new MessageQueue(queuePath);
}
inputMessageQueue.Formatter = new BinaryMessageFormatter();
inputMessageQueue.ReceiveCompleted += OnReceiveCompleted;
inputMessageQueue.BeginReceive();



Here is a simple implementation of OnReceiveCompleted. Remember that this code does not execute on the UI thread, so use the Control.InvokeRequired[^] / Control.Invoke[^] pattern to interact with the UI thread.
private void OnReceiveCompleted(Object source,ReceiveCompletedEventArgs asyncResult)
{
try
 {
  MessageQueue mq = (MessageQueue)source;
                
  if (mq != null)
  {
   try
   {
    Message message = null;
    try
    {
     message = mq.EndReceive(asyncResult.AsyncResult);
    }
    catch (Exception ex)
    {
     if (isRunning)
     {
       LogException(ex, System.Reflection.MethodBase.GetCurrentMethod());
     }
    }
    if (message != null)
    {
      QueuedInput queuedInput = message.Body as QueuedInput;
      if (queuedInput != null)
      {
        // Process data here
      }
    }
   }
   finally
   {
    if (isRunning)
    {
     // Tell the system that you 
     // are ready to process next
     // message
     mq.BeginReceive(); 
    }
   }
  }
  return;
 }
 catch (Exception exc)
 {
  LogException(exc, System.Reflection.MethodBase.GetCurrentMethod());
 }
}


If this code is too slow, then it's probably the deserialization that kills the performance - a complex object graph takes a significant amount of time to deserialize - but you may find an answer to that in this article: Optimizing Serialization in .NET[^]

I've used something quite similar to this to handle many times the load you are talking about.

Update
Microsoft Message Queuing – A simple multithreaded client and server[^] - I hope you find that this solves your problems :)

Update 2
Microsoft Message Queuing – Log Trade information using Microsoft SQL Server[^] - another take at describing how to solve your problems :)

Regards
Espen Harlinn
 
Comments
Amit kumar pathak 12-Mar-11 8:22am    
When I tried this, CPU usage went to 40% and the exe suddenly stopped. Kindly help me out.
Espen Harlinn 12-Mar-11 13:24pm    
I've written a CodeProject article on the subject; it will show up on the CP article page as soon as it has been reviewed by the editors. The solution is capable of handling 500 000 messages in 3 minutes on a regular laptop. Hopefully this will get a better response - a 2 is kind of painful :)
Amit kumar pathak 14-Apr-11 5:27am    
Thanks for your solution. It is correct in the normal case; in my program the actual problem was the IntPtr and Marshal.Copy(buffer, 0, HeaderPtr, buffer.Length); step, which was taking a lot of time. I now use an IntPtr obtained through GCHandle and it works very smoothly.
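The fix described in the comment above (pinning the received buffer with GCHandle instead of copying it into memory from Marshal.AllocHGlobal) could look roughly like the sketch below. The SampleHeader struct and the StructReader helper are hypothetical; the real TBCastMessageHeader layout is not shown in this thread. Note that in the portion of the posted code that is visible, the AllocHGlobal block is sized for the header while buffer.Length bytes are copied into it, and no matching FreeHGlobal appears, so a size check and a guaranteed release are included here.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical header layout, for illustration only.
[StructLayout(LayoutKind.Sequential, Pack = 1)]
public struct SampleHeader
{
    public short MessageCode;
    public int Length;
}

public static class StructReader
{
    // Reads a T from the start of 'buffer' by pinning the managed array,
    // so no unmanaged allocation or byte copy is needed.
    public static T FromBytes<T>(byte[] buffer) where T : struct
    {
        int size = Marshal.SizeOf(typeof(T));
        if (buffer == null || buffer.Length < size)
            throw new ArgumentException("Buffer is smaller than the target structure.");

        GCHandle handle = GCHandle.Alloc(buffer, GCHandleType.Pinned);
        try
        {
            return (T)Marshal.PtrToStructure(handle.AddrOfPinnedObject(), typeof(T));
        }
        finally
        {
            handle.Free(); // always unpin, even if PtrToStructure throws
        }
    }

    public static void Main()
    {
        // 0x03E9 = 1001 and 0x00000010 = 16 in little-endian byte order;
        // the trailing byte stands in for the rest of the message.
        byte[] raw = { 0xE9, 0x03, 0x10, 0x00, 0x00, 0x00, 0xFF };
        SampleHeader header = FromBytes<SampleHeader>(raw);
        Console.WriteLine(header.MessageCode + " " + header.Length); // 1001 16 on little-endian hardware
    }
}
```

Pinning avoids one allocation and one copy per message, and the try/finally guarantees the handle is released, which matters when a loop runs for hours.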
Amit kumar pathak 14-Mar-11 3:12am    
I ran your code using the source you provided (http://www.codeproject.com/KB/cs/Harlinn_Messaging.aspx) and it worked well, but when I implemented it in my program it hangs within a minute and stops working. Kindly help me out; your help in getting the program running would be very helpful. If you provide your email, I'll send the full code...

namespace udpConsume
{
    public partial class FormReceive : Form
    {
        private MessageQueue messageQueue;
        private bool isRunning;

        public FormReceive()
        {
            InitializeComponent();
            InitializeQueue();
        }

        private void InitializeQueue()
        {
            receivedCounter = 0;
            string queuePath = @".\private$\myquelocal";
            if (!MessageQueue.Exists(queuePath))
            {
                messageQueue = MessageQueue.Create(queuePath);
            }
            else
            {
                messageQueue = new MessageQueue(queuePath);
            }
            isRunning = true;
            messageQueue.Formatter = new BinaryMessageFormatter();
            messageQueue.ReceiveCompleted += OnReceiveCompleted;
            messageQueue.BeginReceive();
        }

        private delegate void LogMessageDelegate(string text);
        private void LogMessage(string text)
        {
            if (InvokeRequired)
            {
                Invoke(new LogMessageDelegate(LogMessage), text);
            }
            else
            {
                messageTextBox.AppendText(text + Environment.NewLine);
            }
        }

        private int receivedCounter;
        private void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
        {
            try
            {
                MessageQueue mq = (MessageQueue)source;

                if (mq != null)
                {
                    try
                    {
                        System.Messaging.Message message = null;
                        try
                        {
                            message = mq.EndReceive(asyncResult.AsyncResult);
                        }
                        catch (Exception ex)
                        {
                            LogMessage(ex.Message);
                        }
                        if (message != null)
                        {
                            byte[] buffer = (byte[])message.Body;
                            if (buffer.Length > 0)
                            {
                                receivedCounter++;
                                if ((receivedCounter % 10) == 0)
                                {
                                    TBCastMessageHeader bcastHeader = new TBCastMessageHeader();
                                    IntPtr bcastHeaderPtr;
                                    bcastHeaderPtr = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(TBCastMessageHeader)));
                                    Marshal.Copy(buffer, 0, bcastHeaderPtr, buffer.Length);
                                    bcastHeader = (TBCastMessageHeader)(Marshal.PtrToStructure(bcastHeaderPtr, typeof(TBCastMessageHeader)));

                                    //MessageBox.Show(bcastHeader.MessageCode.ToString());
                                    try
                                    {
                                        LogMessage(bcastHeader.MessageCode.ToString());
                                        string ChngSign = "";

                                        switch (bcastHeader.MessageCode)
                                        {
                                            case 1001: //usp_Insert1001data

                                                TMarketUpdateMsg marketupdMsg;
                                                IntPtr marketPtr;
                                                marketPtr = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(TMarketUpdateMsg)));
                                                Marshal.C
Espen Harlinn 14-Mar-11 4:15am    
My initial gut feeling is that the source of your trouble is this:
TBCastMessageHeader bcastHeader = new TBCastMessageHeader();
IntPtr bcastHeaderPtr;
bcastHeaderPtr = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(TBCastMessageHeader)));
Marshal.Copy(buffer, 0, bcastHeaderPtr, buffer.Length);
bcastHeader = (TBCastMessageHeader)(Marshal.PtrToStructure(bcastHeaderPtr, typeof(TBCastMessageHeader)));

//MessageBox.Show(bcastHeader.MessageCode.ToString());
try
{
    LogMessage(bcastHeader.MessageCode.ToString());
    string ChngSign = "";

    switch (bcastHeader.MessageCode)
    {
        case 1001: //usp_Insert1001data
            TMarketUpdateMsg marketupdMsg;
            IntPtr marketPtr;
            marketPtr = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(TMarketUpdateMsg)));
            Marshal.C

As you can see from my code, I cast directly to the desired type. This would also work for a class hierarchy: cast to the base type, then later cast to the desired derived type based on bcastHeader.MessageCode. My email is available on my home site harlinn.com, or if you're in a hurry, espen.harlinn@goodtech.no for commercial work. Good luck.

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


