feat: add inital in memory dispatcher
Some checks failed
default / dotnet-default-workflow (push) Failing after 1m52s

Add a simple in memory dispatcher for scalar requests and stream request.
This commit is contained in:
Louis Seubert 2026-05-08 20:26:26 +02:00
commit fff952a385
Signed by: louis9902
GPG key ID: 4B9DB28F826553BD
37 changed files with 3065 additions and 0 deletions

View file

@ -0,0 +1,9 @@
[*.{cs,vb}]
# disable CA1822: Mark members as static
# -> TUnit requiring instance methods for test cases
dotnet_diagnostic.CA1822.severity = none
# disable CA1707: Identifiers should not contain underscores
dotnet_diagnostic.CA1707.severity = none
# disable IDE0060: Remove unused parameter
dotnet_diagnostic.IDE0060.severity = none

View file

@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<PropertyGroup>
<RootNamespace>Geekeey.Request</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TUnit" />
<!-- additional packages -->
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverageAttribute" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Request\Geekeey.Request.csproj" />
</ItemGroup>
</Project>

View file

@ -0,0 +1,140 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Geekeey.Request.Tests;
internal sealed class RequestDispatcherBuilderExtensionsTests
{
[Test]
public async Task I_can_add_a_type_and_register_the_options()
{
// Arrange
var services = new ServiceCollection();
var builder = services.AddRequestDispatcher();
var type = typeof(TestHandler);
// Act
builder.Add(type);
// Assert
var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
var handlers = options.GetRequestHandlers<IScalarRequestHandler<TestRequest, string>>(provider);
await Assert.That(handlers).Count().IsEqualTo(1);
await Assert.That(handlers.First()).IsTypeOf<TestHandler>();
}
[Test]
public async Task I_can_add_a_type_with_a_lifetime_and_register_the_options_and_service()
{
// Arrange
var services = new ServiceCollection();
var builder = services.AddRequestDispatcher();
var type = typeof(TestHandler);
var lifetime = ServiceLifetime.Scoped;
// Act
builder.Add(type, lifetime);
// Assert
var serviceDescriptor = services.FirstOrDefault(sd => sd.ServiceType == type);
await Assert.That(serviceDescriptor).IsNotNull();
await Assert.That(serviceDescriptor.Lifetime).IsEqualTo(lifetime);
await Assert.That(serviceDescriptor.ImplementationType).IsEqualTo(type);
var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
var handlers = options.GetRequestHandlers<IScalarRequestHandler<TestRequest, string>>(provider);
await Assert.That(handlers).Count().IsEqualTo(1);
}
[Test]
public async Task I_can_add_an_enumerable_of_types_and_register_the_options()
{
// Arrange
var services = new ServiceCollection();
var builder = services.AddRequestDispatcher();
var types = new[] { typeof(TestHandler), typeof(AnotherTestHandler) };
// Act
builder.Add(types);
// Assert
var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
var handlers1 = options.GetRequestHandlers<IScalarRequestHandler<TestRequest, string>>(provider);
await Assert.That(handlers1).Count().IsEqualTo(1);
await Assert.That(handlers1.First()).IsTypeOf<TestHandler>();
var handlers2 = options.GetRequestHandlers<IScalarRequestHandler<AnotherTestRequest, string>>(provider);
await Assert.That(handlers2).Count().IsEqualTo(1);
await Assert.That(handlers2.First()).IsTypeOf<AnotherTestHandler>();
}
[Test]
public async Task I_can_add_an_enumerable_of_types_with_a_lifetime_and_register_the_options_and_services()
{
// Arrange
var services = new ServiceCollection();
var builder = services.AddRequestDispatcher();
var types = new[] { typeof(TestHandler), typeof(AnotherTestHandler) };
var lifetime = ServiceLifetime.Singleton;
// Act
builder.Add(types, lifetime);
// Assert
foreach (var type in types)
{
var serviceDescriptor = services.FirstOrDefault(sd => sd.ServiceType == type);
await Assert.That(serviceDescriptor).IsNotNull();
await Assert.That(serviceDescriptor.Lifetime).IsEqualTo(lifetime);
}
var provider = services.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
await Assert.That(options.GetRequestHandlers<IScalarRequestHandler<TestRequest, string>>(provider)).Count().IsEqualTo(1);
await Assert.That(options.GetRequestHandlers<IScalarRequestHandler<AnotherTestRequest, string>>(provider)).Count().IsEqualTo(1);
}
[Test]
public async Task I_can_see_it_throw_when_the_builder_is_null()
{
IRequestDispatcherBuilder builder = null!;
using (Assert.Multiple())
{
await Assert.That(() => builder.Add(typeof(TestHandler))).Throws<ArgumentNullException>();
await Assert.That(() => builder.Add(typeof(TestHandler), ServiceLifetime.Transient)).Throws<ArgumentNullException>();
await Assert.That(() => builder.Add([typeof(TestHandler)])).Throws<ArgumentNullException>();
await Assert.That(() => builder.Add([typeof(TestHandler)], ServiceLifetime.Transient)).Throws<ArgumentNullException>();
}
}
private sealed class TestRequest : IScalarRequest<string>;
private sealed class TestHandler : IScalarRequestHandler<TestRequest, string>
{
public Task<string> HandleAsync(TestRequest request, CancellationToken ct)
{
return Task.FromResult("ok");
}
}
private sealed class AnotherTestRequest : IScalarRequest<string>;
private sealed class AnotherTestHandler : IScalarRequestHandler<AnotherTestRequest, string>
{
public Task<string> HandleAsync(AnotherTestRequest request, CancellationToken ct)
{
return Task.FromResult("ok");
}
}
}

View file

