Understanding C++20 Coroutines, Awaitable Objects and Operator co_await with Real Samples

3 Dec 2020 | CPOL | 11 min read
C++, C++20 Coroutine, Awaitable Object, co_await, Cross Platform Development and Non-blocking Socket Communication
This article will mainly focus on awaitable object, task and co_await. It comes with a real demo sample to demonstrate streaming both requests and responses between client and server with the best network efficiency. The provided sample code as well as others listed at the end can be compiled with GCC 10.0.1 and Visual C++ 2017 or later.

Introduction

C++20 introduces a set of new features that will change the way we design and write applications in modern C++. Coroutines are arguably the most important of these new features.

C++20 coroutines involve many new concepts such as awaitable objects, tasks, promise types, co_await, co_yield, co_return, and so on. However, few real samples are available on the web for learning at such an early stage.

UDAParts has developed a powerful and secure communication framework named SocketPro, available at the github site. It is written with continuous inline request/result batching and real-time stream processing capabilities, using asynchronous data transferring and parallel computation for the best network efficiency, development simplicity, performance and scalability, along with many great and even unique features. The framework supports modern language features such as anonymous functions, closures, lambda expressions, async/await, futures, promises, yield and so on. It is already integrated with the C++20 coroutine feature, with many samples for client/server communication, database access, file exchange between client and server, and persistent message queues. These samples should help you understand C++20 coroutines.

This short article touches all of these concepts except co_yield and co_return, but it mainly focuses on awaitable objects, tasks and co_await. Further, it comes with a real demo sample that streams both requests and responses between client and server with the best network efficiency. The provided sample code, as well as the others listed at the end, can be compiled with GCC 10.0.1 and Visual C++ 2017 or later.

Preparations

First of all, clone the SocketPro package from the github site. The original sample code is located in the file ../socketpro/tutorials/cplusplus/hello_world/client/cw_hw.cpp. Next, open the SocketPro development index file ../socketpro/doc/dev_guide.htm.

It is highly recommended to read through the following short articles in order:

  1. Fundamentals about SocketPro Communication Framework
  2. CUQueue and Compatibility among Different Development Languages
  3. Get started with SocketPro
  4. C/C++ (C++11 or later)

The third article guides you through distributing all required components. Specifically, we will use the sample server application all_servers for the coming tests. After distributing these components properly, run it as a remote SocketPro server listening on port 20901.

Main Test Code

Code snippet 1 below shows the main unit test code related to the C++20 coroutine task and the queue of awaitable objects. First of all, it is required to include the C++20 coroutine header file coroutine, as shown at the very top of the code snippet.

C++
#if __has_include(<coroutine>)
#include <coroutine> //GCC 10.0.1 or later
                     //Visual C++ 2019 16.8.0 preview 3.1 or later
#elif __has_include(<experimental/coroutine>)
#include <experimental/coroutine> //Visual C++ 2017 & 2019 16.8.0 before
#else
static_assert(false, "No co_await support");
#endif

#include <iostream>
#include <deque>
#include "../../uqueue_demo/mystruct.h"
#include "HW.h"

using namespace std;
using namespace SPA;
using namespace SPA::ClientSide;

CMyStruct ms0;
typedef CSocketPool<HelloWorld> CMyPool;

deque<RWaiter<wstring>> CreateAwaitables(CMyPool::PHandler& hw) {
    // ......
}

CAwTask MyTest(CMyPool::PHandler& hw) {
    // ......
}

//compile options
//Visual C++ 2017 & 2019 16.8.0 before -- /await
//Visual C++ 2019 16.8.0 preview 3.1 or later -- /std:c++latest
//GCC 10.0.1 or later -- -std=c++20 -fcoroutines -ldl -lstdc++ -pthread
//GCC 11 or clang 14 -- -std=c++20 -ldl -lstdc++ -pthread
int main(int argc, char* argv[]) {
    CMyPool spHw;
    CConnectionContext cc("localhost", 20901, L"MyUserId", L"MyPassword");
    //spHw.SetQueueName("qhw");
    if (!spHw.StartSocketPool(cc, 1)) {//create a pool with one session 
                                       //within one worker thread
        wcout << "No connection to remote hello world server\n";
    }
    else {
        auto hw = spHw.Seek(); //find an async handler from a socket pool
        SetMyStruct(ms0);
        MyTest(hw); //call a C++20 test coroutine
    }

    wcout << L"Press a key to kill the demo ......\n";
    ::getchar();
    return 0;
}
Code snippet 1: Main unit test code involved with C++20 coroutine task and awaitable objects queue

