# Elasticsearch Migration Guide
This guide explains how to migrate workflow log data from PostgreSQL to Elasticsearch for better performance and scalability.
## Overview
The Elasticsearch integration provides:
- **High-performance log storage**: Better suited for time-series log data
- **Advanced search capabilities**: Full-text search and complex queries
- **Scalability**: Horizontal scaling for large datasets
- **Time-series optimization**: Date-based index rotation for efficient storage
- **Multi-tenant isolation**: Separate indices per tenant for data isolation
## Architecture
The migration involves four main log tables:
1. **workflow_runs**: Core workflow execution records
2. **workflow_app_logs**: Application-level workflow logs
3. **workflow_node_executions**: Individual node execution records
4. **workflow_node_execution_offload**: Large data offloaded to storage
## Configuration
### Environment Variables
Add the following to your `.env` file:
```bash
# Enable Elasticsearch
ELASTICSEARCH_ENABLED=true
# Elasticsearch connection
ELASTICSEARCH_HOSTS=["http://localhost:9200"]
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=your_password
# SSL configuration (optional)
ELASTICSEARCH_USE_SSL=false
ELASTICSEARCH_VERIFY_CERTS=true
ELASTICSEARCH_CA_CERTS=/path/to/ca.crt
# Performance settings
ELASTICSEARCH_TIMEOUT=30
ELASTICSEARCH_MAX_RETRIES=3
ELASTICSEARCH_INDEX_PREFIX=dify
ELASTICSEARCH_RETENTION_DAYS=30
```
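To sanity-check these settings outside Dify, you can open a connection with the official `elasticsearch` Python client. This is a minimal sketch, not Dify's internal wiring; each client argument is annotated with the setting it mirrors:
```python
import os

from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=["http://localhost:9200"],  # ELASTICSEARCH_HOSTS
    basic_auth=(
        os.environ.get("ELASTICSEARCH_USERNAME", "elastic"),
        os.environ["ELASTICSEARCH_PASSWORD"],
    ),
    verify_certs=True,   # ELASTICSEARCH_VERIFY_CERTS
    request_timeout=30,  # ELASTICSEARCH_TIMEOUT
    max_retries=3,       # ELASTICSEARCH_MAX_RETRIES
    retry_on_timeout=True,
)

print(es.ping())  # True if the cluster is reachable
```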
### Repository Configuration
Update your configuration to use Elasticsearch repositories:
```bash
# Core repositories
CORE_WORKFLOW_EXECUTION_REPOSITORY=core.repositories.elasticsearch_workflow_execution_repository.ElasticsearchWorkflowExecutionRepository
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY=core.repositories.elasticsearch_workflow_node_execution_repository.ElasticsearchWorkflowNodeExecutionRepository
# API repositories
API_WORKFLOW_RUN_REPOSITORY=repositories.elasticsearch_api_workflow_run_repository.ElasticsearchAPIWorkflowRunRepository
```
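These values are dotted import paths. A repository factory typically resolves them into classes at startup with `importlib`; a minimal sketch of that pattern (the helper name is illustrative, not Dify's actual factory):
```python
import importlib


def load_repository_class(dotted_path: str):
    """Resolve a 'package.module.ClassName' setting into the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Example: instantiate the configured workflow-run repository.
repo_cls = load_repository_class(
    "repositories.elasticsearch_api_workflow_run_repository"
    ".ElasticsearchAPIWorkflowRunRepository"
)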
## Migration Process
### 1. Setup Elasticsearch
First, ensure Elasticsearch is running and accessible:
```bash
# Check Elasticsearch status
curl -X GET "localhost:9200/_cluster/health?pretty"
```
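The same check from Python, if you are scripting around the migration (assumes the `elasticsearch` client, connected as configured above):
```python
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"])

health = es.cluster.health()
print(health["status"])  # "green" or "yellow" is fine; "red" needs attention first
```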
### 2. Test Configuration
Verify your Dify configuration:
```bash
# Check Elasticsearch connection
flask elasticsearch status
```
### 3. Dry Run Migration
Perform a dry run to estimate migration scope:
```bash
# Dry run for all data
flask elasticsearch migrate --dry-run
# Dry run for specific tenant
flask elasticsearch migrate --tenant-id tenant-123 --dry-run
# Dry run for date range
flask elasticsearch migrate --start-date 2024-01-01 --end-date 2024-01-31 --dry-run
```
### 4. Incremental Migration
Start with recent data and work backwards:
```bash
# Migrate last 7 days
flask elasticsearch migrate --start-date $(date -d '7 days ago' +%Y-%m-%d)
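# Note: 'date -d' is GNU coreutils syntax; on macOS/BSD use: date -v-7d +%Y-%m-%d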
# Migrate specific data types
flask elasticsearch migrate --data-type workflow_runs
flask elasticsearch migrate --data-type app_logs
flask elasticsearch migrate --data-type node_executions
```
### 5. Full Migration
Migrate all historical data:
```bash
# Migrate all data (use appropriate batch size)
flask elasticsearch migrate --batch-size 500
# Migrate specific tenant
flask elasticsearch migrate --tenant-id tenant-123
```
### 6. Validation
Validate the migrated data:
```bash
# Validate migration for tenant
flask elasticsearch validate --tenant-id tenant-123 --sample-size 1000
```
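Beyond the CLI, a quick cross-check is to compare record counts between the two stores. A hedged sketch (the index pattern follows this guide's naming convention; the connection string and PostgreSQL column names are illustrative):
```python
from elasticsearch import Elasticsearch
from sqlalchemy import create_engine, text

TENANT_ID = "tenant-123"

# Count migrated documents across this tenant's monthly indices.
es = Elasticsearch(hosts=["http://localhost:9200"])
es_count = es.count(index=f"dify-workflow-runs-{TENANT_ID}-*")["count"]

# Count source rows in PostgreSQL (connection details and columns are illustrative).
engine = create_engine("postgresql://user:pass@localhost/dify")
with engine.connect() as conn:
    pg_count = conn.execute(
        text("SELECT count(*) FROM workflow_runs WHERE tenant_id = :t"),
        {"t": TENANT_ID},
    ).scalar()

print(f"postgres={pg_count} elasticsearch={es_count} match={pg_count == es_count}")
```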
### 7. Switch Configuration
Once validation passes, update your configuration to use Elasticsearch repositories and restart the application.
### 8. Cleanup (Optional)
After successful migration and validation, clean up old PostgreSQL data:
```bash
# Dry run cleanup
flask elasticsearch cleanup-pg --tenant-id tenant-123 --before-date 2024-01-01 --dry-run
# Actual cleanup (CAUTION: This cannot be undone)
flask elasticsearch cleanup-pg --tenant-id tenant-123 --before-date 2024-01-01
```
## Index Management
### Index Structure
Elasticsearch indices are organized as:
- `dify-workflow-runs-{tenant_id}-{YYYY.MM}`
- `dify-workflow-app-logs-{tenant_id}-{YYYY.MM}`
- `dify-workflow-node-executions-{tenant_id}-{YYYY.MM}`
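Given this convention, the index a record lands in is a pure function of data type, tenant, and timestamp. A small sketch of the naming logic (assuming the default `ELASTICSEARCH_INDEX_PREFIX` of `dify`):
```python
from datetime import datetime, timezone


def index_name(data_type: str, tenant_id: str, ts: datetime, prefix: str = "dify") -> str:
    """Build the monthly index name, e.g. dify-workflow-runs-tenant-123-2024.01."""
    return f"{prefix}-{data_type}-{tenant_id}-{ts.strftime('%Y.%m')}"


print(index_name("workflow-runs", "tenant-123", datetime(2024, 1, 15, tzinfo=timezone.utc)))
# -> dify-workflow-runs-tenant-123-2024.01
```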
### Retention Policy
Configure automatic cleanup of old indices:
```python
# In your scheduled tasks
from services.elasticsearch_migration_service import ElasticsearchMigrationService

migration_service = ElasticsearchMigrationService()

# Clean up indices older than 30 days for every tenant
# (get_all_tenant_ids is a placeholder for however you enumerate tenants)
for tenant_id in get_all_tenant_ids():
    migration_service._workflow_run_repo.cleanup_old_indices(tenant_id, retention_days=30)
    migration_service._app_log_repo.cleanup_old_indices(tenant_id, retention_days=30)
```
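To run this on a schedule, one option is a Celery beat entry that invokes a task wrapping the loop above; a sketch (the task and schedule names are illustrative, not Dify's actual task registry):
```python
from celery.schedules import crontab

# In your Celery configuration (names are illustrative):
beat_schedule = {
    "cleanup-old-elasticsearch-indices": {
        "task": "tasks.cleanup_old_elasticsearch_indices",
        "schedule": crontab(hour=3, minute=0),  # daily at 03:00
    },
}
```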
## Performance Tuning
### Elasticsearch Settings
Optimize Elasticsearch for log data:
```json
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.refresh_interval": "30s",
    "index.mapping.total_fields.limit": 2000
  }
}
```
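To apply these settings to every new monthly index automatically, register them in an index template that matches the naming pattern. A sketch with the `elasticsearch` Python client (the template name is illustrative):
```python
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"])

# New dify-workflow-runs-* indices will pick up these settings on creation.
es.indices.put_index_template(
    name="dify-workflow-runs-template",  # illustrative name
    index_patterns=["dify-workflow-runs-*"],
    template={
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "index.refresh_interval": "30s",
            "index.mapping.total_fields.limit": 2000,
        }
    },
)
```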
### Batch Processing
Adjust batch sizes based on your system:
```bash
# Smaller batches for limited memory
flask elasticsearch migrate --batch-size 100
# Larger batches for high-performance systems
flask elasticsearch migrate --batch-size 5000
```
## Monitoring
### Check Migration Progress
```bash
# Monitor Elasticsearch status
flask elasticsearch status
# Check specific tenant indices
flask elasticsearch status --tenant-id tenant-123
```
### Query Performance
Monitor query performance in your application logs and Elasticsearch slow query logs.
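Elasticsearch's search slow log is off by default; you can set thresholds per index pattern (or bake them into the template above) so slow queries show up in the logs, for example:
```python
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"])

# Log any search that exceeds the thresholds below.
es.indices.put_settings(
    index="dify-workflow-runs-*",
    settings={
        "index.search.slowlog.threshold.query.warn": "2s",
        "index.search.slowlog.threshold.query.info": "500ms",
    },
)
```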
## Troubleshooting
### Common Issues
1. **Connection Timeout**
   - Increase `ELASTICSEARCH_TIMEOUT`
   - Check network connectivity
   - Verify Elasticsearch is running
2. **Memory Issues**
   - Reduce the batch size
   - Increase the JVM heap size for Elasticsearch
   - Process data in smaller date ranges
3. **Index Template Conflicts**
   - Delete the existing templates: `DELETE _index_template/dify-*-template` (see the sketch after this list)
   - Restart the migration
4. **Data Validation Failures**
   - Check Elasticsearch logs for indexing errors
   - Verify data integrity in PostgreSQL
   - Re-run the migration for failed records
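For the template cleanup in item 3, the equivalent with the Python client (wildcards in the name are expanded against existing templates):
```python
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"])

# Remove conflicting index templates before re-running the migration.
es.indices.delete_index_template(name="dify-*-template")
```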
### Recovery
If migration fails:
1. Check logs for specific errors
2. Fix configuration issues
3. Resume migration from last successful point
4. Use date ranges to process data incrementally
## Best Practices
1. **Test First**: Always run dry runs and validate on staging
2. **Incremental Migration**: Start with recent data, migrate incrementally
3. **Monitor Resources**: Watch CPU, memory, and disk usage during migration
4. **Backup**: Ensure PostgreSQL backups before cleanup
5. **Gradual Rollout**: Switch tenants to Elasticsearch gradually
6. **Index Lifecycle**: Implement proper index rotation and cleanup
## Example Migration Script
```bash
#!/bin/bash
set -e  # stop at the first failed step

# Complete migration workflow
TENANT_ID="tenant-123"
START_DATE="2024-01-01"

echo "Starting Elasticsearch migration for $TENANT_ID"

# 1. Dry run
echo "Performing dry run..."
flask elasticsearch migrate --tenant-id "$TENANT_ID" --start-date "$START_DATE" --dry-run

# 2. Migrate data
echo "Migrating data..."
flask elasticsearch migrate --tenant-id "$TENANT_ID" --start-date "$START_DATE" --batch-size 1000

# 3. Validate
echo "Validating migration..."
flask elasticsearch validate --tenant-id "$TENANT_ID" --sample-size 500

# 4. Check status
echo "Checking status..."
flask elasticsearch status --tenant-id "$TENANT_ID"

echo "Migration completed for $TENANT_ID"
```
## Support
For issues or questions:
1. Check application logs for detailed error messages
2. Review Elasticsearch cluster logs
3. Verify configuration settings
4. Test with smaller datasets first