fix wrongly remove reset nodes (#20880)
Co-authored-by: zhuqingchao <zhuqingchao@xiaomi.com>
This commit is contained in:
@@ -57,7 +57,6 @@ class StreamProcessor(ABC):
|
|||||||
|
|
||||||
# The branch_identify parameter is added to ensure that
|
# The branch_identify parameter is added to ensure that
|
||||||
# only nodes in the correct logical branch are included.
|
# only nodes in the correct logical branch are included.
|
||||||
reachable_node_ids.append(edge.target_node_id)
|
|
||||||
ids = self._fetch_node_ids_in_reachable_branch(edge.target_node_id, run_result.edge_source_handle)
|
ids = self._fetch_node_ids_in_reachable_branch(edge.target_node_id, run_result.edge_source_handle)
|
||||||
reachable_node_ids.extend(ids)
|
reachable_node_ids.extend(ids)
|
||||||
else:
|
else:
|
||||||
@@ -74,6 +73,8 @@ class StreamProcessor(ABC):
|
|||||||
self._remove_node_ids_in_unreachable_branch(node_id, reachable_node_ids)
|
self._remove_node_ids_in_unreachable_branch(node_id, reachable_node_ids)
|
||||||
|
|
||||||
def _fetch_node_ids_in_reachable_branch(self, node_id: str, branch_identify: Optional[str] = None) -> list[str]:
|
def _fetch_node_ids_in_reachable_branch(self, node_id: str, branch_identify: Optional[str] = None) -> list[str]:
|
||||||
|
if node_id not in self.rest_node_ids:
|
||||||
|
self.rest_node_ids.append(node_id)
|
||||||
node_ids = []
|
node_ids = []
|
||||||
for edge in self.graph.edge_mapping.get(node_id, []):
|
for edge in self.graph.edge_mapping.get(node_id, []):
|
||||||
if edge.target_node_id == self.graph.root_node_id:
|
if edge.target_node_id == self.graph.root_node_id:
|
||||||
|
Reference in New Issue
Block a user