Feature:Refactor batch update document status for #21324 (#21325)

This commit is contained in:
NeatGuyCoding
2025-06-23 09:49:13 +08:00
committed by GitHub
parent 3e7f8bad56
commit a0a89b562c
3 changed files with 1416 additions and 51 deletions

View File

@@ -1623,85 +1623,177 @@ class DocumentService:
Raises:
DocumentIndexingError: If document is being indexed or not in correct state
ValueError: If action is invalid
"""
if not document_ids:
return
# Early validation of action parameter
valid_actions = ["enable", "disable", "archive", "un_archive"]
if action not in valid_actions:
raise ValueError(f"Invalid action: {action}. Must be one of {valid_actions}")
documents_to_update = []
# First pass: validate all documents and prepare updates
for document_id in document_ids:
document = DocumentService.get_document(dataset.id, document_id)
if not document:
continue
# Check if document is being indexed
indexing_cache_key = f"document_{document.id}_indexing"
cache_result = redis_client.get(indexing_cache_key)
if cache_result is not None:
raise DocumentIndexingError(f"Document:{document.name} is being indexed, please try again later")
if action == "enable":
if document.enabled:
continue
document.enabled = True
document.disabled_at = None
document.disabled_by = None
document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
# Prepare update based on action
update_info = DocumentService._prepare_document_status_update(document, action, user)
if update_info:
documents_to_update.append(update_info)
# Second pass: apply all updates in a single transaction
if documents_to_update:
try:
for update_info in documents_to_update:
document = update_info["document"]
updates = update_info["updates"]
# Apply updates to the document
for field, value in updates.items():
setattr(document, field, value)
db.session.add(document)
# Batch commit all changes
db.session.commit()
except Exception as e:
# Rollback on any error
db.session.rollback()
raise e
# Execute async tasks and set Redis cache after successful commit
# propagation_error is used to capture any errors for submitting async task execution
propagation_error = None
for update_info in documents_to_update:
try:
# Execute async tasks after successful commit
if update_info["async_task"]:
task_info = update_info["async_task"]
task_func = task_info["function"]
task_args = task_info["args"]
task_func.delay(*task_args)
except Exception as e:
# Log the error but do not rollback the transaction
logging.exception(f"Error executing async task for document {update_info['document'].id}")
# don't raise the error immediately, but capture it for later
propagation_error = e
try:
# Set Redis cache if needed after successful commit
if update_info["set_cache"]:
document = update_info["document"]
indexing_cache_key = f"document_{document.id}_indexing"
redis_client.setex(indexing_cache_key, 600, 1)
except Exception as e:
# Log the error but do not rollback the transaction
logging.exception(f"Error setting cache for document {update_info['document'].id}")
# Raise any propagation error after all updates
if propagation_error:
raise propagation_error
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
@staticmethod
def _prepare_document_status_update(document, action: str, user):
"""
Prepare document status update information.
add_document_to_index_task.delay(document_id)
Args:
document: Document object to update
action: Action to perform
user: Current user
elif action == "disable":
if not document.completed_at or document.indexing_status != "completed":
raise DocumentIndexingError(f"Document: {document.name} is not completed.")
if not document.enabled:
continue
Returns:
dict: Update information or None if no update needed
"""
now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
document.enabled = False
document.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
document.disabled_by = user.id
document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
db.session.commit()
if action == "enable":
return DocumentService._prepare_enable_update(document, now)
elif action == "disable":
return DocumentService._prepare_disable_update(document, user, now)
elif action == "archive":
return DocumentService._prepare_archive_update(document, user, now)
elif action == "un_archive":
return DocumentService._prepare_unarchive_update(document, now)
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
return None
remove_document_from_index_task.delay(document_id)
@staticmethod
def _prepare_enable_update(document, now):
"""Prepare updates for enabling a document."""
if document.enabled:
return None
elif action == "archive":
if document.archived:
continue
return {
"document": document,
"updates": {"enabled": True, "disabled_at": None, "disabled_by": None, "updated_at": now},
"async_task": {"function": add_document_to_index_task, "args": [document.id]},
"set_cache": True,
}
document.archived = True
document.archived_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
document.archived_by = user.id
document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
db.session.commit()
@staticmethod
def _prepare_disable_update(document, user, now):
"""Prepare updates for disabling a document."""
if not document.completed_at or document.indexing_status != "completed":
raise DocumentIndexingError(f"Document: {document.name} is not completed.")
if document.enabled:
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
if not document.enabled:
return None
remove_document_from_index_task.delay(document_id)
return {
"document": document,
"updates": {"enabled": False, "disabled_at": now, "disabled_by": user.id, "updated_at": now},
"async_task": {"function": remove_document_from_index_task, "args": [document.id]},
"set_cache": True,
}
elif action == "un_archive":
if not document.archived:
continue
document.archived = False
document.archived_at = None
document.archived_by = None
document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
db.session.commit()
@staticmethod
def _prepare_archive_update(document, user, now):
"""Prepare updates for archiving a document."""
if document.archived:
return None
# Only re-index if the document is currently enabled
if document.enabled:
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
add_document_to_index_task.delay(document_id)
update_info = {
"document": document,
"updates": {"archived": True, "archived_at": now, "archived_by": user.id, "updated_at": now},
"async_task": None,
"set_cache": False,
}
else:
raise ValueError(f"Invalid action: {action}")
# Only set async task and cache if document is currently enabled
if document.enabled:
update_info["async_task"] = {"function": remove_document_from_index_task, "args": [document.id]}
update_info["set_cache"] = True
return update_info
@staticmethod
def _prepare_unarchive_update(document, now):
"""Prepare updates for unarchiving a document."""
if not document.archived:
return None
update_info = {
"document": document,
"updates": {"archived": False, "archived_at": None, "archived_by": None, "updated_at": now},
"async_task": None,
"set_cache": False,
}
# Only re-index if the document is currently enabled
if document.enabled:
update_info["async_task"] = {"function": add_document_to_index_task, "args": [document.id]}
update_info["set_cache"] = True
return update_info
class SegmentService: