diff --git a/.devcontainer/post_create_command.sh b/.devcontainer/post_create_command.sh index 637593b9de..b92d4c35a8 100755 --- a/.devcontainer/post_create_command.sh +++ b/.devcontainer/post_create_command.sh @@ -7,7 +7,7 @@ cd web && pnpm install pipx install uv echo "alias start-api=\"cd $WORKSPACE_ROOT/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug\"" >> ~/.bashrc -echo "alias start-worker=\"cd $WORKSPACE_ROOT/api && uv run python -m celery -A app.celery worker -P threads -c 1 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention\"" >> ~/.bashrc +echo "alias start-worker=\"cd $WORKSPACE_ROOT/api && uv run python -m celery -A app.celery worker -P threads -c 1 --loglevel INFO -Q dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention\"" >> ~/.bashrc echo "alias start-web=\"cd $WORKSPACE_ROOT/web && pnpm dev:inspect\"" >> ~/.bashrc echo "alias start-web-prod=\"cd $WORKSPACE_ROOT/web && pnpm build && pnpm start\"" >> ~/.bashrc echo "alias start-containers=\"cd $WORKSPACE_ROOT/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d\"" >> ~/.bashrc diff --git a/.vscode/launch.json.template b/.vscode/launch.json.template index 700b815c3b..c3e2c50c52 100644 --- a/.vscode/launch.json.template +++ b/.vscode/launch.json.template @@ -37,7 +37,7 @@ "-c", "1", "-Q", - "dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution", + "dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution", "--loglevel", "INFO" ], diff --git a/Makefile b/Makefile index 0aff26b3e5..55871c86a7 100644 --- a/Makefile +++ b/Makefile @@ -68,8 +68,9 @@ lint: @echo "✅ Linting complete" type-check: - @echo "📝 Running type checks (basedpyright + mypy)..." + @echo "📝 Running type checks (basedpyright + pyrefly + mypy)..." @./dev/basedpyright-check $(PATH_TO_CHECK) + @./dev/pyrefly-check-local @uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --check-untyped-defs --disable-error-code=import-untyped . @echo "✅ Type checks complete" @@ -131,7 +132,7 @@ help: @echo " make format - Format code with ruff" @echo " make check - Check code with ruff" @echo " make lint - Format, fix, and lint code (ruff, imports, dotenv)" - @echo " make type-check - Run type checks (basedpyright, mypy)" + @echo " make type-check - Run type checks (basedpyright, pyrefly, mypy)" @echo " make test - Run backend unit tests (or TARGET_TESTS=./api/tests/)" @echo "" @echo "Docker Build Targets:" diff --git a/api/AGENTS.md b/api/AGENTS.md index 13adb42276..d43d2528b8 100644 --- a/api/AGENTS.md +++ b/api/AGENTS.md @@ -62,6 +62,22 @@ This is the default standard for backend code in this repo. 
Follow it for new co - Code should usually include type annotations that match the repo’s current Python version (avoid untyped public APIs and “mystery” values). - Prefer modern typing forms (e.g. `list[str]`, `dict[str, int]`) and avoid `Any` unless there’s a strong reason. +- For dictionary-like data with known keys and value types, prefer `TypedDict` over `dict[...]` or `Mapping[...]`. +- For optional keys in typed payloads, use `NotRequired[...]` (or `total=False` when most fields are optional). +- Keep `dict[...]` / `Mapping[...]` for truly dynamic key spaces where the key set is unknown. + +```python +from datetime import datetime +from typing import NotRequired, TypedDict + + +class UserProfile(TypedDict): + user_id: str + email: str + created_at: datetime + nickname: NotRequired[str] +``` + - For classes, declare member variables at the top of the class body (before `__init__`) so the class shape is obvious at a glance: ```python diff --git a/api/commands.py b/api/commands.py index 75b17df78e..0a3b808950 100644 --- a/api/commands.py +++ b/api/commands.py @@ -2668,3 +2668,77 @@ def clean_expired_messages( raise click.echo(click.style("messages cleanup completed.", fg="green")) + + +@click.command("export-app-messages", help="Export messages for an app to JSONL.GZ.") +@click.option("--app-id", required=True, help="Application ID to export messages for.") +@click.option( + "--start-from", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Optional lower bound (inclusive) for created_at.", +) +@click.option( + "--end-before", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + required=True, + help="Upper bound (exclusive) for created_at.", +) +@click.option( + "--filename", + required=True, + help="Base filename (relative path). 
Do not include suffix like .jsonl.gz.", +) +@click.option("--use-cloud-storage", is_flag=True, default=False, help="Upload to cloud storage instead of local file.") +@click.option("--batch-size", default=1000, show_default=True, help="Batch size for cursor pagination.") +@click.option("--dry-run", is_flag=True, default=False, help="Scan only, print stats without writing any file.") +def export_app_messages( + app_id: str, + start_from: datetime.datetime | None, + end_before: datetime.datetime, + filename: str, + use_cloud_storage: bool, + batch_size: int, + dry_run: bool, +): + if start_from and start_from >= end_before: + raise click.UsageError("--start-from must be before --end-before.") + + from services.retention.conversation.message_export_service import AppMessageExportService + + try: + validated_filename = AppMessageExportService.validate_export_filename(filename) + except ValueError as e: + raise click.BadParameter(str(e), param_hint="--filename") from e + + click.echo(click.style(f"export_app_messages: starting export for app {app_id}.", fg="green")) + start_at = time.perf_counter() + + try: + service = AppMessageExportService( + app_id=app_id, + end_before=end_before, + filename=validated_filename, + start_from=start_from, + batch_size=batch_size, + use_cloud_storage=use_cloud_storage, + dry_run=dry_run, + ) + stats = service.run() + + elapsed = time.perf_counter() - start_at + click.echo( + click.style( + f"export_app_messages: completed in {elapsed:.2f}s\n" + f" - Batches: {stats.batches}\n" + f" - Total messages: {stats.total_messages}\n" + f" - Messages with feedback: {stats.messages_with_feedback}\n" + f" - Total feedbacks: {stats.total_feedbacks}", + fg="green", + ) + ) + except Exception as e: + elapsed = time.perf_counter() - start_at + logger.exception("export_app_messages failed") + click.echo(click.style(f"export_app_messages: failed after {elapsed:.2f}s - {e}", fg="red")) + raise diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index fbd5060b8c..a1cb375e24 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -516,8 +516,10 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): graph_runtime_state=validated_state, ) + yield from self._handle_advanced_chat_message_end_event( + QueueAdvancedChatMessageEndEvent(), graph_runtime_state=validated_state + ) yield workflow_finish_resp - self._base_task_pipeline.queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE) def _handle_workflow_partial_success_event( self, @@ -538,6 +540,9 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): exceptions_count=event.exceptions_count, ) + yield from self._handle_advanced_chat_message_end_event( + QueueAdvancedChatMessageEndEvent(), graph_runtime_state=validated_state + ) yield workflow_finish_resp def _handle_workflow_paused_event( @@ -854,6 +859,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): yield from self._handle_workflow_paused_event(event) break + case QueueWorkflowSucceededEvent(): + yield from self._handle_workflow_succeeded_event(event, trace_manager=trace_manager) + break + + case QueueWorkflowPartialSuccessEvent(): + yield from self._handle_workflow_partial_success_event(event, trace_manager=trace_manager) + break + case QueueStopEvent(): yield from self._handle_stop_event(event, graph_runtime_state=None, 
trace_manager=trace_manager) break diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index 57ef0c078f..b530fe1ce4 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -44,14 +44,13 @@ from core.app.entities.task_entities import ( ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.message_cycle_manager import MessageCycleManager +from core.app.task_pipeline.message_file_utils import prepare_file_dict from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_manager import ModelInstance from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.prompt.utils.prompt_template_parser import PromptTemplateParser -from core.tools.signature import sign_tool_file -from dify_graph.file import helpers as file_helpers from dify_graph.file.enums import FileTransferMethod from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage from dify_graph.model_runtime.entities.message_entities import ( @@ -460,91 +459,40 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): """ self._task_state.metadata.usage = self._task_state.llm_result.usage metadata_dict = self._task_state.metadata.model_dump() + + # Fetch files associated with this message + files = None + with Session(db.engine, expire_on_commit=False) as session: + message_files = session.scalars(select(MessageFile).where(MessageFile.message_id == self._message_id)).all() + + if message_files: + # Fetch all required UploadFile objects in a single query to avoid N+1 problem + upload_file_ids = list( + dict.fromkeys( + mf.upload_file_id + for mf in message_files + if mf.transfer_method == FileTransferMethod.LOCAL_FILE and mf.upload_file_id + ) + ) + upload_files_map = {} + if upload_file_ids: + upload_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(upload_file_ids))).all() + upload_files_map = {uf.id: uf for uf in upload_files} + + files_list = [] + for message_file in message_files: + file_dict = prepare_file_dict(message_file, upload_files_map) + files_list.append(file_dict) + + files = files_list or None + return MessageEndStreamResponse( task_id=self._application_generate_entity.task_id, id=self._message_id, metadata=metadata_dict, + files=files, ) - def _record_files(self): - with Session(db.engine, expire_on_commit=False) as session: - message_files = session.scalars(select(MessageFile).where(MessageFile.message_id == self._message_id)).all() - if not message_files: - return None - - files_list = [] - upload_file_ids = [ - mf.upload_file_id - for mf in message_files - if mf.transfer_method == FileTransferMethod.LOCAL_FILE and mf.upload_file_id - ] - upload_files_map = {} - if upload_file_ids: - upload_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(upload_file_ids))).all() - upload_files_map = {uf.id: uf for uf in upload_files} - - for message_file in message_files: - upload_file = None - if message_file.transfer_method == FileTransferMethod.LOCAL_FILE and message_file.upload_file_id: - upload_file = upload_files_map.get(message_file.upload_file_id) - - url = None - filename = "file" - mime_type = 
"application/octet-stream" - size = 0 - extension = "" - - if message_file.transfer_method == FileTransferMethod.REMOTE_URL: - url = message_file.url - if message_file.url: - filename = message_file.url.split("/")[-1].split("?")[0] # Remove query params - elif message_file.transfer_method == FileTransferMethod.LOCAL_FILE: - if upload_file: - url = file_helpers.get_signed_file_url(upload_file_id=str(upload_file.id)) - filename = upload_file.name - mime_type = upload_file.mime_type or "application/octet-stream" - size = upload_file.size or 0 - extension = f".{upload_file.extension}" if upload_file.extension else "" - elif message_file.upload_file_id: - # Fallback: generate URL even if upload_file not found - url = file_helpers.get_signed_file_url(upload_file_id=str(message_file.upload_file_id)) - elif message_file.transfer_method == FileTransferMethod.TOOL_FILE and message_file.url: - # For tool files, use URL directly if it's HTTP, otherwise sign it - if message_file.url.startswith("http"): - url = message_file.url - filename = message_file.url.split("/")[-1].split("?")[0] - else: - # Extract tool file id and extension from URL - url_parts = message_file.url.split("/") - if url_parts: - file_part = url_parts[-1].split("?")[0] # Remove query params first - # Use rsplit to correctly handle filenames with multiple dots - if "." in file_part: - tool_file_id, ext = file_part.rsplit(".", 1) - extension = f".{ext}" - else: - tool_file_id = file_part - extension = ".bin" - url = sign_tool_file(tool_file_id=tool_file_id, extension=extension) - filename = file_part - - transfer_method_value = message_file.transfer_method - remote_url = message_file.url if message_file.transfer_method == FileTransferMethod.REMOTE_URL else "" - file_dict = { - "related_id": message_file.id, - "extension": extension, - "filename": filename, - "size": size, - "mime_type": mime_type, - "transfer_method": transfer_method_value, - "type": message_file.type, - "url": url or "", - "upload_file_id": message_file.upload_file_id or message_file.id, - "remote_url": remote_url, - } - files_list.append(file_dict) - return files_list or None - def _agent_message_to_stream_response(self, answer: str, message_id: str) -> AgentMessageStreamResponse: """ Agent message to stream response. 
diff --git a/api/core/app/task_pipeline/message_cycle_manager.py b/api/core/app/task_pipeline/message_cycle_manager.py index cc4f97ad94..536ab02eae 100644 --- a/api/core/app/task_pipeline/message_cycle_manager.py +++ b/api/core/app/task_pipeline/message_cycle_manager.py @@ -1,7 +1,6 @@ import hashlib import logging -import time -from threading import Thread +from threading import Thread, Timer from typing import Union from flask import Flask, current_app @@ -96,9 +95,9 @@ class MessageCycleManager: if auto_generate_conversation_name and is_first_message: # start generate thread # time.sleep not block other logic - time.sleep(1) - thread = Thread( - target=self._generate_conversation_name_worker, + thread = Timer( + 1, + self._generate_conversation_name_worker, kwargs={ "flask_app": current_app._get_current_object(), # type: ignore "conversation_id": conversation_id, diff --git a/api/core/app/task_pipeline/message_file_utils.py b/api/core/app/task_pipeline/message_file_utils.py new file mode 100644 index 0000000000..843e9eea30 --- /dev/null +++ b/api/core/app/task_pipeline/message_file_utils.py @@ -0,0 +1,76 @@ +from core.tools.signature import sign_tool_file +from dify_graph.file import helpers as file_helpers +from dify_graph.file.enums import FileTransferMethod +from models.model import MessageFile, UploadFile + +MAX_TOOL_FILE_EXTENSION_LENGTH = 10 + + +def prepare_file_dict(message_file: MessageFile, upload_files_map: dict[str, UploadFile]) -> dict: + """ + Prepare file dictionary for message end stream response. + + :param message_file: MessageFile instance + :param upload_files_map: Dictionary mapping upload_file_id to UploadFile + :return: Dictionary containing file information + """ + upload_file = None + if message_file.transfer_method == FileTransferMethod.LOCAL_FILE and message_file.upload_file_id: + upload_file = upload_files_map.get(message_file.upload_file_id) + + url = None + filename = "file" + mime_type = "application/octet-stream" + size = 0 + extension = "" + + if message_file.transfer_method == FileTransferMethod.REMOTE_URL: + url = message_file.url + if message_file.url: + filename = message_file.url.split("/")[-1].split("?")[0] + if "." in filename: + extension = "." + filename.rsplit(".", 1)[1] + elif message_file.transfer_method == FileTransferMethod.LOCAL_FILE: + if upload_file: + url = file_helpers.get_signed_file_url(upload_file_id=str(upload_file.id)) + filename = upload_file.name + mime_type = upload_file.mime_type or "application/octet-stream" + size = upload_file.size or 0 + extension = f".{upload_file.extension}" if upload_file.extension else "" + elif message_file.upload_file_id: + url = file_helpers.get_signed_file_url(upload_file_id=str(message_file.upload_file_id)) + elif message_file.transfer_method == FileTransferMethod.TOOL_FILE and message_file.url: + if message_file.url.startswith(("http://", "https://")): + url = message_file.url + filename = message_file.url.split("/")[-1].split("?")[0] + if "." in filename: + extension = "." + filename.rsplit(".", 1)[1] + else: + url_parts = message_file.url.split("/") + if url_parts: + file_part = url_parts[-1].split("?")[0] + if "." 
in file_part: + tool_file_id, ext = file_part.rsplit(".", 1) + extension = f".{ext}" + if len(extension) > MAX_TOOL_FILE_EXTENSION_LENGTH: + extension = ".bin" + else: + tool_file_id = file_part + extension = ".bin" + url = sign_tool_file(tool_file_id=tool_file_id, extension=extension) + filename = file_part + + transfer_method_value = message_file.transfer_method.value + remote_url = message_file.url if message_file.transfer_method == FileTransferMethod.REMOTE_URL else "" + return { + "related_id": message_file.id, + "extension": extension, + "filename": filename, + "size": size, + "mime_type": mime_type, + "transfer_method": transfer_method_value, + "type": message_file.type, + "url": url or "", + "upload_file_id": message_file.upload_file_id or message_file.id, + "remote_url": remote_url, + } diff --git a/api/core/rag/datasource/vdb/chroma/chroma_vector.py b/api/core/rag/datasource/vdb/chroma/chroma_vector.py index de1572410c..cbc846f716 100644 --- a/api/core/rag/datasource/vdb/chroma/chroma_vector.py +++ b/api/core/rag/datasource/vdb/chroma/chroma_vector.py @@ -65,7 +65,7 @@ class ChromaVector(BaseVector): self._client.get_or_create_collection(collection_name) redis_client.set(collection_exist_cache_key, 1, ex=3600) - def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs): + def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs) -> list[str]: uuids = self._get_uuids(documents) texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] @@ -73,6 +73,7 @@ class ChromaVector(BaseVector): collection = self._client.get_or_create_collection(self._collection_name) # FIXME: chromadb using numpy array, fix the type error later collection.upsert(ids=uuids, documents=texts, embeddings=embeddings, metadatas=metadatas) # type: ignore + return uuids def delete_by_metadata_field(self, key: str, value: str): collection = self._client.get_or_create_collection(self._collection_name) diff --git a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py index 91bb71bfa6..8e8120fc10 100644 --- a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py +++ b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py @@ -605,25 +605,36 @@ class ClickzettaVector(BaseVector): logger.warning("Failed to create inverted index: %s", e) # Continue without inverted index - full-text search will fall back to LIKE - def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs): + def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs) -> list[str]: """Add documents with embeddings to the collection.""" if not documents: - return + return [] batch_size = self._config.batch_size total_batches = (len(documents) + batch_size - 1) // batch_size + added_ids = [] for i in range(0, len(documents), batch_size): batch_docs = documents[i : i + batch_size] batch_embeddings = embeddings[i : i + batch_size] + batch_doc_ids = [] + for doc in batch_docs: + metadata = doc.metadata if isinstance(doc.metadata, dict) else {} + batch_doc_ids.append(self._safe_doc_id(metadata.get("doc_id", str(uuid.uuid4())))) + added_ids.extend(batch_doc_ids) # Execute batch insert through write queue - self._execute_write(self._insert_batch, batch_docs, batch_embeddings, i, batch_size, total_batches) + self._execute_write( + self._insert_batch, batch_docs, batch_embeddings, batch_doc_ids, i, batch_size, total_batches + ) + + return 
added_ids def _insert_batch( self, batch_docs: list[Document], batch_embeddings: list[list[float]], + batch_doc_ids: list[str], batch_index: int, batch_size: int, total_batches: int, @@ -641,14 +652,9 @@ class ClickzettaVector(BaseVector): data_rows = [] vector_dimension = len(batch_embeddings[0]) if batch_embeddings and batch_embeddings[0] else 768 - for doc, embedding in zip(batch_docs, batch_embeddings): + for doc, embedding, doc_id in zip(batch_docs, batch_embeddings, batch_doc_ids): # Optimized: minimal checks for common case, fallback for edge cases - metadata = doc.metadata or {} - - if not isinstance(metadata, dict): - metadata = {} - - doc_id = self._safe_doc_id(metadata.get("doc_id", str(uuid.uuid4()))) + metadata = doc.metadata if isinstance(doc.metadata, dict) else {} # Fast path for JSON serialization try: diff --git a/api/core/repositories/sqlalchemy_workflow_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_execution_repository.py index 649e2f7358..770df8b050 100644 --- a/api/core/repositories/sqlalchemy_workflow_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_execution_repository.py @@ -194,6 +194,13 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): # Create a new database session with self._session_factory() as session: + existing_model = session.get(WorkflowRun, db_model.id) + if existing_model: + if existing_model.tenant_id != self._tenant_id: + raise ValueError("Unauthorized access to workflow run") + # Preserve the original start time for pause/resume flows. + db_model.created_at = existing_model.created_at + # SQLAlchemy merge intelligently handles both insert and update operations # based on the presence of the primary key session.merge(db_model) diff --git a/api/dify_graph/nodes/knowledge_retrieval/knowledge_retrieval_node.py b/api/dify_graph/nodes/knowledge_retrieval/knowledge_retrieval_node.py index d84dda42d6..14744d0a74 100644 --- a/api/dify_graph/nodes/knowledge_retrieval/knowledge_retrieval_node.py +++ b/api/dify_graph/nodes/knowledge_retrieval/knowledge_retrieval_node.py @@ -116,7 +116,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD try: results, usage = self._fetch_dataset_retriever(node_data=self._node_data, variables=variables) - outputs = {"result": ArrayObjectSegment(value=[item.model_dump() for item in results])} + outputs = {"result": ArrayObjectSegment(value=[item.model_dump(by_alias=True) for item in results])} return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=variables, diff --git a/api/dify_graph/runtime/variable_pool.py b/api/dify_graph/runtime/variable_pool.py index a2b1af99bb..e3ef6a2897 100644 --- a/api/dify_graph/runtime/variable_pool.py +++ b/api/dify_graph/runtime/variable_pool.py @@ -65,9 +65,15 @@ class VariablePool(BaseModel): # Add environment variables to the variable pool for var in self.environment_variables: self.add((ENVIRONMENT_VARIABLE_NODE_ID, var.name), var) - # Add conversation variables to the variable pool + # Add conversation variables to the variable pool. When restoring from a serialized + # snapshot, `variable_dictionary` already carries the latest runtime values. + # In that case, keep existing entries instead of overwriting them with the + # bootstrap list. 
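+        # Illustration (hypothetical values): a restored pool may already hold
+        # ("conversation", "count") -> 3 from the snapshot, while the bootstrap list
+        # still carries the declared default of 0; overwriting would lose the resumed value.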
for var in self.conversation_variables: - self.add((CONVERSATION_VARIABLE_NODE_ID, var.name), var) + selector = (CONVERSATION_VARIABLE_NODE_ID, var.name) + if self._has(selector): + continue + self.add(selector, var) # Add rag pipeline variables to the variable pool if self.rag_pipeline_variables: rag_pipeline_variables_map: defaultdict[Any, dict[Any, Any]] = defaultdict(dict) diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index 1a675b3338..6b904b7d0d 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -35,10 +35,10 @@ if [[ "${MODE}" == "worker" ]]; then if [[ -z "${CELERY_QUEUES}" ]]; then if [[ "${EDITION}" == "CLOUD" ]]; then # Cloud edition: separate queues for dataset and trigger tasks - DEFAULT_QUEUES="api_token,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" + DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" else # Community edition (SELF_HOSTED): dataset, pipeline and workflow have separate queues - DEFAULT_QUEUES="api_token,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" + DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" fi else DEFAULT_QUEUES="${CELERY_QUEUES}" diff --git a/api/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.py b/api/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.py index 69959acd19..b70c2183d2 100644 --- a/api/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.py +++ b/api/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.py @@ -1,3 +1,5 @@ +from typing import Any, cast + from sqlalchemy import select from events.app_event import app_model_config_was_updated @@ -54,9 +56,11 @@ def get_dataset_ids_from_model_config(app_model_config: AppModelConfig) -> set[s continue tool_type = list(tool.keys())[0] - tool_config = list(tool.values())[0] + tool_config = cast(dict[str, Any], list(tool.values())[0]) if tool_type == "dataset": - dataset_ids.add(tool_config.get("id")) + dataset_id = tool_config.get("id") + if isinstance(dataset_id, str): + dataset_ids.add(dataset_id) # get dataset from dataset_configs dataset_configs = app_model_config.dataset_configs_dict diff --git a/api/extensions/ext_commands.py b/api/extensions/ext_commands.py index 46885761a1..fe95cc5816 100644 --- a/api/extensions/ext_commands.py +++ b/api/extensions/ext_commands.py @@ -13,6 +13,7 @@ def init_app(app: DifyApp): convert_to_agent_apps, create_tenant, delete_archived_workflow_runs, + export_app_messages, extract_plugins, extract_unique_plugins, file_usage, @@ -66,6 +67,7 @@ 
def init_app(app: DifyApp): restore_workflow_runs, clean_workflow_runs, clean_expired_messages, + export_app_messages, ] for cmd in cmds_to_register: app.cli.add_command(cmd) diff --git a/api/migrations/env.py b/api/migrations/env.py index 66a4614e80..3b1fa7bb89 100644 --- a/api/migrations/env.py +++ b/api/migrations/env.py @@ -66,6 +66,7 @@ def run_migrations_offline(): context.configure( url=url, target_metadata=get_metadata(), literal_binds=True ) + logger.info("Generating offline migration SQL with url: %s", url) with context.begin_transaction(): context.run_migrations() diff --git a/api/pyproject.toml b/api/pyproject.toml index 3c89601dc5..bf786f4584 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -247,3 +247,13 @@ module = [ "extensions.logstore.repositories.logstore_api_workflow_run_repository", ] ignore_errors = true + +[tool.pyrefly] +project-includes = ["."] +project-excludes = [ + ".venv", + "migrations/", +] +python-platform = "linux" +python-version = "3.11.0" +infer-with-first-use = false diff --git a/api/pyrefly-local-excludes.txt b/api/pyrefly-local-excludes.txt new file mode 100644 index 0000000000..d3b2ede745 --- /dev/null +++ b/api/pyrefly-local-excludes.txt @@ -0,0 +1,200 @@ +configs/middleware/cache/redis_pubsub_config.py +controllers/console/app/annotation.py +controllers/console/app/app.py +controllers/console/app/app_import.py +controllers/console/app/mcp_server.py +controllers/console/app/site.py +controllers/console/auth/email_register.py +controllers/console/human_input_form.py +controllers/console/init_validate.py +controllers/console/ping.py +controllers/console/setup.py +controllers/console/version.py +controllers/console/workspace/trigger_providers.py +controllers/service_api/app/annotation.py +controllers/web/workflow_events.py +core/agent/fc_agent_runner.py +core/app/apps/advanced_chat/app_generator.py +core/app/apps/advanced_chat/app_runner.py +core/app/apps/advanced_chat/generate_task_pipeline.py +core/app/apps/agent_chat/app_generator.py +core/app/apps/base_app_generate_response_converter.py +core/app/apps/base_app_generator.py +core/app/apps/chat/app_generator.py +core/app/apps/common/workflow_response_converter.py +core/app/apps/completion/app_generator.py +core/app/apps/pipeline/pipeline_generator.py +core/app/apps/pipeline/pipeline_runner.py +core/app/apps/workflow/app_generator.py +core/app/apps/workflow/app_runner.py +core/app/apps/workflow/generate_task_pipeline.py +core/app/apps/workflow_app_runner.py +core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +core/datasource/datasource_manager.py +core/external_data_tool/api/api.py +core/llm_generator/llm_generator.py +core/llm_generator/output_parser/structured_output.py +core/mcp/mcp_client.py +core/ops/aliyun_trace/data_exporter/traceclient.py +core/ops/arize_phoenix_trace/arize_phoenix_trace.py +core/ops/mlflow_trace/mlflow_trace.py +core/ops/ops_trace_manager.py +core/ops/tencent_trace/client.py +core/ops/tencent_trace/utils.py +core/plugin/backwards_invocation/base.py +core/plugin/backwards_invocation/model.py +core/prompt/utils/extract_thread_messages.py +core/rag/datasource/keyword/jieba/jieba.py +core/rag/datasource/keyword/jieba/jieba_keyword_table_handler.py +core/rag/datasource/vdb/analyticdb/analyticdb_vector.py +core/rag/datasource/vdb/analyticdb/analyticdb_vector_openapi.py +core/rag/datasource/vdb/baidu/baidu_vector.py +core/rag/datasource/vdb/chroma/chroma_vector.py +core/rag/datasource/vdb/clickzetta/clickzetta_vector.py 
+core/rag/datasource/vdb/couchbase/couchbase_vector.py +core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py +core/rag/datasource/vdb/huawei/huawei_cloud_vector.py +core/rag/datasource/vdb/lindorm/lindorm_vector.py +core/rag/datasource/vdb/matrixone/matrixone_vector.py +core/rag/datasource/vdb/milvus/milvus_vector.py +core/rag/datasource/vdb/myscale/myscale_vector.py +core/rag/datasource/vdb/oceanbase/oceanbase_vector.py +core/rag/datasource/vdb/opensearch/opensearch_vector.py +core/rag/datasource/vdb/oracle/oraclevector.py +core/rag/datasource/vdb/pgvecto_rs/pgvecto_rs.py +core/rag/datasource/vdb/relyt/relyt_vector.py +core/rag/datasource/vdb/tablestore/tablestore_vector.py +core/rag/datasource/vdb/tencent/tencent_vector.py +core/rag/datasource/vdb/tidb_on_qdrant/tidb_on_qdrant_vector.py +core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py +core/rag/datasource/vdb/tidb_vector/tidb_vector.py +core/rag/datasource/vdb/upstash/upstash_vector.py +core/rag/datasource/vdb/vikingdb/vikingdb_vector.py +core/rag/datasource/vdb/weaviate/weaviate_vector.py +core/rag/extractor/csv_extractor.py +core/rag/extractor/excel_extractor.py +core/rag/extractor/firecrawl/firecrawl_app.py +core/rag/extractor/firecrawl/firecrawl_web_extractor.py +core/rag/extractor/html_extractor.py +core/rag/extractor/jina_reader_extractor.py +core/rag/extractor/markdown_extractor.py +core/rag/extractor/notion_extractor.py +core/rag/extractor/pdf_extractor.py +core/rag/extractor/text_extractor.py +core/rag/extractor/unstructured/unstructured_doc_extractor.py +core/rag/extractor/unstructured/unstructured_eml_extractor.py +core/rag/extractor/unstructured/unstructured_epub_extractor.py +core/rag/extractor/unstructured/unstructured_markdown_extractor.py +core/rag/extractor/unstructured/unstructured_msg_extractor.py +core/rag/extractor/unstructured/unstructured_ppt_extractor.py +core/rag/extractor/unstructured/unstructured_pptx_extractor.py +core/rag/extractor/unstructured/unstructured_xml_extractor.py +core/rag/extractor/watercrawl/client.py +core/rag/extractor/watercrawl/extractor.py +core/rag/extractor/watercrawl/provider.py +core/rag/extractor/word_extractor.py +core/rag/index_processor/processor/paragraph_index_processor.py +core/rag/index_processor/processor/parent_child_index_processor.py +core/rag/index_processor/processor/qa_index_processor.py +core/rag/retrieval/router/multi_dataset_function_call_router.py +core/rag/summary_index/summary_index.py +core/repositories/sqlalchemy_workflow_execution_repository.py +core/repositories/sqlalchemy_workflow_node_execution_repository.py +core/tools/__base/tool.py +core/tools/mcp_tool/provider.py +core/tools/plugin_tool/provider.py +core/tools/utils/message_transformer.py +core/tools/utils/web_reader_tool.py +core/tools/workflow_as_tool/provider.py +core/trigger/debug/event_selectors.py +core/trigger/entities/entities.py +core/trigger/provider.py +core/workflow/workflow_entry.py +dify_graph/entities/workflow_execution.py +dify_graph/file/file_manager.py +dify_graph/graph_engine/error_handler.py +dify_graph/graph_engine/layers/execution_limits.py +dify_graph/nodes/agent/agent_node.py +dify_graph/nodes/base/node.py +dify_graph/nodes/code/code_node.py +dify_graph/nodes/datasource/datasource_node.py +dify_graph/nodes/document_extractor/node.py +dify_graph/nodes/human_input/human_input_node.py +dify_graph/nodes/if_else/if_else_node.py +dify_graph/nodes/iteration/iteration_node.py +dify_graph/nodes/knowledge_index/knowledge_index_node.py 
+dify_graph/nodes/knowledge_retrieval/knowledge_retrieval_node.py +dify_graph/nodes/list_operator/node.py +dify_graph/nodes/llm/node.py +dify_graph/nodes/loop/loop_node.py +dify_graph/nodes/parameter_extractor/parameter_extractor_node.py +dify_graph/nodes/question_classifier/question_classifier_node.py +dify_graph/nodes/start/start_node.py +dify_graph/nodes/template_transform/template_transform_node.py +dify_graph/nodes/tool/tool_node.py +dify_graph/nodes/trigger_plugin/trigger_event_node.py +dify_graph/nodes/trigger_schedule/trigger_schedule_node.py +dify_graph/nodes/trigger_webhook/node.py +dify_graph/nodes/variable_aggregator/variable_aggregator_node.py +dify_graph/nodes/variable_assigner/v1/node.py +dify_graph/nodes/variable_assigner/v2/node.py +dify_graph/variables/types.py +extensions/ext_fastopenapi.py +extensions/logstore/repositories/logstore_api_workflow_run_repository.py +extensions/otel/instrumentation.py +extensions/otel/runtime.py +extensions/storage/aliyun_oss_storage.py +extensions/storage/aws_s3_storage.py +extensions/storage/azure_blob_storage.py +extensions/storage/baidu_obs_storage.py +extensions/storage/clickzetta_volume/clickzetta_volume_storage.py +extensions/storage/clickzetta_volume/file_lifecycle.py +extensions/storage/google_cloud_storage.py +extensions/storage/huawei_obs_storage.py +extensions/storage/opendal_storage.py +extensions/storage/oracle_oci_storage.py +extensions/storage/supabase_storage.py +extensions/storage/tencent_cos_storage.py +extensions/storage/volcengine_tos_storage.py +factories/variable_factory.py +libs/external_api.py +libs/gmpy2_pkcs10aep_cipher.py +libs/helper.py +libs/login.py +libs/module_loading.py +libs/oauth.py +libs/oauth_data_source.py +models/trigger.py +models/workflow.py +repositories/sqlalchemy_api_workflow_node_execution_repository.py +repositories/sqlalchemy_api_workflow_run_repository.py +repositories/sqlalchemy_execution_extra_content_repository.py +schedule/queue_monitor_task.py +services/account_service.py +services/audio_service.py +services/auth/firecrawl/firecrawl.py +services/auth/jina.py +services/auth/jina/jina.py +services/auth/watercrawl/watercrawl.py +services/conversation_service.py +services/dataset_service.py +services/document_indexing_proxy/document_indexing_task_proxy.py +services/document_indexing_proxy/duplicate_document_indexing_task_proxy.py +services/external_knowledge_service.py +services/plugin/plugin_migration.py +services/recommend_app/buildin/buildin_retrieval.py +services/recommend_app/database/database_retrieval.py +services/recommend_app/remote/remote_retrieval.py +services/summary_index_service.py +services/tools/tools_transform_service.py +services/trigger/trigger_provider_service.py +services/trigger/trigger_subscription_builder_service.py +services/trigger/webhook_service.py +services/workflow_draft_variable_service.py +services/workflow_event_snapshot_service.py +services/workflow_service.py +tasks/app_generate/workflow_execute_task.py +tasks/regenerate_summary_index_task.py +tasks/trigger_processing_tasks.py +tasks/workflow_cfs_scheduler/cfs_scheduler.py +tasks/workflow_execution_tasks.py diff --git a/api/pyrefly.toml b/api/pyrefly.toml deleted file mode 100644 index 01f4c5a529..0000000000 --- a/api/pyrefly.toml +++ /dev/null @@ -1,8 +0,0 @@ -project-includes = ["."] -project-excludes = [ - ".venv", - "migrations/", -] -python-platform = "linux" -python-version = "3.11.0" -infer-with-first-use = false diff --git a/api/pytest.ini b/api/pytest.ini index 4a9470fa0c..588dafe7eb 100644 --- 
a/api/pytest.ini +++ b/api/pytest.ini @@ -1,5 +1,6 @@ [pytest] -addopts = --cov=./api --cov-report=json +pythonpath = . +addopts = --cov=./api --cov-report=json --import-mode=importlib env = ANTHROPIC_API_KEY = sk-ant-api11-IamNotARealKeyJustForMockTestKawaiiiiiiiiii-NotBaka-ASkksz AZURE_OPENAI_API_BASE = https://difyai-openai.openai.azure.com @@ -19,7 +20,7 @@ env = GOOGLE_API_KEY = abcdefghijklmnopqrstuvwxyz HUGGINGFACE_API_KEY = hf-awuwuwuwuwuwuwuwuwuwuwuwuwuwuwuwuwu HUGGINGFACE_EMBEDDINGS_ENDPOINT_URL = c - HUGGINGFACE_TEXT2TEXT_GEN_ENDPOINT_URL = b + HUGGINGFACE_TEXT2TEXT_GEN_ENDPOINT_URL = b HUGGINGFACE_TEXT_GEN_ENDPOINT_URL = a MIXEDBREAD_API_KEY = mk-aaaaaaaaaaaaaaaaaaaa MOCK_SWITCH = true diff --git a/api/services/rag_pipeline/rag_pipeline_transform_service.py b/api/services/rag_pipeline/rag_pipeline_transform_service.py index d0dfbc1070..cee18387b3 100644 --- a/api/services/rag_pipeline/rag_pipeline_transform_service.py +++ b/api/services/rag_pipeline/rag_pipeline_transform_service.py @@ -63,7 +63,12 @@ class RagPipelineTransformService: ): node = self._deal_file_extensions(node) if node.get("data", {}).get("type") == "knowledge-index": - node = self._deal_knowledge_index(dataset, doc_form, indexing_technique, retrieval_model, node) + knowledge_configuration = KnowledgeConfiguration.model_validate(node.get("data", {})) + if dataset.tenant_id != current_user.current_tenant_id: + raise ValueError("Unauthorized") + node = self._deal_knowledge_index( + knowledge_configuration, dataset, indexing_technique, retrieval_model, node + ) new_nodes.append(node) if new_nodes: graph["nodes"] = new_nodes @@ -155,14 +160,13 @@ class RagPipelineTransformService: def _deal_knowledge_index( self, + knowledge_configuration: KnowledgeConfiguration, dataset: Dataset, - doc_form: str, indexing_technique: str | None, retrieval_model: RetrievalSetting | None, node: dict, ): knowledge_configuration_dict = node.get("data", {}) - knowledge_configuration = KnowledgeConfiguration.model_validate(knowledge_configuration_dict) if indexing_technique == "high_quality": knowledge_configuration.embedding_model = dataset.embedding_model diff --git a/api/services/retention/conversation/message_export_service.py b/api/services/retention/conversation/message_export_service.py new file mode 100644 index 0000000000..fbe0d2795d --- /dev/null +++ b/api/services/retention/conversation/message_export_service.py @@ -0,0 +1,304 @@ +""" +Export app messages to JSONL.GZ format. + +Outputs: conversation_id, message_id, query, answer, inputs (raw JSON), +retriever_resources (from message_metadata), feedback (user feedbacks array). + +Uses (created_at, id) cursor pagination and batch-loads feedbacks to avoid N+1. +Does NOT touch Message.inputs / Message.user_feedback properties. 
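+
+The keyset cursor used below is roughly the SQL equivalent of:
+
+    WHERE (created_at, id) > (:last_created_at, :last_id)
+    ORDER BY created_at, id
+    LIMIT :batch_size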
+""" + +import datetime +import gzip +import json +import logging +import tempfile +from collections import defaultdict +from collections.abc import Generator, Iterable +from pathlib import Path, PurePosixPath +from typing import Any, BinaryIO, cast + +import orjson +import sqlalchemy as sa +from pydantic import BaseModel, ConfigDict, Field +from sqlalchemy import select, tuple_ +from sqlalchemy.orm import Session + +from extensions.ext_database import db +from extensions.ext_storage import storage +from models.model import Message, MessageFeedback + +logger = logging.getLogger(__name__) + +MAX_FILENAME_BASE_LENGTH = 1024 +FORBIDDEN_FILENAME_SUFFIXES = (".jsonl.gz", ".jsonl", ".gz") + + +class AppMessageExportFeedback(BaseModel): + id: str + app_id: str + conversation_id: str + message_id: str + rating: str + content: str | None = None + from_source: str + from_end_user_id: str | None = None + from_account_id: str | None = None + created_at: str + updated_at: str + + model_config = ConfigDict(extra="forbid") + + +class AppMessageExportRecord(BaseModel): + conversation_id: str + message_id: str + query: str + answer: str + inputs: dict[str, Any] + retriever_resources: list[Any] = Field(default_factory=list) + feedback: list[AppMessageExportFeedback] = Field(default_factory=list) + + model_config = ConfigDict(extra="forbid") + + +class AppMessageExportStats(BaseModel): + batches: int = 0 + total_messages: int = 0 + messages_with_feedback: int = 0 + total_feedbacks: int = 0 + + model_config = ConfigDict(extra="forbid") + + +class AppMessageExportService: + @staticmethod + def validate_export_filename(filename: str) -> str: + normalized = filename.strip() + if not normalized: + raise ValueError("--filename must not be empty.") + + normalized_lower = normalized.lower() + if normalized_lower.endswith(FORBIDDEN_FILENAME_SUFFIXES): + raise ValueError("--filename must not include .jsonl.gz/.jsonl/.gz suffix; pass base filename only.") + + if normalized.startswith("/"): + raise ValueError("--filename must be a relative path; absolute paths are not allowed.") + + if "\\" in normalized: + raise ValueError("--filename must use '/' as path separator; '\\' is not allowed.") + + if "//" in normalized: + raise ValueError("--filename must not contain empty path segments ('//').") + + if len(normalized) > MAX_FILENAME_BASE_LENGTH: + raise ValueError(f"--filename is too long; max length is {MAX_FILENAME_BASE_LENGTH}.") + + for ch in normalized: + if ch == "\x00" or ord(ch) < 32 or ord(ch) == 127: + raise ValueError("--filename must not contain control characters or NUL.") + + parts = PurePosixPath(normalized).parts + if not parts: + raise ValueError("--filename must include a file name.") + + if any(part in (".", "..") for part in parts): + raise ValueError("--filename must not contain '.' or '..' 
path segments.") + + return normalized + + @property + def output_gz_name(self) -> str: + return f"{self._filename_base}.jsonl.gz" + + @property + def output_jsonl_name(self) -> str: + return f"{self._filename_base}.jsonl" + + def __init__( + self, + app_id: str, + end_before: datetime.datetime, + filename: str, + *, + start_from: datetime.datetime | None = None, + batch_size: int = 1000, + use_cloud_storage: bool = False, + dry_run: bool = False, + ) -> None: + if start_from and start_from >= end_before: + raise ValueError(f"start_from ({start_from}) must be before end_before ({end_before})") + + self._app_id = app_id + self._end_before = end_before + self._start_from = start_from + self._filename_base = self.validate_export_filename(filename) + self._batch_size = batch_size + self._use_cloud_storage = use_cloud_storage + self._dry_run = dry_run + + def run(self) -> AppMessageExportStats: + stats = AppMessageExportStats() + + logger.info( + "export_app_messages: app_id=%s, start_from=%s, end_before=%s, dry_run=%s, cloud=%s, output_gz=%s", + self._app_id, + self._start_from, + self._end_before, + self._dry_run, + self._use_cloud_storage, + self.output_gz_name, + ) + + if self._dry_run: + for _ in self._iter_records_with_stats(stats): + pass + self._finalize_stats(stats) + return stats + + if self._use_cloud_storage: + self._export_to_cloud(stats) + else: + self._export_to_local(stats) + + self._finalize_stats(stats) + return stats + + def iter_records(self) -> Generator[AppMessageExportRecord, None, None]: + for batch in self._iter_record_batches(): + yield from batch + + @staticmethod + def write_jsonl_gz(records: Iterable[AppMessageExportRecord], fileobj: BinaryIO) -> None: + with gzip.GzipFile(fileobj=fileobj, mode="wb") as gz: + for record in records: + gz.write(orjson.dumps(record.model_dump(mode="json")) + b"\n") + + def _export_to_local(self, stats: AppMessageExportStats) -> None: + output_path = Path.cwd() / self.output_gz_name + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("wb") as output_file: + self.write_jsonl_gz(self._iter_records_with_stats(stats), output_file) + + def _export_to_cloud(self, stats: AppMessageExportStats) -> None: + with tempfile.SpooledTemporaryFile(max_size=64 * 1024 * 1024) as tmp: + self.write_jsonl_gz(self._iter_records_with_stats(stats), cast(BinaryIO, tmp)) + tmp.seek(0) + data = tmp.read() + + storage.save(self.output_gz_name, data) + logger.info("export_app_messages: uploaded %d bytes to cloud key=%s", len(data), self.output_gz_name) + + def _iter_records_with_stats(self, stats: AppMessageExportStats) -> Generator[AppMessageExportRecord, None, None]: + for record in self.iter_records(): + self._update_stats(stats, record) + yield record + + @staticmethod + def _update_stats(stats: AppMessageExportStats, record: AppMessageExportRecord) -> None: + stats.total_messages += 1 + if record.feedback: + stats.messages_with_feedback += 1 + stats.total_feedbacks += len(record.feedback) + + def _finalize_stats(self, stats: AppMessageExportStats) -> None: + if stats.total_messages == 0: + stats.batches = 0 + return + stats.batches = (stats.total_messages + self._batch_size - 1) // self._batch_size + + def _iter_record_batches(self) -> Generator[list[AppMessageExportRecord], None, None]: + cursor: tuple[datetime.datetime, str] | None = None + while True: + rows, cursor = self._fetch_batch(cursor) + if not rows: + break + + message_ids = [str(row.id) for row in rows] + feedbacks_map = self._fetch_feedbacks(message_ids) + yield 
[self._build_record(row, feedbacks_map) for row in rows] + + def _fetch_batch( + self, cursor: tuple[datetime.datetime, str] | None + ) -> tuple[list[Any], tuple[datetime.datetime, str] | None]: + with Session(db.engine, expire_on_commit=False) as session: + stmt = ( + select( + Message.id, + Message.conversation_id, + Message.query, + Message.answer, + Message._inputs, # pyright: ignore[reportPrivateUsage] + Message.message_metadata, + Message.created_at, + ) + .where( + Message.app_id == self._app_id, + Message.created_at < self._end_before, + ) + .order_by(Message.created_at, Message.id) + .limit(self._batch_size) + ) + + if self._start_from: + stmt = stmt.where(Message.created_at >= self._start_from) + + if cursor: + stmt = stmt.where( + tuple_(Message.created_at, Message.id) + > tuple_( + sa.literal(cursor[0], type_=sa.DateTime()), + sa.literal(cursor[1], type_=Message.id.type), + ) + ) + + rows = list(session.execute(stmt).all()) + + if not rows: + return [], cursor + + last = rows[-1] + return rows, (last.created_at, last.id) + + def _fetch_feedbacks(self, message_ids: list[str]) -> dict[str, list[AppMessageExportFeedback]]: + if not message_ids: + return {} + + with Session(db.engine, expire_on_commit=False) as session: + stmt = ( + select(MessageFeedback) + .where( + MessageFeedback.message_id.in_(message_ids), + MessageFeedback.from_source == "user", + ) + .order_by(MessageFeedback.message_id, MessageFeedback.created_at) + ) + feedbacks = list(session.scalars(stmt).all()) + + result: dict[str, list[AppMessageExportFeedback]] = defaultdict(list) + for feedback in feedbacks: + result[str(feedback.message_id)].append(AppMessageExportFeedback.model_validate(feedback.to_dict())) + return result + + @staticmethod + def _build_record(row: Any, feedbacks_map: dict[str, list[AppMessageExportFeedback]]) -> AppMessageExportRecord: + retriever_resources: list[Any] = [] + if row.message_metadata: + try: + metadata = json.loads(row.message_metadata) + value = metadata.get("retriever_resources", []) + if isinstance(value, list): + retriever_resources = value + except (json.JSONDecodeError, TypeError): + pass + + message_id = str(row.id) + return AppMessageExportRecord( + conversation_id=str(row.conversation_id), + message_id=message_id, + query=row.query, + answer=row.answer, + inputs=row._inputs if isinstance(row._inputs, dict) else {}, + retriever_resources=retriever_resources, + feedback=feedbacks_map.get(message_id, []), + ) diff --git a/api/tasks/generate_summary_index_task.py b/api/tasks/generate_summary_index_task.py index e4273e16b5..6493833edc 100644 --- a/api/tasks/generate_summary_index_task.py +++ b/api/tasks/generate_summary_index_task.py @@ -14,7 +14,7 @@ from services.summary_index_service import SummaryIndexService logger = logging.getLogger(__name__) -@shared_task(queue="dataset") +@shared_task(queue="dataset_summary") def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids: list[str] | None = None): """ Async generate summary index for document segments. 
diff --git a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py index 6ad04aab0d..5d201bd801 100644 --- a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py +++ b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py @@ -6,7 +6,6 @@ import typing import click from celery import shared_task -from core.helper.marketplace import record_install_plugin_event from core.plugin.entities.marketplace import MarketplacePluginSnapshot from core.plugin.entities.plugin import PluginInstallationSource from core.plugin.impl.plugin import PluginInstaller @@ -166,7 +165,6 @@ def process_tenant_plugin_autoupgrade_check_task( # execute upgrade new_unique_identifier = manifest.latest_package_identifier - record_install_plugin_event(new_unique_identifier) click.echo( click.style( f"Upgrade plugin: {original_unique_identifier} -> {new_unique_identifier}", diff --git a/api/tasks/regenerate_summary_index_task.py b/api/tasks/regenerate_summary_index_task.py index cf8988d13e..39c2f4103e 100644 --- a/api/tasks/regenerate_summary_index_task.py +++ b/api/tasks/regenerate_summary_index_task.py @@ -16,7 +16,7 @@ from services.summary_index_service import SummaryIndexService logger = logging.getLogger(__name__) -@shared_task(queue="dataset") +@shared_task(queue="dataset_summary") def regenerate_summary_index_task( dataset_id: str, regenerate_reason: str = "summary_model_changed", diff --git a/api/tests/integration_tests/controllers/console/app/test_description_validation.py b/api/tests/integration_tests/controllers/console/app/test_description_validation.py index 8160807e48..f36c596eb8 100644 --- a/api/tests/integration_tests/controllers/console/app/test_description_validation.py +++ b/api/tests/integration_tests/controllers/console/app/test_description_validation.py @@ -5,14 +5,10 @@ This test module validates the 400-character limit enforcement for App descriptions across all creation and editing endpoints. """ -import os import sys import pytest -# Add the API root to Python path for imports -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..")) - class TestAppDescriptionValidationUnit: """Unit tests for description validation function""" diff --git a/api/tests/test_containers_integration_tests/conftest.py b/api/tests/test_containers_integration_tests/conftest.py index d6d2d30305..2a23f1ea7d 100644 --- a/api/tests/test_containers_integration_tests/conftest.py +++ b/api/tests/test_containers_integration_tests/conftest.py @@ -10,8 +10,11 @@ more reliable and realistic test scenarios. import logging import os from collections.abc import Generator +from contextlib import contextmanager from pathlib import Path +from typing import Protocol, TypeVar +import psycopg2 import pytest from flask import Flask from flask.testing import FlaskClient @@ -31,6 +34,25 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(level logger = logging.getLogger(__name__) +class _CloserProtocol(Protocol): + """_Closer is any type which implements the close() method.""" + + def close(self): + """Close the current object, releasing any external resource (file, transaction, connection, etc.) + associated with it. + """ + pass + + +_Closer = TypeVar("_Closer", bound=_CloserProtocol) + + +@contextmanager +def _auto_close(closer: _Closer) -> Generator[_Closer, None, None]: + yield closer + closer.close() + + class DifyTestContainers: """ Manages all test containers required for Dify integration tests. 
@@ -97,45 +119,28 @@ class DifyTestContainers: wait_for_logs(self.postgres, "is ready to accept connections", timeout=30) logger.info("PostgreSQL container is ready and accepting connections") - # Install uuid-ossp extension for UUID generation - logger.info("Installing uuid-ossp extension...") - try: - import psycopg2 - - conn = psycopg2.connect( - host=db_host, - port=db_port, - user=self.postgres.username, - password=self.postgres.password, - database=self.postgres.dbname, - ) - conn.autocommit = True - cursor = conn.cursor() - cursor.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";') - cursor.close() - conn.close() + conn = psycopg2.connect( + host=db_host, + port=db_port, + user=self.postgres.username, + password=self.postgres.password, + database=self.postgres.dbname, + ) + conn.autocommit = True + with _auto_close(conn): + with conn.cursor() as cursor: + # Install uuid-ossp extension for UUID generation + logger.info("Installing uuid-ossp extension...") + cursor.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";') logger.info("uuid-ossp extension installed successfully") - except Exception as e: - logger.warning("Failed to install uuid-ossp extension: %s", e) - # Create plugin database for dify-plugin-daemon - logger.info("Creating plugin database...") - try: - conn = psycopg2.connect( - host=db_host, - port=db_port, - user=self.postgres.username, - password=self.postgres.password, - database=self.postgres.dbname, - ) - conn.autocommit = True - cursor = conn.cursor() - cursor.execute("CREATE DATABASE dify_plugin;") - cursor.close() - conn.close() + # NOTE: We cannot use `with conn.cursor() as cursor:` as it will wrap the statement + # inside a transaction. However, the `CREATE DATABASE` statement cannot run inside a transaction block. + with _auto_close(conn.cursor()) as cursor: + # Create plugin database for dify-plugin-daemon + logger.info("Creating plugin database...") + cursor.execute("CREATE DATABASE dify_plugin;") logger.info("Plugin database created successfully") - except Exception as e: - logger.warning("Failed to create plugin database: %s", e) # Set up storage environment variables os.environ.setdefault("STORAGE_TYPE", "opendal") @@ -258,23 +263,16 @@ class DifyTestContainers: containers = [self.redis, self.postgres, self.dify_sandbox, self.dify_plugin_daemon] for container in containers: if container: - try: - container_name = container.image - logger.info("Stopping container: %s", container_name) - container.stop() - logger.info("Successfully stopped container: %s", container_name) - except Exception as e: - # Log error but don't fail the test cleanup - logger.warning("Failed to stop container %s: %s", container, e) + container_name = container.image + logger.info("Stopping container: %s", container_name) + container.stop() + logger.info("Successfully stopped container: %s", container_name) # Stop and remove the network if self.network: - try: - logger.info("Removing Docker network...") - self.network.remove() - logger.info("Successfully removed Docker network") - except Exception as e: - logger.warning("Failed to remove Docker network: %s", e) + logger.info("Removing Docker network...") + self.network.remove() + logger.info("Successfully removed Docker network") self._containers_started = False logger.info("All test containers stopped and cleaned up successfully") diff --git a/api/tests/test_containers_integration_tests/services/test_message_export_service.py b/api/tests/test_containers_integration_tests/services/test_message_export_service.py new file mode 
100644 index 0000000000..200f688ae9 --- /dev/null +++ b/api/tests/test_containers_integration_tests/services/test_message_export_service.py @@ -0,0 +1,233 @@ +import datetime +import json +import uuid +from decimal import Decimal + +import pytest +from sqlalchemy.orm import Session + +from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole +from models.model import ( + App, + AppAnnotationHitHistory, + Conversation, + DatasetRetrieverResource, + Message, + MessageAgentThought, + MessageAnnotation, + MessageChain, + MessageFeedback, + MessageFile, +) +from models.web import SavedMessage +from services.retention.conversation.message_export_service import AppMessageExportService, AppMessageExportStats + + +class TestAppMessageExportServiceIntegration: + @pytest.fixture(autouse=True) + def cleanup_database(self, db_session_with_containers: Session): + yield + db_session_with_containers.query(DatasetRetrieverResource).delete() + db_session_with_containers.query(AppAnnotationHitHistory).delete() + db_session_with_containers.query(SavedMessage).delete() + db_session_with_containers.query(MessageFile).delete() + db_session_with_containers.query(MessageAgentThought).delete() + db_session_with_containers.query(MessageChain).delete() + db_session_with_containers.query(MessageAnnotation).delete() + db_session_with_containers.query(MessageFeedback).delete() + db_session_with_containers.query(Message).delete() + db_session_with_containers.query(Conversation).delete() + db_session_with_containers.query(App).delete() + db_session_with_containers.query(TenantAccountJoin).delete() + db_session_with_containers.query(Tenant).delete() + db_session_with_containers.query(Account).delete() + db_session_with_containers.commit() + + @staticmethod + def _create_app_context(session: Session) -> tuple[App, Conversation]: + account = Account( + email=f"test-{uuid.uuid4()}@example.com", + name="tester", + interface_language="en-US", + status="active", + ) + session.add(account) + session.flush() + + tenant = Tenant(name=f"tenant-{uuid.uuid4()}", status="normal") + session.add(tenant) + session.flush() + + join = TenantAccountJoin( + tenant_id=tenant.id, + account_id=account.id, + role=TenantAccountRole.OWNER, + current=True, + ) + session.add(join) + session.flush() + + app = App( + tenant_id=tenant.id, + name="export-app", + description="integration test app", + mode="chat", + enable_site=True, + enable_api=True, + api_rpm=60, + api_rph=3600, + is_demo=False, + is_public=False, + created_by=account.id, + updated_by=account.id, + ) + session.add(app) + session.flush() + + conversation = Conversation( + app_id=app.id, + app_model_config_id=str(uuid.uuid4()), + model_provider="openai", + model_id="gpt-4o-mini", + mode="chat", + name="conv", + inputs={"seed": 1}, + status="normal", + from_source="api", + from_end_user_id=str(uuid.uuid4()), + ) + session.add(conversation) + session.commit() + return app, conversation + + @staticmethod + def _create_message( + session: Session, + app: App, + conversation: Conversation, + created_at: datetime.datetime, + *, + query: str, + answer: str, + inputs: dict, + message_metadata: str | None, + ) -> Message: + message = Message( + app_id=app.id, + conversation_id=conversation.id, + model_provider="openai", + model_id="gpt-4o-mini", + inputs=inputs, + query=query, + answer=answer, + message=[{"role": "assistant", "content": answer}], + message_tokens=10, + message_unit_price=Decimal("0.001"), + answer_tokens=20, + answer_unit_price=Decimal("0.002"), + 
total_price=Decimal("0.003"), + currency="USD", + message_metadata=message_metadata, + from_source="api", + from_end_user_id=conversation.from_end_user_id, + created_at=created_at, + ) + session.add(message) + session.flush() + return message + + def test_iter_records_with_stats(self, db_session_with_containers: Session): + app, conversation = self._create_app_context(db_session_with_containers) + + first_inputs = { + "plain": "v1", + "nested": {"a": 1, "b": [1, {"x": True}]}, + "list": ["x", 2, {"y": "z"}], + } + second_inputs = {"other": "value", "items": [1, 2, 3]} + + base_time = datetime.datetime(2026, 2, 25, 10, 0, 0) + first_message = self._create_message( + db_session_with_containers, + app, + conversation, + created_at=base_time, + query="q1", + answer="a1", + inputs=first_inputs, + message_metadata=json.dumps({"retriever_resources": [{"dataset_id": "ds-1"}]}), + ) + second_message = self._create_message( + db_session_with_containers, + app, + conversation, + created_at=base_time + datetime.timedelta(minutes=1), + query="q2", + answer="a2", + inputs=second_inputs, + message_metadata=None, + ) + + user_feedback_1 = MessageFeedback( + app_id=app.id, + conversation_id=conversation.id, + message_id=first_message.id, + rating="like", + from_source="user", + content="first", + from_end_user_id=conversation.from_end_user_id, + ) + user_feedback_2 = MessageFeedback( + app_id=app.id, + conversation_id=conversation.id, + message_id=first_message.id, + rating="dislike", + from_source="user", + content="second", + from_end_user_id=conversation.from_end_user_id, + ) + admin_feedback = MessageFeedback( + app_id=app.id, + conversation_id=conversation.id, + message_id=first_message.id, + rating="like", + from_source="admin", + content="should-be-filtered", + from_account_id=str(uuid.uuid4()), + ) + db_session_with_containers.add_all([user_feedback_1, user_feedback_2, admin_feedback]) + user_feedback_1.created_at = base_time + datetime.timedelta(minutes=2) + user_feedback_2.created_at = base_time + datetime.timedelta(minutes=3) + admin_feedback.created_at = base_time + datetime.timedelta(minutes=4) + db_session_with_containers.commit() + + service = AppMessageExportService( + app_id=app.id, + start_from=base_time - datetime.timedelta(minutes=1), + end_before=base_time + datetime.timedelta(minutes=10), + filename="unused", + batch_size=1, + dry_run=True, + ) + stats = AppMessageExportStats() + records = list(service._iter_records_with_stats(stats)) + service._finalize_stats(stats) + + assert len(records) == 2 + assert records[0].message_id == first_message.id + assert records[1].message_id == second_message.id + + assert records[0].inputs == first_inputs + assert records[1].inputs == second_inputs + + assert records[0].retriever_resources == [{"dataset_id": "ds-1"}] + assert records[1].retriever_resources == [] + + assert [feedback.rating for feedback in records[0].feedback] == ["like", "dislike"] + assert [feedback.content for feedback in records[0].feedback] == ["first", "second"] + assert records[1].feedback == [] + + assert stats.batches == 2 + assert stats.total_messages == 2 + assert stats.messages_with_feedback == 1 + assert stats.total_feedbacks == 2 diff --git a/api/tests/unit_tests/conftest.py b/api/tests/unit_tests/conftest.py index d2111ebac8..3f75fd2851 100644 --- a/api/tests/unit_tests/conftest.py +++ b/api/tests/unit_tests/conftest.py @@ -32,11 +32,6 @@ os.environ.setdefault("OPENDAL_SCHEME", "fs") os.environ.setdefault("OPENDAL_FS_ROOT", "/tmp/dify-storage") 
os.environ.setdefault("STORAGE_TYPE", "opendal") -# Add the API directory to Python path to ensure proper imports -import sys - -sys.path.insert(0, PROJECT_DIR) - from core.db.session_factory import configure_session_factory, session_factory from extensions import ext_redis diff --git a/api/tests/unit_tests/controllers/common/test_errors.py b/api/tests/unit_tests/controllers/common/test_errors.py new file mode 100644 index 0000000000..25a9fe5b66 --- /dev/null +++ b/api/tests/unit_tests/controllers/common/test_errors.py @@ -0,0 +1,70 @@ +from controllers.common.errors import ( + BlockedFileExtensionError, + FilenameNotExistsError, + FileTooLargeError, + NoFileUploadedError, + RemoteFileUploadError, + TooManyFilesError, + UnsupportedFileTypeError, +) + + +class TestFilenameNotExistsError: + def test_defaults(self): + error = FilenameNotExistsError() + + assert error.code == 400 + assert error.description == "The specified filename does not exist." + + +class TestRemoteFileUploadError: + def test_defaults(self): + error = RemoteFileUploadError() + + assert error.code == 400 + assert error.description == "Error uploading remote file." + + +class TestFileTooLargeError: + def test_defaults(self): + error = FileTooLargeError() + + assert error.code == 413 + assert error.error_code == "file_too_large" + assert error.description == "File size exceeded. {message}" + + +class TestUnsupportedFileTypeError: + def test_defaults(self): + error = UnsupportedFileTypeError() + + assert error.code == 415 + assert error.error_code == "unsupported_file_type" + assert error.description == "File type not allowed." + + +class TestBlockedFileExtensionError: + def test_defaults(self): + error = BlockedFileExtensionError() + + assert error.code == 400 + assert error.error_code == "file_extension_blocked" + assert error.description == "The file extension is blocked for security reasons." + + +class TestTooManyFilesError: + def test_defaults(self): + error = TooManyFilesError() + + assert error.code == 400 + assert error.error_code == "too_many_files" + assert error.description == "Only one file is allowed." + + +class TestNoFileUploadedError: + def test_defaults(self): + error = NoFileUploadedError() + + assert error.code == 400 + assert error.error_code == "no_file_uploaded" + assert error.description == "Please upload your file." 
diff --git a/api/tests/unit_tests/controllers/common/test_file_response.py b/api/tests/unit_tests/controllers/common/test_file_response.py index 2487c362bd..b7500fb7f9 100644 --- a/api/tests/unit_tests/controllers/common/test_file_response.py +++ b/api/tests/unit_tests/controllers/common/test_file_response.py @@ -1,22 +1,95 @@ from flask import Response -from controllers.common.file_response import enforce_download_for_html, is_html_content +from controllers.common.file_response import ( + _normalize_mime_type, + enforce_download_for_html, + is_html_content, +) -class TestFileResponseHelpers: - def test_is_html_content_detects_mime_type(self): +class TestNormalizeMimeType: + def test_returns_empty_string_for_none(self): + assert _normalize_mime_type(None) == "" + + def test_returns_empty_string_for_empty_string(self): + assert _normalize_mime_type("") == "" + + def test_normalizes_mime_type(self): + assert _normalize_mime_type("Text/HTML; Charset=UTF-8") == "text/html" + + +class TestIsHtmlContent: + def test_detects_html_via_mime_type(self): mime_type = "text/html; charset=UTF-8" - result = is_html_content(mime_type, filename="file.txt", extension="txt") + result = is_html_content( + mime_type=mime_type, + filename="file.txt", + extension="txt", + ) assert result is True - def test_is_html_content_detects_extension(self): - result = is_html_content("text/plain", filename="report.html", extension=None) + def test_detects_html_via_extension_argument(self): + result = is_html_content( + mime_type="text/plain", + filename=None, + extension="html", + ) assert result is True - def test_enforce_download_for_html_sets_headers(self): + def test_detects_html_via_filename_extension(self): + result = is_html_content( + mime_type="text/plain", + filename="report.html", + extension=None, + ) + + assert result is True + + def test_returns_false_when_no_html_detected_anywhere(self): + """ + Negative case: + - MIME type is not HTML + - filename has no HTML extension + - extension argument is not HTML + """ + result = is_html_content( + mime_type="application/json", + filename="data.json", + extension="json", + ) + + assert result is False + + def test_returns_false_when_all_inputs_are_none(self): + result = is_html_content( + mime_type=None, + filename=None, + extension=None, + ) + + assert result is False + + +class TestEnforceDownloadForHtml: + def test_sets_attachment_when_filename_missing(self): + response = Response("payload", mimetype="text/html") + + updated = enforce_download_for_html( + response, + mime_type="text/html", + filename=None, + extension="html", + ) + + assert updated is True + assert response.headers["Content-Disposition"] == "attachment" + assert response.headers["Content-Type"] == "application/octet-stream" + assert response.headers["X-Content-Type-Options"] == "nosniff" + + def test_sets_headers_when_filename_present(self): + response = Response("payload", mimetype="text/html") updated = enforce_download_for_html( @@ -27,11 +100,12 @@ class TestFileResponseHelpers: ) assert updated is True - assert "attachment" in response.headers["Content-Disposition"] + assert response.headers["Content-Disposition"].startswith("attachment") + assert "unsafe.html" in response.headers["Content-Disposition"] assert response.headers["Content-Type"] == "application/octet-stream" assert response.headers["X-Content-Type-Options"] == "nosniff" - def test_enforce_download_for_html_no_change_for_non_html(self): + def test_does_not_modify_response_for_non_html_content(self): + response =
Response("payload", mimetype="text/plain") updated = enforce_download_for_html( diff --git a/api/tests/unit_tests/controllers/common/test_helpers.py b/api/tests/unit_tests/controllers/common/test_helpers.py new file mode 100644 index 0000000000..59c463177c --- /dev/null +++ b/api/tests/unit_tests/controllers/common/test_helpers.py @@ -0,0 +1,188 @@ +from uuid import UUID + +import httpx +import pytest + +from controllers.common import helpers +from controllers.common.helpers import FileInfo, guess_file_info_from_response + + +def make_response( + url="https://example.com/file.txt", + headers=None, + content=None, +): + return httpx.Response( + 200, + request=httpx.Request("GET", url), + headers=headers or {}, + content=content or b"", + ) + + +class TestGuessFileInfoFromResponse: + def test_filename_from_url(self): + response = make_response( + url="https://example.com/test.pdf", + content=b"Hello World", + ) + + info = guess_file_info_from_response(response) + + assert info.filename == "test.pdf" + assert info.extension == ".pdf" + assert info.mimetype == "application/pdf" + + def test_filename_from_content_disposition(self): + headers = { + "Content-Disposition": "attachment; filename=myfile.csv", + "Content-Type": "text/csv", + } + response = make_response( + url="https://example.com/", + headers=headers, + content=b"Hello World", + ) + + info = guess_file_info_from_response(response) + + assert info.filename == "myfile.csv" + assert info.extension == ".csv" + assert info.mimetype == "text/csv" + + @pytest.mark.parametrize( + ("magic_available", "expected_ext"), + [ + (True, "txt"), + (False, "bin"), + ], + ) + def test_generated_filename_when_missing(self, monkeypatch, magic_available, expected_ext): + if magic_available: + if helpers.magic is None: + pytest.skip("python-magic is not installed, cannot run 'magic_available=True' test variant") + else: + monkeypatch.setattr(helpers, "magic", None) + + response = make_response( + url="https://example.com/", + content=b"Hello World", + ) + + info = guess_file_info_from_response(response) + + name, ext = info.filename.split(".") + UUID(name) + assert ext == expected_ext + + def test_mimetype_from_header_when_unknown(self): + headers = {"Content-Type": "application/json"} + response = make_response( + url="https://example.com/file.unknown", + headers=headers, + content=b'{"a": 1}', + ) + + info = guess_file_info_from_response(response) + + assert info.mimetype == "application/json" + + def test_extension_added_when_missing(self): + headers = {"Content-Type": "image/png"} + response = make_response( + url="https://example.com/image", + headers=headers, + content=b"fakepngdata", + ) + + info = guess_file_info_from_response(response) + + assert info.extension == ".png" + assert info.filename.endswith(".png") + + def test_content_length_used_as_size(self): + headers = { + "Content-Length": "1234", + "Content-Type": "text/plain", + } + response = make_response( + url="https://example.com/a.txt", + headers=headers, + content=b"a" * 1234, + ) + + info = guess_file_info_from_response(response) + + assert info.size == 1234 + + def test_size_minus_one_when_header_missing(self): + response = make_response(url="https://example.com/a.txt") + + info = guess_file_info_from_response(response) + + assert info.size == -1 + + def test_fallback_to_bin_extension(self): + headers = {"Content-Type": "application/octet-stream"} + response = make_response( + url="https://example.com/download", + headers=headers, + content=b"\x00\x01\x02\x03", + ) + + info = 
guess_file_info_from_response(response) + + assert info.extension == ".bin" + assert info.filename.endswith(".bin") + + def test_return_type(self): + response = make_response() + + info = guess_file_info_from_response(response) + + assert isinstance(info, FileInfo) + + +class TestMagicImportWarnings: + @pytest.mark.parametrize( + ("platform_name", "expected_message"), + [ + ("Windows", "pip install python-magic-bin"), + ("Darwin", "brew install libmagic"), + ("Linux", "sudo apt-get install libmagic1"), + ("Other", "install `libmagic`"), + ], + ) + def test_magic_import_warning_per_platform( + self, + monkeypatch, + platform_name, + expected_message, + ): + import builtins + import importlib + + # Force ImportError when "magic" is imported + real_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "magic": + raise ImportError("No module named magic") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + monkeypatch.setattr("platform.system", lambda: platform_name) + + # Remove helpers so it imports fresh + import sys + + original_helpers = sys.modules.get(helpers.__name__) + sys.modules.pop(helpers.__name__, None) + + try: + with pytest.warns(UserWarning, match="To use python-magic") as warning: + imported_helpers = importlib.import_module(helpers.__name__) + assert expected_message in str(warning[0].message) + finally: + if original_helpers is not None: + sys.modules[helpers.__name__] = original_helpers diff --git a/api/tests/unit_tests/controllers/common/test_schema.py b/api/tests/unit_tests/controllers/common/test_schema.py new file mode 100644 index 0000000000..56c8160f02 --- /dev/null +++ b/api/tests/unit_tests/controllers/common/test_schema.py @@ -0,0 +1,189 @@ +import sys +from enum import StrEnum +from unittest.mock import MagicMock, patch + +import pytest +from flask_restx import Namespace +from pydantic import BaseModel + + +class UserModel(BaseModel): + id: int + name: str + + +class ProductModel(BaseModel): + id: int + price: float + + +@pytest.fixture(autouse=True) +def mock_console_ns(): + """Mock the console_ns to avoid circular imports during test collection.""" + mock_ns = MagicMock(spec=Namespace) + mock_ns.models = {} + + # Inject mock before importing schema module + with patch.dict(sys.modules, {"controllers.console": MagicMock(console_ns=mock_ns)}): + yield mock_ns + + +def test_default_ref_template_value(): + from controllers.common.schema import DEFAULT_REF_TEMPLATE_SWAGGER_2_0 + + assert DEFAULT_REF_TEMPLATE_SWAGGER_2_0 == "#/definitions/{model}" + + +def test_register_schema_model_calls_namespace_schema_model(): + from controllers.common.schema import register_schema_model + + namespace = MagicMock(spec=Namespace) + + register_schema_model(namespace, UserModel) + + namespace.schema_model.assert_called_once() + + model_name, schema = namespace.schema_model.call_args.args + + assert model_name == "UserModel" + assert isinstance(schema, dict) + assert "properties" in schema + + +def test_register_schema_model_passes_schema_from_pydantic(): + from controllers.common.schema import DEFAULT_REF_TEMPLATE_SWAGGER_2_0, register_schema_model + + namespace = MagicMock(spec=Namespace) + + register_schema_model(namespace, UserModel) + + schema = namespace.schema_model.call_args.args[1] + + expected_schema = UserModel.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0) + + assert schema == expected_schema + + +def test_register_schema_models_registers_multiple_models(): + from 
controllers.common.schema import register_schema_models + + namespace = MagicMock(spec=Namespace) + + register_schema_models(namespace, UserModel, ProductModel) + + assert namespace.schema_model.call_count == 2 + + called_names = [call.args[0] for call in namespace.schema_model.call_args_list] + assert called_names == ["UserModel", "ProductModel"] + + +def test_register_schema_models_calls_register_schema_model(monkeypatch): + from controllers.common.schema import register_schema_models + + namespace = MagicMock(spec=Namespace) + + calls = [] + + def fake_register(ns, model): + calls.append((ns, model)) + + monkeypatch.setattr( + "controllers.common.schema.register_schema_model", + fake_register, + ) + + register_schema_models(namespace, UserModel, ProductModel) + + assert calls == [ + (namespace, UserModel), + (namespace, ProductModel), + ] + + +class StatusEnum(StrEnum): + ACTIVE = "active" + INACTIVE = "inactive" + + +class PriorityEnum(StrEnum): + HIGH = "high" + LOW = "low" + + +def test_get_or_create_model_returns_existing_model(mock_console_ns): + from controllers.common.schema import get_or_create_model + + existing_model = MagicMock() + mock_console_ns.models = {"TestModel": existing_model} + + result = get_or_create_model("TestModel", {"key": "value"}) + + assert result == existing_model + mock_console_ns.model.assert_not_called() + + +def test_get_or_create_model_creates_new_model_when_not_exists(mock_console_ns): + from controllers.common.schema import get_or_create_model + + mock_console_ns.models = {} + new_model = MagicMock() + mock_console_ns.model.return_value = new_model + field_def = {"name": {"type": "string"}} + + result = get_or_create_model("NewModel", field_def) + + assert result == new_model + mock_console_ns.model.assert_called_once_with("NewModel", field_def) + + +def test_get_or_create_model_does_not_call_model_if_exists(mock_console_ns): + from controllers.common.schema import get_or_create_model + + existing_model = MagicMock() + mock_console_ns.models = {"ExistingModel": existing_model} + + result = get_or_create_model("ExistingModel", {"key": "value"}) + + assert result == existing_model + mock_console_ns.model.assert_not_called() + + +def test_register_enum_models_registers_single_enum(): + from controllers.common.schema import register_enum_models + + namespace = MagicMock(spec=Namespace) + + register_enum_models(namespace, StatusEnum) + + namespace.schema_model.assert_called_once() + + model_name, schema = namespace.schema_model.call_args.args + + assert model_name == "StatusEnum" + assert isinstance(schema, dict) + + +def test_register_enum_models_registers_multiple_enums(): + from controllers.common.schema import register_enum_models + + namespace = MagicMock(spec=Namespace) + + register_enum_models(namespace, StatusEnum, PriorityEnum) + + assert namespace.schema_model.call_count == 2 + + called_names = [call.args[0] for call in namespace.schema_model.call_args_list] + assert called_names == ["StatusEnum", "PriorityEnum"] + + +def test_register_enum_models_uses_correct_ref_template(): + from controllers.common.schema import register_enum_models + + namespace = MagicMock(spec=Namespace) + + register_enum_models(namespace, StatusEnum) + + schema = namespace.schema_model.call_args.args[1] + + # Verify the schema contains enum values + assert "enum" in schema or "anyOf" in schema diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_extra_contents.py 
b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_extra_contents.py index be773557f6..83a6e0f231 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_extra_contents.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_extra_contents.py @@ -9,8 +9,16 @@ import pytest from core.app.apps.advanced_chat import generate_task_pipeline as pipeline_module from core.app.entities.app_invoke_entities import InvokeFrom -from core.app.entities.queue_entities import QueueTextChunkEvent, QueueWorkflowPausedEvent +from core.app.entities.queue_entities import ( + QueuePingEvent, + QueueTextChunkEvent, + QueueWorkflowPartialSuccessEvent, + QueueWorkflowPausedEvent, + QueueWorkflowSucceededEvent, +) +from core.app.entities.task_entities import StreamEvent from dify_graph.entities.pause_reason import HumanInputRequired +from dify_graph.enums import WorkflowExecutionStatus from models.enums import MessageStatus from models.execution_extra_content import HumanInputContent from models.model import EndUser @@ -185,3 +193,97 @@ def test_resume_appends_chunks_to_paused_answer() -> None: assert message.answer == "beforeafter" assert message.status == MessageStatus.NORMAL + + +def test_workflow_succeeded_emits_message_end_before_workflow_finished() -> None: + pipeline = _build_pipeline() + pipeline._application_generate_entity = SimpleNamespace(task_id="task-1") + pipeline._workflow_id = "workflow-1" + pipeline._ensure_workflow_initialized = mock.Mock() + runtime_state = SimpleNamespace() + pipeline._ensure_graph_runtime_initialized = mock.Mock(return_value=runtime_state) + pipeline._handle_advanced_chat_message_end_event = mock.Mock( + return_value=iter([SimpleNamespace(event=StreamEvent.MESSAGE_END)]) + ) + pipeline._workflow_response_converter = mock.Mock() + pipeline._workflow_response_converter.workflow_finish_to_stream_response.return_value = SimpleNamespace( + event=StreamEvent.WORKFLOW_FINISHED, + data=SimpleNamespace(status=WorkflowExecutionStatus.SUCCEEDED), + ) + + event = QueueWorkflowSucceededEvent(outputs={}) + responses = list(pipeline._handle_workflow_succeeded_event(event)) + + assert [resp.event for resp in responses] == [StreamEvent.MESSAGE_END, StreamEvent.WORKFLOW_FINISHED] + + +def test_workflow_partial_success_emits_message_end_before_workflow_finished() -> None: + pipeline = _build_pipeline() + pipeline._application_generate_entity = SimpleNamespace(task_id="task-1") + pipeline._workflow_id = "workflow-1" + pipeline._ensure_workflow_initialized = mock.Mock() + runtime_state = SimpleNamespace() + pipeline._ensure_graph_runtime_initialized = mock.Mock(return_value=runtime_state) + pipeline._handle_advanced_chat_message_end_event = mock.Mock( + return_value=iter([SimpleNamespace(event=StreamEvent.MESSAGE_END)]) + ) + pipeline._workflow_response_converter = mock.Mock() + pipeline._workflow_response_converter.workflow_finish_to_stream_response.return_value = SimpleNamespace( + event=StreamEvent.WORKFLOW_FINISHED, + data=SimpleNamespace(status=WorkflowExecutionStatus.PARTIAL_SUCCEEDED), + ) + + event = QueueWorkflowPartialSuccessEvent(exceptions_count=1, outputs={}) + responses = list(pipeline._handle_workflow_partial_success_event(event)) + + assert [resp.event for resp in responses] == [StreamEvent.MESSAGE_END, StreamEvent.WORKFLOW_FINISHED] + + +def test_process_stream_response_breaks_after_workflow_succeeded() -> None: + pipeline = _build_pipeline() + succeeded_event = 
QueueWorkflowSucceededEvent(outputs={}) + ping_event = QueuePingEvent() + queue_messages = [ + SimpleNamespace(event=succeeded_event), + SimpleNamespace(event=ping_event), + ] + + pipeline._conversation_name_generate_thread = None + pipeline._base_task_pipeline = mock.Mock() + pipeline._base_task_pipeline.queue_manager = mock.Mock() + pipeline._base_task_pipeline.queue_manager.listen.return_value = iter(queue_messages) + pipeline._base_task_pipeline.ping_stream_response = mock.Mock(return_value=SimpleNamespace(event=StreamEvent.PING)) + pipeline._handle_workflow_succeeded_event = mock.Mock( + return_value=iter([SimpleNamespace(event=StreamEvent.WORKFLOW_FINISHED)]) + ) + + responses = list(pipeline._process_stream_response()) + + assert [resp.event for resp in responses] == [StreamEvent.WORKFLOW_FINISHED] + pipeline._handle_workflow_succeeded_event.assert_called_once_with(succeeded_event, trace_manager=None) + pipeline._base_task_pipeline.ping_stream_response.assert_not_called() + + +def test_process_stream_response_breaks_after_workflow_partial_success() -> None: + pipeline = _build_pipeline() + partial_event = QueueWorkflowPartialSuccessEvent(exceptions_count=1, outputs={}) + ping_event = QueuePingEvent() + queue_messages = [ + SimpleNamespace(event=partial_event), + SimpleNamespace(event=ping_event), + ] + + pipeline._conversation_name_generate_thread = None + pipeline._base_task_pipeline = mock.Mock() + pipeline._base_task_pipeline.queue_manager = mock.Mock() + pipeline._base_task_pipeline.queue_manager.listen.return_value = iter(queue_messages) + pipeline._base_task_pipeline.ping_stream_response = mock.Mock(return_value=SimpleNamespace(event=StreamEvent.PING)) + pipeline._handle_workflow_partial_success_event = mock.Mock( + return_value=iter([SimpleNamespace(event=StreamEvent.WORKFLOW_FINISHED)]) + ) + + responses = list(pipeline._process_stream_response()) + + assert [resp.event for resp in responses] == [StreamEvent.WORKFLOW_FINISHED] + pipeline._handle_workflow_partial_success_event.assert_called_once_with(partial_event, trace_manager=None) + pipeline._base_task_pipeline.ping_stream_response.assert_not_called() diff --git a/api/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.py b/api/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.py index f0d9afc0db..a25e3ec3f5 100644 --- a/api/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.py +++ b/api/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.py @@ -124,12 +124,12 @@ def test_message_cycle_manager_uses_new_conversation_flag(monkeypatch): def start(self): self.started = True - def fake_thread(**kwargs): + def fake_thread(*args, **kwargs): thread = DummyThread(**kwargs) captured["thread"] = thread return thread - monkeypatch.setattr(message_cycle_manager, "Thread", fake_thread) + monkeypatch.setattr(message_cycle_manager, "Timer", fake_thread) manager = MessageCycleManager(application_generate_entity=entity, task_state=MagicMock()) thread = manager.generate_conversation_name(conversation_id="existing-conversation-id", query="hello") diff --git a/api/tests/unit_tests/core/app/apps/test_pause_resume.py b/api/tests/unit_tests/core/app/apps/test_pause_resume.py index 44af89601c..e019a4b977 100644 --- a/api/tests/unit_tests/core/app/apps/test_pause_resume.py +++ b/api/tests/unit_tests/core/app/apps/test_pause_resume.py @@ -1,13 +1,8 @@ import sys import time -from pathlib import Path from types import ModuleType, SimpleNamespace from typing import Any -API_DIR = 
str(Path(__file__).resolve().parents[5]) -if API_DIR not in sys.path: - sys.path.insert(0, API_DIR) - import dify_graph.nodes.human_input.entities # noqa: F401 from core.app.apps.advanced_chat import app_generator as adv_app_gen_module from core.app.apps.workflow import app_generator as wf_app_gen_module diff --git a/api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_message_end_files.py b/api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_message_end_files.py new file mode 100644 index 0000000000..582990c88a --- /dev/null +++ b/api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_message_end_files.py @@ -0,0 +1,425 @@ +""" +Unit tests for EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response method. + +This test suite ensures that the files array is correctly populated in the message_end +SSE event, which is critical for vision/image chat responses to render correctly. + +Test Coverage: +- Files array populated when MessageFile records exist +- Files array is None when no MessageFile records exist +- Correct signed URL generation for LOCAL_FILE transfer method +- Correct URL handling for REMOTE_URL transfer method +- Correct URL handling for TOOL_FILE transfer method +- Proper file metadata formatting (filename, mime_type, size, extension) +""" + +import uuid +from unittest.mock import MagicMock, Mock, patch + +import pytest +from sqlalchemy.orm import Session + +from core.app.entities.task_entities import MessageEndStreamResponse +from core.app.task_pipeline.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline +from dify_graph.file.enums import FileTransferMethod +from models.model import MessageFile, UploadFile + + +class TestMessageEndStreamResponseFiles: + """Test suite for files array population in message_end SSE event.""" + + @pytest.fixture + def mock_pipeline(self): + """Create a mock EasyUIBasedGenerateTaskPipeline instance.""" + pipeline = Mock(spec=EasyUIBasedGenerateTaskPipeline) + pipeline._message_id = str(uuid.uuid4()) + pipeline._task_state = Mock() + pipeline._task_state.metadata = Mock() + pipeline._task_state.metadata.model_dump = Mock(return_value={"test": "metadata"}) + pipeline._task_state.llm_result = Mock() + pipeline._task_state.llm_result.usage = Mock() + pipeline._application_generate_entity = Mock() + pipeline._application_generate_entity.task_id = str(uuid.uuid4()) + return pipeline + + @pytest.fixture + def mock_message_file_local(self): + """Create a mock MessageFile with LOCAL_FILE transfer method.""" + message_file = Mock(spec=MessageFile) + message_file.id = str(uuid.uuid4()) + message_file.message_id = str(uuid.uuid4()) + message_file.transfer_method = FileTransferMethod.LOCAL_FILE + message_file.upload_file_id = str(uuid.uuid4()) + message_file.url = None + message_file.type = "image" + return message_file + + @pytest.fixture + def mock_message_file_remote(self): + """Create a mock MessageFile with REMOTE_URL transfer method.""" + message_file = Mock(spec=MessageFile) + message_file.id = str(uuid.uuid4()) + message_file.message_id = str(uuid.uuid4()) + message_file.transfer_method = FileTransferMethod.REMOTE_URL + message_file.upload_file_id = None + message_file.url = "https://example.com/image.jpg" + message_file.type = "image" + return message_file + + @pytest.fixture + def mock_message_file_tool(self): + """Create a mock MessageFile with TOOL_FILE transfer method.""" + message_file = Mock(spec=MessageFile) + message_file.id = str(uuid.uuid4()) + message_file.message_id = str(uuid.uuid4()) + 
message_file.transfer_method = FileTransferMethod.TOOL_FILE + message_file.upload_file_id = None + message_file.url = "tool_file_123.png" + message_file.type = "image" + return message_file + + @pytest.fixture + def mock_upload_file(self, mock_message_file_local): + """Create a mock UploadFile.""" + upload_file = Mock(spec=UploadFile) + upload_file.id = mock_message_file_local.upload_file_id + upload_file.name = "test_image.png" + upload_file.mime_type = "image/png" + upload_file.size = 1024 + upload_file.extension = "png" + return upload_file + + def test_message_end_with_no_files(self, mock_pipeline): + """Test that files array is None when no MessageFile records exist.""" + # Arrange + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + mock_session.scalars.return_value.all.return_value = [] + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is None + assert result.id == mock_pipeline._message_id + assert result.metadata == {"test": "metadata"} + + def test_message_end_with_local_file(self, mock_pipeline, mock_message_file_local, mock_upload_file): + """Test that files array is populated correctly for LOCAL_FILE transfer method.""" + # Arrange + mock_message_file_local.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.file_helpers.get_signed_file_url") as mock_get_url, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + # First query: MessageFile + mock_message_files_result = Mock() + mock_message_files_result.all.return_value = [mock_message_file_local] + + # Second query: UploadFile (batch query to avoid N+1) + mock_upload_files_result = Mock() + mock_upload_files_result.all.return_value = [mock_upload_file] + + # Setup scalars to return different results for different queries + call_count = [0] # Use list to allow modification in nested function + + def scalars_side_effect(query): + call_count[0] += 1 + # First call is for MessageFile, second call is for UploadFile + if call_count[0] == 1: + return mock_message_files_result + else: + return mock_upload_files_result + + mock_session.scalars.side_effect = scalars_side_effect + mock_get_url.return_value = "https://example.com/signed-url?signature=abc123" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert file_dict["related_id"] == mock_message_file_local.id + assert file_dict["filename"] == "test_image.png" + assert file_dict["mime_type"] == "image/png" + assert file_dict["size"] == 1024 + assert file_dict["extension"] == ".png" + assert file_dict["type"] == "image" + assert 
file_dict["transfer_method"] == FileTransferMethod.LOCAL_FILE.value + assert "https://example.com/signed-url" in file_dict["url"] + assert file_dict["upload_file_id"] == mock_message_file_local.upload_file_id + assert file_dict["remote_url"] == "" + + # Verify database queries + # Should be called twice: once for MessageFile, once for UploadFile + assert mock_session.scalars.call_count == 2 + mock_get_url.assert_called_once_with(upload_file_id=str(mock_upload_file.id)) + + def test_message_end_with_remote_url(self, mock_pipeline, mock_message_file_remote): + """Test that files array is populated correctly for REMOTE_URL transfer method.""" + # Arrange + mock_message_file_remote.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_remote] + mock_session.scalars.return_value = mock_scalars_result + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert file_dict["related_id"] == mock_message_file_remote.id + assert file_dict["filename"] == "image.jpg" + assert file_dict["url"] == "https://example.com/image.jpg" + assert file_dict["extension"] == ".jpg" + assert file_dict["type"] == "image" + assert file_dict["transfer_method"] == FileTransferMethod.REMOTE_URL.value + assert file_dict["remote_url"] == "https://example.com/image.jpg" + assert file_dict["upload_file_id"] == mock_message_file_remote.id + + # Verify only one query for message_files is made + mock_session.scalars.assert_called_once() + + def test_message_end_with_tool_file_http(self, mock_pipeline, mock_message_file_tool): + """Test that files array is populated correctly for TOOL_FILE with HTTP URL.""" + # Arrange + mock_message_file_tool.message_id = mock_pipeline._message_id + mock_message_file_tool.url = "https://example.com/tool_file.png" + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_tool] + mock_session.scalars.return_value = mock_scalars_result + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert file_dict["url"] == "https://example.com/tool_file.png" + assert file_dict["filename"] == "tool_file.png" + assert file_dict["extension"] == ".png" + assert file_dict["transfer_method"] == FileTransferMethod.TOOL_FILE.value + + def test_message_end_with_tool_file_local(self, mock_pipeline, 
mock_message_file_tool): + """Test that files array is populated correctly for TOOL_FILE with local path.""" + # Arrange + mock_message_file_tool.message_id = mock_pipeline._message_id + mock_message_file_tool.url = "tool_file_123.png" + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.sign_tool_file") as mock_sign_tool, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_tool] + mock_session.scalars.return_value = mock_scalars_result + + mock_sign_tool.return_value = "https://example.com/signed-tool-file.png?signature=xyz" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert "https://example.com/signed-tool-file.png" in file_dict["url"] + assert file_dict["filename"] == "tool_file_123.png" + assert file_dict["extension"] == ".png" + assert file_dict["transfer_method"] == FileTransferMethod.TOOL_FILE.value + + # Verify tool file signing was called + mock_sign_tool.assert_called_once_with(tool_file_id="tool_file_123", extension=".png") + + def test_message_end_with_tool_file_long_extension(self, mock_pipeline, mock_message_file_tool): + """Test that TOOL_FILE extensions longer than MAX_TOOL_FILE_EXTENSION_LENGTH fall back to .bin.""" + mock_message_file_tool.message_id = mock_pipeline._message_id + mock_message_file_tool.url = "tool_file_abc.verylongextension" + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.sign_tool_file") as mock_sign_tool, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_tool] + mock_session.scalars.return_value = mock_scalars_result + mock_sign_tool.return_value = "https://example.com/signed.bin" + + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + assert result.files is not None + file_dict = result.files[0] + assert file_dict["extension"] == ".bin" + mock_sign_tool.assert_called_once_with(tool_file_id="tool_file_abc", extension=".bin") + + def test_message_end_with_multiple_files( + self, mock_pipeline, mock_message_file_local, mock_message_file_remote, mock_upload_file + ): + """Test that files array contains all MessageFile records when multiple exist.""" + # Arrange + mock_message_file_local.message_id = mock_pipeline._message_id + mock_message_file_remote.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + 
patch("core.app.task_pipeline.message_file_utils.file_helpers.get_signed_file_url") as mock_get_url, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + # First query: MessageFile + mock_message_files_result = Mock() + mock_message_files_result.all.return_value = [mock_message_file_local, mock_message_file_remote] + + # Second query: UploadFile (batch query to avoid N+1) + mock_upload_files_result = Mock() + mock_upload_files_result.all.return_value = [mock_upload_file] + + # Setup scalars to return different results for different queries + call_count = [0] # Use list to allow modification in nested function + + def scalars_side_effect(query): + call_count[0] += 1 + # First call is for MessageFile, second call is for UploadFile + if call_count[0] == 1: + return mock_message_files_result + else: + return mock_upload_files_result + + mock_session.scalars.side_effect = scalars_side_effect + mock_get_url.return_value = "https://example.com/signed-url?signature=abc123" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 2 + + # Verify both files are present + file_ids = [f["related_id"] for f in result.files] + assert mock_message_file_local.id in file_ids + assert mock_message_file_remote.id in file_ids + + def test_message_end_with_local_file_no_upload_file(self, mock_pipeline, mock_message_file_local): + """Test fallback when UploadFile is not found for LOCAL_FILE.""" + # Arrange + mock_message_file_local.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.file_helpers.get_signed_file_url") as mock_get_url, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + # First query: MessageFile + mock_message_files_result = Mock() + mock_message_files_result.all.return_value = [mock_message_file_local] + + # Second query: UploadFile (batch query) - returns empty list (not found) + mock_upload_files_result = Mock() + mock_upload_files_result.all.return_value = [] # UploadFile not found + + # Setup scalars to return different results for different queries + call_count = [0] # Use list to allow modification in nested function + + def scalars_side_effect(query): + call_count[0] += 1 + # First call is for MessageFile, second call is for UploadFile + if call_count[0] == 1: + return mock_message_files_result + else: + return mock_upload_files_result + + mock_session.scalars.side_effect = scalars_side_effect + mock_get_url.return_value = "https://example.com/fallback-url?signature=def456" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert "https://example.com/fallback-url" in file_dict["url"] + # Verify fallback URL was generated using upload_file_id from message_file + 
mock_get_url.assert_called_with(upload_file_id=str(mock_message_file_local.upload_file_id)) diff --git a/api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py b/api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py new file mode 100644 index 0000000000..c66e50437a --- /dev/null +++ b/api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py @@ -0,0 +1,84 @@ +from datetime import datetime +from unittest.mock import MagicMock +from uuid import uuid4 + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from dify_graph.entities.workflow_execution import WorkflowExecution, WorkflowType +from models import Account, WorkflowRun +from models.enums import WorkflowRunTriggeredFrom + + +def _build_repository_with_mocked_session(session: MagicMock) -> SQLAlchemyWorkflowExecutionRepository: + engine = create_engine("sqlite:///:memory:") + real_session_factory = sessionmaker(bind=engine, expire_on_commit=False) + + user = MagicMock(spec=Account) + user.id = str(uuid4()) + user.current_tenant_id = str(uuid4()) + + repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=real_session_factory, + user=user, + app_id="app-id", + triggered_from=WorkflowRunTriggeredFrom.APP_RUN, + ) + + session_context = MagicMock() + session_context.__enter__.return_value = session + session_context.__exit__.return_value = False + repository._session_factory = MagicMock(return_value=session_context) + return repository + + +def _build_execution(*, execution_id: str, started_at: datetime) -> WorkflowExecution: + return WorkflowExecution.new( + id_=execution_id, + workflow_id="workflow-id", + workflow_type=WorkflowType.WORKFLOW, + workflow_version="1.0.0", + graph={"nodes": [], "edges": []}, + inputs={"query": "hello"}, + started_at=started_at, + ) + + +def test_save_uses_execution_started_at_when_record_does_not_exist(): + session = MagicMock() + session.get.return_value = None + repository = _build_repository_with_mocked_session(session) + + started_at = datetime(2026, 1, 1, 12, 0, 0) + execution = _build_execution(execution_id=str(uuid4()), started_at=started_at) + + repository.save(execution) + + saved_model = session.merge.call_args.args[0] + assert saved_model.created_at == started_at + session.commit.assert_called_once() + + +def test_save_preserves_existing_created_at_when_record_already_exists(): + session = MagicMock() + repository = _build_repository_with_mocked_session(session) + + execution_id = str(uuid4()) + existing_created_at = datetime(2026, 1, 1, 12, 0, 0) + existing_run = WorkflowRun() + existing_run.id = execution_id + existing_run.tenant_id = repository._tenant_id + existing_run.created_at = existing_created_at + session.get.return_value = existing_run + + execution = _build_execution( + execution_id=execution_id, + started_at=datetime(2026, 1, 1, 12, 30, 0), + ) + + repository.save(execution) + + saved_model = session.merge.call_args.args[0] + assert saved_model.created_at == existing_created_at + session.commit.assert_called_once() diff --git a/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py b/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py index 0df4927697..22792eb5b3 100644 --- a/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py +++ 
b/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py @@ -4,8 +4,10 @@ from unittest.mock import MagicMock, patch import pytest +from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID from dify_graph.model_runtime.entities.llm_entities import LLMUsage from dify_graph.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool +from dify_graph.variables.variables import StringVariable class StubCoordinator: @@ -278,3 +280,17 @@ class TestGraphRuntimeState: assert restored_execution.started is True assert new_stub.state == "configured" + + def test_snapshot_restore_preserves_updated_conversation_variable(self): + variable_pool = VariablePool( + conversation_variables=[StringVariable(name="session_name", value="before")], + ) + variable_pool.add((CONVERSATION_VARIABLE_NODE_ID, "session_name"), "after") + + state = GraphRuntimeState(variable_pool=variable_pool, start_at=time()) + snapshot = state.dumps() + restored = GraphRuntimeState.from_snapshot(snapshot) + + restored_value = restored.variable_pool.get((CONVERSATION_VARIABLE_NODE_ID, "session_name")) + assert restored_value is not None + assert restored_value.value == "after" diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_iteration_simple.py b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_iteration_simple.py index eb449e6d75..8c8e5977c8 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_iteration_simple.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_iteration_simple.py @@ -2,15 +2,7 @@ Simple test to verify MockNodeFactory works with iteration nodes. """ -import sys -from pathlib import Path - from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY - -# Add api directory to path -api_dir = Path(__file__).parent.parent.parent.parent.parent.parent -sys.path.insert(0, str(api_dir)) - from dify_graph.enums import NodeType from tests.unit_tests.core.workflow.graph_engine.test_mock_config import MockConfigBuilder from tests.unit_tests.core.workflow.graph_engine.test_mock_factory import MockNodeFactory diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_simple.py b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_simple.py index 84d1444585..693cdf9276 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_simple.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_simple.py @@ -3,14 +3,8 @@ Simple test to validate the auto-mock system without external dependencies. 
""" import sys -from pathlib import Path from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY - -# Add api directory to path -api_dir = Path(__file__).parent.parent.parent.parent.parent.parent -sys.path.insert(0, str(api_dir)) - from dify_graph.enums import NodeType from tests.unit_tests.core.workflow.graph_engine.test_mock_config import MockConfig, MockConfigBuilder, NodeMockConfig from tests.unit_tests.core.workflow.graph_engine.test_mock_factory import MockNodeFactory diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_retrieval/test_knowledge_retrieval_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_retrieval/test_knowledge_retrieval_node.py index e194d66ee3..6a538d81de 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_retrieval/test_knowledge_retrieval_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_retrieval/test_knowledge_retrieval_node.py @@ -205,6 +205,7 @@ class TestKnowledgeRetrievalNode: assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert "result" in result.outputs assert mock_rag_retrieval.knowledge_retrieval.called + mock_source.model_dump.assert_called_once_with(by_alias=True) def test_run_with_query_variable_multiple_mode( self, diff --git a/api/tests/unit_tests/services/test_export_app_messages.py b/api/tests/unit_tests/services/test_export_app_messages.py new file mode 100644 index 0000000000..5f2d3f21c0 --- /dev/null +++ b/api/tests/unit_tests/services/test_export_app_messages.py @@ -0,0 +1,43 @@ +import datetime + +import pytest + +from services.retention.conversation.message_export_service import AppMessageExportService + + +def test_validate_export_filename_accepts_relative_path(): + assert AppMessageExportService.validate_export_filename("exports/2026/test01") == "exports/2026/test01" + + +@pytest.mark.parametrize( + "filename", + [ + "test01.jsonl.gz", + "test01.jsonl", + "test01.gz", + "/tmp/test01", + "exports/../test01", + "bad\x00name", + "bad\tname", + "a" * 1025, + ], +) +def test_validate_export_filename_rejects_invalid_values(filename: str): + with pytest.raises(ValueError): + AppMessageExportService.validate_export_filename(filename) + + +def test_service_derives_output_names_from_filename_base(): + service = AppMessageExportService( + app_id="736b9b03-20f2-4697-91da-8d00f6325900", + start_from=None, + end_before=datetime.datetime(2026, 3, 1), + filename="exports/2026/test01", + batch_size=1000, + use_cloud_storage=True, + dry_run=True, + ) + + assert service._filename_base == "exports/2026/test01" + assert service.output_gz_name == "exports/2026/test01.jsonl.gz" + assert service.output_jsonl_name == "exports/2026/test01.jsonl" diff --git a/api/tests/unit_tests/tasks/test_summary_queue_isolation.py b/api/tests/unit_tests/tasks/test_summary_queue_isolation.py new file mode 100644 index 0000000000..f6632e0a8a --- /dev/null +++ b/api/tests/unit_tests/tasks/test_summary_queue_isolation.py @@ -0,0 +1,40 @@ +""" +Unit tests for summary index task queue isolation. + +These tasks must NOT run on the shared 'dataset' queue because they invoke LLMs +for each document segment and can occupy all worker slots for hours, blocking +document indexing tasks. +""" + +import pytest + +from tasks.generate_summary_index_task import generate_summary_index_task +from tasks.regenerate_summary_index_task import regenerate_summary_index_task + +SUMMARY_QUEUE = "dataset_summary" +INDEXING_QUEUE = "dataset" + + +def _task_queue(task) -> str | None: + # Celery's @shared_task(queue=...) 
stores the routing key on the task instance + # at runtime, but type stubs don't declare it; use getattr to stay type-clean. + return getattr(task, "queue", None) + + +@pytest.mark.parametrize( + ("task", "task_name"), + [ + (generate_summary_index_task, "generate_summary_index_task"), + (regenerate_summary_index_task, "regenerate_summary_index_task"), + ], +) +def test_summary_task_uses_dedicated_queue(task, task_name): + """Summary tasks must use the dataset_summary queue, not the shared dataset queue. + + Summary generation is LLM-heavy and will block document indexing if placed + on the shared queue. + """ + assert _task_queue(task) == SUMMARY_QUEUE, ( + f"{task_name} must run on '{SUMMARY_QUEUE}' queue (not '{INDEXING_QUEUE}'). " + "Summary generation is LLM-heavy and will block document indexing if placed on the shared queue." + ) diff --git a/dev/pyrefly-check-local b/dev/pyrefly-check-local new file mode 100755 index 0000000000..80f90927bb --- /dev/null +++ b/dev/pyrefly-check-local @@ -0,0 +1,34 @@ +#!/bin/bash + +set -euo pipefail + +SCRIPT_DIR="$(dirname "$(realpath "$0")")" +REPO_ROOT="$SCRIPT_DIR/.." +cd "$REPO_ROOT" + +EXCLUDES_FILE="api/pyrefly-local-excludes.txt" + +pyrefly_args=( + "--summary=none" + "--project-excludes=.venv" + "--project-excludes=migrations/" + "--project-excludes=tests/" +) + +if [[ -f "$EXCLUDES_FILE" ]]; then + while IFS= read -r exclude; do + [[ -z "$exclude" || "${exclude:0:1}" == "#" ]] && continue + pyrefly_args+=("--project-excludes=$exclude") + done < "$EXCLUDES_FILE" +fi + +tmp_output="$(mktemp)" +set +e +uv run --directory api --dev pyrefly check "${pyrefly_args[@]}" >"$tmp_output" 2>&1 +pyrefly_status=$? +set -e + +uv run --directory api python libs/pyrefly_diagnostics.py < "$tmp_output" +rm -f "$tmp_output" + +exit "$pyrefly_status" diff --git a/dev/start-worker b/dev/start-worker index 0450851b56..8baa36f1ed 100755 --- a/dev/start-worker +++ b/dev/start-worker @@ -21,6 +21,7 @@ show_help() { echo "" echo "Available queues:" echo " dataset - RAG indexing and document processing" + echo " dataset_summary - LLM-heavy summary index generation (isolated from indexing)" echo " workflow - Workflow triggers (community edition)" echo " workflow_professional - Professional tier workflows (cloud edition)" echo " workflow_team - Team tier workflows (cloud edition)" @@ -106,10 +107,10 @@ if [[ -z "${QUEUES}" ]]; then # Configure queues based on edition if [[ "${EDITION}" == "CLOUD" ]]; then # Cloud edition: separate queues for dataset and trigger tasks - QUEUES="dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" + QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" else # Community edition (SELF_HOSTED): dataset and workflow have separate queues - QUEUES="dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" + 
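# NOTE: keep dataset_summary in this default list; a Celery task routed to a queue that no running worker consumes sits in the broker unprocessed (queue contract covered by api/tests/unit_tests/tasks/test_summary_queue_isolation.py).
+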
QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" fi echo "No queues specified, using edition-based defaults: ${QUEUES}" diff --git a/web/app/components/base/amplitude/AmplitudeProvider.spec.tsx b/web/app/components/base/amplitude/AmplitudeProvider.spec.tsx new file mode 100644 index 0000000000..2402c84a3e --- /dev/null +++ b/web/app/components/base/amplitude/AmplitudeProvider.spec.tsx @@ -0,0 +1,139 @@ +import * as amplitude from '@amplitude/analytics-browser' +import { sessionReplayPlugin } from '@amplitude/plugin-session-replay-browser' +import { render } from '@testing-library/react' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import AmplitudeProvider, { isAmplitudeEnabled } from './AmplitudeProvider' + +const mockConfig = vi.hoisted(() => ({ + AMPLITUDE_API_KEY: 'test-api-key', + IS_CLOUD_EDITION: true, +})) + +vi.mock('@/config', () => mockConfig) + +vi.mock('@amplitude/analytics-browser', () => ({ + init: vi.fn(), + add: vi.fn(), +})) + +vi.mock('@amplitude/plugin-session-replay-browser', () => ({ + sessionReplayPlugin: vi.fn(() => ({ name: 'session-replay' })), +})) + +describe('AmplitudeProvider', () => { + beforeEach(() => { + vi.clearAllMocks() + mockConfig.AMPLITUDE_API_KEY = 'test-api-key' + mockConfig.IS_CLOUD_EDITION = true + }) + + describe('isAmplitudeEnabled', () => { + it('returns true when cloud edition and api key present', () => { + expect(isAmplitudeEnabled()).toBe(true) + }) + + it('returns false when cloud edition but no api key', () => { + mockConfig.AMPLITUDE_API_KEY = '' + expect(isAmplitudeEnabled()).toBe(false) + }) + + it('returns false when not cloud edition', () => { + mockConfig.IS_CLOUD_EDITION = false + expect(isAmplitudeEnabled()).toBe(false) + }) + }) + + describe('Component', () => { + it('initializes amplitude when enabled', () => { + render() + + expect(amplitude.init).toHaveBeenCalledWith('test-api-key', expect.any(Object)) + expect(sessionReplayPlugin).toHaveBeenCalledWith({ sampleRate: 0.8 }) + expect(amplitude.add).toHaveBeenCalledTimes(2) + }) + + it('does not initialize amplitude when disabled', () => { + mockConfig.AMPLITUDE_API_KEY = '' + render() + + expect(amplitude.init).not.toHaveBeenCalled() + expect(amplitude.add).not.toHaveBeenCalled() + }) + + it('pageNameEnrichmentPlugin logic works as expected', async () => { + render() + const plugin = vi.mocked(amplitude.add).mock.calls[0]?.[0] as amplitude.Types.EnrichmentPlugin | undefined + expect(plugin).toBeDefined() + if (!plugin?.execute || !plugin.setup) + throw new Error('Expected page-name-enrichment plugin with setup/execute') + + expect(plugin.name).toBe('page-name-enrichment') + + const execute = plugin.execute + const setup = plugin.setup + type SetupFn = NonNullable + const getPageTitle = (evt: amplitude.Types.Event | null | undefined) => + (evt?.event_properties as Record | undefined)?.['[Amplitude] Page Title'] + + await setup( + {} as Parameters[0], + {} as Parameters[1], + ) + + const originalWindowLocation = window.location + try { + Object.defineProperty(window, 'location', { + value: { pathname: '/datasets' }, + writable: true, + }) + const event: amplitude.Types.Event = { + event_type: '[Amplitude] Page Viewed', + event_properties: {}, + } + const result = await execute(event) + expect(getPageTitle(result)).toBe('Knowledge') + 
window.location.pathname = '/' + await execute(event) + expect(getPageTitle(event)).toBe('Home') + window.location.pathname = '/apps' + await execute(event) + expect(getPageTitle(event)).toBe('Studio') + window.location.pathname = '/explore' + await execute(event) + expect(getPageTitle(event)).toBe('Explore') + window.location.pathname = '/tools' + await execute(event) + expect(getPageTitle(event)).toBe('Tools') + window.location.pathname = '/account' + await execute(event) + expect(getPageTitle(event)).toBe('Account') + window.location.pathname = '/signin' + await execute(event) + expect(getPageTitle(event)).toBe('Sign In') + window.location.pathname = '/signup' + await execute(event) + expect(getPageTitle(event)).toBe('Sign Up') + window.location.pathname = '/unknown' + await execute(event) + expect(getPageTitle(event)).toBe('Unknown') + const otherEvent = { + event_type: 'Button Clicked', + event_properties: {}, + } as amplitude.Types.Event + const otherResult = await execute(otherEvent) + expect(getPageTitle(otherResult)).toBeUndefined() + const noPropsEvent = { + event_type: '[Amplitude] Page Viewed', + } as amplitude.Types.Event + const noPropsResult = await execute(noPropsEvent) + expect(noPropsResult?.event_properties).toBeUndefined() + } + finally { + Object.defineProperty(window, 'location', { + value: originalWindowLocation, + writable: true, + }) + } + }) + }) +}) diff --git a/web/app/components/base/amplitude/index.spec.ts b/web/app/components/base/amplitude/index.spec.ts new file mode 100644 index 0000000000..919c0b68d1 --- /dev/null +++ b/web/app/components/base/amplitude/index.spec.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from 'vitest' +import AmplitudeProvider, { isAmplitudeEnabled } from './AmplitudeProvider' +import indexDefault, { + isAmplitudeEnabled as indexIsAmplitudeEnabled, + resetUser, + setUserId, + setUserProperties, + trackEvent, +} from './index' +import { + resetUser as utilsResetUser, + setUserId as utilsSetUserId, + setUserProperties as utilsSetUserProperties, + trackEvent as utilsTrackEvent, +} from './utils' + +describe('Amplitude index exports', () => { + it('exports AmplitudeProvider as default', () => { + expect(indexDefault).toBe(AmplitudeProvider) + }) + + it('exports isAmplitudeEnabled', () => { + expect(indexIsAmplitudeEnabled).toBe(isAmplitudeEnabled) + }) + + it('exports utils', () => { + expect(resetUser).toBe(utilsResetUser) + expect(setUserId).toBe(utilsSetUserId) + expect(setUserProperties).toBe(utilsSetUserProperties) + expect(trackEvent).toBe(utilsTrackEvent) + }) +}) diff --git a/web/app/components/base/amplitude/utils.spec.ts b/web/app/components/base/amplitude/utils.spec.ts new file mode 100644 index 0000000000..c69fc93aa4 --- /dev/null +++ b/web/app/components/base/amplitude/utils.spec.ts @@ -0,0 +1,119 @@ +import { resetUser, setUserId, setUserProperties, trackEvent } from './utils' + +const mockState = vi.hoisted(() => ({ + enabled: true, +})) + +const mockTrack = vi.hoisted(() => vi.fn()) +const mockSetUserId = vi.hoisted(() => vi.fn()) +const mockIdentify = vi.hoisted(() => vi.fn()) +const mockReset = vi.hoisted(() => vi.fn()) + +const MockIdentify = vi.hoisted(() => + class { + setCalls: Array<[string, unknown]> = [] + + set(key: string, value: unknown) { + this.setCalls.push([key, value]) + return this + } + }, +) + +vi.mock('./AmplitudeProvider', () => ({ + isAmplitudeEnabled: () => mockState.enabled, +})) + +vi.mock('@amplitude/analytics-browser', () => ({ + track: (...args: unknown[]) => mockTrack(...args), + 
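// Each passthrough forwards to a hoisted spy so assertions can inspect SDK calls without loading the real module; Identify is replaced by the local recorder class above.
+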
setUserId: (...args: unknown[]) => mockSetUserId(...args),
+  identify: (...args: unknown[]) => mockIdentify(...args),
+  reset: (...args: unknown[]) => mockReset(...args),
+  Identify: MockIdentify,
+}))
+
+describe('amplitude utils', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    mockState.enabled = true
+  })
+
+  describe('trackEvent', () => {
+    it('should call amplitude.track when amplitude is enabled', () => {
+      trackEvent('dataset_created', { source: 'wizard' })
+
+      expect(mockTrack).toHaveBeenCalledTimes(1)
+      expect(mockTrack).toHaveBeenCalledWith('dataset_created', { source: 'wizard' })
+    })
+
+    it('should not call amplitude.track when amplitude is disabled', () => {
+      mockState.enabled = false
+
+      trackEvent('dataset_created', { source: 'wizard' })
+
+      expect(mockTrack).not.toHaveBeenCalled()
+    })
+  })
+
+  describe('setUserId', () => {
+    it('should call amplitude.setUserId when amplitude is enabled', () => {
+      setUserId('user-123')
+
+      expect(mockSetUserId).toHaveBeenCalledTimes(1)
+      expect(mockSetUserId).toHaveBeenCalledWith('user-123')
+    })
+
+    it('should not call amplitude.setUserId when amplitude is disabled', () => {
+      mockState.enabled = false
+
+      setUserId('user-123')
+
+      expect(mockSetUserId).not.toHaveBeenCalled()
+    })
+  })
+
+  describe('setUserProperties', () => {
+    it('should build identify event and call amplitude.identify when amplitude is enabled', () => {
+      const properties: Record<string, unknown> = {
+        role: 'owner',
+        seats: 3,
+        verified: true,
+      }
+
+      setUserProperties(properties)
+
+      expect(mockIdentify).toHaveBeenCalledTimes(1)
+      const identifyArg = mockIdentify.mock.calls[0][0] as InstanceType<typeof MockIdentify>
+      expect(identifyArg).toBeInstanceOf(MockIdentify)
+      expect(identifyArg.setCalls).toEqual([
+        ['role', 'owner'],
+        ['seats', 3],
+        ['verified', true],
+      ])
+    })
+
+    it('should not call amplitude.identify when amplitude is disabled', () => {
+      mockState.enabled = false
+
+      setUserProperties({ role: 'owner' })
+
+      expect(mockIdentify).not.toHaveBeenCalled()
+    })
+  })
+
+  describe('resetUser', () => {
+    it('should call amplitude.reset when amplitude is enabled', () => {
+      resetUser()
+
+      expect(mockReset).toHaveBeenCalledTimes(1)
+    })
+
+    it('should not call amplitude.reset when amplitude is disabled', () => {
+      mockState.enabled = false
+
+      resetUser()
+
+      expect(mockReset).not.toHaveBeenCalled()
+    })
+  })
+})
diff --git a/web/app/components/base/audio-btn/__tests__/audio.player.manager.spec.ts b/web/app/components/base/audio-btn/__tests__/audio.player.manager.spec.ts
new file mode 100644
index 0000000000..c613aa2c11
--- /dev/null
+++ b/web/app/components/base/audio-btn/__tests__/audio.player.manager.spec.ts
@@ -0,0 +1,148 @@
+import { AudioPlayerManager } from '../audio.player.manager'
+
+type AudioCallback = ((event: string) => void) | null
+type AudioPlayerCtorArgs = [
+  string,
+  boolean,
+  string | undefined,
+  string | null | undefined,
+  string | undefined,
+  AudioCallback,
+]
+
+type MockAudioPlayerInstance = {
+  setCallback: ReturnType<typeof vi.fn>
+  pauseAudio: ReturnType<typeof vi.fn>
+  resetMsgId: ReturnType<typeof vi.fn>
+  cacheBuffers: Array<ArrayBuffer>
+  sourceBuffer: {
+    abort: ReturnType<typeof vi.fn>
+  } | undefined
+}
+
+const mockState = vi.hoisted(() => ({
+  instances: [] as MockAudioPlayerInstance[],
+}))
+
+const mockAudioPlayerConstructor = vi.hoisted(() => vi.fn())
+
+const MockAudioPlayer = vi.hoisted(() => {
+  return class MockAudioPlayerClass {
+    setCallback = vi.fn()
+    pauseAudio = vi.fn()
+    resetMsgId = vi.fn()
+    cacheBuffers = [new ArrayBuffer(1)]
+    sourceBuffer = { abort: vi.fn() }
+
+    constructor(...args: 
AudioPlayerCtorArgs) { + mockAudioPlayerConstructor(...args) + mockState.instances.push(this as unknown as MockAudioPlayerInstance) + } + } +}) + +vi.mock('@/app/components/base/audio-btn/audio', () => ({ + default: MockAudioPlayer, +})) + +describe('AudioPlayerManager', () => { + beforeEach(() => { + vi.clearAllMocks() + mockState.instances = [] + Reflect.set(AudioPlayerManager, 'instance', undefined) + }) + + describe('getInstance', () => { + it('should return the same singleton instance across calls', () => { + const first = AudioPlayerManager.getInstance() + const second = AudioPlayerManager.getInstance() + + expect(first).toBe(second) + }) + }) + + describe('getAudioPlayer', () => { + it('should create a new audio player when no existing player is cached', () => { + const manager = AudioPlayerManager.getInstance() + const callback = vi.fn() + + const result = manager.getAudioPlayer('/text-to-audio', false, 'msg-1', 'hello', 'en-US', callback) + + expect(mockAudioPlayerConstructor).toHaveBeenCalledTimes(1) + expect(mockAudioPlayerConstructor).toHaveBeenCalledWith( + '/text-to-audio', + false, + 'msg-1', + 'hello', + 'en-US', + callback, + ) + expect(result).toBe(mockState.instances[0]) + }) + + it('should reuse existing player and update callback when msg id is unchanged', () => { + const manager = AudioPlayerManager.getInstance() + const firstCallback = vi.fn() + const secondCallback = vi.fn() + + const first = manager.getAudioPlayer('/text-to-audio', false, 'msg-1', 'hello', 'en-US', firstCallback) + const second = manager.getAudioPlayer('/ignored', true, 'msg-1', 'ignored', 'fr-FR', secondCallback) + + expect(mockAudioPlayerConstructor).toHaveBeenCalledTimes(1) + expect(first).toBe(second) + expect(mockState.instances[0].setCallback).toHaveBeenCalledTimes(1) + expect(mockState.instances[0].setCallback).toHaveBeenCalledWith(secondCallback) + }) + + it('should cleanup existing player and create a new one when msg id changes', () => { + const manager = AudioPlayerManager.getInstance() + const callback = vi.fn() + manager.getAudioPlayer('/text-to-audio', false, 'msg-1', 'hello', 'en-US', callback) + const previous = mockState.instances[0] + + const next = manager.getAudioPlayer('/apps/1/text-to-audio', false, 'msg-2', 'world', 'en-US', callback) + + expect(previous.pauseAudio).toHaveBeenCalledTimes(1) + expect(previous.cacheBuffers).toEqual([]) + expect(previous.sourceBuffer?.abort).toHaveBeenCalledTimes(1) + expect(mockAudioPlayerConstructor).toHaveBeenCalledTimes(2) + expect(next).toBe(mockState.instances[1]) + }) + + it('should swallow cleanup errors and still create a new player', () => { + const manager = AudioPlayerManager.getInstance() + const callback = vi.fn() + manager.getAudioPlayer('/text-to-audio', false, 'msg-1', 'hello', 'en-US', callback) + const previous = mockState.instances[0] + previous.pauseAudio.mockImplementation(() => { + throw new Error('cleanup failure') + }) + + expect(() => { + manager.getAudioPlayer('/apps/1/text-to-audio', false, 'msg-2', 'world', 'en-US', callback) + }).not.toThrow() + + expect(previous.pauseAudio).toHaveBeenCalledTimes(1) + expect(mockAudioPlayerConstructor).toHaveBeenCalledTimes(2) + }) + }) + + describe('resetMsgId', () => { + it('should forward reset message id to the cached audio player when present', () => { + const manager = AudioPlayerManager.getInstance() + const callback = vi.fn() + manager.getAudioPlayer('/text-to-audio', false, 'msg-1', 'hello', 'en-US', callback) + + manager.resetMsgId('msg-updated') + + 
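// The manager should forward the new message id to the player instance cached above.
+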
expect(mockState.instances[0].resetMsgId).toHaveBeenCalledTimes(1)
+      expect(mockState.instances[0].resetMsgId).toHaveBeenCalledWith('msg-updated')
+    })
+
+    it('should not throw when resetting message id without an audio player', () => {
+      const manager = AudioPlayerManager.getInstance()
+
+      expect(() => manager.resetMsgId('msg-updated')).not.toThrow()
+    })
+  })
+})
diff --git a/web/app/components/base/audio-btn/__tests__/audio.spec.ts b/web/app/components/base/audio-btn/__tests__/audio.spec.ts
new file mode 100644
index 0000000000..00ffea2dfb
--- /dev/null
+++ b/web/app/components/base/audio-btn/__tests__/audio.spec.ts
@@ -0,0 +1,610 @@
+import { Buffer } from 'node:buffer'
+import { waitFor } from '@testing-library/react'
+import { AppSourceType } from '@/service/share'
+import AudioPlayer from '../audio'
+
+const mockToastNotify = vi.hoisted(() => vi.fn())
+const mockTextToAudioStream = vi.hoisted(() => vi.fn())
+
+vi.mock('@/app/components/base/toast', () => ({
+  default: {
+    notify: (...args: unknown[]) => mockToastNotify(...args),
+  },
+}))
+
+vi.mock('@/service/share', () => ({
+  AppSourceType: {
+    webApp: 'webApp',
+    installedApp: 'installedApp',
+  },
+  textToAudioStream: (...args: unknown[]) => mockTextToAudioStream(...args),
+}))
+
+type AudioEventName = 'ended' | 'paused' | 'loaded' | 'play' | 'timeupdate' | 'loadeddate' | 'canplay' | 'error' | 'sourceopen'
+
+type AudioEventListener = () => void
+
+type ReaderResult = {
+  value: Uint8Array | undefined
+  done: boolean
+}
+
+type Reader = {
+  read: () => Promise<ReaderResult>
+}
+
+type AudioResponse = {
+  status: number
+  body: {
+    getReader: () => Reader
+  }
+}
+
+class MockSourceBuffer {
+  updating = false
+  appendBuffer = vi.fn((_buffer: ArrayBuffer) => undefined)
+  abort = vi.fn(() => undefined)
+}
+
+class MockMediaSource {
+  readyState: 'open' | 'closed' = 'open'
+  sourceBuffer = new MockSourceBuffer()
+  private listeners: Partial<Record<AudioEventName, AudioEventListener[]>> = {}
+
+  addEventListener = vi.fn((event: AudioEventName, listener: AudioEventListener) => {
+    const listeners = this.listeners[event] || []
+    listeners.push(listener)
+    this.listeners[event] = listeners
+  })
+
+  addSourceBuffer = vi.fn((_contentType: string) => this.sourceBuffer)
+  endOfStream = vi.fn(() => undefined)
+
+  emit(event: AudioEventName) {
+    const listeners = this.listeners[event] || []
+    listeners.forEach((listener) => {
+      listener()
+    })
+  }
+}
+
+class MockAudio {
+  src = ''
+  autoplay = false
+  disableRemotePlayback = false
+  controls = false
+  paused = true
+  ended = false
+  played: unknown = null
+  private listeners: Partial<Record<AudioEventName, AudioEventListener[]>> = {}
+
+  addEventListener = vi.fn((event: AudioEventName, listener: AudioEventListener) => {
+    const listeners = this.listeners[event] || []
+    listeners.push(listener)
+    this.listeners[event] = listeners
+  })
+
+  play = vi.fn(async () => {
+    this.paused = false
+  })
+
+  pause = vi.fn(() => {
+    this.paused = true
+  })
+
+  emit(event: AudioEventName) {
+    const listeners = this.listeners[event] || []
+    listeners.forEach((listener) => {
+      listener()
+    })
+  }
+}
+
+class MockAudioContext {
+  state: 'running' | 'suspended' = 'running'
+  destination = {}
+  connect = vi.fn(() => undefined)
+  createMediaElementSource = vi.fn((_audio: MockAudio) => ({
+    connect: this.connect,
+  }))
+
+  resume = vi.fn(async () => {
+    this.state = 'running'
+  })
+
+  suspend = vi.fn(() => {
+    this.state = 'suspended'
+  })
+}
+
+const testState = {
+  mediaSources: [] as MockMediaSource[],
+  audios: [] as MockAudio[],
+  audioContexts: [] as MockAudioContext[],
+}
+
+class MockMediaSourceCtor extends 
MockMediaSource {
+  constructor() {
+    super()
+    testState.mediaSources.push(this)
+  }
+}
+
+class MockAudioCtor extends MockAudio {
+  constructor() {
+    super()
+    testState.audios.push(this)
+  }
+}
+
+class MockAudioContextCtor extends MockAudioContext {
+  constructor() {
+    super()
+    testState.audioContexts.push(this)
+  }
+}
+
+const originalAudio = globalThis.Audio
+const originalAudioContext = globalThis.AudioContext
+const originalCreateObjectURL = globalThis.URL.createObjectURL
+const originalMediaSource = window.MediaSource
+const originalManagedMediaSource = window.ManagedMediaSource
+
+const setMediaSourceSupport = (options: { mediaSource: boolean, managedMediaSource: boolean }) => {
+  Object.defineProperty(window, 'MediaSource', {
+    configurable: true,
+    writable: true,
+    value: options.mediaSource ? MockMediaSourceCtor : undefined,
+  })
+  Object.defineProperty(window, 'ManagedMediaSource', {
+    configurable: true,
+    writable: true,
+    value: options.managedMediaSource ? MockMediaSourceCtor : undefined,
+  })
+}
+
+const makeAudioResponse = (status: number, reads: ReaderResult[]): AudioResponse => {
+  const read = vi.fn<() => Promise<ReaderResult>>()
+  reads.forEach((result) => {
+    read.mockResolvedValueOnce(result)
+  })
+
+  return {
+    status,
+    body: {
+      getReader: () => ({ read }),
+    },
+  }
+}
+
+describe('AudioPlayer', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    testState.mediaSources = []
+    testState.audios = []
+    testState.audioContexts = []
+
+    Object.defineProperty(globalThis, 'Audio', {
+      configurable: true,
+      writable: true,
+      value: MockAudioCtor,
+    })
+    Object.defineProperty(globalThis, 'AudioContext', {
+      configurable: true,
+      writable: true,
+      value: MockAudioContextCtor,
+    })
+    Object.defineProperty(globalThis.URL, 'createObjectURL', {
+      configurable: true,
+      writable: true,
+      value: vi.fn(() => 'blob:mock-url'),
+    })
+
+    setMediaSourceSupport({ mediaSource: true, managedMediaSource: false })
+  })
+
+  afterAll(() => {
+    Object.defineProperty(globalThis, 'Audio', {
+      configurable: true,
+      writable: true,
+      value: originalAudio,
+    })
+    Object.defineProperty(globalThis, 'AudioContext', {
+      configurable: true,
+      writable: true,
+      value: originalAudioContext,
+    })
+    Object.defineProperty(globalThis.URL, 'createObjectURL', {
+      configurable: true,
+      writable: true,
+      value: originalCreateObjectURL,
+    })
+    Object.defineProperty(window, 'MediaSource', {
+      configurable: true,
+      writable: true,
+      value: originalMediaSource,
+    })
+    Object.defineProperty(window, 'ManagedMediaSource', {
+      configurable: true,
+      writable: true,
+      value: originalManagedMediaSource,
+    })
+  })
+
+  describe('constructor behavior', () => {
+    it('should initialize media source, audio, and media element source when MediaSource exists', () => {
+      const callback = vi.fn()
+      const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback)
+      const audio = testState.audios[0]
+      const audioContext = testState.audioContexts[0]
+      const mediaSource = testState.mediaSources[0]
+
+      expect(player.mediaSource).toBe(mediaSource as unknown as MediaSource)
+      expect(globalThis.URL.createObjectURL).toHaveBeenCalledTimes(1)
+      expect(audio.src).toBe('blob:mock-url')
+      expect(audio.autoplay).toBe(true)
+      expect(audioContext.createMediaElementSource).toHaveBeenCalledWith(audio)
+      expect(audioContext.connect).toHaveBeenCalledTimes(1)
+    })
+
+    it('should notify unsupported browser when no MediaSource implementation exists', () => {
+      setMediaSourceSupport({ mediaSource: false, managedMediaSource: false })
+
+
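// With both MediaSource flavours stubbed out, the constructor is expected to skip source wiring and surface the unsupported-browser toast asserted below.
+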
const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + const audio = testState.audios[0] + + expect(player.mediaSource).toBeNull() + expect(audio.src).toBe('') + expect(mockToastNotify).toHaveBeenCalledTimes(1) + expect(mockToastNotify).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'error', + }), + ) + }) + + it('should configure fallback audio controls when ManagedMediaSource is used', () => { + setMediaSourceSupport({ mediaSource: false, managedMediaSource: true }) + + // Create with callback to ensure constructor path completes with fallback source. + const player = new AudioPlayer('/text-to-audio', false, 'msg-1', 'hello', undefined, vi.fn()) + const audio = testState.audios[0] + + expect(player.mediaSource).not.toBeNull() + expect(audio.disableRemotePlayback).toBe(true) + expect(audio.controls).toBe(true) + }) + }) + + describe('event wiring', () => { + it('should forward registered audio events to callback', () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + + audio.emit('play') + audio.emit('ended') + audio.emit('error') + audio.emit('paused') + audio.emit('loaded') + audio.emit('timeupdate') + audio.emit('loadeddate') + audio.emit('canplay') + + expect(player.callback).toBe(callback) + expect(callback).toHaveBeenCalledWith('play') + expect(callback).toHaveBeenCalledWith('ended') + expect(callback).toHaveBeenCalledWith('error') + expect(callback).toHaveBeenCalledWith('paused') + expect(callback).toHaveBeenCalledWith('loaded') + expect(callback).toHaveBeenCalledWith('timeupdate') + expect(callback).toHaveBeenCalledWith('loadeddate') + expect(callback).toHaveBeenCalledWith('canplay') + }) + + it('should initialize source buffer only once when sourceopen fires multiple times', () => { + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', vi.fn()) + const mediaSource = testState.mediaSources[0] + + mediaSource.emit('sourceopen') + mediaSource.emit('sourceopen') + + expect(mediaSource.addSourceBuffer).toHaveBeenCalledTimes(1) + expect(player.sourceBuffer).toBe(mediaSource.sourceBuffer) + }) + }) + + describe('playback control', () => { + it('should request streaming audio when playAudio is called before loading', async () => { + mockTextToAudioStream.mockResolvedValue( + makeAudioResponse(200, [ + { value: new Uint8Array([4, 5]), done: false }, + { value: new Uint8Array([1, 2, 3]), done: true }, + ]), + ) + + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', vi.fn()) + player.playAudio() + + await waitFor(() => { + expect(mockTextToAudioStream).toHaveBeenCalledTimes(1) + }) + + expect(mockTextToAudioStream).toHaveBeenCalledWith( + '/text-to-audio', + AppSourceType.webApp, + { content_type: 'audio/mpeg' }, + { + message_id: 'msg-1', + streaming: true, + voice: 'en-US', + text: 'hello', + }, + ) + expect(player.isLoadData).toBe(true) + }) + + it('should emit error callback and reset load flag when stream response status is not 200', async () => { + const callback = vi.fn() + mockTextToAudioStream.mockResolvedValue( + makeAudioResponse(500, [{ value: new Uint8Array([1]), done: true }]), + ) + + const player = new AudioPlayer('/text-to-audio', false, 'msg-2', 'world', undefined, callback) + player.playAudio() + + await waitFor(() => { + expect(callback).toHaveBeenCalledWith('error') + }) + expect(player.isLoadData).toBe(false) + }) + + it('should resume and play 
immediately when playAudio is called in suspended loaded state', async () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', false, 'msg-1', 'hello', undefined, callback) + const audio = testState.audios[0] + const audioContext = testState.audioContexts[0] + + player.isLoadData = true + audioContext.state = 'suspended' + player.playAudio() + await Promise.resolve() + + expect(audioContext.resume).toHaveBeenCalledTimes(1) + expect(audio.play).toHaveBeenCalledTimes(1) + expect(callback).toHaveBeenCalledWith('play') + }) + + it('should play ended audio when data is already loaded', () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', false, 'msg-1', 'hello', undefined, callback) + const audio = testState.audios[0] + const audioContext = testState.audioContexts[0] + + player.isLoadData = true + audioContext.state = 'running' + audio.ended = true + player.playAudio() + + expect(audio.play).toHaveBeenCalledTimes(1) + expect(callback).toHaveBeenCalledWith('play') + }) + + it('should only emit play callback without replaying when loaded audio is already playing', () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', false, 'msg-1', 'hello', undefined, callback) + const audio = testState.audios[0] + const audioContext = testState.audioContexts[0] + + player.isLoadData = true + audioContext.state = 'running' + audio.ended = false + player.playAudio() + + expect(audio.play).not.toHaveBeenCalled() + expect(callback).toHaveBeenCalledWith('play') + }) + + it('should emit error callback when stream request throws', async () => { + const callback = vi.fn() + mockTextToAudioStream.mockRejectedValue(new Error('network failed')) + const player = new AudioPlayer('/text-to-audio', false, 'msg-2', 'world', undefined, callback) + + player.playAudio() + + await waitFor(() => { + expect(callback).toHaveBeenCalledWith('error') + }) + expect(player.isLoadData).toBe(false) + }) + + it('should call pause flow and notify paused event when pauseAudio is invoked', () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + const audioContext = testState.audioContexts[0] + + player.pauseAudio() + + expect(callback).toHaveBeenCalledWith('paused') + expect(audio.pause).toHaveBeenCalledTimes(1) + expect(audioContext.suspend).toHaveBeenCalledTimes(1) + }) + }) + + describe('message and direct-audio helpers', () => { + it('should update message id through resetMsgId', () => { + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + + player.resetMsgId('msg-2') + + expect(player.msgId).toBe('msg-2') + }) + + it('should end stream without playback when playAudioWithAudio receives empty content', async () => { + vi.useFakeTimers() + try { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const mediaSource = testState.mediaSources[0] + + await player.playAudioWithAudio('', true) + await vi.advanceTimersByTimeAsync(40) + + expect(player.isLoadData).toBe(false) + expect(player.cacheBuffers).toHaveLength(0) + expect(mediaSource.endOfStream).toHaveBeenCalledTimes(1) + expect(callback).not.toHaveBeenCalledWith('play') + } + finally { + vi.useRealTimers() + } + }) + + it('should decode base64 and start playback when playAudioWithAudio is called with playable content', async () => { + const callback = vi.fn() + const player = new 
AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + const audioContext = testState.audioContexts[0] + const mediaSource = testState.mediaSources[0] + const audioBase64 = Buffer.from('hello').toString('base64') + + mediaSource.emit('sourceopen') + audio.paused = true + await player.playAudioWithAudio(audioBase64, true) + await Promise.resolve() + + expect(player.isLoadData).toBe(true) + expect(player.cacheBuffers).toHaveLength(0) + expect(mediaSource.sourceBuffer.appendBuffer).toHaveBeenCalledTimes(1) + const appendedAudioData = mediaSource.sourceBuffer.appendBuffer.mock.calls[0][0] + expect(appendedAudioData).toBeInstanceOf(ArrayBuffer) + expect(appendedAudioData.byteLength).toBeGreaterThan(0) + expect(audioContext.resume).toHaveBeenCalledTimes(1) + expect(audio.play).toHaveBeenCalledTimes(1) + expect(callback).toHaveBeenCalledWith('play') + }) + + it('should skip playback when playAudioWithAudio is called with play=false', async () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + const audioContext = testState.audioContexts[0] + + await player.playAudioWithAudio(Buffer.from('hello').toString('base64'), false) + + expect(player.isLoadData).toBe(false) + expect(audioContext.resume).not.toHaveBeenCalled() + expect(audio.play).not.toHaveBeenCalled() + expect(callback).not.toHaveBeenCalledWith('play') + }) + + it('should play immediately for ended audio in playAudioWithAudio', async () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + + audio.paused = false + audio.ended = true + await player.playAudioWithAudio(Buffer.from('hello').toString('base64'), true) + + expect(audio.play).toHaveBeenCalledTimes(1) + expect(callback).toHaveBeenCalledWith('play') + }) + + it('should not replay when played list exists in playAudioWithAudio', async () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + + audio.paused = false + audio.ended = false + audio.played = {} + await player.playAudioWithAudio(Buffer.from('hello').toString('base64'), true) + + expect(audio.play).not.toHaveBeenCalled() + expect(callback).not.toHaveBeenCalledWith('play') + }) + + it('should replay when paused is false and played list is empty in playAudioWithAudio', async () => { + const callback = vi.fn() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', callback) + const audio = testState.audios[0] + + audio.paused = false + audio.ended = false + audio.played = null + await player.playAudioWithAudio(Buffer.from('hello').toString('base64'), true) + + expect(audio.play).toHaveBeenCalledTimes(1) + expect(callback).toHaveBeenCalledWith('play') + }) + }) + + describe('buffering internals', () => { + it('should finish stream when receiveAudioData gets an undefined chunk', () => { + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + const finishStream = vi + .spyOn(player as unknown as { finishStream: () => void }, 'finishStream') + .mockImplementation(() => { }) + + ; (player as unknown as { receiveAudioData: (data: Uint8Array | undefined) => void }).receiveAudioData(undefined) + + expect(finishStream).toHaveBeenCalledTimes(1) + }) + + it('should finish stream when 
receiveAudioData gets empty bytes while source is open', () => { + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + const finishStream = vi + .spyOn(player as unknown as { finishStream: () => void }, 'finishStream') + .mockImplementation(() => { }) + + ; (player as unknown as { receiveAudioData: (data: Uint8Array) => void }).receiveAudioData(new Uint8Array(0)) + + expect(finishStream).toHaveBeenCalledTimes(1) + }) + + it('should queue incoming buffer when source buffer is updating', () => { + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + const mediaSource = testState.mediaSources[0] + mediaSource.emit('sourceopen') + mediaSource.sourceBuffer.updating = true + + ; (player as unknown as { receiveAudioData: (data: Uint8Array) => void }).receiveAudioData(new Uint8Array([1, 2, 3])) + + expect(player.cacheBuffers.length).toBe(1) + }) + + it('should append previously queued buffer before new one when source buffer is idle', () => { + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + const mediaSource = testState.mediaSources[0] + mediaSource.emit('sourceopen') + + const existingBuffer = new ArrayBuffer(2) + player.cacheBuffers = [existingBuffer] + mediaSource.sourceBuffer.updating = false + + ; (player as unknown as { receiveAudioData: (data: Uint8Array) => void }).receiveAudioData(new Uint8Array([9])) + + expect(mediaSource.sourceBuffer.appendBuffer).toHaveBeenCalledTimes(1) + expect(mediaSource.sourceBuffer.appendBuffer).toHaveBeenCalledWith(existingBuffer) + expect(player.cacheBuffers.length).toBe(1) + }) + + it('should append cache chunks and end stream when finishStream drains buffers', () => { + vi.useFakeTimers() + const player = new AudioPlayer('/text-to-audio', true, 'msg-1', 'hello', 'en-US', null) + const mediaSource = testState.mediaSources[0] + mediaSource.emit('sourceopen') + mediaSource.sourceBuffer.updating = false + player.cacheBuffers = [new ArrayBuffer(3)] + + ; (player as unknown as { finishStream: () => void }).finishStream() + vi.advanceTimersByTime(50) + + expect(mediaSource.sourceBuffer.appendBuffer).toHaveBeenCalledTimes(1) + expect(mediaSource.endOfStream).toHaveBeenCalledTimes(1) + vi.useRealTimers() + }) + }) +}) diff --git a/web/app/components/base/audio-gallery/AudioPlayer.tsx b/web/app/components/base/audio-gallery/AudioPlayer.tsx index 4e5d5e61ab..673d258a57 100644 --- a/web/app/components/base/audio-gallery/AudioPlayer.tsx +++ b/web/app/components/base/audio-gallery/AudioPlayer.tsx @@ -26,6 +26,7 @@ const AudioPlayer: React.FC = ({ src, srcs }) => { useEffect(() => { const audio = audioRef.current + /* v8 ignore next 2 - @preserve */ if (!audio) return @@ -217,6 +218,7 @@ const AudioPlayer: React.FC = ({ src, srcs }) => { const drawWaveform = useCallback(() => { const canvas = canvasRef.current + /* v8 ignore next 2 - @preserve */ if (!canvas) return @@ -268,14 +270,20 @@ const AudioPlayer: React.FC = ({ src, srcs }) => { drawWaveform() }, [drawWaveform, bufferedTime, hasStartedPlaying]) - const handleMouseMove = useCallback((e: React.MouseEvent) => { + const handleMouseMove = useCallback((e: React.MouseEvent | React.TouchEvent) => { const canvas = canvasRef.current const audio = audioRef.current if (!canvas || !audio) return + const clientX = 'touches' in e + ? e.touches[0]?.clientX ?? 
e.changedTouches[0]?.clientX + : e.clientX + if (clientX === undefined) + return + const rect = canvas.getBoundingClientRect() - const percent = Math.min(Math.max(0, e.clientX - rect.left), rect.width) / rect.width + const percent = Math.min(Math.max(0, clientX - rect.left), rect.width) / rect.width const time = percent * duration // Check if the hovered position is within a buffered range before updating hoverTime @@ -289,7 +297,7 @@ const AudioPlayer: React.FC = ({ src, srcs }) => { return (
-