diff --git a/temporalio/ext/src/metric.rs b/temporalio/ext/src/metric.rs index 3eec65e8..7abd944e 100644 --- a/temporalio/ext/src/metric.rs +++ b/temporalio/ext/src/metric.rs @@ -1,8 +1,8 @@ use std::{any::Any, sync::Arc, time::Duration}; use magnus::{ - DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol, - Symbol, TryConvert, TypedData, Value, function, + DataTypeFunctions, Error, Float, Integer, RArray, RClass, RHash, RModule, RString, Ruby, + StaticSymbol, Symbol, TryConvert, TypedData, Value, function, gc::register_mark_object, method, prelude::*, @@ -13,7 +13,7 @@ use temporalio_common::telemetry::metrics::{ self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, }; -use crate::{ROOT_MOD, error, id, runtime::Runtime, util::SendSyncBoxValue}; +use crate::{ROOT_MOD, error, id, runtime::Runtime, util::ThreadSafeBoxValue}; pub fn init(ruby: &Ruby) -> Result<(), Error> { let root_mod = ruby.get_inner(&ROOT_MOD); @@ -269,14 +269,14 @@ fn metric_key_value(k: Value, v: Value) -> Result>, + value: Arc>, } impl BufferInstrumentRef for BufferedMetricRef {} #[derive(Debug)] struct BufferedMetricAttributes { - value: SendSyncBoxValue, + value: ThreadSafeBoxValue, } impl CustomMetricAttributes for BufferedMetricAttributes { @@ -325,12 +325,21 @@ pub fn convert_metric_events( ruby: &Ruby, events: Vec>, durations_as_seconds: bool, -) -> Result, Error> { - let temp: Result>, Error> = events - .into_iter() - .map(|e| convert_metric_event(ruby, e, durations_as_seconds)) - .collect(); - Ok(temp?.into_iter().flatten().collect()) +) -> Result { + // We must use an RArray (not Vec) to hold intermediate results. + // Ruby's GC scans the native stack but not the Rust heap. A Vec's + // backing buffer lives on the Rust heap, so GC cannot see the VALUEs stored + // there. If a subsequent funcall triggers GC while we're still iterating, + // previously created Update objects in the Vec could be collected, leaving + // dangling VALUE pointers that cause a segfault. Pushing into an RArray + // immediately makes each object reachable from a GC-visible root. + let result = ruby.ary_new_capa(events.len()); + for event in events { + if let Some(val) = convert_metric_event(ruby, event, durations_as_seconds)? { + result.push(val)?; + } + } + Ok(result) } fn convert_metric_event( @@ -386,7 +395,7 @@ fn convert_metric_event( // Put on lazy ref populate_into .set(Arc::new(BufferedMetricRef { - value: Arc::new(SendSyncBoxValue::new(val)), + value: Arc::new(ThreadSafeBoxValue::new(val)), })) .map_err(|_| error!("Failed setting metric ref"))?; Ok(None) @@ -427,7 +436,7 @@ fn convert_metric_event( // Put on lazy ref populate_into .set(Arc::new(BufferedMetricAttributes { - value: SendSyncBoxValue::new(hash), + value: ThreadSafeBoxValue::new(hash), })) .map_err(|_| error!("Failed setting metric attrs"))?; Ok(None) diff --git a/temporalio/ext/src/runtime.rs b/temporalio/ext/src/runtime.rs index e92c0094..74faa0fa 100644 --- a/temporalio/ext/src/runtime.rs +++ b/temporalio/ext/src/runtime.rs @@ -267,8 +267,7 @@ impl Runtime { .metrics_call_buffer .clone() .expect("Attempting to retrieve buffered metrics without buffer"); - let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?; - Ok(ruby.ary_new_from_values(&updates)) + convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds) } } diff --git a/temporalio/ext/src/util.rs b/temporalio/ext/src/util.rs index fad968aa..664029ac 100644 --- a/temporalio/ext/src/util.rs +++ b/temporalio/ext/src/util.rs @@ -1,4 +1,5 @@ use std::ffi::c_void; +use std::mem::ManuallyDrop; use magnus::symbol::IntoSymbol; use magnus::value::{BoxValue, OpaqueId, ReprValue}; @@ -157,3 +158,46 @@ impl SendSyncBoxValue { *self.0 } } + +/// Like SendSyncBoxValue but safe to drop from any thread. When dropped on a +/// non-Ruby thread (e.g. a Tokio worker), the BoxValue and its GC registration +/// are intentionally leaked instead of calling rb_gc_unregister_address (which +/// would corrupt Ruby's GC data structures). On a Ruby thread, cleanup proceeds +/// normally. +/// +/// Use this for metric buffer objects (instruments, attribute sets) where: +/// - The Rust Core SDK may drop Arc references on Tokio threads +/// - The number of unique objects is bounded (small, acceptable leak) +pub(crate) struct ThreadSafeBoxValue(ManuallyDrop>); + +unsafe impl Send for ThreadSafeBoxValue {} +unsafe impl Sync for ThreadSafeBoxValue {} + +impl ThreadSafeBoxValue { + pub fn new(val: T) -> Self { + Self(ManuallyDrop::new(BoxValue::new(val))) + } + + pub fn value(&self, _: &Ruby) -> T { + **self.0 + } +} + +impl Drop for ThreadSafeBoxValue { + fn drop(&mut self) { + if Ruby::get().is_ok() { + // On a Ruby thread: safe to call rb_gc_unregister_address + unsafe { + ManuallyDrop::drop(&mut self.0); + } + } + // On a non-Ruby thread: intentionally leak the BoxValue to avoid + // calling rb_gc_unregister_address from a thread unknown to Ruby's VM + } +} + +impl std::fmt::Debug for ThreadSafeBoxValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ThreadSafeBoxValue").finish() + } +}