A lightweight, embedded Python orchestration engine that replaces heavy BPMN servers with Code-First workflow definitions and enterprise-grade transactional guarantees.
Workflows as Python classes, not XML
Pessimistic locking guarantees
Live Mermaid.js diagrams
Distributed transaction safety
The Q Workflow Engine (QWE) is a lightweight, embedded Python orchestration engine designed to replace heavy BPMN servers (Camunda, ProcessMaker, LUCIDCharts) in Python ecosystems.
NOTE: QWE is not a BPMN engine. It is a state machine orchestrator.
It combines Code-First workflow definitions with enterprise-grade transactional guarantees, eliminating the operational overhead of standalone workflow servers while preserving production reliability.
Workflows defined as Python classes with Git versioning, code review, and unit testing capabilities that XML configurations cannot match.
Atomic state changes with pessimistic locking ensure consistent entity states and audit trails.
Orchestration-based distributed transactions with compensation for failure recovery.
Workflows are defined as Python classes inheriting from WorkflowMachine, not XML configurations. This enables version control through Git, code review via pull requests, unit testing with standard frameworks, and complex conditional logic through native Python methods.
class OrderWorkflow(WorkflowMachine):
    states = ["draft", "submitted", "approved", "shipped"]
    transitions = [...]
    permissions = {...}
QWE operates as a library within the application process, not a separate service. This eliminates network latency, simplifies deployment topology, and ensures workflow state transitions participate in the same database transactions as application data.
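As a sketch of the embedded model, a transition can run inside the caller's own database session. The attempt_transition() keyword arguments and session handling shown here are assumptions for illustration, not the published API.

from sqlalchemy.orm import sessionmaker  # assumed persistence layer (SQLAlchemy)

def submit_order(engine, session_factory: sessionmaker, order_id: str, actor_id: str) -> None:
    # Hypothetical usage sketch: the workflow transition and the application's
    # own writes commit (or roll back) together in one database transaction.
    with session_factory() as session:
        engine.attempt_transition(   # signature assumed for illustration
            entity_type="Order",
            entity_id=order_id,
            trigger="submit",
            actor_id=actor_id,
            session=session,  # same transaction as application data
        )
        session.commit()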
QWE explicitly targets replacement scenarios where organizations need workflow capabilities without BPMN's infrastructure and cognitive overhead. The specification identifies Camunda, ProcessMaker, and LUCIDCharts as representative targets, with QWE offering a superior developer experience and operational characteristics.
Every state transition follows a strict atomic sequence with pessimistic locking via SELECT FOR UPDATE, ensuring no partial states or race conditions.
Saga pattern implementation with orchestration-based coordination, compensation on failure, and retry with exponential backoff.
JSON-based rules stored in database enable product managers to tune thresholds without code deployments, while workflow structure remains version-controlled.
The WorkflowVisualizer generates runtime-aware diagrams that reflect actual execution status—not static documentation that becomes obsolete immediately after deployment.
The visualizer combines WorkflowMachine definitions (complete transition graph) with WorkflowAudit history (actual execution path) to classify each state temporally.
stateDiagram-v2
classDef past fill:#e3f2fd,stroke:#1976d2
classDef current fill:#fff9c4,stroke:#f44336,stroke-width:3px
classDef future fill:#ffffff,stroke:#9e9e9e,stroke-dasharray:5
classDef error fill:#ffcdd2,stroke:#c62828
Draft:::past --> Submitted:::past : submit
Submitted:::past --> PendingApproval:::current : request_approval
PendingApproval:::current --> Approved:::future : approve
PendingApproval:::current --> Rejected:::future : reject
PendingApproval:::current --> Failed:::error : [rule_violation]
When state_metadata includes lane keys, the visualizer generates swim-lane diagrams using Mermaid's block-beta format, grouping states by organizational responsibility.
state_metadata = {
    "draft": {"lane": "customer", "type": "user"},
    "submitted": {"lane": "operations", "type": "user"},
    "pending_payment": {"lane": "system", "type": "service"},
    "paid": {"lane": "warehouse", "type": "user"},
    "shipped": {"lane": "warehouse", "type": "service"},
    "cancelled": {"lane": "system", "type": "service"},
}
Terminal states (shipped, cancelled) are rendered with double borders to indicate process endpoints.
If state_metadata is absent, the visualizer gracefully degrades to the flat stateDiagram-v2 format.
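A possible invocation, assuming a WorkflowVisualizer constructed from the machine class and the entity's audit history with a render() method that returns Mermaid text; the names are illustrative, not the confirmed API.

# Hypothetical API: constructor and method names are assumptions for illustration.
visualizer = WorkflowVisualizer(
    machine=OrderWorkflow,        # complete transition graph
    audit_trail=audit_entries,    # WorkflowAudit rows for this entity
)
mermaid_source = visualizer.render()  # past/current/future/error classes applied
print(mermaid_source)                 # hand the text to Mermaid.js for rendering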
QWE implements strict separation of concerns across four layers, enabling independent evolution, testing, and scaling of each architectural component.
Handles HTTP requests, authentication, and rate limiting using Redis token-bucket algorithm.
Pure Python logic managing state machines and business rules with no direct infrastructure dependencies.
Manages ACID transactions with pessimistic locking and immutable audit logging.
Handles timers and sagas via pluggable Dispatcher protocol with Celery/Redis reference implementation.
class Dispatcher(Protocol):
    def dispatch_saga(self, saga_id: UUID) -> None: ...
    def schedule_timer(self, timer_id: UUID, eta: datetime) -> str: ...
    def revoke_task(self, task_id: str) -> None: ...
dispatch_saga: Initiates saga execution after database commit
schedule_timer: Schedules future execution, returns task_id for cancellation
revoke_task: Cancels previously scheduled task (best-effort)
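A sketch of a Celery-backed implementation of this protocol; the task names run_saga and fire_timer and their module are assumptions about the reference implementation.

from datetime import datetime
from uuid import UUID

from myapp.tasks import run_saga, fire_timer  # hypothetical Celery tasks

class CeleryDispatcher:
    """Illustrative Dispatcher backed by Celery/Redis."""

    def dispatch_saga(self, saga_id: UUID) -> None:
        # Enqueue saga execution; called only after the database transaction commits.
        run_saga.delay(str(saga_id))

    def schedule_timer(self, timer_id: UUID, eta: datetime) -> str:
        # Schedule future execution and return the broker task id for later revocation.
        result = fire_timer.apply_async(args=[str(timer_id)], eta=eta)
        return result.id

    def revoke_task(self, task_id: str) -> None:
        # Best-effort cancellation of a previously scheduled timer task.
        fire_timer.app.control.revoke(task_id)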
The WorkflowMachine base class enables complete workflow specification through Python class attributes and methods, providing a declarative yet programmable interface for defining entity lifecycles.
class OrderWorkflow(WorkflowMachine):
    states = ["draft", "submitted",
              "pending_payment", "paid",
              "shipped", "cancelled"]
    terminal_states = ["shipped", "cancelled"]
    transitions = [
        {"trigger": "submit",
         "source": "draft",
         "dest": "submitted"},
        {"trigger": "pay",
         "source": "submitted",
         "dest": "pending_payment"},
        {"trigger": "cancel",
         "source": ["draft", "submitted"],
         "dest": "cancelled"},
    ]
    initial_state = "draft"
    permissions = {
        "submit": {"roles": ["employee"]},
        "pay": {"roles": ["customer"]},
        "cancel": {"roles": ["admin"]},
    }
    task_config = {
        "submitted": {
            "candidate_groups": ["order-reviewers"],
            "due_date_offset": 86400,  # 24h
            "escalation_trigger": "escalate",
        },
    }
Role-based access control per trigger with pluggable AuthorizationProvider integration.
Automatic task creation with candidate groups, deadlines, and escalation triggers.
on_enter_<state> hooks for running custom side effects when a state is entered.
sagas = {
    "pay": [
        {
            "handler": "payment.charge_card",
            "compensation": "payment.refund_card",
            "max_retries": 3,
        },
        {
            "handler": "notification.send_receipt",
            "compensation": None,
            "max_retries": 1,
        },
    ],
}
Per-transition saga definitions with compensation handlers and retry policies.
timers = {
    "pending_payment": {
        "trigger": "cancel",
        "delay_seconds": 86400,  # 24h
        "behavior": "CANCEL",  # CANCEL / PROCEED / SKIP
    },
}
Per-state timer definitions with configurable state mismatch behavior.
Every state change follows a strict atomic sequence of twelve steps, ensuring fail-fast behavior for validation checks while maintaining transactional consistency for all mutations.
All mutations occur within a single database transaction. Saga execution is split into two phases:
Phase 1: the saga execution plan is persisted atomically within that same transaction.
Phase 2: the saga steps execute asynchronously after the transaction commits.
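The locking step at the head of that transaction might look like this with SQLAlchemy; this is an illustrative sketch, not the engine's internal code.

from sqlalchemy import select
from sqlalchemy.orm import Session

def lock_entity_for_transition(session: Session, entity_cls, entity_id: str):
    # Pessimistic lock: SELECT ... FOR UPDATE inside the transition's transaction,
    # so concurrent transitions on the same entity serialize instead of racing.
    stmt = (
        select(entity_cls)
        .where(entity_cls.id == entity_id)
        .with_for_update()  # blocks until any competing row lock is released
    )
    return session.execute(stmt).scalar_one()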
Reverse-order execution of compensation handlers for completed steps only, with exponential backoff retry.
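A minimal sketch of that compensation pass; handler lookup is simplified and the session argument from the handler contract below is omitted for brevity.

import time

def compensate(completed_steps: list[dict], handlers: dict, entity_id: str) -> None:
    # Undo completed steps in reverse order, retrying each compensation
    # handler with exponential backoff (1s, 2s, 4s, ...).
    for step in reversed(completed_steps):
        comp_name = step.get("compensation")
        if comp_name is None:
            continue  # step declared no compensation (e.g. sending a receipt)
        for attempt in range(step.get("max_retries", 3)):
            try:
                handlers[comp_name](entity_id, step.get("data", {}))
                break
            except Exception:
                time.sleep(2 ** attempt)
    # The saga is then marked COMPENSATED (persistence not shown).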
def handler(entity_id: str, data: dict, session: Session) -> dict:
    """Pure function: all input via data, all output via return dict"""
Upon saga creation, the engine must serialize the list of steps from WorkflowMachine.sagas and store them in WorkflowSaga.execution_plan.
Rationale: this "freezes" the saga definition. If a deployment changes the saga structure while a saga is in flight, the runner must execute the steps that were defined at start time, not those in the newly deployed code, to ensure data integrity and correct compensation paths.
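A sketch of what that freeze could look like at creation time; the surrounding fields are assumptions built around the execution_plan attribute named above.

import json
from uuid import uuid4

def create_saga_record(workflow_cls, trigger: str, entity_id: str) -> dict:
    # Copy the step list out of the class definition and persist it as data,
    # so a later deployment cannot silently change an in-flight execution plan.
    frozen_plan = json.dumps(workflow_cls.sagas[trigger])  # snapshot, not a live reference
    return {
        "id": str(uuid4()),
        "entity_id": entity_id,
        "trigger": trigger,
        "execution_plan": frozen_plan,  # maps to WorkflowSaga.execution_plan
        "status": "PENDING",
    }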
On state exit, pending timers for previous state are automatically revoked via dispatcher.
The domain object whose lifecycle is governed by the workflow engine. Must implement this mixin interface for full integration.
| Field | Type | Description |
|---|---|---|
| id | String | Unique identifier (UUID recommended) |
| state | String | Current workflow status |
| version | Integer | Entity version for ETag/staleness detection |
| completed_at | DateTime | Timestamp when terminal state reached |
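One possible SQLAlchemy mapping of these fields; the ORM choice and column details are assumptions, and any persistence layer exposing the same attributes would do.

from datetime import datetime
from typing import Optional

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Order(Base):
    # Example workflow entity exposing the fields the engine expects.
    __tablename__ = "orders"

    id: Mapped[str] = mapped_column(primary_key=True)              # UUID recommended
    state: Mapped[str] = mapped_column(default="draft")            # current workflow status
    version: Mapped[int] = mapped_column(default=0)                # ETag / staleness detection
    completed_at: Mapped[Optional[datetime]] = mapped_column(default=None)  # set on terminal state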
Worker Responsibility: While processing a long-running step, the worker must update heartbeat_timestamp every N seconds.
Zombie Detection: The monitoring system flags sagas where status=RUNNING AND heartbeat_timestamp < NOW() - threshold.
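A sketch of the reaper query, assuming a workflow_saga table with the status and heartbeat_timestamp columns described above; the table and column names are illustrative.

from datetime import timedelta

from sqlalchemy import text
from sqlalchemy.orm import Session

ZOMBIE_THRESHOLD = timedelta(minutes=10)  # illustrative threshold

def find_zombie_sagas(session: Session) -> list[str]:
    # Flag sagas that claim to be RUNNING but whose worker stopped heartbeating.
    rows = session.execute(
        text(
            "SELECT id FROM workflow_saga "
            "WHERE status = 'RUNNING' "
            "AND heartbeat_timestamp < NOW() - :threshold"
        ),
        {"threshold": ZOMBIE_THRESHOLD},
    )
    return [str(row.id) for row in rows]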
{
  "conditions": {
    "all": [
      {
        "field": "amount",
        "operator": "greater_than",
        "value": 500
      },
      {
        "not": {
          "field": "status",
          "operator": "equal_to",
          "value": "vip"
        }
      }
    ]
  },
  "actions": [
    {
      "action": "require_approval",
      "params": {"level": "manager"}
    }
  ]
}
Supported logical operators: all, any, not.
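To make the rule semantics concrete, here is a minimal, self-contained evaluator for the all/any/not combinators and the two operators used above; the real rule engine presumably supports a wider operator set.

OPERATORS = {
    "greater_than": lambda a, b: a > b,
    "equal_to": lambda a, b: a == b,
}

def evaluate(condition: dict, payload: dict) -> bool:
    # Recursively evaluate an all/any/not condition tree against entity data.
    if "all" in condition:
        return all(evaluate(c, payload) for c in condition["all"])
    if "any" in condition:
        return any(evaluate(c, payload) for c in condition["any"])
    if "not" in condition:
        return not evaluate(condition["not"], payload)
    # Leaf clause: {"field": ..., "operator": ..., "value": ...}
    return OPERATORS[condition["operator"]](payload[condition["field"]], condition["value"])

# The rule above fires for a $750 order from a non-VIP customer.
rule = {"all": [
    {"field": "amount", "operator": "greater_than", "value": 500},
    {"not": {"field": "status", "operator": "equal_to", "value": "vip"}},
]}
assert evaluate(rule, {"amount": 750, "status": "regular"}) is True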
To prevent Remote Code Execution (RCE), arbitrary string imports are banned. Handlers must be explicitly registered using decorators.
@workflow_handler("payment.charge_card")
def charge_card(entity_id: str, data: dict,
                session: Session) -> dict:
    # Process payment via Stripe
    ...

@compensation_handler("payment.charge_card")
def refund_card(entity_id: str, data: dict,
                session: Session) -> dict:
    # Reverse the charge
    ...
Only handlers from ALLOWED_HANDLER_PREFIXES can be registered, preventing arbitrary code execution through crafted saga definitions.
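A sketch of how decorator-based registration with a prefix allowlist might be enforced; ALLOWED_HANDLER_PREFIXES comes from the text above, and the remaining names are illustrative.

from typing import Callable

ALLOWED_HANDLER_PREFIXES = ("payment.", "notification.")  # illustrative allowlist
_HANDLER_REGISTRY: dict[str, Callable] = {}

def workflow_handler(name: str) -> Callable:
    # Register a saga handler under an explicit, allowlisted name.
    if not name.startswith(ALLOWED_HANDLER_PREFIXES):
        raise ValueError(f"Handler name '{name}' is outside the allowed prefixes")

    def decorator(func: Callable) -> Callable:
        _HANDLER_REGISTRY[name] = func
        return func

    return decorator

def resolve_handler(name: str) -> Callable:
    # Look up a handler by name; unknown names fail instead of being imported.
    return _HANDLER_REGISTRY[name]  # raises KeyError rather than importing anything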
WORKFLOW_MIGRATION_MODE=greenfield creates all tables from scratch for new installations.
WORKFLOW_MIGRATION_MODE=upgrade adds missing columns to existing tables without data loss.
SQLite :memory: database with MockDispatcher for core logic verification (sketched below).
PostgreSQL and Redis for full-stack validation of locking and saga execution.
Mermaid output assertions to verify diagram class definitions and styling.
Locust with production-like infrastructure for concurrency validation.
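The SQLite :memory: plus MockDispatcher tier could look like this; the WorkflowEngine constructor, MockDispatcher, and the helper functions are assumptions about QWE's test utilities.

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@pytest.fixture
def workflow_engine():
    # In-memory database plus a no-op dispatcher keeps unit tests fast and hermetic.
    db = create_engine("sqlite:///:memory:")
    return WorkflowEngine(                    # hypothetical constructor
        session_factory=sessionmaker(bind=db),
        dispatcher=MockDispatcher(),          # records calls instead of enqueueing them
    )

def test_submit_moves_draft_order_to_submitted(workflow_engine):
    order_id = create_draft_order(workflow_engine)  # hypothetical helper
    workflow_engine.attempt_transition(
        entity_type="Order", entity_id=order_id,
        trigger="submit", actor_id="test-employee",
    )
    assert get_order_state(workflow_engine, order_id) == "submitted"  # hypothetical helper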
For new installations, WORKFLOW_MIGRATION_MODE=greenfield creates all required tables from scratch with optimal schema design.
For existing installations, WORKFLOW_MIGRATION_MODE=upgrade performs non-destructive schema evolution by adding missing columns.
All admin_* methods require WORKFLOW_ADMIN role authorization and emit SECURITY_AUDIT events. In production, these should be gated behind a separate admin service with MFA, not exposed on the primary API.
# 1. Verify external reality (Stripe Dashboard)
# 2. Force status update
engine.admin_force_saga_status(
    saga_id="uuid...",
    step_name="charge_card",
    status=WorkflowStatus.COMPLETED,
)
# 3. Resume workflow
engine.dispatcher.dispatch_saga(saga_id="uuid...")
For worker crashes after external effects but before status update.
# EXTREME CAUTION: Bypasses all validation
engine.admin_force_state(
    entity_type="Order",
    entity_id="123",
    new_state="draft",
    actor_id="admin-emergency",
    reason="Rollback due to Bug #1234",
)
For workflows stuck in logical dead-ends due to bugs.
import time

engine.admin_set_mode(WorkflowMode.DRAINING)
while engine.admin_active_transaction_count() > 0:
    time.sleep(5)
    print(f"Waiting for {engine.admin_active_transaction_count()} transactions...")
engine.admin_set_mode(WorkflowMode.MAINTENANCE)
# Run migrations
engine.admin_set_mode(WorkflowMode.NORMAL)
Let in-flight transitions complete while blocking new ones.
engine.admin_bulk_state_update(
    entity_type="Order",
    filter_query={"state": "broken_state"},
    new_state="draft",
    reason="Mass rollback for Bug #1234",
    dry_run=False,
)
Safer than individual force_state calls for mass corrections.
engine.admin_purge_timers(
    entity_id="123",
    entity_type="Order",
)
Clean up dead timers after manual SQL operations.
stats = engine.admin_get_lock_stats(entity_id="123")
engine.admin_kill_connection(pid=9821)
Diagnose and resolve lock timeout issues.
engine.admin_reconstruct_audit(
    entity_type="Order",
    entity_id="123",
    from_state="draft",
    to_state="submitted",
    trigger="submit",
)
Rebuild missing audit entries from historical data.
time() - workflow_saga_start_timestamp{status="pending"} > 3600
Action: Check Celery workers, run admin_force_saga_status
histogram_quantile(0.99, workflow_lock_wait_seconds_bucket) > 5
Action: Investigate hot entities, optimize handler logic
rate(workflow_transition_total{status="compensated"}[10m]) > 0.05
Action: Downstream service likely down, check dependencies
workflow_timer_lag_seconds > 300
Action: Celery timers queue backed up, scale workers
rate(workflow_handler_errors_total[5m]) / rate(workflow_transition_total[5m]) > 0.01
Action: Review handler logs, check external service health
workflow_active_sagas{status="pending"} > 10
Action: Run zombie reaper job, investigate worker crashes
| Profile | Transitions/min | Recommended Infrastructure |
|---|---|---|
| Small | < 100 | Single PostgreSQL, 2 Celery workers, standalone Redis |
| Medium | 100 – 1,000 | PostgreSQL with read replica, 4-8 workers, Redis cluster |
| Large | 1,000 – 10,000 | PostgreSQL with pgBouncer, 16-32 workers, Redis cluster, consider sharding |
| Very Large | > 10,000 | Sharded PostgreSQL (Citus), dedicated timer queue, horizontal API scaling |
Required saga-step throughput (steps/second): (peak_transitions_per_min / 60) × avg_saga_steps × 1.5
The 1.5× multiplier adds headroom.
Redis memory estimate: (active_sagas × 2 KB) + (rate_limit_buckets × 100 B) + 256 MB
Add a buffer for Celery task metadata.
PostgreSQL connection estimate: (api_instances × 10) + (celery_workers × 2) + 10 admin connections
Use pgBouncer above roughly 100 connections.
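These estimates can be scripted directly from the formulas; a small helper using the interpretation above (unit labels are illustrative).

def capacity_estimates(peak_transitions_per_min: int,
                       avg_saga_steps: float,
                       active_sagas: int,
                       rate_limit_buckets: int,
                       api_instances: int,
                       celery_workers: int) -> dict:
    # Apply the three sizing formulas above.
    saga_steps_per_second = (peak_transitions_per_min / 60) * avg_saga_steps * 1.5
    redis_memory_bytes = active_sagas * 2_048 + rate_limit_buckets * 100 + 256 * 1024 ** 2
    postgres_connections = api_instances * 10 + celery_workers * 2 + 10  # +10 admin
    return {
        "saga_steps_per_second": saga_steps_per_second,
        "redis_memory_mb": round(redis_memory_bytes / 1024 ** 2, 1),
        "postgres_connections": postgres_connections,
    }

# Example: a "Medium" profile at 600 transitions/min with 2 saga steps on average.
print(capacity_estimates(600, 2, active_sagas=500, rate_limit_buckets=10_000,
                         api_instances=3, celery_workers=8))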
Add read replicas or optimize hot entities
Scale worker count
Deploy pgBouncer or increase max_connections
Scale Redis cluster or increase instance size
Add API instances, review slow transitions
| Component | HA Strategy | Failover Time |
|---|---|---|
| PostgreSQL | Streaming replication + Patroni | < 30 seconds |
| Redis | Redis Sentinel or Cluster mode | < 10 seconds |
| Celery Workers | Multiple workers with auto-restart (systemd) | Immediate |
| API Layer | Load balancer with health checks | < 5 seconds |
Headers: Authorization (required), Idempotency-Key (optional)
Returns: Mermaid.js syntax as text/plain
Returns current state and available triggers
Performs a "Dry Run" of the engine's validation stack (Permissions + Conditions + Rules) for the current state.
{
  "current_state": "review",
  "triggers": {
    "approve": {"enabled": true},
    "reject": {"enabled": false, "reason": "User is not in group 'managers'"}
  }
}
Query by group, assignee, status, entity_type
Database, Redis, Celery connectivity status
Transition completed successfully
Actor lacks required role for trigger
Invalid transition, terminal state, or already claimed
Business rule or condition check failed
Another transaction holds lock; retry with guidance
Rate limit exceeded; retry with guidance
System in draining/maintenance mode
Resource not found
Outbound notifications for state changes, saga completions, and task events.
Automatic circuit breaking for failing saga handlers with fallback mechanisms.
Built-in archival and time-based expiration for WorkflowAudit and completed sagas.
Native tenant isolation with separate workflow definitions and data partitioning.
Support for multiple workflow versions running concurrently with automatic migration.
Single-state FSM design cannot model true concurrent activity branches.
Flat workflow structure without nested process composition.
Trigger-driven API calls rather than event-driven message correlation.
Form rendering externalized to client applications.
Reassignment tracked as state changes without delegation chain.
For systems requiring both QWE's transactional strengths and BPMN's process orchestration, a hybrid approach combines both technologies in their zones of strength:
BPMN service tasks call QWE's attempt_transition() to safely change entity states with locking, rules, auditing, and saga compensation, while BPMN provides process-level orchestration and dashboards.
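One way that wiring could look from the BPMN side, written as a generic service-task worker function; the attempt_transition() signature and the returned fields are assumptions for illustration.

def handle_approve_order_task(task_variables: dict, engine) -> dict:
    # Illustrative BPMN service-task worker: the BPMN engine drives the process,
    # while QWE performs the guarded state change (locking, rules, audit, saga).
    result = engine.attempt_transition(  # signature assumed
        entity_type="Order",
        entity_id=task_variables["order_id"],
        trigger="approve",
        actor_id=task_variables["approver_id"],
    )
    # Return process variables so the BPMN model can branch (e.g. on rejection).
    return {"transition_status": result.get("status", "unknown")}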
QWE is not a BPMN engine. It is a state machine orchestrator with transactional guarantees—deliberately narrower scope enabling superior operational characteristics for entity lifecycle workflows.