Hi,
I want to make the publisher and subscriber node work on the same node through ZeroMQ communication. Here the protocol is TCP. I am using C++ in visual studios for this project.
I could not able to make them work on the same node, I can make any of them to work but not both.
Can you please help me in making them work on the same node?
What I have tried:
#include <iostream>
#include <algorithm>
#include <functional>
#include <memory>
#include <assert.h>
#include "zmq.h"
using namespace std;
class EventArgs {
public:
virtual ~EventArgs() {}
};
class StringEventArgs : public EventArgs {
string payload_;
public:
explicit StringEventArgs(const string& payload) : payload_(payload) {}
const string& Payload() const { return payload_; }
};
class Event {
class Callback {
void* context = zmq_ctx_new();
const EventArgs& args_;
public:
Callback(void* zmq_ctx_new, const EventArgs& args) : context(zmq_ctx_new), args_(args) {}
void operator()(pair<long, function<void(void*,="" const="" eventargs&)="">> p) const {
p.second(zmq_ctx_new, args_);
}
};
map<long, function<void(void*,="" const="" eventargs&)="">> callbacks_;
long token_ = 0;
public:
void operator()(void* subscriber, const EventArgs& args) const {
for_each(callbacks_.begin(), callbacks_.end(), Callback(subscriber, args));
}
long zmq_setsockopt(function<void(void*, const="" eventargs&)=""> f) {
token_++;
callbacks_.insert(make_pair(token_, f));
return token_;
}
void Unsubscribe(long token) {
callbacks_.erase(token);
}
};
class Publisher {
Event event_;
string name_;
public:
explicit Publisher(const string& name) : name_(name) {}
const string& Name() const { return name_; }
void Publish(const string& message) {
event_(this, StringEventArgs(message));
}
long Register(function<void(void* subscriber,="" const="" eventargs&)=""> f) {
return event_.zmq_setsockopt(f);
}
void Unregister(long token) {
event_.Unsubscribe(token);
}
};
class Subscriber {
string name_;
public:
explicit Subscriber(const string& name) : name_(name) {}
void OnEventReceived(void* zmq_ctx_new, const EventArgs& args) {
const StringEventArgs* const s = dynamic_cast<const stringeventargs*="" const="">(&args);
if (s == nullptr)
return;
if (zmq_ctx_new == nullptr)
return;
Publisher* p = static_cast<publisher*>(zmq_ctx_new);
cout << name_.c_str() << " has received " << s->Payload().c_str() << " from " << p->Name().c_str() << endl;
}
};
namespace {
using namespace std::placeholders;
long Subscribe(Publisher& publisher, Subscriber& subscriber) {
return publisher.Register(bind(&Subscriber::OnEventReceived, &subscriber, _1, _2));
}
void Unsubscribe(Publisher& publisher, long token) {
publisher.Unregister(token);
}
class Subscription {
Publisher& publisher_;
long token_;
public:
Subscription(Publisher& publisher, long token) : publisher_(publisher), token_(token) {
}
~Subscription() {
publisher_.Unregister(token_);
}
};
}
int main()
{
int rst;
void* context = zmq_ctx_new();
assert(context != NULL);
void* publisher = zmq_socket(context, ZMQ_PUB);
assert(publisher != NULL);
int ret = zmq_bind(publisher, "tcp://127.0.0.1:5555");
assert(ret == 0);
int i = 0;
while (1)
{
char szBuf[1024] = { 0 };
snprintf(szBuf, sizeof(szBuf), "server i=%d", i);
printf("pub ctx: server i = %d\n", i);
ret = zmq_send(publisher, szBuf, strlen(szBuf) + 1, 0);
i++;
}
zmq_close(publisher);
zmq_term(context);
return 1;
void* context = zmq_ctx_new();
assert(context != NULL);
void* subscriber = zmq_socket(context, ZMQ_SUB);
assert(subscriber != NULL);
int ret = zmq_connect(subscriber, "tcp://127.0.0.1:5555");
assert(ret == 0);
ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
assert(ret == 0);
while (1)
{
printf("enter while to sub ctx\n");
char szBuf[1024] = { 0 };
ret = zmq_recv(subscriber, szBuf, sizeof(szBuf) - 1, 0);
if (ret > 0)
{
printf("%s\n", szBuf);
}
}
zmq_close(subscriber);
zmq_term(context);
zmq_close(publisher);
zmq_term(context);
return 1;
}