mirror of https://github.com/langgenius/dify.git
feat(workflow): enhance single run handling
This commit is contained in:
parent
334e5f19bf
commit
112b5f63dd
|
|
@ -238,6 +238,7 @@ const BasePanel: FC<BasePanelProps> = ({
|
|||
singleRunParams,
|
||||
nodeInfo,
|
||||
setRunInputData,
|
||||
handleStop,
|
||||
handleSingleRun,
|
||||
handleRunWithParams,
|
||||
getExistVarValuesInForms,
|
||||
|
|
@ -438,18 +439,10 @@ const BasePanel: FC<BasePanelProps> = ({
|
|||
<div
|
||||
className='mr-1 flex h-6 w-6 cursor-pointer items-center justify-center rounded-md hover:bg-state-base-hover'
|
||||
onClick={() => {
|
||||
if (isSingleRunning) {
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: undefined,
|
||||
},
|
||||
})
|
||||
}
|
||||
else {
|
||||
if (isSingleRunning)
|
||||
handleStop()
|
||||
else
|
||||
handleSingleRun()
|
||||
}
|
||||
}}
|
||||
>
|
||||
{
|
||||
|
|
|
|||
|
|
@ -205,6 +205,11 @@ const useOneStepRun = <T>({
|
|||
const webhookSingleRunTimeoutRef = useRef<number | undefined>(undefined)
|
||||
const webhookSingleRunTokenRef = useRef(0)
|
||||
const webhookSingleRunDelayResolveRef = useRef<(() => void) | null>(null)
|
||||
const pluginSingleRunActiveRef = useRef(false)
|
||||
const pluginSingleRunAbortRef = useRef<AbortController | null>(null)
|
||||
const pluginSingleRunTimeoutRef = useRef<number | undefined>(undefined)
|
||||
const pluginSingleRunTokenRef = useRef(0)
|
||||
const pluginSingleRunDelayResolveRef = useRef<(() => void) | null>(null)
|
||||
const isPausedRef = useRef(isPaused)
|
||||
useEffect(() => {
|
||||
isPausedRef.current = isPaused
|
||||
|
|
@ -263,6 +268,22 @@ const useOneStepRun = <T>({
|
|||
}
|
||||
}, [])
|
||||
|
||||
const cancelPluginSingleRun = useCallback(() => {
|
||||
pluginSingleRunActiveRef.current = false
|
||||
pluginSingleRunTokenRef.current += 1
|
||||
if (pluginSingleRunAbortRef.current)
|
||||
pluginSingleRunAbortRef.current.abort()
|
||||
pluginSingleRunAbortRef.current = null
|
||||
if (pluginSingleRunTimeoutRef.current !== undefined) {
|
||||
window.clearTimeout(pluginSingleRunTimeoutRef.current)
|
||||
pluginSingleRunTimeoutRef.current = undefined
|
||||
}
|
||||
if (pluginSingleRunDelayResolveRef.current) {
|
||||
pluginSingleRunDelayResolveRef.current()
|
||||
pluginSingleRunDelayResolveRef.current = null
|
||||
}
|
||||
}, [])
|
||||
|
||||
const runWebhookSingleRun = useCallback(async (): Promise<any | null> => {
|
||||
const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger/run`
|
||||
const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}`
|
||||
|
|
@ -335,7 +356,7 @@ const useOneStepRun = <T>({
|
|||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Running,
|
||||
_singleRunningStatus: NodeRunningStatus.Listening,
|
||||
},
|
||||
})
|
||||
|
||||
|
|
@ -367,12 +388,12 @@ const useOneStepRun = <T>({
|
|||
const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger/run`
|
||||
const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}`
|
||||
|
||||
webhookSingleRunActiveRef.current = true
|
||||
const token = ++webhookSingleRunTokenRef.current
|
||||
pluginSingleRunActiveRef.current = true
|
||||
const token = ++pluginSingleRunTokenRef.current
|
||||
|
||||
while (webhookSingleRunActiveRef.current && token === webhookSingleRunTokenRef.current) {
|
||||
while (pluginSingleRunActiveRef.current && token === pluginSingleRunTokenRef.current) {
|
||||
const controller = new AbortController()
|
||||
webhookSingleRunAbortRef.current = controller
|
||||
pluginSingleRunAbortRef.current = controller
|
||||
|
||||
try {
|
||||
const baseOptions = getBaseOptions()
|
||||
|
|
@ -388,7 +409,7 @@ const useOneStepRun = <T>({
|
|||
signal: controller.signal,
|
||||
})
|
||||
|
||||
if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current)
|
||||
if (!pluginSingleRunActiveRef.current || token !== pluginSingleRunTokenRef.current)
|
||||
return null
|
||||
|
||||
const contentType = response.headers.get('Content-Type')?.toLowerCase() || ''
|
||||
|
|
@ -397,35 +418,35 @@ const useOneStepRun = <T>({
|
|||
if (!response.ok) {
|
||||
const message = responseData?.message || 'Plugin debug failed'
|
||||
Toast.notify({ type: 'error', message })
|
||||
cancelWebhookSingleRun()
|
||||
cancelPluginSingleRun()
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
if (responseData?.status === 'waiting') {
|
||||
const delay = Number(responseData.retry_in) || 2000
|
||||
webhookSingleRunAbortRef.current = null
|
||||
if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current)
|
||||
pluginSingleRunAbortRef.current = null
|
||||
if (!pluginSingleRunActiveRef.current || token !== pluginSingleRunTokenRef.current)
|
||||
return null
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const timeoutId = window.setTimeout(resolve, delay)
|
||||
webhookSingleRunTimeoutRef.current = timeoutId
|
||||
webhookSingleRunDelayResolveRef.current = resolve
|
||||
pluginSingleRunTimeoutRef.current = timeoutId
|
||||
pluginSingleRunDelayResolveRef.current = resolve
|
||||
controller.signal.addEventListener('abort', () => {
|
||||
window.clearTimeout(timeoutId)
|
||||
resolve()
|
||||
}, { once: true })
|
||||
})
|
||||
|
||||
webhookSingleRunTimeoutRef.current = undefined
|
||||
webhookSingleRunDelayResolveRef.current = null
|
||||
pluginSingleRunTimeoutRef.current = undefined
|
||||
pluginSingleRunDelayResolveRef.current = null
|
||||
continue
|
||||
}
|
||||
|
||||
if (responseData?.status === 'error') {
|
||||
const message = responseData.message || 'Plugin debug failed'
|
||||
Toast.notify({ type: 'error', message })
|
||||
cancelWebhookSingleRun()
|
||||
cancelPluginSingleRun()
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
|
|
@ -434,33 +455,33 @@ const useOneStepRun = <T>({
|
|||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Running,
|
||||
_singleRunningStatus: NodeRunningStatus.Listening,
|
||||
},
|
||||
})
|
||||
|
||||
cancelWebhookSingleRun()
|
||||
cancelPluginSingleRun()
|
||||
return responseData
|
||||
}
|
||||
catch (error) {
|
||||
if (controller.signal.aborted && (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current))
|
||||
if (controller.signal.aborted && (!pluginSingleRunActiveRef.current || token !== pluginSingleRunTokenRef.current))
|
||||
return null
|
||||
if (controller.signal.aborted)
|
||||
return null
|
||||
|
||||
console.error('handleRun: plugin debug polling error', error)
|
||||
Toast.notify({ type: 'error', message: 'Plugin debug request failed' })
|
||||
cancelWebhookSingleRun()
|
||||
cancelPluginSingleRun()
|
||||
if (error instanceof Error)
|
||||
throw error
|
||||
throw new Error(String(error))
|
||||
}
|
||||
finally {
|
||||
webhookSingleRunAbortRef.current = null
|
||||
pluginSingleRunAbortRef.current = null
|
||||
}
|
||||
}
|
||||
|
||||
return null
|
||||
}, [flowId, id, data, handleNodeDataUpdate, cancelWebhookSingleRun])
|
||||
}, [flowId, id, data, handleNodeDataUpdate, cancelPluginSingleRun])
|
||||
|
||||
const checkValidWrap = () => {
|
||||
if (!checkValid)
|
||||
|
|
@ -527,15 +548,17 @@ const useOneStepRun = <T>({
|
|||
const isPluginNode = data.type === BlockEnum.TriggerPlugin
|
||||
const isTriggerNode = isWebhookNode || isPluginNode
|
||||
|
||||
if (isTriggerNode)
|
||||
if (isWebhookNode)
|
||||
cancelWebhookSingleRun()
|
||||
if (isPluginNode)
|
||||
cancelPluginSingleRun()
|
||||
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: isTriggerNode ? NodeRunningStatus.Waiting : NodeRunningStatus.Running,
|
||||
_singleRunningStatus: isTriggerNode ? NodeRunningStatus.Listening : NodeRunningStatus.Running,
|
||||
},
|
||||
})
|
||||
let res: any
|
||||
|
|
@ -545,28 +568,32 @@ const useOneStepRun = <T>({
|
|||
if (isWebhookNode) {
|
||||
res = await runWebhookSingleRun()
|
||||
if (!res) {
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.NotStart,
|
||||
},
|
||||
})
|
||||
if (webhookSingleRunActiveRef.current) {
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Stopped,
|
||||
},
|
||||
})
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
else if (isPluginNode) {
|
||||
res = await runPluginSingleRun()
|
||||
if (!res) {
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.NotStart,
|
||||
},
|
||||
})
|
||||
if (pluginSingleRunActiveRef.current) {
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Stopped,
|
||||
},
|
||||
})
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
@ -817,8 +844,10 @@ const useOneStepRun = <T>({
|
|||
}
|
||||
}
|
||||
finally {
|
||||
if (isTriggerNode)
|
||||
if (isWebhookNode)
|
||||
cancelWebhookSingleRun()
|
||||
if (isPluginNode)
|
||||
cancelPluginSingleRun()
|
||||
if (!isPausedRef.current && !isIteration && !isLoop && res) {
|
||||
setRunResult({
|
||||
...res,
|
||||
|
|
@ -846,11 +875,13 @@ const useOneStepRun = <T>({
|
|||
|
||||
const handleStop = () => {
|
||||
cancelWebhookSingleRun()
|
||||
cancelPluginSingleRun()
|
||||
handleNodeDataUpdate({
|
||||
id,
|
||||
data: {
|
||||
...data,
|
||||
_singleRunningStatus: NodeRunningStatus.NotStart,
|
||||
_isSingleRun: false,
|
||||
_singleRunningStatus: NodeRunningStatus.Stopped,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,34 +26,6 @@ const StatusPanel: FC<ResultProps> = ({
|
|||
const docLink = useDocLink()
|
||||
const isListening = useStore(s => s.isListening)
|
||||
|
||||
if (isListening) {
|
||||
return (
|
||||
<StatusContainer status={'running'}>
|
||||
<div className='flex'>
|
||||
<div className='max-w-[120px] flex-[33%]'>
|
||||
<div className='system-2xs-medium-uppercase mb-1 text-text-tertiary'>{t('runLog.resultPanel.status')}</div>
|
||||
<div className='system-xs-semibold-uppercase flex items-center gap-1 text-util-colors-blue-light-blue-light-600'>
|
||||
<Indicator color='blue' />
|
||||
<span>LISTENING</span>
|
||||
</div>
|
||||
</div>
|
||||
<div className='max-w-[152px] flex-[33%]'>
|
||||
<div className='system-2xs-medium-uppercase mb-1 text-text-tertiary'>{t('runLog.resultPanel.time')}</div>
|
||||
<div className='system-sm-medium flex items-center gap-1 text-text-secondary'>
|
||||
<div className='h-2 w-16 rounded-sm bg-text-quaternary' />
|
||||
</div>
|
||||
</div>
|
||||
<div className='flex-[33%]'>
|
||||
<div className='system-2xs-medium-uppercase mb-1 text-text-tertiary'>{t('runLog.resultPanel.tokens')}</div>
|
||||
<div className='system-sm-medium flex items-center gap-1 text-text-secondary'>
|
||||
<div className='h-2 w-20 rounded-sm bg-text-quaternary' />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</StatusContainer>
|
||||
)
|
||||
}
|
||||
|
||||
return (
|
||||
<StatusContainer status={status}>
|
||||
<div className='flex'>
|
||||
|
|
@ -75,7 +47,7 @@ const StatusPanel: FC<ResultProps> = ({
|
|||
{status === 'running' && (
|
||||
<>
|
||||
<Indicator color={'blue'} />
|
||||
<span>Running</span>
|
||||
<span>{isListening ? 'Listening' : 'Running'}</span>
|
||||
</>
|
||||
)}
|
||||
{status === 'succeeded' && (
|
||||
|
|
|
|||
|
|
@ -360,6 +360,7 @@ export enum WorkflowVersion {
|
|||
export enum NodeRunningStatus {
|
||||
NotStart = 'not-start',
|
||||
Waiting = 'waiting',
|
||||
Listening = 'listening',
|
||||
Running = 'running',
|
||||
Succeeded = 'succeeded',
|
||||
Failed = 'failed',
|
||||
|
|
|
|||
Loading…
Reference in New Issue