Click here to Skip to main content
15,868,016 members
Articles / Programming Languages / C

Asynchronous Multicast Callbacks in C

Rate me:
Please Sign up or sign in to vote.
5.00/5 (12 votes)
29 Jan 2019CPOL13 min read 21.6K   535   26   2
Simplify passing data between threads with this portable C language callback library.

Introduction

Callbacks are a powerful concept used to reduce the coupling between two pieces of code. On a multithreaded system, callbacks have limitations. What I've always wanted was a callback mechanism that crosses threads and handles all the low-level machinery to get my function call and event data from one thread to another safely. A portable and easy to use framework. No more monster switch statements inside a thread loop that typecast OS message queue void* values based upon an enumeration. Create a callback. Register a callback. And the framework automagically invokes the callback with data arguments on a user specified target thread is the goal.

On systems that use event loops (aka message loops), a message queue and switch statement are sometimes employed to handle incoming messages. The loop waits for messages and dispatches a function call with data into the program. This solution aims to simplify and standardize the event loop in order to generalize function dispatching and the movement of data between threads.

The callback solution presented here provides the following features:

  • Asynchronous callbacks – support asynchronous callbacks to and from any thread
  • Thread targeting – specify the destination thread for the asynchronous callback
  • Callbacks – invoke any C or C++ free/static function with a matching signature
  • Type safe – user defined, type safe callback function data argument
  • Multicast callbacks – store multiple callbacks within an array for sequential invocation
  • Thread-safe – suitable for use on a multi-threaded system
  • Compact – small, easy to maintain code base consuming minimal code space
  • Portable – portable to an embedded or PC-based platform
  • Any compiler – support for any C language compiler
  • Any OS - easy porting to any operating system
  • Elegant syntax – intuitive and easy to use

The asynchronous callback paradigm significantly eases multithreaded application development by placing the callback function pointer and callback data onto the thread of control that you specify. Exposing a callback interface for a single module or an entire subsystem is extremely easy. The framework is no more difficult to use than a standard C callback but with more features.

This article proposes an inter-thread communication mechanism utilizing asynchronous multicast callbacks. The attached source code implements all features above, as I'll demonstrate.

Callbacks Background

The idea of a function callback is very useful. In callback terms, a publisher defines the callback signature and allows anonymous registration of a callback function pointer. A subscriber creates a function implementation conforming to the publisher's callback signature and registers a callback function pointer with the publisher at runtime. The publisher code knows nothing about the subscriber code – the registration and the callback invocation is anonymous.

Now, on a multithreaded system, you need understand synchronous vs. asynchronous callback invocations. If the callback is synchronous, the callback is executed on the caller's thread of control. If you put a break point inside the callback, the stack frame will show the publisher function call and the publisher callback all synchronously invoked. There are no multithreaded issues with this scenario as everything is running on a single thread.

If the publisher code has its own thread, it may invoke the callback function on its thread of control and not the subscriber's thread. A publisher invoked callback can occur at any time completely independent of the subscriber’s thread of control. This cross-threading can cause problems for the subscriber if the callback code is not thread-safe since you now have another thread calling into subscriber code base at some unknown interval.

One solution for making a callback function thread-safe is to post a message to the subscriber's OS queue during the publisher's callback. The subscriber's thread later dequeues the message and calls an appropriate function. Since the callback implementation only posts a message, the callback, even if done asynchronously, is thread-safe. In this case, the asynchrony of a message queue provides the thread safety in lieu of software locks.

Using the Code

I'll first present how to use the code, and then get into the implementation details.

A publisher uses the CB_DECLARE macro to expose a callback interface to potential subscribers, typically within a header file. The first argument is the callback name. The second argument is the callback function argument type. In the example below, int* is the callback function argument.

C++
CB_DECLARE(TestCb, int*)

The publisher uses the CB_DEFINE macro within a source file to complete the callback definition. The first argument is the callback name. The second argument is the callback function argument type. The third argument is the size of the data pointed to by the callback function argument. The last argument is the maximum number of subscribers that can register for callback notifications.

C++
CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)

To subscribe to a callback, create a function (static class member or global) as shown. I’ll explain why the function signature argument requires a (int*, void*) function signature shortly.

C++
void TestCallback1(int* val, void* userData)
{
    printf("TestCallback1 %d", *val);
}

The subscriber registers to receive callbacks using the CB_Register() function macro. The first argument is the callback name. The second argument is a pointer to the callback function. The third argument is a pointer to a thread dispatch function or NULL if a synchronous callback is desired. And the last argument is a pointer to optional user data passed during callback invocation. The framework internally does nothing with user data other than pass it back to the callback function. The user data value can be anything the caller wants or NULL.

