dify/api/tasks/mail_human_input_delivery_task.py
copilot-swe-agent[bot] 215d3ed42d
Merge remote-tracking branch 'origin/deploy/dev' into feat/evaluation
# Conflicts:
#	.vite-hooks/pre-commit
#	api/controllers/console/__init__.py
#	api/core/agent/base_agent_runner.py
#	api/core/app/app_config/easy_ui_based_app/model_config/converter.py
#	api/core/app/apps/agent_chat/app_runner.py
#	api/core/entities/provider_configuration.py
#	api/core/helper/moderation.py
#	api/core/model_manager.py
#	api/core/rag/embedding/cached_embedding.py
#	api/core/rag/retrieval/dataset_retrieval.py
#	api/core/rag/splitter/fixed_text_splitter.py
#	api/core/workflow/nodes/datasource/datasource_node.py
#	api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
#	api/models/human_input.py
#	api/providers/trace/trace-tencent/src/dify_trace_tencent/span_builder.py
#	api/services/workflow_service.py
#	api/tasks/trigger_processing_tasks.py
#	api/tests/integration_tests/core/workflow/nodes/datasource/test_datasource_node_integration.py
#	api/tests/integration_tests/workflow/nodes/test_http.py
#	api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py
#	api/tests/unit_tests/controllers/service_api/app/test_conversation.py
#	api/tests/unit_tests/core/prompt/test_agent_history_prompt_transform.py
#	api/tests/unit_tests/core/variables/test_segment.py
#	api/tests/unit_tests/core/workflow/graph_engine/test_mock_factory.py
#	api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py
#	api/tests/unit_tests/core/workflow/nodes/datasource/test_datasource_node.py
#	api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py
#	api/tests/unit_tests/core/workflow/nodes/human_input/test_email_delivery_config.py
#	api/tests/unit_tests/services/workflow/test_workflow_human_input_delivery.py
#	web/app/(commonLayout)/layout.tsx
#	web/app/components/app/configuration/dataset-config/params-config/weighted-score.tsx
#	web/app/components/app/configuration/debug/debug-with-multiple-model/debug-item.tsx
#	web/app/components/app/workflow-log/__tests__/list.spec.tsx
#	web/app/components/apps/__tests__/list.spec.tsx
#	web/app/components/apps/list.tsx
#	web/app/components/base/chat/chat-with-history/header/operation.tsx
#	web/app/components/base/chat/chat-with-history/sidebar/operation.tsx
#	web/app/components/header/account-setting/data-source-page-new/operator.tsx
#	web/app/components/header/account-setting/members-page/operation/index.tsx
#	web/app/components/plugins/marketplace/sort-dropdown/__tests__/index.spec.tsx
#	web/app/components/plugins/marketplace/sort-dropdown/index.tsx
#	web/app/components/plugins/plugin-page/plugin-tasks/index.tsx
#	web/app/components/workflow/header/__tests__/test-run-menu.spec.tsx
#	web/app/components/workflow/header/test-run-menu.tsx
#	web/app/components/workflow/nodes/_base/components/next-step/operator.tsx
#	web/app/components/workflow/nodes/_base/components/panel-operator/index.tsx
#	web/app/components/workflow/nodes/assigner/components/__tests__/operation-selector.spec.tsx
#	web/app/components/workflow/nodes/assigner/components/operation-selector.tsx
#	web/app/components/workflow/operator/__tests__/more-actions.spec.tsx
#	web/app/components/workflow/operator/zoom-in-out.tsx
#	web/app/components/workflow/panel/version-history-panel/context-menu/menu-item.tsx
#	web/app/components/workflow/selection-contextmenu.tsx
#	web/eslint-suppressions.json

Co-authored-by: FFXN <31929997+FFXN@users.noreply.github.com>
2026-04-20 07:03:29 +00:00

192 lines
6.3 KiB
Python

