Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Could not find existing saga data for message #1190

Open
sergiojrdotnet opened this issue Jan 8, 2025 · 0 comments
Open

Could not find existing saga data for message #1190

sergiojrdotnet opened this issue Jan 8, 2025 · 0 comments

Comments

@sergiojrdotnet
Copy link

From what I can see, my saga handler isn't buffering outgoing messages. As a result, when the second message (WorkAFinished) arrives, the saga data hasn't been persisted to MongoDB yet.

image

Program.cs

using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;
using Rebus.Bus;
using Rebus.Config;
using SagasTests;

WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<IMongoClient>(provider =>
{
    BsonSerializer.RegisterSerializer(new GuidSerializer(GuidRepresentation.Standard));
    BsonSerializer.RegisterSerializer(new ObjectSerializer(type =>
        ObjectSerializer.DefaultAllowedTypes(type) || type.FullName!.StartsWith("SagasTests")));

    var configuration = provider.GetRequiredService<IConfiguration>();
    var connectionString = configuration.GetConnectionString("MongoDb");
    return new MongoClient(connectionString);
});

builder.Services.AddRebus((configure, provider) =>
{
    var configuration = provider.GetRequiredService<IConfiguration>();

    var mongoClient = provider.GetRequiredService<IMongoClient>();
    IMongoDatabase database = mongoClient.GetDatabase("sagas");

    return configure
        .Transport(transport =>
        {
            transport.UseAzureServiceBus(configuration.GetConnectionString("ServiceBus"), "sagas");
            transport.UseNativeDeadlettering();
            transport.UseNativeHeaders();
        })
        .Sagas(sagas => sagas.StoreInMongoDb(database));
}, onCreated: async bus =>
{
    await bus.Subscribe<WorkStarted>();
    await bus.Subscribe<WorkAFinished>();
    await bus.Subscribe<WorkBFinished>();
    await bus.Subscribe<WorkCFinished>();
});

builder.Services.AddRebusHandler<WorkSaga>();

WebApplication app = builder.Build();

app.MapPost("/run/{name}", async (IBus bus, string name) =>
{
    var id = Guid.NewGuid().ToString();

    var work = new WorkStarted(id, name);

    await bus.Publish(work);

    return work;
});

app.Run();

WorkSaga.cs

using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Sagas;

namespace SagasTests;

public class WorkSaga : Saga<WorkSagaData>,
    IAmInitiatedBy<WorkStarted>,
    IHandleMessages<WorkAFinished>,
    IHandleMessages<WorkBFinished>,
    IHandleMessages<WorkCFinished>
{
    private readonly ILogger<WorkSaga> _logger;
    private readonly IBus _bus;

    public WorkSaga(ILogger<WorkSaga> logger, IBus bus)
    {
        _logger = logger;
        _bus = bus;
    }

    protected override void CorrelateMessages(ICorrelationConfig<WorkSagaData> config)
    {
        config.Correlate<WorkStarted>(m => m.Id, d => d.Id);
        config.Correlate<WorkAFinished>(m => m.Id, d => d.Id);
        config.Correlate<WorkBFinished>(m => m.Id, d => d.Id);
        config.Correlate<WorkCFinished>(m => m.Id, d => d.Id);
    }

    public async Task Handle(WorkStarted message)
    {
        _logger.LogInformation("WORK started: {Id}, {Name}", message.Id, message.Name);

        Data.Name = message.Name;

        // Do some stuff

        Data.WorkAFinished = true;

        await _bus.Publish(new WorkAFinished(message.Id));

        _logger.LogInformation("WORK A finished: {Id}", message.Id);
    }

    public async Task Handle(WorkAFinished message)
    {
        _logger.LogInformation("WORK B started: {Id}", message.Id);

        // Do some stuff

        Data.WorkBFinished = true;

        await _bus.Publish(new WorkBFinished(message.Id));

        _logger.LogInformation("WORK B finished: {Id}", message.Id);
    }

    public async Task Handle(WorkBFinished message)
    {
        _logger.LogInformation("WORK C started: {Id}", message.Id);

        // Do some stuff

        Data.WorkCFinished = true;

        await _bus.Publish(new WorkCFinished(message.Id));

        _logger.LogInformation("WORK C finished: {Id}", message.Id);
    }

    public Task Handle(WorkCFinished message)
    {
        MarkAsComplete();

        _logger.LogInformation("WORK saga finished: {Id}", message.Id); 

        return Task.CompletedTask;
    }
}

public class WorkSagaData : SagaData
{
    public string? Name { get; set; }

    public bool WorkAFinished { get; set; }

    public bool WorkBFinished { get; set; }

    public bool WorkCFinished { get; set; }
}

Messages.cs

namespace SagasTests;

public abstract record WorkBase(string Id);

public record WorkStarted(string Id, string Name) : WorkBase(Id);

public record WorkAFinished(string Id) : WorkBase(Id);

public record WorkBFinished(string Id) : WorkBase(Id);

public record WorkCFinished(string Id) : WorkBase(Id);

What am I missing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant