diff --git a/CMakeLists.txt b/CMakeLists.txt index 828b514..d586cbc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,12 +26,15 @@ libhal_apply_compile_options(async_context) libhal_install_library(async_context NAMESPACE libhal) libhal_add_tests(async_context TEST_NAMES - basic_context + sync_wait basics blocked_by cancel exclusive_access proxy + basics_dep_inject + on_unblock + simple_scheduler MODULES tests/util.cppm @@ -43,7 +46,6 @@ libhal_add_tests(async_context if(NOT CMAKE_CROSSCOMPILING) message(STATUS "Building benchmarks tests!") - find_package(benchmark REQUIRED) libhal_add_executable(benchmark SOURCES benchmarks/benchmark.cpp) target_link_libraries(benchmark PRIVATE async_context benchmark::benchmark) diff --git a/README.md b/README.md index 1d055ef..863a325 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,7 @@ int main() Output: -``` +```text Pipeline '🌟 System 1' starting... ['🌟 System 1': Sensor] Starting read... Pipeline '🔥 System 2' starting... @@ -142,9 +142,8 @@ Both pipelines completed successfully! ## Features - **Stack-based coroutine allocation** - No heap allocations; coroutine frames are allocated from a user-provided stack buffer -- **Cache-line optimized** - Context object fits within `std::hardware_constructive_interference_size` (typically 64 bytes) - **Blocking state tracking** - Built-in support for time, I/O, sync, and external blocking states -- **Scheduler integration** - Virtual `do_schedule()` method allows custom scheduler implementations +- **Flexible scheduler integration** - Schedulers can poll context state directly, or register an `unblock_listener` for ISR-safe event notification when contexts become unblocked - **Proxy contexts** - Support for supervised coroutines with timeout capabilities - **Exception propagation** - Proper exception handling through the coroutine chain - **Cancellation support** - Clean cancellation with RAII-based resource cleanup @@ -222,13 +221,35 @@ work. 
## Core Types +### `async::unblock_listener` + +An interface for receiving notifications when a context becomes unblocked. This +is the primary mechanism for schedulers to efficiently track which contexts are +ready for execution without polling. Implement this interface and register it +with `context::on_unblock()` to be notified when a context transitions to the +unblocked state. + +The `on_unblock()` method is called from within `context::unblock()`, which may +be invoked from ISRs, driver completion handlers, or other threads. +Implementations must be ISR-safe and noexcept. + ### `async::context` -The base context class that manages coroutine execution and memory. Derived classes must: +The base context class that manages coroutine execution and memory. Contexts +are initialized with stack memory via their constructor: -1. Provide stack memory via `initialize_stack_memory()`, preferably within the - constructor. -2. Implement `do_schedule()` to handle blocking state notifications +```cpp +std::array my_stack{}; +async::context ctx(my_stack); +``` + +> [!IMPORTANT] +> The stack memory MUST outlive the context object. The context does not own or +> copy the stack memory—it only stores a reference to it. + +Optionally, contexts can register an `unblock_listener` to be notified of state +changes, or the scheduler can poll the context state directly using `state()` +and `sleep_time()`. ### `async::future` @@ -322,49 +343,10 @@ async::future outer(async::context& p_ctx) { } ``` -### Custom Context Implementation - -```cpp -class my_context : public async::context { -public: - std::array m_stack{}; - - my_context() { - initialize_stack_memory(m_stack); - } - ~my_context() { - // ‼️ The most derived context must call cancel in its destructor - cancel(); - // If memory was allocated, deallocate it here...
- } - -private: - void do_schedule(async::blocked_by p_state, - async::block_info p_info) noexcept override { - // Notify your scheduler of state changes - } -}; -``` - -#### Initialization - -In order to create a usable custom context, the stack memory must be -initialized with a call to `initialize_stack_memory(span)` with a span to the -memory for the stack. There is no requirements of where this memory comes from -except that it be a valid source. Such sources can be array thats member of -this object, dynamically allocated memory that the context has sole ownership -of, or it can point to statically allocated memory that it has sole control and -ownership over. - -#### Destruction - -The custom context must call `cancel()` before deallocating the stack memory. -Once cancel completes, the stack memory may be deallocated. - -### Using `async::basic_context` with `sync_wait()` +### Using `sync_wait()` ```cpp -async::basic_context<512> ctx; +async::inplace_context<512> ctx; auto future = my_coroutine(ctx); ctx.sync_wait([](async::sleep_duration p_sleep_time) { std::this_thread::sleep_for(p_sleep_time); @@ -377,14 +359,14 @@ function works best for your systems. 
For example, for FreeRTOS this could be: ```C++ -// Helper function to convert std::chrono::nanoseconds to FreeRTOS ticks -inline TickType_t ns_to_ticks(const std::chrono::nanoseconds& ns) { - // Convert nanoseconds to milliseconds (rounding to nearest ms) - const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(ns).count(); +// Helper function to convert microseconds to FreeRTOS ticks +inline TickType_t us_to_ticks(const std::chrono::microseconds& us) { + // Round up to whole milliseconds so short, non-zero delays are not lost + const auto ms = std::chrono::ceil<std::chrono::milliseconds>(us).count(); return pdMS_TO_TICKS(ms); } ctx.sync_wait([](async::sleep_duration p_sleep_time) { - xTaskDelay(ns_to_ticks(p_sleep_time)); + vTaskDelay(us_to_ticks(p_sleep_time)); }); ``` @@ -577,7 +559,6 @@ To run the benchmarks on their own: ./build/Release/async_benchmark ``` - Within the [`CMakeList.txt`](./CMakeLists.txt), you can disable unit test or benchmarking by setting the following to `OFF`: ```cmake diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp index cc9504d..2c8900e 100644 --- a/benchmarks/benchmark.cpp +++ b/benchmarks/benchmark.cpp @@ -271,25 +271,10 @@ __attribute__((noinline)) async::future sync_future_level1( auto f = sync_future_level2(ctx, x); return sync_wait(f) + 1; } -struct benchmark_context : public async::context -{ - std::array m_stack{}; - - benchmark_context() - { - this->initialize_stack_memory(m_stack); - } - -private: - void do_schedule(async::blocked_by, async::block_info) noexcept override - { - // Do nothing for the benchmark - } -}; static void bm_future_sync_return(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -326,7 +311,7 @@ __attribute__((noinline)) async::future coro_level1(async::context& ctx, static void bm_future_coroutine(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -367,7 +352,7 @@
__attribute__((noinline)) async::future sync_in_coro_level1( static void bm_future_sync_await(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -408,7 +393,7 @@ __attribute__((noinline)) async::future mixed_coro_level1( static void bm_future_mixed(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; for (auto _ : state) { @@ -449,7 +434,7 @@ void_coro_level1(async::context& ctx, int& out, int x) static void bm_future_void_coroutine(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; int output = 0; @@ -464,7 +449,7 @@ BENCHMARK(bm_future_void_coroutine); static void bm_future_void_coroutine_context_resume(benchmark::State& state) { - benchmark_context ctx; + async::inplace_context<1024> ctx; int input = 42; int output = 0; diff --git a/cspell.json b/cspell.json index 4368289..f01af9c 100644 --- a/cspell.json +++ b/cspell.json @@ -51,6 +51,7 @@ "doxygenfunction", "doxygenenum", "alignof", + "inplace" ], "ignorePaths": [ "build/", diff --git a/docs/api/async_context.md b/docs/api/async_context.md index cea1a59..5d2c317 100644 --- a/docs/api/async_context.md +++ b/docs/api/async_context.md @@ -9,7 +9,7 @@ Defined in namespace `async::v0` ```{doxygenclass} v0::context ``` -## async::basic_context +## async::inplace_context Defined in namespace `async::v0` diff --git a/modules/async_context.cppm b/modules/async_context.cppm index 962e369..4509acc 100644 --- a/modules/async_context.cppm +++ b/modules/async_context.cppm @@ -21,6 +21,7 @@ module; #include #include +#include #include #include #include @@ -258,10 +259,22 @@ export class operation_cancelled : public std::exception // ============================================================================= /** - * @brief The data type for sleep time duration + * @brief Sleep duration in microseconds * + * Uses microsecond granularity and u32 
representation for an optimal balance: + * + * - **Granularity**: Microseconds provide sufficient precision for real-time + * scheduling. Nanoseconds are too fine-grained—most systems cannot achieve + * that level of accuracy. Milliseconds are too coarse; many embedded + * schedulers regularly achieve 50-100µs precision with minimal context + * switching overhead. + * + * - **Range**: A u32 microseconds can represent durations up to ~4,294 seconds + * (1 hour 11 minutes), which is a practical upper bound for async operation + * delays. Longer delays should use alternative mechanisms or be broken into + * smaller segments. */ -export using sleep_duration = std::chrono::nanoseconds; +export using sleep_duration = std::chrono::duration; /** * @brief Information about the block state when context::schedule is called @@ -272,6 +285,86 @@ export using block_info = class promise_base; +/** + * @brief Interface for receiving notifications when an async context is + * unblocked + * + * Implement this interface to receive notifications when a context transitions + * from a blocked state to `blocked_by::nothing`. This is the primary mechanism + * for schedulers to efficiently track which contexts become ready for execution + * without polling. + * + * The `on_unblock()` method is called from within `context::unblock()`, which + * may be invoked from an ISR, a driver completion handler, or another thread. + * Implementations MUST be ISR-safe and noexcept. Avoid any operations that + * could block, allocate memory, or acquire non-ISR-safe locks within + * `on_unblock()`. + * + * Typical usage is through `context_handle`, which automatically registers and + * deregisters the listener on construction and destruction respectively. + * Direct registration is possible via `context::on_unblock()` but requires + * manual lifetime management — the listener MUST outlive the context it is + * registered with. 
+ * + * Example implementation: + * @code + * class my_scheduler : public async::unblock_listener { + * private: + * void on_unblock(async::context& p_context) noexcept override { + * m_ready_queue.push(&p_context); + * } + * // ... + * }; + * @endcode + */ +export struct unblock_listener +{ +public: + template + static auto from(Callable&& p_handler) + { + struct lambda_unblock_listener : public unblock_listener + { + Callable handler; + + lambda_unblock_listener(Callable&& p_handler) + : handler(std::move(p_handler)) + { + } + + private: + void on_unblock(async::context const& p_context) noexcept override + { + handler(p_context); + } + }; + + return lambda_unblock_listener{ std::forward(p_handler) }; + } + + virtual ~unblock_listener() = default; + +private: + friend class context; + + /** + * @brief Called when a context transitions to the unblocked state + * + * This method is invoked by `context::unblock()` immediately after the + * context's state is set to `blocked_by::nothing`. It signals to the + * implementing scheduler that the context is now ready to be resumed. + * + * @param p_context The context that has just been unblocked. The context's + * state will be `blocked_by::nothing` at the time of this call. The + * implementor may read any state from the context but MUST NOT resume or + * destroy it within this call. + * + * @note This method MUST be noexcept and ISR-safe. It may be called from + * any execution context including interrupt handlers. + */ + virtual void on_unblock(context const& p_context) noexcept = 0; +}; + /** * @brief The base context class for managing coroutine execution * @@ -301,11 +394,21 @@ public: /** * @brief Default constructor for context * - * Creates an uninitialized context. Derived classes must call - * initialize_stack_memory() to properly set up the stack memory. + * Creates context without a stack. `initialize_stack_memory()` must be called + * before passing this context to a coroutine. 
*/ context() = default; + /** + * @brief Construct a new context object with stack memory. + * + * @param p_stack - the stack memory for the async operations + */ + context(std::span p_stack) + { + initialize_stack_memory(p_stack); + } + /** * @brief Delete copy constructor * @@ -321,18 +424,52 @@ public: context& operator=(context const&) = delete; /** - * @brief Delete move constructor + * @brief Move constructor * - * Contexts cannot be moved as they manage unique stack memory. + * Transfers ownership of stack and state from the source context. The + * moved-from context is reset to its default state (no stack, no active + * operations). + * + * @param p_other The context to move from (will be reset to default state) */ - context(context&&) = delete; + context(context&& p_other) noexcept + : m_active_handle(std::exchange(p_other.m_active_handle, noop_sentinel)) + , m_stack_pointer(std::exchange(p_other.m_stack_pointer, nullptr)) + , m_stack(std::exchange(p_other.m_stack, {})) + , m_original(std::exchange(p_other.m_original, nullptr)) + , m_listener(std::exchange(p_other.m_listener, nullptr)) + , m_sleep_time(std::exchange(p_other.m_sleep_time, sleep_duration::zero())) + , m_sync_blocker(std::exchange(p_other.m_sync_blocker, nullptr)) + , m_state(std::exchange(p_other.m_state, blocked_by::nothing)) + { + } /** - * @brief Delete move assignment operator + * @brief Move assignment operator * - * Contexts cannot be moved as they manage unique stack memory. + * Transfers ownership of stack and state from the source context. The + * current context is cancelled before assignment, and the moved-from context + * is reset to its default state. 
+ * + * @param p_other The context to move from (will be reset to default state) + * @return Reference to this context */ - context& operator=(context&&) = delete; + context& operator=(context&& p_other) noexcept + { + if (this != &p_other) { + cancel(); + m_active_handle = std::exchange(p_other.m_active_handle, noop_sentinel); + m_stack_pointer = std::exchange(p_other.m_stack_pointer, nullptr); + m_stack = std::exchange(p_other.m_stack, {}); + m_original = std::exchange(p_other.m_original, nullptr); + m_listener = std::exchange(p_other.m_listener, nullptr); + m_sleep_time = + std::exchange(p_other.m_sleep_time, sleep_duration::zero()); + m_sync_blocker = std::exchange(p_other.m_sync_blocker, nullptr); + m_state = std::exchange(p_other.m_state, blocked_by::nothing); + } + return *this; + } /** * @brief Initialize stack memory for the context @@ -356,33 +493,40 @@ public: } /** - * @brief Unblocks a coroutine that was previously blocked + * @brief Unblocks the context without invoking the unblock listener * - * This method transitions the context from any blocking state to "nothing" - * (ready to run) state. It's typically called by I/O completion handlers or - * when external conditions that were blocking a coroutine have been resolved. - * - * @note This method is noexcept and should not throw exceptions. - * It's used internally by I/O completion handlers to signal that a blocked - * coroutine can now proceed. + * This method transitions the context to "nothing" (ready to run). */ - constexpr void unblock() noexcept + constexpr void unblock_without_notification() noexcept { - transition_to(blocked_by::nothing); + // We clear this information after the unblock call to allow the unblock + // call to inspect the context's current state. 
+ get_original().m_state = blocked_by::nothing; + get_original().m_sleep_time = sleep_duration::zero(); + get_original().m_sync_blocker = nullptr; } /** - * @brief Unblocks a coroutine without notifying the scheduler + * @brief Unblocks the context and clears blocking state + * + * The state of the context after this is called: * - * This method transitions the context to "nothing" (ready to run) state but - * does not call the scheduler's do_schedule method. This is useful in cases - * where the scheduler state is being managed externally or during cleanup. + * 1. Block state becomes (block_by::nothing) + * 2. sleep time is set to 0us. + * 3. sync blocker is set to nullptr. * - * @note This method is noexcept and should not throw exceptions. + * The unblock listener is called before clearing the context state in the + * event that the unblock listener wants to inspect information from the + * context. + * + * @note this API is safe to call within an interrupt service routine. */ - constexpr void unblock_without_notification() noexcept + constexpr void unblock() noexcept { - m_state = blocked_by::nothing; + if (get_original().m_listener) { + get_original().m_listener->on_unblock(*this); + } + unblock_without_notification(); } /** @@ -395,10 +539,13 @@ public: * @param p_duration The time duration to block for * @return std::suspend_always to suspend the coroutine until resumed */ + template constexpr std::suspend_always block_by_time( - sleep_duration p_duration) noexcept + std::chrono::duration p_duration) noexcept { - transition_to(blocked_by::time, p_duration); + get_original().m_state = blocked_by::time; + get_original().m_sleep_time = + std::chrono::duration_cast(p_duration); return {}; } @@ -415,7 +562,8 @@ public: constexpr std::suspend_always block_by_io( sleep_duration p_duration = default_timeout) noexcept { - transition_to(blocked_by::io, p_duration); + get_original().m_state = blocked_by::io; + get_original().m_sleep_time = p_duration; return {}; } @@ 
-434,7 +582,8 @@ public: */ constexpr std::suspend_always block_by_sync(context* p_blocker) noexcept { - transition_to(blocked_by::sync, p_blocker); + get_original().m_state = blocked_by::sync; + get_original().m_sync_blocker = p_blocker; return {}; } @@ -450,7 +599,7 @@ public: */ constexpr std::suspend_always block_by_external() noexcept { - transition_to(blocked_by::external, std::monostate{}); + get_original().m_state = blocked_by::external; return {}; } @@ -481,7 +630,7 @@ public: */ [[nodiscard]] constexpr auto state() const noexcept { - return m_state; + return get_original().m_state; } /** @@ -494,7 +643,7 @@ public: * * @return true if the context is done, false otherwise */ - constexpr bool done() + [[nodiscard]] constexpr bool done() const { return m_active_handle == noop_sentinel; } @@ -517,11 +666,42 @@ public: { // We cannot resume the a coroutine blocked by time. // Only the scheduler can unblock a context state. - if (m_state != blocked_by::time) { + if (state() != blocked_by::time) { m_active_handle.resume(); } } + /** + * @brief Perform sync_wait operation + * + * This method waits synchronously for all coroutines on this context to + * complete. It uses the provided delay function to sleep for the required + * duration when waiting for time-based operations. + * + * @tparam DelayFunc The type of the delay function (must be invocable with + * sleep_duration parameter) + * @param p_delay - a delay function, that accepts a sleep duration and + * returns void. + * + * @note This method is primarily intended for testing and simple applications + * where a synchronous wait is needed. It's not suitable for production + * embedded systems that require precise timing or real-time scheduling. 
+ */ + void sync_wait(std::invocable auto&& p_delay) + { + while (not done()) { + resume(); + + if (state() == blocked_by::time) { + if (auto delay_time = sleep_time(); + delay_time > sleep_duration::zero()) { + p_delay(delay_time); + } + unblock_without_notification(); + } + } + } + /** * @brief Get the amount of stack memory used by active coroutines * @@ -561,26 +741,88 @@ public: } /** - * @brief Virtual destructor for proper cleanup of derived classes + * @brief Amount of time to delay resuming this context + * + * When a context is blocked for a set duration of time, it is the + * responsibility of the scheduler to ensure that the context is not resumed + * until that duration of time has elapsed. In the event that the context is + * not blocked by time, then this returns 0. * - * This virtual destructor ensures that derived context classes are properly - * cleaned up when deleted through a base class pointer. + * Calling this function multiple times returns the last sleep duration that + * was set by the async operation contained within the context. It is the + * responsibility of the scheduler to unblock this context, otherwise, calling + * resume() will return immediately without resuming the async operation. + * + * @return constexpr sleep_duration - the amount of time to delay resuming + * this context. */ - virtual ~context() = default; + [[nodiscard]] constexpr sleep_duration sleep_time() const noexcept + { + return get_original().m_sleep_time; + } -private: - friend class promise_base; - template - friend class promise; + /** + * @brief Sets the unblock listener + * + * There can only be a single unblock listener per context, thus any + * previously set unblock listener will be removed. + * + * Because this API takes the address of the listener, it is important that + * the listener remain valid for as long as it is registered with this + * context. + * + * `clear_unblock_listener()` must be called before the end of the lifetime of + * the `p_listener` object.
It is undefined behavior to allow a listener to be + * destroyed before it is removed from this context. + * + * @param p_listener - the address of the unblock listener to be invoked when + * this context is unblocked. + */ + void on_unblock(unblock_listener* p_listener) + { + get_original().m_listener = p_listener; + } + + /** + * @brief Clears the on_unblock listener from this context + * + * After this is called, any call to `unblock()` will not invoke an unblock + * listener. + * + * It is the responsibility of the application to ensure the unblock listener + * is cleared before the end of the lifetime of the object that was passed to + * on_unblock(). + */ + void clear_unblock_listener() noexcept + { + get_original().m_listener = nullptr; + } /** - * @brief Internal structure to track proxy context information + * @brief Get the address of the context currently blocking this one + * + * If this context's state is `blocked_by::sync` then there is a context that + * currently holds a resource that this context needs. That context's address + * can be acquired by this API. + * + * @returns context const* - returns a const pointer to the other context that + * is blocking this context. If no such context exists, then a nullptr is + * returned.
*/ - struct proxy_info + [[nodiscard]] context const* get_blocker() const { - context* original = nullptr; - context* parent = nullptr; - }; + return get_original().m_sync_blocker; + } + + ~context() + { + cancel(); + } + +private: + template + friend class promise; + friend class promise_base; + friend class proxy_context; /** * @brief Check if this is a proxy context @@ -592,35 +834,55 @@ private: */ [[nodiscard]] constexpr bool is_proxy() const noexcept { - return m_proxy.parent == nullptr; + return m_original != nullptr; } /** - * @brief Set the active coroutine handle for this context + * @brief Get a reference to the original context * - * This method sets the coroutine that is currently running on this context. + * If this object is a proxy, then the original context will be pulled from + * the proxy information and if its not, then a reference to `this` is + * returned. * - * @param p_active_handle The coroutine handle to set as active + * @return context& - reference to the original context */ - constexpr void active_handle(std::coroutine_handle<> p_active_handle) + [[nodiscard]] constexpr context const& get_original() const noexcept { - m_active_handle = p_active_handle; + if (is_proxy()) { + return *m_original; + } else { + return *this; + } + } + + /** + * @brief Get a reference to the original context + * + * If this object is a proxy, then the original context will be pulled from + * the proxy information and if its not, then a reference to `this` is + * returned. + * + * @return context& - reference to the original context + */ + [[nodiscard]] constexpr context& get_original() noexcept + { + if (is_proxy()) { + return *m_original; + } else { + return *this; + } } /** - * @brief Transition the context to a new blocking state + * @brief Set the active coroutine handle for this context * - * This internal method transitions the context to a new blocking state and - * notifies the scheduler via do_schedule(). 
+ * This method sets the coroutine that is currently running on this context. * - * @param p_new_state The new blocking state to transition to - * @param p_info Additional information about the blocking condition + * @param p_active_handle The coroutine handle to set as active */ - constexpr void transition_to(blocked_by p_new_state, - block_info p_info = std::monostate{}) noexcept + constexpr void active_handle(std::coroutine_handle<> p_active_handle) { - m_state = p_new_state; - schedule(p_new_state, p_info); + m_active_handle = p_active_handle; } /** @@ -662,60 +924,17 @@ private: return coroutine_frame_stack_address; } - /** - * @brief Wrapper around call to do_schedule - * - * This wrapper exists to allow future extensibility - * - * @param p_block_state - state that this context has been set to - * @param p_block_info - information about the blocking conditions - */ - void schedule(blocked_by p_block_state, block_info p_block_info) noexcept - { - return do_schedule(p_block_state, p_block_info); - } - - /** - * @brief Implementations of context use this to notify their scheduler of - * changes to this async context. - * - * It is up to the scheduler to ensure that concurrent calls to this API are - * serialized appropriately. For a single threaded event loop, syncronization - * and serialization is not necessary. For a thread pool implementation, - * syncronization and serialization must be considered. - * - * @param p_block_state - the type of blocking event the context has - * occurred. - * @param p_block_info - Information about what exactly is blocking this - * context. If p_block_info is a sleep_duration, and the p_block_state is - * blocked_by::time, then this context is requesting to be scheduled at that - * or a later time. If the p_block_info is a sleep_duration, and the block - * state isn't blocked_by::time, then this sleep duration is a hint to the - * scheduler to when it would be appropriate to reschedule this context. 
The - * scheduler does not have to be abided by this. If p_block_info is a pointer - * to a context, then the pointed to context is currently blocking p_context. - * This can be used to determine when to schedule p_context again, but does - * not have to be abided by for proper function. - */ - virtual void do_schedule(blocked_by p_block_state, - block_info p_block_info) noexcept = 0; - friend class proxy_context; - - /* vtable ptr */ // word 1 - std::coroutine_handle<> m_active_handle = noop_sentinel; // word 2 + std::coroutine_handle<> m_active_handle = noop_sentinel; // word 1 + uptr* m_stack_pointer = nullptr; // word 2 std::span m_stack{}; // word 3-4 - uptr* m_stack_pointer = nullptr; // word 5 - blocked_by m_state = blocked_by::nothing; // word 6 - proxy_info m_proxy{}; // word 7-8 + context* m_original = nullptr; // word 5 + // ----------- Only available from the original ----------- + unblock_listener* m_listener = nullptr; // word 6 + sleep_duration m_sleep_time = sleep_duration::zero(); // word 7 + context* m_sync_blocker = nullptr; // word 8 + blocked_by m_state = blocked_by::nothing; // word 9: pad 3 }; -// Context should stay close to a standard cache-line of 64 bytes (8 words) for -// a 64-bit system. This compile time check ensures that the context does not -// exceed the this boundary for the platform. -static_assert(sizeof(context) <= std::hardware_constructive_interference_size, - "Context cannot be contained within a cache-line (as specified " - "by std::hardware_constructive_interference_size)"); - /** * @brief A proxy context that provides isolated stack space for supervised * coroutines @@ -731,6 +950,21 @@ static_assert(sizeof(context) <= std::hardware_constructive_interference_size, export class proxy_context : public context { public: + /** + * @brief Create a proxy context from an existing parent context + * + * This static method creates a new proxy context that uses a portion of the + * parent context's stack memory. 
The proxy takes control over the remaining + * stack space, effectively creating an isolated sub-context. + * + * @param p_parent The parent context to create a proxy from + * @return A new proxy_context instance + */ + static proxy_context from(context& p_parent) + { + return { p_parent }; + } + /** * @brief Delete copy constructor * @@ -759,20 +993,8 @@ public: */ proxy_context& operator=(proxy_context&&) = delete; - /** - * @brief Create a proxy context from an existing parent context - * - * This static method creates a new proxy context that uses a portion of the - * parent context's stack memory. The proxy takes control over the remaining - * stack space, effectively creating an isolated sub-context. - * - * @param p_parent The parent context to create a proxy from - * @return A new proxy_context instance - */ - static proxy_context from(context& p_parent) - { - return { p_parent }; - } + constexpr void initialize_stack_memory(std::span p_stack_memory) = + delete; /** * @brief Destructor for proxy_context @@ -780,15 +1002,14 @@ public: * The destructor cancels any remaining operations and properly restores * the parent context's stack memory to its original state. */ - ~proxy_context() override + ~proxy_context() { // Cancel any operations still on this context cancel(); // Restore parent stack, by setting its range to be the start of its // stack and the end of our stack. - m_proxy.parent->m_stack = { m_proxy.parent->m_stack.begin(), - m_stack.end() }; + m_parent->m_stack = { m_parent->m_stack.begin(), m_stack.end() }; } private: @@ -801,9 +1022,9 @@ private: * @param p_parent The parent context to create proxy from */ proxy_context(context& p_parent) + : m_parent(&p_parent) { m_active_handle = context::noop_sentinel; - m_proxy = {}; // We need to manually set: // 1. 
m_stack @@ -822,206 +1043,50 @@ private: // If this is a proxy, take its pointer to the origin if (p_parent.is_proxy()) { - m_proxy = proxy_info{ - .original = m_proxy.original, - .parent = &p_parent, - }; - } else { // Otherwise, the current parent is the origin. - m_proxy = proxy_info{ - .original = &p_parent, - .parent = &p_parent, - }; + m_original = p_parent.m_original; + } else { // Otherwise, the current parent is the original + m_original = &p_parent; } + m_parent = &p_parent; } - /** - * @brief Forwards the schedule call to the original context - * - * This method forwards scheduling notifications to the original context, - * ensuring that the parent context's scheduler is properly notified of - * state changes. - * - * @param p_block_state - state that this context has been set to - * @param p_block_info - information about the blocking conditions - */ - void do_schedule(blocked_by p_block_state, - block_info p_block_info) noexcept override - { - m_proxy.original->schedule(p_block_state, p_block_info); - } + context* m_parent; }; /** - * @brief A basic context implementation that supports synchronous waiting + * @brief A context with embedded inplace stack memory * - * The basic_context class provides a concrete implementation of the context - * interface that supports synchronous waiting operations. It extends the base - * context with functionality to wait for coroutines to complete using a simple - * synchronous loop. - * - * NOTE: This class does not provide stack memory - * - * This context is particularly useful for testing and simple applications where - * a scheduler isn't needed, as it provides a way to wait for all coroutines to - * complete without requiring external scheduling. - * - * @note basic_context is designed for simple use cases and testing, not - * production embedded systems where strict memory management is required. 
- */ -class basic_context_impl : public context -{ -public: - /** - * @brief Default constructor for basic_context_impl - * - * Creates a new basic context with default initialization. - */ - basic_context_impl() = default; - - /** - * @brief Virtual destructor for proper cleanup - * - * Ensures that the basic context is properly cleaned up when deleted. - */ - ~basic_context_impl() override = default; - - /** - * @brief Get the pending delay time for time-blocking operations - * - * This method returns the sleep duration that is currently pending for - * time-blocking operations. It's used by the sync_wait method to determine - * how long to sleep. - * - * @return The pending sleep duration - * - * @note This is an internal method used by the basic_context implementation - * to manage time-based blocking operations during synchronous waiting. - */ - [[nodiscard]] constexpr sleep_duration pending_delay() const noexcept - { - return m_pending_delay; - } - - /** - * @brief Perform sync_wait operation - * - * This method waits synchronously for all coroutines on this context to - * complete. It uses the provided delay function to sleep for the required - * duration when waiting for time-based operations. - * - * @tparam DelayFunc The type of the delay function (must be invocable with - * sleep_duration parameter) - * @param p_delay - a delay function, that accepts a sleep duration and - * returns void. - * - * @note This method is primarily intended for testing and simple applications - * where a synchronous wait is needed. It's not suitable for production - * embedded systems that require precise timing or real-time scheduling. 
- */ - void sync_wait(std::invocable auto&& p_delay) - { - while (active_handle() != context::noop_sentinel) { - active_handle().resume(); - - if (state() == blocked_by::time) { - if (m_pending_delay == sleep_duration(0)) { - unblock_without_notification(); - continue; - } - p_delay(m_pending_delay); - m_pending_delay = sleep_duration(0); - unblock_without_notification(); - } - } - } - -private: - /** - * @brief Forwards the schedule call to the original context - * - * This method handles scheduling notifications for time-blocking operations. - * It stores the pending delay duration so that sync_wait can properly wait - * for it. - * - * @param p_block_state - state that this context has been set to - * @param p_block_info - information about the blocking conditions - */ - void do_schedule(blocked_by p_block_state, - block_info p_block_info) noexcept final - { - if (p_block_state == blocked_by::time) { - if (auto* ex = std::get_if(&p_block_info)) { - m_pending_delay = *ex; - } else { - m_pending_delay = sleep_duration{ 0 }; - } - } - // Ignore the rest and poll them... - } - - /** - * @brief The pending delay for time-blocking operations - * - * This member stores the sleep duration that is currently pending for - * time-blocking operations, allowing sync_wait to properly handle delays. - */ - sleep_duration m_pending_delay{ 0 }; -}; - -/** - * @brief A basic context implementation that supports synchronous waiting - * - * The basic_context class provides a concrete implementation of the context - * interface that supports synchronous waiting operations. It extends the base - * context with functionality to wait for coroutines to complete using a simple - * synchronous loop. - * - * This class provides stack memory via its `StackSizeInWords` template - * variable. + * The inplace_context class provides a concrete implementation of the context + * with stack memory embedded directly in the object. Otherwise, it works + * identically to the base context class. 
* * @tparam StackSizeInWords - the number of words to allocate for the context's * stack memory. Word size is 4 bytes for 32-bit systems and 8 bytes on 64-bit * systems. */ export template -class basic_context +class inplace_context : public context { public: static_assert(StackSizeInWords > 0UL, "Stack memory must be greater than 0 words."); - basic_context() - { - m_context.initialize_stack_memory(m_stack); - } - - ~basic_context() - { - m_context.cancel(); - } - - context& context() - { - return m_context; - } - - operator class context&() + inplace_context() + : context(m_stack) { - return m_context; } - auto* operator->() - { - return &m_context; - } + inplace_context(inplace_context const&) = delete; + inplace_context& operator=(inplace_context const&) = delete; + inplace_context(inplace_context&&) = default; + inplace_context& operator=(inplace_context&&) = default; - auto& operator*() + ~inplace_context() { - return m_context; + cancel(); } private: - basic_context_impl m_context; std::array m_stack{}; }; @@ -1832,7 +1897,7 @@ public: auto handle = std::get(m_state); full_handle_type::from_address(handle.address()) .promise() - .set_object_address(&m_state); + .m_future_state = &m_state; } } @@ -1856,7 +1921,7 @@ public: auto handle = std::get(m_state); full_handle_type::from_address(handle.address()) .promise() - .set_object_address(&m_state); + .m_future_state = &m_state; } } return *this; diff --git a/test_package/main.cpp b/test_package/main.cpp index 0e4ef12..2b2ccdb 100644 --- a/test_package/main.cpp +++ b/test_package/main.cpp @@ -26,71 +26,6 @@ import async_context; -struct round_robin_scheduler -{ - bool run_all_until_done(int p_iterations = 100) - { - for (int i = 0; i < p_iterations; i++) { - bool all_done = true; - for (auto const& ctx : contexts) { - if (not ctx->done()) { - all_done = false; - if (ctx->state() == async::blocked_by::nothing) { - ctx->resume(); - } - } - } - if (all_done) { - return true; - } - } - return false; - } - 
std::vector contexts{}; -}; - -struct test_context : public async::context -{ - std::array m_stack{}; - // NOTE: the scheduler isn't used in this example, but if it were used, it - // would be called within the do_schedule function. For example, rather than - // performing a sleep that blocks the whole thread, the context and its time - // duration could be passed to an API on the "round_robin_scheduler", that - // makes it aware of the time schedule requirements. - std::shared_ptr scheduler; - - test_context(test_context const&) = delete; - test_context& operator=(test_context const&) = delete; - test_context(test_context&&) = delete; - test_context& operator=(test_context&&) = delete; - - test_context(std::shared_ptr const& p_scheduler) - : scheduler(p_scheduler) - { - scheduler->contexts.push_back(this); - this->initialize_stack_memory(m_stack); - } - -private: - void do_schedule( - async::blocked_by p_blocked_state, - [[maybe_unused]] async::block_info p_block_info) noexcept override - { - // Simulate I/O completion - immediately unblock - if (p_blocked_state == async::blocked_by::io) { - this->unblock_without_notification(); - } else if (p_blocked_state == async::blocked_by::time) { -#if not __ARM_EABI__ - if (auto* time = std::get_if(&p_block_info)) { - // Just block this thread vs doing something smart - std::this_thread::sleep_for(*time); - } -#endif - this->unblock_without_notification(); - } - } -}; - // Simulates reading sensor data with I/O delay async::future read_sensor(async::context& ctx, std::string_view p_name) { @@ -137,22 +72,94 @@ async::future sensor_pipeline(async::context& ctx, std::println("Pipeline '{}' complete!\n", p_name); } -int main() +// Type-erased future wrapper for storing different future types +struct future_wrapper { - auto scheduler = std::make_shared(); + virtual void resume() = 0; + [[nodiscard]] virtual bool done() const = 0; + virtual ~future_wrapper() = default; +}; - // Create context and add them to the scheduler - 
test_context ctx1(scheduler); - test_context ctx2(scheduler); +template +class typed_future_wrapper : public future_wrapper +{ +public: + explicit typed_future_wrapper(async::future&& p_future) + : m_future(std::move(p_future)) + { + } - // Run two independent pipelines concurrently - auto pipeline1 = sensor_pipeline(ctx1, "🌟 System 1"); - auto pipeline2 = sensor_pipeline(ctx2, "🔥 System 2"); + void resume() override + { + m_future.resume(); + } - scheduler->run_all_until_done(); + [[nodiscard]] bool done() const override + { + return m_future.done(); + } + +private: + async::future m_future; +}; + +template +auto make_future_wrapper(async::future&& p_future) +{ + return std::make_unique>(std::move(p_future)); +} + +template +struct round_robin_scheduler +{ + bool resume_n(int p_iterations) + { + for (int i = 0; i < p_iterations; i++) { + bool all_done = true; + for (auto& ctx : std::span(m_context_list).first(m_context_size)) { + if (not ctx.done()) { + all_done = false; + if (ctx.state() == async::blocked_by::nothing) { + ctx.resume(); +#if not __ARM_EABI__ + std::this_thread::sleep_for(ctx.sleep_time()); +#endif + ctx.unblock(); // simulate quick unblocking of the context. + } + } + } + if (all_done) { + return true; + } + } + return false; + } + + template + void schedule_operation(AsyncOperation&& p_async_operation, + OperationArgs... 
args) + { + + auto future = p_async_operation(m_context_list[m_context_size], args...); + future_list[m_context_size] = make_future_wrapper(std::move(future)); + m_context_size++; + } + + std::array, ContextCount> + m_context_list{}; + std::array, ContextCount> future_list{}; + unsigned m_context_size = 0; +}; + +int main() +{ + round_robin_scheduler scheduler; + + // Run two independent pipelines concurrently + scheduler.schedule_operation(sensor_pipeline, "🌟 System 1"); + scheduler.schedule_operation(sensor_pipeline, "🔥 System 2"); - assert(pipeline1.done()); - assert(pipeline2.done()); + scheduler.resume_n(100); std::println("Both pipelines completed successfully!"); return 0; diff --git a/tests/basics.test.cpp b/tests/basics.test.cpp index 2446441..c555673 100644 --- a/tests/basics.test.cpp +++ b/tests/basics.test.cpp @@ -11,7 +11,7 @@ void basics() "sync return type void"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; unsigned step = 0; auto sync_coroutine = [&step](async::context&) -> async::future { @@ -24,7 +24,6 @@ void basics() // Verify expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % 1 == step); @@ -32,7 +31,7 @@ void basics() "sync return type int"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 5; @@ -47,7 +46,6 @@ void basics() // Verify expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -56,7 +54,7 @@ void basics() "sync return type std::string"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr auto expected_return_value = "Hello, World\n"; @@ -72,7 +70,6 @@ void basics() // Verify expect(that % 0 == 
ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -81,7 +78,7 @@ void basics() "co_return"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 5; unsigned step = 0; @@ -95,7 +92,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -105,7 +101,6 @@ void basics() // Verify 2 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -114,7 +109,7 @@ void basics() "suspend then co_return"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value = 1413; unsigned step = 0; @@ -130,7 +125,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -140,7 +134,6 @@ void basics() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 1 == step); @@ -150,7 +143,6 @@ void basics() // Verify 3 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -159,7 +151,7 @@ void basics() "co_await coroutine"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int expected_return_value 
= 1413; unsigned step = 0; @@ -182,7 +174,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -192,7 +183,6 @@ void basics() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 2 == step); @@ -202,7 +192,6 @@ void basics() // Verify 3 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); @@ -211,7 +200,7 @@ void basics() "co_await coroutine"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; static constexpr int return_value1 = 1413; static constexpr int return_value2 = 4324; @@ -234,7 +223,6 @@ void basics() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -244,7 +232,6 @@ void basics() // Verify 3 expect(that % 0 == ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_total == future.value()); diff --git a/tests/basics_dep_inject.test.cpp b/tests/basics_dep_inject.test.cpp new file mode 100644 index 0000000..249b1be --- /dev/null +++ b/tests/basics_dep_inject.test.cpp @@ -0,0 +1,134 @@ +#include +#include + +#include +#include + +import async_context; + +void basics_dep_inject() +{ + using namespace boost::ut; + + "sync return type void"_test = []() { + // Setup + std::array stack{}; + async::context ctx{ stack }; + + unsigned step = 0; + auto sync_coroutine = [&step](async::context&) -> async::future { + step = 
1; + return {}; + }; + + // Exercise + auto future = sync_coroutine(ctx); + + // Verify + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % 1 == step); + }; + + "suspend then co_return"_test = []() { + // Setup + std::array stack{}; + async::context ctx{ stack }; + + static constexpr int expected_return_value = 1413; + unsigned step = 0; + auto async_coroutine = + [&step](async::context& p_ctx) -> async::future { + step = 1; + while (step != 2) { + // external set to 2 + co_await p_ctx.block_by_io(); + } + step = 2; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = async_coroutine(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start and suspend coroutine + ctx.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 1 == step); + expect(ctx.state() == async::blocked_by::io); + + // Exercise 3: resume and co_return from coroutine + ctx.unblock(); + step = 2; + ctx.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 2 == step); + }; + + "Call handler"_test = []() { + // Setup + std::array stack{}; + async::context ctx{ stack }; + + static constexpr int expected_return_value = 1413; + unsigned step = 0; + auto async_coroutine = + [&step](async::context& p_ctx) -> async::future { + step = 1; + co_await p_ctx.block_by_io(); + step = 2; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = async_coroutine(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == 
step); + + // Exercise 2: start and suspend coroutine + ctx.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(async::blocked_by::io == ctx.state()); + // expect(that % async::block_by::io == *info); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3: resume and co_return from coroutine + ctx.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 2 == step); + }; +}; + +int main() +{ + basics_dep_inject(); +} diff --git a/tests/blocked_by.test.cpp b/tests/blocked_by.test.cpp index 40a9dcd..90080f7 100644 --- a/tests/blocked_by.test.cpp +++ b/tests/blocked_by.test.cpp @@ -13,7 +13,9 @@ void blocking_states() "co_await 10ms & co_await 50ms"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; + + std::println("ctx.capacity() = {}", ctx.capacity()); static constexpr int expected_return_value = 8748; unsigned step = 0; @@ -31,7 +33,6 @@ void blocking_states() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -41,31 +42,28 @@ void blocking_states() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % 1 == ctx.info->sleep_count); expect(that % not future.done()); - expect(that % 10ms == ctx.info->last_sleep_time); - + expect(that % 10ms == ctx.sleep_time()); expect(that % not future.has_value()); expect(that % 1 == step); // Exercise 3 + ctx.unblock(); future.resume(); // Verify 3 expect(that % 0 < ctx.memory_used()); - expect(that % 2 == ctx.info->sleep_count); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 2 == step); - expect(that % 25ms == 
ctx.info->last_sleep_time); + expect(that % 25ms == ctx.sleep_time()); // Exercise 4 + ctx.unblock(); future.resume(); // Verify 4 expect(that % 0 == ctx.memory_used()); - expect(that % 2 == ctx.info->sleep_count); expect(that % future.done()); expect(that % future.has_value()); expect(that % 3 == step); @@ -74,7 +72,7 @@ void blocking_states() "context::block_by_io() "_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; unsigned step = 0; bool io_complete = false; @@ -98,7 +96,6 @@ void blocking_states() // Verify 1 expect(that % 0 < ctx.memory_used()); - expect(that % not ctx.info->scheduled_called_once); expect(that % not future.done()); expect(that % 0 == step); @@ -107,45 +104,43 @@ void blocking_states() // Verify 2 expect(that % 0 < ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % ctx.info->io_block); + expect(that % async::blocked_by::io == ctx.state()); expect(that % not future.done()); expect(that % 1 == step); // Exercise 3: stay in loop and re-block on io + ctx.unblock(); future.resume(); // Verify 3 expect(that % 0 < ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % ctx.info->io_block); + expect(that % async::blocked_by::io == ctx.state()); expect(that % not future.done()); expect(that % 1 == step); // Exercise 4: unblock IO and resume to final suspend io_complete = true; + ctx.unblock(); future.resume(); // Verify 4 expect(that % 0 == ctx.memory_used()); - expect(that % ctx.info->scheduled_called_once); - expect(that % ctx.info->io_block); + expect(that % async::blocked_by::nothing == ctx.state()); expect(that % future.done()); expect(that % 2 == step); }; "blocked_by time, io, & sync"_test = []() { // Setup - auto info = std::make_shared(); - test_context ctx1(info); - test_context ctx2(info); + async::inplace_context<1024> ctx1{}; + async::inplace_context<1024> ctx2{}; int step = 0; auto co = [&](async::context& p_context) -> async::future { using 
namespace std::chrono_literals; step = 1; - co_await 100ns; + co_await 100us; step = 2; co_await p_context.block_by_io(); step = 3; @@ -160,7 +155,6 @@ void blocking_states() // Verify 1 expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); - expect(that % not ctx1.info->scheduled_called_once); expect(that % not future.done()); expect(that % 0 == step); @@ -170,20 +164,20 @@ void blocking_states() // Verify 2 expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); - expect(that % ctx1.info->scheduled_called_once); - expect(that % 100ns == ctx1.info->last_sleep_time); - expect(that % not ctx1.info->io_block); + expect(that % 100us == ctx1.sleep_time()); + expect(that % ctx1.state() == async::blocked_by::time); expect(that % not future.done()); expect(that % 1 == step); // Exercise 3 + ctx1.unblock(); future.resume(); // Verify 3 expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); - expect(that % ctx1.info->scheduled_called_once); - expect(that % ctx1.info->io_block) << "context should be blocked by IO"; + expect(that % ctx1.state() == async::blocked_by::io) + << "context should be blocked by IO"; expect(that % not future.done()); expect(that % 2 == step); @@ -195,7 +189,7 @@ void blocking_states() expect(that % 0 < ctx1.memory_used()); expect(that % 0 == ctx2.memory_used()); expect(that % not future.done()); - expect(that % &ctx2 == ctx1.info->sync_context) + expect(that % &ctx2 == ctx1.get_blocker()) << "sync context should be &ctx2"; expect(that % 3 == step); diff --git a/tests/cancel.test.cpp b/tests/cancel.test.cpp index cd133e4..7abd461 100644 --- a/tests/cancel.test.cpp +++ b/tests/cancel.test.cpp @@ -13,7 +13,7 @@ void cancellation_tests() "Cancellation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; @@ -68,7 +68,7 @@ void cancellation_tests() "Cancellation"_test = []() { // Setup - test_context ctx; + 
async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; @@ -128,7 +128,7 @@ void cancellation_tests() "Context Cancellation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; @@ -191,7 +191,7 @@ void cancellation_tests() "Exception Propagation"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; counter_pair count{}; int ends_reached = 0; diff --git a/tests/exclusive_access.test.cpp b/tests/exclusive_access.test.cpp index 819342d..a263af4 100644 --- a/tests/exclusive_access.test.cpp +++ b/tests/exclusive_access.test.cpp @@ -12,8 +12,8 @@ void guards_tests() using namespace std::chrono_literals; "Context Token"_test = []() { // Setup - test_context ctx1; - test_context ctx2; + async::inplace_context<1024> ctx1; + async::inplace_context<1024> ctx2; async::exclusive_access io_in_use; diff --git a/tests/on_unblock.test.cpp b/tests/on_unblock.test.cpp new file mode 100644 index 0000000..90322c1 --- /dev/null +++ b/tests/on_unblock.test.cpp @@ -0,0 +1,131 @@ +#include +#include + +#include + +import async_context; +import test_utils; + +void on_upload_test() +{ + using namespace boost::ut; + using namespace std::chrono_literals; + + "on_upload() via lambda"_test = []() { + // Setup + async::inplace_context<1024> ctx; + bool unblock_called = false; + async::context const* unblocked_context = nullptr; + auto upload_handler = + async::unblock_listener::from([&](async::context const& p_context) { + unblock_called = true; + unblocked_context = &p_context; + }); + ctx.on_unblock(&upload_handler); + + unsigned step = 0; + auto co = [&step](async::context&) -> async::future { + step = 1; + co_await 10ms; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2 + future.resume(); + + // Verify 
2 + expect(that % unblock_called == false); + expect(that % unblocked_context == nullptr); + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % 10ms == ctx.sleep_time()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3 + ctx.unblock(); + future.resume(); + + // Verify 3 + expect(that % unblock_called == true); + expect(that % unblocked_context == &ctx); + expect(that % unblock_called == true); + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % 1 == step); + + ctx.clear_unblock_listener(); + }; + + "on_upload() via inheritance"_test = []() { + // Setup + async::inplace_context<1024> ctx; + struct un_blocker : public async::unblock_listener + { + bool unblock_called = false; + async::context const* unblocked_context = nullptr; + + void on_unblock(async::context const& p_context) noexcept override + { + unblock_called = true; + unblocked_context = &p_context; + } + }; + + un_blocker ub; + ctx.on_unblock(&ub); + + unsigned step = 0; + auto co = [&step](async::context&) -> async::future { + step = 1; + co_await 10ms; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2 + future.resume(); + + // Verify 2 + expect(that % ub.unblock_called == false); + expect(that % ub.unblocked_context == nullptr); + expect(that % 0 < ctx.memory_used()); + expect(that % not future.done()); + expect(that % 10ms == ctx.sleep_time()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3 + ctx.unblock(); + future.resume(); + + // Verify 3 + expect(that % ub.unblock_called == true); + expect(that % ub.unblocked_context == &ctx); + expect(that % 0 == ctx.memory_used()); + expect(that % future.done()); + expect(that % 1 == step); + + ctx.clear_unblock_listener(); + }; +}; + 
+int main() +{ + on_upload_test(); +} diff --git a/tests/proxy.test.cpp b/tests/proxy.test.cpp index fca99ae..c68ee01 100644 --- a/tests/proxy.test.cpp +++ b/tests/proxy.test.cpp @@ -13,7 +13,9 @@ void proxy_tests() "Proxy Context (normal behavior, no timeout)"_test = []() { // Setup - test_context ctx; + std::array stack{}; + async::context ctx; + ctx.initialize_stack_memory(stack); std::println("===================================="); std::println("Running Proxy Context Test (no timeout normal behavior)"); std::println("===================================="); @@ -89,7 +91,7 @@ void proxy_tests() "Proxy Coroutines Timeout"_test = []() { // Setup - test_context ctx; + async::inplace_context<1024> ctx; std::println("===================================="); std::println("Running Proxy Context Test (with timeout)"); std::println("===================================="); diff --git a/tests/simple_scheduler.test.cpp b/tests/simple_scheduler.test.cpp new file mode 100644 index 0000000..b5febc1 --- /dev/null +++ b/tests/simple_scheduler.test.cpp @@ -0,0 +1,143 @@ +#include + +#include +#include +#include +#include +#include +#include + +#if not __ARM_EABI__ +#include +#endif + +import async_context; + +// Simulates reading sensor data with I/O delay +async::future read_sensor(async::context& ctx, std::string_view p_name) +{ + using namespace std::chrono_literals; + co_await ctx.block_by_io(); // Simulate I/O operation + co_return 42; +} + +// Processes data with computation delay +async::future process_data(async::context& ctx, + std::string_view p_name, + int value) +{ + using namespace std::chrono_literals; + co_await 10ms; // Simulate processing time + int result = value * 2; + co_return result; +} + +// Writes result with I/O delay +async::future write_actuator(async::context& ctx, + std::string_view p_name, + int value) +{ + co_await ctx.block_by_io(); +} + +// Coordinates the full pipeline +async::future sensor_pipeline(async::context& ctx, + std::string_view p_name) 
+{ + + int sensor_value = co_await read_sensor(ctx, p_name); + int processed = co_await process_data(ctx, p_name, sensor_value); + co_await write_actuator(ctx, p_name, processed); + +} + +// Type-erased future wrapper for storing different future types +struct future_wrapper +{ + virtual void resume() = 0; + [[nodiscard]] virtual bool done() const = 0; + virtual ~future_wrapper() = default; +}; + +template +class typed_future_wrapper : public future_wrapper +{ +public: + explicit typed_future_wrapper(async::future&& p_future) + : m_future(std::move(p_future)) + { + } + + void resume() override + { + m_future.resume(); + } + + [[nodiscard]] bool done() const override + { + return m_future.done(); + } + +private: + async::future m_future; +}; + +template +auto make_future_wrapper(async::future&& p_future) +{ + return std::make_unique>(std::move(p_future)); +} + +template +struct round_robin_scheduler +{ + bool resume_n(int p_iterations) + { + for (int i = 0; i < p_iterations; i++) { + bool all_done = true; + for (auto& ctx : std::span(m_context_list).first(m_context_size)) { + if (not ctx.done()) { + all_done = false; + if (ctx.state() == async::blocked_by::nothing) { + ctx.resume(); + std::this_thread::sleep_for(ctx.sleep_time()); + ctx.unblock(); // simulate quick unblocking of the context. + } + } + } + if (all_done) { + return true; + } + } + return false; + } + + template + void schedule_operation(AsyncOperation&& p_async_operation, + OperationArgs... 
args) + { + + auto future = p_async_operation(m_context_list[m_context_size], args...); + auto& f_wrap = future_list[m_context_size]; + f_wrap = make_future_wrapper(std::move(future)); + m_context_size++; + } + + std::array, ContextCount> + m_context_list{}; + std::array, ContextCount> future_list{}; + unsigned m_context_size = 0; +}; + +int main() +{ + round_robin_scheduler scheduler; + + // Run two independent pipelines concurrently + scheduler.schedule_operation(sensor_pipeline, "🌟 System 1"); + scheduler.schedule_operation(sensor_pipeline, "🔥 System 2"); + + scheduler.resume_n(100); + + return 0; +} diff --git a/tests/basic_context.test.cpp b/tests/sync_wait.test.cpp similarity index 74% rename from tests/basic_context.test.cpp rename to tests/sync_wait.test.cpp index 8e59655..f1c6abd 100644 --- a/tests/basic_context.test.cpp +++ b/tests/sync_wait.test.cpp @@ -6,57 +6,35 @@ import async_context; import test_utils; -void basic_context() +void context() { using namespace boost::ut; static constexpr auto stack_size = 1024; - "sync return type void"_test = []() { - // Setup - async::basic_context ctx; - - unsigned step = 0; - auto sync_coroutine = [&step](async::context&) -> async::future { - step = 1; - return {}; - }; - - // Exercise - auto future = sync_coroutine(ctx); - - // Verify - expect(that % 0 == ctx->memory_used()); - expect(that % future.done()); - expect(that % future.has_value()); - expect(that % 1 == step); - - expect(that % stack_size == ctx->capacity()); - }; - "sync_wait --> future"_test = []() { // Setup - async::basic_context<1024> ctx; + async::inplace_context ctx; auto future = [](async::context&) -> async::future { co_return 5; }(ctx); - ctx->sync_wait([](async::sleep_duration p_sleep_duration) { + ctx.sync_wait([](async::sleep_duration p_sleep_duration) { std::this_thread::sleep_for(p_sleep_duration); }); // Verify - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % 
future.has_value()); expect(that % 5 == future.value()); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; "co_await coroutine"_test = []() { // Setup - async::basic_context ctx; + async::inplace_context ctx; static constexpr int expected_return_value = 1413; unsigned step = 0; @@ -78,7 +56,7 @@ void basic_context() auto future = co(ctx); // Verify 1 - expect(that % 0 < ctx->memory_used()); + expect(that % 0 < ctx.memory_used()); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -87,7 +65,7 @@ void basic_context() future.resume(); // Verify 2 - expect(that % 0 < ctx->memory_used()); + expect(that % 0 < ctx.memory_used()); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 2 == step); @@ -96,18 +74,18 @@ void basic_context() future.resume(); // Verify 3 - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_return_value == future.value()); expect(that % 4 == step); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; "co_await coroutine"_test = []() { // Setup - async::basic_context ctx; + async::inplace_context ctx; static constexpr int return_value1 = 1413; static constexpr int return_value2 = 4324; @@ -129,7 +107,7 @@ void basic_context() auto future = co(ctx); // Verify 1 - expect(that % 0 < ctx->memory_used()); + expect(that % 0 < ctx.memory_used()); expect(that % not future.done()); expect(that % not future.has_value()); expect(that % 0 == step); @@ -138,18 +116,18 @@ void basic_context() future.resume(); // Verify 3 - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % expected_total == future.value()); expect(that % 2 == step); - expect(that % stack_size == 
ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; "co_await Xms + sync_wait"_test = []() { // Setup - async::basic_context ctx; + async::inplace_context ctx; static constexpr int return_value1 = 1413; static constexpr int return_value2 = 4324; @@ -175,12 +153,12 @@ void basic_context() // Exercise auto future = co(ctx); - ctx->sync_wait([&](async::sleep_duration p_sleep_time) { + ctx.sync_wait([&](async::sleep_duration p_sleep_time) { sleep_cycles.push_back(p_sleep_time); }); // Verify - expect(that % 0 == ctx->memory_used()); + expect(that % 0 == ctx.memory_used()); expect(that % future.done()); expect(that % future.has_value()); expect(that % 2 == step); @@ -188,11 +166,11 @@ void basic_context() expect(that % sleep_cycles == std::vector{ 44ms, 100ms, 50ms }); - expect(that % stack_size == ctx->capacity()); + expect(that % stack_size == ctx.capacity()); }; }; int main() { - basic_context(); + context(); } diff --git a/tests/util.cppm b/tests/util.cppm index d9952e4..91854fd 100644 --- a/tests/util.cppm +++ b/tests/util.cppm @@ -33,79 +33,6 @@ std::ostream& operator<<(std::ostream& out, blocked_by b) } // namespace async export { - struct thread_info - { - async::context* sync_context = nullptr; - int sleep_count = 0; - bool io_block = false; - async::sleep_duration last_sleep_time = {}; - bool scheduled_called_once = false; - }; - - struct test_context : public async::context - { - std::shared_ptr info; - std::array m_stack{}; - - test_context(std::shared_ptr const& p_info) - : info(p_info) - { - this->initialize_stack_memory(m_stack); - } - test_context() - : info(std::make_shared()) - { - this->initialize_stack_memory(m_stack); - } - - ~test_context() override - { - cancel(); - } - - private: - void do_schedule(async::blocked_by p_block_state, - async::block_info p_block_info) noexcept override - { - info->scheduled_called_once = true; - - switch (p_block_state) { - case async::blocked_by::time: { - if (std::holds_alternative(p_block_info)) { 
- auto const time = std::get(p_block_info); - info->sleep_count++; - info->last_sleep_time = time; - } - unblock_without_notification(); - break; - } - case async::blocked_by::sync: { - if (std::holds_alternative(p_block_info)) { - auto* context = std::get(p_block_info); - std::println( - "Coroutine ({}) is blocked by syncronization with coroutine ({})", - static_cast(this), - static_cast(context)); - info->sync_context = context; - } - break; - } - case async::blocked_by::io: { - info->io_block = true; - break; - } - case async::blocked_by::nothing: { - std::println("Context ({}) has been unblocked!", - static_cast(this)); - break; - } - default: { - break; - } - } - } - }; - struct sleep_counter { int count = 0;