mirror of https://github.com/langgenius/dify.git
refactor(api): enhance request handling and time management
- Initialized `response` variable in `trigger.py` to ensure proper handling in the trigger endpoint.
- Updated `http_parser.py` to conditionally set `CONTENT_TYPE` and `CONTENT_LENGTH` headers for improved robustness.
- Changed `datetime.utcnow()` to `datetime.now(UTC)` in `sqlalchemy_workflow_trigger_log_repository.py` and `rate_limiter.py` for consistent time zone handling.
- Refactored `async_workflow_service.py` to use the public method `get_tenant_owner_timezone` for better encapsulation.
- Simplified subscription retrieval logic in `plugin_parameter_service.py` for clarity.

These changes improve code reliability and maintainability while ensuring accurate time management and request processing.
This commit is contained in:
parent 452588dded
commit d5ff89f6d3
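For context, the core pattern this commit adopts, shown as a standalone sketch: datetime.utcnow() returns a naive datetime (tzinfo is None) and is deprecated since Python 3.12, while datetime.now(UTC) returns a timezone-aware one. Mixing the two fails at runtime, which is why consistency across the codebase matters.

from datetime import UTC, datetime

naive = datetime.utcnow()   # naive: tzinfo is None; deprecated in 3.12+
aware = datetime.now(UTC)   # aware: tzinfo is the UTC singleton

assert naive.tzinfo is None
assert aware.tzinfo is UTC

# Arithmetic across the two styles raises, so a codebase must pick one:
try:
    _ = aware - naive
except TypeError as exc:
    print(f"naive/aware mix: {exc}")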
trigger.py
@@ -26,6 +26,7 @@ def trigger_endpoint(endpoint_id: str):
         TriggerService.process_endpoint,
         TriggerSubscriptionBuilderService.process_builder_validation_endpoint,
     ]
+    response = None
     try:
         for handler in handling_chain:
             response = handler(endpoint_id, request)
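The `response = None` pre-seed guards the name binding: if the chain is empty, or an exception interrupts the loop before any assignment, a later read of `response` would otherwise raise UnboundLocalError. A simplified sketch with hypothetical handlers (the real chain's control flow is only partly visible in the hunk):

def dispatch(handlers, endpoint_id, request):
    response = None  # bound even when the loop body never runs
    for handler in handlers:
        response = handler(endpoint_id, request)
    return response

print(dispatch([], "ep-123", object()))  # -> None instead of UnboundLocalError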
http_parser.py
@@ -86,10 +86,14 @@ def deserialize_request(raw_data: bytes) -> Request:
     }

-    if "Content-Type" in headers:
-        environ["CONTENT_TYPE"] = headers.get("Content-Type")
+    content_type = headers.get("Content-Type")
+    if content_type is not None:
+        environ["CONTENT_TYPE"] = content_type

-    if "Content-Length" in headers:
-        environ["CONTENT_LENGTH"] = headers.get("Content-Length")
+    content_length = headers.get("Content-Length")
+    if content_length is not None:
+        environ["CONTENT_LENGTH"] = content_length
     elif body:
         environ["CONTENT_LENGTH"] = str(len(body))
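Binding `headers.get(...)` once and testing `is not None` does double duty: it narrows `str | None` to `str` for type checkers and keeps a literal None out of the WSGI environ. A self-contained sketch of the same pattern (function name and signature are illustrative, not the repo's API):

from typing import Any

def build_environ(headers: dict[str, str], body: bytes) -> dict[str, Any]:
    environ: dict[str, Any] = {}

    content_type = headers.get("Content-Type")  # str | None
    if content_type is not None:
        environ["CONTENT_TYPE"] = content_type

    content_length = headers.get("Content-Length")
    if content_length is not None:
        environ["CONTENT_LENGTH"] = content_length
    elif body:
        environ["CONTENT_LENGTH"] = str(len(body))  # fall back to body size

    return environ

print(build_environ({"Content-Type": "application/json"}, b'{"a": 1}'))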
sqlalchemy_workflow_trigger_log_repository.py
@@ -3,7 +3,7 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository.
 """

 from collections.abc import Sequence
-from datetime import datetime, timedelta
+from datetime import UTC, datetime, timedelta
 from typing import Any, Optional

 from sqlalchemy import and_, delete, func, select, update
@@ -98,7 +98,7 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
         self, tenant_id: str, app_id: str, hours: int = 24, limit: int = 100, offset: int = 0
     ) -> Sequence[WorkflowTriggerLog]:
         """Get recent trigger logs within specified hours."""
-        since = datetime.utcnow() - timedelta(hours=hours)
+        since = datetime.now(UTC) - timedelta(hours=hours)

         query = (
             select(WorkflowTriggerLog)
@@ -189,7 +189,7 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
             update_data["error"] = error_message

         if new_status in [WorkflowTriggerStatus.SUCCEEDED, WorkflowTriggerStatus.FAILED]:
-            update_data["finished_at"] = datetime.utcnow()
+            update_data["finished_at"] = datetime.now(UTC)

         result = self.session.execute(
             update(WorkflowTriggerLog).where(WorkflowTriggerLog.id.in_(trigger_log_ids)).values(**update_data)
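The surrounding `update(...).where(...).values(**update_data)` call is SQLAlchemy 2.0's bulk-update form: one UPDATE statement touches every matched row. A runnable sketch against a hypothetical table (the real WorkflowTriggerLog model has more columns):

from datetime import UTC, datetime

from sqlalchemy import DateTime, String, create_engine, update
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class TriggerLog(Base):  # hypothetical stand-in for WorkflowTriggerLog
    __tablename__ = "trigger_log"
    id: Mapped[str] = mapped_column(String, primary_key=True)
    status: Mapped[str] = mapped_column(String, default="pending")
    finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([TriggerLog(id="a"), TriggerLog(id="b")])
    session.commit()

    # A single statement updates both rows, mirroring the hunk's pattern.
    update_data = {"status": "succeeded", "finished_at": datetime.now(UTC)}
    session.execute(update(TriggerLog).where(TriggerLog.id.in_(["a", "b"])).values(**update_data))
    session.commit()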
async_workflow_service.py
@@ -131,7 +131,7 @@ class AsyncWorkflowService:
         trigger_log_repo.update(trigger_log)
         session.commit()

-        tenant_owner_tz = rate_limiter._get_tenant_owner_timezone(trigger_data.tenant_id)
+        tenant_owner_tz = rate_limiter.get_tenant_owner_timezone(trigger_data.tenant_id)

         remaining = rate_limiter.get_remaining_quota(trigger_data.tenant_id, dispatcher.get_daily_limit())
plugin_parameter_service.py
@@ -85,15 +85,14 @@ class PluginParameterService:
                 credential_type = db_record.credential_type
             case "trigger":
                 provider_controller = TriggerManager.get_trigger_provider(tenant_id, TriggerProviderID(provider))
+                subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None
                 if credential_id:
-                    subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None = (
+                    subscription = (
                         TriggerSubscriptionBuilderService.get_subscription_builder(credential_id)
                         or TriggerProviderService.get_subscription_by_id(tenant_id, credential_id)
                     )
                 else:
-                    subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None = (
-                        TriggerProviderService.get_subscription_by_id(tenant_id)
-                    )
+                    subscription = TriggerProviderService.get_subscription_by_id(tenant_id)

                 if subscription is None:
                     raise ValueError(f"Subscription {credential_id} not found")
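The simplification leans on a typing pattern worth naming: annotate the variable once before the branch, then use plain assignments inside it, since a second annotated assignment to the same name trips type checkers. A minimal sketch with stand-in classes (the real entities live in the trigger services):

class Subscription: ...          # stand-in for TriggerProviderSubscriptionApiEntity
class SubscriptionBuilder: ...   # stand-in for the builder entity

def lookup(credential_id: str | None) -> Subscription | SubscriptionBuilder | None:
    subscription: Subscription | SubscriptionBuilder | None  # declare the union once
    if credential_id:
        subscription = SubscriptionBuilder()
    else:
        subscription = Subscription()
    return subscription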
rate_limiter.py
@@ -30,7 +30,7 @@ class TenantDailyRateLimiter:
     def __init__(self, redis_client: Union[Redis, RedisClientWrapper]):
         self.redis = redis_client

-    def _get_tenant_owner_timezone(self, tenant_id: str) -> str:
+    def get_tenant_owner_timezone(self, tenant_id: str) -> str:
         """
         Get timezone of tenant owner
@@ -62,7 +62,7 @@ class TenantDailyRateLimiter:
         Returns:
             Redis key for the current UTC day
         """
-        utc_now = datetime.utcnow()
+        utc_now = datetime.now(UTC)
         date_str = utc_now.strftime("%Y-%m-%d")
         return f"workflow:daily_limit:{tenant_id}:{date_str}"
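Day-scoped keys like `workflow:daily_limit:{tenant_id}:{date_str}` typically pair with an atomic INCR plus a TTL that expires at UTC midnight. Only the key format comes from the hunk; the INCR/EXPIRE mechanics below are an assumption about how such a counter is usually consumed:

from datetime import UTC, datetime

import redis

def incr_daily_count(r: redis.Redis, tenant_id: str, ttl_seconds: int) -> int:
    date_str = datetime.now(UTC).strftime("%Y-%m-%d")
    key = f"workflow:daily_limit:{tenant_id}:{date_str}"
    count = r.incr(key)             # atomic; creates the key at 1 if absent
    if count == 1:
        r.expire(key, ttl_seconds)  # first hit of the day sets the TTL
    return count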
@@ -73,7 +73,7 @@ class TenantDailyRateLimiter:
         Returns:
             Number of seconds until UTC midnight
         """
-        utc_now = datetime.utcnow()
+        utc_now = datetime.now(UTC)

         # Get next midnight in UTC
         next_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min)
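One hedge on the hunk above: `datetime.combine(date, time.min)` returns a naive datetime, so if the following (unshown) line subtracts it from the now-aware `utc_now`, Python raises TypeError. Passing `tzinfo=UTC` keeps the arithmetic legal; a self-contained version:

from datetime import UTC, datetime, time, timedelta

def seconds_until_utc_midnight() -> int:
    utc_now = datetime.now(UTC)
    # tzinfo=UTC keeps next_midnight aware, so aware - aware is valid
    next_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min, tzinfo=UTC)
    return int((next_midnight - utc_now).total_seconds())

print(seconds_until_utc_midnight())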
@@ -7,6 +7,7 @@ with appropriate retry policies and error handling.

 import json
+from datetime import UTC, datetime
 from typing import Any

 from celery import shared_task
 from sqlalchemy import select
@@ -27,14 +28,19 @@ from services.workflow.entities import AsyncTriggerExecutionResult, AsyncTrigger
 # Determine queue names based on edition
 if dify_config.EDITION == "CLOUD":
     # Cloud edition: separate queues for different tiers
-    PROFESSIONAL_QUEUE = "workflow_professional"
-    TEAM_QUEUE = "workflow_team"
-    SANDBOX_QUEUE = "workflow_sandbox"
+    _professional_queue = "workflow_professional"
+    _team_queue = "workflow_team"
+    _sandbox_queue = "workflow_sandbox"
 else:
     # Community edition: single workflow queue (not dataset)
-    PROFESSIONAL_QUEUE = "workflow"
-    TEAM_QUEUE = "workflow"
-    SANDBOX_QUEUE = "workflow"
+    _professional_queue = "workflow"
+    _team_queue = "workflow"
+    _sandbox_queue = "workflow"

+# Define constants
+PROFESSIONAL_QUEUE = _professional_queue
+TEAM_QUEUE = _team_queue
+SANDBOX_QUEUE = _sandbox_queue


 @shared_task(queue=PROFESSIONAL_QUEUE)
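The two-step assignment (private names carry the branching, public constants are assigned once at module level) gives each constant a single definition site, which keeps linters and type checkers quiet. A compact sketch of how such constants feed Celery routing; `EDITION` stands in for dify_config.EDITION and the task body is illustrative:

from celery import shared_task

EDITION = "CLOUD"  # stand-in for dify_config.EDITION

if EDITION == "CLOUD":
    _professional_queue = "workflow_professional"
else:
    _professional_queue = "workflow"

PROFESSIONAL_QUEUE = _professional_queue

@shared_task(queue=PROFESSIONAL_QUEUE)
def run_workflow(task_id: str) -> str:
    # Consumed by a worker started with: celery -A app worker -Q workflow_professional
    return f"ran {task_id}"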
@@ -112,11 +118,11 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecutionResult
     generator = WorkflowAppGenerator()

     # Prepare args matching AppGenerateService.generate format
-    args = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)}
+    args: dict[str, Any] = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)}

     # If workflow_id was specified, add it to args
     if trigger_data.workflow_id:
-        args["workflow_id"] = trigger_data.workflow_id
+        args["workflow_id"] = str(trigger_data.workflow_id)

     # Execute the workflow with the trigger type
     result = generator.generate(
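Why widen the annotation to `dict[str, Any]`: without it, checkers infer the value types from the literal and reject later keys whose values have another type. The `str(...)` coercion also suggests workflow_id may arrive as a non-string; a UUID is assumed in this illustration:

import uuid
from typing import Any

workflow_id = uuid.uuid4()  # assumed: a UUID-like value from the ORM

args: dict[str, Any] = {"inputs": {"city": "Osaka"}, "files": []}
if workflow_id:
    args["workflow_id"] = str(workflow_id)  # coerce to str, as the hunk does
print(args)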
@@ -127,7 +133,6 @@
         invoke_from=InvokeFrom.SERVICE_API,
         streaming=False,
         call_depth=0,
-        workflow_thread_pool_id=None,
         triggered_from=trigger_data.trigger_type,
         root_node_id=trigger_data.root_node_id,
     )