From a6debe862aa56596fafb6561b292a755cdc74be7 Mon Sep 17 00:00:00 2001
From: Mohammad Mazraeh
Date: Sat, 30 Nov 2024 22:45:20 -0800
Subject: [PATCH 1/2] update to include create_stream

Signed-off-by: Mohammad Mazraeh
---
 .../samples/distributed-group-chat/README.md  |  3 +-
 .../samples/distributed-group-chat/_agents.py | 84 ++++++++++++++++---
 .../samples/distributed-group-chat/run.sh     |  4 +
 .../components/models/_model_client.py        |  1 +
 .../models/_openai/_openai_client.py          |  1 -
 .../models/_reply_chat_completion_client.py   |  1 +
 6 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/python/packages/autogen-core/samples/distributed-group-chat/README.md b/python/packages/autogen-core/samples/distributed-group-chat/README.md
index b4cf16583996..3aadbee5ff5b 100644
--- a/python/packages/autogen-core/samples/distributed-group-chat/README.md
+++ b/python/packages/autogen-core/samples/distributed-group-chat/README.md
@@ -9,7 +9,7 @@ This example runs a gRPC server using [WorkerAgentRuntimeHost](../../src/autogen
 
 ### Setup Python Environment
 
 1. Create a virtual environment as instructed in [README](../../../../../../../../README.md).
-2. Run `uv pip install chainlit` in the same virtual environment
+2. Run `uv pip install pydantic==2.10.1 chainlit` in the same virtual environment. We have to pin the pydantic version due to [this issue](https://github.com/Chainlit/chainlit/issues/1544).
 
 ### General Configuration
@@ -111,4 +111,3 @@ graph TD;
 
 ## TODO:
 
 - [ ] Properly handle chat restarts. It complains about group chat manager being already registered
-- [ ] Add streaming to the UI like [this example](https://docs.chainlit.io/advanced-features/streaming) when [this bug](https://github.com/microsoft/autogen/issues/4213) is resolved
diff --git a/python/packages/autogen-core/samples/distributed-group-chat/_agents.py b/python/packages/autogen-core/samples/distributed-group-chat/_agents.py
index 8ff935600bab..5f5c7e6ec932 100644
--- a/python/packages/autogen-core/samples/distributed-group-chat/_agents.py
+++ b/python/packages/autogen-core/samples/distributed-group-chat/_agents.py
@@ -1,6 +1,6 @@
 import asyncio
 import random
-from typing import Awaitable, Callable, List
+from typing import AsyncGenerator, Awaitable, Callable, List
 from uuid import uuid4
 
 from _types import GroupChatMessage, MessageChunk, RequestToSpeak, UIAgentConfig
@@ -14,6 +14,7 @@
     SystemMessage,
     UserMessage,
 )
+from autogen_core.components.models._types import CreateResult
 from rich.console import Console
 from rich.markdown import Markdown
 
@@ -51,21 +52,23 @@ async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext
         self._chat_history.append(
             UserMessage(content=f"Transferred to {self.id.type}, adopt the persona immediately.", source="system")
         )
-        completion = await self._model_client.create([self._system_message] + self._chat_history)
-        assert isinstance(completion.content, str)
-        self._chat_history.append(AssistantMessage(content=completion.content, source=self.id.type))
-
-        console_message = f"\n{'-'*80}\n**{self.id.type}**: {completion.content}"
-        self.console.print(Markdown(console_message))
-
-        await publish_message_to_ui_and_backend(
+        stream_output = self._model_client.create_stream(
+            messages=[self._system_message] + self._chat_history, max_consecutive_empty_chunk_tolerance=3
+        )
+        create_stream_result = await publish_message_stream_to_ui_and_backend(
             runtime=self,
             source=self.id.type,
-            user_message=completion.content,
+            stream_output=stream_output,
             ui_config=self._ui_config,
             group_chat_topic_type=self._group_chat_topic_type,
         )
+        if create_stream_result is not None:
+            self._chat_history.append(AssistantMessage(content=create_stream_result.content, source=self.id.type))
+
+            console_message = f"\n{'-'*80}\n**{self.id.type}**: {create_stream_result.content}"
+            self.console.print(Markdown(console_message))
+
 
 class GroupChatManager(RoutedAgent):
     def __init__(
@@ -168,12 +171,72 @@ async def handle_message_chunk(self, message: MessageChunk, ctx: MessageContext)
         await self._on_message_chunk_func(message)
 
 
+async def publish_message_stream_to_ui(
+    runtime: RoutedAgent | WorkerAgentRuntime,
+    source: str,
+    ui_config: UIAgentConfig,
+    stream_output: AsyncGenerator,
+) -> None:
+    """Publishes a stream of messages to the UI."""
+    message_id = str(uuid4())
+    async for chunk in stream_output:
+        if isinstance(chunk, str):
+            msg_chunk = MessageChunk(message_id=message_id, text=str(chunk), author=source, finished=False)
+
+            await runtime.publish_message(
+                msg_chunk,
+                DefaultTopicId(type=ui_config.topic_type),
+            )
+            await asyncio.sleep(random.uniform(ui_config.min_delay, ui_config.max_delay))
+        elif isinstance(chunk, CreateResult):
+            print("Ok, finished the message!")
+            await runtime.publish_message(
+                MessageChunk(message_id=message_id, text=" ", author=source, finished=True),
+                DefaultTopicId(type=ui_config.topic_type),
+            )
+
+
+async def publish_message_stream_to_ui_and_backend(
+    runtime: RoutedAgent | WorkerAgentRuntime,
+    source: str,
+    ui_config: UIAgentConfig,
+    group_chat_topic_type: str,
+    stream_output: AsyncGenerator,
+) -> None | CreateResult:
+    """Publishes a stream of messages to both the UI and backend."""
+    message_id = str(uuid4())
+    async for chunk in stream_output:
+        if isinstance(chunk, str):
+            msg_chunk = MessageChunk(message_id=message_id, text=str(chunk), author=source, finished=False)
+
+            await runtime.publish_message(
+                msg_chunk,
+                DefaultTopicId(type=ui_config.topic_type),
+            )
+            await asyncio.sleep(random.uniform(ui_config.min_delay, ui_config.max_delay))
+        elif isinstance(chunk, CreateResult):
+            print("Ok, finished the message!")
+            await runtime.publish_message(
+                MessageChunk(message_id=message_id, text=" ", author=source, finished=True),
+                DefaultTopicId(type=ui_config.topic_type),
+            )
+            # Publish message to backend
+            await runtime.publish_message(
+                GroupChatMessage(body=UserMessage(content=str(chunk.content), source=source)),
+                topic_id=DefaultTopicId(type=group_chat_topic_type),
+            )
+            return chunk
+
+    return None
+
+
 async def publish_message_to_ui(
     runtime: RoutedAgent | WorkerAgentRuntime,
     source: str,
     user_message: str,
     ui_config: UIAgentConfig,
 ) -> None:
+    """Publishes a single message to the UI."""
     message_id = str(uuid4())
 
     # Stream the message to UI
     message_chunks = (
@@ -200,6 +263,7 @@ async def publish_message_to_ui_and_backend(
     ui_config: UIAgentConfig,
     group_chat_topic_type: str,
 ) -> None:
+    """Publishes a single message to both the UI and backend."""
     # Publish messages for ui
     await publish_message_to_ui(
         runtime=runtime,
diff --git a/python/packages/autogen-core/samples/distributed-group-chat/run.sh b/python/packages/autogen-core/samples/distributed-group-chat/run.sh
index d4b8c1b1b6f4..0c732523f11e 100755
--- a/python/packages/autogen-core/samples/distributed-group-chat/run.sh
+++ b/python/packages/autogen-core/samples/distributed-group-chat/run.sh
@@ -1,5 +1,9 @@
 #!/bin/bash
 #
 # Start a new tmux session named 'distributed_group_chat'
+# These lines suppress the gRPC log noise described in https://stackoverflow.com/questions/78780089
+export GRPC_VERBOSITY=ERROR
+export GLOG_minloglevel=2
+
 tmux new-session -d -s distributed_group_chat
 #
 # Split the terminal into 2 vertical panes
diff --git a/python/packages/autogen-core/src/autogen_core/components/models/_model_client.py b/python/packages/autogen-core/src/autogen_core/components/models/_model_client.py
index 532bb2ea1258..1ff0aed295bf 100644
--- a/python/packages/autogen-core/src/autogen_core/components/models/_model_client.py
+++ b/python/packages/autogen-core/src/autogen_core/components/models/_model_client.py
@@ -45,6 +45,7 @@ def create_stream(
         json_output: Optional[bool] = None,
         extra_create_args: Mapping[str, Any] = {},
         cancellation_token: Optional[CancellationToken] = None,
+        max_consecutive_empty_chunk_tolerance: int = 0,
     ) -> AsyncGenerator[Union[str, CreateResult], None]: ...
 
     def actual_usage(self) -> RequestUsage: ...
diff --git a/python/packages/autogen-ext/src/autogen_ext/models/_openai/_openai_client.py b/python/packages/autogen-ext/src/autogen_ext/models/_openai/_openai_client.py
index 1d46ecc7e561..c9f6f1c070f7 100644
--- a/python/packages/autogen-ext/src/autogen_ext/models/_openai/_openai_client.py
+++ b/python/packages/autogen-ext/src/autogen_ext/models/_openai/_openai_client.py
@@ -573,7 +573,6 @@ async def create_stream(
         json_output: Optional[bool] = None,
         extra_create_args: Mapping[str, Any] = {},
         cancellation_token: Optional[CancellationToken] = None,
-        *,
         max_consecutive_empty_chunk_tolerance: int = 0,
     ) -> AsyncGenerator[Union[str, CreateResult], None]:
         """
diff --git a/python/packages/autogen-ext/src/autogen_ext/models/_reply_chat_completion_client.py b/python/packages/autogen-ext/src/autogen_ext/models/_reply_chat_completion_client.py
index 187dfdace14a..6b6494654eaa 100644
--- a/python/packages/autogen-ext/src/autogen_ext/models/_reply_chat_completion_client.py
+++ b/python/packages/autogen-ext/src/autogen_ext/models/_reply_chat_completion_client.py
@@ -160,6 +160,7 @@ async def create_stream(
         json_output: Optional[bool] = None,
         extra_create_args: Mapping[str, Any] = {},
         cancellation_token: Optional[CancellationToken] = None,
+        max_consecutive_empty_chunk_tolerance: int = 0,
     ) -> AsyncGenerator[Union[str, CreateResult], None]:
         """Return the next completion as a stream."""
         if self._current_index >= len(self.chat_completions):

From 4d14babca8a1c1b5c50448155ad8c09c7cbdfbca Mon Sep 17 00:00:00 2001
From: Mohammad Mazraeh
Date: Sat, 30 Nov 2024 23:10:53 -0800
Subject: [PATCH 2/2] fix mypy errors

Signed-off-by: Mohammad Mazraeh
---
 .../autogen-core/samples/distributed-group-chat/_agents.py | 6 +++---
 python/packages/autogen-core/tests/test_tool_agent.py      | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/python/packages/autogen-core/samples/distributed-group-chat/_agents.py b/python/packages/autogen-core/samples/distributed-group-chat/_agents.py
index 5f5c7e6ec932..fb8ba5b1b361 100644
--- a/python/packages/autogen-core/samples/distributed-group-chat/_agents.py
+++ b/python/packages/autogen-core/samples/distributed-group-chat/_agents.py
@@ -1,6 +1,6 @@
 import asyncio
 import random
-from typing import AsyncGenerator, Awaitable, Callable, List
+from typing import AsyncGenerator, Awaitable, Callable, List, Union
 from uuid import uuid4
 
 from _types import GroupChatMessage, MessageChunk, RequestToSpeak, UIAgentConfig
@@ -175,7 +175,7 @@ async def publish_message_stream_to_ui(
     runtime: RoutedAgent | WorkerAgentRuntime,
     source: str,
     ui_config: UIAgentConfig,
-    stream_output: AsyncGenerator,
+    stream_output: AsyncGenerator[Union[str, CreateResult], None],
 ) -> None:
     """Publishes a stream of messages to the UI."""
     message_id = str(uuid4())
@@ -201,7 +201,7 @@ async def publish_message_stream_to_ui_and_backend(
     source: str,
     ui_config: UIAgentConfig,
     group_chat_topic_type: str,
-    stream_output: AsyncGenerator,
+    stream_output: AsyncGenerator[Union[str, CreateResult], None],
 ) -> None | CreateResult:
     """Publishes a stream of messages to both the UI and backend."""
     message_id = str(uuid4())
diff --git a/python/packages/autogen-core/tests/test_tool_agent.py b/python/packages/autogen-core/tests/test_tool_agent.py
index bdbd3b96b724..f22603ceb07e 100644
--- a/python/packages/autogen-core/tests/test_tool_agent.py
+++ b/python/packages/autogen-core/tests/test_tool_agent.py
@@ -122,6 +122,7 @@ def create_stream(
         json_output: Optional[bool] = None,
         extra_create_args: Mapping[str, Any] = {},
         cancellation_token: Optional[CancellationToken] = None,
+        max_consecutive_empty_chunk_tolerance: int = 0,
     ) -> AsyncGenerator[Union[str, CreateResult], None]:
         raise NotImplementedError()
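
Usage note: together, these two patches add a `max_consecutive_empty_chunk_tolerance` parameter to the `create_stream` signatures and switch the distributed group chat sample to streaming, where `create_stream` yields incremental `str` chunks and finishes with a single `CreateResult`. The sketch below illustrates that consumption contract; the client construction, model name, and environment setup are assumptions for the example, not part of the patch.

```python
# Minimal sketch, assuming the import paths used in this patch and an
# OPENAI_API_KEY in the environment; "gpt-4o" is an illustrative model choice.
import asyncio

from autogen_core.components.models import UserMessage
from autogen_ext.models import OpenAIChatCompletionClient


async def main() -> None:
    client = OpenAIChatCompletionClient(model="gpt-4o")
    stream = client.create_stream(
        messages=[UserMessage(content="Write a haiku about group chats.", source="user")],
        # New parameter from this patch: tolerance for consecutive empty chunks
        # from the service (see the _openai_client.py hunk above).
        max_consecutive_empty_chunk_tolerance=3,
    )
    async for chunk in stream:
        if isinstance(chunk, str):
            # Incremental text, suitable for streaming into a UI.
            print(chunk, end="", flush=True)
        else:
            # The final chunk is a CreateResult carrying the full content and usage.
            print(f"\n\nfinish_reason={chunk.finish_reason}, usage={chunk.usage}")


asyncio.run(main())
```

The `str`-vs-`CreateResult` branch mirrors what `publish_message_stream_to_ui_and_backend` does in `_agents.py`: string chunks drive incremental UI updates, and the terminal `CreateResult` is what gets appended to chat history and published to the backend.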