The LangGraph streaming example fails with 'Error in ConsoleCallbackHandler.on_llm_new_token' #4119

Open

qmz opened this issue Apr 2, 2025 · 9 comments
qmz commented Apr 2, 2025

Checked other resources

  • This is a bug, not a usage question. For questions, please use GitHub Discussions.
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

Just like the example in https://langchain-ai.github.io/langgraph/how-tos/streaming-tokens/

import asyncio
import os
import sys
from typing import TypedDict
from langgraph.graph import START, StateGraph
import langchain
langchain.debug = True

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from app.services.llm_service import llm_service
joke_model = llm_service.get_llm("deepseek-r1-70b")
poem_model = llm_service.get_llm("deepseek-r1-70b")

class State(TypedDict):
    topic: str
    joke: str
    poem: str

async def call_model(state, config):
    topic = state["topic"]
    print("Writing joke...")
    # Note: Passing the config through explicitly is required for python < 3.11
    # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}

async def run_example():
    graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
    print("Starting stream example...")
    async for msg, metadata in graph.astream(
        {"topic": "cats"},
        stream_mode="messages",
    ):
        if msg.content:
            print(msg.content, end="|", flush=True)

if __name__ == "__main__":
    asyncio.run(run_example())


The astream_events function also fails with an AssertionError:
async for msg in graph.astream_events(
        {"topic": "cats"},
        stream_mode="messages",
    ):
#        if msg.content:
            print(msg, end="|", flush=True)

Error in _AstreamEventsCallbackHandler.on_llm_new_token callback: AssertionError('Run ID 13a8c64a-3d02-497f-ba77-2ad7317828cd not found in run map.')

Error Message and Stack Trace (if applicable)

Writing joke...
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID bf46f8d0-aae4-4d49-95c8-bedd79dbc85a.')

Description

I want to see the graph's LLM node streaming tokens in real time, to improve the UX of a complex workflow.

System Info

python -m langchain_core.sys_info
System Information
OS: Darwin
OS Version: Darwin Kernel Version 24.3.0: Thu Jan 2 20:24:06 PST 2025; root:xnu-11215.81.4~3/RELEASE_ARM64_T8103
Python Version: 3.12.5 (main, Aug 6 2024, 19:08:49) [Clang 15.0.0 (clang-1500.0.40.1)]

Package Information
langchain_core: 0.3.44
langchain: 0.3.19
langchain_community: 0.3.18
langsmith: 0.3.13
langchain_experimental: 0.3.4
langchain_ollama: 0.2.3
langchain_text_splitters: 0.3.6
langgraph_sdk: 0.1.57

vbarda (Collaborator) commented Apr 2, 2025

@qmz the code you're running in your example is incorrect and is not from the how-to guide; specifically, this part:

async for msg in graph.astream_events(
        {"topic": "cats"},
        stream_mode="messages",
    ):
#        if msg.content:
            print(msg, end="|", flush=True)

You either need to use the example as in the how-to guide, i.e. the .astream() method, or use async for chunk in graph.astream_events({"topic": "cats"}, version="v2"): (although this is not recommended, as .astream(..., stream_mode="messages") is preferred).
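
For reference, a minimal sketch of both correct patterns, assuming the compiled graph from the example above (event name and payload follow the v2 astream_events schema):

# Preferred: stream tokens with .astream(stream_mode="messages").
# Each item is a (message_chunk, metadata) tuple, so it must be unpacked.
async for msg, metadata in graph.astream({"topic": "cats"}, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="|", flush=True)

# Alternative: astream_events takes a version argument, not stream_mode.
# LLM token chunks arrive as "on_chat_model_stream" events.
async for event in graph.astream_events({"topic": "cats"}, version="v2"):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="|", flush=True)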

vbarda closed this as completed Apr 2, 2025
qmz (Author) commented Apr 2, 2025

@vbarda thanks for your response. I started with astream first, but it did not work, so I tried astream_events. astream(..., stream_mode="messages") also fails with: Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID 7028b4fa-ff1c-418e-b9c3-8b758cac5233.')

graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
print("Starting stream example...")
async for msg in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)

vbarda (Collaborator) commented Apr 2, 2025

Where is ConsoleCallbackHandler coming from? If you want to use custom callbacks, you need to pass them like this:

async for msg in graph.astream(
    {"topic": "cats"},
    config={"callbacks": [custom_callback]},
    stream_mode="messages",
):
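
For completeness, a minimal sketch of what such a custom_callback could look like, assuming langchain_core's AsyncCallbackHandler base class (PrintTokenHandler is a hypothetical name, not part of the library):

from langchain_core.callbacks import AsyncCallbackHandler

class PrintTokenHandler(AsyncCallbackHandler):
    # Hypothetical handler: print each newly generated LLM token.
    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end="|", flush=True)

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    config={"callbacks": [PrintTokenHandler()]},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)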

qmz (Author) commented Apr 2, 2025

@vbarda Could you please try the example code? When langchain.debug = True, you can see the error. However, there is no output regardless of whether langchain.debug is set to False or True.

vbarda (Collaborator) commented Apr 2, 2025

Which Python version are you using? Does this happen only with async code, or with sync as well? Could it be your LLM service?

Unfortunately I cannot try your example, since it has custom code (llm_service, etc.). I would recommend trying to reproduce this with ChatOpenAI and seeing if it still fails.
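
For what it's worth, a sketch of the sync check being suggested here, assuming the same State, models, and imports as in the original example (call_model_sync is a hypothetical name):

def call_model_sync(state, config):
    # Same node as call_model, but using the blocking .invoke() API.
    topic = state["topic"]
    joke_response = joke_model.invoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = StateGraph(State).add_node(call_model_sync).add_edge(START, "call_model_sync").compile()
# .stream() is the sync counterpart of .astream(); if the TracerException
# shows up here too, the problem is not specific to asyncio.
for msg, metadata in graph.stream({"topic": "cats"}, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="|", flush=True)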

qmz (Author) commented Apr 2, 2025

@vbarda I did a little investigation in the LangGraph source code and found that the ConsoleCallbackHandler is automatically added by .venv/lib/python3.12/site-packages/langchain_core/callbacks/manager.py.

The Python version is 3.12.5.
The llm_service simply uses a class that invokes the langchain_ollama backend, with the LLM model being a local deepseek-r1:70b.
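
That is consistent with how the debug flag works: langchain.debug = True drives the global debug setting read by langchain_core, whose callback-manager configuration then attaches a ConsoleCallbackHandler to every run. A small sketch of the equivalence, assuming only that the global setter behaves as documented:

from langchain_core.globals import set_debug, get_debug

# langchain.debug = True flips the same global switch as set_debug(True);
# with it on, langchain_core's callback configuration injects a
# ConsoleCallbackHandler into every run even if none was registered.
set_debug(True)
assert get_debug() is True

# Turning it off removes the automatically injected console handler.
set_debug(False)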

eyurtsev (Collaborator) commented Apr 2, 2025

ConsoleCallbackHandler isn't added automatically by LangGraph. It's likely added by this code:

from app.services.llm_service import llm_service

Could you include a self-contained reproduction?

There might be an issue w/ the callback handler in langchain-core that we need to fix, but this is unlikely to be a langgraph issue.

eyurtsev reopened this Apr 2, 2025
eyurtsev self-assigned this Apr 2, 2025
qmz (Author) commented Apr 2, 2025

@eyurtsev thanks for your response. Using ChatOpenAI works fine, but using OllamaLLM fails with a TracerException. The code using OllamaLLM is as follows; note that the Ollama server is deployed locally.

"""
https://langchain-ai.github.io/langgraph/how-tos/streaming-tokens/
"""
import asyncio
import os
import sys
from typing import TypedDict
from langchain_ollama import OllamaLLM
from langgraph.graph import START, StateGraph
import langchain
langchain.debug = True

import getpass
import os

"""
using ChatOpenAI works fine
"""
# from langchain_openai import ChatOpenAI

# joke_model = ChatOpenAI(
#     model="gpt-4o",
#     temperature=0,
#     max_tokens=None,
#     timeout=None,
#     max_retries=2
# )
# poem_model = ChatOpenAI(
#     model="gpt-4o",
#     temperature=0,
#     max_tokens=None,
#     timeout=None,
#     max_retries=2
# )

"""
using OllamaLLM failed with error:
Error in ConsoleCallbackHandler.on_llm_new_token callback: TracerException('No indexed run ID 0c7e5fa0-a835-4c2e-9646-fceedfa80087.')
"""
joke_model = OllamaLLM(
    model="deepseek-r1:70b",
    base_url="http://x.x.x.x:11434",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2
)
poem_model = OllamaLLM(
    model="deepseek-r1:70b",
    base_url="http://x.x.x.x:11434",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2
)

class State(TypedDict):
    topic: str
    joke: str
    poem: str

async def call_model(state, config):
    topic = state["topic"]
    print("Writing joke...")
    # Note: Passing the config through explicitly is required for python < 3.11
    # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}

async def run_example():
    graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
    print("Starting stream example...")
    async for msg, metadata in graph.astream(
        {"topic": "cats"},
        stream_mode="messages",
    ):
        if msg.content:
            print(msg.content, end="|", flush=True)

if __name__ == "__main__":
    asyncio.run(run_example())

eyurtsev (Collaborator) commented Apr 2, 2025

Got it, it might be from langchain.debug. I'll take a look tomorrow.
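
Until that's resolved, a possible interim workaround (untested, and it may or may not avoid the TracerException with OllamaLLM): leave the global debug flag off and attach a ConsoleCallbackHandler to the run explicitly, so the handler is scoped rather than globally injected:

from langchain_core.globals import set_debug
from langchain_core.tracers.stdout import ConsoleCallbackHandler

set_debug(False)  # avoid the handler injected by langchain.debug = True

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    config={"callbacks": [ConsoleCallbackHandler()]},  # opt-in, per-run tracing
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)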
