[Bug] Requeue error with DeferMessageAction using MessagingGateway.MsSql #3124

Open
rene-mandel opened this issue May 31, 2024 · 4 comments
Labels: 1 - Up Next, Bug, v9 (Required for v9 release)

Comments

rene-mandel commented May 31, 2024

Describe the bug

System.ArgumentException: An item with the same key has already been added. Key: deliveryTag is thrown when DeferMessageAction is used with MessagingGateway.MsSql.

To Reproduce

Set MsSqlSubscription.RequeueCount to a value greater than 2 and use DeferMessageAction to requeue a message.

Exceptions (if any)

   at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
   at System.Collections.Generic.Dictionary`2.Add(TKey key, TValue value)
   at Paramore.Brighter.Serialization.DictionaryStringObjectJsonConverter.Read(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options) in /_/src/Paramore.Brighter/Serialization/DictionaryStringObjectJsonConverter.cs:line 44
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
   at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.Converters.SmallObjectWithParameterizedConstructorConverter`5.TryRead[TArg](ReadStack& state, Utf8JsonReader& reader, JsonParameterInfo jsonParameterInfo, TArg& arg)
   at System.Text.Json.Serialization.Converters.SmallObjectWithParameterizedConstructorConverter`5.ReadAndCacheConstructorArgument(ReadStack& state, Utf8JsonReader& reader, JsonParameterInfo jsonParameterInfo)
   at System.Text.Json.Serialization.Converters.ObjectWithParameterizedConstructorConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo`1 jsonTypeInfo, Nullable`1 actualByteCount)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 json, JsonTypeInfo`1 jsonTypeInfo)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](String json, JsonSerializerOptions options)
   at Paramore.Brighter.MessagingGateway.MsSql.SqlQueues.MsSqlMessageQueue`1.TryReceive(String topic) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs:line 133
   at Paramore.Brighter.MessagingGateway.MsSql.SqlQueues.MsSqlMessageQueue`1.TryReceive(String topic, Int32 timeoutInMilliseconds) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs:line 100
   at Paramore.Brighter.MessagingGateway.MsSql.MsSqlMessageConsumer.Receive(Int32 timeoutInMilliseconds) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/MsSqlMessageConsumer.cs:line 39
   at Paramore.Brighter.Channel.Receive(Int32 timeoutinMilliseconds) in /_/src/Paramore.Brighter/Channel.cs:line 120
   at Paramore.Brighter.ServiceActivator.MessagePump`1.Run() in /_/src/Paramore.Brighter.ServiceActivator/MessagePump.cs:line 106
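
The trace bottoms out in Dictionary`2.Add inside DictionaryStringObjectJsonConverter.Read, which suggests the stored header bag holds deliveryTag more than once by the time the message is read back. How the duplicate gets written is not established here. The following is a minimal sketch of that failure mode only, not Brighter's converter: the DuplicateKeySketch class, its method names, and the sample JSON are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class DuplicateKeySketch
{
    // Hypothetical header bag in which "deliveryTag" appears twice.
    public const string HeaderBagJson = "{\"deliveryTag\": 1, \"deliveryTag\": 2}";

    // Same pattern as the failing frame: Dictionary.Add throws
    // ArgumentException when the key is already present.
    public static Dictionary<string, object> ReadWithAdd(string json)
    {
        var bag = new Dictionary<string, object>();
        using var doc = JsonDocument.Parse(json);
        foreach (JsonProperty property in doc.RootElement.EnumerateObject())
        {
            // Clone so the value outlives the disposed JsonDocument.
            bag.Add(property.Name, property.Value.Clone());
        }
        return bag;
    }

    // Tolerant variant (an assumption, not the project's fix):
    // the indexer overwrites, so the last duplicate wins.
    public static Dictionary<string, object> ReadWithIndexer(string json)
    {
        var bag = new Dictionary<string, object>();
        using var doc = JsonDocument.Parse(json);
        foreach (JsonProperty property in doc.RootElement.EnumerateObject())
        {
            bag[property.Name] = property.Value.Clone();
        }
        return bag;
    }
}
```

Calling ReadWithAdd(HeaderBagJson) raises an ArgumentException for the duplicate key, while ReadWithIndexer(HeaderBagJson) returns a single deliveryTag entry holding the last value. Whether last-write-wins is the right behaviour for Brighter, or whether the duplicate should never be written in the first place, is a question for the maintainers.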

Further technical details

@rene-mandel rene-mandel changed the title Requeue error with DeferMessageAction using MessagingGateway.MsSql [Bug] Requeue error with DeferMessageAction using MessagingGateway.MsSql May 31, 2024
@iancooper
Member

@rene-mandel Can we just check your use case for MS-SQL as a message provider? We are looking into whether the main use case is local development, since we might recommend RMQ for testing if you can't access your production provider.

@rene-mandel
Author

@iancooper

I'll provide you with details on how this is currently set up. I hope this helps.

private static IServiceCollection AddMessagingSystem(this IServiceCollection services, IConfiguration configuration)
{
    services.AddSingleton<IMessagingSystem, MessagingSystem>();

    MessagingSettings messagingSettings = new();
    configuration.Bind(MessagingSettings.Section, messagingSettings);
    var messagingConfiguration = new MsSqlConfiguration(connectionString: messagingSettings.SqlConnectionString, queueStoreTable: "QueueData");

    AddMessageSender(services, messagingConfiguration);
    // Receiver is a hosted service
    AddMessageReceiver(services, messagingConfiguration);

    return services;
}

private static void AddMessageSender(IServiceCollection services, MsSqlConfiguration messagingConfiguration)
{
    var publication = new Publication
    {
        MakeChannels = OnMissingChannel.Create,
        Topic = new RoutingKey($"{nameof(DemoMessage)}.topic"),
    };

    var producerRegistry = new MsSqlProducerRegistryFactory(
            messagingConfiguration,
            [publication]
        )
        .Create();

    services.AddBrighter(options =>
    {
        options.HandlerLifetime = ServiceLifetime.Scoped;
    })
    .UseExternalBus(producerRegistry)
    .AutoFromAssemblies();
}

private static void AddMessageReceiver(IServiceCollection services, MsSqlConfiguration messagingConfiguration)
{
    var subscriptions = new MsSqlSubscription[]
    {
        new MsSqlSubscription<DemoMessage>(
            new SubscriptionName($"{nameof(DemoMessage)}.subscription"),
            new ChannelName($"{nameof(DemoMessage)}.topic"),
            new RoutingKey($"{nameof(DemoMessage)}.topic"),
            runAsync: true,
            requeueCount: 5, // Problematic setting: any value greater than 2 triggers the exception
            requeueDelayInMilliseconds: 5000
        )
    };

    var messageConsumerFactory = new MsSqlMessageConsumerFactory(messagingConfiguration);

    services.AddServiceActivator(options =>
    {
        options.Subscriptions = subscriptions;
        options.ChannelFactory = new ChannelFactory(messageConsumerFactory);
        options.HandlerLifetime = ServiceLifetime.Scoped;
        options.UseScoped = true;
    })
    .AutoFromAssemblies();

    services.AddHostedService<ServiceActivatorHostedService>();
}

@iancooper
Member

@rene-mandel Is your use case that you are using MSSQL as your primary message broker, rather than as a local alternative to a cloud service? It would help us to understand how this feature is used.

Thanks for the code though, it should help.

@rene-mandel
Author

@iancooper Yes, MSSQL is used as the primary message broker, in all environments.

@iancooper iancooper self-assigned this Jun 21, 2024
@iancooper iancooper added the v9 Required for v9 release label Jul 10, 2024