Hello,
I have a listener thread that listens to UDP transactions and en-queue them into _TransactionQ, I also have another thread that de-queue the transactions from _TransactionQ, it will do a lot of processing to confirm the correct order of the transactions, it also has a logic to detect missing transactions based on a sequence number and to request a reload from the server, this thread will en-queue the confirmed transactions in _ConfirmedMessageQueue.
I use SyncLock to Synchronize access to the three Queues.
I have a problem with high CPU utilization (70%-80%)!!!!
Imports System.Net
Imports System.Net.Sockets
Imports System.Threading
Public Class UDPListener
#Region "Variables"
Private _port As Integer
Private _listenThread As Thread 'Queue thread
Private _dequeueThread As Thread 'DeQueue thread
Private _ListenThreadIsAlive As Boolean
Private _DequeueThreadIsAlive As Boolean
Private _receivedData As String
Private _udpClient As UdpClient
Private _mySequenceNumber As Integer
Private _address As String
Private _ConfirmedMessageQueue As Queue
Private _TransactionQ As Queue
#End Region
#Region "Properties"
Public Property ListenThreadIsAlive() As Boolean
Get
Return _ListenThreadIsAlive
End Get
Set(ByVal value As Boolean)
_ListenThreadIsAlive = value
End Set
End Property
Public Property DequeueThreadIsAlive() As Boolean
Get
Return _DequeueThreadIsAlive
End Get
Set(ByVal value As Boolean)
_DequeueThreadIsAlive = value
End Set
End Property
Public ReadOnly Property udpClient() As UdpClient
Get
Return _udpClient
End Get
End Property
Public Property mySequenceNumber() As String
Get
Return Interlocked.Read(_mySequenceNumber)
End Get
Set(ByVal value As String)
Interlocked.Exchange(_mySequenceNumber, value)
End Set
End Property
Public Property ConfirmedMessageQueue() As Queue
Get
Return Queue.Synchronized(_ConfirmedMessageQueue)
End Get
Set(ByVal value As Queue)
_ConfirmedMessageQueue = value
End Set
End Property
Public Property TransactionQ() As Queue
Get
Return Queue.Synchronized(_TransactionQ)
End Get
Set(ByVal value As Queue)
_TransactionQ = value
End Set
End Property
#End Region
#Region "Events"
Public Event dataArrived()
#End Region
'Constructor
Sub New(ByVal address As String, ByVal port As Integer, ByVal ExpectedSequenceNumber As Integer)
Me._port = port
Me._address = address
Me.mySequenceNumber = ExpectedSequenceNumber
Me.initialize()
End Sub
Private Sub initialize()
Try
ConfirmedMessageQueue = New Queue
TransactionQ = New Queue
_listenThread = New Thread(AddressOf listen)
_dequeueThread = New Thread(AddressOf dequeue)
_udpClient = New UdpClient(Sockets.AddressFamily.InterNetwork)
Dim localEP As EndPoint = CType(New IPEndPoint(IPAddress.Any, _port), EndPoint)
Dim mcastopt As MulticastOption
mcastopt = New MulticastOption(IPAddress.Parse(_address), IPAddress.Loopback)
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1)
_udpClient.Client.Bind(localEP)
_listenThread.Name = "udpListenThread"
_dequeueThread.Name = "udpDequeueThread"
_ListenThreadIsAlive = True
_listenThread.Start()
_DequeueThreadIsAlive = True
_dequeueThread.Start()
Catch ex As Exception
' Write to log
End Try
End Sub
Private Sub listen()
_udpClient.JoinMulticastGroup(IPAddress.Parse(Me._address))
Dim RemoteIpEndPoint As New IPEndPoint(IPAddress.Parse(Me._address), Me._port)
Dim receiveBytes As [Byte]() = Nothing
Dim TransactionData As String = ""
Dim TransactionSeqNum As String = ""
While _ListenThreadIsAlive
receiveBytes = _udpClient.Receive(RemoteIpEndPoint)
If receiveBytes.Count > 0 Then
TransactionData = Encoding.ASCII.GetString(receiveBytes)
TransactionSeqNum = TransactionData.Substring(0, TransactionData.IndexOf(Chr(3))).Substring(1)
TransactionData = TransactionData.Substring(TransactionData.IndexOf(Chr(3)))
Dim pair As New DictionaryEntry(TransactionSeqNum, TransactionData)
SyncLock TransactionQ.SyncRoot
TransactionQ.Enqueue(pair)
End SyncLock
End If
End While
Try
SyncLock _listenThread
_listenThread.Abort()
End SyncLock
Catch ex As Threading.ThreadAbortException
'Ignore ThreadAbortException
End Try
End Sub
Private Sub dequeue()
While _DequeueThreadIsAlive
Try
Dim TransactionSeqNum As Integer
SyncLock TransactionQ.SyncRoot
Do While TransactionQ.Count > 0
Dim trans As DictionaryEntry
trans = TransactionQ.Dequeue
'
'
' LONG PROCESSING
'
'
SyncLock ConfirmedMessageQueue.SyncRoot
ConfirmedMessageQueue.Enqueue(_receivedData)
incrementMySequenceNumber()
End SyncLock 'ConfirmedMessageQueue
RaiseEvent dataArrived()
Loop
End SyncLock 'TransactionQ
Catch ex As Exception
' Write to log
End Try
End While
Try
SyncLock _dequeueThread
_dequeueThread.Abort()
End SyncLock
Catch ex As Threading.ThreadAbortException
'Ignore ThreadAbortException
End Try
End Sub
'Thread safe increment for mySequenceNumber
Private Sub incrementMySequenceNumber()
Interlocked.Increment(_mySequenceNumber)
End Sub
' close connection, abort thread
Public Sub stopListening()
If Not IsNothing(_udpClient) Then
If _listenThread.IsAlive Then
_ListenThreadIsAlive = False
End If
If _dequeueThread.IsAlive Then
_DequeueThreadIsAlive = False
End If
_udpClient.Close()
CType(_udpClient, IDisposable).Dispose()
End If
End Sub
Protected Overrides Sub Finalize()
MyBase.Finalize()
End Sub
End Class