Fix: release WorkflowTool database sessions promptly (#26893)

This commit is contained in:
-LAN- 2025-10-21 15:17:17 +08:00 committed by GitHub
parent fb6f05c267
commit 759a932bb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 123 additions and 65 deletions

View File

@ -434,6 +434,9 @@ CODE_EXECUTION_SSL_VERIFY=True
CODE_EXECUTION_POOL_MAX_CONNECTIONS=100 CODE_EXECUTION_POOL_MAX_CONNECTIONS=100
CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS=20 CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS=20
CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY=5.0 CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY=5.0
CODE_EXECUTION_CONNECT_TIMEOUT=10
CODE_EXECUTION_READ_TIMEOUT=60
CODE_EXECUTION_WRITE_TIMEOUT=10
CODE_MAX_NUMBER=9223372036854775807 CODE_MAX_NUMBER=9223372036854775807
CODE_MIN_NUMBER=-9223372036854775808 CODE_MIN_NUMBER=-9223372036854775808
CODE_MAX_STRING_LENGTH=400000 CODE_MAX_STRING_LENGTH=400000

View File

@ -326,7 +326,8 @@ class ToolManager:
workflow_provider_stmt = select(WorkflowToolProvider).where( workflow_provider_stmt = select(WorkflowToolProvider).where(
WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.id == provider_id WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.id == provider_id
) )
workflow_provider = db.session.scalar(workflow_provider_stmt) with Session(db.engine, expire_on_commit=False) as session, session.begin():
workflow_provider = session.scalar(workflow_provider_stmt)
if workflow_provider is None: if workflow_provider is None:
raise ToolProviderNotFoundError(f"workflow provider {provider_id} not found") raise ToolProviderNotFoundError(f"workflow provider {provider_id} not found")

View File

@ -1,6 +1,7 @@
from collections.abc import Mapping from collections.abc import Mapping
from pydantic import Field from pydantic import Field
from sqlalchemy.orm import Session
from core.app.app_config.entities import VariableEntity, VariableEntityType from core.app.app_config.entities import VariableEntity, VariableEntityType
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
@ -20,6 +21,7 @@ from core.tools.entities.tool_entities import (
from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils
from core.tools.workflow_as_tool.tool import WorkflowTool from core.tools.workflow_as_tool.tool import WorkflowTool
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account
from models.model import App, AppMode from models.model import App, AppMode
from models.tools import WorkflowToolProvider from models.tools import WorkflowToolProvider
from models.workflow import Workflow from models.workflow import Workflow
@ -44,29 +46,34 @@ class WorkflowToolProviderController(ToolProviderController):
@classmethod @classmethod
def from_db(cls, db_provider: WorkflowToolProvider) -> "WorkflowToolProviderController": def from_db(cls, db_provider: WorkflowToolProvider) -> "WorkflowToolProviderController":
app = db_provider.app with Session(db.engine, expire_on_commit=False) as session, session.begin():
provider = session.get(WorkflowToolProvider, db_provider.id) if db_provider.id else None
if not provider:
raise ValueError("workflow provider not found")
app = session.get(App, provider.app_id)
if not app: if not app:
raise ValueError("app not found") raise ValueError("app not found")
user = session.get(Account, provider.user_id) if provider.user_id else None
controller = WorkflowToolProviderController( controller = WorkflowToolProviderController(
entity=ToolProviderEntity( entity=ToolProviderEntity(
identity=ToolProviderIdentity( identity=ToolProviderIdentity(
author=db_provider.user.name if db_provider.user_id and db_provider.user else "", author=user.name if user else "",
name=db_provider.label, name=provider.label,
label=I18nObject(en_US=db_provider.label, zh_Hans=db_provider.label), label=I18nObject(en_US=provider.label, zh_Hans=provider.label),
description=I18nObject(en_US=db_provider.description, zh_Hans=db_provider.description), description=I18nObject(en_US=provider.description, zh_Hans=provider.description),
icon=db_provider.icon, icon=provider.icon,
), ),
credentials_schema=[], credentials_schema=[],
plugin_id=None, plugin_id=None,
), ),
provider_id=db_provider.id or "", provider_id=provider.id or "",
) )
# init tools controller.tools = [
controller._get_db_provider_tool(provider, app, session=session, user=user),
controller.tools = [controller._get_db_provider_tool(db_provider, app)] ]
return controller return controller
@ -74,7 +81,14 @@ class WorkflowToolProviderController(ToolProviderController):
def provider_type(self) -> ToolProviderType: def provider_type(self) -> ToolProviderType:
return ToolProviderType.WORKFLOW return ToolProviderType.WORKFLOW
def _get_db_provider_tool(self, db_provider: WorkflowToolProvider, app: App) -> WorkflowTool: def _get_db_provider_tool(
self,
db_provider: WorkflowToolProvider,
app: App,
*,
session: Session,
user: Account | None = None,
) -> WorkflowTool:
""" """
get db provider tool get db provider tool
:param db_provider: the db provider :param db_provider: the db provider
@ -82,7 +96,7 @@ class WorkflowToolProviderController(ToolProviderController):
:return: the tool :return: the tool
""" """
workflow: Workflow | None = ( workflow: Workflow | None = (
db.session.query(Workflow) session.query(Workflow)
.where(Workflow.app_id == db_provider.app_id, Workflow.version == db_provider.version) .where(Workflow.app_id == db_provider.app_id, Workflow.version == db_provider.version)
.first() .first()
) )
@ -101,8 +115,6 @@ class WorkflowToolProviderController(ToolProviderController):
def fetch_workflow_variable(variable_name: str) -> VariableEntity | None: def fetch_workflow_variable(variable_name: str) -> VariableEntity | None:
return next(filter(lambda x: x.variable == variable_name, variables), None) return next(filter(lambda x: x.variable == variable_name, variables), None)
user = db_provider.user
workflow_tool_parameters = [] workflow_tool_parameters = []
for parameter in parameters: for parameter in parameters:
variable = fetch_workflow_variable(parameter.name) variable = fetch_workflow_variable(parameter.name)
@ -187,8 +199,9 @@ class WorkflowToolProviderController(ToolProviderController):
if self.tools is not None: if self.tools is not None:
return self.tools return self.tools
db_providers: WorkflowToolProvider | None = ( with Session(db.engine, expire_on_commit=False) as session, session.begin():
db.session.query(WorkflowToolProvider) db_provider: WorkflowToolProvider | None = (
session.query(WorkflowToolProvider)
.where( .where(
WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.tenant_id == tenant_id,
WorkflowToolProvider.app_id == self.provider_id, WorkflowToolProvider.app_id == self.provider_id,
@ -196,13 +209,15 @@ class WorkflowToolProviderController(ToolProviderController):
.first() .first()
) )
if not db_providers: if not db_provider:
return [] return []
if not db_providers.app:
app = session.get(App, db_provider.app_id)
if not app:
raise ValueError("app not found") raise ValueError("app not found")
app = db_providers.app user = session.get(Account, db_provider.user_id) if db_provider.user_id else None
self.tools = [self._get_db_provider_tool(db_providers, app)] self.tools = [self._get_db_provider_tool(db_provider, app, session=session, user=user)]
return self.tools return self.tools

View File

@ -5,6 +5,7 @@ from typing import Any
from flask import has_request_context from flask import has_request_context
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session
from core.file import FILE_MODEL_IDENTITY, File, FileTransferMethod from core.file import FILE_MODEL_IDENTITY, File, FileTransferMethod
from core.tools.__base.tool import Tool from core.tools.__base.tool import Tool
@ -179,16 +180,17 @@ class WorkflowTool(Tool):
""" """
get the workflow by app id and version get the workflow by app id and version
""" """
with Session(db.engine, expire_on_commit=False) as session, session.begin():
if not version: if not version:
workflow = ( stmt = (
db.session.query(Workflow) select(Workflow)
.where(Workflow.app_id == app_id, Workflow.version != Workflow.VERSION_DRAFT) .where(Workflow.app_id == app_id, Workflow.version != Workflow.VERSION_DRAFT)
.order_by(Workflow.created_at.desc()) .order_by(Workflow.created_at.desc())
.first()
) )
workflow = session.scalars(stmt).first()
else: else:
stmt = select(Workflow).where(Workflow.app_id == app_id, Workflow.version == version) stmt = select(Workflow).where(Workflow.app_id == app_id, Workflow.version == version)
workflow = db.session.scalar(stmt) workflow = session.scalar(stmt)
if not workflow: if not workflow:
raise ValueError("workflow not found or not published") raise ValueError("workflow not found or not published")
@ -200,7 +202,8 @@ class WorkflowTool(Tool):
get the app by app id get the app by app id
""" """
stmt = select(App).where(App.id == app_id) stmt = select(App).where(App.id == app_id)
app = db.session.scalar(stmt) with Session(db.engine, expire_on_commit=False) as session, session.begin():
app = session.scalar(stmt)
if not app: if not app:
raise ValueError("app not found") raise ValueError("app not found")

View File

@ -222,7 +222,7 @@ class WorkflowToolProvider(TypeBase):
sa.UniqueConstraint("tenant_id", "app_id", name="unique_workflow_tool_provider_app_id"), sa.UniqueConstraint("tenant_id", "app_id", name="unique_workflow_tool_provider_app_id"),
) )
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), init=False) id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
# name of the workflow provider # name of the workflow provider
name: Mapped[str] = mapped_column(String(255), nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False)
# label of the workflow provider # label of the workflow provider

