chore: replace pseudo-random generators with secrets module (#20616)
This commit is contained in:
@@ -43,6 +43,7 @@ select = [
|
||||
"S307", # suspicious-eval-usage, disallow use of `eval` and `ast.literal_eval`
|
||||
"S301", # suspicious-pickle-usage, disallow use of `pickle` and its wrappers.
|
||||
"S302", # suspicious-marshal-usage, disallow use of `marshal` module
|
||||
"S311", # suspicious-non-cryptographic-random-usage
|
||||
]
|
||||
|
||||
ignore = [
|
||||
|
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
import random
|
||||
import secrets
|
||||
from typing import cast
|
||||
|
||||
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
|
||||
@@ -38,7 +38,7 @@ def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEnt
|
||||
if len(text_chunks) == 0:
|
||||
return True
|
||||
|
||||
text_chunk = random.choice(text_chunks)
|
||||
text_chunk = secrets.choice(text_chunks)
|
||||
|
||||
try:
|
||||
model_provider_factory = ModelProviderFactory(tenant_id)
|
||||
|
@@ -1,8 +1,9 @@
|
||||
import base64
|
||||
import json
|
||||
import secrets
|
||||
import string
|
||||
from collections.abc import Mapping
|
||||
from copy import deepcopy
|
||||
from random import randint
|
||||
from typing import Any, Literal
|
||||
from urllib.parse import urlencode, urlparse
|
||||
|
||||
@@ -434,4 +435,4 @@ def _generate_random_string(n: int) -> str:
|
||||
>>> _generate_random_string(5)
|
||||
'abcde'
|
||||
"""
|
||||
return "".join([chr(randint(97, 122)) for _ in range(n)])
|
||||
return "".join(secrets.choice(string.ascii_lowercase) for _ in range(n))
|
||||
|
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
import secrets
|
||||
import string
|
||||
import subprocess
|
||||
import time
|
||||
@@ -176,7 +176,7 @@ def generate_string(n):
|
||||
letters_digits = string.ascii_letters + string.digits
|
||||
result = ""
|
||||
for i in range(n):
|
||||
result += random.choice(letters_digits)
|
||||
result += secrets.choice(letters_digits)
|
||||
|
||||
return result
|
||||
|
||||
|
@@ -1,7 +1,6 @@
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import random
|
||||
import secrets
|
||||
import uuid
|
||||
from datetime import UTC, datetime, timedelta
|
||||
@@ -261,7 +260,7 @@ class AccountService:
|
||||
|
||||
@staticmethod
|
||||
def generate_account_deletion_verification_code(account: Account) -> tuple[str, str]:
|
||||
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
|
||||
code = "".join([str(secrets.randbelow(exclusive_upper_bound=10)) for _ in range(6)])
|
||||
token = TokenManager.generate_token(
|
||||
account=account, token_type="account_deletion", additional_data={"code": code}
|
||||
)
|
||||
@@ -429,7 +428,7 @@ class AccountService:
|
||||
additional_data: dict[str, Any] = {},
|
||||
):
|
||||
if not code:
|
||||
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
|
||||
code = "".join([str(secrets.randbelow(exclusive_upper_bound=10)) for _ in range(6)])
|
||||
additional_data["code"] = code
|
||||
token = TokenManager.generate_token(
|
||||
account=account, email=email, token_type="reset_password", additional_data=additional_data
|
||||
@@ -456,7 +455,7 @@ class AccountService:
|
||||
|
||||
raise EmailCodeLoginRateLimitExceededError()
|
||||
|
||||
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
|
||||
code = "".join([str(secrets.randbelow(exclusive_upper_bound=10)) for _ in range(6)])
|
||||
token = TokenManager.generate_token(
|
||||
account=account, email=email, token_type="email_code_login", additional_data={"code": code}
|
||||
)
|
||||
|
@@ -2,7 +2,7 @@ import copy
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import random
|
||||
import secrets
|
||||
import time
|
||||
import uuid
|
||||
from collections import Counter
|
||||
@@ -970,7 +970,7 @@ class DocumentService:
|
||||
documents.append(document)
|
||||
batch = document.batch
|
||||
else:
|
||||
batch = time.strftime("%Y%m%d%H%M%S") + str(random.randint(100000, 999999))
|
||||
batch = time.strftime("%Y%m%d%H%M%S") + str(100000 + secrets.randbelow(exclusive_upper_bound=900000))
|
||||
# save process rule
|
||||
if not dataset_process_rule:
|
||||
process_rule = knowledge_config.process_rule
|
||||
|
@@ -1,4 +1,4 @@
|
||||
import random
|
||||
import secrets
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any, Optional, cast
|
||||
|
||||
@@ -66,7 +66,7 @@ class WebAppAuthService:
|
||||
if email is None:
|
||||
raise ValueError("Email must be provided.")
|
||||
|
||||
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
|
||||
code = "".join([str(secrets.randbelow(exclusive_upper_bound=10)) for _ in range(6)])
|
||||
token = TokenManager.generate_token(
|
||||
account=account, email=email, token_type="webapp_email_code_login", additional_data={"code": code}
|
||||
)
|
||||
|
@@ -1,4 +1,4 @@
|
||||
import random
|
||||
import secrets
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
@@ -34,7 +34,7 @@ def test_retry_logic_success(mock_request):
|
||||
side_effects = []
|
||||
|
||||
for _ in range(SSRF_DEFAULT_MAX_RETRIES):
|
||||
status_code = random.choice(STATUS_FORCELIST)
|
||||
status_code = secrets.choice(STATUS_FORCELIST)
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = status_code
|
||||
side_effects.append(mock_response)
|
||||
|
Reference in New Issue
Block a user