@ -0,0 +1,206 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
namespace Geekeey.Request.Tests;
internal sealed class ScalarBehaviourTests
{
[Test]
public async Task I_can_execute_the_closed_behaviour()
{
var sc = new ServiceCollection();
sc.AddSingleton<ScalarTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ScalarTestHandler))
.Add(typeof(ScalarTestBehavior)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<ScalarTestTracker>();
var request = new ScalarTestRequest { Value = "Hello" };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Hello-Handled");
await Assert.That(tracker.Executed).IsTrue();
}
[Test]
public async Task I_can_execute_the_open_behaviour()
{
var sc = new ServiceCollection();
sc.AddSingleton<ScalarTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ScalarTestHandler))
.Add(typeof(ScalarOpenBehavior<,>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<ScalarTestTracker>();
var request = new ScalarTestRequest { Value = "Hello" };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Hello-Handled");
await Assert.That(tracker.Executed).IsTrue();
}
[Test]
public async Task I_can_chain_the_behaviours_in_order()
{
var sc = new ServiceCollection();
sc.AddSingleton<ScalarTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ScalarTestHandler))
.Add(typeof(ScalarChainedBehaviour1))
.Add(typeof(ScalarChainedBehaviour2)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<ScalarTestTracker>();
var request = new ScalarTestRequest { Value = "Hello" };
await dispatcher.DispatchAsync(request);
// They are discovered in the order they appear in the assembly.
// In this file: ChainedBehaviour1, ChainedBehaviour2
await Assert.That(tracker.Log).Count().IsEqualTo(2);
await Assert.That(tracker.Log[0]).IsEquivalentTo("Behaviour1");
await Assert.That(tracker.Log[1]).IsEquivalentTo("Behaviour2");
}
[Test]
public async Task I_can_work_with_a_generic_wrapper_request_and_the_open_behaviour()
{
var sc = new ServiceCollection();
sc.AddSingleton<ScalarTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ScalarTestWrapperHandler<>))
.Add(typeof(ScalarWrapperBehavior<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<ScalarTestTracker>();
var request = new ScalarTestWrapperRequest<int> { Item = 42 };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Handled-42");
await Assert.That(tracker.Executed).IsTrue();
}
[Test]
public async Task I_can_maintain_the_ordering_between_open_and_closed_behaviours()
{
var sc = new ServiceCollection();
sc.AddSingleton<ScalarTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ScalarTestHandler))
.Add(typeof(ScalarOrderingOpenBehavior<,>))
.Add(typeof(ScalarOrderingClosedBehavior)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<ScalarTestTracker>();
var request = new ScalarTestRequest { Value = "Order" };
await dispatcher.DispatchAsync(request);
await Assert.That(tracker.Log).Contains("OrderingOpen");
await Assert.That(tracker.Log).Contains("OrderingClosed");
}
}
public class ScalarTestTracker
{
public List<string> Log { get; } = [];
public bool Executed { get; set; }
}
public class ScalarOrderingOpenBehavior<TRequest, TOutput>(ScalarTestTracker tracker) : IScalarRequestBehavior<TRequest, TOutput>
where TRequest : IScalarRequest<TOutput>
{
public Task<TOutput> HandleAsync(TRequest request, ScalarHandlerDelegate<TOutput> next, CancellationToken cancellationToken)
{
tracker.Log.Add("OrderingOpen");
return next(request, cancellationToken);
}
}
public class ScalarOrderingClosedBehavior(ScalarTestTracker tracker) : IScalarRequestBehavior<ScalarTestRequest, string>
{
public Task<string> HandleAsync(ScalarTestRequest request, ScalarHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Log.Add("OrderingClosed");
return next(request, cancellationToken);
}
}
public class ScalarTestRequest : IScalarRequest<string>
{
public string Value { get; set; } = string.Empty;
}
public class ScalarTestHandler : IScalarRequestHandler<ScalarTestRequest, string>
{
public Task<string> HandleAsync(ScalarTestRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"{request.Value}-Handled");
}
}
public class ScalarTestBehavior(ScalarTestTracker tracker) : IScalarRequestBehavior<ScalarTestRequest, string>
{
public async Task<string> HandleAsync(ScalarTestRequest request, ScalarHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Executed = true;
return await next(request, cancellationToken);
}
}
public class ScalarOpenBehavior<TRequest, TResponse>(ScalarTestTracker tracker) : IScalarRequestBehavior<TRequest, TResponse>
where TRequest : IScalarRequest<TResponse>
{
public async Task<TResponse> HandleAsync(TRequest request, ScalarHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
tracker.Executed = true;
return await next(request, cancellationToken);
}
}
public class ScalarChainedBehaviour1(ScalarTestTracker tracker) : IScalarRequestBehavior<ScalarTestRequest, string>
{
public async Task<string> HandleAsync(ScalarTestRequest request, ScalarHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Log.Add("Behaviour1");
return await next(request, cancellationToken);
}
}
public class ScalarChainedBehaviour2(ScalarTestTracker tracker) : IScalarRequestBehavior<ScalarTestRequest, string>
{
public async Task<string> HandleAsync(ScalarTestRequest request, ScalarHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Log.Add("Behaviour2");
return await next(request, cancellationToken);
}
}
public class ScalarTestWrapperRequest<T> : IScalarRequest<string>
{
public T Item { get; set; } = default!;
}
public class ScalarTestWrapperHandler<T> : IScalarRequestHandler<ScalarTestWrapperRequest<T>, string>
{
public Task<string> HandleAsync(ScalarTestWrapperRequest<T> request, CancellationToken cancellationToken)
{
return Task.FromResult($"Handled-{request.Item}");
}
}
public class ScalarWrapperBehavior<T>(ScalarTestTracker tracker) : IScalarRequestBehavior<ScalarTestWrapperRequest<T>, string>
{
public async Task<string> HandleAsync(ScalarTestWrapperRequest<T> request, ScalarHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Executed = true;
return await next(request, cancellationToken);
}
}

View file

