Export DSL from history (#24939)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
GuanMu
2025-09-02 21:36:52 +08:00
committed by GitHub
parent 8fcc864fb7
commit 25a11bfafc
9 changed files with 119 additions and 13 deletions

View File

@@ -237,9 +237,14 @@ class AppExportApi(Resource):
# Add include_secret params
parser = reqparse.RequestParser()
parser.add_argument("include_secret", type=inputs.boolean, default=False, location="args")
parser.add_argument("workflow_id", type=str, location="args")
args = parser.parse_args()
return {"data": AppDslService.export_dsl(app_model=app_model, include_secret=args["include_secret"])}
return {
"data": AppDslService.export_dsl(
app_model=app_model, include_secret=args["include_secret"], workflow_id=args.get("workflow_id")
)
}
class AppNameApi(Resource):

View File

@@ -532,7 +532,7 @@ class AppDslService:
return app
@classmethod
def export_dsl(cls, app_model: App, include_secret: bool = False) -> str:
def export_dsl(cls, app_model: App, include_secret: bool = False, workflow_id: Optional[str] = None) -> str:
"""
Export app
:param app_model: App instance
@@ -556,7 +556,7 @@ class AppDslService:
if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
cls._append_workflow_export_data(
export_data=export_data, app_model=app_model, include_secret=include_secret
export_data=export_data, app_model=app_model, include_secret=include_secret, workflow_id=workflow_id
)
else:
cls._append_model_config_export_data(export_data, app_model)
@@ -564,14 +564,16 @@ class AppDslService:
return yaml.dump(export_data, allow_unicode=True) # type: ignore
@classmethod
def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:
def _append_workflow_export_data(
cls, *, export_data: dict, app_model: App, include_secret: bool, workflow_id: Optional[str] = None
) -> None:
"""
Append workflow export data
:param export_data: export data
:param app_model: App instance
"""
workflow_service = WorkflowService()
workflow = workflow_service.get_draft_workflow(app_model)
workflow = workflow_service.get_draft_workflow(app_model, workflow_id)
if not workflow:
raise ValueError("Missing draft workflow configuration, please check.")

View File

@@ -96,10 +96,12 @@ class WorkflowService:
)
return db.session.execute(stmt).scalar_one()
def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
def get_draft_workflow(self, app_model: App, workflow_id: Optional[str] = None) -> Optional[Workflow]:
"""
Get draft workflow
"""
if workflow_id:
return self.get_published_workflow_by_id(app_model, workflow_id)
# fetch draft workflow by app_model
workflow = (
db.session.query(Workflow)
@@ -115,7 +117,9 @@ class WorkflowService:
return workflow
def get_published_workflow_by_id(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
# fetch published workflow by workflow_id
"""
fetch published workflow by workflow_id
"""
workflow = (
db.session.query(Workflow)
.where(

View File

@@ -322,7 +322,87 @@ class TestAppDslService:
# Verify workflow service was called
mock_external_service_dependencies["workflow_service"].return_value.get_draft_workflow.assert_called_once_with(
app
app, None
)
def test_export_dsl_with_workflow_id_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful DSL export with specific workflow ID.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Update app to workflow mode
app.mode = "workflow"
db_session_with_containers.commit()
# Mock workflow service to return a workflow when specific workflow_id is provided
mock_workflow = MagicMock()
mock_workflow.to_dict.return_value = {
"graph": {"nodes": [{"id": "start", "type": "start", "data": {"type": "start"}}], "edges": []},
"features": {},
"environment_variables": [],
"conversation_variables": [],
}
# Mock the get_draft_workflow method to return different workflows based on workflow_id
def mock_get_draft_workflow(app_model, workflow_id=None):
if workflow_id == "specific-workflow-id":
return mock_workflow
return None
mock_external_service_dependencies[
"workflow_service"
].return_value.get_draft_workflow.side_effect = mock_get_draft_workflow
# Export DSL with specific workflow ID
exported_dsl = AppDslService.export_dsl(app, include_secret=False, workflow_id="specific-workflow-id")
# Parse exported YAML
exported_data = yaml.safe_load(exported_dsl)
# Verify exported data structure
assert exported_data["kind"] == "app"
assert exported_data["app"]["name"] == app.name
assert exported_data["app"]["mode"] == "workflow"
# Verify workflow was exported
assert "workflow" in exported_data
assert "graph" in exported_data["workflow"]
assert "nodes" in exported_data["workflow"]["graph"]
# Verify dependencies were exported
assert "dependencies" in exported_data
assert isinstance(exported_data["dependencies"], list)
# Verify workflow service was called with specific workflow ID
mock_external_service_dependencies["workflow_service"].return_value.get_draft_workflow.assert_called_once_with(
app, "specific-workflow-id"
)
def test_export_dsl_with_invalid_workflow_id_raises_error(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test that export_dsl raises error when invalid workflow ID is provided.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Update app to workflow mode
app.mode = "workflow"
db_session_with_containers.commit()
# Mock workflow service to return None when invalid workflow ID is provided
mock_external_service_dependencies["workflow_service"].return_value.get_draft_workflow.return_value = None
# Export DSL with invalid workflow ID should raise ValueError
with pytest.raises(ValueError, match="Missing draft workflow configuration, please check."):
AppDslService.export_dsl(app, include_secret=False, workflow_id="invalid-workflow-id")
# Verify workflow service was called with the invalid workflow ID
mock_external_service_dependencies["workflow_service"].return_value.get_draft_workflow.assert_called_once_with(
app, "invalid-workflow-id"
)
def test_check_dependencies_success(self, db_session_with_containers, mock_external_service_dependencies):