import json
import logging
import time
from dataclasses import dataclass
from typing import Any
import click
from celery import shared_task
from graphon.runtime import GraphRuntimeState, VariablePool
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext
from core.workflow.human_input_adapter import EmailDeliveryConfig, EmailDeliveryMethod
from extensions.ext_database import db
from extensions.ext_mail import mail
from models.human_input import (
DeliveryMethodType,
HumanInputDelivery,
HumanInputForm,
HumanInputFormRecipient,
RecipientType,
)
from repositories.factory import DifyAPIRepositoryFactory
from services.feature_service import FeatureService
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class _EmailRecipient:
email: str
token: str
@dataclass(frozen=True)
class _EmailDeliveryJob:
form_id: str
subject: str
body: str
form_content: str
recipients: list[_EmailRecipient]
def _build_form_link(token: str) -> str:
base_url = dify_config.APP_WEB_URL
return f"{base_url.rstrip('/')}/form/{token}"
def _parse_recipient_payload(payload: str) -> tuple[str | None, RecipientType | None]:
try:
payload_dict: dict[str, Any] = json.loads(payload)
except Exception:
logger.exception("Failed to parse recipient payload")
return None, None
return payload_dict.get("email"), payload_dict.get("TYPE")
def _load_email_jobs(session: Session, form: HumanInputForm) -> list[_EmailDeliveryJob]:
deliveries = session.scalars(
select(HumanInputDelivery).where(
HumanInputDelivery.form_id == form.id,
HumanInputDelivery.delivery_method_type == DeliveryMethodType.EMAIL,
)
).all()
jobs: list[_EmailDeliveryJob] = []
for delivery in deliveries:
delivery_config = EmailDeliveryMethod.model_validate_json(delivery.channel_payload)
recipients = session.scalars(
select(HumanInputFormRecipient).where(HumanInputFormRecipient.delivery_id == delivery.id)
).all()
recipient_entities: list[_EmailRecipient] = []
for recipient in recipients:
email, recipient_type = _parse_recipient_payload(recipient.recipient_payload)
if recipient_type not in {RecipientType.EMAIL_MEMBER, RecipientType.EMAIL_EXTERNAL}:
continue
if not email:
continue
token = recipient.access_token
if not token:
continue
recipient_entities.append(_EmailRecipient(email=email, token=token))
if not recipient_entities:
continue
jobs.append(
_EmailDeliveryJob(
form_id=form.id,
subject=delivery_config.config.subject,
body=delivery_config.config.body,
form_content=form.rendered_content,
recipients=recipient_entities,
)
)
return jobs
def _render_body(
body_template: str,
form_link: str,
*,
variable_pool: VariablePool | None,
) -> str:
body = EmailDeliveryConfig.render_body_template(
body=body_template,
url=form_link,
variable_pool=variable_pool,
)
return EmailDeliveryConfig.render_markdown_body(body)
def _load_variable_pool(workflow_run_id: str | None) -> VariablePool | None:
if not workflow_run_id:
return None
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_factory)
pause_entity = workflow_run_repo.get_workflow_pause(workflow_run_id)
if pause_entity is None:
logger.info("No pause state found for workflow run %s", workflow_run_id)
return None
try:
resumption_context = WorkflowResumptionContext.loads(pause_entity.get_state().decode())
except Exception:
logger.exception("Failed to load resumption context for workflow run %s", workflow_run_id)
return None
graph_runtime_state = GraphRuntimeState.from_snapshot(resumption_context.serialized_graph_runtime_state)
return graph_runtime_state.variable_pool
def _open_session(session_factory: sessionmaker | Session | None):
if session_factory is None:
return Session(db.engine)
if isinstance(session_factory, Session):
return session_factory
return session_factory()
@shared_task(queue="mail")
def dispatch_human_input_email_task(form_id: str, node_title: str | None = None, session_factory=None):
if not mail.is_inited():
return
logger.info(click.style(f"Start human input email delivery for form {form_id}", fg="green"))
start_at = time.perf_counter()
try:
with _open_session(session_factory) as session:
form = session.get(HumanInputForm, form_id)
if form is None:
logger.warning("Human input form not found, form_id=%s", form_id)
return
features = FeatureService.get_features(form.tenant_id)
if not features.human_input_email_delivery_enabled:
logger.info(
"Human input email delivery is not available for tenant=%s, form_id=%s",
form.tenant_id,
form_id,
)
return
jobs = _load_email_jobs(session, form)
variable_pool = _load_variable_pool(form.workflow_run_id)
for job in jobs:
for recipient in job.recipients:
form_link = _build_form_link(recipient.token)
body = _render_body(job.body, form_link, variable_pool=variable_pool)
subject = EmailDeliveryConfig.sanitize_subject(job.subject)
mail.send(
to=recipient.email,
subject=subject,
html=body,
)
end_at = time.perf_counter()
logger.info(
click.style(
f"Human input email delivery succeeded for form {form_id}: latency: {end_at - start_at}", fg="green"
)
)
except Exception:
logger.exception("Send human input email failed, form_id=%s", form_id)