C++
CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);

On C/C++ mixed projects, the userData callback argument can be used to store a this class instance pointer. Pass a class static member function pointer for the callback function and a this pointer for user data to CB_Register(). Within the subscriber callback function, typecast userData back to a class instance pointer. This provides an easy means of accessing class instance functions and data within a static callback function.

Use CB_Invoke() when a publisher needs to invoke the callback for all registered subscribers. The function dispatches the callback and data argument onto the destination thread of control. In the example below, TestCallback1() is called on DispatchCallbackThread1.

C++
int data = 123;
CB_Invoke(TestCb, &data);

Use CB_Unregister() to unsubscribe from a callback.

C++
CB_Unregister(TestCb, TestCallback1, DispatchCallbackThread1);

Asynchronous callbacks are easily used to add asynchrony to both incoming and outgoing API interfaces. The following examples show how.

SysData Publisher Example

SysData is a simple module showing how to expose an outgoing asynchronous interface. The module stores system data and provides asynchronous subscriber notifications when the mode changes. The module interface is shown below.

C++
typedef enum
{
    STARTING,
    NORMAL,
    SERVICE,
    SYS_INOP
} SystemModeType;

typedef struct
{
    SystemModeType PreviousSystemMode;
    SystemModeType CurrentSystemMode;
} SystemModeData;

// Declare a SysData callback interface
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)

void SD_Init(void);
void SD_Term(void);
void SD_SetSystemMode(SystemModeType systemMode);

The publisher callback interface is SystemModeChangedCb. Calling SD_SetSystemMode() saves the new mode into _systemMode and notifies all registered subscribers.

C++
void SD_SetSystemMode(SystemModeType systemMode)
{
    LK_LOCK(_hLock);

    // Create the callback data
    SystemModeData callbackData;
    callbackData.PreviousSystemMode = _systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    _systemMode = systemMode;

    // Callback all registered subscribers
    CB_Invoke(SystemModeChangedCb, &callbackData);

    LK_UNLOCK(_hLock);
}

SysData Subscriber Example

The subscriber creates a callback function that conforms to the publisher's callback function signature.

C++
void SysDataCallback(const SystemModeData* data, void* userData)
{
    cout << "SysDataCallback: " << data->CurrentSystemMode << endl;
}

At runtime, CB_Register() is used to register for SysData callbacks on DispatchCallbackThread1.

C++
CB_Register(SystemModeChangedCb, SysDataCallback, DispatchCallbackThread1, NULL);

When SD_SetSystemMode() is called, any client interested in the mode changes are notified asynchronously on their desired execution thread.

C++
SD_SetSystemMode(STARTING);
SD_SetSystemMode(NORMAL);

SysDataNoLock Publisher Example

SysDataNoLock is an alternate implementation that uses a private callback for setting the system mode asynchronously and without locks.

C++
// Declare a public SysData callback interface
CB_DECLARE(SystemModeChangedNoLockCb, const SystemModeData*)

void SDNL_Init(void);
void SDNL_Term(void);
void SDNL_SetSystemMode(SystemModeType systemMode);

The initialize function registers with the private SetSystemModeCb callback.

C++
// Define a private callback interface
CB_DECLARE(SetSystemModeCb, SystemModeType*)
CB_DEFINE(SetSystemModeCb, SystemModeType*, sizeof(SystemModeType), 1)

void SDNL_Init(void)
{
    // Register with private callback
    CB_Register(SetSystemModeCb, SDNL_SetSystemModePrivate, DispatchCallbackThread1, NULL);
}

The SSNL_SetSystemMode() function below is an example of an asynchronous incoming interface. To the caller, it looks like a normal function, but, under the hood, a private function call is invoked asynchronously. In this case, invoking SetSystemModeCb causes SDNL_SetSystemModePrivate() to be called on DispatchCallbackThread1.

C++
void SDNL_SetSystemMode(SystemModeType systemMode)
{
    // Invoke the private callback. SDNL_SetSystemModePrivate() will be called
    // on DispatchCallbackThread1.
    CB_Invoke(SetSystemModeCb, &systemMode);
}

Since this private function is always invoked asynchronously on DispatchCallbackThread1 it doesn't require locks.

C++
static void SDNL_SetSystemModePrivate(SystemModeType* systemMode, void* userData)
{
    // Create the callback data
    SystemModeData callbackData;
    callbackData.PreviousSystemMode = _systemMode;
    callbackData.CurrentSystemMode = *systemMode;

    // Update the system mode
    _systemMode = *systemMode;

    // Callback all registered subscribers
    CB_Invoke(SystemModeChangedNoLockCb, &callbackData);
}

Callback Signature Limitations

This design has the following limitations imposed on all callback functions:

  1. Each callback handles a single user-defined argument type.
  2. The argument may be a const or non-const pointer (e.g. const MyData* or MyData*).
  3. The two callback function arguments are always: MyData* and void*.
  4. Each callback has a void return type.

For instance, if a callback is declared as:

C++
CB_DECLARE(TestCb, const MyData*)

The callback function signature is:

C++
void MyCallback(const MyData* data, void* userData);

The design can be extended to support more than one argument if necessary. However, the design somewhat mimics what embedded programmers do all the time, which is something like:

  1. Dynamically create an instance to a struct or class and populate data
  2. Post a pointer to the data through an OS message as a void*
  3. Get the data from the OS message queue and typecast the void* back to the original type
  4. Delete the dynamically created data

In this design, the entire infrastructure happens automatically without any additional effort on the programmer's part. If multiple data parameters are required, they must be packaged into a single class/struct and used as the callback data argument.

Implementation

The number of lines of code for the callback framework is surprisingly low. Strip out the comments, and maybe a couple hundred lines of code that are (hopefully) easy to understand and maintain.

The implementation uses macros and token pasting to provide a unique type-safe interface for each callback. The token pasting operator (##) is used to merge two tokens when the preprocessor expands the macro. The CB_DECLARE macro is shown below:

C++
#define CB_DECLARE(cbName, cbArg) \
    typedef void(*cbName##CallbackFuncType)(cbArg cbData, void* cbUserData); \
    BOOL cbName##_Register(cbName##CallbackFuncType cbFunc, 
         CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData); \
    BOOL cbName##_IsRegistered(cbName##CallbackFuncType cbFunc, 
         CB_DispatchCallbackFuncType cbDispatchFunc); \
    BOOL cbName##_Unregister(cbName##CallbackFuncType cbFunc, 
         CB_DispatchCallbackFuncType cbDispatchFunc); \
    BOOL cbName##_Invoke(cbArg cbData); \
    BOOL cbName##_InvokeArray(cbArg cbData, size_t num, size_t size);

In the SysData example used above, the compiler preprocessor expands CB_DECLARE to:

C++
typedef void(*SystemModeChangedCbCallbackFuncType)
            (const SystemModeData* cbData, void* cbUserData); 

BOOL SystemModeChangedCb_Register(SystemModeChangedCbCallbackFuncType cbFunc, 
        CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData);
 
BOOL SystemModeChangedCb_IsRegistered(SystemModeChangedCbCallbackFuncType cbFunc, 
        CB_DispatchCallbackFuncType cbDispatchFunc); 

BOOL SystemModeChangedCb_Unregister(SystemModeChangedCbCallbackFuncType cbFunc, 
        CB_DispatchCallbackFuncType cbDispatchFunc); 

BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData); 

BOOL SystemModeChangedCb_InvokeArray(const SystemModeData* cbData, size_t num, size_t size);

Notice every cbName## location is replaced by the macro name argument, in this case, being SystemModeChangedCb from the declaration below.

C++
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)

Similarly, the CB_DEFINE macro expands to create the callback function implementations. Notice the macro provides a thin, type-safe wrapper around private functions such as _CB_AddCallback() and _CB_Dispatch(). If attempting to register the wrong function signature, the compiler generates an error or warning. The macros automate the monotonous, boilerplate code that you’d normally write by hand.

The registered callbacks are stored in a static array of CB_Info instances. Calling CB_Invoke(SystemModeChangedCb, &callbackData) executes SystemModeChangedCb_Invoke(). Then _CB_Dispatch() iterates over the CB_Info array and dispatches one CB_CallbackMsg message to each target thread. The message data is dynamically created to travel through an OS message queue.

C++
// Macro generated unique invoke function
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData) 
{ 
    return _CB_Dispatch(&SystemModeChangedCbMulticast[0], 2, cbData, sizeof(SystemModeData)); 
}

BOOL _CB_Dispatch(CB_Info* cbInfo, size_t cbInfoLen, const void* cbData, 
    size_t cbDataSize)
{
    BOOL invoked = FALSE;

    LK_LOCK(_hLock);

    // For each CB_Info instance within the array
    for (size_t idx = 0; idx<cbInfoLen; idx++)
    {
        // Is a client registered?
        if (cbInfo[idx].cbFunc)
        {
            // Dispatch callback onto the OS task
            if (CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize))
            {
                invoked = TRUE;
            }
        }
    }

    LK_UNLOCK(_hLock);
    return invoked;
}

The target OS task event loop dequeues a CB_CallbackMsg* and calls CB_TargetInvoke(). The dynamic data created is freed before the function exits.

C++
void CB_TargetInvoke(const CB_CallbackMsg* cbMsg)
{
    ASSERT_TRUE(cbMsg);
    ASSERT_TRUE(cbMsg->cbFunc);

    // Invoke callback function with the callback data
    cbMsg->cbFunc(cbMsg->cbData, cbMsg->cbUserData);

    // Free data sent through OS queue
    XFREE((void*)cbMsg->cbData);
    XFREE((void*)cbMsg);
} 

Asynchronous callbacks impose certain limitations because everything the callback destination thread needs must be created on the heap, packaged into a CB_CallbackMsg structure, and placed into an OS message queue.

The insertion into an OS queue is platform specific. The CB_DispatchCallbackFuncType function pointer typedef provides the OS queue interface to be implemented for each thread event loop on the target platform. See the Porting section below for a more complete discussion.

C++
typedef BOOL (*CB_DispatchCallbackFuncType)(const CB_CallbackMsg* cbMsg);

Once the message is placed into the message queue, platform specific code unpacks the message, calls the CB_TargetInvoke() function and destroys dynamically allocated data. For this example, a simple WorkerThreadStd class provides the thread event loop leveraging the C++ thread support library. While this example uses C++ threads, the callback modules are written in plain C. Abstracting the OS details from the callback implementation makes this possible.

C++
void WorkerThread::Process()
{
    while (1)
    {
        ThreadMsg* msg = 0;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->GetId())
        {
            case MSG_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg->GetData() != NULL);

                // Convert the ThreadMsg void* data back to a CB_CallbackMsg* 
                const CB_CallbackMsg* callbackMsg = 
                        static_cast<const CB_CallbackMsg*>(msg->GetData());

                // Invoke the callback on the target thread
                CB_TargetInvoke(callbackMsg);

                // Delete dynamic data passed through message queue
                delete msg;
                break;
            }
        }
    }
}

Notice the thread loop is unlike most systems that have a huge switch statement handling various incoming data messages, type casting void* data, then calling a specific function. The framework supports all callbacks with a single WM_DISPATCH_DELEGATE message. Once setup, the same small thread loop handles every callback. New publishers and subscribers come and go as the system is designed, but the code in-between doesn't change.

This is a huge benefit as on many systems getting data between threads takes a lot of manual steps. You constantly have to mess with each thread loop, create during sending, destroy data when receiving, and call various OS services and typecasts. Here you do none of that. All the stuff in-between is neatly handled for users.

Call Sequence

The two lists below show the call sequence required to asynchronously invoke the callback function TestCallback1() on DispatchCallbackThread1. For this example, TestCb is created using CB_DECLARE/CB_DEFINE and the TestCallback1() callback function is registered with TestCb using CB_Register().

C++
CB_DECLARE(TestCb, int*)
CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)

CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);

Main Thread

  1. CB_Invoke(TestCb, &data) – macro function initiates all registered callbacks from the main thread
  2. TestCb_Invoke(&data) – a type-safe macro wrapper function created by CB_DECLARE
  3. _CB_Dispatch(&TestCbMulticast[0], MAX_REGISTER, &data, sizeof(int))private function loops through all registered subscribers
  4. CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize) – the cbData callback data is dispatched to a registered subscriber
  5. DispatchCallbackThread1(cbMsg) – the dynamically allocated cbMsg is placed into the thread 1 message queue

Dispatch Callback Thread 1

  1. WorkerThread::Process() – the worker thread gets cbMsg from the message queue
  2. CB_TargetInvoke(cbMsg) – the callback is invoked on the target thread of control
  3. TestCallback1(&data, NULL) – the target callback function is called with callback data on thread 1

Heap

The dynamic data is required to send data structures through the message queue. Remember, the data pointed to by your callback argument is bitwise copied during a callback.

On some systems, it is undesirable to use the heap. For those situations, I use a fixed block memory allocator. The x_allocator implementation solves the dynamic storage issues and is much faster than the global heap. To use, just define USE_CALLBACK_ALLOCATOR within callback.c. See the References section for more information on x_allocator.

Porting

The code is an easy port to any platform. There are only two OS services required: threads and a software lock. The code is separated into four directories.

  1. Callback - core library implementation files
  2. Port – Windows-specific files (thread/lock)
  3. Examples – sample code showing usage
  4. VS2017 – Visual Studio 2017 project files

Porting to another platform requires implementing a dispatch function that accepts a const CB_CallbackMsg* for each thread. The functions below show an example.

C++
// C language interface to a callback dispatch function
extern "C" BOOL DispatchCallbackThread1(const CB_CallbackMsg* cbMsg)
{
    workerThread1.DispatchCallback(cbMsg);
    return TRUE;
}

void WorkerThread::DispatchCallback(const CB_CallbackMsg* msg)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    ThreadMsg* threadMsg = new ThreadMsg(MSG_DISPATCH_DELEGATE, msg);

    // Add dispatch delegate msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

The thread event loop gets the message and calls the CB_TargetInvoke() function. The data sent through the queue is deleted once complete.

C++
case MSG_DISPATCH_DELEGATE:
{
    ASSERT_TRUE(msg->GetData() != NULL);

    // Convert the ThreadMsg void* data back to a CB_CallbackMsg* 
    const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());

    // Invoke the callback on the target thread
    CB_TargetInvoke(callbackMsg);

    // Delete dynamic data passed through message queue
    delete msg;
    break;
}

Software locks are handled by the LockGuard module. This file can be updated with locks of your choice, or you can use a different mechanism. Locks are only used in a few places. Define USE_LOCKS within callback.c to use LockGuard module locks.

Which Callback Implementation?

I’ve documented three different asynchronous multicast callback implementations here on CodeProject. Each version has its own unique features and advantages. The sections below highlight the main differences between each solution. See the References section below for links to each article.

Asynchronous Multicast Callbacks in C

  • Implemented in C
  • Callback function is a free or static member only
  • One callback argument supported
  • Callback argument must be a pointer type
  • Callback argument data copied with memcpy
  • Type-safety provided by macros
  • Static array holds registered subscriber callbacks
  • Number of registered subscribers fixed at compile time
  • Fixed block memory allocator in C
  • Compact implementation

Asynchronous Multicast Callbacks with Inter-Thread Messaging

  • Implemented in C++
  • Callback function is a free or static member only
  • One callback argument supported
  • Callback argument must be a pointer type
  • Callback argument data copied with copy constructor
  • Type-safety provided by templates
  • Minimal use of templates
  • Dynamic list of registered subscriber callbacks
  • Number of registered subscribers expands at runtime
  • Fixed block memory allocator in C++
  • Compact implementation

Asynchronous Multicast Delegates in C++

  • Implemented in C++
  • C++ delegate paradigm
  • Any callback function type (member, static, free)
  • Multiple callback arguments supported (up to 5)
  • Callback argument any type (value, reference, pointer, pointer to pointer)
  • Callback argument data copied with copy constructor
  • Type-safety provided by templates
  • Heavy use of templates
  • Dynamic list of registered subscriber callbacks
  • Number of registered subscribers expands at runtime
  • Fixed block memory allocator in C++
  • Larger implementation

References

Conclusion

There are many ways to design a publisher/subscriber callback system. This C language version incorporates unique features, standardizes the event loop, and eases generating asynchronous callbacks onto a client specified thread of control. The implementation was kept to a minimum to facilitate porting to any system embedded or otherwise.

This callback implementation works on C and C++ projects. However, if developing a strictly C++ project, you could consider using one of the C++ callback implementations listed within the References section.

I've used this technique on projects with great success. Each module or subsystem may expose one or more outgoing interfaces with CB_DECLARE and CB_DEFINE macros. Any code within the system is able to connect and receive asynchronous callbacks with worrying about cross-threading or the machinery to make it all work. A feature like this eases application design and architecturally standardizes inter-thread communication with a well-understood callback paradigm.

History

  • 6th January, 2019
    • Initial release
  • 29th January, 2019

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States United States
I've been a professional software engineer for over 20 years. When not writing code, I enjoy spending time with the family, camping and riding motorcycles around Southern California.

Comments and Discussions

 
PraiseYou've done it again! Pin
koothkeeper31-Jan-19 17:38
professionalkoothkeeper31-Jan-19 17:38 
GeneralRe: You've done it again! Pin
David Lafreniere12-Feb-19 11:34
David Lafreniere12-Feb-19 11:34 
Thanks for the feedback!

The Process() function doesn't have a memory leak because only two messages can be placed into the queue and the switch statement handles both of those cases. The default case asserts (which should never happen). However, I probably should have at least had a default case to delete it anyway as a defensive measure.

Dave

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.