diff --git a/api/app_factory.py b/api/app_factory.py
index f827842d68..1fb01d2e91 100644
--- a/api/app_factory.py
+++ b/api/app_factory.py
@@ -71,6 +71,8 @@ def create_app() -> DifyApp:
def initialize_extensions(app: DifyApp):
+ # Initialize Flask context capture for workflow execution
+ from context.flask_app_context import init_flask_context
from extensions import (
ext_app_metrics,
ext_blueprints,
@@ -100,6 +102,8 @@ def initialize_extensions(app: DifyApp):
ext_warnings,
)
+ init_flask_context()
+
extensions = [
ext_timezone,
ext_logging,
diff --git a/api/commands.py b/api/commands.py
index 5539639cf1..99ba835d04 100644
--- a/api/commands.py
+++ b/api/commands.py
@@ -862,8 +862,27 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
@click.command("clean-workflow-runs", help="Clean expired workflow runs and related data for free tenants.")
-@click.option("--days", default=30, show_default=True, help="Delete workflow runs created before N days ago.")
+@click.option(
+ "--before-days",
+ "--days",
+ default=30,
+ show_default=True,
+ type=click.IntRange(min=0),
+ help="Delete workflow runs created before N days ago.",
+)
@click.option("--batch-size", default=200, show_default=True, help="Batch size for selecting workflow runs.")
+@click.option(
+ "--from-days-ago",
+ default=None,
+ type=click.IntRange(min=0),
+ help="Lower bound in days ago (older). Must be paired with --to-days-ago.",
+)
+@click.option(
+ "--to-days-ago",
+ default=None,
+ type=click.IntRange(min=0),
+ help="Upper bound in days ago (newer). Must be paired with --from-days-ago.",
+)
@click.option(
"--start-from",
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
@@ -882,8 +901,10 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
help="Preview cleanup results without deleting any workflow run data.",
)
def clean_workflow_runs(
- days: int,
+ before_days: int,
batch_size: int,
+ from_days_ago: int | None,
+ to_days_ago: int | None,
start_from: datetime.datetime | None,
end_before: datetime.datetime | None,
dry_run: bool,
@@ -894,11 +915,24 @@ def clean_workflow_runs(
if (start_from is None) ^ (end_before is None):
raise click.UsageError("--start-from and --end-before must be provided together.")
+ if (from_days_ago is None) ^ (to_days_ago is None):
+ raise click.UsageError("--from-days-ago and --to-days-ago must be provided together.")
+
+ if from_days_ago is not None and to_days_ago is not None:
+ if start_from or end_before:
+ raise click.UsageError("Choose either day offsets or explicit dates, not both.")
+ if from_days_ago <= to_days_ago:
+ raise click.UsageError("--from-days-ago must be greater than --to-days-ago.")
+ now = datetime.datetime.now()
+ start_from = now - datetime.timedelta(days=from_days_ago)
+ end_before = now - datetime.timedelta(days=to_days_ago)
+ before_days = 0
+
start_time = datetime.datetime.now(datetime.UTC)
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
WorkflowRunCleanup(
- days=days,
+ days=before_days,
batch_size=batch_size,
start_from=start_from,
end_before=end_before,
diff --git a/api/context/__init__.py b/api/context/__init__.py
new file mode 100644
index 0000000000..aebf9750ce
--- /dev/null
+++ b/api/context/__init__.py
@@ -0,0 +1,74 @@
+"""
+Core Context - Framework-agnostic context management.
+
+This module provides context management that is independent of any specific
+web framework. Framework-specific implementations register their context
+capture functions at application initialization time.
+
+This ensures the workflow layer remains completely decoupled from Flask
+or any other web framework.
+"""
+
+import contextvars
+from collections.abc import Callable
+
+from core.workflow.context.execution_context import (
+ ExecutionContext,
+ IExecutionContext,
+ NullAppContext,
+)
+
+# Global capturer function - set by framework-specific modules
+_capturer: Callable[[], IExecutionContext] | None = None
+
+
+def register_context_capturer(capturer: Callable[[], IExecutionContext]) -> None:
+ """
+ Register a context capture function.
+
+ This should be called by framework-specific modules (e.g., Flask)
+ during application initialization.
+
+ Args:
+ capturer: Function that captures current context and returns IExecutionContext
+ """
+ global _capturer
+ _capturer = capturer
+
+
+def capture_current_context() -> IExecutionContext:
+ """
+ Capture current execution context.
+
+ This function uses the registered context capturer. If no capturer
+ is registered, it returns a minimal context with only contextvars
+ (suitable for non-framework environments like tests or standalone scripts).
+
+ Returns:
+ IExecutionContext with captured context
+ """
+ if _capturer is None:
+ # No framework registered - return minimal context
+ return ExecutionContext(
+ app_context=NullAppContext(),
+ context_vars=contextvars.copy_context(),
+ )
+
+ return _capturer()
+
+
+def reset_context_provider() -> None:
+ """
+ Reset the context capturer.
+
+ This is primarily useful for testing to ensure a clean state.
+ """
+ global _capturer
+ _capturer = None
+
+
+__all__ = [
+ "capture_current_context",
+ "register_context_capturer",
+ "reset_context_provider",
+]
diff --git a/api/context/flask_app_context.py b/api/context/flask_app_context.py
new file mode 100644
index 0000000000..4b693cd91f
--- /dev/null
+++ b/api/context/flask_app_context.py
@@ -0,0 +1,198 @@
+"""
+Flask App Context - Flask implementation of AppContext interface.
+"""
+
+import contextvars
+from collections.abc import Generator
+from contextlib import contextmanager
+from typing import Any, final
+
+from flask import Flask, current_app, g
+
+from context import register_context_capturer
+from core.workflow.context.execution_context import (
+ AppContext,
+ IExecutionContext,
+)
+
+
+@final
+class FlaskAppContext(AppContext):
+ """
+ Flask implementation of AppContext.
+
+ This adapts Flask's app context to the AppContext interface.
+ """
+
+ def __init__(self, flask_app: Flask) -> None:
+ """
+ Initialize Flask app context.
+
+ Args:
+ flask_app: The Flask application instance
+ """
+ self._flask_app = flask_app
+
+ def get_config(self, key: str, default: Any = None) -> Any:
+ """Get configuration value from Flask app config."""
+ return self._flask_app.config.get(key, default)
+
+ def get_extension(self, name: str) -> Any:
+ """Get Flask extension by name."""
+ return self._flask_app.extensions.get(name)
+
+ @contextmanager
+ def enter(self) -> Generator[None, None, None]:
+ """Enter Flask app context."""
+ with self._flask_app.app_context():
+ yield
+
+ @property
+ def flask_app(self) -> Flask:
+ """Get the underlying Flask app instance."""
+ return self._flask_app
+
+
+def capture_flask_context(user: Any = None) -> IExecutionContext:
+ """
+ Capture current Flask execution context.
+
+ This function captures the Flask app context and contextvars from the
+ current environment. It should be called from within a Flask request or
+ app context.
+
+ Args:
+ user: Optional user object to include in context
+
+ Returns:
+ IExecutionContext with captured Flask context
+
+ Raises:
+ RuntimeError: If called outside Flask context
+ """
+ # Get Flask app instance
+ flask_app = current_app._get_current_object() # type: ignore
+
+ # Save current user if available
+ saved_user = user
+ if saved_user is None:
+ # Check for user in g (flask-login)
+ if hasattr(g, "_login_user"):
+ saved_user = g._login_user
+
+ # Capture contextvars
+ context_vars = contextvars.copy_context()
+
+ return FlaskExecutionContext(
+ flask_app=flask_app,
+ context_vars=context_vars,
+ user=saved_user,
+ )
+
+
+@final
+class FlaskExecutionContext:
+ """
+ Flask-specific execution context.
+
+ This is a specialized version of ExecutionContext that includes Flask app
+ context. It provides the same interface as ExecutionContext but with
+ Flask-specific implementation.
+ """
+
+ def __init__(
+ self,
+ flask_app: Flask,
+ context_vars: contextvars.Context,
+ user: Any = None,
+ ) -> None:
+ """
+ Initialize Flask execution context.
+
+ Args:
+ flask_app: Flask application instance
+ context_vars: Python contextvars
+ user: Optional user object
+ """
+ self._app_context = FlaskAppContext(flask_app)
+ self._context_vars = context_vars
+ self._user = user
+ self._flask_app = flask_app
+
+ @property
+ def app_context(self) -> FlaskAppContext:
+ """Get Flask app context."""
+ return self._app_context
+
+ @property
+ def context_vars(self) -> contextvars.Context:
+ """Get context variables."""
+ return self._context_vars
+
+ @property
+ def user(self) -> Any:
+ """Get user object."""
+ return self._user
+
+ def __enter__(self) -> "FlaskExecutionContext":
+ """Enter the Flask execution context."""
+ # Restore context variables
+ for var, val in self._context_vars.items():
+ var.set(val)
+
+ # Save current user from g if available
+ saved_user = None
+ if hasattr(g, "_login_user"):
+ saved_user = g._login_user
+
+ # Enter Flask app context
+ self._cm = self._app_context.enter()
+ self._cm.__enter__()
+
+ # Restore user in new app context
+ if saved_user is not None:
+ g._login_user = saved_user
+
+ return self
+
+ def __exit__(self, *args: Any) -> None:
+ """Exit the Flask execution context."""
+ if hasattr(self, "_cm"):
+ self._cm.__exit__(*args)
+
+ @contextmanager
+ def enter(self) -> Generator[None, None, None]:
+ """Enter Flask execution context as context manager."""
+ # Restore context variables
+ for var, val in self._context_vars.items():
+ var.set(val)
+
+ # Save current user from g if available
+ saved_user = None
+ if hasattr(g, "_login_user"):
+ saved_user = g._login_user
+
+ # Enter Flask app context
+ with self._flask_app.app_context():
+ # Restore user in new app context
+ if saved_user is not None:
+ g._login_user = saved_user
+ yield
+
+
+def init_flask_context() -> None:
+ """
+ Initialize Flask context capture by registering the capturer.
+
+ This function should be called during Flask application initialization
+ to register the Flask-specific context capturer with the core context module.
+
+ Example:
+ app = Flask(__name__)
+ init_flask_context() # Register Flask context capturer
+
+ Note:
+ This function does not need the app instance as it uses Flask's
+ `current_app` to get the app when capturing context.
+ """
+ register_context_capturer(capture_flask_context)
diff --git a/api/controllers/console/app/app.py b/api/controllers/console/app/app.py
index d66bb7063f..dad184c54b 100644
--- a/api/controllers/console/app/app.py
+++ b/api/controllers/console/app/app.py
@@ -1,4 +1,3 @@
-import re
import uuid
from datetime import datetime
from typing import Any, Literal, TypeAlias
@@ -68,48 +67,6 @@ class AppListQuery(BaseModel):
raise ValueError("Invalid UUID format in tag_ids.") from exc
-# XSS prevention: patterns that could lead to XSS attacks
-# Includes: script tags, iframe tags, javascript: protocol, SVG with onload, etc.
-_XSS_PATTERNS = [
- r"", # Script tags
- r")", # Iframe tags (including self-closing)
- r"javascript:", # JavaScript protocol
- r"]*?\s+onload\s*=[^>]*>", # SVG with onload handler (attribute-aware, flexible whitespace)
- r"<.*?on\s*\w+\s*=", # Event handlers like onclick, onerror, etc.
- r"]*(?:\s*/>|>.*? )", # Object tags (opening tag)
- r"]*>", # Embed tags (self-closing)
- r" ]*>", # Link tags with javascript
-]
-
-
-def _validate_xss_safe(value: str | None, field_name: str = "Field") -> str | None:
- """
- Validate that a string value doesn't contain potential XSS payloads.
-
- Args:
- value: The string value to validate
- field_name: Name of the field for error messages
-
- Returns:
- The original value if safe
-
- Raises:
- ValueError: If the value contains XSS patterns
- """
- if value is None:
- return None
-
- value_lower = value.lower()
- for pattern in _XSS_PATTERNS:
- if re.search(pattern, value_lower, re.DOTALL | re.IGNORECASE):
- raise ValueError(
- f"{field_name} contains invalid characters or patterns. "
- "HTML tags, JavaScript, and other potentially dangerous content are not allowed."
- )
-
- return value
-
-
class CreateAppPayload(BaseModel):
name: str = Field(..., min_length=1, description="App name")
description: str | None = Field(default=None, description="App description (max 400 chars)", max_length=400)
@@ -118,11 +75,6 @@ class CreateAppPayload(BaseModel):
icon: str | None = Field(default=None, description="Icon")
icon_background: str | None = Field(default=None, description="Icon background color")
- @field_validator("name", "description", mode="before")
- @classmethod
- def validate_xss_safe(cls, value: str | None, info) -> str | None:
- return _validate_xss_safe(value, info.field_name)
-
class UpdateAppPayload(BaseModel):
name: str = Field(..., min_length=1, description="App name")
@@ -133,11 +85,6 @@ class UpdateAppPayload(BaseModel):
use_icon_as_answer_icon: bool | None = Field(default=None, description="Use icon as answer icon")
max_active_requests: int | None = Field(default=None, description="Maximum active requests")
- @field_validator("name", "description", mode="before")
- @classmethod
- def validate_xss_safe(cls, value: str | None, info) -> str | None:
- return _validate_xss_safe(value, info.field_name)
-
class CopyAppPayload(BaseModel):
name: str | None = Field(default=None, description="Name for the copied app")
@@ -146,11 +93,6 @@ class CopyAppPayload(BaseModel):
icon: str | None = Field(default=None, description="Icon")
icon_background: str | None = Field(default=None, description="Icon background color")
- @field_validator("name", "description", mode="before")
- @classmethod
- def validate_xss_safe(cls, value: str | None, info) -> str | None:
- return _validate_xss_safe(value, info.field_name)
-
class AppExportQuery(BaseModel):
include_secret: bool = Field(default=False, description="Include secrets in export")
diff --git a/api/controllers/console/auth/activate.py b/api/controllers/console/auth/activate.py
index cfc673880c..f741107b87 100644
--- a/api/controllers/console/auth/activate.py
+++ b/api/controllers/console/auth/activate.py
@@ -69,6 +69,13 @@ class ActivateCheckApi(Resource):
if invitation:
data = invitation.get("data", {})
tenant = invitation.get("tenant", None)
+
+ # Check workspace permission
+ if tenant:
+ from libs.workspace_permission import check_workspace_member_invite_permission
+
+ check_workspace_member_invite_permission(tenant.id)
+
workspace_name = tenant.name if tenant else None
workspace_id = tenant.id if tenant else None
invitee_email = data.get("email") if data else None
diff --git a/api/controllers/console/workspace/members.py b/api/controllers/console/workspace/members.py
index e9bd2b8f94..01cca2a8a0 100644
--- a/api/controllers/console/workspace/members.py
+++ b/api/controllers/console/workspace/members.py
@@ -107,6 +107,12 @@ class MemberInviteEmailApi(Resource):
inviter = current_user
if not inviter.current_tenant:
raise ValueError("No current tenant")
+
+ # Check workspace permission for member invitations
+ from libs.workspace_permission import check_workspace_member_invite_permission
+
+ check_workspace_member_invite_permission(inviter.current_tenant.id)
+
invitation_results = []
console_web_url = dify_config.CONSOLE_WEB_URL
diff --git a/api/controllers/console/workspace/workspace.py b/api/controllers/console/workspace/workspace.py
index 52e6f7d737..94be81d94f 100644
--- a/api/controllers/console/workspace/workspace.py
+++ b/api/controllers/console/workspace/workspace.py
@@ -20,6 +20,7 @@ from controllers.console.error import AccountNotLinkTenantError
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
+ only_edition_enterprise,
setup_required,
)
from enums.cloud_plan import CloudPlan
@@ -28,6 +29,7 @@ from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.account import Tenant, TenantStatus
from services.account_service import TenantService
+from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.file_service import FileService
from services.workspace_service import WorkspaceService
@@ -288,3 +290,31 @@ class WorkspaceInfoApi(Resource):
db.session.commit()
return {"result": "success", "tenant": marshal(WorkspaceService.get_tenant_info(tenant), tenant_fields)}
+
+
+@console_ns.route("/workspaces/current/permission")
+class WorkspacePermissionApi(Resource):
+ """Get workspace permissions for the current workspace."""
+
+ @setup_required
+ @login_required
+ @account_initialization_required
+ @only_edition_enterprise
+ def get(self):
+ """
+ Get workspace permission settings.
+ Returns permission flags that control workspace features like member invitations and owner transfer.
+ """
+ _, current_tenant_id = current_account_with_tenant()
+
+ if not current_tenant_id:
+ raise ValueError("No current tenant")
+
+ # Get workspace permissions from enterprise service
+ permission = EnterpriseService.WorkspacePermissionService.get_permission(current_tenant_id)
+
+ return {
+ "workspace_id": permission.workspace_id,
+ "allow_member_invite": permission.allow_member_invite,
+ "allow_owner_transfer": permission.allow_owner_transfer,
+ }, 200
diff --git a/api/controllers/console/wraps.py b/api/controllers/console/wraps.py
index 95fc006a12..fd928b077d 100644
--- a/api/controllers/console/wraps.py
+++ b/api/controllers/console/wraps.py
@@ -286,13 +286,12 @@ def enable_change_email(view: Callable[P, R]):
def is_allow_transfer_owner(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
- _, current_tenant_id = current_account_with_tenant()
- features = FeatureService.get_features(current_tenant_id)
- if features.is_allow_transfer_workspace:
- return view(*args, **kwargs)
+ from libs.workspace_permission import check_workspace_owner_transfer_permission
- # otherwise, return 403
- abort(403)
+ _, current_tenant_id = current_account_with_tenant()
+ # Check both billing/plan level and workspace policy level permissions
+ check_workspace_owner_transfer_permission(current_tenant_id)
+ return view(*args, **kwargs)
return decorated
diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py
index c9d5ca46e8..b7f359bbd1 100644
--- a/api/core/app/apps/workflow/app_generator.py
+++ b/api/core/app/apps/workflow/app_generator.py
@@ -8,7 +8,7 @@ from typing import Any, Literal, Union, overload
from flask import Flask, current_app
from pydantic import ValidationError
from sqlalchemy import select
-from sqlalchemy.orm import Session, sessionmaker
+from sqlalchemy.orm import sessionmaker
import contexts
from configs import dify_config
@@ -24,6 +24,7 @@ from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTas
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.app.layers.sandbox_layer import SandboxLayer
+from core.db.session_factory import session_factory
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
@@ -479,7 +480,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
:return:
"""
with preserve_flask_contexts(flask_app, context_vars=context):
- with Session(db.engine, expire_on_commit=False) as session:
+ with session_factory.create_session() as session:
workflow = session.scalar(
select(Workflow).where(
Workflow.tenant_id == application_generate_entity.app_config.tenant_id,
diff --git a/api/core/plugin/impl/base.py b/api/core/plugin/impl/base.py
index 0e49824ad0..7a6a598a2f 100644
--- a/api/core/plugin/impl/base.py
+++ b/api/core/plugin/impl/base.py
@@ -320,18 +320,17 @@ class BasePluginClient:
case PluginInvokeError.__name__:
error_object = json.loads(message)
invoke_error_type = error_object.get("error_type")
- args = error_object.get("args")
match invoke_error_type:
case InvokeRateLimitError.__name__:
- raise InvokeRateLimitError(description=args.get("description"))
+ raise InvokeRateLimitError(description=error_object.get("message"))
case InvokeAuthorizationError.__name__:
- raise InvokeAuthorizationError(description=args.get("description"))
+ raise InvokeAuthorizationError(description=error_object.get("message"))
case InvokeBadRequestError.__name__:
- raise InvokeBadRequestError(description=args.get("description"))
+ raise InvokeBadRequestError(description=error_object.get("message"))
case InvokeConnectionError.__name__:
- raise InvokeConnectionError(description=args.get("description"))
+ raise InvokeConnectionError(description=error_object.get("message"))
case InvokeServerUnavailableError.__name__:
- raise InvokeServerUnavailableError(description=args.get("description"))
+ raise InvokeServerUnavailableError(description=error_object.get("message"))
case CredentialsValidateFailedError.__name__:
raise CredentialsValidateFailedError(error_object.get("message"))
case EndpointSetupFailedError.__name__:
@@ -339,11 +338,11 @@ class BasePluginClient:
case TriggerProviderCredentialValidationError.__name__:
raise TriggerProviderCredentialValidationError(error_object.get("message"))
case TriggerPluginInvokeError.__name__:
- raise TriggerPluginInvokeError(description=error_object.get("description"))
+ raise TriggerPluginInvokeError(description=error_object.get("message"))
case TriggerInvokeError.__name__:
raise TriggerInvokeError(error_object.get("message"))
case EventIgnoreError.__name__:
- raise EventIgnoreError(description=error_object.get("description"))
+ raise EventIgnoreError(description=error_object.get("message"))
case _:
raise PluginInvokeError(description=message)
case PluginDaemonInternalServerError.__name__:
diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py
index 389db8a972..283744b43b 100644
--- a/api/core/tools/workflow_as_tool/tool.py
+++ b/api/core/tools/workflow_as_tool/tool.py
@@ -5,7 +5,6 @@ import logging
from collections.abc import Generator, Mapping, Sequence
from typing import Any, cast
-from flask import has_request_context
from sqlalchemy import select
from core.db.session_factory import session_factory
@@ -29,6 +28,21 @@ from models.workflow import Workflow
logger = logging.getLogger(__name__)
+def _try_resolve_user_from_request() -> Account | EndUser | None:
+ """
+ Try to resolve user from Flask request context.
+
+ Returns None if not in a request context or if user is not available.
+ """
+ # Note: `current_user` is a LocalProxy. Never compare it with None directly.
+ # Use _get_current_object() to dereference the proxy
+ user = getattr(current_user, "_get_current_object", lambda: current_user)()
+ # Check if we got a valid user object
+ if user is not None and hasattr(user, "id"):
+ return user
+ return None
+
+
class WorkflowTool(Tool):
"""
Workflow tool.
@@ -209,21 +223,13 @@ class WorkflowTool(Tool):
Returns:
Account | EndUser | None: The resolved user object, or None if resolution fails.
"""
- if has_request_context():
- return self._resolve_user_from_request()
- else:
- return self._resolve_user_from_database(user_id=user_id)
+ # Try to resolve user from request context first
+ user = _try_resolve_user_from_request()
+ if user is not None:
+ return user
- def _resolve_user_from_request(self) -> Account | EndUser | None:
- """
- Resolve user from Flask request context.
- """
- try:
- # Note: `current_user` is a LocalProxy. Never compare it with None directly.
- return getattr(current_user, "_get_current_object", lambda: current_user)()
- except Exception as e:
- logger.warning("Failed to resolve user from request context: %s", e)
- return None
+ # Fall back to database resolution
+ return self._resolve_user_from_database(user_id=user_id)
def _resolve_user_from_database(self, user_id: str) -> Account | EndUser | None:
"""
diff --git a/api/core/workflow/context/__init__.py b/api/core/workflow/context/__init__.py
new file mode 100644
index 0000000000..31e1f2c8d9
--- /dev/null
+++ b/api/core/workflow/context/__init__.py
@@ -0,0 +1,22 @@
+"""
+Execution Context - Context management for workflow execution.
+
+This package provides Flask-independent context management for workflow
+execution in multi-threaded environments.
+"""
+
+from core.workflow.context.execution_context import (
+ AppContext,
+ ExecutionContext,
+ IExecutionContext,
+ NullAppContext,
+ capture_current_context,
+)
+
+__all__ = [
+ "AppContext",
+ "ExecutionContext",
+ "IExecutionContext",
+ "NullAppContext",
+ "capture_current_context",
+]
diff --git a/api/core/workflow/context/execution_context.py b/api/core/workflow/context/execution_context.py
new file mode 100644
index 0000000000..5a4203be93
--- /dev/null
+++ b/api/core/workflow/context/execution_context.py
@@ -0,0 +1,216 @@
+"""
+Execution Context - Abstracted context management for workflow execution.
+"""
+
+import contextvars
+from abc import ABC, abstractmethod
+from collections.abc import Generator
+from contextlib import AbstractContextManager, contextmanager
+from typing import Any, Protocol, final, runtime_checkable
+
+
+class AppContext(ABC):
+ """
+ Abstract application context interface.
+
+ This abstraction allows workflow execution to work with or without Flask
+ by providing a common interface for application context management.
+ """
+
+ @abstractmethod
+ def get_config(self, key: str, default: Any = None) -> Any:
+ """Get configuration value by key."""
+ pass
+
+ @abstractmethod
+ def get_extension(self, name: str) -> Any:
+ """Get Flask extension by name (e.g., 'db', 'cache')."""
+ pass
+
+ @abstractmethod
+ def enter(self) -> AbstractContextManager[None]:
+ """Enter the application context."""
+ pass
+
+
+@runtime_checkable
+class IExecutionContext(Protocol):
+ """
+ Protocol for execution context.
+
+ This protocol defines the interface that all execution contexts must implement,
+ allowing both ExecutionContext and FlaskExecutionContext to be used interchangeably.
+ """
+
+ def __enter__(self) -> "IExecutionContext":
+ """Enter the execution context."""
+ ...
+
+ def __exit__(self, *args: Any) -> None:
+ """Exit the execution context."""
+ ...
+
+ @property
+ def user(self) -> Any:
+ """Get user object."""
+ ...
+
+
+@final
+class ExecutionContext:
+ """
+ Execution context for workflow execution in worker threads.
+
+ This class encapsulates all context needed for workflow execution:
+ - Application context (Flask app or standalone)
+ - Context variables for Python contextvars
+ - User information (optional)
+
+ It is designed to be serializable and passable to worker threads.
+ """
+
+ def __init__(
+ self,
+ app_context: AppContext | None = None,
+ context_vars: contextvars.Context | None = None,
+ user: Any = None,
+ ) -> None:
+ """
+ Initialize execution context.
+
+ Args:
+ app_context: Application context (Flask or standalone)
+ context_vars: Python contextvars to preserve
+ user: User object (optional)
+ """
+ self._app_context = app_context
+ self._context_vars = context_vars
+ self._user = user
+
+ @property
+ def app_context(self) -> AppContext | None:
+ """Get application context."""
+ return self._app_context
+
+ @property
+ def context_vars(self) -> contextvars.Context | None:
+ """Get context variables."""
+ return self._context_vars
+
+ @property
+ def user(self) -> Any:
+ """Get user object."""
+ return self._user
+
+ @contextmanager
+ def enter(self) -> Generator[None, None, None]:
+ """
+ Enter this execution context.
+
+ This is a convenience method that creates a context manager.
+ """
+ # Restore context variables if provided
+ if self._context_vars:
+ for var, val in self._context_vars.items():
+ var.set(val)
+
+ # Enter app context if available
+ if self._app_context is not None:
+ with self._app_context.enter():
+ yield
+ else:
+ yield
+
+ def __enter__(self) -> "ExecutionContext":
+ """Enter the execution context."""
+ self._cm = self.enter()
+ self._cm.__enter__()
+ return self
+
+ def __exit__(self, *args: Any) -> None:
+ """Exit the execution context."""
+ if hasattr(self, "_cm"):
+ self._cm.__exit__(*args)
+
+
+class NullAppContext(AppContext):
+ """
+ Null implementation of AppContext for non-Flask environments.
+
+ This is used when running without Flask (e.g., in tests or standalone mode).
+ """
+
+ def __init__(self, config: dict[str, Any] | None = None) -> None:
+ """
+ Initialize null app context.
+
+ Args:
+ config: Optional configuration dictionary
+ """
+ self._config = config or {}
+ self._extensions: dict[str, Any] = {}
+
+ def get_config(self, key: str, default: Any = None) -> Any:
+ """Get configuration value by key."""
+ return self._config.get(key, default)
+
+ def get_extension(self, name: str) -> Any:
+ """Get extension by name."""
+ return self._extensions.get(name)
+
+ def set_extension(self, name: str, extension: Any) -> None:
+ """Set extension by name."""
+ self._extensions[name] = extension
+
+ @contextmanager
+ def enter(self) -> Generator[None, None, None]:
+ """Enter null context (no-op)."""
+ yield
+
+
+class ExecutionContextBuilder:
+ """
+ Builder for creating ExecutionContext instances.
+
+ This provides a fluent API for building execution contexts.
+ """
+
+ def __init__(self) -> None:
+ self._app_context: AppContext | None = None
+ self._context_vars: contextvars.Context | None = None
+ self._user: Any = None
+
+ def with_app_context(self, app_context: AppContext) -> "ExecutionContextBuilder":
+ """Set application context."""
+ self._app_context = app_context
+ return self
+
+ def with_context_vars(self, context_vars: contextvars.Context) -> "ExecutionContextBuilder":
+ """Set context variables."""
+ self._context_vars = context_vars
+ return self
+
+ def with_user(self, user: Any) -> "ExecutionContextBuilder":
+ """Set user."""
+ self._user = user
+ return self
+
+ def build(self) -> ExecutionContext:
+ """Build the execution context."""
+ return ExecutionContext(
+ app_context=self._app_context,
+ context_vars=self._context_vars,
+ user=self._user,
+ )
+
+
+def capture_current_context() -> IExecutionContext:
+ """
+ Capture current execution context from the calling environment.
+
+ Returns:
+ IExecutionContext with captured context
+ """
+ from context import capture_current_context
+
+ return capture_current_context()
diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py
index 9a870d7bf5..dbb2727c98 100644
--- a/api/core/workflow/graph_engine/graph_engine.py
+++ b/api/core/workflow/graph_engine/graph_engine.py
@@ -7,15 +7,13 @@ Domain-Driven Design principles for improved maintainability and testability.
from __future__ import annotations
-import contextvars
import logging
import queue
import threading
from collections.abc import Generator
from typing import TYPE_CHECKING, cast, final
-from flask import Flask, current_app
-
+from core.workflow.context import capture_current_context
from core.workflow.enums import NodeExecutionType
from core.workflow.graph import Graph
from core.workflow.graph_events import (
@@ -159,17 +157,8 @@ class GraphEngine:
self._layers: list[GraphEngineLayer] = []
# === Worker Pool Setup ===
- # Capture Flask app context for worker threads
- flask_app: Flask | None = None
- try:
- app = current_app._get_current_object() # type: ignore
- if isinstance(app, Flask):
- flask_app = app
- except RuntimeError:
- pass
-
- # Capture context variables for worker threads
- context_vars = contextvars.copy_context()
+ # Capture execution context for worker threads
+ execution_context = capture_current_context()
# Create worker pool for parallel node execution
self._worker_pool = WorkerPool(
@@ -177,8 +166,7 @@ class GraphEngine:
event_queue=self._event_queue,
graph=self._graph,
layers=self._layers,
- flask_app=flask_app,
- context_vars=context_vars,
+ execution_context=execution_context,
min_workers=self._min_workers,
max_workers=self._max_workers,
scale_up_threshold=self._scale_up_threshold,
diff --git a/api/core/workflow/graph_engine/worker.py b/api/core/workflow/graph_engine/worker.py
index 83419830b6..95db5c5c92 100644
--- a/api/core/workflow/graph_engine/worker.py
+++ b/api/core/workflow/graph_engine/worker.py
@@ -5,26 +5,27 @@ Workers pull node IDs from the ready_queue, execute nodes, and push events
to the event_queue for the dispatcher to process.
"""
-import contextvars
import queue
import threading
import time
from collections.abc import Sequence
from datetime import datetime
-from typing import final
+from typing import TYPE_CHECKING, final
from uuid import uuid4
-from flask import Flask
from typing_extensions import override
+from core.workflow.context import IExecutionContext
from core.workflow.graph import Graph
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
from core.workflow.nodes.base.node import Node
-from libs.flask_utils import preserve_flask_contexts
from .ready_queue import ReadyQueue
+if TYPE_CHECKING:
+ pass
+
@final
class Worker(threading.Thread):
@@ -44,8 +45,7 @@ class Worker(threading.Thread):
layers: Sequence[GraphEngineLayer],
stop_event: threading.Event,
worker_id: int = 0,
- flask_app: Flask | None = None,
- context_vars: contextvars.Context | None = None,
+ execution_context: IExecutionContext | None = None,
) -> None:
"""
Initialize worker thread.
@@ -56,19 +56,17 @@ class Worker(threading.Thread):
graph: Graph containing nodes to execute
layers: Graph engine layers for node execution hooks
worker_id: Unique identifier for this worker
- flask_app: Optional Flask application for context preservation
- context_vars: Optional context variables to preserve in worker thread
+ execution_context: Optional execution context for context preservation
"""
super().__init__(name=f"GraphWorker-{worker_id}", daemon=True)
self._ready_queue = ready_queue
self._event_queue = event_queue
self._graph = graph
self._worker_id = worker_id
- self._flask_app = flask_app
- self._context_vars = context_vars
- self._last_task_time = time.time()
+ self._execution_context = execution_context
self._stop_event = stop_event
self._layers = layers if layers is not None else []
+ self._last_task_time = time.time()
def stop(self) -> None:
"""Worker is controlled via shared stop_event from GraphEngine.
@@ -135,11 +133,9 @@ class Worker(threading.Thread):
error: Exception | None = None
- if self._flask_app and self._context_vars:
- with preserve_flask_contexts(
- flask_app=self._flask_app,
- context_vars=self._context_vars,
- ):
+ # Execute the node with preserved context if execution context is provided
+ if self._execution_context is not None:
+ with self._execution_context:
self._invoke_node_run_start_hooks(node)
try:
node_events = node.run()
diff --git a/api/core/workflow/graph_engine/worker_management/worker_pool.py b/api/core/workflow/graph_engine/worker_management/worker_pool.py
index df76ebe882..9ce7d16e93 100644
--- a/api/core/workflow/graph_engine/worker_management/worker_pool.py
+++ b/api/core/workflow/graph_engine/worker_management/worker_pool.py
@@ -8,9 +8,10 @@ DynamicScaler, and WorkerFactory into a single class.
import logging
import queue
import threading
-from typing import TYPE_CHECKING, final
+from typing import final
from configs import dify_config
+from core.workflow.context import IExecutionContext
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase
@@ -20,11 +21,6 @@ from ..worker import Worker
logger = logging.getLogger(__name__)
-if TYPE_CHECKING:
- from contextvars import Context
-
- from flask import Flask
-
@final
class WorkerPool:
@@ -42,8 +38,7 @@ class WorkerPool:
graph: Graph,
layers: list[GraphEngineLayer],
stop_event: threading.Event,
- flask_app: "Flask | None" = None,
- context_vars: "Context | None" = None,
+ execution_context: IExecutionContext | None = None,
min_workers: int | None = None,
max_workers: int | None = None,
scale_up_threshold: int | None = None,
@@ -57,8 +52,7 @@ class WorkerPool:
event_queue: Queue for worker events
graph: The workflow graph
layers: Graph engine layers for node execution hooks
- flask_app: Optional Flask app for context preservation
- context_vars: Optional context variables
+ execution_context: Optional execution context for context preservation
min_workers: Minimum number of workers
max_workers: Maximum number of workers
scale_up_threshold: Queue depth to trigger scale up
@@ -67,8 +61,7 @@ class WorkerPool:
self._ready_queue = ready_queue
self._event_queue = event_queue
self._graph = graph
- self._flask_app = flask_app
- self._context_vars = context_vars
+ self._execution_context = execution_context
self._layers = layers
# Scaling parameters with defaults
@@ -152,8 +145,7 @@ class WorkerPool:
graph=self._graph,
layers=self._layers,
worker_id=worker_id,
- flask_app=self._flask_app,
- context_vars=self._context_vars,
+ execution_context=self._execution_context,
stop_event=self._stop_event,
)
diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py
index 91df2e4e0b..569a4196fb 100644
--- a/api/core/workflow/nodes/iteration/iteration_node.py
+++ b/api/core/workflow/nodes/iteration/iteration_node.py
@@ -1,11 +1,9 @@
-import contextvars
import logging
from collections.abc import Generator, Mapping, Sequence
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, NewType, cast
-from flask import Flask, current_app
from typing_extensions import TypeIs
from core.model_runtime.entities.llm_entities import LLMUsage
@@ -39,7 +37,6 @@ from core.workflow.nodes.base.node import Node
from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData
from core.workflow.runtime import VariablePool
from libs.datetime_utils import naive_utc_now
-from libs.flask_utils import preserve_flask_contexts
from .exc import (
InvalidIteratorValueError,
@@ -51,6 +48,7 @@ from .exc import (
)
if TYPE_CHECKING:
+ from core.workflow.context import IExecutionContext
from core.workflow.graph_engine import GraphEngine
logger = logging.getLogger(__name__)
@@ -252,8 +250,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
self._execute_single_iteration_parallel,
index=index,
item=item,
- flask_app=current_app._get_current_object(), # type: ignore
- context_vars=contextvars.copy_context(),
+ execution_context=self._capture_execution_context(),
)
future_to_index[future] = index
@@ -306,11 +303,10 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
self,
index: int,
item: object,
- flask_app: Flask,
- context_vars: contextvars.Context,
+ execution_context: "IExecutionContext",
) -> tuple[datetime, list[GraphNodeEventBase], object | None, dict[str, Variable], LLMUsage]:
"""Execute a single iteration in parallel mode and return results."""
- with preserve_flask_contexts(flask_app=flask_app, context_vars=context_vars):
+ with execution_context:
iter_start_at = datetime.now(UTC).replace(tzinfo=None)
events: list[GraphNodeEventBase] = []
outputs_temp: list[object] = []
@@ -339,6 +335,12 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
graph_engine.graph_runtime_state.llm_usage,
)
+ def _capture_execution_context(self) -> "IExecutionContext":
+ """Capture current execution context for parallel iterations."""
+ from core.workflow.context import capture_current_context
+
+ return capture_current_context()
+
def _handle_iteration_success(
self,
started_at: datetime,
diff --git a/api/libs/workspace_permission.py b/api/libs/workspace_permission.py
new file mode 100644
index 0000000000..dd42a7facf
--- /dev/null
+++ b/api/libs/workspace_permission.py
@@ -0,0 +1,74 @@
+"""
+Workspace permission helper functions.
+
+These helpers check both billing/plan level and workspace-specific policy level permissions.
+Checks are performed at two levels:
+1. Billing/plan level - via FeatureService (e.g., SANDBOX plan restrictions)
+2. Workspace policy level - via EnterpriseService (admin-configured per workspace)
+"""
+
+import logging
+
+from werkzeug.exceptions import Forbidden
+
+from configs import dify_config
+from services.enterprise.enterprise_service import EnterpriseService
+from services.feature_service import FeatureService
+
+logger = logging.getLogger(__name__)
+
+
+def check_workspace_member_invite_permission(workspace_id: str) -> None:
+ """
+ Check if workspace allows member invitations at both billing and policy levels.
+
+ Checks performed:
+ 1. Billing/plan level - For future expansion (currently no plan-level restriction)
+ 2. Enterprise policy level - Admin-configured workspace permission
+
+ Args:
+ workspace_id: The workspace ID to check permissions for
+
+ Raises:
+ Forbidden: If either billing plan or workspace policy prohibits member invitations
+ """
+ # Check enterprise workspace policy level (only if enterprise enabled)
+ if dify_config.ENTERPRISE_ENABLED:
+ try:
+ permission = EnterpriseService.WorkspacePermissionService.get_permission(workspace_id)
+ if not permission.allow_member_invite:
+ raise Forbidden("Workspace policy prohibits member invitations")
+ except Forbidden:
+ raise
+ except Exception:
+ logger.exception("Failed to check workspace invite permission for %s", workspace_id)
+
+
+def check_workspace_owner_transfer_permission(workspace_id: str) -> None:
+ """
+ Check if workspace allows owner transfer at both billing and policy levels.
+
+ Checks performed:
+ 1. Billing/plan level - SANDBOX plan blocks owner transfer
+ 2. Enterprise policy level - Admin-configured workspace permission
+
+ Args:
+ workspace_id: The workspace ID to check permissions for
+
+ Raises:
+ Forbidden: If either billing plan or workspace policy prohibits ownership transfer
+ """
+ features = FeatureService.get_features(workspace_id)
+ if not features.is_allow_transfer_workspace:
+ raise Forbidden("Your current plan does not allow workspace ownership transfer")
+
+ # Check enterprise workspace policy level (only if enterprise enabled)
+ if dify_config.ENTERPRISE_ENABLED:
+ try:
+ permission = EnterpriseService.WorkspacePermissionService.get_permission(workspace_id)
+ if not permission.allow_owner_transfer:
+ raise Forbidden("Workspace policy prohibits ownership transfer")
+ except Forbidden:
+ raise
+ except Exception:
+ logger.exception("Failed to check workspace transfer permission for %s", workspace_id)
diff --git a/api/migrations/versions/2026_01_16_1715-288345cd01d1_change_workflow_node_execution_run_index.py b/api/migrations/versions/2026_01_16_1715-288345cd01d1_change_workflow_node_execution_run_index.py
new file mode 100644
index 0000000000..2e1af0c83f
--- /dev/null
+++ b/api/migrations/versions/2026_01_16_1715-288345cd01d1_change_workflow_node_execution_run_index.py
@@ -0,0 +1,35 @@
+"""change workflow node execution workflow_run index
+
+Revision ID: 288345cd01d1
+Revises: 3334862ee907
+Create Date: 2026-01-16 17:15:00.000000
+
+"""
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = "288345cd01d1"
+down_revision = "3334862ee907"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ with op.batch_alter_table("workflow_node_executions", schema=None) as batch_op:
+ batch_op.drop_index("workflow_node_execution_workflow_run_idx")
+ batch_op.create_index(
+ "workflow_node_execution_workflow_run_id_idx",
+ ["workflow_run_id"],
+ unique=False,
+ )
+
+
+def downgrade():
+ with op.batch_alter_table("workflow_node_executions", schema=None) as batch_op:
+ batch_op.drop_index("workflow_node_execution_workflow_run_id_idx")
+ batch_op.create_index(
+ "workflow_node_execution_workflow_run_idx",
+ ["tenant_id", "app_id", "workflow_id", "triggered_from", "workflow_run_id"],
+ unique=False,
+ )
diff --git a/api/models/workflow.py b/api/models/workflow.py
index 9a687f22be..7915d923ec 100644
--- a/api/models/workflow.py
+++ b/api/models/workflow.py
@@ -820,11 +820,7 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
return (
PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
Index(
- "workflow_node_execution_workflow_run_idx",
- "tenant_id",
- "app_id",
- "workflow_id",
- "triggered_from",
+ "workflow_node_execution_workflow_run_id_idx",
"workflow_run_id",
),
Index(
diff --git a/api/repositories/api_workflow_node_execution_repository.py b/api/repositories/api_workflow_node_execution_repository.py
index fa2c94b623..479eb1ff54 100644
--- a/api/repositories/api_workflow_node_execution_repository.py
+++ b/api/repositories/api_workflow_node_execution_repository.py
@@ -13,6 +13,8 @@ from collections.abc import Sequence
from datetime import datetime
from typing import Protocol
+from sqlalchemy.orm import Session
+
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from models.workflow import WorkflowNodeExecutionModel
@@ -130,6 +132,18 @@ class DifyAPIWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository, Pr
"""
...
+ def count_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
+ """
+ Count node executions and offloads for the given workflow run ids.
+ """
+ ...
+
+ def delete_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
+ """
+ Delete node executions and offloads for the given workflow run ids.
+ """
+ ...
+
def delete_executions_by_app(
self,
tenant_id: str,
diff --git a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py
index 2de3a15d65..4a7c975d2c 100644
--- a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py
+++ b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py
@@ -7,17 +7,15 @@ using SQLAlchemy 2.0 style queries for WorkflowNodeExecutionModel operations.
from collections.abc import Sequence
from datetime import datetime
-from typing import TypedDict, cast
+from typing import cast
-from sqlalchemy import asc, delete, desc, func, select, tuple_
+from sqlalchemy import asc, delete, desc, func, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, sessionmaker
-from models.enums import WorkflowRunTriggeredFrom
from models.workflow import (
WorkflowNodeExecutionModel,
WorkflowNodeExecutionOffload,
- WorkflowNodeExecutionTriggeredFrom,
)
from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository
@@ -49,26 +47,6 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
"""
self._session_maker = session_maker
- @staticmethod
- def _map_run_triggered_from_to_node_triggered_from(triggered_from: str) -> str:
- """
- Map workflow run triggered_from values to workflow node execution triggered_from values.
- """
- if triggered_from in {
- WorkflowRunTriggeredFrom.APP_RUN.value,
- WorkflowRunTriggeredFrom.DEBUGGING.value,
- WorkflowRunTriggeredFrom.SCHEDULE.value,
- WorkflowRunTriggeredFrom.PLUGIN.value,
- WorkflowRunTriggeredFrom.WEBHOOK.value,
- }:
- return WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
- if triggered_from in {
- WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value,
- WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING.value,
- }:
- return WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN.value
- return ""
-
def get_node_last_execution(
self,
tenant_id: str,
@@ -316,51 +294,16 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
session.commit()
return result.rowcount
- class RunContext(TypedDict):
- run_id: str
- tenant_id: str
- app_id: str
- workflow_id: str
- triggered_from: str
-
- @staticmethod
- def delete_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]:
+ def delete_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
"""
- Delete node executions (and offloads) for the given workflow runs using indexed columns.
-
- Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id)
- by filtering on those columns with tuple IN.
+ Delete node executions (and offloads) for the given workflow runs using workflow_run_id.
"""
- if not runs:
+ if not run_ids:
return 0, 0
- tuple_values = [
- (
- run["tenant_id"],
- run["app_id"],
- run["workflow_id"],
- DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
- run["triggered_from"]
- ),
- run["run_id"],
- )
- for run in runs
- ]
-
- node_execution_ids = session.scalars(
- select(WorkflowNodeExecutionModel.id).where(
- tuple_(
- WorkflowNodeExecutionModel.tenant_id,
- WorkflowNodeExecutionModel.app_id,
- WorkflowNodeExecutionModel.workflow_id,
- WorkflowNodeExecutionModel.triggered_from,
- WorkflowNodeExecutionModel.workflow_run_id,
- ).in_(tuple_values)
- )
- ).all()
-
- if not node_execution_ids:
- return 0, 0
+ run_ids = list(run_ids)
+ run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
+ node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
offloads_deleted = (
cast(
@@ -377,55 +320,32 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
node_executions_deleted = (
cast(
CursorResult,
- session.execute(
- delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids))
- ),
+ session.execute(delete(WorkflowNodeExecutionModel).where(run_id_filter)),
).rowcount
or 0
)
return node_executions_deleted, offloads_deleted
- @staticmethod
- def count_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]:
+ def count_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
"""
- Count node executions (and offloads) for the given workflow runs using indexed columns.
+ Count node executions (and offloads) for the given workflow runs using workflow_run_id.
"""
- if not runs:
+ if not run_ids:
return 0, 0
- tuple_values = [
- (
- run["tenant_id"],
- run["app_id"],
- run["workflow_id"],
- DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
- run["triggered_from"]
- ),
- run["run_id"],
- )
- for run in runs
- ]
- tuple_filter = tuple_(
- WorkflowNodeExecutionModel.tenant_id,
- WorkflowNodeExecutionModel.app_id,
- WorkflowNodeExecutionModel.workflow_id,
- WorkflowNodeExecutionModel.triggered_from,
- WorkflowNodeExecutionModel.workflow_run_id,
- ).in_(tuple_values)
+ run_ids = list(run_ids)
+ run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
node_executions_count = (
- session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0
+ session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0
)
+ node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
offloads_count = (
session.scalar(
select(func.count())
.select_from(WorkflowNodeExecutionOffload)
- .join(
- WorkflowNodeExecutionModel,
- WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id,
- )
- .where(tuple_filter)
+ .where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids))
)
or 0
)
diff --git a/api/services/account_service.py b/api/services/account_service.py
index 709ef749bc..35e4a505af 100644
--- a/api/services/account_service.py
+++ b/api/services/account_service.py
@@ -1381,6 +1381,11 @@ class RegisterService:
normalized_email = email.lower()
"""Invite new member"""
+ # Check workspace permission for member invitations
+ from libs.workspace_permission import check_workspace_member_invite_permission
+
+ check_workspace_member_invite_permission(tenant.id)
+
with Session(db.engine) as session:
account = AccountService.get_account_by_email_with_case_fallback(email, session=session)
diff --git a/api/services/enterprise/enterprise_service.py b/api/services/enterprise/enterprise_service.py
index c0cc0e5233..a5133dfcb4 100644
--- a/api/services/enterprise/enterprise_service.py
+++ b/api/services/enterprise/enterprise_service.py
@@ -13,6 +13,23 @@ class WebAppSettings(BaseModel):
)
+class WorkspacePermission(BaseModel):
+ workspace_id: str = Field(
+ description="The ID of the workspace.",
+ alias="workspaceId",
+ )
+ allow_member_invite: bool = Field(
+ description="Whether to allow members to invite new members to the workspace.",
+ default=False,
+ alias="allowMemberInvite",
+ )
+ allow_owner_transfer: bool = Field(
+ description="Whether to allow owners to transfer ownership of the workspace.",
+ default=False,
+ alias="allowOwnerTransfer",
+ )
+
+
class EnterpriseService:
@classmethod
def get_info(cls):
@@ -44,6 +61,16 @@ class EnterpriseService:
except ValueError as e:
raise ValueError(f"Invalid date format: {data}") from e
+ class WorkspacePermissionService:
+ @classmethod
+ def get_permission(cls, workspace_id: str):
+ if not workspace_id:
+ raise ValueError("workspace_id must be provided.")
+ data = EnterpriseRequest.send_request("GET", f"/workspaces/{workspace_id}/permission")
+ if not data or "permission" not in data:
+ raise ValueError("No data found.")
+ return WorkspacePermission.model_validate(data["permission"])
+
class WebAppAuth:
@classmethod
def is_user_allowed_to_access_webapp(cls, user_id: str, app_id: str):
diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py
index 2213169510..c3e0dce399 100644
--- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py
+++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py
@@ -10,9 +10,7 @@ from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
-from repositories.sqlalchemy_api_workflow_node_execution_repository import (
- DifyAPISQLAlchemyWorkflowNodeExecutionRepository,
-)
+from repositories.factory import DifyAPIRepositoryFactory
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
from services.billing_service import BillingService, SubscriptionPlan
@@ -92,9 +90,12 @@ class WorkflowRunCleanup:
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
+ skipped_message = (
+ f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
+ )
click.echo(
click.style(
- f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)",
+ skipped_message,
fg="yellow",
)
)
@@ -255,21 +256,6 @@ class WorkflowRunCleanup:
trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session)
return trigger_repo.count_by_run_ids(run_ids)
- @staticmethod
- def _build_run_contexts(
- runs: Sequence[WorkflowRun],
- ) -> list[DifyAPISQLAlchemyWorkflowNodeExecutionRepository.RunContext]:
- return [
- {
- "run_id": run.id,
- "tenant_id": run.tenant_id,
- "app_id": run.app_id,
- "workflow_id": run.workflow_id,
- "triggered_from": run.triggered_from,
- }
- for run in runs
- ]
-
@staticmethod
def _empty_related_counts() -> dict[str, int]:
return {
@@ -293,9 +279,15 @@ class WorkflowRunCleanup:
)
def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
- run_contexts = self._build_run_contexts(runs)
- return DifyAPISQLAlchemyWorkflowNodeExecutionRepository.count_by_runs(session, run_contexts)
+ run_ids = [run.id for run in runs]
+ repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
+ session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False)
+ )
+ return repo.count_by_runs(session, run_ids)
def _delete_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
- run_contexts = self._build_run_contexts(runs)
- return DifyAPISQLAlchemyWorkflowNodeExecutionRepository.delete_by_runs(session, run_contexts)
+ run_ids = [run.id for run in runs]
+ repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
+ session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False)
+ )
+ return repo.delete_by_runs(session, run_ids)
diff --git a/api/templates/invite_member_mail_template_en-US.html b/api/templates/invite_member_mail_template_en-US.html
index a07c5f4b16..7b296519f0 100644
--- a/api/templates/invite_member_mail_template_en-US.html
+++ b/api/templates/invite_member_mail_template_en-US.html
@@ -83,7 +83,30 @@
Dear {{ to }},
{{ inviter_name }} is pleased to invite you to join our workspace on Dify, a platform specifically designed for LLM application development. On Dify, you can explore, create, and collaborate to build and operate AI applications.
Click the button below to log in to Dify and join the workspace.
- Login Here
+
Best regards,
Dify Team
diff --git a/api/templates/invite_member_mail_template_zh-CN.html b/api/templates/invite_member_mail_template_zh-CN.html
index 27709a3c6d..c05b3ddb67 100644
--- a/api/templates/invite_member_mail_template_zh-CN.html
+++ b/api/templates/invite_member_mail_template_zh-CN.html
@@ -83,7 +83,30 @@
尊敬的 {{ to }},
{{ inviter_name }} 现邀请您加入我们在 Dify 的工作区,这是一个专为 LLM 应用开发而设计的平台。在 Dify 上,您可以探索、创造和合作,构建和运营 AI 应用。
点击下方按钮即可登录 Dify 并且加入空间。
- 在此登录
+
此致,
Dify 团队
diff --git a/api/templates/register_email_when_account_exist_template_en-US.html b/api/templates/register_email_when_account_exist_template_en-US.html
index ac5042c274..e2bb99c989 100644
--- a/api/templates/register_email_when_account_exist_template_en-US.html
+++ b/api/templates/register_email_when_account_exist_template_en-US.html
@@ -115,7 +115,30 @@
We noticed you tried to sign up, but this email is already registered with an existing account.
Please log in here:
- Log In
+
If you forgot your password, you can reset it here: Reset Password
diff --git a/api/templates/register_email_when_account_exist_template_zh-CN.html b/api/templates/register_email_when_account_exist_template_zh-CN.html
index 326b58343a..6a5bbd135b 100644
--- a/api/templates/register_email_when_account_exist_template_zh-CN.html
+++ b/api/templates/register_email_when_account_exist_template_zh-CN.html
@@ -115,7 +115,30 @@
我们注意到您尝试注册,但此电子邮件已注册。
请在此登录:
- 登录
+
如果您忘记了密码,可以在此重置: 重置密码
diff --git a/api/templates/without-brand/invite_member_mail_template_en-US.html b/api/templates/without-brand/invite_member_mail_template_en-US.html
index f9157284fa..687ece617a 100644
--- a/api/templates/without-brand/invite_member_mail_template_en-US.html
+++ b/api/templates/without-brand/invite_member_mail_template_en-US.html
@@ -92,12 +92,34 @@
platform specifically designed for LLM application development. On {{application_title}}, you can explore,
create, and collaborate to build and operate AI applications.
Click the button below to log in to {{application_title}} and join the workspace.
- Login Here
+
Best regards,
{{application_title}} Team