@ -0,0 +1,415 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Geekeey.Request.Tests;
internal sealed class ScalarDispatcherTests
{
[Test]
public async Task I_can_dispatch_a_request_async_with_an_open_generic_handler()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new OpenScalarRequest { Data = "Hello" };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Hello-Handled");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_open_generic_handler_that_has_constraints()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ConstrainedScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new ConstrainedScalarRequest { Value = 123 };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("123-Constrained");
}
[Test]
public async Task I_can_see_it_fail_if_no_handler_is_found_even_with_an_open_generic_available()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new UnhandledScalarRequest();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await dispatcher.DispatchAsync(request));
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_inherited_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
// InheritedScalarRequest : OpenScalarRequest.
// There is no Handler<InheritedScalarRequest> but there is OpenScalarHandler<TRequest> where TRequest : OpenScalarRequest.
// It should be able to handle InheritedScalarRequest.
var request = new InheritedScalarRequest { Data = "Sub" };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Sub-Handled");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_inherited_handler()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(DerivedScalarHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new DerivedScalarRequest { Value = 42 };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Derived: 42");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_interface_inherited_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(InterfaceInheritedScalarHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new InterfaceInheritedScalarRequest { Name = "InterfaceTest" };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("InterfaceTest-InterfaceHandled");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_deep_inheritance_in_the_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new DeepDerivedScalarRequest { Data = "Deep", DeepValue = 99 };
var result = await dispatcher.DispatchAsync(request);
// OpenScalarHandler<TRequest> where TRequest : OpenScalarRequest should handle this
await Assert.That(result).IsEquivalentTo("Deep-Handled");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_interface_constrained_handler()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(InterfaceInheritedScalarHandler))
.Add(typeof(InterfaceConstrainedScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new InterfaceInheritedScalarRequest { Name = "Constrained" };
var result = await dispatcher.DispatchAsync(request);
// Both InterfaceInheritedScalarHandler and InterfaceConstrainedScalarHandler could match.
// InterfaceInheritedScalarHandler is a concrete match for InterfaceInheritedScalarRequest.
// InterfaceConstrainedScalarHandler is an open generic match.
// Currently Dispatcher.SendAsync checks concrete handlers first.
await Assert.That(result).IsEquivalentTo("Constrained-InterfaceHandled");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_interface_only_match()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(InterfaceConstrainedScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new AnotherNamedScalarRequest { Name = "InterfaceOnly" };
var result = await dispatcher.DispatchAsync(request);
// No concrete handler for AnotherNamedScalarRequest, but InterfaceConstrainedScalarHandler<T> where T : INamedScalarRequest matches.
await Assert.That(result).IsEquivalentTo("InterfaceOnly-ConstrainedByInterface");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_a_nested_generic_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(WrapperScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new WrapperScalarRequest<int> { Item = 42 };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Handled-42");
}
[Test]
public async Task I_can_handle_multiple_interface_implementations()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(MultiInterfaceScalarHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new MultiInterfaceScalarRequest();
var result1 = await dispatcher.DispatchAsync<int>(request);
var result2 = await dispatcher.DispatchAsync<string>(request);
await Assert.That(result1).IsEqualTo(1);
await Assert.That(result2).IsEquivalentTo("One");
}
[Test]
public async Task I_can_see_it_fail_if_there_are_ambiguous_handle_methods()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(AmbiguousScalarHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new AmbiguousScalarRequest();
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Interface-Handled");
}
[Test]
public async Task I_can_dispatch_a_request_async_with_a_generic_interface_explicit_implementation()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ExplicitGenericScalarHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new ExplicitGenericScalarRequest { Value = "Explicit" };
var result = await dispatcher.DispatchAsync(request);
await Assert.That(result).IsEquivalentTo("Explicit-ExplicitHandled");
}
[Test]
public async Task I_can_see_it_throw_the_original_exception()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(FailingScalarHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new FailingScalarRequest();
var ex = await Assert.That(async () => await dispatcher.DispatchAsync(request)).Throws<InvalidOperationException>();
using (Assert.Multiple())
{
await Assert.That(ex?.Message).IsEquivalentTo("Handler failed");
// Assert that the stack trace contains the handler's method name,
// which proves the exception was rethrown while preserving its origin.
await Assert.That(ex?.StackTrace).Contains(nameof(FailingScalarHandler.HandleAsync));
}
}
[Test]
public async Task I_can_see_it_throw_if_dispatcher_options_are_modified_after_build()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(FailingScalarHandler)));
var provider = sc.BuildServiceProvider();
var options = provider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
options.GetRequestBehaviors<IScalarRequestHandler<FailingScalarRequest, string>>(default!);
await Assert.That(() => options.Inspect([])).Throws<InvalidOperationException>();
}
}
public class FailingScalarRequest : IScalarRequest<string>
{
}
public class FailingScalarHandler : IScalarRequestHandler<FailingScalarRequest, string>
{
public Task<string> HandleAsync(FailingScalarRequest request, CancellationToken cancellationToken)
{
throw new InvalidOperationException("Handler failed");
}
}
public class ExplicitGenericScalarRequest : IScalarRequest<string>
{
public string Value { get; set; } = string.Empty;
}
public class ExplicitGenericScalarHandler<T> : IScalarRequestHandler<ExplicitGenericScalarRequest, string>
{
Task<string> IScalarRequestHandler<ExplicitGenericScalarRequest, string>.HandleAsync(ExplicitGenericScalarRequest request, CancellationToken ct)
{
return Task.FromResult($"{request.Value}-ExplicitHandled");
}
}
public class AmbiguousScalarRequest : IScalarRequest<string>
{
}
public class AmbiguousScalarHandler : IScalarRequestHandler<AmbiguousScalarRequest, string>
{
// Public method with the same name and signature
public Task<string> HandleAsync(AmbiguousScalarRequest request, CancellationToken ct)
{
return Task.FromResult("Public-Handled");
}
// Explicit interface implementation
Task<string> IScalarRequestHandler<AmbiguousScalarRequest, string>.HandleAsync(AmbiguousScalarRequest request, CancellationToken ct)
{
return Task.FromResult("Interface-Handled");
}
}
public class OpenScalarRequest : IScalarRequest<string>
{
public string Data { get; set; } = string.Empty;
}
public class InheritedScalarRequest : OpenScalarRequest
{
}
public class DeepDerivedScalarRequest : InheritedScalarRequest
{
public int DeepValue { get; set; }
}
public class OpenScalarHandler<TRequest> : IScalarRequestHandler<TRequest, string>
where TRequest : OpenScalarRequest
{
public Task<string> HandleAsync(TRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"{request.Data}-Handled");
}
}
public class ConstrainedScalarRequest : IScalarRequest<string>
{
public int Value { get; set; }
}
public class ConstrainedScalarHandler<TRequest> : IScalarRequestHandler<TRequest, string>
where TRequest : ConstrainedScalarRequest
{
public Task<string> HandleAsync(TRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"{request.Value}-Constrained");
}
}
public class UnhandledScalarRequest : IScalarRequest<string>
{
}
public class DerivedScalarRequest : IScalarRequest<string>
{
public int Value { get; set; }
}
public abstract class BaseScalarHandler<TRequest> : IScalarRequestHandler<TRequest, string>
where TRequest : IScalarRequest<string>
{
public abstract Task<string> HandleAsync(TRequest request, CancellationToken cancellationToken);
}
public class DerivedScalarHandler : BaseScalarHandler<DerivedScalarRequest>
{
public override Task<string> HandleAsync(DerivedScalarRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"Derived: {request.Value}");
}
}
public interface INamedScalarRequest : IScalarRequest<string>
{
string Name { get; }
}
public class InterfaceInheritedScalarRequest : INamedScalarRequest
{
public string Name { get; set; } = string.Empty;
}
public class AnotherNamedScalarRequest : INamedScalarRequest
{
public string Name { get; set; } = string.Empty;
}
public class InterfaceInheritedScalarHandler : IScalarRequestHandler<InterfaceInheritedScalarRequest, string>
{
public Task<string> HandleAsync(InterfaceInheritedScalarRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"{request.Name}-InterfaceHandled");
}
}
public class InterfaceConstrainedScalarHandler<TRequest> : IScalarRequestHandler<TRequest, string>
where TRequest : INamedScalarRequest
{
public Task<string> HandleAsync(TRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"{request.Name}-ConstrainedByInterface");
}
}
public class WrapperScalarRequest<T> : IScalarRequest<string>
{
public T Item { get; set; } = default!;
}
public class WrapperScalarHandler<T> : IScalarRequestHandler<WrapperScalarRequest<T>, string>
{
public Task<string> HandleAsync(WrapperScalarRequest<T> request, CancellationToken cancellationToken)
{
return Task.FromResult($"Handled-{request.Item}");
}
}
public class MultiInterfaceScalarRequest : IScalarRequest<int>, IScalarRequest<string>
{
}
public class MultiInterfaceScalarHandler : IScalarRequestHandler<MultiInterfaceScalarRequest, int>, IScalarRequestHandler<MultiInterfaceScalarRequest, string>
{
public Task<int> HandleAsync(MultiInterfaceScalarRequest request, CancellationToken cancellationToken)
{
return Task.FromResult(1);
}
Task<string> IScalarRequestHandler<MultiInterfaceScalarRequest, string>.HandleAsync(MultiInterfaceScalarRequest request, CancellationToken ct)
{
return Task.FromResult("One");
}
}

View file

