name: pyo3-interop
description: Rust↔Python interop architecture in duroxide-python. Use when modifying the PyO3 bridge, adding ScheduledTask types, fixing GIL deadlocks, changing tracing delegation, or debugging block_in_place / with_gil behavior.
PyO3 Interop Architecture
Overview
duroxide-python bridges Rust's duroxide runtime to Python via PyO3/maturin. The interop has two distinct paths — orchestrations (generator-based, synchronous blocking) and activities (synchronous GIL calls). Getting this wrong causes GIL deadlocks, silent replay corruption, or dropped futures.
File Map
| File | Role |
|---|---|
| src/handlers.rs | Core interop — orchestration handler loop, activity invocation, global context maps, select/race/join, activity cancellation |
| src/types.rs | ScheduledTask enum — the protocol between Python and Rust |
| src/lib.rs | PyO3 module entry point, #[pyfunction] trace functions |
| src/runtime.rs | PyRuntime — wraps duroxide::Runtime, global tokio runtime |
| src/client.rs | PyClient — wraps duroxide::Client, all methods with py.allow_threads() |
| src/provider.rs | PySqliteProvider |
| src/pg_provider.rs | PyPostgresProvider |
| python/duroxide/__init__.py | Python wrapper: SqliteProvider, PostgresProvider, Client, Runtime, decorators |
| python/duroxide/context.py | OrchestrationContext, ActivityContext |
| python/duroxide/driver.py | Generator driver: create_generator, next_step, dispose_generator |
⚠️ Critical: GIL Deadlock Problem
This is the most important difference between duroxide-python and duroxide-node. Get this wrong and the process deadlocks.
The Problem
PyO3 holds the GIL when Python calls into Rust #[pymethods]. If that method calls TOKIO_RT.block_on(), it blocks the thread while holding the GIL. Meanwhile, orchestration handlers running on tokio threads need the GIL via Python::with_gil() — deadlock.
Thread A (Python → Rust):
client.wait_for_orchestration()
→ PyO3 holds GIL
→ TOKIO_RT.block_on(async { ... }) ← BLOCKS, holding GIL
Thread B (Tokio → Python):
orchestration handler invoked
→ block_in_place + Python::with_gil() ← BLOCKS, waiting for GIL
The Fix
EVERY method that calls block_on must use py.allow_threads() to release the GIL before blocking:
fn wait_for_orchestration(&self, py: Python<'_>, id: String, timeout: u64) -> PyResult<...> {
py.allow_threads(|| {
TOKIO_RT.block_on(async {
self.client.wait_for_orchestration(&id, timeout).await
.map_err(|e| format!("{e}"))
})
})
.map_err(PyRuntimeError::new_err)
}
This pattern is applied to ALL 20+ methods in client.rs and runtime.rs.
Error Handling Across the Boundary
PyErr is not Send, so you can't return PyResult from inside allow_threads. Pattern:
- Inside allow_threads: map errors to String via .map_err(|e| format!("{e}"))
- Outside allow_threads: map String to PyErr via .map_err(PyRuntimeError::new_err)
Rules for ANY New Method
- Add py: Python<'_> parameter to the method signature
- Wrap TOKIO_RT.block_on() in py.allow_threads(|| { ... })
- Map errors to String inside, to PyErr outside
- Never hold the GIL while blocking on tokio
Orchestration Interop (Blocking Generator Loop)
The replay engine calls poll_once() on the handler future; if the future isn't ready after a single poll, it is dropped. Any Python call made from the handler must therefore complete synchronously within that one poll — which is why the generator protocol blocks instead of awaiting.
Solution: block_in_place + with_gil
fn call_create_blocking(&self, payload: String) -> Result<GeneratorStepResult, String> {
    tokio::task::block_in_place(|| {
        Python::with_gil(|py| {
            let result = self
                .create_fn
                .call1(py, (payload,))
                .map_err(|e| e.to_string())?; // PyErr → String: PyErr is not Send
            // parse result into a GeneratorStepResult...
        })
    })
}
Orchestration Handler Sequence
Rust (tokio thread) Python (GIL)
─────────────────── ────────────────────
1. invoke(ctx, input)
├─ Store ctx in ORCHESTRATION_CTXS[instance_id]
├─ call_create_blocking(payload) ──────► create_generator(payload)
│ (block_in_place + with_gil) ├─ Create OrchestrationContext
│ ├─ Create generator: fn(ctx, input)
│ ├─ gen.send(None) → first yield
│ └─ Return {"status": "yielded", "task": ...}
│◄────────────────────────────────────────┘
├─ Loop:
│ ├─ execute_task(ctx, task) // Real DurableFuture or replay
│ ├─ call_next_blocking(result) ──────► next_step(result)
│ │ (block_in_place + with_gil) ├─ gen.send(value) or gen.throw(exc)
│ │ └─ Return next task or completion
│ │◄────────────────────────────────────┘
│ └─ If completed/error: break
└─ Remove ctx from ORCHESTRATION_CTXS
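The driver side of this loop can be modeled in plain Python. A minimal sketch: create_generator and next_step are the real names from python/duroxide/driver.py, but their bodies, the toy orchestration, and the execute_task stand-in are illustrative assumptions.

```python
# Illustrative sketch of the generator-driver protocol (not the real driver.py).
def create_generator(fn, ctx, payload):
    gen = fn(ctx, payload)
    try:
        task = gen.send(None)  # run to the first yield
        return gen, {"status": "yielded", "task": task}
    except StopIteration as stop:
        return gen, {"status": "completed", "output": stop.value}

def next_step(gen, value):
    try:
        task = gen.send(value)  # resume the orchestration with the task's result
        return {"status": "yielded", "task": task}
    except StopIteration as stop:
        return {"status": "completed", "output": stop.value}

# Toy orchestration: yields a task dict, receives its result via send().
def orchestration(ctx, payload):
    result = yield {"type": "activity", "name": "Upper", "input": payload}
    return result

def execute_task(task):
    # Stand-in for the Rust side executing a real DurableFuture (or replaying history).
    return task["input"].upper()

gen, step = create_generator(orchestration, ctx=None, payload="hi")
while step["status"] == "yielded":
    step = next_step(gen, execute_task(step["task"]))
print(step["output"])  # HI
```

The key property mirrored here is that every gen.send() returns synchronously, so the Rust caller never has to keep a Python await alive across polls.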
Activity Interop (Synchronous GIL Call)
Activities in duroxide-python are synchronous functions (unlike duroxide-node's async activities). They run on tokio threads via block_in_place + with_gil:
Rust Python
──── ──────
invoke(ctx, input)
├─ Generate unique token (act-0, act-1, ...)
├─ Store ctx in ACTIVITY_CTXS[token]
├─ block_in_place + with_gil ─────────────► wrapped_fn(payload)
│ ├─ Parse ctx, create ActivityContext
│ ├─ Call user's function (synchronous)
│ └─ Return JSON result
│◄───────────────────────────────────────────┘
└─ Remove token from ACTIVITY_CTXS
Users can use asyncio.run() inside activities if they need async I/O.
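A sketch of that pattern — the activity body and names are illustrative, not from the codebase:

```python
import asyncio

# Activities are called synchronously from Rust. To do async I/O inside one,
# spin up a private event loop with asyncio.run() for the duration of the call.
def fetch_activity(payload):
    async def go():
        await asyncio.sleep(0)  # stand-in for real async I/O (HTTP, DB, ...)
        return payload[::-1]
    return asyncio.run(go())

print(fetch_activity("abc"))  # cba
```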
Cross-Thread Tracing
Trace calls originate in Python (which holds the GIL), but the Rust context objects they need live on tokio threads, with no direct reference across the boundary. Global HashMaps, keyed by instance id or activity token, bridge the two:
static ACTIVITY_CTXS: LazyLock<Mutex<HashMap<String, ActivityContext>>>
static ORCHESTRATION_CTXS: LazyLock<Mutex<HashMap<String, OrchestrationContext>>>
Python calls PyO3 functions that look up the Rust context:
# In OrchestrationContext (fire-and-forget, no yield)
def trace_info(self, message):
orchestration_trace_log(self.instance_id, "info", str(message))
# In ActivityContext (fire-and-forget)
def trace_info(self, message):
activity_trace_log(self._trace_token, "info", str(message))
Rules for Tracing
- Never expose is_replaying to Python — Rust OrchestrationContext.trace() handles suppression
- Always use global maps, not thread-locals — Python runs on a different thread
- Clean up map entries on ALL exit paths — leaked entries cause stale traces
- Use atomic tokens for activities (not instance_id) — multiple activities for the same instance can run concurrently
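Why the token rule matters can be modeled in plain Python (the real maps are Rust statics; this dict-based model is an illustrative assumption):

```python
import itertools
import threading

# Global map + atomic counter, mirroring ACTIVITY_CTXS keyed by "act-N" tokens.
_counter = itertools.count()
_lock = threading.Lock()
ACTIVITY_CTXS = {}

def register(ctx):
    token = f"act-{next(_counter)}"  # unique even across concurrent activities
    with _lock:
        ACTIVITY_CTXS[token] = ctx
    return token

def unregister(token):
    with _lock:
        ACTIVITY_CTXS.pop(token, None)  # clean up on every exit path

# Two concurrent activities for the SAME instance get distinct tokens,
# so neither clobbers the other's context (keying by instance_id would collide).
t1 = register({"instance_id": "inst-1", "activity": "A"})
t2 = register({"instance_id": "inst-1", "activity": "B"})
assert t1 != t2 and len(ACTIVITY_CTXS) == 2
unregister(t1)
unregister(t2)
```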
ScheduledTask Protocol
Python yields plain dicts. Rust deserializes them via serde_json into ScheduledTask enum variants:
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ScheduledTask {
Activity { name: String, input: String },
ActivityWithRetry { name: String, input: String, retry: RetryPolicyConfig },
Timer { delay_ms: u64 },
WaitEvent { name: String },
SubOrchestration { name: String, input: String },
SubOrchestrationWithId { name: String, instance_id: String, input: String },
SubOrchestrationVersioned { name: String, version: String, input: String },
Orchestration { name: String, instance_id: String, input: String },
OrchestrationVersioned { name: String, version: String, instance_id: String, input: String },
NewGuid,
UtcNow,
ContinueAsNew { input: String },
ContinueAsNewVersioned { input: String, version: String },
Join { tasks: Vec<ScheduledTask> },
Select { tasks: Vec<ScheduledTask> },
}
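For illustration, here are dicts a Python orchestration might yield for a few of these variants. The "type" tags follow serde's rename_all = "camelCase", which camelCases the variant names; casing of multi-word fields (e.g. delay_ms) depends on the per-variant serde attributes, so only fields spelled the same either way are shown here.

```python
import json

# Example dicts matching the serde tagged-enum shape (tag = "type",
# variant names camelCased). Field names shown are identical in both casings.
tasks = [
    {"type": "activity", "name": "Greet", "input": json.dumps("world")},
    {"type": "waitEvent", "name": "Approval"},
    {"type": "newGuid"},
    {"type": "select", "tasks": [
        {"type": "waitEvent", "name": "Approval"},
        {"type": "waitEvent", "name": "Rejection"},
    ]},
]

# Every yielded task must be JSON-serializable for serde on the Rust side.
for t in tasks:
    json.dumps(t)
```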
Adding a New ScheduledTask Type
- Add variant to ScheduledTask in src/types.rs with correct serde attributes
- Add execution branch in execute_task() in src/handlers.rs
- If it should work in select/race, add branch in make_select_future()
- If it should work in join/all, add branch in make_join_future()
- Add Python method to OrchestrationContext in python/duroxide/context.py
- Add test in tests/test_e2e.py
- Rebuild: maturin develop
Global Tokio Runtime
static TOKIO_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime")
});
All async operations go through TOKIO_RT.block_on() (with GIL released via py.allow_threads()). No pyo3-async-runtimes needed.
Provider Polymorphism
PyO3 constructors can't accept Rust trait objects, so the Python wrapper detects the provider type and dispatches to the matching concrete constructor:
class Runtime:
def __init__(self, provider, options=None):
if getattr(provider, "_type", None) == "postgres":
self._native = PyRuntime.from_postgres(provider._native, options)
else:
self._native = PyRuntime.from_sqlite(provider._native, options)
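The dispatch can be exercised with stubs (PyRuntime here is a dummy stand-in for the native class; the constructor names match the snippet above, everything else is illustrative):

```python
# Stub stand-ins for the PyO3 classes, to show the _type-based dispatch.
class PyRuntime:
    @staticmethod
    def from_postgres(native, options):
        return ("postgres", native)

    @staticmethod
    def from_sqlite(native, options):
        return ("sqlite", native)

class PostgresProvider:
    _type = "postgres"
    _native = "pg-handle"

class SqliteProvider:
    _native = "sqlite-handle"  # no _type attribute: falls through to sqlite

class Runtime:
    def __init__(self, provider, options=None):
        if getattr(provider, "_type", None) == "postgres":
            self._native = PyRuntime.from_postgres(provider._native, options)
        else:
            self._native = PyRuntime.from_sqlite(provider._native, options)

assert Runtime(PostgresProvider())._native[0] == "postgres"
assert Runtime(SqliteProvider())._native[0] == "sqlite"
```

getattr with a default keeps the check safe for providers that never define _type.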
PyO3 Object Mutability
#[pyclass(get_all)] makes fields readable but NOT writable from Python. Even with set_all, setting a Python dict to an Option<String> field fails with a type mismatch.
Solution: Create pure Python wrapper objects (e.g., OrchestrationResult) instead of mutating PyO3 objects:
class OrchestrationResult:
def __init__(self, status, output=None, error=None):
self.status = status
self.output = output # parsed from JSON
self.error = error
def _parse_status(raw):
output = raw.output
if output is not None:
output = json.loads(output)
return OrchestrationResult(status=raw.status, output=output, error=raw.error)
Common Pitfalls
| Pitfall | What Happens | Fix |
|---|---|---|
| Missing py.allow_threads() around block_on | GIL deadlock — process hangs forever | Wrap ALL TOKIO_RT.block_on() calls |
| Returning PyErr from inside allow_threads | Compile error — PyErr is not Send | Map to String inside, PyErr outside |
| Thread-local for cross-thread context | Lookup returns None — traces silently fail | Use global HashMap |
| Mutating PyO3 #[pyclass] fields from Python | TypeError or silently ignored | Use Python wrapper objects |
| cargo build instead of maturin develop | Python imports stale .so — changes don't take effect | Always use maturin develop |
| Missing serde(rename_all) on new ScheduledTask variants | Deserialization fails — task type not recognized | Match Python naming convention (camelCase in JSON) |
Build Requirements
# MUST use maturin (not cargo build) for the Python extension
source .venv/bin/activate
maturin develop # Debug build + install
maturin develop --release # Release build + install
maturin build --release # Build wheel for distribution
# cargo build alone produces a .dylib/.so that Python can't find