Plugin Performance Profiling: Track Wall Time, Memory, and CPU Spikes to Auto-Identify Bottlenecks

Plugging a profiler into a plugin system is more involved than wrapping every call in time.perf_counter(). You need execution context isolation, async stack attribution, and memory snapshot timing, and each of those decisions cascades into the next. This post walks through building an instrumentation layer that collects wall time, CPU time, and memory deltas per plugin, then automatically flags statistical outliers without hand-picked absolute thresholds.

If you're maintaining a plugin-based agent, a tool-calling loop, or any system where third-party callables get dispatched dynamically, this approach drops in with zero changes to plugin code itself.

[Figure: overall profiling pipeline]


The Problem: Naive Timing Breaks in Async Systems

The first thing most developers reach for is a simple decorator:

import time

def timed(fn):
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await fn(*args, **kwargs)
        print(f"{fn.__name__}: {(time.perf_counter() - start) * 1000:.1f}ms")
        return result
    return wrapper

This works exactly once, on a toy example, in a single-plugin environment. In production you hit three walls immediately.

First, wall time in an asyncio loop conflates I/O wait with actual compute. A plugin that awaits an external API call for 200ms looks identical to one burning 200ms of CPU. Second, memory leaks in one plugin corrupt the baseline for all subsequent measurements. Third, if you want to know which plugin was running when a CPU spike fired, you have no way to correlate that — the event loop is shared, and there's no ambient context.
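To make the first point concrete, here is a minimal sketch that measures two made-up plugins, one that only waits and one that spins. The plugin functions and the measure helper are hypothetical, and time.process_time() stands in for the process CPU clock:

```python
import asyncio
import time

async def io_bound_plugin() -> None:
    # Hypothetical plugin that only waits (stands in for an external API call).
    await asyncio.sleep(0.2)

async def cpu_bound_plugin() -> None:
    # Hypothetical plugin that spins on the CPU for roughly the same wall time.
    deadline = time.perf_counter() + 0.2
    while time.perf_counter() < deadline:
        pass

async def measure(fn) -> tuple[float, float]:
    """Return (wall_ms, cpu_ms) for one awaited call."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    await fn()
    wall_ms = (time.perf_counter() - wall_start) * 1000
    cpu_ms = (time.process_time() - cpu_start) * 1000
    return wall_ms, cpu_ms

io_wall, io_cpu = asyncio.run(measure(io_bound_plugin))
busy_wall, busy_cpu = asyncio.run(measure(cpu_bound_plugin))
print(f"io-bound:  wall={io_wall:.0f}ms cpu={io_cpu:.0f}ms")
print(f"cpu-bound: wall={busy_wall:.0f}ms cpu={busy_cpu:.0f}ms")
```

Both calls take about the same wall time, but only the second should show CPU time anywhere near it. A wall-time-only decorator cannot tell them apart.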

[Figure: naive timing failure modes]

The approach below addresses all three by building an instrumentation layer at the loader level — not at the plugin level — so plugin authors never need to change their code.


The Fix: Instrumentation Middleware in the Plugin Loader

The Core Measurement Wrapper

The key insight is to push instrumentation into the loader's register call, wrapping each plugin function once at registration time rather than at call time. This means the profiling layer is transparent to both the plugin and the caller.

import time
import tracemalloc
import psutil
import os
from functools import wraps
from dataclasses import dataclass, field
from typing import Callable, Any

@dataclass
class PluginProfile:
    name: str
    wall_time_ms: float = 0.0
    cpu_time_ms: float = 0.0
    memory_delta_kb: float = 0.0
    peak_memory_kb: float = 0.0
    call_count: int = 0
    errors: list = field(default_factory=list)

class PluginProfiler:
    def __init__(self):
        self._profiles: dict[str, PluginProfile] = {}
        self._process = psutil.Process(os.getpid())

    def wrap(self, name: str, fn: Callable) -> Callable:
        if name not in self._profiles:
            self._profiles[name] = PluginProfile(name=name)

        @wraps(fn)
        async def _measured(*args, **kwargs):
            profile = self._profiles[name]
            tracemalloc.start()
            # Traced bytes before the call, so a per-call delta can be recorded
            mem_before, _ = tracemalloc.get_traced_memory()
            cpu_before = self._process.cpu_times()
            wall_start = time.perf_counter()

            try:
                result = await fn(*args, **kwargs)
                return result
            except Exception as e:
                profile.errors.append(str(e))
                raise
            finally:
                wall_elapsed = (time.perf_counter() - wall_start) * 1000
                cpu_after = self._process.cpu_times()
                mem_after, peak = tracemalloc.get_traced_memory()
                tracemalloc.stop()

                # user + system CPU seconds used by the whole process, in ms
                cpu_elapsed = (
                    (cpu_after.user - cpu_before.user) +
                    (cpu_after.system - cpu_before.system)
                ) * 1000

                profile.wall_time_ms += wall_elapsed
                profile.cpu_time_ms += cpu_elapsed
                # Net bytes still allocated when the call ended
                profile.memory_delta_kb += (mem_after - mem_before) / 1024
                profile.peak_memory_kb = max(profile.peak_memory_kb, peak / 1024)
                profile.call_count += 1

        return _measured

Notice errors are caught, recorded, and re-raised. A plugin that fails repeatedly shows up in your bottleneck report with an error count — useful signal that's separate from pure timing.
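One thing to watch: the wrapper above awaits fn, so registering a plain synchronous callable would fail at call time. Here is a rough sketch of one way to accept both kinds, assuming a hypothetical wrap_any helper and a hypothetical record(wall_ms, cpu_ms) sink, with time.process_time() used in place of psutil to keep the example stdlib-only:

```python
import asyncio
import inspect
import time
from functools import wraps
from typing import Any, Callable

def wrap_any(fn: Callable, record: Callable[[float, float], None]) -> Callable:
    """Wrap a sync or async plugin; `record` is a hypothetical metrics sink."""

    def _finish(wall_start: float, cpu_start: float) -> None:
        record(
            (time.perf_counter() - wall_start) * 1000,
            (time.process_time() - cpu_start) * 1000,
        )

    if inspect.iscoroutinefunction(fn):
        @wraps(fn)
        async def _measured_async(*args: Any, **kwargs: Any) -> Any:
            wall_start, cpu_start = time.perf_counter(), time.process_time()
            try:
                return await fn(*args, **kwargs)
            finally:
                _finish(wall_start, cpu_start)
        return _measured_async

    @wraps(fn)
    def _measured_sync(*args: Any, **kwargs: Any) -> Any:
        wall_start, cpu_start = time.perf_counter(), time.process_time()
        try:
            return fn(*args, **kwargs)
        finally:
            _finish(wall_start, cpu_start)
    return _measured_sync

# Usage with two made-up plugins
samples: list[tuple[str, float, float]] = []

def sync_plugin(a: int, b: int) -> int:
    return a + b

async def async_plugin(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

measured_sync = wrap_any(sync_plugin, lambda w, c: samples.append(("sync_plugin", w, c)))
measured_async = wrap_any(async_plugin, lambda w, c: samples.append(("async_plugin", w, c)))

sync_result = measured_sync(2, 3)
async_result = asyncio.run(measured_async(21))
```

The branch is decided once at registration, so there is no per-call inspection cost.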

Reading the Wall vs CPU Gap

In an async environment, the divergence between wall_time and cpu_time is one of the most useful signals you have:

| Pattern                    | Interpretation                             |
|----------------------------|--------------------------------------------|
| High wall, low CPU         | I/O wait or external call blocking         |
| High wall, high CPU        | Genuine compute overload                   |
| Low wall, high CPU         | Tight loop, possibly batched calls         |
| Both low, high error count | Failing fast, worth separate investigation |
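The patterns above can be turned into a rough classifier. This is only a sketch: the function name, the labels, and every cutoff (slow_ms, busy_ratio, error_limit) are assumptions you would tune for your own workload, not values from any library.

```python
def classify(
    avg_wall_ms: float,
    avg_cpu_ms: float,
    error_rate: float,
    slow_ms: float = 100.0,    # assumed cutoff for "high wall"
    busy_ratio: float = 0.7,   # assumed CPU/wall ratio for "high CPU"
    error_limit: float = 0.2,  # assumed share of failing calls
) -> str:
    """Map per-call averages onto the wall-vs-CPU patterns."""
    cpu_ratio = avg_cpu_ms / avg_wall_ms if avg_wall_ms > 0 else 0.0
    slow = avg_wall_ms >= slow_ms
    busy = cpu_ratio >= busy_ratio
    if slow and not busy:
        return "io_wait"
    if slow and busy:
        return "compute_bound"
    if busy:
        return "tight_loop"
    if error_rate >= error_limit:
        return "failing_fast"
    return "ok"

print(classify(200.0, 5.0, 0.0))
print(classify(200.0, 190.0, 0.0))
print(classify(5.0, 4.8, 0.0))
print(classify(2.0, 0.1, 0.5))
```

Here "high CPU" is read as CPU time being a large share of wall time, which is one reasonable interpretation of the table rather than the only one.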

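psutil.Process.cpu_times() accumulates across the whole process, so per-plugin attribution is only approximate whenever anything else runs during the measurement window: other threads, or other coroutines that get scheduled while the plugin awaits. If you're running plugins in threads, switch to time.thread_time_ns(), with the start value kept in thread-local state: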

import threading

_thread_cpu = threading.local()

def thread_cpu_start():
    _thread_cpu.start = time.thread_time_ns()

def thread_cpu_elapsed_ms() -> float:
    return (time.thread_time_ns() - _thread_cpu.start) / 1e6

Call thread_cpu_start() just before invoking the plugin and thread_cpu_elapsed_ms() in the finally block.
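Here is how those two helpers might be used with a thread pool. The run_measured function and both plugins are made up for the example, and the sketch assumes a platform where time.thread_time_ns() is available:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_thread_cpu = threading.local()

def thread_cpu_start() -> None:
    _thread_cpu.start = time.thread_time_ns()

def thread_cpu_elapsed_ms() -> float:
    return (time.thread_time_ns() - _thread_cpu.start) / 1e6

def run_measured(plugin, *args):
    """Run a sync plugin on the current thread; return (result, cpu_ms)."""
    thread_cpu_start()
    result = plugin(*args)
    return result, thread_cpu_elapsed_ms()

def sleepy_plugin() -> str:
    time.sleep(0.1)  # waits, uses almost no CPU
    return "slept"

def spinning_plugin() -> int:
    return sum(range(2_000_000))  # burns CPU on its own thread

with ThreadPoolExecutor(max_workers=2) as pool:
    sleepy_future = pool.submit(run_measured, sleepy_plugin)
    spinning_future = pool.submit(run_measured, spinning_plugin)
    sleepy_result, sleepy_cpu_ms = sleepy_future.result()
    spinning_result, spinning_cpu_ms = spinning_future.result()

print(f"sleepy: {sleepy_cpu_ms:.2f}ms CPU, spinning: {spinning_cpu_ms:.2f}ms CPU")
```

Because each thread reads its own clock, the sleeping plugin is not charged for the CPU the spinning one burns at the same time.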

[Figure: wall-time vs cpu-time signal interpretation]

Memory Spike Detection via Snapshot Diff

tracemalloc snapshots let you see which source lines are responsible for memory growth, not just the total delta. Take a snapshot before and after each plugin call, then diff them:

def capture_memory_diff(
    before: tracemalloc.Snapshot,
    after: tracemalloc.Snapshot,
    top_n: int = 5
) -> list[dict]:
    stats = after.compare_to(before, 'lineno')
    return [
        {
            "file": str(stat.traceback[0]),
            "size_diff_kb": stat.size_diff / 1024,
            "count_diff": stat.count_diff,
        }
        for stat in stats[:top_n]
    ]

Integrate this into _measured by calling tracemalloc.take_snapshot() immediately before and after await fn(...). A plugin that keeps growing bytes or list allocations without releasing them shows a positive size_diff_kb on every call — classic leak signature.


Bottleneck Detection, CPU Monitoring, and Gotchas

Automatic Outlier Flagging

Absolute thresholds ("flag anything over 200ms") break as soon as your workload shifts. Distribution-based detection with z-scores adapts to whatever baseline your plugins establish:

import statistics

def detect_bottlenecks(
    profiles: dict[str, PluginProfile],
    z_threshold: float = 2.0
) -> list[dict]:
    times = [p.wall_time_ms / max(p.call_count, 1) for p in profiles.values()]
    if len(times) < 2:
        return []

    mean = statistics.mean(times)
    stdev = statistics.stdev(times)

    bottlenecks = []
    for name, profile in profiles.items():
        avg_time = profile.wall_time_ms / max(profile.call_count, 1)
        z = (avg_time - mean) / stdev if stdev > 0 else 0
        if z > z_threshold:
            bottlenecks.append({
                "plugin": name,
                "avg_wall_ms": round(avg_time, 2),
                "z_score": round(z, 2),
                "peak_mem_kb": round(profile.peak_memory_kb, 1),
                "errors": len(profile.errors),
            })

    return sorted(bottlenecks, key=lambda x: x["z_score"], reverse=True)

A z_threshold of 2.0 flags any plugin whose average wall time sits more than two standard deviations above the mean across all plugins. Lower it to 1.5 for more sensitive alerting, or raise it to 3.0 if your plugin latencies are naturally high-variance.
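One caveat with small plugin populations: with the sample standard deviation, no single value among n can score higher than (n - 1) / sqrt(n). That ceiling is about 1.79 for five plugins and only passes 2.0 at six, so a threshold of 2.0 cannot fire with five or fewer plugins. The sketch below checks this numerically; the helper names and timings are made up.

```python
import math
import statistics

def max_sample_z(n: int) -> float:
    """Largest z-score any single value can reach among n values (sample stdev)."""
    return (n - 1) / math.sqrt(n)

def z_scores(values: list[float]) -> list[float]:
    mean = statistics.mean(values)
    stdev = statistics.stdev(values)
    return [(v - mean) / stdev if stdev > 0 else 0.0 for v in values]

# Four ordinary plugins and one that is roughly 50x slower
avg_wall_ms = [10.0, 11.0, 9.0, 10.0, 500.0]
worst = max(z_scores(avg_wall_ms))

print(f"worst z among 5 plugins: {worst:.3f} (ceiling {max_sample_z(5):.3f})")
print(f"ceiling for 6 plugins: {max_sample_z(6):.3f}")
```

If you profile only a handful of plugins, either lower z_threshold accordingly or compare each plugin against its own history instead of against its peers.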

Wiring It Into the Plugin Loader

class PluginLoader:
    def __init__(self, profiler: PluginProfiler):
        self._plugins: dict[str, Callable] = {}
        self._profiler = profiler

    def register(self, name: str, plugin_fn: Callable):
        self._plugins[name] = self._profiler.wrap(name, plugin_fn)

    async def invoke(self, name: str, *args, **kwargs) -> Any:
        if name not in self._plugins:
            raise KeyError(f"Plugin '{name}' not registered")
        return await self._plugins[name](*args, **kwargs)

    def report(self) -> list[dict]:
        bottlenecks = detect_bottlenecks(self._profiler._profiles)
        for b in bottlenecks:
            print(
                f"[BOTTLENECK] {b['plugin']} | "
                f"avg {b['avg_wall_ms']}ms | "
                f"z={b['z_score']} | "
                f"mem={b['peak_mem_kb']}KB"
            )
        return bottlenecks

Call loader.report() at the end of each agent run, or schedule it on a timer. The output looks like:

[BOTTLENECK] pdf-parser | avg 412.3ms | z=3.14 | mem=8240.1KB
[BOTTLENECK] embedding-cache | avg 187.6ms | z=2.31 | mem=512.0KB
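For the timer option, a background task is probably the simplest route. A minimal sketch, assuming a hypothetical report_every helper that takes any zero-argument callable such as loader.report:

```python
import asyncio
from typing import Callable

async def report_every(report: Callable[[], object], interval: float) -> None:
    """Call `report` every `interval` seconds until the task is cancelled."""
    while True:
        await asyncio.sleep(interval)
        report()

async def demo() -> int:
    calls: list[int] = []
    # In a real agent you would pass loader.report instead of this stub.
    task = asyncio.create_task(report_every(lambda: calls.append(1), 0.01))
    await asyncio.sleep(0.1)  # stands in for the agent run
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(calls)

calls_made = asyncio.run(demo())
print(f"report ran {calls_made} times")
```

Cancelling the task on shutdown keeps the reporter from outliving the agent run.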

Real-Time CPU Spike Monitoring

For long-running agents, a background monitor catches spikes that per-call profiling misses (e.g., a plugin that's fast individually but hammers CPU when called concurrently at scale):

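import asyncio
from collections import Counter

# Plugins currently in flight, shared by every task. A ContextVar would not
# work here: the monitor runs in its own task with its own copy of the
# context, so it never sees values set inside the plugin tasks.
_active_plugins: Counter = Counter()

async def cpu_spike_monitor(
    interval: float = 1.0,
    spike_pct: float = 80.0
):
    process = psutil.Process(os.getpid())
    process.cpu_percent(interval=None)  # prime: the first reading is meaningless
    while True:
        await asyncio.sleep(interval)
        cpu = process.cpu_percent(interval=None)
        if cpu > spike_pct:
            active = sorted(n for n, c in _active_plugins.items() if c > 0) or ['<none>']
            print(f"[CPU SPIKE] {cpu:.1f}% | in-flight plugins: {', '.join(active)}")

In the _measured wrapper, mark the plugin as in flight before invoking it:

_active_plugins[name] += 1
try:
    result = await fn(*args, **kwargs)
finally:
    _active_plugins[name] -= 1

Now when the monitor fires, it can list the plugins that were in flight during the spike rather than guessing. With several plugins running concurrently that is a shortlist, not a single culprit, so cross-check it against the per-plugin CPU totals.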
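One caveat is worth verifying before relying on context variables for this: a ContextVar set inside one asyncio task is not visible from another task, because each task runs in its own copy of the context. The sketch below, with hypothetical names throughout, compares what a separate observer sees through a ContextVar against a plain shared set.

```python
import asyncio
from contextvars import ContextVar

current: ContextVar[str] = ContextVar("current", default="<none>")
in_flight: set[str] = set()  # plain shared state, visible from every task

async def plugin_task(started: asyncio.Event, release: asyncio.Event) -> None:
    current.set("pdf-parser")    # only visible inside this task's context
    in_flight.add("pdf-parser")  # visible everywhere
    started.set()
    try:
        await release.wait()
    finally:
        in_flight.discard("pdf-parser")

async def observe() -> tuple[str, set[str]]:
    started, release = asyncio.Event(), asyncio.Event()
    worker = asyncio.create_task(plugin_task(started, release))
    await started.wait()
    seen_ctx = current.get()     # read from the observer's own context
    seen_shared = set(in_flight)
    release.set()
    await worker
    return seen_ctx, seen_shared

seen_ctx, seen_shared = asyncio.run(observe())
print(f"via ContextVar: {seen_ctx!r}, via shared set: {seen_shared!r}")
```

The observer still reads the default through the ContextVar while the worker is mid-call, which is why a monitor running as its own task needs shared state to see what is in flight.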

[Figure: real-time spike attribution flow]

Production Gotchas

tracemalloc overhead is real. CPython's memory tracer adds roughly 10–30% memory overhead while active. Don't leave it always-on in production. Gate it with an environment variable:

import os

PROFILING_ENABLED = os.getenv("PLUGIN_PROFILING", "0") == "1"

# In _measured:
if PROFILING_ENABLED:
    tracemalloc.start()
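A slightly fuller sketch of the same gate, written as a context manager so start, read, and stop stay together. The memory_trace and profiling_enabled names are hypothetical. It also leaves tracing running if something else had already started it:

```python
import os
import tracemalloc
from contextlib import contextmanager

def profiling_enabled(env=None) -> bool:
    """Read the PLUGIN_PROFILING flag; `env` is injectable for testing."""
    env = os.environ if env is None else env
    return env.get("PLUGIN_PROFILING", "0") == "1"

@contextmanager
def memory_trace(enabled: bool):
    """Yield a dict whose 'peak_kb' is filled in on exit (0.0 when disabled)."""
    stats = {"peak_kb": 0.0}
    started_here = enabled and not tracemalloc.is_tracing()
    if started_here:
        tracemalloc.start()
    try:
        yield stats
    finally:
        if enabled and tracemalloc.is_tracing():
            stats["peak_kb"] = tracemalloc.get_traced_memory()[1] / 1024
        if started_here:
            tracemalloc.stop()

with memory_trace(enabled=False) as off:
    _ = bytes(100_000)

with memory_trace(enabled=True) as on:
    _ = bytes(100_000)

print(f"disabled: {off['peak_kb']:.1f} KB, enabled: {on['peak_kb']:.1f} KB")
```

With the flag off, the wrapper pays only for a boolean check per call.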

psutil CPU times accumulate process-wide. In a threaded plugin system, every plugin whose measurement window overlaps a busy thread gets charged for that thread's CPU, whether or not it did the work. Use time.thread_time_ns() per-thread for accurate attribution.

Mac vs Linux differences. The fields psutil returns vary by platform: Linux adds iowait (plus irq and others in the system-wide psutil.cpu_times()), while macOS does not report iowait at all. Stick to user + system for portability.

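Docker containers. Container CPU quotas affect how you should read psutil's cpu_percent(). The process-level figure is CPU time divided by wall-clock time, so a container capped at 0.5 CPUs tops out around 50% even when it is fully throttled, and an 80% spike threshold never fires. Normalize against the container's quota if you're comparing across environments; psutil.cpu_count() reports the host's CPUs, not the quota.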
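One way to normalize against the quota, sketched under the assumption that the container uses cgroup v2, where the limit lives in a cpu.max file (commonly /sys/fs/cgroup/cpu.max) containing either "<quota> <period>" or "max <period>". The helper names are hypothetical, and the parser takes the file's text as a string so it can be tried without a container:

```python
def effective_cpus(cpu_max: str, host_cpus: int) -> float:
    """Parse a cgroup v2 cpu.max line into the number of usable CPUs."""
    quota, period = cpu_max.split()
    if quota == "max":  # no limit configured
        return float(host_cpus)
    return min(int(quota) / int(period), float(host_cpus))

def percent_of_quota(cpu_percent: float, cpus: float) -> float:
    """Express a process CPU percentage as a share of the CPUs it may use."""
    return cpu_percent / cpus

capped = effective_cpus("50000 100000", host_cpus=8)
uncapped = effective_cpus("max 100000", host_cpus=8)

print(capped, uncapped)
print(percent_of_quota(45.0, capped))
```

Comparing percent_of_quota readings, rather than raw percentages, keeps one spike threshold meaningful across differently sized containers.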

OpenTelemetry export. If your stack already uses distributed tracing, export plugin profiles as OTel spans instead of printing them. Each _measured invocation becomes a span with wall time, CPU, and memory as span attributes — then Jaeger or Grafana Tempo gives you a flame graph view across your entire agent run.

from opentelemetry import trace

tracer = trace.get_tracer("plugin-profiler")

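# Inside _measured, open the span around the plugin call so its duration
# matches the call, then attach the metrics once they are computed:
with tracer.start_as_current_span(f"plugin.{name}") as span:
    try:
        result = await fn(*args, **kwargs)
    finally:
        # wall_elapsed, cpu_elapsed and peak are computed as before
        span.set_attribute("plugin.wall_ms", wall_elapsed)
        span.set_attribute("plugin.cpu_ms", cpu_elapsed)
        span.set_attribute("plugin.peak_mem_kb", peak / 1024)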

[Figure: production deployment options]


Closing

The takeaway: wall time alone lies in async systems. The signal is the ratio between wall and CPU time, combined with per-call memory deltas and z-score-based outlier detection across the plugin population. Layer the instrumentation at the loader boundary so plugin authors never touch it, gate tracemalloc behind an env flag so production overhead stays bounded, and track which plugins are in flight so CPU spikes can be attributed at runtime.

Next step worth exploring: persist profiles across agent runs to a time-series store (SQLite works fine at small scale) so you can track whether a specific plugin is degrading over time — not just whether it's slow today.
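As a starting point, here is a rough sketch of that persistence layer using the standard library's sqlite3. The table name, columns, and helper functions are all assumptions; the row dictionaries reuse the keys from the bottleneck report.

```python
import sqlite3

def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the hypothetical profile store."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS plugin_runs (
               recorded_at REAL NOT NULL,
               plugin      TEXT NOT NULL,
               avg_wall_ms REAL NOT NULL,
               peak_mem_kb REAL NOT NULL
           )"""
    )
    return conn

def save_run(conn: sqlite3.Connection, recorded_at: float, rows: list[dict]) -> None:
    """Append one agent run's per-plugin numbers."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO plugin_runs VALUES (?, ?, ?, ?)",
            [(recorded_at, r["plugin"], r["avg_wall_ms"], r["peak_mem_kb"]) for r in rows],
        )

def wall_time_trend(conn: sqlite3.Connection, plugin: str) -> list[float]:
    """Average wall time for one plugin, oldest run first."""
    cursor = conn.execute(
        "SELECT avg_wall_ms FROM plugin_runs WHERE plugin = ? ORDER BY recorded_at",
        (plugin,),
    )
    return [row[0] for row in cursor]

conn = open_store()
save_run(conn, 1.0, [{"plugin": "pdf-parser", "avg_wall_ms": 400.0, "peak_mem_kb": 8000.0}])
save_run(conn, 2.0, [{"plugin": "pdf-parser", "avg_wall_ms": 450.0, "peak_mem_kb": 8100.0}])
trend = wall_time_trend(conn, "pdf-parser")
print(trend)
```

To track degradation you would store every plugin's averages on each run, not only the ones flagged as bottlenecks.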