@ -0,0 +1,207 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
namespace Geekeey.Request.Tests;
internal sealed class StreamBehaviourTests
{
[Test]
public async Task I_can_execute_the_closed_behaviour()
{
var sc = new ServiceCollection();
sc.AddSingleton<StreamTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(StreamTestHandler))
.Add(typeof(StreamTestBehavior)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<StreamTestTracker>();
var request = new StreamTestRequest { Value = "Hello" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Hello-Handled-0", "Hello-Handled-1"]);
await Assert.That(tracker.Executed).IsTrue();
}
[Test]
public async Task I_can_execute_the_open_behaviour()
{
var sc = new ServiceCollection();
sc.AddSingleton<StreamTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(StreamTestHandler))
.Add(typeof(StreamOpenBehavior<,>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<StreamTestTracker>();
var request = new StreamTestRequest { Value = "Hello" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Hello-Handled-0", "Hello-Handled-1"]);
await Assert.That(tracker.Executed).IsTrue();
}
[Test]
public async Task I_can_chain_the_behaviours_in_order()
{
var sc = new ServiceCollection();
sc.AddSingleton<StreamTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(StreamTestHandler))
.Add(typeof(StreamChainedBehaviour1))
.Add(typeof(StreamChainedBehaviour2)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<StreamTestTracker>();
var request = new StreamTestRequest { Value = "Hello" };
await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(tracker.Log).Count().IsEqualTo(2);
await Assert.That(tracker.Log[0]).IsEquivalentTo("Behaviour1");
await Assert.That(tracker.Log[1]).IsEquivalentTo("Behaviour2");
}
[Test]
public async Task I_can_work_with_a_generic_wrapper_request_and_the_open_behaviour()
{
var sc = new ServiceCollection();
sc.AddSingleton<StreamTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(StreamTestWrapperHandler<>))
.Add(typeof(StreamWrapperBehavior<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<StreamTestTracker>();
var request = new StreamTestWrapperRequest<int> { Item = 42 };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Handled-42-0", "Handled-42-1"]);
await Assert.That(tracker.Executed).IsTrue();
}
[Test]
public async Task I_can_maintain_the_ordering_between_open_and_closed_behaviours()
{
var sc = new ServiceCollection();
sc.AddSingleton<StreamTestTracker>();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(StreamTestHandler))
.Add(typeof(StreamOrderingOpenBehavior<,>))
.Add(typeof(StreamOrderingClosedBehavior)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var tracker = provider.GetRequiredService<StreamTestTracker>();
var request = new StreamTestRequest { Value = "Order" };
await dispatcher.DispatchAsync(request).ToListAsync();
var log = tracker.Log.ToList();
await Assert.That(log).Contains("Open");
await Assert.That(log).Contains("Closed");
}
}
public class StreamTestTracker
{
public List<string> Log { get; } = [];
public bool Executed { get; set; }
}
public class StreamOrderingOpenBehavior<TRequest, TOutput>(StreamTestTracker tracker) : IStreamRequestBehavior<TRequest, TOutput>
where TRequest : IStreamRequest<TOutput>
{
public IAsyncEnumerable<TOutput> HandleAsync(TRequest request, StreamHandlerDelegate<TOutput> next, CancellationToken cancellationToken)
{
tracker.Log.Add("Open");
return next(request, cancellationToken);
}
}
public class StreamOrderingClosedBehavior(StreamTestTracker tracker) : IStreamRequestBehavior<StreamTestRequest, string>
{
public IAsyncEnumerable<string> HandleAsync(StreamTestRequest request, StreamHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Log.Add("Closed");
return next(request, cancellationToken);
}
}
public class StreamTestRequest : IStreamRequest<string>
{
public string Value { get; set; } = string.Empty;
}
public class StreamTestHandler : IStreamRequestHandler<StreamTestRequest, string>
{
public async IAsyncEnumerable<string> HandleAsync(StreamTestRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"{request.Value}-Handled-0";
yield return $"{request.Value}-Handled-1";
}
}
public class StreamTestBehavior(StreamTestTracker tracker) : IStreamRequestBehavior<StreamTestRequest, string>
{
public IAsyncEnumerable<string> HandleAsync(StreamTestRequest request, StreamHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Executed = true;
return next(request, cancellationToken);
}
}
public class StreamOpenBehavior<TRequest, TResponse>(StreamTestTracker tracker) : IStreamRequestBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
public IAsyncEnumerable<TResponse> HandleAsync(TRequest request, StreamHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
tracker.Executed = true;
return next(request, cancellationToken);
}
}
public class StreamChainedBehaviour1(StreamTestTracker tracker) : IStreamRequestBehavior<StreamTestRequest, string>
{
public IAsyncEnumerable<string> HandleAsync(StreamTestRequest request, StreamHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Log.Add("Behaviour1");
return next(request, cancellationToken);
}
}
public class StreamChainedBehaviour2(StreamTestTracker tracker) : IStreamRequestBehavior<StreamTestRequest, string>
{
public IAsyncEnumerable<string> HandleAsync(StreamTestRequest request, StreamHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Log.Add("Behaviour2");
return next(request, cancellationToken);
}
}
public class StreamTestWrapperRequest<T> : IStreamRequest<string>
{
public T? Item { get; set; }
}
public class StreamTestWrapperHandler<T> : IStreamRequestHandler<StreamTestWrapperRequest<T>, string>
{
public async IAsyncEnumerable<string> HandleAsync(StreamTestWrapperRequest<T> request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"Handled-{request.Item}-0";
yield return $"Handled-{request.Item}-1";
}
}
public class StreamWrapperBehavior<T>(StreamTestTracker tracker) : IStreamRequestBehavior<StreamTestWrapperRequest<T>, string>
{
public IAsyncEnumerable<string> HandleAsync(StreamTestWrapperRequest<T> request, StreamHandlerDelegate<string> next, CancellationToken cancellationToken)
{
tracker.Executed = true;
return next(request, cancellationToken);
}
}

View file

@ -0,0 +1,418 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
namespace Geekeey.Request.Tests;
public class StreamDispatcherTests
{
[Test]
public async Task I_can_dispatch_a_request_async_with_an_open_generic_handler()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new OpenStreamRequest { Data = "Hello" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Hello-Stream-0", "Hello-Stream-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_open_generic_handler_that_has_constraints()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ConstrainedStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new ConstrainedStreamRequest { Value = 123 };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["123-Constrained-0", "123-Constrained-1"]);
}
[Test]
public async Task I_can_see_it_fail_if_no_handler_is_found_even_with_an_open_generic_available()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new UnhandledStreamRequest();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await dispatcher.DispatchAsync(request).FirstOrDefaultAsync().AsTask());
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_inherited_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new InheritedStreamRequest { Data = "Sub" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Sub-Stream-0", "Sub-Stream-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_inherited_handler()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(DerivedStreamHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new DerivedStreamRequest { Value = 42 };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Derived: 42-0", "Derived: 42-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_interface_inherited_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(InterfaceInheritedStreamHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new InterfaceInheritedStreamRequest { Name = "InterfaceTest" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["InterfaceTest-InterfaceHandled-0", "InterfaceTest-InterfaceHandled-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_deep_inheritance_in_the_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(OpenStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new DeepDerivedStreamRequest { Data = "Deep", DeepValue = 99 };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Deep-Stream-0", "Deep-Stream-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_interface_constrained_handler()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(InterfaceInheritedStreamHandler))
.Add(typeof(InterfaceConstrainedStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new InterfaceInheritedStreamRequest { Name = "Constrained" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
// Both InterfaceInheritedStreamHandler and InterfaceConstrainedStreamHandler could match.
// InterfaceInheritedStreamHandler is a concrete match for InterfaceInheritedStreamRequest.
// InterfaceConstrainedStreamHandler is an open generic match.
// Currently Dispatcher checks concrete handlers first.
await Assert.That(results).IsEquivalentTo(["Constrained-InterfaceHandled-0", "Constrained-InterfaceHandled-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_an_interface_only_match()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(InterfaceConstrainedStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new AnotherNamedStreamRequest { Name = "InterfaceOnly" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["InterfaceOnly-ConstrainedByInterface-0", "InterfaceOnly-ConstrainedByInterface-1"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_a_nested_generic_request()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(WrapperStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new WrapperStreamRequest<int> { Item = 42 };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Handled-42-0", "Handled-42-1"]);
}
[Test]
public async Task I_can_handle_multiple_interface_implementations()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(MultiInterfaceStreamHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new MultiInterfaceStreamRequest();
var results1 = await dispatcher.DispatchAsync<int>(request).ToListAsync();
var results2 = await dispatcher.DispatchAsync<string>(request).ToListAsync();
await Assert.That(results1).IsEquivalentTo([1, 2]);
await Assert.That(results2).IsEquivalentTo(["One", "Two"]);
}
[Test]
public async Task I_can_see_it_fail_if_there_are_ambiguous_handle_methods()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(AmbiguousStreamHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new AmbiguousStreamRequest();
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Interface-Handled"]);
}
[Test]
public async Task I_can_dispatch_a_request_async_with_a_generic_interface_explicit_implementation()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(ExplicitGenericStreamHandler<>)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new ExplicitGenericStreamRequest { Value = "Explicit" };
var results = await dispatcher.DispatchAsync(request).ToListAsync();
await Assert.That(results).IsEquivalentTo(["Explicit-ExplicitHandled"]);
}
[Test]
public async Task I_can_see_it_throw_the_original_exception()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(FailingStreamHandler)));
var provider = sc.BuildServiceProvider();
var dispatcher = provider.GetRequiredService<IRequestDispatcher>();
var request = new FailingStreamRequest();
var enumerable = dispatcher.DispatchAsync(request);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await enumerable.ToListAsync().AsTask());
using (Assert.Multiple())
{
await Assert.That(ex?.Message).IsEquivalentTo("Handler failed");
await Assert.That(ex?.StackTrace).Contains(nameof(FailingStreamHandler.HandleAsync));
}
}
[Test]
public async Task I_can_see_it_throw_if_dispatcher_options_are_modified_after_build()
{
var sc = new ServiceCollection();
sc.AddRequestDispatcher(builder => builder
.Add(typeof(FailingStreamHandler)));
var provider = sc.BuildServiceProvider();
var options = provider.GetRequiredService<Microsoft.Extensions.Options.IOptions<RequestDispatcherOptions>>().Value;
options.GetRequestHandlers<IStreamRequestHandler<FailingStreamRequest, string>>(default!);
await Assert.That(() => options.Inspect([])).Throws<InvalidOperationException>();
}
}
public class FailingStreamRequest : IStreamRequest<string>
{
}
public class FailingStreamHandler : IStreamRequestHandler<FailingStreamRequest, string>
{
public async IAsyncEnumerable<string> HandleAsync(FailingStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return "Wait for it...";
throw new InvalidOperationException("Handler failed");
}
}
public class ExplicitGenericStreamRequest : IStreamRequest<string>
{
public string Value { get; set; } = string.Empty;
}
public class ExplicitGenericStreamHandler<T> : IStreamRequestHandler<ExplicitGenericStreamRequest, string>
{
async IAsyncEnumerable<string> IStreamRequestHandler<ExplicitGenericStreamRequest, string>.HandleAsync(ExplicitGenericStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct)
{
yield return $"{request.Value}-ExplicitHandled";
}
}
public class AmbiguousStreamRequest : IStreamRequest<string>
{
}
public class AmbiguousStreamHandler : IStreamRequestHandler<AmbiguousStreamRequest, string>
{
// Public method with the same name and signature
public async IAsyncEnumerable<string> HandleAsync(AmbiguousStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return "Public-Handled";
}
// Explicit interface implementation
async IAsyncEnumerable<string> IStreamRequestHandler<AmbiguousStreamRequest, string>.HandleAsync(AmbiguousStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return "Interface-Handled";
}
}
public class OpenStreamRequest : IStreamRequest<string>
{
public string Data { get; set; } = string.Empty;
}
public class InheritedStreamRequest : OpenStreamRequest
{
}
public class DeepDerivedStreamRequest : InheritedStreamRequest
{
public int DeepValue { get; set; }
}
public class OpenStreamHandler<TRequest> : IStreamRequestHandler<TRequest, string>
where TRequest : OpenStreamRequest
{
public async IAsyncEnumerable<string> HandleAsync(TRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"{request.Data}-Stream-0";
yield return $"{request.Data}-Stream-1";
}
}
public class ConstrainedStreamRequest : IStreamRequest<string>
{
public int Value { get; set; }
}
public class ConstrainedStreamHandler<TRequest> : IStreamRequestHandler<TRequest, string>
where TRequest : ConstrainedStreamRequest
{
public async IAsyncEnumerable<string> HandleAsync(TRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"{request.Value}-Constrained-0";
yield return $"{request.Value}-Constrained-1";
}
}
public class UnhandledStreamRequest : IStreamRequest<string>
{
}
public class DerivedStreamRequest : IStreamRequest<string>
{
public int Value { get; set; }
}
public abstract class BaseStreamHandler<TRequest> : IStreamRequestHandler<TRequest, string>
where TRequest : IStreamRequest<string>
{
public abstract IAsyncEnumerable<string> HandleAsync(TRequest request, CancellationToken cancellationToken);
}
public class DerivedStreamHandler : BaseStreamHandler<DerivedStreamRequest>
{
public override async IAsyncEnumerable<string> HandleAsync(DerivedStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"Derived: {request.Value}-0";
yield return $"Derived: {request.Value}-1";
}
}
public interface INamedStreamRequest : IStreamRequest<string>
{
string Name { get; }
}
public class InterfaceInheritedStreamRequest : INamedStreamRequest
{
public string Name { get; set; } = string.Empty;
}
public class AnotherNamedStreamRequest : INamedStreamRequest
{
public string Name { get; set; } = string.Empty;
}
public class InterfaceInheritedStreamHandler : IStreamRequestHandler<InterfaceInheritedStreamRequest, string>
{
public async IAsyncEnumerable<string> HandleAsync(InterfaceInheritedStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"{request.Name}-InterfaceHandled-0";
yield return $"{request.Name}-InterfaceHandled-1";
}
}
public class InterfaceConstrainedStreamHandler<TRequest> : IStreamRequestHandler<TRequest, string>
where TRequest : INamedStreamRequest
{
public async IAsyncEnumerable<string> HandleAsync(TRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"{request.Name}-ConstrainedByInterface-0";
yield return $"{request.Name}-ConstrainedByInterface-1";
}
}
public class WrapperStreamRequest<T> : IStreamRequest<string>
{
public T Item { get; set; } = default!;
}
public class WrapperStreamHandler<T> : IStreamRequestHandler<WrapperStreamRequest<T>, string>
{
public async IAsyncEnumerable<string> HandleAsync(WrapperStreamRequest<T> request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return $"Handled-{request.Item}-0";
yield return $"Handled-{request.Item}-1";
}
}
public class MultiInterfaceStreamRequest : IStreamRequest<int>, IStreamRequest<string>
{
}
public class MultiInterfaceStreamHandler : IStreamRequestHandler<MultiInterfaceStreamRequest, int>, IStreamRequestHandler<MultiInterfaceStreamRequest, string>
{
public async IAsyncEnumerable<int> HandleAsync(MultiInterfaceStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return 1;
yield return 2;
}
async IAsyncEnumerable<string> IStreamRequestHandler<MultiInterfaceStreamRequest, string>.HandleAsync(MultiInterfaceStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct)
{
yield return "One";
yield return "Two";
}
}

View file

@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net10.0</TargetFramework>
<IsPackable>true</IsPackable>
</PropertyGroup>
<PropertyGroup>
<RootNamespace>Geekeey.Request</RootNamespace>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup Condition="'$(Configuration)' == 'Debug'">
<InternalsVisibleTo Include="Geekeey.Request.Tests" />
</ItemGroup>
<PropertyGroup>
<PackageReadmeFile>package-readme.md</PackageReadmeFile>
<PackageIcon>package-icon.png</PackageIcon>
<PackageProjectUrl>https://code.geekeey.de/geekeey/request/src/branch/main/src/request</PackageProjectUrl>
<PackageLicenseExpression>EUPL-1.2</PackageLicenseExpression>
</PropertyGroup>
<ItemGroup>
<None Include=".\package-icon.png" Pack="true" PackagePath="\" Visible="false" />
<None Include=".\package-readme.md" Pack="true" PackagePath="\" Visible="false" />
<None Include="..\..\LICENSE.md" Pack="true" PackagePath="\" Visible="false" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
</ItemGroup>
</Project>

View file

@ -0,0 +1,28 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
namespace Geekeey.Request;
/// <summary>
/// Defines functionality to dispatch requests to their corresponding handlers.
/// </summary>
public interface IRequestDispatcher
{
/// <summary>
/// Asynchronously send a request to a handler producing a scalar value.
/// </summary>
/// <param name="request">Request object</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <typeparam name="TResponse">Response type</typeparam>
/// <returns>A task that represents the send operation. The task result contains the handler response</returns>
Task<TResponse> DispatchAsync<TResponse>(IScalarRequest<TResponse> request, CancellationToken cancellationToken = default);
/// <summary>
/// Asynchronously send a request to a handler producing a stream value.
/// </summary>
/// <param name="request">Request object</param>
/// <param name="cancellationToken">Optional cancellation token</param>
/// <typeparam name="TResponse">Response type</typeparam>
/// <returns>The created async enumerable, representing the stream of responses.</returns>
IAsyncEnumerable<TResponse> DispatchAsync<TResponse>(IStreamRequest<TResponse> request, CancellationToken cancellationToken = default);
}

View file

@ -0,0 +1,19 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
namespace Geekeey.Request;
/// <summary>
/// Represents a builder for configuring and registering request dispatchers and their related components.
/// </summary>
public interface IRequestDispatcherBuilder
{
/// <summary>
/// Gets the <see cref="IServiceCollection"/> associated with the request dispatcher builder.
/// Provides access to the underlying service collection for configuring dependencies
/// and registering services required by the request dispatcher.
/// </summary>
IServiceCollection Services { get; }
}

View file

@ -0,0 +1,34 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
#pragma warning disable CA1711
namespace Geekeey.Request;
/// <summary>
/// Represents a behavior in the request pipeline, allowing interception, modification,
/// or chaining of asynchronous stream requests and responses.
/// </summary>
/// <typeparam name="TRequest">The type of the request being handled. Must implement <see cref="IScalarRequest{TResponse}"/>.</typeparam>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
public interface IScalarRequestBehavior<in TRequest, TResponse> where TRequest : IScalarRequest<TResponse>
{
/// <summary>
/// Handles the asynchronous processing of a request, allowing behavior customization
/// such as interception, modification, or chaining of the request and its response.
/// </summary>
/// <param name="request">The request instance being processed.</param>
/// <param name="next">The next delegate in the pipeline to execute after the custom behavior.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains the response from the request.</returns>
Task<TResponse> HandleAsync(TRequest request, ScalarHandlerDelegate<TResponse> next, CancellationToken cancellationToken);
}
/// <summary>
/// Represents the delegate responsible for handling asynchronous requests in a pipeline.
/// </summary>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
/// <param name="request">The asynchronous request being processed.</param>
/// <param name="cancellationToken">A token for monitoring cancellation requests.</param>
/// <returns>A task, that represents the asynchronous operation, containing the response of type <typeparamref name="TResponse"/>.</returns>
public delegate Task<TResponse> ScalarHandlerDelegate<TResponse>(IScalarRequest<TResponse> request, CancellationToken cancellationToken);

View file

@ -0,0 +1,14 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
namespace Geekeey.Request;
/// <summary>
/// Represents a request that produces a response of a specified type.
/// This interface serves as a marker for distinguishing request types,
/// enabling their integration into request-based pipelines or dispatch mechanisms.
/// </summary>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
public interface IScalarRequest<out TResponse>
{
}

View file

@ -0,0 +1,20 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
namespace Geekeey.Request;
/// <summary>
/// Defines a handler for processing requests of a specific type and returning a response.
/// </summary>
/// <typeparam name="TRequest">The type of the request to be handled. Must implement <see cref="IScalarRequest{TResponse}"/>.</typeparam>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
public interface IScalarRequestHandler<in TRequest, TResponse> where TRequest : IScalarRequest<TResponse>
{
/// <summary>
/// Processes a scalar request and returns a response asynchronously.
/// </summary>
/// <param name="request">The request object to be processed.</param>
/// <param name="cancellationToken">A token to observe for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation, containing the response of type <typeparamref name="TResponse"/>.</returns>
Task<TResponse> HandleAsync(TRequest request, CancellationToken cancellationToken = default);
}

View file

@ -0,0 +1,34 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
#pragma warning disable CA1711
namespace Geekeey.Request;
/// <summary>
/// Represents a behavior in the request pipeline, allowing interception, modification,
/// or chaining of asynchronous stream requests and responses.
/// </summary>
/// <typeparam name="TRequest">The type of the request being processed. Must implement <see cref="IStreamRequest{TResponse}"/>.</typeparam>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
public interface IStreamRequestBehavior<in TRequest, TResponse> where TRequest : IStreamRequest<TResponse>
{
/// <summary>
/// Handles the asynchronous processing of a request, allowing behavior customization
/// such as interception, modification, or chaining of the request and its response.
/// </summary>
/// <param name="request">The request instance being processed.</param>
/// <param name="next">The next delegate in the pipeline to execute after the custom behavior.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>An asynchronous stream of <typeparamref name="TResponse"/> representing the processed response.</returns>
IAsyncEnumerable<TResponse> HandleAsync(TRequest request, StreamHandlerDelegate<TResponse> next, CancellationToken cancellationToken);
}
/// <summary>
/// Represents the delegate responsible for handling asynchronous requests in a pipeline.
/// </summary>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
/// <param name="request">The asynchronous request being processed.</param>
/// <param name="cancellationToken">A token for monitoring cancellation requests.</param>
/// <returns>An asynchronous stream of responses of type <typeparamref name="TResponse"/>.</returns>
public delegate IAsyncEnumerable<TResponse> StreamHandlerDelegate<TResponse>(IStreamRequest<TResponse> request, CancellationToken cancellationToken);

View file

@ -0,0 +1,14 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
namespace Geekeey.Request;
/// <summary>
/// Represents a request that produces a response of a specified type.
/// This interface serves as a marker for distinguishing request types,
/// enabling their integration into request-based pipelines or dispatch mechanisms.
/// </summary>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
public interface IStreamRequest<out TResponse>
{
}

View file

@ -0,0 +1,21 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
namespace Geekeey.Request;
/// <summary>
/// Defines a handler for processing streaming requests of a specific type and producing
/// a stream of responses.
/// </summary>
/// <typeparam name="TRequest">The type of the request to handle. This must implement <see cref="IStreamRequest{TResponse}"/>.</typeparam>
/// <typeparam name="TResponse">The type of the response produced by the implementing request handler.</typeparam>
public interface IStreamRequestHandler<in TRequest, out TResponse> where TRequest : IStreamRequest<TResponse>
{
/// <summary>
/// Handles a streaming request and returns a stream of responses asynchronously.
/// </summary>
/// <param name="request">The request object to be processed.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An asynchronous stream of responses of type <typeparamref name="TResponse"/>.</returns>
IAsyncEnumerable<TResponse> HandleAsync(TRequest request, CancellationToken cancellationToken);
}

View file

@ -0,0 +1,48 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using System.Collections.Concurrent;
using System.Reflection.Metadata;
[assembly: MetadataUpdateHandler(typeof(Geekeey.Request.RequestDispatcher))]
namespace Geekeey.Request;
internal sealed class RequestDispatcher : IRequestDispatcher
{
private static readonly ConcurrentDictionary<(Type, Type), ScalarRequestInvoker> ScalarRequestHandlers = new();
private static readonly ConcurrentDictionary<(Type, Type), StreamRequestInvoker> StreamRequestHandlers = new();
private readonly IServiceProvider _serviceProvider;
public RequestDispatcher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public Task<TResponse> DispatchAsync<TResponse>(IScalarRequest<TResponse> request, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
var handler = ScalarRequestHandlers.GetOrAdd((request.GetType(), typeof(TResponse)), static key =>
{
var type = typeof(ScalarRequestInvoker<,>).MakeGenericType(key.Item1, key.Item2);
return (ScalarRequestInvoker)Activator.CreateInstance(type)!;
});
return ((ScalarRequestInvoker<TResponse>)handler).HandleAsync(request, _serviceProvider, cancellationToken);
}
public IAsyncEnumerable<TResponse> DispatchAsync<TResponse>(IStreamRequest<TResponse> request, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
var handler = StreamRequestHandlers.GetOrAdd((request.GetType(), typeof(TResponse)), static key =>
{
var type = typeof(StreamRequestInvoker<,>).MakeGenericType(key.Item1, key.Item2);
return (StreamRequestInvoker)Activator.CreateInstance(type)!;
});
return ((StreamRequestInvoker<TResponse>)handler).HandleAsync(request, _serviceProvider, cancellationToken);
}
}

View file

@ -0,0 +1,133 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using static Geekeey.Request.RequestDispatcherOptions;
namespace Geekeey.Request;
/// <summary>
/// Provides extension methods for configuring <see cref="IRequestDispatcherBuilder"/>
/// with additional capabilities such as searching and registering request handlers in assemblies or adding types directly.
/// </summary>
public static class RequestDispatcherBuilderExtensions
{
/// <summary>
/// Searches for request handler types within the specified assembly and adds them to the request dispatcher
/// configuration.
/// </summary>
/// <param name="builder">The <see cref="IRequestDispatcherBuilder"/> to configure.</param>
/// <param name="assembly">The assembly to search for request handler types.</param>
/// <returns>The <see cref="IRequestDispatcherBuilder"/> instance for further configuration.</returns>
public static IRequestDispatcherBuilder SearchHandlerInAssembly(this IRequestDispatcherBuilder builder, Assembly assembly)
{
ArgumentNullException.ThrowIfNull(builder);
var exports = assembly.GetTypes()
.Where(type => type is { IsClass: true, IsAbstract: false })
.Where(IsRequestHandlerType);
builder.Add(exports);
return builder;
}
/// <summary>
/// Searches for request handler types within the specified assembly and adds them to the request dispatcher
/// configuration with the given service lifetime.
/// </summary>
/// <param name="builder">The <see cref="IRequestDispatcherBuilder"/> to configure.</param>
/// <param name="assembly">The assembly to search for request handler types.</param>
/// <param name="lifetime">The lifetime with which the request handlers are registered in the dependency injection container.</param>
/// <returns>The <see cref="IRequestDispatcherBuilder"/> instance for further configuration.</returns>
public static IRequestDispatcherBuilder SearchHandlerInAssembly(this IRequestDispatcherBuilder builder, Assembly assembly, ServiceLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(builder);
var exports = assembly.GetTypes()
.Where(type => type is { IsClass: true, IsAbstract: false })
.Where(IsRequestHandlerType);
builder.Add(exports, lifetime);
return builder;
}
/// <summary>
/// Adds the specified type to the request dispatcher configuration for inspection.
/// </summary>
/// <param name="builder">The <see cref="IRequestDispatcherBuilder"/> to configure.</param>
/// <param name="type">The type to be added to the request dispatcher configuration.</param>
/// <returns>The <see cref="IRequestDispatcherBuilder"/> instance for further configuration.</returns>
public static IRequestDispatcherBuilder Add(this IRequestDispatcherBuilder builder, Type type)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.AddOptions<RequestDispatcherOptions>()
.Configure(options => options.Inspect([type]));
return builder;
}
/// <summary>
/// Adds the specified type to the request dispatcher configuration.
/// This also adds the type to the service collection with the specified lifetime,
/// allowing it to be resolved as a dependency in request handlers and behaviors.
/// </summary>
/// <param name="builder">The <see cref="IRequestDispatcherBuilder"/> used to configure the request dispatcher.</param>
/// <param name="type">The type to be added to the request dispatcher configuration.</param>
/// <param name="lifetime">The lifetime scope of the type in the service container.</param>
/// <returns>The <see cref="IRequestDispatcherBuilder"/> instance for further configuration.</returns>
public static IRequestDispatcherBuilder Add(this IRequestDispatcherBuilder builder, Type type, ServiceLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.AddOptions<RequestDispatcherOptions>()
.Configure(options => options.Inspect([type]));
builder.Services.Add(new ServiceDescriptor(type, type, lifetime));
return builder;
}
/// <summary>
/// Adds the specified collection of types to the request dispatcher configuration for inspection.
/// </summary>
/// <param name="builder">The <see cref="IRequestDispatcherBuilder"/> to configure.</param>
/// <param name="type">The collection of types to be added to the request dispatcher configuration.</param>
/// <returns>The <see cref="IRequestDispatcherBuilder"/> instance for further configuration.</returns>
public static IRequestDispatcherBuilder Add(this IRequestDispatcherBuilder builder, IEnumerable<Type> type)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.AddOptions<RequestDispatcherOptions>()
.Configure(options => options.Inspect(type));
return builder;
}
/// <summary>
/// Adds the specified collection of types to the request dispatcher configuration for inspection.
/// This also adds the specified collection of types to the service collection with the specified lifetime,
/// allowing it to be resolved as a dependency in request handlers and behaviors.
/// </summary>
/// <param name="builder">The <see cref="IRequestDispatcherBuilder"/> to configure.</param>
/// <param name="type">The collection of types to be added to the request dispatcher configuration.</param>
/// <param name="lifetime">The lifetime scope of the types in the service container.</param>
/// <returns>The <see cref="IRequestDispatcherBuilder"/> instance for further configuration.</returns>
public static IRequestDispatcherBuilder Add(this IRequestDispatcherBuilder builder, IEnumerable<Type> type, ServiceLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.AddOptions<RequestDispatcherOptions>()
.Configure(options => options.Inspect(type));
builder.Services.Add(type.Select(export => new ServiceDescriptor(export, export, lifetime)));
return builder;
}
}

View file

@ -0,0 +1,207 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using System.Collections;
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
namespace Geekeey.Request;
internal sealed class RequestDispatcherOptions
{
private readonly List<Type> _search = [];
private readonly Lazy<TypeIndex> _behaviorsTypeIndex;
private readonly Lazy<TypeIndex> _handlersTypeIndex;
public RequestDispatcherOptions()
{
_behaviorsTypeIndex = new Lazy<TypeIndex>(() => new BehaviorTypeIndex(_search.Distinct()));
_handlersTypeIndex = new Lazy<TypeIndex>(() => new HandlerTypeIndex(_search.Distinct()));
}
public void Inspect(IEnumerable<Type> assembly)
{
if (_behaviorsTypeIndex.IsValueCreated || _handlersTypeIndex.IsValueCreated)
{
throw new InvalidOperationException("The type index has already been created. Cannot inspect new assemblies.");
}
_search.AddRange(assembly);
}
public IEnumerable<T> GetRequestBehaviors<T>(IServiceProvider services)
{
return _behaviorsTypeIndex.Value.Resolve<T>(services);
}
public IEnumerable<T> GetRequestHandlers<T>(IServiceProvider services)
{
return _handlersTypeIndex.Value.Resolve<T>(services);
}
private abstract class TypeIndex
{
private readonly ConcurrentDictionary<Type, Func<IServiceProvider, IEnumerable>> _cache = new();
protected readonly Dictionary<Type, List<Type>> _closedTypeInfo = [];
protected readonly List<Type> _openTypeInfo = [];
protected TypeIndex(IEnumerable<Type> collection, Func<Type, bool> predicate)
{
foreach (var type in collection)
{
if (type.IsGenericTypeDefinition)
{
if (type.GetInterfaces().Any(predicate))
{
_openTypeInfo.Add(type);
}
}
else
{
foreach (var @interface in type.GetInterfaces().Where(predicate))
{
(_closedTypeInfo.TryGetValue(@interface, out var list) ? list : _closedTypeInfo[@interface] = []).Add(type);
}
}
}
}
public IEnumerable<T> Resolve<T>(IServiceProvider services)
{
return (IEnumerable<T>)_cache.GetOrAdd(typeof(T), CreateResolverFactory<T>)(services);
}
protected abstract IReadOnlyList<Type> IsAssignableTo(Type type);
private Func<IServiceProvider, IEnumerable<T>> CreateResolverFactory<T>(Type @interface)
{
var list = IsAssignableTo(@interface);
return ResolverFactory;
IEnumerable<T> ResolverFactory(IServiceProvider services)
{
foreach (var type in list)
{
yield return (T)ActivatorUtilities.GetServiceOrCreateInstance(services, type);
}
}
}
}
internal static bool IsRequestHandlerType(Type type)
{
return type.IsGenericType &&
(type.GetGenericTypeDefinition() == typeof(IScalarRequestHandler<,>) ||
type.GetGenericTypeDefinition() == typeof(IStreamRequestHandler<,>));
}
private sealed class HandlerTypeIndex(IEnumerable<Type> collection)
: TypeIndex(collection, IsRequestHandlerType)
{
protected override IReadOnlyList<Type> IsAssignableTo(Type @interface)
{
var result = new List<Type>();
if (_closedTypeInfo.TryGetValue(@interface, out var list))
{
result.AddRange(list);
}
var requestType = @interface.GetGenericArguments()[0];
_ = @interface.GetGenericArguments()[1];
foreach (var type in _openTypeInfo)
{
try
{
// open type case one: Handler<TRequest> : IRequestHandler<TRequest, TOutput>
// We try to close it with the request type.
var impl = type.MakeGenericType(requestType);
if (impl.IsAssignableTo(@interface))
{
result.Add(impl);
}
}
catch (ArgumentException)
{
}
try
{
// open type case two: Handler<T> : IRequestHandler<WrapperRequest<T>, TOutput>
// If the request is generic, we try to close the handler with the request's generic arguments.
if (requestType.IsGenericType)
{
var impl = type.MakeGenericType(requestType.GetGenericArguments());
if (impl.IsAssignableTo(@interface))
{
result.Add(impl);
}
}
}
catch (ArgumentException)
{
}
}
return result;
}
}
internal static bool IsRequestBehaviorType(Type type)
{
return type.IsGenericType &&
(type.GetGenericTypeDefinition() == typeof(IScalarRequestBehavior<,>) ||
type.GetGenericTypeDefinition() == typeof(IStreamRequestBehavior<,>));
}
private sealed class BehaviorTypeIndex(IEnumerable<Type> collection)
: TypeIndex(collection, IsRequestBehaviorType)
{
protected override IReadOnlyList<Type> IsAssignableTo(Type @interface)
{
var result = new List<Type>();
if (_closedTypeInfo.TryGetValue(@interface, out var list))
{
result.AddRange(list);
}
var requestType = @interface.GetGenericArguments()[0];
var responseType = @interface.GetGenericArguments()[1];
foreach (var behaviour in _openTypeInfo)
{
try
{
// open type case one: Behaviour<TRequest, TResponse> : IRequestBehaviour<TRequest, TResponse>
var impl = behaviour.MakeGenericType(requestType, responseType);
if (impl.IsAssignableTo(@interface))
{
result.Add(impl);
}
}
catch (ArgumentException)
{
}
try
{
// open type case two: Behaviour<T> : IRequestBehaviour<WrapperRequest<T>, TResponse>
var impl = behaviour.MakeGenericType(requestType.GetGenericArguments());
if (impl.IsAssignableTo(@interface))
{
result.Add(impl);
}
}
catch (ArgumentException)
{
}
}
return result;
}
}
}

View file

@ -0,0 +1,47 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Geekeey.Request;
internal abstract class ScalarRequestInvoker
{
public abstract Task<object?> HandleAsync(object request, IServiceProvider serviceProvider, CancellationToken cancellationToken);
}
internal abstract class ScalarRequestInvoker<TResponse> : ScalarRequestInvoker
{
public abstract Task<TResponse> HandleAsync(IScalarRequest<TResponse> request, IServiceProvider serviceProvider, CancellationToken cancellationToken);
}
internal sealed class ScalarRequestInvoker<TRequest, TResponse> : ScalarRequestInvoker<TResponse>
where TRequest : IScalarRequest<TResponse>
{
public override async Task<object?> HandleAsync(object request, IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
return await HandleAsync((IScalarRequest<TResponse>)request, serviceProvider, cancellationToken).ConfigureAwait(false);
}
public override Task<TResponse> HandleAsync(IScalarRequest<TResponse> request, IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
var options = serviceProvider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
var pipeline = options.GetRequestBehaviors<IScalarRequestBehavior<TRequest, TResponse>>(serviceProvider)
.Reverse()
.Aggregate((ScalarHandlerDelegate<TResponse>)Head, Chain);
return pipeline(request, cancellationToken);
static ScalarHandlerDelegate<TResponse> Chain(ScalarHandlerDelegate<TResponse> next, IScalarRequestBehavior<TRequest, TResponse> filter)
{
return (req, ct) => filter.HandleAsync((TRequest)req, next, ct);
}
Task<TResponse> Head(IScalarRequest<TResponse> r, CancellationToken ct)
{
return options.GetRequestHandlers<IScalarRequestHandler<TRequest, TResponse>>(serviceProvider).First().HandleAsync((TRequest)r, ct);
}
}
}

View file

@ -0,0 +1,50 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
namespace Geekeey.Request;
/// <summary>
/// Provides extension methods for configuring and registering request dispatchers and their dependencies.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds the request dispatcher services to the specified <see cref="IServiceCollection"/>.
/// </summary>
/// <param name="services">The service collection to which the request dispatcher services will be added.</param>
/// <returns>An instance of <see cref="IRequestDispatcherBuilder"/> to configure the request dispatcher.</returns>
public static IRequestDispatcherBuilder AddRequestDispatcher(this IServiceCollection services)
{
ArgumentNullException.ThrowIfNull(services);
services.AddOptions<RequestDispatcherOptions>();
services.TryAddTransient<IRequestDispatcher, RequestDispatcher>();
return new RequestDispatcherBuilder(services);
}
/// <summary>
/// Adds the request dispatcher services to the specified <see cref="IServiceCollection"/>
/// and configures it using the provided <see cref="Action{T}"/>.
/// </summary>
/// <param name="services">The service collection to which the request dispatcher services will be added.</param>
/// <param name="configure">A delegate to configure the request dispatcher builder.</param>
/// <returns>The service collection with the request dispatcher services added.</returns>
public static IServiceCollection AddRequestDispatcher(this IServiceCollection services, Action<IRequestDispatcherBuilder> configure)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configure);
configure(services.AddRequestDispatcher());
return services;
}
private sealed class RequestDispatcherBuilder(IServiceCollection services) : IRequestDispatcherBuilder
{
public IServiceCollection Services { get; } = services;
}
}

