Skip to content

Commit

Permalink
HelloAgents App Host with xlang sample (#4395)
Browse files Browse the repository at this point in the history
* adds a python xlang sample and aspire code to host it
* fixes message delivery on dotnet runtime
  • Loading branch information
rysweet authored Dec 3, 2024
1 parent c062c51 commit 1c09de8
Show file tree
Hide file tree
Showing 28 changed files with 628 additions and 78 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived
await App.RuntimeApp!.WaitForShutdownAsync();
await app.WaitForShutdownAsync();

[TopicSubscription("HelloAgents")]
[TopicSubscription("agents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
Expand Down
1 change: 1 addition & 0 deletions dotnet/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Aspire.Hosting" Version="9.0.0" />
<PackageVersion Include="Aspire.Hosting.Python" Version="9.0.0" />
<PackageVersion Include="AspNetCore.Authentication.ApiKey" Version="8.0.1" />
<PackageVersion Include="Aspire.Azure.AI.OpenAI" Version="8.0.1-preview.8.24267.1" />
<PackageVersion Include="Aspire.Hosting.AppHost" Version="9.0.0" />
Expand Down
1 change: 1 addition & 0 deletions dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<ItemGroup>
<PackageReference Include="Aspire.Hosting.AppHost" />
<PackageReference Include="Aspire.Hosting" />
<PackageReference Include="Aspire.Hosting.Python" />
</ItemGroup>

<ItemGroup>
Expand Down
18 changes: 13 additions & 5 deletions dotnet/samples/Hello/Hello.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@

var builder = DistributedApplication.CreateBuilder(args);
var backend = builder.AddProject<Projects.Backend>("backend").WithExternalHttpEndpoints();
builder.AddProject<Projects.HelloAgent>("client")
var client = builder.AddProject<Projects.HelloAgent>("HelloAgentsDotNET")
.WithReference(backend)
.WithEnvironment("AGENT_HOST", $"{backend.GetEndpoint("https").Property(EndpointProperty.Url)}")
.WithEnvironment("AGENT_HOST", backend.GetEndpoint("https"))
.WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true")
.WaitFor(backend);

#pragma warning disable ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
// xlang is over http for now - in prod use TLS between containers
builder.AddPythonApp("HelloAgentsPython", "../../../../python/packages/autogen-core/samples/xlang/hello_python_agent", "hello_python_agent.py", "../../../../../.venv")
.WithReference(backend)
.WithEnvironment("AGENT_HOST", backend.GetEndpoint("http"))
.WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true")
.WithEnvironment("GRPC_DNS_RESOLVER", "native")
.WithOtlpExporter()
.WaitFor(client);
#pragma warning restore ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
using var app = builder.Build();

await app.StartAsync();
var url = backend.GetEndpoint("http").Url;
Console.WriteLine("Backend URL: " + url);

await app.WaitForShutdownAsync();
2 changes: 1 addition & 1 deletion dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Microsoft.Extensions.AI;

namespace Hello;
[TopicSubscription("HelloAgents")]
[TopicSubscription("agents")]
public class HelloAIAgent(
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
Expand Down
2 changes: 1 addition & 1 deletion dotnet/samples/Hello/HelloAIAgents/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

namespace Hello
{
[TopicSubscription("HelloAgents")]
[TopicSubscription("agents")]
public class HelloAgent(
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
Expand Down
21 changes: 8 additions & 13 deletions dotnet/samples/Hello/HelloAgent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,17 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// step 1: create in-memory agent runtime

// step 2: register HelloAgent to that agent runtime

// step 3: start the agent runtime

// step 4: send a message to the agent

// step 5: wait for the agent runtime to shutdown
var local = true;
if (Environment.GetEnvironmentVariable("AGENT_HOST") != null) { local = false; }
var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: true);
//var app = await AgentsApp.StartAsync();
}, local: local).ConfigureAwait(false);
await app.WaitForShutdownAsync();

namespace Hello
{
[TopicSubscription("HelloAgents")]
[TopicSubscription("agents")]
public class HelloAgent(
IAgentRuntime context, IHostApplicationLifetime hostApplicationLifetime,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase(
Expand Down Expand Up @@ -53,7 +45,10 @@ public async Task Handle(ConversationClosed item)
var goodbye = $"********************* {item.UserId} said {item.UserMessage} ************************";
var evt = new Output { Message = goodbye };
await PublishMessageAsync(evt).ConfigureAwait(true);
await PublishMessageAsync(new Shutdown()).ConfigureAwait(false);
if (Environment.GetEnvironmentVariable("STAY_ALIVE_ON_GOODBYE") != "true")
{
await PublishMessageAsync(new Shutdown()).ConfigureAwait(false);
}
}

public async Task Handle(Shutdown item)
Expand Down
2 changes: 1 addition & 1 deletion dotnet/samples/Hello/HelloAgentState/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Hello
{
[TopicSubscription("HelloAgents")]
[TopicSubscription("agents")]
public class HelloAgent(
IAgentRuntime context,
IHostApplicationLifetime hostApplicationLifetime,
Expand Down
43 changes: 43 additions & 0 deletions dotnet/samples/Hello/protos/agent_events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
syntax = "proto3";

package HelloAgents;

option csharp_namespace = "Microsoft.AutoGen.Abstractions";
message TextMessage {
string textMessage = 1;
string source = 2;
}
message Input {
string message = 1;
}
message InputProcessed {
string route = 1;
}
message Output {
string message = 1;
}
message OutputWritten {
string route = 1;
}
message IOError {
string message = 1;
}
message NewMessageReceived {
string message = 1;
}
message ResponseGenerated {
string response = 1;
}
message GoodBye {
string message = 1;
}
message MessageStored {
string message = 1;
}
message ConversationClosed {
string user_id = 1;
string user_message = 2;
}
message Shutdown {
string message = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ namespace Microsoft.AutoGen.Abstractions;

public static class MessageExtensions
{
private const string PROTO_DATA_CONTENT_TYPE = "application/x-protobuf";
public static CloudEvent ToCloudEvent<T>(this T message, string source) where T : IMessage
{
return new CloudEvent
{
ProtoData = Any.Pack(message),
Type = message.Descriptor.FullName,
Source = source,
Id = Guid.NewGuid().ToString()

Id = Guid.NewGuid().ToString(),
Attributes = { { "datacontenttype", new CloudEvent.Types.CloudEventAttributeValue { CeString = PROTO_DATA_CONTENT_TYPE } } }
};
}
public static T FromCloudEvent<T>(this CloudEvent cloudEvent) where T : IMessage, new()
Expand Down
49 changes: 38 additions & 11 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,50 @@ protected AgentBase(
runtime.AgentInstance = this;
this.EventTypes = eventTypes;
_logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger<AgentBase>();
var subscriptionRequest = new AddSubscriptionRequest
AddImplicitSubscriptionsAsync().AsTask().Wait();
Completion = Start();
}
internal Task Completion { get; }

private async ValueTask AddImplicitSubscriptionsAsync()
{
var topicTypes = new List<string>
{
this.AgentId.Type + ":" + this.AgentId.Key,
this.AgentId.Type
};

foreach (var topicType in topicTypes)
{
RequestId = Guid.NewGuid().ToString(),
Subscription = new Subscription
var subscriptionRequest = new AddSubscriptionRequest
{
TypeSubscription = new TypeSubscription
RequestId = Guid.NewGuid().ToString(),
Subscription = new Subscription
{
AgentType = this.AgentId.Type,
TopicType = this.AgentId.Type + "/" + this.AgentId.Key
TypeSubscription = new TypeSubscription
{
AgentType = this.AgentId.Type,
TopicType = topicType
}
}
};
// explicitly wait for this to complete
await _runtime.SendMessageAsync(new Message { AddSubscriptionRequest = subscriptionRequest }).ConfigureAwait(true);
}

// using reflection, find all methods that Handle<T> and subscribe to the topic T
var handleMethods = this.GetType().GetMethods().Where(m => m.Name == "Handle").ToList();
foreach (var method in handleMethods)
{
var eventType = method.GetParameters()[0].ParameterType;
var topic = EventTypes.EventsMap.FirstOrDefault(x => x.Value.Contains(eventType.Name)).Key;
if (topic != null)
{
Subscribe(nameof(topic));
}
};
_runtime.SendMessageAsync(new Message { AddSubscriptionRequest = subscriptionRequest }).AsTask().Wait();
Completion = Start();
}
internal Task Completion { get; }
}

}
internal Task Start()
{
var didSuppress = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ private async ValueTask RegisterAgentTypeAsync(string type, Type agentType, Canc
//var state = agentType.BaseType?.GetGenericArguments().First();
var topicTypes = agentType.GetCustomAttributes<TopicSubscriptionAttribute>().Select(t => t.Topic);

//TODO: do something with the response (like retry on error)
await WriteChannelAsync(new Message
{
RegisterAgentTypeRequest = new RegisterAgentTypeRequest
Expand All @@ -227,9 +228,47 @@ await WriteChannelAsync(new Message
//Events = { events }
}
}, cancellationToken).ConfigureAwait(false);

foreach (var topic in topicTypes)
{
var subscriptionRequest = new Message
{
AddSubscriptionRequest = new AddSubscriptionRequest
{
RequestId = Guid.NewGuid().ToString(),
Subscription = new Subscription
{
TypeSubscription = new TypeSubscription
{
AgentType = type,
TopicType = topic
}
}
}
};
await WriteChannelAsync(subscriptionRequest, cancellationToken).ConfigureAwait(true);
foreach (var e in events)
{
subscriptionRequest = new Message
{
AddSubscriptionRequest = new AddSubscriptionRequest
{
RequestId = Guid.NewGuid().ToString(),
Subscription = new Subscription
{
TypeSubscription = new TypeSubscription
{
AgentType = type,
TopicType = topic + "." + e
}
}
}
};
await WriteChannelAsync(subscriptionRequest, cancellationToken).ConfigureAwait(true);
}
}
}
}

// new is intentional
public new async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
{
Expand Down
Loading

0 comments on commit 1c09de8

Please sign in to comment.