Yansong Zhang 2025-10-10 16:26:05 +08:00
parent bac7da83f5
commit d52d80681e
8 changed files with 58 additions and 70 deletions

View File

@@ -1875,6 +1875,7 @@ def migrate(
Migrate workflow log data from PostgreSQL to Elasticsearch.
"""
from datetime import datetime
from extensions.ext_elasticsearch import elasticsearch as es_extension
from services.elasticsearch_migration_service import ElasticsearchMigrationService
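This hunk moves the Elasticsearch imports into the body of the `migrate` command so they are only loaded when the command actually runs. A minimal sketch of that pattern, assuming a Click-style command with hypothetical option and method names (only the two import lines above come from the diff):

```python
import click


@click.command("migrate-workflow-logs", help="Migrate workflow log data from PostgreSQL to Elasticsearch.")
@click.option("--tenant-id", required=True)
@click.option("--dry-run", is_flag=True, default=False)
def migrate(tenant_id: str, dry_run: bool):
    # Deferred imports, as in the hunk above: Elasticsearch support is only
    # loaded when this command runs, not at application import time.
    from extensions.ext_elasticsearch import elasticsearch as es_extension  # noqa: F401
    from services.elasticsearch_migration_service import ElasticsearchMigrationService

    service = ElasticsearchMigrationService()  # constructor arguments assumed
    stats = service.migrate_workflow_runs(tenant_id=tenant_id, dry_run=dry_run)  # hypothetical method name
    click.echo(f"Migration finished: {stats}")
```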

View File

@@ -5,13 +5,10 @@ This implementation stores workflow execution data in Elasticsearch for better
performance and scalability compared to PostgreSQL storage.
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
@@ -141,9 +138,9 @@ class ElasticsearchWorkflowExecutionRepository(WorkflowExecutionRepository):
name=template_name,
body=template_body
)
logger.info(f"Index template {template_name} created/updated successfully")
logger.info("Index template %s created/updated successfully", template_name)
except Exception as e:
logger.error(f"Failed to create index template {template_name}: {e}")
logger.error("Failed to create index template %s: %s", template_name, e)
raise
def _serialize_complex_data(self, data: Any) -> Any:
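The logging hunks in this commit all follow the same pattern: f-string log calls are replaced with %-style arguments, so the message is only formatted when the record is actually emitted and every call site shares one stable message template. A small standalone example of the before/after:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

template_name = "workflow_runs_template"

# Before: the f-string is rendered even if INFO logging is disabled.
logger.info(f"Index template {template_name} created/updated successfully")

# After: formatting is deferred to the logging framework, so disabled levels
# cost almost nothing and aggregators see a stable message template.
logger.info("Index template %s created/updated successfully", template_name)
```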
@@ -165,10 +162,10 @@ class ElasticsearchWorkflowExecutionRepository(WorkflowExecutionRepository):
try:
return jsonable_encoder(data)
except Exception as e:
logger.warning(f"Failed to serialize complex data, using string representation: {e}")
logger.warning("Failed to serialize complex data, using string representation: %s", e)
return str(data)
def _to_workflow_run_document(self, execution: WorkflowExecution) -> Dict[str, Any]:
def _to_workflow_run_document(self, execution: WorkflowExecution) -> dict[str, Any]:
"""
Convert WorkflowExecution domain entity to WorkflowRun-compatible document.
This follows the same logic as SQLAlchemy implementation.
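The signature change here (and the shrunken `typing` import earlier in this file) switches from `typing.Dict` to the built-in generic `dict`, which is valid on Python 3.9+ per PEP 585. A self-contained example of the modernized style, reusing the "drop None values" idiom these repositories rely on:

```python
from typing import Any, Optional


def to_document(payload: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Build a document, dropping None values to reduce storage size."""
    doc = dict(payload or {})
    return {k: v for k, v in doc.items() if v is not None}


print(to_document({"id": "run-1", "error": None}))  # {'id': 'run-1'}
```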

View File

@@ -5,14 +5,12 @@ This implementation stores workflow node execution logs in Elasticsearch for better
performance and scalability compared to PostgreSQL storage.
"""
import json
import logging
from collections.abc import Sequence
from datetime import datetime
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.exceptions import NotFoundError
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
@@ -85,7 +83,7 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
self._creator_user_role = CreatorUserRole.ACCOUNT if isinstance(user, Account) else CreatorUserRole.END_USER
# In-memory cache for workflow node executions
self._execution_cache: Dict[str, WorkflowNodeExecution] = {}
self._execution_cache: dict[str, WorkflowNodeExecution] = {}
# Ensure index template exists
self._ensure_index_template()
@@ -154,9 +152,9 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
name=template_name,
body=template_body
)
logger.info(f"Index template {template_name} created/updated successfully")
logger.info("Index template %s created/updated successfully", template_name)
except Exception as e:
logger.error(f"Failed to create index template {template_name}: {e}")
logger.error("Failed to create index template %s: %s", template_name, e)
raise
def _serialize_complex_data(self, data: Any) -> Any:
@@ -178,10 +176,10 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
try:
return jsonable_encoder(data)
except Exception as e:
logger.warning(f"Failed to serialize complex data, using string representation: {e}")
logger.warning("Failed to serialize complex data, using string representation: %s", e)
return str(data)
def _to_es_document(self, execution: WorkflowNodeExecution) -> Dict[str, Any]:
def _to_es_document(self, execution: WorkflowNodeExecution) -> dict[str, Any]:
"""
Convert WorkflowNodeExecution domain entity to Elasticsearch document.
@@ -220,7 +218,7 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
# Remove None values to reduce storage size
return {k: v for k, v in doc.items() if v is not None}
def _from_es_document(self, doc: Dict[str, Any]) -> WorkflowNodeExecution:
def _from_es_document(self, doc: dict[str, Any]) -> WorkflowNodeExecution:
"""
Convert Elasticsearch document to WorkflowNodeExecution domain entity.
@@ -401,5 +399,5 @@ class ElasticsearchWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
return executions
except Exception as e:
logger.error(f"Failed to retrieve executions for workflow run {workflow_run_id}: {e}")
logger.error("Failed to retrieve executions for workflow run %s: %s", workflow_run_id, e)
raise
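The failing call logged here retrieves all node executions for a given workflow run. A hedged sketch of what such a lookup might issue against Elasticsearch, assuming the 8.x Python client; the index pattern, field names, and sort key are illustrative, not taken from the repository:

```python
from typing import Any

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # connection details are placeholders


def fetch_node_executions(tenant_id: str, workflow_run_id: str) -> list[dict[str, Any]]:
    response = es.search(
        index=f"workflow_node_executions_{tenant_id}*",   # assumed index pattern
        query={"term": {"workflow_run_id": workflow_run_id}},
        sort=[{"created_at": {"order": "asc"}}],           # assumed sort field
        size=1000,
    )
    return [hit["_source"] for hit in response["hits"]["hits"]]
```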

View File

@@ -7,8 +7,6 @@ and the database model (WorkflowRun) that APIs expect.
import json
import logging
from datetime import datetime
from typing import Any, Dict, Mapping
from core.workflow.entities import WorkflowExecution
from core.workflow.enums import WorkflowExecutionStatus

View File

@@ -72,7 +72,7 @@ class ElasticsearchExtension:
self._client = None
except Exception as e:
logger.error(f"Failed to initialize Elasticsearch client: {e}")
logger.error("Failed to initialize Elasticsearch client: %s", e)
self._client = None
# Store client in app context
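The extension swallows initialization failures: it logs the error (now with lazy formatting) and leaves the client as `None` so the application can still start without Elasticsearch. A hedged sketch of that shape, with placeholder connection settings rather than Dify's real configuration:

```python
import logging
from typing import Optional

from elasticsearch import Elasticsearch

logger = logging.getLogger(__name__)


class ElasticsearchExtension:
    def __init__(self, hosts: Optional[list[str]] = None):
        self._client: Optional[Elasticsearch] = None
        try:
            self._client = Elasticsearch(hosts or ["http://localhost:9200"])
            self._client.info()  # fail fast if the cluster is unreachable
        except Exception as e:
            logger.error("Failed to initialize Elasticsearch client: %s", e)
            self._client = None

    @property
    def client(self) -> Optional[Elasticsearch]:
        return self._client
```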

View File

@@ -13,14 +13,11 @@ Key Features:
- Efficient pagination and filtering
"""
import json
import logging
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from typing import Any, Optional
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from sqlalchemy.orm import sessionmaker
from libs.infinite_scroll_pagination import InfiniteScrollPagination
@@ -125,12 +122,12 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
name=template_name,
body=template_body
)
logger.info(f"Index template {template_name} created/updated successfully")
logger.info("Index template %s created/updated successfully", template_name)
except Exception as e:
logger.error(f"Failed to create index template {template_name}: {e}")
logger.error("Failed to create index template %s: %s", template_name, e)
raise
def _to_es_document(self, workflow_run: WorkflowRun) -> Dict[str, Any]:
def _to_es_document(self, workflow_run: WorkflowRun) -> dict[str, Any]:
"""
Convert WorkflowRun model to Elasticsearch document.
@@ -166,7 +163,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
# Remove None values to reduce storage size
return {k: v for k, v in doc.items() if v is not None}
def _from_es_document(self, doc: Dict[str, Any]) -> WorkflowRun:
def _from_es_document(self, doc: dict[str, Any]) -> WorkflowRun:
"""
Convert Elasticsearch document to WorkflowRun model.
@@ -295,7 +292,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)
except Exception as e:
logger.error(f"Failed to get paginated workflow runs: {e}")
logger.error("Failed to get paginated workflow runs: %s", e)
raise
def get_workflow_run_by_id(
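The pagination path above wraps search hits in `InfiniteScrollPagination(data=..., limit=..., has_more=...)`. A hedged sketch of one way to drive it, over-fetching by one document to derive `has_more`; the index pattern, filter, and sort field are assumptions:

```python
from typing import Any

from elasticsearch import Elasticsearch

from libs.infinite_scroll_pagination import InfiniteScrollPagination


def get_paginated_runs(
    es: Elasticsearch, tenant_id: str, app_id: str, limit: int = 20
) -> InfiniteScrollPagination:
    response = es.search(
        index=f"workflow_runs_{tenant_id}*",          # assumed index pattern
        query={"term": {"app_id": app_id}},           # assumed filter field
        sort=[{"created_at": {"order": "desc"}}],     # assumed sort field
        size=limit + 1,                               # over-fetch one to detect more pages
    )
    hits: list[dict[str, Any]] = [h["_source"] for h in response["hits"]["hits"]]
    has_more = len(hits) > limit
    return InfiniteScrollPagination(data=hits[:limit], limit=limit, has_more=has_more)
```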
@@ -335,7 +332,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
return None
except Exception as e:
logger.error(f"Failed to get workflow run {run_id}: {e}")
logger.error("Failed to get workflow run %s: %s", run_id, e)
raise
def get_expired_runs_batch(
@@ -376,7 +373,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
return workflow_runs
except Exception as e:
logger.error(f"Failed to get expired runs batch: {e}")
logger.error("Failed to get expired runs batch: %s", e)
raise
def delete_runs_by_ids(
@@ -405,11 +402,11 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
)
deleted_count = response.get("deleted", 0)
logger.info(f"Deleted {deleted_count} workflow runs by IDs")
logger.info("Deleted %s workflow runs by IDs", deleted_count)
return deleted_count
except Exception as e:
logger.error(f"Failed to delete workflow runs by IDs: {e}")
logger.error("Failed to delete workflow runs by IDs: %s", e)
raise
def delete_runs_by_app(
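Deletion here relies on `delete_by_query`, whose response carries a `deleted` count that is now logged lazily. A hedged sketch of that call with the 8.x client; the index pattern is an assumption and the `ids` query is one reasonable way to match the given run IDs:

```python
import logging

from elasticsearch import Elasticsearch

logger = logging.getLogger(__name__)


def delete_runs_by_ids(es: Elasticsearch, tenant_id: str, run_ids: list[str]) -> int:
    response = es.delete_by_query(
        index=f"workflow_runs_{tenant_id}*",      # assumed index pattern
        query={"ids": {"values": run_ids}},       # match documents by their _id
        refresh=True,                             # make deletions visible immediately
    )
    deleted_count = response.get("deleted", 0)
    logger.info("Deleted %s workflow runs by IDs", deleted_count)
    return deleted_count
```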
@@ -441,11 +438,11 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
)
deleted_count = response.get("deleted", 0)
logger.info(f"Deleted {deleted_count} workflow runs for app {app_id}")
logger.info("Deleted %s workflow runs for app %s", deleted_count, app_id)
return deleted_count
except Exception as e:
logger.error(f"Failed to delete workflow runs for app {app_id}: {e}")
logger.error("Failed to delete workflow runs for app %s: %s", app_id, e)
raise
def cleanup_old_indices(self, tenant_id: str, retention_days: int = 30) -> None:
@@ -476,10 +473,10 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
if indices_to_delete:
self._es_client.indices.delete(index=','.join(indices_to_delete))
logger.info(f"Deleted old indices: {indices_to_delete}")
logger.info("Deleted old indices: %s", indices_to_delete)
except Exception as e:
logger.error(f"Failed to cleanup old indices: {e}")
logger.error("Failed to cleanup old indices: %s", e)
raise
def search_workflow_runs(
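`cleanup_old_indices` drops whole indices once they fall outside the retention window. A hedged sketch assuming date-suffixed index names such as `workflow_runs_<tenant>_YYYY.MM.DD`; the real naming scheme is not visible in this hunk:

```python
import logging
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch

logger = logging.getLogger(__name__)


def cleanup_old_indices(es: Elasticsearch, tenant_id: str, retention_days: int = 30) -> None:
    cutoff = datetime.utcnow() - timedelta(days=retention_days)
    prefix = f"workflow_runs_{tenant_id}_"        # assumed naming scheme
    indices_to_delete = []
    for name in es.indices.get(index=f"{prefix}*"):
        try:
            index_date = datetime.strptime(name.removeprefix(prefix), "%Y.%m.%d")
        except ValueError:
            continue  # skip indices that do not carry a date suffix
        if index_date < cutoff:
            indices_to_delete.append(name)
    if indices_to_delete:
        es.indices.delete(index=",".join(indices_to_delete))
        logger.info("Deleted old indices: %s", indices_to_delete)
```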
@@ -492,7 +489,7 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
created_at_before: datetime | None = None,
limit: int = 20,
offset: int = 0,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Advanced search for workflow runs with full-text search capabilities.
@@ -566,5 +563,5 @@ class ElasticsearchAPIWorkflowRunRepository(APIWorkflowRunRepository):
}
except Exception as e:
logger.error(f"Failed to search workflow runs: {e}")
logger.error("Failed to search workflow runs: %s", e)
raise

View File

@@ -5,13 +5,11 @@ This module provides Elasticsearch-based storage for WorkflowAppLog entities,
offering better performance and scalability for log data management.
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from typing import Any, Optional
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from models.workflow import WorkflowAppLog
@@ -93,12 +91,12 @@ class ElasticsearchWorkflowAppLogRepository:
name=template_name,
body=template_body
)
logger.info(f"Index template {template_name} created/updated successfully")
logger.info("Index template %s created/updated successfully", template_name)
except Exception as e:
logger.error(f"Failed to create index template {template_name}: {e}")
logger.error("Failed to create index template %s: %s", template_name, e)
raise
def _to_es_document(self, app_log: WorkflowAppLog) -> Dict[str, Any]:
def _to_es_document(self, app_log: WorkflowAppLog) -> dict[str, Any]:
"""
Convert WorkflowAppLog model to Elasticsearch document.
@@ -120,7 +118,7 @@ class ElasticsearchWorkflowAppLogRepository:
"created_at": app_log.created_at.isoformat() if app_log.created_at else None,
}
def _from_es_document(self, doc: Dict[str, Any]) -> WorkflowAppLog:
def _from_es_document(self, doc: dict[str, Any]) -> WorkflowAppLog:
"""
Convert Elasticsearch document to WorkflowAppLog model.
@@ -207,7 +205,7 @@ class ElasticsearchWorkflowAppLogRepository:
return None
except Exception as e:
logger.error(f"Failed to get workflow app log {log_id}: {e}")
logger.error("Failed to get workflow app log %s: %s", log_id, e)
raise
def get_paginated_logs(
@@ -219,7 +217,7 @@ class ElasticsearchWorkflowAppLogRepository:
created_from: Optional[str] = None,
limit: int = 20,
offset: int = 0,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Get paginated workflow app logs with filtering.
@@ -283,7 +281,7 @@ class ElasticsearchWorkflowAppLogRepository:
}
except Exception as e:
logger.error(f"Failed to get paginated workflow app logs: {e}")
logger.error("Failed to get paginated workflow app logs: %s", e)
raise
def delete_by_app(self, tenant_id: str, app_id: str) -> int:
@@ -316,11 +314,11 @@ class ElasticsearchWorkflowAppLogRepository:
)
deleted_count = response.get("deleted", 0)
logger.info(f"Deleted {deleted_count} workflow app logs for app {app_id}")
logger.info("Deleted %s workflow app logs for app %s", deleted_count, app_id)
return deleted_count
except Exception as e:
logger.error(f"Failed to delete workflow app logs for app {app_id}: {e}")
logger.error("Failed to delete workflow app logs for app %s: %s", app_id, e)
raise
def delete_expired_logs(self, tenant_id: str, before_date: datetime) -> int:
@ -353,11 +351,11 @@ class ElasticsearchWorkflowAppLogRepository:
)
deleted_count = response.get("deleted", 0)
logger.info(f"Deleted {deleted_count} expired workflow app logs for tenant {tenant_id}")
logger.info("Deleted %s expired workflow app logs for tenant %s", deleted_count, tenant_id)
return deleted_count
except Exception as e:
logger.error(f"Failed to delete expired workflow app logs: {e}")
logger.error("Failed to delete expired workflow app logs: %s", e)
raise
def cleanup_old_indices(self, tenant_id: str, retention_days: int = 30) -> None:
@@ -388,8 +386,8 @@ class ElasticsearchWorkflowAppLogRepository:
if indices_to_delete:
self._es_client.indices.delete(index=','.join(indices_to_delete))
logger.info(f"Deleted old indices: {indices_to_delete}")
logger.info("Deleted old indices: %s", indices_to_delete)
except Exception as e:
logger.error(f"Failed to cleanup old indices: {e}")
logger.error("Failed to cleanup old indices: %s", e)
raise

View File

@@ -7,14 +7,13 @@ to Elasticsearch, including data validation, progress tracking, and rollback capabilities.
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from datetime import datetime
from typing import Any, Optional
from elasticsearch import Elasticsearch
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_elasticsearch import elasticsearch
from models.workflow import (
@@ -66,7 +65,7 @@ class ElasticsearchMigrationService:
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
dry_run: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Migrate WorkflowRun data from PostgreSQL to Elasticsearch.
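The migration methods share this shape: filter rows by tenant and optional date range, convert each to a document, and index it unless `dry_run` is set, returning a stats dict. A hedged sketch of that loop; the batch size, stats keys, and document fields are illustrative, not the service's actual implementation:

```python
from datetime import datetime
from typing import Any, Optional

from elasticsearch import Elasticsearch
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker

from models.workflow import WorkflowRun


def migrate_workflow_runs(
    es: Elasticsearch,
    session_factory: sessionmaker,
    tenant_id: str,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    dry_run: bool = False,
    batch_size: int = 500,
) -> dict[str, Any]:
    stats: dict[str, Any] = {"total": 0, "migrated": 0, "errors": 0}
    stmt = select(WorkflowRun).where(WorkflowRun.tenant_id == tenant_id)
    if start_date:
        stmt = stmt.where(WorkflowRun.created_at >= start_date)
    if end_date:
        stmt = stmt.where(WorkflowRun.created_at < end_date)

    with session_factory() as session:
        for run in session.scalars(stmt.execution_options(yield_per=batch_size)):
            stats["total"] += 1
            if dry_run:
                continue
            try:
                es.index(
                    index=f"workflow_runs_{tenant_id}",   # assumed index name
                    id=run.id,
                    document={
                        "id": run.id,
                        "tenant_id": run.tenant_id,
                        "created_at": run.created_at.isoformat() if run.created_at else None,
                        # ...remaining fields built as in the repositories' _to_es_document
                    },
                )
                stats["migrated"] += 1
            except Exception:
                stats["errors"] += 1
    return stats
```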
@@ -157,7 +156,7 @@ class ElasticsearchMigrationService:
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
dry_run: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Migrate WorkflowAppLog data from PostgreSQL to Elasticsearch.
@@ -248,7 +247,7 @@ class ElasticsearchMigrationService:
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
dry_run: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Migrate WorkflowNodeExecution data from PostgreSQL to Elasticsearch.
@@ -355,7 +354,7 @@ class ElasticsearchMigrationService:
return stats
def _convert_node_execution_to_es_doc(self, node_execution: WorkflowNodeExecutionModel) -> Dict[str, Any]:
def _convert_node_execution_to_es_doc(self, node_execution: WorkflowNodeExecutionModel) -> dict[str, Any]:
"""
Convert WorkflowNodeExecutionModel to Elasticsearch document format.
@@ -415,7 +414,7 @@ class ElasticsearchMigrationService:
# Remove None values to reduce storage size
return {k: v for k, v in doc.items() if v is not None}
def validate_migration(self, tenant_id: str, sample_size: int = 100) -> Dict[str, Any]:
def validate_migration(self, tenant_id: str, sample_size: int = 100) -> dict[str, Any]:
"""
Validate migrated data by comparing samples from PostgreSQL and Elasticsearch.
@@ -426,7 +425,7 @@ class ElasticsearchMigrationService:
Returns:
Validation results and statistics
"""
logger.info(f"Starting migration validation for tenant {tenant_id}")
logger.info("Starting migration validation for tenant %s", tenant_id)
validation_results = {
"workflow_runs": {"total": 0, "matched": 0, "mismatched": 0, "missing": 0},
@@ -492,7 +491,7 @@ class ElasticsearchMigrationService:
logger.error(error_msg)
validation_results["errors"].append(error_msg)
logger.info(f"Migration validation completed for tenant {tenant_id}")
logger.info("Migration validation completed for tenant %s", tenant_id)
return validation_results
def _compare_workflow_runs(self, pg_run: WorkflowRun, es_run: WorkflowRun) -> bool:
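Validation samples rows from PostgreSQL and checks them against Elasticsearch, accumulating counters like the `validation_results` buckets above. A hedged sketch of one such check; the concrete index name and bucket names are assumptions:

```python
from typing import Any

from elasticsearch import Elasticsearch
from sqlalchemy import select
from sqlalchemy.orm import Session

from models.workflow import WorkflowRun


def validate_workflow_runs(
    es: Elasticsearch, session: Session, tenant_id: str, sample_size: int = 100
) -> dict[str, Any]:
    results = {"total": 0, "matched": 0, "missing": 0}
    stmt = select(WorkflowRun).where(WorkflowRun.tenant_id == tenant_id).limit(sample_size)
    for pg_run in session.scalars(stmt):
        results["total"] += 1
        # exists() only checks presence; a field-by-field comparison would
        # fetch the document and diff it, as _compare_workflow_runs does.
        if bool(es.exists(index=f"workflow_runs_{tenant_id}", id=pg_run.id)):  # assumed index name
            results["matched"] += 1
        else:
            results["missing"] += 1
    return results
```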
@@ -517,7 +516,7 @@ class ElasticsearchMigrationService:
tenant_id: str,
before_date: datetime,
dry_run: bool = True,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Clean up old PostgreSQL data after successful migration to Elasticsearch.
@@ -529,7 +528,7 @@ class ElasticsearchMigrationService:
Returns:
Cleanup statistics
"""
logger.info(f"Starting PostgreSQL data cleanup for tenant {tenant_id}")
logger.info("Starting PostgreSQL data cleanup for tenant %s", tenant_id)
stats = {
"workflow_runs_deleted": 0,