From acd209a890507da03a7d2904da349225eb016381 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=90=E5=B0=8F=E5=BF=83?= Date: Mon, 1 Sep 2025 18:22:42 +0800 Subject: [PATCH] fix: prevent database connection leaks in chatflow mode by using Session-managed queries (#24656) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 王锶奇 Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/core/app/apps/advanced_chat/app_generator.py | 6 ++++++ api/core/app/apps/advanced_chat/app_runner.py | 3 ++- api/core/app/apps/message_based_app_generator.py | 9 +++++++-- api/core/app/task_pipeline/message_cycle_manager.py | 8 ++++++-- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 52ae20ee1..74e282fdc 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -450,6 +450,12 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): worker_thread.start() + # release database connection, because the following new thread operations may take a long time + db.session.refresh(workflow) + db.session.refresh(message) + db.session.refresh(user) + db.session.close() + # return response or stream generator response = self._handle_advanced_chat_response( application_generate_entity=application_generate_entity, diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 8d256da9c..b78a364a7 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -72,7 +72,8 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): app_config = self.application_generate_entity.app_config app_config = cast(AdvancedChatAppConfig, app_config) - app_record = db.session.query(App).where(App.id == app_config.app_id).first() + with Session(db.engine, expire_on_commit=False) as session: + app_record = session.scalar(select(App).where(App.id == app_config.app_id)) if not app_record: raise ValueError("App not found") diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index 11c979765..1b107e072 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -3,6 +3,9 @@ import logging from collections.abc import Generator from typing import Optional, Union, cast +from sqlalchemy import select +from sqlalchemy.orm import Session + from core.app.app_config.entities import EasyUIBasedAppConfig, EasyUIBasedAppModelConfigFrom from core.app.apps.base_app_generator import BaseAppGenerator from core.app.apps.base_app_queue_manager import AppQueueManager @@ -253,7 +256,8 @@ class MessageBasedAppGenerator(BaseAppGenerator): :param conversation_id: conversation id :return: conversation """ - conversation = db.session.query(Conversation).where(Conversation.id == conversation_id).first() + with Session(db.engine, expire_on_commit=False) as session: + conversation = session.scalar(select(Conversation).where(Conversation.id == conversation_id)) if not conversation: raise ConversationNotExistsError("Conversation not exists") @@ -266,7 +270,8 @@ class MessageBasedAppGenerator(BaseAppGenerator): :param message_id: message id :return: message """ - message = db.session.query(Message).where(Message.id == message_id).first() + with Session(db.engine, expire_on_commit=False) as session: + message = session.scalar(select(Message).where(Message.id == message_id)) if message is None: raise MessageNotExistsError("Message not exists") diff --git a/api/core/app/task_pipeline/message_cycle_manager.py b/api/core/app/task_pipeline/message_cycle_manager.py index 50b51f70f..956ef4e83 100644 --- a/api/core/app/task_pipeline/message_cycle_manager.py +++ b/api/core/app/task_pipeline/message_cycle_manager.py @@ -3,6 +3,8 @@ from threading import Thread from typing import Optional, Union from flask import Flask, current_app +from sqlalchemy import select +from sqlalchemy.orm import Session from configs import dify_config from core.app.entities.app_invoke_entities import ( @@ -143,7 +145,8 @@ class MessageCycleManager: :param event: event :return: """ - message_file = db.session.query(MessageFile).where(MessageFile.id == event.message_file_id).first() + with Session(db.engine, expire_on_commit=False) as session: + message_file = session.scalar(select(MessageFile).where(MessageFile.id == event.message_file_id)) if message_file and message_file.url is not None: # get tool file id @@ -183,7 +186,8 @@ class MessageCycleManager: :param message_id: message id :return: """ - message_file = db.session.query(MessageFile).where(MessageFile.id == message_id).first() + with Session(db.engine, expire_on_commit=False) as session: + message_file = session.scalar(select(MessageFile).where(MessageFile.id == message_id)) event_type = StreamEvent.MESSAGE_FILE if message_file else StreamEvent.MESSAGE return MessageStreamResponse(