Plugins
Plugins are Observers on the Event Bus. They subscribe to lifecycle events and react to them: reporting results, filtering tests, caching data, or integrating with external systems.
Architecture
sequenceDiagram
participant Runner
participant Bus as Event Bus
participant Reporter as ReporterPlugin
participant Cache as CachePlugin
Runner ->> Bus: emit(TEST_PASS, result)
par Dispatch
Bus ->> Reporter: on_test_pass(result)
Bus ->> Cache: on_test_pass(result)
end
Note right of Bus: The bus doesn't know who's listening.<br/>It broadcasts to all subscribers.
When the runner emits an event, the bus dispatches it to all plugins that
implement the corresponding on_<event> handler. A plugin can implement as many
handlers as it needs.
Plugins don't call the bus directly. They simply define handler methods, and the bus invokes them when relevant events occur.
PluginBase
All plugins extend PluginBase:
from protest.plugin import PluginBase
from protest.entities import TestResult, SessionResult
class MyPlugin(PluginBase):
name = "my-plugin"
description = "Does something useful"
def __init__(self, output_path: str):
self.output_path = output_path
self.results: list[TestResult] = []
def on_test_pass(self, result: TestResult) -> None:
self.results.append(result)
def on_session_complete(self, result: SessionResult) -> None:
with open(self.output_path, "w") as f:
f.write(f"Passed: {len(self.results)} tests")
Handler methods follow the naming convention on_{event_name}. See
Events for the complete list of available events and their payloads.
Sync vs Async Handlers
Handlers can be sync or async. Both avoid blocking the event loop, but they differ in whether the bus waits for the handler to complete.
Sync Handlers
Sync handlers run in a thread pool (not blocking the loop), but the bus awaits their completion before continuing. Use them when:
- The work is fast (< 10ms)
- Order matters (e.g., printing results sequentially)
- You need the handler to finish before the next one starts
Async Handlers
async def on_test_pass(self, result: TestResult) -> None:
await self.http_client.post("/webhook", json=result.to_dict())
Async handlers are fire-and-forget: the bus schedules them and moves on immediately. They run concurrently with everything else. Use them when:
- The work is slow (network I/O, database queries)
- You don't need to block the emission pipeline
- Throughput matters more than ordering
The bus tracks pending async handlers and waits for them before SESSION_COMPLETE.
Comparison
| Aspect | Sync | Async |
|---|---|---|
| Blocks event loop? | No (threadpool) | No |
| Blocks event emission? | Yes (awaited) | No (fire-and-forget) |
| Execution order | Sequential per event | Concurrent |
| Best for | Fast ops, console output | Network I/O, slow work |
Plugin Activation (Factory Pattern)
Plugins can integrate with the CLI using a Factory pattern. The activate
class method decides whether to instantiate the plugin based on CLI arguments.
from typing import Self
from protest.plugin import PluginBase, PluginContext
class CTRFPlugin(PluginBase):
name = "ctrf"
description = "CTRF JSON reporter"
def __init__(self, output_path: str):
self.output_path = output_path
@classmethod
def add_cli_options(cls, parser) -> None:
"""Register CLI arguments. Called during CLI setup."""
group = parser.add_argument_group("CTRF Reporter")
group.add_argument(
"--ctrf-output",
metavar="PATH",
help="Write CTRF JSON report to PATH",
)
@classmethod
def activate(cls, ctx: PluginContext) -> Self | None:
"""Factory method. Return instance or None to skip activation."""
output = ctx.get("ctrf_output")
if not output:
return None # User didn't request this plugin
return cls(output_path=output)
PluginContext
PluginContext wraps CLI arguments (from argparse) and programmatic config into
a unified dict-like interface:
@classmethod
def activate(cls, ctx: PluginContext) -> Self | None:
# Get with default
verbose = ctx.get("verbose", False)
# Check presence
if "output" not in ctx:
return None
return cls(output=ctx.get("output"), verbose=verbose)
The string keys correspond to argparse destination names (underscored, not
hyphenated: --my-option becomes my_option).
Collection Filtering
on_collection_finish is special: it's a pipeline handler, not a notification.
It receives the collected tests and can filter or reorder them:
def on_collection_finish(self, items: list[TestItem]) -> list[TestItem]:
# Keep only tests tagged "fast"
return [item for item in items if "fast" in item.tags]
Return None or the original list to pass through unchanged. Multiple plugins
can filter; they chain in registration order.
Setup Hook
setup() is called when the plugin is registered, before any events fire:
def setup(self, session: ProTestSession) -> None:
# Access session configuration
self.concurrency = session.concurrency
# Access shared cache (for cross-plugin data)
self.cache = session.cache
Registering Plugins
Programmatic
Via CLI
Plugins with add_cli_options are auto-discovered. Their options appear in
--help, and activate() is called with the parsed arguments.
Best Practices
Do
- Use
on_session_completefor final reports (guarantees all async work is done) - Keep sync handlers fast (< 10ms)
- Use async for any network I/O
- Return
Nonefromon_collection_finishif not filtering
Don't
- Don't block in sync handlers (no
requests.get(), use async) - Don't assume handler order between plugins
- Don't raise exceptions to control flow
- Don't modify shared mutable state across
awaitpoints without synchronization
Shared Cache Storage
Plugins can share data via session.cache, a CacheStorage instance. This enables cross-plugin communication without direct coupling.
Accessing the Cache
from protest.plugin import PluginBase
from protest.entities import TestResult
class MyPlugin(PluginBase):
name = "my-plugin"
def setup(self, session) -> None:
# Read previous run data
durations = session.cache.get_durations()
failed = session.cache.get_failed_node_ids()
def on_test_pass(self, result: TestResult) -> None:
# Record result
session.cache.set_result(result.node_id, "passed", result.duration)
def on_session_end(self, result) -> None:
# Persist to disk
session.cache.save()
CacheStorage API
| Method | Returns | Description |
|---|---|---|
load() |
None |
Load cache from .protest/cache.json |
save() |
None |
Persist cache to disk |
clear() |
None |
Delete cache file |
get_result(node_id) |
TestCacheEntry \| None |
Get cached result for a test |
set_result(node_id, status, duration) |
None |
Record a test result |
get_results() |
dict[str, TestCacheEntry] |
All cached results |
get_durations() |
dict[str, float] |
Test durations by node_id |
get_failed_node_ids() |
set[str] |
Node IDs of failed tests |
get_passed_node_ids() |
set[str] |
Node IDs of passed tests |
Example: Duration-Based Ordering
class FastFirstPlugin(PluginBase):
"""Run fast tests first based on previous durations."""
name = "fast-first"
def setup(self, session) -> None:
session.cache.load()
self.durations = session.cache.get_durations()
def on_collection_finish(self, items: list[TestItem]) -> list[TestItem]:
def sort_key(item):
return self.durations.get(item.node_id, float("inf"))
return sorted(items, key=sort_key)
See Also
- Event Bus - How the bus dispatches events
- Events - Complete event reference with payloads
- Architecture - Overall system design