From 66212e3575a96caa4585953a00fca8e11ce1234a Mon Sep 17 00:00:00 2001 From: Yansong Zhang <916125788@qq.com> Date: Thu, 9 Apr 2026 10:30:52 +0800 Subject: [PATCH] feat(api): implement zero-migration transparent upgrade (Phase 8) Add two feature-flag-controlled upgrade paths that allow existing apps and LLM nodes to transparently run through the Agent V2 engine without any database migration: 1. AGENT_V2_TRANSPARENT_UPGRADE (default: off): When enabled, old apps (chat/completion/agent-chat) bypass legacy Easy-UI runners. VirtualWorkflowSynthesizer converts AppModelConfig to an in-memory Workflow (start -> agent-v2 -> answer) at runtime, then executes via AdvancedChatAppGenerator. Falls back to legacy path on any synthesis error. VirtualWorkflowSynthesizer maps: - model JSON -> ModelConfig - pre_prompt/chat_prompt_config -> prompt_template - agent_mode.tools -> ToolMetadata[] - agent_mode.strategy -> agent_strategy - dataset_configs -> context - file_upload -> vision 2. AGENT_V2_REPLACES_LLM (default: off): When enabled, DifyNodeFactory.create_node() transparently remaps nodes with type="llm" to type="agent-v2" before class resolution. Since AgentV2NodeData is a strict superset of LLMNodeData, the mapping is lossless. With tools=[], Agent V2 behaves identically to LLM Node. Both flags default to False for safety. Turn off = instant rollback. 46 existing tests pass. Flask starts successfully. Made-with: Cursor --- api/configs/feature/__init__.py | 17 ++ api/core/workflow/node_factory.py | 22 ++ api/services/app_generate_service.py | 71 +++++++ api/services/workflow/virtual_workflow.py | 240 ++++++++++++++++++++++ 4 files changed, 350 insertions(+) create mode 100644 api/services/workflow/virtual_workflow.py diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index f884489f5e..ab78108f80 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1322,6 +1322,22 @@ class CollaborationConfig(BaseSettings): ) +class AgentV2UpgradeConfig(BaseSettings): + """Feature flags for transparent Agent V2 upgrade.""" + + AGENT_V2_TRANSPARENT_UPGRADE: bool = Field( + description="Transparently run old apps (chat/completion/agent-chat) through the Agent V2 workflow engine. " + "When enabled, old apps synthesize a virtual workflow at runtime instead of using legacy runners.", + default=False, + ) + + AGENT_V2_REPLACES_LLM: bool = Field( + description="Transparently replace LLM nodes in workflows with Agent V2 nodes at runtime. " + "LLMNodeData is remapped to AgentV2NodeData with tools=[] (identical behavior).", + default=False, + ) + + class LoginConfig(BaseSettings): ENABLE_EMAIL_CODE_LOGIN: bool = Field( description="whether to enable email code login", @@ -1450,6 +1466,7 @@ class FeatureConfig( WorkflowNodeExecutionConfig, WorkspaceConfig, CollaborationConfig, + AgentV2UpgradeConfig, LoginConfig, AccountConfig, SwaggerUIConfig, diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index 2bf60fc430..f8088fe9ef 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -331,6 +331,11 @@ class DifyNodeFactory(NodeFactory): typed_node_config = NodeConfigDictAdapter.validate_python(normalize_node_config_for_graph(node_config)) node_id = typed_node_config["id"] node_data = typed_node_config["data"] + + if node_data.type == BuiltinNodeTypes.LLM and dify_config.AGENT_V2_REPLACES_LLM: + node_data = self._remap_llm_to_agent_v2(node_data) + typed_node_config["data"] = node_data + node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version)) node_type = node_data.type node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = { @@ -419,6 +424,23 @@ class DifyNodeFactory(NodeFactory): """Resolve sandbox from run_context, if available.""" return self.graph_init_params.run_context.get(DIFY_SANDBOX_CONTEXT_KEY) + @staticmethod + def _remap_llm_to_agent_v2(node_data: BaseNodeData) -> BaseNodeData: + """Transparently remap LLMNodeData to AgentV2NodeData. + + Since AgentV2NodeData is a strict superset of LLMNodeData + (same LLM fields + tools/iterations/strategy), the mapping is lossless. + With tools=[], Agent V2 behaves identically to LLM Node. + """ + from core.workflow.nodes.agent_v2.entities import AGENT_V2_NODE_TYPE, AgentV2NodeData + + data_dict = node_data.model_dump() + data_dict["type"] = AGENT_V2_NODE_TYPE + data_dict.setdefault("tools", []) + data_dict.setdefault("max_iterations", 10) + data_dict.setdefault("agent_strategy", "auto") + return AgentV2NodeData.model_validate(data_dict) + @staticmethod def _validate_resolved_node_data(node_class: type[Node], node_data: BaseNodeData) -> BaseNodeData: """ diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 074b91881f..17a112136d 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -125,6 +125,77 @@ class AppGenerateService: if app_model.is_agent and app_model.mode not in {AppMode.AGENT_CHAT, AppMode.AGENT} else app_model.mode ) + + if ( + effective_mode in {AppMode.COMPLETION, AppMode.CHAT, AppMode.AGENT_CHAT} + and dify_config.AGENT_V2_TRANSPARENT_UPGRADE + ): + from services.workflow.virtual_workflow import VirtualWorkflowSynthesizer + + try: + virtual_workflow = VirtualWorkflowSynthesizer.synthesize(app_model) + logger.info( + "[AGENT_V2_UPGRADE] Transparent upgrade for app %s (mode=%s)", + app_model.id, + effective_mode, + ) + workflow_id_arg = args.get("workflow_id") + if not workflow_id_arg: + workflow = virtual_workflow + else: + workflow = cls._get_workflow(app_model, invoke_from, workflow_id_arg) + + if streaming: + with rate_limit_context(rate_limit, request_id): + payload = AppExecutionParams.new( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=True, + call_depth=0, + ) + payload_json = payload.model_dump_json() + + def on_subscribe(): + workflow_based_app_execution_task.delay(payload_json) + + on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + generator = AdvancedChatAppGenerator() + return rate_limit.generate( + generator.convert_to_event_stream( + generator.retrieve_events( + AppMode.AGENT, + payload.workflow_run_id, + on_subscribe=on_subscribe, + ), + ), + request_id=request_id, + ) + else: + advanced_generator = AdvancedChatAppGenerator() + return rate_limit.generate( + advanced_generator.convert_to_event_stream( + advanced_generator.generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + workflow_run_id=str(uuid.uuid4()), + streaming=False, + ) + ), + request_id=request_id, + ) + except Exception: + logger.warning( + "[AGENT_V2_UPGRADE] Transparent upgrade failed for app %s, falling back to legacy", + app_model.id, + exc_info=True, + ) + match effective_mode: case AppMode.COMPLETION: return rate_limit.generate( diff --git a/api/services/workflow/virtual_workflow.py b/api/services/workflow/virtual_workflow.py new file mode 100644 index 0000000000..6167f01349 --- /dev/null +++ b/api/services/workflow/virtual_workflow.py @@ -0,0 +1,240 @@ +"""Virtual Workflow Synthesizer for transparent old-app upgrade. + +Converts an old App's AppModelConfig into an in-memory Workflow object +with a single agent-v2 node, without persisting to the database. +This allows legacy apps (chat/completion/agent-chat) to run through +the Agent V2 workflow engine transparently. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any +from uuid import uuid4 + +from core.workflow.nodes.agent_v2.entities import AGENT_V2_NODE_TYPE +from models.model import App, AppMode, AppModelConfig + +logger = logging.getLogger(__name__) + + +class VirtualWorkflowSynthesizer: + """Synthesize in-memory Workflow from legacy AppModelConfig.""" + + @staticmethod + def synthesize(app: App) -> Any: + """Convert old app config to a virtual Workflow object. + + Returns a Workflow-like object (not persisted to DB) that can be + passed to AdvancedChatAppGenerator.generate(). + """ + from models.workflow import Workflow, WorkflowType + + config = app.app_model_config + if not config: + raise ValueError("App has no model config") + + model_dict = _extract_model_config(config) + prompt_template = _build_prompt_template(config, app.mode) + tools = _extract_tools(config) + agent_strategy = _extract_strategy(config) + max_iterations = _extract_max_iterations(config) + context = _build_context_config(config) + vision = _build_vision_config(config) + is_chat = app.mode != AppMode.COMPLETION + + agent_node_data: dict[str, Any] = { + "type": AGENT_V2_NODE_TYPE, + "title": "Agent", + "model": model_dict, + "prompt_template": prompt_template, + "tools": tools, + "max_iterations": max_iterations, + "agent_strategy": agent_strategy, + "context": context, + "vision": vision, + } + if is_chat: + agent_node_data["memory"] = {"window": {"enabled": True, "size": 50}} + + graph = _build_graph(agent_node_data, is_chat) + + workflow = Workflow() + workflow.id = str(uuid4()) + workflow.tenant_id = app.tenant_id + workflow.app_id = app.id + workflow.type = WorkflowType.CHAT if is_chat else WorkflowType.WORKFLOW + workflow.version = "virtual" + workflow.graph = json.dumps(graph) + workflow.features = "{}" + workflow.created_by = app.created_by + workflow.updated_by = app.updated_by + + return workflow + + +def _extract_model_config(config: AppModelConfig) -> dict[str, Any]: + if config.model: + try: + return json.loads(config.model) + except (json.JSONDecodeError, TypeError): + pass + return {"provider": "openai", "name": "gpt-4o", "mode": "chat", "completion_params": {}} + + +def _build_prompt_template(config: AppModelConfig, mode: str) -> list[dict[str, str]]: + messages: list[dict[str, str]] = [] + + if config.prompt_type and config.prompt_type.value == "advanced": + if config.chat_prompt_config: + try: + chat_config = json.loads(config.chat_prompt_config) + if isinstance(chat_config, dict) and "prompt" in chat_config: + prompts = chat_config["prompt"] + if isinstance(prompts, list): + for p in prompts: + if isinstance(p, dict) and "role" in p and "text" in p: + messages.append({"role": p["role"], "text": p["text"]}) + except (json.JSONDecodeError, TypeError): + pass + + if not messages: + pre_prompt = config.pre_prompt or "" + if pre_prompt: + messages.append({"role": "system", "text": pre_prompt}) + + if mode == AppMode.COMPLETION: + messages.append({"role": "user", "text": "{{#sys.query#}}"}) + else: + messages.append({"role": "user", "text": "{{#sys.query#}}"}) + + return messages + + +def _extract_tools(config: AppModelConfig) -> list[dict[str, Any]]: + if not config.agent_mode: + return [] + try: + agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode + except (json.JSONDecodeError, TypeError): + return [] + + if not isinstance(agent_mode, dict) or not agent_mode.get("enabled"): + return [] + + tools_config = agent_mode.get("tools", []) + result: list[dict[str, Any]] = [] + + for tool in tools_config: + if not isinstance(tool, dict): + continue + if not tool.get("enabled", True): + continue + + provider_type = tool.get("provider_type", "builtin") + provider_id = tool.get("provider_id", "") + tool_name = tool.get("tool_name", "") + + if not tool_name: + continue + + result.append({ + "enabled": True, + "type": provider_type, + "provider_name": provider_id, + "tool_name": tool_name, + "parameters": tool.get("tool_parameters", {}), + "settings": {}, + }) + + return result + + +def _extract_strategy(config: AppModelConfig) -> str: + if not config.agent_mode: + return "auto" + try: + agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode + except (json.JSONDecodeError, TypeError): + return "auto" + + strategy = agent_mode.get("strategy", "") + mapping = { + "function_call": "function-calling", + "react": "chain-of-thought", + } + return mapping.get(strategy, "auto") + + +def _extract_max_iterations(config: AppModelConfig) -> int: + if not config.agent_mode: + return 10 + try: + agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode + except (json.JSONDecodeError, TypeError): + return 10 + return agent_mode.get("max_iteration", 10) + + +def _build_context_config(config: AppModelConfig) -> dict[str, Any]: + if config.dataset_configs: + try: + dc = json.loads(config.dataset_configs) if isinstance(config.dataset_configs, str) else config.dataset_configs + if isinstance(dc, dict) and dc.get("datasets", {}).get("datasets", []): + return {"enabled": True} + except (json.JSONDecodeError, TypeError): + pass + return {"enabled": False} + + +def _build_vision_config(config: AppModelConfig) -> dict[str, Any]: + if config.file_upload: + try: + fu = json.loads(config.file_upload) if isinstance(config.file_upload, str) else config.file_upload + if isinstance(fu, dict) and fu.get("image", {}).get("enabled"): + return {"enabled": True} + except (json.JSONDecodeError, TypeError): + pass + return {"enabled": False} + + +def _build_graph(agent_data: dict[str, Any], is_chat: bool) -> dict[str, Any]: + nodes: list[dict[str, Any]] = [ + { + "id": "start", + "type": "custom", + "data": {"type": "start", "title": "Start", "variables": []}, + "position": {"x": 80, "y": 282}, + }, + { + "id": "agent", + "type": "custom", + "data": agent_data, + "position": {"x": 400, "y": 282}, + }, + ] + + if is_chat: + nodes.append({ + "id": "answer", + "type": "custom", + "data": {"type": "answer", "title": "Answer", "answer": "{{#agent.text#}}"}, + "position": {"x": 720, "y": 282}, + }) + end_id = "answer" + else: + nodes.append({ + "id": "end", + "type": "custom", + "data": {"type": "end", "title": "End", "outputs": [{"value_selector": ["agent", "text"], "variable": "result"}]}, + "position": {"x": 720, "y": 282}, + }) + end_id = "end" + + edges = [ + {"id": "start-agent", "source": "start", "target": "agent", "sourceHandle": "source", "targetHandle": "target"}, + {"id": f"agent-{end_id}", "source": "agent", "target": end_id, "sourceHandle": "source", "targetHandle": "target"}, + ] + + return {"nodes": nodes, "edges": edges}