Feat: time period filter for workflow logs (#14271)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
KVOJJJin
2025-03-10 14:02:58 +08:00
committed by GitHub
parent 3254018ddb
commit 78d460a6d1
9 changed files with 163 additions and 50 deletions

View File

@@ -1,30 +1,46 @@
import uuid
from datetime import datetime
from flask_sqlalchemy.pagination import Pagination
from sqlalchemy import and_, or_
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session
from extensions.ext_database import db
from models import App, EndUser, WorkflowAppLog, WorkflowRun
from models.enums import CreatedByRole
from models.workflow import WorkflowRunStatus
class WorkflowAppService:
def get_paginate_workflow_app_logs(self, app_model: App, args: dict) -> Pagination:
def get_paginate_workflow_app_logs(
self,
*,
session: Session,
app_model: App,
keyword: str | None = None,
status: WorkflowRunStatus | None = None,
created_at_before: datetime | None = None,
created_at_after: datetime | None = None,
page: int = 1,
limit: int = 20,
) -> dict:
"""
Get paginate workflow app logs
:param app: app model
:param args: request args
:return:
Get paginate workflow app logs using SQLAlchemy 2.0 style
:param session: SQLAlchemy session
:param app_model: app model
:param keyword: search keyword
:param status: filter by status
:param created_at_before: filter logs created before this timestamp
:param created_at_after: filter logs created after this timestamp
:param page: page number
:param limit: items per page
:return: Pagination object
"""
query = db.select(WorkflowAppLog).where(
# Build base statement using SQLAlchemy 2.0 style
stmt = select(WorkflowAppLog).where(
WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
)
status = WorkflowRunStatus.value_of(args.get("status", "")) if args.get("status") else None
keyword = args["keyword"]
if keyword or status:
query = query.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
if keyword:
keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u")
@@ -40,20 +56,40 @@ class WorkflowAppService:
if keyword_uuid:
keyword_conditions.append(WorkflowRun.id == keyword_uuid)
query = query.outerjoin(
stmt = stmt.outerjoin(
EndUser,
and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER),
).filter(or_(*keyword_conditions))
).where(or_(*keyword_conditions))
if status:
# join with workflow_run and filter by status
query = query.filter(WorkflowRun.status == status.value)
stmt = stmt.where(WorkflowRun.status == status)
query = query.order_by(WorkflowAppLog.created_at.desc())
# Add time-based filtering
if created_at_before:
stmt = stmt.where(WorkflowAppLog.created_at <= created_at_before)
pagination = db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False)
if created_at_after:
stmt = stmt.where(WorkflowAppLog.created_at >= created_at_after)
return pagination
stmt = stmt.order_by(WorkflowAppLog.created_at.desc())
# Get total count using the same filters
count_stmt = select(func.count()).select_from(stmt.subquery())
total = session.scalar(count_stmt) or 0
# Apply pagination limits
offset_stmt = stmt.offset((page - 1) * limit).limit(limit)
# Execute query and get items
items = list(session.scalars(offset_stmt).all())
return {
"page": page,
"limit": limit,
"total": total,
"has_more": total > page * limit,
"data": items,
}
@staticmethod
def _safe_parse_uuid(value: str):