Fix/workflow retry (#11999)

This commit is contained in:
zxhlyh
2024-12-23 15:55:50 +08:00
committed by GitHub
parent e068bbec73
commit ef95b1268e
5 changed files with 130 additions and 29 deletions

View File

@@ -387,6 +387,9 @@ export const useWorkflowRun = () => {
if (nodeIndex !== -1) {
currIteration[nodeIndex] = {
...currIteration[nodeIndex],
...(currIteration[nodeIndex].retryDetail
? { retryDetail: currIteration[nodeIndex].retryDetail }
: {}),
...data,
} as any
}
@@ -626,6 +629,8 @@ export const useWorkflowRun = () => {
const {
workflowRunningData,
setWorkflowRunningData,
iterParallelLogMap,
setIterParallelLogMap,
} = workflowStore.getState()
const {
getNodes,
@@ -633,19 +638,65 @@ export const useWorkflowRun = () => {
} = store.getState()
const nodes = getNodes()
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
const tracing = draft.tracing!
const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id)
const currentNode = nodes.find(node => node.id === data.node_id)!
const nodeParent = nodes.find(node => node.id === currentNode.parentId)
if (nodeParent) {
if (!data.execution_metadata.parallel_mode_run_id) {
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
const tracing = draft.tracing!
const iteration = tracing.find(trace => trace.node_id === nodeParent.id)
if (currentRetryNodeIndex > -1) {
const currentRetryNode = tracing[currentRetryNodeIndex]
if (currentRetryNode.retryDetail)
draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing)
if (iteration && iteration.details?.length) {
const currentNodeRetry = iteration.details[nodeParent.data._iterationIndex - 1]?.find(item => item.node_id === data.node_id)
else
draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing]
if (currentNodeRetry) {
if (currentNodeRetry?.retryDetail)
currentNodeRetry?.retryDetail.push(data as NodeTracing)
else
currentNodeRetry.retryDetail = [data as NodeTracing]
}
}
}))
}
}))
else {
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
const tracing = draft.tracing!
const iteration = tracing.find(trace => trace.node_id === nodeParent.id)
if (iteration && iteration.details?.length) {
const iterRunID = data.execution_metadata?.parallel_mode_run_id
const currIteration = iterParallelLogMap.get(iteration.node_id)?.get(iterRunID)
const currentNodeRetry = currIteration?.find(item => item.node_id === data.node_id)
if (currentNodeRetry) {
if (currentNodeRetry?.retryDetail)
currentNodeRetry?.retryDetail.push(data as NodeTracing)
else
currentNodeRetry.retryDetail = [data as NodeTracing]
}
setIterParallelLogMap(iterParallelLogMap)
const iterLogMap = iterParallelLogMap.get(iteration.node_id)
if (iterLogMap)
iteration.details = Array.from(iterLogMap.values())
}
}))
}
}
else {
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
const tracing = draft.tracing!
const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id)
if (currentRetryNodeIndex > -1) {
const currentRetryNode = tracing[currentRetryNodeIndex]
if (currentRetryNode.retryDetail)
draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing)
else
draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing]
}
}))
}
const newNodes = produce(nodes, (draft) => {
const currentNode = draft.find(node => node.id === data.node_id)!