# Elasticsearch Migration Guide

This guide explains how to migrate workflow log data from PostgreSQL to Elasticsearch for better performance and scalability.

## Overview

The Elasticsearch integration provides:

- **High-performance log storage**: Better suited for time-series log data
- **Advanced search capabilities**: Full-text search and complex queries
- **Scalability**: Horizontal scaling for large datasets
- **Time-series optimization**: Date-based index rotation for efficient storage
- **Multi-tenant isolation**: Separate indices per tenant for data isolation

## Architecture

The migration involves four main log tables:

1. **workflow_runs**: Core workflow execution records
2. **workflow_app_logs**: Application-level workflow logs
3. **workflow_node_executions**: Individual node execution records
4. **workflow_node_execution_offload**: Large data offloaded to storage

## Configuration

### Environment Variables

Add the following to your `.env` file:

```bash
# Enable Elasticsearch
ELASTICSEARCH_ENABLED=true

# Elasticsearch connection
ELASTICSEARCH_HOSTS=["http://localhost:9200"]
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=your_password

# SSL configuration (optional)
ELASTICSEARCH_USE_SSL=false
ELASTICSEARCH_VERIFY_CERTS=true
ELASTICSEARCH_CA_CERTS=/path/to/ca.crt

# Performance settings
ELASTICSEARCH_TIMEOUT=30
ELASTICSEARCH_MAX_RETRIES=3
ELASTICSEARCH_INDEX_PREFIX=dify
ELASTICSEARCH_RETENTION_DAYS=30
```

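The `ELASTICSEARCH_HOSTS` value above is written as a JSON list rather than a bare URL, which is easy to get wrong when editing `.env` by hand. The following is a minimal sketch for checking the value before starting the application. It assumes the value is parsed as JSON, which the example suggests but this guide does not state; `parse_hosts` is a hypothetical helper, not part of Dify.

```python
import json


def parse_hosts(raw: str) -> list[str]:
    """Parse a value such as '["http://localhost:9200"]' into a list of URLs.

    Assumption: the setting is a JSON list of strings, as in the example above.
    """
    hosts = json.loads(raw)
    if not isinstance(hosts, list) or not all(isinstance(h, str) for h in hosts):
        raise ValueError("ELASTICSEARCH_HOSTS must be a JSON list of URL strings")
    return hosts


if __name__ == "__main__":
    import os

    raw_value = os.environ.get("ELASTICSEARCH_HOSTS", '["http://localhost:9200"]')
    print(parse_hosts(raw_value))
```

A bare URL such as `http://localhost:9200` fails `json.loads`, so a check like this catches the mistake before any connection is attempted.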
### Repository Configuration

Update your configuration to use Elasticsearch repositories:

```bash
# Core repositories
CORE_WORKFLOW_EXECUTION_REPOSITORY=core.repositories.elasticsearch_workflow_execution_repository.ElasticsearchWorkflowExecutionRepository
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY=core.repositories.elasticsearch_workflow_node_execution_repository.ElasticsearchWorkflowNodeExecutionRepository

# API repositories
API_WORKFLOW_RUN_REPOSITORY=repositories.elasticsearch_api_workflow_run_repository.ElasticsearchAPIWorkflowRunRepository
```

## Migration Process

### 1. Setup Elasticsearch

First, ensure Elasticsearch is running and accessible:

```bash
# Check Elasticsearch status
# (if security is enabled, add credentials, e.g. -u elastic:your_password)
curl -X GET "localhost:9200/_cluster/health?pretty"
```

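The health endpoint returns a JSON document whose `status` field is `green`, `yellow`, or `red`. The sketch below shows one way to turn that response into a go/no-go decision before migrating. `cluster_is_usable` is a hypothetical helper, and treating `yellow` as acceptable is an assumption that suits single-node setups, where replica shards cannot be assigned.

```python
import json


def cluster_is_usable(health_json: str) -> bool:
    """Return True when the cluster reports green or yellow and the call did not time out."""
    health = json.loads(health_json)
    return health.get("status") in ("green", "yellow") and not health.get("timed_out", False)


if __name__ == "__main__":
    # Example payload shaped like a _cluster/health response (values are illustrative).
    sample = '{"cluster_name": "docker-cluster", "status": "yellow", "timed_out": false}'
    print(cluster_is_usable(sample))
```

A `red` status means at least one primary shard is unassigned, so writes to the affected indices would fail; fix that before starting a migration.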
### 2. Test Configuration

Verify your Dify configuration:

```bash
# Check Elasticsearch connection
flask elasticsearch status
```

### 3. Dry Run Migration

Perform a dry run to estimate migration scope:

```bash
# Dry run for all data
flask elasticsearch migrate --dry-run

# Dry run for specific tenant
flask elasticsearch migrate --tenant-id tenant-123 --dry-run

# Dry run for date range
flask elasticsearch migrate --start-date 2024-01-01 --end-date 2024-01-31 --dry-run
```

### 4. Incremental Migration

Start with recent data and work backwards:

```bash
# Migrate last 7 days
# (date -d is GNU date syntax; on macOS/BSD use: date -v-7d +%Y-%m-%d)
flask elasticsearch migrate --start-date $(date -d '7 days ago' +%Y-%m-%d)

# Migrate specific data types
flask elasticsearch migrate --data-type workflow_runs
flask elasticsearch migrate --data-type app_logs
flask elasticsearch migrate --data-type node_executions
```

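Working backwards is easier to repeat when the date windows are generated rather than typed by hand. The sketch below produces seven-day windows, newest first, and prints one `migrate` command per window using the `--start-date` and `--end-date` flags shown earlier. `backward_windows` is a hypothetical helper, and whether the CLI treats `--end-date` as inclusive is an assumption you should confirm with a dry run.

```python
from datetime import date, timedelta


def backward_windows(newest: date, oldest: date, days: int = 7):
    """Yield (start, end) date pairs, newest window first, both ends inclusive."""
    end = newest
    while end >= oldest:
        # Clamp the final window so it never reaches past the oldest date.
        start = max(oldest, end - timedelta(days=days - 1))
        yield start, end
        end = start - timedelta(days=1)


if __name__ == "__main__":
    for start, end in backward_windows(date(2024, 1, 31), date(2024, 1, 1)):
        print(
            "flask elasticsearch migrate "
            f"--start-date {start.isoformat()} --end-date {end.isoformat()}"
        )
```

Running the printed commands one at a time keeps each run small and makes it obvious which window to retry if one fails.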
### 5. Full Migration

Migrate all historical data:

```bash
# Migrate all data (use appropriate batch size)
flask elasticsearch migrate --batch-size 500

# Migrate specific tenant
flask elasticsearch migrate --tenant-id tenant-123
```

### 6. Validation

Validate the migrated data:

```bash
# Validate migration for tenant
flask elasticsearch validate --tenant-id tenant-123 --sample-size 1000
```

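Sample-based validation checks a random subset of records instead of every row, trading certainty for speed. The sketch below illustrates the general idea with plain dictionaries keyed by record ID. It is not the implementation behind `flask elasticsearch validate`; `sample_mismatches` and its arguments are hypothetical.

```python
import random


def sample_mismatches(pg_rows: dict, es_docs: dict, sample_size: int, seed: int = 0) -> list:
    """Return the sampled IDs whose Elasticsearch document is missing or differs."""
    rng = random.Random(seed)  # fixed seed so a failed check can be reproduced
    ids = sorted(pg_rows)
    sample = rng.sample(ids, min(sample_size, len(ids)))
    return [record_id for record_id in sample if es_docs.get(record_id) != pg_rows[record_id]]


if __name__ == "__main__":
    source = {"run-1": {"status": "succeeded"}, "run-2": {"status": "failed"}}
    target = {"run-1": {"status": "succeeded"}}
    print(sample_mismatches(source, target, sample_size=10))
```

A larger `--sample-size` raises the chance of catching rare mismatches at the cost of a longer validation run.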
### 7. Switch Configuration

Once validation passes, set the repository variables described under Repository Configuration to the Elasticsearch implementations and restart the application.

### 8. Cleanup (Optional)

After successful migration and validation, clean up old PostgreSQL data. Run the dry run first and make sure you have a PostgreSQL backup, because the actual cleanup cannot be undone:

```bash
# Dry run cleanup
flask elasticsearch cleanup-pg --tenant-id tenant-123 --before-date 2024-01-01 --dry-run

# Actual cleanup (CAUTION: This cannot be undone)
flask elasticsearch cleanup-pg --tenant-id tenant-123 --before-date 2024-01-01
```

## Index Management

### Index Structure

Elasticsearch indices are organized as:

- `dify-workflow-runs-{tenant_id}-{YYYY.MM}`
- `dify-workflow-app-logs-{tenant_id}-{YYYY.MM}`
- `dify-workflow-node-executions-{tenant_id}-{YYYY.MM}`

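The naming pattern above can be reproduced in a few lines, which is handy when querying a tenant's indices directly. This is a minimal sketch: `index_name` and `tenant_pattern` are hypothetical helpers, and the assumption that the leading `dify` comes from `ELASTICSEARCH_INDEX_PREFIX` is inferred from the configuration example rather than stated in this guide.

```python
from datetime import datetime


def index_name(kind: str, tenant_id: str, when: datetime, prefix: str = "dify") -> str:
    """Build a monthly index name such as dify-workflow-runs-tenant-123-2024.01."""
    return f"{prefix}-{kind}-{tenant_id}-{when:%Y.%m}"


def tenant_pattern(kind: str, tenant_id: str, prefix: str = "dify") -> str:
    """Build a wildcard pattern matching every month for one tenant and data type."""
    return f"{prefix}-{kind}-{tenant_id}-*"


if __name__ == "__main__":
    print(index_name("workflow-runs", "tenant-123", datetime(2024, 1, 15)))
    print(tenant_pattern("workflow-node-executions", "tenant-123"))
```

Because the month is the last component, a query over a date range only needs to touch the indices whose suffix falls inside that range.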
### Retention Policy

Configure automatic cleanup of old indices:

```python
# In your scheduled tasks
from services.elasticsearch_migration_service import ElasticsearchMigrationService

migration_service = ElasticsearchMigrationService()

# Clean up indices older than 30 days.
# get_all_tenant_ids() is not defined in this snippet; supply your own way of listing tenant IDs.
for tenant_id in get_all_tenant_ids():
    migration_service._workflow_run_repo.cleanup_old_indices(tenant_id, retention_days=30)
    migration_service._app_log_repo.cleanup_old_indices(tenant_id, retention_days=30)
```

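Because indices rotate monthly, retention works on whole months rather than individual days. The sketch below shows one plausible rule: an index is dropped only once its entire month is older than the cutoff. This is an illustration under that assumption, not the logic of `cleanup_old_indices`, and `expired_indices` is a hypothetical helper.

```python
from datetime import date, timedelta


def expired_indices(names, today: date, retention_days: int = 30) -> list:
    """Return index names (suffix YYYY.MM) whose whole month is older than the cutoff."""
    cutoff = today - timedelta(days=retention_days)
    expired = []
    for name in names:
        year, month = map(int, name.rsplit("-", 1)[1].split("."))
        # First day of the following month: the index holds nothing on or after it.
        next_month = date(year + (month == 12), month % 12 + 1, 1)
        if next_month <= cutoff:
            expired.append(name)
    return expired


if __name__ == "__main__":
    indices = [
        "dify-workflow-runs-tenant-123-2023.11",
        "dify-workflow-runs-tenant-123-2023.12",
        "dify-workflow-runs-tenant-123-2024.01",
    ]
    print(expired_indices(indices, today=date(2024, 1, 20)))
```

Under this rule a 30-day retention setting can keep some records for close to two months, since an index survives until its newest possible record has aged out.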
## Performance Tuning

### Elasticsearch Settings

Optimize Elasticsearch for log data. Note that `number_of_replicas: 0` keeps a single copy of each shard, which speeds up indexing but provides no redundancy; raise it on multi-node production clusters.

```json
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.refresh_interval": "30s",
    "index.mapping.total_fields.limit": 2000
  }
}
```

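If you manage index templates yourself, these settings can be wrapped in a composable index template body and sent to Elasticsearch with `PUT _index_template/<name>`. The sketch below only builds and prints that body. The index pattern is an assumption derived from the index names in this guide, `build_template` is a hypothetical helper, and the application may already create its own templates (the troubleshooting section mentions them), so check for conflicts before applying anything like this.

```python
import json


def build_template(prefix: str = "dify") -> dict:
    """Build a composable index template body carrying the settings shown above."""
    return {
        # Assumed pattern: matches the workflow indices listed under Index Structure.
        "index_patterns": [f"{prefix}-workflow-*"],
        "template": {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "index.refresh_interval": "30s",
                "index.mapping.total_fields.limit": 2000,
            }
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_template(), indent=2))
```

Template settings apply only to indices created after the template exists, so existing monthly indices keep their current settings.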
### Batch Processing

Adjust batch sizes based on your system:

```bash
# Smaller batches for limited memory
flask elasticsearch migrate --batch-size 100

# Larger batches for high-performance systems
flask elasticsearch migrate --batch-size 5000
```

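Batch size matters because only one batch of rows needs to be held in memory at a time. The generator below is a generic sketch of that pattern, not the migration command's actual code; `batched` is a hypothetical helper name.

```python
from itertools import islice


def batched(rows, batch_size: int):
    """Yield successive lists of at most batch_size items from any iterable."""
    iterator = iter(rows)
    # islice pulls the next batch lazily; an empty list ends the loop.
    while batch := list(islice(iterator, batch_size)):
        yield batch


if __name__ == "__main__":
    for batch in batched(range(10), batch_size=4):
        print(len(batch), batch)
```

Smaller batches lower peak memory but mean more round trips to Elasticsearch, so throughput usually improves with batch size until memory or request size becomes the limit.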
## Monitoring

### Check Migration Progress

```bash
# Monitor Elasticsearch status
flask elasticsearch status

# Check specific tenant indices
flask elasticsearch status --tenant-id tenant-123
```

### Query Performance

Monitor query performance in your application logs and in the Elasticsearch search slow logs.

## Troubleshooting

### Common Issues

1. **Connection Timeout**
   - Increase `ELASTICSEARCH_TIMEOUT`
   - Check network connectivity
   - Verify Elasticsearch is running

2. **Memory Issues**
   - Reduce batch size
   - Increase JVM heap size for Elasticsearch
   - Process data in smaller date ranges

3. **Index Template Conflicts**
   - Delete existing templates: `DELETE _index_template/dify-*-template`
   - Restart migration

4. **Data Validation Failures**
   - Check Elasticsearch logs for indexing errors
   - Verify data integrity in PostgreSQL
   - Re-run migration for failed records

### Recovery

If migration fails:

1. Check logs for specific errors
2. Fix configuration issues
3. Resume migration from last successful point
4. Use date ranges to process data incrementally

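Resuming from the last successful point requires a record of what has already been migrated. The commands in this guide do not show a resume flag, so the sketch below assumes you drive the migration in date windows yourself and track completed windows in a small JSON file. `load_done`, `mark_done`, and the file name are all hypothetical.

```python
import json
from pathlib import Path


def load_done(path: Path) -> set[str]:
    """Return the set of window labels already migrated (empty if no checkpoint yet)."""
    return set(json.loads(path.read_text())) if path.exists() else set()


def mark_done(path: Path, window: str) -> None:
    """Record a window label, e.g. '2024-01-01..2024-01-07', as completed."""
    done = load_done(path)
    done.add(window)
    path.write_text(json.dumps(sorted(done)))


if __name__ == "__main__":
    checkpoint = Path("migration_checkpoint.json")  # hypothetical file name
    for window in ["2024-01-08..2024-01-14", "2024-01-01..2024-01-07"]:
        if window in load_done(checkpoint):
            continue  # already migrated in an earlier run
        # Run the migrate command for this window here, then record it.
        mark_done(checkpoint, window)
```

Marking a window only after its command succeeds means a crash mid-window causes that window to be retried rather than skipped.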
## Best Practices

1. **Test First**: Always run dry runs and validate on staging
2. **Incremental Migration**: Start with recent data, migrate incrementally
3. **Monitor Resources**: Watch CPU, memory, and disk usage during migration
4. **Backup**: Ensure PostgreSQL backups before cleanup
5. **Gradual Rollout**: Switch tenants to Elasticsearch gradually
6. **Index Lifecycle**: Implement proper index rotation and cleanup

## Example Migration Script

```bash
#!/bin/bash
# Complete migration workflow for a single tenant.
# Stop at the first failing command so later steps never run after a failure.
set -euo pipefail

TENANT_ID="tenant-123"
START_DATE="2024-01-01"

echo "Starting Elasticsearch migration for $TENANT_ID"

# 1. Dry run
echo "Performing dry run..."
flask elasticsearch migrate --tenant-id "$TENANT_ID" --start-date "$START_DATE" --dry-run

# 2. Migrate data
echo "Migrating data..."
flask elasticsearch migrate --tenant-id "$TENANT_ID" --start-date "$START_DATE" --batch-size 1000

# 3. Validate
echo "Validating migration..."
flask elasticsearch validate --tenant-id "$TENANT_ID" --sample-size 500

# 4. Check status
echo "Checking status..."
flask elasticsearch status --tenant-id "$TENANT_ID"

echo "Migration completed for $TENANT_ID"
```

## Support

For issues or questions:

1. Check application logs for detailed error messages
2. Review Elasticsearch cluster logs
3. Verify configuration settings
4. Test with smaller datasets first