Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions temporalio/lib/temporalio/contrib/open_telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ def _attach_context(headers)
::OpenTelemetry::Context.attach(context) if context
end

# @!visibility private
def _detach_context(context_token)
::OpenTelemetry::Context.detach(context_token) if context_token
end

# @!visibility private
def _context_from_headers(headers)
carrier = headers[@header_key]
Expand Down Expand Up @@ -206,7 +211,7 @@ def initialize(root, next_interceptor)

# @!visibility private
def execute(input)
@root._attach_context(input.headers)
token = @root._attach_context(input.headers)
info = Activity::Context.current.info
@root._with_started_span(
name: "RunActivity:#{info.activity_type}",
Expand All @@ -217,6 +222,8 @@ def execute(input)
'temporalActivityID' => info.activity_id
}
) { super }
ensure
@root._detach_context(token)
end
end

Expand All @@ -236,7 +243,7 @@ def init(outbound)

# @!visibility private
def execute(input)
_attach_context(Temporalio::Workflow.info.headers)
token = _attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span("RunWorkflow:#{Temporalio::Workflow.info.workflow_type}", kind: :server) do
super
ensure
Expand All @@ -246,11 +253,13 @@ def execute(input)
exception: $ERROR_INFO # steep:ignore
)
end
ensure
_detach_context(token)
end

# @!visibility private
def handle_signal(input)
_attach_context(Temporalio::Workflow.info.headers)
token = _attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"HandleSignal:#{input.signal}",
links: _links_from_headers(input.headers),
Expand All @@ -261,11 +270,13 @@ def handle_signal(input)
Workflow.completed_span("FailHandleSignal:#{input.signal}", kind: :internal, exception: e)
raise
end
ensure
_detach_context(token)
end

# @!visibility private
def handle_query(input)
_attach_context(Temporalio::Workflow.info.headers)
token = _attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"HandleQuery:#{input.query}",
links: _links_from_headers(input.headers),
Expand All @@ -282,11 +293,13 @@ def handle_query(input)
)
raise
end
ensure
_detach_context(token)
end

# @!visibility private
def validate_update(input)
_attach_context(Temporalio::Workflow.info.headers)
token = _attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"ValidateUpdate:#{input.update}",
attributes: { 'temporalUpdateID' => input.id },
Expand All @@ -305,11 +318,13 @@ def validate_update(input)
)
raise
end
ensure
_detach_context(token)
end

# @!visibility private
def handle_update(input)
_attach_context(Temporalio::Workflow.info.headers)
token = _attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"HandleUpdate:#{input.update}",
attributes: { 'temporalUpdateID' => input.id },
Expand All @@ -326,6 +341,8 @@ def handle_update(input)
)
raise
end
ensure
_detach_context(token)
end

# @!visibility private
Expand All @@ -341,6 +358,14 @@ def _attach_context(headers)
end
end

# @!visibility private
def _detach_context(context_token)
# See _attach_context above for why we have to disable scheduler even for these simple operations
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
@root._detach_context(context_token)
end
end

# @!visibility private
def _links_from_headers(headers)
# See _attach_context above for why we have to disable scheduler even for these simple operations
Expand Down
8 changes: 5 additions & 3 deletions temporalio/sig/temporalio/contrib/open_telemetry.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ module Temporalio
) -> void

def _apply_context_to_headers: (Hash[String, untyped] headers, ?context: untyped) -> void
def _attach_context: (Hash[String, untyped] headers) -> void
def _attach_context: (Hash[String, untyped] headers) -> Object?
def _detach_context: (Object? token) -> void
def _context_from_headers: (Hash[String, untyped] headers) -> untyped
def _with_started_span: [T] (
name: String,
Expand All @@ -29,7 +30,8 @@ module Temporalio
class WorkflowInbound < Worker::Interceptor::Workflow::Inbound
def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void

def _attach_context: (Hash[String, untyped] headers) -> void
def _attach_context: (Hash[String, untyped] headers) -> Object?
def _detach_context: (Object? token) -> void
def _links_from_headers: (Hash[String, untyped] headers) -> Array[untyped]
end

Expand Down Expand Up @@ -61,4 +63,4 @@ module Temporalio
end
end
end
end
end
24 changes: 24 additions & 0 deletions temporalio/test/contrib/open_telemetry_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def initialize(tracer)

def execute(scenario)
case scenario.to_sym
when :success_return_trace_id
OpenTelemetry::Trace.current_span.context.hex_trace_id
when :fail_first_attempt
raise 'Intentional activity failure' if Temporalio::Activity::Context.current.info.attempt == 1

Expand Down Expand Up @@ -49,6 +51,8 @@ def execute(scenario)
Temporalio::Contrib::OpenTelemetry::Workflow.with_completed_span('custom-can-span') do
raise Temporalio::Workflow::ContinueAsNewError, :complete
end
when :activity_success_return_trace_id
Temporalio::Workflow.execute_activity(TestActivity, :success_return_trace_id, start_to_close_timeout: 30)
when :activity_fail_benign
Temporalio::Workflow.execute_activity(TestActivity, :fail_benign, start_to_close_timeout: 30)
else
Expand Down Expand Up @@ -608,6 +612,26 @@ def test_illegal_calls_on_context
ContextCurrentPatch.do_illegal_thing = false
end

def test_otel_context_cleared
traced_wf_trace_id = nil
traced_act = trace_workflow(:activity_success_return_trace_id) do |handle|
traced_wf_trace_id = handle.result
end
assert !traced_act.children.empty?

untraced_wf_trace_id = nil
untraced_span = trace_workflow(
:activity_success_return_trace_id,
start_with_untraced_client: true,
check_root: false
) do |handle|
untraced_wf_trace_id = handle.result
end
assert_empty untraced_span.children

assert traced_wf_trace_id != untraced_wf_trace_id
end

ExpectedSpan = Data.define( # rubocop:disable Layout/ClassStructure
:name,
:children,
Expand Down
Loading