From 915023b8095316a2ce93994a0fc425f3cd4bf102 Mon Sep 17 00:00:00 2001 From: zxhlyh Date: Thu, 25 Sep 2025 18:02:43 +0800 Subject: [PATCH] Chore/remove add node restrict of workflow (#26218) Co-authored-by: -LAN- --- api/.env.example | 1 - api/configs/feature/__init__.py | 5 - api/controllers/console/app/workflow.py | 19 --- .../rag_pipeline/rag_pipeline_workflow.py | 17 -- api/tests/integration_tests/.env.example | 1 - .../unit_tests/configs/test_dify_config.py | 2 - docker/.env.example | 1 - docker/docker-compose.yaml | 1 - .../rag-pipeline/hooks/use-pipeline-config.ts | 10 -- .../workflow-app/hooks/use-workflow-init.ts | 7 - web/app/components/workflow/constants.ts | 2 - .../workflow/hooks/use-nodes-interactions.ts | 45 ++--- .../components/workflow/hooks/use-workflow.ts | 54 +----- web/app/components/workflow/index.tsx | 2 - web/app/components/workflow/limit-tips.tsx | 39 ----- .../nodes/_base/components/next-step/add.tsx | 9 +- .../nodes/_base/components/node-handle.tsx | 7 +- .../workflow/store/workflow/workflow-slice.ts | 8 - web/app/components/workflow/utils/workflow.ts | 156 ------------------ 19 files changed, 17 insertions(+), 369 deletions(-) delete mode 100644 web/app/components/workflow/limit-tips.tsx diff --git a/api/.env.example b/api/.env.example index 64e79bf0b8..d53de3779b 100644 --- a/api/.env.example +++ b/api/.env.example @@ -468,7 +468,6 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000 WORKFLOW_MAX_EXECUTION_STEPS=500 WORKFLOW_MAX_EXECUTION_TIME=1200 WORKFLOW_CALL_MAX_DEPTH=5 -WORKFLOW_PARALLEL_DEPTH_LIMIT=3 MAX_VARIABLE_SIZE=204800 # GraphEngine Worker Pool Configuration diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index e836059ca6..363cf4e2b5 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -577,11 +577,6 @@ class WorkflowConfig(BaseSettings): default=5, ) - WORKFLOW_PARALLEL_DEPTH_LIMIT: PositiveInt = Field( - description="Maximum allowed depth for nested parallel executions", - default=3, - ) - MAX_VARIABLE_SIZE: PositiveInt = Field( description="Maximum size in bytes for a single variable in workflows. Default to 200 KB.", default=200 * 1024, diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index e70765546c..1f5cbbeca5 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -9,7 +9,6 @@ from sqlalchemy.orm import Session from werkzeug.exceptions import Forbidden, InternalServerError, NotFound import services -from configs import dify_config from controllers.console import api, console_ns from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync from controllers.console.app.wraps import get_app_model @@ -797,24 +796,6 @@ class ConvertToWorkflowApi(Resource): } -@console_ns.route("/apps//workflows/draft/config") -class WorkflowConfigApi(Resource): - """Resource for workflow configuration.""" - - @api.doc("get_workflow_config") - @api.doc(description="Get workflow configuration") - @api.doc(params={"app_id": "Application ID"}) - @api.response(200, "Workflow configuration retrieved successfully") - @setup_required - @login_required - @account_initialization_required - @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - def get(self, app_model: App): - return { - "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT, - } - - @console_ns.route("/apps//workflows") class PublishedAllWorkflowApi(Resource): @api.doc("get_all_published_workflows") diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index d00be3a573..01ddb8a871 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -9,7 +9,6 @@ from sqlalchemy.orm import Session from werkzeug.exceptions import Forbidden, InternalServerError, NotFound import services -from configs import dify_config from controllers.console import api from controllers.console.app.error import ( ConversationCompletedError, @@ -609,18 +608,6 @@ class DefaultRagPipelineBlockConfigApi(Resource): return rag_pipeline_service.get_default_block_config(node_type=block_type, filters=filters) -class RagPipelineConfigApi(Resource): - """Resource for rag pipeline configuration.""" - - @setup_required - @login_required - @account_initialization_required - def get(self, pipeline_id): - return { - "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT, - } - - class PublishedAllRagPipelineApi(Resource): @setup_required @login_required @@ -985,10 +972,6 @@ api.add_resource( DraftRagPipelineApi, "/rag/pipelines//workflows/draft", ) -api.add_resource( - RagPipelineConfigApi, - "/rag/pipelines//workflows/draft/config", -) api.add_resource( DraftRagPipelineRunApi, "/rag/pipelines//workflows/draft/run", diff --git a/api/tests/integration_tests/.env.example b/api/tests/integration_tests/.env.example index 92df93fb13..23a0ecf714 100644 --- a/api/tests/integration_tests/.env.example +++ b/api/tests/integration_tests/.env.example @@ -167,7 +167,6 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000 WORKFLOW_MAX_EXECUTION_STEPS=500 WORKFLOW_MAX_EXECUTION_TIME=1200 WORKFLOW_CALL_MAX_DEPTH=5 -WORKFLOW_PARALLEL_DEPTH_LIMIT=3 MAX_VARIABLE_SIZE=204800 # App configuration diff --git a/api/tests/unit_tests/configs/test_dify_config.py b/api/tests/unit_tests/configs/test_dify_config.py index fbe14f1cb5..f4e3d97719 100644 --- a/api/tests/unit_tests/configs/test_dify_config.py +++ b/api/tests/unit_tests/configs/test_dify_config.py @@ -40,8 +40,6 @@ def test_dify_config(monkeypatch: pytest.MonkeyPatch): # annotated field with configured value assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30 - assert config.WORKFLOW_PARALLEL_DEPTH_LIMIT == 3 - # values from pyproject.toml assert Version(config.project.version) >= Version("1.0.0") diff --git a/docker/.env.example b/docker/.env.example index eebc18118f..c0f084796e 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -881,7 +881,6 @@ WORKFLOW_MAX_EXECUTION_STEPS=500 WORKFLOW_MAX_EXECUTION_TIME=1200 WORKFLOW_CALL_MAX_DEPTH=5 MAX_VARIABLE_SIZE=204800 -WORKFLOW_PARALLEL_DEPTH_LIMIT=3 WORKFLOW_FILE_UPLOAD_LIMIT=10 # GraphEngine Worker Pool Configuration diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index dd3d42c0f7..2617f84e7d 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -402,7 +402,6 @@ x-shared-env: &shared-api-worker-env WORKFLOW_MAX_EXECUTION_TIME: ${WORKFLOW_MAX_EXECUTION_TIME:-1200} WORKFLOW_CALL_MAX_DEPTH: ${WORKFLOW_CALL_MAX_DEPTH:-5} MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800} - WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3} WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10} GRAPH_ENGINE_MIN_WORKERS: ${GRAPH_ENGINE_MIN_WORKERS:-1} GRAPH_ENGINE_MAX_WORKERS: ${GRAPH_ENGINE_MAX_WORKERS:-10} diff --git a/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts b/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts index 5f0daf29ce..38168d1e93 100644 --- a/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts +++ b/web/app/components/rag-pipeline/hooks/use-pipeline-config.ts @@ -14,16 +14,6 @@ export const usePipelineConfig = () => { const pipelineId = useStore(s => s.pipelineId) const workflowStore = useWorkflowStore() - const handleUpdateWorkflowConfig = useCallback((config: Record) => { - const { setWorkflowConfig } = workflowStore.getState() - - setWorkflowConfig(config) - }, [workflowStore]) - useWorkflowConfig( - pipelineId ? `/rag/pipelines/${pipelineId}/workflows/draft/config` : '', - handleUpdateWorkflowConfig, - ) - const handleUpdateNodesDefaultConfigs = useCallback((nodesDefaultConfigs: Record | Record[]) => { const { setNodesDefaultConfigs } = workflowStore.getState() let res: Record = {} diff --git a/web/app/components/workflow-app/hooks/use-workflow-init.ts b/web/app/components/workflow-app/hooks/use-workflow-init.ts index e0c341d087..fadd2007bc 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-init.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-init.ts @@ -33,13 +33,6 @@ export const useWorkflowInit = () => { workflowStore.setState({ appId: appDetail.id, appName: appDetail.name }) }, [appDetail.id, workflowStore]) - const handleUpdateWorkflowConfig = useCallback((config: Record) => { - const { setWorkflowConfig } = workflowStore.getState() - - setWorkflowConfig(config) - }, [workflowStore]) - useWorkflowConfig(`/apps/${appDetail.id}/workflows/draft/config`, handleUpdateWorkflowConfig) - const handleUpdateWorkflowFileUploadConfig = useCallback((config: FileUploadConfigResponse) => { const { setFileUploadConfig } = workflowStore.getState() setFileUploadConfig(config) diff --git a/web/app/components/workflow/constants.ts b/web/app/components/workflow/constants.ts index 875d2acf8f..a8c6a458fc 100644 --- a/web/app/components/workflow/constants.ts +++ b/web/app/components/workflow/constants.ts @@ -35,8 +35,6 @@ export const NODE_LAYOUT_HORIZONTAL_PADDING = 60 export const NODE_LAYOUT_VERTICAL_PADDING = 60 export const NODE_LAYOUT_MIN_DISTANCE = 100 -export const PARALLEL_DEPTH_LIMIT = 3 - export const RETRIEVAL_OUTPUT_STRUCT = `{ "content": "", "title": "", diff --git a/web/app/components/workflow/hooks/use-nodes-interactions.ts b/web/app/components/workflow/hooks/use-nodes-interactions.ts index 4000ce5c7b..c721442d86 100644 --- a/web/app/components/workflow/hooks/use-nodes-interactions.ts +++ b/web/app/components/workflow/hooks/use-nodes-interactions.ts @@ -70,7 +70,7 @@ export const useNodesInteractions = () => { const reactflow = useReactFlow() const { store: workflowHistoryStore } = useWorkflowHistoryStore() const { handleSyncWorkflowDraft } = useNodesSyncDraft() - const { checkNestedParallelLimit, getAfterNodesInSameBranch } = useWorkflow() + const { getAfterNodesInSameBranch } = useWorkflow() const { getNodesReadOnly } = useNodesReadOnly() const { getWorkflowReadOnly } = useWorkflowReadOnly() const { handleSetHelpline } = useHelpline() @@ -436,21 +436,13 @@ export const useNodesInteractions = () => { draft.push(newEdge) }) - if (checkNestedParallelLimit(newNodes, newEdges, targetNode)) { - setNodes(newNodes) - setEdges(newEdges) + setNodes(newNodes) + setEdges(newEdges) - handleSyncWorkflowDraft() - saveStateToHistory(WorkflowHistoryEvent.NodeConnect, { - nodeId: targetNode?.id, - }) - } - else { - const { setConnectingNodePayload, setEnteringNodePayload } - = workflowStore.getState() - setConnectingNodePayload(undefined) - setEnteringNodePayload(undefined) - } + handleSyncWorkflowDraft() + saveStateToHistory(WorkflowHistoryEvent.NodeConnect, { + nodeId: targetNode?.id, + }) }, [ getNodesReadOnly, @@ -458,7 +450,6 @@ export const useNodesInteractions = () => { workflowStore, handleSyncWorkflowDraft, saveStateToHistory, - checkNestedParallelLimit, ], ) @@ -934,13 +925,8 @@ export const useNodesInteractions = () => { if (newEdge) draft.push(newEdge) }) - if (checkNestedParallelLimit(newNodes, newEdges, prevNode)) { - setNodes(newNodes) - setEdges(newEdges) - } - else { - return false - } + setNodes(newNodes) + setEdges(newEdges) } if (!prevNodeId && nextNodeId) { const nextNodeIndex = nodes.findIndex(node => node.id === nextNodeId) @@ -1087,17 +1073,11 @@ export const useNodesInteractions = () => { draft.push(newEdge) }) - if (checkNestedParallelLimit(newNodes, newEdges, nextNode)) { - setNodes(newNodes) - setEdges(newEdges) - } - else { - return false - } + setNodes(newNodes) + setEdges(newEdges) } else { - if (checkNestedParallelLimit(newNodes, edges)) setNodes(newNodes) - else return false + setNodes(newNodes) } } if (prevNodeId && nextNodeId) { @@ -1297,7 +1277,6 @@ export const useNodesInteractions = () => { saveStateToHistory, workflowStore, getAfterNodesInSameBranch, - checkNestedParallelLimit, nodesMetaDataMap, ], ) diff --git a/web/app/components/workflow/hooks/use-workflow.ts b/web/app/components/workflow/hooks/use-workflow.ts index 1fc1eedffa..02a2f09d63 100644 --- a/web/app/components/workflow/hooks/use-workflow.ts +++ b/web/app/components/workflow/hooks/use-workflow.ts @@ -2,7 +2,6 @@ import { useCallback, } from 'react' import { uniqBy } from 'lodash-es' -import { useTranslation } from 'react-i18next' import { getIncomers, getOutgoers, @@ -24,9 +23,7 @@ import { useStore, useWorkflowStore, } from '../store' -import { getParallelInfo } from '../utils' import { - PARALLEL_DEPTH_LIMIT, SUPPORT_OUTPUT_VARS_NODE, } from '../constants' import type { IterationNodeType } from '../nodes/iteration/types' @@ -44,7 +41,6 @@ import { import { CUSTOM_ITERATION_START_NODE } from '@/app/components/workflow/nodes/iteration-start/constants' import { CUSTOM_LOOP_START_NODE } from '@/app/components/workflow/nodes/loop-start/constants' import { basePath } from '@/utils/var' -import { MAX_PARALLEL_LIMIT } from '@/config' import { useNodesMetaData } from '.' export const useIsChatMode = () => { @@ -54,9 +50,7 @@ export const useIsChatMode = () => { } export const useWorkflow = () => { - const { t } = useTranslation() const store = useStoreApi() - const workflowStore = useWorkflowStore() const { getAvailableBlocks } = useAvailableBlocks() const { nodesMap } = useNodesMetaData() @@ -290,20 +284,6 @@ export const useWorkflow = () => { return isUsed }, [isVarUsedInNodes]) - const checkParallelLimit = useCallback((nodeId: string, nodeHandle = 'source') => { - const { - edges, - } = store.getState() - const connectedEdges = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === nodeHandle) - if (connectedEdges.length > MAX_PARALLEL_LIMIT - 1) { - const { setShowTips } = workflowStore.getState() - setShowTips(t('workflow.common.parallelTip.limit', { num: MAX_PARALLEL_LIMIT })) - return false - } - - return true - }, [store, workflowStore, t]) - const getRootNodesById = useCallback((nodeId: string) => { const { getNodes, @@ -374,33 +354,6 @@ export const useWorkflow = () => { return startNodes }, [nodesMap, getRootNodesById]) - const checkNestedParallelLimit = useCallback((nodes: Node[], edges: Edge[], targetNode?: Node) => { - const startNodes = getStartNodes(nodes, targetNode) - - for (let i = 0; i < startNodes.length; i++) { - const { - parallelList, - hasAbnormalEdges, - } = getParallelInfo(startNodes[i], nodes, edges) - const { workflowConfig } = workflowStore.getState() - - if (hasAbnormalEdges) - return false - - for (let i = 0; i < parallelList.length; i++) { - const parallel = parallelList[i] - - if (parallel.depth > (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT)) { - const { setShowTips } = workflowStore.getState() - setShowTips(t('workflow.common.parallelTip.depthLimit', { num: (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT) })) - return false - } - } - } - - return true - }, [t, workflowStore, getStartNodes]) - const isValidConnection = useCallback(({ source, sourceHandle, target }: Connection) => { const { edges, @@ -410,9 +363,6 @@ export const useWorkflow = () => { const sourceNode: Node = nodes.find(node => node.id === source)! const targetNode: Node = nodes.find(node => node.id === target)! - if (!checkParallelLimit(source!, sourceHandle || 'source')) - return false - if (sourceNode.type === CUSTOM_NOTE_NODE || targetNode.type === CUSTOM_NOTE_NODE) return false @@ -445,7 +395,7 @@ export const useWorkflow = () => { } return !hasCycle(targetNode) - }, [store, checkParallelLimit, getAvailableBlocks]) + }, [store, getAvailableBlocks]) return { getNodeById, @@ -457,8 +407,6 @@ export const useWorkflow = () => { isVarUsedInNodes, removeUsedVarInNodes, isNodeVarsUsedInNodes, - checkParallelLimit, - checkNestedParallelLimit, isValidConnection, getBeforeNodeById, getIterationNodeChildren, diff --git a/web/app/components/workflow/index.tsx b/web/app/components/workflow/index.tsx index 1c0c6d4545..75c4d51390 100644 --- a/web/app/components/workflow/index.tsx +++ b/web/app/components/workflow/index.tsx @@ -71,7 +71,6 @@ import PanelContextmenu from './panel-contextmenu' import NodeContextmenu from './node-contextmenu' import SelectionContextmenu from './selection-contextmenu' import SyncingDataModal from './syncing-data-modal' -import LimitTips from './limit-tips' import { setupScrollToNodeListener } from './utils/node-navigation' import { useStore, @@ -378,7 +377,6 @@ export const Workflow: FC = memo(({ /> ) } - {children} { - const showTips = useStore(s => s.showTips) - const setShowTips = useStore(s => s.setShowTips) - - if (!showTips) - return null - - return ( -
-
-
- -
-
- {showTips} -
- setShowTips('')} - > - - -
- ) -} - -export default LimitTips diff --git a/web/app/components/workflow/nodes/_base/components/next-step/add.tsx b/web/app/components/workflow/nodes/_base/components/next-step/add.tsx index 4add079fa2..601bc8ea75 100644 --- a/web/app/components/workflow/nodes/_base/components/next-step/add.tsx +++ b/web/app/components/workflow/nodes/_base/components/next-step/add.tsx @@ -12,7 +12,6 @@ import { useAvailableBlocks, useNodesInteractions, useNodesReadOnly, - useWorkflow, } from '@/app/components/workflow/hooks' import BlockSelector from '@/app/components/workflow/block-selector' import type { @@ -39,7 +38,6 @@ const Add = ({ const { handleNodeAdd } = useNodesInteractions() const { nodesReadOnly } = useNodesReadOnly() const { availableNextBlocks } = useAvailableBlocks(nodeData.type, nodeData.isInIteration || nodeData.isInLoop) - const { checkParallelLimit } = useWorkflow() const handleSelect = useCallback((type, toolDefaultValue) => { handleNodeAdd( @@ -52,14 +50,11 @@ const Add = ({ prevNodeSourceHandle: sourceHandle, }, ) - }, [nodeId, sourceHandle, handleNodeAdd]) + }, [handleNodeAdd]) const handleOpenChange = useCallback((newOpen: boolean) => { - if (newOpen && !checkParallelLimit(nodeId, sourceHandle)) - return - setOpen(newOpen) - }, [checkParallelLimit, nodeId, sourceHandle]) + }, []) const tip = useMemo(() => { if (isFailBranch) diff --git a/web/app/components/workflow/nodes/_base/components/node-handle.tsx b/web/app/components/workflow/nodes/_base/components/node-handle.tsx index 907c3b2c07..d1d79a0faa 100644 --- a/web/app/components/workflow/nodes/_base/components/node-handle.tsx +++ b/web/app/components/workflow/nodes/_base/components/node-handle.tsx @@ -22,7 +22,6 @@ import { useIsChatMode, useNodesInteractions, useNodesReadOnly, - useWorkflow, } from '../../../hooks' import { useStore, @@ -132,7 +131,6 @@ export const NodeSourceHandle = memo(({ const { availableNextBlocks } = useAvailableBlocks(data.type, data.isInIteration || data.isInLoop) const isConnectable = !!availableNextBlocks.length const isChatMode = useIsChatMode() - const { checkParallelLimit } = useWorkflow() const connected = data._connectedSourceHandleIds?.includes(handleId) const handleOpenChange = useCallback((v: boolean) => { @@ -140,9 +138,8 @@ export const NodeSourceHandle = memo(({ }, []) const handleHandleClick = useCallback((e: MouseEvent) => { e.stopPropagation() - if (checkParallelLimit(id, handleId)) - setOpen(v => !v) - }, [checkParallelLimit, id, handleId]) + setOpen(v => !v) + }, []) const handleSelect = useCallback((type: BlockEnum, toolDefaultValue?: ToolDefaultValue) => { handleNodeAdd( { diff --git a/web/app/components/workflow/store/workflow/workflow-slice.ts b/web/app/components/workflow/store/workflow/workflow-slice.ts index 02a4db4c17..91dac42adb 100644 --- a/web/app/components/workflow/store/workflow/workflow-slice.ts +++ b/web/app/components/workflow/store/workflow/workflow-slice.ts @@ -29,10 +29,6 @@ export type WorkflowSliceShape = { setControlPromptEditorRerenderKey: (controlPromptEditorRerenderKey: number) => void showImportDSLModal: boolean setShowImportDSLModal: (showImportDSLModal: boolean) => void - showTips: string - setShowTips: (showTips: string) => void - workflowConfig?: Record - setWorkflowConfig: (workflowConfig: Record) => void fileUploadConfig?: FileUploadConfigResponse setFileUploadConfig: (fileUploadConfig: FileUploadConfigResponse) => void } @@ -59,10 +55,6 @@ export const createWorkflowSlice: StateCreator = set => ({ setControlPromptEditorRerenderKey: controlPromptEditorRerenderKey => set(() => ({ controlPromptEditorRerenderKey })), showImportDSLModal: false, setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })), - showTips: '', - setShowTips: showTips => set(() => ({ showTips })), - workflowConfig: undefined, - setWorkflowConfig: workflowConfig => set(() => ({ workflowConfig })), fileUploadConfig: undefined, setFileUploadConfig: fileUploadConfig => set(() => ({ fileUploadConfig })), }) diff --git a/web/app/components/workflow/utils/workflow.ts b/web/app/components/workflow/utils/workflow.ts index fd0c30e5cf..48cb819086 100644 --- a/web/app/components/workflow/utils/workflow.ts +++ b/web/app/components/workflow/utils/workflow.ts @@ -1,12 +1,8 @@ import { - getConnectedEdges, - getIncomers, getOutgoers, } from 'reactflow' import { v4 as uuid4 } from 'uuid' import { - groupBy, - isEqual, uniqBy, } from 'lodash-es' import type { @@ -168,158 +164,6 @@ export const changeNodesAndEdgesId = (nodes: Node[], edges: Edge[]) => { return [newNodes, newEdges] as [Node[], Edge[]] } -type ParallelInfoItem = { - parallelNodeId: string - depth: number - isBranch?: boolean -} -type NodeParallelInfo = { - parallelNodeId: string - edgeHandleId: string - depth: number -} -type NodeHandle = { - node: Node - handle: string -} -type NodeStreamInfo = { - upstreamNodes: Set - downstreamEdges: Set -} -export const getParallelInfo = (startNode: Node, nodes: Node[], edges: Edge[]) => { - if (!startNode) - throw new Error('Start node not found') - - const parallelList = [] as ParallelInfoItem[] - const nextNodeHandles = [{ node: startNode, handle: 'source' }] - let hasAbnormalEdges = false - - const traverse = (firstNodeHandle: NodeHandle) => { - const nodeEdgesSet = {} as Record> - const totalEdgesSet = new Set() - const nextHandles = [firstNodeHandle] - const streamInfo = {} as Record - const parallelListItem = { - parallelNodeId: '', - depth: 0, - } as ParallelInfoItem - const nodeParallelInfoMap = {} as Record - nodeParallelInfoMap[firstNodeHandle.node.id] = { - parallelNodeId: '', - edgeHandleId: '', - depth: 0, - } - - while (nextHandles.length) { - const currentNodeHandle = nextHandles.shift()! - const { node: currentNode, handle: currentHandle = 'source' } = currentNodeHandle - const currentNodeHandleKey = currentNode.id - const connectedEdges = edges.filter(edge => edge.source === currentNode.id && edge.sourceHandle === currentHandle) - const connectedEdgesLength = connectedEdges.length - const outgoers = nodes.filter(node => connectedEdges.some(edge => edge.target === node.id)) - const incomers = getIncomers(currentNode, nodes, edges) - - if (!streamInfo[currentNodeHandleKey]) { - streamInfo[currentNodeHandleKey] = { - upstreamNodes: new Set(), - downstreamEdges: new Set(), - } - } - - if (nodeEdgesSet[currentNodeHandleKey]?.size > 0 && incomers.length > 1) { - const newSet = new Set() - for (const item of totalEdgesSet) { - if (!streamInfo[currentNodeHandleKey].downstreamEdges.has(item)) - newSet.add(item) - } - if (isEqual(nodeEdgesSet[currentNodeHandleKey], newSet)) { - parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth - nextNodeHandles.push({ node: currentNode, handle: currentHandle }) - break - } - } - - if (nodeParallelInfoMap[currentNode.id].depth > parallelListItem.depth) - parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth - - outgoers.forEach((outgoer) => { - const outgoerConnectedEdges = getConnectedEdges([outgoer], edges).filter(edge => edge.source === outgoer.id) - const sourceEdgesGroup = groupBy(outgoerConnectedEdges, 'sourceHandle') - const incomers = getIncomers(outgoer, nodes, edges) - - if (outgoers.length > 1 && incomers.length > 1) - hasAbnormalEdges = true - - Object.keys(sourceEdgesGroup).forEach((sourceHandle) => { - nextHandles.push({ node: outgoer, handle: sourceHandle }) - }) - if (!outgoerConnectedEdges.length) - nextHandles.push({ node: outgoer, handle: 'source' }) - - const outgoerKey = outgoer.id - if (!nodeEdgesSet[outgoerKey]) - nodeEdgesSet[outgoerKey] = new Set() - - if (nodeEdgesSet[currentNodeHandleKey]) { - for (const item of nodeEdgesSet[currentNodeHandleKey]) - nodeEdgesSet[outgoerKey].add(item) - } - - if (!streamInfo[outgoerKey]) { - streamInfo[outgoerKey] = { - upstreamNodes: new Set(), - downstreamEdges: new Set(), - } - } - - if (!nodeParallelInfoMap[outgoer.id]) { - nodeParallelInfoMap[outgoer.id] = { - ...nodeParallelInfoMap[currentNode.id], - } - } - - if (connectedEdgesLength > 1) { - const edge = connectedEdges.find(edge => edge.target === outgoer.id)! - nodeEdgesSet[outgoerKey].add(edge.id) - totalEdgesSet.add(edge.id) - - streamInfo[currentNodeHandleKey].downstreamEdges.add(edge.id) - streamInfo[outgoerKey].upstreamNodes.add(currentNodeHandleKey) - - for (const item of streamInfo[currentNodeHandleKey].upstreamNodes) - streamInfo[item].downstreamEdges.add(edge.id) - - if (!parallelListItem.parallelNodeId) - parallelListItem.parallelNodeId = currentNode.id - - const prevDepth = nodeParallelInfoMap[currentNode.id].depth + 1 - const currentDepth = nodeParallelInfoMap[outgoer.id].depth - - nodeParallelInfoMap[outgoer.id].depth = Math.max(prevDepth, currentDepth) - } - else { - for (const item of streamInfo[currentNodeHandleKey].upstreamNodes) - streamInfo[outgoerKey].upstreamNodes.add(item) - - nodeParallelInfoMap[outgoer.id].depth = nodeParallelInfoMap[currentNode.id].depth - } - }) - } - - parallelList.push(parallelListItem) - } - - while (nextNodeHandles.length) { - const nodeHandle = nextNodeHandles.shift()! - traverse(nodeHandle) - } - - return { - parallelList, - hasAbnormalEdges, - } -} - export const hasErrorHandleNode = (nodeType?: BlockEnum) => { return nodeType === BlockEnum.LLM || nodeType === BlockEnum.Tool || nodeType === BlockEnum.HttpRequest || nodeType === BlockEnum.Code }