From ca9635070747093a8d7856a9b56e3e4a17d48931 Mon Sep 17 00:00:00 2001
From: Novice
Date: Tue, 2 Sep 2025 11:46:11 +0800
Subject: [PATCH] chore: optimize SQL queries that perform partial full table scans (#24786)

---
 .../advanced_chat/generate_task_pipeline.py   |  5 --
 .../processor/parent_child_index_processor.py |  7 ++-
 .../update_provider_when_message_created.py   | 48 +++++++++++++++++--
 ...3885c_add_workflow_app_log_run_id_index.py | 32 +++++++++++++
 api/models/workflow.py                        |  1 +
 5 files changed, 83 insertions(+), 10 deletions(-)
 create mode 100644 api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py

diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index fb61b4c35..ee21a4737 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -73,7 +73,6 @@ from core.workflow.repositories.workflow_execution_repository import WorkflowExe
 from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
 from core.workflow.system_variable import SystemVariable
 from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
-from events.message_event import message_was_created
 from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from models import Conversation, EndUser, Message, MessageFile
@@ -939,10 +938,6 @@ class AdvancedChatAppGenerateTaskPipeline:
             self._task_state.metadata.usage = usage
         else:
             self._task_state.metadata.usage = LLMUsage.empty_usage()
-        message_was_created.send(
-            message,
-            application_generate_entity=self._application_generate_entity,
-        )
 
     def _message_end_to_stream_response(self) -> MessageEndStreamResponse:
         """
diff --git a/api/core/rag/index_processor/processor/parent_child_index_processor.py b/api/core/rag/index_processor/processor/parent_child_index_processor.py
index a13d303e4..cb7f6ab57 100644
--- a/api/core/rag/index_processor/processor/parent_child_index_processor.py
+++ b/api/core/rag/index_processor/processor/parent_child_index_processor.py
@@ -130,13 +130,16 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
                 if delete_child_chunks:
                     db.session.query(ChildChunk).where(
                         ChildChunk.dataset_id == dataset.id, ChildChunk.index_node_id.in_(child_node_ids)
-                    ).delete()
+                    ).delete(synchronize_session=False)
                     db.session.commit()
 
             else:
                 vector.delete()
                 if delete_child_chunks:
-                    db.session.query(ChildChunk).where(ChildChunk.dataset_id == dataset.id).delete()
+                    # Use existing compound index: (tenant_id, dataset_id, ...)
+                    db.session.query(ChildChunk).where(
+                        ChildChunk.tenant_id == dataset.tenant_id, ChildChunk.dataset_id == dataset.id
+                    ).delete(synchronize_session=False)
                     db.session.commit()
 
     def retrieve(
diff --git a/api/events/event_handlers/update_provider_when_message_created.py b/api/events/event_handlers/update_provider_when_message_created.py
index 90eb524c9..96ff6fcf5 100644
--- a/api/events/event_handlers/update_provider_when_message_created.py
+++ b/api/events/event_handlers/update_provider_when_message_created.py
@@ -13,12 +13,39 @@ from core.entities.provider_entities import QuotaUnit, SystemConfiguration
 from core.plugin.entities.plugin import ModelProviderID
 from events.message_event import message_was_created
 from extensions.ext_database import db
+from extensions.ext_redis import redis_client, redis_fallback
 from libs import datetime_utils
 from models.model import Message
 from models.provider import Provider, ProviderType
 
 logger = logging.getLogger(__name__)
 
+# Redis cache key prefix for provider last used timestamps
+_PROVIDER_LAST_USED_CACHE_PREFIX = "provider:last_used"
+# Default TTL for cache entries (10 minutes)
+_CACHE_TTL_SECONDS = 600
+LAST_USED_UPDATE_WINDOW_SECONDS = 60 * 5
+
+
+def _get_provider_cache_key(tenant_id: str, provider_name: str) -> str:
+    """Generate Redis cache key for provider last used timestamp."""
+    return f"{_PROVIDER_LAST_USED_CACHE_PREFIX}:{tenant_id}:{provider_name}"
+
+
+@redis_fallback(default_return=None)
+def _get_last_update_timestamp(cache_key: str) -> Optional[datetime]:
+    """Get last update timestamp from Redis cache."""
+    timestamp_str = redis_client.get(cache_key)
+    if timestamp_str:
+        return datetime.fromtimestamp(float(timestamp_str.decode("utf-8")))
+    return None
+
+
+@redis_fallback()
+def _set_last_update_timestamp(cache_key: str, timestamp: datetime) -> None:
+    """Set last update timestamp in Redis cache with TTL."""
+    redis_client.setex(cache_key, _CACHE_TTL_SECONDS, str(timestamp.timestamp()))
+
 
 class _ProviderUpdateFilters(BaseModel):
     """Filters for identifying Provider records to update."""
@@ -139,7 +166,7 @@ def handle(sender: Message, **kwargs):
                 provider_name,
             )
 
-    except Exception as e:
+    except Exception:
         # Log failure with timing and context
         duration = time_module.perf_counter() - start_time
 
@@ -215,8 +242,23 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation]
 
         # Prepare values dict for SQLAlchemy update
         update_values = {}
-        # updateing to `last_used` is removed due to performance reason.
-        # ref: https://github.com/langgenius/dify/issues/24526
+
+        # NOTE: For frequently used providers under high load, this implementation may experience
+        # race conditions or update contention despite the time-window optimization:
+        # 1. Multiple concurrent requests might check the same cache key simultaneously
+        # 2. Redis cache operations are not atomic with database updates
+        # 3. Heavy providers could still face database lock contention during peak usage
+        # The current implementation is acceptable for most scenarios, but future optimization
+        # considerations could include batched updates or async processing.
+        if values.last_used is not None:
+            cache_key = _get_provider_cache_key(filters.tenant_id, filters.provider_name)
+            now = datetime_utils.naive_utc_now()
+            last_update = _get_last_update_timestamp(cache_key)
+
+            if last_update is None or (now - last_update).total_seconds() > LAST_USED_UPDATE_WINDOW_SECONDS:
+                update_values["last_used"] = values.last_used
+                _set_last_update_timestamp(cache_key, now)
+
         if values.quota_used is not None:
             update_values["quota_used"] = values.quota_used
         # Skip the current update operation if no updates are required.
diff --git a/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py b/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py
new file mode 100644
index 000000000..465f8664a
--- /dev/null
+++ b/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py
@@ -0,0 +1,32 @@
+"""chore: add workflow app log run id index
+
+Revision ID: b95962a3885c
+Revises: 8d289573e1da
+Create Date: 2025-08-29 15:34:09.838623
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = 'b95962a3885c'
+down_revision = '8d289573e1da'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('workflow_app_logs', schema=None) as batch_op:
+        batch_op.create_index('workflow_app_log_workflow_run_id_idx', ['workflow_run_id'], unique=False)
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('workflow_app_logs', schema=None) as batch_op:
+        batch_op.drop_index('workflow_app_log_workflow_run_id_idx')
+    # ### end Alembic commands ###
diff --git a/api/models/workflow.py b/api/models/workflow.py
index 4d0089fa4..28bf683fb 100644
--- a/api/models/workflow.py
+++ b/api/models/workflow.py
@@ -835,6 +835,7 @@ class WorkflowAppLog(Base):
     __table_args__ = (
         sa.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"),
         sa.Index("workflow_app_log_app_idx", "tenant_id", "app_id"),
+        sa.Index("workflow_app_log_workflow_run_id_idx", "workflow_run_id"),
     )
 
     id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
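
A note on the delete(synchronize_session=False) change in parent_child_index_processor.py: by default, Query.delete() tries to reconcile the bulk DELETE with objects already loaded into the session, which adds overhead on large deletes. Passing synchronize_session=False skips that bookkeeping, and it is safe here because the session commits immediately afterwards. The second delete also gains a tenant_id predicate so PostgreSQL can drive it through the existing compound index starting with (tenant_id, dataset_id) instead of filtering on dataset_id alone. A minimal self-contained sketch of the pattern, using an in-memory SQLite stand-in (the model below is illustrative, not Dify's real ChildChunk):

    from sqlalchemy import String, create_engine
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


    class Base(DeclarativeBase):
        pass


    class ChildChunk(Base):  # stand-in for the real model
        __tablename__ = "child_chunks"
        id: Mapped[int] = mapped_column(primary_key=True)
        tenant_id: Mapped[str] = mapped_column(String(36))
        dataset_id: Mapped[str] = mapped_column(String(36))


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add_all([ChildChunk(tenant_id="t1", dataset_id="d1") for _ in range(3)])
        session.commit()
        # One DELETE statement; no scan of the session's identity map.
        deleted = (
            session.query(ChildChunk)
            .where(ChildChunk.tenant_id == "t1", ChildChunk.dataset_id == "d1")
            .delete(synchronize_session=False)
        )
        session.commit()
        print(deleted)  # 3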
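
The last_used throttling in update_provider_when_message_created.py reduces to one pattern: consult a cheap Redis timestamp before issuing an expensive UPDATE against a hot provider row, so the row is touched at most once per five-minute window instead of once per message. A self-contained sketch of that gate, assuming a local Redis and the redis-py package (should_write_last_used is an illustrative name, not a Dify helper):

    from datetime import datetime

    import redis

    LAST_USED_UPDATE_WINDOW_SECONDS = 60 * 5  # mirrors the patch
    CACHE_TTL_SECONDS = 600                   # mirrors the patch

    client = redis.Redis()  # assumes Redis on localhost:6379


    def should_write_last_used(cache_key: str, now: datetime) -> bool:
        """Allow at most one write per window; record the write time with a TTL."""
        raw = client.get(cache_key)
        if raw is not None:
            last = datetime.fromtimestamp(float(raw.decode("utf-8")))
            if (now - last).total_seconds() <= LAST_USED_UPDATE_WINDOW_SECONDS:
                return False  # a write already landed inside this window
        client.setex(cache_key, CACHE_TTL_SECONDS, str(now.timestamp()))
        return True

The get-then-setex pair is exactly what the NOTE in the patch calls non-atomic: two concurrent handlers can both read a stale key and both write. If that ever matters in practice, a single client.set(cache_key, now.timestamp(), nx=True, ex=LAST_USED_UPDATE_WINDOW_SECONDS) turns the gate into one atomic Redis operation, since NX only sets the key when it does not already exist.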
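
Both cache helpers are wrapped in redis_fallback from extensions/ext_redis.py, so a Redis outage degrades message handling instead of breaking it. The decorator itself is outside this patch; the sketch below is a plausible reconstruction of the contract the handler relies on (catch the Redis error, log it, return a default), not Dify's actual implementation:

    import functools
    import logging
    from typing import Any, Callable, Optional

    from redis.exceptions import RedisError

    logger = logging.getLogger(__name__)


    def redis_fallback(default_return: Optional[Any] = None) -> Callable:
        """Return default_return instead of raising when Redis is unreachable."""

        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                try:
                    return func(*args, **kwargs)
                except RedisError:
                    logger.warning("Redis error in %s; using fallback", func.__name__, exc_info=True)
                    return default_return

            return wrapper

        return decorator

Under this contract, a Redis outage makes _get_last_update_timestamp return None, the window check passes, and the handler falls back to writing last_used on every message: the pre-patch behavior, correct but slower.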