diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index a05a52355c..361e2012d4 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -1,5 +1,6 @@ import logging import queue +import threading import time from collections.abc import Generator from concurrent.futures import ThreadPoolExecutor @@ -37,7 +38,6 @@ from core.workflow.nodes.node_mapping import node_classes from extensions.ext_database import db from models.workflow import WorkflowNodeExecutionStatus, WorkflowType -thread_pool = ThreadPoolExecutor(max_workers=500, thread_name_prefix="ThreadGraphParallelRun") logger = logging.getLogger(__name__) @@ -217,15 +217,15 @@ class GraphEngine: q: queue.Queue = queue.Queue() # new thread - futures = [] for edge in edge_mappings: - futures.append(thread_pool.submit( - self._run_parallel_node, - flask_app=current_app._get_current_object(), # type: ignore - parallel_id=parallel_id, - parallel_start_node_id=edge.target_node_id, - q=q - )) + run_thread = threading.Thread(target=self._run_parallel_node, kwargs={ + 'flask_app': current_app._get_current_object(), + 'parallel_id': parallel_id, + 'parallel_start_node_id': edge.target_node_id, + 'q': q + }) + + run_thread.start() succeeded_count = 0 while True: @@ -247,10 +247,6 @@ class GraphEngine: except queue.Empty: continue - # not necessary - # for future in as_completed(futures): - # future.result() - # get final node id final_node_id = parallel.end_to_node_id if not final_node_id: