diff --git a/api/core/llm_generator/output_parser/structured_output.py b/api/core/llm_generator/output_parser/structured_output.py
new file mode 100644
index 000000000..0aaf5abef
--- /dev/null
+++ b/api/core/llm_generator/output_parser/structured_output.py
@@ -0,0 +1,374 @@
+import json
+from collections.abc import Generator, Mapping, Sequence
+from copy import deepcopy
+from enum import StrEnum
+from typing import Any, Literal, Optional, cast, overload
+
+import json_repair
+from pydantic import TypeAdapter, ValidationError
+
+from core.llm_generator.output_parser.errors import OutputParserError
+from core.llm_generator.prompts import STRUCTURED_OUTPUT_PROMPT
+from core.model_manager import ModelInstance
+from core.model_runtime.callbacks.base_callback import Callback
+from core.model_runtime.entities.llm_entities import (
+ LLMResult,
+ LLMResultChunk,
+ LLMResultChunkDelta,
+ LLMResultChunkWithStructuredOutput,
+ LLMResultWithStructuredOutput,
+)
+from core.model_runtime.entities.message_entities import (
+ AssistantPromptMessage,
+ PromptMessage,
+ PromptMessageTool,
+ SystemPromptMessage,
+)
+from core.model_runtime.entities.model_entities import AIModelEntity, ParameterRule
+
+
+class ResponseFormat(StrEnum):
+ """Constants for model response formats"""
+
+ JSON_SCHEMA = "json_schema" # model's structured output mode. some model like gemini, gpt-4o, support this mode.
+ JSON = "JSON" # model's json mode. some model like claude support this mode.
+ JSON_OBJECT = "json_object" # json mode's another alias. some model like deepseek-chat, qwen use this alias.
+
+
+class SpecialModelType(StrEnum):
+ """Constants for identifying model types"""
+
+ GEMINI = "gemini"
+ OLLAMA = "ollama"
+
+
+@overload
+def invoke_llm_with_structured_output(
+ provider: str,
+ model_schema: AIModelEntity,
+ model_instance: ModelInstance,
+ prompt_messages: Sequence[PromptMessage],
+ json_schema: Mapping[str, Any],
+ model_parameters: Optional[Mapping] = None,
+ tools: Sequence[PromptMessageTool] | None = None,
+ stop: Optional[list[str]] = None,
+ stream: Literal[True] = True,
+ user: Optional[str] = None,
+ callbacks: Optional[list[Callback]] = None,
+) -> Generator[LLMResultChunkWithStructuredOutput, None, None]: ...
+
+
+@overload
+def invoke_llm_with_structured_output(
+ provider: str,
+ model_schema: AIModelEntity,
+ model_instance: ModelInstance,
+ prompt_messages: Sequence[PromptMessage],
+ json_schema: Mapping[str, Any],
+ model_parameters: Optional[Mapping] = None,
+ tools: Sequence[PromptMessageTool] | None = None,
+ stop: Optional[list[str]] = None,
+ stream: Literal[False] = False,
+ user: Optional[str] = None,
+ callbacks: Optional[list[Callback]] = None,
+) -> LLMResultWithStructuredOutput: ...
+
+
+@overload
+def invoke_llm_with_structured_output(
+ provider: str,
+ model_schema: AIModelEntity,
+ model_instance: ModelInstance,
+ prompt_messages: Sequence[PromptMessage],
+ json_schema: Mapping[str, Any],
+ model_parameters: Optional[Mapping] = None,
+ tools: Sequence[PromptMessageTool] | None = None,
+ stop: Optional[list[str]] = None,
+ stream: bool = True,
+ user: Optional[str] = None,
+ callbacks: Optional[list[Callback]] = None,
+) -> LLMResultWithStructuredOutput | Generator[LLMResultChunkWithStructuredOutput, None, None]: ...
+
+
+def invoke_llm_with_structured_output(
+ provider: str,
+ model_schema: AIModelEntity,
+ model_instance: ModelInstance,
+ prompt_messages: Sequence[PromptMessage],
+ json_schema: Mapping[str, Any],
+ model_parameters: Optional[Mapping] = None,
+ tools: Sequence[PromptMessageTool] | None = None,
+ stop: Optional[list[str]] = None,
+ stream: bool = True,
+ user: Optional[str] = None,
+ callbacks: Optional[list[Callback]] = None,
+) -> LLMResultWithStructuredOutput | Generator[LLMResultChunkWithStructuredOutput, None, None]:
+ """
+ Invoke large language model with structured output
+ 1. This method invokes model_instance.invoke_llm with json_schema
+ 2. Try to parse the result as structured output
+
+ :param prompt_messages: prompt messages
+ :param json_schema: json schema
+ :param model_parameters: model parameters
+ :param tools: tools for tool calling
+ :param stop: stop words
+ :param stream: is stream response
+ :param user: unique user id
+ :param callbacks: callbacks
+ :return: full response or stream response chunk generator result
+ """
+
+    # Start from the caller-supplied parameters; the JSON schema configuration is layered on top
+    model_parameters_with_json_schema: dict[str, Any] = {
+        **(model_parameters or {}),
+    }
+
+    if model_schema.support_structure_output:
+        model_parameters_with_json_schema = _handle_native_json_schema(
+            provider, model_schema, json_schema, model_parameters_with_json_schema, model_schema.parameter_rules
+        )
+ else:
+ # Set appropriate response format based on model capabilities
+ _set_response_format(model_parameters_with_json_schema, model_schema.parameter_rules)
+
+ # handle prompt based schema
+ prompt_messages = _handle_prompt_based_schema(
+ prompt_messages=prompt_messages,
+ structured_output_schema=json_schema,
+ )
+
+ llm_result = model_instance.invoke_llm(
+ prompt_messages=list(prompt_messages),
+ model_parameters=model_parameters_with_json_schema,
+ tools=tools,
+ stop=stop,
+ stream=stream,
+ user=user,
+ callbacks=callbacks,
+ )
+
+ if isinstance(llm_result, LLMResult):
+ if not isinstance(llm_result.message.content, str):
+ raise OutputParserError(
+ f"Failed to parse structured output, LLM result is not a string: {llm_result.message.content}"
+ )
+
+ return LLMResultWithStructuredOutput(
+ structured_output=_parse_structured_output(llm_result.message.content),
+ model=llm_result.model,
+ message=llm_result.message,
+ usage=llm_result.usage,
+ system_fingerprint=llm_result.system_fingerprint,
+ prompt_messages=llm_result.prompt_messages,
+ )
+ else:
+
+ def generator() -> Generator[LLMResultChunkWithStructuredOutput, None, None]:
+ result_text: str = ""
+ prompt_messages: Sequence[PromptMessage] = []
+ system_fingerprint: Optional[str] = None
+ for event in llm_result:
+ if isinstance(event, LLMResultChunk):
+ if isinstance(event.delta.message.content, str):
+ result_text += event.delta.message.content
+ prompt_messages = event.prompt_messages
+ system_fingerprint = event.system_fingerprint
+
+ yield LLMResultChunkWithStructuredOutput(
+ model=model_schema.model,
+ prompt_messages=prompt_messages,
+ system_fingerprint=system_fingerprint,
+ delta=event.delta,
+ )
+
+ yield LLMResultChunkWithStructuredOutput(
+ structured_output=_parse_structured_output(result_text),
+ model=model_schema.model,
+ prompt_messages=prompt_messages,
+ system_fingerprint=system_fingerprint,
+ delta=LLMResultChunkDelta(
+ index=0,
+ message=AssistantPromptMessage(content=""),
+ usage=None,
+ finish_reason=None,
+ ),
+ )
+
+ return generator()
+
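+# Rough usage sketch (illustrative only; the schema and prompt below are assumed, not part of this
+# module). With stream=False the helper returns an LLMResultWithStructuredOutput whose
+# structured_output field holds the parsed dict:
+#
+#     schema = {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}
+#     result = invoke_llm_with_structured_output(
+#         provider=model_instance.provider,
+#         model_schema=model_schema,
+#         model_instance=model_instance,
+#         prompt_messages=[UserPromptMessage(content="My name is John Doe.")],
+#         json_schema=schema,
+#         stream=False,
+#     )
+#     result.structured_output  # e.g. {"name": "John Doe"}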
+
+def _handle_native_json_schema(
+ provider: str,
+ model_schema: AIModelEntity,
+ structured_output_schema: Mapping,
+ model_parameters: dict,
+ rules: list[ParameterRule],
+) -> dict:
+ """
+ Handle structured output for models with native JSON schema support.
+
+ :param model_parameters: Model parameters to update
+ :param rules: Model parameter rules
+ :return: Updated model parameters with JSON schema configuration
+ """
+ # Process schema according to model requirements
+ schema_json = _prepare_schema_for_model(provider, model_schema, structured_output_schema)
+
+ # Set JSON schema in parameters
+ model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False)
+
+ # Set appropriate response format if required by the model
+ for rule in rules:
+ if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options:
+ model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value
+
+ return model_parameters
+
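+# Illustrative outcome (assumed shape): for a model using the default schema wrapper, the returned
+# parameters look roughly like
+#     {"json_schema": '{"schema": {...}, "name": "llm_response"}', "response_format": "json_schema"}
+# where "response_format" is only set when the model's parameter rules list the json_schema option.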
+
+def _set_response_format(model_parameters: dict, rules: list[ParameterRule]) -> None:
+ """
+ Set the appropriate response format parameter based on model rules.
+
+ :param model_parameters: Model parameters to update
+ :param rules: Model parameter rules
+ """
+ for rule in rules:
+ if rule.name == "response_format":
+ if ResponseFormat.JSON.value in rule.options:
+ model_parameters["response_format"] = ResponseFormat.JSON.value
+ elif ResponseFormat.JSON_OBJECT.value in rule.options:
+ model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value
+
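+# For example (exact behaviour depends on each model's parameter rules): a model whose
+# "response_format" rule offers ["JSON", "XML"] ends up with response_format="JSON", one offering
+# ["json_object"] gets response_format="json_object", and models without such a rule are untouched.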
+
+def _handle_prompt_based_schema(
+ prompt_messages: Sequence[PromptMessage], structured_output_schema: Mapping
+) -> list[PromptMessage]:
+ """
+ Handle structured output for models without native JSON schema support.
+ This function modifies the prompt messages to include schema-based output requirements.
+
+ Args:
+ prompt_messages: Original sequence of prompt messages
+
+ Returns:
+ list[PromptMessage]: Updated prompt messages with structured output requirements
+ """
+ # Convert schema to string format
+ schema_str = json.dumps(structured_output_schema, ensure_ascii=False)
+
+ # Find existing system prompt with schema placeholder
+ system_prompt = next(
+ (prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)),
+ None,
+ )
+ structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str)
+ # Prepare system prompt content
+ system_prompt_content = (
+ structured_output_prompt + "\n\n" + system_prompt.content
+ if system_prompt and isinstance(system_prompt.content, str)
+ else structured_output_prompt
+ )
+ system_prompt = SystemPromptMessage(content=system_prompt_content)
+
+    # Drop any existing system prompts and prepend the combined one
+    filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)]
+ updated_prompt = [system_prompt] + filtered_prompts
+
+ return updated_prompt
+
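+# Sketch of the resulting order (assuming one pre-existing system prompt): the schema instructions
+# are prepended to it in a single SystemPromptMessage, followed by the original non-system messages:
+#     [SystemPromptMessage(STRUCTURED_OUTPUT_PROMPT + "\n\n" + original_system), UserPromptMessage(...), ...]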
+
+def _parse_structured_output(result_text: str) -> Mapping[str, Any]:
+    """Parse LLM output text into a mapping, repairing slightly malformed JSON when possible."""
+    structured_output: Mapping[str, Any] = {}
+    parsed: Mapping[str, Any] = {}
+ try:
+ parsed = TypeAdapter(Mapping).validate_json(result_text)
+ if not isinstance(parsed, dict):
+ raise OutputParserError(f"Failed to parse structured output: {result_text}")
+ structured_output = parsed
+ except ValidationError:
+        # If result_text is not valid JSON, try to repair it
+ temp_parsed = json_repair.loads(result_text)
+ if not isinstance(temp_parsed, dict):
+            # Handle reasoning models (e.g. deepseek-r1) whose '\n\n\n' prefix can make json_repair return a list
+ if isinstance(temp_parsed, list):
+ temp_parsed = next((item for item in temp_parsed if isinstance(item, dict)), {})
+ else:
+ raise OutputParserError(f"Failed to parse structured output: {result_text}")
+ structured_output = cast(dict, temp_parsed)
+ return structured_output
+
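+# Examples of inputs this parser is expected to recover (mirroring the integration tests):
+#     '{"name":"test","age":123}'         -> parsed directly as JSON
+#     '\n\n{"name": "test", "age": 123'   -> repaired by json_repair (truncated / prefixed output)
+#     '{"name":"test",age:123}'           -> repaired by json_repair (missing quotes)
+# Anything that cannot be coerced into a dict raises OutputParserError.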
+
+def _prepare_schema_for_model(provider: str, model_schema: AIModelEntity, schema: Mapping) -> dict:
+ """
+ Prepare JSON schema based on model requirements.
+
+ Different models have different requirements for JSON schema formatting.
+ This function handles these differences.
+
+ :param schema: The original JSON schema
+ :return: Processed schema compatible with the current model
+ """
+
+ # Deep copy to avoid modifying the original schema
+ processed_schema = dict(deepcopy(schema))
+
+ # Convert boolean types to string types (common requirement)
+ convert_boolean_to_string(processed_schema)
+
+ # Apply model-specific transformations
+ if SpecialModelType.GEMINI in model_schema.model:
+ remove_additional_properties(processed_schema)
+ return processed_schema
+ elif SpecialModelType.OLLAMA in provider:
+ return processed_schema
+ else:
+ # Default format with name field
+ return {"schema": processed_schema, "name": "llm_response"}
+
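+# Illustrative outcomes of the branches above: Gemini models receive the schema with
+# "additionalProperties" stripped, Ollama providers receive the processed schema as-is, and all
+# other models receive {"schema": {...}, "name": "llm_response"}.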
+
+def remove_additional_properties(schema: dict) -> None:
+ """
+ Remove additionalProperties fields from JSON schema.
+ Used for models like Gemini that don't support this property.
+
+ :param schema: JSON schema to modify in-place
+ """
+ if not isinstance(schema, dict):
+ return
+
+ # Remove additionalProperties at current level
+ schema.pop("additionalProperties", None)
+
+ # Process nested structures recursively
+ for value in schema.values():
+ if isinstance(value, dict):
+ remove_additional_properties(value)
+ elif isinstance(value, list):
+ for item in value:
+ if isinstance(item, dict):
+ remove_additional_properties(item)
+
+
+def convert_boolean_to_string(schema: dict) -> None:
+ """
+ Convert boolean type specifications to string in JSON schema.
+
+ :param schema: JSON schema to modify in-place
+ """
+ if not isinstance(schema, dict):
+ return
+
+ # Check for boolean type at current level
+ if schema.get("type") == "boolean":
+ schema["type"] = "string"
+
+ # Process nested dictionaries and lists recursively
+ for value in schema.values():
+ if isinstance(value, dict):
+ convert_boolean_to_string(value)
+ elif isinstance(value, list):
+ for item in value:
+ if isinstance(item, dict):
+ convert_boolean_to_string(item)
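+# Minimal sketch of the two transforms above (values assumed for illustration):
+#     schema = {"type": "object", "additionalProperties": False,
+#               "properties": {"active": {"type": "boolean"}}}
+#     remove_additional_properties(schema)  # "additionalProperties" is removed recursively
+#     convert_boolean_to_string(schema)     # "active" becomes {"type": "string"}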
diff --git a/api/core/llm_generator/prompts.py b/api/core/llm_generator/prompts.py
index ddfa1e7a6..ef81e38dc 100644
--- a/api/core/llm_generator/prompts.py
+++ b/api/core/llm_generator/prompts.py
@@ -291,3 +291,21 @@ Your task is to convert simple user descriptions into properly formatted JSON Sc
Now, generate a JSON Schema based on my description
""" # noqa: E501
+
+STRUCTURED_OUTPUT_PROMPT = """You’re a helpful AI assistant. You could answer questions and output in JSON format.
+constraints:
+ - You must output in JSON format.
+ - Do not output boolean value, use string type instead.
+ - Do not output integer or float value, use number type instead.
+eg:
+ Here is the JSON schema:
+ {"additionalProperties": false, "properties": {"age": {"type": "number"}, "name": {"type": "string"}}, "required": ["name", "age"], "type": "object"}
+
+ Here is the user's question:
+ My name is John Doe and I am 30 years old.
+
+ output:
+ {"name": "John Doe", "age": 30}
+Here is the JSON schema:
+{{schema}}
+""" # noqa: E501
diff --git a/api/core/model_runtime/entities/llm_entities.py b/api/core/model_runtime/entities/llm_entities.py
index de5a748d4..e52b0eba5 100644
--- a/api/core/model_runtime/entities/llm_entities.py
+++ b/api/core/model_runtime/entities/llm_entities.py
@@ -1,7 +1,7 @@
-from collections.abc import Sequence
+from collections.abc import Mapping, Sequence
from decimal import Decimal
from enum import StrEnum
-from typing import Optional
+from typing import Any, Optional
from pydantic import BaseModel, Field
@@ -101,6 +101,20 @@ class LLMResult(BaseModel):
system_fingerprint: Optional[str] = None
+class LLMStructuredOutput(BaseModel):
+ """
+ Model class for llm structured output.
+ """
+
+ structured_output: Optional[Mapping[str, Any]] = None
+
+
+class LLMResultWithStructuredOutput(LLMResult, LLMStructuredOutput):
+ """
+ Model class for llm result with structured output.
+ """
+
+
class LLMResultChunkDelta(BaseModel):
"""
Model class for llm result chunk delta.
@@ -123,6 +137,12 @@ class LLMResultChunk(BaseModel):
delta: LLMResultChunkDelta
+class LLMResultChunkWithStructuredOutput(LLMResultChunk, LLMStructuredOutput):
+ """
+ Model class for llm result chunk with structured output.
+ """
+
+
class NumTokensResult(PriceInfo):
"""
Model class for number of tokens result.
diff --git a/api/core/plugin/backwards_invocation/model.py b/api/core/plugin/backwards_invocation/model.py
index 072644e53..9428d198a 100644
--- a/api/core/plugin/backwards_invocation/model.py
+++ b/api/core/plugin/backwards_invocation/model.py
@@ -3,7 +3,11 @@ from binascii import hexlify, unhexlify
from collections.abc import Generator
from core.model_manager import ModelManager
-from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta
+from core.model_runtime.entities.llm_entities import (
+ LLMResult,
+ LLMResultChunk,
+ LLMResultChunkDelta,
+)
from core.model_runtime.entities.message_entities import (
PromptMessage,
SystemPromptMessage,
diff --git a/api/core/plugin/entities/parameters.py b/api/core/plugin/entities/parameters.py
index a01180100..2b438a3c3 100644
--- a/api/core/plugin/entities/parameters.py
+++ b/api/core/plugin/entities/parameters.py
@@ -10,6 +10,9 @@ from core.tools.entities.common_entities import I18nObject
class PluginParameterOption(BaseModel):
value: str = Field(..., description="The value of the option")
label: I18nObject = Field(..., description="The label of the option")
+    icon: Optional[str] = Field(
+        default=None, description="The icon of the option, can be a URL or a base64-encoded image"
+    )
@field_validator("value", mode="before")
@classmethod
diff --git a/api/core/plugin/entities/request.py b/api/core/plugin/entities/request.py
index 1692020ec..f9c81ed4d 100644
--- a/api/core/plugin/entities/request.py
+++ b/api/core/plugin/entities/request.py
@@ -82,6 +82,16 @@ class RequestInvokeLLM(BaseRequestInvokeModel):
return v
+class RequestInvokeLLMWithStructuredOutput(RequestInvokeLLM):
+ """
+ Request to invoke LLM with structured output
+ """
+
+ structured_output_schema: dict[str, Any] = Field(
+ default_factory=dict, description="The schema of the structured output in JSON schema format"
+ )
+
+
class RequestInvokeTextEmbedding(BaseRequestInvokeModel):
"""
Request to invoke text embedding
diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py
index 124ae6d75..b5225ce54 100644
--- a/api/core/workflow/nodes/llm/node.py
+++ b/api/core/workflow/nodes/llm/node.py
@@ -5,11 +5,11 @@ import logging
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, cast
-import json_repair
-
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.file import FileType, file_manager
from core.helper.code_executor import CodeExecutor, CodeLanguage
+from core.llm_generator.output_parser.errors import OutputParserError
+from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities import (
@@ -18,7 +18,13 @@ from core.model_runtime.entities import (
PromptMessageContentType,
TextPromptMessageContent,
)
-from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMUsage
+from core.model_runtime.entities.llm_entities import (
+ LLMResult,
+ LLMResultChunk,
+ LLMResultChunkWithStructuredOutput,
+ LLMStructuredOutput,
+ LLMUsage,
+)
from core.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessageContentUnionTypes,
@@ -31,7 +37,6 @@ from core.model_runtime.entities.model_entities import (
ModelFeature,
ModelPropertyKey,
ModelType,
- ParameterRule,
)
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.model_runtime.utils.encoders import jsonable_encoder
@@ -62,11 +67,6 @@ from core.workflow.nodes.event import (
RunRetrieverResourceEvent,
RunStreamChunkEvent,
)
-from core.workflow.utils.structured_output.entities import (
- ResponseFormat,
- SpecialModelType,
-)
-from core.workflow.utils.structured_output.prompt import STRUCTURED_OUTPUT_PROMPT
from core.workflow.utils.variable_template_parser import VariableTemplateParser
from . import llm_utils
@@ -143,12 +143,6 @@ class LLMNode(BaseNode[LLMNodeData]):
return "1"
def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
- def process_structured_output(text: str) -> Optional[dict[str, Any]]:
- """Process structured output if enabled"""
- if not self.node_data.structured_output_enabled or not self.node_data.structured_output:
- return None
- return self._parse_structured_output(text)
-
node_inputs: Optional[dict[str, Any]] = None
process_data = None
result_text = ""
@@ -244,6 +238,8 @@ class LLMNode(BaseNode[LLMNodeData]):
stop=stop,
)
+ structured_output: LLMStructuredOutput | None = None
+
for event in generator:
if isinstance(event, RunStreamChunkEvent):
yield event
@@ -254,10 +250,12 @@ class LLMNode(BaseNode[LLMNodeData]):
# deduct quota
llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
break
+ elif isinstance(event, LLMStructuredOutput):
+ structured_output = event
+
outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason}
- structured_output = process_structured_output(result_text)
if structured_output:
- outputs["structured_output"] = structured_output
+ outputs["structured_output"] = structured_output.structured_output
if self._file_outputs is not None:
outputs["files"] = ArrayFileSegment(value=self._file_outputs)
@@ -302,20 +300,40 @@ class LLMNode(BaseNode[LLMNodeData]):
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
stop: Optional[Sequence[str]] = None,
- ) -> Generator[NodeEvent, None, None]:
- invoke_result = model_instance.invoke_llm(
- prompt_messages=list(prompt_messages),
- model_parameters=node_data_model.completion_params,
- stop=list(stop or []),
- stream=True,
- user=self.user_id,
+ ) -> Generator[NodeEvent | LLMStructuredOutput, None, None]:
+ model_schema = model_instance.model_type_instance.get_model_schema(
+ node_data_model.name, model_instance.credentials
)
+ if not model_schema:
+ raise ValueError(f"Model schema not found for {node_data_model.name}")
+
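+        # When structured output is enabled, route through the structured-output helper, which
+        # applies the schema (natively or via prompt injection) and yields the parsed result.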
+ if self.node_data.structured_output_enabled:
+ output_schema = self._fetch_structured_output_schema()
+ invoke_result = invoke_llm_with_structured_output(
+ provider=model_instance.provider,
+ model_schema=model_schema,
+ model_instance=model_instance,
+ prompt_messages=prompt_messages,
+ json_schema=output_schema,
+ model_parameters=node_data_model.completion_params,
+ stop=list(stop or []),
+ stream=True,
+ user=self.user_id,
+ )
+ else:
+ invoke_result = model_instance.invoke_llm(
+ prompt_messages=list(prompt_messages),
+ model_parameters=node_data_model.completion_params,
+ stop=list(stop or []),
+ stream=True,
+ user=self.user_id,
+ )
return self._handle_invoke_result(invoke_result=invoke_result)
def _handle_invoke_result(
- self, invoke_result: LLMResult | Generator[LLMResultChunk, None, None]
- ) -> Generator[NodeEvent, None, None]:
+ self, invoke_result: LLMResult | Generator[LLMResultChunk | LLMStructuredOutput, None, None]
+ ) -> Generator[NodeEvent | LLMStructuredOutput, None, None]:
# For blocking mode
if isinstance(invoke_result, LLMResult):
event = self._handle_blocking_result(invoke_result=invoke_result)
@@ -329,23 +347,32 @@ class LLMNode(BaseNode[LLMNodeData]):
usage = LLMUsage.empty_usage()
finish_reason = None
full_text_buffer = io.StringIO()
- for result in invoke_result:
- contents = result.delta.message.content
- for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents):
- full_text_buffer.write(text_part)
- yield RunStreamChunkEvent(chunk_content=text_part, from_variable_selector=[self.node_id, "text"])
+ # Consume the invoke result and handle generator exception
+ try:
+ for result in invoke_result:
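+                # Structured-output chunks are surfaced to the caller first; since they also
+                # subclass LLMResultChunk, they still flow through the text handling below.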
+ if isinstance(result, LLMResultChunkWithStructuredOutput):
+ yield result
+ if isinstance(result, LLMResultChunk):
+ contents = result.delta.message.content
+ for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents):
+ full_text_buffer.write(text_part)
+ yield RunStreamChunkEvent(
+ chunk_content=text_part, from_variable_selector=[self.node_id, "text"]
+ )
- # Update the whole metadata
- if not model and result.model:
- model = result.model
- if len(prompt_messages) == 0:
- # TODO(QuantumGhost): it seems that this update has no visable effect.
- # What's the purpose of the line below?
- prompt_messages = list(result.prompt_messages)
- if usage.prompt_tokens == 0 and result.delta.usage:
- usage = result.delta.usage
- if finish_reason is None and result.delta.finish_reason:
- finish_reason = result.delta.finish_reason
+ # Update the whole metadata
+ if not model and result.model:
+ model = result.model
+                    # TODO(QuantumGhost): it seems that this update has no visible effect.
+ # TODO(QuantumGhost): it seems that this update has no visable effect.
+ # What's the purpose of the line below?
+ prompt_messages = list(result.prompt_messages)
+ if usage.prompt_tokens == 0 and result.delta.usage:
+ usage = result.delta.usage
+ if finish_reason is None and result.delta.finish_reason:
+ finish_reason = result.delta.finish_reason
+        except OutputParserError as e:
+            raise LLMNodeError(f"Failed to parse structured output: {e}") from e
yield ModelInvokeCompletedEvent(text=full_text_buffer.getvalue(), usage=usage, finish_reason=finish_reason)
@@ -522,12 +549,6 @@ class LLMNode(BaseNode[LLMNodeData]):
if not model_schema:
raise ModelNotExistError(f"Model {node_data_model.name} not exist.")
- if self.node_data.structured_output_enabled:
- if model_schema.support_structure_output:
- completion_params = self._handle_native_json_schema(completion_params, model_schema.parameter_rules)
- else:
- # Set appropriate response format based on model capabilities
- self._set_response_format(completion_params, model_schema.parameter_rules)
model_config_with_cred.parameters = completion_params
# NOTE(-LAN-): This line modify the `self.node_data.model`, which is used in `_invoke_llm()`.
node_data_model.completion_params = completion_params
@@ -719,32 +740,8 @@ class LLMNode(BaseNode[LLMNodeData]):
)
if not model_schema:
raise ModelNotExistError(f"Model {model_config.model} not exist.")
- if self.node_data.structured_output_enabled:
- if not model_schema.support_structure_output:
- filtered_prompt_messages = self._handle_prompt_based_schema(
- prompt_messages=filtered_prompt_messages,
- )
return filtered_prompt_messages, model_config.stop
- def _parse_structured_output(self, result_text: str) -> dict[str, Any]:
- structured_output: dict[str, Any] = {}
- try:
- parsed = json.loads(result_text)
- if not isinstance(parsed, dict):
- raise LLMNodeError(f"Failed to parse structured output: {result_text}")
- structured_output = parsed
- except json.JSONDecodeError as e:
- # if the result_text is not a valid json, try to repair it
- parsed = json_repair.loads(result_text)
- if not isinstance(parsed, dict):
- # handle reasoning model like deepseek-r1 got '\n\n\n' prefix
- if isinstance(parsed, list):
- parsed = next((item for item in parsed if isinstance(item, dict)), {})
- else:
- raise LLMNodeError(f"Failed to parse structured output: {result_text}")
- structured_output = parsed
- return structured_output
-
@classmethod
def _extract_variable_selector_to_variable_mapping(
cls,
@@ -934,104 +931,6 @@ class LLMNode(BaseNode[LLMNodeData]):
self._file_outputs.append(saved_file)
return saved_file
- def _handle_native_json_schema(self, model_parameters: dict, rules: list[ParameterRule]) -> dict:
- """
- Handle structured output for models with native JSON schema support.
-
- :param model_parameters: Model parameters to update
- :param rules: Model parameter rules
- :return: Updated model parameters with JSON schema configuration
- """
- # Process schema according to model requirements
- schema = self._fetch_structured_output_schema()
- schema_json = self._prepare_schema_for_model(schema)
-
- # Set JSON schema in parameters
- model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False)
-
- # Set appropriate response format if required by the model
- for rule in rules:
- if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options:
- model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value
-
- return model_parameters
-
- def _handle_prompt_based_schema(self, prompt_messages: Sequence[PromptMessage]) -> list[PromptMessage]:
- """
- Handle structured output for models without native JSON schema support.
- This function modifies the prompt messages to include schema-based output requirements.
-
- Args:
- prompt_messages: Original sequence of prompt messages
-
- Returns:
- list[PromptMessage]: Updated prompt messages with structured output requirements
- """
- # Convert schema to string format
- schema_str = json.dumps(self._fetch_structured_output_schema(), ensure_ascii=False)
-
- # Find existing system prompt with schema placeholder
- system_prompt = next(
- (prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)),
- None,
- )
- structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str)
- # Prepare system prompt content
- system_prompt_content = (
- structured_output_prompt + "\n\n" + system_prompt.content
- if system_prompt and isinstance(system_prompt.content, str)
- else structured_output_prompt
- )
- system_prompt = SystemPromptMessage(content=system_prompt_content)
-
- # Extract content from the last user message
-
- filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)]
- updated_prompt = [system_prompt] + filtered_prompts
-
- return updated_prompt
-
- def _set_response_format(self, model_parameters: dict, rules: list) -> None:
- """
- Set the appropriate response format parameter based on model rules.
-
- :param model_parameters: Model parameters to update
- :param rules: Model parameter rules
- """
- for rule in rules:
- if rule.name == "response_format":
- if ResponseFormat.JSON.value in rule.options:
- model_parameters["response_format"] = ResponseFormat.JSON.value
- elif ResponseFormat.JSON_OBJECT.value in rule.options:
- model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value
-
- def _prepare_schema_for_model(self, schema: dict) -> dict:
- """
- Prepare JSON schema based on model requirements.
-
- Different models have different requirements for JSON schema formatting.
- This function handles these differences.
-
- :param schema: The original JSON schema
- :return: Processed schema compatible with the current model
- """
-
- # Deep copy to avoid modifying the original schema
- processed_schema = schema.copy()
-
- # Convert boolean types to string types (common requirement)
- convert_boolean_to_string(processed_schema)
-
- # Apply model-specific transformations
- if SpecialModelType.GEMINI in self.node_data.model.name:
- remove_additional_properties(processed_schema)
- return processed_schema
- elif SpecialModelType.OLLAMA in self.node_data.model.provider:
- return processed_schema
- else:
- # Default format with name field
- return {"schema": processed_schema, "name": "llm_response"}
-
def _fetch_model_schema(self, provider: str) -> AIModelEntity | None:
"""
Fetch model schema
@@ -1243,49 +1142,3 @@ def _handle_completion_template(
)
prompt_messages.append(prompt_message)
return prompt_messages
-
-
-def remove_additional_properties(schema: dict) -> None:
- """
- Remove additionalProperties fields from JSON schema.
- Used for models like Gemini that don't support this property.
-
- :param schema: JSON schema to modify in-place
- """
- if not isinstance(schema, dict):
- return
-
- # Remove additionalProperties at current level
- schema.pop("additionalProperties", None)
-
- # Process nested structures recursively
- for value in schema.values():
- if isinstance(value, dict):
- remove_additional_properties(value)
- elif isinstance(value, list):
- for item in value:
- if isinstance(item, dict):
- remove_additional_properties(item)
-
-
-def convert_boolean_to_string(schema: dict) -> None:
- """
- Convert boolean type specifications to string in JSON schema.
-
- :param schema: JSON schema to modify in-place
- """
- if not isinstance(schema, dict):
- return
-
- # Check for boolean type at current level
- if schema.get("type") == "boolean":
- schema["type"] = "string"
-
- # Process nested dictionaries and lists recursively
- for value in schema.values():
- if isinstance(value, dict):
- convert_boolean_to_string(value)
- elif isinstance(value, list):
- for item in value:
- if isinstance(item, dict):
- convert_boolean_to_string(item)
diff --git a/api/core/workflow/utils/structured_output/entities.py b/api/core/workflow/utils/structured_output/entities.py
deleted file mode 100644
index 6491042bf..000000000
--- a/api/core/workflow/utils/structured_output/entities.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from enum import StrEnum
-
-
-class ResponseFormat(StrEnum):
- """Constants for model response formats"""
-
- JSON_SCHEMA = "json_schema" # model's structured output mode. some model like gemini, gpt-4o, support this mode.
- JSON = "JSON" # model's json mode. some model like claude support this mode.
- JSON_OBJECT = "json_object" # json mode's another alias. some model like deepseek-chat, qwen use this alias.
-
-
-class SpecialModelType(StrEnum):
- """Constants for identifying model types"""
-
- GEMINI = "gemini"
- OLLAMA = "ollama"
diff --git a/api/core/workflow/utils/structured_output/prompt.py b/api/core/workflow/utils/structured_output/prompt.py
deleted file mode 100644
index 06d9b2056..000000000
--- a/api/core/workflow/utils/structured_output/prompt.py
+++ /dev/null
@@ -1,17 +0,0 @@
-STRUCTURED_OUTPUT_PROMPT = """You’re a helpful AI assistant. You could answer questions and output in JSON format.
-constraints:
- - You must output in JSON format.
- - Do not output boolean value, use string type instead.
- - Do not output integer or float value, use number type instead.
-eg:
- Here is the JSON schema:
- {"additionalProperties": false, "properties": {"age": {"type": "number"}, "name": {"type": "string"}}, "required": ["name", "age"], "type": "object"}
-
- Here is the user's question:
- My name is John Doe and I am 30 years old.
-
- output:
- {"name": "John Doe", "age": 30}
-Here is the JSON schema:
-{{schema}}
-""" # noqa: E501
diff --git a/api/tests/integration_tests/workflow/nodes/test_llm.py b/api/tests/integration_tests/workflow/nodes/test_llm.py
index a3b2fdc37..389d1071f 100644
--- a/api/tests/integration_tests/workflow/nodes/test_llm.py
+++ b/api/tests/integration_tests/workflow/nodes/test_llm.py
@@ -9,6 +9,7 @@ from unittest.mock import MagicMock, patch
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom
+from core.llm_generator.output_parser.structured_output import _parse_structured_output
from core.model_runtime.entities.llm_entities import LLMResult, LLMUsage
from core.model_runtime.entities.message_entities import AssistantPromptMessage
from core.workflow.entities.variable_pool import VariablePool
@@ -277,29 +278,6 @@ def test_execute_llm_with_jinja2(flask_req_ctx, setup_code_executor_mock):
def test_extract_json():
- node = init_llm_node(
- config={
- "id": "llm",
- "data": {
- "title": "123",
- "type": "llm",
- "model": {"provider": "openai", "name": "gpt-3.5-turbo", "mode": "chat", "completion_params": {}},
- "prompt_config": {
- "structured_output": {
- "enabled": True,
- "schema": {
- "type": "object",
- "properties": {"name": {"type": "string"}, "age": {"type": "number"}},
- },
- }
- },
- "prompt_template": [{"role": "user", "text": "{{#sys.query#}}"}],
- "memory": None,
- "context": {"enabled": False},
- "vision": {"enabled": False},
- },
- },
- )
llm_texts = [
'\n\n{"name": "test", "age": 123', # resoning model (deepseek-r1)
'{"name":"test","age":123}', # json schema model (gpt-4o)
@@ -308,4 +286,4 @@ def test_extract_json():
'{"name":"test",age:123}', # without quotes (qwen-2.5-0.5b)
]
result = {"name": "test", "age": 123}
- assert all(node._parse_structured_output(item) == result for item in llm_texts)
+ assert all(_parse_structured_output(item) == result for item in llm_texts)