diff --git a/api/.env.example b/api/.env.example
index ae7e82c77..787830858 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -491,3 +491,10 @@ OTEL_METRIC_EXPORT_TIMEOUT=30000
 
 # Prevent Clickjacking
 ALLOW_EMBED=false
+
+# Dataset queue monitor configuration
+QUEUE_MONITOR_THRESHOLD=200
+# Multiple recipients can be configured, separated by commas, e.g. test1@dify.ai,test2@dify.ai
+QUEUE_MONITOR_ALERT_EMAILS=
+# Monitor interval in minutes (default: 30)
+QUEUE_MONITOR_INTERVAL=30
diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py
index 1b015b326..2dcf1710b 100644
--- a/api/configs/middleware/__init__.py
+++ b/api/configs/middleware/__init__.py
@@ -2,7 +2,7 @@ import os
 from typing import Any, Literal, Optional
 from urllib.parse import parse_qsl, quote_plus
 
-from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt, computed_field
+from pydantic import Field, NonNegativeFloat, NonNegativeInt, PositiveFloat, PositiveInt, computed_field
 from pydantic_settings import BaseSettings
 
 from .cache.redis_config import RedisConfig
@@ -256,6 +256,25 @@ class InternalTestConfig(BaseSettings):
     )
 
 
+class DatasetQueueMonitorConfig(BaseSettings):
+    """
+    Configuration settings for the dataset queue monitor
+    """
+
+    QUEUE_MONITOR_THRESHOLD: Optional[NonNegativeInt] = Field(
+        description="Queue length threshold that triggers a dataset queue alert",
+        default=200,
+    )
+    QUEUE_MONITOR_ALERT_EMAILS: Optional[str] = Field(
+        description="Comma-separated list of email recipients for dataset queue alerts",
+        default=None,
+    )
+    QUEUE_MONITOR_INTERVAL: Optional[NonNegativeFloat] = Field(
+        description="Dataset queue monitor interval in minutes",
+        default=30,
+    )
+
+
 class MiddlewareConfig(
     # place the configs in alphabet order
     CeleryConfig,
@@ -303,5 +322,6 @@ class MiddlewareConfig(
     BaiduVectorDBConfig,
     OpenGaussConfig,
     TableStoreConfig,
+    DatasetQueueMonitorConfig,
 ):
     pass
diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py
index 26bd6b357..a83755200 100644
--- a/api/extensions/ext_celery.py
+++ b/api/extensions/ext_celery.py
@@ -70,6 +70,7 @@ def init_app(app: DifyApp) -> Celery:
         "schedule.update_tidb_serverless_status_task",
         "schedule.clean_messages",
         "schedule.mail_clean_document_notify_task",
+        "schedule.queue_monitor_task",
     ]
     day = dify_config.CELERY_BEAT_SCHEDULER_TIME
     beat_schedule = {
@@ -98,6 +99,12 @@ def init_app(app: DifyApp) -> Celery:
             "task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task",
             "schedule": crontab(minute="0", hour="10", day_of_week="1"),
         },
+        "datasets-queue-monitor": {
+            "task": "schedule.queue_monitor_task.queue_monitor_task",
+            "schedule": timedelta(
+                minutes=dify_config.QUEUE_MONITOR_INTERVAL if dify_config.QUEUE_MONITOR_INTERVAL else 30
+            ),
+        },
     }
     celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
 
diff --git a/api/schedule/queue_monitor_task.py b/api/schedule/queue_monitor_task.py
new file mode 100644
index 000000000..e3a7021b9
--- /dev/null
+++ b/api/schedule/queue_monitor_task.py
@@ -0,0 +1,62 @@
+import logging
+from datetime import datetime
+from urllib.parse import urlparse
+
+import click
+from flask import render_template
+from redis import Redis
+
+import app
+from configs import dify_config
+from extensions.ext_database import db
+from extensions.ext_mail import mail
+
+# Create a dedicated Redis connection (using the same configuration as Celery)
+celery_broker_url = dify_config.CELERY_BROKER_URL
+
+parsed = urlparse(celery_broker_url)
+host = parsed.hostname or "localhost"
+port = parsed.port or 6379
+password = parsed.password or None
+redis_db = parsed.path.strip("/") or "1"  # type: ignore
+
+celery_redis = Redis(host=host, port=port, password=password, db=redis_db)
+
+
+@app.celery.task(queue="monitor")
+def queue_monitor_task():
+    queue_name = "dataset"
+    threshold = dify_config.QUEUE_MONITOR_THRESHOLD
+
+    try:
+        queue_length = celery_redis.llen(queue_name)
+        logging.info(click.style(f"Start monitoring queue: {queue_name}", fg="green"))
+        logging.info(click.style(f"Queue length: {queue_length}", fg="green"))
+
+        if queue_length >= threshold:
+            warning_msg = f"Queue {queue_name} task count exceeded the limit: {queue_length}/{threshold}"
+            logging.warning(click.style(warning_msg, fg="red"))
+            alert_emails = dify_config.QUEUE_MONITOR_ALERT_EMAILS
+            if alert_emails:
+                to_list = alert_emails.split(",")
+                for to in to_list:
+                    try:
+                        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                        html_content = render_template(
+                            "queue_monitor_alert_email_template_en-US.html",
+                            queue_name=queue_name,
+                            queue_length=queue_length,
+                            threshold=threshold,
+                            alert_time=current_time,
+                        )
+                        mail.send(
+                            to=to, subject="Alert: Dataset Queue pending tasks exceeded the limit", html=html_content
+                        )
+                    except Exception:
+                        logging.exception(click.style("Exception occurred during sending email", fg="red"))
+
+    except Exception:
+        logging.exception(click.style("Exception occurred during queue monitoring", fg="red"))
+    finally:
+        if db.session.is_active:
+            db.session.close()
diff --git a/api/tasks/batch_create_segment_to_index_task.py b/api/tasks/batch_create_segment_to_index_task.py
index f32bc4f18..51b6343fd 100644
--- a/api/tasks/batch_create_segment_to_index_task.py
+++ b/api/tasks/batch_create_segment_to_index_task.py
@@ -5,7 +5,7 @@ import uuid
 
 import click
 from celery import shared_task  # type: ignore
-from sqlalchemy import func, select
+from sqlalchemy import func
 from sqlalchemy.orm import Session
 
 from core.model_manager import ModelManager
@@ -68,11 +68,6 @@ def batch_create_segment_to_index_task(
             model_type=ModelType.TEXT_EMBEDDING,
             model=dataset.embedding_model,
         )
-        word_count_change = 0
-        segments_to_insert: list[str] = []
-        max_position_stmt = select(func.max(DocumentSegment.position)).where(
-            DocumentSegment.document_id == dataset_document.id
-        )
         word_count_change = 0
         if embedding_model:
             tokens_list = embedding_model.get_text_embedding_num_tokens(
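Note: the monitor reads the pending-task backlog directly from the Celery broker's Redis list, so the reported number can be sanity-checked outside the task. A minimal sketch of that check, mirroring the URL parsing above; the broker URL shown is an illustrative placeholder, not a required value:

    # check_queue_backlog.py - illustrative only
    from urllib.parse import urlparse

    from redis import Redis

    broker_url = "redis://:yourpassword@localhost:6379/1"  # assumed CELERY_BROKER_URL
    parsed = urlparse(broker_url)
    redis_client = Redis(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        password=parsed.password or None,
        db=parsed.path.strip("/") or "1",
    )
    # "dataset" is the Celery queue the task watches; LLEN is the same call queue_monitor_task makes
    print(redis_client.llen("dataset"))

The same figure is available from the command line with redis-cli -n 1 LLEN dataset. Because the task is routed to a dedicated "monitor" queue, a worker consuming that queue (in addition to Celery beat) must be running for the schedule to take effect.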
diff --git a/api/templates/queue_monitor_alert_email_template_en-US.html b/api/templates/queue_monitor_alert_email_template_en-US.html
new file mode 100644
index 000000000..288521086
--- /dev/null
+++ b/api/templates/queue_monitor_alert_email_template_en-US.html
@@ -0,0 +1,129 @@
+<!-- 129-line HTML alert email template; markup not reproduced in this excerpt, visible text content only: -->
+<!-- Queue Monitoring Alert -->
+<!-- Our system has detected an abnormal queue status that requires your attention: -->
+<!-- Recommended actions: -->
+<!-- 1. Check the queue processing status in the system dashboard -->
+<!-- 2. Verify if there are any processing bottlenecks -->
+<!-- 3. Consider scaling up workers if needed -->
+<!-- Additional Information: -->
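For a quick end-to-end check of the alert path without waiting for the beat schedule, the task can be run eagerly in-process; a rough sketch, assuming the Flask entry point is api/app.py and that SMTP mail is configured (otherwise only the log output is exercised):

    # run from the api/ directory with the usual environment loaded; illustrative only
    from app import app as flask_app  # assumed Flask application object
    from schedule.queue_monitor_task import queue_monitor_task

    with flask_app.app_context():  # render_template() requires an application context
        queue_monitor_task.apply()  # Celery's synchronous, in-process execution

Temporarily lowering QUEUE_MONITOR_THRESHOLD (or pre-filling the "dataset" list) makes the alert branch easy to trigger.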