From 47b1cd83c7a6ed08a05d0d51f8e9f18765dd24ae Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Wed, 17 Sep 2025 14:38:23 +0800 Subject: [PATCH] use DifyCoreRepositoryFactory --- .../app/apps/pipeline/pipeline_generator.py | 15 +++++++-------- .../rag_pipeline/rag_pipeline_dsl_service.py | 1 + .../rag_pipeline_transform_service.py | 1 + api/services/website_service.py | 2 ++ api/tasks/document_indexing_sync_task.py | 16 ++++++++++++++++ .../priority_rag_pipeline_run_task.py | 17 +++++++++-------- api/tasks/rag_pipeline/rag_pipeline_run_task.py | 17 +++++++++-------- 7 files changed, 45 insertions(+), 24 deletions(-) diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index 9be8ec82ce..2765b4600a 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -35,8 +35,7 @@ from core.datasource.online_drive.online_drive_plugin import OnlineDriveDatasour from core.entities.knowledge_entities import PipelineDataset, PipelineDocument from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.rag.index_processor.constant.built_in_field import BuiltInField -from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.repositories.factory import DifyCoreRepositoryFactory from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository @@ -207,14 +206,14 @@ class PipelineGenerator(BaseAppGenerator): workflow_triggered_from = WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN # Create workflow node execution repository session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository( session_factory=session_factory, user=user, app_id=application_generate_entity.app_config.app_id, triggered_from=workflow_triggered_from, ) - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( + workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository( session_factory=session_factory, user=user, app_id=application_generate_entity.app_config.app_id, @@ -434,14 +433,14 @@ class PipelineGenerator(BaseAppGenerator): # Create workflow node execution repository session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository( session_factory=session_factory, user=user, app_id=application_generate_entity.app_config.app_id, triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING, ) - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( + workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository( session_factory=session_factory, user=user, app_id=application_generate_entity.app_config.app_id, @@ -528,14 +527,14 @@ class PipelineGenerator(BaseAppGenerator): # Create workflow node execution repository session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository( session_factory=session_factory, user=user, app_id=application_generate_entity.app_config.app_id, triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING, ) - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( + workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository( session_factory=session_factory, user=user, app_id=application_generate_entity.app_config.app_id, diff --git a/api/services/rag_pipeline/rag_pipeline_dsl_service.py b/api/services/rag_pipeline/rag_pipeline_dsl_service.py index 09a4e74640..fe92f6b084 100644 --- a/api/services/rag_pipeline/rag_pipeline_dsl_service.py +++ b/api/services/rag_pipeline/rag_pipeline_dsl_service.py @@ -411,6 +411,7 @@ class RagPipelineDslService: data=data, account=account, ) + dataset = pipeline.retrieve_dataset(session=self._session) # create dataset name = pipeline.name diff --git a/api/services/rag_pipeline/rag_pipeline_transform_service.py b/api/services/rag_pipeline/rag_pipeline_transform_service.py index 43eeb49a35..6ddb7a70ae 100644 --- a/api/services/rag_pipeline/rag_pipeline_transform_service.py +++ b/api/services/rag_pipeline/rag_pipeline_transform_service.py @@ -89,6 +89,7 @@ class RagPipelineTransformService: } def _get_transform_yaml(self, doc_form: str, datasource_type: str, indexing_technique: Optional[str]): + pipeline_yaml = {} if doc_form == "text_model": match datasource_type: case "upload_file": diff --git a/api/services/website_service.py b/api/services/website_service.py index 35a6cc52d6..7634fdd8f3 100644 --- a/api/services/website_service.py +++ b/api/services/website_service.py @@ -123,6 +123,8 @@ class WebsiteService: plugin_id = "langgenius/watercrawl_datasource" elif provider == "jinareader": plugin_id = "langgenius/jina_datasource" + else: + raise ValueError("Invalid provider") datasource_provider_service = DatasourceProviderService() credential = datasource_provider_service.get_datasource_credentials( tenant_id=tenant_id, diff --git a/api/tasks/document_indexing_sync_task.py b/api/tasks/document_indexing_sync_task.py index 226d990edb..10da9a9af4 100644 --- a/api/tasks/document_indexing_sync_task.py +++ b/api/tasks/document_indexing_sync_task.py @@ -11,6 +11,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto from extensions.ext_database import db from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment +from models.source import DataSourceOauthBinding logger = logging.getLogger(__name__) @@ -47,6 +48,21 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): page_type = data_source_info["type"] page_edited_time = data_source_info["last_edited_time"] + data_source_binding = ( + db.session.query(DataSourceOauthBinding) + .where( + db.and_( + DataSourceOauthBinding.tenant_id == document.tenant_id, + DataSourceOauthBinding.provider == "notion", + DataSourceOauthBinding.disabled == False, + DataSourceOauthBinding.source_info["workspace_id"] == f'"{workspace_id}"', + ) + ) + .first() + ) + if not data_source_binding: + raise ValueError("Data source binding not found.") + loader = NotionExtractor( notion_workspace_id=workspace_id, notion_obj_id=page_id, diff --git a/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py b/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py index b810507387..8634530418 100644 --- a/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py +++ b/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py @@ -14,8 +14,7 @@ from sqlalchemy.orm import Session, sessionmaker from core.app.entities.app_invoke_entities import InvokeFrom, RagPipelineGenerateEntity from core.app.entities.rag_pipeline_invoke_entities import RagPipelineInvokeEntity -from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository -from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository +from core.repositories.factory import DifyCoreRepositoryFactory from extensions.ext_database import db from models.account import Account, Tenant from models.dataset import Pipeline @@ -130,18 +129,20 @@ def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], # Create workflow repositories session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository( session_factory=session_factory, user=account, app_id=entity.app_config.app_id, triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, ) - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, - user=account, - app_id=entity.app_config.app_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, + workflow_node_execution_repository = ( + DifyCoreRepositoryFactory.create_workflow_node_execution_repository( + session_factory=session_factory, + user=account, + app_id=entity.app_config.app_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, + ) ) # Set the user directly in g for preserve_flask_contexts diff --git a/api/tasks/rag_pipeline/rag_pipeline_run_task.py b/api/tasks/rag_pipeline/rag_pipeline_run_task.py index d71a305b14..cac37a565a 100644 --- a/api/tasks/rag_pipeline/rag_pipeline_run_task.py +++ b/api/tasks/rag_pipeline/rag_pipeline_run_task.py @@ -14,8 +14,7 @@ from sqlalchemy.orm import Session, sessionmaker from core.app.entities.app_invoke_entities import InvokeFrom, RagPipelineGenerateEntity from core.app.entities.rag_pipeline_invoke_entities import RagPipelineInvokeEntity -from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository -from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository +from core.repositories.factory import DifyCoreRepositoryFactory from extensions.ext_database import db from extensions.ext_redis import redis_client from models.account import Account, Tenant @@ -151,18 +150,20 @@ def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], # Create workflow repositories session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository( session_factory=session_factory, user=account, app_id=entity.app_config.app_id, triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, ) - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, - user=account, - app_id=entity.app_config.app_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, + workflow_node_execution_repository = ( + DifyCoreRepositoryFactory.create_workflow_node_execution_repository( + session_factory=session_factory, + user=account, + app_id=entity.app_config.app_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, + ) ) # Set the user directly in g for preserve_flask_contexts