ZeroMQ #5 : Sending From Multiple Sockets

2 Sep 2014 | CPOL | 6 min read
Last time we looked at how to use the Poller to work with multiple sockets, and detect their readiness. This time we will continue to work with the familiar request/response model that we have been using thus far. We will however be beefing things up a bit, and shall examine several ways in which you can have more than one thread pushing messages to the server and getting responses, which is a fairly typical requirement (at least in my book it is).

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

One Thing Before We Start

As you may have realised by now, ZeroMQ is a messaging framework, and as such promotes the idea of lock-free messaging. I also happen to think this is a very good idea. You can achieve excellent message throughput, and save yourself the pain of synchronizing access, if you avoid shared data structures. So in general try to work with ZeroMQ in the way it wants to be worked with, which is via message passing, avoiding locks and shared data structures.

Setting The Scene For This Post

Ok so we are nearly at the point where we can start to look at some code, but before we do that, let’s just talk a little bit more about what this post is trying to discuss.

In the code I typically write, it is quite common for a bunch of client threads all to be running at once, each capable of talking to the server. If this sounds like a requirement that you have had to deal with, then you may find this post of use, as this is exactly the scenario this post is aimed at solving.

As the aim of this post is to have an asynchronous client, we need an asynchronous server too, so we use DealerSocket(s) for the client(s) and a RouterSocket for the server.

As with most things there is more than one way to skin a cat, so we will look at a few options, each with its own pros/cons.

Option 1 : Each Thread Has Its Own DealerSocket

The first option does need a bit of .NET threading knowledge, but if you have that, then the idea is a simple one. For each client thread we also create a dedicated DealerSocket that *should be* used exclusively by that thread.

This is achieved using the ThreadLocal<T> .NET class, which allows us to have a DealerSocket per thread. We add each of the client-created DealerSocket(s) to a Poller instance, and listen to the ReceiveReady event on each socket, which allows us to get the message back from the server.

The obvious downside to this approach is that there will be more socket(s) created on the client side. The upside is that it is very easy to implement, and just works.
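To make the per-thread idea concrete, here is a minimal sketch that uses only the .NET base class library. FakeSocket is a hypothetical stand-in for a DealerSocket (it is not a NetMQ type), and the value factory passed to ThreadLocal<T> is my choice for brevity; the listing in this post uses IsValueCreated instead.

```csharp
using System;
using System.Threading;

public static class ThreadLocalSketch
{
    // Hypothetical stand-in for a socket: it only remembers which thread created it.
    public sealed class FakeSocket
    {
        public readonly int OwnerThreadId = Thread.CurrentThread.ManagedThreadId;
    }

    // The factory runs once per thread, the first time that thread reads .Value.
    private static readonly ThreadLocal<FakeSocket> SocketPerThread =
        new ThreadLocal<FakeSocket>(() => new FakeSocket());

    // True when the socket seen by the calling thread was created by that same thread.
    public static bool CurrentThreadOwnsItsSocket()
    {
        return SocketPerThread.Value.OwnerThreadId ==
               Thread.CurrentThread.ManagedThreadId;
    }

    public static void Main()
    {
        var threads = new Thread[3];
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i] = new Thread(() =>
                Console.WriteLine("thread {0} owns its socket: {1}",
                    Thread.CurrentThread.ManagedThreadId,
                    CurrentThreadOwnsItsSocket()));
            threads[i].Start();
        }
        foreach (var t in threads)
        {
            t.Join();
        }
    }
}
```

Because each thread only ever touches the instance it created, no locking is needed around the socket itself, which is the whole attraction of this option.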

Here is an image showing what we are trying to achieve:

image

Here is the code for this scenario:

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace ManualThreadingDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use ThreadLocal<DealerSocket> where each thread has
            //  its own client DealerSocket to talk to server
            //2. Each thread can send using its own socket
            //3. Each thread socket is added to poller
            
            ThreadLocal<DealerSocket> clientSocketPerThread = 
                new ThreadLocal<DealerSocket>();
            int delay = 3000;
            Poller poller = new Poller();
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");
                    //start some threads, each with its own DealerSocket
                    //to talk to the server socket. Creates lots of sockets,
                    //but no nasty race conditions no shared state, each
                    //thread has its own socket, happy days
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            DealerSocket client = null;
                            if (!clientSocketPerThread.IsValueCreated)
                            {
                                client = ctx.CreateDealerSocket();
                                client.Connect("tcp://127.0.0.1:5556");
                                client.ReceiveReady += Client_ReceiveReady;
                                clientSocketPerThread.Value = client;
                                poller.AddSocket(client);
                            }
                            else
                            {
                                client = clientSocketPerThread.Value;
                            }
                            while (true)
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(state.ToString());
                                client.SendMessage(messageToServer);
                                Thread.Sleep(delay);
                            }
                        },string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }
                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);
                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }
                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }
        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }
        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

If you were to run this, you would see something like this:

image
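The server loop above only replies when it sees exactly three frames. The sketch below is a rough model of where those three frames come from, using plain lists of strings rather than NetMQ calls. The names DealerSend and RouterReceive and the identity value "client-id" are made up for illustration; a real router identity is an opaque binary value.

```csharp
using System;
using System.Collections.Generic;

public static class EnvelopeSketch
{
    // What the client code sends through its dealer socket:
    // an empty delimiter frame followed by the payload.
    public static List<string> DealerSend(string payload)
    {
        return new List<string> { "", payload };
    }

    // What the router hands to the server code: the sender's identity
    // is prepended, so the server sees three frames.
    public static List<string> RouterReceive(string identity, List<string> wireFrames)
    {
        var frames = new List<string> { identity };
        frames.AddRange(wireFrames);
        return frames;
    }

    public static void Main()
    {
        var frames = RouterReceive("client-id", DealerSend("client 0"));
        for (int i = 0; i < frames.Count; i++)
        {
            Console.WriteLine("Frame[{0}] = '{1}'", i, frames[i]);
        }
    }
}
```

This is why the server reads the address from frame 0 and the text from frame 2, and why it puts the address back as the first frame of its reply: the router uses that frame to pick the client to send to.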

Option 2 : Each Thread Delegates Off To A Local Broker

The next example keeps the idea of separate threads that want to send message(s) to the server. This time however we will use a broker on the client side. The idea is that the client threads will push to a shared queue. I know I have told you to avoid shared data structures, but this is not really a shared data structure in that sense; it is just a thread-safe queue that many threads can write to. A shared data structure, by contrast, might mean several threads all trying to update the current Bid rate of an Fx option quote price. There is a difference. OK, the shared queue will have some synchronization somewhere to make it thread safe, but thankfully we can rely on the good work of the PFX team at Microsoft for that. Those guys are smart and I am sure the Concurrent collections namespace is pretty well designed and can be trusted to be pretty optimal.

Again we need to call on a bit of .NET know how, so for the centralized queue we use a ConcurrentQueue<T>. All client threads will enqueue their messages for the server here.

There will also be another thread started. This extra thread is the one that will be processing the messages that have been queued onto the centralized queue. When a message is taken off the centralized queue it will be sent to the server. The key point is that only the thread that reads from the centralized queue will send messages to the server.

As we still want messages to be sent out asynchronously we stick with using a DealerSocket, but since there is now only one place where we send messages to the server we only need a single DealerSocket.

We add the SINGLE DealerSocket to a Poller instance, and listen to the ReceiveReady event on that socket, which allows us to get the message back from the server.

This is more complex than the first example as there are more moving parts, but we no longer have loads of sockets being created. There is just one.
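One thing to be aware of is that the sender loop in the listing that follows polls the queue continuously with TryDequeue, so it spins even when there is nothing to send. A possible variation, which is not what the demo code does, is to wrap the ConcurrentQueue<T> in a BlockingCollection<T> so the single consumer sleeps until a message arrives. The sketch below uses only the base class library; the Run helper and the "seen" list are hypothetical, standing in for the point where the broker would send on its one socket.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BrokerSketch
{
    // Starts producerCount producers that each enqueue perProducer messages,
    // plus one consumer that drains the queue. Returns what the consumer saw.
    public static List<string> Run(int producerCount, int perProducer)
    {
        var queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
        var seen = new List<string>(); // only ever touched by the consumer task

        var consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding has been called and the queue has drained.
            foreach (var message in queue.GetConsumingEnumerable())
            {
                seen.Add(message); // a real broker would send on its socket here
            }
        }, TaskCreationOptions.LongRunning);

        var producers = new Task[producerCount];
        for (int i = 0; i < producerCount; i++)
        {
            string name = string.Format("client {0}", i);
            producers[i] = Task.Factory.StartNew(() =>
            {
                for (int n = 0; n < perProducer; n++)
                {
                    queue.Add(name);
                }
            });
        }

        Task.WaitAll(producers);
        queue.CompleteAdding();
        consumer.Wait();
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine("messages handled: {0}", Run(3, 5).Count);
    }
}
```

The shape is the same as the demo (many writers, one reader that owns the socket); the only change is that the reader blocks instead of spinning.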

As before, here is a diagram of what we are trying to achieve:

image

Here is the code for this scenario:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace ConcurrentQueueDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use many threads each writing to ConcurrentQueue
            //2. Extra thread to read from ConcurrentQueue, and this is the one that
            //   will deal with writing to the server
            ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
            int delay = 3000;
            Poller poller = new Poller();
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");
                    //start some threads, where each thread, will use a client side
                    //broker (simple thread that monitors a ConcurrentQueue), where
                    //ONLY the client side broker talks to the server
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            while (true)
                            {
                                messages.Enqueue(state.ToString());
                                Thread.Sleep(delay);
                            }
                        }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }
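                    //single sender loop. The lambda takes no state argument, so that
                    //TaskCreationOptions.LongRunning binds as the creation options
                    //rather than being passed in as the state object
                    Task.Factory.StartNew(() =>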
                    {
                        var client = ctx.CreateDealerSocket();
                        client.Connect("tcp://127.0.0.1:5556");
                        client.ReceiveReady += Client_ReceiveReady;
                        poller.AddSocket(client);
                        while (true)
                        {
                            string clientMessage = null;
                            if (messages.TryDequeue(out clientMessage))
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(clientMessage);
                                client.SendMessage(messageToServer);
                            }
                        }
                    }, TaskCreationOptions.LongRunning);
                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);
                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }
                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }
        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }
        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

If you were to run this, you would see something like this:

image

Option 3 : Use NetMQScheduler

The final option is to use the NetMQ library class : NetMQScheduler. I think the best place to start with that is by reading the link I just included. Then come back here.

…….

…….

Time passes

…….

…….

Oh hello you’re back. Ok so now you know that the NetMQScheduler offers us a way to use TPL to schedule work and that there is a Poller that we pass into the NetMQScheduler. Cool.

The NetMQScheduler is a custom TPL scheduler, which allows us to create tasks that we want done, and it will take care of the threading aspects of them. Since we told the NetMQScheduler about the Poller we want to use we are able to hook up the ReceiveReady event and use that to get messages back from the server.
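If custom TPL schedulers are new to you, the following sketch shows the general mechanism: a TaskScheduler subclass that runs every task queued to it on one dedicated thread. It is an illustration of the concept only and is not how NetMQScheduler is implemented; SingleThreadScheduler and WorkerThreadId are hypothetical names, and the worker here is a plain thread rather than a Poller.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only; this is not NetMQScheduler.
public sealed class SingleThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
    private readonly Thread worker;

    public SingleThreadScheduler()
    {
        worker = new Thread(() =>
        {
            // Every task queued to this scheduler is executed here, one at a time.
            foreach (var task in tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        });
        worker.IsBackground = true;
        worker.Start();
    }

    public int WorkerThreadId
    {
        get { return worker.ManagedThreadId; }
    }

    protected override void QueueTask(Task task)
    {
        tasks.Add(task);
    }

    // Never run a task inline on the caller's thread; that would defeat
    // the point of pinning all the work to the worker thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return tasks.ToArray();
    }

    public void Dispose()
    {
        tasks.CompleteAdding();
        worker.Join();
        tasks.Dispose();
    }
}

public static class SchedulerSketch
{
    public static void Main()
    {
        using (var scheduler = new SingleThreadScheduler())
        {
            var task = new Task<int>(() => Thread.CurrentThread.ManagedThreadId);
            task.Start(scheduler);
            Console.WriteLine("ran on worker thread: {0}",
                task.Result == scheduler.WorkerThreadId);
        }
    }
}
```

The task.Start(scheduler) call is the same pattern the Client class uses later in this post: any thread may create the task, but the work itself runs on the scheduler's thread, which is what keeps the socket single-threaded.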

The difference here is that since we are using TPL and NetMQ we need to use TPL Task(s) and the NetMQScheduler instance whenever we want to Send/Receive.

To be honest, I think I like this design the least, as it mixes up too many concepts, and the TPL stuff tends to be mixing a bit too much with the ZeroMQ goodness for my taste. I did however just want to show this example for completeness.

So the code for this example has two parts. A simple client, and then the code that spins up a client instance and then multiple threads that use the client instance to send messages to the server. There is also a basic server loop (which I will show below under the title “The Rest”)

Client Code

Here is the client code, where it can be seen that we create a NetMQScheduler which gets handed a new Poller instance to use internally. The idea is that anyone can send a message simply by calling the client's SendMessage(..) method:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
namespace NetMQSchedulerDemo
{
    public class Client : IDisposable
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Poller poller;
        private NetMQScheduler scheduler;
        private NetMQSocket clientSocket;
        public Client(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }
        public void Start()
        {
            poller = new Poller();
            clientSocket = context.CreateDealerSocket();
            clientSocket.ReceiveReady += clientSocket_ReceiveReady;
            clientSocket.Connect(address);
            scheduler = new NetMQScheduler(context, poller);
            Task.Factory.StartNew(poller.Start, TaskCreationOptions.LongRunning);
        }
        void clientSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            string result = e.Socket.ReceiveString();
            Console.WriteLine("REPLY " + result);
        }
        public async Task SendMessage(NetMQMessage message)
        {
            // instead of creating an inproc socket which listens for messages and then
            // sends them to the server, we just create a task and run the code on
            // the poller thread, which is the thread of the clientSocket
            Task task = new Task(() => clientSocket.SendMessage(message));
            task.Start(scheduler);
            await task;
            await ReceiveMessage();
        }
        public async Task ReceiveMessage()
        {
            Task task = new Task(() =>
            {
                var result = clientSocket.ReceiveString();
                Console.WriteLine("REPLY " + result);
            });
            task.Start(scheduler);
            await task;
        }
        public void Dispose()
        {
            scheduler.Dispose();
            clientSocket.Dispose();
            poller.Stop();
        }
    }
}

The Rest

And here is the rest of the code that is responsible for spinning up the client and extra threads to push messages through the client (using the SendMessage(..) method above)

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace NetMQSchedulerDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use NetMQs NetMQScheduler to communicate with the
            //   server. All Send/Receive MUST be done via the
            //   NetMQScheduler and TPL Tasks. See the Client class
            //   for more information on this
            int delay = 3000;
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");
                    using (var client = new Client(ctx, "tcp://127.0.0.1:5556"))
                    {
                        client.Start();
                        //start some threads, each thread will use the
                        //Client's NetMQScheduler to send/receive messages
                        //to/from the server
                        for (int i = 0; i < 2; i++)
                        {
                            Task.Factory.StartNew(async (state) =>
                            {
                                while (true)
                                {
                                    var messageToServer = new NetMQMessage();
                                    messageToServer.AppendEmptyFrame();
                                    messageToServer.Append(state.ToString());
                                    await client.SendMessage(messageToServer);
                                    Thread.Sleep(delay);
                                }
                            }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                        }
                        //server loop
                        while (true)
                        {
                            var clientMessage = server.ReceiveMessage();
                            Console.WriteLine("========================");
                            Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                            Console.WriteLine("========================");
                            for (int i = 0; i < clientMessage.FrameCount; i++)
                            {
                                Console.WriteLine("Frame[{0}] = {1}", i,
                                    clientMessage[i].ConvertToString());
                            }
                            if (clientMessage.FrameCount == 3)
                            {
                                var clientAddress = clientMessage[0];
                                var clientOriginalMessage = clientMessage[2].ConvertToString();
                                string response = string.Format("{0} back from server {1}",
                                    clientOriginalMessage, DateTime.Now.ToLongTimeString());
                                var messageToClient = new NetMQMessage();
                                messageToClient.Append(clientAddress);
                                messageToClient.AppendEmptyFrame();
                                messageToClient.Append(response);
                                server.SendMessage(messageToClient);
                            }
                        }
                    }
                }
            }
        }
        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

If you run this code you may see something like this:

image

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

