Investigating cobalt::generator

Standard C++ generator

The code below demonstrates how the standard C++23 synchronous generator works:

#include <boost/cobalt.hpp>
#include <iostream>
#include <generator>

namespace cobalt = boost::cobalt;

// Synchronous C++23 generator: lazily produces the integers 1 through 5 in
// ascending order, one value per resumption.
std::generator<int> numbers()
{
    int next = 1;

    while (next <= 5)
    {
        co_yield next;
        ++next;
    }
}
// Entry point: drains the synchronous generator with an ordinary range-for
// loop and prints every value it produces.
cobalt::main co_main(int argc, char* argv[])
{
    auto sequence = numbers();

    for (const int value : sequence)
        std::cout << "Got value: " << value << std::endl;

    co_return 0;
}

Cobalt generator

I replaced the standard generator with the Cobalt generator:

// Yields the integers 1 through 5 as a cobalt::generator<int>.
// There is no co_return in this version; it is the one for which the
// compiler reports warning C4033 ('numbers2' must return a value).
cobalt::generator<int> numbers2()
{
    for (int i = 1; i <= 5; ++i)
    {
        co_yield i;
    }
}

and got the following warning:

warning C4033: 'numbers2' must return a value

so it wants me to add a fake co_return. I also use the generator in a different way, because it is asynchronous:

// Yields the integers 1 through 5 and then finishes with a final value of 0.
// The trailing co_return carries no real data; it is there to satisfy the
// compiler, which otherwise reports warning C4033.
cobalt::generator<int> numbers2()
{
    int current = 1;

    while (current <= 5)
    {
        co_yield current;
        ++current;
    }

    co_return 0;
}

// Entry point: pulls values out of the Cobalt generator by awaiting it
// directly and prints each one.
cobalt::main co_main(int argc, char* argv[])
{
    auto gen = numbers2();  // create the generator

    // A value of 0 converts to false, so the final 0 that numbers2() returns
    // ends the loop without being printed.
    for (;;)
    {
        const auto val = co_await gen;
        if (!val)
            break;

        std::cout << "Got value: " << val << std::endl;
    }

    co_return 0;
}

This code works and prints the following:

Got value: 1
Got value: 2
Got value: 3
Got value: 4
Got value: 5

Using BOOST_COBALT_FOR macro

I replaced co_main with the following:

// Entry point: iterates over the Cobalt generator with the BOOST_COBALT_FOR
// macro instead of a hand-written co_await loop. With this form the value
// passed to co_return in numbers2() reaches the loop body as well, which is
// why a trailing "Got value: 0" is printed.
cobalt::main co_main(int argc, char* argv[])
{
    auto source = numbers2();  // create the generator

    BOOST_COBALT_FOR(auto value, source)
    {
        std::cout << "Got value: " << value << std::endl;
    }

    co_return 0;
}

and got the following output:

Got value: 1
Got value: 2
Got value: 3
Got value: 4
Got value: 5
Got value: 0

True asynchronous generator

// Asynchronous generator: before yielding each of the integers 1 through 5 it
// waits 500 ms on a steady timer bound to `exec`, then finishes with a final
// value of 0.
cobalt::generator<int> numbers3(asio::any_io_executor exec)
{
    asio::steady_timer delay(exec);
    int current = 1;

    while (current <= 5)
    {
        // Suspend this coroutine until the timer fires.
        delay.expires_after(500ms);
        co_await delay.async_wait(cobalt::use_op);

        co_yield current;
        ++current;
    }

    co_return 0;
}

// Entry point: hands the coroutine's own executor to numbers3() and prints
// every value the timer-driven generator delivers.
cobalt::main co_main(int argc, char* argv[])
{
    auto executor = co_await cobalt::this_coro::executor;
    auto delayed = numbers3(executor);

    BOOST_COBALT_FOR(auto value, delayed)
    {
        std::cout << "Got value: " << value << std::endl;
    }

    co_return 0;
}

Multiple iterating coroutines

I tried to iterate over the same generator from two spawned coroutines:

// Asynchronous generator: yields the integers 1 through `count`, waiting
// 500 ms before each one on a timer bound to the generator's own executor,
// then finishes with a final value of 0.
cobalt::generator<int> number_source(int count)
{
    auto executor = co_await cobalt::this_coro::executor;
    asio::steady_timer delay(executor);

    int current = 1;
    while (current <= count)
    {
        // Suspend this coroutine until the timer fires.
        delay.expires_after(500ms);
        co_await delay.async_wait(cobalt::use_op);

        co_yield current;
        ++current;
    }

    co_return 0;
}

// Shared-ownership handle to a generator, so that the same generator object
// can be handed to several consumer coroutines.
using GenPtr = std::shared_ptr<cobalt::generator<int>>;

// Consumer coroutine: repeatedly awaits the shared generator, prints each
// value tagged with `id`, and pauses 500 ms between reads.
cobalt::task<void> consumer(int id, GenPtr gen)
{
    auto executor = co_await cobalt::this_coro::executor;
    asio::steady_timer pause(executor);

    // The loop ends on the first falsy (zero) value, i.e. on the final 0 that
    // number_source() returns when it is done.
    for (auto val = co_await *gen; val; val = co_await *gen)
    {
        std::cout << "Consumer " << id << " got: " << val << "\n";

        pause.expires_after(500ms);
        co_await pause.async_wait(cobalt::use_op);
    }

    std::cout << "Consumer " << id << " finished\n";

    co_return;
}

// Entry point: creates one generator of ten values and spawns two detached
// consumers (ids 1 and 2) that both hold a reference to it.
cobalt::main co_main(int argc, char* argv[])
{
    auto shared_source = std::make_shared<cobalt::generator<int>>(number_source(10));
    auto executor = co_await cobalt::this_coro::executor;

    for (int id = 1; id <= 2; ++id)
    {
        cobalt::spawn(executor, consumer(id, shared_source), boost::asio::detached);
    }

    co_return 0;
}

