feat: workflow continue on error (#11474)

This commit is contained in:
zxhlyh
2024-12-11 14:21:38 +08:00
committed by GitHub
parent 86dfdcb8ec
commit bec5451f12
60 changed files with 1481 additions and 282 deletions

View File

@@ -63,25 +63,29 @@ export const useEdgesInteractions = () => {
edges,
setEdges,
} = store.getState()
const currentEdgeIndex = edges.findIndex(edge => edge.source === nodeId && edge.sourceHandle === branchId)
const edgeWillBeDeleted = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === branchId)
if (currentEdgeIndex < 0)
if (!edgeWillBeDeleted.length)
return
const currentEdge = edges[currentEdgeIndex]
const newNodes = produce(getNodes(), (draft: Node[]) => {
const sourceNode = draft.find(node => node.id === currentEdge.source)
const targetNode = draft.find(node => node.id === currentEdge.target)
if (sourceNode)
sourceNode.data._connectedSourceHandleIds = sourceNode.data._connectedSourceHandleIds?.filter(handleId => handleId !== currentEdge.sourceHandle)
if (targetNode)
targetNode.data._connectedTargetHandleIds = targetNode.data._connectedTargetHandleIds?.filter(handleId => handleId !== currentEdge.targetHandle)
const nodes = getNodes()
const nodesConnectedSourceOrTargetHandleIdsMap = getNodesConnectedSourceOrTargetHandleIdsMap(
edgeWillBeDeleted.map(edge => ({ type: 'remove', edge })),
nodes,
)
const newNodes = produce(nodes, (draft: Node[]) => {
draft.forEach((node) => {
if (nodesConnectedSourceOrTargetHandleIdsMap[node.id]) {
node.data = {
...node.data,
...nodesConnectedSourceOrTargetHandleIdsMap[node.id],
}
}
})
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
draft.splice(currentEdgeIndex, 1)
return draft.filter(edge => !edgeWillBeDeleted.find(e => e.id === edge.id))
})
setEdges(newEdges)
handleSyncWorkflowDraft()
@@ -155,7 +159,9 @@ export const useEdgesInteractions = () => {
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
edge.data._run = false
edge.data._sourceRunningStatus = undefined
edge.data._targetRunningStatus = undefined
edge.data._waitingRun = false
})
})
setEdges(newEdges)

View File

@@ -1033,6 +1033,7 @@ export const useNodesInteractions = () => {
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
node.data._runningStatus = undefined
node.data._waitingRun = false
})
})
setNodes(newNodes)

View File

@@ -1,6 +1,5 @@
import { useCallback } from 'react'
import {
getIncomers,
useReactFlow,
useStoreApi,
} from 'reactflow'
@@ -9,8 +8,8 @@ import { v4 as uuidV4 } from 'uuid'
import { usePathname } from 'next/navigation'
import { useWorkflowStore } from '../store'
import { useNodesSyncDraft } from '../hooks'
import type { Node } from '../types'
import {
BlockEnum,
NodeRunningStatus,
WorkflowRunningStatus,
} from '../types'
@@ -28,6 +27,7 @@ import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player
import {
getFilesInLogs,
} from '@/app/components/base/file-uploader/utils'
import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'
export const useWorkflowRun = () => {
const store = useStoreApi()
@@ -174,6 +174,8 @@ export const useWorkflowRun = () => {
setIterParallelLogMap,
} = workflowStore.getState()
const {
getNodes,
setNodes,
edges,
setEdges,
} = store.getState()
@@ -186,12 +188,20 @@ export const useWorkflowRun = () => {
status: WorkflowRunningStatus.Running,
}
}))
const nodes = getNodes()
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
node.data._waitingRun = true
})
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
edge.data = {
...edge.data,
_run: false,
_sourceRunningStatus: undefined,
_targetRunningStatus: undefined,
_waitingRun: true,
}
})
})
@@ -311,13 +321,27 @@ export const useWorkflowRun = () => {
}
const newNodes = produce(nodes, (draft) => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
draft[currentNodeIndex].data._waitingRun = false
})
setNodes(newNodes)
const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id)
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
if (edge.target === data.node_id && incomeNodesId.includes(edge.source))
edge.data = { ...edge.data, _run: true } as any
const incomeEdges = draft.filter((edge) => {
return edge.target === data.node_id
})
incomeEdges.forEach((edge) => {
const incomeNode = nodes.find(node => node.id === edge.source)!
if (
(!incomeNode.data._runningBranchId && edge.sourceHandle === 'source')
|| (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId)
) {
edge.data = {
...edge.data,
_sourceRunningStatus: incomeNode.data._runningStatus,
_targetRunningStatus: NodeRunningStatus.Running,
_waitingRun: false,
}
}
})
})
setEdges(newEdges)
@@ -336,6 +360,8 @@ export const useWorkflowRun = () => {
const {
getNodes,
setNodes,
edges,
setEdges,
} = store.getState()
const nodes = getNodes()
const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId
@@ -423,8 +449,31 @@ export const useWorkflowRun = () => {
const newNodes = produce(nodes, (draft) => {
const currentNode = draft.find(node => node.id === data.node_id)!
currentNode.data._runningStatus = data.status as any
if (data.status === NodeRunningStatus.Exception) {
if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch)
currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch
}
else {
if (data.node_type === BlockEnum.IfElse)
currentNode.data._runningBranchId = data?.outputs?.selected_case_id
if (data.node_type === BlockEnum.QuestionClassifier)
currentNode.data._runningBranchId = data?.outputs?.class_id
}
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
const incomeEdges = draft.filter((edge) => {
return edge.target === data.node_id
})
incomeEdges.forEach((edge) => {
edge.data = {
...edge.data,
_targetRunningStatus: data.status as any,
}
})
})
setEdges(newEdges)
prevNodeId = data.node_id
}
@@ -474,13 +523,20 @@ export const useWorkflowRun = () => {
const newNodes = produce(nodes, (draft) => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
draft[currentNodeIndex].data._waitingRun = false
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId)
const incomeEdges = draft.filter(edge => edge.target === data.node_id)
if (edge)
edge.data = { ...edge.data, _run: true } as any
incomeEdges.forEach((edge) => {
edge.data = {
...edge.data,
_sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus,
_targetRunningStatus: NodeRunningStatus.Running,
_waitingRun: false,
}
})
})
setEdges(newEdges)