diff --git a/api/controllers/trigger/trigger.py b/api/controllers/trigger/trigger.py index b7bcfffcf6..184dc10a0a 100644 --- a/api/controllers/trigger/trigger.py +++ b/api/controllers/trigger/trigger.py @@ -26,6 +26,7 @@ def trigger_endpoint(endpoint_id: str): TriggerService.process_endpoint, TriggerSubscriptionBuilderService.process_builder_validation_endpoint, ] + response = None try: for handler in handling_chain: response = handler(endpoint_id, request) diff --git a/api/core/plugin/utils/http_parser.py b/api/core/plugin/utils/http_parser.py index 47cdcadcb3..ce943929be 100644 --- a/api/core/plugin/utils/http_parser.py +++ b/api/core/plugin/utils/http_parser.py @@ -86,10 +86,14 @@ def deserialize_request(raw_data: bytes) -> Request: } if "Content-Type" in headers: - environ["CONTENT_TYPE"] = headers.get("Content-Type") + content_type = headers.get("Content-Type") + if content_type is not None: + environ["CONTENT_TYPE"] = content_type if "Content-Length" in headers: - environ["CONTENT_LENGTH"] = headers.get("Content-Length") + content_length = headers.get("Content-Length") + if content_length is not None: + environ["CONTENT_LENGTH"] = content_length elif body: environ["CONTENT_LENGTH"] = str(len(body)) diff --git a/api/repositories/sqlalchemy_workflow_trigger_log_repository.py b/api/repositories/sqlalchemy_workflow_trigger_log_repository.py index 1276686cd8..fb13972e29 100644 --- a/api/repositories/sqlalchemy_workflow_trigger_log_repository.py +++ b/api/repositories/sqlalchemy_workflow_trigger_log_repository.py @@ -3,7 +3,7 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository. """ from collections.abc import Sequence -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from typing import Any, Optional from sqlalchemy import and_, delete, func, select, update @@ -98,7 +98,7 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository): self, tenant_id: str, app_id: str, hours: int = 24, limit: int = 100, offset: int = 0 ) -> Sequence[WorkflowTriggerLog]: """Get recent trigger logs within specified hours.""" - since = datetime.utcnow() - timedelta(hours=hours) + since = datetime.now(UTC) - timedelta(hours=hours) query = ( select(WorkflowTriggerLog) @@ -189,7 +189,7 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository): update_data["error"] = error_message if new_status in [WorkflowTriggerStatus.SUCCEEDED, WorkflowTriggerStatus.FAILED]: - update_data["finished_at"] = datetime.utcnow() + update_data["finished_at"] = datetime.now(UTC) result = self.session.execute( update(WorkflowTriggerLog).where(WorkflowTriggerLog.id.in_(trigger_log_ids)).values(**update_data) diff --git a/api/services/async_workflow_service.py b/api/services/async_workflow_service.py index 58b75fc07f..bc30709ff9 100644 --- a/api/services/async_workflow_service.py +++ b/api/services/async_workflow_service.py @@ -131,7 +131,7 @@ class AsyncWorkflowService: trigger_log_repo.update(trigger_log) session.commit() - tenant_owner_tz = rate_limiter._get_tenant_owner_timezone(trigger_data.tenant_id) + tenant_owner_tz = rate_limiter.get_tenant_owner_timezone(trigger_data.tenant_id) remaining = rate_limiter.get_remaining_quota(trigger_data.tenant_id, dispatcher.get_daily_limit()) diff --git a/api/services/plugin/plugin_parameter_service.py b/api/services/plugin/plugin_parameter_service.py index 17fda44150..680bf051f0 100644 --- a/api/services/plugin/plugin_parameter_service.py +++ b/api/services/plugin/plugin_parameter_service.py @@ -85,15 +85,14 @@ class PluginParameterService: credential_type = db_record.credential_type case "trigger": provider_controller = TriggerManager.get_trigger_provider(tenant_id, TriggerProviderID(provider)) + subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None if credential_id: - subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None = ( + subscription = ( TriggerSubscriptionBuilderService.get_subscription_builder(credential_id) or TriggerProviderService.get_subscription_by_id(tenant_id, credential_id) ) else: - subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None = ( - TriggerProviderService.get_subscription_by_id(tenant_id) - ) + subscription = TriggerProviderService.get_subscription_by_id(tenant_id) if subscription is None: raise ValueError(f"Subscription {credential_id} not found") diff --git a/api/services/workflow/rate_limiter.py b/api/services/workflow/rate_limiter.py index 1e26f40a6a..df7eba4584 100644 --- a/api/services/workflow/rate_limiter.py +++ b/api/services/workflow/rate_limiter.py @@ -30,7 +30,7 @@ class TenantDailyRateLimiter: def __init__(self, redis_client: Union[Redis, RedisClientWrapper]): self.redis = redis_client - def _get_tenant_owner_timezone(self, tenant_id: str) -> str: + def get_tenant_owner_timezone(self, tenant_id: str) -> str: """ Get timezone of tenant owner @@ -62,7 +62,7 @@ class TenantDailyRateLimiter: Returns: Redis key for the current UTC day """ - utc_now = datetime.utcnow() + utc_now = datetime.now(UTC) date_str = utc_now.strftime("%Y-%m-%d") return f"workflow:daily_limit:{tenant_id}:{date_str}" @@ -73,7 +73,7 @@ class TenantDailyRateLimiter: Returns: Number of seconds until UTC midnight """ - utc_now = datetime.utcnow() + utc_now = datetime.now(UTC) # Get next midnight in UTC next_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min) diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index 6fc96f5155..798f454b0e 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -7,6 +7,7 @@ with appropriate retry policies and error handling. import json from datetime import UTC, datetime +from typing import Any from celery import shared_task from sqlalchemy import select @@ -27,14 +28,19 @@ from services.workflow.entities import AsyncTriggerExecutionResult, AsyncTrigger # Determine queue names based on edition if dify_config.EDITION == "CLOUD": # Cloud edition: separate queues for different tiers - PROFESSIONAL_QUEUE = "workflow_professional" - TEAM_QUEUE = "workflow_team" - SANDBOX_QUEUE = "workflow_sandbox" + _professional_queue = "workflow_professional" + _team_queue = "workflow_team" + _sandbox_queue = "workflow_sandbox" else: # Community edition: single workflow queue (not dataset) - PROFESSIONAL_QUEUE = "workflow" - TEAM_QUEUE = "workflow" - SANDBOX_QUEUE = "workflow" + _professional_queue = "workflow" + _team_queue = "workflow" + _sandbox_queue = "workflow" + +# Define constants +PROFESSIONAL_QUEUE = _professional_queue +TEAM_QUEUE = _team_queue +SANDBOX_QUEUE = _sandbox_queue @shared_task(queue=PROFESSIONAL_QUEUE) @@ -112,11 +118,11 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecuti generator = WorkflowAppGenerator() # Prepare args matching AppGenerateService.generate format - args = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)} + args: dict[str, Any] = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)} # If workflow_id was specified, add it to args if trigger_data.workflow_id: - args["workflow_id"] = trigger_data.workflow_id + args["workflow_id"] = str(trigger_data.workflow_id) # Execute the workflow with the trigger type result = generator.generate( @@ -127,7 +133,6 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecuti invoke_from=InvokeFrom.SERVICE_API, streaming=False, call_depth=0, - workflow_thread_pool_id=None, triggered_from=trigger_data.trigger_type, root_node_id=trigger_data.root_node_id, )