diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 875cee17e..ee2164f22 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -639,26 +639,19 @@ class GraphEngine: retry_start_at = datetime.now(UTC).replace(tzinfo=None) # yield control to other threads time.sleep(0.001) - generator = node_instance.run() - for item in generator: - if isinstance(item, GraphEngineEvent): - if isinstance(item, BaseIterationEvent): - # add parallel info to iteration event - item.parallel_id = parallel_id - item.parallel_start_node_id = parallel_start_node_id - item.parent_parallel_id = parent_parallel_id - item.parent_parallel_start_node_id = parent_parallel_start_node_id - elif isinstance(item, BaseLoopEvent): - # add parallel info to loop event - item.parallel_id = parallel_id - item.parallel_start_node_id = parallel_start_node_id - item.parent_parallel_id = parent_parallel_id - item.parent_parallel_start_node_id = parent_parallel_start_node_id - - yield item + event_stream = node_instance.run() + for event in event_stream: + if isinstance(event, GraphEngineEvent): + # add parallel info to iteration event + if isinstance(event, BaseIterationEvent | BaseLoopEvent): + event.parallel_id = parallel_id + event.parallel_start_node_id = parallel_start_node_id + event.parent_parallel_id = parent_parallel_id + event.parent_parallel_start_node_id = parent_parallel_start_node_id + yield event else: - if isinstance(item, RunCompletedEvent): - run_result = item.run_result + if isinstance(event, RunCompletedEvent): + run_result = event.run_result if run_result.status == WorkflowNodeExecutionStatus.FAILED: if ( retries == max_retries @@ -694,7 +687,7 @@ class GraphEngine: # if run failed, handle error run_result = self._handle_continue_on_error( node_instance, - item.run_result, + event.run_result, self.graph_runtime_state.variable_pool, handle_exceptions=handle_exceptions, ) @@ -797,28 +790,28 @@ class GraphEngine: should_continue_retry = False break - elif isinstance(item, RunStreamChunkEvent): + elif isinstance(event, RunStreamChunkEvent): yield NodeRunStreamChunkEvent( id=node_instance.id, node_id=node_instance.node_id, node_type=node_instance.node_type, node_data=node_instance.node_data, - chunk_content=item.chunk_content, - from_variable_selector=item.from_variable_selector, + chunk_content=event.chunk_content, + from_variable_selector=event.from_variable_selector, route_node_state=route_node_state, parallel_id=parallel_id, parallel_start_node_id=parallel_start_node_id, parent_parallel_id=parent_parallel_id, parent_parallel_start_node_id=parent_parallel_start_node_id, ) - elif isinstance(item, RunRetrieverResourceEvent): + elif isinstance(event, RunRetrieverResourceEvent): yield NodeRunRetrieverResourceEvent( id=node_instance.id, node_id=node_instance.node_id, node_type=node_instance.node_type, node_data=node_instance.node_data, - retriever_resources=item.retriever_resources, - context=item.context, + retriever_resources=event.retriever_resources, + context=event.context, route_node_state=route_node_state, parallel_id=parallel_id, parallel_start_node_id=parallel_start_node_id,