From 74ab057f5677f5efb8830f1c53f621683dc59087 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 13 Aug 2025 09:46:02 +0800 Subject: [PATCH] refactor: improve Redis wrapper type hints and fix None value handling (#23845) --- api/extensions/ext_redis.py | 80 +++++++++++++++++++++++++++--- api/pyproject.toml | 1 + api/schedule/queue_monitor_task.py | 11 ++++ api/uv.lock | 17 ++++++- 4 files changed, 101 insertions(+), 8 deletions(-) diff --git a/api/extensions/ext_redis.py b/api/extensions/ext_redis.py index 14b9273e9..914d6219c 100644 --- a/api/extensions/ext_redis.py +++ b/api/extensions/ext_redis.py @@ -1,18 +1,23 @@ import functools import logging from collections.abc import Callable -from typing import Any, Union +from datetime import timedelta +from typing import TYPE_CHECKING, Any, Union import redis from redis import RedisError from redis.cache import CacheConfig from redis.cluster import ClusterNode, RedisCluster from redis.connection import Connection, SSLConnection +from redis.lock import Lock from redis.sentinel import Sentinel from configs import dify_config from dify_app import DifyApp +if TYPE_CHECKING: + from redis.lock import Lock + logger = logging.getLogger(__name__) @@ -28,8 +33,8 @@ class RedisClientWrapper: a failover in a Sentinel-managed Redis setup. Attributes: - _client (redis.Redis): The actual Redis client instance. It remains None until - initialized with the `initialize` method. + _client: The actual Redis client instance. It remains None until + initialized with the `initialize` method. Methods: initialize(client): Initializes the Redis client if it hasn't been initialized already. @@ -37,20 +42,78 @@ class RedisClientWrapper: if the client is not initialized. """ - def __init__(self): + _client: Union[redis.Redis, RedisCluster, None] + + def __init__(self) -> None: self._client = None - def initialize(self, client): + def initialize(self, client: Union[redis.Redis, RedisCluster]) -> None: if self._client is None: self._client = client - def __getattr__(self, item): + if TYPE_CHECKING: + # Type hints for IDE support and static analysis + # These are not executed at runtime but provide type information + def get(self, name: str | bytes) -> Any: ... + + def set( + self, + name: str | bytes, + value: Any, + ex: int | None = None, + px: int | None = None, + nx: bool = False, + xx: bool = False, + keepttl: bool = False, + get: bool = False, + exat: int | None = None, + pxat: int | None = None, + ) -> Any: ... + + def setex(self, name: str | bytes, time: int | timedelta, value: Any) -> Any: ... + def setnx(self, name: str | bytes, value: Any) -> Any: ... + def delete(self, *names: str | bytes) -> Any: ... + def incr(self, name: str | bytes, amount: int = 1) -> Any: ... + def expire( + self, + name: str | bytes, + time: int | timedelta, + nx: bool = False, + xx: bool = False, + gt: bool = False, + lt: bool = False, + ) -> Any: ... + def lock( + self, + name: str, + timeout: float | None = None, + sleep: float = 0.1, + blocking: bool = True, + blocking_timeout: float | None = None, + thread_local: bool = True, + ) -> Lock: ... + def zadd( + self, + name: str | bytes, + mapping: dict[str | bytes | int | float, float | int | str | bytes], + nx: bool = False, + xx: bool = False, + ch: bool = False, + incr: bool = False, + gt: bool = False, + lt: bool = False, + ) -> Any: ... + def zremrangebyscore(self, name: str | bytes, min: float | str, max: float | str) -> Any: ... + def zcard(self, name: str | bytes) -> Any: ... + def getdel(self, name: str | bytes) -> Any: ... + + def __getattr__(self, item: str) -> Any: if self._client is None: raise RuntimeError("Redis client is not initialized. Call init_app first.") return getattr(self._client, item) -redis_client = RedisClientWrapper() +redis_client: RedisClientWrapper = RedisClientWrapper() def init_app(app: DifyApp): @@ -80,6 +143,9 @@ def init_app(app: DifyApp): if dify_config.REDIS_USE_SENTINEL: assert dify_config.REDIS_SENTINELS is not None, "REDIS_SENTINELS must be set when REDIS_USE_SENTINEL is True" + assert dify_config.REDIS_SENTINEL_SERVICE_NAME is not None, ( + "REDIS_SENTINEL_SERVICE_NAME must be set when REDIS_USE_SENTINEL is True" + ) sentinel_hosts = [ (node.split(":")[0], int(node.split(":")[1])) for node in dify_config.REDIS_SENTINELS.split(",") ] diff --git a/api/pyproject.toml b/api/pyproject.toml index 4b395276e..de472c870 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -162,6 +162,7 @@ dev = [ "pandas-stubs~=2.2.3", "scipy-stubs>=1.15.3.0", "types-python-http-client>=3.3.7.20240910", + "types-redis>=4.6.0.20241004", ] ############################################################ diff --git a/api/schedule/queue_monitor_task.py b/api/schedule/queue_monitor_task.py index f0d3bed05..5868450a1 100644 --- a/api/schedule/queue_monitor_task.py +++ b/api/schedule/queue_monitor_task.py @@ -24,9 +24,20 @@ def queue_monitor_task(): queue_name = "dataset" threshold = dify_config.QUEUE_MONITOR_THRESHOLD + if threshold is None: + logging.warning(click.style("QUEUE_MONITOR_THRESHOLD is not configured, skipping monitoring", fg="yellow")) + return + try: queue_length = celery_redis.llen(f"{queue_name}") logging.info(click.style(f"Start monitor {queue_name}", fg="green")) + + if queue_length is None: + logging.error( + click.style(f"Failed to get queue length for {queue_name} - Redis may be unavailable", fg="red") + ) + return + logging.info(click.style(f"Queue length: {queue_length}", fg="green")) if queue_length >= threshold: diff --git a/api/uv.lock b/api/uv.lock index ea2c1bef5..870975418 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11, <3.13" resolution-markers = [ "python_full_version >= '3.12.4' and platform_python_implementation != 'PyPy' and sys_platform == 'linux'", @@ -1371,6 +1371,7 @@ dev = [ { name = "types-python-http-client" }, { name = "types-pywin32" }, { name = "types-pyyaml" }, + { name = "types-redis" }, { name = "types-regex" }, { name = "types-requests" }, { name = "types-requests-oauthlib" }, @@ -1557,6 +1558,7 @@ dev = [ { name = "types-python-http-client", specifier = ">=3.3.7.20240910" }, { name = "types-pywin32", specifier = "~=310.0.0" }, { name = "types-pyyaml", specifier = "~=6.0.12" }, + { name = "types-redis", specifier = ">=4.6.0.20241004" }, { name = "types-regex", specifier = "~=2024.11.6" }, { name = "types-requests", specifier = "~=2.32.0" }, { name = "types-requests-oauthlib", specifier = "~=2.0.0" }, @@ -6064,6 +6066,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/99/5f/e0af6f7f6a260d9af67e1db4f54d732abad514252a7a378a6c4d17dd1036/types_pyyaml-6.0.12.20250516-py3-none-any.whl", hash = "sha256:8478208feaeb53a34cb5d970c56a7cd76b72659442e733e268a94dc72b2d0530", size = 20312, upload-time = "2025-05-16T03:08:04.019Z" }, ] +[[package]] +name = "types-redis" +version = "4.6.0.20241004" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cryptography" }, + { name = "types-pyopenssl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3a/95/c054d3ac940e8bac4ca216470c80c26688a0e79e09f520a942bb27da3386/types-redis-4.6.0.20241004.tar.gz", hash = "sha256:5f17d2b3f9091ab75384153bfa276619ffa1cf6a38da60e10d5e6749cc5b902e", size = 49679, upload-time = "2024-10-04T02:43:59.224Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/55/82/7d25dce10aad92d2226b269bce2f85cfd843b4477cd50245d7d40ecf8f89/types_redis-4.6.0.20241004-py3-none-any.whl", hash = "sha256:ef5da68cb827e5f606c8f9c0b49eeee4c2669d6d97122f301d3a55dc6a63f6ed", size = 58737, upload-time = "2024-10-04T02:43:57.968Z" }, +] + [[package]] name = "types-regex" version = "2024.11.6.20250403"