diff --git a/api/core/workflow/nodes/agent/agent_node.py b/api/core/workflow/nodes/agent/agent_node.py index 987f670ac..ddc03b8d7 100644 --- a/api/core/workflow/nodes/agent/agent_node.py +++ b/api/core/workflow/nodes/agent/agent_node.py @@ -1,4 +1,5 @@ import json +import uuid from collections.abc import Generator, Mapping, Sequence from typing import Any, Optional, cast @@ -102,6 +103,36 @@ class AgentNode(ToolNode): try: # convert tool messages + agent_thoughts: list = [] + + from core.tools.entities.tool_entities import ToolInvokeMessage + + thought_log_message = ToolInvokeMessage( + type=ToolInvokeMessage.MessageType.LOG, + message=ToolInvokeMessage.LogMessage( + id=str(uuid.uuid4()), + label=f"Agent Strategy: {cast(AgentNodeData, self.node_data).agent_strategy_name}", + parent_id=None, + error=None, + status=ToolInvokeMessage.LogMessage.LogStatus.START, + data={ + "strategy": cast(AgentNodeData, self.node_data).agent_strategy_name, + "parameters": parameters_for_log, + "thought_process": "Agent strategy execution started", + }, + metadata={ + "icon": self.agent_strategy_icon, + "agent_strategy": cast(AgentNodeData, self.node_data).agent_strategy_name, + }, + ), + ) + + from core.tools.entities.tool_entities import ToolInvokeMessage + + def enhanced_message_stream(): + yield thought_log_message + + yield from message_stream yield from self._transform_message( message_stream, @@ -110,6 +141,7 @@ class AgentNode(ToolNode): "agent_strategy": cast(AgentNodeData, self.node_data).agent_strategy_name, }, parameters_for_log, + agent_thoughts, ) except PluginDaemonClientSideError as e: yield RunCompletedEvent( diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index a4be02d86..59b3b1e2a 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -1,5 +1,5 @@ from collections.abc import Generator, Mapping, Sequence -from typing import Any, cast +from typing import Any, Optional, cast from sqlalchemy import select from sqlalchemy.orm import Session @@ -191,6 +191,7 @@ class ToolNode(BaseNode[ToolNodeData]): messages: Generator[ToolInvokeMessage, None, None], tool_info: Mapping[str, Any], parameters_for_log: dict[str, Any], + agent_thoughts: Optional[list] = None, ) -> Generator: """ Convert ToolInvokeMessages into tuple[plain_text, files] @@ -368,11 +369,35 @@ class ToolNode(BaseNode[ToolNodeData]): agent_logs.append(agent_log) yield agent_log + # Add agent_logs to outputs['json'] to ensure frontend can access thinking process + json_output: dict[str, Any] = {} + if json: + if isinstance(json, list) and len(json) == 1: + # If json is a list with only one element, convert it to a dictionary + json_output = json[0] if isinstance(json[0], dict) else {"data": json[0]} + elif isinstance(json, list): + # If json is a list with multiple elements, create a dictionary containing all data + json_output = {"data": json} + if agent_logs: + # Add agent_logs to json output + json_output["agent_logs"] = [ + { + "id": log.id, + "parent_id": log.parent_id, + "error": log.error, + "status": log.status, + "data": log.data, + "label": log.label, + "metadata": log.metadata, + "node_id": log.node_id, + } + for log in agent_logs + ] yield RunCompletedEvent( run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs={"text": text, "files": ArrayFileSegment(value=files), "json": json, **variables}, + outputs={"text": text, "files": ArrayFileSegment(value=files), "json": json_output, **variables}, metadata={ **agent_execution_metadata, WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info,