I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code run AS IS to reproduce the issue.
Example Code
import operator
import uuid
from typing import Optional, Annotated, List

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Send, Interrupt, Command
from pydantic import BaseModel, Field


# --- CHILD GRAPH ---

class ChildState(BaseModel):
    prompt: str = Field(
        ...,
        description="What is going to be asked to the user?"
    )
    human_input: Optional[str] = Field(
        None,
        description="What the human said"
    )
    human_inputs: Annotated[List[str], operator.add] = Field(
        default_factory=list,
        description="All of my messages"
    )


def get_human_input(state: ChildState):
    human_input = interrupt(state.prompt)

    return dict(
        human_input=human_input,  # update child state
        human_inputs=[human_input],  # update parent state
    )


child_graph_builder = StateGraph(ChildState)
child_graph_builder.add_node("get_human_input", get_human_input)
child_graph_builder.add_edge(START, "get_human_input")
child_graph_builder.add_edge("get_human_input", END)
child_graph = child_graph_builder.compile(checkpointer=True)


# --- PARENT GRAPH ---

class ParentState(BaseModel):
    prompts: List[str] = Field(
        ...,
        description="What is going to be asked to the user?"
    )
    human_inputs: Annotated[List[str], operator.add] = Field(
        default_factory=list,
        description="All of my messages"
    )


def assign_workers(state: ParentState):
    return [
        Send(
            "child_graph",
            dict(
                prompt=prompt,
            )
        )
        for prompt in state.prompts
    ]


def cleanup(state: ParentState):
    assert len(state.human_inputs) == len(state.prompts)


parent_graph_builder = StateGraph(ParentState)
parent_graph_builder.add_node("child_graph", child_graph)
parent_graph_builder.add_node("cleanup", cleanup)
parent_graph_builder.add_conditional_edges(START, assign_workers, ["child_graph"])
parent_graph_builder.add_edge("child_graph", "cleanup")
parent_graph_builder.add_edge("cleanup", END)
parent_graph = parent_graph_builder.compile(checkpointer=MemorySaver())


# --- CLIENT INVOCATION ---

if __name__ == "__main__":
    thread_config = dict(
        configurable=dict(
            thread_id=uuid.uuid4(),
        )
    )
    current_input = dict(
        prompts=['a', 'b'],
    )

    done = False
    while not done:
        # reset interrupt
        current_interrupt: Optional[Interrupt] = None

        # start / resume the graph
        for event in parent_graph.stream(
            input=current_input,
            config=thread_config,
            stream_mode="updates",
        ):
            print(event)

            # handle the interrupt
            if "__interrupt__" in event:
                current_interrupt: Interrupt = event["__interrupt__"][0]
                # assume that it breaks here, because it is an interrupt

        # get human input and resume
        if (
            current_interrupt is not None
            and current_interrupt.resumable is True  # make sure it is resumable
        ):
            response = "Test Input"
            current_input = Command(resume=response)

        # not more human input required, must be completed
        else:
            done = True

    exit(0)
Error Message and Stack Trace (if applicable)
Description
Moving from LangGraph 0.3.13 to 0.3.14, we noticed a change in behavior that we had come to expect and rely on, specifically related to PR #3889 and issue #3398.
The behavior we had been relying on was the ability to resume a graph with a single ainvoke call carrying a Command resume, restarting the graph from multiple interrupts that reside within parallel subgraphs.
For this example, assume we have 2 parallel subgraphs. The first subgraph has an interrupt in a node toward the beginning of the subgraph, while the second subgraph has an interrupt in a node toward the end. Using ainvoke, we were able to wait until both subgraphs either completed or raised an interrupt. If the first subgraph raised an interrupt early in its execution, the graph would still process the rest of the second subgraph until it too either hit an interrupt or completed. At that point we could present all pending interrupts to the user at once, allowing them to provide all of their feedback before the graph's execution continued.
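The "collect everything, then ask once" pattern described above can be sketched roughly as follows. This is only an illustration: `collect_interrupts` and `sample_events` are hypothetical names, and the events are plain dictionaries shaped like the ones the example code prints, not real LangGraph objects.

```python
from typing import Any, Dict, Iterable, List

# Key checked by the client loop in the example code.
INTERRUPT_KEY = "__interrupt__"


def collect_interrupts(events: Iterable[Dict[str, Any]]) -> List[Any]:
    """Gather every interrupt seen in a stream of update events.

    Hypothetical helper: it assumes each interrupt event maps
    INTERRUPT_KEY to a tuple of interrupts, as in the example output.
    """
    pending: List[Any] = []
    for event in events:
        # Keep all entries of the tuple, not only the first one.
        pending.extend(event.get(INTERRUPT_KEY, ()))
    return pending


# Stand-in events; strings replace the real Interrupt objects.
sample_events = [
    {"__interrupt__": ("prompt a",)},
    {"__interrupt__": ("prompt b",)},
    {"child_graph": {"human_inputs": []}},
]

pending = collect_interrupts(sample_events)
print(pending)
```

In the client loop, the same helper could presumably be fed the iterator returned by `parent_graph.stream(...)`, so that every pending prompt is shown to the user in one round instead of only `event["__interrupt__"][0]`.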
In LangGraph 0.3.14 we are still able to gather the interrupts that were hit in the parallel subgraphs, but we are unable to resume all of them through a single graph invoke. We are aware that we could simply invoke the graph and hit the waiting interrupt again, but we would then need to wait for the subgraph that we just resumed to complete, instead of resuming both at the same time. This would also force the caller to invoke the graph multiple times and reload the correct state from the checkpointer multiple times. The problem gets worse as the size and concurrency of the subgraphs grow.
This behavior can be seen in the outputs from the above code that was initially provided in this issue: #3398
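Output From Code Above:
In LangGraph 0.3.13 you see both interrupts get hit in the parallel subgraphs, and they can be resumed through a single resume.
{'__interrupt__': (Interrupt(value='a', resumable=True, ns=['child_graph', 'get_human_input:f1c95e13-34de-3127-a6cf-7945d349df2f'], when='during'),)}
{'__interrupt__': (Interrupt(value='b', resumable=True, ns=['child_graph', 'get_human_input:09753ebf-d018-7591-45f0-a718d2fc7f7f'], when='during'),)}
{'child_graph': {'human_inputs': ['Test Input']}}
{'child_graph': {'human_inputs': ['Test Input']}}
{'cleanup': None}
In LangGraph 0.3.14 you will see that both interrupts are still hit on the first pass through the graph, but when you resume, only the first interrupt is restarted. The second subgraph raises its interrupt again, and you must call the graph once more to resume it.
{'__interrupt__': (Interrupt(value='a', resumable=True, ns=['child_graph', 'get_human_input:eea06b75-9d01-2d5f-8bef-b39884443394']),)}
{'__interrupt__': (Interrupt(value='b', resumable=True, ns=['child_graph', 'get_human_input:2d8ef817-08cc-8346-a2f8-4240273e50cf']),)}
{'__interrupt__': (Interrupt(value='b', resumable=True, ns=['child_graph', 'get_human_input:2d8ef817-08cc-8346-a2f8-4240273e50cf']),)}
{'child_graph': {'human_inputs': ['Test Input']}}
{'child_graph': {'human_inputs': ['Test Input']}, '__metadata__': {'cached': True}}
{'child_graph': {'human_inputs': ['Test Input']}}
{'cleanup': None}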
The considerations made in the original issue are still valid if you want to break out of the graph and return to the user after each interrupt is encountered, regardless of whether it is in a parallel subgraph. Because our system is long-running and highly parallel, we have found that the best user experience is to wait until each parallel subgraph has either completed or hit an interrupt. The user can then provide all of their input at once, instead of receiving a new message and being asked for input after each interrupt.
I believe both methods can be supported. Currently, to get around the limitation of only being able to send a single resume value when restarting all of the interrupts, we have had to use a dict that maps the interrupt node name to the relevant responses from the user. Ideally, we would like the ability to specify multiple resume values or commands that would then map to each encountered interrupt. This would also fit the previous issue, since those users could simply respond with a single resume value to restart from each interrupt as it is encountered.
Happy to provide more code examples to support this methodology.
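As a rough sketch of the dict workaround mentioned above: the client bundles one answer per pending prompt into a single resume value, and each node picks out its own entry. The helper names `build_resume_payload` and `pick_answer` are hypothetical, and keying by prompt text is an assumption made for illustration (our real code keys by interrupt node name).

```python
from typing import Dict, Mapping, Sequence


def build_resume_payload(
    prompts: Sequence[str], answers: Mapping[str, str]
) -> Dict[str, str]:
    """Bundle one answer per pending prompt into a single resume value.

    Hypothetical helper: keys are prompt strings, chosen only for
    illustration; any key that uniquely identifies an interrupt works.
    """
    missing = [p for p in prompts if p not in answers]
    if missing:
        raise KeyError(f"no answer for prompts: {missing}")
    return {p: answers[p] for p in prompts}


def pick_answer(resume_value: Mapping[str, str], prompt: str) -> str:
    """Select the answer that belongs to one specific interrupt."""
    return resume_value[prompt]


# Client side: this dict would be passed as Command(resume=payload).
payload = build_resume_payload(
    ["a", "b"],
    {"a": "answer for a", "b": "answer for b"},
)

# Node side: the value returned by interrupt(state.prompt) would be this
# same dict, and the node would look up its own entry by state.prompt.
print(pick_answer(payload, "a"))
print(pick_answer(payload, "b"))
```

This only works while every interrupt receives the same resume value, which is exactly the behavior that changed, so it should be read as a description of the old workaround rather than a recommendation.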
System Info
System Information
OS: Linux
OS Version: #1 SMP Tue Nov 5 00:21:55 UTC 2024
Python Version: 3.12.3 (main, Jan 17 2025, 18:03:48) [GCC 13.3.0]
Running into the same issue, as we came to expect the previous behavior of subgraph interrupts prior to v0.3.14.
The behavior that is described above (ability to resume from multiple interrupts) makes more sense than the change that was added in v0.3.14, as it does not break down as both concurrency and nesting of subgraphs increase.
Hi, the previous behavior was a bug: it used the same resume value for multiple unrelated interrupts, which is not expected or desirable. We will add a way to pass multiple resume values in the same invoke call; expect this to be added in the coming days.