Use of Java Stream is strange #3
Comments
Hi @rod2j,
I went with Stream because of lazy evaluation and flexibility in processing (Streams allow for functional-style operations such as filtering, mapping, and reducing without modifying the original data). We could change that to a Thanks! |
I agree Stream felt natural at first sight, as you can use it directly, without a call to stream(), to start applying the evolve function. I would go for SequencedCollection in the Decider and Saga interfaces: consumers of these interfaces can call stream() on the result to process it, as many times as they want. |
OK. We can give it a try! Would you be able to provide a PR for this? Otherwise, it will have to wait for me, which will not happen in 2024 :) |
Definitely. I'll try to push it sometime next week. |
Hi @idugalic, I'm afraid I won't have time soon to propose a PR on the subject, because it looks like the problem is deeper than that: actions submitted to a combined aggregate through a Saga never initialize the state correctly (they never fetch it and always start with null in their respective part of the combined state):
Sorry... it looks like the issue is actually bigger than a simple Java Stream vs. Kotlin Flow question. |
OK. There might be a bug there. Are you referring to a Kotlin Fmodel project?
interface StateOrchestratingComputation<C, S, E> : ISaga<E, C>, StateComputation<C, S, E> {
    /**
     * Computes new State based on the previous State and the [command].
     *
     * `saga` reacts on new events and sends new commands to the `decider` recursively, until it is done.
     *
     * @param command of type [C]
     * @return The newly computed state of type [S]
     */
    @ExperimentalCoroutinesApi
    override suspend fun S?.computeNewState(command: C): S {
        val currentState = this ?: initialState
        val events = decide(command, currentState)
        val newState = events.fold(currentState) { s, e -> evolve(s, e) }
        events.flatMapConcat { react(it) }.onEach { newState.computeNewState(it) }.collect()
        return newState
    }
} |
Actually, yes, the problem is present both in the Kotlin Fmodel quoted above and in the corresponding Java Fmodel (line 89 in 17008ed):
private S computeNewState(S state, C command) {
    var currentState = state != null ? state : initialState().get();
    var events = decide().apply(command, currentState);
    var newState = events.sequential().reduce(currentState, (s, e) -> evolve().apply(s, e), (s, s2) -> s);
    events.flatMap(it -> react().apply(it)).forEach(it -> computeNewState(newState, it));
    return newState;
}
a. in
b. in
|
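For the Java snippet quoted above, here is a minimal sketch of how the same computation might look if `decide` and `react` returned a `List` instead of a `Stream`, so the events can be traversed twice. This is not the fmodel-java API: the class `ListBasedComputation`, the way the functions are passed as parameters, and the counter-style demo are all hypothetical. The sketch also keeps the state returned by the recursive calls, which the quoted code discards; that is an assumption of this sketch, not something confirmed in this thread.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ListBasedComputation {

    // Hypothetical stand-in for the state-stored orchestrating computation.
    // decide/evolve/react are plain parameters here, not the fmodel-java interfaces.
    static <C, S, E> S computeNewState(
            S state,
            C command,
            S initialState,
            BiFunction<C, S, List<E>> decide,
            BiFunction<S, E, S> evolve,
            Function<E, List<C>> react) {
        S currentState = state != null ? state : initialState;

        // A List can be traversed any number of times, unlike a Stream.
        List<E> events = decide.apply(command, currentState);

        // First traversal: fold the events into the new state.
        S newState = currentState;
        for (E event : events) {
            newState = evolve.apply(newState, event);
        }

        // Second traversal: let the saga react and handle the follow-up commands.
        // Assumption of this sketch: the state is threaded through the recursion.
        for (E event : events) {
            for (C next : react.apply(event)) {
                newState = computeNewState(newState, next, initialState, decide, evolve, react);
            }
        }
        return newState;
    }

    // Hypothetical demo: "add" produces "Added" (+1); the saga reacts to "Added"
    // with a "bonus" command, which produces "BonusAdded" (+10).
    static int demo() {
        BiFunction<String, Integer, List<String>> decide =
                (c, s) -> c.equals("add") ? List.of("Added") : List.of("BonusAdded");
        BiFunction<Integer, String, Integer> evolve =
                (s, e) -> e.equals("Added") ? s + 1 : s + 10;
        Function<String, List<String>> react =
                e -> e.equals("Added") ? List.of("bonus") : List.<String>of();
        Integer result = computeNewState(null, "add", 0, decide, evolve, react);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 11
    }
}
```

The trade-off is that a `List` gives up the lazy evaluation mentioned earlier in the thread, in exchange for results that can safely be read more than once.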
b) is a bigger problem here. It can be a leaky abstraction :( |
Looks like Event Sourcing can be generalized and abstracted better than State Stored systems. It preserves information (within events) better, and that is the main reason. |
Agree 100%. I didn't think of just adapting the repository to fetch the right product-ed state, as in the current examples it was only fetching one part of the state depending on the command. It needs to be just a little bit smarter. I'll give it a try after Christmas. Too bad fmodel-java doesn't have the restaurant demo yet: I might try to initiate the state-stored version next year |
I think that using Java Stream in all the interfaces (Decider, Saga, etc.) causes problems.
Using Stream as the return type of a public method is always risky, as a Stream can only be consumed once.
It causes every "Orchestrating" aggregate (at least the state-stored ones) to throw exceptions, because the Stream of events is consumed twice:
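As an illustration of that failure mode (a self-contained sketch, not the original snippet or stack trace from this issue), the hypothetical class below runs two terminal operations on the same `Stream` instance, which is the pattern of a `reduce` followed by a `forEach` over the same events. The class and method names are made up for the example.

```java
import java.util.stream.Stream;

public class StreamConsumedTwice {

    // Returns true if a second terminal operation on the same Stream
    // instance is rejected with IllegalStateException.
    static boolean secondConsumptionFails(Stream<String> events) {
        // First terminal operation: fold the events into a "state".
        String state = events.reduce("", (s, e) -> s + e);
        try {
            // Second terminal operation on the SAME stream instance.
            events.forEach(e -> { });
            return false;
        } catch (IllegalStateException alreadyConsumed) {
            // The JDK refuses to reuse a stream that was already operated upon.
            return true;
        }
    }

    public static void main(String[] args) {
        Stream<String> events = Stream.of("OrderCreated", "OrderPaid");
        System.out.println(secondConsumptionFails(events)); // prints true
    }
}
```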
Regards, and thanks for fmodel, it's a great source of inspiration.