Click here to Skip to main content
15,996,252 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more:
Hi,

I want to make the publisher and the subscriber work on the same node using ZeroMQ communication. The protocol is TCP, and I am using C++ in Visual Studio for this project.

I have not been able to make them work on the same node: I can get either one of them to work, but not both.

Can you please help me make them work on the same node?

What I have tried:

C++
#include <assert.h>

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>

#include "zmq.h"

using namespace std;

// Polymorphic base for the argument objects handed to event handlers.
// It carries no data; the virtual destructor makes the type polymorphic so
// handlers can dynamic_cast a `const EventArgs&` to a concrete argument type.
class EventArgs {
public:
	virtual ~EventArgs() = default;
};

// Event arguments that carry a single string payload.
class StringEventArgs : public EventArgs {
public:
	// Stores a copy of `payload`.
	explicit StringEventArgs(const std::string& payload) : payload_{payload} {}

	// Returns the stored payload; the reference is valid while this object lives.
	const std::string& Payload() const { return payload_; }

private:
	std::string payload_;
};

// Multicast event: keeps a token-indexed table of handlers and calls every
// registered handler when the event is raised.
class Event {
public:
	// Signature shared by all handlers: (opaque sender pointer, event arguments).
	using Handler = std::function<void(void*, const EventArgs&)>;

private:
	// Registered handlers, keyed by the token returned at registration.
	std::map<long, Handler> callbacks_;
	// Most recently issued token; the first registration receives 1.
	long token_ = 0;

public:
	// Raises the event: calls each registered handler, in ascending token
	// order, passing `subscriber` (the opaque sender pointer supplied by the
	// caller) and `args` through unchanged.
	//
	// A handler must not unregister itself from inside this call: erasing the
	// map entry that is currently being visited invalidates the iteration.
	void operator()(void* subscriber, const EventArgs& args) const {
		for (const auto& entry : callbacks_) {
			entry.second(subscriber, args);
		}
	}

	// Registers handler `f` and returns the token that identifies it for
	// Unsubscribe().
	// NOTE(review): despite its name this member has nothing to do with the
	// ZeroMQ C function zmq_setsockopt(); it only adds a handler to the table.
	// The name is kept because Publisher::Register calls it.
	long zmq_setsockopt(Handler f) {
		++token_;
		callbacks_.emplace(token_, std::move(f));
		return token_;
	}

	// Removes the handler registered under `token`; unknown tokens are ignored.
	void Unsubscribe(long token) {
		callbacks_.erase(token);
	}
};

class Publisher {
	Event event_;
	string name_;

public:
	explicit Publisher(const string& name) : name_(name) {}
	const string& Name() const { return name_; }
	void Publish(const string& message) {
		event_(this, StringEventArgs(message));
	}
	long Register(function<void(void* subscriber,="" const="" eventargs&)=""> f) {
		return event_.zmq_setsockopt(f);
	}
	void Unregister(long token) {
		event_.Unsubscribe(token);
	}
};

class Subscriber {
	string name_;

public:
	explicit Subscriber(const string& name) : name_(name) {}
	void OnEventReceived(void* zmq_ctx_new, const EventArgs& args) {
		const StringEventArgs* const s = dynamic_cast<const stringeventargs*="" const="">(&args);
		if (s == nullptr)
			return;
		if (zmq_ctx_new == nullptr)
			return;
		Publisher* p = static_cast<publisher*>(zmq_ctx_new);
		cout << name_.c_str() << " has received " << s->Payload().c_str() << " from " << p->Name().c_str() << endl;
	}
};

namespace {
	using namespace std::placeholders;

	// Registers subscriber.OnEventReceived as a handler on `publisher` and
	// returns the registration token. The handler stores the address of
	// `subscriber`, so the subscriber must stay alive while it is registered.
	long Subscribe(Publisher& publisher, Subscriber& subscriber) {
		auto handler = std::bind(&Subscriber::OnEventReceived, &subscriber, _1, _2);
		return publisher.Register(handler);
	}

	// Removes the handler identified by `token` from `publisher`.
	void Unsubscribe(Publisher& publisher, long token) {
		publisher.Unregister(token);
	}

	// Scope guard for a registration: unregisters `token` from the publisher
	// when the guard is destroyed. It holds a reference to the publisher, which
	// therefore has to outlive the guard.
	class Subscription {
	public:
		Subscription(Publisher& publisher, long token)
			: publisher_(publisher), token_(token) {}

		~Subscription() { publisher_.Unregister(token_); }

	private:
		Publisher& publisher_;
		long token_;
	};
}

int main()
{
	int rst;
	void* context = zmq_ctx_new();
	assert(context != NULL);

	void* publisher = zmq_socket(context, ZMQ_PUB);
	assert(publisher != NULL);

	int ret = zmq_bind(publisher, "tcp://127.0.0.1:5555");
	assert(ret == 0);
	int i = 0;
	while (1)
	{

		char szBuf[1024] = { 0 };
		snprintf(szBuf, sizeof(szBuf), "server i=%d", i);
		printf("pub ctx: server i = %d\n", i);
		ret = zmq_send(publisher, szBuf, strlen(szBuf) + 1, 0);
		i++;
	}
	zmq_close(publisher);
	zmq_term(context);
	return 1;
	

	void* context = zmq_ctx_new();
	assert(context != NULL);

	void* subscriber = zmq_socket(context, ZMQ_SUB);
	assert(subscriber != NULL);

	int ret = zmq_connect(subscriber, "tcp://127.0.0.1:5555");
	assert(ret == 0);
	ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
	assert(ret == 0);
	while (1)
	{
		printf("enter while to sub ctx\n");
		char szBuf[1024] = { 0 };
		ret = zmq_recv(subscriber, szBuf, sizeof(szBuf) - 1, 0);
		if (ret > 0)
		{
			printf("%s\n", szBuf);
		}
	}
	

	zmq_close(subscriber);
	zmq_term(context);
	

	zmq_close(publisher);
	zmq_term(context);
	return 1;
}
Posted
Updated 5-Jul-21 4:58am
v5
Comments
11917640 Member 5-Jul-21 6:57am    
Do you mean: on the same computer? Try address tcp://*:5555 for publisher and tcp://localhost:5555 for subscriber.
Member 14519564 5-Jul-21 10:55am    
Ok, thanks i will check and let you know

1 solution

Both your publisher and subscriber bind to the same address/port. They have to use different address/port combinations, which is what the comment above is getting at. Although the comment uses 5555 both times, localhost is 127.0.0.1, so it's a different address/port combination.

EDIT: See the comments below. The problem was that the subscriber was trying to connect to localhost:5555, but the publisher had not bound to that address in the original code.
 
Share this answer
 
v2
Comments
Member 14519564 5-Jul-21 10:55am    
Ok thanks, i will try and let you know
11917640 Member 6-Jul-21 4:46am    
For clarity: publisher address tcp://*:5555 means: all available NICs (including localhost), publisher is bound to port 5555. Subscriber address tcp://localhost:5555 means: connect to localhost, port 5555.
Greg Utas 6-Jul-21 7:35am    
I didn't catch that. But if it causes the publisher to bind to localhost:5555, how can the subscriber also bind to it with the same protocol? I didn't think that was possible.
11917640 Member 6-Jul-21 8:00am    
Publisher is TCP server, subscriber is client. Subscriber doesn't bind to the port.
Greg Utas 6-Jul-21 8:29am    
Duh. It's been a while, and I'd forgotten how connect works!

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900