but I got a result that I did not expect. Coroutine #2 did not print anything:

Consumer 1 got: 1
Consumer 1 got: 2
Consumer 1 got: 3
Consumer 1 got: 4
Consumer 1 got: 5
Consumer 1 got: 6
Consumer 1 got: 7
Consumer 1 got: 8
Consumer 1 got: 9
Consumer 1 got: 10
Consumer 1 finished

Reading from a file

The main feature here is that you can have multiple read_lines on a single thread that will all make progress at the same time.

The next example reads lines from a file and shows the same effect (only one consumer receives any data):

// Generator type whose items are each either a line of text or an error code.
using GenRead = cobalt::generator<boost::system::result<std::string>>;

// Asynchronous generator that reads `f` one '\n'-delimited chunk at a time.
// Each chunk is trimmed and, if anything is left, yielded as a line.
// The generator finishes by returning the error code of the first failed
// read, or asio::error::broken_pipe once `f` is no longer open.
GenRead read_lines(asio::stream_file& f)
{
    // Accumulates bytes read from the file; consumed chunks are erased below.
    std::string buffer;
    while (f.is_open())
    {
        // as_tuple makes the operation report failure through `ec` instead of
        // throwing; `n` is the length of the chunk to take from the buffer.
        auto [ec, n] = co_await
            asio::async_read_until(f, asio::dynamic_buffer(buffer), '\n',
                asio::as_tuple(cobalt::use_op));

        // no need to copy, just point to the first n characters of the buffer
        std::string_view ln{ buffer.c_str(), n }; // nothing is subtracted from n here; the trim below strips surrounding whitespace
        ln = boost::algorithm::trim_copy(ln);

        // Blank chunks are skipped rather than yielded.
        if (!ln.empty())
            co_yield ln;

        // The error is checked only after the yield, so text read together
        // with a failing result is still delivered first.
        if (ec)
            co_return ec;

        // Drop the chunk that was just handled; the rest stays buffered.
        buffer.erase(0, n);
    }

    co_return asio::error::broken_pipe;
}

// Shared-ownership handle to a GenRead, passed by value to each read_consumer.
using GenReadPtr = std::shared_ptr<GenRead>;

// Consumer coroutine for the line generator: prints every line it receives,
// reports any error other than end-of-file, and announces when it is done.
cobalt::task<void> read_consumer(int id, GenReadPtr gen)
{
    // Consumer 1 is held back for 500 ms before it touches the generator.
    if (id == 1)
    {
        auto executor = co_await cobalt::this_coro::executor;
        asio::steady_timer head_start(executor);

        head_start.expires_after(500ms);
        co_await head_start.async_wait(cobalt::use_op);
    }

    // would be for co_await(auto value : read_lines(sf)) if standardized
    BOOST_COBALT_FOR(auto line, *gen)
    {
        if (line.has_value())
            std::cout << "Consumer " << id << " Read line '" << *line << "'" << std::endl;
        else if (line.has_error() && line.error() != asio::error::eof)
            std::cerr << "Consumer " << id << " Error occured: " << line.error() << std::endl;
    }

    std::cout << "Consumer " << id << " finished\n";

    co_return;
}

// Entry point: opens the file named by the first command-line argument for
// reading, wraps a read_lines() generator over it in a shared pointer, and
// runs two read_consumer tasks against that same generator.
//
// Returns 1 (after printing a usage message) when no file argument was
// given, 0 otherwise.
cobalt::main co_main(int argc, char* argv[])
{
    // argv[1] is only a valid path when an argument was actually supplied;
    // without this guard a null pointer would reach the stream_file constructor.
    if (argc < 2)
    {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "program") << " <file>" << std::endl;
        co_return 1;
    }

    asio::stream_file sf{ co_await cobalt::this_coro::executor,
                         argv[1],
                         asio::stream_file::read_only };

    GenReadPtr gen = std::make_shared<GenRead>(read_lines(sf));

    auto t1 = read_consumer(1, gen);
    auto t2 = read_consumer(2, gen);

    // The two tasks are awaited one after the other.
    co_await t1;
    co_await t2;

    co_return 0;
}

With an input file in which each line contains its own line number, the output is the following:

Consumer 1 Read line '1'
Consumer 1 Read line '2'
Consumer 1 Read line '3'
Consumer 1 Read line '4'
Consumer 1 Read line '5'
Consumer 1 Read line '6'
Consumer 1 Read line '7'
Consumer 1 Read line '8'
Consumer 1 Read line '9'
Consumer 1 Read line '10'
Consumer 1 Read line '11'
Consumer 1 Read line '12'
Consumer 1 Read line '13'
Consumer 1 Read line '14'
Consumer 1 Read line '15'
Consumer 1 Read line '16'
Consumer 1 Read line '17'
Consumer 1 Read line '18'
Consumer 1 Read line '19'
Consumer 1 Read line '20'
Consumer 1 Read line '21'
Consumer 1 Read line '22'
Consumer 1 Read line '23'
Consumer 1 Read line '24'
Consumer 1 Read line '25'
Consumer 1 Read line '26'
Consumer 1 Read line '27'
Consumer 1 Read line '28'
Consumer 1 Read line '29'
Consumer 1 Read line '30'
Consumer 1 finished
Consumer 2 finished

1 Response to Investigating cobalt::generator

  1. dmitriano says:

    How to exit from a generator without returning a value
    https://github.com/boostorg/cobalt/issues/176

Leave a Reply

Your email address will not be published. Required fields are marked *