We will focus on the two functions CreateAwaitables and MyTest later. The first one returns a deque containing C++20 awaitable objects of type RWaiter<wstring>. The second one returns a C++20 task CAwTask, which is similar in functionality to the keyword async in other development environments such as .NET, JavaScript and Python.

Now, let us move to the main function. Inside it, we first create a socket pool with one non-blocking socket session hosted within one worker thread. Note that a socket pool can have any number of worker threads, and each of these worker threads can host any number of non-blocking socket sessions to different remote servers. Here, for code clarity, we use one non-blocking socket session hosted within one worker thread. In practice, it is recommended that a socket pool have one worker thread in most cases.

At the end, we call the method MyTest to demonstrate the C++20 coroutine and co_await. You can compile this small piece of code with the compile options listed in the comments right before the main function, and test this sample client application against the above-mentioned server all_servers.

Dissecting MyTest, CAwTask and CreateAwaitables

First of all, the method MyTest within code snippet 2 below is a C++20 coroutine, which always returns an instance of CAwTask. In functionality, CAwTask is similar to the keyword async of other languages such as .NET, JavaScript and Python. However, in C++20 we can customize the class CAwTask and its inner class promise_type, as shown at the top of the snippet. In most cases, the class definition here will meet your needs without any modification. For details, you may refer to this article. Pay attention to the comments at lines 4 and 10. When a coroutine is called, an instance of CAwTask is created by calling the method promise_type::get_return_object. At the end, the method promise_type::final_suspend is called when the C++20 coroutine MyTest is about to exit. It is recommended that you set debug breakpoints and step through them for better understanding.

C++
struct CAwTask {
    struct promise_type {
        CAwTask get_return_object() {
            return { }; //line 4 -- called once a coroutine is going to be called
        }
        std::suspend_never initial_suspend() {
            return { };
        }
        std::suspend_never final_suspend() noexcept {
            return { }; //line 10 -- called when a coroutine is about to exit
        }
        void return_void() {
        }
        void unhandled_exception() {
        }
    };
};

deque<RWaiter<wstring>> CreateAwaitables(CMyPool::PHandler& hw) {
    auto aw0 = hw->wait_send<wstring>(idSayHello, L"John", L"Dole"); //line 20
    auto aw1 = hw->wait_send<wstring>(idSayHello, L"Hillary", L"Clinton");
    auto aw2 = hw->wait_send<wstring>(idSayHello, L"Donald", L"Trump");
    auto aw3 = hw->wait_send<wstring>(idSayHello, L"Joe", L"Biden");
    auto aw4 = hw->wait_send<wstring>(idSayHello, L"Mike", L"Pence"); //line 24
    //auto aw4 = hw->wait_send<wstring>(idSayHello, L"", L"Pence"); //line 25
    return {aw0, aw1, aw2, aw3, aw4}; //line 26
}
CAwTask MyTest(CMyPool::PHandler& hw) {
    try {
        //requests/results streamed with inline batching
        auto qWaiter = CreateAwaitables(hw); //line 31
        BWaiter ws = hw->wait_sendRequest(idSleep, (int)5000); //line 32
        RWaiter<CMyStruct> wms = hw->wait_send<CMyStruct>(idEcho, ms0); //line 33

        //co_await for all results
        while(qWaiter.size()) { //line 36
            wcout << co_await qWaiter.front() << "\n";
            qWaiter.pop_front();
        }
        wcout << "Waiting sleep ......\n";
        CScopeUQueue sb = co_await ws;
        //sleep request returns nothing
        assert(sb->GetSize() == 0);
        CMyStruct ms = co_await wms; //line 44
        wcout << "(ms == ms0): " << ((ms == ms0) ? 1 : 0)
            << "\nAll requests processed\n";
    }
    catch (CServerError& ex) { //line 48
        wcout << ex.ToString() << "\n";
    }
    catch (CSocketError& ex) {
        wcout << ex.ToString() << "\n";
    }
    catch (exception& ex) {
        wcout << "Unexpected error: " << ex.what() << "\n";
    } //line 56
}
Code snippet 2: CreateAwaitables and MyTest for demonstration of C++20 coroutine task and awaitable objects

