dify/api/services/workflow/virtual_workflow.py
Yansong Zhang 77c182f738 feat(api): propagate all app features in transparent upgrade
VirtualWorkflowSynthesizer._build_features() now extracts ALL legacy
app features from AppModelConfig into the synthesized workflow.features:

- opening_statement + suggested_questions
- sensitive_word_avoidance (keywords/API moderation)
- more_like_this
- speech_to_text / text_to_speech
- retriever_resource

Previously workflow.features was hardcoded to "{}", losing all these
features during transparent upgrade. Now AdvancedChatAppRunner's
moderation, opening text, and other feature layers work correctly
for transparently upgraded old apps.

Made-with: Cursor
2026-04-10 18:47:18 +08:00

329 lines
11 KiB
Python

"""Virtual Workflow Synthesizer for transparent old-app upgrade.
Converts an old App's AppModelConfig into an in-memory Workflow object
with a single agent-v2 node, without persisting to the database.
This allows legacy apps (chat/completion/agent-chat) to run through
the Agent V2 workflow engine transparently.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from uuid import uuid4
from core.workflow.nodes.agent_v2.entities import AGENT_V2_NODE_TYPE
from models.model import App, AppMode, AppModelConfig
logger = logging.getLogger(__name__)
class VirtualWorkflowSynthesizer:
"""Synthesize in-memory Workflow from legacy AppModelConfig."""
@staticmethod
def synthesize(app: App) -> Any:
"""Convert old app config to a virtual Workflow object.
Returns a Workflow-like object (not persisted to DB) that can be
passed to AdvancedChatAppGenerator.generate().
"""
from models.workflow import Workflow, WorkflowType
config = app.app_model_config
if not config:
raise ValueError("App has no model config")
model_dict = _extract_model_config(config)
prompt_template = _build_prompt_template(config, app.mode)
tools = _extract_tools(config)
agent_strategy = _extract_strategy(config)
max_iterations = _extract_max_iterations(config)
context = _build_context_config(config)
vision = _build_vision_config(config)
is_chat = app.mode != AppMode.COMPLETION
agent_node_data: dict[str, Any] = {
"type": AGENT_V2_NODE_TYPE,
"title": "Agent",
"model": model_dict,
"prompt_template": prompt_template,
"tools": tools,
"max_iterations": max_iterations,
"agent_strategy": agent_strategy,
"context": context,
"vision": vision,
}
if is_chat:
agent_node_data["memory"] = {"window": {"enabled": True, "size": 50}}
graph = _build_graph(agent_node_data, is_chat)
workflow = Workflow()
workflow.id = str(uuid4())
workflow.tenant_id = app.tenant_id
workflow.app_id = app.id
workflow.type = WorkflowType.CHAT if is_chat else WorkflowType.WORKFLOW
workflow.version = "virtual"
workflow.graph = json.dumps(graph)
workflow.features = json.dumps(_build_features(config))
workflow.created_by = app.created_by
workflow.updated_by = app.updated_by
return workflow
@staticmethod
def ensure_workflow(app: App) -> Any:
"""Ensure the old app has a workflow, creating one if needed.
On first call for a legacy app, synthesizes a workflow from its
AppModelConfig and persists it as a draft. On subsequent calls,
returns the existing draft. This is a one-time lazy upgrade:
the app gets a real workflow that can be edited in the workflow editor.
The app's workflow_id is NOT updated (preserving its legacy state),
but the workflow is findable via app_id + version="draft".
"""
from models.workflow import Workflow
from extensions.ext_database import db
existing = db.session.query(Workflow).filter_by(
app_id=app.id, version="draft"
).first()
if existing:
return existing
workflow = VirtualWorkflowSynthesizer.synthesize(app)
workflow.version = "draft"
db.session.add(workflow)
db.session.commit()
logger.info("Created draft workflow %s for legacy app %s", workflow.id, app.id)
return workflow
def _extract_model_config(config: AppModelConfig) -> dict[str, Any]:
if config.model:
try:
return json.loads(config.model)
except (json.JSONDecodeError, TypeError):
pass
return {"provider": "openai", "name": "gpt-4o", "mode": "chat", "completion_params": {}}
def _build_prompt_template(config: AppModelConfig, mode: str) -> list[dict[str, str]]:
messages: list[dict[str, str]] = []
if config.prompt_type and config.prompt_type.value == "advanced":
if config.chat_prompt_config:
try:
chat_config = json.loads(config.chat_prompt_config)
if isinstance(chat_config, dict) and "prompt" in chat_config:
prompts = chat_config["prompt"]
if isinstance(prompts, list):
for p in prompts:
if isinstance(p, dict) and "role" in p and "text" in p:
messages.append({"role": p["role"], "text": p["text"]})
except (json.JSONDecodeError, TypeError):
pass
if not messages:
pre_prompt = config.pre_prompt or ""
if pre_prompt:
messages.append({"role": "system", "text": pre_prompt})
if mode == AppMode.COMPLETION:
messages.append({"role": "user", "text": "{{#sys.query#}}"})
else:
messages.append({"role": "user", "text": "{{#sys.query#}}"})
return messages
def _extract_tools(config: AppModelConfig) -> list[dict[str, Any]]:
if not config.agent_mode:
return []
try:
agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode
except (json.JSONDecodeError, TypeError):
return []
if not isinstance(agent_mode, dict) or not agent_mode.get("enabled"):
return []
tools_config = agent_mode.get("tools", [])
result: list[dict[str, Any]] = []
for tool in tools_config:
if not isinstance(tool, dict):
continue
if not tool.get("enabled", True):
continue
provider_type = tool.get("provider_type", "builtin")
provider_id = tool.get("provider_id", "")
tool_name = tool.get("tool_name", "")
if not tool_name:
continue
result.append({
"enabled": True,
"type": provider_type,
"provider_name": provider_id,
"tool_name": tool_name,
"parameters": tool.get("tool_parameters", {}),
"settings": {},
})
return result
def _extract_strategy(config: AppModelConfig) -> str:
if not config.agent_mode:
return "auto"
try:
agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode
except (json.JSONDecodeError, TypeError):
return "auto"
strategy = agent_mode.get("strategy", "")
mapping = {
"function_call": "function-calling",
"react": "chain-of-thought",
}
return mapping.get(strategy, "auto")
def _extract_max_iterations(config: AppModelConfig) -> int:
if not config.agent_mode:
return 10
try:
agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode
except (json.JSONDecodeError, TypeError):
return 10
return agent_mode.get("max_iteration", 10)
def _build_context_config(config: AppModelConfig) -> dict[str, Any]:
if config.dataset_configs:
try:
dc = json.loads(config.dataset_configs) if isinstance(config.dataset_configs, str) else config.dataset_configs
if isinstance(dc, dict) and dc.get("datasets", {}).get("datasets", []):
return {"enabled": True}
except (json.JSONDecodeError, TypeError):
pass
return {"enabled": False}
def _build_vision_config(config: AppModelConfig) -> dict[str, Any]:
if config.file_upload:
try:
fu = json.loads(config.file_upload) if isinstance(config.file_upload, str) else config.file_upload
if isinstance(fu, dict) and fu.get("image", {}).get("enabled"):
return {"enabled": True}
except (json.JSONDecodeError, TypeError):
pass
return {"enabled": False}
def _build_graph(agent_data: dict[str, Any], is_chat: bool) -> dict[str, Any]:
nodes: list[dict[str, Any]] = [
{
"id": "start",
"type": "custom",
"data": {"type": "start", "title": "Start", "variables": []},
"position": {"x": 80, "y": 282},
},
{
"id": "agent",
"type": "custom",
"data": agent_data,
"position": {"x": 400, "y": 282},
},
]
if is_chat:
nodes.append({
"id": "answer",
"type": "custom",
"data": {"type": "answer", "title": "Answer", "answer": "{{#agent.text#}}"},
"position": {"x": 720, "y": 282},
})
end_id = "answer"
else:
nodes.append({
"id": "end",
"type": "custom",
"data": {"type": "end", "title": "End", "outputs": [{"value_selector": ["agent", "text"], "variable": "result"}]},
"position": {"x": 720, "y": 282},
})
end_id = "end"
edges = [
{"id": "start-agent", "source": "start", "target": "agent", "sourceHandle": "source", "targetHandle": "target"},
{"id": f"agent-{end_id}", "source": "agent", "target": end_id, "sourceHandle": "source", "targetHandle": "target"},
]
return {"nodes": nodes, "edges": edges}
def _build_features(config: AppModelConfig) -> dict[str, Any]:
"""Extract app-level features from AppModelConfig for the synthesized workflow."""
features: dict[str, Any] = {}
if config.opening_statement:
features["opening_statement"] = config.opening_statement
if config.suggested_questions:
try:
sq = json.loads(config.suggested_questions) if isinstance(config.suggested_questions, str) else config.suggested_questions
if sq:
features["suggested_questions"] = sq
except (json.JSONDecodeError, TypeError):
pass
if config.sensitive_word_avoidance:
try:
swa = json.loads(config.sensitive_word_avoidance) if isinstance(config.sensitive_word_avoidance, str) else config.sensitive_word_avoidance
if swa and swa.get("enabled"):
features["sensitive_word_avoidance"] = swa
except (json.JSONDecodeError, TypeError):
pass
if config.more_like_this:
try:
mlt = json.loads(config.more_like_this) if isinstance(config.more_like_this, str) else config.more_like_this
if mlt and mlt.get("enabled"):
features["more_like_this"] = mlt
except (json.JSONDecodeError, TypeError):
pass
if config.speech_to_text:
try:
stt = json.loads(config.speech_to_text) if isinstance(config.speech_to_text, str) else config.speech_to_text
if stt and stt.get("enabled"):
features["speech_to_text"] = stt
except (json.JSONDecodeError, TypeError):
pass
if config.text_to_speech:
try:
tts = json.loads(config.text_to_speech) if isinstance(config.text_to_speech, str) else config.text_to_speech
if tts and tts.get("enabled"):
features["text_to_speech"] = tts
except (json.JSONDecodeError, TypeError):
pass
if config.retriever_resource:
try:
rr = json.loads(config.retriever_resource) if isinstance(config.retriever_resource, str) else config.retriever_resource
if rr and rr.get("enabled"):
features["retriever_resource"] = rr
except (json.JSONDecodeError, TypeError):
pass
return features