chore(api/libs): Apply ruff format. (#7301)
This commit is contained in:
@@ -21,7 +21,7 @@ from models.account import Account
|
||||
|
||||
|
||||
def run(script):
|
||||
return subprocess.getstatusoutput('source /root/.bashrc && ' + script)
|
||||
return subprocess.getstatusoutput("source /root/.bashrc && " + script)
|
||||
|
||||
|
||||
class TimestampField(fields.Raw):
|
||||
@@ -36,29 +36,29 @@ def email(email):
|
||||
if re.match(pattern, email) is not None:
|
||||
return email
|
||||
|
||||
error = ('{email} is not a valid email.'
|
||||
.format(email=email))
|
||||
error = "{email} is not a valid email.".format(email=email)
|
||||
raise ValueError(error)
|
||||
|
||||
|
||||
def uuid_value(value):
|
||||
if value == '':
|
||||
if value == "":
|
||||
return str(value)
|
||||
|
||||
try:
|
||||
uuid_obj = uuid.UUID(value)
|
||||
return str(uuid_obj)
|
||||
except ValueError:
|
||||
error = ('{value} is not a valid uuid.'
|
||||
.format(value=value))
|
||||
error = "{value} is not a valid uuid.".format(value=value)
|
||||
raise ValueError(error)
|
||||
|
||||
|
||||
def alphanumeric(value: str):
|
||||
# check if the value is alphanumeric and underlined
|
||||
if re.match(r'^[a-zA-Z0-9_]+$', value):
|
||||
if re.match(r"^[a-zA-Z0-9_]+$", value):
|
||||
return value
|
||||
|
||||
raise ValueError(f'{value} is not a valid alphanumeric value')
|
||||
raise ValueError(f"{value} is not a valid alphanumeric value")
|
||||
|
||||
|
||||
def timestamp_value(timestamp):
|
||||
try:
|
||||
@@ -67,31 +67,32 @@ def timestamp_value(timestamp):
|
||||
raise ValueError
|
||||
return int_timestamp
|
||||
except ValueError:
|
||||
error = ('{timestamp} is not a valid timestamp.'
|
||||
.format(timestamp=timestamp))
|
||||
error = "{timestamp} is not a valid timestamp.".format(timestamp=timestamp)
|
||||
raise ValueError(error)
|
||||
|
||||
|
||||
class str_len:
|
||||
""" Restrict input to an integer in a range (inclusive) """
|
||||
"""Restrict input to an integer in a range (inclusive)"""
|
||||
|
||||
def __init__(self, max_length, argument='argument'):
|
||||
def __init__(self, max_length, argument="argument"):
|
||||
self.max_length = max_length
|
||||
self.argument = argument
|
||||
|
||||
def __call__(self, value):
|
||||
length = len(value)
|
||||
if length > self.max_length:
|
||||
error = ('Invalid {arg}: {val}. {arg} cannot exceed length {length}'
|
||||
.format(arg=self.argument, val=value, length=self.max_length))
|
||||
error = "Invalid {arg}: {val}. {arg} cannot exceed length {length}".format(
|
||||
arg=self.argument, val=value, length=self.max_length
|
||||
)
|
||||
raise ValueError(error)
|
||||
|
||||
return value
|
||||
|
||||
|
||||
class float_range:
|
||||
""" Restrict input to an float in a range (inclusive) """
|
||||
def __init__(self, low, high, argument='argument'):
|
||||
"""Restrict input to an float in a range (inclusive)"""
|
||||
|
||||
def __init__(self, low, high, argument="argument"):
|
||||
self.low = low
|
||||
self.high = high
|
||||
self.argument = argument
|
||||
@@ -99,15 +100,16 @@ class float_range:
|
||||
def __call__(self, value):
|
||||
value = _get_float(value)
|
||||
if value < self.low or value > self.high:
|
||||
error = ('Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}'
|
||||
.format(arg=self.argument, val=value, lo=self.low, hi=self.high))
|
||||
error = "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}".format(
|
||||
arg=self.argument, val=value, lo=self.low, hi=self.high
|
||||
)
|
||||
raise ValueError(error)
|
||||
|
||||
return value
|
||||
|
||||
|
||||
class datetime_string:
|
||||
def __init__(self, format, argument='argument'):
|
||||
def __init__(self, format, argument="argument"):
|
||||
self.format = format
|
||||
self.argument = argument
|
||||
|
||||
@@ -115,8 +117,9 @@ class datetime_string:
|
||||
try:
|
||||
datetime.strptime(value, self.format)
|
||||
except ValueError:
|
||||
error = ('Invalid {arg}: {val}. {arg} must be conform to the format {format}'
|
||||
.format(arg=self.argument, val=value, format=self.format))
|
||||
error = "Invalid {arg}: {val}. {arg} must be conform to the format {format}".format(
|
||||
arg=self.argument, val=value, format=self.format
|
||||
)
|
||||
raise ValueError(error)
|
||||
|
||||
return value
|
||||
@@ -126,14 +129,14 @@ def _get_float(value):
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
raise ValueError('{} is not a valid float'.format(value))
|
||||
raise ValueError("{} is not a valid float".format(value))
|
||||
|
||||
|
||||
def timezone(timezone_string):
|
||||
if timezone_string and timezone_string in available_timezones():
|
||||
return timezone_string
|
||||
|
||||
error = ('{timezone_string} is not a valid timezone.'
|
||||
.format(timezone_string=timezone_string))
|
||||
error = "{timezone_string} is not a valid timezone.".format(timezone_string=timezone_string)
|
||||
raise ValueError(error)
|
||||
|
||||
|
||||
@@ -147,8 +150,8 @@ def generate_string(n):
|
||||
|
||||
|
||||
def get_remote_ip(request) -> str:
|
||||
if request.headers.get('CF-Connecting-IP'):
|
||||
return request.headers.get('Cf-Connecting-Ip')
|
||||
if request.headers.get("CF-Connecting-IP"):
|
||||
return request.headers.get("Cf-Connecting-Ip")
|
||||
elif request.headers.getlist("X-Forwarded-For"):
|
||||
return request.headers.getlist("X-Forwarded-For")[0]
|
||||
else:
|
||||
@@ -156,54 +159,45 @@ def get_remote_ip(request) -> str:
|
||||
|
||||
|
||||
def generate_text_hash(text: str) -> str:
|
||||
hash_text = str(text) + 'None'
|
||||
hash_text = str(text) + "None"
|
||||
return sha256(hash_text.encode()).hexdigest()
|
||||
|
||||
|
||||
def compact_generate_response(response: Union[dict, RateLimitGenerator]) -> Response:
|
||||
if isinstance(response, dict):
|
||||
return Response(response=json.dumps(response), status=200, mimetype='application/json')
|
||||
return Response(response=json.dumps(response), status=200, mimetype="application/json")
|
||||
else:
|
||||
|
||||
def generate() -> Generator:
|
||||
yield from response
|
||||
|
||||
return Response(stream_with_context(generate()), status=200,
|
||||
mimetype='text/event-stream')
|
||||
return Response(stream_with_context(generate()), status=200, mimetype="text/event-stream")
|
||||
|
||||
|
||||
class TokenManager:
|
||||
|
||||
@classmethod
|
||||
def generate_token(cls, account: Account, token_type: str, additional_data: dict = None) -> str:
|
||||
old_token = cls._get_current_token_for_account(account.id, token_type)
|
||||
if old_token:
|
||||
if isinstance(old_token, bytes):
|
||||
old_token = old_token.decode('utf-8')
|
||||
old_token = old_token.decode("utf-8")
|
||||
cls.revoke_token(old_token, token_type)
|
||||
|
||||
token = str(uuid.uuid4())
|
||||
token_data = {
|
||||
'account_id': account.id,
|
||||
'email': account.email,
|
||||
'token_type': token_type
|
||||
}
|
||||
token_data = {"account_id": account.id, "email": account.email, "token_type": token_type}
|
||||
if additional_data:
|
||||
token_data.update(additional_data)
|
||||
|
||||
expiry_hours = current_app.config[f'{token_type.upper()}_TOKEN_EXPIRY_HOURS']
|
||||
expiry_hours = current_app.config[f"{token_type.upper()}_TOKEN_EXPIRY_HOURS"]
|
||||
token_key = cls._get_token_key(token, token_type)
|
||||
redis_client.setex(
|
||||
token_key,
|
||||
expiry_hours * 60 * 60,
|
||||
json.dumps(token_data)
|
||||
)
|
||||
redis_client.setex(token_key, expiry_hours * 60 * 60, json.dumps(token_data))
|
||||
|
||||
cls._set_current_token_for_account(account.id, token, token_type, expiry_hours)
|
||||
return token
|
||||
|
||||
@classmethod
|
||||
def _get_token_key(cls, token: str, token_type: str) -> str:
|
||||
return f'{token_type}:token:{token}'
|
||||
return f"{token_type}:token:{token}"
|
||||
|
||||
@classmethod
|
||||
def revoke_token(cls, token: str, token_type: str):
|
||||
@@ -233,7 +227,7 @@ class TokenManager:
|
||||
|
||||
@classmethod
|
||||
def _get_account_token_key(cls, account_id: str, token_type: str) -> str:
|
||||
return f'{token_type}:account:{account_id}'
|
||||
return f"{token_type}:account:{account_id}"
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
@@ -250,7 +244,7 @@ class RateLimiter:
|
||||
current_time = int(time.time())
|
||||
window_start_time = current_time - self.time_window
|
||||
|
||||
redis_client.zremrangebyscore(key, '-inf', window_start_time)
|
||||
redis_client.zremrangebyscore(key, "-inf", window_start_time)
|
||||
attempts = redis_client.zcard(key)
|
||||
|
||||
if attempts and int(attempts) >= self.max_attempts:
|
||||
|
Reference in New Issue
Block a user