-
Notifications
You must be signed in to change notification settings - Fork 6.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add StreamProcessor
class for handling message streams produced by run_stream
and on_messages_stream
.
#6182
Comments
I played with this idea in a branch and it's working without breaking Console(): aguynamedben@1f69b65 It seems to work well. I would need to keep making sure I don't break the current Console() experience. I want to hook a stream simultaneously (a) WebSockets, and (b) Console output, so I might keep going on this if you think the design is okay. Because the existing Console() supports the return types from Note the branch above is based on tag |
How it's being used in my code: from autogen_agentchat.ui import Console, StreamProcessor
from autogen_agentchat.ui.stream_handlers import smart_printer
#... later in a function...
use_processor = True
if use_processor:
stream = team.run_stream(task=initial_task)
await Console(stream)
else:
processor = StreamProcessor()
processor.add_handler(smart_printer)
stream = team.run_stream(task=initial_task)
processed_stream = processor(stream)
await Console(processed_stream) If this design works, over time we'd probably want to direct people to do something like this: from autogen_agentchat.ui import MessageStreamItem
def my_custom_handler(message: MessageStreamItem) -> None:
send_message_via_websockets(message)
processsor = SteamProcessor()
processor.add_handler(console_logger)
processor.add_handler(my_custom_handler)
stream = processor(team.run_stream(task=initial_task)) Maybe a better design is to allow run_stream to take in an argument that provides the "callbacks" stream = team.run_stream(task=initial_task, handlers=[my_custom_handler, console_logger])
result = await stream.completed() Part of the problem with Console() is that it completely wraps the string. This last approach is more composition/dependency injection, so it might avoid the problem of wrapping. Docs for |
Another idea for an API for this: from autogen_agentchat.ui import console_message_handler, StreamProcessor, StreamMessage
from my_project import send_via_websockets
def websockets_message_handler(message: StreamMessage) -> None:
send_via_websockets(message)
websockets_processor = StreamProcessor(handlers=[websockets_message_handler)
console_processor = StreamProcessor(handlers=[console_message_handler])
result = await team.run_stream(processors=[websockets_processor, console_processor]) The default value of processors for def run_stream(
# ...
processors: List[StreamProcessor] = [console_processor]
): So that when users are learning AutoGen for the first time and call |
I think these are all great ideas! @jackgerrits for comments. Maybe we can consider a design that doesn't break the existing usage cases:
|
Confirmation
Issue body
Consider this design:
We can then rewrite
Console
to subclassStreamProcessor
to provide handlers for all the built-in message types and default handler for unknown message types.Now you can customize
Console
to override handling of specific message types.The text was updated successfully, but these errors were encountered: