Implementing Tracing Functionality in Flow Execution (#1895)
# Description

This PR introduces tracing functionality in the flow execution process.
The primary change involves wrapping the original `_exec` function with
a new function, `_exec_with_trace`, which starts a new span, enriches it
with input and output data, and sets the span status. This enhancement
allows us to organize child spans into a flow-level trace, providing a
more comprehensive view of flow execution.

Key changes include:

- New Function - `_exec_with_trace`: This function is similar to
`_exec`, but includes additional tracing functionality. It starts a new
span, enriches it with input and output data, and sets the span status.
- Exception Handling: Exceptions are handled within the context of the
new function. In the event of an exception, the span status is set to
`StatusCode.ERROR`, and the exception is raised.
- Span Enrichment: The new function enriches the span with input and
output data, providing a more detailed view of the flow execution
process.
- Status Setting: The span status is set based on the execution of the
function. If the function executes successfully, the status is set to
`StatusCode.OK`. If an exception occurs, the status is set to
`StatusCode.ERROR`.

This enhancement will improve our ability to track and monitor the flow
execution process, aiding in debugging and performance optimization
efforts.
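
For illustration, here is a minimal sketch of the pattern described above. It assumes a plain OpenTelemetry tracer, a hypothetical `execute_fn` standing in for `_exec`, and `json.dumps` standing in for the real attribute serialization; it is not the PR's implementation.

```python
import json

from opentelemetry import trace
from opentelemetry.trace.status import StatusCode

tracer = trace.get_tracer(__name__)


def exec_with_trace(execute_fn, inputs: dict):
    """Illustrative only: wrap a line execution in a flow-level span."""
    with tracer.start_as_current_span("promptflow.flow") as span:
        # Mark the span as a flow-level span.
        span.set_attributes({"framework": "promptflow", "span_type": "Flow"})
        # Enrich the span with serialized inputs (simplified serialization).
        span.set_attribute("inputs", json.dumps(inputs, default=str))
        # Any exception raised here escapes the `with` block; the context
        # manager records it, marks the span as ERROR, and re-raises.
        result = execute_fn(inputs)
        # Enrich the span with the serialized output and mark success.
        span.set_attribute("output", json.dumps(result, default=str))
        span.set_status(StatusCode.OK)
        return result
```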

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
liucheng-ms authored Feb 1, 2024
1 parent 772bb0c commit 0222279
Showing 3 changed files with 84 additions and 8 deletions.
25 changes: 20 additions & 5 deletions src/promptflow/promptflow/_core/tracer.py
@@ -210,9 +210,8 @@ def enrich_span_with_trace(span, trace):
         span.set_attributes(
             {
                 "framework": "promptflow",
-                "span_type": trace.type,
+                "span_type": trace.type.value,
                 "function": trace.name,
-                "inputs": serialize_attribute(trace.inputs),
                 "node_name": get_node_name_from_context(),
             }
         )
@@ -222,6 +221,16 @@ def enrich_span_with_trace(span, trace):
         logging.warning(f"Failed to enrich span with trace: {e}")


+def enrich_span_with_input(span, input):
+    try:
+        serialized_input = serialize_attribute(input)
+        span.set_attribute("inputs", serialized_input)
+    except Exception as e:
+        logging.warning(f"Failed to enrich span with input: {e}")
+
+    return input
+
+
 def enrich_span_with_output(span, output):
     try:
         serialized_output = serialize_attribute(output)
@@ -234,9 +243,13 @@ def enrich_span_with_output(span, output):

 def serialize_attribute(value):
     """Serialize values that can be used as attributes in span."""
-    serializable = Tracer.to_serializable(value)
-    serialized_value = serialize(serializable)
-    return json.dumps(serialized_value, indent=2, default=default_json_encoder)
+    try:
+        serializable = Tracer.to_serializable(value)
+        serialized_value = serialize(serializable)
+        return json.dumps(serialized_value, indent=2, default=default_json_encoder)
+    except Exception as e:
+        logging.warning(f"Failed to serialize attribute: {e}")
+        return None


 def _traced(
@@ -291,6 +304,7 @@ async def wrapped(*args, **kwargs):
             # because we want to avoid long stack trace when hitting an exception.
             try:
                 Tracer.push(trace)
+                enrich_span_with_input(span, trace.inputs)
                 output = await func(*args, **kwargs)
                 enrich_span_with_output(span, output)
                 span.set_status(StatusCode.OK)
@@ -335,6 +349,7 @@ def wrapped(*args, **kwargs):
             # because we want to avoid long stack trace when hitting an exception.
             try:
                 Tracer.push(trace)
+                enrich_span_with_input(span, trace.inputs)
                 output = func(*args, **kwargs)
                 enrich_span_with_output(span, output)
                 span.set_status(StatusCode.OK)
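As a usage sketch (not part of this PR), the helpers added above can be exercised directly against a span. This assumes an OpenTelemetry tracer provider is already configured; the exact attribute name written by `enrich_span_with_output` is not visible in the hunks shown here, so treat it as an implementation detail.

```python
from opentelemetry import trace

# These names match the imports this PR adds in flow_executor.py.
from promptflow._core.tracer import enrich_span_with_input, enrich_span_with_output

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("promptflow.flow") as span:
    inputs = {"question": "What is the capital of France?"}
    # Serializes the inputs and stores them on the span under "inputs";
    # on serialization failure it only logs a warning.
    enrich_span_with_input(span, inputs)
    output = {"answer": "Paris"}
    # Serializes the output onto the span in the same fashion.
    enrich_span_with_output(span, output)
```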
1 change: 1 addition & 0 deletions src/promptflow/promptflow/contracts/trace.py
@@ -14,6 +14,7 @@ class TraceType(str, Enum):
     TOOL = "Tool"
     FUNCTION = "Function"
     LANGCHAIN = "LangChain"
+    FLOW = "Flow"


 @dataclass
66 changes: 63 additions & 3 deletions src/promptflow/promptflow/executor/flow_executor.py
@@ -13,6 +13,8 @@
 from types import GeneratorType
 from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple

+from opentelemetry.trace.status import StatusCode
+
 from promptflow._constants import LINE_NUMBER_KEY
 from promptflow._core._errors import NotSupported, UnexpectedError
 from promptflow._core.cache_manager import AbstractCacheManager
@@ -23,6 +25,7 @@
 from promptflow._core.run_tracker import RunTracker
 from promptflow._core.tool import STREAMING_OPTION_PARAMETER_ATTR
 from promptflow._core.tools_manager import ToolsManager
+from promptflow._core.tracer import enrich_span_with_input, enrich_span_with_output, open_telemetry_tracer
 from promptflow._utils.context_utils import _change_working_dir
 from promptflow._utils.execution_utils import (
     apply_default_value_for_input,
@@ -40,6 +43,7 @@
 from promptflow.contracts.flow import Flow, FlowInputDefinition, InputAssignment, InputValueType, Node
 from promptflow.contracts.run_info import FlowRunInfo, Status
 from promptflow.contracts.run_mode import RunMode
+from promptflow.contracts.trace import TraceType
 from promptflow.exceptions import PromptflowException
 from promptflow.executor import _input_assignment_parser
 from promptflow.executor._async_nodes_scheduler import AsyncNodesScheduler
@@ -632,7 +636,7 @@ def exec(self, inputs: dict, node_concurrency=DEFAULT_CONCURRENCY_FLOW) -> dict:
         """
         self._node_concurrency = node_concurrency
         inputs = apply_default_value_for_input(self._flow.inputs, inputs)
-        result = self._exec(inputs)
+        result = self._exec_with_trace(inputs)
         # TODO: remove this line once serving directly calling self.exec_line
         self._add_line_results([result])
         return result.output or {}
@@ -642,7 +646,7 @@ def _exec_in_thread(self, args) -> LineResult:
         thread_name = current_thread().name
         self._processing_idx[line_number] = thread_name
         self._run_tracker._activate_in_context()
-        results = self._exec(
+        results = self._exec_with_trace(
             inputs, run_id=run_id, line_number=line_number, variant_id=variant_id, validate_inputs=validate_inputs
         )
         self._run_tracker._deactivate_in_context()
@@ -701,7 +705,7 @@ def exec_line(
         # it is not set.
         run_id = run_id or str(uuid.uuid4())
         self._update_operation_context(run_id)
-        line_result = self._exec(
+        line_result = self._exec_with_trace(
             inputs,
             run_id=run_id,
             line_number=index,
@@ -744,6 +748,62 @@ def _get_node_referenced_flow_inputs(
                 node_referenced_flow_inputs[value.value] = flow_inputs[value.value]
         return node_referenced_flow_inputs

+    def _exec_with_trace(
+        self,
+        inputs: Mapping[str, Any],
+        run_id: Optional[str] = None,
+        line_number: Optional[int] = None,
+        variant_id: str = "",
+        validate_inputs: bool = False,
+        allow_generator_output: bool = False,
+    ) -> LineResult:
+        """Execute a line run with tracing.
+
+        This method is similar to `_exec`, but it also includes tracing functionality.
+        It starts a new span, enriches it with input and output data, and sets the span status.
+
+        Args:
+            inputs (Mapping): flow inputs
+            run_id: the id to identify the flow run
+            line_number: line number for batch inputs
+            variant_id: variant id for the line run
+            validate_inputs:
+                Flag to indicate if input validation is needed. It is used along with "_raise_ex" to
+                define whether an exception shall be raised if input validation (type check, etc.) fails.
+                The flag is True for flow run and False for bulk run by default.
+            allow_generator_output:
+                Flag to indicate if generator output is allowed.
+
+        Returns:
+            LineResult: Line run result
+        """
+        with open_telemetry_tracer.start_as_current_span("promptflow.flow") as span:
+            # initialize span
+            span.set_attributes(
+                {
+                    "framework": "promptflow",
+                    "span_type": TraceType.FLOW.value,
+                }
+            )
+            # enrich span with input
+            enrich_span_with_input(span, inputs)
+            # invoke
+            result = self._exec(
+                inputs,
+                run_id=run_id,
+                line_number=line_number,
+                variant_id=variant_id,
+                validate_inputs=validate_inputs,
+                allow_generator_output=allow_generator_output,
+            )
+            # extract output from result
+            output = result.output
+            # enrich span with output
+            enrich_span_with_output(span, output)
+            # set status
+            span.set_status(StatusCode.OK)
+            return result
+
     def _exec(
         self,
         inputs: Mapping[str, Any],
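Note that `_exec_with_trace` sets `StatusCode.OK` explicitly but has no explicit error branch; presumably it relies on the OpenTelemetry defaults for `start_as_current_span` (`record_exception=True`, `set_status_on_exception=True`), which record an escaping exception, set the span status to `ERROR`, and re-raise it, matching the exception handling described in the PR body. Below is a small self-contained check of that default behavior, using the OpenTelemetry SDK's in-memory exporter (not part of this PR).

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace.status import StatusCode

# Wire an in-memory exporter so finished spans can be inspected.
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

try:
    # No explicit try/except around the span body: the context manager records
    # the exception, sets the span status to ERROR, and re-raises it.
    with tracer.start_as_current_span("promptflow.flow"):
        raise ValueError("flow execution failed")
except ValueError:
    pass

finished = exporter.get_finished_spans()[0]
assert finished.status.status_code == StatusCode.ERROR
```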
