
If you're running Claude Agent SDK with full streaming enabled, you've probably hit the wall I hit: everything looks fine at low concurrency, then you scale up sessions and your consumer starts dropping tokens or crashing with OOM errors. This post covers the back-pressure pattern that fixed it for me — tested on a 4-node Mac Mini cluster running 8–12 concurrent sessions.
The Problem: Fast Producers, Slow Consumers
The assumption that "faster streaming is always better" breaks down the moment your consumer has any real work to do.
The Claude Agent SDK pushes tokens at generation speed by default. That's great when your consumer is a simple print(). It's a disaster when your consumer writes to a database, sends a webhook, or re-renders a UI component — all of which carry non-trivial latency.
Here's what I measured on an M2 Pro: a 50 ms processing delay per token causes roughly 1,200 tokens lost per minute under normal generation load. At 8+ concurrent sessions, that compounds fast.
The root cause is the push model. The API shoves tokens into your process as fast as it generates them. Your consumer has no mechanism to say "slow down." The queue grows without bound and eventually your process dies.
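To make the unbounded growth concrete, here's a minimal simulation. It doesn't touch the SDK at all: `simulate_unbounded` is a hypothetical helper, and the timings are made-up stand-ins for generation speed and consumer latency. It only shows that when nothing slows the producer down, the backlog grows with the length of the stream.

```python
import asyncio

async def simulate_unbounded(n_tokens: int, produce_s: float, consume_s: float) -> int:
    """Return the peak backlog when the producer is never slowed down."""
    queue: asyncio.Queue = asyncio.Queue()  # maxsize=0 means unbounded
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(n_tokens):
            await asyncio.sleep(produce_s)  # simulated generation interval
            queue.put_nowait(f"tok{i}")     # never waits, so the backlog just grows
            peak = max(peak, queue.qsize())
        queue.put_nowait(None)              # end-of-stream marker

    async def consumer() -> None:
        while True:
            token = await queue.get()
            if token is None:
                break
            await asyncio.sleep(consume_s)  # simulated slow work per token

    await asyncio.gather(producer(), consumer())
    return peak

if __name__ == "__main__":
    # Producer is 5x faster than the consumer (illustrative numbers only).
    peak = asyncio.run(simulate_unbounded(50, 0.001, 0.005))
    print(f"peak backlog: {peak} tokens")
```

Raise `n_tokens` and the peak keeps climbing, which is the behavior a bounded queue is meant to prevent.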
Here's what a naive streaming loop looks like:
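
    import asyncio
    from anthropic import AsyncAnthropic

    client = AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment

    # process_token() is your own async consumer (DB write, webhook, UI update); not shown here.
    async def naive_stream(prompt: str):
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for event in stream:
                # Only text deltas carry .text; other delta types (e.g. tool input JSON) do not.
                if event.type == "content_block_delta" and event.delta.type == "text_delta":
                    await process_token(event.delta.text)  # if this is slow, you're in trouble
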
This works fine for a single session. At 12 concurrent sessions, it was consuming 220 MB per session on my cluster — and throwing buffer overflow errors under peak load.
The Fix: Pull, Don't Push
Back-pressure flips the model. Instead of the producer deciding when to send, the consumer signals readiness before the next chunk is delivered.
The mechanism in Python's asyncio is dead simple: asyncio.Queue(maxsize=N). When the queue is full, await queue.put() suspends the calling coroutine until a slot frees up. The producer cannot enqueue another token until the consumer has taken one out. No polling, no custom throttle logic: the waiting is built into the primitive.
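Here's a minimal sketch of that primitive on its own, with no streaming involved. The helper name `demo_put_suspends` and the 50 ms timeout are arbitrary choices for the demo; the only thing it relies on is standard `asyncio.Queue` behavior.

```python
import asyncio

async def demo_put_suspends() -> tuple[bool, bool]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await queue.put("a")
    await queue.put("b")  # queue is now full

    # A third put cannot complete while the queue is full, so it times out.
    try:
        await asyncio.wait_for(queue.put("c"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True

    # Free one slot; the same put now completes right away.
    queue.get_nowait()
    await asyncio.wait_for(queue.put("c"), timeout=0.05)
    return blocked, queue.full()

if __name__ == "__main__":
    print(asyncio.run(demo_put_suspends()))  # → (True, True)
```

The first value confirms the producer side was held back; the second confirms the queue is back at capacity once a slot was freed and refilled.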
Here's the full implementation:

    import asyncio
    from anthropic import AsyncAnthropic

    client = AsyncAnthropic()

    async def producer(queue: asyncio.Queue, stream):
        try:
            async for event in stream:
                if event.type == "content_block_delta" and event.delta.type == "text_delta":
                    await queue.put(event.delta.text)  # suspends here while the queue is full
        finally:
            await queue.put(None)  # sentinel: signals end of stream, even if the stream errored

    async def consumer(queue: asyncio.Queue):
        while True:
            token = await queue.get()  # waits until something is available
            if token is None:
                queue.task_done()
                break
            await process_token(token)  # your own async handler (not shown)
            queue.task_done()

    async def run_with_backpressure(prompt: str):
        queue = asyncio.Queue(maxsize=16)  # 16 tokens max in flight at once
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            await asyncio.gather(
                producer(queue, stream),
                consumer(queue),
            )

The None sentinel at the end of producer() is important — without it, consumer() never knows when to stop waiting and the coroutine hangs indefinitely.
Verification: run this with asyncio.Queue(maxsize=2) and add a print statement before each queue.put(). You'll see the producer pause exactly when the queue hits capacity. That pause is the back-pressure working.
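If you'd rather run that check without spending API calls, here's a sketch that swaps the SDK stream for a fake one. `fake_stream` and `run_pipeline` are hypothetical names, and the sleep stands in for real consumer work. It records the deepest the queue ever gets, so you can confirm it never exceeds maxsize and that no tokens go missing.

```python
import asyncio
from typing import AsyncIterator

async def fake_stream(n: int) -> AsyncIterator[str]:
    """Stand-in for the SDK stream: yields tokens with no delay."""
    for i in range(n):
        yield f"tok{i}"

async def run_pipeline(n_tokens: int, maxsize: int, consume_s: float) -> tuple[list[str], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    received: list[str] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        async for token in fake_stream(n_tokens):
            await queue.put(token)           # suspends while the queue is full
            peak = max(peak, queue.qsize())  # deepest the queue has been so far
        await queue.put(None)                # sentinel

    async def consumer() -> None:
        while True:
            token = await queue.get()
            if token is None:
                break
            await asyncio.sleep(consume_s)   # simulated slow work
            received.append(token)

    await asyncio.gather(producer(), consumer())
    return received, peak

if __name__ == "__main__":
    tokens, peak = asyncio.run(run_pipeline(20, 2, 0.001))
    print(f"received {len(tokens)} tokens, peak queue depth {peak}")
```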
Before/After: Real Numbers from the Cluster
These are measurements from 12 concurrent sessions on the 4-node Mac Mini cluster (M2 Pro nodes):
| Metric | Before | After | Change |
|---|---|---|---|
| Memory per session | 220 MB | 88 MB | −60% |
| P99 processing latency | 3,400 ms | 1,200 ms | −65% |
| Buffer overflow errors | frequent | zero | eliminated |
| Token loss at 50ms delay | ~1,200/min | 0 | eliminated |
The maxsize=16 value isn't arbitrary — I tuned it by profiling my specific workload. The right number depends on your consumer's average processing time and your acceptable memory ceiling. If your consumer is slower, go lower. If it's near-instant, you can afford higher without risk.
Variations and Gotchas
Adding Session-Level Concurrency Control
Back-pressure controls the token flow within a single session. For multi-session environments, you also need a ceiling on how many sessions run in parallel. That's what asyncio.Semaphore is for:

    semaphore = asyncio.Semaphore(4)  # max 4 concurrent sessions

    async def run_session(prompt: str):
        async with semaphore:  # waits for a slot before starting
            await run_with_backpressure(prompt)

    async def run_all(prompts: list[str]):
        await asyncio.gather(*[run_session(p) for p in prompts])

I set Semaphore(4) roughly at half my physical core count. Going higher didn't improve throughput — it just increased memory pressure. Going lower left cores idle. Your number will differ.
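To check that a limit is actually being enforced before you tune it, a small counter is enough. This is a sketch with a hypothetical helper, `measure_peak_concurrency`, where a short sleep stands in for one streamed session.

```python
import asyncio

async def measure_peak_concurrency(n_sessions: int, limit: int) -> int:
    """Launch n_sessions fake sessions and return the most that ran at once."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def session() -> None:
        nonlocal active, peak
        async with semaphore:          # waits for a slot before starting
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for one streamed session
            active -= 1

    await asyncio.gather(*(session() for _ in range(n_sessions)))
    return peak

if __name__ == "__main__":
    print(asyncio.run(measure_peak_concurrency(12, 4)))  # → 4
```

All 12 sessions are created up front, but only 4 are ever inside the semaphore at the same time; the rest wait their turn.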
Mixed Environments: Claude + Ollama
I'm running Claude Agent SDK as the top-level orchestrator, with some tasks handed off to local Ollama models. The gotcha here: don't share a single Semaphore across different model backends. Claude API calls and Ollama calls have very different latency profiles.
What worked for me was separate Semaphores per backend, with the same queue-based back-pressure pattern on each:

    claude_semaphore = asyncio.Semaphore(4)
    ollama_semaphore = asyncio.Semaphore(6)  # local, lower latency, can go higher

The consumer-driven architecture means the pattern composes cleanly — each sub-model just gets its own queue.
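One way to wire that up is a dictionary of semaphores keyed by backend. The sketch below is an assumption about structure, not a real client integration: the backend labels, `LIMITS`, and `run_mixed` are illustrative names, and the sleeps stand in for actual Claude or Ollama calls.

```python
import asyncio

# Limits match the values above; the keys are just labels for this sketch.
LIMITS = {"claude": 4, "ollama": 6}

async def run_mixed(jobs: list[tuple[str, float]]) -> dict[str, int]:
    """Run (backend, duration_s) jobs; return peak concurrency per backend."""
    semaphores = {name: asyncio.Semaphore(n) for name, n in LIMITS.items()}
    active = {name: 0 for name in LIMITS}
    peak = {name: 0 for name in LIMITS}

    async def job(backend: str, duration_s: float) -> None:
        async with semaphores[backend]:      # only competes within its own backend
            active[backend] += 1
            peak[backend] = max(peak[backend], active[backend])
            await asyncio.sleep(duration_s)  # stand-in for the real call
            active[backend] -= 1

    await asyncio.gather(*(job(b, d) for b, d in jobs))
    return peak

if __name__ == "__main__":
    jobs = [("claude", 0.01)] * 10 + [("ollama", 0.005)] * 10
    print(asyncio.run(run_mixed(jobs)))  # → {'claude': 4, 'ollama': 6}
```

Because each backend has its own semaphore, a burst of slow Claude calls can't starve the local Ollama jobs of slots, and vice versa.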
The maxsize Calibration Process
Don't guess. Measure. Here's the quick calibration loop I used:

    import time

    async def process_token_timed(token: str) -> float:
        start = time.perf_counter()
        await process_token(token)
        elapsed_ms = (time.perf_counter() - start) * 1000
        return elapsed_ms  # log or accumulate this

    # Rule of thumb after profiling:
    # maxsize = math.ceil(target_buffer_ms / avg_consumer_ms)
    # e.g., 500ms buffer / 30ms avg consumer = maxsize ~17

Run 50 tokens through your actual consumer, get the average processing time in milliseconds, then decide how much buffering you can afford before you'd rather let the producer block.
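The rule of thumb can be turned into a small helper. `suggest_maxsize` is a hypothetical name, and the floor of 1 is my own assumption (a queue needs at least one slot to be useful); the formula itself is the one from the comment above.

```python
import math
import statistics

def suggest_maxsize(samples_ms: list[float], target_buffer_ms: float) -> int:
    """Apply maxsize = ceil(target_buffer_ms / avg_consumer_ms), floored at 1."""
    if not samples_ms:
        raise ValueError("need at least one timing sample")
    avg_ms = statistics.fmean(samples_ms)
    if avg_ms <= 0:
        raise ValueError("average processing time must be positive")
    return max(1, math.ceil(target_buffer_ms / avg_ms))

if __name__ == "__main__":
    # 50 samples averaging 30 ms, with a 500 ms buffering budget.
    print(suggest_maxsize([30.0] * 50, 500))  # → 17
```

Feed it the per-token timings you collected from your real consumer, and treat the result as a starting point for profiling rather than a final answer.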
Mac vs. Linux vs. Docker
On macOS, asyncio's default event loop polls sockets with kqueue; on Linux (which your Docker containers use), it's epoll. That difference only touches network I/O. asyncio.Queue and asyncio.Semaphore are implemented in Python on top of the event loop, so the back-pressure behavior is the same on both. On Docker, though, watch out for any memory limit set on the container. A maxsize=16 queue that's fine on bare metal can still cause issues if your container is capped at 256 MB and you're running 8 sessions.
The memory-per-session numbers I quoted (88 MB) are for bare metal. In a memory-constrained container, I'd drop maxsize to 8 and Semaphore down by one notch.
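To put numbers on that warning: 8 sessions at the 88 MB bare-metal figure comes to 704 MB, well over a 256 MB cap. The helper below is a rough budget check under two loud assumptions: memory scales linearly with session count, and the per-session figure carries over to your container unchanged. `sessions_that_fit` and `overhead_mb` are hypothetical names for this sketch.

```python
def sessions_that_fit(container_limit_mb: float,
                      per_session_mb: float,
                      overhead_mb: float = 0.0) -> int:
    """Estimate how many sessions fit under a container memory cap."""
    if per_session_mb <= 0:
        raise ValueError("per_session_mb must be positive")
    usable_mb = container_limit_mb - overhead_mb  # overhead: interpreter, libraries, etc.
    return max(0, int(usable_mb // per_session_mb))

if __name__ == "__main__":
    print(sessions_that_fit(256, 88))   # → 2
    print(sessions_that_fit(1024, 88))  # → 11
```

Measure the real per-session footprint inside the container before trusting the result, since a smaller maxsize will change it.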
Closing
The instinct to maximize streaming throughput is wrong. The right goal is matching the flow rate to what your consumer can safely process. asyncio.Queue(maxsize=N) gives you that control with zero extra dependencies — just a bounded buffer that blocks the producer when the consumer needs to breathe.
Once your architecture is consumer-driven at the token level and session level (via Semaphore), scaling from 4 sessions to 40 sessions doesn't blow up memory — it just queues. That linear memory behavior is what makes the cluster manageable.
Next step for me: exposing maxsize as a runtime-tunable config so the cluster can adjust back-pressure under load without redeployment.
🐦 Faster updates on X: @baegseungh7061