From fce9dee4c33ecb9a1255b65a7171df54f01cc18a Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 16:41:35 +0100 Subject: [PATCH 1/8] docs: add multi-stream produce design plan Co-Authored-By: Claude Opus 4.6 --- .../2026-03-09-multi-stream-produce-design.md | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 docs/plans/2026-03-09-multi-stream-produce-design.md diff --git a/docs/plans/2026-03-09-multi-stream-produce-design.md b/docs/plans/2026-03-09-multi-stream-produce-design.md new file mode 100644 index 00000000..a280cd97 --- /dev/null +++ b/docs/plans/2026-03-09-multi-stream-produce-design.md @@ -0,0 +1,64 @@ +# Multi-Stream Produce Design + +## Goal + +Add a multi-stream `Produce` overload to `IProducer` and `IProducer` for batch efficiency and API simplification. Update Gateway to use the new overload. + +## New Types + +Located in `src/Core/src/Eventuous.Producers/`. + +```csharp +// Non-generic, for IProducer +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages); + +// Generic, for IProducer +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest( + StreamName Stream, + IEnumerable Messages, + TProduceOptions? Options +) where TProduceOptions : class; +``` + +## Interface Changes + +### IProducer + +New default interface method with parallel execution: + +```csharp +Task Produce(IReadOnlyCollection requests, CancellationToken ct = default) { + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, ct))); +} +``` + +### IProducer + +New default interface method with parallel execution: + +```csharp +Task Produce(IReadOnlyCollection> requests, CancellationToken ct = default) { + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, ct))); +} +``` + +## BaseProducer Changes + +Override multi-stream `Produce` to add tracing at batch level, then delegate to a new virtual `ProduceMessages` overload. Implementations can override for optimized multi-stream behavior; default calls single-stream `ProduceMessages` in parallel. + +## Gateway Update + +`GatewayHandler` replaces `GroupBy` + parallel per-stream `Produce` with a single call to multi-stream `Produce`, constructing `ProduceRequest` from the transformed messages. + +## What Doesn't Change + +- Individual producer implementations (Kafka, RabbitMQ, PubSub, Service Bus) inherit the default parallel behavior. They can override later for optimization. +- Existing single-stream `Produce` signatures remain untouched. +- Ack/nack semantics unchanged — `ProducedMessage` already carries callbacks. + +## Testing + +- Unit tests for default parallel behavior +- Gateway integration test verifying multi-stream produce works end-to-end From 728188dd413a9aed62bf446d910c26a2ac08a7b2 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 16:43:09 +0100 Subject: [PATCH 2/8] docs: add multi-stream produce implementation plan Co-Authored-By: Claude Opus 4.6 --- docs/plans/2026-03-09-multi-stream-produce.md | 339 ++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 docs/plans/2026-03-09-multi-stream-produce.md diff --git a/docs/plans/2026-03-09-multi-stream-produce.md b/docs/plans/2026-03-09-multi-stream-produce.md new file mode 100644 index 00000000..9ab69603 --- /dev/null +++ b/docs/plans/2026-03-09-multi-stream-produce.md @@ -0,0 +1,339 @@ +# Multi-Stream Produce Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add multi-stream `Produce` overloads to `IProducer` / `IProducer` for batch efficiency, and update Gateway to use them. + +**Architecture:** New `ProduceRequest` / `ProduceRequest` record structs as input types. Default interface methods on `IProducer` / `IProducer` with parallel execution. `BaseProducer` overrides with tracing. Gateway simplified to single multi-stream call. + +**Tech Stack:** C# preview, .NET 10/9/8, TUnit test framework, OpenTelemetry tracing + +--- + +### Task 1: Add ProduceRequest Types + +**Files:** +- Create: `src/Core/src/Eventuous.Producers/ProduceRequest.cs` + +**Step 1: Create the ProduceRequest types file** + +```csharp +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.InteropServices; + +namespace Eventuous.Producers; + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages); + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages, TProduceOptions? Options) + where TProduceOptions : class; +``` + +**Step 2: Verify it builds** + +Run: `dotnet build src/Core/src/Eventuous.Producers/Eventuous.Producers.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +``` +feat: add ProduceRequest types for multi-stream produce +``` + +--- + +### Task 2: Add Multi-Stream Overloads to IProducer Interfaces + +**Files:** +- Modify: `src/Core/src/Eventuous.Producers/IProducer.cs` + +**Step 1: Add default interface method to IProducer** + +After the existing `Produce` method (line 18), add: + +```csharp +/// +/// Produce messages to multiple streams in parallel. +/// +/// Collection of produce requests, one per target stream +/// +/// +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) + => Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); +``` + +**Step 2: Add default interface method to IProducer** + +After the existing `Produce` method (line 33), add: + +```csharp +/// +/// Produce messages to multiple streams in parallel. +/// +/// Collection of produce requests with options, one per target stream +/// +/// +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) + => Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, cancellationToken))); +``` + +**Step 3: Verify it builds** + +Run: `dotnet build src/Core/src/Eventuous.Producers/Eventuous.Producers.csproj` +Expected: Build succeeded + +**Step 4: Commit** + +``` +feat: add multi-stream Produce overloads to IProducer interfaces +``` + +--- + +### Task 3: Add Multi-Stream Produce to BaseProducer with Tracing + +**Files:** +- Modify: `src/Core/src/Eventuous.Producers/BaseProducer.cs` + +**Step 1: Add multi-stream Produce override with batch tracing** + +After the existing single-stream `Produce` method (ends at line 56), add: + +```csharp +/// +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +public Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, cancellationToken))); +} +``` + +Note: Each individual `Produce(stream, messages, options, ct)` call already creates its own tracing activity via the existing single-stream `Produce` method on `BaseProducer`. So multi-stream just fans out to the already-traced single-stream calls. + +Also add the non-generic overload: + +```csharp +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +public Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); +} +``` + +**Step 2: Build the solution** + +Run: `dotnet build Eventuous.slnx` +Expected: Build succeeded. This validates all producer implementations (Kafka, RabbitMQ, PubSub, Service Bus) still compile — they inherit from `BaseProducer` and don't need changes. + +**Step 3: Commit** + +``` +feat: add multi-stream Produce to BaseProducer with empty-check +``` + +--- + +### Task 4: Update GatewayProducer to Delegate Multi-Stream + +**Files:** +- Modify: `src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs` + +**Step 1: Add multi-stream Produce delegation** + +Add two new methods to `GatewayProducer` that delegate to the inner producer after waiting for readiness: + +```csharp +public async Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + + await inner.Produce(requests, cancellationToken).NoContext(); +} + +public async Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + + await ((IProducer)inner).Produce(requests, cancellationToken).NoContext(); +} +``` + +**Step 2: Verify it builds** + +Run: `dotnet build src/Gateway/src/Eventuous.Gateway/Eventuous.Gateway.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +``` +feat: add multi-stream Produce delegation to GatewayProducer +``` + +--- + +### Task 5: Update GatewayHandler to Use Multi-Stream Produce + +**Files:** +- Modify: `src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs` + +**Step 1: Replace GroupBy + parallel Produce with single multi-stream call** + +Replace the `HandleEvent` try block (lines 37-41) with: + +```csharp +try { + var contextMeta = GatewayMetaHelper.GetContextMeta(context); + + var requests = shovelMessages + .GroupBy(x => x.TargetStream) + .Select(g => new ProduceRequest( + g.Key, + g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }), + g.First().ProduceOptions + )) + .ToArray(); + + await producer.Produce(requests, context.CancellationToken).NoContext(); +} catch (OperationCanceledException e) { context.Nack>(e); } +``` + +This eliminates the `ProduceToStream` local function entirely. The `ProduceRequest` constructor bundles stream + messages + options, and the multi-stream `Produce` handles parallel execution. + +Note on `ProduceOptions`: Currently the old code calls `Produce` per message (each with its own `ProduceOptions`). Grouping by stream means we pick `g.First().ProduceOptions` for the group. This is consistent with the existing behavior — messages to the same stream should have the same options. If different options per message within a stream are needed, that's a separate concern handled at the single-stream level. + +**Step 2: Verify it builds** + +Run: `dotnet build src/Gateway/src/Eventuous.Gateway/Eventuous.Gateway.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +``` +refactor: simplify GatewayHandler using multi-stream Produce +``` + +--- + +### Task 6: Write Tests for Multi-Stream Produce + +**Files:** +- Create: `src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs` + +**Step 1: Write the test class** + +Create a test file that validates multi-stream produce behavior using a test producer (similar pattern to `RegistrationTests.cs`): + +```csharp +using Eventuous.Producers; + +namespace Eventuous.Tests.Gateway; + +public class MultiStreamProduceTests { + [Test] + public async Task ShouldProduceToMultipleStreams() { + var producer = new TestProducer(); + var stream1 = new StreamName("stream-1"); + var stream2 = new StreamName("stream-2"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream1, [msg1], null), + new(stream2, [msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams).Contains(stream1); + await Assert.That(producer.Streams).Contains(stream2); + } + + [Test] + public async Task ShouldHandleEmptyRequests() { + var producer = new TestProducer(); + + await producer.Produce(Array.Empty>()); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(0); + } + + [Test] + public async Task ShouldProduceMultipleMessagesToSameStream() { + var producer = new TestProducer(); + var stream = new StreamName("stream-1"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream, [msg1, msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams.Distinct()).HasCount().EqualTo(1); + } + + class TestProducer : BaseProducer { + public List ProducedMessages { get; } = []; + public List Streams { get; } = []; + + protected override Task ProduceMessages( + StreamName stream, + IEnumerable messages, + TestProduceOptions? options, + CancellationToken cancellationToken = default + ) { + Streams.Add(stream); + ProducedMessages.AddRange(messages); + + return Task.CompletedTask; + } + } + + record TestProduceOptions; +} +``` + +**Step 2: Run the tests** + +Run: `dotnet test --project src/Gateway/test/Eventuous.Tests.Gateway/Eventuous.Tests.Gateway.csproj -f net10.0` +Expected: All tests pass + +**Step 3: Commit** + +``` +test: add multi-stream produce tests +``` + +--- + +### Task 7: Build and Run Full Test Suite + +**Step 1: Build entire solution** + +Run: `dotnet build Eventuous.slnx` +Expected: Build succeeded, 0 errors + +**Step 2: Run Gateway tests** + +Run: `dotnet test --project src/Gateway/test/Eventuous.Tests.Gateway/Eventuous.Tests.Gateway.csproj -f net10.0` +Expected: All tests pass + +**Step 3: Run core tests to verify no regressions** + +Run: `dotnet test --project src/Core/test/Eventuous.Tests/Eventuous.Tests.csproj -f net10.0` +Expected: All tests pass + +**Step 4: Final commit if any cleanup needed, then push** From 61585ae9d658e7edc2916604e513ac155cc84b38 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 16:49:21 +0100 Subject: [PATCH 3/8] feat: add multi-stream produce support to IProducer and BaseProducer Add ProduceRequest record structs and multi-stream Produce overloads that delegate to existing single-stream methods with parallel execution. Co-Authored-By: Claude Opus 4.6 --- .../src/Eventuous.Producers/BaseProducer.cs | 18 ++++++++++++++++++ src/Core/src/Eventuous.Producers/IProducer.cs | 11 +++++++++++ .../src/Eventuous.Producers/ProduceRequest.cs | 13 +++++++++++++ 3 files changed, 42 insertions(+) create mode 100644 src/Core/src/Eventuous.Producers/ProduceRequest.cs diff --git a/src/Core/src/Eventuous.Producers/BaseProducer.cs b/src/Core/src/Eventuous.Producers/BaseProducer.cs index 146b55f7..1905829e 100644 --- a/src/Core/src/Eventuous.Producers/BaseProducer.cs +++ b/src/Core/src/Eventuous.Producers/BaseProducer.cs @@ -54,4 +54,22 @@ public async Task Produce(StreamName stream, IEnumerable messag return (act, [producedMessage]); } } + + /// + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + public Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, cancellationToken))); + } + + /// + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + public Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); + } } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Producers/IProducer.cs b/src/Core/src/Eventuous.Producers/IProducer.cs index a7436677..8e44d851 100644 --- a/src/Core/src/Eventuous.Producers/IProducer.cs +++ b/src/Core/src/Eventuous.Producers/IProducer.cs @@ -16,6 +16,17 @@ public interface IProducer { [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] Task Produce(StreamName stream, IEnumerable messages, CancellationToken cancellationToken = default); + + /// + /// Produce messages to multiple streams in parallel. + /// + /// Collection of produce requests, one per target stream + /// + /// + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) + => Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); } [PublicAPI] diff --git a/src/Core/src/Eventuous.Producers/ProduceRequest.cs b/src/Core/src/Eventuous.Producers/ProduceRequest.cs new file mode 100644 index 00000000..a57d3814 --- /dev/null +++ b/src/Core/src/Eventuous.Producers/ProduceRequest.cs @@ -0,0 +1,13 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.InteropServices; + +namespace Eventuous.Producers; + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages); + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages, TProduceOptions? Options) + where TProduceOptions : class; From a86022a5ad9ec1d8d48b9c7fcae238da018c1236 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 16:52:21 +0100 Subject: [PATCH 4/8] feat: update Gateway to use multi-stream produce Add multi-stream delegation methods to GatewayProducer (both generic ProduceRequest and non-generic ProduceRequest overloads). Update GatewayHandler to batch messages by target stream into ProduceRequest collections instead of producing per-message, reducing overhead. Co-Authored-By: Claude Opus 4.6 --- .../src/Eventuous.Gateway/GatewayHandler.cs | 31 +++++++++---------- .../src/Eventuous.Gateway/GatewayProducer.cs | 16 ++++++++++ 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs index d3e2fa13..75750c80 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs @@ -35,27 +35,24 @@ public override async ValueTask HandleEvent(IMessageConsume } try { - var grouped = shovelMessages.GroupBy(x => x.TargetStream); + var contextMeta = GatewayMetaHelper.GetContextMeta(context); - await grouped.Select(x => ProduceToStream(x.Key, x)).WhenAll().NoContext(); + var requests = shovelMessages + .GroupBy(x => x.TargetStream) + .Select(g => new ProduceRequest( + g.Key, + g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }), + g.First().ProduceOptions + )) + .ToArray(); + + if (producer is GatewayProducer gp) + await gp.Produce(requests, context.CancellationToken).NoContext(); + else + await Task.WhenAll(requests.Select(r => producer.Produce(r.Stream, r.Messages, r.Options, context.CancellationToken))).NoContext(); } catch (OperationCanceledException e) { context.Nack>(e); } return awaitProduce ? EventHandlingStatus.Success : EventHandlingStatus.Pending; - - Task ProduceToStream(StreamName streamName, IEnumerable> toProduce) - => toProduce.Select( - x => producer.Produce( - streamName, - x.Message, - x.GetMeta(context), - x.ProduceOptions, - GatewayMetaHelper.GetContextMeta(context), - onAck, - onFail, - context.CancellationToken - ) - ) - .WhenAll(); } } diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs index b420fc39..538e2a7a 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs @@ -12,6 +12,22 @@ public async Task Produce(StreamName stream, IEnumerable messag await inner.Produce(stream, messages, options, cancellationToken).NoContext(); } + public async Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + + if (inner is BaseProducer baseProducer) { + await baseProducer.Produce(requests, cancellationToken).NoContext(); + } else { + await Task.WhenAll(requests.Select(r => inner.Produce(r.Stream, r.Messages, r.Options, cancellationToken))).NoContext(); + } + } + + public async Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + + await ((IProducer)inner).Produce(requests, cancellationToken).NoContext(); + } + static async ValueTask WaitForInner(IProducer inner, CancellationToken cancellationToken) { if (inner is not IHostedProducer hosted) return; From 7d06e434bac4486abbb809a14ff97a2321bdddff Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 16:54:11 +0100 Subject: [PATCH 5/8] test: add multi-stream produce tests for BaseProducer Tests validate producing to multiple streams, empty requests, multiple messages to same stream, and untyped produce requests. Co-Authored-By: Claude Opus 4.6 --- .../MultiStreamProduceTests.cs | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs diff --git a/src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs b/src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs new file mode 100644 index 00000000..e30bf9f8 --- /dev/null +++ b/src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs @@ -0,0 +1,90 @@ +using Eventuous.Producers; + +namespace Eventuous.Tests.Gateway; + +public class MultiStreamProduceTests { + [Test] + public async Task ShouldProduceToMultipleStreams() { + var producer = new TestProducer(); + var stream1 = new StreamName("stream-1"); + var stream2 = new StreamName("stream-2"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream1, [msg1], null), + new(stream2, [msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams).Contains(stream1); + await Assert.That(producer.Streams).Contains(stream2); + } + + [Test] + public async Task ShouldHandleEmptyRequests() { + var producer = new TestProducer(); + + await producer.Produce(Array.Empty>()); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(0); + } + + [Test] + public async Task ShouldProduceMultipleMessagesToSameStream() { + var producer = new TestProducer(); + var stream = new StreamName("stream-1"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream, [msg1, msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams.Distinct().Count()).IsEqualTo(1); + } + + [Test] + public async Task ShouldProduceUntypedRequests() { + var producer = new TestProducer(); + var stream1 = new StreamName("stream-1"); + var stream2 = new StreamName("stream-2"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream1, [msg1]), + new(stream2, [msg2]) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams).Contains(stream1); + await Assert.That(producer.Streams).Contains(stream2); + } + + class TestProducer : BaseProducer { + public List ProducedMessages { get; } = []; + public List Streams { get; } = []; + + protected override Task ProduceMessages( + StreamName stream, + IEnumerable messages, + TestProduceOptions? options, + CancellationToken cancellationToken = default + ) { + Streams.Add(stream); + ProducedMessages.AddRange(messages); + + return Task.CompletedTask; + } + } + + record TestProduceOptions; +} From ad4b3ab7576e6b1daa9667b6ec14c8e666264d76 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 17:26:50 +0100 Subject: [PATCH 6/8] fix: add trailing newline and preserve per-message produce options - Add missing trailing newline in BaseProducer.cs - Group by (Stream, ProduceOptions) instead of just Stream in GatewayHandler to preserve per-message options when messages to the same stream have different ProduceOptions Co-Authored-By: Claude Opus 4.6 --- src/Core/src/Eventuous.Producers/BaseProducer.cs | 2 +- src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Core/src/Eventuous.Producers/BaseProducer.cs b/src/Core/src/Eventuous.Producers/BaseProducer.cs index 1905829e..81d0b880 100644 --- a/src/Core/src/Eventuous.Producers/BaseProducer.cs +++ b/src/Core/src/Eventuous.Producers/BaseProducer.cs @@ -72,4 +72,4 @@ public Task Produce(IReadOnlyCollection requests, CancellationTo return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); } -} \ No newline at end of file +} diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs index 75750c80..4d53e505 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs @@ -38,11 +38,11 @@ public override async ValueTask HandleEvent(IMessageConsume var contextMeta = GatewayMetaHelper.GetContextMeta(context); var requests = shovelMessages - .GroupBy(x => x.TargetStream) + .GroupBy(x => (x.TargetStream, x.ProduceOptions)) .Select(g => new ProduceRequest( - g.Key, + g.Key.TargetStream, g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }), - g.First().ProduceOptions + g.Key.ProduceOptions )) .ToArray(); From f87b543c4643df2aa743cd01566ef63412dcd32d Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 17:27:42 +0100 Subject: [PATCH 7/8] fix: remove inverted _isHostedService flag in GatewayProducer WaitForInner already guards with `if (inner is not IHostedProducer) return`, so call it unconditionally. The previous boolean was inverted (true when NOT hosted), meaning the wait was never reached for actual hosted producers. Co-Authored-By: Claude Opus 4.6 --- src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs index 538e2a7a..7fd47917 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs @@ -4,16 +4,14 @@ namespace Eventuous.Gateway; class GatewayProducer(IProducer inner) : IProducer where T : class { - readonly bool _isHostedService = inner is not IHostedProducer; - public async Task Produce(StreamName stream, IEnumerable messages, T? options, CancellationToken cancellationToken = default) { - if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + await WaitForInner(inner, cancellationToken).NoContext(); await inner.Produce(stream, messages, options, cancellationToken).NoContext(); } public async Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { - if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + await WaitForInner(inner, cancellationToken).NoContext(); if (inner is BaseProducer baseProducer) { await baseProducer.Produce(requests, cancellationToken).NoContext(); @@ -23,7 +21,7 @@ public async Task Produce(IReadOnlyCollection> requests, Cance } public async Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { - if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + await WaitForInner(inner, cancellationToken).NoContext(); await ((IProducer)inner).Produce(requests, cancellationToken).NoContext(); } From cdf283c89ebb68e12732b30ebca37735d3653c44 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Mar 2026 17:46:01 +0100 Subject: [PATCH 8/8] fix: increase Azure Service Bus emulator startup timeout to 5 minutes The emulator container readiness check was hitting the default 100-second HttpClient.Timeout on CI runners, causing all Service Bus tests to fail. Co-Authored-By: Claude Opus 4.6 --- .../Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs b/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs index cc5d90c7..7c6e0459 100644 --- a/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs +++ b/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs @@ -17,7 +17,8 @@ public class AzureServiceBusFixture : IAsyncInitializer, IAsyncDisposable { .Build(); public async Task InitializeAsync() { - await Container.StartAsync(); + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + await Container.StartAsync(cts.Token); ConnectionString = Container.GetConnectionString();