OpenBox SDK provides governance and observability for Temporal workflows by capturing workflow/activity lifecycle events, HTTP telemetry, database queries, and file operations, then sending them to OpenBox Core for policy evaluation.
Key Features:
- 6 event types (WorkflowStarted, WorkflowCompleted, WorkflowFailed, SignalReceived, ActivityStarted, ActivityCompleted)
- 5-tier verdict system (ALLOW, CONSTRAIN, REQUIRE_APPROVAL, BLOCK, HALT)
- Hook-level governance — per-operation evaluation (HTTP requests, file I/O, database queries, function tracing) with started/completed stages
- HTTP/Database/File I/O instrumentation via OpenTelemetry
- Guardrails: Input/output validation and redaction
- Human-in-the-loop approval with expiration handling
- Zero-code setup via the `create_openbox_worker()` factory

```bash
pip install openbox-temporal-sdk-python
```

Requirements:
- Python 3.11+
- Temporal SDK 1.23+ (1.8+ for factory-only usage)
- OpenTelemetry API/SDK 1.38.0+
Use OpenBoxPlugin for drop-in integration with Temporal Workers:
```python
import os
from temporalio.worker import Worker
from openbox.plugin import OpenBoxPlugin

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    plugins=[
        OpenBoxPlugin(
            openbox_url=os.getenv("OPENBOX_URL"),
            openbox_api_key=os.getenv("OPENBOX_API_KEY"),
        )
    ],
)

await worker.run()
```

The plugin automatically configures governance interceptors, OTel instrumentation, sandbox passthrough, W3C trace propagation through Temporal headers (via `temporalio.contrib.opentelemetry.TracingInterceptor`), and the `send_governance_event` activity.
Credentials never leave the plugin. openbox_api_key is captured on the
governance activity instance itself — it does not flow through activity
inputs, so it is never written to workflow history. To opt out of trace
propagation (e.g., if you already wire OpenTelemetryPlugin), pass
enable_trace_propagation=False.
```python
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    plugins=[
        OpenTelemetryPlugin(),
        OpenBoxPlugin(openbox_url=..., openbox_api_key=...),
    ],
)
```

Requires `temporalio >= 1.23.0`. For older versions, use `create_openbox_worker()` below.
Use the create_openbox_worker() factory for simple integration:
```python
import os
from openbox import create_openbox_worker

worker = create_openbox_worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    # OpenBox config
    openbox_url=os.getenv("OPENBOX_URL"),
    openbox_api_key=os.getenv("OPENBOX_API_KEY"),
)

await worker.run()
```

The factory automatically:
- Validates the API key
- Creates span processor
- Sets up OpenTelemetry instrumentation
- Creates governance interceptors (incl. W3C trace propagation)
- Builds the `GovernanceActivities` instance with credentials captured on `self` and registers its `send_governance_event` method, so the API key is never passed through activity inputs or workflow history
- Returns a fully configured Worker
```bash
OPENBOX_URL=http://localhost:8086
OPENBOX_API_KEY=obx_test_key_1
OPENBOX_GOVERNANCE_TIMEOUT=30.0
OPENBOX_GOVERNANCE_POLICY=fail_open  # or fail_closed
```

```python
worker = create_openbox_worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    # OpenBox config
    openbox_url="http://localhost:8086",
    openbox_api_key="obx_test_key_1",
    governance_timeout=30.0,
    governance_policy="fail_open",
    # Event filtering
    send_start_event=True,
    send_activity_start_event=True,
    skip_workflow_types={"InternalWorkflow"},
    skip_activity_types={"send_governance_event"},
    skip_signals={"heartbeat"},
    # Database instrumentation
    instrument_databases=True,
    db_libraries={"psycopg2", "sqlalchemy"},  # None = all available
    sqlalchemy_engine=engine,  # pass pre-existing engine for query capture
    # File I/O instrumentation
    instrument_file_io=False,  # disabled by default
    # Header-based W3C trace propagation (client → workflow → activities).
    # Default True. Set False if you already wire OpenTelemetryPlugin or a
    # custom propagator.
    enable_trace_propagation=True,
    # Standard Worker options (all supported)
    activity_executor=my_executor,
    max_concurrent_activities=10,
)
```

OpenBox Core returns a verdict indicating what action the SDK should take.
| Verdict | Behavior |
|---|---|
| ALLOW | Continue execution normally |
| CONSTRAIN | Log constraints, continue |
| REQUIRE_APPROVAL | Pause, poll for human approval |
| BLOCK | Raise error, stop activity |
| HALT | Raise error, terminate workflow |
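As a rough illustration of the table above, the sketch below maps each verdict to the action a caller might take. The `Verdict` enum, the `ACTIONS` labels, and the lowercase wire strings (for example `"require_approval"`) are assumptions made for this example, not the SDK's actual types.

```python
from enum import Enum


class Verdict(Enum):
    """Hypothetical enum mirroring the five verdicts in the table."""
    ALLOW = "allow"
    CONSTRAIN = "constrain"
    REQUIRE_APPROVAL = "require_approval"  # wire spelling is an assumption
    BLOCK = "block"
    HALT = "halt"


# Behavior column of the table, keyed by verdict (illustrative labels only).
ACTIONS = {
    Verdict.ALLOW: "continue",
    Verdict.CONSTRAIN: "log_and_continue",
    Verdict.REQUIRE_APPROVAL: "pause_for_approval",
    Verdict.BLOCK: "fail_activity",
    Verdict.HALT: "terminate_workflow",
}


def action_for(raw: str) -> str:
    """Map a raw verdict string to the action a caller would take."""
    return ACTIONS[Verdict(raw.strip().lower())]


def stops_execution(raw: str) -> bool:
    """BLOCK and HALT are the two verdicts that raise an error."""
    return Verdict(raw.strip().lower()) in (Verdict.BLOCK, Verdict.HALT)
```

Only `BLOCK` and `HALT` stop execution here; `REQUIRE_APPROVAL` pauses instead, which is why it gets its own action label.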
v1.0 Backward Compatibility:
- `"continue"` → `ALLOW`
- `"stop"` → `HALT`
- `"require-approval"` → `REQUIRE_APPROVAL`
| Event | Trigger | Captured Fields |
|---|---|---|
| WorkflowStarted | Workflow begins | workflow_id, run_id, workflow_type, task_queue |
| WorkflowCompleted | Workflow succeeds | workflow_id, run_id, workflow_type |
| WorkflowFailed | Workflow fails | workflow_id, run_id, workflow_type, error |
| SignalReceived | Signal received | workflow_id, signal_name, signal_args |
| ActivityStarted | Activity begins | activity_id, activity_type, activity_input |
| ActivityCompleted | Activity ends | activity_id, activity_type, activity_input, activity_output, spans, status, duration |
OpenBox Core can validate and redact sensitive data before/after activity execution:
```
# Request
{
  "verdict": "allow",
  "guardrails_result": {
    "input_type": "activity_input",
    "redacted_input": {"prompt": "[REDACTED]", "user_id": "123"},
    "validation_passed": true,
    "reasons": []
  }
}

# If validation fails:
{
  "validation_passed": false,
  "reasons": [
    {"type": "pii", "field": "email", "reason": "Contains PII"}
  ]
}
```

Configure error policy via `on_api_error` (constants available as `hook_governance.FAIL_OPEN` / `FAIL_CLOSED`):
| Policy | Behavior |
|---|---|
| `fail_open` (default) | If governance API fails, allow workflow to continue |
| `fail_closed` | If governance API fails, terminate workflow |
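The two policies can be pictured as a small wrapper around the governance call. This is a minimal sketch under stated assumptions: `evaluate`, `GovernanceUnavailable`, and `broken_api` are hypothetical names, and the constants are redefined locally with the string values from the table rather than imported from the SDK.

```python
from typing import Callable

FAIL_OPEN = "fail_open"      # assumed to match the values in the table
FAIL_CLOSED = "fail_closed"


class GovernanceUnavailable(Exception):
    """Hypothetical error raised when the policy is fail_closed."""


def evaluate(call_api: Callable[[], str], on_api_error: str = FAIL_OPEN) -> str:
    """Return the verdict from call_api, applying the error policy on failure."""
    try:
        return call_api()
    except Exception as exc:
        if on_api_error == FAIL_CLOSED:
            # fail_closed: surface the outage so the workflow is stopped
            raise GovernanceUnavailable(str(exc)) from exc
        return "allow"  # fail_open: let the workflow continue


def broken_api() -> str:
    """Simulates an unreachable governance API."""
    raise ConnectionError("governance API unreachable")
```

With the default policy, `evaluate(broken_api)` yields `"allow"`; with `FAIL_CLOSED` the same call raises instead.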
- `httpx` (sync + async) - full body capture
- `requests` - full body capture
- `urllib3` - full body capture
- `urllib` - request body only
Fully supported — any dbapi-compatible library using OTel's CursorTracer.traced_execution():
- `psycopg2`, `pymysql`, `mysql-connector-python`, and other dbapi-compliant drivers
Custom hooks (best-effort) — these libraries use non-dbapi instrumentation paths; governance hooks may not work correctly in all scenarios:
- `asyncpg`: wrapt wrapper on Connection methods (governance runs outside OTel span context)
- `pymongo`: CommandListener monitoring + wrapt Collection wrappers (dedup via thread-local flag; some internal commands like `endSessions` only produce the `completed` stage)
- `redis`: native OTel `request_hook`/`response_hook`
- `sqlalchemy`: `before/after_cursor_execute` event listeners
SQLAlchemy Note: If your SQLAlchemy engine is created before create_openbox_worker() runs (e.g., at module import time), you must pass it via the sqlalchemy_engine parameter. Without this, SQLAlchemyInstrumentor only patches future create_engine() calls and won't capture queries on pre-existing engines.
```python
from db.engine import engine

worker = create_openbox_worker(
    ...,
    db_libraries={"psycopg2", "sqlalchemy"},
    sqlalchemy_engine=engine,
)
```

- `open()`, `read()`, `write()`, `readline()`, `readlines()`
- Skips system paths (`/dev/`, `/proc/`, `/sys/`, `__pycache__`)
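A path filter like the one described in the last bullet might look like the sketch below. The matching rule is an assumption: the three system directories are treated as path prefixes and `__pycache__` as a substring, and `is_skipped_path` is a hypothetical name.

```python
SYSTEM_PREFIXES = ("/dev/", "/proc/", "/sys/")


def is_skipped_path(path: str) -> bool:
    """Return True for paths that file instrumentation would ignore."""
    # Assumption: prefix match for system dirs, substring match for caches.
    return path.startswith(SYSTEM_PREFIXES) or "__pycache__" in path
```

Prefix matching keeps an ordinary path such as `/home/dev/data.csv` governed even though it contains `/dev/`.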
Every HTTP request, file operation, and database query made during an activity is evaluated by OpenBox Core in real-time at two stages:
| Stage | Trigger | Data Available |
|---|---|---|
| `started` | Before request is sent | Method, URL, request headers, request body |
| `completed` | After response received | All of above + response headers, response body, status code |
Per-operation governance evaluates every read()/write()/readline()/readlines()/writelines() call, not just open/close:
| Operation | Stage | Trigger | Data Available |
|---|---|---|---|
| `open` | `started` | Before file is opened | File path, open mode |
| `read` | `started` | Before read executes | File path, mode |
| `read` | `completed` | After read returns | data (content read), bytes_read |
| `readline` | `started` | Before readline executes | File path, mode |
| `readline` | `completed` | After readline returns | data (line read), bytes_read |
| `readlines` | `started` | Before readlines executes | File path, mode |
| `readlines` | `completed` | After readlines returns | data (lines read), bytes_read, lines_count |
| `write` | `started` | Before write executes | File path, mode |
| `write` | `completed` | After write returns | data (content written), bytes_written |
| `writelines` | `started` | Before writelines executes | File path, mode |
| `writelines` | `completed` | After writelines returns | data (lines written), bytes_written, lines_count |
| `close` | `completed` | After file is closed | bytes_read, bytes_written, operations list |
How it works (HTTP):
- OTel httpx instrumentation fires a request hook → SDK sends `started` governance evaluation with request data
- If verdict is BLOCK/HALT → request is aborted before it leaves the process
- After response arrives → SDK sends `completed` governance evaluation with full request+response data
- If verdict is BLOCK/HALT → `GovernanceBlockedError` is raised, activity fails with `GovernanceStop`
How it works (File I/O):
- Activity calls `open()` → SDK sends `started` governance evaluation with file path and mode
- If verdict is BLOCK/HALT → file is never opened, `GovernanceBlockedError` is raised
- Each `read()`/`write()`/`readline()`/`readlines()`/`writelines()` call sends `started` (before) and `completed` (after) governance evaluations, enabling content-based policy enforcement
- After file is closed → SDK sends `completed` governance with lifecycle summary (total bytes, operations list)
- File governance requires `instrument_file_io=True` (disabled by default)
A simple open-read-close produces 4 governance evaluations: open(started) → read(started) → read(completed) → close(completed).
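To make the count of four concrete, here is a minimal sketch of a file wrapper that reports each operation to an evaluator callback. `GovernedFile` and `demo` are hypothetical names for this illustration and are not the SDK's `TracedFile`; the evaluator only records events rather than calling OpenBox Core.

```python
import os
import tempfile
from typing import Callable, List, Tuple

Event = Tuple[str, str]  # (operation, stage)


class GovernedFile:
    """Hypothetical wrapper that reports each operation to an evaluator."""

    def __init__(self, path: str, mode: str, evaluate: Callable[[str, str], None]):
        self._evaluate = evaluate
        evaluate("open", "started")  # an evaluator could raise here to block
        self._file = open(path, mode)

    def read(self) -> str:
        self._evaluate("read", "started")
        data = self._file.read()
        self._evaluate("read", "completed")
        return data

    def close(self) -> None:
        self._file.close()
        self._evaluate("close", "completed")


def demo() -> List[Event]:
    """Open, read, and close a temp file, returning the recorded events."""
    events: List[Event] = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "example.txt")
        with open(path, "w") as handle:  # plain open, not governed
            handle.write("hello")
        governed = GovernedFile(path, "r", lambda op, stage: events.append((op, stage)))
        governed.read()
        governed.close()
    return events
```

Calling `demo()` records the same four events in the same order as the sequence described above.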
Every database operation is evaluated at started (pre-query, can block) and completed (post-query, reports outcome):
| Field | Started | Completed |
|---|---|---|
| `type` | `"db_query"` | `"db_query"` |
| `stage` | `"started"` | `"completed"` |
| `db_system` | postgresql, mysql, mongodb, redis, sqlite | same |
| `db_name` | Database name | same |
| `db_operation` | SQL verb or command (SELECT, INSERT, GET, etc.) | same |
| `db_statement` | Query string or command | same |
| `server_address` | Host | same |
| `server_port` | Port | same |
| `duration_ms` | — | Query duration in ms |
| `error` | — | Error message or None |
How it works:
- Activity executes a DB query (via any supported library)
- SDK governance hook intercepts before the query → sends `started` evaluation
- If verdict is BLOCK/HALT → query is aborted, `GovernanceBlockedError` raised
- Query executes normally → SDK sends `completed` evaluation with duration and error status
- DB governance is automatic when `instrument_databases=True` (default)
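The steps above can be sketched with the standard-library `sqlite3` module. This is an illustration only: `governed_execute` and its `evaluate` callback are hypothetical, `PermissionError` stands in for `GovernanceBlockedError`, and the payload carries a subset of the fields from the table.

```python
import sqlite3
import time
from typing import Any, Callable, Dict, List


def governed_execute(
    conn: sqlite3.Connection,
    statement: str,
    evaluate: Callable[[Dict[str, Any]], str],
) -> List[tuple]:
    """Run a statement with 'started' and 'completed' evaluations around it."""
    base = {
        "type": "db_query",
        "db_system": "sqlite",
        "db_operation": statement.split()[0].upper(),
        "db_statement": statement,
    }
    # Pre-query stage: a block/halt verdict stops the query before it runs.
    if evaluate({**base, "stage": "started"}) in ("block", "halt"):
        raise PermissionError("query blocked by governance")  # stand-in error

    error = None
    started = time.perf_counter()
    try:
        return conn.execute(statement).fetchall()
    except sqlite3.Error as exc:
        error = str(exc)
        raise
    finally:
        # Post-query stage: reports duration and error status.
        duration_ms = (time.perf_counter() - started) * 1000
        evaluate({**base, "stage": "completed",
                  "duration_ms": duration_ms, "error": error})
```

Because the `completed` evaluation runs in `finally`, it is sent whether the query succeeds or raises, which matches the "reports outcome" role of that stage.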
Per-library strategy:
| Library | Hook Method | Can Block? | Reliability |
|---|---|---|---|
| psycopg2, pymysql, mysql-connector-python | `CursorTracer.traced_execution` patch | Yes | Fully supported |
| asyncpg | wrapt wrapper on Connection methods | Yes | Best-effort |
| pymongo | CommandListener + wrapt Collection wrappers | Yes (wrapt only) | Best-effort |
| redis | Native OTel `request_hook`/`response_hook` | Yes | Best-effort |
| sqlalchemy | `before/after_cursor_execute` events | Yes | Best-effort |
Note: Libraries marked "Fully supported" use OTel's `CursorTracer`, which guarantees governance hooks run inside the OTel span context. Best-effort libraries use custom hooks that may produce inconsistencies (e.g., missing stages for internal commands). Some C extension types (e.g., `psycopg2.extensions.cursor`) cannot be patched with `wrapt`; in those cases governance hooks are silently skipped, but OTel span capture still works normally.
Functions decorated with @traced are automatically governed when hook_governance is configured:
| Stage | Trigger | Data Available |
|---|---|---|
| `started` | Before function executes | Function name, module, arguments (if `capture_args=True`) |
| `completed` | After function returns/raises | All of above + result (if `capture_result=True`) or error info |
How it works (Function Tracing):
- `@traced` decorator wrapper starts OTel span
- → SDK sends `started` governance evaluation with function name and module
- If verdict is BLOCK/HALT → `GovernanceBlockedError` raised, function never executes
- Function executes normally
- → SDK sends `completed` governance evaluation with result or error info
- If verdict is BLOCK/HALT → `GovernanceBlockedError` raised after execution
- When `hook_governance` is not configured → zero overhead, no governance calls
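The flow in this list can be approximated with a plain decorator. The sketch below is not the SDK's `@traced`: `traced_sketch` is a hypothetical stand-in that omits the OTel span, takes the evaluator as an argument, and raises `PermissionError` where the SDK would raise `GovernanceBlockedError`.

```python
import functools
from typing import Any, Callable, Dict, Optional

Evaluator = Callable[[Dict[str, Any]], str]


def traced_sketch(evaluate: Optional[Evaluator] = None, capture_result: bool = True):
    """Hypothetical stand-in for a @traced-style decorator (no OTel span here)."""

    def decorator(func):
        if evaluate is None:
            return func  # governance not configured: no wrapper, no overhead

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            info = {"function": func.__name__, "module": func.__module__}
            # started stage: a block/halt verdict prevents execution
            if evaluate({**info, "stage": "started"}) in ("block", "halt"):
                raise PermissionError(f"{func.__name__} blocked before execution")
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                evaluate({**info, "stage": "completed", "error": repr(exc)})
                raise
            payload = {**info, "stage": "completed"}
            if capture_result:
                payload["result"] = result
            # completed stage: the function already ran, but the call can still fail
            if evaluate(payload) in ("block", "halt"):
                raise PermissionError(f"{func.__name__} blocked after execution")
            return result

        return wrapper

    return decorator
```

Returning the undecorated function when no evaluator is supplied is one simple way to get the "zero overhead" behavior from the last bullet.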
Governed span tracking: When hook-level governance is active, the SDK marks HTTP spans as "governed" so the OTel on_end processor skips buffering them — preventing duplicate spans.
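One way to picture this deduplication is a buffer that remembers which span IDs were already reported by hook-level governance. The class below is a sketch under assumptions: `SpanBufferSketch`, `mark_governed`, and the dict-shaped spans are hypothetical and do not reflect the actual `WorkflowSpanProcessor` internals.

```python
from typing import Any, Dict, List, Set


class SpanBufferSketch:
    """Hypothetical buffer that skips spans already sent by hook governance."""

    def __init__(self) -> None:
        self._governed: Set[int] = set()
        self.buffered: List[Dict[str, Any]] = []

    def mark_governed(self, span_id: int) -> None:
        """Called by the hook after it has evaluated this span's operation."""
        self._governed.add(span_id)

    def on_end(self, span: Dict[str, Any]) -> None:
        """Buffer the span unless hook-level governance already reported it."""
        if span["span_id"] in self._governed:
            self._governed.discard(span["span_id"])  # forget it once skipped
            return
        self.buffered.append(span)
```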
See System Architecture for detailed component design.
High-Level Flow:
```
Workflow/Activity → Interceptors → Span Processor → OpenBox Core API
                                                          ↓
                                                   Returns Verdict
                                                          ↓
                                              (ALLOW, BLOCK, HALT, etc.)

Hook-Level (per HTTP request):
Activity HTTP Call → OTel Hook → hook_governance → API (started) → Allow/Block
                   → Response → hook_governance → API (completed) → Allow/Block

Hook-Level (per file operation):
Activity open() → hook_governance → API (started) → Allow/Block
Activity read/write() → hook_governance → API (started) → Allow/Block
                      → hook_governance → API (completed) → Allow/Block
Activity close() → hook_governance → API (completed) → Allow/Block

Hook-Level (per DB query):
Activity DB Call → db_governance_hooks → hook_governance → API (started) → Allow/Block
                 → Query executes → hook_governance → API (completed)

Hook-Level (per @traced function):
@traced call → hook_governance → API (started) → Allow/Block
             → Function runs → hook_governance → API (completed)
```
Module responsibilities:
- `otel_setup.py`: OTel instrumentation, hooks, TracedFile wrapper
- `hook_governance.py`: Shared governance evaluator (payload building, API calls, verdict handling)
- `db_governance_hooks.py`: Per-library DB governance wrappers (wrapt, OTel hooks, SQLAlchemy events)
- `span_processor.py`: Span buffering, activity context tracking
- `activity_interceptor.py`: Activity lifecycle governance events
For manual control, import individual components:
```python
from openbox import (
    initialize,
    WorkflowSpanProcessor,
    GovernanceInterceptor,
    GovernanceConfig,
)
from openbox.otel_setup import setup_opentelemetry_for_governance
from openbox.activity_interceptor import ActivityGovernanceInterceptor
from openbox.activities import build_governance_activities
from openbox.hook_governance import FAIL_OPEN, FAIL_CLOSED  # error policy constants

# 1. Initialize SDK
initialize(api_url="http://localhost:8086", api_key="obx_test_key_1")

# 2. Create span processor
span_processor = WorkflowSpanProcessor(
    ignored_url_prefixes=["http://localhost:8086"]
)

# 3. Setup OTel instrumentation (governance always enabled)
setup_opentelemetry_for_governance(
    span_processor,
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    sqlalchemy_engine=engine,  # optional: instrument pre-existing engine
)

# 4. Create governance config
config = GovernanceConfig(
    on_api_error="fail_closed",
    api_timeout=30.0,
)

# 5. Create interceptors
workflow_interceptor = GovernanceInterceptor(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    span_processor=span_processor,
    config=config,
)
activity_interceptor = ActivityGovernanceInterceptor(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    span_processor=span_processor,
    config=config,
)

# 6. Build the governance activity instance (credentials captured on `self`
#    so they never flow through workflow history).
governance_activities = build_governance_activities(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
)

# 7. Create worker
from temporalio.worker import Worker
from temporalio.contrib.opentelemetry import TracingInterceptor

worker = Worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity, governance_activities.send_governance_event],
    interceptors=[
        workflow_interceptor,
        activity_interceptor,
        TracingInterceptor(),  # W3C header propagation for OTel spans
    ],
)
```

- Project Overview & PDR - Requirements, features, constraints
- System Architecture - Component design, data flows, security
- Codebase Summary - Code structure and component details
- Code Standards - Coding conventions and best practices
- Project Roadmap - Future enhancements and timeline
The SDK includes comprehensive test coverage with 15 test files:

```bash
pytest tests/
```

Test files: test_activities.py, test_activity_interceptor.py, test_config.py, test_db_governance_hooks.py, test_file_governance_hooks.py, test_otel_hook_pause.py, test_otel_hook_pause_db.py, test_otel_setup.py, test_plugin.py, test_plugin_integration.py, test_span_processor.py, test_tracing.py, test_types.py, test_worker.py, test_workflow_interceptor.py
MIT License - See LICENSE file for details
- Issues: GitHub Issues
- Documentation: See `./docs/`
Version: 1.2.0 | Last Updated: 2026-04-05