From a32dde5428ce52aa86f9ba48431853912031da2d Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 2 Sep 2025 14:18:29 +0800 Subject: [PATCH] Fix: Resolve workflow_node_execution primary key conflicts with UUID v7 (#24643) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- ...hemy_workflow_node_execution_repository.py | 84 ++++++- api/core/workflow/workflow_cycle_manager.py | 6 +- ...rkflow_node_execution_conflict_handling.py | 210 ++++++++++++++++++ .../test_sqlalchemy_repository.py | 45 +++- 4 files changed, 325 insertions(+), 20 deletions(-) create mode 100644 api/tests/unit_tests/core/repositories/test_workflow_node_execution_conflict_handling.py diff --git a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py index f4532d7f2..e5e5626b2 100644 --- a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py @@ -7,9 +7,12 @@ import logging from collections.abc import Sequence from typing import Optional, Union +import psycopg2.errors from sqlalchemy import UnaryExpression, asc, desc, select from sqlalchemy.engine import Engine +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import sessionmaker +from tenacity import before_sleep_log, retry, retry_if_exception, stop_after_attempt from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.entities.workflow_node_execution import ( @@ -21,6 +24,7 @@ from core.workflow.nodes.enums import NodeType from core.workflow.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from libs.helper import extract_tenant_id +from libs.uuid_utils import uuidv7 from models import ( Account, CreatorUserRole, @@ -186,18 +190,31 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository) db_model.finished_at = domain_model.finished_at return db_model + def _is_duplicate_key_error(self, exception: BaseException) -> bool: + """Check if the exception is a duplicate key constraint violation.""" + return isinstance(exception, IntegrityError) and isinstance(exception.orig, psycopg2.errors.UniqueViolation) + + def _regenerate_id_on_duplicate( + self, execution: WorkflowNodeExecution, db_model: WorkflowNodeExecutionModel + ) -> None: + """Regenerate UUID v7 for both domain and database models when duplicate key detected.""" + new_id = str(uuidv7()) + logger.warning( + "Duplicate key conflict for workflow node execution ID %s, generating new UUID v7: %s", db_model.id, new_id + ) + db_model.id = new_id + execution.id = new_id + def save(self, execution: WorkflowNodeExecution) -> None: """ Save or update a NodeExecution domain entity to the database. This method serves as a domain-to-database adapter that: 1. Converts the domain entity to its database representation - 2. Persists the database model using SQLAlchemy's merge operation + 2. Checks for existing records and updates or inserts accordingly 3. Maintains proper multi-tenancy by including tenant context during conversion 4. Updates the in-memory cache for faster subsequent lookups - - The method handles both creating new records and updating existing ones through - SQLAlchemy's merge operation. + 5. Handles duplicate key conflicts by retrying with a new UUID v7 Args: execution: The NodeExecution domain entity to persist @@ -205,19 +222,62 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository) # Convert domain model to database model using tenant context and other attributes db_model = self.to_db_model(execution) - # Create a new database session - with self._session_factory() as session: - # SQLAlchemy merge intelligently handles both insert and update operations - # based on the presence of the primary key - session.merge(db_model) - session.commit() + # Use tenacity for retry logic with duplicate key handling + @retry( + stop=stop_after_attempt(3), + retry=retry_if_exception(self._is_duplicate_key_error), + before_sleep=before_sleep_log(logger, logging.WARNING), + reraise=True, + ) + def _save_with_retry(): + try: + self._persist_to_database(db_model) + except IntegrityError as e: + if self._is_duplicate_key_error(e): + # Generate new UUID and retry + self._regenerate_id_on_duplicate(execution, db_model) + raise # Let tenacity handle the retry + else: + # Different integrity error, don't retry + logger.exception("Non-duplicate key integrity error while saving workflow node execution") + raise - # Update the in-memory cache for faster subsequent lookups - # Only cache if we have a node_execution_id to use as the cache key + try: + _save_with_retry() + + # Update the in-memory cache after successful save if db_model.node_execution_id: logger.debug("Updating cache for node_execution_id: %s", db_model.node_execution_id) self._node_execution_cache[db_model.node_execution_id] = db_model + except Exception as e: + logger.exception("Failed to save workflow node execution after all retries") + raise + + def _persist_to_database(self, db_model: WorkflowNodeExecutionModel) -> None: + """ + Persist the database model to the database. + + Checks if a record with the same ID exists and either updates it or creates a new one. + + Args: + db_model: The database model to persist + """ + with self._session_factory() as session: + # Check if record already exists + existing = session.get(WorkflowNodeExecutionModel, db_model.id) + + if existing: + # Update existing record by copying all non-private attributes + for key, value in db_model.__dict__.items(): + if not key.startswith("_"): + setattr(existing, key, value) + else: + # Add new record + session.add(db_model) + + session.commit() + def get_db_models_by_workflow_run( self, workflow_run_id: str, diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 03f670707..3c264e782 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -2,7 +2,6 @@ from collections.abc import Mapping from dataclasses import dataclass from datetime import datetime from typing import Any, Optional, Union -from uuid import uuid4 from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity from core.app.entities.queue_entities import ( @@ -29,6 +28,7 @@ from core.workflow.repositories.workflow_node_execution_repository import Workfl from core.workflow.system_variable import SystemVariable from core.workflow.workflow_entry import WorkflowEntry from libs.datetime_utils import naive_utc_now +from libs.uuid_utils import uuidv7 @dataclass @@ -266,7 +266,7 @@ class WorkflowCycleManager: """Get execution ID from system variables or generate a new one.""" if self._workflow_system_variables and self._workflow_system_variables.workflow_execution_id: return str(self._workflow_system_variables.workflow_execution_id) - return str(uuid4()) + return str(uuidv7()) def _save_and_cache_workflow_execution(self, execution: WorkflowExecution) -> WorkflowExecution: """Save workflow execution to repository and cache it.""" @@ -371,7 +371,7 @@ class WorkflowCycleManager: } domain_execution = WorkflowNodeExecution( - id=str(uuid4()), + id=str(uuidv7()), workflow_id=workflow_execution.workflow_id, workflow_execution_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, diff --git a/api/tests/unit_tests/core/repositories/test_workflow_node_execution_conflict_handling.py b/api/tests/unit_tests/core/repositories/test_workflow_node_execution_conflict_handling.py new file mode 100644 index 000000000..84484fe22 --- /dev/null +++ b/api/tests/unit_tests/core/repositories/test_workflow_node_execution_conflict_handling.py @@ -0,0 +1,210 @@ +"""Unit tests for workflow node execution conflict handling.""" + +from datetime import datetime +from unittest.mock import MagicMock, Mock + +import psycopg2.errors +import pytest +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import sessionmaker + +from core.repositories.sqlalchemy_workflow_node_execution_repository import ( + SQLAlchemyWorkflowNodeExecutionRepository, +) +from core.workflow.entities.workflow_node_execution import ( + WorkflowNodeExecution, + WorkflowNodeExecutionStatus, +) +from core.workflow.nodes.enums import NodeType +from models import Account, WorkflowNodeExecutionTriggeredFrom + + +class TestWorkflowNodeExecutionConflictHandling: + """Test cases for handling duplicate key conflicts in workflow node execution.""" + + def setup_method(self): + """Set up test fixtures.""" + # Create a mock user with tenant_id + self.mock_user = Mock(spec=Account) + self.mock_user.id = "test-user-id" + self.mock_user.current_tenant_id = "test-tenant-id" + + # Create mock session factory + self.mock_session_factory = Mock(spec=sessionmaker) + + # Create repository instance + self.repository = SQLAlchemyWorkflowNodeExecutionRepository( + session_factory=self.mock_session_factory, + user=self.mock_user, + app_id="test-app-id", + triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, + ) + + def test_save_with_duplicate_key_retries_with_new_uuid(self): + """Test that save retries with a new UUID v7 when encountering duplicate key error.""" + # Create a mock session + mock_session = MagicMock() + mock_session.__enter__ = Mock(return_value=mock_session) + mock_session.__exit__ = Mock(return_value=None) + self.mock_session_factory.return_value = mock_session + + # Mock session.get to return None (no existing record) + mock_session.get.return_value = None + + # Create IntegrityError for duplicate key with proper psycopg2.errors.UniqueViolation + mock_unique_violation = Mock(spec=psycopg2.errors.UniqueViolation) + duplicate_error = IntegrityError( + "duplicate key value violates unique constraint", + params=None, + orig=mock_unique_violation, + ) + + # First call to session.add raises IntegrityError, second succeeds + mock_session.add.side_effect = [duplicate_error, None] + mock_session.commit.side_effect = [None, None] + + # Create test execution + execution = WorkflowNodeExecution( + id="original-id", + workflow_id="test-workflow-id", + workflow_execution_id="test-workflow-execution-id", + node_execution_id="test-node-execution-id", + node_id="test-node-id", + node_type=NodeType.START, + title="Test Node", + index=1, + status=WorkflowNodeExecutionStatus.RUNNING, + created_at=datetime.utcnow(), + ) + + original_id = execution.id + + # Save should succeed after retry + self.repository.save(execution) + + # Verify that session.add was called twice (initial attempt + retry) + assert mock_session.add.call_count == 2 + + # Verify that the ID was changed (new UUID v7 generated) + assert execution.id != original_id + + def test_save_with_existing_record_updates_instead_of_insert(self): + """Test that save updates existing record instead of inserting duplicate.""" + # Create a mock session + mock_session = MagicMock() + mock_session.__enter__ = Mock(return_value=mock_session) + mock_session.__exit__ = Mock(return_value=None) + self.mock_session_factory.return_value = mock_session + + # Mock existing record + mock_existing = MagicMock() + mock_session.get.return_value = mock_existing + mock_session.commit.return_value = None + + # Create test execution + execution = WorkflowNodeExecution( + id="existing-id", + workflow_id="test-workflow-id", + workflow_execution_id="test-workflow-execution-id", + node_execution_id="test-node-execution-id", + node_id="test-node-id", + node_type=NodeType.START, + title="Test Node", + index=1, + status=WorkflowNodeExecutionStatus.SUCCEEDED, + created_at=datetime.utcnow(), + ) + + # Save should update existing record + self.repository.save(execution) + + # Verify that session.add was not called (update path) + mock_session.add.assert_not_called() + + # Verify that session.commit was called + mock_session.commit.assert_called_once() + + def test_save_exceeds_max_retries_raises_error(self): + """Test that save raises error after exceeding max retries.""" + # Create a mock session + mock_session = MagicMock() + mock_session.__enter__ = Mock(return_value=mock_session) + mock_session.__exit__ = Mock(return_value=None) + self.mock_session_factory.return_value = mock_session + + # Mock session.get to return None (no existing record) + mock_session.get.return_value = None + + # Create IntegrityError for duplicate key with proper psycopg2.errors.UniqueViolation + mock_unique_violation = Mock(spec=psycopg2.errors.UniqueViolation) + duplicate_error = IntegrityError( + "duplicate key value violates unique constraint", + params=None, + orig=mock_unique_violation, + ) + + # All attempts fail with duplicate error + mock_session.add.side_effect = duplicate_error + + # Create test execution + execution = WorkflowNodeExecution( + id="test-id", + workflow_id="test-workflow-id", + workflow_execution_id="test-workflow-execution-id", + node_execution_id="test-node-execution-id", + node_id="test-node-id", + node_type=NodeType.START, + title="Test Node", + index=1, + status=WorkflowNodeExecutionStatus.RUNNING, + created_at=datetime.utcnow(), + ) + + # Save should raise IntegrityError after max retries + with pytest.raises(IntegrityError): + self.repository.save(execution) + + # Verify that session.add was called 3 times (max_retries) + assert mock_session.add.call_count == 3 + + def test_save_non_duplicate_integrity_error_raises_immediately(self): + """Test that non-duplicate IntegrityErrors are raised immediately without retry.""" + # Create a mock session + mock_session = MagicMock() + mock_session.__enter__ = Mock(return_value=mock_session) + mock_session.__exit__ = Mock(return_value=None) + self.mock_session_factory.return_value = mock_session + + # Mock session.get to return None (no existing record) + mock_session.get.return_value = None + + # Create IntegrityError for non-duplicate constraint + other_error = IntegrityError( + "null value in column violates not-null constraint", + params=None, + orig=None, + ) + + # First call raises non-duplicate error + mock_session.add.side_effect = other_error + + # Create test execution + execution = WorkflowNodeExecution( + id="test-id", + workflow_id="test-workflow-id", + workflow_execution_id="test-workflow-execution-id", + node_execution_id="test-node-execution-id", + node_id="test-node-id", + node_type=NodeType.START, + title="Test Node", + index=1, + status=WorkflowNodeExecutionStatus.RUNNING, + created_at=datetime.utcnow(), + ) + + # Save should raise error immediately + with pytest.raises(IntegrityError): + self.repository.save(execution) + + # Verify that session.add was called only once (no retry) + assert mock_session.add.call_count == 1 diff --git a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py index c60800c49..b81d55cf5 100644 --- a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py +++ b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py @@ -86,6 +86,8 @@ def test_save(repository, session): session_obj, _ = session # Create a mock execution execution = MagicMock(spec=WorkflowNodeExecutionModel) + execution.id = "test-id" + execution.node_execution_id = "test-node-execution-id" execution.tenant_id = None execution.app_id = None execution.inputs = None @@ -95,7 +97,13 @@ def test_save(repository, session): # Mock the to_db_model method to return the execution itself # This simulates the behavior of setting tenant_id and app_id - repository.to_db_model = MagicMock(return_value=execution) + db_model = MagicMock(spec=WorkflowNodeExecutionModel) + db_model.id = "test-id" + db_model.node_execution_id = "test-node-execution-id" + repository.to_db_model = MagicMock(return_value=db_model) + + # Mock session.get to return None (no existing record) + session_obj.get.return_value = None # Call save method repository.save(execution) @@ -103,8 +111,14 @@ def test_save(repository, session): # Assert to_db_model was called with the execution repository.to_db_model.assert_called_once_with(execution) - # Assert session.merge was called (now using merge for both save and update) - session_obj.merge.assert_called_once_with(execution) + # Assert session.get was called to check for existing record + session_obj.get.assert_called_once_with(WorkflowNodeExecutionModel, db_model.id) + + # Assert session.add was called for new record + session_obj.add.assert_called_once_with(db_model) + + # Assert session.commit was called + session_obj.commit.assert_called_once() def test_save_with_existing_tenant_id(repository, session): @@ -112,6 +126,8 @@ def test_save_with_existing_tenant_id(repository, session): session_obj, _ = session # Create a mock execution with existing tenant_id execution = MagicMock(spec=WorkflowNodeExecutionModel) + execution.id = "existing-id" + execution.node_execution_id = "existing-node-execution-id" execution.tenant_id = "existing-tenant" execution.app_id = None execution.inputs = None @@ -121,20 +137,39 @@ def test_save_with_existing_tenant_id(repository, session): # Create a modified execution that will be returned by _to_db_model modified_execution = MagicMock(spec=WorkflowNodeExecutionModel) + modified_execution.id = "existing-id" + modified_execution.node_execution_id = "existing-node-execution-id" modified_execution.tenant_id = "existing-tenant" # Tenant ID should not change modified_execution.app_id = repository._app_id # App ID should be set + # Create a dictionary to simulate __dict__ for updating attributes + modified_execution.__dict__ = { + "id": "existing-id", + "node_execution_id": "existing-node-execution-id", + "tenant_id": "existing-tenant", + "app_id": repository._app_id, + } # Mock the to_db_model method to return the modified execution repository.to_db_model = MagicMock(return_value=modified_execution) + # Mock session.get to return an existing record + existing_model = MagicMock(spec=WorkflowNodeExecutionModel) + session_obj.get.return_value = existing_model + # Call save method repository.save(execution) # Assert to_db_model was called with the execution repository.to_db_model.assert_called_once_with(execution) - # Assert session.merge was called with the modified execution (now using merge for both save and update) - session_obj.merge.assert_called_once_with(modified_execution) + # Assert session.get was called to check for existing record + session_obj.get.assert_called_once_with(WorkflowNodeExecutionModel, modified_execution.id) + + # Assert session.add was NOT called since we're updating existing + session_obj.add.assert_not_called() + + # Assert session.commit was called + session_obj.commit.assert_called_once() def test_get_by_workflow_run(repository, session, mocker: MockerFixture):