Click here to Skip to main content
15,888,286 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more:
The Load balancer accepts incoming requests, re-sends them to multiple servers, and returns the answer from each chosen server to the corresponding client.

C#
//dispatcher.cs
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace LoadBallancer {
    public class Dispatcher
    {
        // set the TcpListener on port 8890
        int port = 8890; 
        TcpListener server;
        List<CoreComm> processors = new List<CoreComm>();

        static void Main()
        {
            var dispatcher = new Dispatcher();
            dispatcher.ListenForRequests();
        }

        public Dispatcher()
        {
            server = new TcpListener(IPAddress.Any, port);
        }

        public void ListenForRequests()
        {
            server.Start();
            while (true)
            {
                try
                {
                    // Start listening for client requests
                    // Enter the listening loop

                    Console.Write("Waiting for a connection... ");

                    lock(server)
                    {
                        // Perform a blocking call to accept requests.
                        TcpClient client = server.AcceptTcpClient();

                        Console.WriteLine("Connected.");

                        ThreadPool.QueueUserWorkItem(ThreadProc, client);
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Exception: {0}", e);
                }
            }
        }
        private static void ThreadProc(object obj)
        {
            var processor = new CoreComm((TcpClient)obj);
            processor.ReSendRequest(null);
        }
    } 
}

// CoreComm.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;

using System.Configuration;
using System.Threading;

namespace LoadBallancer
{
    public class IamServer
    {
        public string   Url { get; set; }
        public int      Port { get; set; }
        public string   Type { get; set; }
    }

    public class CoreComm
    {
        // Buffer for reading data
        int bufSize = 1024;
        static List<IamServer> servers = new List<IamServer>();

        protected TcpClient acceptorSocket;
        NetworkStream acceptorStream;

        protected TcpClient clientSocket;

        protected List<KeyValuePair<int, byte[]>> requestPackets = new List<KeyValuePair<int, byte[]>>();

        static CoreComm()
        {
             // reading config for servers' parameters
        }

        public CoreComm(TcpClient socket)
        {
            acceptorSocket = socket;
            // Get a stream object for reading and writing
            acceptorStream = acceptorSocket.GetStream();
        }

        private void ReadFromAcceptorStream()
        {
            // Loop to receive all the data sent by the client.
            while (acceptorStream.DataAvailable)
            {
                byte[] requestBuffer = new byte[bufSize];
                int i = acceptorStream.Read(requestBuffer, 0, requestBuffer.Length);
                requestPackets.Add(new KeyValuePair<int, byte[]>(i, requestBuffer));
            }
      }

        public void ReSendRequest(Object threadContext)
        {
            ReadFromAcceptorStream();

            var servers = GetDestinationServers(null);

            if (servers.Count == 0)
                acceptorStream.Write(ErrMessage, 0, ErrMessage.Length);
            else
                // for debug only send the first in the list
                SendRequestToServer(servers[0]);

            // Shutdown and end connection
            acceptorSocket.Close();
        }

        public void SendRequestToServer(IamServer server)
        {
            clientSocket = new TcpClient();
            clientSocket.Connect(server.Url, server.Port);
            NetworkStream clientStream = clientSocket.GetStream();

            foreach (var packet in requestPackets)
                clientStream.Write(packet.Value, 0, packet.Key);

            var requestBuffer = new byte[bufSize];

            while (clientStream.DataAvailable)
            {
                int i = clientStream.Read(requestBuffer, 0, requestBuffer.Length);
                acceptorStream.Write(requestBuffer, 0, i);
            }

            clientSocket.Close();
        }

        // Mock up of the real load balancing algorithm
        static int lastServerAnswered = 0;

        public List<IamServer> GetDestinationServers(string requestData)
        {
            // processing to determine the query destinations
            lock(servers)
            {
                // patch
                int currentServerNum = lastServerAnswered;
                lastServerAnswered ++ ;
                if (lastServerAnswered > servers.Count - 1)
                    lastServerAnswered = 0;

                return new List<IamServer> { servers[currentServerNum] };
            }
        }

    }
}


What I have tried:

it works correctly when I debug while setting break-point in the code and does not work otherwise. Any ideas?
Posted
Updated 3-Jun-16 15:24pm

1 solution

The problem was found to be in the code:
C#
while (clientStream.DataAvailable)
{
      int i = clientStream.Read(requestBuffer, 0, requestBuffer.Length);
      acceptorStream.Write(requestBuffer, 0, i);
}


Actually it happened that for some packets clientStream.DataAvailable was false even if there was still remaining data to be received. The solution is based on the application level protocol for which the Load Balancer had been developed that sends in the first 4 bytes of the stream the number of the total bytes that are sent.The code becomes as follows:
C#
var responseBuffer = new byte[bufSize];

int numTotalBytesStreamed = clientStream.Read(responseBuffer, 0, responseBuffer.Length);
int numBytesToStream = GetNumBytesInTheStream(responseBuffer);

acceptorStream.Write(responseBuffer, 0, numTotalBytesStreamed);

while (numBytesToStream > numTotalBytesStreamed)
{
    while (!clientStream.DataAvailable)
       Thread.Sleep(1);

       int numMoreBytesStreamed = clientStream.Read(responseBuffer, 0, responseBuffer.Length);
       acceptorStream.Write(responseBuffer, 0, numMoreBytesStreamed);
       numTotalBytesStreamed += numMoreBytesStreamed;
}
acceptorStream.Flush();
clientSocket.Close();


The solution works and is extremely stable for continuous loads of hundreds of requests per second.
 
Share this answer
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900