diff --git a/CMakeLists.txt b/CMakeLists.txt index d586cbc..4b33e18 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,20 +21,24 @@ project(async_context LANGUAGES CXX) find_package(LibhalCMakeUtil REQUIRED) libhal_project_init() -libhal_add_library(async_context MODULES modules/async_context.cppm) +libhal_add_library(async_context + MODULES + modules/async_context.cppm + modules/schedulers.cppm) libhal_apply_compile_options(async_context) libhal_install_library(async_context NAMESPACE libhal) libhal_add_tests(async_context TEST_NAMES - sync_wait - basics - blocked_by - cancel - exclusive_access - proxy - basics_dep_inject - on_unblock - simple_scheduler + # sync_wait + # basics + # blocked_by + # cancel + # exclusive_access + # proxy + # basics_dep_inject + # on_unblock + # simple_scheduler + when_all MODULES tests/util.cppm diff --git a/modules/async_context.cppm b/modules/async_context.cppm index 4509acc..95b0f8b 100644 --- a/modules/async_context.cppm +++ b/modules/async_context.cppm @@ -1,4 +1,4 @@ -// Copyright 2024 - 2025 Khalil Estell and the libhal contributors +// Copyright 2024 - 2026 Khalil Estell and the libhal contributors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -333,7 +333,7 @@ public: } private: - void on_unblock(async::context const& p_context) noexcept override + void on_unblock(async::context& p_context) noexcept override { handler(p_context); } @@ -362,7 +362,7 @@ private: * @note This method MUST be noexcept and ISR-safe. It may be called from * any execution context including interrupt handlers. */ - virtual void on_unblock(context const& p_context) noexcept = 0; + virtual void on_unblock(context& p_context) noexcept = 0; }; /** @@ -499,8 +499,6 @@ public: */ constexpr void unblock_without_notification() noexcept { - // We clear this information after the unblock call to allow the unblock - // call to inspect the context's current state. get_original().m_state = blocked_by::nothing; get_original().m_sleep_time = sleep_duration::zero(); get_original().m_sync_blocker = nullptr; @@ -526,6 +524,10 @@ public: if (get_original().m_listener) { get_original().m_listener->on_unblock(*this); } + + // We clear this context state information after the unblock listener is + // invoked to allow the unblock listener to inspect the context's current + // state prior to being unblocked. unblock_without_notification(); } @@ -664,8 +666,11 @@ public: */ void resume() { - // We cannot resume the a coroutine blocked by time. - // Only the scheduler can unblock a context state. + // We cannot resume the a coroutine blocked by time. Only the scheduler can + // unblock a context state. + // + // This needs to be here to ensure that sync_wait is possible, otherwise the + // blocked_by::time semantic cannot be supported. if (state() != blocked_by::time) { m_active_handle.resume(); } @@ -924,15 +929,15 @@ private: return coroutine_frame_stack_address; } + // A concern for this library is how large the context objet is. std::coroutine_handle<> m_active_handle = noop_sentinel; // word 1 uptr* m_stack_pointer = nullptr; // word 2 std::span m_stack{}; // word 3-4 context* m_original = nullptr; // word 5 - // ----------- Only available from the original ----------- - unblock_listener* m_listener = nullptr; // word 6 - sleep_duration m_sleep_time = sleep_duration::zero(); // word 7 - context* m_sync_blocker = nullptr; // word 8 - blocked_by m_state = blocked_by::nothing; // word 9: pad 3 + unblock_listener* m_listener = nullptr; // word 6 + sleep_duration m_sleep_time = sleep_duration::zero(); // word 7 + context* m_sync_blocker = nullptr; // word 8 + blocked_by m_state = blocked_by::nothing; // word 9: pad 3 }; /** @@ -1078,8 +1083,8 @@ public: inplace_context(inplace_context const&) = delete; inplace_context& operator=(inplace_context const&) = delete; - inplace_context(inplace_context&&) = default; - inplace_context& operator=(inplace_context&&) = default; + inplace_context(inplace_context&&) = delete; + inplace_context& operator=(inplace_context&&) = delete; ~inplace_context() { diff --git a/modules/schedulers.cppm b/modules/schedulers.cppm new file mode 100644 index 0000000..21d3f35 --- /dev/null +++ b/modules/schedulers.cppm @@ -0,0 +1,371 @@ +// Copyright 2024 - 2026 Khalil Estell and the libhal contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +#define DEBUGGING 0 + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if DEBUGGING +#include +#endif + +export module async_context.schedulers; + +export import async_context; + +namespace async::inline v0 { +// ============================================================================= +// +// RAII Context Handle +// +// ============================================================================= + +/** + * @brief RAII guard that manages unblock listener lifetime on a context. + * + * On construction, registers an unblock listener with the context. + * On destruction, clears the listener to prevent dangling references. + * Takes non-owning pointers to both the context and listener. + * + * The listener object MUST outlive the context_handle. + */ +export class context_handle +{ +public: + /** + * @brief Default constructor (no context attached). + */ + constexpr context_handle() noexcept = default; + + /** + * @brief Construct and register listener with context. + * + * @param p_context The context to register the listener with. Must outlive + * this guard. + * @param p_listener The unblock listener to register. Must outlive this + * guard. + */ + constexpr context_handle(async::context* p_context, + async::unblock_listener* p_listener) noexcept + : m_context(p_context) + { + if (m_context) { + m_context->on_unblock(p_listener); + } + } + + /** + * @brief Move constructor. + * + * Transfers ownership of the listener registration. The source guard is left + * empty (its destructor will be a no-op). + */ + constexpr context_handle(context_handle&& p_other) noexcept + : m_context(std::exchange(p_other.m_context, nullptr)) + { + } + + /** + * @brief Move assignment operator. + * + * Clears the current listener before acquiring a new one. + */ + constexpr context_handle& operator=(context_handle&& p_other) noexcept + { + if (this != &p_other) { + clear(); + m_context = std::exchange(p_other.m_context, nullptr); + } + return *this; + } + + /** + * @brief Destructor — clears the unblock listener from the context. + */ + constexpr ~context_handle() + { + clear(); + } + + context_handle(context_handle const&) = delete; + context_handle& operator=(context_handle const&) = delete; + + /** + * @brief Get the underlying context pointer. + * + * @return Pointer to the context, or nullptr if no context is registered. + */ + [[nodiscard]] constexpr async::context* get() const noexcept + { + return m_context; + } + + /** + * @brief Dereference operator to access the context. + * + * @return Reference to the context. Undefined behavior if no context is + * registered. + */ + [[nodiscard]] constexpr async::context& operator*() const noexcept + { + return *m_context; + } + + /** + * @brief Member access operator. + * + * @return Pointer to the context for member access. Undefined behavior if no + * context is registered. + */ + [[nodiscard]] constexpr async::context* operator->() const noexcept + { + return m_context; + } + +private: + /** + * @brief Clear the listener from the context if one is registered. + */ + constexpr void clear() noexcept + { + if (m_context) { + m_context->clear_unblock_listener(); + } + } + + async::context* m_context = nullptr; +}; + +// ============================================================================= +// +// Clock Concept, Adapters & run_until_done +// +// ============================================================================= + +/** + * @brief Concept for an instance-based clock. + * + * Deliberately does NOT require static now() — unlike std::chrono clock types + * — so that hardware clocks can be injected as runtime objects. Any + * std::chrono clock can be adapted via chrono_clock_adapter. + * + * A conforming type must provide: + * - time_point : the type returned by now() + * - duration : the difference type of two time_points + * - now() : const member returning time_point + * - time_point arithmetic (subtraction -> duration, addition of duration) + * - time_point::max() sentinel for "never" + */ +export template +concept clock = requires(T const t, typename T::time_point tp) { + typename T::time_point; + typename T::duration; + { t.now() } -> std::same_as; + { tp - tp } -> std::convertible_to; + { tp + typename T::duration{} } -> std::same_as; + { T::time_point::max() } -> std::same_as; +}; + +/** + * @brief Adapts any std::chrono-conforming clock (static now()) into an + * async::clock (instance now()). + * + * Zero-size type — all state lives in the underlying static clock. The + * instance now() simply forwards, so this is always optimized away entirely. + * + * Example: + * async::chrono_clock_adapter clk; + * static_assert(async::clock); + */ +export template + requires requires { + { ChronoClock::now() } -> std::same_as; + typename ChronoClock::duration; + } +struct chrono_clock_adapter +{ + using duration = typename ChronoClock::duration; + using time_point = typename ChronoClock::time_point; + + [[nodiscard]] time_point now() const noexcept(noexcept(ChronoClock::now())) + { + return ChronoClock::now(); + } +}; + +// ============================================================================= + +/** + * @brief Per-task scheduling state used internally by run_until_done. + * + * Tracks the absolute wake time_point for each context, computed from + * clock::now() + context::sleep_time() at the moment the context suspends. + * Templated on Clock so time_point arithmetic is always consistent with + * whatever clock was injected. + * + * Holds a context_handle that automatically manages unblock listener + * registration and cleanup. + */ +template +struct ready_time +{ + using time_point = typename Clock::time_point; + + context_handle handle; + time_point wake_time = time_point::max(); + + ready_time() = default; + + void assign(async::context* p_ctx, + async::unblock_listener* p_listener, + Clock const& p_clock) + { + handle = context_handle(p_ctx, p_listener); + refresh(p_clock); + } + + // Recompute wake_time based on the context's current state + void refresh(Clock const& p_clock) + { + async::context* ctx_ptr = handle.get(); + if (ctx_ptr && ctx_ptr->state() == blocked_by::time) { + wake_time = p_clock.now() + ctx_ptr->sleep_time(); + } else { + wake_time = time_point::max(); + } + } + + // Resume the context if its deadline has passed or it is otherwise ready. + void attempt_resume(Clock const& p_clock) + { + async::context* ctx_ptr = handle.get(); + if (!ctx_ptr || ctx_ptr->done()) { + return; + } + + if (ctx_ptr->state() == blocked_by::time && wake_time <= p_clock.now()) { + // Deadline elapsed — unblock without triggering scheduler notification + // (we are the scheduler) then resume + ctx_ptr->unblock_without_notification(); + ctx_ptr->resume(); + } else if (is_ready()) { + ctx_ptr->resume(); + } + + // Recompute after resume — state may have changed + refresh(p_clock); + } + + [[nodiscard]] bool is_ready() const noexcept + { + async::context* ctx_ptr = handle.get(); + return ctx_ptr && ctx_ptr->state() != blocked_by::io && + ctx_ptr->state() != blocked_by::time; + } + + [[nodiscard]] bool operator<(time_point const& p_other) const noexcept + { + return wake_time < p_other; + } +}; + +/** + * @brief Drives a fixed set of async::context objects to completion. + * + * Runs a cooperative scheduling loop, resuming each context that is ready and + * sleeping (via p_sleep_until) when all contexts are either blocked by time or + * blocked by I/O. The sleep is interruptable — an unblock_listener calling + * notify() on the sleep object wakes the loop early when I/O completes. + * + * Each context has an unblock listener registered via context_handle, which + * allows the sleep function to be notified when I/O operations complete. + * + * @tparam ContextCount - deduced from the array size + * @tparam Clock - any type satisfying async::clock + * + * @param p_clock - clock used to compute and compare deadlines + * @param p_sleep_until - callable(Clock::time_point) that sleeps until the + * given absolute deadline or until notified externally. Must return early when + * an unblock notification fires. + * @param p_listener - unblock listener that will be registered with all + * contexts to wake the sleep function when I/O completes. + * @param p_tasks - fixed array of non-owning context pointers to drive. + * Contexts not yet started are skipped until first resumed externally (e.g. via + * a future). + */ +export template +void run_until_done( + Clock& p_clock, + std::array p_tasks, + std::invocable auto&& p_sleep_until, + async::unblock_listener* p_listener = nullptr) +{ + using time_point = typename Clock::time_point; + constexpr auto max_wake = time_point::max(); + + std::array, ContextCount> tasks{}; + for (std::size_t i = 0; i < ContextCount; i++) { + tasks[i].assign(p_tasks[i], p_listener, p_clock); + } + + while (true) { + time_point soonest_wake_time = max_wake; + bool all_done = true; + bool can_sleep = true; + + for (auto& task : tasks) { + async::context* ctx_ptr = task.handle.get(); + if (!ctx_ptr || ctx_ptr->done()) { + continue; + } + + all_done = false; + + task.attempt_resume(p_clock); + + // After resuming, re-evaluate whether we can sleep and what the + // soonest deadline is + if (task.wake_time < soonest_wake_time) { + soonest_wake_time = task.wake_time; + } + + if (task.is_ready()) { + can_sleep = false; + } + } + + if (all_done) { + break; + } + + if (can_sleep && soonest_wake_time < max_wake) { + p_sleep_until(soonest_wake_time); + } + } +} +} // namespace async::inline v0 diff --git a/tests/clock_adapter.test.cpp b/tests/clock_adapter.test.cpp new file mode 100644 index 0000000..46bb721 --- /dev/null +++ b/tests/clock_adapter.test.cpp @@ -0,0 +1,11 @@ +module; + +#include +#include + +import async_context.schedulers; + +static_assert( + std::is_empty_v>); +static_assert( + async::clock>); diff --git a/tests/simple_scheduler.test.cpp b/tests/simple_scheduler.test.cpp index b5febc1..303ee88 100644 --- a/tests/simple_scheduler.test.cpp +++ b/tests/simple_scheduler.test.cpp @@ -48,7 +48,6 @@ async::future sensor_pipeline(async::context& ctx, int sensor_value = co_await read_sensor(ctx, p_name); int processed = co_await process_data(ctx, p_name, sensor_value); co_await write_actuator(ctx, p_name, processed); - } // Type-erased future wrapper for storing different future types diff --git a/tests/when_all.test.cpp b/tests/when_all.test.cpp new file mode 100644 index 0000000..a671a63 --- /dev/null +++ b/tests/when_all.test.cpp @@ -0,0 +1,290 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +import async_context; +import async_context.schedulers; + +using namespace std::literals; + +using time_point = decltype(std::chrono::steady_clock::now()); + +struct ready_time +{ + async::context* ctx = nullptr; + time_point wake_time = time_point::max(); + + ready_time() = default; + ready_time(async::context* p_ctx) + : ctx(p_ctx) + { + set_wake_time(); + } + ready_time& operator=(async::context* p_ctx) + { + ctx = p_ctx; + set_wake_time(); + return *this; + } + + void set_wake_time() + { + if (ctx->state() == async::blocked_by::time) { + wake_time = std::chrono::steady_clock::now() + ctx->sleep_time(); + } else { + wake_time = time_point::max(); + } + } + + ready_time(ready_time const& p_ctx) = delete; + ready_time& operator=(ready_time const& p_ctx) = delete; + ready_time(ready_time&& p_ctx) = default; + ready_time& operator=(ready_time&& p_ctx) = default; + ~ready_time() = default; + + constexpr bool operator<(ready_time const& p_other) + { + return this->wake_time < p_other.wake_time; + } + constexpr bool operator<(time_point const& p_other) + { + return this->wake_time < p_other; + } + constexpr bool operator>(time_point const& p_other) + { + return this->wake_time > p_other; + } + constexpr bool operator==(ready_time const& p_other) + { + return this->wake_time == p_other.wake_time; + } + + void attempt_resume() + { + if (wake_time <= std::chrono::steady_clock::now()) { + ctx->unblock_without_notification(); + ctx->resume(); + set_wake_time(); + } else if (is_ready()) { + ctx->resume(); + set_wake_time(); + } + } + + constexpr bool is_ready() + { + return ctx->state() != async::blocked_by::io and + ctx->state() != async::blocked_by::time; + } +}; + +template +void run_until_done(std::invocable auto&& p_sleep_until, + std::array p_tasks) +{ + std::array tasks{}; + + // Fill time points with the list of tasks in p_tasks + static_assert(tasks.size() == p_tasks.size()); + for (std::size_t i = 0; i < ContextCount; i++) { + tasks[i] = p_tasks[i]; + } + + while (true) { + constexpr auto max_wake = time_point::max(); + time_point soonest_wake_time = max_wake; + bool all_done = true; + bool can_sleep = true; + + for (auto& task : tasks) { + if (task.ctx->done()) { + continue; + } + + all_done = false; + + task.attempt_resume(); + + if (task < soonest_wake_time) { + soonest_wake_time = task.wake_time; + } + + if (task.is_ready()) { + can_sleep = false; + } + } + + if (all_done) { + break; + } + + if (can_sleep and soonest_wake_time < max_wake) { + p_sleep_until(soonest_wake_time); + } + } +} + +async::task sleeping_task(async::context& p_ctx, + std::string_view p_name, + async::sleep_duration p_sleep_time) +{ + std::println("[{}] Starting...", p_name); + + for (int i = 1; i <= 5; i++) { + co_await p_sleep_time; + std::println("[{}] sleep {}", p_name, i); + } + co_return; +} + +using high_res_clock = std::chrono::steady_clock; +using high_res_clock_time = high_res_clock::time_point; + +template +async::future> +track_sleep_precision(async::context&, async::sleep_duration p_sleep_time) +{ + std::array wake_times{}; + + wake_times[0] = high_res_clock::now(); + for (std::size_t i = 1; i <= SleepTime; i++) { + co_await p_sleep_time; + wake_times[i] = high_res_clock::now(); + } + co_return wake_times; +} + +int main() +{ + async::inplace_context<1024> ctx0; + async::inplace_context<1024> ctx1; + async::inplace_context<1024> ctx2; + async::inplace_context<1024> ctx3; + async::inplace_context<1024> ctx4; + async::inplace_context<1024> ctx5; + [[maybe_unused]] async::inplace_context<1024> ctx6; + [[maybe_unused]] async::inplace_context<1024> ctx7; + [[maybe_unused]] async::inplace_context<1024> ctx8; + [[maybe_unused]] async::inplace_context<1024> ctx9; + + std::println("Starting When All Test!"); + std::vector future_set(16); + + future_set.push_back(sleeping_task(ctx0, "A", 8ms)); + future_set.push_back(sleeping_task(ctx1, "B", 22ms)); + future_set.push_back(sleeping_task(ctx2, "C", 35ms)); + future_set.push_back(sleeping_task(ctx3, "D", 40ms)); + future_set.push_back(sleeping_task(ctx4, "E", 75ms)); + future_set.push_back(sleeping_task(ctx5, "F", 100ms)); + + run_until_done( + [](time_point p_wake_time) { + auto duration = std::chrono::duration_cast( + p_wake_time - std::chrono::steady_clock::now()); + std::println("sleeping for {} (epoch: {})", + duration, + std::chrono::duration_cast( + p_wake_time.time_since_epoch())); + std::this_thread::sleep_until(p_wake_time); + }, + std::to_array({ + &ctx0, + &ctx1, + &ctx2, + &ctx3, + &ctx4, + &ctx5, + &ctx6, + &ctx7, + &ctx8, + &ctx9, + })); + + std::println("✅ Test finished!\n\n"); + std::println("🚀 Starting track_sleep_precision test! ============="); + + constexpr auto sleep_time0 = 8ms; + constexpr auto sleep_time1 = 22ms; + constexpr auto sleep_time2 = 35ms; + constexpr auto sleep_time3 = 40ms; + constexpr auto sleep_time4 = 75ms; + constexpr auto sleep_time5 = 100ms; + + auto ftx0 = track_sleep_precision<10>(ctx0, sleep_time0); + auto ftx1 = track_sleep_precision<10>(ctx1, sleep_time1); + auto ftx2 = track_sleep_precision<10>(ctx2, sleep_time2); + auto ftx3 = track_sleep_precision<10>(ctx3, sleep_time3); + auto ftx4 = track_sleep_precision<10>(ctx4, sleep_time4); + auto ftx5 = track_sleep_precision<10>(ctx5, sleep_time5); + + run_until_done( + [](time_point p_wake_time) { + auto duration = std::chrono::duration_cast( + p_wake_time - std::chrono::steady_clock::now()); + std::this_thread::sleep_for(duration); + }, + std::to_array({ + &ctx0, + &ctx1, + &ctx2, + &ctx3, + &ctx4, + &ctx5, + &ctx6, + &ctx7, + &ctx8, + &ctx9, + })); + + // Calculate and print jitter for each future + std::println("\n📊 Sleep Precision Analysis:"); + std::println("{:<10} {:<15} {:<15} {:<15} {:<15}", + "Context", + "Expected (ms)", + "Actual (ms)", + "Jitter (ms)", + "Jitter %"); + std::println("{:-<80}", ""); + + auto print_jitter = [](std::string_view p_name, + async::sleep_duration p_expected, + auto const& p_wake_times) { + using namespace std::chrono; + + double const expected_ms = + duration_cast>(p_expected).count(); + + for (std::size_t i = 1; i < p_wake_times.size(); i++) { + auto const actual_duration = p_wake_times[i] - p_wake_times[i - 1]; + double const actual_ms = + duration_cast>(actual_duration).count(); + double const jitter_ms = actual_ms - expected_ms; + double const jitter_percent = (jitter_ms / expected_ms) * 100.0; + + std::println("{:<10} {:<15.3f} {:<15.3f} {:<15.3f} {:<14.2f}%", + p_name, + expected_ms, + actual_ms, + jitter_ms, + jitter_percent); + } + }; + + print_jitter("ctx0", sleep_time0, ftx0.value()); + print_jitter("ctx1", sleep_time1, ftx1.value()); + print_jitter("ctx2", sleep_time2, ftx2.value()); + print_jitter("ctx3", sleep_time3, ftx3.value()); + print_jitter("ctx4", sleep_time4, ftx4.value()); + print_jitter("ctx5", sleep_time5, ftx5.value()); + + return 0; +}