View File

@ -4,6 +4,7 @@ from datetime import datetime
from typing import Any from typing import Any
from sqlalchemy import or_, select from sqlalchemy import or_, select
from sqlalchemy.orm import Session
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.tools.__base.tool_provider import ToolProviderController from core.tools.__base.tool_provider import ToolProviderController
@ -13,6 +14,7 @@ from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurati
from core.tools.workflow_as_tool.provider import WorkflowToolProviderController from core.tools.workflow_as_tool.provider import WorkflowToolProviderController
from core.tools.workflow_as_tool.tool import WorkflowTool from core.tools.workflow_as_tool.tool import WorkflowTool
from extensions.ext_database import db from extensions.ext_database import db
from libs.uuid_utils import uuidv7
from models.model import App from models.model import App
from models.tools import WorkflowToolProvider from models.tools import WorkflowToolProvider
from models.workflow import Workflow from models.workflow import Workflow
@ -63,7 +65,9 @@ class WorkflowToolManageService:
if workflow is None: if workflow is None:
raise ValueError(f"Workflow not found for app {workflow_app_id}") raise ValueError(f"Workflow not found for app {workflow_app_id}")
with Session(db.engine, expire_on_commit=False) as session, session.begin():
workflow_tool_provider = WorkflowToolProvider( workflow_tool_provider = WorkflowToolProvider(
id=str(uuidv7()),
tenant_id=tenant_id, tenant_id=tenant_id,
user_id=user_id, user_id=user_id,
app_id=workflow_app_id, app_id=workflow_app_id,
@ -75,15 +79,13 @@ class WorkflowToolManageService:
privacy_policy=privacy_policy, privacy_policy=privacy_policy,
version=workflow.version, version=workflow.version,
) )
session.add(workflow_tool_provider)
try: try:
WorkflowToolProviderController.from_db(workflow_tool_provider) WorkflowToolProviderController.from_db(workflow_tool_provider)
except Exception as e: except Exception as e:
raise ValueError(str(e)) raise ValueError(str(e))
db.session.add(workflow_tool_provider)
db.session.commit()
if labels is not None: if labels is not None:
ToolLabelManager.update_tool_labels( ToolLabelManager.update_tool_labels(
ToolTransformService.workflow_provider_to_controller(workflow_tool_provider), labels ToolTransformService.workflow_provider_to_controller(workflow_tool_provider), labels
@ -168,7 +170,6 @@ class WorkflowToolManageService:
except Exception as e: except Exception as e:
raise ValueError(str(e)) raise ValueError(str(e))
db.session.add(workflow_tool_provider)
db.session.commit() db.session.commit()
if labels is not None: if labels is not None:

View File

@ -6,6 +6,7 @@ from faker import Faker
from core.tools.entities.api_entities import ToolProviderApiEntity from core.tools.entities.api_entities import ToolProviderApiEntity
from core.tools.entities.common_entities import I18nObject from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_entities import ToolProviderType from core.tools.entities.tool_entities import ToolProviderType
from libs.uuid_utils import uuidv7
from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider
from services.tools.tools_transform_service import ToolTransformService from services.tools.tools_transform_service import ToolTransformService
@ -66,6 +67,7 @@ class TestToolTransformService:
) )
elif provider_type == "workflow": elif provider_type == "workflow":
provider = WorkflowToolProvider( provider = WorkflowToolProvider(
id=str(uuidv7()),
name=fake.company(), name=fake.company(),
description=fake.text(max_nb_chars=100), description=fake.text(max_nb_chars=100),
icon='{"background": "#FF6B6B", "content": "🔧"}', icon='{"background": "#FF6B6B", "content": "🔧"}',
@ -758,6 +760,7 @@ class TestToolTransformService:
# Create workflow tool provider # Create workflow tool provider
provider = WorkflowToolProvider( provider = WorkflowToolProvider(
id=str(uuidv7()),
name=fake.company(), name=fake.company(),
description=fake.text(max_nb_chars=100), description=fake.text(max_nb_chars=100),
icon='{"background": "#FF6B6B", "content": "🔧"}', icon='{"background": "#FF6B6B", "content": "🔧"}',

View File

@ -259,6 +259,18 @@ POSTGRES_MAINTENANCE_WORK_MEM=64MB
# Reference: https://www.postgresql.org/docs/current/runtime-config-query.html#GUC-EFFECTIVE-CACHE-SIZE # Reference: https://www.postgresql.org/docs/current/runtime-config-query.html#GUC-EFFECTIVE-CACHE-SIZE
POSTGRES_EFFECTIVE_CACHE_SIZE=4096MB POSTGRES_EFFECTIVE_CACHE_SIZE=4096MB
# Sets the maximum allowed duration of any statement before termination.
# Default is 60000 milliseconds.
#
# Reference: https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STATEMENT-TIMEOUT
POSTGRES_STATEMENT_TIMEOUT=60000
# Sets the maximum allowed duration of any idle in-transaction session before termination.
# Default is 60000 milliseconds.
#
# Reference: https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-IDLE-IN-TRANSACTION-SESSION-TIMEOUT
POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT=60000
# ------------------------------ # ------------------------------
# Redis Configuration # Redis Configuration
# This Redis configuration is used for caching and for pub/sub during conversation. # This Redis configuration is used for caching and for pub/sub during conversation.

View File

@ -129,6 +129,8 @@ services:
-c 'work_mem=${POSTGRES_WORK_MEM:-4MB}' -c 'work_mem=${POSTGRES_WORK_MEM:-4MB}'
-c 'maintenance_work_mem=${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}' -c 'maintenance_work_mem=${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}'
-c 'effective_cache_size=${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}' -c 'effective_cache_size=${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}'
-c 'statement_timeout=${POSTGRES_STATEMENT_TIMEOUT:-60000}'
-c 'idle_in_transaction_session_timeout=${POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT:-60000}'
volumes: volumes:
- ./volumes/db/data:/var/lib/postgresql/data - ./volumes/db/data:/var/lib/postgresql/data
healthcheck: healthcheck:

View File

@ -15,6 +15,8 @@ services:
-c 'work_mem=${POSTGRES_WORK_MEM:-4MB}' -c 'work_mem=${POSTGRES_WORK_MEM:-4MB}'
-c 'maintenance_work_mem=${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}' -c 'maintenance_work_mem=${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}'
-c 'effective_cache_size=${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}' -c 'effective_cache_size=${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}'
-c 'statement_timeout=${POSTGRES_STATEMENT_TIMEOUT:-60000}'
-c 'idle_in_transaction_session_timeout=${POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT:-60000}'
volumes: volumes:
- ${PGDATA_HOST_VOLUME:-./volumes/db/data}:/var/lib/postgresql/data - ${PGDATA_HOST_VOLUME:-./volumes/db/data}:/var/lib/postgresql/data
ports: ports:

View File

@ -68,6 +68,8 @@ x-shared-env: &shared-api-worker-env
POSTGRES_WORK_MEM: ${POSTGRES_WORK_MEM:-4MB} POSTGRES_WORK_MEM: ${POSTGRES_WORK_MEM:-4MB}
POSTGRES_MAINTENANCE_WORK_MEM: ${POSTGRES_MAINTENANCE_WORK_MEM:-64MB} POSTGRES_MAINTENANCE_WORK_MEM: ${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}
POSTGRES_EFFECTIVE_CACHE_SIZE: ${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB} POSTGRES_EFFECTIVE_CACHE_SIZE: ${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}
POSTGRES_STATEMENT_TIMEOUT: ${POSTGRES_STATEMENT_TIMEOUT:-60000}
POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT: ${POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT:-60000}
REDIS_HOST: ${REDIS_HOST:-redis} REDIS_HOST: ${REDIS_HOST:-redis}
REDIS_PORT: ${REDIS_PORT:-6379} REDIS_PORT: ${REDIS_PORT:-6379}
REDIS_USERNAME: ${REDIS_USERNAME:-} REDIS_USERNAME: ${REDIS_USERNAME:-}
@ -736,6 +738,8 @@ services:
-c 'work_mem=${POSTGRES_WORK_MEM:-4MB}' -c 'work_mem=${POSTGRES_WORK_MEM:-4MB}'
-c 'maintenance_work_mem=${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}' -c 'maintenance_work_mem=${POSTGRES_MAINTENANCE_WORK_MEM:-64MB}'
-c 'effective_cache_size=${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}' -c 'effective_cache_size=${POSTGRES_EFFECTIVE_CACHE_SIZE:-4096MB}'
-c 'statement_timeout=${POSTGRES_STATEMENT_TIMEOUT:-60000}'
-c 'idle_in_transaction_session_timeout=${POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT:-60000}'
volumes: volumes:
- ./volumes/db/data:/var/lib/postgresql/data - ./volumes/db/data:/var/lib/postgresql/data
healthcheck: healthcheck:

View File

@ -40,6 +40,18 @@ POSTGRES_MAINTENANCE_WORK_MEM=64MB
# Reference: https://www.postgresql.org/docs/current/runtime-config-query.html#GUC-EFFECTIVE-CACHE-SIZE # Reference: https://www.postgresql.org/docs/current/runtime-config-query.html#GUC-EFFECTIVE-CACHE-SIZE
POSTGRES_EFFECTIVE_CACHE_SIZE=4096MB POSTGRES_EFFECTIVE_CACHE_SIZE=4096MB
# Sets the maximum allowed duration of any statement before termination.
# Default is 60000 milliseconds.
#
# Reference: https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STATEMENT-TIMEOUT
POSTGRES_STATEMENT_TIMEOUT=60000
# Sets the maximum allowed duration of any idle in-transaction session before termination.
# Default is 60000 milliseconds.
#
# Reference: https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-IDLE-IN-TRANSACTION-SESSION-TIMEOUT
POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT=60000
# ----------------------------- # -----------------------------
# Environment Variables for redis Service # Environment Variables for redis Service
# ----------------------------- # -----------------------------