import logging
from collections.abc import Callable
from functools import wraps
from typing import Any, Concatenate, Self, override
from uuid import UUID

from flask import Response, request
from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy.orm import sessionmaker

from controllers.common.errors import InvalidArgumentError, NotFoundError
from controllers.common.fields import SimpleResultResponse
from controllers.common.schema import query_params_from_model, register_response_schema_models, register_schema_models
from controllers.console import console_ns
from controllers.console.app.error import (
    DraftWorkflowNotExist,
)
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import (
    RBACPermission,
    RBACResourceScope,
    account_initialization_required,
    edit_permission_required,
    rbac_permission_required,
    setup_required,
    with_current_user,
)
from core.app.file_access import DatabaseFileAccessController
from core.workflow.variable_prefixes import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from extensions.ext_database import db
from factories import variable_factory
from factories.file_factory import build_from_mapping, build_from_mappings
from factories.variable_factory import build_segment_with_type
from fields.base import ResponseModel
from graphon.file import helpers as file_helpers
from graphon.variables.segment_group import SegmentGroup
from graphon.variables.segments import ArrayFileSegment, FileSegment, Segment
from graphon.variables.types import SegmentType
from libs.login import login_required
from models import Account, App, AppMode
from models.workflow import WorkflowDraftVariable
from services.workflow_draft_variable_service import WorkflowDraftVariableList, WorkflowDraftVariableService
from services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)

# Shared controller handed to the file factories when a PATCH rebuilds file values.
_file_access_controller = DatabaseFileAccessController()


# NOTE: the models below are documented with `#` comments rather than class
# docstrings so that the schemas generated from them stay unchanged.


# Query-string parameters for the paginated draft variable listing.
class WorkflowDraftVariableListQuery(BaseModel):
    page: int = Field(default=1, ge=1, le=100_000, description="Page number")
    limit: int = Field(default=20, ge=1, le=100, description="Items per page")


# PATCH body for a single draft variable; both fields are optional.
class WorkflowDraftVariableUpdatePayload(BaseModel):
    name: str | None = Field(default=None, description="Variable name")
    value: Any = Field(default=None, description="Variable value")


# POST body replacing the conversation variables of the draft workflow.
class ConversationVariableUpdatePayload(BaseModel):
    conversation_variables: list[dict[str, Any]] = Field(
        ..., description="Conversation variables for the draft workflow"
    )


# POST body replacing the environment variables of the draft workflow.
class EnvironmentVariableUpdatePayload(BaseModel):
    environment_variables: list[dict[str, Any]] = Field(..., description="Environment variables for the draft workflow")


# Metadata and download link for the file attached to a truncated variable.
class WorkflowDraftVariableFullContentResponse(ResponseModel):
    size_bytes: int | None
    value_type: str
    length: int | None
    download_url: str

    @classmethod
    def from_workflow_draft_variable(cls, variable: WorkflowDraftVariable) -> Self | None:
        """Build the full-content descriptor, or return None when the variable is not truncated."""
        if not variable.is_truncated():
            return None

        backing_file = variable.variable_file
        assert backing_file is not None

        exposed_type = str(backing_file.value_type.exposed_type())
        signed_url = file_helpers.get_signed_file_url(backing_file.upload_file_id, as_attachment=True)
        return cls(
            size_bytes=backing_file.size,
            value_type=exposed_type,
            length=backing_file.length,
            download_url=signed_url,
        )


# Draft variable metadata without its value (used by the paginated listing).
class WorkflowDraftVariableWithoutValueResponse(ResponseModel):
    id: str
    type: str
    name: str
    description: str
    selector: list[str]
    value_type: str
    edited: bool
    visible: bool
    is_truncated: bool

    @classmethod
    def from_workflow_draft_variable(cls, variable: WorkflowDraftVariable) -> Self:
        """Copy the metadata fields of *variable* into a response object."""
        # NOTE(review): truncation is derived from `file_id` here, whereas
        # WorkflowDraftVariableFullContentResponse relies on
        # `variable.is_truncated()` — confirm the two always agree.
        truncated = variable.file_id is not None
        return cls(
            id=variable.id,
            type=variable.get_variable_type().value,
            name=variable.name,
            description=variable.description,
            selector=variable.get_selector(),
            value_type=_serialize_variable_type(variable),
            edited=variable.edited,
            visible=variable.visible,
            is_truncated=truncated,
        )