In code snippet 2, we call the method CreateAwaitables at line 31 to create five awaitable objects (aw0, aw1, aw2, aw3 and aw4), as shown at lines 20 through 26. Each of these awaitable objects corresponds to one request from client to server. We put them into a deque container for later use. Further, we obtain two extra awaitable objects at lines 32 and 33 by sending two more requests to the server for processing. All seven requests and their expected responses are streamed. SocketPro internally uses its inline batching algorithm to batch request and response data on both client and server sides. By design, SocketPro supports streaming both requests and responses for the best network efficiency, which significantly boosts both application performance and scalability. Our study results show that performance improvements can easily range from 90% on a local area network up to 30000% on a wide area network. This feature is one of SocketPro's strong points, and it is hard to find in other frameworks.

After collecting all awaitable objects, we start to co_await them at lines 36 through 44 of code snippet 2. Note that all RWaiter<wstring>, BWaiter and RWaiter<CMyStruct> awaitable objects are copyable and moveable, so you can put them into standard library containers such as vector, deque, stack, map, and so on.

Inside the C++20 coroutine function MyTest, you can use try/catch for exception handling, as shown at lines 48 through 56 of code snippet 2. The error CServerError comes as an exception from a remote SocketPro server. For example, you will get such an exception if you uncomment line 25 but comment out line 24. Alternatively, you can also get a CServerError exception by giving a negative value instead of the positive 5000 for the second input at line 32. The SocketPro client adapter will also throw a CSocketError communication error if either the underlying socket is closed or a request is canceled. To test the exception CSocketError, you can brutally kill the test sample server application all_servers right after running this client application.

Uncovering RWaiter/wait_send and BWaiter/wait_sendRequest

Code snippet 3 below shows the definitions of two awaitable classes, RWaiter and BWaiter, at lines 2 and 31, respectively. The first one is a template class that derives from the base awaitable class CWaiterBase<R>, while the second one is a regular class deriving from CWaiterBase<CScopeUQueue>. Here, the template argument R represents the expected return data, and CScopeUQueue represents an array of bytes returned from a remote SocketPro server. Note that the array of bytes will be used later to de-serialize zero, one or more values of different types.

C++
template<typename R>
struct RWaiter : public CWaiterBase<R> { //line 2
    RWaiter(CAsyncServiceHandler* ash, unsigned short reqId,
        const unsigned char* pBuffer, unsigned int size)
    : CWaiterBase<R>(reqId) {
        auto& wc = this->m_wc;
        if (!ash->SendRequest(reqId, pBuffer, size, [wc](CAsyncResult & ar) {
                try {
                    ar >> wc->m_r; //unpack ar buffer into m_r (R) directly
                } catch (...) { //de-serialization or other errors
                    wc->m_ex = std::current_exception();
                }
                //resume coroutine from a socket pool worker thread
                wc->resume();
            }, this->get_aborted(), this->get_se())) {
            //throw CSocketError exception if socket already closed
            ash->raise(reqId);
        }
    }
};

// ......

template<typename R, typename ... Ts>
RWaiter<R> wait_send(unsigned short reqId, const Ts& ... args) {
    CScopeUQueue sb;
    sb->Save(args ...);
    return RWaiter<R>(this, reqId, sb->GetBuffer(), sb->GetSize());
}

struct BWaiter : public CWaiterBase<CScopeUQueue> { //line 31
    BWaiter(CAsyncServiceHandler* ash, unsigned short reqId,
        const unsigned char* pBuffer, unsigned int size)
    : CWaiterBase<CScopeUQueue>(reqId) {
        auto& wc = m_wc;
        if (!ash->SendRequest(reqId, pBuffer, size, [wc](CAsyncResult & ar) {
                //move server returned buffer from ar into m_r (CScopeUQueue)
                wc->m_r->Swap(ar.UQueue);
                //resume coroutine from a socket pool worker thread
                wc->resume();
            }, get_aborted(), get_se())) {
            //throw CSocketError exception if socket already closed
            ash->raise(reqId);
        }
    }
};

// ......

template<typename ... Ts>
BWaiter wait_sendRequest(unsigned short reqId, const Ts& ... args) {
    CScopeUQueue sb;
    sb->Save(args ...);
    return BWaiter(this, reqId, sb->GetBuffer(), sb->GetSize());
}
Code snippet 3: Decoding RWaiter/wait_send and BWaiter/wait_sendRequest

