From ad70d5262c32c0fb2d63c6ac3de7ba37de54bf82 Mon Sep 17 00:00:00 2001 From: Daniel Acuna Date: Sun, 8 Mar 2026 20:12:08 -0400 Subject: [PATCH 1/3] Fix MetricBuffer#retrieve_updates segfault Two issues caused segfaults when draining the metric buffer under load: 1. Ruby VALUES stored in Rust Vec invisible to GC: convert_metric_events collected Update objects into a Vec on the Rust heap. Ruby's conservative GC scans the native stack but not the Rust heap, so if a subsequent funcall triggered GC, previously created Update objects could be collected, leaving dangling VALUE pointers. Fixed by pushing directly into an RArray which acts as a GC-visible root. 2. BoxValue dropped on non-Ruby Tokio threads: The Core SDK's Tokio threads call update_attributes on metric instruments, which can drop Arc references to BufferedMetricAttributes/BufferedMetricRef. When the last Arc is dropped on a Tokio thread, BoxValue's destructor calls rb_gc_unregister_address from a thread unknown to Ruby's VM, corrupting GC internals. Fixed by introducing ThreadSafeBoxValue which uses ManuallyDrop and leaks the BoxValue when dropped on non-Ruby threads (acceptable since metric instruments/attributes are bounded cardinality). Fixes #396 Co-Authored-By: Claude Opus 4.6 --- temporalio/ext/src/metric.rs | 35 ++++++++++++++++++----------- temporalio/ext/src/runtime.rs | 3 +-- temporalio/ext/src/util.rs | 42 +++++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 15 deletions(-) diff --git a/temporalio/ext/src/metric.rs b/temporalio/ext/src/metric.rs index 3eec65e8..7abd944e 100644 --- a/temporalio/ext/src/metric.rs +++ b/temporalio/ext/src/metric.rs @@ -1,8 +1,8 @@ use std::{any::Any, sync::Arc, time::Duration}; use magnus::{ - DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol, - Symbol, TryConvert, TypedData, Value, function, + DataTypeFunctions, Error, Float, Integer, RArray, RClass, RHash, RModule, RString, Ruby, + StaticSymbol, Symbol, TryConvert, TypedData, Value, function, gc::register_mark_object, method, prelude::*, @@ -13,7 +13,7 @@ use temporalio_common::telemetry::metrics::{ self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, }; -use crate::{ROOT_MOD, error, id, runtime::Runtime, util::SendSyncBoxValue}; +use crate::{ROOT_MOD, error, id, runtime::Runtime, util::ThreadSafeBoxValue}; pub fn init(ruby: &Ruby) -> Result<(), Error> { let root_mod = ruby.get_inner(&ROOT_MOD); @@ -269,14 +269,14 @@ fn metric_key_value(k: Value, v: Value) -> Result>, + value: Arc>, } impl BufferInstrumentRef for BufferedMetricRef {} #[derive(Debug)] struct BufferedMetricAttributes { - value: SendSyncBoxValue, + value: ThreadSafeBoxValue, } impl CustomMetricAttributes for BufferedMetricAttributes { @@ -325,12 +325,21 @@ pub fn convert_metric_events( ruby: &Ruby, events: Vec>, durations_as_seconds: bool, -) -> Result, Error> { - let temp: Result>, Error> = events - .into_iter() - .map(|e| convert_metric_event(ruby, e, durations_as_seconds)) - .collect(); - Ok(temp?.into_iter().flatten().collect()) +) -> Result { + // We must use an RArray (not Vec) to hold intermediate results. + // Ruby's GC scans the native stack but not the Rust heap. A Vec's + // backing buffer lives on the Rust heap, so GC cannot see the VALUEs stored + // there. If a subsequent funcall triggers GC while we're still iterating, + // previously created Update objects in the Vec could be collected, leaving + // dangling VALUE pointers that cause a segfault. Pushing into an RArray + // immediately makes each object reachable from a GC-visible root. + let result = ruby.ary_new_capa(events.len()); + for event in events { + if let Some(val) = convert_metric_event(ruby, event, durations_as_seconds)? { + result.push(val)?; + } + } + Ok(result) } fn convert_metric_event( @@ -386,7 +395,7 @@ fn convert_metric_event( // Put on lazy ref populate_into .set(Arc::new(BufferedMetricRef { - value: Arc::new(SendSyncBoxValue::new(val)), + value: Arc::new(ThreadSafeBoxValue::new(val)), })) .map_err(|_| error!("Failed setting metric ref"))?; Ok(None) @@ -427,7 +436,7 @@ fn convert_metric_event( // Put on lazy ref populate_into .set(Arc::new(BufferedMetricAttributes { - value: SendSyncBoxValue::new(hash), + value: ThreadSafeBoxValue::new(hash), })) .map_err(|_| error!("Failed setting metric attrs"))?; Ok(None) diff --git a/temporalio/ext/src/runtime.rs b/temporalio/ext/src/runtime.rs index e92c0094..74faa0fa 100644 --- a/temporalio/ext/src/runtime.rs +++ b/temporalio/ext/src/runtime.rs @@ -267,8 +267,7 @@ impl Runtime { .metrics_call_buffer .clone() .expect("Attempting to retrieve buffered metrics without buffer"); - let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?; - Ok(ruby.ary_new_from_values(&updates)) + convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds) } } diff --git a/temporalio/ext/src/util.rs b/temporalio/ext/src/util.rs index fad968aa..77260247 100644 --- a/temporalio/ext/src/util.rs +++ b/temporalio/ext/src/util.rs @@ -1,4 +1,5 @@ use std::ffi::c_void; +use std::mem::ManuallyDrop; use magnus::symbol::IntoSymbol; use magnus::value::{BoxValue, OpaqueId, ReprValue}; @@ -157,3 +158,44 @@ impl SendSyncBoxValue { *self.0 } } + +/// Like SendSyncBoxValue but safe to drop from any thread. When dropped on a +/// non-Ruby thread (e.g. a Tokio worker), the BoxValue and its GC registration +/// are intentionally leaked instead of calling rb_gc_unregister_address (which +/// would corrupt Ruby's GC data structures). On a Ruby thread, cleanup proceeds +/// normally. +/// +/// Use this for metric buffer objects (instruments, attribute sets) where: +/// - The Rust Core SDK may drop Arc references on Tokio threads +/// - The number of unique objects is bounded (small, acceptable leak) +pub(crate) struct ThreadSafeBoxValue(ManuallyDrop>); + +unsafe impl Send for ThreadSafeBoxValue {} +unsafe impl Sync for ThreadSafeBoxValue {} + +impl ThreadSafeBoxValue { + pub fn new(val: T) -> Self { + Self(ManuallyDrop::new(BoxValue::new(val))) + } + + pub fn value(&self, _: &Ruby) -> T { + **self.0 + } +} + +impl Drop for ThreadSafeBoxValue { + fn drop(&mut self) { + if Ruby::get().is_ok() { + // On a Ruby thread: safe to call rb_gc_unregister_address + unsafe { ManuallyDrop::drop(&mut self.0); } + } + // On a non-Ruby thread: intentionally leak the BoxValue to avoid + // calling rb_gc_unregister_address from a thread unknown to Ruby's VM + } +} + +impl std::fmt::Debug for ThreadSafeBoxValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ThreadSafeBoxValue").finish() + } +} From e9cc3b7fb361f74211783a8d57834bc39948869c Mon Sep 17 00:00:00 2001 From: Daniel Acuna <66972804+dacuna-ic@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:13:08 -0400 Subject: [PATCH 2/3] Update temporalio/ext/src/util.rs Co-authored-by: Chris Olszewski --- temporalio/ext/src/util.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/temporalio/ext/src/util.rs b/temporalio/ext/src/util.rs index 77260247..0325511f 100644 --- a/temporalio/ext/src/util.rs +++ b/temporalio/ext/src/util.rs @@ -187,7 +187,9 @@ impl Drop for ThreadSafeBoxValue { fn drop(&mut self) { if Ruby::get().is_ok() { // On a Ruby thread: safe to call rb_gc_unregister_address - unsafe { ManuallyDrop::drop(&mut self.0); } + unsafe { + ManuallyDrop::drop(&mut self.0); + } } // On a non-Ruby thread: intentionally leak the BoxValue to avoid // calling rb_gc_unregister_address from a thread unknown to Ruby's VM From 7015bb0173e0ab7c13057362d01bc2a11608406d Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 13 Mar 2026 11:05:29 -0400 Subject: [PATCH 3/3] Apply suggestion from @chris-olszewski --- temporalio/ext/src/util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/ext/src/util.rs b/temporalio/ext/src/util.rs index 0325511f..664029ac 100644 --- a/temporalio/ext/src/util.rs +++ b/temporalio/ext/src/util.rs @@ -187,7 +187,7 @@ impl Drop for ThreadSafeBoxValue { fn drop(&mut self) { if Ruby::get().is_ok() { // On a Ruby thread: safe to call rb_gc_unregister_address - unsafe { + unsafe { ManuallyDrop::drop(&mut self.0); } }