Skip to content

Commit

Permalink
Added Initial structure for Data Analytics.
Browse files Browse the repository at this point in the history
Added basic Card Abandonment Rate calculations.
Added Market Basked Model analytics
Added Project for hosting Data Analytics and ESDB subscription management
Added the separate solution for Data Analytics example
Added configuration for CartAbandonmentRateAnalysis
Added Product Relationships analytics
Refactored CartAbandonmentRate to return summary type instead of event
Replaced redundant Shopping Cart class for CartProductItemsMatched event and used Dictionary instead
Added ElasticSearch read model to Data Analytics example
  • Loading branch information
oskardudycz committed Feb 17, 2023
1 parent a1f5d91 commit d775bac
Show file tree
Hide file tree
Showing 34 changed files with 1,749 additions and 1 deletion.
33 changes: 32 additions & 1 deletion EventSourcing.NetCore.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@


Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27130.2003
Expand Down Expand Up @@ -398,6 +398,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ECommerce", "Sample\CRUDToC
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ECommerce.Domain", "Sample\CRUDToCQRS\09-MultipleModules\ECommerce.Domain\ECommerce.Domain.csproj", "{4467D89A-E214-4266-B606-42FB6A5E3724}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DataAnalytics", "DataAnalytics", "{12188348-C513-4A55-A704-6AC0B52815BE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ECommerce", "Sample\EventStoreDB\DataAnalytics\ECommerce\ECommerce.csproj", "{D37BADD9-A7D8-4C24-9976-8BE2F1695E82}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MarketBasketAnalytics", "Sample\EventStoreDB\DataAnalytics\MarketBasketAnalytics\MarketBasketAnalytics.csproj", "{D25B8D85-9598-4323-8B71-66BE2F3F3F31}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DataAnalytics.Core", "Sample\EventStoreDB\DataAnalytics\DataAnalytics.Core\DataAnalytics.Core.csproj", "{C53D404F-F27A-4368-92B9-F2C4202B04FB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MarketBasketAnalytics.Api", "Sample\EventStoreDB\DataAnalytics\MarketBasketAnalytics.Api\MarketBasketAnalytics.Api.csproj", "{DE90D2E2-80F3-4C9A-A5E6-78A2684FEB96}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -884,6 +894,22 @@ Global
{4467D89A-E214-4266-B606-42FB6A5E3724}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4467D89A-E214-4266-B606-42FB6A5E3724}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4467D89A-E214-4266-B606-42FB6A5E3724}.Release|Any CPU.Build.0 = Release|Any CPU
{D37BADD9-A7D8-4C24-9976-8BE2F1695E82}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D37BADD9-A7D8-4C24-9976-8BE2F1695E82}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D37BADD9-A7D8-4C24-9976-8BE2F1695E82}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D37BADD9-A7D8-4C24-9976-8BE2F1695E82}.Release|Any CPU.Build.0 = Release|Any CPU
{D25B8D85-9598-4323-8B71-66BE2F3F3F31}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D25B8D85-9598-4323-8B71-66BE2F3F3F31}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D25B8D85-9598-4323-8B71-66BE2F3F3F31}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D25B8D85-9598-4323-8B71-66BE2F3F3F31}.Release|Any CPU.Build.0 = Release|Any CPU
{C53D404F-F27A-4368-92B9-F2C4202B04FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C53D404F-F27A-4368-92B9-F2C4202B04FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C53D404F-F27A-4368-92B9-F2C4202B04FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C53D404F-F27A-4368-92B9-F2C4202B04FB}.Release|Any CPU.Build.0 = Release|Any CPU
{DE90D2E2-80F3-4C9A-A5E6-78A2684FEB96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DE90D2E2-80F3-4C9A-A5E6-78A2684FEB96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DE90D2E2-80F3-4C9A-A5E6-78A2684FEB96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DE90D2E2-80F3-4C9A-A5E6-78A2684FEB96}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1045,6 +1071,11 @@ Global
{94EC4289-2B87-4EDB-B170-BE3EC5F78485} = {862DA47E-1A3D-4FD7-956B-8C9D9DF6D54F}
{D582B994-3E6C-40C9-ABC3-1E3057B52F69} = {94EC4289-2B87-4EDB-B170-BE3EC5F78485}
{4467D89A-E214-4266-B606-42FB6A5E3724} = {94EC4289-2B87-4EDB-B170-BE3EC5F78485}
{12188348-C513-4A55-A704-6AC0B52815BE} = {9C5C1CB3-B8CD-4CEE-A5F2-5983770C60BB}
{D37BADD9-A7D8-4C24-9976-8BE2F1695E82} = {12188348-C513-4A55-A704-6AC0B52815BE}
{D25B8D85-9598-4323-8B71-66BE2F3F3F31} = {12188348-C513-4A55-A704-6AC0B52815BE}
{C53D404F-F27A-4368-92B9-F2C4202B04FB} = {12188348-C513-4A55-A704-6AC0B52815BE}
{DE90D2E2-80F3-4C9A-A5E6-78A2684FEB96} = {12188348-C513-4A55-A704-6AC0B52815BE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A5F55604-2FF3-43B7-B657-4F18E6E95D3B}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace DataAnalytics.Core.BackgroundWorkers
{
public class BackgroundWorker: IHostedService
{
private Task? executingTask;
private CancellationTokenSource? cts;
private readonly ILogger<BackgroundWorker> logger;
private readonly Func<CancellationToken, Task> perform;

public BackgroundWorker(
ILogger<BackgroundWorker> logger,
Func<CancellationToken, Task> perform
)
{
this.logger = logger;
this.perform = perform;
}

public Task StartAsync(CancellationToken cancellationToken)
{
// Create a linked token so we can trigger cancellation outside of this token's cancellation
cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

executingTask = perform(cts.Token);

return executingTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
// Stop called without start
if (executingTask == null)
return;

// Signal cancellation to the executing method
cts?.Cancel();

// Wait until the issue completes or the stop token triggers
await Task.WhenAny(executingTask, Task.Delay(-1, cancellationToken));

// Throw if cancellation triggered
cancellationToken.ThrowIfCancellationRequested();

logger.LogInformation("Background worker stopped");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using DataAnalytics.Core.ElasticSearch;
using DataAnalytics.Core.Events;
using DataAnalytics.Core.EventStoreDB;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace DataAnalytics.Core
{
public static class Configuration
{
public static IServiceCollection AddCoreServices(
this IServiceCollection services,
IConfiguration configuration
) =>
services
.AddEventBus()
.AddEventStoreDB(configuration)
.AddElasticsearch(configuration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Polly" Version="7.2.2" />

<PackageReference Include="EventStore.Client.Grpc.Streams" Version="21.2.0" />

<PackageReference Include="NEST" Version="7.15.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Nest;

namespace DataAnalytics.Core.ElasticSearch
{
public class ElasticSearchConfig
{
public string Url { get; set; } = default!;
public string DefaultIndex { get; set; } = default!;
}

public static class ElasticSearchConfigExtensions
{
private const string DefaultConfigKey = "ElasticSearch";
public static IServiceCollection AddElasticsearch(
this IServiceCollection services, IConfiguration configuration, Action<ConnectionSettings>? config = null)
{
var elasticSearchConfig = configuration.GetSection(DefaultConfigKey).Get<ElasticSearchConfig>();

var settings = new ConnectionSettings(new Uri(elasticSearchConfig.Url))
.DefaultIndex(elasticSearchConfig.DefaultIndex);

config?.Invoke(settings);

var client = new ElasticClient(settings);

return services.AddSingleton<IElasticClient>(client);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Nest;

namespace DataAnalytics.Core.ElasticSearch
{
public static class ElasticSearchRepository
{

public static async Task<T?> Find<T>(this IElasticClient elasticClient, string id, CancellationToken ct)
where T: class =>
(await elasticClient.GetAsync<T>(id, ct: ct))?.Source;

public static async Task Upsert<T>(this IElasticClient elasticClient, string id, T entity, CancellationToken ct)
where T: class =>
await elasticClient.UpdateAsync<T>(id,
u => u.Doc(entity).Upsert(entity).Index(ToIndexName<T>()),
ct
);

private static readonly ConcurrentDictionary<Type, string> TypeNameMap = new();

private static string ToIndexName<TIndex>()
{
var indexType = typeof(TIndex);
return TypeNameMap.GetOrAdd(indexType, _ =>
{
var modulePrefix = indexType.Namespace!.Split(".").First();
return $"{modulePrefix}-{indexType.Name}".ToLower();
});
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using DataAnalytics.Core.Subscriptions;
using EventStore.Client;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace DataAnalytics.Core.EventStoreDB
{
public class EventStoreDBConfig
{
public string ConnectionString { get; set; } = default!;
}

public record EventStoreDBOptions(
bool UseInternalCheckpointing = true
);

public static class EventStoreDBConfigExtensions
{
private const string DefaultConfigKey = "EventStore";

public static IServiceCollection AddEventStoreDB(this IServiceCollection services, IConfiguration config, EventStoreDBOptions? options = null)
{
var eventStoreDBConfig = config.GetSection(DefaultConfigKey).Get<EventStoreDBConfig>();

services.AddSingleton(
new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)))
.AddTransient<EventStoreDBSubscriptionToAll, EventStoreDBSubscriptionToAll>();

if (options?.UseInternalCheckpointing != false)
{
services
.AddTransient<ISubscriptionCheckpointRepository, EventStoreDBSubscriptionCheckpointRepository>();
}

return services;
}
}
}
Loading

0 comments on commit d775bac

Please sign in to comment.