diff --git a/api/migrations/versions/2025_06_06_1424-4474872b0ee6_workflow_draft_varaibles_add_node_execution_id.py b/api/migrations/versions/2025_06_06_1424-4474872b0ee6_workflow_draft_varaibles_add_node_execution_id.py new file mode 100644 index 000000000..d7a5d116c --- /dev/null +++ b/api/migrations/versions/2025_06_06_1424-4474872b0ee6_workflow_draft_varaibles_add_node_execution_id.py @@ -0,0 +1,60 @@ +"""`workflow_draft_varaibles` add `node_execution_id` column, add an index for `workflow_node_executions`. + +Revision ID: 4474872b0ee6 +Revises: 2adcbe1f5dfb +Create Date: 2025-06-06 14:24:44.213018 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '4474872b0ee6' +down_revision = '2adcbe1f5dfb' +branch_labels = None +depends_on = None + + +def upgrade(): + # `CREATE INDEX CONCURRENTLY` cannot run within a transaction, so use the `autocommit_block` + # context manager to wrap the index creation statement. + # Reference: + # + # - https://www.postgresql.org/docs/current/sql-createindex.html#:~:text=Another%20difference%20is,CREATE%20INDEX%20CONCURRENTLY%20cannot. + # - https://alembic.sqlalchemy.org/en/latest/api/runtime.html#alembic.runtime.migration.MigrationContext.autocommit_block + with op.get_context().autocommit_block(): + op.create_index( + op.f('workflow_node_executions_tenant_id_idx'), + "workflow_node_executions", + ['tenant_id', 'workflow_id', 'node_id', sa.literal_column('created_at DESC')], + unique=False, + postgresql_concurrently=True, + ) + + with op.batch_alter_table('workflow_draft_variables', schema=None) as batch_op: + batch_op.add_column(sa.Column('node_execution_id', models.types.StringUUID(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + + # `DROP INDEX CONCURRENTLY` cannot run within a transaction, so use the `autocommit_block` + # context manager to wrap the index creation statement. + # Reference: + # + # - https://www.postgresql.org/docs/current/sql-createindex.html#:~:text=Another%20difference%20is,CREATE%20INDEX%20CONCURRENTLY%20cannot. + # - https://alembic.sqlalchemy.org/en/latest/api/runtime.html#alembic.runtime.migration.MigrationContext.autocommit_block + # `DROP INDEX CONCURRENTLY` cannot run within a transaction, so commit existing transactions first. + # Reference: + # + # https://www.postgresql.org/docs/current/sql-createindex.html#:~:text=Another%20difference%20is,CREATE%20INDEX%20CONCURRENTLY%20cannot. + with op.get_context().autocommit_block(): + op.drop_index(op.f('workflow_node_executions_tenant_id_idx'), postgresql_concurrently=True) + + with op.batch_alter_table('workflow_draft_variables', schema=None) as batch_op: + batch_op.drop_column('node_execution_id') + + # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index e868fb77a..2fff04554 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -16,8 +16,8 @@ if TYPE_CHECKING: from models.model import AppMode import sqlalchemy as sa -from sqlalchemy import UniqueConstraint, func -from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy import Index, PrimaryKeyConstraint, UniqueConstraint, func +from sqlalchemy.orm import Mapped, declared_attr, mapped_column from constants import DEFAULT_FILE_NUMBER_LIMITS, HIDDEN_VALUE from core.helper import encrypter @@ -590,28 +590,48 @@ class WorkflowNodeExecutionModel(Base): """ __tablename__ = "workflow_node_executions" - __table_args__ = ( - db.PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"), - db.Index( - "workflow_node_execution_workflow_run_idx", - "tenant_id", - "app_id", - "workflow_id", - "triggered_from", - "workflow_run_id", - ), - db.Index( - "workflow_node_execution_node_run_idx", "tenant_id", "app_id", "workflow_id", "triggered_from", "node_id" - ), - db.Index( - "workflow_node_execution_id_idx", - "tenant_id", - "app_id", - "workflow_id", - "triggered_from", - "node_execution_id", - ), - ) + + @declared_attr + def __table_args__(cls): # noqa + return ( + PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"), + Index( + "workflow_node_execution_workflow_run_idx", + "tenant_id", + "app_id", + "workflow_id", + "triggered_from", + "workflow_run_id", + ), + Index( + "workflow_node_execution_node_run_idx", + "tenant_id", + "app_id", + "workflow_id", + "triggered_from", + "node_id", + ), + Index( + "workflow_node_execution_id_idx", + "tenant_id", + "app_id", + "workflow_id", + "triggered_from", + "node_execution_id", + ), + Index( + # The first argument is the index name, + # which we leave as `None`` to allow auto-generation by the ORM. + None, + cls.tenant_id, + cls.workflow_id, + cls.node_id, + # MyPy may flag the following line because it doesn't recognize that + # the `declared_attr` decorator passes the receiving class as the first + # argument to this method, allowing us to reference class attributes. + cls.created_at.desc(), # type: ignore + ), + ) id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()")) tenant_id: Mapped[str] = mapped_column(StringUUID) @@ -885,14 +905,29 @@ class WorkflowDraftVariable(Base): selector: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="selector") + # The data type of this variable's value value_type: Mapped[SegmentType] = mapped_column(EnumText(SegmentType, length=20)) - # JSON string + + # The variable's value serialized as a JSON string value: Mapped[str] = mapped_column(sa.Text, nullable=False, name="value") - # visible + # Controls whether the variable should be displayed in the variable inspection panel visible: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=True) + + # Determines whether this variable can be modified by users editable: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=False) + # The `node_execution_id` field identifies the workflow node execution that created this variable. + # It corresponds to the `id` field in the `WorkflowNodeExecutionModel` model. + # + # This field is not `None` for system variables and node variables, and is `None` + # for conversation variables. + node_execution_id: Mapped[str | None] = mapped_column( + StringUUID, + nullable=True, + default=None, + ) + def get_selector(self) -> list[str]: selector = json.loads(self.selector) if not isinstance(selector, list):