diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index fd3fc02f62..ee37314721 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -189,8 +189,7 @@ class WorkflowEntry: ) try: - # run node - generator = node.run() + generator = cls._traced_node_run(node) except Exception as e: logger.exception( "error while running node, workflow_id=%s, node_id=%s, node_type=%s, node_version=%s", @@ -323,8 +322,7 @@ class WorkflowEntry: tenant_id=tenant_id, ) - # run node - generator = node.run() + generator = cls._traced_node_run(node) return node, generator except Exception as e: @@ -430,3 +428,26 @@ class WorkflowEntry: input_value = current_variable.value | input_value variable_pool.add([variable_node_id] + variable_key_list, input_value) + + @staticmethod + def _traced_node_run(node: Node) -> Generator[GraphNodeEventBase, None, None]: + """ + Wraps a node's run method with OpenTelemetry tracing and returns a generator. + """ + # Wrap node.run() with ObservabilityLayer hooks to produce node-level spans + layer = ObservabilityLayer() + layer.on_graph_start() + node.ensure_execution_id() + + def _gen(): + error: Exception | None = None + layer.on_node_run_start(node) + try: + yield from node.run() + except Exception as exc: + error = exc + raise + finally: + layer.on_node_run_end(node, error) + + return _gen()