diff --git a/.github/workflows/autofix.yml b/.github/workflows/autofix.yml index 2ce8a09a7d..81392a9734 100644 --- a/.github/workflows/autofix.yml +++ b/.github/workflows/autofix.yml @@ -28,6 +28,11 @@ jobs: # Format code uv run ruff format .. + - name: count migration progress + run: | + cd api + ./cnt_base.sh + - name: ast-grep run: | uvx --from ast-grep-cli sg --pattern 'db.session.query($WHATEVER).filter($HERE)' --rewrite 'db.session.query($WHATEVER).where($HERE)' -l py --update-all diff --git a/.github/workflows/vdb-tests.yml b/.github/workflows/vdb-tests.yml index f54f5d6c64..e33fbb209e 100644 --- a/.github/workflows/vdb-tests.yml +++ b/.github/workflows/vdb-tests.yml @@ -1,7 +1,10 @@ name: Run VDB Tests on: - workflow_call: + push: + branches: [main] + paths: + - 'api/core/rag/*.py' concurrency: group: vdb-tests-${{ github.head_ref || github.run_id }} diff --git a/api/.env.example b/api/.env.example index b1ac15d25b..5713095374 100644 --- a/api/.env.example +++ b/api/.env.example @@ -159,8 +159,7 @@ SUPABASE_URL=your-server-url # CORS configuration WEB_API_CORS_ALLOW_ORIGINS=http://localhost:3000,* CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,* -# Set COOKIE_DOMAIN when the console frontend and API are on different subdomains. -# Provide the registrable domain (e.g. example.com); leading dots are optional. +# When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the registrable domain they share (e.g., `example.com`). Leading dots are optional. COOKIE_DOMAIN= # Vector database configuration diff --git a/api/README.md b/api/README.md index 45dad07af0..7809ea8a3d 100644 --- a/api/README.md +++ b/api/README.md @@ -26,6 +26,10 @@ cp .env.example .env ``` +> [!IMPORTANT] > > When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the registrable domain they share (e.g., `example.com`). The frontend and backend must be under the same registrable domain to share authentication cookies. + 1. Generate a `SECRET_KEY` in the `.env` file. bash for Linux diff --git a/api/cnt_base.sh b/api/cnt_base.sh new file mode 100755 index 0000000000..9e407f3584 --- /dev/null +++ b/api/cnt_base.sh @@ -0,0 +1,7 @@ +#!/bin/bash +set -euxo pipefail + +for pattern in "Base" "TypeBase"; do + printf "%s " "$pattern" + grep "($pattern):" -r --include='*.py' --exclude-dir=".venv" --exclude-dir="tests" . 
| wc -l +done diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 7071a1f33a..98e1a20044 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -192,7 +192,6 @@ class GraphEngine: self._dispatcher = Dispatcher( event_queue=self._event_queue, event_handler=self._event_handler_registry, - event_collector=self._event_manager, execution_coordinator=self._execution_coordinator, event_emitter=self._event_manager, ) diff --git a/api/core/workflow/graph_engine/orchestration/dispatcher.py b/api/core/workflow/graph_engine/orchestration/dispatcher.py index 4097cead9c..334a3f77bf 100644 --- a/api/core/workflow/graph_engine/orchestration/dispatcher.py +++ b/api/core/workflow/graph_engine/orchestration/dispatcher.py @@ -43,7 +43,6 @@ class Dispatcher: self, event_queue: queue.Queue[GraphNodeEventBase], event_handler: "EventHandler", - event_collector: EventManager, execution_coordinator: ExecutionCoordinator, event_emitter: EventManager | None = None, ) -> None: @@ -53,13 +52,11 @@ class Dispatcher: Args: event_queue: Queue of events from workers event_handler: Event handler registry for processing events - event_collector: Event manager for collecting unhandled events execution_coordinator: Coordinator for execution flow event_emitter: Optional event manager to signal completion """ self._event_queue = event_queue self._event_handler = event_handler - self._event_collector = event_collector self._execution_coordinator = execution_coordinator self._event_emitter = event_emitter @@ -86,37 +83,33 @@ class Dispatcher: def _dispatcher_loop(self) -> None: """Main dispatcher loop.""" try: + self._process_commands() while not self._stop_event.is_set(): - commands_checked = False - should_check_commands = False - should_break = False + if ( + self._execution_coordinator.aborted + or self._execution_coordinator.paused + or self._execution_coordinator.execution_complete + ): + break - if self._execution_coordinator.is_execution_complete(): - should_check_commands = True - should_break = True - else: - # Check for scaling - self._execution_coordinator.check_scaling() + self._execution_coordinator.check_scaling() + try: + event = self._event_queue.get(timeout=0.1) + self._event_handler.dispatch(event) + self._event_queue.task_done() + self._process_commands(event) + except queue.Empty: + # Keep polling commands while idle so abort/pause requests are not missed + self._process_commands() + time.sleep(0.1) - # Process events - try: - event = self._event_queue.get(timeout=0.1) - # Route to the event handler - self._event_handler.dispatch(event) - should_check_commands = self._should_check_commands(event) - self._event_queue.task_done() - except queue.Empty: - # Process commands even when no new events arrive so abort requests are not missed - should_check_commands = True - time.sleep(0.1) - - if should_check_commands and not commands_checked: - self._execution_coordinator.check_commands() - commands_checked = True - - if should_break: - if not commands_checked: - self._execution_coordinator.check_commands() + self._process_commands() + while True: + try: + event = self._event_queue.get(block=False) + self._event_handler.dispatch(event) + self._event_queue.task_done() + except queue.Empty: break except Exception as e: @@ -129,6 +122,6 @@ class Dispatcher: if self._event_emitter: self._event_emitter.mark_complete() - def _should_check_commands(self, event: GraphNodeEventBase) -> bool: - """Return True if the event represents a node completion.""" - return isinstance(event, 
self._COMMAND_TRIGGER_EVENTS) + def _process_commands(self, event: GraphNodeEventBase | None = None): + if event is None or isinstance(event, self._COMMAND_TRIGGER_EVENTS): + self._execution_coordinator.process_commands() diff --git a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py index a3162de244..e8e8f9f16c 100644 --- a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py +++ b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py @@ -40,7 +40,7 @@ class ExecutionCoordinator: self._command_processor = command_processor self._worker_pool = worker_pool - def check_commands(self) -> None: + def process_commands(self) -> None: """Process any pending commands.""" self._command_processor.process_commands() @@ -48,24 +48,16 @@ class ExecutionCoordinator: """Check and perform worker scaling if needed.""" self._worker_pool.check_and_scale() - def is_execution_complete(self) -> bool: - """ - Check if execution is complete. - - Returns: - True if execution is complete - """ - # Treat paused, aborted, or failed executions as terminal states - if self._graph_execution.is_paused: - return True - - if self._graph_execution.aborted or self._graph_execution.has_error: - return True - + @property + def execution_complete(self): return self._state_manager.is_execution_complete() @property - def is_paused(self) -> bool: + def aborted(self): + return self._graph_execution.aborted or self._graph_execution.has_error + + @property + def paused(self) -> bool: """Expose whether the underlying graph execution is paused.""" return self._graph_execution.is_paused diff --git a/api/models/dataset.py b/api/models/dataset.py index 33d396aeb9..4470d11355 100644 --- a/api/models/dataset.py +++ b/api/models/dataset.py @@ -225,7 +225,7 @@ class Dataset(Base): ExternalKnowledgeApis.id == external_knowledge_binding.external_knowledge_api_id ) ) - if not external_knowledge_api: + if external_knowledge_api is None or external_knowledge_api.settings is None: return None return { "external_knowledge_id": external_knowledge_binding.external_knowledge_id, @@ -945,18 +945,20 @@ class DatasetQuery(Base): created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=sa.func.current_timestamp()) -class DatasetKeywordTable(Base): +class DatasetKeywordTable(TypeBase): __tablename__ = "dataset_keyword_tables" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="dataset_keyword_table_pkey"), sa.Index("dataset_keyword_table_dataset_id_idx", "dataset_id"), ) - id = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()")) - dataset_id = mapped_column(StringUUID, nullable=False, unique=True) - keyword_table = mapped_column(sa.Text, nullable=False) - data_source_type = mapped_column( - String(255), nullable=False, server_default=sa.text("'database'::character varying") + id: Mapped[str] = mapped_column( + StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False + ) + dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False, unique=True) + keyword_table: Mapped[str] = mapped_column(sa.Text, nullable=False) + data_source_type: Mapped[str] = mapped_column( + String(255), nullable=False, server_default=sa.text("'database'::character varying"), default="database" ) @property @@ -1054,19 +1056,23 @@ class TidbAuthBinding(Base): created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, 
server_default=func.current_timestamp()) -class Whitelist(Base): +class Whitelist(TypeBase): __tablename__ = "whitelists" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="whitelists_pkey"), sa.Index("whitelists_tenant_idx", "tenant_id"), ) - id = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()")) - tenant_id = mapped_column(StringUUID, nullable=True) + id: Mapped[str] = mapped_column( + StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False + ) + tenant_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) category: Mapped[str] = mapped_column(String(255), nullable=False) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + created_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), init=False + ) -class DatasetPermission(Base): +class DatasetPermission(TypeBase): __tablename__ = "dataset_permissions" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="dataset_permission_pkey"), @@ -1075,15 +1081,21 @@ class DatasetPermission(Base): sa.Index("idx_dataset_permissions_tenant_id", "tenant_id"), ) - id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), primary_key=True) - dataset_id = mapped_column(StringUUID, nullable=False) - account_id = mapped_column(StringUUID, nullable=False) - tenant_id = mapped_column(StringUUID, nullable=False) - has_permission: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true")) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + id: Mapped[str] = mapped_column( + StringUUID, server_default=sa.text("uuid_generate_v4()"), primary_key=True, init=False + ) + dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + account_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + has_permission: Mapped[bool] = mapped_column( + sa.Boolean, nullable=False, server_default=sa.text("true"), default=True + ) + created_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), init=False + ) -class ExternalKnowledgeApis(Base): +class ExternalKnowledgeApis(TypeBase): __tablename__ = "external_knowledge_apis" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="external_knowledge_apis_pkey"), @@ -1091,16 +1103,20 @@ class ExternalKnowledgeApis(Base): sa.Index("external_knowledge_apis_name_idx", "name"), ) - id = mapped_column(StringUUID, nullable=False, server_default=sa.text("uuid_generate_v4()")) + id: Mapped[str] = mapped_column( + StringUUID, nullable=False, server_default=sa.text("uuid_generate_v4()"), init=False + ) name: Mapped[str] = mapped_column(String(255), nullable=False) description: Mapped[str] = mapped_column(String(255), nullable=False) - tenant_id = mapped_column(StringUUID, nullable=False) - settings = mapped_column(sa.Text, nullable=True) - created_by = mapped_column(StringUUID, nullable=False) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_by = mapped_column(StringUUID, nullable=True) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + settings: Mapped[str | None] = mapped_column(sa.Text, nullable=True) + created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) + created_at: Mapped[datetime] = 
mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), init=False + ) + updated_by: Mapped[str | None] = mapped_column(StringUUID, nullable=True) updated_at: Mapped[datetime] = mapped_column( - DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False ) def to_dict(self) -> dict[str, Any]: @@ -1178,7 +1194,7 @@ class DatasetAutoDisableLog(Base): ) -class RateLimitLog(Base): +class RateLimitLog(TypeBase): __tablename__ = "rate_limit_logs" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="rate_limit_log_pkey"), @@ -1186,12 +1202,12 @@ class RateLimitLog(Base): sa.Index("rate_limit_log_operation_idx", "operation"), ) - id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) - tenant_id = mapped_column(StringUUID, nullable=False) + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), init=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) subscription_plan: Mapped[str] = mapped_column(String(255), nullable=False) operation: Mapped[str] = mapped_column(String(255), nullable=False) created_at: Mapped[datetime] = mapped_column( - DateTime, nullable=False, server_default=sa.text("CURRENT_TIMESTAMP(0)") + DateTime, nullable=False, server_default=sa.text("CURRENT_TIMESTAMP(0)"), init=False ) diff --git a/api/models/trigger.py b/api/models/trigger.py index c2b66ace46..b537f0cf3f 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -14,7 +14,7 @@ from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEnt from core.trigger.entities.entities import Subscription from core.trigger.utils.endpoint import generate_plugin_trigger_endpoint_url, generate_webhook_trigger_endpoint from libs.datetime_utils import naive_utc_now -from models.base import Base +from models.base import Base, TypeBase from models.engine import db from models.enums import AppTriggerStatus, AppTriggerType, CreatorUserRole, WorkflowTriggerStatus from models.model import Account @@ -399,7 +399,7 @@ class AppTrigger(Base): ) -class WorkflowSchedulePlan(Base): +class WorkflowSchedulePlan(TypeBase): """ Workflow Schedule Configuration @@ -425,7 +425,7 @@ class WorkflowSchedulePlan(Base): sa.Index("workflow_schedule_plan_next_idx", "next_run_at"), ) - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) + id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuidv7()"), init=False) app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) node_id: Mapped[str] = mapped_column(String(64), nullable=False) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) @@ -436,9 +436,11 @@ class WorkflowSchedulePlan(Base): # Schedule control next_run_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + created_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), init=False + ) updated_at: Mapped[datetime] = mapped_column( - DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False ) def to_dict(self) -> dict[str, Any]: diff --git 
a/api/services/external_knowledge_service.py b/api/services/external_knowledge_service.py index 5cd3b471f9..0c2a80d067 100644 --- a/api/services/external_knowledge_service.py +++ b/api/services/external_knowledge_service.py @@ -62,7 +62,7 @@ class ExternalDatasetService: tenant_id=tenant_id, created_by=user_id, updated_by=user_id, - name=args.get("name"), + name=str(args.get("name")), description=args.get("description", ""), settings=json.dumps(args.get("settings"), ensure_ascii=False), ) @@ -163,7 +163,7 @@ class ExternalDatasetService: external_knowledge_api = ( db.session.query(ExternalKnowledgeApis).filter_by(id=external_knowledge_api_id, tenant_id=tenant_id).first() ) - if external_knowledge_api is None: + if external_knowledge_api is None or external_knowledge_api.settings is None: raise ValueError("api template not found") settings = json.loads(external_knowledge_api.settings) for setting in settings: @@ -290,7 +290,7 @@ class ExternalDatasetService: .filter_by(id=external_knowledge_binding.external_knowledge_api_id) .first() ) - if not external_knowledge_api: + if external_knowledge_api is None or external_knowledge_api.settings is None: raise ValueError("external api template not found") settings = json.loads(external_knowledge_api.settings) diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index a9907ac981..de099c3e96 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -13,13 +13,13 @@ from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker from configs import dify_config -from core.app.apps.workflow.app_generator import WorkflowAppGenerator +from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY, WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.layers.timeslice_layer import TimeSliceLayer from core.app.layers.trigger_post_layer import TriggerPostLayer from extensions.ext_database import db from models.account import Account -from models.enums import CreatorUserRole, WorkflowTriggerStatus +from models.enums import AppTriggerType, CreatorUserRole, WorkflowTriggerStatus from models.model import App, EndUser, Tenant from models.trigger import WorkflowTriggerLog from models.workflow import Workflow @@ -81,6 +81,19 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]): ) +def _build_generator_args(trigger_data: TriggerData) -> dict[str, Any]: + """Build args passed into WorkflowAppGenerator.generate for Celery executions.""" + args: dict[str, Any] = { + "inputs": dict(trigger_data.inputs), + "files": list(trigger_data.files), + } + + if trigger_data.trigger_type == AppTriggerType.TRIGGER_WEBHOOK: + args[SKIP_PREPARE_USER_INPUTS_KEY] = True # Webhooks already provide structured inputs + + return args + + def _execute_workflow_common( task_data: WorkflowTaskData, cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler, @@ -128,7 +141,7 @@ def _execute_workflow_common( generator = WorkflowAppGenerator() # Prepare args matching AppGenerateService.generate format - args: dict[str, Any] = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)} + args = _build_generator_args(trigger_data) # If workflow_id was specified, add it to args if trigger_data.workflow_id: diff --git a/api/tasks/batch_clean_document_task.py b/api/tasks/batch_clean_document_task.py index 447443703a..3e1bd16cc7 100644 --- a/api/tasks/batch_clean_document_task.py +++ b/api/tasks/batch_clean_document_task.py @@ -9,7 +9,7 @@ from 
core.rag.index_processor.index_processor_factory import IndexProcessorFacto from core.tools.utils.web_reader_tool import get_image_upload_file_ids from extensions.ext_database import db from extensions.ext_storage import storage -from models.dataset import Dataset, DocumentSegment +from models.dataset import Dataset, DatasetMetadataBinding, DocumentSegment from models.model import UploadFile logger = logging.getLogger(__name__) @@ -37,6 +37,11 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form if not dataset: raise Exception("Document has no dataset") + db.session.query(DatasetMetadataBinding).where( + DatasetMetadataBinding.dataset_id == dataset_id, + DatasetMetadataBinding.document_id.in_(document_ids), + ).delete(synchronize_session=False) + segments = db.session.scalars( select(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids)) ).all() @@ -71,7 +76,8 @@ except Exception: logger.exception("Delete file failed when document deleted, file_id: %s", file.id) db.session.delete(file) - db.session.commit() + + db.session.commit() end_at = time.perf_counter() logger.info( diff --git a/api/tests/unit_tests/core/workflow/graph_engine/orchestration/test_dispatcher.py b/api/tests/unit_tests/core/workflow/graph_engine/orchestration/test_dispatcher.py new file mode 100644 index 0000000000..e6d4508fdf --- /dev/null +++ b/api/tests/unit_tests/core/workflow/graph_engine/orchestration/test_dispatcher.py @@ -0,0 +1,192 @@ +"""Tests for dispatcher command checking behavior.""" + +from __future__ import annotations + +import queue +from datetime import datetime +from unittest import mock + +from core.workflow.entities.pause_reason import SchedulingPause +from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus +from core.workflow.graph_engine.event_management.event_handlers import EventHandler +from core.workflow.graph_engine.orchestration.dispatcher import Dispatcher +from core.workflow.graph_engine.orchestration.execution_coordinator import ExecutionCoordinator +from core.workflow.graph_events import ( + GraphNodeEventBase, + NodeRunPauseRequestedEvent, + NodeRunStartedEvent, + NodeRunSucceededEvent, +) +from core.workflow.node_events import NodeRunResult + + +def test_dispatcher_should_consume_remaining_events_after_pause(): + event_queue = queue.Queue() + event_queue.put( + GraphNodeEventBase( + id="test", + node_id="test", + node_type=NodeType.START, + ) + ) + event_handler = mock.Mock(spec=EventHandler) + execution_coordinator = mock.Mock(spec=ExecutionCoordinator) + # Simulate a paused run so the loop exits and drains the remaining events + execution_coordinator.aborted = False + execution_coordinator.execution_complete = False + execution_coordinator.paused = True + dispatcher = Dispatcher( + event_queue=event_queue, + event_handler=event_handler, + execution_coordinator=execution_coordinator, + ) + dispatcher._dispatcher_loop() + assert event_queue.empty() + + +class _StubExecutionCoordinator: + """Stub execution coordinator that tracks command checks.""" + + def __init__(self) -> None: + self.command_checks = 0 + self.scaling_checks = 0 + self.execution_complete = False + self.failed = False + self._paused = False + + def process_commands(self) -> None: + self.command_checks += 1 + + def check_scaling(self) -> None: + self.scaling_checks += 1 + + @property + def paused(self) -> bool: + return self._paused + + @property + def aborted(self) -> bool: + return False + + def mark_complete(self) -> None: + self.execution_complete = True + + def mark_failed(self, error: Exception) -> None: # pragma: no cover - 
defensive, not triggered in tests + self.failed = True + + +class _StubEventHandler: + """Minimal event handler that marks execution complete after handling an event.""" + + def __init__(self, coordinator: _StubExecutionCoordinator) -> None: + self._coordinator = coordinator + self.events = [] + + def dispatch(self, event) -> None: + self.events.append(event) + self._coordinator.mark_complete() + + +def _run_dispatcher_for_event(event) -> int: + """Run the dispatcher loop for a single event and return command check count.""" + event_queue: queue.Queue = queue.Queue() + event_queue.put(event) + + coordinator = _StubExecutionCoordinator() + event_handler = _StubEventHandler(coordinator) + + dispatcher = Dispatcher( + event_queue=event_queue, + event_handler=event_handler, + execution_coordinator=coordinator, + ) + + dispatcher._dispatcher_loop() + + return coordinator.command_checks + + +def _make_started_event() -> NodeRunStartedEvent: + return NodeRunStartedEvent( + id="start-event", + node_id="node-1", + node_type=NodeType.CODE, + node_title="Test Node", + start_at=datetime.utcnow(), + ) + + +def _make_succeeded_event() -> NodeRunSucceededEvent: + return NodeRunSucceededEvent( + id="success-event", + node_id="node-1", + node_type=NodeType.CODE, + node_title="Test Node", + start_at=datetime.utcnow(), + node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED), + ) + + +def test_dispatcher_checks_commands_during_idle_and_on_completion() -> None: + """Dispatcher polls commands when idle and after completion events.""" + started_checks = _run_dispatcher_for_event(_make_started_event()) + succeeded_checks = _run_dispatcher_for_event(_make_succeeded_event()) + + assert started_checks == 2 + assert succeeded_checks == 3 + + +class _PauseStubEventHandler: + """Minimal event handler that marks execution complete after handling an event.""" + + def __init__(self, coordinator: _StubExecutionCoordinator) -> None: + self._coordinator = coordinator + self.events = [] + + def dispatch(self, event) -> None: + self.events.append(event) + if isinstance(event, NodeRunPauseRequestedEvent): + self._coordinator.mark_complete() + + +def test_dispatcher_drain_event_queue(): + events = [ + NodeRunStartedEvent( + id="start-event", + node_id="node-1", + node_type=NodeType.CODE, + node_title="Code", + start_at=datetime.utcnow(), + ), + NodeRunPauseRequestedEvent( + id="pause-event", + node_id="node-1", + node_type=NodeType.CODE, + reason=SchedulingPause(message="test pause"), + ), + NodeRunSucceededEvent( + id="success-event", + node_id="node-1", + node_type=NodeType.CODE, + start_at=datetime.utcnow(), + node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED), + ), + ] + + event_queue: queue.Queue = queue.Queue() + for e in events: + event_queue.put(e) + + coordinator = _StubExecutionCoordinator() + event_handler = _PauseStubEventHandler(coordinator) + + dispatcher = Dispatcher( + event_queue=event_queue, + event_handler=event_handler, + execution_coordinator=coordinator, + ) + + dispatcher._dispatcher_loop() + + # ensure all events are drained. 
+ assert event_queue.empty() diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py b/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py index b29baf5a9f..868edf9832 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py @@ -3,13 +3,17 @@ import time from unittest.mock import MagicMock +from core.app.entities.app_invoke_entities import InvokeFrom +from core.workflow.entities.graph_init_params import GraphInitParams from core.workflow.entities.pause_reason import SchedulingPause from core.workflow.graph import Graph from core.workflow.graph_engine import GraphEngine from core.workflow.graph_engine.command_channels import InMemoryChannel from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType, PauseCommand from core.workflow.graph_events import GraphRunAbortedEvent, GraphRunPausedEvent, GraphRunStartedEvent +from core.workflow.nodes.start.start_node import StartNode from core.workflow.runtime import GraphRuntimeState, VariablePool +from models.enums import UserFrom def test_abort_command(): @@ -26,11 +30,23 @@ def test_abort_command(): mock_graph.root_node.id = "start" # Create mock nodes with required attributes - using shared runtime state - mock_start_node = MagicMock() - mock_start_node.state = None - mock_start_node.id = "start" - mock_start_node.graph_runtime_state = shared_runtime_state # Use shared instance - mock_graph.nodes["start"] = mock_start_node + start_node = StartNode( + id="start", + config={"id": "start"}, + graph_init_params=GraphInitParams( + tenant_id="test_tenant", + app_id="test_app", + workflow_id="test_workflow", + graph_config={}, + user_id="test_user", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ), + graph_runtime_state=shared_runtime_state, + ) + start_node.init_node_data({"title": "start", "variables": []}) + mock_graph.nodes["start"] = start_node # Mock graph methods mock_graph.get_outgoing_edges = MagicMock(return_value=[]) @@ -124,11 +140,23 @@ def test_pause_command(): mock_graph.root_node = MagicMock() mock_graph.root_node.id = "start" - mock_start_node = MagicMock() - mock_start_node.state = None - mock_start_node.id = "start" - mock_start_node.graph_runtime_state = shared_runtime_state - mock_graph.nodes["start"] = mock_start_node + start_node = StartNode( + id="start", + config={"id": "start"}, + graph_init_params=GraphInitParams( + tenant_id="test_tenant", + app_id="test_app", + workflow_id="test_workflow", + graph_config={}, + user_id="test_user", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ), + graph_runtime_state=shared_runtime_state, + ) + start_node.init_node_data({"title": "start", "variables": []}) + mock_graph.nodes["start"] = start_node mock_graph.get_outgoing_edges = MagicMock(return_value=[]) mock_graph.get_incoming_edges = MagicMock(return_value=[]) @@ -153,5 +181,5 @@ def test_pause_command(): assert pause_events[0].reason == SchedulingPause(message="User requested pause") graph_execution = engine.graph_runtime_state.graph_execution - assert graph_execution.is_paused + assert graph_execution.paused assert graph_execution.pause_reason == SchedulingPause(message="User requested pause") diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_dispatcher.py b/api/tests/unit_tests/core/workflow/graph_engine/test_dispatcher.py deleted file mode 100644 index 3fe4ce3400..0000000000 
--- a/api/tests/unit_tests/core/workflow/graph_engine/test_dispatcher.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Tests for dispatcher command checking behavior.""" - -from __future__ import annotations - -import queue -from datetime import datetime - -from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus -from core.workflow.graph_engine.event_management.event_manager import EventManager -from core.workflow.graph_engine.orchestration.dispatcher import Dispatcher -from core.workflow.graph_events import NodeRunStartedEvent, NodeRunSucceededEvent -from core.workflow.node_events import NodeRunResult - - -class _StubExecutionCoordinator: - """Stub execution coordinator that tracks command checks.""" - - def __init__(self) -> None: - self.command_checks = 0 - self.scaling_checks = 0 - self._execution_complete = False - self.mark_complete_called = False - self.failed = False - self._paused = False - - def check_commands(self) -> None: - self.command_checks += 1 - - def check_scaling(self) -> None: - self.scaling_checks += 1 - - @property - def is_paused(self) -> bool: - return self._paused - - def is_execution_complete(self) -> bool: - return self._execution_complete - - def mark_complete(self) -> None: - self.mark_complete_called = True - - def mark_failed(self, error: Exception) -> None: # pragma: no cover - defensive, not triggered in tests - self.failed = True - - def set_execution_complete(self) -> None: - self._execution_complete = True - - -class _StubEventHandler: - """Minimal event handler that marks execution complete after handling an event.""" - - def __init__(self, coordinator: _StubExecutionCoordinator) -> None: - self._coordinator = coordinator - self.events = [] - - def dispatch(self, event) -> None: - self.events.append(event) - self._coordinator.set_execution_complete() - - -def _run_dispatcher_for_event(event) -> int: - """Run the dispatcher loop for a single event and return command check count.""" - event_queue: queue.Queue = queue.Queue() - event_queue.put(event) - - coordinator = _StubExecutionCoordinator() - event_handler = _StubEventHandler(coordinator) - event_manager = EventManager() - - dispatcher = Dispatcher( - event_queue=event_queue, - event_handler=event_handler, - event_collector=event_manager, - execution_coordinator=coordinator, - ) - - dispatcher._dispatcher_loop() - - return coordinator.command_checks - - -def _make_started_event() -> NodeRunStartedEvent: - return NodeRunStartedEvent( - id="start-event", - node_id="node-1", - node_type=NodeType.CODE, - node_title="Test Node", - start_at=datetime.utcnow(), - ) - - -def _make_succeeded_event() -> NodeRunSucceededEvent: - return NodeRunSucceededEvent( - id="success-event", - node_id="node-1", - node_type=NodeType.CODE, - node_title="Test Node", - start_at=datetime.utcnow(), - node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED), - ) - - -def test_dispatcher_checks_commands_during_idle_and_on_completion() -> None: - """Dispatcher polls commands when idle and after completion events.""" - started_checks = _run_dispatcher_for_event(_make_started_event()) - succeeded_checks = _run_dispatcher_for_event(_make_succeeded_event()) - - assert started_checks == 1 - assert succeeded_checks == 2 diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_execution_coordinator.py b/api/tests/unit_tests/core/workflow/graph_engine/test_execution_coordinator.py index 025393e435..0d67a76169 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_execution_coordinator.py +++ 
b/api/tests/unit_tests/core/workflow/graph_engine/test_execution_coordinator.py @@ -48,15 +48,3 @@ def test_handle_pause_noop_when_execution_running() -> None: worker_pool.stop.assert_not_called() state_manager.clear_executing.assert_not_called() - - -def test_is_execution_complete_when_paused() -> None: - """Paused execution should be treated as complete.""" - graph_execution = GraphExecution(workflow_id="workflow") - graph_execution.start() - graph_execution.pause("Awaiting input") - - coordinator, state_manager, _worker_pool = _build_coordinator(graph_execution) - state_manager.is_execution_complete.return_value = False - - assert coordinator.is_execution_complete() diff --git a/api/tests/unit_tests/tasks/test_async_workflow_tasks.py b/api/tests/unit_tests/tasks/test_async_workflow_tasks.py new file mode 100644 index 0000000000..3923e256a6 --- /dev/null +++ b/api/tests/unit_tests/tasks/test_async_workflow_tasks.py @@ -0,0 +1,37 @@ +from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY +from models.enums import AppTriggerType, WorkflowRunTriggeredFrom +from services.workflow.entities import TriggerData, WebhookTriggerData +from tasks import async_workflow_tasks + + +def test_build_generator_args_sets_skip_flag_for_webhook(): + trigger_data = WebhookTriggerData( + app_id="app", + tenant_id="tenant", + workflow_id="workflow", + root_node_id="node", + inputs={"webhook_data": {"body": {"foo": "bar"}}}, + ) + + args = async_workflow_tasks._build_generator_args(trigger_data) + + assert args[SKIP_PREPARE_USER_INPUTS_KEY] is True + assert args["inputs"]["webhook_data"]["body"]["foo"] == "bar" + + +def test_build_generator_args_keeps_validation_for_other_triggers(): + trigger_data = TriggerData( + app_id="app", + tenant_id="tenant", + workflow_id="workflow", + root_node_id="node", + inputs={"foo": "bar"}, + files=[], + trigger_type=AppTriggerType.TRIGGER_SCHEDULE, + trigger_from=WorkflowRunTriggeredFrom.SCHEDULE, + ) + + args = async_workflow_tasks._build_generator_args(trigger_data) + + assert SKIP_PREPARE_USER_INPUTS_KEY not in args + assert args["inputs"] == {"foo": "bar"} diff --git a/docker/.env.example b/docker/.env.example index 519f4aa3e0..5cb948d835 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -365,10 +365,9 @@ WEB_API_CORS_ALLOW_ORIGINS=* # Specifies the allowed origins for cross-origin requests to the console API, # e.g. https://cloud.dify.ai or * for all origins. CONSOLE_CORS_ALLOW_ORIGINS=* -# Set COOKIE_DOMAIN when the console frontend and API are on different subdomains. -# Provide the registrable domain (e.g. example.com); leading dots are optional. +# When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the site’s top-level domain (e.g., `example.com`). Leading dots are optional. COOKIE_DOMAIN= -# The frontend reads NEXT_PUBLIC_COOKIE_DOMAIN to align cookie handling with the API. +# When the frontend and backend run on different subdomains, set NEXT_PUBLIC_COOKIE_DOMAIN=1. NEXT_PUBLIC_COOKIE_DOMAIN= # ------------------------------ diff --git a/web/.env.example b/web/.env.example index 5bfcc9dac0..eff6f77fd9 100644 --- a/web/.env.example +++ b/web/.env.example @@ -12,6 +12,9 @@ NEXT_PUBLIC_API_PREFIX=http://localhost:5001/console/api # console or api domain. # example: http://udify.app/api NEXT_PUBLIC_PUBLIC_API_PREFIX=http://localhost:5001/api +# When the frontend and backend run on different subdomains, set NEXT_PUBLIC_COOKIE_DOMAIN=1. 
+NEXT_PUBLIC_COOKIE_DOMAIN= + # The API PREFIX for MARKETPLACE NEXT_PUBLIC_MARKETPLACE_API_PREFIX=https://marketplace.dify.ai/api/v1 # The URL for MARKETPLACE @@ -34,9 +37,6 @@ NEXT_PUBLIC_CSP_WHITELIST= # Default is not allow to embed into iframe to prevent Clickjacking: https://owasp.org/www-community/attacks/Clickjacking NEXT_PUBLIC_ALLOW_EMBED= -# Shared cookie domain when console UI and API use different subdomains (e.g. example.com) -NEXT_PUBLIC_COOKIE_DOMAIN= - # Allow rendering unsafe URLs which have "data:" scheme. NEXT_PUBLIC_ALLOW_UNSAFE_DATA_SCHEME=false diff --git a/web/README.md b/web/README.md index a47cfab041..6daf1e922e 100644 --- a/web/README.md +++ b/web/README.md @@ -32,6 +32,7 @@ NEXT_PUBLIC_EDITION=SELF_HOSTED # different from api or web app domain. # example: http://cloud.dify.ai/console/api NEXT_PUBLIC_API_PREFIX=http://localhost:5001/console/api +NEXT_PUBLIC_COOKIE_DOMAIN= # The URL for Web APP, refers to the Web App base URL of WEB service if web app domain is different from # console or api domain. # example: http://udify.app/api @@ -41,6 +42,11 @@ NEXT_PUBLIC_PUBLIC_API_PREFIX=http://localhost:5001/api NEXT_PUBLIC_SENTRY_DSN= ``` +> [!IMPORTANT] +> +> 1. When the frontend and backend run on different subdomains, set NEXT_PUBLIC_COOKIE_DOMAIN=1. The frontend and backend must be under the same top-level domain in order to share authentication cookies. +> 1. It's necessary to set NEXT_PUBLIC_API_PREFIX and NEXT_PUBLIC_PUBLIC_API_PREFIX to the correct backend API URL. + Finally, run the development server: ```bash diff --git a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/card-view.tsx b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/card-view.tsx index 57f3ef6881..fb431c5ac8 100644 --- a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/card-view.tsx +++ b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/card-view.tsx @@ -1,6 +1,6 @@ 'use client' import type { FC } from 'react' -import React, { useMemo } from 'react' +import React, { useCallback, useMemo } from 'react' import { useTranslation } from 'react-i18next' import { useContext } from 'use-context-selector' import AppCard from '@/app/components/app/overview/app-card' @@ -24,6 +24,7 @@ import { useStore as useAppStore } from '@/app/components/app/store' import { useAppWorkflow } from '@/service/use-workflow' import type { BlockEnum } from '@/app/components/workflow/types' import { isTriggerNode } from '@/app/components/workflow/types' +import { useDocLink } from '@/context/i18n' export type ICardViewProps = { appId: string @@ -33,6 +34,7 @@ export type ICardViewProps = { const CardView: FC = ({ appId, isInPanel, className }) => { const { t } = useTranslation() + const docLink = useDocLink() const { notify } = useContext(ToastContext) const appDetail = useAppStore(state => state.appDetail) const setAppDetail = useAppStore(state => state.setAppDetail) @@ -53,6 +55,35 @@ const CardView: FC = ({ appId, isInPanel, className }) => { }) }, [isWorkflowApp, currentWorkflow]) const shouldRenderAppCards = !isWorkflowApp || hasTriggerNode === false + const disableAppCards = !shouldRenderAppCards + + const triggerDocUrl = docLink('/guides/workflow/node/start') + const buildTriggerModeMessage = useCallback((featureName: string) => ( +
+
+ {t('appOverview.overview.disableTooltip.triggerMode', { feature: featureName })} +
+
{ + event.stopPropagation() + window.open(triggerDocUrl, '_blank') + }} + > + {t('appOverview.overview.appInfo.enableTooltip.learnMore')} +
+
+ ), [t, triggerDocUrl]) + + const disableWebAppTooltip = disableAppCards + ? buildTriggerModeMessage(t('appOverview.overview.appInfo.title')) + : null + const disableApiTooltip = disableAppCards + ? buildTriggerModeMessage(t('appOverview.overview.apiInfo.title')) + : null + const disableMcpTooltip = disableAppCards + ? buildTriggerModeMessage(t('tools.mcp.server.title')) + : null const updateAppDetail = async () => { try { @@ -124,39 +155,48 @@ const CardView: FC = ({ appId, isInPanel, className }) => { if (!appDetail) return - return ( -
- { - shouldRenderAppCards && ( - <> - - - {showMCPCard && ( - - )} - - ) - } - {showTriggerCard && ( - + + + {showMCPCard && ( + )} + + ) + + const triggerCardNode = showTriggerCard ? ( + + ) : null + + return ( +
+ {disableAppCards && triggerCardNode} + {appCards} + {!disableAppCards && triggerCardNode}
) } diff --git a/web/app/components/app/overview/app-card.tsx b/web/app/components/app/overview/app-card.tsx index dcb6ae6b4d..a0f5780b71 100644 --- a/web/app/components/app/overview/app-card.tsx +++ b/web/app/components/app/overview/app-card.tsx @@ -51,6 +51,8 @@ export type IAppCardProps = { isInPanel?: boolean cardType?: 'api' | 'webapp' customBgColor?: string + triggerModeDisabled?: boolean // true when Trigger Node mode needs UI locked to avoid conflicting actions + triggerModeMessage?: React.ReactNode // contextual copy explaining why the card is disabled in trigger mode onChangeStatus: (val: boolean) => Promise onSaveSiteConfig?: (params: ConfigParams) => Promise onGenerateCode?: () => Promise @@ -61,6 +63,8 @@ function AppCard({ isInPanel, cardType = 'webapp', customBgColor, + triggerModeDisabled = false, + triggerModeMessage = '', onChangeStatus, onSaveSiteConfig, onGenerateCode, @@ -111,7 +115,7 @@ function AppCard({ const hasStartNode = currentWorkflow?.graph?.nodes?.some(node => node.data.type === BlockEnum.Start) const missingStartNode = isWorkflowApp && !hasStartNode const hasInsufficientPermissions = isApp ? !isCurrentWorkspaceEditor : !isCurrentWorkspaceManager - const toggleDisabled = hasInsufficientPermissions || appUnpublished || missingStartNode + const toggleDisabled = hasInsufficientPermissions || appUnpublished || missingStartNode || triggerModeDisabled const runningStatus = (appUnpublished || missingStartNode) ? false : (isApp ? appInfo.enable_site : appInfo.enable_api) const isMinimalState = appUnpublished || missingStartNode const { app_base_url, access_token } = appInfo.site ?? {} @@ -189,7 +193,20 @@ function AppCard({ className={ `${isInPanel ? 'border-l-[0.5px] border-t' : 'border-[0.5px] shadow-xs'} w-full max-w-full rounded-xl border-effects-highlight ${className ?? ''} ${isMinimalState ? 'h-12' : ''}`} > -
+
+ {triggerModeDisabled && ( + triggerModeMessage + ? ( + + + + ) + : + )}
-
- {t('appOverview.overview.appInfo.enableTooltip.description')} -
-
window.open(docLink('/guides/workflow/node/user-input'), '_blank')} - > - {t('appOverview.overview.appInfo.enableTooltip.learnMore')} -
- + toggleDisabled ? ( + triggerModeDisabled && triggerModeMessage + ? triggerModeMessage + : (appUnpublished || missingStartNode) ? ( + <> +
+ {t('appOverview.overview.appInfo.enableTooltip.description')} +
+
window.open(docLink('/guides/workflow/node/user-input'), '_blank')} + > + {t('appOverview.overview.appInfo.enableTooltip.learnMore')} +
+ + ) + : '' ) : '' } position="right" @@ -329,9 +351,11 @@ function AppCard({ {!isApp && } {OPERATIONS_MAP[cardType].map((op) => { const disabled - = op.opName === t('appOverview.overview.appInfo.settings.entry') - ? false - : !runningStatus + = triggerModeDisabled + ? true + : op.opName === t('appOverview.overview.appInfo.settings.entry') + ? false + : !runningStatus return (
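A note on the `Base`-to-`TypeBase` model migration that `cnt_base.sh` counts and that drives most of the `models/dataset.py` and `models/trigger.py` hunks above: columns the database fills in (UUID primary keys, `created_at`/`updated_at`) are marked `init=False` so they drop out of the generated constructor. A minimal sketch of that pattern, assuming `TypeBase` is a `MappedAsDataclass` declarative base (its actual definition lives in `models/base.py` and is not part of this patch) and substituting a plain `String(36)` for the project's custom `StringUUID` type:

```python
from datetime import datetime

import sqlalchemy as sa
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, mapped_column


class TypeBase(MappedAsDataclass, DeclarativeBase):
    """Assumed shape of models.base.TypeBase: a dataclass-style declarative base."""


class Whitelist(TypeBase):
    __tablename__ = "whitelists"

    # init=False keeps server-generated columns out of the dataclass __init__,
    # so the database defaults populate them on flush instead.
    id: Mapped[str] = mapped_column(
        sa.String(36), primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False
    )
    tenant_id: Mapped[str | None] = mapped_column(sa.String(36), nullable=True)
    category: Mapped[str] = mapped_column(sa.String(255), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        sa.DateTime, nullable=False, server_default=sa.func.current_timestamp(), init=False
    )


# Callers now pass only the non-generated fields as keyword arguments.
row = Whitelist(tenant_id="tenant-1", category="webapp")
```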
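On the dispatcher rewrite: the intent exercised by the rewritten tests is that pending commands are processed once before the loop starts, once per completion-style event, and once more after the loop exits, which is where the expected counts of 2 (started event) and 3 (succeeded event) come from. A self-contained toy model of that cadence, assuming succeeded-style events are in `_COMMAND_TRIGGER_EVENTS` and started events are not (the real tuple is defined in `dispatcher.py` and not shown in this patch):

```python
import queue

# Assumption: only completion-style events trigger the extra per-event command check;
# the idle (queue.Empty) poll also re-checks commands but is not modelled here.
COMMAND_TRIGGER_EVENTS = {"succeeded"}


def command_checks_for(events: list[str]) -> int:
    """Count how often commands get processed for a non-empty queue of events."""
    q: queue.Queue[str] = queue.Queue()
    for e in events:
        q.put(e)

    checks = 1  # once before entering the loop
    complete = False
    while not complete:
        event = q.get(block=False)  # the test queues always hold an event here
        complete = True  # the stub handler in the new tests marks completion on dispatch
        if event in COMMAND_TRIGGER_EVENTS:
            checks += 1  # once per completion-style event
    checks += 1  # once more after the loop exits
    return checks


# Mirrors the expectations in the rewritten dispatcher test:
assert command_checks_for(["started"]) == 2
assert command_checks_for(["succeeded"]) == 3
```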