Investigating cobalt::generator

Standard C++ generator

The code below demonstrates how the standard C++23 synchronous generator works:

#include <boost/cobalt.hpp>
#include <iostream>
#include <generator>

namespace cobalt = boost::cobalt;

std::generator<int> numbers()
{
    for (int i = 1; i <= 5; ++i)
    {
        co_yield i;
    }
}
cobalt::main co_main(int argc, char* argv[])
{
    for (int v : numbers())
    {
        std::cout << "Got value: " << v << std::endl;
    }

    co_return 0;
}

Cobalt generator

I replaced the standard generator with the Cobalt generator:

cobalt::generator<int> numbers2()
{
    for (int i = 1; i <= 5; ++i)
    {
        co_yield i;
    }
}

and got the following warning:

warning C4033: 'numbers2' must return a value

so it wants me to add a fake co_return. I also use the generator in a different way, because it is asynchronous:

cobalt::generator<int> numbers2()
{
    for (int i = 1; i <= 5; ++i)
    {
        co_yield i;
    }

    co_return 0;
}

cobalt::main co_main(int argc, char* argv[])
{
    auto gen = numbers2();  // create the generator

    while (auto val = co_await gen)
    {
        std::cout << "Got value: " << val << std::endl;
    }

    co_return 0;
}

This code works and prints the following:

Got value: 1
Got value: 2
Got value: 3
Got value: 4
Got value: 5

Using BOOST_COBALT_FOR macro

I replaced co_main with the following:

cobalt::main co_main(int argc, char* argv[])
{
    auto gen = numbers2();  // create the generator

    BOOST_COBALT_FOR(auto val, gen)
    {
        std::cout << "Got value: " << val << std::endl;
    }

    co_return 0;
}

and got the following output:

Got value: 1
Got value: 2
Got value: 3
Got value: 4
Got value: 5
Got value: 0
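
The extra 0 appears to be the value passed to co_return 0. The earlier while (auto val = co_await gen) loop stops as soon as the awaited int converts to false, so it never printed that 0, while BOOST_COBALT_FOR seems to test the generator itself rather than the value. Below is a minimal sketch of the difference in plain C++ without Cobalt. FakeGen, loop_on_value and loop_on_state are hypothetical names that only model the control flow:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a generator that yields 1..5 and then
// delivers 0 as its final (co_return) value. Not a Cobalt type.
struct FakeGen
{
    std::vector<int> values{1, 2, 3, 4, 5, 0};
    std::size_t pos = 0;

    // Models the generator's operator bool: true while a value is pending.
    explicit operator bool() const { return pos < values.size(); }

    // Models `co_await gen`: hands out the next pending value.
    int next()
    {
        assert(pos < values.size());
        return values[pos++];
    }
};

// Models `while (auto val = co_await gen)`: stops on the first falsy value.
std::vector<int> loop_on_value()
{
    FakeGen gen;
    std::vector<int> seen;
    while (auto val = gen.next())
        seen.push_back(val);
    return seen;
}

// Models a loop driven by the generator's state rather than by the value.
std::vector<int> loop_on_state()
{
    FakeGen gen;
    std::vector<int> seen;
    while (gen)
        seen.push_back(gen.next());
    return seen;
}
```

loop_on_value() collects 1 to 5 only, while loop_on_state() also collects the trailing 0. A generator that legitimately yields 0 in the middle of the sequence would end the first loop early.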

True asynchronous generator

cobalt::generator<int> numbers3(asio::any_io_executor exec)
{
    asio::steady_timer timer(exec);

    for (int i = 1; i <= 5; ++i)
    {
        timer.expires_after(500ms);
        co_await timer.async_wait(cobalt::use_op);
        co_yield i;
    }

    co_return 0;
}

cobalt::main co_main(int argc, char* argv[])
{
    auto exec = co_await cobalt::this_coro::executor;

    auto gen = numbers3(exec);

    BOOST_COBALT_FOR(auto val, gen)
    {
        std::cout << "Got value: " << val << std::endl;
    }

    co_return 0;
}

Multiple iterating coroutines

I tried to iterate over a single generator from two spawned coroutines:

cobalt::generator<int> number_source(int count)
{
    auto exec = co_await cobalt::this_coro::executor;
    asio::steady_timer timer(exec);

    for (int i = 1; i <= count; ++i)
    {
        timer.expires_after(500ms);
        co_await timer.async_wait(cobalt::use_op);
        co_yield i;
    }

    co_return 0;
}

using GenPtr = std::shared_ptr<cobalt::generator<int>>;

cobalt::task<void> consumer(int id, GenPtr gen)
{
    auto exec = co_await cobalt::this_coro::executor;
    asio::steady_timer timer(exec);

    while (true)
    {
        auto val = co_await *gen;
        if (!val)
            break;  // a zero value ends the loop (here the 0 from co_return)

        std::cout << "Consumer " << id << " got: " << val << "\n";

        timer.expires_after(500ms);
        co_await timer.async_wait(cobalt::use_op);
    }

    std::cout << "Consumer " << id << " finished\n";

    co_return;
}

cobalt::main co_main(int argc, char* argv[])
{
    GenPtr gen = std::make_shared<cobalt::generator<int>>(number_source(10));
    
    auto exec = co_await cobalt::this_coro::executor;

    cobalt::spawn(exec, consumer(1, gen), boost::asio::detached);
    cobalt::spawn(exec, consumer(2, gen), boost::asio::detached);

    co_return 0;
}

but got a result that I did not expect: consumer #2 did not print anything:

Consumer 1 got: 1
Consumer 1 got: 2
Consumer 1 got: 3
Consumer 1 got: 4
Consumer 1 got: 5
Consumer 1 got: 6
Consumer 1 got: 7
Consumer 1 got: 8
Consumer 1 got: 9
Consumer 1 got: 10
Consumer 1 finished

Reading from a file

The main feature here is that you can have multiple read_lines on a single thread that will all make progress at the same time.

The following code reads lines from a file, with the same effect (only one consumer receives values):

using GenRead = cobalt::generator<boost::system::result<std::string>>;

GenRead read_lines(asio::stream_file& f)
{
    std::string buffer;
    while (f.is_open())
    {
        auto [ec, n] = co_await
            asio::async_read_until(f, asio::dynamic_buffer(buffer), '\n',
                asio::as_tuple(cobalt::use_op));

        // no need to copy, just point to the buffer
        std::string_view ln{ buffer.c_str(), n }; // the line including its '\n' delimiter, trimmed below
        ln = boost::algorithm::trim_copy(ln);

        if (!ln.empty())
            co_yield ln;

        if (ec)
            co_return ec;

        buffer.erase(0, n);
    }

    co_return asio::error::broken_pipe;
}

using GenReadPtr = std::shared_ptr<GenRead>;

cobalt::task<void> read_consumer(int id, GenReadPtr gen)
{
    if (id == 1)
    {
        auto exec = co_await cobalt::this_coro::executor;

        asio::steady_timer timer(exec);

        timer.expires_after(500ms);

        co_await timer.async_wait(cobalt::use_op);
    }

    BOOST_COBALT_FOR( // would be for co_await(auto value : read_lines(sf)) if standardized
        auto line,
        *gen)
    {
        if (line.has_error() && line.error() != asio::error::eof)
            std::cerr << "Consumer " << id << " Error occurred: " << line.error() << std::endl;
        else if (line.has_value())
            std::cout << "Consumer " << id << " Read line '" << *line << "'" << std::endl;
    }

    std::cout << "Consumer " << id << " finished\n";

    co_return;
}

cobalt::main co_main(int argc, char* argv[])
{
    asio::stream_file sf{ co_await cobalt::this_coro::executor,
                         argv[1], // skipping the check here for brevity.
                         asio::stream_file::read_only };

    GenReadPtr gen = std::make_shared<GenRead>(read_lines(sf));

    auto t1 = read_consumer(1, gen);
    auto t2 = read_consumer(2, gen);

    co_await t1;
    co_await t2;

    co_return 0;
}

With an input file in which each line contains its own line number, the output is the following:

Consumer 1 Read line '1'
Consumer 1 Read line '2'
Consumer 1 Read line '3'
Consumer 1 Read line '4'
Consumer 1 Read line '5'
Consumer 1 Read line '6'
Consumer 1 Read line '7'
Consumer 1 Read line '8'
Consumer 1 Read line '9'
Consumer 1 Read line '10'
Consumer 1 Read line '11'
Consumer 1 Read line '12'
Consumer 1 Read line '13'
Consumer 1 Read line '14'
Consumer 1 Read line '15'
Consumer 1 Read line '16'
Consumer 1 Read line '17'
Consumer 1 Read line '18'
Consumer 1 Read line '19'
Consumer 1 Read line '20'
Consumer 1 Read line '21'
Consumer 1 Read line '22'
Consumer 1 Read line '23'
Consumer 1 Read line '24'
Consumer 1 Read line '25'
Consumer 1 Read line '26'
Consumer 1 Read line '27'
Consumer 1 Read line '28'
Consumer 1 Read line '29'
Consumer 1 Read line '30'
Consumer 1 finished
Consumer 2 finished

So it is not about concurrent consumers, but about multiple generators on a single thread:

cobalt::generator<std::string> read_lines(const char* name, int n) {
    for (int i = 0; i < n; ++i) {
        co_yield std::string(name) + ":" + std::to_string(i);
    }
    co_return {};
}

cobalt::main co_main(int argc, char* argv[])
{
    auto ga = read_lines("a", 1000);
    auto gb = read_lines("b", 1000);

    while (ga || gb) {
        auto v = co_await cobalt::race(ga, gb);
        boost::variant2::visit([](auto& s) { std::cout << s << "\n"; }, v);
    }

    co_return 0;
}

Or with std::string wrapped into boost::system::result:

using GenRead = cobalt::generator<boost::system::result<std::string>>;

GenRead read_lines(asio::stream_file& f)
{
    std::string buffer;
    while (f.is_open())
    {
        auto [ec, n] = co_await
            asio::async_read_until(f, asio::dynamic_buffer(buffer), '\n',
                asio::as_tuple(cobalt::use_op));

        // no need to copy, just point to the buffer
        std::string_view ln{ buffer.c_str(), n }; // the line including its '\n' delimiter, trimmed below
        ln = boost::algorithm::trim_copy(ln);

        if (!ln.empty())
            co_yield ln;

        if (ec)
            co_return ec;

        buffer.erase(0, n);
    }

    co_return asio::error::broken_pipe;
}

cobalt::main co_main(int argc, char* argv[])
{
    asio::stream_file a{ co_await cobalt::this_coro::executor, argv[1], asio::stream_file::read_only };
    asio::stream_file b{ co_await cobalt::this_coro::executor, argv[2], asio::stream_file::read_only };

    auto ga = read_lines(a);
    auto gb = read_lines(b);

    while (ga || gb) {
        auto v = co_await cobalt::race(ga, gb);
        boost::variant2::visit([](auto& s) { std::cout << s << "\n"; }, v);
    }
    co_return 0;
}

Both examples crash at the end, probably because we call cobalt::race with a finished generator.

Fixed examples

It looks like I finally figured out what this sentence means:

The main feature here is that you can have multiple read_lines on a single thread that will all make progress at the same time.

It means that we can use cobalt::race with cobalt::generators:

using GenRead = cobalt::generator<boost::system::result<std::string>>;
 
GenRead read_lines(asio::stream_file& f)
{
    std::string buffer;
    while (f.is_open())
    {
        auto [ec, n] = co_await
            asio::async_read_until(f, asio::dynamic_buffer(buffer), '\n',
                asio::as_tuple(cobalt::use_op));
 
        // no need to copy, just point to the buffer
        std::string_view ln{ buffer.c_str(), n }; // the line including its '\n' delimiter, trimmed below
        ln = boost::algorithm::trim_copy(ln);
 
        if (!ln.empty())
            co_yield ln;
 
        // Why don't we throw the error as an exception?
        if (ec)
            co_return ec;
 
        buffer.erase(0, n);
    }
 
    co_return asio::error::broken_pipe;
}

cobalt::main co_main(int argc, char* argv[])
{
    asio::stream_file a{ co_await cobalt::this_coro::executor, argv[1], asio::stream_file::read_only };
    asio::stream_file b{ co_await cobalt::this_coro::executor, argv[2], asio::stream_file::read_only };

    auto ga = read_lines(a);
    auto gb = read_lines(b);

    // cobalt::race will crash if one of the generators is finished.
    while (ga && gb) {
        auto v = co_await cobalt::race(ga, gb);
        boost::variant2::visit([](auto& s) { std::cout << s << "\n"; }, v);
    }

    // continue the iteration without cobalt::race
    while (ga) {
        auto s = co_await ga;
        std::cout << s << '\n';
    }

    while (gb) {
        auto s = co_await gb;
        std::cout << s << '\n';
    }

    co_return 0;
}

But it is still not obvious why we really need the final co_return asio::error::broken_pipe; at the end of read_lines, and why the error code is returned with co_return ec; rather than thrown as an exception.

The sample code usage:

seq 1 10000 > a.txt
seq 1000000 1010000 > b.txt
AwlTest.exe a.txt b.txt

The output ends with the following:

value:9996
value:1009995
value:9997
value:1009996
value:9998
value:1009997
value:9999
value:1009998
value:10000
value:1009999
error:asio.misc:2
value:1010000
error:asio.misc:2

boost.cobalt source code

There is the following code in boost_1_89_0\boost\cobalt\detail\generator.hpp:

  void return_value(const Yield & res) requires std::is_copy_constructible_v<Yield>
  {
    if (this->receiver)
      this->receiver->yield_value(res);
  }

  void return_value(Yield && res)
  {
    if (this->receiver)
      this->receiver->yield_value(std::move(res));
  }

It is the only place where I found return_value.

The standard std::generator has only yield_value(…) and return_void().

boost::cobalt::generator has operator bool, see boost_1_89_0\boost\cobalt\generator.hpp:

template<typename Yield, typename Push >
inline generator<Yield, Push>::operator bool() const
{
  return !receiver_.done || receiver_.result || receiver_.exception;
}
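
The expression above can be modelled with a small struct to see when a generator stays truthy. This is only a sketch: ReceiverModel and lifecycle() are hypothetical names, and I am assuming that result behaves like an optional value and exception like a std::exception_ptr.

```cpp
#include <array>
#include <cassert>
#include <exception>
#include <optional>

// Hypothetical model of the state that operator bool inspects.
struct ReceiverModel
{
    bool done = false;             // the coroutine has finished
    std::optional<int> result;     // a value that has not been read yet
    std::exception_ptr exception;  // an error that has not been rethrown yet

    explicit operator bool() const
    {
        return !done || result.has_value() || static_cast<bool>(exception);
    }
};

// Truthiness in three successive states of one generator.
std::array<bool, 3> lifecycle()
{
    ReceiverModel r;
    const bool running = static_cast<bool>(r);           // still producing

    r.done = true;
    r.result = 0;
    const bool finished_pending = static_cast<bool>(r);  // final value unread

    r.result.reset();
    const bool finished_drained = static_cast<bool>(r);  // nothing left

    return {running, finished_pending, finished_drained};
}
```

lifecycle() returns {true, true, false}: a finished generator is still truthy while its final value is unread and becomes falsy only afterwards. With while (ga || gb), a generator in that last state can therefore still be passed to cobalt::race.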

2 Responses to Investigating cobalt::generator

  1. dmitriano says:

    How to exit from a generator without returning a value
    https://github.com/boostorg/cobalt/issues/176
    The discussion I got the `read_lines` code from.
