From 02222794575a1f037daf0e7ef36a7ceeedefa6d3 Mon Sep 17 00:00:00 2001 From: Cheng Liu <51689021+liucheng-ms@users.noreply.github.com> Date: Thu, 1 Feb 2024 20:30:33 +0800 Subject: [PATCH] Implementing Tracing Functionality in Flow Execution (#1895) # Description This PR introduces tracing functionality in the flow execution process. The primary change involves wrapping the original `_exec` function with a new function, `_exec_with_trace`, which starts a new span, enriches it with input and output data, and sets the span status. This enhancement allows us to organize child spans into a flow-level trace, providing a more comprehensive view of flow execution. Key changes include: - New Function - `_exec_with_trace`: This function is similar to `_exec`, but includes additional tracing functionality. It starts a new span, enriches it with input and output data, and sets the span status. - Exception Handling: Exceptions are handled within the context of the new function. In the event of an exception, the span status is set to `StatusCode.ERROR`, and the exception is raised. - Span Enrichment: The new function enriches the span with input and output data, providing a more detailed view of the flow execution process. - Status Setting: The span status is set based on the execution of the function. If the function executes successfully, the status is set to `StatusCode.OK`. If an exception occurs, the status is set to `StatusCode.ERROR`. This enhancement will improve our ability to track and monitor the flow execution process, aiding in debugging and performance optimization efforts. # All Promptflow Contribution checklist: - [ ] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [ ] Title of the pull request is clear and informative. - [ ] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. --- src/promptflow/promptflow/_core/tracer.py | 25 +++++-- src/promptflow/promptflow/contracts/trace.py | 1 + .../promptflow/executor/flow_executor.py | 66 ++++++++++++++++++- 3 files changed, 84 insertions(+), 8 deletions(-) diff --git a/src/promptflow/promptflow/_core/tracer.py b/src/promptflow/promptflow/_core/tracer.py index d4ef8b5584f..5fa764a7dfb 100644 --- a/src/promptflow/promptflow/_core/tracer.py +++ b/src/promptflow/promptflow/_core/tracer.py @@ -210,9 +210,8 @@ def enrich_span_with_trace(span, trace): span.set_attributes( { "framework": "promptflow", - "span_type": trace.type, + "span_type": trace.type.value, "function": trace.name, - "inputs": serialize_attribute(trace.inputs), "node_name": get_node_name_from_context(), } ) @@ -222,6 +221,16 @@ def enrich_span_with_trace(span, trace): logging.warning(f"Failed to enrich span with trace: {e}") +def enrich_span_with_input(span, input): + try: + serialized_input = serialize_attribute(input) + span.set_attribute("inputs", serialized_input) + except Exception as e: + logging.warning(f"Failed to enrich span with input: {e}") + + return input + + def enrich_span_with_output(span, output): try: serialized_output = serialize_attribute(output) @@ -234,9 +243,13 @@ def enrich_span_with_output(span, output): def serialize_attribute(value): """Serialize values that can be used as attributes in span.""" - serializable = Tracer.to_serializable(value) - serialized_value = serialize(serializable) - return json.dumps(serialized_value, indent=2, default=default_json_encoder) + try: + serializable = Tracer.to_serializable(value) + serialized_value = serialize(serializable) + return json.dumps(serialized_value, indent=2, default=default_json_encoder) + except Exception as e: + logging.warning(f"Failed to serialize attribute: {e}") + return None def _traced( @@ -291,6 +304,7 @@ async def wrapped(*args, **kwargs): # because we want to avoid long stack trace when hitting an exception. try: Tracer.push(trace) + enrich_span_with_input(span, trace.inputs) output = await func(*args, **kwargs) enrich_span_with_output(span, output) span.set_status(StatusCode.OK) @@ -335,6 +349,7 @@ def wrapped(*args, **kwargs): # because we want to avoid long stack trace when hitting an exception. try: Tracer.push(trace) + enrich_span_with_input(span, trace.inputs) output = func(*args, **kwargs) enrich_span_with_output(span, output) span.set_status(StatusCode.OK) diff --git a/src/promptflow/promptflow/contracts/trace.py b/src/promptflow/promptflow/contracts/trace.py index 61fc4e3e928..80c95ade56e 100644 --- a/src/promptflow/promptflow/contracts/trace.py +++ b/src/promptflow/promptflow/contracts/trace.py @@ -14,6 +14,7 @@ class TraceType(str, Enum): TOOL = "Tool" FUNCTION = "Function" LANGCHAIN = "LangChain" + FLOW = "Flow" @dataclass diff --git a/src/promptflow/promptflow/executor/flow_executor.py b/src/promptflow/promptflow/executor/flow_executor.py index ffadcf6778b..dbb3e69ab12 100644 --- a/src/promptflow/promptflow/executor/flow_executor.py +++ b/src/promptflow/promptflow/executor/flow_executor.py @@ -13,6 +13,8 @@ from types import GeneratorType from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple +from opentelemetry.trace.status import StatusCode + from promptflow._constants import LINE_NUMBER_KEY from promptflow._core._errors import NotSupported, UnexpectedError from promptflow._core.cache_manager import AbstractCacheManager @@ -23,6 +25,7 @@ from promptflow._core.run_tracker import RunTracker from promptflow._core.tool import STREAMING_OPTION_PARAMETER_ATTR from promptflow._core.tools_manager import ToolsManager +from promptflow._core.tracer import enrich_span_with_input, enrich_span_with_output, open_telemetry_tracer from promptflow._utils.context_utils import _change_working_dir from promptflow._utils.execution_utils import ( apply_default_value_for_input, @@ -40,6 +43,7 @@ from promptflow.contracts.flow import Flow, FlowInputDefinition, InputAssignment, InputValueType, Node from promptflow.contracts.run_info import FlowRunInfo, Status from promptflow.contracts.run_mode import RunMode +from promptflow.contracts.trace import TraceType from promptflow.exceptions import PromptflowException from promptflow.executor import _input_assignment_parser from promptflow.executor._async_nodes_scheduler import AsyncNodesScheduler @@ -632,7 +636,7 @@ def exec(self, inputs: dict, node_concurrency=DEFAULT_CONCURRENCY_FLOW) -> dict: """ self._node_concurrency = node_concurrency inputs = apply_default_value_for_input(self._flow.inputs, inputs) - result = self._exec(inputs) + result = self._exec_with_trace(inputs) # TODO: remove this line once serving directly calling self.exec_line self._add_line_results([result]) return result.output or {} @@ -642,7 +646,7 @@ def _exec_in_thread(self, args) -> LineResult: thread_name = current_thread().name self._processing_idx[line_number] = thread_name self._run_tracker._activate_in_context() - results = self._exec( + results = self._exec_with_trace( inputs, run_id=run_id, line_number=line_number, variant_id=variant_id, validate_inputs=validate_inputs ) self._run_tracker._deactivate_in_context() @@ -701,7 +705,7 @@ def exec_line( # it is not set. run_id = run_id or str(uuid.uuid4()) self._update_operation_context(run_id) - line_result = self._exec( + line_result = self._exec_with_trace( inputs, run_id=run_id, line_number=index, @@ -744,6 +748,62 @@ def _get_node_referenced_flow_inputs( node_referenced_flow_inputs[value.value] = flow_inputs[value.value] return node_referenced_flow_inputs + def _exec_with_trace( + self, + inputs: Mapping[str, Any], + run_id: Optional[str] = None, + line_number: Optional[int] = None, + variant_id: str = "", + validate_inputs: bool = False, + allow_generator_output: bool = False, + ) -> LineResult: + """execute line run with trace + + This method is similar to `_exec`, but it also includes tracing functionality. + It starts a new span, enriches it with input and output data, and sets the span status. + + Args: + inputs (Mapping): flow inputs + run_id: the id to identify the flow run + line_number: line number for batch inputs + variant_id: variant id for the line run + validate_inputs: + Flag to indicate if input validation needed. It is used along with "_raise_ex" to + define if exception shall be raised if inputs validation (type check, etc) failed + The flag is True for Flow Run, False for bulk run as default + allow_generator_output: + Flag to indicate if generator output is allowed. + + Returns: + LineResult: Line run result + """ + with open_telemetry_tracer.start_as_current_span("promptflow.flow") as span: + # initialize span + span.set_attributes( + { + "framework": "promptflow", + "span_type": TraceType.FLOW.value, + } + ) + # enrich span with input + enrich_span_with_input(span, inputs) + # invoke + result = self._exec( + inputs, + run_id=run_id, + line_number=line_number, + variant_id=variant_id, + validate_inputs=validate_inputs, + allow_generator_output=allow_generator_output, + ) + # extract output from result + output = result.output + # enrich span with output + enrich_span_with_output(span, output) + # set status + span.set_status(StatusCode.OK) + return result + def _exec( self, inputs: Mapping[str, Any],