Multithreaded and Interprocess Signaling Using Semaphores in C++
Multithreaded and interprocess signaling using semaphores in C++.
Introduction
Semaphores and Mutex are heavily used for inter-process and multi-threaded signaling. This article demonstrates an Object Oriented wrapper over Unix system calls for creating Mutex, semaphores, and threads. Like the .NET framework's implementation, here also, apart for Mutex, the Semaphore
class can be used for both Shared (system wide) and Local semaphores.
Background
Basic knowledge of semaphores, Mutex, and threads are required.
Using the code
This article provides you classes for creating semaphores and Mutex in an object oriented way. The interfaces are inspired by the .NET framework's implementation of semaphores and Mutex.
The following example shows how to create and use local semaphores for multi threaded signaling. In the program, we are creating two threads and starting them. The callback has a call to ::localSemaphore.WaitOne()
. So, the two threads will call two WAitOne()
. That means each thread waits to acquire a room in the semaphore. But the call to WaitOne()
blocks them from going ahead because the initial count (i.e., the initially available room count) is set to 0. Two Release()
calls or a Release(2)
call is required to help them. The getchar()
function halts the program until the user presses a key. As the user presses any key, the program call the next method, ::localSemaphore.Release(2)
. This calls signals the two threads which are waiting. As the threads can proceed now (they succeed to acquire a room, i.e., the WaitOne()
call returns), their callback routines continue to end. The program waits for each of the threads to complete by calling the Join()
method.
#include <stdlib.h>
#include <iostream>
#include "Thread.h"
#include "Semaphore.h"
using namespace mom;
//declear the callback method
var_t callbackMethod(var_t arg);
//define the 'local semaphore' instance
Semaphore localSemaphore((unsigned int)0, (unsigned int)2);
//initial-count = 0 (no room available to enter), max-size = 2
int main(int argc, char** argv) {
std::cout <<std::endl
<<std::endl;
//create two thread objects
Thread thread1(callbackMethod);
Thread thread2(callbackMethod);
//lets start the threads
thread1.Start((var_t)"1");
thread2.Start((var_t)"2");
std::cout <<"press any key to signal"
<<std::endl;
//two threads were started and they are waiting to enter the semaphore
getchar();
//now providing two rooms that can be occupied
::localSemaphore.Release(2);
//main thread will wait for the threads to complete their routine
thread1.Join();
std::cout <<"thread1 exited"
<<std::endl;
thread2.Join();
std::cout <<"thread2 exited"
<<std::endl;
std::cout <<"press any key to exit"
<<std::endl;
getchar();
return (EXIT_SUCCESS);
}
var_t callbackMethod(var_t arg) {
std::cout <<"Thread "
<<(const char*)arg
<<" callback_method called .. "
<<"waiting for signal to exit.."
<<std::endl;
//try to occupy a room in the semaphore. block the thread until sucess
::localSemaphore.WaitOne();
}
The same semaphore class can also be used to created system wide shared semaphores that can be used by multiple processes for signaling purpose. Let's assume we have two processes and those will share a system wide semaphore to signal the other.
Let's define Process 1:
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
*
*/
using namespace mom;
int main(int argc, char** argv) {
std::cout <<std::endl;
//lets create a system wide semaphore with sem_id 111
Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
//initial-count: 0 (no room available to occupy)
std::cout <<"Process 1 Started... waiting for for signal from Process 2"
<<std::endl;
//wait util a room is available to occupy on the semaphore (sem_id 100)
systemWideSemaphore.WaitOne();
std::cout <<"Signal received from Process2.. exiting Process1"
<<std::endl;
return (EXIT_SUCCESS);
}
And for Process 2, we have:
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
*
*/
using namespace mom;
int main(int argc, char** argv) {
std::cout <<std::endl;
//lets retrieve the system wide semaphore with sem_id 111
Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
/* std::cout <<"Process 2 Started... press any key to signal Process1"
<<std::endl;
getchar();*/
//Process1 is already waiting to occupy a room in the semaphore
//release/provide a room in the semaphore so that Process1 can continue
systemWideSemaphore.Release();
return (EXIT_SUCCESS);
}
Process 1 calls the WaitOne()
method on the semaphore to acquire a room. As the initial count was 0, it waits there until any room is released for it to go ahead. Process 2 does this job. When started, Process 2 retrieves the semaphore with the given key and calls Release()
to provide a room for Process 1 to go ahead. So after the systemWideSemaphore.Release()
method is called in Process 2, Process 1 exits.
Source code
Please find the source code for the classes here:
Semaphore.h:
/*
* File: Semaphore.h
* Author: Souvik Chatterjee
* This class is a C# like object oriented wrapper
* for local(in process) and shared(system wide) semaphore
* This file defines the interface for the Semaphore class
*
* Important: I designed the class inspired
* by the .NET framework's implementation of Semaphore
* In .Net framework, apart from Mutex there is a Semaphore class
* which can be used as both local and shared
* semaphores. I am not sure of .Net internal implementation.
* But this C++ class internally uses two completely
* different implementation UNIX semget C system call
* and UNIX pthread C system call for shared and local semaphores
* respectively.
*/
#ifndef _SEMAPHORE_H
#define _SEMAPHORE_H
namespace mom {
class Semaphore {
public:
//This constructor creates(or retrieves existing)
//system wide semaphore with the given sem_id
//using this call 65535 different system wide semaphores can be created
//this shared semaphore is implemented with UNIX semget C system call
Semaphore(unsigned short sem_id, unsigned int initial_count, unsigned int max_count);
//This constructor creates local(in process) semaphore
//in UNIX local semaphore in a MUTEX.
//this local semaphore internally uses UNIX pthread mutext
Semaphore(unsigned int initial_count, unsigned int max_count);
//waits until succeeds to acquire a room in the semaphore object
void WaitOne();
//releases one room among the acquired rooms in the semaphore objet
void Release();
//releases specified number(release_count) rooms
//among the acquired rooms in the semaphore objet
void Release(int release_count);
//destructor
virtual ~Semaphore();
private:
//internal flag to determine if the created semaphore is local or shared
bool _is_local;
//internal handle for the created semaphore instance
void* _semaphore_instance_ptr;
};
}
#endif /* _SEMAPHORE_H */
Semaphore.cpp:
/*
* File: SemaphoreWrapper.cpp
* Author: Souvik Chatterjee
* This file defines the implementation
* of the Semaphore interface declared in Semaphore.h
* Semaphore class uses __shared_semaphore class
* for shared(system wide) semaphore which is basically an object
* oriented wrapper over UNIX semget.
* On the other hand, it uses __local_semaphore class
* for local(in process) semaphore which is basically an object
* oriented wrapper over UNIX pthread mutex.
* Reference: to know understand the UNIX system calls
* used throughout this implementation please refer to
* Open Group (http://www.unix.org/single_unix_specification/) and
* The Linux Programmer's Guide (http://tldp.org/LDP/lpg/)
*/
#include "Semaphore.h"
#include "Mutex.h"
#include "Thread.h"
#include <iostream>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <fcntl.h>
#include <errno.h>
namespace mom {
//------------------- System wide shared semaphore
// (class __shared_semaphore)-------------------
class __shared_semaphore {
public:
// creates (or retrieves existing) system wide(shared)
// semaphore with given semaphore id(sem_id)
// initial_count refers to the initially available
// rooms that can be acquired by _wait_one method
// max_count refers to the maximum number of rooms
// that can be acquired by _wait_one_method
__shared_semaphore(unsigned short sem_id,
unsigned int initial_count, unsigned int max_count);
// waits untils succeeds to acquire a room in the semaphore object
void _wait_one();
// releases specified number(release_count) of rooms
// among the acquired rooms in the semaphore object
// if no rooms are currently occupied, it simply ignores
// the call. you an implement it with a custom exception
// thrown
void _release(unsigned int release_count);
private:
// releases specified number(release_count) of rooms
// among the acquired rooms in the semaphore object
// it can not provided rooms exceeding the <max_count>.
// any such attempt will be simply ignored. you can
// implement this with a custom xception thrown
void _release_internal(unsigned int release_count);
//holds the key of the shared semaphore object
key_t _sem_key;
//holds the maximum count for the semaphore object
unsigned int _max_count;
//holds the semaphore id retrieved from the system
int _sem_id;
};
__shared_semaphore::__shared_semaphore(unsigned short sem_id,
unsigned int initial_count, unsigned int max_count) {
//set the key
_sem_key = (key_t)sem_id;
//set max count
_max_count = max_count;
//set wait instruction
//set sem id to not set i.e. -1
_sem_id = -1;
//define the semaphore creation mode
// IPC_CREATE: create a new semaphore if already
// there is no sem_id associated with sem_key
// IPC_EXCL: the semget function will fail if there
// is already sem_id exists associated with the sem_key
// S_IRUSR: owner has the read permission on semaphore object
// S_IWUSR: owner has the write permission on semaphore object
// S_IROTH: read permission on semaphore object for others
// S_IWOTH: write permission on semaphore object for others
mode_t sem_mode = IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
//lets try to retrieve the semaphore id for
//the existing semaphore(if any) associated with the sem_key
//it will return -1 if there is no semaphore
//available in the system associated with the given key
_sem_id = semget(_sem_key, 0, 0);
if( _sem_id == -1 ) { //okay no semaphore found in the system for the given key
//now lets create a new semaphore with the sem_key and with sem_mode
_sem_id = semget(_sem_key, 1, sem_mode);
//lets assume it failed due to some reason..
//if you use this code, I will recommend to use
//proper object oriented exception handling here
if(_sem_id == -1) {
if (errno == EEXIST) {
perror("IPC error 1: semget");
}
else {
perror("IPC error 2: semget");
}
exit(1);
}
//this process created the semaphore first
//lets provide <initial_count> number of rooms
_release_internal(initial_count);
}
}
void __shared_semaphore::_wait_one() {
sembuf sem_instruction;
sem_instruction.sem_num = 0;
sem_instruction.sem_op = -1;
sem_instruction.sem_flg = SEM_UNDO;
//execute the semop system call on the semaphore
//with the prepared wait instruction
if(semop(_sem_id, &sem_instruction, 1)!=-1) {
//for proper functionality, this line of code is required
//it sets the semaphore's current value
//in the system which other process can feel
//i am not very much sure why it is required.
//I used it after doing a lots of debugging
//please put a comment in the article
//if you have the detailed information for it
semctl( _sem_id, 0, SETVAL, semctl(_sem_id, 0, GETVAL, 0));
}
}
void __shared_semaphore::_release(unsigned int release_count) {
if(semctl(_sem_id, 0, GETNCNT, 0) > 0) {
//if atleast one process is waiting for a resource
_release_internal(release_count);
}
else {
//no process is waiting fo the resource..
//so simply ignored the call.. you should throw some
// custom exception from here
}
}
void __shared_semaphore::_release_internal(unsigned int release_count) {
if(semctl(_sem_id, 0, GETVAL, 0) < _max_count) {
sembuf sem_instruction;
sem_instruction.sem_num = 0;
sem_instruction.sem_op = release_count;
sem_instruction.sem_flg = IPC_NOWAIT|SEM_UNDO;
//execute the semop system call on the semaphore
//with the prepared signal instruction
if(semop(_sem_id, &sem_instruction, 1) != -1) {
//for proper functionality, this line of code is required
//it sets the semaphore's current value
//in the system which other process can feel
//i am not very much sure why it is required.
//I used it after doing a lots of debugging
//please put a comment in the article
//if you have the detailed information for it
semctl( _sem_id, 0, SETVAL, semctl(_sem_id, 0, GETVAL, 0));
}
}
else {
//ignored the call. you should thorw some custo exception
}
}
//----------------------- Local semaphore --------------(class __local_semaphore)--------
class __local_semaphore {
public:
//creates a logical couting semaphore using mutex.
//This semaphore has no scope out side the process
//inwhich its running. so it can be used
//for inter-thread signalling but not interprocess signalling
__local_semaphore(unsigned int initial_count, unsigned int max_count);
void _wait_one();
void _release(unsigned int release_count);
private:
unsigned int _initial_count;
unsigned int _max_count;
Mutex* _wait_handle;
bool _waiting;
};
__local_semaphore::__local_semaphore(unsigned int initial_count,
unsigned int max_count) {
_initial_count = initial_count;
_max_count = max_count;
_wait_handle = new Mutex();
_wait_handle->Lock();
_waiting = false;
}
void __local_semaphore::_wait_one() {
if(_initial_count == _max_count) {
_waiting = true;
_wait_handle->Lock();
}
else if(_initial_count < _max_count) {
_initial_count ++;
}
}
void __local_semaphore::_release(unsigned int release_count) {
_initial_count -= release_count;
if(_waiting) {
_waiting = false;
_wait_handle->Unlock();
}
}
//----------------------- Semphore (wrapper)------------(class Semaphore)----
//create a system wide semaphore with the sem_id provided
Semaphore::Semaphore(unsigned short sem_id,
unsigned int initial_count, unsigned int max_count) {
_is_local = false;
__shared_semaphore* shared_semaphore =
new __shared_semaphore(sem_id, initial_count, max_count);
_semaphore_instance_ptr = (void*)shared_semaphore;
}
//create a local semaphore
Semaphore::Semaphore(unsigned int initial_count, unsigned int max_count) {
_is_local = true;
__local_semaphore* local_semaphore =
new __local_semaphore(initial_count, max_count);
_semaphore_instance_ptr = (void*)local_semaphore;
}
//block the caller until it succeeds to occupy a room
void Semaphore::WaitOne() {
if(_is_local) {
((__local_semaphore*)_semaphore_instance_ptr)->_wait_one();
}
else {
((__shared_semaphore*)_semaphore_instance_ptr)->_wait_one();
}
}
//release <release_count> occupied rooms
void Semaphore::Release(int release_count) {
if(_is_local) {
((__local_semaphore*)_semaphore_instance_ptr)->_release(release_count);
}
else {
((__shared_semaphore*)_semaphore_instance_ptr)->_release(release_count);
}
}
//release an occupied room
void Semaphore::Release() {
Release(1);
}
Semaphore ::~Semaphore() {
if(_is_local) {
__local_semaphore* __semaphore_ptr =
(__local_semaphore*)_semaphore_instance_ptr;
delete __semaphore_ptr;
}
else {
__shared_semaphore* __semaphore_ptr =
(__shared_semaphore*)_semaphore_instance_ptr;
delete __semaphore_ptr;
}
_semaphore_instance_ptr = NULL;
}
//-----------------------------------------------------------------------
}
Mutex.h:
/*
* File: Mutex.h
* Author: Souvik Chatterjee
* This file declares the interface for the Mutex class
* Mutex class is a wrapper over the pthread mutex.
* It provides an C# like object oriented implementation
* of unix pthread mutex
*/
#ifndef _MUTEX_H
#define _MUTEX_H
#include <pthread.h>
class Mutex {
public:
//Mutext::Lock() gains a lock on the MUTEX
void Lock();
//Mutext::Unlock() releases the MUTEX
void Unlock();
private:
//unix pthread instance
pthread_mutex_t _mutex;
};
#endif /* _MUTEX_H */
Mutex.cpp:
/*
* File: Mutex.cpp
* Author: Souvik Chatterjee
* This CPP File Contains the implementation of the header Mutex.h
*/
#include <stdio.h>
#include <ios>
#include <pthread.h>
#include "Mutex.h"
//---------------------------------------
/*
* Mutext::Lock() gains a lock on the MUTEX
*/
void Mutex::Lock() {
//execute pthread mutex lock system call
//with member pthread mutext instance
//pass the reference of the pthread mutex instance
pthread_mutex_lock(&_mutex);
}
//--------------------------------------
/*
* Mutext::Unlock() releases the MUTEX
*/
void Mutex::Unlock() {
//execute pthread mutex unlock system call
//with member pthread mutext instance
//pass the reference of the pthread mutex instance
pthread_mutex_unlock(&_mutex);
}
//--------------------------------------
Thread.h:
/*
* File: Thread.h
* Author: Souvik Chatterjee
*
* Created on August 12, 2009, 2:54 AM
*/
#ifndef _THREAD_H
#define _THREAD_H
#include<pthread.h>
#include <stdio.h>
#include <ios>
namespace mom {
typedef void* var_t;
typedef var_t (*thread_start_t)(var_t);
class Thread {
public:
Thread(thread_start_t thread_start);
void Start(var_t thread_args);
void Join();
static int Sleep(unsigned long millisecs) {
long sec = (long)(millisecs / 1000);
long nsec = (millisecs - (sec*1000))*1000;
timespec delay = {sec, nsec};
int return_val = nanosleep(&delay, (timespec*)NULL);
return return_val;
}
private:
thread_start_t _thread_start;
pthread_t _thread;
};
}
#endif /* _THREAD_H */
Thread.cpp:
/*
* File: Thread.cpp
* Author: Souvik Chatterjee
*
* Created on August 12, 2009, 2:54 AM
*/
#include "Thread.h"
namespace mom {
Thread :: Thread(thread_start_t thread_start) {
_thread_start = thread_start;
}
void Thread :: Start(var_t thread_args) {
pthread_create(&_thread, NULL, _thread_start, thread_args);
}
void Thread :: Join() {
pthread_join(_thread, NULL);
}
}