feat: Persist Variables for Enhanced Debugging Workflow (#20699)

This pull request introduces a feature aimed at improving the debugging experience during workflow editing. With the addition of variable persistence, the system will automatically retain the output variables from previously executed nodes. These persisted variables can then be reused when debugging subsequent nodes, eliminating the need for repetitive manual input.

By streamlining this aspect of the workflow, the feature minimizes user errors and significantly reduces debugging effort, offering a smoother and more efficient experience.

Key highlights of this change:

- Automatic persistence of output variables for executed nodes.
- Reuse of persisted variables to simplify input steps for nodes requiring them (e.g., `code`, `template`, `variable_assigner`).
- Enhanced debugging experience with reduced friction.

Closes #19735.
This commit is contained in:
QuantumGhost
2025-06-24 09:05:29 +08:00
committed by GitHub
parent 3113350e51
commit 10b738a296
106 changed files with 6025 additions and 718 deletions

View File

@@ -1,6 +1,7 @@
import json
import time
from collections.abc import Callable, Generator, Sequence
import uuid
from collections.abc import Callable, Generator, Mapping, Sequence
from datetime import UTC, datetime
from typing import Any, Optional
from uuid import uuid4
@@ -8,12 +9,17 @@ from uuid import uuid4
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.app_config.entities import VariableEntityType
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus
from core.workflow.enums import SystemVariableKey
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.graph_engine.entities.event import InNodeEvent
from core.workflow.nodes import NodeType
@@ -22,9 +28,11 @@ from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.event.types import NodeEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.start.entities import StartNodeData
from core.workflow.workflow_entry import WorkflowEntry
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
from extensions.ext_database import db
from factories.file_factory import build_from_mapping, build_from_mappings
from models.account import Account
from models.model import App, AppMode
from models.tools import WorkflowToolProvider
@@ -34,10 +42,15 @@ from models.workflow import (
WorkflowNodeExecutionTriggeredFrom,
WorkflowType,
)
from services.errors.app import WorkflowHashNotEqualError
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError
from services.workflow.workflow_converter import WorkflowConverter
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
from .workflow_draft_variable_service import (
DraftVariableSaver,
DraftVarLoader,
WorkflowDraftVariableService,
)
class WorkflowService:
@@ -45,6 +58,33 @@ class WorkflowService:
Workflow Service
"""
def get_node_last_run(self, app_model: App, workflow: Workflow, node_id: str) -> WorkflowNodeExecutionModel | None:
# TODO(QuantumGhost): This query is not fully covered by index.
criteria = (
WorkflowNodeExecutionModel.tenant_id == app_model.tenant_id,
WorkflowNodeExecutionModel.app_id == app_model.id,
WorkflowNodeExecutionModel.workflow_id == workflow.id,
WorkflowNodeExecutionModel.node_id == node_id,
)
node_exec = (
db.session.query(WorkflowNodeExecutionModel)
.filter(*criteria)
.order_by(WorkflowNodeExecutionModel.created_at.desc())
.first()
)
return node_exec
def is_workflow_exist(self, app_model: App) -> bool:
return (
db.session.query(Workflow)
.filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.version == Workflow.VERSION_DRAFT,
)
.count()
) > 0
def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
"""
Get draft workflow
@@ -61,6 +101,23 @@ class WorkflowService:
# return draft workflow
return workflow
def get_published_workflow_by_id(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
# fetch published workflow by workflow_id
workflow = (
db.session.query(Workflow)
.filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == workflow_id,
)
.first()
)
if not workflow:
return None
if workflow.version == Workflow.VERSION_DRAFT:
raise IsDraftWorkflowError(f"Workflow is draft version, id={workflow_id}")
return workflow
def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
"""
Get published workflow
@@ -199,7 +256,7 @@ class WorkflowService:
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=draft_workflow.type,
version=str(datetime.now(UTC).replace(tzinfo=None)),
version=Workflow.version_from_datetime(datetime.now(UTC).replace(tzinfo=None)),
graph=draft_workflow.graph,
features=draft_workflow.features,
created_by=account.id,
@@ -253,26 +310,85 @@ class WorkflowService:
return default_config
def run_draft_workflow_node(
self, app_model: App, node_id: str, user_inputs: dict, account: Account
self,
app_model: App,
draft_workflow: Workflow,
node_id: str,
user_inputs: Mapping[str, Any],
account: Account,
query: str = "",
files: Sequence[File] | None = None,
) -> WorkflowNodeExecutionModel:
"""
Run draft workflow node
"""
# fetch draft workflow by app_model
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError("Workflow not initialized")
files = files or []
with Session(bind=db.engine, expire_on_commit=False) as session, session.begin():
draft_var_srv = WorkflowDraftVariableService(session)
draft_var_srv.prefill_conversation_variable_default_values(draft_workflow)
node_config = draft_workflow.get_node_config_by_id(node_id)
node_type = Workflow.get_node_type_from_node_config(node_config)
node_data = node_config.get("data", {})
if node_type == NodeType.START:
with Session(bind=db.engine) as session, session.begin():
draft_var_srv = WorkflowDraftVariableService(session)
conversation_id = draft_var_srv.get_or_create_conversation(
account_id=account.id,
app=app_model,
workflow=draft_workflow,
)
start_data = StartNodeData.model_validate(node_data)
user_inputs = _rebuild_file_for_user_inputs_in_start_node(
tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs
)
# init variable pool
variable_pool = _setup_variable_pool(
query=query,
files=files or [],
user_id=account.id,
user_inputs=user_inputs,
workflow=draft_workflow,
# NOTE(QuantumGhost): We rely on `DraftVarLoader` to load conversation variables.
conversation_variables=[],
node_type=node_type,
conversation_id=conversation_id,
)
else:
variable_pool = VariablePool(
system_variables={},
user_inputs=user_inputs,
environment_variables=draft_workflow.environment_variables,
conversation_variables=[],
)
variable_loader = DraftVarLoader(
engine=db.engine,
app_id=app_model.id,
tenant_id=app_model.tenant_id,
)
eclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config)
if eclosing_node_type_and_id:
_, enclosing_node_id = eclosing_node_type_and_id
else:
enclosing_node_id = None
run = WorkflowEntry.single_step_run(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
variable_pool=variable_pool,
variable_loader=variable_loader,
)
# run draft workflow node
start_at = time.perf_counter()
node_execution = self._handle_node_run_result(
invoke_node_fn=lambda: WorkflowEntry.single_step_run(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
),
invoke_node_fn=lambda: run,
start_at=start_at,
node_id=node_id,
)
@@ -292,6 +408,18 @@ class WorkflowService:
# Convert node_execution to WorkflowNodeExecution after save
workflow_node_execution = repository.to_db_model(node_execution)
with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver(
session=session,
app_id=app_model.id,
node_id=workflow_node_execution.node_id,
node_type=NodeType(workflow_node_execution.node_type),
invoke_from=InvokeFrom.DEBUGGER,
enclosing_node_id=enclosing_node_id,
node_execution_id=node_execution.id,
)
draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs)
session.commit()
return workflow_node_execution
def run_free_workflow_node(
@@ -332,7 +460,7 @@ class WorkflowService:
node_run_result = event.run_result
# sign output files
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
# node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
break
if not node_run_result:
@@ -394,7 +522,7 @@ class WorkflowService:
if node_run_result.process_data
else None
)
outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
outputs = node_run_result.outputs
node_execution.inputs = inputs
node_execution.process_data = process_data
@@ -531,3 +659,83 @@ class WorkflowService:
session.delete(workflow)
return True
def _setup_variable_pool(
query: str,
files: Sequence[File],
user_id: str,
user_inputs: Mapping[str, Any],
workflow: Workflow,
node_type: NodeType,
conversation_id: str,
conversation_variables: list[Variable],
):
# Only inject system variables for START node type.
if node_type == NodeType.START:
# Create a variable pool.
system_inputs: dict[SystemVariableKey, Any] = {
# From inputs:
SystemVariableKey.FILES: files,
SystemVariableKey.USER_ID: user_id,
# From workflow model
SystemVariableKey.APP_ID: workflow.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
# Randomly generated.
SystemVariableKey.WORKFLOW_EXECUTION_ID: str(uuid.uuid4()),
}
# Only add chatflow-specific variables for non-workflow types
if workflow.type != WorkflowType.WORKFLOW.value:
system_inputs.update(
{
SystemVariableKey.QUERY: query,
SystemVariableKey.CONVERSATION_ID: conversation_id,
SystemVariableKey.DIALOGUE_COUNT: 0,
}
)
else:
system_inputs = {}
# init variable pool
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=user_inputs,
environment_variables=workflow.environment_variables,
conversation_variables=conversation_variables,
)
return variable_pool
def _rebuild_file_for_user_inputs_in_start_node(
tenant_id: str, start_node_data: StartNodeData, user_inputs: Mapping[str, Any]
) -> Mapping[str, Any]:
inputs_copy = dict(user_inputs)
for variable in start_node_data.variables:
if variable.type not in (VariableEntityType.FILE, VariableEntityType.FILE_LIST):
continue
if variable.variable not in user_inputs:
continue
value = user_inputs[variable.variable]
file = _rebuild_single_file(tenant_id=tenant_id, value=value, variable_entity_type=variable.type)
inputs_copy[variable.variable] = file
return inputs_copy
def _rebuild_single_file(tenant_id: str, value: Any, variable_entity_type: VariableEntityType) -> File | Sequence[File]:
if variable_entity_type == VariableEntityType.FILE:
if not isinstance(value, dict):
raise ValueError(f"expected dict for file object, got {type(value)}")
return build_from_mapping(mapping=value, tenant_id=tenant_id)
elif variable_entity_type == VariableEntityType.FILE_LIST:
if not isinstance(value, list):
raise ValueError(f"expected list for file list object, got {type(value)}")
if len(value) == 0:
return []
if not isinstance(value[0], dict):
raise ValueError(f"expected dict for first element in the file list, got {type(value)}")
return build_from_mappings(mappings=value, tenant_id=tenant_id)
else:
raise Exception("unreachable")