diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 34a1da222..1a8923733 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -50,6 +50,7 @@ from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter +from libs.datetime_utils import naive_utc_now from models import ( Account, CreatorUserRole, @@ -399,7 +400,7 @@ class WorkflowResponseConverter: if event.error is None else WorkflowNodeExecutionStatus.FAILED, error=None, - elapsed_time=(datetime.now(UTC).replace(tzinfo=None) - event.start_at).total_seconds(), + elapsed_time=(naive_utc_now() - event.start_at).total_seconds(), total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0, execution_metadata=event.metadata, finished_at=int(time.time()), @@ -478,7 +479,7 @@ class WorkflowResponseConverter: if event.error is None else WorkflowNodeExecutionStatus.FAILED, error=None, - elapsed_time=(datetime.now(UTC).replace(tzinfo=None) - event.start_at).total_seconds(), + elapsed_time=(naive_utc_now() - event.start_at).total_seconds(), total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0, execution_metadata=event.metadata, finished_at=int(time.time()), diff --git a/api/core/entities/provider_configuration.py b/api/core/entities/provider_configuration.py index 8bfbd82e1..646e0e21e 100644 --- a/api/core/entities/provider_configuration.py +++ b/api/core/entities/provider_configuration.py @@ -1,4 +1,3 @@ -import datetime import json import logging from collections import defaultdict @@ -29,6 +28,7 @@ from core.model_runtime.model_providers.__base.ai_model import AIModel from core.model_runtime.model_providers.model_provider_factory import ModelProviderFactory from core.plugin.entities.plugin import ModelProviderID from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now from models.provider import ( LoadBalancingModelConfig, Provider, @@ -261,7 +261,7 @@ class ProviderConfiguration(BaseModel): if provider_record: provider_record.encrypted_config = json.dumps(credentials) provider_record.is_valid = True - provider_record.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + provider_record.updated_at = naive_utc_now() db.session.commit() else: provider_record = Provider() @@ -426,7 +426,7 @@ class ProviderConfiguration(BaseModel): if provider_model_record: provider_model_record.encrypted_config = json.dumps(credentials) provider_model_record.is_valid = True - provider_model_record.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + provider_model_record.updated_at = naive_utc_now() db.session.commit() else: provider_model_record = ProviderModel() @@ -501,7 +501,7 @@ class ProviderConfiguration(BaseModel): if model_setting: model_setting.enabled = True - model_setting.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + model_setting.updated_at = naive_utc_now() db.session.commit() else: model_setting = ProviderModelSetting() @@ -526,7 +526,7 @@ class ProviderConfiguration(BaseModel): if model_setting: model_setting.enabled = False - model_setting.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + model_setting.updated_at = naive_utc_now() db.session.commit() else: model_setting = ProviderModelSetting() @@ -599,7 +599,7 @@ class ProviderConfiguration(BaseModel): if model_setting: model_setting.load_balancing_enabled = True - model_setting.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + model_setting.updated_at = naive_utc_now() db.session.commit() else: model_setting = ProviderModelSetting() @@ -638,7 +638,7 @@ class ProviderConfiguration(BaseModel): if model_setting: model_setting.load_balancing_enabled = False - model_setting.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + model_setting.updated_at = naive_utc_now() db.session.commit() else: model_setting = ProviderModelSetting() diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py index 8201a5f3b..987619460 100644 --- a/api/core/indexing_runner.py +++ b/api/core/indexing_runner.py @@ -1,5 +1,4 @@ import concurrent.futures -import datetime import json import logging import re @@ -34,6 +33,7 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.ext_storage import storage from libs import helper +from libs.datetime_utils import naive_utc_now from models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegment from models.dataset import Document as DatasetDocument from models.model import UploadFile @@ -87,7 +87,7 @@ class IndexingRunner: except ProviderTokenNotInitError as e: dataset_document.indexing_status = "error" dataset_document.error = str(e.description) - dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.stopped_at = naive_utc_now() db.session.commit() except ObjectDeletedError: logging.warning("Document deleted, document id: %s", dataset_document.id) @@ -95,7 +95,7 @@ class IndexingRunner: logging.exception("consume document failed") dataset_document.indexing_status = "error" dataset_document.error = str(e) - dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.stopped_at = naive_utc_now() db.session.commit() def run_in_splitting_status(self, dataset_document: DatasetDocument): @@ -150,13 +150,13 @@ class IndexingRunner: except ProviderTokenNotInitError as e: dataset_document.indexing_status = "error" dataset_document.error = str(e.description) - dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.stopped_at = naive_utc_now() db.session.commit() except Exception as e: logging.exception("consume document failed") dataset_document.indexing_status = "error" dataset_document.error = str(e) - dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.stopped_at = naive_utc_now() db.session.commit() def run_in_indexing_status(self, dataset_document: DatasetDocument): @@ -225,13 +225,13 @@ class IndexingRunner: except ProviderTokenNotInitError as e: dataset_document.indexing_status = "error" dataset_document.error = str(e.description) - dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.stopped_at = naive_utc_now() db.session.commit() except Exception as e: logging.exception("consume document failed") dataset_document.indexing_status = "error" dataset_document.error = str(e) - dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.stopped_at = naive_utc_now() db.session.commit() def indexing_estimate( @@ -400,7 +400,7 @@ class IndexingRunner: after_indexing_status="splitting", extra_update_params={ DatasetDocument.word_count: sum(len(text_doc.page_content) for text_doc in text_docs), - DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DatasetDocument.parsing_completed_at: naive_utc_now(), }, ) @@ -583,7 +583,7 @@ class IndexingRunner: after_indexing_status="completed", extra_update_params={ DatasetDocument.tokens: tokens, - DatasetDocument.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DatasetDocument.completed_at: naive_utc_now(), DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at, DatasetDocument.error: None, }, @@ -608,7 +608,7 @@ class IndexingRunner: { DocumentSegment.status: "completed", DocumentSegment.enabled: True, - DocumentSegment.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.completed_at: naive_utc_now(), } ) @@ -639,7 +639,7 @@ class IndexingRunner: { DocumentSegment.status: "completed", DocumentSegment.enabled: True, - DocumentSegment.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.completed_at: naive_utc_now(), } ) @@ -727,7 +727,7 @@ class IndexingRunner: doc_store.add_documents(docs=documents, save_child=dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX) # update document status to indexing - cur_time = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + cur_time = naive_utc_now() self._update_document_index_status( document_id=dataset_document.id, after_indexing_status="indexing", @@ -742,7 +742,7 @@ class IndexingRunner: dataset_document_id=dataset_document.id, update_params={ DocumentSegment.status: "indexing", - DocumentSegment.indexing_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.indexing_at: naive_utc_now(), }, ) pass diff --git a/api/core/rag/extractor/word_extractor.py b/api/core/rag/extractor/word_extractor.py index 0eff7c186..f3b162e3d 100644 --- a/api/core/rag/extractor/word_extractor.py +++ b/api/core/rag/extractor/word_extractor.py @@ -1,6 +1,5 @@ """Abstract interface for document loader implementations.""" -import datetime import logging import mimetypes import os @@ -19,6 +18,7 @@ from core.rag.extractor.extractor_base import BaseExtractor from core.rag.models.document import Document from extensions.ext_database import db from extensions.ext_storage import storage +from libs.datetime_utils import naive_utc_now from models.enums import CreatorUserRole from models.model import UploadFile @@ -117,10 +117,10 @@ class WordExtractor(BaseExtractor): mime_type=mime_type or "", created_by=self.user_id, created_by_role=CreatorUserRole.ACCOUNT, - created_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + created_at=naive_utc_now(), used=True, used_by=self.user_id, - used_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + used_at=naive_utc_now(), ) db.session.add(upload_file) diff --git a/api/core/workflow/entities/workflow_execution.py b/api/core/workflow/entities/workflow_execution.py index 781be4b3c..f00dc11aa 100644 --- a/api/core/workflow/entities/workflow_execution.py +++ b/api/core/workflow/entities/workflow_execution.py @@ -6,12 +6,14 @@ implementation details like tenant_id, app_id, etc. """ from collections.abc import Mapping -from datetime import UTC, datetime +from datetime import datetime from enum import StrEnum from typing import Any, Optional from pydantic import BaseModel, Field +from libs.datetime_utils import naive_utc_now + class WorkflowType(StrEnum): """ @@ -60,7 +62,7 @@ class WorkflowExecution(BaseModel): Calculate elapsed time in seconds. If workflow is not finished, use current time. """ - end_time = self.finished_at or datetime.now(UTC).replace(tzinfo=None) + end_time = self.finished_at or naive_utc_now() return (end_time - self.started_at).total_seconds() @classmethod diff --git a/api/core/workflow/graph_engine/entities/runtime_route_state.py b/api/core/workflow/graph_engine/entities/runtime_route_state.py index f2d9c9893..a4ddfafab 100644 --- a/api/core/workflow/graph_engine/entities/runtime_route_state.py +++ b/api/core/workflow/graph_engine/entities/runtime_route_state.py @@ -1,5 +1,5 @@ import uuid -from datetime import UTC, datetime +from datetime import datetime from enum import Enum from typing import Optional @@ -7,6 +7,7 @@ from pydantic import BaseModel, Field from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus +from libs.datetime_utils import naive_utc_now class RouteNodeState(BaseModel): @@ -71,7 +72,7 @@ class RouteNodeState(BaseModel): raise Exception(f"Invalid route status {run_result.status}") self.node_run_result = run_result - self.finished_at = datetime.now(UTC).replace(tzinfo=None) + self.finished_at = naive_utc_now() class RuntimeRouteState(BaseModel): @@ -89,7 +90,7 @@ class RuntimeRouteState(BaseModel): :param node_id: node id """ - state = RouteNodeState(node_id=node_id, start_at=datetime.now(UTC).replace(tzinfo=None)) + state = RouteNodeState(node_id=node_id, start_at=naive_utc_now()) self.node_state_mapping[state.id] = state return state diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index b9663d32f..03b920ccb 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -6,7 +6,6 @@ import uuid from collections.abc import Generator, Mapping from concurrent.futures import ThreadPoolExecutor, wait from copy import copy, deepcopy -from datetime import UTC, datetime from typing import Any, Optional, cast from flask import Flask, current_app @@ -51,6 +50,7 @@ from core.workflow.nodes.base import BaseNode from core.workflow.nodes.end.end_stream_processor import EndStreamProcessor from core.workflow.nodes.enums import ErrorStrategy, FailBranchSourceHandle from core.workflow.nodes.event import RunCompletedEvent, RunRetrieverResourceEvent, RunStreamChunkEvent +from libs.datetime_utils import naive_utc_now from libs.flask_utils import preserve_flask_contexts from models.enums import UserFrom from models.workflow import WorkflowType @@ -640,7 +640,7 @@ class GraphEngine: while should_continue_retry and retries <= max_retries: try: # run node - retry_start_at = datetime.now(UTC).replace(tzinfo=None) + retry_start_at = naive_utc_now() # yield control to other threads time.sleep(0.001) event_stream = node.run() diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index def1e1cfa..7f591a3ea 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -4,7 +4,7 @@ import time import uuid from collections.abc import Generator, Mapping, Sequence from concurrent.futures import Future, wait -from datetime import UTC, datetime +from datetime import datetime from queue import Empty, Queue from typing import TYPE_CHECKING, Any, Optional, cast @@ -41,6 +41,7 @@ from core.workflow.nodes.enums import ErrorStrategy, NodeType from core.workflow.nodes.event import NodeEvent, RunCompletedEvent from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData from factories.variable_factory import build_segment +from libs.datetime_utils import naive_utc_now from libs.flask_utils import preserve_flask_contexts from .exc import ( @@ -179,7 +180,7 @@ class IterationNode(BaseNode): thread_pool_id=self.thread_pool_id, ) - start_at = datetime.now(UTC).replace(tzinfo=None) + start_at = naive_utc_now() yield IterationRunStartedEvent( iteration_id=self.id, @@ -428,7 +429,7 @@ class IterationNode(BaseNode): """ run single iteration """ - iter_start_at = datetime.now(UTC).replace(tzinfo=None) + iter_start_at = naive_utc_now() try: rst = graph_engine.run() @@ -505,7 +506,7 @@ class IterationNode(BaseNode): variable_pool.add([self.node_id, "index"], next_index) if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) - duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds() + duration = (naive_utc_now() - iter_start_at).total_seconds() iter_run_map[iteration_run_id] = duration yield IterationRunNextEvent( iteration_id=self.id, @@ -526,7 +527,7 @@ class IterationNode(BaseNode): if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) - duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds() + duration = (naive_utc_now() - iter_start_at).total_seconds() iter_run_map[iteration_run_id] = duration yield IterationRunNextEvent( iteration_id=self.id, @@ -602,7 +603,7 @@ class IterationNode(BaseNode): if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) - duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds() + duration = (naive_utc_now() - iter_start_at).total_seconds() iter_run_map[iteration_run_id] = duration yield IterationRunNextEvent( iteration_id=self.id, diff --git a/api/core/workflow/nodes/llm/llm_utils.py b/api/core/workflow/nodes/llm/llm_utils.py index 0966c87a1..2441e30c8 100644 --- a/api/core/workflow/nodes/llm/llm_utils.py +++ b/api/core/workflow/nodes/llm/llm_utils.py @@ -1,5 +1,4 @@ from collections.abc import Sequence -from datetime import UTC, datetime from typing import Optional, cast from sqlalchemy import select, update @@ -20,6 +19,7 @@ from core.variables.segments import ArrayAnySegment, ArrayFileSegment, FileSegme from core.workflow.entities.variable_pool import VariablePool from core.workflow.enums import SystemVariableKey from core.workflow.nodes.llm.entities import ModelConfig +from libs.datetime_utils import naive_utc_now from models import db from models.model import Conversation from models.provider import Provider, ProviderType @@ -149,7 +149,7 @@ def deduct_llm_quota(tenant_id: str, model_instance: ModelInstance, usage: LLMUs ) .values( quota_used=Provider.quota_used + used_quota, - last_used=datetime.now(tz=UTC).replace(tzinfo=None), + last_used=naive_utc_now(), ) ) session.execute(stmt) diff --git a/api/core/workflow/nodes/loop/loop_node.py b/api/core/workflow/nodes/loop/loop_node.py index 9a288c613..b2ab94312 100644 --- a/api/core/workflow/nodes/loop/loop_node.py +++ b/api/core/workflow/nodes/loop/loop_node.py @@ -2,7 +2,7 @@ import json import logging import time from collections.abc import Generator, Mapping, Sequence -from datetime import UTC, datetime +from datetime import datetime from typing import TYPE_CHECKING, Any, Literal, Optional, cast from configs import dify_config @@ -36,6 +36,7 @@ from core.workflow.nodes.event import NodeEvent, RunCompletedEvent from core.workflow.nodes.loop.entities import LoopNodeData from core.workflow.utils.condition.processor import ConditionProcessor from factories.variable_factory import TypeMismatchError, build_segment_with_type +from libs.datetime_utils import naive_utc_now if TYPE_CHECKING: from core.workflow.entities.variable_pool import VariablePool @@ -143,7 +144,7 @@ class LoopNode(BaseNode): thread_pool_id=self.thread_pool_id, ) - start_at = datetime.now(UTC).replace(tzinfo=None) + start_at = naive_utc_now() condition_processor = ConditionProcessor() # Start Loop event @@ -171,7 +172,7 @@ class LoopNode(BaseNode): try: check_break_result = False for i in range(loop_count): - loop_start_time = datetime.now(UTC).replace(tzinfo=None) + loop_start_time = naive_utc_now() # run single loop loop_result = yield from self._run_single_loop( graph_engine=graph_engine, @@ -185,7 +186,7 @@ class LoopNode(BaseNode): start_at=start_at, inputs=inputs, ) - loop_end_time = datetime.now(UTC).replace(tzinfo=None) + loop_end_time = naive_utc_now() single_loop_variable = {} for key, selector in loop_variable_selectors.items(): diff --git a/api/services/annotation_service.py b/api/services/annotation_service.py index 1a0fdfa42..45b246af1 100644 --- a/api/services/annotation_service.py +++ b/api/services/annotation_service.py @@ -1,4 +1,3 @@ -import datetime import uuid from typing import cast @@ -10,6 +9,7 @@ from werkzeug.exceptions import NotFound from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.model import App, AppAnnotationHitHistory, AppAnnotationSetting, Message, MessageAnnotation from services.feature_service import FeatureService from tasks.annotation.add_annotation_to_index_task import add_annotation_to_index_task @@ -473,7 +473,7 @@ class AppAnnotationService: raise NotFound("App annotation not found") annotation_setting.score_threshold = args["score_threshold"] annotation_setting.updated_user_id = current_user.id - annotation_setting.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + annotation_setting.updated_at = naive_utc_now() db.session.add(annotation_setting) db.session.commit() diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index 9fb048fac..f648b06de 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -1234,7 +1234,7 @@ class DocumentService: ) if document: document.dataset_process_rule_id = dataset_process_rule.id # type: ignore - document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.updated_at = naive_utc_now() document.created_from = created_from document.doc_form = knowledge_config.doc_form document.doc_language = knowledge_config.doc_language @@ -1552,7 +1552,7 @@ class DocumentService: document.parsing_completed_at = None document.cleaning_completed_at = None document.splitting_completed_at = None - document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.updated_at = naive_utc_now() document.created_from = created_from document.doc_form = document_data.doc_form db.session.add(document) @@ -1912,7 +1912,7 @@ class DocumentService: Returns: dict: Update information or None if no update needed """ - now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + now = naive_utc_now() if action == "enable": return DocumentService._prepare_enable_update(document, now) @@ -2040,8 +2040,8 @@ class SegmentService: word_count=len(content), tokens=tokens, status="completed", - indexing_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), - completed_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + indexing_at=naive_utc_now(), + completed_at=naive_utc_now(), created_by=current_user.id, ) if document.doc_form == "qa_model": @@ -2061,7 +2061,7 @@ class SegmentService: except Exception as e: logging.exception("create segment index failed") segment_document.enabled = False - segment_document.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment_document.disabled_at = naive_utc_now() segment_document.status = "error" segment_document.error = str(e) db.session.commit() @@ -2117,8 +2117,8 @@ class SegmentService: tokens=tokens, keywords=segment_item.get("keywords", []), status="completed", - indexing_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), - completed_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + indexing_at=naive_utc_now(), + completed_at=naive_utc_now(), created_by=current_user.id, ) if document.doc_form == "qa_model": @@ -2145,7 +2145,7 @@ class SegmentService: logging.exception("create segment index failed") for segment_document in segment_data_list: segment_document.enabled = False - segment_document.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment_document.disabled_at = naive_utc_now() segment_document.status = "error" segment_document.error = str(e) db.session.commit() @@ -2162,7 +2162,7 @@ class SegmentService: if segment.enabled != action: if not action: segment.enabled = action - segment.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.disabled_at = naive_utc_now() segment.disabled_by = current_user.id db.session.add(segment) db.session.commit() @@ -2260,10 +2260,10 @@ class SegmentService: segment.word_count = len(content) segment.tokens = tokens segment.status = "completed" - segment.indexing_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) - segment.completed_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.indexing_at = naive_utc_now() + segment.completed_at = naive_utc_now() segment.updated_by = current_user.id - segment.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.updated_at = naive_utc_now() segment.enabled = True segment.disabled_at = None segment.disabled_by = None @@ -2316,7 +2316,7 @@ class SegmentService: except Exception as e: logging.exception("update segment index failed") segment.enabled = False - segment.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.disabled_at = naive_utc_now() segment.status = "error" segment.error = str(e) db.session.commit() @@ -2418,7 +2418,7 @@ class SegmentService: if cache_result is not None: continue segment.enabled = False - segment.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.disabled_at = naive_utc_now() segment.disabled_by = current_user.id db.session.add(segment) real_deal_segment_ids.append(segment.id) @@ -2508,7 +2508,7 @@ class SegmentService: child_chunk.content = child_chunk_update_args.content child_chunk.word_count = len(child_chunk.content) child_chunk.updated_by = current_user.id - child_chunk.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + child_chunk.updated_at = naive_utc_now() child_chunk.type = "customized" update_child_chunks.append(child_chunk) else: @@ -2565,7 +2565,7 @@ class SegmentService: child_chunk.content = content child_chunk.word_count = len(content) child_chunk.updated_by = current_user.id - child_chunk.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + child_chunk.updated_at = naive_utc_now() child_chunk.type = "customized" db.session.add(child_chunk) VectorService.update_child_chunk_vector([], [child_chunk], [], dataset) diff --git a/api/services/file_service.py b/api/services/file_service.py index e234c2f32..4c0a0f451 100644 --- a/api/services/file_service.py +++ b/api/services/file_service.py @@ -1,4 +1,3 @@ -import datetime import hashlib import os import uuid @@ -18,6 +17,7 @@ from core.file import helpers as file_helpers from core.rag.extractor.extract_processor import ExtractProcessor from extensions.ext_database import db from extensions.ext_storage import storage +from libs.datetime_utils import naive_utc_now from libs.helper import extract_tenant_id from models.account import Account from models.enums import CreatorUserRole @@ -80,7 +80,7 @@ class FileService: mime_type=mimetype, created_by_role=(CreatorUserRole.ACCOUNT if isinstance(user, Account) else CreatorUserRole.END_USER), created_by=user.id, - created_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + created_at=naive_utc_now(), used=False, hash=hashlib.sha3_256(content).hexdigest(), source_url=source_url, @@ -131,10 +131,10 @@ class FileService: mime_type="text/plain", created_by=current_user.id, created_by_role=CreatorUserRole.ACCOUNT, - created_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + created_at=naive_utc_now(), used=True, used_by=current_user.id, - used_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + used_at=naive_utc_now(), ) db.session.add(upload_file) diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 2a83588f4..fd222f59d 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -1,5 +1,4 @@ import copy -import datetime import logging from typing import Optional @@ -8,6 +7,7 @@ from flask_login import current_user from core.rag.index_processor.constant.built_in_field import BuiltInField, MetadataDataSource from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding from services.dataset_service import DocumentService from services.entities.knowledge_entities.knowledge_entities import ( @@ -69,7 +69,7 @@ class MetadataService: old_name = metadata.name metadata.name = name metadata.updated_by = current_user.id - metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + metadata.updated_at = naive_utc_now() # update related documents dataset_metadata_bindings = ( diff --git a/api/services/model_load_balancing_service.py b/api/services/model_load_balancing_service.py index fe28aa006..f8dd70c79 100644 --- a/api/services/model_load_balancing_service.py +++ b/api/services/model_load_balancing_service.py @@ -1,4 +1,3 @@ -import datetime import json import logging from json import JSONDecodeError @@ -17,6 +16,7 @@ from core.model_runtime.entities.provider_entities import ( from core.model_runtime.model_providers.model_provider_factory import ModelProviderFactory from core.provider_manager import ProviderManager from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now from models.provider import LoadBalancingModelConfig logger = logging.getLogger(__name__) @@ -371,7 +371,7 @@ class ModelLoadBalancingService: load_balancing_config.name = name load_balancing_config.enabled = enabled - load_balancing_config.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + load_balancing_config.updated_at = naive_utc_now() db.session.commit() self._clear_credentials_cache(tenant_id, config_id) diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index b52f4924b..9f01bcb66 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -1,5 +1,4 @@ import dataclasses -import datetime import logging from collections.abc import Mapping, Sequence from enum import StrEnum @@ -23,6 +22,7 @@ from core.workflow.nodes.variable_assigner.common.helpers import get_updated_var from core.workflow.variable_loader import VariableLoader from factories.file_factory import StorageKeyLoader from factories.variable_factory import build_segment, segment_to_variable +from libs.datetime_utils import naive_utc_now from models import App, Conversation from models.enums import DraftVariableType from models.workflow import Workflow, WorkflowDraftVariable, is_system_variable_editable @@ -231,7 +231,7 @@ class WorkflowDraftVariableService: variable.set_name(name) if value is not None: variable.set_value(value) - variable.last_edited_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + variable.last_edited_at = naive_utc_now() self._session.flush() return variable diff --git a/api/tasks/add_document_to_index_task.py b/api/tasks/add_document_to_index_task.py index c5ee4ce3f..d841b09d1 100644 --- a/api/tasks/add_document_to_index_task.py +++ b/api/tasks/add_document_to_index_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -10,6 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto from core.rag.models.document import ChildDocument, Document from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import DatasetAutoDisableLog, DocumentSegment from models.dataset import Document as DatasetDocument @@ -95,7 +95,7 @@ def add_document_to_index_task(dataset_document_id: str): DocumentSegment.enabled: True, DocumentSegment.disabled_at: None, DocumentSegment.disabled_by: None, - DocumentSegment.updated_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.updated_at: naive_utc_now(), } ) db.session.commit() @@ -107,7 +107,7 @@ def add_document_to_index_task(dataset_document_id: str): except Exception as e: logging.exception("add document to index failed") dataset_document.enabled = False - dataset_document.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + dataset_document.disabled_at = naive_utc_now() dataset_document.indexing_status = "error" dataset_document.error = str(e) db.session.commit() diff --git a/api/tasks/annotation/enable_annotation_reply_task.py b/api/tasks/annotation/enable_annotation_reply_task.py index 44c65c078..f44b8aff6 100644 --- a/api/tasks/annotation/enable_annotation_reply_task.py +++ b/api/tasks/annotation/enable_annotation_reply_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -9,6 +8,7 @@ from core.rag.datasource.vdb.vector_factory import Vector from core.rag.models.document import Document from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset from models.model import App, AppAnnotationSetting, MessageAnnotation from services.dataset_service import DatasetCollectionBindingService @@ -72,7 +72,7 @@ def enable_annotation_reply_task( annotation_setting.score_threshold = score_threshold annotation_setting.collection_binding_id = dataset_collection_binding.id annotation_setting.updated_user_id = user_id - annotation_setting.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + annotation_setting.updated_at = naive_utc_now() db.session.add(annotation_setting) else: new_app_annotation_setting = AppAnnotationSetting( diff --git a/api/tasks/batch_create_segment_to_index_task.py b/api/tasks/batch_create_segment_to_index_task.py index dee43cd85..0dff1da14 100644 --- a/api/tasks/batch_create_segment_to_index_task.py +++ b/api/tasks/batch_create_segment_to_index_task.py @@ -1,4 +1,3 @@ -import datetime import logging import tempfile import time @@ -17,6 +16,7 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.ext_storage import storage from libs import helper +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment from models.model import UploadFile from services.vector_service import VectorService @@ -123,9 +123,9 @@ def batch_create_segment_to_index_task( word_count=len(content), tokens=tokens, created_by=user_id, - indexing_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + indexing_at=naive_utc_now(), status="completed", - completed_at=datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + completed_at=naive_utc_now(), ) if dataset_document.doc_form == "qa_model": segment_document.answer = segment["answer"] diff --git a/api/tasks/create_segment_to_index_task.py b/api/tasks/create_segment_to_index_task.py index 543a51285..6176b0d0f 100644 --- a/api/tasks/create_segment_to_index_task.py +++ b/api/tasks/create_segment_to_index_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time from typing import Optional @@ -10,6 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto from core.rag.models.document import Document from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import DocumentSegment @@ -41,7 +41,7 @@ def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] db.session.query(DocumentSegment).filter_by(id=segment.id).update( { DocumentSegment.status: "indexing", - DocumentSegment.indexing_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.indexing_at: naive_utc_now(), } ) db.session.commit() @@ -79,7 +79,7 @@ def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] db.session.query(DocumentSegment).filter_by(id=segment.id).update( { DocumentSegment.status: "completed", - DocumentSegment.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.completed_at: naive_utc_now(), } ) db.session.commit() @@ -89,7 +89,7 @@ def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] except Exception as e: logging.exception("create segment to index failed") segment.enabled = False - segment.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.disabled_at = naive_utc_now() segment.status = "error" segment.error = str(e) db.session.commit() diff --git a/api/tasks/document_indexing_sync_task.py b/api/tasks/document_indexing_sync_task.py index 993b2ac40..e2ea4d3ed 100644 --- a/api/tasks/document_indexing_sync_task.py +++ b/api/tasks/document_indexing_sync_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -9,6 +8,7 @@ from core.indexing_runner import DocumentIsPausedError, IndexingRunner from core.rag.extractor.notion_extractor import NotionExtractor from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment from models.source import DataSourceOauthBinding @@ -72,7 +72,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): # check the page is updated if last_edited_time != page_edited_time: document.indexing_status = "parsing" - document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.processing_started_at = naive_utc_now() db.session.commit() # delete all document segment and index diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py index 053c0c5f4..26f2e9508 100644 --- a/api/tasks/document_indexing_update_task.py +++ b/api/tasks/document_indexing_update_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -8,6 +7,7 @@ from celery import shared_task # type: ignore from core.indexing_runner import DocumentIsPausedError, IndexingRunner from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment @@ -31,7 +31,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str): return document.indexing_status = "parsing" - document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.processing_started_at = naive_utc_now() db.session.commit() # delete all document segment and index diff --git a/api/tasks/duplicate_document_indexing_task.py b/api/tasks/duplicate_document_indexing_task.py index faa7e2b8d..11f4722b9 100644 --- a/api/tasks/duplicate_document_indexing_task.py +++ b/api/tasks/duplicate_document_indexing_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -9,6 +8,7 @@ from configs import dify_config from core.indexing_runner import DocumentIsPausedError, IndexingRunner from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment from services.feature_service import FeatureService @@ -55,7 +55,7 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): if document: document.indexing_status = "error" document.error = str(e) - document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.stopped_at = naive_utc_now() db.session.add(document) db.session.commit() return @@ -86,7 +86,7 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): db.session.commit() document.indexing_status = "parsing" - document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.processing_started_at = naive_utc_now() documents.append(document) db.session.add(document) db.session.commit() diff --git a/api/tasks/enable_segment_to_index_task.py b/api/tasks/enable_segment_to_index_task.py index f801c9d9e..ad9ebf4d9 100644 --- a/api/tasks/enable_segment_to_index_task.py +++ b/api/tasks/enable_segment_to_index_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -10,6 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto from core.rag.models.document import ChildDocument, Document from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import DocumentSegment @@ -89,7 +89,7 @@ def enable_segment_to_index_task(segment_id: str): except Exception as e: logging.exception("enable segment to index failed") segment.enabled = False - segment.disabled_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + segment.disabled_at = naive_utc_now() segment.status = "error" segment.error = str(e) db.session.commit() diff --git a/api/tasks/enable_segments_to_index_task.py b/api/tasks/enable_segments_to_index_task.py index 777380631..781554894 100644 --- a/api/tasks/enable_segments_to_index_task.py +++ b/api/tasks/enable_segments_to_index_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -10,6 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto from core.rag.models.document import ChildDocument, Document from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, DocumentSegment from models.dataset import Document as DatasetDocument @@ -103,7 +103,7 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i { "error": str(e), "status": "error", - "disabled_at": datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + "disabled_at": naive_utc_now(), "enabled": False, } ) diff --git a/api/tasks/remove_document_from_index_task.py b/api/tasks/remove_document_from_index_task.py index 524130a29..86133c46f 100644 --- a/api/tasks/remove_document_from_index_task.py +++ b/api/tasks/remove_document_from_index_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -8,6 +7,7 @@ from celery import shared_task # type: ignore from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import Document, DocumentSegment @@ -54,9 +54,9 @@ def remove_document_from_index_task(document_id: str): db.session.query(DocumentSegment).where(DocumentSegment.document_id == document.id).update( { DocumentSegment.enabled: False, - DocumentSegment.disabled_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.disabled_at: naive_utc_now(), DocumentSegment.disabled_by: document.disabled_by, - DocumentSegment.updated_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), + DocumentSegment.updated_at: naive_utc_now(), } ) db.session.commit() diff --git a/api/tasks/retry_document_indexing_task.py b/api/tasks/retry_document_indexing_task.py index 26b41aff2..3a98af980 100644 --- a/api/tasks/retry_document_indexing_task.py +++ b/api/tasks/retry_document_indexing_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -9,6 +8,7 @@ from core.indexing_runner import IndexingRunner from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment from services.feature_service import FeatureService @@ -51,7 +51,7 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): if document: document.indexing_status = "error" document.error = str(e) - document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.stopped_at = naive_utc_now() db.session.add(document) db.session.commit() redis_client.delete(retry_indexing_cache_key) @@ -79,7 +79,7 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): db.session.commit() document.indexing_status = "parsing" - document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.processing_started_at = naive_utc_now() db.session.add(document) db.session.commit() @@ -89,7 +89,7 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): except Exception as ex: document.indexing_status = "error" document.error = str(ex) - document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.stopped_at = naive_utc_now() db.session.add(document) db.session.commit() logging.info(click.style(str(ex), fg="yellow")) diff --git a/api/tasks/sync_website_document_indexing_task.py b/api/tasks/sync_website_document_indexing_task.py index f112a97d2..a14b8ed0f 100644 --- a/api/tasks/sync_website_document_indexing_task.py +++ b/api/tasks/sync_website_document_indexing_task.py @@ -1,4 +1,3 @@ -import datetime import logging import time @@ -9,6 +8,7 @@ from core.indexing_runner import IndexingRunner from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document, DocumentSegment from services.feature_service import FeatureService @@ -46,7 +46,7 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): if document: document.indexing_status = "error" document.error = str(e) - document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.stopped_at = naive_utc_now() db.session.add(document) db.session.commit() redis_client.delete(sync_indexing_cache_key) @@ -72,7 +72,7 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): db.session.commit() document.indexing_status = "parsing" - document.processing_started_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.processing_started_at = naive_utc_now() db.session.add(document) db.session.commit() @@ -82,7 +82,7 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): except Exception as ex: document.indexing_status = "error" document.error = str(ex) - document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + document.stopped_at = naive_utc_now() db.session.add(document) db.session.commit() logging.info(click.style(str(ex), fg="yellow")) diff --git a/api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py b/api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py index f26be6702..3616e53fe 100644 --- a/api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py +++ b/api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py @@ -1,4 +1,3 @@ -import datetime import uuid from collections import OrderedDict from typing import Any, NamedTuple @@ -13,6 +12,7 @@ from controllers.console.app.workflow_draft_variable import ( ) from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID from factories.variable_factory import build_segment +from libs.datetime_utils import naive_utc_now from models.workflow import WorkflowDraftVariable from services.workflow_draft_variable_service import WorkflowDraftVariableList @@ -57,7 +57,7 @@ class TestWorkflowDraftVariableFields: ) sys_var.id = str(uuid.uuid4()) - sys_var.last_edited_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + sys_var.last_edited_at = naive_utc_now() sys_var.visible = True expected_without_value = OrderedDict( @@ -88,7 +88,7 @@ class TestWorkflowDraftVariableFields: ) node_var.id = str(uuid.uuid4()) - node_var.last_edited_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + node_var.last_edited_at = naive_utc_now() expected_without_value: OrderedDict[str, Any] = OrderedDict( { diff --git a/api/tests/unit_tests/core/repositories/test_celery_workflow_execution_repository.py b/api/tests/unit_tests/core/repositories/test_celery_workflow_execution_repository.py index 450501c25..e7733b231 100644 --- a/api/tests/unit_tests/core/repositories/test_celery_workflow_execution_repository.py +++ b/api/tests/unit_tests/core/repositories/test_celery_workflow_execution_repository.py @@ -5,7 +5,6 @@ These tests verify the Celery-based asynchronous storage functionality for workflow execution data. """ -from datetime import UTC, datetime from unittest.mock import Mock, patch from uuid import uuid4 @@ -13,6 +12,7 @@ import pytest from core.repositories.celery_workflow_execution_repository import CeleryWorkflowExecutionRepository from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowType +from libs.datetime_utils import naive_utc_now from models import Account, EndUser from models.enums import WorkflowRunTriggeredFrom @@ -56,7 +56,7 @@ def sample_workflow_execution(): workflow_version="1.0", graph={"nodes": [], "edges": []}, inputs={"input1": "value1"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) @@ -199,7 +199,7 @@ class TestCeleryWorkflowExecutionRepository: workflow_version="1.0", graph={"nodes": [], "edges": []}, inputs={"input1": "value1"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) exec2 = WorkflowExecution.new( id_=str(uuid4()), @@ -208,7 +208,7 @@ class TestCeleryWorkflowExecutionRepository: workflow_version="1.0", graph={"nodes": [], "edges": []}, inputs={"input2": "value2"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) # Save both executions @@ -235,7 +235,7 @@ class TestCeleryWorkflowExecutionRepository: workflow_version="1.0", graph={"nodes": [], "edges": []}, inputs={"input1": "value1"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) repo.save(execution) diff --git a/api/tests/unit_tests/core/repositories/test_celery_workflow_node_execution_repository.py b/api/tests/unit_tests/core/repositories/test_celery_workflow_node_execution_repository.py index b38d994f0..0c6fdc8f9 100644 --- a/api/tests/unit_tests/core/repositories/test_celery_workflow_node_execution_repository.py +++ b/api/tests/unit_tests/core/repositories/test_celery_workflow_node_execution_repository.py @@ -5,7 +5,6 @@ These tests verify the Celery-based asynchronous storage functionality for workflow node execution data. """ -from datetime import UTC, datetime from unittest.mock import Mock, patch from uuid import uuid4 @@ -18,6 +17,7 @@ from core.workflow.entities.workflow_node_execution import ( ) from core.workflow.nodes.enums import NodeType from core.workflow.repositories.workflow_node_execution_repository import OrderConfig +from libs.datetime_utils import naive_utc_now from models import Account, EndUser from models.workflow import WorkflowNodeExecutionTriggeredFrom @@ -65,7 +65,7 @@ def sample_workflow_node_execution(): title="Test Node", inputs={"input1": "value1"}, status=WorkflowNodeExecutionStatus.RUNNING, - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) @@ -263,7 +263,7 @@ class TestCeleryWorkflowNodeExecutionRepository: title="Node 1", inputs={"input1": "value1"}, status=WorkflowNodeExecutionStatus.RUNNING, - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) exec2 = WorkflowNodeExecution( id=str(uuid4()), @@ -276,7 +276,7 @@ class TestCeleryWorkflowNodeExecutionRepository: title="Node 2", inputs={"input2": "value2"}, status=WorkflowNodeExecutionStatus.RUNNING, - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) # Save both executions @@ -314,7 +314,7 @@ class TestCeleryWorkflowNodeExecutionRepository: title="Node 2", inputs={}, status=WorkflowNodeExecutionStatus.RUNNING, - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) exec2 = WorkflowNodeExecution( id=str(uuid4()), @@ -327,7 +327,7 @@ class TestCeleryWorkflowNodeExecutionRepository: title="Node 1", inputs={}, status=WorkflowNodeExecutionStatus.RUNNING, - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) # Save in random order diff --git a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py index 137e8b889..8b1b9a55b 100644 --- a/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py +++ b/api/tests/unit_tests/core/workflow/nodes/answer/test_answer_stream_processor.py @@ -1,6 +1,5 @@ import uuid from collections.abc import Generator -from datetime import UTC, datetime from core.workflow.entities.variable_pool import VariablePool from core.workflow.graph_engine.entities.event import ( @@ -15,6 +14,7 @@ from core.workflow.nodes.answer.answer_stream_processor import AnswerStreamProce from core.workflow.nodes.enums import NodeType from core.workflow.nodes.start.entities import StartNodeData from core.workflow.system_variable import SystemVariable +from libs.datetime_utils import naive_utc_now def _recursive_process(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]: @@ -29,7 +29,7 @@ def _recursive_process(graph: Graph, next_node_id: str) -> Generator[GraphEngine def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]: - route_node_state = RouteNodeState(node_id=next_node_id, start_at=datetime.now(UTC).replace(tzinfo=None)) + route_node_state = RouteNodeState(node_id=next_node_id, start_at=naive_utc_now()) parallel_id = graph.node_parallel_mapping.get(next_node_id) parallel_start_node_id = None @@ -68,7 +68,7 @@ def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEve ) route_node_state.status = RouteNodeState.Status.SUCCESS - route_node_state.finished_at = datetime.now(UTC).replace(tzinfo=None) + route_node_state.finished_at = naive_utc_now() yield NodeRunSucceededEvent( id=node_execution_id, node_id=next_node_id, diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 4866db1fd..1d2eba1e7 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -1,5 +1,4 @@ import json -from datetime import UTC, datetime from unittest.mock import MagicMock import pytest @@ -23,6 +22,7 @@ from core.workflow.repositories.workflow_execution_repository import WorkflowExe from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.system_variable import SystemVariable from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager +from libs.datetime_utils import naive_utc_now from models.enums import CreatorUserRole from models.model import AppMode from models.workflow import Workflow, WorkflowRun @@ -145,8 +145,8 @@ def real_workflow(): workflow.graph = json.dumps(graph_data) workflow.features = json.dumps({"file_upload": {"enabled": False}}) workflow.created_by = "test-user-id" - workflow.created_at = datetime.now(UTC).replace(tzinfo=None) - workflow.updated_at = datetime.now(UTC).replace(tzinfo=None) + workflow.created_at = naive_utc_now() + workflow.updated_at = naive_utc_now() workflow._environment_variables = "{}" workflow._conversation_variables = "{}" @@ -169,7 +169,7 @@ def real_workflow_run(): workflow_run.outputs = json.dumps({"answer": "test answer"}) workflow_run.created_by_role = CreatorUserRole.ACCOUNT workflow_run.created_by = "test-user-id" - workflow_run.created_at = datetime.now(UTC).replace(tzinfo=None) + workflow_run.created_at = naive_utc_now() return workflow_run @@ -211,7 +211,7 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) # Pre-populate the cache with the workflow execution @@ -245,7 +245,7 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) # Pre-populate the cache with the workflow execution @@ -282,7 +282,7 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) # Pre-populate the cache with the workflow execution @@ -335,7 +335,7 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) # Pre-populate the cache with the workflow execution @@ -366,7 +366,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): event.process_data = {"process": "test process"} event.outputs = {"output": "test output"} event.execution_metadata = {WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100} - event.start_at = datetime.now(UTC).replace(tzinfo=None) + event.start_at = naive_utc_now() # Create a real node execution @@ -379,7 +379,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): node_id="test-node-id", node_type=NodeType.LLM, title="Test Node", - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) # Pre-populate the cache with the node execution @@ -409,7 +409,7 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl workflow_type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, - started_at=datetime.now(UTC).replace(tzinfo=None), + started_at=naive_utc_now(), ) # Pre-populate the cache with the workflow execution @@ -443,7 +443,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): event.process_data = {"process": "test process"} event.outputs = {"output": "test output"} event.execution_metadata = {WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100} - event.start_at = datetime.now(UTC).replace(tzinfo=None) + event.start_at = naive_utc_now() event.error = "Test error message" # Create a real node execution @@ -457,7 +457,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): node_id="test-node-id", node_type=NodeType.LLM, title="Test Node", - created_at=datetime.now(UTC).replace(tzinfo=None), + created_at=naive_utc_now(), ) # Pre-populate the cache with the node execution diff --git a/api/tests/unit_tests/services/test_dataset_service_batch_update_document_status.py b/api/tests/unit_tests/services/test_dataset_service_batch_update_document_status.py index dc09aca5b..1881ceac2 100644 --- a/api/tests/unit_tests/services/test_dataset_service_batch_update_document_status.py +++ b/api/tests/unit_tests/services/test_dataset_service_batch_update_document_status.py @@ -93,16 +93,15 @@ class TestDatasetServiceBatchUpdateDocumentStatus: with ( patch("services.dataset_service.DocumentService.get_document") as mock_get_doc, patch("extensions.ext_database.db.session") as mock_db, - patch("services.dataset_service.datetime") as mock_datetime, + patch("services.dataset_service.naive_utc_now") as mock_naive_utc_now, ): current_time = datetime.datetime(2023, 1, 1, 12, 0, 0) - mock_datetime.datetime.now.return_value = current_time - mock_datetime.UTC = datetime.UTC + mock_naive_utc_now.return_value = current_time yield { "get_document": mock_get_doc, "db_session": mock_db, - "datetime": mock_datetime, + "naive_utc_now": mock_naive_utc_now, "current_time": current_time, } @@ -120,21 +119,21 @@ class TestDatasetServiceBatchUpdateDocumentStatus: assert document.enabled == True assert document.disabled_at is None assert document.disabled_by is None - assert document.updated_at == current_time.replace(tzinfo=None) + assert document.updated_at == current_time def _assert_document_disabled(self, document: Mock, user_id: str, current_time: datetime.datetime): """Helper method to verify document was disabled correctly.""" assert document.enabled == False - assert document.disabled_at == current_time.replace(tzinfo=None) + assert document.disabled_at == current_time assert document.disabled_by == user_id - assert document.updated_at == current_time.replace(tzinfo=None) + assert document.updated_at == current_time def _assert_document_archived(self, document: Mock, user_id: str, current_time: datetime.datetime): """Helper method to verify document was archived correctly.""" assert document.archived == True - assert document.archived_at == current_time.replace(tzinfo=None) + assert document.archived_at == current_time assert document.archived_by == user_id - assert document.updated_at == current_time.replace(tzinfo=None) + assert document.updated_at == current_time def _assert_document_unarchived(self, document: Mock): """Helper method to verify document was unarchived correctly.""" @@ -430,7 +429,7 @@ class TestDatasetServiceBatchUpdateDocumentStatus: # Verify document attributes were updated correctly self._assert_document_unarchived(archived_doc) - assert archived_doc.updated_at == mock_document_service_dependencies["current_time"].replace(tzinfo=None) + assert archived_doc.updated_at == mock_document_service_dependencies["current_time"] # Verify Redis cache was set (because document is enabled) redis_mock.setex.assert_called_once_with("document_doc-1_indexing", 600, 1) @@ -495,9 +494,7 @@ class TestDatasetServiceBatchUpdateDocumentStatus: # Verify document was unarchived self._assert_document_unarchived(archived_disabled_doc) - assert archived_disabled_doc.updated_at == mock_document_service_dependencies["current_time"].replace( - tzinfo=None - ) + assert archived_disabled_doc.updated_at == mock_document_service_dependencies["current_time"] # Verify no Redis cache was set (document is disabled) redis_mock.setex.assert_not_called()