-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasync_workflow.exs
63 lines (54 loc) · 1.58 KB
/
async_workflow.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Asynchronous Workflow Example
#
# This example demonstrates how to use the wait primitive to handle
# asynchronous operations in a workflow.
#
# To run: mix run examples/async_workflow.exs
defmodule AsyncWorkflow do
alias AgentForge.{Flow, Signal}
def simulate_async_job(caller) do
# Simulate a background job
spawn(fn ->
# Very short sleep for example
Process.sleep(100)
send(caller, {:job_complete, %{result: "Completed"}})
end)
end
def run do
# Start async job
start_job = fn _signal, state ->
simulate_async_job(self())
{Signal.emit(:job_started, "Starting async job"), state}
end
# Wait and process job completion
wait_process = fn _signal, state ->
receive do
{:job_complete, result} ->
message = "Job completed with result: #{inspect(result.result)}"
{Signal.emit(:notification, message), state}
after
# Longer timeout to ensure we catch the message
2000 ->
{Signal.emit(:error, "Job timed out"), state}
end
end
# Compose workflow
workflow = [
start_job,
wait_process
]
# Execute workflow
signal = Signal.new(:start, %{job_id: "123"})
state = %{}
case Flow.process(workflow, signal, state) do
{:ok, result, final_state} ->
IO.puts("Workflow completed successfully")
IO.inspect(result, label: "Final Result")
IO.inspect(final_state, label: "Final State")
{:error, reason} ->
IO.puts("Workflow failed: #{reason}")
end
end
end
# Run the example
AsyncWorkflow.run()