Looking at the two awaitable class constructors, you will find that SocketPro always sends a request from client to server with a request id reqId, an array of bytes pBuffer of a given length size, and three callbacks (a lambda expression [wc](CAsyncResult & ar) {......}, get_aborted and get_se). The first callback, the lambda expression, is used to monitor the result returned from the server. Inside this callback, the coroutine must be resumed by calling the method resume (wc->resume()) after processing the server result, as commented. The second callback, get_aborted, is used to monitor two events: request canceled and socket session closed. The last callback, get_se, is used to track an exception error from the remote server. In short, the three callbacks cover all communication errors and all possible results, including both the expected result and an exception error from the server. We will discuss the two callbacks get_aborted and get_se with the coming code snippet 4.

Before ending this section, it is worth noting that all three callbacks are always called within a socket pool worker thread. You can use the two template methods wait_send and wait_sendRequest to send any type of request to a remote SocketPro server. They immediately return C++20 awaitable objects without waiting for the server response. These objects are co_awaited later for any type of expected result or for the different possible types of exceptions.

Decoding Template Class CWaiterBase

Code snippet 4 below shows the implementation of the SocketPro C++20 awaitable, located in the file ../socketpro/include/aclientw.h. First of all, see lines 78 through 110, which generate the two callbacks DServerException and DDiscarded through the two methods get_se and get_aborted, respectively. These callbacks are used in code snippet 3 above, and they are also reused within the SocketPro remoting file (../socketpro/include/streamingfile.h), server persistent message queue (../socketpro/include/aqhandler.h) and database handlers (../socketpro/include/udb_client.h). Note that the two generated callbacks are always called by socket pool worker threads. Further, the C++20 coroutine handle is always resumed at the end, after an exception has been set properly, as shown at lines 84 and 106.

C++
typedef std::coroutine_handle<> CRHandle;

template<typename R>
struct CWaiterBase {

    struct CWaiterContext {

        CWaiterContext(unsigned short reqId)
        : m_done(false), m_reqId(reqId) {
        }

        CWaiterContext(const CWaiterContext& wc) = delete;
        CWaiterContext(CWaiterContext&& wc) = delete;
        CWaiterContext& operator=(const CWaiterContext& wc) = delete;
        CWaiterContext& operator=(CWaiterContext&& wc) = delete;

        bool await_ready() noexcept {
            CSpinAutoLock al(m_cs);
            return m_done;
        }

        //always call this method from pool worker thread
        void resume() noexcept {
            CSpinAutoLock al(m_cs);
            if (!m_done) {
                m_done = true;
                if (m_rh) {
                    m_rh.resume(); //line 28
                }
            }
        }

        unsigned short get_id() {
            return m_reqId;
        }

        R m_r;
        std::exception_ptr m_ex;

    private:
        bool await_suspend(CRHandle rh) noexcept {
            CSpinAutoLock al(m_cs);
            if (!m_done) {
                m_rh = rh; //line 44
                return true; //will resume from worker thread
            }
            return false; //resume immediately
        }
        CSpinLock m_cs;
        bool m_done; //protected by m_cs
        CRHandle m_rh; //protected by m_cs
        unsigned short m_reqId;
        friend struct CWaiterBase;
    };

    CWaiterBase(unsigned short reqId)
    : m_wc(new CWaiterContext(reqId)) {
    }

    bool await_suspend(CRHandle rh) noexcept {
        return m_wc->await_suspend(rh);
    }

    //R should be move-constructible (preferred) or copy-constructible
    R&& await_resume() {
        if (m_wc->m_ex) {
            std::rethrow_exception(m_wc->m_ex); //line 67
        }
        return std::move(m_wc->m_r);
    }

    bool await_ready() noexcept {
        return m_wc->await_ready();
    }

protected:

    DServerException get_se() noexcept { //line 78
        auto& wc = m_wc;
        return [wc](CAsyncServiceHandler* ash, unsigned short reqId, const
            wchar_t* errMsg, const char* errWhere, unsigned int errCode) {
            wc->m_ex = std::make_exception_ptr(
                CServerError(errCode, errMsg, errWhere, reqId));
            wc->resume(); //line 84
        };
    }

