Unable to resume multiple interrupts from a single graph invoke #4028

Open · 4 tasks done
cnummer1 opened this issue Mar 26, 2025 · 2 comments

Checked other resources

  • This is a bug, not a usage question. For questions, please use GitHub Discussions.
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

import operator
import uuid
from typing import Optional, Annotated, List

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Send, Interrupt, Command
from pydantic import BaseModel, Field


# --- CHILD GRAPH ---

class ChildState(BaseModel):
    prompt: str = Field(
        ...,
        description="What is going to be asked to the user?"
    )
    human_input: Optional[str] = Field(
        None,
        description="What the human said"
    )
    human_inputs: Annotated[List[str], operator.add] = Field(
        default_factory=list,
        description="All of my messages"
    )

def get_human_input(state: ChildState):
    human_input = interrupt(state.prompt)

    return dict(
        human_input=human_input,  # update child state
        human_inputs=[human_input],  # update parent state
    )

child_graph_builder = StateGraph(ChildState)
child_graph_builder.add_node("get_human_input", get_human_input)
child_graph_builder.add_edge(START, "get_human_input")
child_graph_builder.add_edge("get_human_input", END)
child_graph = child_graph_builder.compile(checkpointer=True)

# --- PARENT GRAPH ---

class ParentState(BaseModel):
    prompts: List[str] = Field(
        ...,
        description="What is going to be asked to the user?"
    )
    human_inputs: Annotated[List[str], operator.add] = Field(
        default_factory=list,
        description="All of my messages"
    )

def assign_workers(state: ParentState):
    return [
        Send(
            "child_graph",
            dict(
                prompt=prompt,
            )
        )
        for prompt in state.prompts
    ]

def cleanup(state: ParentState):
    assert len(state.human_inputs) == len(state.prompts)

parent_graph_builder = StateGraph(ParentState)
parent_graph_builder.add_node("child_graph", child_graph)
parent_graph_builder.add_node("cleanup", cleanup)

parent_graph_builder.add_conditional_edges(START, assign_workers, ["child_graph"])
parent_graph_builder.add_edge("child_graph", "cleanup")
parent_graph_builder.add_edge("cleanup", END)

parent_graph = parent_graph_builder.compile(checkpointer=MemorySaver())


# --- CLIENT INVOCATION ---

if __name__ == "__main__":
    thread_config = dict(
        configurable=dict(
            thread_id=str(uuid.uuid4()),  # thread IDs are typically strings
        )
    )
    current_input = dict(
        prompts=['a', 'b'],
    )

    done = False
    while not done:
        # reset interrupt
        current_interrupt: Optional[Interrupt] = None

        # start / resume the graph
        for event in parent_graph.stream(
            input=current_input,
            config=thread_config,
            stream_mode="updates",
        ):
            print(event)
            # handle the interrupt
            if "__interrupt__" in event:
                current_interrupt: Interrupt = event["__interrupt__"][0]
                # note: the stream keeps yielding events after an interrupt,
                # so we record the most recent interrupt seen


        # get human input and resume
        if (
            current_interrupt is not None
            and current_interrupt.resumable is True  # make sure it is resumable
        ):
            response = "Test Input"
            current_input = Command(resume=response)

        # no more human input required, so the run must be complete
        else:
            done = True

    exit(0)

Error Message and Stack Trace (if applicable)

Description

Moving from LangGraph 0.3.13 to 0.3.14, we noticed a change in behavior that we had come to expect and rely on, specifically related to PR #3889 and issue #3398.

The behavior we had been relying on was the ability to resume a graph through a single ainvoke call with Command(resume=...), restarting it from multiple interrupts residing within parallel subgraphs.

For this example, assume we have two parallel subgraphs. The first subgraph has an interrupt in a node toward the beginning of the subgraph, while the second has an interrupt in a node toward the end. Using ainvoke, we were able to wait until both subgraphs either completed or raised an interrupt. If the first subgraph raised an interrupt early in its execution, the graph would still process the rest of the second subgraph until it too either hit an interrupt or completed. At that point we could present all interrupts to the user at a single time, allowing them to provide all of their feedback before continuing the graph's execution.

In LangGraph 0.3.14 we are still able to gather the interrupts hit in the parallel subgraphs, but we are unable to resume all of them through a single graph invoke. We are aware that we could simply invoke the graph again and hit the waiting interrupt a second time, but we would then need to wait for the subgraph we just resumed to complete instead of simply resuming both at the same time. This also forces multiple graph invocations and multiple reloads of the correct state from the checkpointer (see the sketch below). The problem gets worse as the size and concurrency of the subgraphs grow.
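
For illustration, here is a minimal sketch of the one-interrupt-at-a-time loop that 0.3.14 forces on callers, reusing parent_graph and thread_config from the example above (get_state, tasks, and interrupts are standard LangGraph state-inspection APIs; the resume value is a placeholder):

# sketch: resume pending interrupts one at a time (0.3.14 behavior)
# each iteration is a full graph invocation plus a checkpoint reload
state = parent_graph.get_state(thread_config)
while any(task.interrupts for task in state.tasks):
    # only one pending interrupt is resumed per invoke
    parent_graph.invoke(Command(resume="Test Input"), config=thread_config)
    state = parent_graph.get_state(thread_config)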

This behavior can be seen in the outputs from the code above, which was originally provided in issue #3398.

Output From Code Above:

In LangGraph 0.3.13, both interrupts are hit in the parallel subgraphs and both can be resumed through a single resume:

{'interrupt': (Interrupt(value='a', resumable=True, ns=['child_graph', 'get_human_input:f1c95e13-34de-3127-a6cf-7945d349df2f'], when='during'),)}
{'interrupt': (Interrupt(value='b', resumable=True, ns=['child_graph', 'get_human_input:09753ebf-d018-7591-45f0-a718d2fc7f7f'], when='during'),)}
{'child_graph': {'human_inputs': ['Test Input']}}
{'child_graph': {'human_inputs': ['Test Input']}}
{'cleanup': None}

In LangGraph 0.3.14, both interrupts are still hit on the first pass through the graph, but when you resume, only the first interrupt is restarted; you must invoke the graph again to resume the second subgraph once its interrupt is raised again:

{'interrupt': (Interrupt(value='a', resumable=True, ns=['child_graph', 'get_human_input:eea06b75-9d01-2d5f-8bef-b39884443394']),)}
{'interrupt': (Interrupt(value='b', resumable=True, ns=['child_graph', 'get_human_input:2d8ef817-08cc-8346-a2f8-4240273e50cf']),)}
{'interrupt': (Interrupt(value='b', resumable=True, ns=['child_graph', 'get_human_input:2d8ef817-08cc-8346-a2f8-4240273e50cf']),)}
{'child_graph': {'human_inputs': ['Test Input']}}
{'child_graph': {'human_inputs': ['Test Input']}, 'metadata': {'cached': True}}
{'child_graph': {'human_inputs': ['Test Input']}}
{'cleanup': None}

The considerations made in the original issue are still valid if you want to break out of the graph and return to the user after each interrupt is encountered, regardless of whether it occurs in parallel subgraphs. Due to the long-running nature and high parallelization of our system, we have found the optimal user experience is to wait until each parallel subgraph has either completed or hit an interrupt, giving the user the ability to provide their input all at once instead of sending them a new message and waiting for their input after each one.

I believe it is possible for both methods to be supported. Currently, to get around the limitation of only being able to send a single resume value when restarting all of the interrupts, we have had to use a dict mapping the interrupt node name to the relevant response from the user (see the sketch below). Ideally, we would like the ability to specify multiple resume values or commands that would then map to each encountered interrupt. This would also fit the previous issue raised, as those users could simply respond with a single resume to restart from each interrupt that is encountered.
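
For illustration, a minimal sketch of that workaround as it behaved on 0.3.13, where a single resume payload was delivered to every pending interrupt. Keying the dict by prompt (rather than by node name) is our convention for this example, not a LangGraph API:

def get_human_input(state: ChildState):
    # on 0.3.13 the same resume payload reached every pending interrupt,
    # so we send one dict and let each node pick out its own answer
    responses = interrupt(state.prompt)
    human_input = responses[state.prompt]
    return dict(
        human_input=human_input,  # update child state
        human_inputs=[human_input],  # update parent state
    )

# resume all pending interrupts with a single call:
# parent_graph.invoke(
#     Command(resume={"a": "answer for a", "b": "answer for b"}),
#     config=thread_config,
# )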

Happy to provide more code examples to support this methodology.

System Info

System Information

OS: Linux
OS Version: #1 SMP Tue Nov 5 00:21:55 UTC 2024
Python Version: 3.12.3 (main, Jan 17 2025, 18:03:48) [GCC 13.3.0]

Package Information

langchain_core: 0.3.48
langchain: 0.3.4
langchain_community: 0.3.3
langsmith: 0.1.147
langchain_anthropic: 0.2.3
langchain_cohere: 0.3.4
langchain_experimental: 0.3.4
langchain_openai: 0.3.3
langchain_text_splitters: 0.3.7
langgraph_sdk: 0.1.58


kbuca1 commented Mar 26, 2025

Running into the same issue; we had come to expect the pre-v0.3.14 behavior of subgraph interrupts.

The behavior described above (the ability to resume multiple interrupts at once) makes more sense than the change added in v0.3.14, as it does not break down as the concurrency and nesting of subgraphs increase.

nfcampos (Contributor) commented Mar 26, 2025

Hi, the previous behavior was a bug: it used the same resume value for multiple unrelated interrupts, which is neither expected nor desirable. We will add a way to pass multiple resume values in the same invoke call; expect this to be added in the coming days.
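
For readers landing here later, a hedged sketch of what such a multi-resume call could look like, keying each resume value by an interrupt identifier; the field name and payload shape here are assumptions based on this comment, not a confirmed API:

# hypothetical sketch: one resume value per pending interrupt,
# keyed by an interrupt identifier (field name assumed)
state = parent_graph.get_state(thread_config)
resume_map = {
    intr.id: "Test Input"
    for task in state.tasks
    for intr in task.interrupts
}
parent_graph.invoke(Command(resume=resume_map), config=thread_config)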
