diff --git a/api/commands.py b/api/commands.py
index db7eddcba4..824f17d120 100644
--- a/api/commands.py
+++ b/api/commands.py
@@ -1875,6 +1875,7 @@ def migrate(
     Migrate workflow log data from PostgreSQL to Elasticsearch.
     """
     from datetime import datetime
+    from extensions.ext_elasticsearch import elasticsearch as es_extension
     from services.elasticsearch_migration_service import ElasticsearchMigrationService
diff --git a/api/core/repositories/elasticsearch_workflow_execution_repository.py b/api/core/repositories/elasticsearch_workflow_execution_repository.py
index e35cf9b136..10574c43a8 100644
--- a/api/core/repositories/elasticsearch_workflow_execution_repository.py
+++ b/api/core/repositories/elasticsearch_workflow_execution_repository.py
@@ -5,13 +5,10 @@
 This implementation stores workflow execution data in Elasticsearch for better
 performance and scalability compared to PostgreSQL storage.
 """

-import json
 import logging
 from datetime import datetime
-from typing import Any, Dict, Optional, Union
+from typing import Any, Optional, Union

-from elasticsearch import Elasticsearch
-from elasticsearch.exceptions import NotFoundError
 from sqlalchemy.engine import Engine
 from sqlalchemy.orm import sessionmaker
@@ -141,9 +138,9 @@ class ElasticsearchWorkflowExecutionRepository(WorkflowExecutionRepository):
                 name=template_name, body=template_body
             )
-            logger.info(f"Index template {template_name} created/updated successfully")
+            logger.info("Index template %s created/updated successfully", template_name)
         except Exception as e:
-            logger.error(f"Failed to create index template {template_name}: {e}")
+            logger.error("Failed to create index template %s: %s", template_name, e)
             raise

     def _serialize_complex_data(self, data: Any) -> Any:
         """
@@ -165,10 +162,10 @@ class ElasticsearchWorkflowExecutionRepository(WorkflowExecutionRepository):
         try:
             return jsonable_encoder(data)
         except Exception as e:
-            logger.warning(f"Failed to serialize complex data, using string representation: {e}")
+            logger.warning("Failed to serialize complex data, using string representation: %s", e)
             return str(data)

-    def _to_workflow_run_document(self, execution: WorkflowExecution) -> Dict[str, Any]:
+    def _to_workflow_run_document(self, execution: WorkflowExecution) -> dict[str, Any]:
         """
         Convert WorkflowExecution domain entity to WorkflowRun-compatible document.
         This follows the same logic as SQLAlchemy implementation.
diff --git a/api/core/repositories/elasticsearch_workflow_node_execution_repository.py b/api/core/repositories/elasticsearch_workflow_node_execution_repository.py
index b23c057a9f..b69ab6020d 100644
--- a/api/core/repositories/elasticsearch_workflow_node_execution_repository.py
+++ b/api/core/repositories/elasticsearch_workflow_node_execution_repository.py
@@ -5,14 +5,12 @@
 This implementation stores workflow node execution logs in Elasticsearch for better
 performance and scalability compared to PostgreSQL storage.
""" -import json import logging from collections.abc import Sequence from datetime import datetime -from typing import Any, Dict, Optional, Union +from typing import Any, Optional, Union -from elasticsearch import Elasticsearch -from elasticsearch.exceptions import NotFoundError, RequestError +from elasticsearch.exceptions import NotFoundError from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker @@ -85,7 +83,7 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionReposito self._creator_user_role = CreatorUserRole.ACCOUNT if isinstance(user, Account) else CreatorUserRole.END_USER # In-memory cache for workflow node executions - self._execution_cache: Dict[str, WorkflowNodeExecution] = {} + self._execution_cache: dict[str, WorkflowNodeExecution] = {} # Ensure index template exists self._ensure_index_template() @@ -154,9 +152,9 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionReposito name=template_name, body=template_body ) - logger.info(f"Index template {template_name} created/updated successfully") + logger.info("Index template %s created/updated successfully", template_name) except Exception as e: - logger.error(f"Failed to create index template {template_name}: {e}") + logger.error("Failed to create index template %s: %s", template_name, e) raise def _serialize_complex_data(self, data: Any) -> Any: @@ -178,10 +176,10 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionReposito try: return jsonable_encoder(data) except Exception as e: - logger.warning(f"Failed to serialize complex data, using string representation: {e}") + logger.warning("Failed to serialize complex data, using string representation: %s", e) return str(data) - def _to_es_document(self, execution: WorkflowNodeExecution) -> Dict[str, Any]: + def _to_es_document(self, execution: WorkflowNodeExecution) -> dict[str, Any]: """ Convert WorkflowNodeExecution domain entity to Elasticsearch document. @@ -220,7 +218,7 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionReposito # Remove None values to reduce storage size return {k: v for k, v in doc.items() if v is not None} - def _from_es_document(self, doc: Dict[str, Any]) -> WorkflowNodeExecution: + def _from_es_document(self, doc: dict[str, Any]) -> WorkflowNodeExecution: """ Convert Elasticsearch document to WorkflowNodeExecution domain entity. @@ -401,5 +399,5 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionReposito return executions except Exception as e: - logger.error(f"Failed to retrieve executions for workflow run {workflow_run_id}: {e}") + logger.error("Failed to retrieve executions for workflow run %s: %s", workflow_run_id, e) raise diff --git a/api/core/workflow/adapters/workflow_execution_to_run_adapter.py b/api/core/workflow/adapters/workflow_execution_to_run_adapter.py index c1fc45c1b6..6c53349449 100644 --- a/api/core/workflow/adapters/workflow_execution_to_run_adapter.py +++ b/api/core/workflow/adapters/workflow_execution_to_run_adapter.py @@ -7,8 +7,6 @@ and the database model (WorkflowRun) that APIs expect. 
diff --git a/api/core/workflow/adapters/workflow_execution_to_run_adapter.py b/api/core/workflow/adapters/workflow_execution_to_run_adapter.py
index c1fc45c1b6..6c53349449 100644
--- a/api/core/workflow/adapters/workflow_execution_to_run_adapter.py
+++ b/api/core/workflow/adapters/workflow_execution_to_run_adapter.py
@@ -7,8 +7,6 @@
 and the database model (WorkflowRun) that APIs expect.
 """
 import json
 import logging
-from datetime import datetime
-from typing import Any, Dict, Mapping
 from core.workflow.entities import WorkflowExecution
 from core.workflow.enums import WorkflowExecutionStatus
diff --git a/api/extensions/ext_elasticsearch.py b/api/extensions/ext_elasticsearch.py
index beba38f280..5da50b7acf 100644
--- a/api/extensions/ext_elasticsearch.py
+++ b/api/extensions/ext_elasticsearch.py
@@ -72,7 +72,7 @@ class ElasticsearchExtension:
                 self._client = None

         except Exception as e:
-            logger.error(f"Failed to initialize Elasticsearch client: {e}")
+            logger.error("Failed to initialize Elasticsearch client: %s", e)
             self._client = None

         # Store client in app context
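# ==== Note (not part of the patch) ===========================================
# The typing churn above follows PEP 585 (and PEP 604 for the later
# `datetime | None` annotation): on Python 3.9+ the builtin containers are
# subscriptable, so the typing.Dict/typing.List imports can be dropped. The
# function below is a toy stand-in for the repositories' _to_es_document
# helpers, showing only the annotation style and the None-stripping idiom.
from typing import Any


def to_document(fields: dict[str, Any]) -> dict[str, Any]:
    # Remove None values to reduce storage size, as the repositories do.
    return {k: v for k, v in fields.items() if v is not None}


print(to_document({"id": "run-1", "error": None}))  # {'id': 'run-1'}
# =============================================================================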
diff --git a/api/repositories/elasticsearch_api_workflow_run_repository.py b/api/repositories/elasticsearch_api_workflow_run_repository.py
index ff1a941b8e..c93e468794 100644
--- a/api/repositories/elasticsearch_api_workflow_run_repository.py
+++ b/api/repositories/elasticsearch_api_workflow_run_repository.py
@@ -13,14 +13,11 @@
 Key Features:
 - Efficient pagination and filtering
 """

-import json
 import logging
 from collections.abc import Sequence
 from datetime import datetime, timedelta
-from typing import Any, Dict, Optional
+from typing import Any, Optional

-from elasticsearch import Elasticsearch
-from elasticsearch.exceptions import NotFoundError
 from sqlalchemy.orm import sessionmaker
 from libs.infinite_scroll_pagination import InfiniteScrollPagination
@@ -125,12 +122,12 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
                 name=template_name, body=template_body
             )
-            logger.info(f"Index template {template_name} created/updated successfully")
+            logger.info("Index template %s created/updated successfully", template_name)
         except Exception as e:
-            logger.error(f"Failed to create index template {template_name}: {e}")
+            logger.error("Failed to create index template %s: %s", template_name, e)
             raise

-    def _to_es_document(self, workflow_run: WorkflowRun) -> Dict[str, Any]:
+    def _to_es_document(self, workflow_run: WorkflowRun) -> dict[str, Any]:
         """
         Convert WorkflowRun model to Elasticsearch document.
@@ -166,7 +163,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
         # Remove None values to reduce storage size
         return {k: v for k, v in doc.items() if v is not None}

-    def _from_es_document(self, doc: Dict[str, Any]) -> WorkflowRun:
+    def _from_es_document(self, doc: dict[str, Any]) -> WorkflowRun:
         """
         Convert Elasticsearch document to WorkflowRun model.
@@ -295,7 +292,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
             return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)

         except Exception as e:
-            logger.error(f"Failed to get paginated workflow runs: {e}")
+            logger.error("Failed to get paginated workflow runs: %s", e)
             raise

     def get_workflow_run_by_id(
@@ -335,7 +332,7 @@
             return None

         except Exception as e:
-            logger.error(f"Failed to get workflow run {run_id}: {e}")
+            logger.error("Failed to get workflow run %s: %s", run_id, e)
             raise

     def get_expired_runs_batch(
@@ -376,7 +373,7 @@
             return workflow_runs

         except Exception as e:
-            logger.error(f"Failed to get expired runs batch: {e}")
+            logger.error("Failed to get expired runs batch: %s", e)
             raise

     def delete_runs_by_ids(
@@ -405,11 +402,11 @@
             )

             deleted_count = response.get("deleted", 0)
-            logger.info(f"Deleted {deleted_count} workflow runs by IDs")
+            logger.info("Deleted %s workflow runs by IDs", deleted_count)
             return deleted_count

         except Exception as e:
-            logger.error(f"Failed to delete workflow runs by IDs: {e}")
+            logger.error("Failed to delete workflow runs by IDs: %s", e)
             raise

     def delete_runs_by_app(
@@ -441,11 +438,11 @@
             )

             deleted_count = response.get("deleted", 0)
-            logger.info(f"Deleted {deleted_count} workflow runs for app {app_id}")
+            logger.info("Deleted %s workflow runs for app %s", deleted_count, app_id)
             return deleted_count

         except Exception as e:
-            logger.error(f"Failed to delete workflow runs for app {app_id}: {e}")
+            logger.error("Failed to delete workflow runs for app %s: %s", app_id, e)
             raise

     def cleanup_old_indices(self, tenant_id: str, retention_days: int = 30) -> None:
@@ -476,10 +473,10 @@
             if indices_to_delete:
                 self._es_client.indices.delete(index=','.join(indices_to_delete))
-                logger.info(f"Deleted old indices: {indices_to_delete}")
+                logger.info("Deleted old indices: %s", indices_to_delete)

         except Exception as e:
-            logger.error(f"Failed to cleanup old indices: {e}")
+            logger.error("Failed to cleanup old indices: %s", e)
             raise

     def search_workflow_runs(
@@ -492,7 +489,7 @@
         created_at_before: datetime | None = None,
         limit: int = 20,
         offset: int = 0,
-    ) -> Dict[str, Any]:
+    ) -> dict[str, Any]:
         """
         Advanced search for workflow runs with full-text search capabilities.
@@ -566,5 +563,5 @@
             }

         except Exception as e:
-            logger.error(f"Failed to search workflow runs: {e}")
+            logger.error("Failed to search workflow runs: %s", e)
             raise
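# ==== Note (not part of the patch) ===========================================
# The delete_runs_* hunks above log the count reported by Elasticsearch. A
# hedged sketch of that pattern; the endpoint, index pattern, and query below
# are illustrative assumptions, not values taken from this codebase.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed local cluster

# delete_by_query returns a summary dict whose "deleted" key is the number of
# documents removed; the repository logs it and returns it to the caller.
response = es.delete_by_query(
    index="workflow-runs-*",  # hypothetical index pattern
    body={"query": {"term": {"app_id": "app-42"}}},
)
deleted_count = response.get("deleted", 0)
print("Deleted %s workflow runs for app %s" % (deleted_count, "app-42"))
# =============================================================================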
""" -import json import logging from datetime import datetime, timedelta -from typing import Any, Dict, Optional +from typing import Any, Optional from elasticsearch import Elasticsearch -from elasticsearch.exceptions import NotFoundError from models.workflow import WorkflowAppLog @@ -93,12 +91,12 @@ class ElasticsearchWorkflowAppLogRepository: name=template_name, body=template_body ) - logger.info(f"Index template {template_name} created/updated successfully") + logger.info("Index template %s created/updated successfully", template_name) except Exception as e: - logger.error(f"Failed to create index template {template_name}: {e}") + logger.error("Failed to create index template %s: %s", template_name, e) raise - def _to_es_document(self, app_log: WorkflowAppLog) -> Dict[str, Any]: + def _to_es_document(self, app_log: WorkflowAppLog) -> dict[str, Any]: """ Convert WorkflowAppLog model to Elasticsearch document. @@ -120,7 +118,7 @@ class ElasticsearchWorkflowAppLogRepository: "created_at": app_log.created_at.isoformat() if app_log.created_at else None, } - def _from_es_document(self, doc: Dict[str, Any]) -> WorkflowAppLog: + def _from_es_document(self, doc: dict[str, Any]) -> WorkflowAppLog: """ Convert Elasticsearch document to WorkflowAppLog model. @@ -207,7 +205,7 @@ class ElasticsearchWorkflowAppLogRepository: return None except Exception as e: - logger.error(f"Failed to get workflow app log {log_id}: {e}") + logger.error("Failed to get workflow app log %s: %s", log_id, e) raise def get_paginated_logs( @@ -219,7 +217,7 @@ class ElasticsearchWorkflowAppLogRepository: created_from: Optional[str] = None, limit: int = 20, offset: int = 0, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Get paginated workflow app logs with filtering. @@ -283,7 +281,7 @@ class ElasticsearchWorkflowAppLogRepository: } except Exception as e: - logger.error(f"Failed to get paginated workflow app logs: {e}") + logger.error("Failed to get paginated workflow app logs: %s", e) raise def delete_by_app(self, tenant_id: str, app_id: str) -> int: @@ -316,11 +314,11 @@ class ElasticsearchWorkflowAppLogRepository: ) deleted_count = response.get("deleted", 0) - logger.info(f"Deleted {deleted_count} workflow app logs for app {app_id}") + logger.info("Deleted %s workflow app logs for app %s", deleted_count, app_id) return deleted_count except Exception as e: - logger.error(f"Failed to delete workflow app logs for app {app_id}: {e}") + logger.error("Failed to delete workflow app logs for app %s: %s", app_id, e) raise def delete_expired_logs(self, tenant_id: str, before_date: datetime) -> int: @@ -353,11 +351,11 @@ class ElasticsearchWorkflowAppLogRepository: ) deleted_count = response.get("deleted", 0) - logger.info(f"Deleted {deleted_count} expired workflow app logs for tenant {tenant_id}") + logger.info("Deleted %s expired workflow app logs for tenant %s", deleted_count, tenant_id) return deleted_count except Exception as e: - logger.error(f"Failed to delete expired workflow app logs: {e}") + logger.error("Failed to delete expired workflow app logs: %s", e) raise def cleanup_old_indices(self, tenant_id: str, retention_days: int = 30) -> None: @@ -388,8 +386,8 @@ class ElasticsearchWorkflowAppLogRepository: if indices_to_delete: self._es_client.indices.delete(index=','.join(indices_to_delete)) - logger.info(f"Deleted old indices: {indices_to_delete}") + logger.info("Deleted old indices: %s", indices_to_delete) except Exception as e: - logger.error(f"Failed to cleanup old indices: {e}") + logger.error("Failed to cleanup old 
indices: %s", e) raise diff --git a/api/services/elasticsearch_migration_service.py b/api/services/elasticsearch_migration_service.py index ebd6008f4b..5e727eee25 100644 --- a/api/services/elasticsearch_migration_service.py +++ b/api/services/elasticsearch_migration_service.py @@ -7,14 +7,13 @@ to Elasticsearch, including data validation, progress tracking, and rollback cap import json import logging -from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from datetime import datetime +from typing import Any, Optional from elasticsearch import Elasticsearch from sqlalchemy import select -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import sessionmaker -from configs import dify_config from extensions.ext_database import db from extensions.ext_elasticsearch import elasticsearch from models.workflow import ( @@ -66,7 +65,7 @@ class ElasticsearchMigrationService: start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dry_run: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Migrate WorkflowRun data from PostgreSQL to Elasticsearch. @@ -157,7 +156,7 @@ class ElasticsearchMigrationService: start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dry_run: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Migrate WorkflowAppLog data from PostgreSQL to Elasticsearch. @@ -248,7 +247,7 @@ class ElasticsearchMigrationService: start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dry_run: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Migrate WorkflowNodeExecution data from PostgreSQL to Elasticsearch. @@ -355,7 +354,7 @@ class ElasticsearchMigrationService: return stats - def _convert_node_execution_to_es_doc(self, node_execution: WorkflowNodeExecutionModel) -> Dict[str, Any]: + def _convert_node_execution_to_es_doc(self, node_execution: WorkflowNodeExecutionModel) -> dict[str, Any]: """ Convert WorkflowNodeExecutionModel to Elasticsearch document format. @@ -415,7 +414,7 @@ class ElasticsearchMigrationService: # Remove None values to reduce storage size return {k: v for k, v in doc.items() if v is not None} - def validate_migration(self, tenant_id: str, sample_size: int = 100) -> Dict[str, Any]: + def validate_migration(self, tenant_id: str, sample_size: int = 100) -> dict[str, Any]: """ Validate migrated data by comparing samples from PostgreSQL and Elasticsearch. @@ -426,7 +425,7 @@ class ElasticsearchMigrationService: Returns: Validation results and statistics """ - logger.info(f"Starting migration validation for tenant {tenant_id}") + logger.info("Starting migration validation for tenant %s", tenant_id) validation_results = { "workflow_runs": {"total": 0, "matched": 0, "mismatched": 0, "missing": 0}, @@ -492,7 +491,7 @@ class ElasticsearchMigrationService: logger.error(error_msg) validation_results["errors"].append(error_msg) - logger.info(f"Migration validation completed for tenant {tenant_id}") + logger.info("Migration validation completed for tenant %s", tenant_id) return validation_results def _compare_workflow_runs(self, pg_run: WorkflowRun, es_run: WorkflowRun) -> bool: @@ -517,7 +516,7 @@ class ElasticsearchMigrationService: tenant_id: str, before_date: datetime, dry_run: bool = True, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Clean up old PostgreSQL data after successful migration to Elasticsearch. 
@@ -529,7 +528,7 @@ class ElasticsearchMigrationService:
         Returns:
             Cleanup statistics
         """
-        logger.info(f"Starting PostgreSQL data cleanup for tenant {tenant_id}")
+        logger.info("Starting PostgreSQL data cleanup for tenant %s", tenant_id)

         stats = {
             "workflow_runs_deleted": 0,
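# ==== Note (not part of the patch) ===========================================
# A usage sketch for the migration service driven by the `migrate` CLI
# command. Only tenant_id, dry_run, sample_size, and the validation result
# buckets are visible in the hunks above; the constructor call and the
# migrate_workflow_runs method name are inferred from the docstrings and
# should be treated as assumptions.
from services.elasticsearch_migration_service import ElasticsearchMigrationService

service = ElasticsearchMigrationService()

# Dry run first: gathers per-entity statistics without writing to Elasticsearch.
stats = service.migrate_workflow_runs(tenant_id="tenant-123", dry_run=True)
print(stats)

# Then compare a sample of migrated documents against PostgreSQL; the result
# buckets ("total", "matched", "mismatched", "missing") come from the
# validate_migration hunk in this diff.
results = service.validate_migration(tenant_id="tenant-123", sample_size=100)
print(results["workflow_runs"])
# =============================================================================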