
Conversation


@girlsending0 commented Aug 27, 2025

Description:
This PR fixes an issue where usage metadata was not returned during invoke or stream calls for HuggingFace chat models because stream_usage was not supported.
I updated ChatHuggingFace (via ChatHuggingFaceWithUsage) to align with BaseChatOpenAI behavior, ensuring usage information is properly included in streaming outputs.

Issue: N/A (but addresses missing usage metadata in HuggingFace integration).

Dependencies: None

Twitter handle: None
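
Expected behavior after this change, as a minimal sketch (the stream_usage field on ChatHuggingFace is the addition proposed in this PR, mirroring langchain-openai; the repo_id/provider below are illustrative placeholders):

from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint

# Illustrative endpoint; any conversational provider/model should work.
llm = HuggingFaceEndpoint(
    repo_id="google/gemma-3-27b-it",
    task="conversational",
    provider="nebius",
)

# stream_usage=True requests a final chunk carrying usage metadata
# (field added by this PR, analogous to BaseChatOpenAI).
model = ChatHuggingFace(llm=llm, stream_usage=True)

full = None
for chunk in model.stream("hello"):
    full = chunk if full is None else full + chunk

# With the fix, the aggregated chunk should expose token counts.
print(full.usage_metadata)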


vercel bot commented Aug 27, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Preview Comments Updated (UTC)
langchain Ignored Ignored Preview Aug 31, 2025 7:21am


codspeed-hq bot commented Aug 27, 2025

CodSpeed WallTime Performance Report

Merging #32708 will not alter performance

Comparing girlsending0:fix/add_stream_usage (074af3b) with master (fcf7175)

⚠️ Unknown Walltime execution environment detected

Using the Walltime instrument on standard Hosted Runners will lead to inconsistent data.

For the most accurate results, we recommend using CodSpeed Macro Runners: bare-metal machines fine-tuned for performance measurement consistency.

Summary

✅ 13 untouched benchmarks


codspeed-hq bot commented Aug 27, 2025

CodSpeed Instrumentation Performance Report

Merging #32708 will create unknown performance changes

Comparing girlsending0:fix/add_stream_usage (c0bdc4b) with master (8670b24) [1]

Summary

⚠️ No benchmarks were detected in both the base of the PR and the PR.
⏩ 14 skipped [2]

Footnotes

  1. No successful run was found on master (244c699) during the generation of this report, so 8670b24 was used instead as the comparison base. There might be some changes unrelated to this pull request in this report.

  2. 14 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@mdrxy changed the title to "fix(huggingface): add stream_usage support for ChatHuggingFace invoke/stream" Aug 27, 2025
@mdrxy added the integration (Related to a provider partner package integration) label Aug 27, 2025
Collaborator

@ccurme left a comment


Would you mind sharing a reproducible snippet or adding a test to demonstrate the functionality?

It looks like token usage is already accessible via streaming when using HF Endpoints:

from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint


llm = HuggingFaceEndpoint(
    repo_id="openai/gpt-oss-120b",
    task="conversational",
    provider="fireworks-ai",
)

model = ChatHuggingFace(llm=llm)

full = None
for chunk in model.stream("hello"):
    full = chunk if full is None else full + chunk

full.usage_metadata

@@ -492,6 +492,9 @@ class GetPopulation(BaseModel):
"""Modify the likelihood of specified tokens appearing in the completion."""
streaming: bool = False
"""Whether to stream the results or not."""
stream_usage: bool = False
Collaborator


Could we make this stream_usage: Optional[bool] = None?

(langchain-openai mistakenly did not do this)
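
A minimal sketch of the suggested declaration (illustrative stand-in class, not the real ChatHuggingFace):

from typing import Optional

from pydantic import BaseModel


class ChatHuggingFaceSketch(BaseModel):
    """Illustrative stand-in for ChatHuggingFace."""

    stream_usage: Optional[bool] = None
    """Whether to include usage metadata in streaming output. None defers to
    stream_options passed explicitly or via model_kwargs."""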

Author


I also think this was a mistake on the OpenAI side; it should have been Optional. The input values in the related class methods below are also defined as Optional. I changed this :) 01f64b4

def _stream(
self,
messages: list[BaseMessage],
stop: Optional[list[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
*,
stream_usage: Optional[bool] = True,
Collaborator


Could we implement this on _astream as well?

Author


Yes, I pushed the commit!

@ccurme ccurme self-assigned this Sep 9, 2025

vercel bot commented Sep 16, 2025

Deployment failed with the following error:

The provided GitHub repository does not contain the requested branch or commit reference. Please ensure the repository is not empty.

@girlsending0
Author

Would you mind sharing a reproducible snippet or adding a test to demonstrate the functionality?

It looks like token usage is already accessible via streaming when using HF Endpoints:

@ccurme
For the example below, gemma3 does not output token_usage:

from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
# llm = HuggingFaceEndpoint(repo_id="google/gemma-3-27b-it", provider="nebius", streaming=True, model_kwargs={"base_url": "https://router.huggingface.co/v1", "stream_options": {"include_usage": True}})


llm = HuggingFaceEndpoint(
    repo_id="google/gemma-3-27b-it",
    task="conversational",
    provider="nebius",
)

model = ChatHuggingFace(llm=llm)

full = None
for chunk in model.stream("hello"):
    full = chunk if full is None else full + chunk

full.usage_metadata

results:
None

I think this may be an issue with the provider.
In the case of gemma, applying my commit is required for token_usage to be returned correctly.

Details
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any, Optional

from langchain_core.callbacks.manager import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.messages import AIMessageChunk, BaseMessage, BaseMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_huggingface import ChatHuggingFace
from langchain_huggingface.chat_models.huggingface import (
    _convert_chunk_to_message_chunk,
    _is_huggingface_endpoint,
)

os.environ["HUGGINGFACEHUB_API_TOKEN"] = "xx"

class ChatHuggingFaceWithUsage(ChatHuggingFace):

    stream_usage: Optional[bool] = None
    """Whether to include usage metadata in streaming output. If True, an additional
    message chunk will be generated during the stream including usage metadata."""

    def _should_stream_usage(
        self, *, stream_usage: Optional[bool] = None, **kwargs: Any
    ) -> bool:
        """Determine whether to include usage metadata in streaming output.

        For backwards compatibility, we check for `stream_options` passed
        explicitly to kwargs or in the model_kwargs and override self.stream_usage.
        """
        stream_usage_sources = [  # order of precedence
            stream_usage,
            kwargs.get("stream_options", {}).get("include_usage"),
            self.model_kwargs.get("stream_options", {}).get("include_usage"),
            self.stream_usage,
        ]
        for source in stream_usage_sources:
            if isinstance(source, bool):
                return source
        return self.stream_usage


    def _stream(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        *,
        stream_usage: Optional[bool] = True,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        if _is_huggingface_endpoint(self.llm):
            kwargs["stream"] = True
            stream_usage = self._should_stream_usage(
                stream_usage=stream_usage, **kwargs
            )
            if stream_usage:
                kwargs["stream_options"] = {"include_usage": stream_usage}
            message_dicts, params = self._create_message_dicts(messages, stop)
            params = {**params, **kwargs, "stream": True}

            default_chunk_class: type[BaseMessageChunk] = AIMessageChunk
            for chunk in self.llm.client.chat_completion(
                messages=message_dicts, **params
            ):
                usage = chunk.get("usage")
                if usage:
                    usage_msg = AIMessageChunk(
                        content="",
                        additional_kwargs={},
                        response_metadata={},
                        usage_metadata={
                            "input_tokens": usage.get("prompt_tokens", 0),
                            "output_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                            "input_token_details": {"audio": 0, "cache_read": 0},
                            "output_token_details": {"audio": 0, "reasoning": 0},
                        },
                    )
                    yield ChatGenerationChunk(message=usage_msg)
                    continue
                
                if len(chunk["choices"]) == 0:
                    continue
                choice = chunk["choices"][0]
                message_chunk = _convert_chunk_to_message_chunk(
                    chunk, default_chunk_class
                )
                generation_info = {}
                if finish_reason := choice.get("finish_reason"):
                    generation_info["finish_reason"] = finish_reason
                    generation_info["model_name"] = self.model_id
                logprobs = choice.get("logprobs")
                if logprobs:
                    generation_info["logprobs"] = logprobs
                default_chunk_class = message_chunk.__class__
                generation_chunk = ChatGenerationChunk(
                    message=message_chunk, generation_info=generation_info or None
                )
                if run_manager:
                    run_manager.on_llm_new_token(
                        generation_chunk.text, chunk=generation_chunk, logprobs=logprobs
                    )
                yield generation_chunk
        else:
            llm_input = self._to_chat_prompt(messages)
            stream_iter = self.llm._stream(
                llm_input, stop=stop, run_manager=run_manager, **kwargs
            )
            for chunk in stream_iter:  # chunk is a GenerationChunk
                chat_chunk = ChatGenerationChunk(
                    message=AIMessageChunk(content=chunk.text),
                    generation_info=chunk.generation_info,
                )
                yield chat_chunk
                
    async def _astream(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        *,
        stream_usage: Optional[bool] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        kwargs["stream"] = True
        stream_usage = self._should_stream_usage(
            stream_usage=stream_usage, **kwargs
        )
        if stream_usage:
            kwargs["stream_options"] = {"include_usage": stream_usage}
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class: type[BaseMessageChunk] = AIMessageChunk

        async for chunk in await self.llm.async_client.chat_completion(
            messages=message_dicts, **params
        ):
            usage = chunk.get("usage")
            if usage:
                usage_msg = AIMessageChunk(
                    content="",
                    additional_kwargs={},
                    response_metadata={},
                    usage_metadata={
                        "input_tokens": usage.get("prompt_tokens", 0),
                        "output_tokens": usage.get("completion_tokens", 0),
                        "total_tokens": usage.get("total_tokens", 0),
                        "input_token_details": {"audio": 0, "cache_read": 0},
                        "output_token_details": {"audio": 0, "reasoning": 0},
                    },
                )
                yield ChatGenerationChunk(message=usage_msg)
                continue

            if len(chunk["choices"]) == 0:
                continue
            choice = chunk["choices"][0]
            message_chunk = _convert_chunk_to_message_chunk(chunk, default_chunk_class)
            generation_info = {}
            if finish_reason := choice.get("finish_reason"):
                generation_info["finish_reason"] = finish_reason
                generation_info["model_name"] = self.model_id
            logprobs = choice.get("logprobs")
            if logprobs:
                generation_info["logprobs"] = logprobs
            default_chunk_class = message_chunk.__class__
            generation_chunk = ChatGenerationChunk(
                message=message_chunk, generation_info=generation_info or None
            )
            if run_manager:
                await run_manager.on_llm_new_token(
                    token=generation_chunk.text,
                    chunk=generation_chunk,
                    logprobs=logprobs,
                )
            yield generation_chunk
from langchain_huggingface import HuggingFaceEndpoint

llm = HuggingFaceEndpoint(repo_id="google/gemma-3-27b-it", provider="nebius", streaming=True, model_kwargs={"base_url": "https://router.huggingface.co/v1"})
gemma = ChatHuggingFace(llm=llm)
gemma_with_usage = ChatHuggingFaceWithUsage(llm=llm)

for chunk in gemma.stream("hi~", stream_options={"include_usage": True}):  # usage_metadata stays None without the patch
    print(chunk, end="\n", flush=True)

for chunk in gemma_with_usage.stream("hi~", stream_options={"include_usage": True}):
    print(chunk, end="\n", flush=True)
print() 
