dify/api/services/trigger_debug_service.py

201 lines
7.0 KiB
Python

"""
Trigger debug service for webhook debugging in draft workflows.
This service provides debugging capabilities for trigger nodes by using
Redis Pub/Sub to enable real-time event forwarding across distributed instances.
"""
import json
import logging
import time
import uuid
from collections.abc import Generator
from core.trigger.entities.entities import TriggerDebugEventData
from extensions.ext_redis import redis_client
logger = logging.getLogger(__name__)
class TriggerDebugService:
"""
Trigger debug service - supports distributed environments.
Cleans up resources on disconnect, no reconnection handling.
"""
SESSION_PREFIX = "trigger_debug_session:"
SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:"
PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:"
@classmethod
def create_debug_session(
cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = 300
) -> str:
"""
Create a debug session.
Args:
app_id: Application ID
node_id: Node ID being debugged
subscription_id: Subscription ID to monitor
user_id: User ID creating the session
timeout: Session timeout in seconds
Returns:
Session ID
"""
session_id = str(uuid.uuid4())
session_data = {
"session_id": session_id,
"app_id": app_id,
"node_id": node_id,
"subscription_id": subscription_id,
"user_id": user_id,
"created_at": time.time(),
}
# 1. Save session info
redis_client.setex(f"{cls.SESSION_PREFIX}{session_id}", timeout, json.dumps(session_data))
# 2. Register to subscription's debug session set
redis_client.sadd(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
redis_client.expire(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", timeout)
logger.info("Created debug session %s for subscription %s", session_id, subscription_id)
return session_id
@classmethod
def listen_for_events(cls, session_id: str, timeout: int = 300) -> Generator:
"""
Listen for events using Redis Pub/Sub.
Args:
session_id: Debug session ID
timeout: Timeout in seconds
Yields:
Event data or heartbeat messages
"""
pubsub = redis_client.pubsub()
channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
try:
# Subscribe to channel
pubsub.subscribe(channel)
logger.info("Listening on channel: %s", channel)
start_time = time.time()
last_heartbeat = time.time()
# Real-time listening
while time.time() - start_time < timeout:
# Non-blocking message retrieval with 1 second timeout
message = pubsub.get_message(timeout=1.0)
if message and message["type"] == "message":
# Received trigger event
event_data = json.loads(message["data"])
logger.info("Received trigger event for session %s", session_id)
yield event_data
break # End listening after receiving event
# Send periodic heartbeat
if time.time() - last_heartbeat > 5:
yield {"type": "heartbeat", "remaining": int(timeout - (time.time() - start_time))}
last_heartbeat = time.time()
# Timeout
if time.time() - start_time >= timeout:
yield {"type": "timeout"}
except Exception as e:
logger.exception("Error in listen_for_events", exc_info=e)
yield {"type": "error", "message": str(e)}
finally:
# Clean up resources
cls.close_session(session_id)
pubsub.unsubscribe(channel)
pubsub.close()
logger.info("Closed listening for session %s", session_id)
@classmethod
def close_session(cls, session_id: str):
"""
Close and clean up debug session.
Args:
session_id: Session ID to close
"""
try:
# Get session info
session_data = redis_client.get(f"{cls.SESSION_PREFIX}{session_id}")
if session_data:
session = json.loads(session_data)
subscription_id = session.get("subscription_id")
# Remove from subscription set
if subscription_id:
redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
logger.info("Removed session %s from subscription %s", session_id, subscription_id)
# Delete session info
redis_client.delete(f"{cls.SESSION_PREFIX}{session_id}")
logger.info("Cleaned up session %s", session_id)
except Exception as e:
logger.exception("Error closing session %s", session_id, exc_info=e)
@classmethod
def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerDebugEventData) -> int:
"""
Dispatch events to debug sessions using Pub/Sub only.
Args:
subscription_id: Subscription ID
event_data: Event data to dispatch
Returns:
Number of active debug sessions
"""
try:
# Get all listening debug sessions
debug_sessions = redis_client.smembers(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}")
if not debug_sessions:
return 0
active_sessions = 0
for session_id_bytes in debug_sessions:
if isinstance(session_id_bytes, bytes):
session_id = session_id_bytes.decode("utf-8")
else:
session_id = session_id_bytes
# Verify session is valid
if not redis_client.exists(f"{cls.SESSION_PREFIX}{session_id}"):
# Clean up invalid session
redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
continue
# Publish event via Pub/Sub
channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
subscriber_count = redis_client.publish(channel, json.dumps(event_data.model_dump()))
if subscriber_count > 0:
active_sessions += 1
logger.info("Published event to %d subscribers on channel %s", subscriber_count, channel)
else:
# No subscribers, clean up session
logger.info("No subscribers for session %s, cleaning up", session_id)
cls.close_session(session_id)
if active_sessions > 0:
logger.info("Dispatched event to %d active debug sessions", active_sessions)
return active_sessions
except Exception as e:
logger.exception("Failed to dispatch to debug sessions", exc_info=e)
return 0