-
-
Notifications
You must be signed in to change notification settings - Fork 11
MVP proposal #28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lsfera
wants to merge
80
commits into
event-driven-io:master
Choose a base branch
from
lsfera:serialization
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
MVP proposal #28
Changes from 71 commits
Commits
Show all changes
80 commits
Select commit
Hold shift + click to select a range
ab10cc1
FS clean up
lsfera 977f203
suppress CS1591 (Missing XML comment for publicly visible type or me…
lsfera ad74d3c
made UseTable() optional
lsfera b59d386
enforce AOT
lsfera 71c1f14
rename IConsumes to IHandles
lsfera 86656ec
Use double quote
lsfera 438cf21
use ILKE. Replication slot name is forced to lcase
lsfera b24c01f
unused directive
lsfera b024c6f
remove static variables to enable multiple instances on same process
lsfera 8cc1fed
Added DependencyIbjection project
lsfera 380d06a
Added demo projrct for DI
lsfera acf5f4d
Enhanced logging
lsfera e918dfa
bumped version to 0.1.1
lsfera 2055fed
allow tableDescriptor access
lsfera eadba73
renamed vars
lsfera c097047
expose NpgsqlDataSource along with connection string - close #16
lsfera ef9ac95
rename extension method
lsfera 345cad6
move to files
lsfera 645ec1a
simplified di registration
lsfera ca28054
collapse project
lsfera f4cfe0c
rename folder
lsfera f9bf48f
mark as implicit usage
lsfera 7fff1b4
switch to prepared statement
lsfera f0c4160
dispose resources
lsfera 8db4187
explicit defaults
lsfera c446721
add PublisherOptions
lsfera e71999b
Expose shortcut for table validation when bootstrapping publisher
lsfera 2351af1
explain different available to publisher for validating table
lsfera 9d3249a
file renamed
lsfera ea6f7fe
renamed IHandler to IMessageHandler
lsfera 3887f14
first working version
lsfera 0cf7595
removed unused
lsfera 6809b02
files reorg
lsfera e20bfa9
marked classes as sealed
lsfera 9656b3b
Reviewed Atrtibutes class allowed only, not inherit
lsfera 06bd0a1
unused file
lsfera 41a96de
unused
lsfera 52b5567
expose singleton
lsfera 253d4d4
reify memoization
lsfera 678b2c8
formatting stuff
lsfera c60bc89
class renamed to PublisherOptions and SubscriberOptions. Move classes…
lsfera 7729fcd
Ensure subscriber default options
lsfera 5ae673e
rename `PublicationSetupOptions` to `PublicationOptions` and `Replica…
lsfera 85f520b
move classes to files
lsfera 20bc581
renamed files
lsfera a95cf77
table creation can be enforced when bootstrapping in both processes(p…
lsfera d3c5825
typo
lsfera f1d0961
Enforcing invariants on publisher/subscriber options builder
lsfera 910902a
updated satellite packages to latest
lsfera 5126bb9
simplify test
lsfera b236c91
additional examples
lsfera 49408fa
consumer lookup logic must fallback on willdcard
lsfera 376d748
use select pg_advisory_xact_lock to serialize access to message table…
lsfera 8794c4f
mime type is internally exposed for future extension towards binary d…
lsfera fa05588
renamed nethod Name => Named for table name
lsfera 7205446
tested table creation
lsfera ddbd7f6
simplified raw urn
lsfera 964b883
Provide untyped append method
lsfera 1de0a77
avoid usage checking on public method
lsfera 575c2b5
provide additional usage patterns
lsfera 44953bb
provide minimal dsl on typed consumer
lsfera 2726be9
move methodInfo registration at configuration time
lsfera c777721
enable processed data trace only on trace enabled logging level
lsfera b3d4d17
corrected IErrorProcessor signature to accept KoEnvelope Id
lsfera 8fd8041
added more publishing options
lsfera e56c3c1
embed ConsumeOptions with typed Consumes
lsfera cb460d1
Added EnableSubscriptionAutoHeal - see https://github.com/event-drive…
lsfera 80ee62c
narrowed scope
lsfera 35bc4ff
enabling cli invocation
lsfera 0cf3e48
set container name
lsfera 7e16bbb
added auto heal use case
lsfera 414538e
enforce single INamingPolicy instance per subscription
lsfera cebdd98
Added minimal ServiceWorker configuatrion with tests
lsfera 6cbcf1e
Added string routing capability and renamed to ([Message|Raw])RoutedB…
lsfera be963fb
move to source generated logger
lsfera 4741c1a
use string interpolation to enable automatic renaming on refactory
lsfera 321cc8b
use evtension method
lsfera cea26de
default on not found
lsfera 0db1a31
added test clause
lsfera 7968c6d
added ImTools
lsfera File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
#usage: .\scripts\autoheal.ps1 -SolutionFile .\Blumchen.sln -ComposeFile .\docker-compose.yml | ||
param( | ||
[string]$SolutionFile, | ||
[string]$ComposeFile | ||
) | ||
|
||
|
||
$env:DOCKER_CLI_HINTS=$false #disable docker hints | ||
Write-Host "Setup infrastrucure" | ||
|
||
try { | ||
|
||
start powershell { | ||
docker compose up; | ||
Read-Host; | ||
|
||
} | ||
|
||
Write-Host "Waiting for container readiness..." | ||
do | ||
{ | ||
Start-Sleep -s 5 | ||
$state=$(docker inspect db|ConvertFrom-Json).State | ||
$status=$state.Status | ||
$exitCode=$state.ExitCode | ||
$restart=$state.Restarting | ||
}Until(($status -eq "running") -and ($exitCode -eq 0) -and ($restart -eq $false)) | ||
|
||
Write-Host "...Done" | ||
|
||
Write-Host "Start subscriber" | ||
start powershell { | ||
dotnet run --project ./src/SubscriberWorker/SubscriberWorker.csproj | ||
Read-Host; | ||
} | ||
|
||
Write-Host "Publishing 10 messages to test the subscriptions are working properly: hit ENTER when done!" | ||
|
||
start powershell { | ||
dotnet run --project ./src/Publisher/Publisher.csproj -- -c 10 -t \"UserCreated|UserDeleted|UserModified\"; | ||
} | ||
|
||
Read-Host; | ||
|
||
Write-Host "Start massive insert to force wal segment creation..." | ||
start powershell { | ||
dotnet run --project ./src/Publisher/Publisher.csproj -- -c 800000 -t "UserSubscribed" | ||
} | ||
|
||
Write-Host "Wait for subscribers to auto heal on error...reporting on row insert" | ||
|
||
Start-Sleep -s 15 | ||
do | ||
{ | ||
docker exec -it db psql -h localhost -U postgres -w -c "select count(*) from outbox;" | ||
}Until(Read-Host "Enter to report on counting rows(another key to proceed when done)" "") | ||
|
||
Write-Host "Subscribers resiliency tested :-)" | ||
Write-Host "Publishing 10 messages to test the subscriptions are still working properly: hit ENTER when done!" | ||
|
||
start powershell { | ||
dotnet run --project ./src/Publisher/Publisher.csproj -- -c 10 -t \"UserCreated|UserDeleted|UserModified\" | ||
} | ||
Read-Host; | ||
|
||
Write-Host "We're done...: hit ENTER to shut down!" | ||
|
||
Read-Host; | ||
|
||
}catch { | ||
Write-Host "An error occurred:" | ||
Write-Host $_ | ||
} | ||
finally{ | ||
docker compose -f $ComposeFile down --rmi local | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
namespace Blumchen; | ||
|
||
public class ConfigurationException(string message): Exception(message); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
using Blumchen.Subscriptions.Replication; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Logging; | ||
|
||
#pragma warning disable IL2091 | ||
|
||
namespace Blumchen.DependencyInjection; | ||
|
||
public static class ServiceCollectionExtensions | ||
{ | ||
|
||
public static IServiceCollection AddBlumchen<T>( | ||
this IServiceCollection service, | ||
Func<IServiceProvider, IWorkerOptionsBuilder, IWorkerOptionsBuilder> workerOptions) | ||
where T : class, IMessageHandler => | ||
service | ||
.AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build()) | ||
.AddHostedService(provider => | ||
new Worker<T>(workerOptions(provider, new WorkerOptionsBuilder()).Build(), | ||
provider.GetRequiredService<ILogger<Worker<T>>>())); | ||
|
||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
using System.Collections.Concurrent; | ||
using Blumchen.Subscriptions; | ||
using Blumchen.Subscriptions.Replication; | ||
using Microsoft.Extensions.Hosting; | ||
using Microsoft.Extensions.Logging; | ||
|
||
namespace Blumchen.DependencyInjection; | ||
|
||
public class Worker<T>( | ||
WorkerOptions options, | ||
ILogger<Worker<T>> logger): BackgroundService where T : class, IMessageHandler | ||
{ | ||
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>"; | ||
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase); | ||
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) | ||
lsfera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); | ||
return; | ||
|
||
static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled) => | ||
(ll, enabled) switch | ||
{ | ||
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters), | ||
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), | ||
(LogLevel.Trace, true) => (logger, template, parameters) => logger.LogTrace(template, parameters), | ||
(LogLevel.Warning, true) => (logger, template, parameters) => logger.LogWarning(template, parameters), | ||
(LogLevel.Error, true) => (logger, template, parameters) => logger.LogError(template, parameters), | ||
(LogLevel.Critical, true) => (logger, template, parameters) => logger.LogCritical(template, parameters), | ||
(_, _) => (_, _, _) => { } | ||
}; | ||
} | ||
|
||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
{ | ||
await options.OuterPipeline.ExecuteAsync(async token => | ||
await options.InnerPipeline.ExecuteAsync(async ct => | ||
{ | ||
await using var subscription = new Subscription(); | ||
await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct) | ||
.GetAsyncEnumerator(ct); | ||
Notify(logger, LogLevel.Information, "{WorkerName} started", WorkerName); | ||
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested) | ||
Notify(logger, LogLevel.Trace, "{cursor.Current} processed", cursor.Current); | ||
}, token).ConfigureAwait(false), stoppingToken).ConfigureAwait(false); | ||
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
using Blumchen.Subscriber; | ||
using Blumchen.Subscriptions.Management; | ||
using Npgsql; | ||
using Npgsql.Replication; | ||
using Polly; | ||
|
||
namespace Blumchen.DependencyInjection; | ||
|
||
public record WorkerOptions( | ||
ISubscriberOptions SubscriberOptions, | ||
ResiliencePipeline OuterPipeline, | ||
ResiliencePipeline InnerPipeline); | ||
|
||
public interface IWorkerOptionsBuilder | ||
{ | ||
IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline); | ||
IWorkerOptionsBuilder Subscription(Func<OptionsBuilder, OptionsBuilder>? builder); | ||
WorkerOptions Build(); | ||
IWorkerOptionsBuilder EnableSubscriptionAutoHeal(); | ||
} | ||
|
||
internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder | ||
{ | ||
private ResiliencePipeline? _outerPipeline = default; | ||
private Func<string, string, ResiliencePipeline>? _innerPipelineFn = default; | ||
private Func<OptionsBuilder, OptionsBuilder>? _builder; | ||
|
||
public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline) | ||
{ | ||
_outerPipeline = resiliencePipeline; | ||
return this; | ||
}public IWorkerOptionsBuilder Subscription(Func<OptionsBuilder, OptionsBuilder>? builder) | ||
{ | ||
_builder = builder; | ||
return this; | ||
} | ||
|
||
public WorkerOptions Build() | ||
{ | ||
ArgumentNullException.ThrowIfNull(_outerPipeline); | ||
lsfera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ArgumentNullException.ThrowIfNull(_builder); | ||
var subscriberOptions = _builder(new OptionsBuilder()).Build(); | ||
return new(subscriberOptions, _outerPipeline, | ||
_innerPipelineFn?.Invoke(subscriberOptions.ReplicationOptions.SlotName,subscriberOptions.ConnectionStringBuilder.ConnectionString) ?? | ||
ResiliencePipeline.Empty | ||
); | ||
} | ||
|
||
public IWorkerOptionsBuilder EnableSubscriptionAutoHeal() | ||
lsfera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
_innerPipelineFn = (replicationSlotName, connectionString) => new ResiliencePipelineBuilder().AddRetry(new() | ||
{ | ||
ShouldHandle = | ||
new PredicateBuilder().Handle<PostgresException>(exception => | ||
exception.SqlState.Equals("55000", StringComparison.OrdinalIgnoreCase)), | ||
MaxRetryAttempts = int.MaxValue, | ||
OnRetry = async args => | ||
{ | ||
await using var conn = new LogicalReplicationConnection(connectionString); | ||
await conn.Open(args.Context.CancellationToken); | ||
await conn.ReCreate(replicationSlotName, args.Context.CancellationToken).ConfigureAwait(false); | ||
}, | ||
}).Build(); | ||
return this; | ||
} | ||
} | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.