diff --git a/crates/cachet/src/fallback.rs b/crates/cachet/src/fallback.rs index b775bbd5..6188e901 100644 --- a/crates/cachet/src/fallback.rs +++ b/crates/cachet/src/fallback.rs @@ -17,7 +17,6 @@ use tick::Clock; use crate::Error; use crate::cache::CacheName; use crate::refresh::TimeToRefresh; -use crate::telemetry::ext::ClockExt; use crate::telemetry::{CacheActivity, CacheOperation, CacheTelemetry}; /// Type alias for promotion predicate functions. @@ -212,7 +211,7 @@ where /// /// Separated from [`get`](Self::get) to keep the hot path (primary hits) small. async fn get_from_fallback(&self, key: &K) -> Result>, Error> { - let timed = self.inner.clock.timed_async(self.inner.fallback.get(key)).await; + let timed = self.inner.clock.timed(self.inner.fallback.get(key)).await; self.inner .telemetry .record(self.inner.name, CacheOperation::Get, CacheActivity::Fallback, timed.duration); @@ -223,11 +222,7 @@ where if let Some(ref v) = fallback_value && self.inner.policy.should_promote(v) { - let timed_insert = self - .inner - .clock - .timed_async(self.inner.primary.insert(key.clone(), v.clone())) - .await; + let timed_insert = self.inner.clock.timed(self.inner.primary.insert(key.clone(), v.clone())).await; // Insert errors are intentionally swallowed - a failed promotion should not // fail the overall get. The CacheWrapper around the primary tier already // records an Error activity on insert failure. diff --git a/crates/cachet/src/refresh.rs b/crates/cachet/src/refresh.rs index b3369d5c..c572a77e 100644 --- a/crates/cachet/src/refresh.rs +++ b/crates/cachet/src/refresh.rs @@ -18,7 +18,6 @@ use cachet_tier::{CacheEntry, CacheTier}; use parking_lot::Mutex; use crate::fallback::{FallbackCache, FallbackCacheInner}; -use crate::telemetry::ext::ClockExt; use crate::telemetry::{CacheActivity, CacheOperation}; /// Configuration for background cache refresh. @@ -146,7 +145,7 @@ where F: CacheTier + Send + Sync + 'static, { pub(crate) async fn fetch_and_promote(&self, key: K) { - let timed = self.clock.timed_async(self.fallback.get(&key)).await; + let timed = self.clock.timed(self.fallback.get(&key)).await; match timed.result { Ok(Some(value)) => self.handle_fallback_hit(key, value, timed.duration).await, @@ -164,7 +163,7 @@ where } async fn promote_to_primary(&self, key: K, value: CacheEntry) { - let timed = self.clock.timed_async(self.primary.insert(key, value)).await; + let timed = self.clock.timed(self.primary.insert(key, value)).await; match timed.result { Ok(()) => { diff --git a/crates/cachet/src/telemetry/ext.rs b/crates/cachet/src/telemetry/ext.rs deleted file mode 100644 index 1f1c18bc..00000000 --- a/crates/cachet/src/telemetry/ext.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -//! Extension traits for telemetry recording. - -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use pin_project_lite::pin_project; -use tick::{Clock, Stopwatch}; - -/// Result of a timed async operation. -#[derive(Debug, Clone, Copy)] -pub struct TimedResult { - /// The result of the operation. - pub result: R, - /// The duration of the operation. - pub duration: Duration, -} - -pin_project! { - /// A future that times the inner future's execution. - #[must_use = "futures do nothing unless polled"] - pub struct Timed { - #[pin] - inner: F, - watch: Stopwatch, - } -} - -impl Future for Timed { - type Output = TimedResult; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.inner.poll(cx) { - Poll::Ready(result) => Poll::Ready(TimedResult { - result, - duration: this.watch.elapsed(), - }), - Poll::Pending => Poll::Pending, - } - } -} - -/// Extension trait for timing async operations. -pub trait ClockExt { - /// Times an async operation and returns both the result and elapsed duration. - fn timed_async(&self, f: F) -> Timed - where - F: Future; -} - -impl ClockExt for Clock { - fn timed_async(&self, f: F) -> Timed - where - F: Future, - { - Timed { - inner: f, - watch: self.stopwatch(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn block_on(f: F) -> F::Output { - futures::executor::block_on(f) - } - - #[test] - fn clock_ext_timed_async_measures_duration() { - block_on(async { - let control = tick::ClockControl::new(); - let clock = control.to_clock(); - - let timed = clock - .timed_async(async { - control.advance(Duration::from_millis(100)); - 42 - }) - .await; - - assert_eq!(timed.result, 42); - assert_eq!(timed.duration, Duration::from_millis(100)); - }); - } - - #[test] - fn clock_ext_timed_async_handles_pending() { - use std::pin::Pin; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::task::{Context, Poll}; - - /// A future that returns Pending on the first poll, then Ready on the second. - struct YieldOnce { - yielded: Arc, - } - - impl std::future::Future for YieldOnce { - type Output = i32; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.yielded.swap(true, Ordering::SeqCst) { - Poll::Ready(99) - } else { - cx.waker().wake_by_ref(); - Poll::Pending - } - } - } - - block_on(async { - let control = tick::ClockControl::new(); - let clock = control.to_clock(); - - let timed = clock - .timed_async(YieldOnce { - yielded: Arc::new(AtomicBool::new(false)), - }) - .await; - - assert_eq!(timed.result, 99); - }); - } -} diff --git a/crates/cachet/src/telemetry/mod.rs b/crates/cachet/src/telemetry/mod.rs index ac33a471..caf990d5 100644 --- a/crates/cachet/src/telemetry/mod.rs +++ b/crates/cachet/src/telemetry/mod.rs @@ -10,7 +10,6 @@ pub(crate) mod attributes; pub(crate) mod cache; pub(crate) mod config; -pub(crate) mod ext; #[cfg(any(feature = "metrics", test))] pub(crate) mod metrics; diff --git a/crates/cachet/src/wrapper.rs b/crates/cachet/src/wrapper.rs index cf9c13b4..31eb4d04 100644 --- a/crates/cachet/src/wrapper.rs +++ b/crates/cachet/src/wrapper.rs @@ -14,7 +14,6 @@ use cachet_tier::CacheTier; use tick::Clock; use crate::cache::CacheName; -use crate::telemetry::ext::ClockExt; use crate::telemetry::{CacheActivity, CacheOperation, CacheTelemetry}; use crate::{CacheEntry, Error}; @@ -124,7 +123,7 @@ where CT: CacheTier + Send + Sync, { async fn get(&self, key: &K) -> Result>, Error> { - let timed = self.clock.timed_async(self.inner.get(key)).await; + let timed = self.clock.timed(self.inner.get(key)).await; match timed.result { Ok(value) => Ok(self.handle_get_result(value, timed.duration)), Err(e) => { @@ -137,7 +136,7 @@ where async fn insert(&self, key: K, mut entry: CacheEntry) -> Result<(), Error> { entry.ensure_cached_at(self.clock.system_time()); - let timed = self.clock.timed_async(self.inner.insert(key, entry)).await; + let timed = self.clock.timed(self.inner.insert(key, entry)).await; match &timed.result { Ok(()) => { self.telemetry @@ -155,7 +154,7 @@ where } async fn invalidate(&self, key: &K) -> Result<(), Error> { - let timed = self.clock.timed_async(self.inner.invalidate(key)).await; + let timed = self.clock.timed(self.inner.invalidate(key)).await; match &timed.result { Ok(()) => { self.telemetry @@ -173,7 +172,7 @@ where } async fn clear(&self) -> Result<(), Error> { - let timed = self.clock.timed_async(self.inner.clear()).await; + let timed = self.clock.timed(self.inner.clear()).await; match &timed.result { Ok(()) => { self.telemetry diff --git a/crates/tick/Cargo.toml b/crates/tick/Cargo.toml index ca3ec418..9b3e8c8d 100644 --- a/crates/tick/Cargo.toml +++ b/crates/tick/Cargo.toml @@ -48,6 +48,7 @@ thread_aware.workspace = true tokio = { workspace = true, optional = true, features = ["time", "rt"] } [dev-dependencies] +#internal ohno = { workspace = true, features = ["app-err"] } #external @@ -96,6 +97,10 @@ required-features = ["test-util"] name = "interop_jiff" required-features = ["test-util"] +[[example]] +name = "timed_result" +required-features = ["tokio"] + [[bench]] name = "clock_bench" harness = false diff --git a/crates/tick/examples/timed_result.rs b/crates/tick/examples/timed_result.rs new file mode 100644 index 00000000..459d29f3 --- /dev/null +++ b/crates/tick/examples/timed_result.rs @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! This example demonstrates how to measure the execution time of an async +//! operation using [`Clock::timed`] and [`TimedResult`]. + +use tick::{Clock, TimedResult}; + +#[tokio::main] +async fn main() { + // Create a clock for the Tokio runtime. + let clock = Clock::new_tokio(); + + // Start some background work that returns a result after a delay. + let background_job = async { + clock.delay(std::time::Duration::from_millis(10)).await; + "Background job result" + }; + + // Use `Timed` to measure the time taken by the background job and capture its result. + let TimedResult { result, duration } = clock.timed(background_job).await; + + // Print the result and the elapsed time. + println!("Result: {result}, Elapsed time: {duration:?}"); +} diff --git a/crates/tick/src/clock.rs b/crates/tick/src/clock.rs index b8153295..62f7dcc3 100644 --- a/crates/tick/src/clock.rs +++ b/crates/tick/src/clock.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant, SystemTime}; use thread_aware::ThreadAware; use thread_aware::affinity::{MemoryAffinity, PinnedAffinity}; +use crate::Timed; use crate::state::ClockState; use crate::timers::TimerKey; @@ -459,6 +460,37 @@ impl Clock { crate::Stopwatch::new(self) } + /// Wraps a future so that its execution time is measured. + /// + /// Returns a [`Timed`] future whose output is a [`TimedResult`][crate::TimedResult] + /// containing both the inner future's result and the elapsed duration. + /// + /// The measurement uses the same clock as the [`Stopwatch`][crate::Stopwatch], + /// so time can be controlled in tests via [`ClockControl`][crate::ClockControl]. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use tick::{Clock, TimedResult}; + /// + /// # async fn timed_example(clock: &Clock) { + /// let TimedResult { result, duration } = clock.timed(async { 42 }).await; + /// assert_eq!(result, 42); + /// assert!(duration >= Duration::from_millis(0)); + /// # } + /// ``` + pub fn timed(&self, f: F) -> Timed + where + F: Future, + { + Timed { + inner: f, + watch: self.stopwatch(), + } + } + pub(super) fn register_timer(&self, when: Instant, waker: Waker) -> TimerKey { match self.clock_state() { #[cfg(any(feature = "test-util", test))] @@ -781,4 +813,58 @@ mod tests { insta::assert_debug_snapshot!(clock); } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn timed_measures_duration() { + let control = ClockControl::new(); + let clock = control.to_clock(); + + let timed = clock + .timed(async { + control.advance(Duration::from_millis(100)); + 42 + }) + .await; + + assert_eq!(timed.result, 42); + assert_eq!(timed.duration, Duration::from_millis(100)); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn timed_handles_pending() { + use std::pin::Pin; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::task::{Context, Poll}; + + /// A future that returns Pending on the first poll, then Ready on the second. + struct YieldOnce { + yielded: Arc, + } + + impl std::future::Future for YieldOnce { + type Output = i32; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.yielded.swap(true, Ordering::SeqCst) { + Poll::Ready(99) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + let control = ClockControl::new(); + let clock = control.to_clock(); + + let timed = clock + .timed(YieldOnce { + yielded: Arc::new(AtomicBool::new(false)), + }) + .await; + + assert_eq!(timed.result, 99); + } } diff --git a/crates/tick/src/lib.rs b/crates/tick/src/lib.rs index 3c8f6a2e..56f58e9c 100644 --- a/crates/tick/src/lib.rs +++ b/crates/tick/src/lib.rs @@ -251,6 +251,7 @@ mod periodic_timer; mod state; mod stopwatch; mod system_time_ext; +mod timed; mod timers; pub mod runtime; @@ -264,6 +265,7 @@ pub use future_ext::FutureExt; pub use periodic_timer::PeriodicTimer; pub use stopwatch::Stopwatch; pub use system_time_ext::SystemTimeExt; +pub use timed::{Timed, TimedResult}; pub use timeout::Timeout; /// Implements [`ThreadAware`](thread_aware::ThreadAware) for types that don't require any special relocation handling. diff --git a/crates/tick/src/timed.rs b/crates/tick/src/timed.rs new file mode 100644 index 00000000..0b8444b4 --- /dev/null +++ b/crates/tick/src/timed.rs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Utilities for measuring the execution time of asynchronous operations. + +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use pin_project_lite::pin_project; + +use crate::Stopwatch; + +/// The result of a timed async operation, containing both the inner future's +/// output and the elapsed [`Duration`]. +/// +/// Produced by awaiting a [`Timed`] future, which is created via [`Clock::timed`][crate::Clock::timed]. +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use tick::{Clock, TimedResult}; +/// +/// # async fn example(clock: &Clock) { +/// let TimedResult { result, duration } = clock.timed(async { 42 }).await; +/// assert_eq!(result, 42); +/// assert!(duration >= Duration::from_millis(0)); +/// # } +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct TimedResult { + /// The output of the inner future. + pub result: R, + /// The elapsed duration of the operation, measured using a monotonic clock. + pub duration: Duration, +} + +pin_project! { + /// A future that wraps an inner future and measures its execution time. + /// + /// When the inner future completes, `Timed` yields a [`TimedResult`] containing + /// both the output and the elapsed [`Duration`]. + /// + /// Created via [`Clock::timed`][crate::Clock::timed]. + #[must_use = "futures do nothing unless polled"] + pub struct Timed { + #[pin] + pub(crate) inner: F, + pub(crate) watch: Stopwatch, + } +} + +impl Future for Timed { + type Output = TimedResult; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Ready(result) => Poll::Ready(TimedResult { + result, + duration: this.watch.elapsed(), + }), + Poll::Pending => Poll::Pending, + } + } +}