
Add StreamProcessor class for handling message streams produced by run_stream and on_messages_stream. #6182

Open
ekzhu opened this issue Apr 2, 2025 · 4 comments
Labels
help wanted Extra attention is needed proj-agentchat
Milestone

Comments

@ekzhu
Collaborator

ekzhu commented Apr 2, 2025

Confirmation

  • I confirm that I am a maintainer and so can use this template. If I am not, I understand this issue will be closed and I will be asked to use a different template.

Issue body

Consider this design:

processor = StreamProcessor()
processor.add_handler(TextMessage, ...) # register a handler for `TextMessage` type.
processor.add_handler(HandoffMessage, ...) # register a handler for the `HandoffMessage` type.

stream = team.run_stream()
result = processor(stream)

We can then rewrite Console as a subclass of StreamProcessor that provides handlers for all the built-in message types, plus a default handler for unknown message types.

Users can then customize Console by overriding the handlers for specific message types.
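A minimal sketch of what such a StreamProcessor could look like. The class name and add_handler come from this issue; the dispatch logic, the set_default_handler method, and the choice to re-yield each message are assumptions:

```python
from typing import AsyncIterator, Callable, Dict, Type, TypeVar

T = TypeVar("T")


class StreamProcessor:
    """Dispatch each stream item to the handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: Dict[type, Callable[[object], None]] = {}
        # Fallback for message types with no registered handler.
        self._default: Callable[[object], None] = lambda message: None

    def add_handler(self, message_type: Type[T], handler: Callable[[T], None]) -> None:
        self._handlers[message_type] = handler

    def set_default_handler(self, handler: Callable[[object], None]) -> None:
        self._default = handler

    async def __call__(self, stream: AsyncIterator[object]) -> AsyncIterator[object]:
        # Re-yield each message so the processed stream stays consumable,
        # e.g. by passing it on to Console(processed_stream).
        async for message in stream:
            self._handlers.get(type(message), self._default)(message)
            yield message
```

Because `__call__` re-yields every message, processors compose: the output of one processor can be fed into another, or into Console.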

@ekzhu ekzhu added help wanted Extra attention is needed proj-agentchat labels Apr 2, 2025
@ekzhu ekzhu added this to the 0.4.x-python milestone Apr 2, 2025
@aguynamedben

aguynamedben commented Apr 3, 2025

I played with this idea in a branch and it's working without breaking Console(): aguynamedben@1f69b65

It seems to work well, though I'd need to keep making sure I don't break the current Console() experience. I want to hook a stream up to both (a) WebSockets and (b) Console output simultaneously, so I might keep going on this if you think the design is okay.

Because the existing Console() supports the return types from autogen_agentchat.base.TaskRunner.run_stream and autogen_agentchat.base.ChatAgent.on_messages_stream, I made type aliases for those types called MessageStreamItem/MessageStream. I'm not sure what you think about that. I can't tell whether it's a bad code smell, given that the underlying functions return similar-but-different items, or whether it's okay. I only personally care about run_stream and I'm not sure about ChatAgent, but giving that combined type a name makes it "a thing" and, in my opinion, makes it easier to reason about.

Note the branch above is based on tag python-v0.4.9.2.
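For illustration, the combined alias could look roughly like this. Placeholder classes stand in for the real autogen_agentchat message and result types, and the exact union members are an assumption about what the two streams yield:

```python
from typing import AsyncGenerator, Union


# Placeholders for the real autogen_agentchat types
# (e.g. AgentEvent, ChatMessage, TaskResult, Response).
class AgentEvent: ...
class ChatMessage: ...
class TaskResult: ...   # terminal item of TaskRunner.run_stream
class Response: ...     # terminal item of ChatAgent.on_messages_stream

# One item from either stream: an intermediate message/event, or the
# terminal result type of whichever stream produced it.
MessageStreamItem = Union[AgentEvent, ChatMessage, TaskResult, Response]
MessageStream = AsyncGenerator[MessageStreamItem, None]
```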

@aguynamedben

aguynamedben commented Apr 3, 2025

How it's being used in my code:

from autogen_agentchat.ui import Console, StreamProcessor
from autogen_agentchat.ui.stream_handlers import smart_printer

#... later in a function...

use_processor = True
if use_processor:
    processor = StreamProcessor()
    processor.add_handler(smart_printer)
    stream = team.run_stream(task=initial_task)
    processed_stream = processor(stream)
    await Console(processed_stream)
else:
    stream = team.run_stream(task=initial_task)
    await Console(stream)

If this design works, over time we'd probably want to direct people to do something like this:

from autogen_agentchat.ui import MessageStreamItem

def my_custom_handler(message: MessageStreamItem) -> None:
    send_message_via_websockets(message)

processor = StreamProcessor()
processor.add_handler(console_logger)
processor.add_handler(my_custom_handler)
stream = processor(team.run_stream(task=initial_task))

Maybe a better design is to allow run_stream to take an argument that provides the "callbacks":

stream = team.run_stream(task=initial_task, handlers=[my_custom_handler, console_logger])
result = await stream.completed()

Part of the problem with Console() is that it completely wraps the stream. This last approach leans more on composition/dependency injection, so it might avoid the wrapping problem.

Docs for handlers would say something like: "If you wish to handle each stream message to perform a side effect (e.g. sending each message via WebSockets, custom logging, etc.), you can provide a list of handler functions. As messages arrive on the stream, each handler will be called once per message. <something about async vs. sync, async handlers being executed in a sub-thread, etc.>"
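That handler-calling contract could be sketched like this. The name run_with_handlers is hypothetical, and awaiting async handlers inline (rather than offloading them to a sub-thread) is just one possible choice:

```python
import inspect
from typing import AsyncIterator, Callable, List


async def run_with_handlers(
    stream: AsyncIterator[object],
    handlers: List[Callable[[object], object]],
) -> List[object]:
    """Drain the stream, invoking every handler once per message."""
    messages = []
    async for message in stream:
        for handler in handlers:
            result = handler(message)
            # Sync handlers return a plain value; async handlers return
            # an awaitable that we await inline here.
            if inspect.isawaitable(result):
                await result
        messages.append(message)
    return messages
```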

@aguynamedben

aguynamedben commented Apr 3, 2025

Another idea for an API for this:

from autogen_agentchat.ui import console_message_handler, StreamProcessor, StreamMessage
from my_project import send_via_websockets

def websockets_message_handler(message: StreamMessage) -> None:
    send_via_websockets(message)

websockets_processor = StreamProcessor(handlers=[websockets_message_handler])
console_processor = StreamProcessor(handlers=[console_message_handler])

result = await team.run_stream(processors=[websockets_processor, console_processor])

The default value of processors for run_stream() could be:

def run_stream(
    # ...
    processors: List[StreamProcessor] = [console_processor]
):

So that when users are learning AutoGen for the first time and call team.run_stream() the console output "just works" by default without having to wrap the stream in Console(). The only thing I'm not sure about with this API is the handling of user_input.
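One way that default could be wired without hitting Python's mutable-default-argument pitfall (a list literal in the signature is created once and shared across calls). All names here are hypothetical, and this is a free function standing in for the real run_stream:

```python
from typing import AsyncIterator, Callable, List, Optional

# Treat a processor as a plain callable here for simplicity.
StreamProcessorFn = Callable[[object], None]


def _console_handler(message: object) -> None:
    print(message)


async def run_stream_with_processors(
    stream: AsyncIterator[object],
    processors: Optional[List[StreamProcessorFn]] = None,
) -> None:
    # None as the sentinel default: console output "just works" unless
    # the caller supplies their own processors (possibly an empty list).
    if processors is None:
        processors = [_console_handler]
    async for message in stream:
        for process in processors:
            process(message)
```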

@ekzhu
Collaborator Author

ekzhu commented Apr 4, 2025

I think these are all great ideas! cc @jackgerrits for comments.

Maybe we can consider a design that doesn't break the existing usage cases:

  1. Use Console to consume the messages -- Console can be made into a class if needed.
  2. Use async for message in team.run_stream(...) to consume messages.
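Point 1 could be satisfied by turning Console into a class whose instances are awaitable, so the existing `await Console(stream)` call keeps working unchanged. This is an illustration of the idea, not the actual autogen_agentchat implementation:

```python
from typing import AsyncIterator


class Console:
    """Awaitable class preserving the existing `await Console(stream)` usage."""

    def __init__(self, stream: AsyncIterator[object]) -> None:
        self._stream = stream

    def __await__(self):
        # Makes instances awaitable, so `await Console(stream)` still works.
        return self._run().__await__()

    async def _run(self) -> object:
        last = None
        async for message in self._stream:
            print(message)  # default handling; a subclass could override _run
            last = message
        return last
```

Subclasses could then override the handling of specific message types while callers keep using the same one-liner.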