View file

@ -0,0 +1,52 @@
// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2
using System.Runtime.CompilerServices;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Geekeey.Request;
internal abstract class StreamRequestInvoker
{
public abstract IAsyncEnumerable<object?> HandleAsync(object request, IServiceProvider serviceProvider, CancellationToken cancellationToken);
}
internal abstract class StreamRequestInvoker<TResponse> : StreamRequestInvoker
{
public abstract IAsyncEnumerable<TResponse> HandleAsync(IStreamRequest<TResponse> request, IServiceProvider serviceProvider, CancellationToken cancellationToken);
}
internal sealed class StreamRequestInvoker<TRequest, TResponse> : StreamRequestInvoker<TResponse>
where TRequest : IStreamRequest<TResponse>
{
public override async IAsyncEnumerable<object?> HandleAsync(object request, IServiceProvider serviceProvider, [EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var item in HandleAsync((IStreamRequest<TResponse>)request, serviceProvider, cancellationToken))
{
yield return item;
}
}
public override IAsyncEnumerable<TResponse> HandleAsync(IStreamRequest<TResponse> request, IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
var options = serviceProvider.GetRequiredService<IOptions<RequestDispatcherOptions>>().Value;
var pipeline = options.GetRequestBehaviors<IStreamRequestBehavior<TRequest, TResponse>>(serviceProvider)
.Reverse()
.Aggregate((StreamHandlerDelegate<TResponse>)Head, Chain);
return pipeline(request, cancellationToken);
static StreamHandlerDelegate<TResponse> Chain(StreamHandlerDelegate<TResponse> next, IStreamRequestBehavior<TRequest, TResponse> filter)
{
return (req, ct) => filter.HandleAsync((TRequest)req, next, ct);
}
IAsyncEnumerable<TResponse> Head(IStreamRequest<TResponse> r, CancellationToken ct)
{
return options.GetRequestHandlers<IStreamRequestHandler<TRequest, TResponse>>(serviceProvider).First().HandleAsync((TRequest)r, ct);
}
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View file