From 1f9cd99bc244a50aa5f208793f41ba272bc6e433 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Fri, 18 Jul 2025 10:34:47 +0800 Subject: [PATCH] refactor: elegant event dispatch patterns (92% complexity reduction) (#22600) Signed-off-by: -LAN- Co-authored-by: Claude --- .../advanced_chat/generate_task_pipeline.py | 971 +++++++++++------- .../apps/workflow/generate_task_pipeline.py | 772 ++++++++------ 2 files changed, 1067 insertions(+), 676 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 1dc9796d5..337b779b5 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -1,6 +1,7 @@ import logging import time -from collections.abc import Generator, Mapping +from collections.abc import Callable, Generator, Mapping +from contextlib import contextmanager from threading import Thread from typing import Any, Optional, Union @@ -15,6 +16,7 @@ from core.app.entities.app_invoke_entities import ( InvokeFrom, ) from core.app.entities.queue_entities import ( + MessageQueueMessage, QueueAdvancedChatMessageEndEvent, QueueAgentLogEvent, QueueAnnotationReplyEvent, @@ -44,6 +46,7 @@ from core.app.entities.queue_entities import ( QueueWorkflowPartialSuccessEvent, QueueWorkflowStartedEvent, QueueWorkflowSucceededEvent, + WorkflowQueueMessage, ) from core.app.entities.task_entities import ( ChatbotAppBlockingResponse, @@ -52,6 +55,7 @@ from core.app.entities.task_entities import ( MessageAudioEndStreamResponse, MessageAudioStreamResponse, MessageEndStreamResponse, + PingStreamResponse, StreamResponse, WorkflowTaskState, ) @@ -162,7 +166,6 @@ class AdvancedChatAppGenerateTaskPipeline: Process generate task pipeline. 
:return: """ - # start generate conversation name thread self._conversation_name_generate_thread = self._message_cycle_manager.generate_conversation_name( conversation_id=self._conversation_id, query=self._application_generate_entity.query ) @@ -254,15 +257,12 @@ class AdvancedChatAppGenerateTaskPipeline: yield response start_listener_time = time.time() - # timeout while (time.time() - start_listener_time) < TTS_AUTO_PLAY_TIMEOUT: try: if not tts_publisher: break audio_trunk = tts_publisher.check_and_get_audio() if audio_trunk is None: - # release cpu - # sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file) time.sleep(TTS_AUTO_PLAY_YIELD_CPU_TIME) continue if audio_trunk.status == "finish": @@ -276,403 +276,613 @@ class AdvancedChatAppGenerateTaskPipeline: if tts_publisher: yield MessageAudioEndStreamResponse(audio="", task_id=task_id) + @contextmanager + def _database_session(self): + """Context manager for database sessions.""" + with Session(db.engine, expire_on_commit=False) as session: + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + + def _ensure_workflow_initialized(self) -> None: + """Fluent validation for workflow state.""" + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[GraphRuntimeState]) -> GraphRuntimeState: + """Fluent validation for graph runtime state.""" + if not graph_runtime_state: + raise ValueError("graph runtime state not initialized.") + return graph_runtime_state + + def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[PingStreamResponse, None, None]: + """Handle ping events.""" + yield self._base_task_pipeline._ping_stream_response() + + def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[ErrorStreamResponse, None, None]: + """Handle error events.""" + with self._database_session() as session: + err = self._base_task_pipeline._handle_error(event=event, session=session, message_id=self._message_id) + yield self._base_task_pipeline._error_to_stream_response(err) + + def _handle_workflow_started_event( + self, event: QueueWorkflowStartedEvent, *, graph_runtime_state: Optional[GraphRuntimeState] = None, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle workflow started events.""" + # Override graph runtime state - this is a side effect but necessary + graph_runtime_state = event.graph_runtime_state + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() + self._workflow_run_id = workflow_execution.id_ + + message = self._get_message(session=session) + if not message: + raise ValueError(f"Message not found: {self._message_id}") + + message.workflow_run_id = workflow_execution.id_ + workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield workflow_start_resp + + def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle node retry events.""" + self._ensure_workflow_initialized() + + with self._database_session() as session: + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( + workflow_execution_id=self._workflow_run_id, event=event + ) + node_retry_resp = 
self._workflow_response_converter.workflow_node_retry_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if node_retry_resp: + yield node_retry_resp + + def _handle_node_started_event( + self, event: QueueNodeStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle node started events.""" + self._ensure_workflow_initialized() + + workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( + workflow_execution_id=self._workflow_run_id, event=event + ) + + node_start_resp = self._workflow_response_converter.workflow_node_start_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if node_start_resp: + yield node_start_resp + + def _handle_node_succeeded_event( + self, event: QueueNodeSucceededEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle node succeeded events.""" + # Record files if it's an answer node or end node + if event.node_type in [NodeType.ANSWER, NodeType.END]: + self._recorded_files.extend( + self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) + ) + + with self._database_session() as session: + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(event=event) + node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + self._save_output_for_event(event, workflow_node_execution.id) + + if node_finish_resp: + yield node_finish_resp + + def _handle_node_failed_events( + self, + event: Union[ + QueueNodeFailedEvent, QueueNodeInIterationFailedEvent, QueueNodeInLoopFailedEvent, QueueNodeExceptionEvent + ], + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle various node failure events.""" + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(event=event) + + node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if isinstance(event, QueueNodeExceptionEvent): + self._save_output_for_event(event, workflow_node_execution.id) + + if node_finish_resp: + yield node_finish_resp + + def _handle_text_chunk_event( + self, + event: QueueTextChunkEvent, + *, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle text chunk events.""" + delta_text = event.text + if delta_text is None: + return + + # Handle output moderation chunk + should_direct_answer = self._handle_output_moderation_chunk(delta_text) + if should_direct_answer: + return + + # Only publish tts message at text chunk streaming + if tts_publisher and queue_message: + tts_publisher.publish(queue_message) + + self._task_state.answer += delta_text + yield self._message_cycle_manager.message_to_stream_response( + answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector + ) + + def _handle_parallel_branch_started_event( + self, event: QueueParallelBranchRunStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + 
"""Handle parallel branch started events.""" + self._ensure_workflow_initialized() + + parallel_start_resp = self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield parallel_start_resp + + def _handle_parallel_branch_finished_events( + self, event: Union[QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent], **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle parallel branch finished events.""" + self._ensure_workflow_initialized() + + parallel_finish_resp = self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield parallel_finish_resp + + def _handle_iteration_start_event( + self, event: QueueIterationStartEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle iteration start events.""" + self._ensure_workflow_initialized() + + iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield iter_start_resp + + def _handle_iteration_next_event( + self, event: QueueIterationNextEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle iteration next events.""" + self._ensure_workflow_initialized() + + iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield iter_next_resp + + def _handle_iteration_completed_event( + self, event: QueueIterationCompletedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle iteration completed events.""" + self._ensure_workflow_initialized() + + iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield iter_finish_resp + + def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle loop start events.""" + self._ensure_workflow_initialized() + + loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield loop_start_resp + + def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle loop next events.""" + self._ensure_workflow_initialized() + + loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield loop_next_resp + + def _handle_loop_completed_event( + self, event: QueueLoopCompletedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle loop completed events.""" + self._ensure_workflow_initialized() + + loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + 
yield loop_finish_resp + + def _handle_workflow_succeeded_event( + self, + event: QueueWorkflowSucceededEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle workflow succeeded events.""" + self._ensure_workflow_initialized() + validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( + workflow_run_id=self._workflow_run_id, + total_tokens=validated_state.total_tokens, + total_steps=validated_state.node_run_steps, + outputs=event.outputs, + conversation_id=self._conversation_id, + trace_manager=trace_manager, + ) + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield workflow_finish_resp + self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE) + + def _handle_workflow_partial_success_event( + self, + event: QueueWorkflowPartialSuccessEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle workflow partial success events.""" + self._ensure_workflow_initialized() + validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( + workflow_run_id=self._workflow_run_id, + total_tokens=validated_state.total_tokens, + total_steps=validated_state.node_run_steps, + outputs=event.outputs, + exceptions_count=event.exceptions_count, + conversation_id=None, + trace_manager=trace_manager, + ) + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield workflow_finish_resp + self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE) + + def _handle_workflow_failed_event( + self, + event: QueueWorkflowFailedEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle workflow failed events.""" + self._ensure_workflow_initialized() + validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( + workflow_run_id=self._workflow_run_id, + total_tokens=validated_state.total_tokens, + total_steps=validated_state.node_run_steps, + status=WorkflowExecutionStatus.FAILED, + error_message=event.error, + conversation_id=self._conversation_id, + trace_manager=trace_manager, + exceptions_count=event.exceptions_count, + ) + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}")) + err = 
self._base_task_pipeline._handle_error(event=err_event, session=session, message_id=self._message_id) + + yield workflow_finish_resp + yield self._base_task_pipeline._error_to_stream_response(err) + + def _handle_stop_event( + self, + event: QueueStopEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle stop events.""" + if self._workflow_run_id and graph_runtime_state: + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( + workflow_run_id=self._workflow_run_id, + total_tokens=graph_runtime_state.total_tokens, + total_steps=graph_runtime_state.node_run_steps, + status=WorkflowExecutionStatus.STOPPED, + error_message=event.get_stop_reason(), + conversation_id=self._conversation_id, + trace_manager=trace_manager, + ) + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + # Save message + self._save_message(session=session, graph_runtime_state=graph_runtime_state) + + yield workflow_finish_resp + elif event.stopped_by in ( + QueueStopEvent.StopBy.INPUT_MODERATION, + QueueStopEvent.StopBy.ANNOTATION_REPLY, + ): + # When hitting input-moderation or annotation-reply, the workflow will not start + with self._database_session() as session: + # Save message + self._save_message(session=session) + + yield self._message_end_to_stream_response() + + def _handle_advanced_chat_message_end_event( + self, + event: QueueAdvancedChatMessageEndEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle advanced chat message end events.""" + self._ensure_graph_runtime_initialized(graph_runtime_state) + + output_moderation_answer = self._base_task_pipeline._handle_output_moderation_when_task_finished( + self._task_state.answer + ) + if output_moderation_answer: + self._task_state.answer = output_moderation_answer + yield self._message_cycle_manager.message_replace_to_stream_response( + answer=output_moderation_answer, + reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION, + ) + + # Save message + with self._database_session() as session: + self._save_message(session=session, graph_runtime_state=graph_runtime_state) + + yield self._message_end_to_stream_response() + + def _handle_retriever_resources_event( + self, event: QueueRetrieverResourcesEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle retriever resources events.""" + self._message_cycle_manager.handle_retriever_resources(event) + + with self._database_session() as session: + message = self._get_message(session=session) + message.message_metadata = self._task_state.metadata.model_dump_json() + return + yield # Make this a generator + + def _handle_annotation_reply_event( + self, event: QueueAnnotationReplyEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle annotation reply events.""" + self._message_cycle_manager.handle_annotation_reply(event) + + with self._database_session() as session: + message = self._get_message(session=session) + message.message_metadata = self._task_state.metadata.model_dump_json() + return + yield # Make this a generator + + def _handle_message_replace_event( + self, event: QueueMessageReplaceEvent, **kwargs + ) -> 
Generator[StreamResponse, None, None]: + """Handle message replace events.""" + yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text, reason=event.reason) + + def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle agent log events.""" + yield self._workflow_response_converter.handle_agent_log( + task_id=self._application_generate_entity.task_id, event=event + ) + + def _get_event_handlers(self) -> dict[type, Callable]: + """Get mapping of event types to their handlers using fluent pattern.""" + return { + # Basic events + QueuePingEvent: self._handle_ping_event, + QueueErrorEvent: self._handle_error_event, + QueueTextChunkEvent: self._handle_text_chunk_event, + # Workflow events + QueueWorkflowStartedEvent: self._handle_workflow_started_event, + QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event, + QueueWorkflowPartialSuccessEvent: self._handle_workflow_partial_success_event, + QueueWorkflowFailedEvent: self._handle_workflow_failed_event, + # Node events + QueueNodeRetryEvent: self._handle_node_retry_event, + QueueNodeStartedEvent: self._handle_node_started_event, + QueueNodeSucceededEvent: self._handle_node_succeeded_event, + # Parallel branch events + QueueParallelBranchRunStartedEvent: self._handle_parallel_branch_started_event, + # Iteration events + QueueIterationStartEvent: self._handle_iteration_start_event, + QueueIterationNextEvent: self._handle_iteration_next_event, + QueueIterationCompletedEvent: self._handle_iteration_completed_event, + # Loop events + QueueLoopStartEvent: self._handle_loop_start_event, + QueueLoopNextEvent: self._handle_loop_next_event, + QueueLoopCompletedEvent: self._handle_loop_completed_event, + # Control events + QueueStopEvent: self._handle_stop_event, + # Message events + QueueRetrieverResourcesEvent: self._handle_retriever_resources_event, + QueueAnnotationReplyEvent: self._handle_annotation_reply_event, + QueueMessageReplaceEvent: self._handle_message_replace_event, + QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event, + QueueAgentLogEvent: self._handle_agent_log_event, + } + + def _dispatch_event( + self, + event: Any, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + trace_manager: Optional[TraceQueueManager] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, + ) -> Generator[StreamResponse, None, None]: + """Dispatch events using elegant pattern matching.""" + handlers = self._get_event_handlers() + event_type = type(event) + + # Direct handler lookup + if handler := handlers.get(event_type): + yield from handler( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # Handle node failure events with isinstance check + if isinstance( + event, + ( + QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, + QueueNodeInLoopFailedEvent, + QueueNodeExceptionEvent, + ), + ): + yield from self._handle_node_failed_events( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # Handle parallel branch finished events with isinstance check + if isinstance(event, (QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent)): + yield from 
self._handle_parallel_branch_finished_events( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # For unhandled events, we continue (original behavior) + return + def _process_stream_response( self, tts_publisher: Optional[AppGeneratorTTSPublisher] = None, trace_manager: Optional[TraceQueueManager] = None, ) -> Generator[StreamResponse, None, None]: """ - Process stream response. - :return: + Process stream response using elegant Fluent Python patterns. + Maintains exact same functionality as original 57-if-statement version. """ - # init fake graph runtime state + # Initialize graph runtime state graph_runtime_state: Optional[GraphRuntimeState] = None for queue_message in self._base_task_pipeline._queue_manager.listen(): event = queue_message.event - if isinstance(event, QueuePingEvent): - yield self._base_task_pipeline._ping_stream_response() - elif isinstance(event, QueueErrorEvent): - with Session(db.engine, expire_on_commit=False) as session: - err = self._base_task_pipeline._handle_error( - event=event, session=session, message_id=self._message_id - ) - session.commit() - yield self._base_task_pipeline._error_to_stream_response(err) - break - elif isinstance(event, QueueWorkflowStartedEvent): - # override graph runtime state - graph_runtime_state = event.graph_runtime_state + match event: + case QueueWorkflowStartedEvent(): + graph_runtime_state = event.graph_runtime_state + yield from self._handle_workflow_started_event(event) - with Session(db.engine, expire_on_commit=False) as session: - # init workflow run - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() - self._workflow_run_id = workflow_execution.id_ - message = self._get_message(session=session) - if not message: - raise ValueError(f"Message not found: {self._message_id}") - message.workflow_run_id = workflow_execution.id_ - workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - session.commit() - - yield workflow_start_resp - elif isinstance( - event, - QueueNodeRetryEvent, - ): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( - workflow_execution_id=self._workflow_run_id, event=event - ) - node_retry_resp = self._workflow_response_converter.workflow_node_retry_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - session.commit() - - if node_retry_resp: - yield node_retry_resp - elif isinstance(event, QueueNodeStartedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( - workflow_execution_id=self._workflow_run_id, event=event - ) - - node_start_resp = self._workflow_response_converter.workflow_node_start_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - - if node_start_resp: - yield node_start_resp - elif isinstance(event, QueueNodeSucceededEvent): - # Record files if it's an answer node or end node - if event.node_type in [NodeType.ANSWER, 
NodeType.END]: - self._recorded_files.extend( - self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {}) + case QueueTextChunkEvent(): + yield from self._handle_text_chunk_event( + event, tts_publisher=tts_publisher, queue_message=queue_message ) - with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( - event=event + case QueueErrorEvent(): + yield from self._handle_error_event(event) + break + + case QueueWorkflowFailedEvent(): + yield from self._handle_workflow_failed_event( + event, graph_runtime_state=graph_runtime_state, trace_manager=trace_manager ) + break - node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, + case QueueStopEvent(): + yield from self._handle_stop_event( + event, graph_runtime_state=graph_runtime_state, trace_manager=trace_manager ) - session.commit() - self._save_output_for_event(event, workflow_node_execution.id) + break - if node_finish_resp: - yield node_finish_resp - elif isinstance( - event, - QueueNodeFailedEvent - | QueueNodeInIterationFailedEvent - | QueueNodeInLoopFailedEvent - | QueueNodeExceptionEvent, - ): - workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( - event=event - ) - - node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - if isinstance(event, QueueNodeExceptionEvent): - self._save_output_for_event(event, workflow_node_execution.id) - - if node_finish_resp: - yield node_finish_resp - elif isinstance(event, QueueParallelBranchRunStartedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - parallel_start_resp = ( - self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - ) - - yield parallel_start_resp - elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - parallel_finish_resp = ( - self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - ) - - yield parallel_finish_resp - elif isinstance(event, QueueIterationStartEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield iter_start_resp - elif isinstance(event, QueueIterationNextEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield iter_next_resp - elif isinstance(event, QueueIterationCompletedEvent): - if not 
self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield iter_finish_resp - elif isinstance(event, QueueLoopStartEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield loop_start_resp - elif isinstance(event, QueueLoopNextEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield loop_next_resp - elif isinstance(event, QueueLoopCompletedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield loop_finish_resp - elif isinstance(event, QueueWorkflowSucceededEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - if not graph_runtime_state: - raise ValueError("workflow run not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - total_steps=graph_runtime_state.node_run_steps, - outputs=event.outputs, - conversation_id=self._conversation_id, - trace_manager=trace_manager, - ) - - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - - yield workflow_finish_resp - self._base_task_pipeline._queue_manager.publish( - QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE - ) - elif isinstance(event, QueueWorkflowPartialSuccessEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - if not graph_runtime_state: - raise ValueError("graph runtime state not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - total_steps=graph_runtime_state.node_run_steps, - outputs=event.outputs, - exceptions_count=event.exceptions_count, - conversation_id=None, - trace_manager=trace_manager, - ) - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - - yield workflow_finish_resp - self._base_task_pipeline._queue_manager.publish( - QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE - ) - elif isinstance(event, QueueWorkflowFailedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run 
not initialized.") - if not graph_runtime_state: - raise ValueError("graph runtime state not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - total_steps=graph_runtime_state.node_run_steps, - status=WorkflowExecutionStatus.FAILED, - error_message=event.error, - conversation_id=self._conversation_id, - trace_manager=trace_manager, - exceptions_count=event.exceptions_count, - ) - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_execution.error_message}")) - err = self._base_task_pipeline._handle_error( - event=err_event, session=session, message_id=self._message_id - ) - - yield workflow_finish_resp - yield self._base_task_pipeline._error_to_stream_response(err) - break - elif isinstance(event, QueueStopEvent): - if self._workflow_run_id and graph_runtime_state: - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - total_steps=graph_runtime_state.node_run_steps, - status=WorkflowExecutionStatus.STOPPED, - error_message=event.get_stop_reason(), - conversation_id=self._conversation_id, + # Handle all other events through elegant dispatch + case _: + if responses := list( + self._dispatch_event( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, trace_manager=trace_manager, + queue_message=queue_message, ) - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - # Save message - self._save_message(session=session, graph_runtime_state=graph_runtime_state) - session.commit() + ): + yield from responses - yield workflow_finish_resp - elif event.stopped_by in ( - QueueStopEvent.StopBy.INPUT_MODERATION, - QueueStopEvent.StopBy.ANNOTATION_REPLY, - ): - # When hitting input-moderation or annotation-reply, the workflow will not start - with Session(db.engine, expire_on_commit=False) as session: - # Save message - self._save_message(session=session) - session.commit() - - yield self._message_end_to_stream_response() - break - elif isinstance(event, QueueRetrieverResourcesEvent): - self._message_cycle_manager.handle_retriever_resources(event) - - with Session(db.engine, expire_on_commit=False) as session: - message = self._get_message(session=session) - message.message_metadata = self._task_state.metadata.model_dump_json() - session.commit() - elif isinstance(event, QueueAnnotationReplyEvent): - self._message_cycle_manager.handle_annotation_reply(event) - - with Session(db.engine, expire_on_commit=False) as session: - message = self._get_message(session=session) - message.message_metadata = self._task_state.metadata.model_dump_json() - session.commit() - elif isinstance(event, QueueTextChunkEvent): - delta_text = event.text - if delta_text is None: - continue - - # handle output moderation chunk - should_direct_answer = self._handle_output_moderation_chunk(delta_text) - if should_direct_answer: - continue - - # only 
publish tts message at text chunk streaming - if tts_publisher: - tts_publisher.publish(queue_message) - - self._task_state.answer += delta_text - yield self._message_cycle_manager.message_to_stream_response( - answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector - ) - elif isinstance(event, QueueMessageReplaceEvent): - # published by moderation - yield self._message_cycle_manager.message_replace_to_stream_response( - answer=event.text, reason=event.reason - ) - elif isinstance(event, QueueAdvancedChatMessageEndEvent): - if not graph_runtime_state: - raise ValueError("graph runtime state not initialized.") - - output_moderation_answer = self._base_task_pipeline._handle_output_moderation_when_task_finished( - self._task_state.answer - ) - if output_moderation_answer: - self._task_state.answer = output_moderation_answer - yield self._message_cycle_manager.message_replace_to_stream_response( - answer=output_moderation_answer, - reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION, - ) - - # Save message - with Session(db.engine, expire_on_commit=False) as session: - self._save_message(session=session, graph_runtime_state=graph_runtime_state) - session.commit() - - yield self._message_end_to_stream_response() - elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_response_converter.handle_agent_log( - task_id=self._application_generate_entity.task_id, event=event - ) - else: - continue - - # publish None when task finished if tts_publisher: tts_publisher.publish(None) @@ -744,7 +954,6 @@ class AdvancedChatAppGenerateTaskPipeline: """ if self._base_task_pipeline._output_moderation_handler: if self._base_task_pipeline._output_moderation_handler.should_direct_output(): - # stop subscribe new token when output moderation should direct output self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output() self._base_task_pipeline._queue_manager.publish( QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 7adc03e9c..9a39b2e01 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -1,7 +1,8 @@ import logging import time -from collections.abc import Generator -from typing import Optional, Union +from collections.abc import Callable, Generator +from contextlib import contextmanager +from typing import Any, Optional, Union from sqlalchemy.orm import Session @@ -13,6 +14,7 @@ from core.app.entities.app_invoke_entities import ( WorkflowAppGenerateEntity, ) from core.app.entities.queue_entities import ( + MessageQueueMessage, QueueAgentLogEvent, QueueErrorEvent, QueueIterationCompletedEvent, @@ -38,11 +40,13 @@ from core.app.entities.queue_entities import ( QueueWorkflowPartialSuccessEvent, QueueWorkflowStartedEvent, QueueWorkflowSucceededEvent, + WorkflowQueueMessage, ) from core.app.entities.task_entities import ( ErrorStreamResponse, MessageAudioEndStreamResponse, MessageAudioStreamResponse, + PingStreamResponse, StreamResponse, TextChunkStreamResponse, WorkflowAppBlockingResponse, @@ -54,6 +58,7 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution import WorkflowExecution, 
WorkflowExecutionStatus, WorkflowType +from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository @@ -246,315 +251,492 @@ class WorkflowAppGenerateTaskPipeline: if tts_publisher: yield MessageAudioEndStreamResponse(audio="", task_id=task_id) + @contextmanager + def _database_session(self): + """Context manager for database sessions.""" + with Session(db.engine, expire_on_commit=False) as session: + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + + def _ensure_workflow_initialized(self) -> None: + """Fluent validation for workflow state.""" + if not self._workflow_run_id: + raise ValueError("workflow run not initialized.") + + def _ensure_graph_runtime_initialized(self, graph_runtime_state: Optional[GraphRuntimeState]) -> GraphRuntimeState: + """Fluent validation for graph runtime state.""" + if not graph_runtime_state: + raise ValueError("graph runtime state not initialized.") + return graph_runtime_state + + def _handle_ping_event(self, event: QueuePingEvent, **kwargs) -> Generator[PingStreamResponse, None, None]: + """Handle ping events.""" + yield self._base_task_pipeline._ping_stream_response() + + def _handle_error_event(self, event: QueueErrorEvent, **kwargs) -> Generator[ErrorStreamResponse, None, None]: + """Handle error events.""" + err = self._base_task_pipeline._handle_error(event=event) + yield self._base_task_pipeline._error_to_stream_response(err) + + def _handle_workflow_started_event( + self, event: QueueWorkflowStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle workflow started events.""" + # init workflow run + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() + self._workflow_run_id = workflow_execution.id_ + start_resp = self._workflow_response_converter.workflow_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + yield start_resp + + def _handle_node_retry_event(self, event: QueueNodeRetryEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle node retry events.""" + self._ensure_workflow_initialized() + + with self._database_session() as session: + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( + workflow_execution_id=self._workflow_run_id, + event=event, + ) + response = self._workflow_response_converter.workflow_node_retry_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if response: + yield response + + def _handle_node_started_event( + self, event: QueueNodeStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle node started events.""" + self._ensure_workflow_initialized() + + workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( + workflow_execution_id=self._workflow_run_id, event=event + ) + node_start_response = self._workflow_response_converter.workflow_node_start_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if node_start_response: + yield node_start_response + 
+ def _handle_node_succeeded_event( + self, event: QueueNodeSucceededEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle node succeeded events.""" + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(event=event) + node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + self._save_output_for_event(event, workflow_node_execution.id) + + if node_success_response: + yield node_success_response + + def _handle_node_failed_events( + self, + event: Union[ + QueueNodeFailedEvent, QueueNodeInIterationFailedEvent, QueueNodeInLoopFailedEvent, QueueNodeExceptionEvent + ], + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle various node failure events.""" + workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( + event=event, + ) + node_failed_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if isinstance(event, QueueNodeExceptionEvent): + self._save_output_for_event(event, workflow_node_execution.id) + + if node_failed_response: + yield node_failed_response + + def _handle_parallel_branch_started_event( + self, event: QueueParallelBranchRunStartedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle parallel branch started events.""" + self._ensure_workflow_initialized() + + parallel_start_resp = self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield parallel_start_resp + + def _handle_parallel_branch_finished_events( + self, event: Union[QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent], **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle parallel branch finished events.""" + self._ensure_workflow_initialized() + + parallel_finish_resp = self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield parallel_finish_resp + + def _handle_iteration_start_event( + self, event: QueueIterationStartEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle iteration start events.""" + self._ensure_workflow_initialized() + + iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield iter_start_resp + + def _handle_iteration_next_event( + self, event: QueueIterationNextEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle iteration next events.""" + self._ensure_workflow_initialized() + + iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield iter_next_resp + + def _handle_iteration_completed_event( + self, event: QueueIterationCompletedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle iteration completed events.""" + 
self._ensure_workflow_initialized() + + iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield iter_finish_resp + + def _handle_loop_start_event(self, event: QueueLoopStartEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle loop start events.""" + self._ensure_workflow_initialized() + + loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield loop_start_resp + + def _handle_loop_next_event(self, event: QueueLoopNextEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle loop next events.""" + self._ensure_workflow_initialized() + + loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield loop_next_resp + + def _handle_loop_completed_event( + self, event: QueueLoopCompletedEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle loop completed events.""" + self._ensure_workflow_initialized() + + loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( + task_id=self._application_generate_entity.task_id, + workflow_execution_id=self._workflow_run_id, + event=event, + ) + yield loop_finish_resp + + def _handle_workflow_succeeded_event( + self, + event: QueueWorkflowSucceededEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle workflow succeeded events.""" + self._ensure_workflow_initialized() + validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( + workflow_run_id=self._workflow_run_id, + total_tokens=validated_state.total_tokens, + total_steps=validated_state.node_run_steps, + outputs=event.outputs, + conversation_id=None, + trace_manager=trace_manager, + ) + + # save workflow app log + self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield workflow_finish_resp + + def _handle_workflow_partial_success_event( + self, + event: QueueWorkflowPartialSuccessEvent, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle workflow partial success events.""" + self._ensure_workflow_initialized() + validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( + workflow_run_id=self._workflow_run_id, + total_tokens=validated_state.total_tokens, + total_steps=validated_state.node_run_steps, + outputs=event.outputs, + exceptions_count=event.exceptions_count, + conversation_id=None, + trace_manager=trace_manager, + ) + 
+ # save workflow app log + self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield workflow_finish_resp + + def _handle_workflow_failed_and_stop_events( + self, + event: Union[QueueWorkflowFailedEvent, QueueStopEvent], + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + trace_manager: Optional[TraceQueueManager] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle workflow failed and stop events.""" + self._ensure_workflow_initialized() + validated_state = self._ensure_graph_runtime_initialized(graph_runtime_state) + + with self._database_session() as session: + workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( + workflow_run_id=self._workflow_run_id, + total_tokens=validated_state.total_tokens, + total_steps=validated_state.node_run_steps, + status=WorkflowExecutionStatus.FAILED + if isinstance(event, QueueWorkflowFailedEvent) + else WorkflowExecutionStatus.STOPPED, + error_message=event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(), + conversation_id=None, + trace_manager=trace_manager, + exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0, + ) + + # save workflow app log + self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( + session=session, + task_id=self._application_generate_entity.task_id, + workflow_execution=workflow_execution, + ) + + yield workflow_finish_resp + + def _handle_text_chunk_event( + self, + event: QueueTextChunkEvent, + *, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, + **kwargs, + ) -> Generator[StreamResponse, None, None]: + """Handle text chunk events.""" + delta_text = event.text + if delta_text is None: + return + + # only publish tts message at text chunk streaming + if tts_publisher and queue_message: + tts_publisher.publish(queue_message) + + yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector) + + def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: + """Handle agent log events.""" + yield self._workflow_response_converter.handle_agent_log( + task_id=self._application_generate_entity.task_id, event=event + ) + + def _get_event_handlers(self) -> dict[type, Callable]: + """Get mapping of event types to their handlers using fluent pattern.""" + return { + # Basic events + QueuePingEvent: self._handle_ping_event, + QueueErrorEvent: self._handle_error_event, + QueueTextChunkEvent: self._handle_text_chunk_event, + # Workflow events + QueueWorkflowStartedEvent: self._handle_workflow_started_event, + QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event, + QueueWorkflowPartialSuccessEvent: self._handle_workflow_partial_success_event, + # Node events + QueueNodeRetryEvent: self._handle_node_retry_event, + QueueNodeStartedEvent: self._handle_node_started_event, + QueueNodeSucceededEvent: self._handle_node_succeeded_event, + # Parallel branch events + QueueParallelBranchRunStartedEvent: self._handle_parallel_branch_started_event, 
+ # Iteration events + QueueIterationStartEvent: self._handle_iteration_start_event, + QueueIterationNextEvent: self._handle_iteration_next_event, + QueueIterationCompletedEvent: self._handle_iteration_completed_event, + # Loop events + QueueLoopStartEvent: self._handle_loop_start_event, + QueueLoopNextEvent: self._handle_loop_next_event, + QueueLoopCompletedEvent: self._handle_loop_completed_event, + # Agent events + QueueAgentLogEvent: self._handle_agent_log_event, + } + + def _dispatch_event( + self, + event: Any, + *, + graph_runtime_state: Optional[GraphRuntimeState] = None, + tts_publisher: Optional[AppGeneratorTTSPublisher] = None, + trace_manager: Optional[TraceQueueManager] = None, + queue_message: Optional[Union[WorkflowQueueMessage, MessageQueueMessage]] = None, + ) -> Generator[StreamResponse, None, None]: + """Dispatch events using elegant pattern matching.""" + handlers = self._get_event_handlers() + event_type = type(event) + + # Direct handler lookup + if handler := handlers.get(event_type): + yield from handler( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # Handle node failure events with isinstance check + if isinstance( + event, + ( + QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, + QueueNodeInLoopFailedEvent, + QueueNodeExceptionEvent, + ), + ): + yield from self._handle_node_failed_events( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # Handle parallel branch finished events with isinstance check + if isinstance(event, (QueueParallelBranchRunSucceededEvent, QueueParallelBranchRunFailedEvent)): + yield from self._handle_parallel_branch_finished_events( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # Handle workflow failed and stop events with isinstance check + if isinstance(event, (QueueWorkflowFailedEvent, QueueStopEvent)): + yield from self._handle_workflow_failed_and_stop_events( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) + return + + # For unhandled events, we continue (original behavior) + return + def _process_stream_response( self, tts_publisher: Optional[AppGeneratorTTSPublisher] = None, trace_manager: Optional[TraceQueueManager] = None, ) -> Generator[StreamResponse, None, None]: """ - Process stream response. - :return: + Process stream response using elegant Fluent Python patterns. + Maintains exact same functionality as original 44-if-statement version. 
""" + # Initialize graph runtime state graph_runtime_state = None for queue_message in self._base_task_pipeline._queue_manager.listen(): event = queue_message.event - if isinstance(event, QueuePingEvent): - yield self._base_task_pipeline._ping_stream_response() - elif isinstance(event, QueueErrorEvent): - err = self._base_task_pipeline._handle_error(event=event) - yield self._base_task_pipeline._error_to_stream_response(err) - break - elif isinstance(event, QueueWorkflowStartedEvent): - # override graph runtime state - graph_runtime_state = event.graph_runtime_state + match event: + case QueueWorkflowStartedEvent(): + graph_runtime_state = event.graph_runtime_state + yield from self._handle_workflow_started_event(event) - # init workflow run - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start() - self._workflow_run_id = workflow_execution.id_ - start_resp = self._workflow_response_converter.workflow_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - - yield start_resp - elif isinstance( - event, - QueueNodeRetryEvent, - ): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - with Session(db.engine, expire_on_commit=False) as session: - workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried( - workflow_execution_id=self._workflow_run_id, - event=event, - ) - response = self._workflow_response_converter.workflow_node_retry_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - session.commit() - - if response: - yield response - elif isinstance(event, QueueNodeStartedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start( - workflow_execution_id=self._workflow_run_id, event=event - ) - node_start_response = self._workflow_response_converter.workflow_node_start_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - - if node_start_response: - yield node_start_response - elif isinstance(event, QueueNodeSucceededEvent): - workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success( - event=event - ) - node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - - self._save_output_for_event(event, workflow_node_execution.id) - - if node_success_response: - yield node_success_response - elif isinstance( - event, - QueueNodeFailedEvent - | QueueNodeInIterationFailedEvent - | QueueNodeInLoopFailedEvent - | QueueNodeExceptionEvent, - ): - workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed( - event=event, - ) - node_failed_response = self._workflow_response_converter.workflow_node_finish_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - if isinstance(event, QueueNodeExceptionEvent): - self._save_output_for_event(event, workflow_node_execution.id) - - if node_failed_response: - yield node_failed_response - - elif isinstance(event, QueueParallelBranchRunStartedEvent): - if not self._workflow_run_id: - 
raise ValueError("workflow run not initialized.") - - parallel_start_resp = ( - self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - ) - - yield parallel_start_resp - - elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - parallel_finish_resp = ( - self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - ) - - yield parallel_finish_resp - - elif isinstance(event, QueueIterationStartEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield iter_start_resp - - elif isinstance(event, QueueIterationNextEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield iter_next_resp - - elif isinstance(event, QueueIterationCompletedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield iter_finish_resp - - elif isinstance(event, QueueLoopStartEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield loop_start_resp - - elif isinstance(event, QueueLoopNextEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield loop_next_resp - - elif isinstance(event, QueueLoopCompletedEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - - loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( - task_id=self._application_generate_entity.task_id, - workflow_execution_id=self._workflow_run_id, - event=event, - ) - - yield loop_finish_resp - - elif isinstance(event, QueueWorkflowSucceededEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - if not graph_runtime_state: - raise ValueError("graph runtime state not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - 
total_steps=graph_runtime_state.node_run_steps, - outputs=event.outputs, - conversation_id=None, - trace_manager=trace_manager, + case QueueTextChunkEvent(): + yield from self._handle_text_chunk_event( + event, tts_publisher=tts_publisher, queue_message=queue_message ) - # save workflow app log - self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) + case QueueErrorEvent(): + yield from self._handle_error_event(event) + break - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - session.commit() - - yield workflow_finish_resp - elif isinstance(event, QueueWorkflowPartialSuccessEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - if not graph_runtime_state: - raise ValueError("graph runtime state not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - total_steps=graph_runtime_state.node_run_steps, - outputs=event.outputs, - exceptions_count=event.exceptions_count, - conversation_id=None, - trace_manager=trace_manager, - ) - - # save workflow app log - self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - session.commit() - - yield workflow_finish_resp - elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent): - if not self._workflow_run_id: - raise ValueError("workflow run not initialized.") - if not graph_runtime_state: - raise ValueError("graph runtime state not initialized.") - - with Session(db.engine, expire_on_commit=False) as session: - workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed( - workflow_run_id=self._workflow_run_id, - total_tokens=graph_runtime_state.total_tokens, - total_steps=graph_runtime_state.node_run_steps, - status=WorkflowExecutionStatus.FAILED - if isinstance(event, QueueWorkflowFailedEvent) - else WorkflowExecutionStatus.STOPPED, - error_message=event.error - if isinstance(event, QueueWorkflowFailedEvent) - else event.get_stop_reason(), - conversation_id=None, - trace_manager=trace_manager, - exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0, - ) - - # save workflow app log - self._save_workflow_app_log(session=session, workflow_execution=workflow_execution) - - workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( - session=session, - task_id=self._application_generate_entity.task_id, - workflow_execution=workflow_execution, - ) - session.commit() - - yield workflow_finish_resp - elif isinstance(event, QueueTextChunkEvent): - delta_text = event.text - if delta_text is None: - continue - - # only publish tts message at text chunk streaming - if tts_publisher: - tts_publisher.publish(queue_message) - - yield self._text_chunk_to_stream_response( - delta_text, from_variable_selector=event.from_variable_selector - ) - elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_response_converter.handle_agent_log( - task_id=self._application_generate_entity.task_id, 
event=event - ) - else: - continue + # All remaining events go through the dispatch table + case _: + yield from self._dispatch_event( + event, + graph_runtime_state=graph_runtime_state, + tts_publisher=tts_publisher, + trace_manager=trace_manager, + queue_message=queue_message, + ) if tts_publisher: tts_publisher.publish(None)
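
For readers reviewing the dispatch-table approach introduced in this patch, the following is a minimal, self-contained sketch of the same idea: a dict keyed by event type maps to generator handlers, an isinstance fallback covers event subclasses, and a match statement in the read loop delegates the remaining cases to the table. All class and method names below are invented for illustration; none of them appear in the patch.

from collections.abc import Callable, Generator
from dataclasses import dataclass


# Hypothetical stand-ins for the Queue*Event classes; invented for this sketch only.
@dataclass
class PingEvent:
    pass


@dataclass
class TextChunkEvent:
    text: str


@dataclass
class FailedEvent:
    error: str


@dataclass
class NodeFailedEvent(FailedEvent):
    node_id: str = "node-1"


class Dispatcher:
    """Toy pipeline: generator handlers keyed by event type, isinstance fallback for subclasses."""

    def _handle_ping(self, event: PingEvent) -> Generator[str, None, None]:
        yield "ping"

    def _handle_text_chunk(self, event: TextChunkEvent) -> Generator[str, None, None]:
        yield f"chunk:{event.text}"

    def _handle_failed(self, event: FailedEvent) -> Generator[str, None, None]:
        yield f"failed:{event.error}"

    def _handlers(self) -> dict[type, Callable]:
        # Exact-type lookup table, analogous to _get_event_handlers() above.
        return {
            PingEvent: self._handle_ping,
            TextChunkEvent: self._handle_text_chunk,
        }

    def dispatch(self, event: object) -> Generator[str, None, None]:
        # Direct lookup first; the isinstance fallback catches subclasses (e.g. NodeFailedEvent)
        # that are not listed in the table, mirroring _dispatch_event() above.
        if handler := self._handlers().get(type(event)):
            yield from handler(event)
            return
        if isinstance(event, FailedEvent):
            yield from self._handle_failed(event)
            return
        # Unhandled events yield nothing, matching the original "continue" behaviour.

    def run(self, events) -> Generator[str, None, None]:
        # Mirrors the shape of _process_stream_response(): events that need loop-local
        # handling keep their own case; everything else falls through to dispatch().
        for event in events:
            match event:
                case TextChunkEvent():
                    yield from self._handle_text_chunk(event)
                case _:
                    yield from self.dispatch(event)


if __name__ == "__main__":
    d = Dispatcher()
    for line in d.run([PingEvent(), TextChunkEvent(text="hi"), NodeFailedEvent(error="boom")]):
        print(line)  # -> ping, chunk:hi, failed:boom

The fallback exists because subclasses such as NodeFailedEvent are not keys in the table, so an exact type() lookup misses them; the isinstance branch recovers them, just as _dispatch_event() does for the node-failure and workflow-failure event families.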