Hello,
I am attempting to develop a pair of programs (a daemon program and a program that interfaces between the user and the daemon) which uses
send()
and
recv()
calls for inter-process communication. The general flow of the program is as follows:
1, The user program first sends data to an "operator" port on the daemon
2. The daemon receives the data and returns a different port number for the user program to connect to
3. The user program receives this new port, kills the initial socket with the operator, and connects to the new port
4. The user program sends the requested command to the daemon over the new port
5. The daemon then sends an arbitrary number of packets corresponding to the output of the command that the user program requested.
The issue is that when the user program is first run, while the first connection succeeds, the second connection fails, terminating the user program. All subsequent attempts fail at the first connection.
Since I need to make multiple connections and accept multiple connections, I would like to have as much of the initialization code put in a function as possible.
To make it easier for the reader, please note that the
initServer()
and
initClient()
functions in both programs are the server and client initialization code respectively.
Daemon code:
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <syslog.h>
#include <fstream>
#include <sstream>
#include <climits>
#include <thread>
#include <signal.h>
#include <filesystem>
bool* isBusy;
int basePort;
const int bufsize=4096;
const int ms=1000;
std::string getProcName(int procID)
{
char* name = (char*)calloc(1024,sizeof(char));
std::string returnName;
if(name)
{
sprintf(name, "/proc/%d/cmdline",procID);
FILE* f = fopen(name,"r");
if(f)
{
size_t size;
size = fread(name, sizeof(char), 1024, f);
if(size>0)
{
if(name[size-1]=='\n')
{
name[size-1]='\0';
}
fclose(f);
}
}
}
returnName=std::string(name);
free(name);
return returnName;
}
bool fexists(const char *filename)
{
std::ifstream ifile(filename);
return (bool)ifile;
}
bool dexists(const char *directory)
{
struct stat st;
if(stat(directory,&st) == 0)
{
return !(bool)(st.st_mode & S_IFDIR);
}
else
{
return false;
}
}
int initClient(std::string addr, int p)
{
int sock=socket(AF_INET,SOCK_STREAM,0);
int connectRes=INT16_MAX;
sockaddr_in hint;
hint.sin_family=AF_INET;
hint.sin_port=htons(p);
inet_pton(AF_INET,addr.c_str(),&hint.sin_addr);
if(sock==-1)
{
syslog(LOG_ERR,"Unable to create socket on port %d",p);
exit(EXIT_FAILURE);
}
else
{
syslog(LOG_NOTICE,"Successfully initialized socket %d on port %d",sock,p);
}
connectRes=connect(sock,(sockaddr*)&hint,sizeof(hint));
if(connectRes==-1)
{
syslog(LOG_ERR,"Could not connect to %s:%d",addr.c_str(),p);
exit(EXIT_FAILURE);
}
else
{
syslog(LOG_NOTICE,"Successfully connected to %s:%d",addr.c_str(),p);
}
return sock;
}
int initServer(int p)
{
sockaddr_in hint;
sockaddr_in client;
socklen_t clientSize=sizeof(client);
int listening,clientSocket;
char host[NI_MAXHOST]; char service[NI_MAXSERV];
listening=socket(AF_INET, SOCK_STREAM, 0);
if(listening==-1)
{
syslog(LOG_ERR, "Unable to create socket! Quitting...");
exit(EXIT_FAILURE);
}
else
{
syslog(LOG_NOTICE, "Created socket %d for port %d", listening, p);
}
hint.sin_family=AF_INET;
hint.sin_port=htons(p);
inet_pton(AF_INET, "0.0.0.0", &hint.sin_addr);
bind(listening, (sockaddr*)&hint, sizeof(hint));
syslog(LOG_NOTICE, "Initialized listening server on port %d...", p);
listen(listening, SOMAXCONN);
clientSocket=accept(listening, (sockaddr*)&client, &clientSize);
if(clientSocket==-1)
{
syslog(LOG_ERR, "Invalid socket! Quitting...");
exit(EXIT_FAILURE);
}
memset(host,0,NI_MAXHOST);
memset(service,0,NI_MAXSERV);
if(getnameinfo((sockaddr*)&client, sizeof(client), host, NI_MAXHOST, service, NI_MAXSERV, 0)==0)
{
syslog(LOG_NOTICE, "%s connected on port %s",host,service);
}
else
{
inet_ntop(AF_INET, &client.sin_addr, host, NI_MAXHOST);
syslog(LOG_NOTICE, "%s connected on port %d", host, ntohs(client.sin_port));
}
close(listening);
return clientSocket;
}
void writeLog(int winrun_sock,bool verbose, int priority, std::string id, const char* format, ...)
{
std::string header;
va_list args;
va_start(args, format);
char text[256];
vsprintf(text,format,args);
syslog(priority,text);
if(verbose)
{
if(priority==LOG_ERR) {
header="\e[31;1m";
}
else if(priority==LOG_WARNING) {
header="\e[33;1m";
}
else if(priority==LOG_INFO) {
header="\e[36;1m";
}
else {
header="\e[32;1m";
}
send(winrun_sock,(header+text+"\e[00m").c_str(),((header+text+"\e[00m").length()+1),0);
}
va_end(args);
}
int waitForTimeout(std::string id, int svr_sock,int winrun_sock, unsigned long long int secs, std::string action, bool v)
{
int rv;
fd_set readfds, masterfds;
timeval timeout;
timeout.tv_sec=secs;
timeout.tv_usec=0;
FD_ZERO(&masterfds);
FD_SET(svr_sock,&masterfds);
memcpy(&readfds,&masterfds,sizeof(fd_set));
rv=select(svr_sock+1,&readfds,NULL,NULL,&timeout);
if(rv==SO_ERROR)
{
writeLog(winrun_sock,v,LOG_ERR,id,"Socket error during select() on PID %s",id.c_str());
}
else if(rv==0)
{
writeLog(winrun_sock,v,LOG_ERR,id,"Timeout (>%d seconds) while waiting for %s for PID %s",secs,action.c_str(),id.c_str());
}
return rv;
}
void sendData(std::string cmdID, std::string commandstr, std::string bCode, int svr_sock, int winrun_sock, int p, unsigned long long int t, bool v)
{
int sendRes, bytesReceived, timeoutRes;
char dataBuffer[bufsize];
std::string recvStr;
do
{
sendRes=send(svr_sock,(bCode+cmdID+commandstr).c_str(),((bCode+cmdID+commandstr).length()+1),0);
if(sendRes==-1)
{
writeLog(winrun_sock,v,LOG_WARNING,cmdID,"Could not send command to server! Sleeping for 100ms...");
usleep(100*ms);
}
}while(sendRes==-1);
while(true)
{
if(kill(stoi(cmdID),0)==0&&std::string(getProcName(stoi(cmdID))).find("winrun")!=std::string::npos)
{
memset(dataBuffer,0,bufsize);
timeoutRes=waitForTimeout(cmdID,svr_sock,winrun_sock,t,"continue signal",v);
if(timeoutRes==SO_ERROR || timeoutRes==0)
{
return;
}
else
{
bytesReceived=recv(winrun_sock,dataBuffer,bufsize,0);
recvStr=std::string(dataBuffer,bytesReceived);
if(recvStr.find(bCode)==std::string::npos)
{
send(winrun_sock,recvStr.c_str(),recvStr.length()+1,0);
bytesReceived=recv(winrun_sock,dataBuffer,bufsize,0);
recvStr=std::string(dataBuffer,bytesReceived);
send(svr_sock,recvStr.c_str(),recvStr.length()+1,0);
}
else
{
send(winrun_sock,bCode.c_str(),bCode.length()+1,0);
return;
}
}
}
else
{
syslog(LOG_NOTICE,"winrun not found for PID %s, stopping command output",cmdID.c_str());
return;
}
}
}
int winrund_check(std::string IP,int svr_port,int operator_port,int maxThreads)
{
int checkSock,winrunSock;
int bytesReceived,timeoutRes,pid;
int threadSock[maxThreads];
char buf[bufsize];
char dataBuffer[bufsize];
checkSock=initClient(IP,svr_port);
while(1)
{
winrunSock=initServer(operator_port);
memset(dataBuffer,0,bufsize);
pid=stoi(std::string(dataBuffer,recv(winrunSock,dataBuffer,bufsize,0)));
for(int i=1; i<=maxThreads; i++)
{
if(!isBusy[i])
{
isBusy[i]=true;
syslog(LOG_INFO,"Assigning %d to thread %d (port %d)",pid,i,(svr_port+i));
send(winrunSock,std::to_string(operator_port+i).c_str(),std::to_string(operator_port+i).length()+1,0);
close(winrunSock);
break;
}
if(i==maxThreads)
{
i=0;
syslog(LOG_WARNING,"All child threads busy, waiting 100ms...");
sleep(100*ms);
}
}
}
}
int winrund_child(std::string IP,int svr_port,int operator_port)
{
bool verbose=false;
int childSock, winrunSock;
int id;
unsigned long long int timeout;
char buf[bufsize];
std::string command, breakCode;
std::ifstream readstream;
std::ofstream writestream;
childSock=initClient(IP,svr_port);
breakCode=std::string(buf,recv(childSock,buf,bufsize,0));
while(1)
{
winrunSock=initServer(operator_port);
send(winrunSock,breakCode.c_str(),(breakCode.length()+1),0);
memset(buf,0,bufsize);
id=stoi(std::string(buf,recv(winrunSock,buf,bufsize,0)));
memset(buf,0,bufsize);
command=std::string(buf,recv(winrunSock,buf,bufsize,0));
memset(buf,0,bufsize);
timeout=stoull(std::string(buf,recv(winrunSock,buf,bufsize,0)));
memset(buf,0,bufsize);
std::istringstream(std::string(buf,recv(winrunSock,buf,bufsize,0)))>>std::boolalpha>>verbose;
syslog(LOG_INFO,"Sending command \"%s\" for PID %d over port %d",command.c_str(),id,svr_port);
sendData(std::to_string(id),("\""+command+"\""),breakCode,childSock,winrunSock,svr_port,timeout,verbose);
syslog(LOG_INFO,"\"%s\" has completed for PID %d",command.c_str(),id);
isBusy[svr_port-basePort]=false;
close(winrunSock);
}
}
int main(void)
{
bool* verbose;
pid_t pid, sid;
int ctr, maxThreads,socket,operatorPort;
int* id;
unsigned long long int* timeout;
char dataBuffer[bufsize]="";
std::string ip="";
std::string configpath="/etc/winrund/config";
std::string idstr="";
std::string recvStr="";
std::string line="";
std::string timeoutstr="";
std::string* writeBuffer;
std::string* command;
std::ifstream configReader;
std::ifstream outReader;
std::ofstream outWriter;
pid = fork();
if(pid > 0)
{
exit(EXIT_SUCCESS);
}
else if(pid < 0)
{
exit(EXIT_FAILURE);
}
umask(0);
openlog("winrund", LOG_NOWAIT | LOG_PID, LOG_USER);
syslog(LOG_NOTICE, "Successfully started winrund");
sid = setsid();
if(sid < 0)
{
syslog(LOG_ERR, "Could not generate session ID for child process");
exit(EXIT_FAILURE);
}
if((chdir("/")) < 0)
{
syslog(LOG_ERR, "Could not change working directory to \"/\"");
exit(EXIT_FAILURE);
}
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
try
{
configReader.open(configpath);
while(!configReader.eof())
{
getline(configReader,line);
if(line.substr(0,1)!="#") {
if(line.find("ip")==0)
{
ip=line.substr(line.find("=")+1,line.length()-line.find("="));
}
else if(line.find("threads")==0)
{
maxThreads=stoi(line.substr(line.find("=")+1,line.length()-line.find("=")));
verbose=new bool[maxThreads];
id=new int[maxThreads];
timeout=new unsigned long long int[maxThreads];
writeBuffer=new std::string[maxThreads];
command=new std::string[maxThreads];
isBusy=new bool[maxThreads];
for(int i=0; i<maxThreads; i++)
{
verbose[i]=false;
isBusy[i]=false;
}
}
else if(line.find("baseport")==0)
{
basePort=stoi(line.substr(line.find("=")+1,line.length()-line.find("=")));
}
else if(line.find("operatorport")==0)
{
operatorPort=stoi(line.substr(line.find("=")+1,line.length()-line.find("=")));
}
}
}
}
catch(...)
{
syslog(LOG_ERR,"Error while attempting to read config file at \"%s\"",configpath.c_str());
exit(EXIT_FAILURE);
}
configReader.close();
std::thread winrund_check_thread(winrund_check,ip,basePort,operatorPort,maxThreads);
winrund_check_thread.detach();
for(int i=1; i<=maxThreads; i++)
{
std::thread winrund_child_thread(winrund_child,ip,(basePort+i),(operatorPort+i));
winrund_child_thread.detach();
}
while(true)
{
usleep(1000*ms);
}
syslog(LOG_NOTICE, "Stopping winrund");
closelog();
exit(EXIT_SUCCESS);
}
User program code:
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <fstream>
#include <time.h>
#include <filesystem>
#include <climits>
#include <signal.h>
#include <netdb.h>
#include <arpa/inet.h>
const int ms=1000;
const int bufsize=4096;
std::string temppath="/dev/shm/winrund/";
pid_t pid=getpid();
bool dexists(const char *directory)
{
struct stat st;
if(stat(directory,&st) == 0)
{
return true;
}
else
{
return false;
}
}
bool fexists(const char *filename)
{
std::ifstream ifile(filename);
return (bool)ifile;
}
int initClient(std::string addr, int p)
{
int sock=socket(AF_INET,SOCK_STREAM,0);
int connectRes=INT16_MAX;
sockaddr_in hint;
hint.sin_family=AF_INET;
hint.sin_port=htons(p);
inet_pton(AF_INET,addr.c_str(),&hint.sin_addr);
if(sock==-1)
{
fprintf(stderr,"\e[31;1mUnable to create socket on port %d\n\e[00m",p);
exit(EXIT_FAILURE);
}
else
{
fprintf(stdout,"\e[32;1mSuccessfully initialized socket %d on port %d\n\e[00m",sock,p);
}
connectRes=connect(sock,(sockaddr*)&hint,sizeof(hint));
if(connectRes==-1)
{
fprintf(stderr,"\e[31;1mCould not connect to %s:%d\n\e[00m",addr.c_str(),p);
exit(EXIT_FAILURE);
}
else
{
fprintf(stdout,"\e[32;1mSuccessfully connected to %s:%d\n\e[00m",addr.c_str(),p);
}
return sock;
}
int waitForTimeout(int s, unsigned long long int secs, std::string action)
{
int rv;
fd_set readfds, masterfds;
timeval timeout;
timeout.tv_sec=secs;
timeout.tv_usec=0;
FD_ZERO(&masterfds);
FD_SET(s,&masterfds);
memcpy(&readfds,&masterfds,sizeof(fd_set));
rv=select(s+1,&readfds,NULL,NULL,&timeout);
if(rv==SO_ERROR)
{
fprintf(stderr,"\e[31;1mSocket error during select()\n\e[00m");
exit(EXIT_FAILURE);
}
else if(rv==0)
{
fprintf(stderr,"\e[31;1mTimeout (>%d seconds) while waiting for %s\e[00m",secs,action.c_str());
exit(EXIT_FAILURE);
}
return rv;
}
int main(int argc, char** argv)
{
bool verbose=false;
int timeout=5;
int socket=0;
int port=0;
int operatorPort=0;
int timeoutRes=0;
int bytesReceived=0;
char dataBuffer[bufsize]="";
std::string outputpath=(temppath+"_"+std::to_string(pid)).c_str();
std::string line;
std::string command;
std::string breakCode;
std::string configpath="/etc/winrund/config";
std::ifstream readstream;
std::ofstream writestream;
if(dexists(temppath.c_str()))
{
system(("systemctl status winrund | grep 'Active:' > "+temppath+"status").c_str());
readstream.open((temppath+"status").c_str());
getline(readstream,line);
readstream.close();
remove((temppath+"status").c_str());
if(line.find("active (running)")==std::string::npos)
{
fprintf(stderr,"\e[31;1mwinrund not active\n\e[00m");
return -4;
}
}
else
{
fprintf(stderr,"\e[31;1m%s doesn't exist\n\e[00m",temppath.c_str());
return -4;
}
try
{
readstream.open(configpath);
while(!readstream.eof())
{
getline(readstream,line);
if(line.substr(0,1)!="#") {
if(line.find("operatorport")==0)
{
operatorPort=stoi(line.substr(line.find("=")+1,line.length()-line.find("=")));
}
}
}
}
catch(...)
{
fprintf(stderr,"\e[31;1Error while attempting to read config file at \"%s\"\n\e[00m",configpath.c_str());
return -5;
}
if(!argv[1])
{
fprintf(stderr,"\e[31;1mNo command entered\n\e[00m");
return -1;
}
else
{
for(int i=1; i<argc; i++)
{
if(!argv[2])
{
command=argv[1];
break;
}
if(std::string(argv[i])=="-t")
{
try
{
if(argv[i+1])
{
timeout=std::stoull(argv[i+1]);
i++;
}
else
{
fprintf(stderr,"\e[31;1mNo timeout value entered\n\e[00m");
return -2;
}
}
catch(...)
{
fprintf(stderr,"\e[31;1mInvalid timeout value \"%s\". Must be between 0 and %llu\n\e[00m",argv[i],ULLONG_MAX);
return -3;
}
}
else if(std::string(argv[i])=="-v")
{
verbose=true;
}
else
{
command=argv[i];
}
}
if(verbose)
{
fprintf(stdout,"Verbose output color codes: \e[31;1mError\e[00m, \e[33;1mWarning\e[00m, \e[36;1mInfo\e[00m, \e[32;1mNotice\e[00m\n\n");
fprintf(stdout,"\e[36;1mPID: %d\n\n\e[00m",pid);
}
if(verbose)
{
fprintf(stdout,"\e[36;1mOpening connection for initial request\e[00m\n");
}
socket=initClient("127.0.0.1",operatorPort);
send(socket,std::to_string(pid).c_str(),(std::to_string(pid).length()+1),0);
timeoutRes=waitForTimeout(socket,timeout,"initial request");
port=stoi(std::string(dataBuffer,recv(socket,dataBuffer,bufsize,0)));
if(verbose)
{
fprintf(stdout,"\e[36;1mClosing connection for initial request\e[00m\n");
}
close(socket);
if(verbose)
{
fprintf(stdout,"\e[36;1mOpening connection for command execution over port %d\e[00m\n",port);
}
socket=initClient("127.0.0.1",port);
memset(dataBuffer,0,bufsize);
breakCode=std::string(dataBuffer,recv(socket,dataBuffer,bufsize,0));
if(verbose)
{
fprintf(stdout,"\e[32;1mSet break code to %d\e[00m\n",breakCode);
}
fprintf(stdout,"\e[36;1mSending PID (%s), command (%s), timeout (%s), and verbosity (%s)\n",std::to_string(pid).c_str(),command.c_str(),std::to_string(timeout).c_str(),std::to_string(verbose).c_str());
send(socket,std::to_string(pid).c_str(),(std::to_string(pid).length()+1),0);
send(socket,command.c_str(),(command.length()+1),0);
send(socket,std::to_string(timeout).c_str(),(std::to_string(timeout).length()+1),0);
send(socket,std::to_string(verbose).c_str(),(std::to_string(verbose).length()+1),0);
while(true)
{
memset(dataBuffer,0,bufsize);
timeoutRes=waitForTimeout(socket,timeout,"output");
line=std::string(dataBuffer,recv(socket,dataBuffer,bufsize,0));
if(line!=breakCode)
{
fprintf(stdout,"%s\n",line.c_str());
send(socket,std::string(pid+"-1").c_str(),(std::string(pid+"1").size()+1),0);
}
else
{
if(verbose)
{
fprintf(stdout,"\e[36;1mClosing connection for command execution\e[00m\n");
}
close(socket);
return 0;
}
}
}
return 0;
}
What I have tried:
1. Added
shutdown()
after
close()
and also tried just replacing
close()
with
shutdown()
2. Switched between using
0.0.0.0
and
127.0.0.1
for daemon server initialization
3. A lot of Internet research, which from what I can tell, seems to imply that you can just functionalize all of the client/server initialization code.