This commit is contained in:
commit
b54acec2f2
66 changed files with 5135 additions and 0 deletions
280
src/Process/PipeTarget.cs
Normal file
280
src/Process/PipeTarget.cs
Normal file
|
|
@ -0,0 +1,280 @@
|
|||
using System.Buffers;
|
||||
using System.Text;
|
||||
|
||||
namespace Geekeey.Extensions.Process;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a pipe for the process's standard output or standard error stream.
|
||||
/// </summary>
|
||||
public abstract partial class PipeTarget
|
||||
{
|
||||
/// <summary>
|
||||
/// Reads the binary content from the origin stream and pushes it into the pipe.
|
||||
/// Origin stream represents the process's standard output or standard error stream.
|
||||
/// </summary>
|
||||
public abstract Task CopyFromAsync(Stream origin, CancellationToken cancellationToken = default);
|
||||
}
|
||||
|
||||
public partial class PipeTarget
|
||||
{
|
||||
private class AnonymousPipeTarget(Func<Stream, CancellationToken, Task> func) : PipeTarget
|
||||
{
|
||||
public override async Task CopyFromAsync(Stream origin, CancellationToken cancellationToken = default)
|
||||
=> await func(origin, cancellationToken);
|
||||
}
|
||||
|
||||
private class AggregatePipeTarget(IReadOnlyList<PipeTarget> targets) : PipeTarget
|
||||
{
|
||||
public IReadOnlyList<PipeTarget> Targets { get; } = targets;
|
||||
|
||||
public override async Task CopyFromAsync(Stream origin, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
|
||||
// create a separate sub-stream for each target
|
||||
var targetSubStreams = new Dictionary<PipeTarget, MemoryBufferStream>();
|
||||
foreach (var target in Targets)
|
||||
{
|
||||
targetSubStreams[target] = new MemoryBufferStream();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// start piping in the background
|
||||
async Task StartCopyAsync(KeyValuePair<PipeTarget, MemoryBufferStream> targetSubStream)
|
||||
{
|
||||
var (target, subStream) = targetSubStream;
|
||||
|
||||
try
|
||||
{
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
await target.CopyFromAsync(subStream, cts.Token);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// abort the operation if any of the targets fail
|
||||
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
await cts.CancelAsync();
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
var readingTask = Task.WhenAll(targetSubStreams.Select(StartCopyAsync));
|
||||
|
||||
try
|
||||
{
|
||||
// read from the main stream and replicate the data to each sub-stream
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSizes.Stream);
|
||||
|
||||
while (true)
|
||||
{
|
||||
var bytesRead = await origin.ReadAsync(buffer.Memory, cts.Token);
|
||||
|
||||
if (bytesRead <= 0)
|
||||
break;
|
||||
|
||||
foreach (var (_, subStream) in targetSubStreams)
|
||||
{
|
||||
await subStream.WriteAsync(buffer.Memory[..bytesRead], cts.Token);
|
||||
}
|
||||
}
|
||||
|
||||
// report that transmission is complete
|
||||
foreach (var (_, subStream) in targetSubStreams)
|
||||
{
|
||||
await subStream.ReportCompletionAsync(cts.Token);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
// wait for all targets to finish and maybe propagate exceptions
|
||||
await readingTask;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
foreach (var (_, stream) in targetSubStreams)
|
||||
{
|
||||
await stream.DisposeAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public partial class PipeTarget
|
||||
{
|
||||
/// <summary>
|
||||
/// Pipe target that discards all data. Functionally equivalent to a null device.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Using this target results in the corresponding stream (standard output or standard error) not being opened for
|
||||
/// the underlying process at all. In the vast majority of cases, this behavior should be functionally equivalent to
|
||||
/// piping to a null stream, but without the performance overhead of consuming and discarding unneeded data. This
|
||||
/// may be undesirable in certain situations, in which case it's recommended to pipe to a null stream explicitly
|
||||
/// using <see cref="ToStream(Stream)" /> with <see cref="Stream.Null" />.
|
||||
/// </remarks>
|
||||
public static PipeTarget Null { get; } = Create((_, cancellationToken) =>
|
||||
!cancellationToken.IsCancellationRequested ? Task.CompletedTask : Task.FromCanceled(cancellationToken));
|
||||
|
||||
/// <summary>
|
||||
/// Creates an anonymous pipe target with the <see cref="CopyFromAsync(Stream, CancellationToken)" /> method
|
||||
/// implemented by the specified asynchronous delegate.
|
||||
/// </summary>
|
||||
public static PipeTarget Create(Func<Stream, CancellationToken, Task> func)
|
||||
=> new AnonymousPipeTarget(func);
|
||||
|
||||
/// <summary>
|
||||
/// Creates an anonymous pipe target with the <see cref="CopyFromAsync(Stream, CancellationToken)" /> method
|
||||
/// implemented by the specified synchronous delegate.
|
||||
/// </summary>
|
||||
public static PipeTarget Create(Action<Stream> action) => Create(
|
||||
(origin, _) =>
|
||||
{
|
||||
action(origin);
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that writes to the specified stream.
|
||||
/// </summary>
|
||||
public static PipeTarget ToStream(Stream stream) => Create(
|
||||
async (origin, cancellationToken) =>
|
||||
await origin.CopyToAsync(stream, cancellationToken));
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that writes to the specified file.
|
||||
/// </summary>
|
||||
public static PipeTarget ToFile(string filePath) => Create(
|
||||
async (origin, cancellationToken) =>
|
||||
{
|
||||
await using var target = File.Create(filePath);
|
||||
await origin.CopyToAsync(target, cancellationToken);
|
||||
});
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that writes to the specified string builder.
|
||||
/// </summary>
|
||||
public static PipeTarget ToStringBuilder(StringBuilder stringBuilder, Encoding encoding) => Create(
|
||||
async (origin, cancellationToken) =>
|
||||
{
|
||||
using var reader = new StreamReader(origin, encoding, false, BufferSizes.StreamReader, true);
|
||||
using var buffer = MemoryPool<char>.Shared.Rent(BufferSizes.StreamReader);
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
var charsRead = await reader.ReadAsync(buffer.Memory, cancellationToken);
|
||||
if (charsRead <= 0) break;
|
||||
stringBuilder.Append(buffer.Memory[..charsRead]);
|
||||
}
|
||||
});
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that writes to the specified string builder.
|
||||
/// Uses <see cref="Console.OutputEncoding" /> for decoding.
|
||||
/// </summary>
|
||||
public static PipeTarget ToStringBuilder(StringBuilder stringBuilder)
|
||||
=> ToStringBuilder(stringBuilder, Console.OutputEncoding);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
|
||||
/// </summary>
|
||||
public static PipeTarget ToDelegate(Func<string, CancellationToken, Task> func, Encoding encoding) => Create(
|
||||
async (origin, cancellationToken) =>
|
||||
{
|
||||
using var reader = new StreamReader(origin, encoding, false, BufferSizes.StreamReader, true);
|
||||
while (await reader.ReadLineAsync(cancellationToken) is { } line)
|
||||
{
|
||||
await func(line, cancellationToken);
|
||||
}
|
||||
});
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
|
||||
/// Uses <see cref="Console.OutputEncoding" /> for decoding.
|
||||
/// </summary>
|
||||
public static PipeTarget ToDelegate(Func<string, CancellationToken, Task> func) =>
|
||||
ToDelegate(func, Console.OutputEncoding);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
|
||||
/// </summary>
|
||||
public static PipeTarget ToDelegate(Func<string, Task> func, Encoding encoding) => ToDelegate(
|
||||
async (line, _) => await func(line), encoding);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
|
||||
/// Uses <see cref="Console.OutputEncoding" /> for decoding.
|
||||
/// </summary>
|
||||
public static PipeTarget ToDelegate(Func<string, Task> func) => ToDelegate(func, Console.OutputEncoding);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that invokes the specified synchronous delegate on every line written to the stream.
|
||||
/// </summary>
|
||||
public static PipeTarget ToDelegate(Action<string> action, Encoding encoding) => ToDelegate(
|
||||
line =>
|
||||
{
|
||||
action(line);
|
||||
return Task.CompletedTask;
|
||||
}, encoding);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that invokes the specified synchronous delegate on every line written to the stream.
|
||||
/// Uses <see cref="Console.OutputEncoding" /> for decoding.
|
||||
/// </summary>
|
||||
public static PipeTarget ToDelegate(Action<string> action)
|
||||
=> ToDelegate(action, Console.OutputEncoding);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that replicates data over multiple inner targets.
|
||||
/// </summary>
|
||||
public static PipeTarget Merge(IEnumerable<PipeTarget> targets)
|
||||
{
|
||||
// optimize targets to avoid unnecessary piping
|
||||
var optimizedTargets = OptimizeTargets(targets);
|
||||
|
||||
return optimizedTargets.Count switch
|
||||
{
|
||||
// avoid merging if there are no targets
|
||||
0 => Null,
|
||||
// avoid merging if there's only one target
|
||||
1 => optimizedTargets.Single(),
|
||||
_ => new AggregatePipeTarget(optimizedTargets)
|
||||
};
|
||||
|
||||
static IReadOnlyList<PipeTarget> OptimizeTargets(IEnumerable<PipeTarget> targets)
|
||||
{
|
||||
var result = new List<PipeTarget>();
|
||||
|
||||
// unwrap merged targets
|
||||
UnwrapTargets(targets, result);
|
||||
|
||||
// filter out no-op
|
||||
result.RemoveAll(t => t == Null);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static void UnwrapTargets(IEnumerable<PipeTarget> targets, ICollection<PipeTarget> output)
|
||||
{
|
||||
foreach (var target in targets)
|
||||
{
|
||||
if (target is AggregatePipeTarget mergedTarget)
|
||||
{
|
||||
UnwrapTargets(mergedTarget.Targets, output);
|
||||
}
|
||||
else
|
||||
{
|
||||
output.Add(target);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a pipe target that replicates data over multiple inner targets.
|
||||
/// </summary>
|
||||
public static PipeTarget Merge(params PipeTarget[] targets) => Merge((IEnumerable<PipeTarget>)targets);
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue