From 75199442c16fb2ad1758d1f813b0c9249fc150e9 Mon Sep 17 00:00:00 2001
From: 9527MrLi <69895702+9527MrLi@users.noreply.github.com>
Date: Tue, 19 Aug 2025 09:47:34 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20Implements=20periodic=20deletion=20of?=
 =?UTF-8?q?=20workflow=20run=20logs=20that=20exceed=20t=E2=80=A6=20(#23881?=
 =?UTF-8?q?)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: shiyun.li973792
Co-authored-by: 1wangshu
Co-authored-by: Blackoutta
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
---
 api/.env.example                              |   7 +
 api/configs/feature/__init__.py               |   9 +
 api/extensions/ext_celery.py                  |   8 +-
 .../clean_workflow_runlogs_precise.py         | 155 ++++++++++++++++++
 docker/.env.example                           |   8 +
 docker/docker-compose.yaml                    |   3 +
 6 files changed, 189 insertions(+), 1 deletion(-)
 create mode 100644 api/schedule/clean_workflow_runlogs_precise.py

diff --git a/api/.env.example b/api/.env.example
index 3c3087242..3052dbfe2 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -478,6 +478,13 @@ API_WORKFLOW_NODE_EXECUTION_REPOSITORY=repositories.sqlalchemy_api_workflow_node
 # API workflow run repository implementation
 API_WORKFLOW_RUN_REPOSITORY=repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository
 
+# Workflow log cleanup configuration
+# Enable automatic cleanup of workflow run logs to manage database size
+WORKFLOW_LOG_CLEANUP_ENABLED=true
+# Number of days to retain workflow run logs (default: 30 days)
+WORKFLOW_LOG_RETENTION_DAYS=30
+# Batch size for workflow log cleanup operations (default: 100)
+WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
 
 # App configuration
 APP_MAX_EXECUTION_TIME=1200
diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py
index 0b2f99aec..2bccc4b7a 100644
--- a/api/configs/feature/__init__.py
+++ b/api/configs/feature/__init__.py
@@ -968,6 +968,14 @@ class AccountConfig(BaseSettings):
     )
 
 
+class WorkflowLogConfig(BaseSettings):
+    WORKFLOW_LOG_CLEANUP_ENABLED: bool = Field(default=True, description="Enable workflow run log cleanup")
+    WORKFLOW_LOG_RETENTION_DAYS: int = Field(default=30, description="Retention days for workflow run logs")
+    WORKFLOW_LOG_CLEANUP_BATCH_SIZE: int = Field(
+        default=100, description="Batch size for workflow run log cleanup operations"
+    )
+
+
 class FeatureConfig(
     # place the configs in alphabet order
     AppExecutionConfig,
@@ -1003,5 +1011,6 @@ class FeatureConfig(
     HostedServiceConfig,
     CeleryBeatConfig,
     CeleryScheduleTasksConfig,
+    WorkflowLogConfig,
 ):
     pass
diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py
index 6010d85eb..00e0bd9a1 100644
--- a/api/extensions/ext_celery.py
+++ b/api/extensions/ext_celery.py
@@ -151,7 +151,13 @@ def init_app(app: DifyApp) -> Celery:
             "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task",
             "schedule": crontab(minute="*/15"),
         }
-
+    if dify_config.WORKFLOW_LOG_CLEANUP_ENABLED:
+        # 2:00 AM every day
+        imports.append("schedule.clean_workflow_runlogs_precise")
+        beat_schedule["clean_workflow_runlogs_precise"] = {
+            "task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise",
+            "schedule": crontab(minute="0", hour="2"),
+        }
     celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
 
     return celery_app
diff --git a/api/schedule/clean_workflow_runlogs_precise.py b/api/schedule/clean_workflow_runlogs_precise.py
new file mode 100644
index 000000000..0de3b5f68
--- /dev/null
+++ b/api/schedule/clean_workflow_runlogs_precise.py
@@ -0,0 +1,155 @@
+import datetime
+import logging
+import time
+
+import click
+
+import app
+from configs import dify_config
+from extensions.ext_database import db
+from models.model import (
+    AppAnnotationHitHistory,
+    Conversation,
+    Message,
+    MessageAgentThought,
+    MessageAnnotation,
+    MessageChain,
+    MessageFeedback,
+    MessageFile,
+)
+from models.workflow import ConversationVariable, WorkflowAppLog, WorkflowNodeExecutionModel, WorkflowRun
+
+_logger = logging.getLogger(__name__)
+
+
+MAX_RETRIES = 3
+BATCH_SIZE = dify_config.WORKFLOW_LOG_CLEANUP_BATCH_SIZE
+
+
+@app.celery.task(queue="dataset")
+def clean_workflow_runlogs_precise():
+    """Clean expired workflow run logs with retry mechanism and complete message cascade"""
+
+    click.echo(click.style("Start clean workflow run logs (precise mode with complete cascade).", fg="green"))
+    start_at = time.perf_counter()
+
+    retention_days = dify_config.WORKFLOW_LOG_RETENTION_DAYS
+    cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_days)
+
+    try:
+        total_workflow_runs = db.session.query(WorkflowRun).filter(WorkflowRun.created_at < cutoff_date).count()
+        if total_workflow_runs == 0:
+            _logger.info("No expired workflow run logs found")
+            return
+        _logger.info("Found %s expired workflow run logs to clean", total_workflow_runs)
+
+        total_deleted = 0
+        failed_batches = 0
+        batch_count = 0
+
+        while True:
+            workflow_runs = (
+                db.session.query(WorkflowRun.id).filter(WorkflowRun.created_at < cutoff_date).limit(BATCH_SIZE).all()
+            )
+
+            if not workflow_runs:
+                break
+
+            workflow_run_ids = [run.id for run in workflow_runs]
+            batch_count += 1
+
+            success = _delete_batch_with_retry(workflow_run_ids, failed_batches)
+
+            if success:
+                total_deleted += len(workflow_run_ids)
+                failed_batches = 0
+            else:
+                failed_batches += 1
+                if failed_batches >= MAX_RETRIES:
+                    _logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
+                    break
+                else:
+                    # Calculate incremental delay times: 5, 10, 15 minutes
+                    retry_delay_minutes = failed_batches * 5
+                    _logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
+                    time.sleep(retry_delay_minutes * 60)
+                    continue
+
+        _logger.info("Cleanup completed: %s expired workflow run logs deleted", total_deleted)
+
+    except Exception as e:
+        db.session.rollback()
+        _logger.exception("Unexpected error in workflow log cleanup")
+        raise
+
+    end_at = time.perf_counter()
+    execution_time = end_at - start_at
+    click.echo(click.style(f"Cleaned workflow run logs from db success latency: {execution_time:.2f}s", fg="green"))
+
+
+def _delete_batch_with_retry(workflow_run_ids: list[str], attempt_count: int) -> bool:
+    """Delete a single batch with a retry mechanism and complete cascading deletion"""
+    try:
+        with db.session.begin_nested():
+            message_data = (
+                db.session.query(Message.id, Message.conversation_id)
+                .filter(Message.workflow_run_id.in_(workflow_run_ids))
+                .all()
+            )
+            message_id_list = [msg.id for msg in message_data]
+            conversation_id_list = list({msg.conversation_id for msg in message_data if msg.conversation_id})
+            if message_id_list:
+                db.session.query(AppAnnotationHitHistory).filter(
+                    AppAnnotationHitHistory.message_id.in_(message_id_list)
+                ).delete(synchronize_session=False)
+
+                db.session.query(MessageAgentThought).filter(
+                    MessageAgentThought.message_id.in_(message_id_list)
+                ).delete(synchronize_session=False)
+
+                db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_id_list)).delete(
+                    synchronize_session=False
+                )
+
+                db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_id_list)).delete(
+                    synchronize_session=False
+                )
+
+                db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_id_list)).delete(
+                    synchronize_session=False
+                )
+
+                db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_id_list)).delete(
+                    synchronize_session=False
+                )
+
+                db.session.query(Message).filter(Message.workflow_run_id.in_(workflow_run_ids)).delete(
+                    synchronize_session=False
+                )
+
+            db.session.query(WorkflowAppLog).filter(WorkflowAppLog.workflow_run_id.in_(workflow_run_ids)).delete(
+                synchronize_session=False
+            )
+
+            db.session.query(WorkflowNodeExecutionModel).filter(
+                WorkflowNodeExecutionModel.workflow_run_id.in_(workflow_run_ids)
+            ).delete(synchronize_session=False)
+
+            if conversation_id_list:
+                db.session.query(ConversationVariable).filter(
+                    ConversationVariable.conversation_id.in_(conversation_id_list)
+                ).delete(synchronize_session=False)
+
+                db.session.query(Conversation).filter(Conversation.id.in_(conversation_id_list)).delete(
+                    synchronize_session=False
+                )
+
+            db.session.query(WorkflowRun).filter(WorkflowRun.id.in_(workflow_run_ids)).delete(synchronize_session=False)
+
+        db.session.commit()
+        return True
+
+    except Exception as e:
+        db.session.rollback()
+        _logger.exception("Batch deletion failed (attempt %s)", attempt_count + 1)
+        return False
diff --git a/docker/.env.example b/docker/.env.example
index 743a1e8bb..826b7b9fe 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -887,6 +887,14 @@ API_WORKFLOW_RUN_REPOSITORY=repositories.sqlalchemy_api_workflow_run_repository.
 # API workflow node execution repository implementation
 API_WORKFLOW_NODE_EXECUTION_REPOSITORY=repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository
 
+# Workflow log cleanup configuration
+# Enable automatic cleanup of workflow run logs to manage database size
+WORKFLOW_LOG_CLEANUP_ENABLED=false
+# Number of days to retain workflow run logs (default: 30 days)
+WORKFLOW_LOG_RETENTION_DAYS=30
+# Batch size for workflow log cleanup operations (default: 100)
+WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
+
 # HTTP request node in workflow configuration
 HTTP_REQUEST_NODE_MAX_BINARY_SIZE=10485760
 HTTP_REQUEST_NODE_MAX_TEXT_SIZE=1048576
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index bcf9588df..0c352e465 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -396,6 +396,9 @@ x-shared-env: &shared-api-worker-env
   CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: ${CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY:-core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository}
   API_WORKFLOW_RUN_REPOSITORY: ${API_WORKFLOW_RUN_REPOSITORY:-repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository}
   API_WORKFLOW_NODE_EXECUTION_REPOSITORY: ${API_WORKFLOW_NODE_EXECUTION_REPOSITORY:-repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository}
+  WORKFLOW_LOG_CLEANUP_ENABLED: ${WORKFLOW_LOG_CLEANUP_ENABLED:-false}
+  WORKFLOW_LOG_RETENTION_DAYS: ${WORKFLOW_LOG_RETENTION_DAYS:-30}
+  WORKFLOW_LOG_CLEANUP_BATCH_SIZE: ${WORKFLOW_LOG_CLEANUP_BATCH_SIZE:-100}
   HTTP_REQUEST_NODE_MAX_BINARY_SIZE: ${HTTP_REQUEST_NODE_MAX_BINARY_SIZE:-10485760}
   HTTP_REQUEST_NODE_MAX_TEXT_SIZE: ${HTTP_REQUEST_NODE_MAX_TEXT_SIZE:-1048576}
   HTTP_REQUEST_NODE_SSL_VERIFY: ${HTTP_REQUEST_NODE_SSL_VERIFY:-True}