fix(huggingface): add stream_usage support for ChatHuggingFace invoke/stream #32708
Conversation
CodSpeed WallTime Performance Report: merging #32708 will not alter performance.
CodSpeed Instrumentation Performance Report: merging #32708 will create unknown performance changes.
Would you mind sharing a reproducible snippet or adding a test to demonstrate the functionality?
It looks like token usage is already accessible via streaming when using HF Endpoints:
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint

llm = HuggingFaceEndpoint(
    repo_id="openai/gpt-oss-120b",
    task="conversational",
    provider="fireworks-ai",
)
model = ChatHuggingFace(llm=llm)

full = None
for chunk in model.stream("hello"):
    full = chunk if full is None else full + chunk
full.usage_metadata
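(For context: when the provider does report usage, the aggregated full.usage_metadata follows LangChain's UsageMetadata shape; the values below are purely illustrative.)

{
    "input_tokens": 8,    # illustrative
    "output_tokens": 42,  # illustrative
    "total_tokens": 50,   # illustrative
}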
@@ -492,6 +492,9 @@ class GetPopulation(BaseModel):
    """Modify the likelihood of specified tokens appearing in the completion."""
    streaming: bool = False
    """Whether to stream the results or not."""
    stream_usage: bool = False
Could we make this stream_usage: Optional[bool] = None?
(langchain-openai mistakenly did not do this.)
I also think this was a mistake on the OpenAI side; it should have been Optional. The input values in the related class methods below are also defined as Optional. I changed this :) 01f64b4
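For reference, a minimal sketch of the updated declaration (matching the snippet shared further down in this thread):

stream_usage: Optional[bool] = None
"""Whether to include usage metadata in streaming output. If True, an additional
message chunk will be generated during the stream including usage metadata."""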
def _stream(
    self,
    messages: list[BaseMessage],
    stop: Optional[list[str]] = None,
    run_manager: Optional[CallbackManagerForLLMRun] = None,
    *,
    stream_usage: Optional[bool] = True,
Could we implement this on _astream as well?
Yes, I pushed the commit!
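For reference, the async counterpart keeps the same keyword-only parameter; a sketch of the signature (the full implementation appears in the next comment):

async def _astream(
    self,
    messages: list[BaseMessage],
    stop: Optional[list[str]] = None,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    *,
    stream_usage: Optional[bool] = None,
    **kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
    ...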
@ccurme

from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint

# llm = HuggingFaceEndpoint(repo_id="google/gemma-3-27b-it", provider="nebius", streaming=True, model_kwargs={"base_url": "https://router.huggingface.co/v1", "stream_options": {"include_usage": True}})
llm = HuggingFaceEndpoint(
    repo_id="google/gemma-3-27b-it",
    task="conversational",
    provider="nebius",
)
model = ChatHuggingFace(llm=llm)

full = None
for chunk in model.stream("hello"):
    full = chunk if full is None else full + chunk
full.usage_metadata

Results:
None

I think this may be an issue with the provider.

Details:

from typing import Any, Iterator, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.messages import BaseMessage, AIMessageChunk, BaseMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_huggingface import ChatHuggingFace
from langchain_huggingface.chat_models.huggingface import (
    _is_huggingface_endpoint,
    _convert_chunk_to_message_chunk,
)
from langchain_core.callbacks.manager import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from collections.abc import AsyncIterator, Iterator
import os
os.environ["HUGGINGFACEHUB_API_TOKEN"] = "xx"
class ChatHuggingFaceWithUsage(ChatHuggingFace):
    stream_usage: Optional[bool] = None
    """Whether to include usage metadata in streaming output. If True, an additional
    message chunk will be generated during the stream including usage metadata."""

    def _should_stream_usage(
        self, *, stream_usage: Optional[bool] = None, **kwargs: Any
    ) -> bool:
        """Determine whether to include usage metadata in streaming output.

        For backwards compatibility, we check for `stream_options` passed
        explicitly to kwargs or in the model_kwargs and override self.stream_usage.
        """
        stream_usage_sources = [  # order of precedence
            stream_usage,
            kwargs.get("stream_options", {}).get("include_usage"),
            self.model_kwargs.get("stream_options", {}).get("include_usage"),
            self.stream_usage,
        ]
        for source in stream_usage_sources:
            if isinstance(source, bool):
                return source
        return self.stream_usage
    def _stream(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        *,
        stream_usage: Optional[bool] = True,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        if _is_huggingface_endpoint(self.llm):
            kwargs["stream"] = True
            stream_usage = self._should_stream_usage(
                stream_usage=stream_usage, **kwargs
            )
            if stream_usage:
                kwargs["stream_options"] = {"include_usage": stream_usage}
            message_dicts, params = self._create_message_dicts(messages, stop)
            params = {**params, **kwargs, "stream": True}
            default_chunk_class: type[BaseMessageChunk] = AIMessageChunk
            for chunk in self.llm.client.chat_completion(
                messages=message_dicts, **params
            ):
                usage = chunk.get("usage")
                if usage:
                    # Emit a dedicated, content-free chunk carrying the usage metadata.
                    usage_msg = AIMessageChunk(
                        content="",
                        additional_kwargs={},
                        response_metadata={},
                        usage_metadata={
                            "input_tokens": usage.get("prompt_tokens", 0),
                            "output_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                            "input_token_details": {"audio": 0, "cache_read": 0},
                            "output_token_details": {"audio": 0, "reasoning": 0},
                        },
                    )
                    yield ChatGenerationChunk(message=usage_msg)
                    continue
                if len(chunk["choices"]) == 0:
                    continue
                choice = chunk["choices"][0]
                message_chunk = _convert_chunk_to_message_chunk(
                    chunk, default_chunk_class
                )
                generation_info = {}
                if finish_reason := choice.get("finish_reason"):
                    generation_info["finish_reason"] = finish_reason
                    generation_info["model_name"] = self.model_id
                logprobs = choice.get("logprobs")
                if logprobs:
                    generation_info["logprobs"] = logprobs
                default_chunk_class = message_chunk.__class__
                generation_chunk = ChatGenerationChunk(
                    message=message_chunk, generation_info=generation_info or None
                )
                if run_manager:
                    run_manager.on_llm_new_token(
                        generation_chunk.text, chunk=generation_chunk, logprobs=logprobs
                    )
                yield generation_chunk
        else:
            llm_input = self._to_chat_prompt(messages)
            stream_iter = self.llm._stream(
                llm_input, stop=stop, run_manager=run_manager, **kwargs
            )
            for chunk in stream_iter:  # chunk is a GenerationChunk
                chat_chunk = ChatGenerationChunk(
                    message=AIMessageChunk(content=chunk.text),
                    generation_info=chunk.generation_info,
                )
                yield chat_chunk
    async def _astream(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        *,
        stream_usage: Optional[bool] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        kwargs["stream"] = True
        stream_usage = self._should_stream_usage(
            stream_usage=stream_usage, **kwargs
        )
        if stream_usage:
            kwargs["stream_options"] = {"include_usage": stream_usage}
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}
        default_chunk_class: type[BaseMessageChunk] = AIMessageChunk
        async for chunk in await self.llm.async_client.chat_completion(
            messages=message_dicts, **params
        ):
            usage = chunk.get("usage")
            if usage:
                usage_msg = AIMessageChunk(
                    content="",
                    additional_kwargs={},
                    response_metadata={},
                    usage_metadata={
                        "input_tokens": usage.get("prompt_tokens", 0),
                        "output_tokens": usage.get("completion_tokens", 0),
                        "total_tokens": usage.get("total_tokens", 0),
                        "input_token_details": {"audio": 0, "cache_read": 0},
                        "output_token_details": {"audio": 0, "reasoning": 0},
                    },
                )
                yield ChatGenerationChunk(message=usage_msg)
                continue
            if len(chunk["choices"]) == 0:
                continue
            choice = chunk["choices"][0]
            message_chunk = _convert_chunk_to_message_chunk(chunk, default_chunk_class)
            generation_info = {}
            if finish_reason := choice.get("finish_reason"):
                generation_info["finish_reason"] = finish_reason
                generation_info["model_name"] = self.model_id
            logprobs = choice.get("logprobs")
            if logprobs:
                generation_info["logprobs"] = logprobs
            default_chunk_class = message_chunk.__class__
            generation_chunk = ChatGenerationChunk(
                message=message_chunk, generation_info=generation_info or None
            )
            if run_manager:
                await run_manager.on_llm_new_token(
                    token=generation_chunk.text,
                    chunk=generation_chunk,
                    logprobs=logprobs,
                )
            yield generation_chunk

from langchain_huggingface import HuggingFaceEndpoint
llm = HuggingFaceEndpoint(repo_id="google/gemma-3-27b-it", provider="nebius", streaming=True, model_kwargs={"base_url": "https://router.huggingface.co/v1"})
gemma = ChatHuggingFace(llm=llm)
gemma_with_usage = ChatHuggingFaceWithUsage(llm=llm)
for chunk in gemma.stream("hi~", stream_options={"include_usage": True}):  # usage_metadata stays None
    print(chunk, end="\n", flush=True)

for chunk in gemma_with_usage.stream("hi~", stream_options={"include_usage": True}):  # a usage chunk is yielded
    print(chunk, end="\n", flush=True)
print()
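As a quick sanity check of the precedence handling in _should_stream_usage, a sketch reusing gemma_with_usage from the snippet above (expected return values in comments):

# An explicit stream_usage argument wins over stream_options
print(gemma_with_usage._should_stream_usage(
    stream_usage=False, stream_options={"include_usage": True}
))  # False

# Without an explicit argument, stream_options passed via kwargs is used
print(gemma_with_usage._should_stream_usage(
    stream_options={"include_usage": True}
))  # True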
Description:
This PR fixes an issue where stream_usage metadata was not being returned during invoke or stream calls for HuggingFace chat models. I updated ChatHuggingFace (via ChatHuggingFaceWithUsage) to align with BaseChatOpenAI behavior, ensuring usage information is properly included in streaming outputs.

Issue: N/A (but addresses missing usage metadata in the HuggingFace integration).

Dependencies: None

Twitter handle: None
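With this change, enabling usage metadata should only require the new field. A sketch assuming the stream_usage field added in this PR (model and provider choice are illustrative):

from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint

llm = HuggingFaceEndpoint(
    repo_id="google/gemma-3-27b-it",
    task="conversational",
    provider="nebius",
)
model = ChatHuggingFace(llm=llm, stream_usage=True)

full = None
for chunk in model.stream("hello"):
    full = chunk if full is None else full + chunk
print(full.usage_metadata)  # should now include input/output/total token counts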