feat: support workflow version specification in workflow and chat APIs (#23188)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
qiaofenlin
2025-08-03 16:27:12 +08:00
committed by GitHub
parent 6d5a7684b4
commit 20f0238aab
20 changed files with 846 additions and 13 deletions

View File

@@ -2,7 +2,7 @@ import logging
from flask import request
from flask_restful import Resource, reqparse
from werkzeug.exceptions import InternalServerError, NotFound
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
import services
from controllers.service_api import api
@@ -30,6 +30,7 @@ from libs import helper
from libs.helper import uuid_value
from models.model import App, AppMode, EndUser
from services.app_generate_service import AppGenerateService
from services.errors.app import IsDraftWorkflowError, WorkflowIdFormatError, WorkflowNotFoundError
from services.errors.llm import InvokeRateLimitError
@@ -113,7 +114,7 @@ class ChatApi(Resource):
parser.add_argument("conversation_id", type=uuid_value, location="json")
parser.add_argument("retriever_from", type=str, required=False, default="dev", location="json")
parser.add_argument("auto_generate_name", type=bool, required=False, default=True, location="json")
parser.add_argument("workflow_id", type=str, required=False, location="json")
args = parser.parse_args()
external_trace_id = get_external_trace_id(request)
@@ -128,6 +129,12 @@ class ChatApi(Resource):
)
return helper.compact_generate_response(response)
except WorkflowNotFoundError as ex:
raise NotFound(str(ex))
except IsDraftWorkflowError as ex:
raise BadRequest(str(ex))
except WorkflowIdFormatError as ex:
raise BadRequest(str(ex))
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:

View File

@@ -5,7 +5,7 @@ from flask import request
from flask_restful import Resource, fields, marshal_with, reqparse
from flask_restful.inputs import int_range
from sqlalchemy.orm import Session, sessionmaker
from werkzeug.exceptions import InternalServerError
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
from controllers.service_api import api
from controllers.service_api.app.error import (
@@ -34,6 +34,7 @@ from libs.helper import TimestampField
from models.model import App, AppMode, EndUser
from repositories.factory import DifyAPIRepositoryFactory
from services.app_generate_service import AppGenerateService
from services.errors.app import IsDraftWorkflowError, WorkflowIdFormatError, WorkflowNotFoundError
from services.errors.llm import InvokeRateLimitError
from services.workflow_app_service import WorkflowAppService
@@ -120,6 +121,59 @@ class WorkflowRunApi(Resource):
raise InternalServerError()
class WorkflowRunByIdApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def post(self, app_model: App, end_user: EndUser, workflow_id: str):
"""
Run specific workflow by ID
"""
app_mode = AppMode.value_of(app_model.mode)
if app_mode != AppMode.WORKFLOW:
raise NotWorkflowAppError()
parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
parser.add_argument("files", type=list, required=False, location="json")
parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
args = parser.parse_args()
# Add workflow_id to args for AppGenerateService
args["workflow_id"] = workflow_id
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
streaming = args.get("response_mode") == "streaming"
try:
response = AppGenerateService.generate(
app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
)
return helper.compact_generate_response(response)
except WorkflowNotFoundError as ex:
raise NotFound(str(ex))
except IsDraftWorkflowError as ex:
raise BadRequest(str(ex))
except WorkflowIdFormatError as ex:
raise BadRequest(str(ex))
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except InvokeError as e:
raise CompletionRequestError(e.description)
except ValueError as e:
raise e
except Exception:
logging.exception("internal server error.")
raise InternalServerError()
class WorkflowTaskStopApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def post(self, app_model: App, end_user: EndUser, task_id: str):
@@ -193,5 +247,6 @@ class WorkflowAppLogApi(Resource):
api.add_resource(WorkflowRunApi, "/workflows/run")
api.add_resource(WorkflowRunDetailApi, "/workflows/run/<string:workflow_run_id>")
api.add_resource(WorkflowRunByIdApi, "/workflows/<string:workflow_id>/run")
api.add_resource(WorkflowTaskStopApi, "/workflows/tasks/<string:task_id>/stop")
api.add_resource(WorkflowAppLogApi, "/workflows/logs")

View File

@@ -1,5 +1,6 @@
import uuid
from collections.abc import Generator, Mapping
from typing import Any, Union
from typing import Any, Optional, Union
from openai._exceptions import RateLimitError
@@ -15,6 +16,7 @@ from libs.helper import RateLimiter
from models.model import Account, App, AppMode, EndUser
from models.workflow import Workflow
from services.billing_service import BillingService
from services.errors.app import WorkflowIdFormatError, WorkflowNotFoundError
from services.errors.llm import InvokeRateLimitError
from services.workflow_service import WorkflowService
@@ -86,7 +88,8 @@ class AppGenerateService:
request_id=request_id,
)
elif app_model.mode == AppMode.ADVANCED_CHAT.value:
workflow = cls._get_workflow(app_model, invoke_from)
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
return rate_limit.generate(
AdvancedChatAppGenerator.convert_to_event_stream(
AdvancedChatAppGenerator().generate(
@@ -101,7 +104,8 @@ class AppGenerateService:
request_id=request_id,
)
elif app_model.mode == AppMode.WORKFLOW.value:
workflow = cls._get_workflow(app_model, invoke_from)
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
return rate_limit.generate(
WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().generate(
@@ -210,14 +214,27 @@ class AppGenerateService:
)
@classmethod
def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow:
def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom, workflow_id: Optional[str] = None) -> Workflow:
"""
Get workflow
:param app_model: app model
:param invoke_from: invoke from
:param workflow_id: optional workflow id to specify a specific version
:return:
"""
workflow_service = WorkflowService()
# If workflow_id is specified, get the specific workflow version
if workflow_id:
try:
workflow_uuid = uuid.UUID(workflow_id)
except ValueError:
raise WorkflowIdFormatError(f"Invalid workflow_id format: '{workflow_id}'. ")
workflow = workflow_service.get_published_workflow_by_id(app_model=app_model, workflow_id=workflow_id)
if not workflow:
raise WorkflowNotFoundError(f"Workflow not found with id: {workflow_id}")
return workflow
if invoke_from == InvokeFrom.DEBUGGER:
# fetch draft workflow by app_model
workflow = workflow_service.get_draft_workflow(app_model=app_model)

View File

@@ -8,3 +8,11 @@ class WorkflowHashNotEqualError(Exception):
class IsDraftWorkflowError(Exception):
pass
class WorkflowNotFoundError(Exception):
pass
class WorkflowIdFormatError(Exception):
pass

View File

@@ -129,7 +129,10 @@ class WorkflowService:
if not workflow:
return None
if workflow.version == Workflow.VERSION_DRAFT:
raise IsDraftWorkflowError(f"Workflow is draft version, id={workflow_id}")
raise IsDraftWorkflowError(
f"Cannot use draft workflow version. Workflow ID: {workflow_id}. "
f"Please use a published workflow version or leave workflow_id empty."
)
return workflow
def get_published_workflow(self, app_model: App) -> Optional[Workflow]: