From 3aedc139acfe9736ace6fa5de347d6779d63dc94 Mon Sep 17 00:00:00 2001 From: znn Date: Mon, 25 Aug 2025 07:13:45 +0530 Subject: [PATCH] fix delete conversations via Api and delete conversations from db as well (#23591) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: crazywoola <427733928@qq.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- api/README.md | 2 +- api/controllers/console/app/conversation.py | 34 ++++------- api/docker/entrypoint.sh | 2 +- api/services/conversation_service.py | 22 +++++-- api/tasks/delete_conversation_task.py | 68 +++++++++++++++++++++ 5 files changed, 100 insertions(+), 28 deletions(-) create mode 100644 api/tasks/delete_conversation_task.py diff --git a/api/README.md b/api/README.md index 5571fdd0f..9e31172ac 100644 --- a/api/README.md +++ b/api/README.md @@ -80,7 +80,7 @@ 1. If you need to handle and debug the async tasks (e.g. dataset importing and documents indexing), please start the worker service. ```bash -uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage +uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation ``` Addition, if you want to debug the celery scheduled tasks, you can use the following command in another terminal: diff --git a/api/controllers/console/app/conversation.py b/api/controllers/console/app/conversation.py index c6f636099..06f021877 100644 --- a/api/controllers/console/app/conversation.py +++ b/api/controllers/console/app/conversation.py @@ -24,6 +24,8 @@ from libs.helper import DatetimeString from libs.login import login_required from models import Conversation, EndUser, Message, MessageAnnotation from models.model import AppMode +from services.conversation_service import ConversationService +from services.errors.conversation import ConversationNotExistsError class CompletionConversationApi(Resource): @@ -46,7 +48,9 @@ class CompletionConversationApi(Resource): parser.add_argument("limit", type=int_range(1, 100), default=20, location="args") args = parser.parse_args() - query = db.select(Conversation).where(Conversation.app_id == app_model.id, Conversation.mode == "completion") + query = db.select(Conversation).where( + Conversation.app_id == app_model.id, Conversation.mode == "completion", Conversation.is_deleted.is_(False) + ) if args["keyword"]: query = query.join(Message, Message.conversation_id == Conversation.id).where( @@ -119,18 +123,11 @@ class CompletionConversationDetailApi(Resource): raise Forbidden() conversation_id = str(conversation_id) - conversation = ( - db.session.query(Conversation) - .where(Conversation.id == conversation_id, Conversation.app_id == app_model.id) - .first() - ) - - if not conversation: + try: + ConversationService.delete(app_model, conversation_id, current_user) + except ConversationNotExistsError: raise NotFound("Conversation Not Exists.") - conversation.is_deleted = True - db.session.commit() - return {"result": "success"}, 204 @@ -171,7 +168,7 @@ class ChatConversationApi(Resource): .subquery() ) - query = db.select(Conversation).where(Conversation.app_id == app_model.id) + query = db.select(Conversation).where(Conversation.app_id == app_model.id, Conversation.is_deleted.is_(False)) if args["keyword"]: keyword_filter = f"%{args['keyword']}%" @@ -284,18 +281,11 @@ class ChatConversationDetailApi(Resource): raise Forbidden() conversation_id = str(conversation_id) - conversation = ( - db.session.query(Conversation) - .where(Conversation.id == conversation_id, Conversation.app_id == app_model.id) - .first() - ) - - if not conversation: + try: + ConversationService.delete(app_model, conversation_id, current_user) + except ConversationNotExistsError: raise NotFound("Conversation Not Exists.") - conversation.is_deleted = True - db.session.commit() - return {"result": "success"}, 204 diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index da147fe89..e21092349 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -32,7 +32,7 @@ if [[ "${MODE}" == "worker" ]]; then exec celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \ --max-tasks-per-child ${MAX_TASK_PRE_CHILD:-50} --loglevel ${LOG_LEVEL:-INFO} \ - -Q ${CELERY_QUEUES:-dataset,mail,ops_trace,app_deletion,plugin,workflow_storage} + -Q ${CELERY_QUEUES:-dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation} elif [[ "${MODE}" == "beat" ]]; then exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO} diff --git a/api/services/conversation_service.py b/api/services/conversation_service.py index 712ef4c60..ac603d3cc 100644 --- a/api/services/conversation_service.py +++ b/api/services/conversation_service.py @@ -1,4 +1,5 @@ import contextlib +import logging from collections.abc import Callable, Sequence from typing import Any, Optional, Union @@ -23,6 +24,9 @@ from services.errors.conversation import ( LastConversationNotExistsError, ) from services.errors.message import MessageNotExistsError +from tasks.delete_conversation_task import delete_conversation_related_data + +logger = logging.getLogger(__name__) class ConversationService: @@ -175,11 +179,21 @@ class ConversationService: @classmethod def delete(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]): - conversation = cls.get_conversation(app_model, conversation_id, user) + try: + logger.info( + "Initiating conversation deletion for app_name %s, conversation_id: %s", + app_model.name, + conversation_id, + ) - conversation.is_deleted = True - conversation.updated_at = naive_utc_now() - db.session.commit() + db.session.query(Conversation).where(Conversation.id == conversation_id).delete(synchronize_session=False) + db.session.commit() + + delete_conversation_related_data.delay(conversation_id) + + except Exception as e: + db.session.rollback() + raise e @classmethod def get_conversational_variable( diff --git a/api/tasks/delete_conversation_task.py b/api/tasks/delete_conversation_task.py new file mode 100644 index 000000000..4279dd2c1 --- /dev/null +++ b/api/tasks/delete_conversation_task.py @@ -0,0 +1,68 @@ +import logging +import time + +import click +from celery import shared_task # type: ignore + +from extensions.ext_database import db +from models import ConversationVariable +from models.model import Message, MessageAnnotation, MessageFeedback +from models.tools import ToolConversationVariables, ToolFile +from models.web import PinnedConversation + + +@shared_task(queue="conversation") +def delete_conversation_related_data(conversation_id: str) -> None: + """ + Delete related data conversation in correct order from datatbase to respect foreign key constraints + + Args: + conversation_id: conversation Id + """ + + logging.info( + click.style(f"Starting to delete conversation data from db for conversation_id {conversation_id}", fg="green") + ) + start_at = time.perf_counter() + + try: + db.session.query(MessageAnnotation).where(MessageAnnotation.conversation_id == conversation_id).delete( + synchronize_session=False + ) + + db.session.query(MessageFeedback).where(MessageFeedback.conversation_id == conversation_id).delete( + synchronize_session=False + ) + + db.session.query(ToolConversationVariables).where( + ToolConversationVariables.conversation_id == conversation_id + ).delete(synchronize_session=False) + + db.session.query(ToolFile).where(ToolFile.conversation_id == conversation_id).delete(synchronize_session=False) + + db.session.query(ConversationVariable).where(ConversationVariable.conversation_id == conversation_id).delete( + synchronize_session=False + ) + + db.session.query(Message).where(Message.conversation_id == conversation_id).delete(synchronize_session=False) + + db.session.query(PinnedConversation).where(PinnedConversation.conversation_id == conversation_id).delete( + synchronize_session=False + ) + + db.session.commit() + + end_at = time.perf_counter() + logging.info( + click.style( + f"Succeeded cleaning data from db for conversation_id {conversation_id} latency: {end_at - start_at}", + fg="green", + ) + ) + + except Exception as e: + logging.exception("Failed to delete data from db for conversation_id: %s failed", conversation_id) + db.session.rollback() + raise e + finally: + db.session.close()