Last time, we looked at a few things, namely:
- Options
- Identity
- SendMore
This time, we will talk about how to handle using multiple sockets.
Where is the Code?
As always, before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:
Handling Multiple Sockets, And Why Would You Need To?
So why would you want to handle multiple sockets anyway? Well, there are a variety of reasons, such as:
- You may have multiple sockets within one process that rely on each other, and the timing is such that you need to know a socket is ready before you try to receive anything from it
- You may have a Request, as well as a Publisher socket in one process
In practice, you will often end up with more than one socket per process, and there will be occasions when you only want to use a socket once it is deemed ready.
ZeroMQ actually has a concept of a “Poller” that can be used to determine if a socket is deemed ready to use.
NetMQ has an implementation of the “Poller”, and it can be used to do the following things:
- Monitor a single socket, for readiness
- Monitor an IEnumerable<NetMQSocket> for readiness
- Allow NetMQSocket(s) to be added dynamically and still report on the readiness of the new sockets
- Allow NetMQSocket(s) to be removed dynamically
- Raise an event on the socket instance when it is ready
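As a rough sketch of the basic pattern (this one is mine, not taken from the NetMQ test suite), the following console program polls two response sockets with a single Poller. It only uses calls that also appear in the tests later in this post; the class name, the port numbers, and the reply text are made up for illustration, and it assumes the same NetMQ version used throughout this series.

```csharp
using System;
using System.Threading.Tasks;
using NetMQ;

public static class PollerSketch
{
    // Sends one request to each of two response sockets that share a single
    // Poller, and returns the two replies.
    public static string[] RoundTrip()
    {
        using (NetMQContext context = NetMQContext.Create())
        using (var repA = context.CreateResponseSocket())
        using (var repB = context.CreateResponseSocket())
        using (var reqA = context.CreateRequestSocket())
        using (var reqB = context.CreateRequestSocket())
        using (Poller poller = new Poller())
        {
            // Hypothetical local endpoints, chosen only for this sketch
            repA.Bind("tcp://127.0.0.1:5012");
            repB.Bind("tcp://127.0.0.1:5013");
            reqA.Connect("tcp://127.0.0.1:5012");
            reqB.Connect("tcp://127.0.0.1:5013");

            // The Poller raises ReceiveReady on whichever socket has a message waiting
            repA.ReceiveReady += (s, a) =>
            {
                bool more;
                string m = a.Socket.ReceiveString(out more);
                a.Socket.Send("A:" + m);
            };
            repB.ReceiveReady += (s, a) =>
            {
                bool more;
                string m = a.Socket.ReceiveString(out more);
                a.Socket.Send("B:" + m);
            };

            poller.AddSocket(repA);
            poller.AddSocket(repB);

            // Start blocks until the Poller is stopped, so run it on a background task
            Task pollerTask = Task.Factory.StartNew(poller.Start);

            bool more2;
            reqA.Send("ping");
            string replyA = reqA.ReceiveString(out more2);
            reqB.Send("ping");
            string replyB = reqB.ReceiveString(out more2);

            // Stop the Poller and wait for its loop to finish before disposing anything
            poller.Stop(true);
            pollerTask.Wait();

            return new[] { replyA, replyB };
        }
    }

    public static void Main()
    {
        foreach (string reply in RoundTrip())
        {
            Console.WriteLine(reply);
        }
    }
}
```

The point of the sketch is that neither response socket needs its own blocking receive loop: one Poller watches both, and each socket's handler only runs when that socket actually has something to read.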
A good way to look into the NetMQ Poller class is via some tests. I am not going to test everything in this post, but if you want more, NetMQ itself comes with some very good tests for the Poller, which is in fact where I lifted these test cases from.
Some Examples
As I just stated, I am not the author of these tests. I have taken a subset of the NetMQ Poller test suite that I think is pertinent to an introductory discussion of the Poller class.
NOTE: This series of posts is meant as a beginner's guide, and advanced ZeroMQ users would likely not get too much from it.
Single Socket Poll Test
These test cases use the (hopefully by now) familiar Request/Response socket arrangement. We will use the Poller to alert us (via the xxxxSocket.ReceiveReady event that the Poller raises for us) that the ResponseSocket is ready.
Here is the code for this:
[Test]
public void SingleSocketPollTest()
{
    using (NetMQContext context = NetMQContext.Create())
    {
        using (var rep = context.CreateResponseSocket())
        {
            rep.Bind("tcp://127.0.0.1:5002");

            using (var req = context.CreateRequestSocket())
            using (Poller poller = new Poller())
            {
                req.Connect("tcp://127.0.0.1:5002");

                // Raised by the Poller when the response socket has a message waiting
                rep.ReceiveReady += (s, a) =>
                {
                    bool more;
                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    a.Socket.Send("World");
                };

                poller.AddSocket(rep);

                // Start blocks until the Poller is stopped, so run it on a background task
                Task pollerTask = Task.Factory.StartNew(poller.Start);

                req.Send("Hello");

                bool more2;
                string m1 = req.ReceiveString(out more2);

                Assert.IsFalse(more2);
                Assert.AreEqual("World", m1);

                poller.Stop();

                // Give the polling loop a moment to exit
                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }
    }
}
Add Socket During Work Test
This example shows how we can add extra socket(s) to the Poller at runtime, and the Poller will still raise the xxxxSocket.ReceiveReady event for us.
[Test]
public void AddSocketDuringWorkTest()
{
    using (NetMQContext context = NetMQContext.Create())
    {
        using (var router = context.CreateRouterSocket())
        using (var router2 = context.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");

            using (var dealer = context.CreateDealerSocket())
            using (var dealer2 = context.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");

                bool router1arrived = false;
                bool router2arrived = false;
                bool more;

                // Each message reaches a router as two frames: the dealer's identity, then the payload
                router2.ReceiveReady += (s, a) =>
                {
                    router2.Receive(out more);
                    router2.Receive(out more);
                    router2arrived = true;
                };

                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;
                    router.Receive(out more);
                    router.Receive(out more);

                    // Add the second router while the Poller is already running
                    poller.AddSocket(router2);
                };

                poller.AddSocket(router);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
            }
        }
    }
}
Add Socket After Removing Test
This example builds on the last one: we add a new socket to the Poller after removing another socket from it.
[Test]
public void AddSocketAfterRemovingTest()
{
    using (NetMQContext context = NetMQContext.Create())
    {
        using (var router = context.CreateRouterSocket())
        using (var router2 = context.CreateRouterSocket())
        using (var router3 = context.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = context.CreateDealerSocket())
            using (var dealer2 = context.CreateDealerSocket())
            using (var dealer3 = context.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool router1arrived = false;
                bool router2arrived = false;
                bool router3arrived = false;
                bool more;

                // The first router removes itself from the Poller once it has handled a message
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;
                    router.Receive(out more);
                    router.Receive(out more);
                    poller.RemoveSocket(router);
                };

                poller.AddSocket(router);

                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                // The second router adds the third one, after the first has already been removed
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived = true;
                    router2.Receive(out more);
                    router2.Receive(out more);
                    poller.AddSocket(router3);
                };

                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
                Assert.IsTrue(router3arrived);
            }
        }
    }
}
Add 2 Sockets After Removing Test
And in this one, we add two sockets to the Poller after removing one from it:
[Test]
public void AddTwoSocketAfterRemovingTest()
{
    using (NetMQContext context = NetMQContext.Create())
    {
        using (var router = context.CreateRouterSocket())
        using (var router2 = context.CreateRouterSocket())
        using (var router3 = context.CreateRouterSocket())
        using (var router4 = context.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");
            router4.Bind("tcp://127.0.0.1:5005");

            using (var dealer = context.CreateDealerSocket())
            using (var dealer2 = context.CreateDealerSocket())
            using (var dealer3 = context.CreateDealerSocket())
            using (var dealer4 = context.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");
                dealer4.Connect("tcp://127.0.0.1:5005");

                int router1arrived = 0;
                int router2arrived = 0;
                bool router3arrived = false;
                bool router4arrived = false;
                bool more;

                // The first router removes itself after its first message
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived++;
                    router.Receive(out more);
                    router.Receive(out more);
                    poller.RemoveSocket(router);
                };

                poller.AddSocket(router);

                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                router4.ReceiveReady += (s, a) =>
                {
                    router4.Receive(out more);
                    router4.Receive(out more);
                    router4arrived = true;
                };

                // On its first message only, the second router adds routers three and four
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived++;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    if (router2arrived == 1)
                    {
                        poller.AddSocket(router3);
                        poller.AddSocket(router4);
                    }
                };

                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                dealer4.Send("4");
                dealer2.Send("2");
                dealer.Send("1");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                // The second message sent to the removed router was never handled,
                // so both of its frames are still waiting on the socket
                router.Receive(true, out more);
                Assert.IsTrue(more);
                router.Receive(true, out more);
                Assert.IsFalse(more);

                Assert.AreEqual(1, router1arrived);
                Assert.AreEqual(2, router2arrived);
                Assert.IsTrue(router3arrived);
                Assert.IsTrue(router4arrived);
            }
        }
    }
}
Cancel Socket Test
This final example shows 3 RouterSockets connected to 3 DealerSockets respectively (we will talk about DealerSockets in a later post; for now you can think of them as typically being used for asynchronous workers). We then add all the routers to the Poller. Within the first RouterSocket's ReceiveReady handler, we remove that RouterSocket from the Poller, so the handler should not be raised for any further messages from its respective DealerSocket. Here is the code for this test:
[Test]
public void CancelSocketTest()
{
    using (NetMQContext context = NetMQContext.Create())
    {
        using (var router = context.CreateRouterSocket())
        using (var router2 = context.CreateRouterSocket())
        using (var router3 = context.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = context.CreateDealerSocket())
            using (var dealer2 = context.CreateDealerSocket())
            using (var dealer3 = context.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool first = true;

                // Routers two and three reply to whichever dealer sent the message
                router2.ReceiveReady += (s, a) =>
                {
                    bool more;

                    byte[] identity = a.Socket.Receive(out more);
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity);
                    a.Socket.Send("2");
                };

                poller.AddSocket(router2);

                router.ReceiveReady += (s, a) =>
                {
                    if (!first)
                    {
                        Assert.Fail("This should not happen because we cancelled the socket");
                    }
                    first = false;

                    // Skip the identity frame, then read the payload
                    bool more;
                    a.Socket.Receive(out more);
                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    // Cancel this socket; the handler must not be raised again
                    poller.RemoveSocket(a.Socket);
                };

                poller.AddSocket(router);

                router3.ReceiveReady += (s, a) =>
                {
                    bool more;

                    byte[] identity = a.Socket.Receive(out more);
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity).Send("3");
                };

                poller.AddSocket(router3);

                Task pollerTask = Task.Factory.StartNew(poller.Start);

                dealer.Send("Hello");

                // These two arrive after the first router has been removed from the Poller
                dealer.Send("Hello2");
                Thread.Sleep(100);
                dealer.Send("Hello3");
                Thread.Sleep(500);

                bool more2;

                // The other two routers are still being polled and still reply
                dealer2.Send("1");
                string msg = dealer2.ReceiveString(out more2);
                Assert.AreEqual("2", msg);

                dealer3.Send("1");
                msg = dealer3.ReceiveString(out more2);
                Assert.AreEqual("3", msg);

                Thread.Sleep(300);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }
    }
}
And that is about all I wanted to talk about this time. I hope you can see how you could make use of the Poller in your own socket topologies, and why it is a useful tool.