# Draft variable metadata plus its serialized value and optional full-content link.
class WorkflowDraftVariableResponse(WorkflowDraftVariableWithoutValueResponse):
    value: Any
    full_content: WorkflowDraftVariableFullContentResponse | None

    @classmethod
    @override
    def from_workflow_draft_variable(cls, variable: WorkflowDraftVariable) -> Self:
        """Extend the value-less representation with the value and full-content descriptor."""
        metadata = WorkflowDraftVariableWithoutValueResponse.from_workflow_draft_variable(variable).model_dump()
        return cls(
            **metadata,
            value=_serialize_var_value(variable),
            full_content=WorkflowDraftVariableFullContentResponse.from_workflow_draft_variable(variable),
        )


# Paginated list of value-less draft variables.
class WorkflowDraftVariableListWithoutValueResponse(ResponseModel):
    items: list[WorkflowDraftVariableWithoutValueResponse]
    total: int | None

    @classmethod
    def from_workflow_draft_variable_list(cls, variable_list: WorkflowDraftVariableList) -> Self:
        """Convert a service-level variable list, carrying over its total."""
        converted = [
            WorkflowDraftVariableWithoutValueResponse.from_workflow_draft_variable(entry)
            for entry in variable_list.variables
        ]
        return cls(items=converted, total=variable_list.total)


# List of draft variables including their values.
class WorkflowDraftVariableListResponse(ResponseModel):
    items: list[WorkflowDraftVariableResponse]

    @classmethod
    def from_workflow_draft_variable_list(cls, variable_list: WorkflowDraftVariableList) -> Self:
        """Convert a service-level variable list into value-bearing response items."""
        converted = [
            WorkflowDraftVariableResponse.from_workflow_draft_variable(entry) for entry in variable_list.variables
        ]
        return cls(items=converted)


# A single environment variable of the draft workflow.
class WorkflowDraftEnvironmentVariableResponse(ResponseModel):
    id: str
    type: str
    name: str
    description: str | None = None
    selector: list[str]
    value_type: str
    value: Any
    edited: bool
    visible: bool
    editable: bool


# List wrapper for environment variables.
class WorkflowDraftEnvironmentVariableListResponse(ResponseModel):
    items: list[WorkflowDraftEnvironmentVariableResponse]


# Request-side models.
register_schema_models(
    console_ns,
    WorkflowDraftVariableListQuery,
    WorkflowDraftVariableUpdatePayload,
    ConversationVariableUpdatePayload,
    EnvironmentVariableUpdatePayload,
)

# Response-side models; the handlers look these up via `console_ns.models[...]`.
register_response_schema_models(
    console_ns,
    WorkflowDraftVariableFullContentResponse,
    WorkflowDraftVariableWithoutValueResponse,
    WorkflowDraftVariableResponse,
    WorkflowDraftVariableListWithoutValueResponse,
    WorkflowDraftVariableListResponse,
    WorkflowDraftEnvironmentVariableResponse,
    WorkflowDraftEnvironmentVariableListResponse,
    SimpleResultResponse,
)
def _convert_values_to_json_serializable_object(value: Segment) -> Any: match value: case FileSegment(): return value.value.model_dump() case ArrayFileSegment(): return [file.model_dump() for file in value.value] case SegmentGroup(): return [_convert_values_to_json_serializable_object(i) for i in value.value] case _: return value.value def _serialize_var_value(variable: WorkflowDraftVariable) -> Any: value = variable.get_value() # create a copy of the value to avoid affecting the model cache. value = value.model_copy(deep=True) # Refresh the url signature before returning it to client. match value: case FileSegment(): file = value.value file.remote_url = file.generate_url() case ArrayFileSegment(): files = value.value for file in files: file.remote_url = file.generate_url() return _convert_values_to_json_serializable_object(value) def _serialize_variable_type(workflow_draft_var: WorkflowDraftVariable) -> str: value_type = workflow_draft_var.value_type return str(value_type.exposed_type()) def ensure_variable_access( variable: WorkflowDraftVariable | None, app_id: str, variable_id: str, current_user_id: str, ) -> WorkflowDraftVariable: if variable is None: raise NotFoundError(description=f"variable not found, id={variable_id}") if variable.app_id != app_id or variable.user_id != current_user_id: raise NotFoundError(description=f"variable not found, id={variable_id}") return variable def validate_node_id(node_id: str) -> None: if node_id in [ CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID, ]: # NOTE(QuantumGhost): While we store the system and conversation variables as node variables # with specific `node_id` in database, we still want to make the API separated. By disallowing # accessing system and conversation variables in `WorkflowDraftNodeVariableListApi`, # we mitigate the risk that user of the API depending on the implementation detail of the API. 
# # ref: [Hyrum's Law](https://www.hyrumslaw.com/) raise InvalidArgumentError( f"invalid node_id, please use correspond api for conversation and system variables, node_id={node_id}", ) def _api_prerequisite[T, **P, R]( f: Callable[Concatenate[T, Account, P], R], ) -> Callable[Concatenate[T, P], R | Response]: """Common prerequisites for all draft workflow variable APIs. It ensures the following conditions are satisfied: - Dify has been property setup. - The request user has logged in and initialized. - The requested app is a workflow or a chat flow. - The request user has the edit permission for the app. """ @setup_required @login_required @account_initialization_required @edit_permission_required @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) @with_current_user @wraps(f) def wrapper(self: T, current_user: Account, *args: P.args, **kwargs: P.kwargs) -> R | Response: return f(self, current_user, *args, **kwargs) return wrapper def _get_variable_list(app_model: App, node_id: str, current_user_id: str) -> WorkflowDraftVariableList: with sessionmaker(bind=db.engine, expire_on_commit=False).begin() as session: draft_var_srv = WorkflowDraftVariableService( session=session, ) if node_id == CONVERSATION_VARIABLE_NODE_ID: draft_vars = draft_var_srv.list_conversation_variables(app_model.id, user_id=current_user_id) elif node_id == SYSTEM_VARIABLE_NODE_ID: draft_vars = draft_var_srv.list_system_variables(app_model.id, user_id=current_user_id) else: draft_vars = draft_var_srv.list_node_variables( app_id=app_model.id, node_id=node_id, user_id=current_user_id, ) return draft_vars @console_ns.route("/apps//workflows/draft/variables") class WorkflowVariableCollectionApi(Resource): @console_ns.doc("get_workflow_variables") @console_ns.doc(description="Get draft workflow variables") @console_ns.doc(params={"app_id": "Application ID"}) 
@console_ns.doc(params=query_params_from_model(WorkflowDraftVariableListQuery)) @console_ns.response( 200, "Workflow variables retrieved successfully", console_ns.models[WorkflowDraftVariableListWithoutValueResponse.__name__], ) @_api_prerequisite @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) def get(self, current_user: Account, app_model: App): """ Get draft workflow """ # response-contract:ignore constructed Pydantic response args = WorkflowDraftVariableListQuery.model_validate(request.args.to_dict(flat=True)) # fetch draft workflow by app_model workflow_service = WorkflowService() workflow_exist = workflow_service.is_workflow_exist(app_model=app_model) if not workflow_exist: raise DraftWorkflowNotExist() # fetch draft workflow by app_model with sessionmaker(bind=db.engine, expire_on_commit=False).begin() as session: draft_var_srv = WorkflowDraftVariableService( session=session, ) workflow_vars = draft_var_srv.list_variables_without_values( app_id=app_model.id, page=args.page, limit=args.limit, user_id=current_user.id, ) return WorkflowDraftVariableListWithoutValueResponse.from_workflow_draft_variable_list( workflow_vars ).model_dump(mode="json") @console_ns.doc("delete_workflow_variables") @console_ns.doc(description="Delete all draft workflow variables") @console_ns.response(204, "Workflow variables deleted successfully") @_api_prerequisite def delete(self, current_user: Account, app_model: App): draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) draft_var_srv.delete_user_workflow_variables(app_model.id, user_id=current_user.id) db.session.commit() return "", 204 @console_ns.route("/apps//workflows/draft/nodes//variables") class NodeVariableCollectionApi(Resource): @console_ns.doc("get_node_variables") @console_ns.doc(description="Get variables for a specific node") @console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"}) @console_ns.response( 200, "Node variables retrieved successfully", 
console_ns.models[WorkflowDraftVariableListResponse.__name__], ) @_api_prerequisite @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) def get(self, current_user: Account, app_model: App, node_id: str): # response-contract:ignore constructed Pydantic response validate_node_id(node_id) with sessionmaker(bind=db.engine, expire_on_commit=False).begin() as session: draft_var_srv = WorkflowDraftVariableService( session=session, ) node_vars = draft_var_srv.list_node_variables(app_model.id, node_id, user_id=current_user.id) return WorkflowDraftVariableListResponse.from_workflow_draft_variable_list(node_vars).model_dump(mode="json") @console_ns.doc("delete_node_variables") @console_ns.doc(description="Delete all variables for a specific node") @console_ns.response(204, "Node variables deleted successfully") @_api_prerequisite def delete(self, current_user: Account, app_model: App, node_id: str): validate_node_id(node_id) srv = WorkflowDraftVariableService(db.session()) srv.delete_node_variables(app_model.id, node_id, user_id=current_user.id) db.session.commit() return "", 204 @console_ns.route("/apps//workflows/draft/variables/") class VariableApi(Resource): _PATCH_NAME_FIELD = "name" _PATCH_VALUE_FIELD = "value" @console_ns.doc("get_variable") @console_ns.doc(description="Get a specific workflow variable") @console_ns.doc(params={"app_id": "Application ID", "variable_id": "Variable ID"}) @console_ns.response( 200, "Variable retrieved successfully", console_ns.models[WorkflowDraftVariableResponse.__name__] ) @console_ns.response(404, "Variable not found") @_api_prerequisite @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) def get(self, current_user: Account, app_model: App, variable_id: UUID): # response-contract:ignore constructed Pydantic response draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) variable_id_str = str(variable_id) variable = ensure_variable_access( 
variable=draft_var_srv.get_variable(variable_id=variable_id_str), app_id=app_model.id, variable_id=variable_id_str, current_user_id=current_user.id, ) return WorkflowDraftVariableResponse.from_workflow_draft_variable(variable).model_dump(mode="json") @console_ns.doc("update_variable") @console_ns.doc(description="Update a workflow variable") @console_ns.expect(console_ns.models[WorkflowDraftVariableUpdatePayload.__name__]) @console_ns.response( 200, "Variable updated successfully", console_ns.models[WorkflowDraftVariableResponse.__name__] ) @console_ns.response(404, "Variable not found") @_api_prerequisite def patch(self, current_user: Account, app_model: App, variable_id: UUID): # response-contract:ignore constructed Pydantic response # Request payload for file types: # # Local File: # # { # "type": "image", # "transfer_method": "local_file", # "url": "", # "upload_file_id": "daded54f-72c7-4f8e-9d18-9b0abdd9f190" # } # # Remote File: # # # { # "type": "image", # "transfer_method": "remote_url", # "url": "http://127.0.0.1:5001/files/1602650a-4fe4-423c-85a2-af76c083e3c4/file-preview?timestamp=1750041099&nonce=...&sign=...=", # "upload_file_id": "1602650a-4fe4-423c-85a2-af76c083e3c4" # } draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) args_model = WorkflowDraftVariableUpdatePayload.model_validate(console_ns.payload or {}) variable_id_str = str(variable_id) variable = ensure_variable_access( variable=draft_var_srv.get_variable(variable_id=variable_id_str), app_id=app_model.id, variable_id=variable_id_str, current_user_id=current_user.id, ) new_name = args_model.name raw_value = args_model.value if new_name is None and raw_value is None: return WorkflowDraftVariableResponse.from_workflow_draft_variable(variable).model_dump(mode="json") new_value = None if raw_value is not None: new_value_input: Any match variable.value_type: case SegmentType.FILE: if not isinstance(raw_value, dict): raise InvalidArgumentError(description=f"expected dict for file, 
got {type(raw_value)}") new_value_input = build_from_mapping( mapping=raw_value, tenant_id=app_model.tenant_id, access_controller=_file_access_controller, ) case SegmentType.ARRAY_FILE: if not isinstance(raw_value, list): raise InvalidArgumentError(description=f"expected list for files, got {type(raw_value)}") for index, item in enumerate(raw_value): if not isinstance(item, dict): raise InvalidArgumentError( description=f"expected dict for files[{index}], got {type(item)}" ) new_value_input = build_from_mappings( mappings=raw_value, tenant_id=app_model.tenant_id, access_controller=_file_access_controller, ) case _: new_value_input = raw_value new_value = build_segment_with_type(variable.value_type, new_value_input) draft_var_srv.update_variable(variable, name=new_name, value=new_value) db.session.commit() return WorkflowDraftVariableResponse.from_workflow_draft_variable(variable).model_dump(mode="json") @console_ns.doc("delete_variable") @console_ns.doc(description="Delete a workflow variable") @console_ns.response(204, "Variable deleted successfully") @console_ns.response(404, "Variable not found") @_api_prerequisite def delete(self, current_user: Account, app_model: App, variable_id: UUID): draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) variable_id_str = str(variable_id) variable = ensure_variable_access( variable=draft_var_srv.get_variable(variable_id=variable_id_str), app_id=app_model.id, variable_id=variable_id_str, current_user_id=current_user.id, ) draft_var_srv.delete_variable(variable) db.session.commit() return "", 204 @console_ns.route("/apps//workflows/draft/variables//reset") class VariableResetApi(Resource): @console_ns.doc("reset_variable") @console_ns.doc(description="Reset a workflow variable to its default value") @console_ns.doc(params={"app_id": "Application ID", "variable_id": "Variable ID"}) @console_ns.response(200, "Variable reset successfully", console_ns.models[WorkflowDraftVariableResponse.__name__]) 
@console_ns.response(204, "Variable reset (no content)") @console_ns.response(404, "Variable not found") @_api_prerequisite def put(self, current_user: Account, app_model: App, variable_id: UUID): # response-contract:ignore constructed Pydantic response draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) workflow_srv = WorkflowService() draft_workflow = workflow_srv.get_draft_workflow(app_model) if draft_workflow is None: raise NotFoundError( f"Draft workflow not found, app_id={app_model.id}", ) variable_id_str = str(variable_id) variable = ensure_variable_access( variable=draft_var_srv.get_variable(variable_id=variable_id_str), app_id=app_model.id, variable_id=variable_id_str, current_user_id=current_user.id, ) resetted = draft_var_srv.reset_variable(draft_workflow, variable) db.session.commit() if resetted is None: return "", 204 return WorkflowDraftVariableResponse.from_workflow_draft_variable(resetted).model_dump(mode="json") @console_ns.route("/apps//workflows/draft/conversation-variables") class ConversationVariableCollectionApi(Resource): @console_ns.doc("get_conversation_variables") @console_ns.doc(description="Get conversation variables for workflow") @console_ns.doc(params={"app_id": "Application ID"}) @console_ns.response( 200, "Conversation variables retrieved successfully", console_ns.models[WorkflowDraftVariableListResponse.__name__], ) @console_ns.response(404, "Draft workflow not found") @_api_prerequisite @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) def get(self, current_user: Account, app_model: App): # response-contract:ignore constructed Pydantic response # NOTE(QuantumGhost): Prefill conversation variables into the draft variables table # so their IDs can be returned to the caller. 
workflow_srv = WorkflowService() draft_workflow = workflow_srv.get_draft_workflow(app_model) if draft_workflow is None: raise NotFoundError(description=f"draft workflow not found, id={app_model.id}") draft_var_srv = WorkflowDraftVariableService(db.session()) draft_var_srv.prefill_conversation_variable_default_values(draft_workflow, user_id=current_user.id) db.session.commit() return WorkflowDraftVariableListResponse.from_workflow_draft_variable_list( _get_variable_list(app_model, CONVERSATION_VARIABLE_NODE_ID, current_user.id) ).model_dump(mode="json") @console_ns.expect(console_ns.models[ConversationVariableUpdatePayload.__name__]) @console_ns.doc("update_conversation_variables") @console_ns.doc(description="Update conversation variables for workflow draft") @console_ns.doc(params={"app_id": "Application ID"}) @console_ns.response( 200, "Conversation variables updated successfully", console_ns.models[SimpleResultResponse.__name__], ) @setup_required @login_required @account_initialization_required @edit_permission_required @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_EDIT) @with_current_user @get_app_model(mode=AppMode.ADVANCED_CHAT) def post(self, current_user: Account, app_model: App): payload = ConversationVariableUpdatePayload.model_validate(console_ns.payload or {}) workflow_service = WorkflowService() conversation_variables_list = payload.conversation_variables conversation_variables = [ variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list ] workflow_service.update_draft_workflow_conversation_variables( app_model=app_model, account=current_user, conversation_variables=conversation_variables, ) return SimpleResultResponse(result="success").model_dump(mode="json") @console_ns.route("/apps//workflows/draft/system-variables") class SystemVariableCollectionApi(Resource): @console_ns.doc("get_system_variables") @console_ns.doc(description="Get system variables for workflow") 
@console_ns.doc(params={"app_id": "Application ID"}) @console_ns.response( 200, "System variables retrieved successfully", console_ns.models[WorkflowDraftVariableListResponse.__name__], ) @_api_prerequisite @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) def get(self, current_user: Account, app_model: App): # response-contract:ignore constructed Pydantic response return WorkflowDraftVariableListResponse.from_workflow_draft_variable_list( _get_variable_list(app_model, SYSTEM_VARIABLE_NODE_ID, current_user.id) ).model_dump(mode="json") @console_ns.route("/apps//workflows/draft/environment-variables") class EnvironmentVariableCollectionApi(Resource): @console_ns.doc("get_environment_variables") @console_ns.doc(description="Get environment variables for workflow") @console_ns.doc(params={"app_id": "Application ID"}) @console_ns.response( 200, "Environment variables retrieved successfully", console_ns.models[WorkflowDraftEnvironmentVariableListResponse.__name__], ) @console_ns.response(404, "Draft workflow not found") @_api_prerequisite @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_VIEW_LAYOUT) def get(self, _current_user: Account, app_model: App): """ Get draft workflow """ # fetch draft workflow by app_model workflow_service = WorkflowService() workflow = workflow_service.get_draft_workflow(app_model=app_model) if workflow is None: raise DraftWorkflowNotExist() env_vars = workflow.environment_variables env_vars_list = [] for v in env_vars: env_vars_list.append( WorkflowDraftEnvironmentVariableResponse( id=v.id, type="env", name=v.name, description=v.description, selector=list(v.selector), value_type=str(v.value_type.exposed_type()), value=v.value, # Do not track edited for env vars. 
edited=False, visible=True, editable=True, ) ) return WorkflowDraftEnvironmentVariableListResponse(items=env_vars_list).model_dump(mode="json") @console_ns.expect(console_ns.models[EnvironmentVariableUpdatePayload.__name__]) @console_ns.doc("update_environment_variables") @console_ns.doc(description="Update environment variables for workflow draft") @console_ns.doc(params={"app_id": "Application ID"}) @console_ns.response( 200, "Environment variables updated successfully", console_ns.models[SimpleResultResponse.__name__], ) @setup_required @login_required @account_initialization_required @edit_permission_required @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_EDIT) @with_current_user @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) def post(self, current_user: Account, app_model: App): payload = EnvironmentVariableUpdatePayload.model_validate(console_ns.payload or {}) workflow_service = WorkflowService() environment_variables_list = payload.environment_variables environment_variables = [ variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list ] workflow_service.update_draft_workflow_environment_variables( app_model=app_model, account=current_user, environment_variables=environment_variables, ) return SimpleResultResponse(result="success").model_dump(mode="json")