mirror of https://github.com/langgenius/dify.git
add webhook node draft single run
This commit is contained in:
parent
3370736e09
commit
d05d11e67f
|
|
@ -31,7 +31,7 @@ from libs.login import current_user, login_required
|
|||
from models import App
|
||||
from models.account import Account
|
||||
from models.model import AppMode
|
||||
from models.workflow import Workflow
|
||||
from models.workflow import NodeType, Workflow
|
||||
from services.app_generate_service import AppGenerateService
|
||||
from services.errors.app import WorkflowHashNotEqualError
|
||||
from services.errors.llm import InvokeRateLimitError
|
||||
|
|
@ -1234,6 +1234,68 @@ class DraftWorkflowTriggerWebhookRunApi(Resource):
|
|||
), 500
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/debug/webhook/run")
|
||||
class DraftWorkflowNodeWebhookDebugRunApi(Resource):
|
||||
"""Single node debug when the node is a webhook trigger."""
|
||||
|
||||
@api.doc("draft_workflow_node_webhook_debug_run")
|
||||
@api.doc(description="Poll for webhook debug payload and execute single node when event arrives")
|
||||
@api.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
|
||||
@api.response(200, "Node executed successfully")
|
||||
@api.response(403, "Permission denied")
|
||||
@api.response(400, "Invalid node type")
|
||||
@api.response(500, "Internal server error")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.WORKFLOW])
|
||||
def post(self, app_model: App, node_id: str):
|
||||
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
|
||||
raise Forbidden()
|
||||
|
||||
event = WebhookDebugService.poll_event(
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
node_id=node_id,
|
||||
)
|
||||
|
||||
if not event:
|
||||
return jsonable_encoder({"status": "waiting", "retry_in": 2000})
|
||||
|
||||
workflow_service = WorkflowService()
|
||||
draft_workflow = workflow_service.get_draft_workflow(app_model=app_model)
|
||||
|
||||
if not draft_workflow:
|
||||
raise DraftWorkflowNotExist()
|
||||
|
||||
node_config = draft_workflow.get_node_config_by_id(node_id)
|
||||
node_type = Workflow.get_node_type_from_node_config(node_config)
|
||||
if node_type != NodeType.TRIGGER_WEBHOOK:
|
||||
return jsonable_encoder({
|
||||
"status": "error",
|
||||
"message": "node is not webhook trigger",
|
||||
}), 400
|
||||
|
||||
payload = event.payload or {}
|
||||
workflow_inputs = payload.get("inputs")
|
||||
if workflow_inputs is None:
|
||||
webhook_data = payload.get("webhook_data", {})
|
||||
workflow_inputs = WebhookService.build_workflow_inputs(webhook_data)
|
||||
|
||||
workflow_node_execution = workflow_service.run_draft_workflow_node(
|
||||
app_model=app_model,
|
||||
draft_workflow=draft_workflow,
|
||||
node_id=node_id,
|
||||
user_inputs=workflow_inputs or {},
|
||||
account=current_user,
|
||||
query="",
|
||||
files=[],
|
||||
)
|
||||
|
||||
return jsonable_encoder(workflow_node_execution)
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/schedule/run")
|
||||
class DraftWorkflowTriggerScheduleRunApi(Resource):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ from core.workflow.nodes import NodeType
|
|||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
|
||||
from core.workflow.nodes.start.entities import StartNodeData
|
||||
from core.workflow.nodes.trigger_webhook.entities import WebhookData
|
||||
from core.workflow.system_variable import SystemVariable
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
|
||||
|
|
@ -630,10 +631,13 @@ class WorkflowService:
|
|||
app=app_model,
|
||||
workflow=draft_workflow,
|
||||
)
|
||||
start_data = StartNodeData.model_validate(node_data)
|
||||
user_inputs = _rebuild_file_for_user_inputs_in_start_node(
|
||||
tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs
|
||||
)
|
||||
if node_type == NodeType.TRIGGER_WEBHOOK:
|
||||
start_data = WebhookData.model_validate(node_data)
|
||||
else:
|
||||
start_data = StartNodeData.model_validate(node_data)
|
||||
user_inputs = _rebuild_file_for_user_inputs_in_start_node(
|
||||
tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs
|
||||
)
|
||||
# init variable pool
|
||||
variable_pool = _setup_variable_pool(
|
||||
query=query,
|
||||
|
|
|
|||
|
|
@ -62,6 +62,9 @@ const singleRunFormParamsHooks: Record<BlockEnum, any> = {
|
|||
[BlockEnum.LoopEnd]: undefined,
|
||||
[BlockEnum.DataSource]: undefined,
|
||||
[BlockEnum.DataSourceEmpty]: undefined,
|
||||
[BlockEnum.TriggerWebhook]: undefined,
|
||||
[BlockEnum.TriggerSchedule]: undefined,
|
||||
[BlockEnum.TriggerPlugin]: undefined,
|
||||
}
|
||||
|
||||
const useSingleRunFormParamsHooks = (nodeType: BlockEnum) => {
|
||||
|
|
@ -97,6 +100,9 @@ const getDataForCheckMoreHooks: Record<BlockEnum, any> = {
|
|||
[BlockEnum.DataSource]: undefined,
|
||||
[BlockEnum.DataSourceEmpty]: undefined,
|
||||
[BlockEnum.KnowledgeBase]: undefined,
|
||||
[BlockEnum.TriggerWebhook]: undefined,
|
||||
[BlockEnum.TriggerSchedule]: undefined,
|
||||
[BlockEnum.TriggerPlugin]: undefined,
|
||||
}
|
||||
|
||||
const useGetDataForCheckMoreHooks = <T>(nodeType: BlockEnum) => {
|
||||
|
|
|
|||
|
|
@ -53,6 +53,8 @@ import { useInvalidLastRun } from '@/service/use-workflow'
|
|||
import useInspectVarsCrud from '../../../hooks/use-inspect-vars-crud'
|
||||
import type { FlowType } from '@/types/common'
|
||||
import useMatchSchemaType from '../components/variable/use-match-schema-type'
|
||||
import { API_PREFIX } from '@/config'
|
||||
import { getAccessToken, getBaseOptions } from '@/service/fetch'
|
||||
// eslint-disable-next-line ts/no-unsafe-function-type
|
||||
const checkValidFns: Record<BlockEnum, Function> = {
|
||||
[BlockEnum.LLM]: checkLLMValid,
|
||||
|
|
@ -198,6 +200,11 @@ const useOneStepRun = <T>({
|
|||
invalidateConversationVarValues,
|
||||
} = useInspectVarsCrud()
|
||||
const runningStatus = data._singleRunningStatus || NodeRunningStatus.NotStart
|
||||
const webhookSingleRunActiveRef = useRef(false)
|
||||
const webhookSingleRunAbortRef = useRef<AbortController | null>(null)
|
||||
const webhookSingleRunTimeoutRef = useRef<number | undefined>(undefined)
|
||||
const webhookSingleRunTokenRef = useRef(0)
|
||||
const webhookSingleRunDelayResolveRef = useRef<(() => void) | null>(null)
|
||||
const isPausedRef = useRef(isPaused)
|
||||
useEffect(() => {
|
||||
isPausedRef.current = isPaused
|
||||
|
|
@ -239,6 +246,122 @@ const useOneStepRun = <T>({
|
|||
},
|
||||
})
|
||||
}
|
||||
|
||||
const cancelWebhookSingleRun = useCallback(() => {
|
||||
webhookSingleRunActiveRef.current = false
|
||||
webhookSingleRunTokenRef.current += 1
|
||||
if (webhookSingleRunAbortRef.current)
|
||||
webhookSingleRunAbortRef.current.abort()
|
||||
webhookSingleRunAbortRef.current = null
|
||||
if (webhookSingleRunTimeoutRef.current !== undefined) {
|
||||
window.clearTimeout(webhookSingleRunTimeoutRef.current)
|
||||
webhookSingleRunTimeoutRef.current = undefined
|
||||
}
|
||||
if (webhookSingleRunDelayResolveRef.current) {
|
||||
webhookSingleRunDelayResolveRef.current()
|
||||
webhookSingleRunDelayResolveRef.current = null
|
||||
}
|
||||
}, [])
|
||||
|
||||
const runWebhookSingleRun = useCallback(async (): Promise<any | null> => {
|
||||
const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/debug/webhook/run`
|
||||
const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}`
|
||||
|
||||
webhookSingleRunActiveRef.current = true
|
||||
const token = ++webhookSingleRunTokenRef.current
|
||||
|
||||
while (webhookSingleRunActiveRef.current && token === webhookSingleRunTokenRef.current) {
|
||||
const controller = new AbortController()
|
||||
webhookSingleRunAbortRef.current = controller
|
||||
|
||||
try {
|
||||
const baseOptions = getBaseOptions()
|
||||
const headers = new Headers(baseOptions.headers as Headers)
|
||||
const accessToken = await getAccessToken()
|
||||
headers.set('Authorization', `Bearer ${accessToken}`)
|
||||
headers.set('Content-Type', 'application/json')
|
||||
|
||||
const response = await fetch(urlWithPrefix, {
|
||||
...baseOptions,
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({}),
|
||||
signal: controller.signal,
|
||||
})
|
||||
|
||||
if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current)
|
||||
return null
|
||||
|
||||
const contentType = response.headers.get('Content-Type')?.toLowerCase() || ''
|
||||
const responseData = contentType.includes('application/json') ? await response.json() : undefined
|
||||
|
||||
if (!response.ok) {
|
||||
const message = responseData?.message || 'Webhook debug failed'
|
||||
Toast.notify({ type: 'error', message })
|
||||
cancelWebhookSingleRun()
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
if (responseData?.status === 'waiting') {
|
||||
const delay = Number(responseData.retry_in) || 2000
|
||||
webhookSingleRunAbortRef.current = null
|
||||
if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current)
|
||||
return null
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const timeoutId = window.setTimeout(resolve, delay)
|
||||
webhookSingleRunTimeoutRef.current = timeoutId
|
||||
webhookSingleRunDelayResolveRef.current = resolve
|
||||
controller.signal.addEventListener('abort', () => {
|
||||
window.clearTimeout(timeoutId)
|
||||
resolve()
|
||||
}, { once: true })
|
||||
})
|
||||
|
||||
webhookSingleRunTimeoutRef.current = undefined
|
||||
webhookSingleRunDelayResolveRef.current = null
|
||||
continue
|
||||
}
|
||||
|
||||
if (responseData?.status === 'error') {
|
||||
const message = responseData.message || 'Webhook debug failed'
|
||||
Toast.notify({ type: 'error', message })
|
||||
cancelWebhookSingleRun()
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Running,
|
||||
},
|
||||
})
|
||||
|
||||
cancelWebhookSingleRun()
|
||||
return responseData
|
||||
}
|
||||
catch (error) {
|
||||
if (controller.signal.aborted && (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current))
|
||||
return null
|
||||
if (controller.signal.aborted)
|
||||
return null
|
||||
|
||||
console.error('handleRun: webhook debug polling error', error)
|
||||
Toast.notify({ type: 'error', message: 'Webhook debug request failed' })
|
||||
cancelWebhookSingleRun()
|
||||
if (error instanceof Error)
|
||||
throw error
|
||||
throw new Error(String(error))
|
||||
}
|
||||
finally {
|
||||
webhookSingleRunAbortRef.current = null
|
||||
}
|
||||
}
|
||||
|
||||
return null
|
||||
}, [flowId, id, data, handleNodeDataUpdate, cancelWebhookSingleRun])
|
||||
const checkValidWrap = () => {
|
||||
if (!checkValid)
|
||||
return { isValid: true, errorMessage: '' }
|
||||
|
|
@ -300,33 +423,53 @@ const useOneStepRun = <T>({
|
|||
const isCompleted = runningStatus === NodeRunningStatus.Succeeded || runningStatus === NodeRunningStatus.Failed
|
||||
|
||||
const handleRun = async (submitData: Record<string, any>) => {
|
||||
const isWebhookNode = data.type === BlockEnum.TriggerWebhook
|
||||
if (isWebhookNode)
|
||||
cancelWebhookSingleRun()
|
||||
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Running,
|
||||
_singleRunningStatus: isWebhookNode ? NodeRunningStatus.Waiting : NodeRunningStatus.Running,
|
||||
},
|
||||
})
|
||||
let res: any
|
||||
let hasError = false
|
||||
try {
|
||||
if (!isIteration && !isLoop) {
|
||||
const isStartNode = data.type === BlockEnum.Start
|
||||
const postData: Record<string, any> = {}
|
||||
if (isStartNode) {
|
||||
const { '#sys.query#': query, '#sys.files#': files, ...inputs } = submitData
|
||||
if (isChatMode)
|
||||
postData.conversation_id = ''
|
||||
|
||||
postData.inputs = inputs
|
||||
postData.query = query
|
||||
postData.files = files || []
|
||||
if (isWebhookNode) {
|
||||
res = await runWebhookSingleRun()
|
||||
if (!res) {
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.NotStart,
|
||||
},
|
||||
})
|
||||
return false
|
||||
}
|
||||
}
|
||||
else {
|
||||
postData.inputs = submitData
|
||||
const isStartNode = data.type === BlockEnum.Start
|
||||
const postData: Record<string, any> = {}
|
||||
if (isStartNode) {
|
||||
const { '#sys.query#': query, '#sys.files#': files, ...inputs } = submitData
|
||||
if (isChatMode)
|
||||
postData.conversation_id = ''
|
||||
|
||||
postData.inputs = inputs
|
||||
postData.query = query
|
||||
postData.files = files || []
|
||||
}
|
||||
else {
|
||||
postData.inputs = submitData
|
||||
}
|
||||
res = await singleNodeRun(flowType, flowId!, id, postData) as any
|
||||
}
|
||||
res = await singleNodeRun(flowType, flowId!, id, postData) as any
|
||||
}
|
||||
else if (isIteration) {
|
||||
setIterationRunResult([])
|
||||
|
|
@ -557,6 +700,8 @@ const useOneStepRun = <T>({
|
|||
}
|
||||
}
|
||||
finally {
|
||||
if (data.type === BlockEnum.TriggerWebhook)
|
||||
cancelWebhookSingleRun()
|
||||
if (!isPausedRef.current && !isIteration && !isLoop && res) {
|
||||
setRunResult({
|
||||
...res,
|
||||
|
|
@ -583,6 +728,7 @@ const useOneStepRun = <T>({
|
|||
}
|
||||
|
||||
const handleStop = () => {
|
||||
cancelWebhookSingleRun()
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ export const canRunBySingle = (nodeType: BlockEnum, isChildNode: boolean) => {
|
|||
|| nodeType === BlockEnum.VariableAggregator
|
||||
|| nodeType === BlockEnum.Assigner
|
||||
|| nodeType === BlockEnum.DataSource
|
||||
|| nodeType === BlockEnum.TriggerWebhook
|
||||
}
|
||||
|
||||
export const isSupportCustomRunForm = (nodeType: BlockEnum) => {
|
||||
|
|
|
|||
Loading…
Reference in New Issue