You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
From what I can see, my saga handler isn't buffering outgoing messages. As a result, when the second message (WorkAFinished) arrives, the saga data hasn't been persisted to MongoDB yet.
Program.cs
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;
using Rebus.Bus;
using Rebus.Config;
using SagasTests;
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

// BSON serializers are process-wide static registrations, so they are configured once here
// at startup instead of inside the lazily invoked IMongoClient factory.
// NOTE(review): RegisterSerializer is expected to throw if a serializer for the same type was
// already registered or looked up; registering before any MongoDB usage avoids that - confirm
// against the driver version in use.
BsonSerializer.RegisterSerializer(new GuidSerializer(GuidRepresentation.Standard));
BsonSerializer.RegisterSerializer(new ObjectSerializer(type =>
    ObjectSerializer.DefaultAllowedTypes(type)
    // Type.FullName may be null (e.g. generic type parameters), hence "?." rather than "!".
    // Ordinal comparison: this is an identifier prefix, not culture-sensitive text.
    || type.FullName?.StartsWith("SagasTests", StringComparison.Ordinal) == true));

// A single MongoClient instance is shared by the whole application.
builder.Services.AddSingleton<IMongoClient>(provider =>
{
    var configuration = provider.GetRequiredService<IConfiguration>();

    // Fail with an explicit message when the connection string is missing instead of
    // handing null to the MongoClient constructor.
    var connectionString = configuration.GetConnectionString("MongoDb")
        ?? throw new InvalidOperationException("Connection string 'MongoDb' is not configured.");

    return new MongoClient(connectionString);
});
// Rebus setup: Azure Service Bus as transport, saga data stored in the "sagas" MongoDB database.
builder.Services.AddRebus(
    (configure, provider) =>
    {
        var settings = provider.GetRequiredService<IConfiguration>();
        IMongoDatabase sagaDatabase = provider
            .GetRequiredService<IMongoClient>()
            .GetDatabase("sagas");

        return configure
            .Transport(t =>
            {
                // Second argument is presumably the input queue name - verify against Rebus.AzureServiceBus.
                t.UseAzureServiceBus(settings.GetConnectionString("ServiceBus"), "sagas");
                t.UseNativeDeadlettering();
                t.UseNativeHeaders();
            })
            .Sagas(s => s.StoreInMongoDb(sagaDatabase));
    },
    onCreated: async bus =>
    {
        // This process publishes the work events and also consumes them, so it subscribes
        // to every event type the saga handles. Subscriptions are awaited one at a time.
        await bus.Subscribe<WorkStarted>();
        await bus.Subscribe<WorkAFinished>();
        await bus.Subscribe<WorkBFinished>();
        await bus.Subscribe<WorkCFinished>();
    });

// Makes the saga available to Rebus as a message handler.
builder.Services.AddRebusHandler<WorkSaga>();
WebApplication app = builder.Build();

// POST /run/{name}: publishes a WorkStarted event carrying a freshly generated id and the
// given name, then returns the published event to the caller.
app.MapPost("/run/{name}", async (IBus bus, string name) =>
{
    WorkStarted started = new(Guid.NewGuid().ToString(), name);
    await bus.Publish(started);
    return started;
});

app.Run();
WorkSaga.cs
using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Sagas;
namespace SagasTests;
/// <summary>
/// Saga that moves one piece of work through three steps (A, B, C). It is started by
/// <see cref="WorkStarted"/>; each handler records its step in the saga data and publishes
/// the event that triggers the next handler. <see cref="WorkCFinished"/> completes the saga.
/// </summary>
public class WorkSaga : Saga<WorkSagaData>,
    IAmInitiatedBy<WorkStarted>,
    IHandleMessages<WorkAFinished>,
    IHandleMessages<WorkBFinished>,
    IHandleMessages<WorkCFinished>
{
    private readonly ILogger<WorkSaga> _logger;
    private readonly IBus _bus;

    /// <summary>Creates the saga handler.</summary>
    /// <param name="logger">Logger used to trace progress through the steps.</param>
    /// <param name="bus">Bus used to publish the follow-up events.</param>
    public WorkSaga(ILogger<WorkSaga> logger, IBus bus)
    {
        _logger = logger;
        _bus = bus;
    }

    /// <summary>
    /// Maps the <c>Id</c> carried by every work message to <see cref="WorkSagaData.WorkId"/>.
    /// </summary>
    /// <remarks>
    /// The messages carry a string id, so correlation targets a dedicated string property
    /// rather than the saga data's own <c>Id</c>.
    /// NOTE(review): the inherited <c>Id</c> is presumably a Guid assigned by Rebus; a string
    /// value would then never match it and follow-up messages would not find the saga - confirm.
    /// </remarks>
    protected override void CorrelateMessages(ICorrelationConfig<WorkSagaData> config)
    {
        config.Correlate<WorkStarted>(m => m.Id, d => d.WorkId);
        config.Correlate<WorkAFinished>(m => m.Id, d => d.WorkId);
        config.Correlate<WorkBFinished>(m => m.Id, d => d.WorkId);
        config.Correlate<WorkCFinished>(m => m.Id, d => d.WorkId);
    }

    /// <summary>Starts the saga: stores the work id and name, marks step A done and publishes <see cref="WorkAFinished"/>.</summary>
    public async Task Handle(WorkStarted message)
    {
        _logger.LogInformation("WORK started: {Id}, {Name}", message.Id, message.Name);

        // Set the correlation value explicitly so later messages with the same id resolve
        // to this saga instance regardless of any automatic initialisation.
        Data.WorkId = message.Id;
        Data.Name = message.Name;

        // Do some stuff
        Data.WorkAFinished = true;
        await _bus.Publish(new WorkAFinished(message.Id));
        _logger.LogInformation("WORK A finished: {Id}", message.Id);
    }

    /// <summary>Runs step B: marks it done and publishes <see cref="WorkBFinished"/>.</summary>
    public async Task Handle(WorkAFinished message)
    {
        _logger.LogInformation("WORK B started: {Id}", message.Id);

        // Do some stuff
        Data.WorkBFinished = true;
        await _bus.Publish(new WorkBFinished(message.Id));
        _logger.LogInformation("WORK B finished: {Id}", message.Id);
    }

    /// <summary>Runs step C: marks it done and publishes <see cref="WorkCFinished"/>.</summary>
    public async Task Handle(WorkBFinished message)
    {
        _logger.LogInformation("WORK C started: {Id}", message.Id);

        // Do some stuff
        Data.WorkCFinished = true;
        await _bus.Publish(new WorkCFinished(message.Id));
        _logger.LogInformation("WORK C finished: {Id}", message.Id);
    }

    /// <summary>Final step: marks the saga as complete.</summary>
    public Task Handle(WorkCFinished message)
    {
        MarkAsComplete();
        _logger.LogInformation("WORK saga finished: {Id}", message.Id);
        return Task.CompletedTask;
    }
}

/// <summary>State stored for one <see cref="WorkSaga"/> instance.</summary>
public class WorkSagaData : SagaData
{
    /// <summary>Id carried by the work messages; used as the saga correlation property.</summary>
    public string? WorkId { get; set; }

    /// <summary>Name taken from the initiating <see cref="WorkStarted"/> message.</summary>
    public string? Name { get; set; }

    /// <summary>Set by the <see cref="WorkStarted"/> handler.</summary>
    public bool WorkAFinished { get; set; }

    /// <summary>Set by the <see cref="SagasTests.WorkAFinished"/> handler.</summary>
    public bool WorkBFinished { get; set; }

    /// <summary>Set by the <see cref="SagasTests.WorkBFinished"/> handler.</summary>
    public bool WorkCFinished { get; set; }
}
Messages.cs
namespace SagasTests;
/// <summary>Common base for all work messages; <c>Id</c> identifies the piece of work.</summary>
public abstract record WorkBase(string Id);
/// <summary>Initiates the work saga; carries the work id and a name.</summary>
public record WorkStarted(string Id, string Name) : WorkBase(Id);
/// <summary>Published by the saga's <see cref="WorkStarted"/> handler.</summary>
public record WorkAFinished(string Id) : WorkBase(Id);
/// <summary>Published by the saga's <see cref="WorkAFinished"/> handler.</summary>
public record WorkBFinished(string Id) : WorkBase(Id);
/// <summary>Published by the saga's <see cref="WorkBFinished"/> handler; handling it completes the saga.</summary>
public record WorkCFinished(string Id) : WorkBase(Id);
What am I missing?
The text was updated successfully, but these errors were encountered:
From what I can see, my saga handler isn't buffering outgoing messages. As a result, when the second message (WorkAFinished) arrives, the saga data hasn't been persisted to MongoDB yet.
Program.cs
WorkSaga.cs
Messages.cs
What am I missing?
The text was updated successfully, but these errors were encountered: