refactor: replace bare dict with dict[str, Any] in services and hosti… (#35211)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
wdeveloper16 2026-04-14 21:44:40 +02:00 committed by GitHub
parent b1df52b8ff
commit 9c90c1c455
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 23 deletions

View File

@ -1,3 +1,5 @@
from typing import Any
from flask import Flask
from graphon.model_runtime.entities.model_entities import ModelType
from pydantic import BaseModel
@ -28,7 +30,7 @@ class FreeHostingQuota(HostingQuota):
class HostingProvider(BaseModel):
enabled: bool = False
credentials: dict | None = None
credentials: dict[str, Any] | None = None
quota_unit: QuotaUnit | None = None
quotas: list[HostingQuota] = []

View File

@ -104,7 +104,7 @@ class RagPipelineService:
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@classmethod
def get_pipeline_templates(cls, type: str = "built-in", language: str = "en-US") -> dict:
def get_pipeline_templates(cls, type: str = "built-in", language: str = "en-US") -> dict[str, Any]:
if type == "built-in":
mode = dify_config.HOSTED_FETCH_PIPELINE_TEMPLATES_MODE
retrieval_instance = PipelineTemplateRetrievalFactory.get_pipeline_template_factory(mode)()
@ -120,7 +120,7 @@ class RagPipelineService:
return result
@classmethod
def get_pipeline_template_detail(cls, template_id: str, type: str = "built-in") -> dict | None:
def get_pipeline_template_detail(cls, template_id: str, type: str = "built-in") -> dict[str, Any] | None:
"""
Get pipeline template detail.
@ -131,7 +131,7 @@ class RagPipelineService:
if type == "built-in":
mode = dify_config.HOSTED_FETCH_PIPELINE_TEMPLATES_MODE
retrieval_instance = PipelineTemplateRetrievalFactory.get_pipeline_template_factory(mode)()
built_in_result: dict | None = retrieval_instance.get_pipeline_template_detail(template_id)
built_in_result: dict[str, Any] | None = retrieval_instance.get_pipeline_template_detail(template_id)
if built_in_result is None:
logger.warning(
"pipeline template retrieval returned empty result, template_id: %s, mode: %s",
@ -142,7 +142,7 @@ class RagPipelineService:
else:
mode = "customized"
retrieval_instance = PipelineTemplateRetrievalFactory.get_pipeline_template_factory(mode)()
customized_result: dict | None = retrieval_instance.get_pipeline_template_detail(template_id)
customized_result: dict[str, Any] | None = retrieval_instance.get_pipeline_template_detail(template_id)
return customized_result
@classmethod
@ -297,7 +297,7 @@ class RagPipelineService:
self,
*,
pipeline: Pipeline,
graph: dict,
graph: dict[str, Any],
unique_hash: str | None,
account: Account,
environment_variables: Sequence[VariableBase],
@ -467,7 +467,9 @@ class RagPipelineService:
return default_block_configs
def get_default_block_config(self, node_type: str, filters: dict | None = None) -> Mapping[str, object] | None:
def get_default_block_config(
self, node_type: str, filters: dict[str, Any] | None = None
) -> Mapping[str, object] | None:
"""
Get default config of node.
:param node_type: node type
@ -500,7 +502,7 @@ class RagPipelineService:
return default_config
def run_draft_workflow_node(
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account
self, pipeline: Pipeline, node_id: str, user_inputs: dict[str, Any], account: Account
) -> WorkflowNodeExecutionModel | None:
"""
Run draft workflow node
@ -582,7 +584,7 @@ class RagPipelineService:
self,
pipeline: Pipeline,
node_id: str,
user_inputs: dict,
user_inputs: dict[str, Any],
account: Account,
datasource_type: str,
is_published: bool,
@ -749,7 +751,7 @@ class RagPipelineService:
self,
pipeline: Pipeline,
node_id: str,
user_inputs: dict,
user_inputs: dict[str, Any],
account: Account,
datasource_type: str,
is_published: bool,
@ -979,7 +981,7 @@ class RagPipelineService:
return workflow_node_execution
def update_workflow(
self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict
self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict[str, Any]
) -> Workflow | None:
"""
Update workflow attributes
@ -1099,7 +1101,9 @@ class RagPipelineService:
]
return datasource_provider_variables
def get_rag_pipeline_paginate_workflow_runs(self, pipeline: Pipeline, args: dict) -> InfiniteScrollPagination:
def get_rag_pipeline_paginate_workflow_runs(
self, pipeline: Pipeline, args: dict[str, Any]
) -> InfiniteScrollPagination:
"""
Get debug workflow run list
Only return triggered_from == debugging
@ -1169,7 +1173,7 @@ class RagPipelineService:
return list(node_executions)
@classmethod
def publish_customized_pipeline_template(cls, pipeline_id: str, args: dict):
def publish_customized_pipeline_template(cls, pipeline_id: str, args: dict[str, Any]):
"""
Publish customized pipeline template
"""
@ -1259,7 +1263,7 @@ class RagPipelineService:
)
return node_exec
def set_datasource_variables(self, pipeline: Pipeline, args: dict, current_user: Account):
def set_datasource_variables(self, pipeline: Pipeline, args: dict[str, Any], current_user: Account):
"""
Set datasource variables
"""
@ -1346,7 +1350,7 @@ class RagPipelineService:
)
return workflow_node_execution_db_model
def get_recommended_plugins(self, type: str) -> dict:
def get_recommended_plugins(self, type: str) -> dict[str, Any]:
# Query active recommended plugins
stmt = select(PipelineRecommendedPlugin).where(PipelineRecommendedPlugin.active == True)
if type and type != "all":

View File

@ -241,8 +241,8 @@ class WorkflowService:
self,
*,
app_model: App,
graph: dict,
features: dict,
graph: dict[str, Any],
features: dict[str, Any],
unique_hash: str | None,
account: Account,
environment_variables: Sequence[VariableBase],
@ -576,7 +576,7 @@ class WorkflowService:
except Exception as e:
raise ValueError(f"Failed to validate default credential for tool provider {provider}: {str(e)}")
def _validate_load_balancing_credentials(self, workflow: Workflow, node_data: dict, node_id: str) -> None:
def _validate_load_balancing_credentials(self, workflow: Workflow, node_data: dict[str, Any], node_id: str) -> None:
"""
Validate load balancing credentials for a workflow node.
@ -1214,7 +1214,7 @@ class WorkflowService:
return variable_pool
def run_free_workflow_node(
self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
self, node_data: dict[str, Any], tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
) -> WorkflowNodeExecution:
"""
Run free workflow node
@ -1361,7 +1361,7 @@ class WorkflowService:
node_execution.status = WorkflowNodeExecutionStatus.FAILED
node_execution.error = error
def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
def convert_to_workflow(self, app_model: App, account: Account, args: dict[str, Any]) -> App:
"""
Basic mode of chatbot app(expert mode) to workflow
Completion App to Workflow App
@ -1421,7 +1421,7 @@ class WorkflowService:
if node_type == BuiltinNodeTypes.HUMAN_INPUT:
self._validate_human_input_node_data(node_data)
def validate_features_structure(self, app_model: App, features: dict):
def validate_features_structure(self, app_model: App, features: dict[str, Any]):
match app_model.mode:
case AppMode.ADVANCED_CHAT:
return AdvancedChatAppConfigManager.config_validate(
@ -1434,7 +1434,7 @@ class WorkflowService:
case _:
raise ValueError(f"Invalid app mode: {app_model.mode}")
def _validate_human_input_node_data(self, node_data: dict) -> None:
def _validate_human_input_node_data(self, node_data: dict[str, Any]) -> None:
"""
Validate HumanInput node data format.
@ -1452,7 +1452,7 @@ class WorkflowService:
raise ValueError(f"Invalid HumanInput node data: {str(e)}")
def update_workflow(
self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict
self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict[str, Any]
) -> Workflow | None:
"""
Update workflow attributes