    DDiscarded get_aborted() noexcept {
        auto& wc = m_wc;
        return [wc](CAsyncServiceHandler* h, bool canceled) {
            if (canceled) {
                wc->m_ex = std::make_exception_ptr(CSocketError(
                   REQUEST_CANCELED, REQUEST_CANCELED_ERR_MSG, wc->get_id(), false));
            } else {
                CClientSocket* cs = h->GetSocket();
                int ec = cs->GetErrorCode();
                if (ec) {
                    std::string em = cs->GetErrorMsg();
                    wc->m_ex = std::make_exception_ptr(
                  CSocketError(ec,Utilities::ToWide(em).c_str(),wc->get_id(),false));
                } else {
                 wc->m_ex =std::make_exception_ptr(CSocketError(SESSION_CLOSED_AFTER,
                    SESSION_CLOSED_AFTER_ERR_MSG, wc->get_id(), false));
                }
            }
            wc->resume(); //line 106
        };
    }

    std::shared_ptr<CWaiterContext> m_wc; //line 110
};
Code snippet 4: Implementation of SocketPro C++20 base awaitable object

A C++20 awaitable class must implement three required methods, await_ready, await_suspend and await_resume, which are used by operator co_await. When co_awaiting an awaitable object, the method await_ready is called first to check whether the expected result is already available. If this method returns true, the result is indeed available now, and co_await immediately calls the method await_resume to return the result, without calling the method await_suspend at all. This situation may happen often with the SocketPro communication framework when multiple requests are streamed, as in this example. Otherwise, if await_ready returns false, as it does in most cases, the result is not available at the moment. The coroutine is then suspended, and its coroutine handle (resume point) is passed to the method await_suspend.

The method await_suspend in code snippet 4 returns a bool value. The method can also be defined to return void or a coroutine handle. If the method returns true (or returns void), which is the usual case, the coroutine handle is remembered as shown at line 44, and the coroutine stays suspended while control returns to its caller. It is expected that a socket pool worker thread will later resume from the resume point by calling the coroutine handle method resume at line 28. If the method returns false, the coroutine resumes immediately and calls the method await_resume for the expected result or exception. This situation may also happen with SocketPro, although the chances are low. The method await_suspend can also be defined to return a customized coroutine handle, which is beyond the scope of this short article.

At the end, the method await_resume is called through operator co_await after one of the following three cases happens.

  1. await_ready returns true
  2. await_suspend returns false
  3. The coroutine handle method resume is called from a socket pool worker thread, as shown at line 28 of code snippet 4

Inside the method await_resume, an exception is thrown, as shown at line 67 of code snippet 4, if one has been recorded. Here, the method returns an rvalue reference. It is expected that the template argument R has a move or copy constructor. However, a move constructor is preferred over a copy constructor to avoid copying memory. Note also that the method await_resume can be defined to return void if no result is expected.

Other Examples

After understanding all of the above, you can study the C++20 coroutine feature further on your own with the other samples below, using the sample SocketPro server all_servers.

  1. File exchanging between client and server: ../socketpro/tutorials/cplusplus/remote_file/client/rf_cw.cpp
  2. Server persistent queue: ../socketpro/tutorials/cplusplus/server_queue/client/sq_cw.cpp
  3. SQLite database: ../socketpro/stream_sql/usqlite/test_csqlite/cw_sqlite.cpp

C++20 Coroutine/co_await Performance Advantage

C++20 coroutines are stackless, so no stack data is copied when they suspend and resume, which can lead to significant performance improvements in addition to their asynchronous computation. SocketPro comes with sample client/server code in the directory ../socketpro/samples/latency for you to do performance testing. Note that the SocketPro C++ adapter supports modern language features such as anonymous functions, lambda expressions, std::promise, std::future, coroutines and co_await, as well as others.

Our performance studies show that C++20 co_await consistently saves about 4 to 6 microseconds per client/server request in comparison to the std::promise/std::future/get approach, regardless of whether the client platform is Windows or Linux. The C++20 co_await performance gain is huge!

History

  • 3rd December, 2020: Initial release
  • 10th February, 2023:  Minor update for adding support to clang 14 or later

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United States
Yuancai (Charlie) Ye, an experienced C/C++ software engineer, lives in Atlanta, Georgia. He is an expert at continuous inline request/result batching, real-time stream processing, asynchronous data transferring and parallel computation for the best communication throughput and latency. He has been working at SocketPro (https://github.com/udaparts/socketpro) for more than fifteen years.
