From 10df5484104f0c643b5c1c0cae7608583f15734e Mon Sep 17 00:00:00 2001 From: Khalil Estell Date: Fri, 13 Mar 2026 14:13:29 -0700 Subject: [PATCH 1/2] :recycle: Decouple context from virtual callback API via unblock_listener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Separate the unblock notification mechanism from the context class into a dedicated `unblock_listener` interface. This eliminates the need for virtual function on context itself, allowing each context to be a simple, concrete object. The callback pattern is now only used for the unblock event—the only notification that must happen asynchronously (in ISRs or other threads). The scheduler can directly inspect context state to determine readiness and sleep duration without callbacks. Changes: - Extract unblock notification into `unblock_listener` interface - Remove virtual methods from context class - Update sleep_duration to use microseconds (u32) instead of nanoseconds to save 4 bytes, reduce the timing requirements on the scheduler, and to provide a more realistic delay range for most systems. - Simplify context state inspection for scheduler decision-making - Add `on_unblock()` listener registration/clearing on context - Update tests to use new listener-based design --- CMakeLists.txt | 6 +- README.md | 89 +-- benchmarks/benchmark.cpp | 27 +- cspell.json | 1 + docs/api/async_context.md | 2 +- modules/async_context.cppm | 681 ++++++++++-------- test_package/main.cpp | 159 ++-- tests/basics.test.cpp | 27 +- tests/basics_dep_inject.test.cpp | 134 ++++ tests/blocked_by.test.cpp | 50 +- tests/cancel.test.cpp | 8 +- tests/exclusive_access.test.cpp | 4 +- tests/on_unblock.test.cpp | 131 ++++ tests/proxy.test.cpp | 6 +- tests/simple_scheduler.test.cpp | 152 ++++ ...ic_context.test.cpp => sync_wait.test.cpp} | 60 +- tests/util.cppm | 73 -- 17 files changed, 978 insertions(+), 632 deletions(-) create mode 100644 tests/basics_dep_inject.test.cpp create mode 100644 tests/on_unblock.test.cpp create mode 100644 tests/simple_scheduler.test.cpp rename tests/{basic_context.test.cpp => sync_wait.test.cpp} (74%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 828b514..d586cbc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,12 +26,15 @@ libhal_apply_compile_options(async_context) libhal_install_library(async_context NAMESPACE libhal) libhal_add_tests(async_context TEST_NAMES - basic_context + sync_wait basics blocked_by cancel exclusive_access proxy + basics_dep_inject + on_unblock + simple_scheduler MODULES tests/util.cppm @@ -43,7 +46,6 @@ libhal_add_tests(async_context if(NOT CMAKE_CROSSCOMPILING) message(STATUS "Building benchmarks tests!") - find_package(benchmark REQUIRED) libhal_add_executable(benchmark SOURCES benchmarks/benchmark.cpp) target_link_libraries(benchmark PRIVATE async_context benchmark::benchmark) diff --git a/README.md b/README.md index 1d055ef..863a325 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,7 @@ int main() Output: -``` +```text Pipeline '🌟 System 1' starting... ['🌟 System 1': Sensor] Starting read... Pipeline '🔥 System 2' starting... @@ -142,9 +142,8 @@ Both pipelines completed successfully! ## Features - **Stack-based coroutine allocation** - No heap allocations; coroutine frames are allocated from a user-provided stack buffer -- **Cache-line optimized** - Context object fits within `std::hardware_constructive_interference_size` (typically 64 bytes) - **Blocking state tracking** - Built-in support for time, I/O, sync, and external blocking states -- **Scheduler integration** - Virtual `do_schedule()` method allows custom scheduler implementations +- **Flexible scheduler integration** - Schedulers can poll context state directly, or register an `unblock_listener` for ISR-safe event notification when contexts become unblocked - **Proxy contexts** - Support for supervised coroutines with timeout capabilities - **Exception propagation** - Proper exception handling through the coroutine chain - **Cancellation support** - Clean cancellation with RAII-based resource cleanup @@ -222,13 +221,35 @@ work. ## Core Types +### `async::unblock_listener` + +An interface for receiving notifications when a context becomes unblocked. This +is the primary mechanism for schedulers to efficiently track which contexts are +ready for execution without polling. Implement this interface and register it +with `context::on_unblock()` to be notified when a context transitions to the +unblocked state. + +The `on_unblock()` method is called from within `context::unblock()`, which may +be invoked from ISRs, driver completion handlers, or other threads. +Implementations must be ISR-safe and noexcept. + ### `async::context` -The base context class that manages coroutine execution and memory. Derived classes must: +The base context class that manages coroutine execution and memory. Contexts +are initialized with stack memory via their constructor: -1. Provide stack memory via `initialize_stack_memory()`, preferably within the - constructor. -2. Implement `do_schedule()` to handle blocking state notifications +```cpp +std::array my_stack{}; +async::context ctx(my_stack); +``` + +> [!CRITICAL] +> The stack memory MUST outlive the context object. The context does not own or +> copy the stack memory—it only stores a reference to it. + +Optionally, contexts can register an `unblock_listener` to be notified of state +changes, or the scheduler can poll the context state directly using `state()` +and `pending_delay()` ### `async::future` @@ -322,49 +343,10 @@ async::future outer(async::context& p_ctx) { } ``` -### Custom Context Implementation - -```cpp -class my_context : public async::context { -public: - std::array m_stack{}; - - my_context() { - initialize_stack_memory(m_stack); - } - ~my_context() { - // ‼️ The most derived context must call cancel in its destructor - cancel(); - // If memory was allocated, deallocate it here... - } - -private: - void do_schedule(async::blocked_by p_state, - async::block_info p_info) noexcept override { - // Notify your scheduler of state changes - } -}; -``` - -#### Initialization - -In order to create a usable custom context, the stack memory must be -initialized with a call to `initialize_stack_memory(span)` with a span to the -memory for the stack. There is no requirements of where this memory comes from -except that it be a valid source. Such sources can be array thats member of -this object, dynamically allocated memory that the context has sole ownership -of, or it can point to statically allocated memory that it has sole control and -ownership over. - -#### Destruction - -The custom context must call `cancel()` before deallocating the stack memory. -Once cancel completes, the stack memory may be deallocated. - -### Using `async::basic_context` with `sync_wait()` +### Using `sync_wait()` ```cpp -async::basic_context<512> ctx; +async::inplace_context<512> ctx; auto future = my_coroutine(ctx); ctx.sync_wait([](async::sleep_duration p_sleep_time) { std::this_thread::sleep_for(p_sleep_time); @@ -377,14 +359,14 @@ function works best for your systems. For example, for FreeRTOS this could be: ```C++ -// Helper function to convert std::chrono::nanoseconds to FreeRTOS ticks -inline TickType_t ns_to_ticks(const std::chrono::nanoseconds& ns) { - // Convert nanoseconds to milliseconds (rounding to nearest ms) - const auto ms = std::chrono::duration_cast(ns).count(); +// Helper function to convert microseconds to FreeRTOS ticks +inline TickType_t us_to_ticks(const std::chrono::microseconds& us) { + // Convert microseconds to milliseconds (rounding to nearest ms) + const auto ms = std::chrono::duration_cast(us).count(); return pdMS_TO_TICKS(ms); } ctx.sync_wait([](async::sleep_duration p_sleep_time) { - xTaskDelay(ns_to_ticks(p_sleep_time)); + xTaskDelay(us_to_ticks(p_sleep_time)); }); ``` @@ -577,7 +559,6 @@ To run the benchmarks on their own: ./build/Release/async_benchmark ``` - Within the [`CMakeList.txt`](./CMakeLists.txt), you can disable unit test or benchmarking by setting the following to `OFF`: ```cmake diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp index cc9504d..2c8900e 100644 --- a/benchmarks/benchmark.cpp +++ b/benchmarks/benchmark.cpp @@ -271,25 +271,10 @@ __attribute__((noinline)) async::future sync_future_level1( auto f = sync_future_level2(ctx, x); return sync_wait(f) + 1; } -struct benchmark_context : public async::context -{ - std::array m_stack{}; - - benchmark_context() - { - this->initialize_stack_memory(m_stack); - } - -private: - void do_schedule(async::blocked_by, async::block_info) noexcept override - { - // Do nothing for the benchmark - } -}; static void bm_future_sync_return(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -326,7 +311,7 @@ __attribute__((noinline)) async::future coro_level1(async::context& ctx, static void bm_future_coroutine(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -367,7 +352,7 @@ __attribute__((noinline)) async::future sync_in_coro_level1( static void bm_future_sync_await(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -408,7 +393,7 @@ __attribute__((noinline)) async::future mixed_coro_level1( static void bm_future_mixed(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -449,7 +434,7 @@ void_coro_level1(async::context& ctx, int& out, int x) static void bm_future_void_coroutine(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; int output = 0; @@ -464,7 +449,7 @@ BENCHMARK(bm_future_void_coroutine); static void bm_future_void_coroutine_context_resume(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; int output = 0; diff --git a/cspell.json b/cspell.json index 4368289..f01af9c 100644 --- a/cspell.json +++ b/cspell.json @@ -51,6 +51,7 @@ "doxygenfunction", "doxygenenum", "alignof", + "inplace" ], "ignorePaths": [ "build/", diff --git a/docs/api/async_context.md b/docs/api/async_context.md index cea1a59..5d2c317 100644 --- a/docs/api/async_context.md +++ b/docs/api/async_context.md @@ -9,7 +9,7 @@ Defined in namespace `async::v0` ```{doxygenclass} v0::context ``` -## async::basic_context +## async::inplace_context Defined in namespace `async::v0` diff --git a/modules/async_context.cppm b/modules/async_context.cppm index 962e369..4509acc 100644 --- a/modules/async_context.cppm +++ b/modules/async_context.cppm @@ -21,6 +21,7 @@ module; #include #include +#include #include #include #include @@ -258,10 +259,22 @@ export class operation_cancelled : public std::exception // ============================================================================= /** - * @brief The data type for sleep time duration + * @brief Sleep duration in microseconds * + * Uses microsecond granularity and u32 representation for an optimal balance: + * + * - **Granularity**: Microseconds provide sufficient precision for real-time + * scheduling. Nanoseconds are too fine-grained—most systems cannot achieve + * that level of accuracy. Milliseconds are too coarse; many embedded + * schedulers regularly achieve 50-100µs precision with minimal context + * switching overhead. + * + * - **Range**: A u32 microseconds can represent durations up to ~4,294 seconds + * (1 hour 11 minutes), which is a practical upper bound for async operation + * delays. Longer delays should use alternative mechanisms or be broken into + * smaller segments. */ -export using sleep_duration = std::chrono::nanoseconds; +export using sleep_duration = std::chrono::duration; /** * @brief Information about the block state when context::schedule is called @@ -272,6 +285,86 @@ export using block_info = class promise_base; +/** + * @brief Interface for receiving notifications when an async context is + * unblocked + * + * Implement this interface to receive notifications when a context transitions + * from a blocked state to `blocked_by::nothing`. This is the primary mechanism + * for schedulers to efficiently track which contexts become ready for execution + * without polling. + * + * The `on_unblock()` method is called from within `context::unblock()`, which + * may be invoked from an ISR, a driver completion handler, or another thread. + * Implementations MUST be ISR-safe and noexcept. Avoid any operations that + * could block, allocate memory, or acquire non-ISR-safe locks within + * `on_unblock()`. + * + * Typical usage is through `context_handle`, which automatically registers and + * deregisters the listener on construction and destruction respectively. + * Direct registration is possible via `context::on_unblock()` but requires + * manual lifetime management — the listener MUST outlive the context it is + * registered with. + * + * Example implementation: + * @code + * class my_scheduler : public async::unblock_listener { + * private: + * void on_unblock(async::context& p_context) noexcept override { + * m_ready_queue.push(&p_context); + * } + * // ... + * }; + * @endcode + */ +export struct unblock_listener +{ +public: + template + static auto from(Callable&& p_handler) + { + struct lambda_unblock_listener : public unblock_listener + { + Callable handler; + + lambda_unblock_listener(Callable&& p_handler) + : handler(std::move(p_handler)) + { + } + + private: + void on_unblock(async::context const& p_context) noexcept override + { + handler(p_context); + } + }; + + return lambda_unblock_listener{ std::forward(p_handler) }; + } + + virtual ~unblock_listener() = default; + +private: + friend class context; + + /** + * @brief Called when a context transitions to the unblocked state + * + * This method is invoked by `context::unblock()` immediately after the + * context's state is set to `blocked_by::nothing`. It signals to the + * implementing scheduler that the context is now ready to be resumed. + * + * @param p_context The context that has just been unblocked. The context's + * state will be `blocked_by::nothing` at the time of this call. The + * implementor may read any state from the context but MUST NOT resume or + * destroy it within this call. + * + * @note This method MUST be noexcept and ISR-safe. It may be called from + * any execution context including interrupt handlers. + */ + virtual void on_unblock(context const& p_context) noexcept = 0; +}; + /** * @brief The base context class for managing coroutine execution * @@ -301,11 +394,21 @@ public: /** * @brief Default constructor for context * - * Creates an uninitialized context. Derived classes must call - * initialize_stack_memory() to properly set up the stack memory. + * Creates context without a stack. `initialize_stack_memory()` must be called + * before passing this context to a coroutine. */ context() = default; + /** + * @brief Construct a new context object with stack memory. + * + * @param p_stack - the stack memory for the async operations + */ + context(std::span p_stack) + { + initialize_stack_memory(p_stack); + } + /** * @brief Delete copy constructor * @@ -321,18 +424,52 @@ public: context& operator=(context const&) = delete; /** - * @brief Delete move constructor + * @brief Move constructor * - * Contexts cannot be moved as they manage unique stack memory. + * Transfers ownership of stack and state from the source context. The + * moved-from context is reset to its default state (no stack, no active + * operations). + * + * @param p_other The context to move from (will be reset to default state) */ - context(context&&) = delete; + context(context&& p_other) noexcept + : m_active_handle(std::exchange(p_other.m_active_handle, noop_sentinel)) + , m_stack_pointer(std::exchange(p_other.m_stack_pointer, nullptr)) + , m_stack(std::exchange(p_other.m_stack, {})) + , m_original(std::exchange(p_other.m_original, nullptr)) + , m_listener(std::exchange(p_other.m_listener, nullptr)) + , m_sleep_time(std::exchange(p_other.m_sleep_time, sleep_duration::zero())) + , m_sync_blocker(std::exchange(p_other.m_sync_blocker, nullptr)) + , m_state(std::exchange(p_other.m_state, blocked_by::nothing)) + { + } /** - * @brief Delete move assignment operator + * @brief Move assignment operator * - * Contexts cannot be moved as they manage unique stack memory. + * Transfers ownership of stack and state from the source context. The + * current context is cancelled before assignment, and the moved-from context + * is reset to its default state. + * + * @param p_other The context to move from (will be reset to default state) + * @return Reference to this context */ - context& operator=(context&&) = delete; + context& operator=(context&& p_other) noexcept + { + if (this != &p_other) { + cancel(); + m_active_handle = std::exchange(p_other.m_active_handle, noop_sentinel); + m_stack_pointer = std::exchange(p_other.m_stack_pointer, nullptr); + m_stack = std::exchange(p_other.m_stack, {}); + m_original = std::exchange(p_other.m_original, nullptr); + m_listener = std::exchange(p_other.m_listener, nullptr); + m_sleep_time = + std::exchange(p_other.m_sleep_time, sleep_duration::zero()); + m_sync_blocker = std::exchange(p_other.m_sync_blocker, nullptr); + m_state = std::exchange(p_other.m_state, blocked_by::nothing); + } + return *this; + } /** * @brief Initialize stack memory for the context @@ -356,33 +493,40 @@ public: } /** - * @brief Unblocks a coroutine that was previously blocked + * @brief Unblocks the context without invoking the unblock listener * - * This method transitions the context from any blocking state to "nothing" - * (ready to run) state. It's typically called by I/O completion handlers or - * when external conditions that were blocking a coroutine have been resolved. - * - * @note This method is noexcept and should not throw exceptions. - * It's used internally by I/O completion handlers to signal that a blocked - * coroutine can now proceed. + * This method transitions the context to "nothing" (ready to run). */ - constexpr void unblock() noexcept + constexpr void unblock_without_notification() noexcept { - transition_to(blocked_by::nothing); + // We clear this information after the unblock call to allow the unblock + // call to inspect the context's current state. + get_original().m_state = blocked_by::nothing; + get_original().m_sleep_time = sleep_duration::zero(); + get_original().m_sync_blocker = nullptr; } /** - * @brief Unblocks a coroutine without notifying the scheduler + * @brief Unblocks the context and clears blocking state + * + * The state of the context after this is called: * - * This method transitions the context to "nothing" (ready to run) state but - * does not call the scheduler's do_schedule method. This is useful in cases - * where the scheduler state is being managed externally or during cleanup. + * 1. Block state becomes (block_by::nothing) + * 2. sleep time is set to 0us. + * 3. sync blocker is set to nullptr. * - * @note This method is noexcept and should not throw exceptions. + * The unblock listener is called before clearing the context state in the + * event that the unblock listener wants to inspect information from the + * context. + * + * @note this API is safe to call within an interrupt service routine. */ - constexpr void unblock_without_notification() noexcept + constexpr void unblock() noexcept { - m_state = blocked_by::nothing; + if (get_original().m_listener) { + get_original().m_listener->on_unblock(*this); + } + unblock_without_notification(); } /** @@ -395,10 +539,13 @@ public: * @param p_duration The time duration to block for * @return std::suspend_always to suspend the coroutine until resumed */ + template constexpr std::suspend_always block_by_time( - sleep_duration p_duration) noexcept + std::chrono::duration p_duration) noexcept { - transition_to(blocked_by::time, p_duration); + get_original().m_state = blocked_by::time; + get_original().m_sleep_time = + std::chrono::duration_cast(p_duration); return {}; } @@ -415,7 +562,8 @@ public: constexpr std::suspend_always block_by_io( sleep_duration p_duration = default_timeout) noexcept { - transition_to(blocked_by::io, p_duration); + get_original().m_state = blocked_by::io; + get_original().m_sleep_time = p_duration; return {}; } @@ -434,7 +582,8 @@ public: */ constexpr std::suspend_always block_by_sync(context* p_blocker) noexcept { - transition_to(blocked_by::sync, p_blocker); + get_original().m_state = blocked_by::sync; + get_original().m_sync_blocker = p_blocker; return {}; } @@ -450,7 +599,7 @@ public: */ constexpr std::suspend_always block_by_external() noexcept { - transition_to(blocked_by::external, std::monostate{}); + get_original().m_state = blocked_by::external; return {}; } @@ -481,7 +630,7 @@ public: */ [[nodiscard]] constexpr auto state() const noexcept { - return m_state; + return get_original().m_state; } /** @@ -494,7 +643,7 @@ public: * * @return true if the context is done, false otherwise */ - constexpr bool done() + [[nodiscard]] constexpr bool done() const { return m_active_handle == noop_sentinel; } @@ -517,11 +666,42 @@ public: { // We cannot resume the a coroutine blocked by time. // Only the scheduler can unblock a context state. - if (m_state != blocked_by::time) { + if (state() != blocked_by::time) { m_active_handle.resume(); } } + /** + * @brief Perform sync_wait operation + * + * This method waits synchronously for all coroutines on this context to + * complete. It uses the provided delay function to sleep for the required + * duration when waiting for time-based operations. + * + * @tparam DelayFunc The type of the delay function (must be invocable with + * sleep_duration parameter) + * @param p_delay - a delay function, that accepts a sleep duration and + * returns void. + * + * @note This method is primarily intended for testing and simple applications + * where a synchronous wait is needed. It's not suitable for production + * embedded systems that require precise timing or real-time scheduling. + */ + void sync_wait(std::invocable auto&& p_delay) + { + while (not done()) { + resume(); + + if (state() == blocked_by::time) { + if (auto delay_time = sleep_time(); + delay_time > sleep_duration::zero()) { + p_delay(delay_time); + } + unblock_without_notification(); + } + } + } + /** * @brief Get the amount of stack memory used by active coroutines * @@ -561,26 +741,88 @@ public: } /** - * @brief Virtual destructor for proper cleanup of derived classes + * @brief Amount of time to delay resuming this context + * + * When a context is blocked by for a set duration of time, it is the + * responsibility of the scheduler to ensure that the context is not resumed + * until that duration of time has elapsed. In the event that the context is + * not blocked by time, then this returns 0. * - * This virtual destructor ensures that derived context classes are properly - * cleaned up when deleted through a base class pointer. + * Calling this function multiple times returns the last sleep duration that + * was set by the async operation contained within the context. It is the + * responsibility of the scheduler to unblock this context, otherwise, calling + * resume() will immediately without resuming async operation. + * + * @return constexpr sleep_duration - the amount of time to delay resuming + * this context. */ - virtual ~context() = default; + [[nodiscard]] constexpr sleep_duration sleep_time() const noexcept + { + return get_original().m_sleep_time; + } -private: - friend class promise_base; - template - friend class promise; + /** + * @brief Sets the unblock listener + * + * There can only be a single unblock listener per context, thus any + * previously set unblock listener will be removed. + * + * Because this API takes the address of the listener, it is important that + * the context outlive the listener. + * + * `remove_unblock_handler()` must be called before the end of the lifetime of + * the `p_listener` object. It is undefined behavior to allow a listener to be + * destroyed before it is removed from this context. + * + * @param p_listener - the address of the unblock listener to be invoked when + * this context is unblocked. + */ + void on_unblock(unblock_listener* p_listener) + { + get_original().m_listener = p_listener; + } + + /** + * @brief Clears the on_unblock listener from this context + * + * After this is called, any call to `unblock()` will not invoke an unblock + * listener. + * + * It is the responsibility of the application to clear the unblock listener + * is cleared, before the end of the lifetime of the object that was passed to + * on_unblock(). + */ + void clear_unblock_listener() noexcept + { + get_original().m_listener = nullptr; + } /** - * @brief Internal structure to track proxy context information + * @brief Get the address of the context currently blocking this one blocking + * + * If this context's state is `blocked_by::sync` then there is a context that + * currently holds a resource that this context needs. That context's address + * can be acquired by this API. + * + * @returns context const* - returns a const pointer to the other context that + * is blocking this context. If no such context exists, then a nullptr is + * returned. */ - struct proxy_info + [[nodiscard]] context const* get_blocker() const { - context* original = nullptr; - context* parent = nullptr; - }; + return get_original().m_sync_blocker; + } + + ~context() + { + cancel(); + } + +private: + template + friend class promise; + friend class promise_base; + friend class proxy_context; /** * @brief Check if this is a proxy context @@ -592,35 +834,55 @@ private: */ [[nodiscard]] constexpr bool is_proxy() const noexcept { - return m_proxy.parent == nullptr; + return m_original != nullptr; } /** - * @brief Set the active coroutine handle for this context + * @brief Get a reference to the original context * - * This method sets the coroutine that is currently running on this context. + * If this object is a proxy, then the original context will be pulled from + * the proxy information and if its not, then a reference to `this` is + * returned. * - * @param p_active_handle The coroutine handle to set as active + * @return context& - reference to the original context */ - constexpr void active_handle(std::coroutine_handle<> p_active_handle) + [[nodiscard]] constexpr context const& get_original() const noexcept { - m_active_handle = p_active_handle; + if (is_proxy()) { + return *m_original; + } else { + return *this; + } + } + + /** + * @brief Get a reference to the original context + * + * If this object is a proxy, then the original context will be pulled from + * the proxy information and if its not, then a reference to `this` is + * returned. + * + * @return context& - reference to the original context + */ + [[nodiscard]] constexpr context& get_original() noexcept + { + if (is_proxy()) { + return *m_original; + } else { + return *this; + } } /** - * @brief Transition the context to a new blocking state + * @brief Set the active coroutine handle for this context * - * This internal method transitions the context to a new blocking state and - * notifies the scheduler via do_schedule(). + * This method sets the coroutine that is currently running on this context. * - * @param p_new_state The new blocking state to transition to - * @param p_info Additional information about the blocking condition + * @param p_active_handle The coroutine handle to set as active */ - constexpr void transition_to(blocked_by p_new_state, - block_info p_info = std::monostate{}) noexcept + constexpr void active_handle(std::coroutine_handle<> p_active_handle) { - m_state = p_new_state; - schedule(p_new_state, p_info); + m_active_handle = p_active_handle; } /** @@ -662,60 +924,17 @@ private: return coroutine_frame_stack_address; } - /** - * @brief Wrapper around call to do_schedule - * - * This wrapper exists to allow future extensibility - * - * @param p_block_state - state that this context has been set to - * @param p_block_info - information about the blocking conditions - */ - void schedule(blocked_by p_block_state, block_info p_block_info) noexcept - { - return do_schedule(p_block_state, p_block_info); - } - - /** - * @brief Implementations of context use this to notify their scheduler of - * changes to this async context. - * - * It is up to the scheduler to ensure that concurrent calls to this API are - * serialized appropriately. For a single threaded event loop, syncronization - * and serialization is not necessary. For a thread pool implementation, - * syncronization and serialization must be considered. - * - * @param p_block_state - the type of blocking event the context has - * occurred. - * @param p_block_info - Information about what exactly is blocking this - * context. If p_block_info is a sleep_duration, and the p_block_state is - * blocked_by::time, then this context is requesting to be scheduled at that - * or a later time. If the p_block_info is a sleep_duration, and the block - * state isn't blocked_by::time, then this sleep duration is a hint to the - * scheduler to when it would be appropriate to reschedule this context. The - * scheduler does not have to be abided by this. If p_block_info is a pointer - * to a context, then the pointed to context is currently blocking p_context. - * This can be used to determine when to schedule p_context again, but does - * not have to be abided by for proper function. - */ - virtual void do_schedule(blocked_by p_block_state, - block_info p_block_info) noexcept = 0; - friend class proxy_context; - - /* vtable ptr */ // word 1 - std::coroutine_handle<> m_active_handle = noop_sentinel; // word 2 + std::coroutine_handle<> m_active_handle = noop_sentinel; // word 1 + uptr* m_stack_pointer = nullptr; // word 2 std::span m_stack{}; // word 3-4 - uptr* m_stack_pointer = nullptr; // word 5 - blocked_by m_state = blocked_by::nothing; // word 6 - proxy_info m_proxy{}; // word 7-8 + context* m_original = nullptr; // word 5 + // ----------- Only available from the original ----------- + unblock_listener* m_listener = nullptr; // word 6 + sleep_duration m_sleep_time = sleep_duration::zero(); // word 7 + context* m_sync_blocker = nullptr; // word 8 + blocked_by m_state = blocked_by::nothing; // word 9: pad 3 }; -// Context should stay close to a standard cache-line of 64 bytes (8 words) for -// a 64-bit system. This compile time check ensures that the context does not -// exceed the this boundary for the platform. -static_assert(sizeof(context) <= std::hardware_constructive_interference_size, - "Context cannot be contained within a cache-line (as specified " - "by std::hardware_constructive_interference_size)"); - /** * @brief A proxy context that provides isolated stack space for supervised * coroutines @@ -731,6 +950,21 @@ static_assert(sizeof(context) <= std::hardware_constructive_interference_size, export class proxy_context : public context { public: + /** + * @brief Create a proxy context from an existing parent context + * + * This static method creates a new proxy context that uses a portion of the + * parent context's stack memory. The proxy takes control over the remaining + * stack space, effectively creating an isolated sub-context. + * + * @param p_parent The parent context to create a proxy from + * @return A new proxy_context instance + */ + static proxy_context from(context& p_parent) + { + return { p_parent }; + } + /** * @brief Delete copy constructor * @@ -759,20 +993,8 @@ public: */ proxy_context& operator=(proxy_context&&) = delete; - /** - * @brief Create a proxy context from an existing parent context - * - * This static method creates a new proxy context that uses a portion of the - * parent context's stack memory. The proxy takes control over the remaining - * stack space, effectively creating an isolated sub-context. - * - * @param p_parent The parent context to create a proxy from - * @return A new proxy_context instance - */ - static proxy_context from(context& p_parent) - { - return { p_parent }; - } + constexpr void initialize_stack_memory(std::span p_stack_memory) = + delete; /** * @brief Destructor for proxy_context @@ -780,15 +1002,14 @@ public: * The destructor cancels any remaining operations and properly restores * the parent context's stack memory to its original state. */ - ~proxy_context() override + ~proxy_context() { // Cancel any operations still on this context cancel(); // Restore parent stack, by setting its range to be the start of its // stack and the end of our stack. - m_proxy.parent->m_stack = { m_proxy.parent->m_stack.begin(), - m_stack.end() }; + m_parent->m_stack = { m_parent->m_stack.begin(), m_stack.end() }; } private: @@ -801,9 +1022,9 @@ private: * @param p_parent The parent context to create proxy from */ proxy_context(context& p_parent) + : m_parent(&p_parent) { m_active_handle = context::noop_sentinel; - m_proxy = {}; // We need to manually set: // 1. m_stack @@ -822,206 +1043,50 @@ private: // If this is a proxy, take its pointer to the origin if (p_parent.is_proxy()) { - m_proxy = proxy_info{ - .original = m_proxy.original, - .parent = &p_parent, - }; - } else { // Otherwise, the current parent is the origin. - m_proxy = proxy_info{ - .original = &p_parent, - .parent = &p_parent, - }; + m_original = p_parent.m_original; + } else { // Otherwise, the current parent is the original + m_original = &p_parent; } + m_parent = &p_parent; } - /** - * @brief Forwards the schedule call to the original context - * - * This method forwards scheduling notifications to the original context, - * ensuring that the parent context's scheduler is properly notified of - * state changes. - * - * @param p_block_state - state that this context has been set to - * @param p_block_info - information about the blocking conditions - */ - void do_schedule(blocked_by p_block_state, - block_info p_block_info) noexcept override - { - m_proxy.original->schedule(p_block_state, p_block_info); - } + context* m_parent; }; /** - * @brief A basic context implementation that supports synchronous waiting + * @brief A context with embedded inplace stack memory * - * The basic_context class provides a concrete implementation of the context - * interface that supports synchronous waiting operations. It extends the base - * context with functionality to wait for coroutines to complete using a simple - * synchronous loop. - * - * NOTE: This class does not provide stack memory - * - * This context is particularly useful for testing and simple applications where - * a scheduler isn't needed, as it provides a way to wait for all coroutines to - * complete without requiring external scheduling. - * - * @note basic_context is designed for simple use cases and testing, not - * production embedded systems where strict memory management is required. - */ -class basic_context_impl : public context -{ -public: - /** - * @brief Default constructor for basic_context_impl - * - * Creates a new basic context with default initialization. - */ - basic_context_impl() = default; - - /** - * @brief Virtual destructor for proper cleanup - * - * Ensures that the basic context is properly cleaned up when deleted. - */ - ~basic_context_impl() override = default; - - /** - * @brief Get the pending delay time for time-blocking operations - * - * This method returns the sleep duration that is currently pending for - * time-blocking operations. It's used by the sync_wait method to determine - * how long to sleep. - * - * @return The pending sleep duration - * - * @note This is an internal method used by the basic_context implementation - * to manage time-based blocking operations during synchronous waiting. - */ - [[nodiscard]] constexpr sleep_duration pending_delay() const noexcept - { - return m_pending_delay; - } - - /** - * @brief Perform sync_wait operation - * - * This method waits synchronously for all coroutines on this context to - * complete. It uses the provided delay function to sleep for the required - * duration when waiting for time-based operations. - * - * @tparam DelayFunc The type of the delay function (must be invocable with - * sleep_duration parameter) - * @param p_delay - a delay function, that accepts a sleep duration and - * returns void. - * - * @note This method is primarily intended for testing and simple applications - * where a synchronous wait is needed. It's not suitable for production - * embedded systems that require precise timing or real-time scheduling. - */ - void sync_wait(std::invocable auto&& p_delay) - { - while (active_handle() != context::noop_sentinel) { - active_handle().resume(); - - if (state() == blocked_by::time) { - if (m_pending_delay == sleep_duration(0)) { - unblock_without_notification(); - continue; - } - p_delay(m_pending_delay); - m_pending_delay = sleep_duration(0); - unblock_without_notification(); - } - } - } - -private: - /** - * @brief Forwards the schedule call to the original context - * - * This method handles scheduling notifications for time-blocking operations. - * It stores the pending delay duration so that sync_wait can properly wait - * for it. - * - * @param p_block_state - state that this context has been set to - * @param p_block_info - information about the blocking conditions - */ - void do_schedule(blocked_by p_block_state, - block_info p_block_info) noexcept final - { - if (p_block_state == blocked_by::time) { - if (auto* ex = std::get_if(&p_block_info)) { - m_pending_delay = *ex; - } else { - m_pending_delay = sleep_duration{ 0 }; - } - } - // Ignore the rest and poll them... - } - - /** - * @brief The pending delay for time-blocking operations - * - * This member stores the sleep duration that is currently pending for - * time-blocking operations, allowing sync_wait to properly handle delays. - */ - sleep_duration m_pending_delay{ 0 }; -}; - -/** - * @brief A basic context implementation that supports synchronous waiting - * - * The basic_context class provides a concrete implementation of the context - * interface that supports synchronous waiting operations. It extends the base - * context with functionality to wait for coroutines to complete using a simple - * synchronous loop. - * - * This class provides stack memory via its `StackSizeInWords` template - * variable. + * The inplace_context class provides a concrete implementation of the context + * with stack memory embedded directly in the object. Otherwise, it works + * identically to the base context class. * * @tparam StackSizeInWords - the number of words to allocate for the context's * stack memory. Word size is 4 bytes for 32-bit systems and 8 bytes on 64-bit * systems. */ export template -class basic_context +class inplace_context : public context { public: static_assert(StackSizeInWords > 0UL, "Stack memory must be greater than 0 words."); - basic_context() - { - m_context.initialize_stack_memory(m_stack); - } - - ~basic_context() - { - m_context.cancel(); - } - - context& context() - { - return m_context; - } - - operator class context&() + inplace_context() + : context(m_stack) { - return m_context; } - auto* operator->() - { - return &m_context; - } + inplace_context(inplace_context const&) = delete; + inplace_context& operator=(inplace_context const&) = delete; + inplace_context(inplace_context&&) = default; + inplace_context& operator=(inplace_context&&) = default; - auto& operator*() + ~inplace_context() { - return m_context; + cancel(); } private: - basic_context_impl m_context; std::array m_stack{}; }; @@ -1832,7 +1897,7 @@ public: auto handle = std::get(m_state); full_handle_type::from_address(handle.address()) .promise() - .set_object_address(&m_state); + .m_future_state = &m_state; } } @@ -1856,7 +1921,7 @@ public: auto handle = std::get(m_state); full_handle_type::from_address(handle.address()) .promise() - .set_object_address(&m_state); + .m_future_state = &m_state; } } return *this; diff --git a/test_package/main.cpp b/test_package/main.cpp index 0e4ef12..2b2ccdb 100644 --- a/test_package/main.cpp +++ b/test_package/main.cpp @@ -26,71 +26,6 @@ import async_context; -struct round_robin_scheduler -{ - bool run_all_until_done(int p_iterations = 100) - { - for (int i = 0; i < p_iterations; i++) { - bool all_done = true; - for (auto const& ctx : contexts) { - if (not ctx->done()) { - all_done = false; - if (ctx->state() == async::blocked_by::nothing) { - ctx->resume(); - } - } - } - if (all_done) { - return true; - } - } - return false; - } - std::vector contexts{}; -}; - -struct test_context : public async::context -{ - std::array m_stack{}; - // NOTE: the scheduler isn't used in this example, but if it were used, it - // would be called within the do_schedule function. For example, rather than - // performing a sleep that blocks the whole thread, the context and its time - // duration could be passed to an API on the "round_robin_scheduler", that - // makes it aware of the time schedule requirements. - std::shared_ptr scheduler; - - test_context(test_context const&) = delete; - test_context& operator=(test_context const&) = delete; - test_context(test_context&&) = delete; - test_context& operator=(test_context&&) = delete; - - test_context(std::shared_ptr const& p_scheduler) - : scheduler(p_scheduler) - { - scheduler->contexts.push_back(this); - this->initialize_stack_memory(m_stack); - } - -private: - void do_schedule( - async::blocked_by p_blocked_state, - [[maybe_unused]] async::block_info p_block_info) noexcept override - { - // Simulate I/O completion - immediately unblock - if (p_blocked_state == async::blocked_by::io) { - this->unblock_without_notification(); - } else if (p_blocked_state == async::blocked_by::time) { -#if not __ARM_EABI__ - if (auto* time = std::get_if(&p_block_info)) { - // Just block this thread vs doing something smart - std::this_thread::sleep_for(*time); - } -#endif - this->unblock_without_notification(); - } - } -}; - // Simulates reading sensor data with I/O delay async::future read_sensor(async::context& ctx, std::string_view p_name) { @@ -137,22 +72,94 @@ async::future sensor_pipeline(async::context& ctx, std::println("Pipeline '{}' complete!\n", p_name); } -int main() +// Type-erased future wrapper for storing different future types +struct future_wrapper { - auto scheduler = std::make_shared(); + virtual void resume() = 0; + [[nodiscard]] virtual bool done() const = 0; + virtual ~future_wrapper() = default; +}; - // Create context and add them to the scheduler - test_context ctx1(scheduler); - test_context ctx2(scheduler); +template +class typed_future_wrapper : public future_wrapper +{ +public: + explicit typed_future_wrapper(async::future&& p_future) + : m_future(std::move(p_future)) + { + } - // Run two independent pipelines concurrently - auto pipeline1 = sensor_pipeline(ctx1, "🌟 System 1"); - auto pipeline2 = sensor_pipeline(ctx2, "🔥 System 2"); + void resume() override + { + m_future.resume(); + } - scheduler->run_all_until_done(); + [[nodiscard]] bool done() const override + { + return m_future.done(); + } + +private: + async::future m_future; +}; + +template +auto make_future_wrapper(async::future&& p_future) +{ + return std::make_unique>(std::move(p_future)); +} + +template +struct round_robin_scheduler +{ + bool resume_n(int p_iterations) + { + for (int i = 0; i < p_iterations; i++) { + bool all_done = true; + for (auto& ctx : std::span(m_context_list).first(m_context_size)) { + if (not ctx.done()) { + all_done = false; + if (ctx.state() == async::blocked_by::nothing) { + ctx.resume(); +#if not __ARM_EABI__ + std::this_thread::sleep_for(ctx.sleep_time()); +#endif + ctx.unblock(); // simulate quick unblocking of the context. + } + } + } + if (all_done) { + return true; + } + } + return false; + } + + template + void schedule_operation(AsyncOperation&& p_async_operation, + OperationArgs... args) + { + + auto future = p_async_operation(m_context_list[m_context_size], args...); + future_list[m_context_size] = make_future_wrapper(std::move(future)); + m_context_size++; + } + + std::array, ContextCount> + m_context_list{}; + std::array, ContextCount> future_list{}; + unsigned m_context_size = 0; +}; + +int main() +{ + round_robin_scheduler scheduler; + + // Run two independent pipelines concurrently + scheduler.schedule_operation(sensor_pipeline, "🌟 System 1"); + scheduler.schedule_operation(sensor_pipeline, "🔥 System 2"); - assert(pipeline1.done()); - assert(pipeline2.done()); + scheduler.resume_n(100); std::println("Both pipelines completed successfully!"); return 0; diff --git a/tests/basics.test.cpp b/tests/basics.test.cpp index 2446441..c555673 100644 --- a/tests/basics.test.cpp +++ b/tests/basics.test.cpp @@ -11,7 +11,7 @@ void basics() "sync return type void"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; unsigned step = 0; auto sync_coroutine = [&step](async::context&) -> async::future { @@ -24,7 +24,6 @@ void basics() // Verify expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % 1 == step); @@ -32,7 +31,7 @@ void basics() "sync return type int"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 5; @@ -47,7 +46,6 @@ void basics() // Verify expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -56,7 +54,7 @@ void basics() "sync return type std::string"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr auto expected_return_value = "Hello, World\n"; @@ -72,7 +70,6 @@ void basics() // Verify expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -81,7 +78,7 @@ void basics() "co_return"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 5; unsigned step = 0; @@ -95,7 +92,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -105,7 +101,6 @@ void basics() // Verify 2 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -114,7 +109,7 @@ void basics() "suspend then co_return"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 1413; unsigned step = 0; @@ -130,7 +125,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -140,7 +134,6 @@ void basics() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 1 == step); @@ -150,7 +143,6 @@ void basics() // Verify 3 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -159,7 +151,7 @@ void basics() "co_await coroutine"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 1413; unsigned step = 0; @@ -182,7 +174,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -192,7 +183,6 @@ void basics() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 2 == step); @@ -202,7 +192,6 @@ void basics() // Verify 3 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -211,7 +200,7 @@ void basics() "co_await coroutine"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int return_value1 = 1413; static constexpr int return_value2 = 4324; @@ -234,7 +223,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -244,7 +232,6 @@ void basics() // Verify 3 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_total == future.value()); diff --git a/tests/basics_dep_inject.test.cpp b/tests/basics_dep_inject.test.cpp new file mode 100644 index 0000000..249b1be --- /dev/null +++ b/tests/basics_dep_inject.test.cpp @@ -0,0 +1,134 @@ +#include +#include + +#include +#include + +import async_context; + +void basics_dep_inject() +{ + using namespace boost::ut; + + "sync return type void"_test = []() { + // Setup + std::array stack{}; + async::context ctx{ stack }; + + unsigned step = 0; + auto sync_coroutine = [&step](async::context&) -> async::future { + step = 1; + return {}; + }; + + // Exercise + auto future = sync_coroutine(ctx); + + // Verify + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % 1 == step); + }; + + "suspend then co_return"_test = []() { + // Setup + std::array stack{}; + async::context ctx{ stack }; + + static constexpr int expected_return_value = 1413; + unsigned step = 0; + auto async_coroutine = + [&step](async::context& p_ctx) -> async::future { + step = 1; + while (step != 2) { + // external set to 2 + co_await p_ctx.block_by_io(); + } + step = 2; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = async_coroutine(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start and suspend coroutine + ctx.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 1 == step); + expect(ctx.state() == async::blocked_by::io); + + // Exercise 3: resume and co_return from coroutine + ctx.unblock(); + step = 2; + ctx.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 2 == step); + }; + + "Call handler"_test = []() { + // Setup + std::array stack{}; + async::context ctx{ stack }; + + static constexpr int expected_return_value = 1413; + unsigned step = 0; + auto async_coroutine = + [&step](async::context& p_ctx) -> async::future { + step = 1; + co_await p_ctx.block_by_io(); + step = 2; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = async_coroutine(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start and suspend coroutine + ctx.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(async::blocked_by::io == ctx.state()); + // expect(that % async::block_by::io == *info); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3: resume and co_return from coroutine + ctx.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 2 == step); + }; +}; + +int main() +{ + basics_dep_inject(); +} diff --git a/tests/blocked_by.test.cpp b/tests/blocked_by.test.cpp index 40a9dcd..90080f7 100644 --- a/tests/blocked_by.test.cpp +++ b/tests/blocked_by.test.cpp @@ -13,7 +13,9 @@ void blocking_states() "co_await 10ms & co_await 50ms"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; + + std::println("ctx.capacity() = {}", ctx.capacity()); static constexpr int expected_return_value = 8748; unsigned step = 0; @@ -31,7 +33,6 @@ void blocking_states() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -41,31 +42,28 @@ void blocking_states() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % 1 == ctx.info->sleep_count); expect(that % not future.done()); - expect(that % 10ms == ctx.info->last_sleep_time); - + expect(that % 10ms == ctx.sleep_time()); expect(that % not future.has_value()); expect(that % 1 == step); // Exercise 3 + ctx.unblock(); future.resume(); // Verify 3 expect(that % 0 < ctx.memory_used()); - expect(that % 2 == ctx.info->sleep_count); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 2 == step); - expect(that % 25ms == ctx.info->last_sleep_time); + expect(that % 25ms == ctx.sleep_time()); // Exercise 4 + ctx.unblock(); future.resume(); // Verify 4 expect(that % 0 == ctx.memory_used()); - expect(that % 2 == ctx.info->sleep_count); expect(that % future.done()); expect(that % future.has_value()); expect(that % 3 == step); @@ -74,7 +72,7 @@ void blocking_states() "context::block_by_io() "_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; unsigned step = 0; bool io_complete = false; @@ -98,7 +96,6 @@ void blocking_states() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % 0 == step); @@ -107,45 +104,43 @@ void blocking_states() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % ctx.info->io_block); + expect(that % async::blocked_by::io == ctx.state()); expect(that % not future.done()); expect(that % 1 == step); // Exercise 3: stay in loop and re-block on io + ctx.unblock(); future.resume(); // Verify 3 expect(that % 0 < ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % ctx.info->io_block); + expect(that % async::blocked_by::io == ctx.state()); expect(that % not future.done()); expect(that % 1 == step); // Exercise 4: unblock IO and resume to final suspend io_complete = true; + ctx.unblock(); future.resume(); // Verify 4 expect(that % 0 == ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % ctx.info->io_block); + expect(that % async::blocked_by::nothing == ctx.state()); expect(that % future.done()); expect(that % 2 == step); }; "blocked_by time, io, & sync"_test = []() { // Setup - auto info = std::make_shared(); - test_context ctx1(info); - test_context ctx2(info); + async::inplace_context<1024> ctx1{}; + async::inplace_context<1024> ctx2{}; int step = 0; auto co = [&](async::context& p_context) -> async::future { using namespace std::chrono_literals; step = 1; - co_await 100ns; + co_await 100us; step = 2; co_await p_context.block_by_io(); step = 3; @@ -160,7 +155,6 @@ void blocking_states() // Verify 1 expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); - expect(that % not ctx1.info->scheduled_called_once); expect(that % not future.done()); expect(that % 0 == step); @@ -170,20 +164,20 @@ void blocking_states() // Verify 2 expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); - expect(that % ctx1.info->scheduled_called_once); - expect(that % 100ns == ctx1.info->last_sleep_time); - expect(that % not ctx1.info->io_block); + expect(that % 100us == ctx1.sleep_time()); + expect(that % ctx1.state() == async::blocked_by::time); expect(that % not future.done()); expect(that % 1 == step); // Exercise 3 + ctx1.unblock(); future.resume(); // Verify 3 expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); - expect(that % ctx1.info->scheduled_called_once); - expect(that % ctx1.info->io_block) << "context should be blocked by IO"; + expect(that % ctx1.state() == async::blocked_by::io) + << "context should be blocked by IO"; expect(that % not future.done()); expect(that % 2 == step); @@ -195,7 +189,7 @@ void blocking_states() expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); expect(that % not future.done()); - expect(that % &ctx2 == ctx1.info->sync_context) + expect(that % &ctx2 == ctx1.get_blocker()) << "sync context should be &ctx2"; expect(that % 3 == step); diff --git a/tests/cancel.test.cpp b/tests/cancel.test.cpp index cd133e4..7abd461 100644 --- a/tests/cancel.test.cpp +++ b/tests/cancel.test.cpp @@ -13,7 +13,7 @@ void cancellation_tests() "Cancellation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; @@ -68,7 +68,7 @@ void cancellation_tests() "Cancellation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; @@ -128,7 +128,7 @@ void cancellation_tests() "Context Cancellation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; @@ -191,7 +191,7 @@ void cancellation_tests() "Exception Propagation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; diff --git a/tests/exclusive_access.test.cpp b/tests/exclusive_access.test.cpp index 819342d..a263af4 100644 --- a/tests/exclusive_access.test.cpp +++ b/tests/exclusive_access.test.cpp @@ -12,8 +12,8 @@ void guards_tests() using namespace std::chrono_literals; "Context Token"_test = []() { // Setup - test_context ctx1; - test_context ctx2; + async::inplace_context<1024> ctx1; + async::inplace_context<1024> ctx2; async::exclusive_access io_in_use; diff --git a/tests/on_unblock.test.cpp b/tests/on_unblock.test.cpp new file mode 100644 index 0000000..90322c1 --- /dev/null +++ b/tests/on_unblock.test.cpp @@ -0,0 +1,131 @@ +#include +#include + +#include + +import async_context; +import test_utils; + +void on_upload_test() +{ + using namespace boost::ut; + using namespace std::chrono_literals; + + "on_upload() via lambda"_test = []() { + // Setup + async::inplace_context<1024> ctx; + bool unblock_called = false; + async::context const* unblocked_context = nullptr; + auto upload_handler = + async::unblock_listener::from([&](async::context const& p_context) { + unblock_called = true; + unblocked_context = &p_context; + }); + ctx.on_unblock(&upload_handler); + + unsigned step = 0; + auto co = [&step](async::context&) -> async::future { + step = 1; + co_await 10ms; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2 + future.resume(); + + // Verify 2 + expect(that % unblock_called == false); + expect(that % unblocked_context == nullptr); + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % 10ms == ctx.sleep_time()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3 + ctx.unblock(); + future.resume(); + + // Verify 3 + expect(that % unblock_called == true); + expect(that % unblocked_context == &ctx); + expect(that % unblock_called == true); + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % 1 == step); + + ctx.clear_unblock_listener(); + }; + + "on_upload() via inheritance"_test = []() { + // Setup + async::inplace_context<1024> ctx; + struct un_blocker : public async::unblock_listener + { + bool unblock_called = false; + async::context const* unblocked_context = nullptr; + + void on_unblock(async::context const& p_context) noexcept override + { + unblock_called = true; + unblocked_context = &p_context; + } + }; + + un_blocker ub; + ctx.on_unblock(&ub); + + unsigned step = 0; + auto co = [&step](async::context&) -> async::future { + step = 1; + co_await 10ms; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2 + future.resume(); + + // Verify 2 + expect(that % ub.unblock_called == false); + expect(that % ub.unblocked_context == nullptr); + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % 10ms == ctx.sleep_time()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3 + ctx.unblock(); + future.resume(); + + // Verify 3 + expect(that % ub.unblock_called == true); + expect(that % ub.unblocked_context == &ctx); + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % 1 == step); + + ctx.clear_unblock_listener(); + }; +}; + +int main() +{ + on_upload_test(); +} diff --git a/tests/proxy.test.cpp b/tests/proxy.test.cpp index fca99ae..c68ee01 100644 --- a/tests/proxy.test.cpp +++ b/tests/proxy.test.cpp @@ -13,7 +13,9 @@ void proxy_tests() "Proxy Context (normal behavior, no timeout)"_test = []() { // Setup - test_context ctx; + std::array stack{}; + async::context ctx; + ctx.initialize_stack_memory(stack); std::println("===================================="); std::println("Running Proxy Context Test (no timeout normal behavior)"); std::println("===================================="); @@ -89,7 +91,7 @@ void proxy_tests() "Proxy Coroutines Timeout"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; std::println("===================================="); std::println("Running Proxy Context Test (with timeout)"); std::println("===================================="); diff --git a/tests/simple_scheduler.test.cpp b/tests/simple_scheduler.test.cpp new file mode 100644 index 0000000..84b1839 --- /dev/null +++ b/tests/simple_scheduler.test.cpp @@ -0,0 +1,152 @@ +#include + +#include +#include +#include +#include +#include +#include + +#if not __ARM_EABI__ +#include +#endif + +import async_context; + +// Simulates reading sensor data with I/O delay +async::future read_sensor(async::context& ctx, std::string_view p_name) +{ + using namespace std::chrono_literals; + std::println("['{}': Sensor] Starting read...", p_name); + co_await ctx.block_by_io(); // Simulate I/O operation + std::println("['{}': Sensor] Read complete: 42", p_name); + co_return 42; +} + +// Processes data with computation delay +async::future process_data(async::context& ctx, + std::string_view p_name, + int value) +{ + using namespace std::chrono_literals; + std::println("['{}': Process] Processing {}...", p_name, value); + co_await 10ms; // Simulate processing time + int result = value * 2; + std::println("['{}': Process] Result: {}", p_name, result); + co_return result; +} + +// Writes result with I/O delay +async::future write_actuator(async::context& ctx, + std::string_view p_name, + int value) +{ + std::println("['{}': Actuator] Writing {}...", p_name, value); + co_await ctx.block_by_io(); + std::println("['{}': Actuator] Write complete!", p_name); +} + +// Coordinates the full pipeline +async::future sensor_pipeline(async::context& ctx, + std::string_view p_name) +{ + std::println("Pipeline '{}' starting...", p_name); + + int sensor_value = co_await read_sensor(ctx, p_name); + int processed = co_await process_data(ctx, p_name, sensor_value); + co_await write_actuator(ctx, p_name, processed); + + std::println("Pipeline '{}' complete!\n", p_name); +} + +// Type-erased future wrapper for storing different future types +struct future_wrapper +{ + virtual void resume() = 0; + [[nodiscard]] virtual bool done() const = 0; + virtual ~future_wrapper() = default; +}; + +template +class typed_future_wrapper : public future_wrapper +{ +public: + explicit typed_future_wrapper(async::future&& p_future) + : m_future(std::move(p_future)) + { + } + + void resume() override + { + m_future.resume(); + } + + [[nodiscard]] bool done() const override + { + return m_future.done(); + } + +private: + async::future m_future; +}; + +template +auto make_future_wrapper(async::future&& p_future) +{ + return std::make_unique>(std::move(p_future)); +} + +template +struct round_robin_scheduler +{ + bool resume_n(int p_iterations) + { + for (int i = 0; i < p_iterations; i++) { + bool all_done = true; + for (auto& ctx : std::span(m_context_list).first(m_context_size)) { + if (not ctx.done()) { + all_done = false; + if (ctx.state() == async::blocked_by::nothing) { + ctx.resume(); + std::this_thread::sleep_for(ctx.sleep_time()); + ctx.unblock(); // simulate quick unblocking of the context. + } + } + } + if (all_done) { + return true; + } + } + return false; + } + + template + void schedule_operation(AsyncOperation&& p_async_operation, + OperationArgs... args) + { + + auto future = p_async_operation(m_context_list[m_context_size], args...); + auto& f_wrap = future_list[m_context_size]; + f_wrap = make_future_wrapper(std::move(future)); + m_context_size++; + } + + std::array, ContextCount> + m_context_list{}; + std::array, ContextCount> future_list{}; + unsigned m_context_size = 0; +}; + +int main() +{ + round_robin_scheduler scheduler; + + // Run two independent pipelines concurrently + scheduler.schedule_operation(sensor_pipeline, "🌟 System 1"); + scheduler.schedule_operation(sensor_pipeline, "🔥 System 2"); + + scheduler.resume_n(100); + + std::println("Both pipelines completed successfully!"); + return 0; +} diff --git a/tests/basic_context.test.cpp b/tests/sync_wait.test.cpp similarity index 74% rename from tests/basic_context.test.cpp rename to tests/sync_wait.test.cpp index 8e59655..f1c6abd 100644 --- a/tests/basic_context.test.cpp +++ b/tests/sync_wait.test.cpp @@ -6,57 +6,35 @@ import async_context; import test_utils; -void basic_context() +void context() { using namespace boost::ut; static constexpr auto stack_size = 1024; - "sync return type void"_test = []() { - // Setup - async::basic_context ctx; - - unsigned step = 0; - auto sync_coroutine = [&step](async::context&) -> async::future { - step = 1; - return {}; - }; - - // Exercise - auto future = sync_coroutine(ctx); - - // Verify - expect(that % 0 == ctx->memory_used()); - expect(that % future.done()); - expect(that % future.has_value()); - expect(that % 1 == step); - - expect(that % stack_size == ctx->capacity()); - }; - "sync_wait --> future"_test = []() { // Setup - async::basic_context<1024> ctx; + async::inplace_context ctx; auto future = [](async::context&) -> async::future { co_return 5; }(ctx); - ctx->sync_wait([](async::sleep_duration p_sleep_duration) { + ctx.sync_wait([](async::sleep_duration p_sleep_duration) { std::this_thread::sleep_for(p_sleep_duration); }); // Verify - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % 5 == future.value()); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; "co_await coroutine"_test = []() { // Setup - async::basic_context ctx; + async::inplace_context ctx; static constexpr int expected_return_value = 1413; unsigned step = 0; @@ -78,7 +56,7 @@ void basic_context() auto future = co(ctx); // Verify 1 - expect(that % 0 < ctx->memory_used()); + expect(that % 0 < ctx.memory_used()); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -87,7 +65,7 @@ void basic_context() future.resume(); // Verify 2 - expect(that % 0 < ctx->memory_used()); + expect(that % 0 < ctx.memory_used()); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 2 == step); @@ -96,18 +74,18 @@ void basic_context() future.resume(); // Verify 3 - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); expect(that % 4 == step); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; "co_await coroutine"_test = []() { // Setup - async::basic_context ctx; + async::inplace_context ctx; static constexpr int return_value1 = 1413; static constexpr int return_value2 = 4324; @@ -129,7 +107,7 @@ void basic_context() auto future = co(ctx); // Verify 1 - expect(that % 0 < ctx->memory_used()); + expect(that % 0 < ctx.memory_used()); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -138,18 +116,18 @@ void basic_context() future.resume(); // Verify 3 - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_total == future.value()); expect(that % 2 == step); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; "co_await Xms + sync_wait"_test = []() { // Setup - async::basic_context ctx; + async::inplace_context ctx; static constexpr int return_value1 = 1413; static constexpr int return_value2 = 4324; @@ -175,12 +153,12 @@ void basic_context() // Exercise auto future = co(ctx); - ctx->sync_wait([&](async::sleep_duration p_sleep_time) { + ctx.sync_wait([&](async::sleep_duration p_sleep_time) { sleep_cycles.push_back(p_sleep_time); }); // Verify - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % 2 == step); @@ -188,11 +166,11 @@ void basic_context() expect(that % sleep_cycles == std::vector{ 44ms, 100ms, 50ms }); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; }; int main() { - basic_context(); + context(); } diff --git a/tests/util.cppm b/tests/util.cppm index d9952e4..91854fd 100644 --- a/tests/util.cppm +++ b/tests/util.cppm @@ -33,79 +33,6 @@ std::ostream& operator<<(std::ostream& out, blocked_by b) } // namespace async export { - struct thread_info - { - async::context* sync_context = nullptr; - int sleep_count = 0; - bool io_block = false; - async::sleep_duration last_sleep_time = {}; - bool scheduled_called_once = false; - }; - - struct test_context : public async::context - { - std::shared_ptr info; - std::array m_stack{}; - - test_context(std::shared_ptr const& p_info) - : info(p_info) - { - this->initialize_stack_memory(m_stack); - } - test_context() - : info(std::make_shared()) - { - this->initialize_stack_memory(m_stack); - } - - ~test_context() override - { - cancel(); - } - - private: - void do_schedule(async::blocked_by p_block_state, - async::block_info p_block_info) noexcept override - { - info->scheduled_called_once = true; - - switch (p_block_state) { - case async::blocked_by::time: { - if (std::holds_alternative(p_block_info)) { - auto const time = std::get(p_block_info); - info->sleep_count++; - info->last_sleep_time = time; - } - unblock_without_notification(); - break; - } - case async::blocked_by::sync: { - if (std::holds_alternative(p_block_info)) { - auto* context = std::get(p_block_info); - std::println( - "Coroutine ({}) is blocked by syncronization with coroutine ({})", - static_cast(this), - static_cast(context)); - info->sync_context = context; - } - break; - } - case async::blocked_by::io: { - info->io_block = true; - break; - } - case async::blocked_by::nothing: { - std::println("Context ({}) has been unblocked!", - static_cast(this)); - break; - } - default: { - break; - } - } - } - }; - struct sleep_counter { int count = 0; From 5a2f25d88ac8308cb2afb1b897898d017644ab8c Mon Sep 17 00:00:00 2001 From: Khalil Estell Date: Mon, 16 Mar 2026 03:01:43 +0000 Subject: [PATCH 2/2] Remove print from simple_scheduler to fix linux build --- tests/simple_scheduler.test.cpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/simple_scheduler.test.cpp b/tests/simple_scheduler.test.cpp index 84b1839..b5febc1 100644 --- a/tests/simple_scheduler.test.cpp +++ b/tests/simple_scheduler.test.cpp @@ -17,9 +17,7 @@ import async_context; async::future read_sensor(async::context& ctx, std::string_view p_name) { using namespace std::chrono_literals; - std::println("['{}': Sensor] Starting read...", p_name); co_await ctx.block_by_io(); // Simulate I/O operation - std::println("['{}': Sensor] Read complete: 42", p_name); co_return 42; } @@ -29,10 +27,8 @@ async::future process_data(async::context& ctx, int value) { using namespace std::chrono_literals; - std::println("['{}': Process] Processing {}...", p_name, value); co_await 10ms; // Simulate processing time int result = value * 2; - std::println("['{}': Process] Result: {}", p_name, result); co_return result; } @@ -41,22 +37,18 @@ async::future write_actuator(async::context& ctx, std::string_view p_name, int value) { - std::println("['{}': Actuator] Writing {}...", p_name, value); co_await ctx.block_by_io(); - std::println("['{}': Actuator] Write complete!", p_name); } // Coordinates the full pipeline async::future sensor_pipeline(async::context& ctx, std::string_view p_name) { - std::println("Pipeline '{}' starting...", p_name); int sensor_value = co_await read_sensor(ctx, p_name); int processed = co_await process_data(ctx, p_name, sensor_value); co_await write_actuator(ctx, p_name, processed); - std::println("Pipeline '{}' complete!\n", p_name); } // Type-erased future wrapper for storing different future types @@ -147,6 +139,5 @@ int main() scheduler.resume_n(100); - std::println("Both pipelines completed successfully!"); return 0; }