From 9eeceb2455f1acb0e4cc21d371988f0cfbb465b2 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Sat, 17 Jan 2026 15:54:32 +0800 Subject: [PATCH] fix basedpyright --- api/app.py | 2 +- api/app_factory.py | 9 +- api/controllers/console/__init__.py | 2 + api/controllers/console/app/online_user.py | 24 ++- .../console/app/workflow_comment.py | 201 ++++++++++++------ api/extensions/ext_socketio.py | 2 +- api/services/workflow_service.py | 8 +- 7 files changed, 166 insertions(+), 82 deletions(-) diff --git a/api/app.py b/api/app.py index ee0f91de3b..76ffdd1067 100644 --- a/api/app.py +++ b/api/app.py @@ -35,7 +35,7 @@ else: if __name__ == "__main__": from gevent import pywsgi - from geventwebsocket.handler import WebSocketHandler + from geventwebsocket.handler import WebSocketHandler # type: ignore[reportMissingTypeStubs] host = os.environ.get("HOST", "0.0.0.0") port = int(os.environ.get("PORT", 5001)) diff --git a/api/app_factory.py b/api/app_factory.py index 8d3d13d12a..7e7a4bf8c8 100644 --- a/api/app_factory.py +++ b/api/app_factory.py @@ -1,6 +1,8 @@ import logging import time +from typing import Any +import socketio # type: ignore[reportMissingTypeStubs] from opentelemetry.trace import get_current_span from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID @@ -8,6 +10,7 @@ from configs import dify_config from contexts.wrapper import RecyclableContextVar from core.logging.context import init_request_context from dify_app import DifyApp +from extensions.ext_socketio import sio logger = logging.getLogger(__name__) @@ -60,15 +63,11 @@ def create_flask_app_with_configs() -> DifyApp: return dify_app -def create_app() -> tuple[any, DifyApp]: +def create_app() -> tuple[Any, DifyApp]: start_time = time.perf_counter() app = create_flask_app_with_configs() initialize_extensions(app) - import socketio - - from extensions.ext_socketio import sio - sio.app = app socketio_app = socketio.WSGIApp(sio, app) diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index b341b18778..c44ea6a100 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -183,6 +183,7 @@ __all__ = [ "models", "oauth", "oauth_server", + "online_user", "ops_trace", "parameter", "ping", @@ -205,6 +206,7 @@ __all__ = [ "website", "workflow", "workflow_app_log", + "workflow_comment", "workflow_draft_variable", "workflow_run", "workflow_statistic", diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index 8dceaaaec2..dc50d2d8c4 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -1,8 +1,10 @@ import json import logging import time +from collections.abc import Callable +from typing import Any, cast -from werkzeug.wrappers import Request as WerkzeugRequest +from flask import Request as FlaskRequest from extensions.ext_redis import redis_client from extensions.ext_socketio import sio @@ -40,7 +42,11 @@ def _refresh_session_state(workflow_id: str, sid: str) -> None: redis_client.expire(sid_key, SESSION_STATE_TTL_SECONDS) -@sio.on("connect") +def _sio_on(event: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + return cast(Callable[[Callable[..., Any]], Callable[..., Any]], sio.on(event)) + + +@_sio_on("connect") def socket_connect(sid, environ, auth): """ WebSocket connect event, do authentication here. @@ -51,7 +57,7 @@ def socket_connect(sid, environ, auth): if not token: try: - request_environ = WerkzeugRequest(environ) + request_environ = FlaskRequest(environ) token = extract_access_token(request_environ) except Exception: logging.exception("Failed to extract token") @@ -80,7 +86,7 @@ def socket_connect(sid, environ, auth): return False -@sio.on("user_connect") +@_sio_on("user_connect") def handle_user_connect(sid, data): """ Handle user connect event. Each session (tab) is treated as an independent collaborator. @@ -128,7 +134,7 @@ def handle_user_connect(sid, data): return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader} -@sio.on("disconnect") +@_sio_on("disconnect") def handle_disconnect(sid): """ Handle session disconnect event. Remove the specific session from online users. @@ -252,7 +258,7 @@ def broadcast_leader_change(workflow_id, new_leader_sid): """ sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) - for sid, session_info_json in sessions_json.items(): + for sid in sessions_json: try: sid_str = sid.decode("utf-8") if isinstance(sid, bytes) else sid is_leader = sid_str == new_leader_sid @@ -279,7 +285,7 @@ def broadcast_online_users(workflow_id): sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) users = [] - for sid, session_info_json in sessions_json.items(): + for session_info_json in sessions_json.values(): try: session_info = json.loads(session_info_json) # Each session appears as a separate "user" in the UI @@ -304,7 +310,7 @@ def broadcast_online_users(workflow_id): sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_sid}, room=workflow_id) -@sio.on("collaboration_event") +@_sio_on("collaboration_event") def handle_collaboration_event(sid, data): """ Handle general collaboration events, include: @@ -345,7 +351,7 @@ def handle_collaboration_event(sid, data): return {"msg": "event_broadcasted"} -@sio.on("graph_event") +@_sio_on("graph_event") def handle_graph_event(sid, data): """ Handle graph events - simple broadcast relay. diff --git a/api/controllers/console/app/workflow_comment.py b/api/controllers/console/app/workflow_comment.py index 4e3a311de2..6b06a5922a 100644 --- a/api/controllers/console/app/workflow_comment.py +++ b/api/controllers/console/app/workflow_comment.py @@ -1,8 +1,9 @@ import logging -from flask_restx import Resource, fields, marshal_with, reqparse +from flask_restx import Resource, fields, marshal_with +from pydantic import BaseModel, Field -from controllers.console import api +from controllers.console import console_ns from controllers.console.app.wraps import get_app_model from controllers.console.wraps import account_initialization_required, setup_required from fields.member_fields import account_with_role_fields @@ -21,57 +22,117 @@ from services.account_service import TenantService from services.workflow_comment_service import WorkflowCommentService logger = logging.getLogger(__name__) +DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}" +class WorkflowCommentCreatePayload(BaseModel): + position_x: float = Field(..., description="Comment X position") + position_y: float = Field(..., description="Comment Y position") + content: str = Field(..., description="Comment content") + mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs") + + +class WorkflowCommentUpdatePayload(BaseModel): + content: str = Field(..., description="Comment content") + position_x: float | None = Field(default=None, description="Comment X position") + position_y: float | None = Field(default=None, description="Comment Y position") + mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs") + + +class WorkflowCommentReplyCreatePayload(BaseModel): + content: str = Field(..., description="Reply content") + mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs") + + +class WorkflowCommentReplyUpdatePayload(BaseModel): + content: str = Field(..., description="Reply content") + mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs") + + +for model in ( + WorkflowCommentCreatePayload, + WorkflowCommentUpdatePayload, + WorkflowCommentReplyCreatePayload, + WorkflowCommentReplyUpdatePayload, +): + console_ns.schema_model(model.__name__, model.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0)) + +workflow_comment_basic_model = console_ns.model("WorkflowCommentBasic", workflow_comment_basic_fields) +workflow_comment_detail_model = console_ns.model("WorkflowCommentDetail", workflow_comment_detail_fields) +workflow_comment_create_model = console_ns.model("WorkflowCommentCreate", workflow_comment_create_fields) +workflow_comment_update_model = console_ns.model("WorkflowCommentUpdate", workflow_comment_update_fields) +workflow_comment_resolve_model = console_ns.model("WorkflowCommentResolve", workflow_comment_resolve_fields) +workflow_comment_reply_create_model = console_ns.model( + "WorkflowCommentReplyCreate", workflow_comment_reply_create_fields +) +workflow_comment_reply_update_model = console_ns.model( + "WorkflowCommentReplyUpdate", workflow_comment_reply_update_fields +) +workflow_comment_mention_users_model = console_ns.model( + "WorkflowCommentMentionUsers", + {"users": fields.List(fields.Nested(account_with_role_fields))}, +) + + +@console_ns.route("/apps//workflow/comments") class WorkflowCommentListApi(Resource): """API for listing and creating workflow comments.""" + @console_ns.doc("list_workflow_comments") + @console_ns.doc(description="Get all comments for a workflow") + @console_ns.doc(params={"app_id": "Application ID"}) + @console_ns.response(200, "Comments retrieved successfully", workflow_comment_basic_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_basic_fields, envelope="data") + @get_app_model() + @marshal_with(workflow_comment_basic_model, envelope="data") def get(self, app_model: App): """Get all comments for a workflow.""" comments = WorkflowCommentService.get_comments(tenant_id=current_user.current_tenant_id, app_id=app_model.id) return comments + @console_ns.doc("create_workflow_comment") + @console_ns.doc(description="Create a new workflow comment") + @console_ns.doc(params={"app_id": "Application ID"}) + @console_ns.expect(console_ns.models[WorkflowCommentCreatePayload.__name__]) + @console_ns.response(201, "Comment created successfully", workflow_comment_create_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_create_fields) + @get_app_model() + @marshal_with(workflow_comment_create_model) def post(self, app_model: App): """Create a new workflow comment.""" - parser = reqparse.RequestParser() - parser.add_argument("position_x", type=float, required=True, location="json") - parser.add_argument("position_y", type=float, required=True, location="json") - parser.add_argument("content", type=str, required=True, location="json") - parser.add_argument("mentioned_user_ids", type=list, location="json", default=[]) - args = parser.parse_args() + payload = WorkflowCommentCreatePayload.model_validate(console_ns.payload or {}) result = WorkflowCommentService.create_comment( tenant_id=current_user.current_tenant_id, app_id=app_model.id, created_by=current_user.id, - content=args.content, - position_x=args.position_x, - position_y=args.position_y, - mentioned_user_ids=args.mentioned_user_ids, + content=payload.content, + position_x=payload.position_x, + position_y=payload.position_y, + mentioned_user_ids=payload.mentioned_user_ids, ) return result, 201 +@console_ns.route("/apps//workflow/comments/") class WorkflowCommentDetailApi(Resource): """API for managing individual workflow comments.""" + @console_ns.doc("get_workflow_comment") + @console_ns.doc(description="Get a specific workflow comment") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"}) + @console_ns.response(200, "Comment retrieved successfully", workflow_comment_detail_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_detail_fields) + @get_app_model() + @marshal_with(workflow_comment_detail_model) def get(self, app_model: App, comment_id: str): """Get a specific workflow comment.""" comment = WorkflowCommentService.get_comment( @@ -80,37 +141,41 @@ class WorkflowCommentDetailApi(Resource): return comment + @console_ns.doc("update_workflow_comment") + @console_ns.doc(description="Update a workflow comment") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"}) + @console_ns.expect(console_ns.models[WorkflowCommentUpdatePayload.__name__]) + @console_ns.response(200, "Comment updated successfully", workflow_comment_update_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_update_fields) + @get_app_model() + @marshal_with(workflow_comment_update_model) def put(self, app_model: App, comment_id: str): """Update a workflow comment.""" - parser = reqparse.RequestParser() - parser.add_argument("content", type=str, required=True, location="json") - parser.add_argument("position_x", type=float, required=False, location="json") - parser.add_argument("position_y", type=float, required=False, location="json") - parser.add_argument("mentioned_user_ids", type=list, location="json", default=[]) - args = parser.parse_args() + payload = WorkflowCommentUpdatePayload.model_validate(console_ns.payload or {}) result = WorkflowCommentService.update_comment( tenant_id=current_user.current_tenant_id, app_id=app_model.id, comment_id=comment_id, user_id=current_user.id, - content=args.content, - position_x=args.position_x, - position_y=args.position_y, - mentioned_user_ids=args.mentioned_user_ids, + content=payload.content, + position_x=payload.position_x, + position_y=payload.position_y, + mentioned_user_ids=payload.mentioned_user_ids, ) return result + @console_ns.doc("delete_workflow_comment") + @console_ns.doc(description="Delete a workflow comment") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"}) + @console_ns.response(204, "Comment deleted successfully") @login_required @setup_required @account_initialization_required - @get_app_model + @get_app_model() def delete(self, app_model: App, comment_id: str): """Delete a workflow comment.""" WorkflowCommentService.delete_comment( @@ -123,14 +188,19 @@ class WorkflowCommentDetailApi(Resource): return {"result": "success"}, 204 +@console_ns.route("/apps//workflow/comments//resolve") class WorkflowCommentResolveApi(Resource): """API for resolving and reopening workflow comments.""" + @console_ns.doc("resolve_workflow_comment") + @console_ns.doc(description="Resolve a workflow comment") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"}) + @console_ns.response(200, "Comment resolved successfully", workflow_comment_resolve_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_resolve_fields) + @get_app_model() + @marshal_with(workflow_comment_resolve_model) def post(self, app_model: App, comment_id: str): """Resolve a workflow comment.""" comment = WorkflowCommentService.resolve_comment( @@ -143,14 +213,20 @@ class WorkflowCommentResolveApi(Resource): return comment +@console_ns.route("/apps//workflow/comments//replies") class WorkflowCommentReplyApi(Resource): """API for managing comment replies.""" + @console_ns.doc("create_workflow_comment_reply") + @console_ns.doc(description="Add a reply to a workflow comment") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"}) + @console_ns.expect(console_ns.models[WorkflowCommentReplyCreatePayload.__name__]) + @console_ns.response(201, "Reply created successfully", workflow_comment_reply_create_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_reply_create_fields) + @get_app_model() + @marshal_with(workflow_comment_reply_create_model) def post(self, app_model: App, comment_id: str): """Add a reply to a workflow comment.""" # Validate comment access first @@ -158,29 +234,32 @@ class WorkflowCommentReplyApi(Resource): comment_id=comment_id, tenant_id=current_user.current_tenant_id, app_id=app_model.id ) - parser = reqparse.RequestParser() - parser.add_argument("content", type=str, required=True, location="json") - parser.add_argument("mentioned_user_ids", type=list, location="json", default=[]) - args = parser.parse_args() + payload = WorkflowCommentReplyCreatePayload.model_validate(console_ns.payload or {}) result = WorkflowCommentService.create_reply( comment_id=comment_id, - content=args.content, + content=payload.content, created_by=current_user.id, - mentioned_user_ids=args.mentioned_user_ids, + mentioned_user_ids=payload.mentioned_user_ids, ) return result, 201 +@console_ns.route("/apps//workflow/comments//replies/") class WorkflowCommentReplyDetailApi(Resource): """API for managing individual comment replies.""" + @console_ns.doc("update_workflow_comment_reply") + @console_ns.doc(description="Update a comment reply") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID", "reply_id": "Reply ID"}) + @console_ns.expect(console_ns.models[WorkflowCommentReplyUpdatePayload.__name__]) + @console_ns.response(200, "Reply updated successfully", workflow_comment_reply_update_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with(workflow_comment_reply_update_fields) + @get_app_model() + @marshal_with(workflow_comment_reply_update_model) def put(self, app_model: App, comment_id: str, reply_id: str): """Update a comment reply.""" # Validate comment access first @@ -188,21 +267,25 @@ class WorkflowCommentReplyDetailApi(Resource): comment_id=comment_id, tenant_id=current_user.current_tenant_id, app_id=app_model.id ) - parser = reqparse.RequestParser() - parser.add_argument("content", type=str, required=True, location="json") - parser.add_argument("mentioned_user_ids", type=list, location="json", default=[]) - args = parser.parse_args() + payload = WorkflowCommentReplyUpdatePayload.model_validate(console_ns.payload or {}) reply = WorkflowCommentService.update_reply( - reply_id=reply_id, user_id=current_user.id, content=args.content, mentioned_user_ids=args.mentioned_user_ids + reply_id=reply_id, + user_id=current_user.id, + content=payload.content, + mentioned_user_ids=payload.mentioned_user_ids, ) return reply + @console_ns.doc("delete_workflow_comment_reply") + @console_ns.doc(description="Delete a comment reply") + @console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID", "reply_id": "Reply ID"}) + @console_ns.response(204, "Reply deleted successfully") @login_required @setup_required @account_initialization_required - @get_app_model + @get_app_model() def delete(self, app_model: App, comment_id: str, reply_id: str): """Delete a comment reply.""" # Validate comment access first @@ -215,26 +298,20 @@ class WorkflowCommentReplyDetailApi(Resource): return {"result": "success"}, 204 +@console_ns.route("/apps//workflow/comments/mention-users") class WorkflowCommentMentionUsersApi(Resource): """API for getting mentionable users for workflow comments.""" + @console_ns.doc("workflow_comment_mention_users") + @console_ns.doc(description="Get all users in current tenant for mentions") + @console_ns.doc(params={"app_id": "Application ID"}) + @console_ns.response(200, "Mentionable users retrieved successfully", workflow_comment_mention_users_model) @login_required @setup_required @account_initialization_required - @get_app_model - @marshal_with({"users": fields.List(fields.Nested(account_with_role_fields))}) + @get_app_model() + @marshal_with(workflow_comment_mention_users_model) def get(self, app_model: App): """Get all users in current tenant for mentions.""" members = TenantService.get_tenant_members(current_user.current_tenant) return {"users": members} - - -# Register API routes -api.add_resource(WorkflowCommentListApi, "/apps//workflow/comments") -api.add_resource(WorkflowCommentDetailApi, "/apps//workflow/comments/") -api.add_resource(WorkflowCommentResolveApi, "/apps//workflow/comments//resolve") -api.add_resource(WorkflowCommentReplyApi, "/apps//workflow/comments//replies") -api.add_resource( - WorkflowCommentReplyDetailApi, "/apps//workflow/comments//replies/" -) -api.add_resource(WorkflowCommentMentionUsersApi, "/apps//workflow/comments/mention-users") diff --git a/api/extensions/ext_socketio.py b/api/extensions/ext_socketio.py index 470e0b08e2..5ed82bac8d 100644 --- a/api/extensions/ext_socketio.py +++ b/api/extensions/ext_socketio.py @@ -1,4 +1,4 @@ -import socketio +import socketio # type: ignore[reportMissingTypeStubs] from configs import dify_config diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 9472735dfd..3ab344c187 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -198,8 +198,8 @@ class WorkflowService: features: dict, unique_hash: str | None, account: Account, - environment_variables: Sequence[Variable], - conversation_variables: Sequence[Variable], + environment_variables: Sequence[VariableBase], + conversation_variables: Sequence[VariableBase], force_upload: bool = False, ) -> Workflow: """ @@ -255,7 +255,7 @@ class WorkflowService: self, *, app_model: App, - environment_variables: Sequence[Variable], + environment_variables: Sequence[VariableBase], account: Account, ): """ @@ -278,7 +278,7 @@ class WorkflowService: self, *, app_model: App, - conversation_variables: Sequence[Variable], + conversation_variables: Sequence[VariableBase], account: Account, ): """