
When you have more than two or three agents that depend on each other's outputs, "just call them in order" stops working. A report agent that needs collected data, a notification agent that needs both a risk score and a report — that's not a sequence, that's a dependency graph. This post walks through how I enforced execution order in a portfolio evaluation system using a Python DAG + topological sort, cutting total runtime from 22 minutes to 9.
The Problem: Order Breaks When Dependencies Fan Out
The naive approach is to call agents sequentially, top to bottom:
```python
run_collect()
run_risk()
run_report()
run_notify()
```
This works until it doesn't. In my portfolio evaluation system running on n8n 2.8.4 (Mac Mini), I had six agents wired up in a flat sequence. Total wall-clock time: 22 minutes 3 seconds. Every agent waited for the one before it, even when there was no real dependency between them.
The actual dependency structure looked like this:
- risk and report both need collect to finish, but they don't need each other
- notify needs both risk and report to be done before it fires
In a flat sequence, report waits for risk even though it has no reason to. That's dead time.
Section 1: Declaring the Dependency Graph
The first thing I wanted was a structure where the execution order was derived from the dependency declarations — not hard-coded. Here's the format I settled on:
```python
async def run_collect(): ...
async def run_risk(): ...
async def run_report(): ...
async def run_notify(): ...

tasks = {
    'collect': {'deps': [], 'fn': run_collect},
    'risk': {'deps': ['collect'], 'fn': run_risk},
    'report': {'deps': ['collect'], 'fn': run_report},
    'notify': {'deps': ['risk', 'report'], 'fn': run_notify},
}
```
Each key is an agent name. deps lists what must complete before this agent runs. fn is the coroutine to execute. Nothing else. When I add a new agent later, I drop it into this dict with its deps, and the rest handles itself.
What I like about this: the shape of the dict is the documentation. You can read it and immediately know the dependency structure without tracing any call stacks.
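One thing the dict does not guard against is a typo in a deps entry. Here is a minimal sketch of a startup check, assuming the tasks format above. The helper name find_unknown_deps and the example dict are hypothetical, not part of the original system:

```python
def find_unknown_deps(tasks: dict) -> dict[str, list[str]]:
    """Map each task to the deps it names that are not keys in `tasks`."""
    unknown = {}
    for name, spec in tasks.items():
        missing = [dep for dep in spec['deps'] if dep not in tasks]
        if missing:
            unknown[name] = missing
    return unknown


# Hypothetical example: 'colect' is a typo for 'collect'.
example_tasks = {
    'collect': {'deps': [], 'fn': None},
    'risk': {'deps': ['colect'], 'fn': None},
    'report': {'deps': ['collect'], 'fn': None},
}

print(find_unknown_deps(example_tasks))  # {'risk': ['colect']}
```

Without a check like this, a misspelled dep never becomes "completed", so the task that names it can never be scheduled and the failure looks like a circular dependency.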
Section 2: Topological Sort to Derive Execution Order
With the dependency map declared, I need to compute the execution order automatically. That's what topological sort does — it produces a linear ordering of nodes such that every node comes after all its dependencies.
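```python
from collections import deque


def topo_sort(tasks: dict) -> list[str]:
    # in_degree[t] = number of dependencies of t that have not been emitted yet
    in_degree = {t: len(v['deps']) for t, v in tasks.items()}
    # Start with every task that has no dependencies
    queue = deque([t for t, d in in_degree.items() if d == 0])
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        # Emitting node unblocks every task that lists it as a dependency
        for t, v in tasks.items():
            if node in v['deps']:
                in_degree[t] -= 1
                if in_degree[t] == 0:
                    queue.append(t)
    # Tasks left over never reached in-degree 0: they sit on a cycle
    # (or name a dependency that is not in tasks)
    if len(order) != len(tasks):
        raise ValueError("Circular dependency detected: cannot execute")
    return order
```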
For the four-agent example above, this returns ['collect', 'risk', 'report', 'notify']. If you had a cycle — say risk depending on notify — it raises ValueError immediately instead of hanging forever with two agents waiting on each other.
The cycle check is the part I'm most glad I built in. Without it, a misconfigured dependency silently deadlocks everything.
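If you are on Python 3.9 or later, the standard library's graphlib module offers the same ordering and cycle detection. This is an alternative sketch, not the code the system above uses; topo_sort_stdlib and the two example dicts are hypothetical names for illustration:

```python
from graphlib import CycleError, TopologicalSorter


def topo_sort_stdlib(tasks: dict) -> list[str]:
    """Return one valid execution order, or raise ValueError on a cycle."""
    # TopologicalSorter expects {node: iterable of predecessors}.
    graph = {name: spec['deps'] for name, spec in tasks.items()}
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the detected cycle.
        raise ValueError(f"Circular dependency detected: {exc.args[1]}") from exc


acyclic = {
    'collect': {'deps': []},
    'risk': {'deps': ['collect']},
    'report': {'deps': ['collect']},
    'notify': {'deps': ['risk', 'report']},
}
order = topo_sort_stdlib(acyclic)
print(order)  # 'collect' first, 'notify' last; risk/report order may vary

# Introduce a cycle: risk now also waits on notify.
cyclic = dict(acyclic, risk={'deps': ['collect', 'notify']})
try:
    topo_sort_stdlib(cyclic)
except ValueError as err:
    print(err)
```

The useful difference is that the error names the nodes on the cycle, which makes a misconfigured dict quicker to fix.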
Section 3: Parallel Execution with asyncio
Topological sort gives you a flat list, but risk and report are at the same dependency level — they should run concurrently. I resolve execution levels separately:
```python
def resolve_levels(tasks: dict) -> list[list[str]]:
    remaining = set(tasks)
    completed = set()
    levels = []
    while remaining:
        # Find all tasks whose deps are satisfied; iterating over tasks
        # (not the set) keeps the order within a level deterministic
        ready = [
            t for t in tasks
            if t in remaining
            and all(dep in completed for dep in tasks[t]['deps'])
        ]
        if not ready:
            raise ValueError("Circular dependency: no progress possible")
        levels.append(ready)
        completed.update(ready)
        remaining -= set(ready)
    return levels
```
Then execute level by level, running everything in the same level concurrently:
```python
import asyncio


async def run_dag(tasks: dict):
    levels = resolve_levels(tasks)
    results = {}
    for level in levels:
        # Call each fn to create its coroutine; gather needs awaitables,
        # not bare function objects
        level_tasks = [tasks[t]['fn']() for t in level]
        level_results = await asyncio.gather(*level_tasks)
        for name, result in zip(level, level_results):
            results[name] = result
    return results
```
asyncio.gather schedules all tasks in a level concurrently and waits for all of them before moving to the next level, so notify doesn't start until both risk and report have finished. Each level therefore takes as long as its slowest task.
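To check the level-by-level behavior without real agents, here is a self-contained sketch that swaps each agent for a short asyncio.sleep and records start and end events. The make_agent helper, the events list, and the sleep durations are made up for illustration; resolve_levels and run_dag are condensed versions of the functions above:

```python
import asyncio

events = []  # (task name, 'start' | 'end') in the order they happen


def make_agent(name: str, seconds: float):
    """Build a stand-in coroutine function that sleeps instead of doing work."""
    async def agent():
        events.append((name, 'start'))
        await asyncio.sleep(seconds)
        events.append((name, 'end'))
        return name
    return agent


tasks = {
    'collect': {'deps': [], 'fn': make_agent('collect', 0.01)},
    'risk': {'deps': ['collect'], 'fn': make_agent('risk', 0.02)},
    'report': {'deps': ['collect'], 'fn': make_agent('report', 0.03)},
    'notify': {'deps': ['risk', 'report'], 'fn': make_agent('notify', 0.01)},
}


def resolve_levels(tasks: dict) -> list[list[str]]:
    completed, levels = set(), []
    while len(completed) < len(tasks):
        ready = [t for t in tasks
                 if t not in completed
                 and all(d in completed for d in tasks[t]['deps'])]
        if not ready:
            raise ValueError("Circular dependency: no progress possible")
        levels.append(ready)
        completed.update(ready)
    return levels


async def run_dag(tasks: dict) -> dict:
    results = {}
    for level in resolve_levels(tasks):
        # Call each function to get a coroutine object before gathering.
        outputs = await asyncio.gather(*(tasks[t]['fn']() for t in level))
        results.update(zip(level, outputs))
    return results


results = asyncio.run(run_dag(tasks))
for event in events:
    print(event)
```

In the printed log, risk and report both record 'start' before either records 'end', and notify records 'start' only after both have ended.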
Section 4: The Race Condition I Hit — and the Fix
When I first ran risk and report in parallel, both tried to write intermediate state to the same file on disk. The file got written twice in overlapping fashion. Two corrupted outputs, one confused notification agent downstream.
The fix was architectural, not a lock or a mutex:
```python
# Before (broken): agents write to shared file
async def run_risk():
    data = load_from_file('collected_data.json')
    result = compute_risk(data)
    write_to_file('collected_data.json', result)  # ← race condition

# After (fixed): pass data in memory, write once at the end
async def run_collect():
    data = fetch_portfolio_data()
    return data  # return, don't write

async def run_risk(data):
    return compute_risk(data)

async def run_report(data):
    return generate_report(data)

async def run_notify(risk_result, report_result):
    final = merge(risk_result, report_result)
    write_to_file('output.json', final)  # single writer
```
The rule I follow now: only one agent writes to any given resource, and it's always the downstream consumer. If two parallel agents need the same input, the upstream agent returns a value in memory — not a file path.
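The fixed functions now take arguments, so the runner has to hand each agent the results of its deps. Here is a minimal sketch of one way to do that, assuming each fn accepts its deps' results positionally, in the order they appear in deps. The name run_dag_with_inputs and the stub agent bodies are hypothetical stand-ins, not the original implementation:

```python
import asyncio


async def run_collect():
    return {'AAPL': 10, 'MSFT': 5}          # stand-in for fetched portfolio data

async def run_risk(data):
    return {'positions': len(data)}         # stand-in for compute_risk(data)

async def run_report(data):
    return f"{sum(data.values())} shares"   # stand-in for generate_report(data)

async def run_notify(risk_result, report_result):
    # In the real system this would be the single place that writes to disk.
    return {'risk': risk_result, 'report': report_result}


tasks = {
    'collect': {'deps': [], 'fn': run_collect},
    'risk': {'deps': ['collect'], 'fn': run_risk},
    'report': {'deps': ['collect'], 'fn': run_report},
    'notify': {'deps': ['risk', 'report'], 'fn': run_notify},
}


async def run_dag_with_inputs(tasks: dict) -> dict:
    results, pending = {}, dict(tasks)
    while pending:
        # A task is ready once every one of its deps has a stored result.
        ready = [t for t, spec in pending.items()
                 if all(d in results for d in spec['deps'])]
        if not ready:
            raise ValueError("Circular or unknown dependency: no progress possible")
        # Pass each dep's result as a positional argument, in deps order.
        coros = [pending[t]['fn'](*(results[d] for d in pending[t]['deps']))
                 for t in ready]
        for name, output in zip(ready, await asyncio.gather(*coros)):
            results[name] = output
        for name in ready:
            del pending[name]
    return results


results = asyncio.run(run_dag_with_inputs(tasks))
print(results['notify'])
# {'risk': {'positions': 2}, 'report': '15 shares'}
```

Because data only moves through return values and arguments, risk and report never touch a shared resource while they run side by side.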
Measured Results
| Mode | Total runtime |
|---|---|
| Sequential (flat) | 22 min 3 sec |
| DAG + parallel levels | 9 min 11 sec |
| Reduction | ~58% |
Environment: Mac Mini (n8n 2.8.4), 6 agents total. The two parallel agents in level 1 (risk and report) each took about 6–7 minutes individually. Running them concurrently recovered almost all of that overlap.
Variations and Gotchas
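Fan-out scales better than fan-in. If you add more agents that only depend on collect, they all run in level 1, so wall-clock time for that level only grows if a new agent is slower than the current slowest one. This assumes the agents spend most of their time waiting on I/O and don't compete for the same resource. The gains compound as the graph grows wider.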
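A rough model makes the fan-out point concrete: sequential time is the sum of every agent's duration, while level-parallel time is the sum of each level's slowest agent. The sketch below uses made-up durations (not the measured ones) and assumes agents in the same level do not slow each other down; estimate_runtimes and the 'audit' agent are hypothetical:

```python
def estimate_runtimes(levels: list[list[str]], minutes: dict[str, float]):
    """Return (sequential, level_parallel) wall-clock estimates in minutes."""
    sequential = sum(minutes[t] for level in levels for t in level)
    # Each level takes as long as its slowest task.
    parallel = sum(max(minutes[t] for t in level) for level in levels)
    return sequential, parallel


# Illustrative durations only.
minutes = {'collect': 2, 'risk': 6, 'report': 7, 'notify': 1}
levels = [['collect'], ['risk', 'report'], ['notify']]
print(estimate_runtimes(levels, minutes))  # (16, 10)

# Add a third level-1 agent that is no slower than 'report'.
minutes['audit'] = 5
wider = [['collect'], ['risk', 'report', 'audit'], ['notify']]
print(estimate_runtimes(wider, minutes))   # (21, 10)
```

The sequential estimate grows by the new agent's full duration, while the level-parallel estimate does not move as long as the new agent is not the slowest in its level.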
Watch out for agents that share an external rate limit. If risk and report both hammer the same third-party API at the same time, you'll hit the limit sooner than you would running them one after the other. Add a semaphore or stagger the launch if that's your situation:
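```python
# At most 2 wrapped agents in flight at once; tune this to the API's limit
sem = asyncio.Semaphore(2)


async def rate_limited(fn):
    # Wait for a free slot, run the agent, then release the slot
    async with sem:
        return await fn()
```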
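To plug a limit like this into the tasks dict, one option is a wrapper that returns a new coroutine function, so the runner can keep calling fn() as before. This is a sketch under assumptions: MAX_IN_FLIGHT, call_api, and the in_flight/peak counters are hypothetical and exist only to show that the limit holds:

```python
import asyncio

MAX_IN_FLIGHT = 1  # hypothetical limit; 1 fully serializes the wrapped agents


def rate_limited(fn, sem: asyncio.Semaphore):
    """Wrap a coroutine function so each call waits for a semaphore slot."""
    async def wrapper(*args, **kwargs):
        async with sem:
            return await fn(*args, **kwargs)
    return wrapper


in_flight = 0
peak = 0


async def call_api():
    """Stand-in for an agent that hits a rate-limited third-party API."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1


async def main():
    # Creating the semaphore inside the running loop avoids loop-binding
    # problems on older Python versions.
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    limited = rate_limited(call_api, sem)
    await asyncio.gather(limited(), limited(), limited())
    return peak


print(asyncio.run(main()))  # 1
```

In the tasks dict you would then register the wrapped function, for example 'fn': rate_limited(run_risk, sem), and leave the DAG runner unchanged.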
On Docker/Linux with n8n, asyncio event loops behave the same. The DAG code above is pure Python — no platform-specific behavior. What changes is how you pass the results dict around if you're running this inside a Code node vs. a standalone Python service.
Validate your graph before running. I use a quick networkx check to catch obvious mistakes:
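```python
import networkx as nx


def validate_dag(tasks):
    G = nx.DiGraph()
    # Add every task as a node so agents with no edges are still included
    G.add_nodes_from(tasks)
    for name, v in tasks.items():
        for dep in v['deps']:
            G.add_edge(dep, name)  # edge points from dependency to dependent
    if not nx.is_directed_acyclic_graph(G):
        raise ValueError("Not a DAG: check your dependency definitions")
    return list(nx.topological_sort(G))
```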
Run this once at startup. If it raises, fix the deps before a single agent fires.
Closing
Once you have more than three agents with non-trivial dependencies, execution order is too complex to track mentally. Declare the graph, sort it topologically, run each level in parallel, and ownership of who writes what becomes explicit by design. Going from 22 min 3 sec to 9 min 11 sec is real, but the bigger win is that the system is now correct by construction rather than by luck.
Next step for me: making the DAG definition dynamic — agents that declare their own deps at registration time rather than a static dict at startup.
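One possible shape for that, sketched here as an assumption and not as the planned design, is a decorator that records each agent and its deps in a registry when the function is defined. REGISTRY and the agent decorator are hypothetical names:

```python
REGISTRY: dict[str, dict] = {}  # hypothetical registry, same shape as tasks


def agent(name: str, deps: tuple[str, ...] = ()):
    """Register the decorated coroutine function under `name` with its deps."""
    def decorator(fn):
        if name in REGISTRY:
            raise ValueError(f"Agent {name!r} is already registered")
        REGISTRY[name] = {'deps': list(deps), 'fn': fn}
        return fn
    return decorator


@agent('collect')
async def run_collect(): ...

@agent('risk', deps=('collect',))
async def run_risk(): ...

@agent('notify', deps=('risk',))
async def run_notify(): ...


print({name: spec['deps'] for name, spec in REGISTRY.items()})
# {'collect': [], 'risk': ['collect'], 'notify': ['risk']}
```

Since the registry has the same shape as the static tasks dict, it could be passed to the same sort and run functions unchanged.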