Use of Java Stream is strange #3

Open
rod2j opened this issue Dec 6, 2024 · 10 comments

Comments

@rod2j

rod2j commented Dec 6, 2024

I think that using Java Stream in all the interfaces (Decider, Saga, etc.) has issues.

Using Stream as the return type of a public method is always risky, as a Stream can only be consumed once.

As a result, every "Orchestrating" aggregate (at least the state-stored ones) throws an exception, because the Stream of events is consumed twice:

  • once to apply the evolve() function of the Decider,
  • once to apply the react() function of the Saga
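
The double consumption described above can be reproduced in isolation. This is a minimal sketch, not fmodel code: the class name and the event strings are made up for illustration, and the two passes only stand in for evolve() and react().

```java
import java.util.stream.Stream;

public class StreamReuseDemo {

    // Consumes the same Stream twice and reports what happens on the second pass.
    static String consumeTwice(Stream<String> events) {
        // First pass: stands in for folding the events with evolve().
        long evolved = events.count();
        try {
            // Second pass: stands in for mapping the events through react().
            events.forEach(e -> { });
            return "second pass succeeded";
        } catch (IllegalStateException ex) {
            // The JDK rejects a second terminal operation on the same Stream.
            return "second pass failed after " + evolved + " events";
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeTwice(Stream.of("OrderPlaced", "OrderPrepared")));
    }
}
```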

Regards, and thanks for fmodel, it's a great source of inspiration.

@idugalic
Member

idugalic commented Dec 6, 2024

Hi @rod2j,

fmodel-java is still an infant :)
We are missing tests, especially in the Application layer (you might have found a bug there).

I went with Stream because of lazy evaluation and flexibility in processing (Streams allow for functional-style operations such as filtering, mapping, and reducing without modifying the original data).

We could change that to a List or SequencedCollection. That also makes sense; nevertheless, stream feels more natural to me. Do you have any idea what you would change Stream to?

Thanks!

@rod2j
Author

rod2j commented Dec 6, 2024

I agree Stream felt natural at first sight, as you can use it directly, without a call to stream(), to start applying the evolve function.
But some aggregates need a Saga, where you also need to apply the Saga#react() function, so you need to process the sequence of events twice, and Java Stream doesn't permit that.

I would go for SequencedCollection in the Decider and Saga interfaces: consumers of these interfaces can call stream() on them to process them as many times as they want.
Plus, it lets implementations of these interfaces decide which concrete SequencedCollection they want to return (List, SortedSet, Deque, etc.), though I doubt many would need to navigate events other than from first to last, but you never know.
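
A rough sketch of that idea, under some assumptions: the evolve and react functions below are hypothetical stand-ins rather than the fmodel signatures, and the parameter is typed as List (which implements SequencedCollection as of Java 21) so that the example also compiles on older JDKs.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReusableEventsDemo {

    // Hypothetical stand-in for Decider#evolve: the state is a count of events seen.
    static int evolve(int state, String event) {
        return state + 1;
    }

    // Hypothetical stand-in for Saga#react: each event triggers one follow-up command.
    static Stream<String> react(String event) {
        return Stream.of("Handle" + event);
    }

    // First traversal: fold the events into a new state.
    static int foldState(List<String> events) {
        int state = 0;
        for (String event : events) {
            state = evolve(state, event);
        }
        return state;
    }

    // Second traversal: a fresh stream over the same collection, so nothing is "already consumed".
    static List<String> reactAll(List<String> events) {
        return events.stream().flatMap(ReusableEventsDemo::react).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> events = List.of("OrderPlaced", "OrderPrepared");
        System.out.println(foldState(events));
        System.out.println(reactAll(events));
    }
}
```

The trade-off is that the events are materialized eagerly, so the laziness that motivated Stream in the first place is given up at the interface boundary.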

@idugalic
Member

idugalic commented Dec 6, 2024

OK. We can give it a try!

Would you be able to provide a PR for this?
Stream -> SequencedCollection.

Otherwise, it will have to wait for me, which will not happen in 2024 :)
I will give the Java Fmodel more love next year.

@rod2j
Author

rod2j commented Dec 6, 2024

Definitely. I'll try to push it sometime next week.

@rod2j
Author

rod2j commented Dec 9, 2024

Hi @idugalic ,

I'm afraid I won't have time soon to propose a PR on the subject.

The problem looks deeper than that: all State Stored Orchestrating aggregates seem to be affected (not only in fmodel-java, but in fmodel-kotlin as well).

-> Actions submitted to a combined aggregate through a Saga never initialize the state correctly (they never fetch it, and always start with null in their respective part of the combined state):

  • so it only works if the command returned by the Saga expects an initial null state (and it will never detect a duplicate)
  • plus, all actions returned by the Saga are applied to the same initial null state, instead of each being applied to the state computed by the previous action (the state returned by computeNewState is never reused for the next action)
  • the state (I mean the part of the combined state used by the decider that accepts the Saga's commands) is never saved

I could verify all this in the demo application fmodel-spring-state-stored-demo:
  1. I could place the same order, with the same order id, to the restaurant as many times as I wanted without any error,
  2. the orders were never saved in the db

Sorry... looks like the issue is actually bigger than a simple Java Stream vs. Kotlin Flow question.

@idugalic
Member

OK. There might be a bug there.

Are you referring to a Kotlin Fmodel project?
https://github.com/fraktalio/fmodel/blob/104d746ac1e590ba9ed50cb5648c7a6f483e4dbd/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregate.kt#L87

interface StateOrchestratingComputation<C, S, E> : ISaga<E, C>, StateComputation<C, S, E> {
    /**
     * Computes new State based on the previous State and the [command].
     *
     * `saga` reacts on new events and sends new commands to the `decider` recursively, until it is done.
     *
     * @param command of type [C]
     * @return The newly computed state of type [S]
     */
    @ExperimentalCoroutinesApi
    override suspend fun S?.computeNewState(command: C): S {
        val currentState = this ?: initialState
        val events = decide(command, currentState)
        val newState = events.fold(currentState) { s, e -> evolve(s, e) }
        events.flatMapConcat { react(it) }.onEach { newState.computeNewState(it) }.collect()
        return newState
    }
}

@rod2j
Author

rod2j commented Dec 13, 2024

Actually, yes, the problem is present both in the Kotlin Fmodel code quoted above and in the corresponding Java Fmodel
StateStoredLockingOrchestratingAggregate / StateStoredLockingOrchestratingAggregate :

events.flatMap(it -> react().apply(it)).forEach(it -> computeNewState(newState, it));

    private S computeNewState(S state, C command) {
        var currentState = state != null ? state : initialState().get();
        var events = decide().apply(command, currentState);
        var newState = events.sequential().reduce(currentState, (s, e) -> evolve().apply(s, e), (s, s2) -> s);
        events.flatMap(it -> react().apply(it)).forEach(it -> computeNewState(newState, it));
        return newState;
    }

a. in events.flatMap(it -> react().apply(it)).forEach(it -> computeNewState(newState, it)); :

  • the state returned by computeNewState(newState,it) is not used to update newState
  • so the value of newState that the aggregate will end up storing will always be the same as the one before executing the saga
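
One possible way to address point a is to carry the state returned by each recursive call forward. The sketch below only illustrates that idea and is not the fmodel implementation: the class, its constructor, and the List-based function types are assumptions made for the example, and it ignores initial-state handling and the fetching issue from point b.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class OrchestratingSketch<C, S, E> {
    private final BiFunction<C, S, List<E>> decide;
    private final BiFunction<S, E, S> evolve;
    private final Function<E, List<C>> react;

    OrchestratingSketch(BiFunction<C, S, List<E>> decide,
                        BiFunction<S, E, S> evolve,
                        Function<E, List<C>> react) {
        this.decide = decide;
        this.evolve = evolve;
        this.react = react;
    }

    S computeNewState(S state, C command) {
        List<E> events = decide.apply(command, state);
        S newState = state;
        for (E event : events) {
            newState = evolve.apply(newState, event);
        }
        // Feed every command produced by the saga back in, keeping the returned state.
        for (E event : events) {
            for (C next : react.apply(event)) {
                newState = computeNewState(newState, next);
            }
        }
        return newState;
    }

    // Toy wiring: every command yields one event, and the first event triggers one more command.
    static int demo() {
        OrchestratingSketch<String, Integer, String> aggregate = new OrchestratingSketch<>(
                (command, state) -> List.of(command + "Handled"),
                (state, event) -> state + 1,
                event -> event.equals("placeOrderHandled") ? List.of("createOrder") : List.<String>of());
        return aggregate.computeNewState(0, "placeOrder");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy wiring the state counts applied events, so the final value includes the event caused by the saga-issued command. With the forEach version quoted above, that contribution would be discarded.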

b. in events.flatMap(it -> react().apply(it)).forEach(it -> computeNewState(newState, it)); as well :

  • when we start applying commands generated by the saga, we start from a state in which only one part of the combined state has been fetched
    For example, in the Restaurant/Order case, once you have handled a RestaurantCommand, the combined state, which is a Pair<Restaurant, Order>, is actually <aRestaurant, null>. To apply an OrderCommand through the Saga, you would need to fetch the Order state; but you don't want to get <null, initialOrderState>, you want to "augment" the existing state to obtain <aRestaurant, initialOrderState>.
    It means the IStateRepository method S fetchState(C command) is not enough to handle the initial state when it is a combined state in a "state stored orchestrating aggregate".

@idugalic
Member

b) is the bigger problem here. It can be a leaky abstraction :(
Once you combine several Deciders into a single Decider, the final state of that StateStoredAggregate is the product of all the small states per Decider (Restaurant AND Order). The orchestrated, combined aggregate knows only about this product state (Restaurant AND Order). This is the API of the infra layer.
Having said all that, it is a must to fetch the whole product state upfront (<RestaurantState, OrderState>) based on that first Command. The State Stored Aggregate does not dynamically call fetchState for every new command published by the Saga.
That is where EventSourcedAggregate and StateStoredAggregate differ. EventSourcedAggregate does not have that problem, as Events are SUM-ed, not PRODUCT-ed, so we can call fetchEvents and we just get a different list of Events (the type of the events is the same). With fetchState you fetch the product type (RestaurantState AND OrderState), and this is a different type.
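
Fetching the whole product upfront, as described above, could look roughly like the following. Everything here is hypothetical and only loosely modelled on the Restaurant/Order example: the Pair and PlaceOrder types, the in-memory maps, and the save signature are assumptions made for the sketch, with plain strings standing in for the real state types.

```java
import java.util.HashMap;
import java.util.Map;

public class CombinedStateRepositorySketch {

    // Minimal product type standing in for Pair<Restaurant, Order>.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // Hypothetical command carrying the identifiers of both parts of the state.
    static final class PlaceOrder {
        final String restaurantId;
        final String orderId;

        PlaceOrder(String restaurantId, String orderId) {
            this.restaurantId = restaurantId;
            this.orderId = orderId;
        }
    }

    // In-memory stand-ins for the two tables.
    private final Map<String, String> restaurants = new HashMap<>();
    private final Map<String, String> orders = new HashMap<>();

    // Fetches the whole product upfront; a part that is not stored yet stays null.
    Pair<String, String> fetchState(PlaceOrder command) {
        return new Pair<>(restaurants.get(command.restaurantId), orders.get(command.orderId));
    }

    // Saves both parts, so state produced by saga-issued commands is persisted too.
    void save(PlaceOrder command, Pair<String, String> state) {
        if (state.first != null) {
            restaurants.put(command.restaurantId, state.first);
        }
        if (state.second != null) {
            orders.put(command.orderId, state.second);
        }
    }

    public static void main(String[] args) {
        CombinedStateRepositorySketch repository = new CombinedStateRepositorySketch();
        PlaceOrder command = new PlaceOrder("r1", "o1");
        repository.save(command, new Pair<>("restaurant-r1", "order-o1"));
        // The second fetch now sees the existing order, so a duplicate could be detected.
        System.out.println(repository.fetchState(command).second);
    }
}
```

The cost of this approach is that the command has to identify every part of the combined state, including the parts that only saga-issued commands will touch.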

@idugalic
Member

Looks like Event Sourcing can be generalized and abstracted better than State Stored systems. It preserves information (within events) better, and that is the main reason.

@rod2j
Author

rod2j commented Dec 13, 2024

Agree 100%. I didn't think of just adapting the repository to fetch the right product-ed state, as in the current examples it only fetches one part of the state, depending on the command. It just needs to be a little bit smarter.
Also, the repository impl needs to be checked/updated whenever you add a new saga.

I'll give it a try after Christmas. Too bad fmodel-java doesn't have the restaurant demo yet; I might try to start the state-stored version next year.
