mirror of
https://github.com/langgenius/dify.git
synced 2026-05-13 08:57:28 +08:00
139 lines
5.9 KiB
Markdown
139 lines
5.9 KiB
Markdown
# Rule Catalog — SQLAlchemy Patterns
|
|
|
|
## Scope
|
|
- Covers: SQLAlchemy session and transaction lifecycle, query construction, tenant scoping, raw SQL boundaries, and write-path concurrency safeguards.
|
|
- Does NOT cover: table/model schema and migration design details (handled by `db-schema-rule.md`).
|
|
|
|
## Rules
|
|
|
|
### Use Session context manager with explicit transaction control behavior
|
|
- Category: best practices
|
|
- Severity: critical
|
|
- Description: Session and transaction lifecycle must be explicit and bounded on write paths. Missing commits can silently drop intended updates, while ad-hoc or long-lived transactions increase contention, lock duration, and deadlock risk.
|
|
- Suggested fix:
|
|
- Use **explicit `session.commit()`** after completing a related write unit.
|
|
- Or use **`session.begin()` context manager** for automatic commit/rollback on a scoped block.
|
|
- Keep transaction windows short: avoid network I/O, heavy computation, or unrelated work inside the transaction.
|
|
- Example:
|
|
- Bad:
|
|
```python
|
|
# Missing commit: write may never be persisted.
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
run = session.get(WorkflowRun, run_id)
|
|
run.status = "cancelled"
|
|
|
|
# Long transaction: external I/O inside a DB transaction.
|
|
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
|
run = session.get(WorkflowRun, run_id)
|
|
run.status = "cancelled"
|
|
call_external_api()
|
|
```
|
|
- Good:
|
|
```python
|
|
# Option 1: explicit commit.
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
run = session.get(WorkflowRun, run_id)
|
|
run.status = "cancelled"
|
|
session.commit()
|
|
|
|
# Option 2: scoped transaction with automatic commit/rollback.
|
|
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
|
run = session.get(WorkflowRun, run_id)
|
|
run.status = "cancelled"
|
|
|
|
# Keep non-DB work outside transaction scope.
|
|
call_external_api()
|
|
```
|
|
|
|
### Enforce tenant_id scoping on shared-resource queries
|
|
- Category: security
|
|
- Severity: critical
|
|
- Description: Reads and writes against shared tables must be scoped by `tenant_id` to prevent cross-tenant data leakage or corruption.
|
|
- Suggested fix: Add `tenant_id` predicate to all tenant-owned entity queries and propagate tenant context through service/repository interfaces.
|
|
- Example:
|
|
- Bad:
|
|
```python
|
|
stmt = select(Workflow).where(Workflow.id == workflow_id)
|
|
workflow = session.execute(stmt).scalar_one_or_none()
|
|
```
|
|
- Good:
|
|
```python
|
|
stmt = select(Workflow).where(
|
|
Workflow.id == workflow_id,
|
|
Workflow.tenant_id == tenant_id,
|
|
)
|
|
workflow = session.execute(stmt).scalar_one_or_none()
|
|
```
|
|
|
|
### Prefer SQLAlchemy expressions over raw SQL by default
|
|
- Category: maintainability
|
|
- Severity: suggestion
|
|
- Description: Raw SQL should be exceptional. ORM/Core expressions are easier to evolve, safer to compose, and more consistent with the codebase.
|
|
- Suggested fix: Rewrite straightforward raw SQL into SQLAlchemy `select/update/delete` expressions; keep raw SQL only when required by clear technical constraints.
|
|
- Example:
|
|
- Bad:
|
|
```python
|
|
row = session.execute(
|
|
text("SELECT * FROM workflows WHERE id = :id AND tenant_id = :tenant_id"),
|
|
{"id": workflow_id, "tenant_id": tenant_id},
|
|
).first()
|
|
```
|
|
- Good:
|
|
```python
|
|
stmt = select(Workflow).where(
|
|
Workflow.id == workflow_id,
|
|
Workflow.tenant_id == tenant_id,
|
|
)
|
|
row = session.execute(stmt).scalar_one_or_none()
|
|
```
|
|
|
|
### Protect write paths with concurrency safeguards
|
|
- Category: quality
|
|
- Severity: critical
|
|
- Description: Multi-writer paths without explicit concurrency control can silently overwrite data. Choose the safeguard based on contention level, lock scope, and throughput cost instead of defaulting to one strategy.
|
|
- Suggested fix:
|
|
- **Optimistic locking**: Use when contention is usually low and retries are acceptable. Add a version (or updated_at) guard in `WHERE` and treat `rowcount == 0` as a conflict.
|
|
- **Redis distributed lock**: Use when the critical section spans multiple steps/processes (or includes non-DB side effects) and you need cross-worker mutual exclusion.
|
|
- **SELECT ... FOR UPDATE**: Use when contention is high on the same rows and strict in-transaction serialization is required. Keep transactions short to reduce lock wait/deadlock risk.
|
|
- In all cases, scope by `tenant_id` and verify affected row counts for conditional writes.
|
|
- Example:
|
|
- Bad:
|
|
```python
|
|
# No tenant scope, no conflict detection, and no lock on a contested write path.
|
|
session.execute(update(WorkflowRun).where(WorkflowRun.id == run_id).values(status="cancelled"))
|
|
session.commit() # silently overwrites concurrent updates
|
|
```
|
|
- Good:
|
|
```python
|
|
# 1) Optimistic lock (low contention, retry on conflict)
|
|
result = session.execute(
|
|
update(WorkflowRun)
|
|
.where(
|
|
WorkflowRun.id == run_id,
|
|
WorkflowRun.tenant_id == tenant_id,
|
|
WorkflowRun.version == expected_version,
|
|
)
|
|
.values(status="cancelled", version=WorkflowRun.version + 1)
|
|
)
|
|
if result.rowcount == 0:
|
|
raise WorkflowStateConflictError("stale version, retry")
|
|
|
|
# 2) Redis distributed lock (cross-worker critical section)
|
|
lock_name = f"workflow_run_lock:{tenant_id}:{run_id}"
|
|
with redis_client.lock(lock_name, timeout=20):
|
|
session.execute(
|
|
update(WorkflowRun)
|
|
.where(WorkflowRun.id == run_id, WorkflowRun.tenant_id == tenant_id)
|
|
.values(status="cancelled")
|
|
)
|
|
session.commit()
|
|
|
|
# 3) Pessimistic lock with SELECT ... FOR UPDATE (high contention)
|
|
run = session.execute(
|
|
select(WorkflowRun)
|
|
.where(WorkflowRun.id == run_id, WorkflowRun.tenant_id == tenant_id)
|
|
.with_for_update()
|
|
).scalar_one()
|
|
run.status = "cancelled"
|
|
session.commit()
|
|
``` |