mirror of
https://github.com/langgenius/dify.git
synced 2026-04-29 04:26:30 +08:00
r2 transform
This commit is contained in:
parent
7f6759e0ac
commit
f4dd22b9cb
@ -11,7 +11,8 @@ from typing import Any, Literal, Optional, Union, cast, overload
|
|||||||
|
|
||||||
from flask import Flask, current_app
|
from flask import Flask, current_app
|
||||||
from pydantic import ValidationError
|
from pydantic import ValidationError
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.orm import Session, sessionmaker
|
||||||
|
|
||||||
import contexts
|
import contexts
|
||||||
from configs import dify_config
|
from configs import dify_config
|
||||||
@ -38,6 +39,7 @@ from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchem
|
|||||||
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
|
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
|
||||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||||
|
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from libs.flask_utils import preserve_flask_contexts
|
from libs.flask_utils import preserve_flask_contexts
|
||||||
from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
|
from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
|
||||||
@ -46,6 +48,7 @@ from models.enums import WorkflowRunTriggeredFrom
|
|||||||
from models.model import AppMode
|
from models.model import AppMode
|
||||||
from services.dataset_service import DocumentService
|
from services.dataset_service import DocumentService
|
||||||
from services.datasource_provider_service import DatasourceProviderService
|
from services.datasource_provider_service import DatasourceProviderService
|
||||||
|
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -275,6 +278,7 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
workflow_execution_repository: WorkflowExecutionRepository,
|
workflow_execution_repository: WorkflowExecutionRepository,
|
||||||
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
|
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
|
||||||
streaming: bool = True,
|
streaming: bool = True,
|
||||||
|
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
|
||||||
workflow_thread_pool_id: Optional[str] = None,
|
workflow_thread_pool_id: Optional[str] = None,
|
||||||
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
|
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
|
||||||
"""
|
"""
|
||||||
@ -312,6 +316,7 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
"queue_manager": queue_manager,
|
"queue_manager": queue_manager,
|
||||||
"application_generate_entity": application_generate_entity,
|
"application_generate_entity": application_generate_entity,
|
||||||
"workflow_thread_pool_id": workflow_thread_pool_id,
|
"workflow_thread_pool_id": workflow_thread_pool_id,
|
||||||
|
"variable_loader": variable_loader,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -404,6 +409,13 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
app_id=application_generate_entity.app_config.app_id,
|
app_id=application_generate_entity.app_config.app_id,
|
||||||
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
|
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
|
||||||
)
|
)
|
||||||
|
draft_var_srv = WorkflowDraftVariableService(db.session())
|
||||||
|
draft_var_srv.prefill_conversation_variable_default_values(workflow)
|
||||||
|
var_loader = DraftVarLoader(
|
||||||
|
engine=db.engine,
|
||||||
|
app_id=application_generate_entity.app_config.app_id,
|
||||||
|
tenant_id=application_generate_entity.app_config.tenant_id,
|
||||||
|
)
|
||||||
|
|
||||||
return self._generate(
|
return self._generate(
|
||||||
flask_app=current_app._get_current_object(), # type: ignore
|
flask_app=current_app._get_current_object(), # type: ignore
|
||||||
@ -415,6 +427,7 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
workflow_execution_repository=workflow_execution_repository,
|
workflow_execution_repository=workflow_execution_repository,
|
||||||
workflow_node_execution_repository=workflow_node_execution_repository,
|
workflow_node_execution_repository=workflow_node_execution_repository,
|
||||||
streaming=streaming,
|
streaming=streaming,
|
||||||
|
variable_loader=var_loader,
|
||||||
)
|
)
|
||||||
|
|
||||||
def single_loop_generate(
|
def single_loop_generate(
|
||||||
@ -489,6 +502,13 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
app_id=application_generate_entity.app_config.app_id,
|
app_id=application_generate_entity.app_config.app_id,
|
||||||
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
|
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
|
||||||
)
|
)
|
||||||
|
draft_var_srv = WorkflowDraftVariableService(db.session())
|
||||||
|
draft_var_srv.prefill_conversation_variable_default_values(workflow)
|
||||||
|
var_loader = DraftVarLoader(
|
||||||
|
engine=db.engine,
|
||||||
|
app_id=application_generate_entity.app_config.app_id,
|
||||||
|
tenant_id=application_generate_entity.app_config.tenant_id,
|
||||||
|
)
|
||||||
|
|
||||||
return self._generate(
|
return self._generate(
|
||||||
flask_app=current_app._get_current_object(), # type: ignore
|
flask_app=current_app._get_current_object(), # type: ignore
|
||||||
@ -500,6 +520,7 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
workflow_execution_repository=workflow_execution_repository,
|
workflow_execution_repository=workflow_execution_repository,
|
||||||
workflow_node_execution_repository=workflow_node_execution_repository,
|
workflow_node_execution_repository=workflow_node_execution_repository,
|
||||||
streaming=streaming,
|
streaming=streaming,
|
||||||
|
variable_loader=var_loader,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _generate_worker(
|
def _generate_worker(
|
||||||
@ -508,6 +529,7 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
application_generate_entity: RagPipelineGenerateEntity,
|
application_generate_entity: RagPipelineGenerateEntity,
|
||||||
queue_manager: AppQueueManager,
|
queue_manager: AppQueueManager,
|
||||||
context: contextvars.Context,
|
context: contextvars.Context,
|
||||||
|
variable_loader: VariableLoader,
|
||||||
workflow_thread_pool_id: Optional[str] = None,
|
workflow_thread_pool_id: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
@ -521,14 +543,41 @@ class PipelineGenerator(BaseAppGenerator):
|
|||||||
|
|
||||||
with preserve_flask_contexts(flask_app, context_vars=context):
|
with preserve_flask_contexts(flask_app, context_vars=context):
|
||||||
try:
|
try:
|
||||||
# workflow app
|
with Session(db.engine, expire_on_commit=False) as session:
|
||||||
runner = PipelineRunner(
|
workflow = session.scalar(
|
||||||
application_generate_entity=application_generate_entity,
|
select(Workflow).where(
|
||||||
queue_manager=queue_manager,
|
Workflow.tenant_id == application_generate_entity.app_config.tenant_id,
|
||||||
workflow_thread_pool_id=workflow_thread_pool_id,
|
Workflow.app_id == application_generate_entity.app_config.app_id,
|
||||||
)
|
Workflow.id == application_generate_entity.app_config.workflow_id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if workflow is None:
|
||||||
|
raise ValueError("Workflow not found")
|
||||||
|
|
||||||
runner.run()
|
# Determine system_user_id based on invocation source
|
||||||
|
is_external_api_call = application_generate_entity.invoke_from in {
|
||||||
|
InvokeFrom.WEB_APP,
|
||||||
|
InvokeFrom.SERVICE_API,
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_external_api_call:
|
||||||
|
# For external API calls, use end user's session ID
|
||||||
|
end_user = session.scalar(select(EndUser).where(EndUser.id == application_generate_entity.user_id))
|
||||||
|
system_user_id = end_user.session_id if end_user else ""
|
||||||
|
else:
|
||||||
|
# For internal calls, use the original user ID
|
||||||
|
system_user_id = application_generate_entity.user_id
|
||||||
|
# workflow app
|
||||||
|
runner = PipelineRunner(
|
||||||
|
application_generate_entity=application_generate_entity,
|
||||||
|
queue_manager=queue_manager,
|
||||||
|
workflow_thread_pool_id=workflow_thread_pool_id,
|
||||||
|
variable_loader=variable_loader,
|
||||||
|
workflow=workflow,
|
||||||
|
system_user_id=system_user_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
runner.run()
|
||||||
except GenerateTaskStoppedError:
|
except GenerateTaskStoppedError:
|
||||||
pass
|
pass
|
||||||
except InvokeAuthorizationError:
|
except InvokeAuthorizationError:
|
||||||
|
|||||||
@ -17,6 +17,7 @@ from core.workflow.enums import SystemVariableKey
|
|||||||
from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent
|
from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent
|
||||||
from core.workflow.graph_engine.entities.graph import Graph
|
from core.workflow.graph_engine.entities.graph import Graph
|
||||||
from core.workflow.system_variable import SystemVariable
|
from core.workflow.system_variable import SystemVariable
|
||||||
|
from core.workflow.variable_loader import VariableLoader
|
||||||
from core.workflow.workflow_entry import WorkflowEntry
|
from core.workflow.workflow_entry import WorkflowEntry
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from models.dataset import Document, Pipeline
|
from models.dataset import Document, Pipeline
|
||||||
@ -36,6 +37,9 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
|||||||
self,
|
self,
|
||||||
application_generate_entity: RagPipelineGenerateEntity,
|
application_generate_entity: RagPipelineGenerateEntity,
|
||||||
queue_manager: AppQueueManager,
|
queue_manager: AppQueueManager,
|
||||||
|
variable_loader: VariableLoader,
|
||||||
|
workflow: Workflow,
|
||||||
|
system_user_id: str,
|
||||||
workflow_thread_pool_id: Optional[str] = None,
|
workflow_thread_pool_id: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
@ -43,9 +47,15 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
|||||||
:param queue_manager: application queue manager
|
:param queue_manager: application queue manager
|
||||||
:param workflow_thread_pool_id: workflow thread pool id
|
:param workflow_thread_pool_id: workflow thread pool id
|
||||||
"""
|
"""
|
||||||
|
super().__init__(
|
||||||
|
queue_manager=queue_manager,
|
||||||
|
variable_loader=variable_loader,
|
||||||
|
app_id=application_generate_entity.app_config.app_id,
|
||||||
|
)
|
||||||
self.application_generate_entity = application_generate_entity
|
self.application_generate_entity = application_generate_entity
|
||||||
self.queue_manager = queue_manager
|
|
||||||
self.workflow_thread_pool_id = workflow_thread_pool_id
|
self.workflow_thread_pool_id = workflow_thread_pool_id
|
||||||
|
self._workflow = workflow
|
||||||
|
self._sys_user_id = system_user_id
|
||||||
|
|
||||||
def _get_app_id(self) -> str:
|
def _get_app_id(self) -> str:
|
||||||
return self.application_generate_entity.app_config.app_id
|
return self.application_generate_entity.app_config.app_id
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user