Elasticsearch Migration Guide

This guide explains how to migrate workflow log data from PostgreSQL to Elasticsearch for better performance and scalability.

Overview

The Elasticsearch integration provides:

  • High-performance log storage: Better suited for time-series log data
  • Advanced search capabilities: Full-text search and complex queries
  • Scalability: Horizontal scaling for large datasets
  • Time-series optimization: Date-based index rotation for efficient storage
  • Multi-tenant isolation: Separate indices per tenant for data isolation

Architecture

The migration involves four main log tables:

  1. workflow_runs: Core workflow execution records
  2. workflow_app_logs: Application-level workflow logs
  3. workflow_node_executions: Individual node execution records
  4. workflow_node_execution_offload: Large data offloaded to storage

Configuration

Environment Variables

Add the following to your .env file:

# Enable Elasticsearch
ELASTICSEARCH_ENABLED=true

# Elasticsearch connection
ELASTICSEARCH_HOSTS=["http://localhost:9200"]
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=your_password

# SSL configuration (optional)
ELASTICSEARCH_USE_SSL=false
ELASTICSEARCH_VERIFY_CERTS=true
ELASTICSEARCH_CA_CERTS=/path/to/ca.crt

# Performance settings
ELASTICSEARCH_TIMEOUT=30
ELASTICSEARCH_MAX_RETRIES=3
ELASTICSEARCH_INDEX_PREFIX=dify
ELASTICSEARCH_RETENTION_DAYS=30
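
These values reach the application as plain strings, and ELASTICSEARCH_HOSTS is written as a JSON list. The sketch below shows one way such variables could be parsed into typed settings. The `load_es_settings` helper, its key names, and its defaults are assumptions made for illustration; Dify's own configuration loader is not shown in this guide and may behave differently.

```python
import json
import os


def _as_bool(value: str) -> bool:
    """Interpret the usual truthy spellings found in .env files."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_es_settings(env=None) -> dict:
    """Hypothetical helper: turn ELASTICSEARCH_* strings into typed values."""
    env = os.environ if env is None else env
    return {
        "enabled": _as_bool(env.get("ELASTICSEARCH_ENABLED", "false")),
        # The hosts value is a JSON array, so it is decoded with json.loads.
        "hosts": json.loads(env.get("ELASTICSEARCH_HOSTS", '["http://localhost:9200"]')),
        "username": env.get("ELASTICSEARCH_USERNAME"),
        "password": env.get("ELASTICSEARCH_PASSWORD"),
        "use_ssl": _as_bool(env.get("ELASTICSEARCH_USE_SSL", "false")),
        "verify_certs": _as_bool(env.get("ELASTICSEARCH_VERIFY_CERTS", "true")),
        "ca_certs": env.get("ELASTICSEARCH_CA_CERTS"),
        "timeout": int(env.get("ELASTICSEARCH_TIMEOUT", "30")),
        "max_retries": int(env.get("ELASTICSEARCH_MAX_RETRIES", "3")),
        "index_prefix": env.get("ELASTICSEARCH_INDEX_PREFIX", "dify"),
        "retention_days": int(env.get("ELASTICSEARCH_RETENTION_DAYS", "30")),
    }
```

Passing a dictionary instead of relying on `os.environ` makes it easy to check a candidate .env file before restarting the application.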

Repository Configuration

Update your configuration to use Elasticsearch repositories:

# Core repositories
CORE_WORKFLOW_EXECUTION_REPOSITORY=core.repositories.elasticsearch_workflow_execution_repository.ElasticsearchWorkflowExecutionRepository
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY=core.repositories.elasticsearch_workflow_node_execution_repository.ElasticsearchWorkflowNodeExecutionRepository

# API repositories
API_WORKFLOW_RUN_REPOSITORY=repositories.elasticsearch_api_workflow_run_repository.ElasticsearchAPIWorkflowRunRepository
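
Each value above is a dotted import path ending in a class name. How Dify resolves these paths is not covered here; a common pattern, sketched below under that assumption, is to split off the class name and import the module dynamically. The `resolve_class` name is hypothetical.

```python
import importlib


def resolve_class(dotted_path: str):
    """Import 'package.module.ClassName' and return the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    # getattr raises AttributeError if the module has no such class.
    return getattr(module, class_name)
```

A typo in one of the repository variables would surface as an ImportError or AttributeError at this point, which is worth checking first if the application fails to start after the switch.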

Migration Process

1. Setup Elasticsearch

First, ensure Elasticsearch is running and accessible:

# Check Elasticsearch status
curl -X GET "localhost:9200/_cluster/health?pretty"
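
The health response contains a `status` field that is `green`, `yellow`, or `red`. The sketch below performs the same check from Python with only the standard library. It assumes an unauthenticated cluster at the given URL (a secured cluster needs credentials added), and the function names are illustrative.

```python
import json
import urllib.request


def fetch_cluster_health(base_url: str = "http://localhost:9200", timeout: float = 10.0) -> dict:
    """Call GET /_cluster/health and return the decoded JSON body."""
    with urllib.request.urlopen(f"{base_url}/_cluster/health", timeout=timeout) as resp:
        return json.load(resp)


def is_ready_for_migration(health: dict) -> bool:
    """Accept green and yellow; red means at least one primary shard is unassigned."""
    return health.get("status") in {"green", "yellow"}
```

A single-node cluster with replicas configured reports `yellow`, which is still usable for a migration.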

2. Test Configuration

Verify your Dify configuration:

# Check Elasticsearch connection
flask elasticsearch status

3. Dry Run Migration

Perform a dry run to estimate migration scope:

# Dry run for all data
flask elasticsearch migrate --dry-run

# Dry run for specific tenant
flask elasticsearch migrate --tenant-id tenant-123 --dry-run

# Dry run for date range
flask elasticsearch migrate --start-date 2024-01-01 --end-date 2024-01-31 --dry-run

4. Incremental Migration

Start with recent data and work backwards:

# Migrate last 7 days (GNU date syntax; on macOS/BSD use: date -v-7d +%Y-%m-%d)
flask elasticsearch migrate --start-date "$(date -d '7 days ago' +%Y-%m-%d)"

# Migrate specific data types
flask elasticsearch migrate --data-type workflow_runs
flask elasticsearch migrate --data-type app_logs
flask elasticsearch migrate --data-type node_executions
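
Working backwards from recent data can be scripted by splitting the history into fixed-size windows and running one migration per window. The sketch below generates the windows and builds the command for each, using only the `--start-date` and `--end-date` flags shown above. It assumes both dates are treated as inclusive, which this guide does not confirm; the helper names are hypothetical.

```python
from datetime import date, timedelta


def backward_windows(newest: date, oldest: date, days: int = 7):
    """Yield (start, end) pairs from newest to oldest, each covering at most `days` days."""
    end = newest
    while end >= oldest:
        start = max(oldest, end - timedelta(days=days - 1))
        yield start, end
        end = start - timedelta(days=1)


def migrate_command(start: date, end: date) -> list[str]:
    """Build the CLI invocation for one window."""
    return [
        "flask", "elasticsearch", "migrate",
        "--start-date", start.isoformat(),
        "--end-date", end.isoformat(),
    ]


if __name__ == "__main__":
    for start, end in backward_windows(date(2024, 1, 31), date(2024, 1, 1)):
        print(" ".join(migrate_command(start, end)))
```

Because the dates are computed in Python, the same script works on systems where `date -d` is unavailable.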

5. Full Migration

Migrate all historical data:

# Migrate all data (use appropriate batch size)
flask elasticsearch migrate --batch-size 500

# Migrate specific tenant
flask elasticsearch migrate --tenant-id tenant-123

6. Validation

Validate the migrated data:

# Validate migration for tenant
flask elasticsearch validate --tenant-id tenant-123 --sample-size 1000
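
What the validate command checks internally is not documented here. As a rough mental model only, sample-based validation can be pictured as drawing random record IDs from the source and comparing each one against the target. The sketch below does this for two in-memory dictionaries keyed by record ID; the function and its return shape are assumptions, not the command's implementation.

```python
import random


def sample_mismatches(source: dict, target: dict, sample_size: int, seed=None):
    """Compare a random sample of source records against the target.

    Returns (missing_ids, different_ids) for the sampled records only.
    """
    rng = random.Random(seed)
    ids = sorted(source)
    sampled = rng.sample(ids, min(sample_size, len(ids)))
    missing = [i for i in sampled if i not in target]
    different = [i for i in sampled if i in target and target[i] != source[i]]
    return missing, different
```

A larger `--sample-size` raises the chance of catching rare discrepancies at the cost of a longer run.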

7. Switch Configuration

Once validation passes, apply the settings from Repository Configuration above and restart the application.

8. Cleanup (Optional)

After successful migration and validation, and once a PostgreSQL backup is in place, remove the migrated data from PostgreSQL:

# Dry run cleanup
flask elasticsearch cleanup-pg --tenant-id tenant-123 --before-date 2024-01-01 --dry-run

# Actual cleanup (CAUTION: This cannot be undone)
flask elasticsearch cleanup-pg --tenant-id tenant-123 --before-date 2024-01-01

Index Management

Index Structure

Elasticsearch indices are organized as:

  • dify-workflow-runs-{tenant_id}-{YYYY.MM}
  • dify-workflow-app-logs-{tenant_id}-{YYYY.MM}
  • dify-workflow-node-executions-{tenant_id}-{YYYY.MM}
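
The naming scheme can be reproduced with a small helper, which is handy when querying or deleting indices by hand. This sketch assumes the leading `dify` corresponds to ELASTICSEARCH_INDEX_PREFIX and that the month comes from the record's date; both are inferred from the patterns above rather than confirmed.

```python
from datetime import date


def index_name(kind: str, tenant_id: str, day: date, prefix: str = "dify") -> str:
    """Build a monthly index name such as dify-workflow-runs-<tenant>-2024.01."""
    return f"{prefix}-{kind}-{tenant_id}-{day:%Y.%m}"


def tenant_index_pattern(kind: str, tenant_id: str, prefix: str = "dify") -> str:
    """Wildcard pattern matching every monthly index of one kind for one tenant."""
    return f"{prefix}-{kind}-{tenant_id}-*"
```

For example, `index_name("workflow-runs", "tenant-123", date(2024, 1, 15))` gives `dify-workflow-runs-tenant-123-2024.01`.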

Retention Policy

Configure automatic cleanup of old indices:

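# In your scheduled tasks
from services.elasticsearch_migration_service import ElasticsearchMigrationService

migration_service = ElasticsearchMigrationService()

# Keep this in sync with ELASTICSEARCH_RETENTION_DAYS
RETENTION_DAYS = 30

# get_all_tenant_ids() is a placeholder: supply your own function that
# returns the IDs of the tenants whose indices should be cleaned up.
for tenant_id in get_all_tenant_ids():
    # Note: these repositories are private attributes of the service.
    migration_service._workflow_run_repo.cleanup_old_indices(tenant_id, retention_days=RETENTION_DAYS)
    migration_service._app_log_repo.cleanup_old_indices(tenant_id, retention_days=RETENTION_DAYS)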
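
Because indices rotate monthly, a retention period in days cannot be applied exactly: an index can only be deleted as a whole. The sketch below shows one conservative interpretation, in which an index counts as expired only when its entire month lies before the cutoff. How `cleanup_old_indices` actually decides is not shown in this guide, so treat this rule and the helper names as assumptions.

```python
from datetime import date, timedelta


def month_end(year: int, month: int) -> date:
    """Last calendar day of the given month."""
    first_of_next = date(year + month // 12, month % 12 + 1, 1)
    return first_of_next - timedelta(days=1)


def expired_indices(names, today: date, retention_days: int = 30):
    """Return the index names whose whole month is older than the retention cutoff."""
    cutoff = today - timedelta(days=retention_days)
    expired = []
    for name in names:
        suffix = name.rsplit("-", 1)[-1]  # expected form: "2024.01"
        year_text, _, month_text = suffix.partition(".")
        if not (year_text.isdigit() and month_text.isdigit()):
            continue  # name does not end in YYYY.MM
        year, month = int(year_text), int(month_text)
        if not 1 <= month <= 12:
            continue
        if month_end(year, month) < cutoff:
            expired.append(name)
    return expired
```

Under this rule, data can remain for up to one month beyond the configured retention period.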

Performance Tuning

Elasticsearch Settings

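Optimize Elasticsearch for log data. The settings below favor indexing throughput over redundancy and search freshness: with number_of_replicas set to 0 there is no second copy of the data, so use at least 1 replica on a multi-node production cluster, and a 30s refresh interval means newly indexed logs can take up to 30 seconds to become searchable.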

{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.refresh_interval": "30s",
    "index.mapping.total_fields.limit": 2000
  }
}
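
One way to apply such settings to every new monthly index is a composable index template. The sketch below only builds the request body; sending it with `PUT _index_template/<name>` is left to the reader. The index pattern and priority are illustrative assumptions, and the migration tooling may already create its own templates.

```python
import json

LOG_INDEX_SETTINGS = {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.refresh_interval": "30s",
    "index.mapping.total_fields.limit": 2000,
}


def build_index_template(pattern: str, settings: dict, priority: int = 100) -> dict:
    """Wrap index settings in the body expected by the _index_template API."""
    return {
        "index_patterns": [pattern],
        "priority": priority,
        "template": {"settings": settings},
    }


if __name__ == "__main__":
    body = build_index_template("dify-workflow-runs-*", LOG_INDEX_SETTINGS)
    print(json.dumps(body, indent=2))
```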

Batch Processing

Adjust batch sizes based on your system:

# Smaller batches for limited memory
flask elasticsearch migrate --batch-size 100

# Larger batches for high-performance systems
flask elasticsearch migrate --batch-size 5000
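
The batch size bounds how many records are held in memory at once. The generator below illustrates the general idea of batching a stream of records; it is a generic sketch, not the migration command's internal code, and `in_batches` is a hypothetical name.

```python
from itertools import islice


def in_batches(items, batch_size: int):
    """Yield lists of up to `batch_size` items without loading everything at once."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Only one batch exists in memory at a time, so halving the batch size roughly halves peak memory used for pending records, at the cost of more round trips.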

Monitoring

Check Migration Progress

# Monitor Elasticsearch status
flask elasticsearch status

# Check specific tenant indices
flask elasticsearch status --tenant-id tenant-123

Query Performance

Monitor query performance in your application logs and in the Elasticsearch search slow logs.

Troubleshooting

Common Issues

  1. Connection Timeout

    • Increase ELASTICSEARCH_TIMEOUT
    • Check network connectivity
    • Verify Elasticsearch is running
  2. Memory Issues

    • Reduce batch size
    • Increase JVM heap size for Elasticsearch
    • Process data in smaller date ranges
  3. Index Template Conflicts

    • Delete existing templates: DELETE _index_template/dify-*-template
    • Restart migration
  4. Data Validation Failures

    • Check Elasticsearch logs for indexing errors
    • Verify data integrity in PostgreSQL
    • Re-run migration for failed records
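
For transient connection timeouts, retrying with an increasing delay often helps before changing any settings. The sketch below shows the pattern with Python's built-in exception types. A real Elasticsearch client raises its own exception classes and has its own retry options (compare ELASTICSEARCH_MAX_RETRIES), so this illustrates the idea rather than serving as a drop-in wrapper.

```python
import time


def with_retries(operation, max_retries: int = 3, base_delay: float = 1.0, sleep=time.sleep):
    """Call `operation`, retrying on connection or timeout errors with exponential backoff."""
    attempt = 0
    while True:
        try:
            return operation()
        except (ConnectionError, TimeoutError):
            if attempt >= max_retries:
                raise  # retries exhausted, surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
            attempt += 1
```

The `sleep` parameter exists so the delay can be replaced in tests.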

Recovery

If migration fails:

  1. Check logs for specific errors
  2. Fix configuration issues
  3. Resume migration from last successful point
  4. Use date ranges to process data incrementally
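
Resuming is simpler when the migration runs as a sequence of date windows and stops at the first one that fails. The sketch below records which windows completed and which one to retry. It assumes the CLI exits with a non-zero status on failure, which this guide does not state; the function names are hypothetical.

```python
import subprocess
from datetime import date


def run_window_with_cli(start: date, end: date) -> bool:
    """Run one window through the CLI; True when the command exits with status 0."""
    cmd = [
        "flask", "elasticsearch", "migrate",
        "--start-date", start.isoformat(),
        "--end-date", end.isoformat(),
    ]
    return subprocess.run(cmd).returncode == 0


def run_until_failure(windows, run_window=run_window_with_cli):
    """Process windows in order; return (completed, failed_window or None)."""
    completed = []
    for start, end in windows:
        if not run_window(start, end):
            return completed, (start, end)
        completed.append((start, end))
    return completed, None
```

After fixing the underlying problem, rerun with the window list starting at the returned failed window.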

Best Practices

  1. Test First: Always run dry runs and validate on staging
  2. Incremental Migration: Start with recent data, migrate incrementally
  3. Monitor Resources: Watch CPU, memory, and disk usage during migration
  4. Backup: Ensure PostgreSQL backups before cleanup
  5. Gradual Rollout: Switch tenants to Elasticsearch gradually
  6. Index Lifecycle: Implement proper index rotation and cleanup

Example Migration Script

#!/bin/bash

# Complete migration workflow
# Stop at the first failing command so a failed step is never reported as complete
set -euo pipefail

TENANT_ID="tenant-123"
START_DATE="2024-01-01"

echo "Starting Elasticsearch migration for $TENANT_ID"

# 1. Dry run
echo "Performing dry run..."
flask elasticsearch migrate --tenant-id "$TENANT_ID" --start-date "$START_DATE" --dry-run

# 2. Migrate data
echo "Migrating data..."
flask elasticsearch migrate --tenant-id "$TENANT_ID" --start-date "$START_DATE" --batch-size 1000

# 3. Validate
echo "Validating migration..."
flask elasticsearch validate --tenant-id "$TENANT_ID" --sample-size 500

# 4. Check status
echo "Checking status..."
flask elasticsearch status --tenant-id "$TENANT_ID"

echo "Migration completed for $TENANT_ID"

Support

For issues or questions:

  1. Check application logs for detailed error messages
  2. Review Elasticsearch cluster logs
  3. Verify configuration settings
  4. Test with smaller datasets first