chore: refurbish Python code by applying refurb linter rules (#8296)
This commit is contained in:
@@ -12,7 +12,7 @@ import mimetypes
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Generator, Iterable, Mapping
|
||||
from io import BufferedReader, BytesIO
|
||||
-from pathlib import PurePath
+from pathlib import Path, PurePath
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, model_validator
|
||||
@@ -56,8 +56,7 @@ class Blob(BaseModel):
|
||||
def as_string(self) -> str:
|
||||
"""Read data as a string."""
|
||||
if self.data is None and self.path:
|
||||
-with open(str(self.path), encoding=self.encoding) as f:
-    return f.read()
+return Path(str(self.path)).read_text(encoding=self.encoding)
|
||||
elif isinstance(self.data, bytes):
|
||||
return self.data.decode(self.encoding)
|
||||
elif isinstance(self.data, str):
|
||||
@@ -72,8 +71,7 @@ class Blob(BaseModel):
|
||||
elif isinstance(self.data, str):
|
||||
return self.data.encode(self.encoding)
|
||||
elif self.data is None and self.path:
|
||||
-with open(str(self.path), "rb") as f:
-    return f.read()
+return Path(str(self.path)).read_bytes()
|
||||
else:
|
||||
raise ValueError(f"Unable to get bytes for blob {self}")
|
||||
|
||||
|
@@ -68,8 +68,7 @@ class ExtractProcessor:
|
||||
suffix = "." + re.search(r"\.(\w+)$", filename).group(1)
|
||||
|
||||
file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}"
|
||||
-with open(file_path, "wb") as file:
-    file.write(response.content)
+Path(file_path).write_bytes(response.content)
|
||||
extract_setting = ExtractSetting(datasource_type="upload_file", document_model="text_model")
|
||||
if return_text:
|
||||
delimiter = "\n"
|
||||
@@ -111,7 +110,7 @@ class ExtractProcessor:
|
||||
)
|
||||
elif file_extension in [".htm", ".html"]:
|
||||
extractor = HtmlExtractor(file_path)
|
||||
-elif file_extension in [".docx"]:
+elif file_extension == ".docx":
|
||||
extractor = WordExtractor(file_path, upload_file.tenant_id, upload_file.created_by)
|
||||
elif file_extension == ".csv":
|
||||
extractor = CSVExtractor(file_path, autodetect_encoding=True)
|
||||
@@ -143,7 +142,7 @@ class ExtractProcessor:
|
||||
extractor = MarkdownExtractor(file_path, autodetect_encoding=True)
|
||||
elif file_extension in [".htm", ".html"]:
|
||||
extractor = HtmlExtractor(file_path)
|
||||
-elif file_extension in [".docx"]:
+elif file_extension == ".docx":
|
||||
extractor = WordExtractor(file_path, upload_file.tenant_id, upload_file.created_by)
|
||||
elif file_extension == ".csv":
|
||||
extractor = CSVExtractor(file_path, autodetect_encoding=True)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
"""Document loader helpers."""
|
||||
|
||||
import concurrent.futures
|
||||
+from pathlib import Path
|
||||
from typing import NamedTuple, Optional, cast
|
||||
|
||||
|
||||
@@ -28,8 +29,7 @@ def detect_file_encodings(file_path: str, timeout: int = 5) -> list[FileEncoding
|
||||
import chardet
|
||||
|
||||
def read_and_detect(file_path: str) -> list[dict]:
|
||||
-with open(file_path, "rb") as f:
-    rawdata = f.read()
+rawdata = Path(file_path).read_bytes()
|
||||
return cast(list[dict], chardet.detect_all(rawdata))
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
|
@@ -1,6 +1,7 @@
|
||||
"""Abstract interface for document loader implementations."""
|
||||
|
||||
import re
|
||||
+from pathlib import Path
|
||||
from typing import Optional, cast
|
||||
|
||||
from core.rag.extractor.extractor_base import BaseExtractor
|
||||
@@ -102,15 +103,13 @@ class MarkdownExtractor(BaseExtractor):
|
||||
"""Parse file into tuples."""
|
||||
content = ""
|
||||
try:
|
||||
-with open(filepath, encoding=self._encoding) as f:
-    content = f.read()
+content = Path(filepath).read_text(encoding=self._encoding)
|
||||
except UnicodeDecodeError as e:
|
||||
if self._autodetect_encoding:
|
||||
detected_encodings = detect_file_encodings(filepath)
|
||||
for encoding in detected_encodings:
|
||||
try:
|
||||
-with open(filepath, encoding=encoding.encoding) as f:
-    content = f.read()
+content = Path(filepath).read_text(encoding=encoding.encoding)
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
|
@@ -1,5 +1,6 @@
|
||||
"""Abstract interface for document loader implementations."""
|
||||
|
||||
+from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from core.rag.extractor.extractor_base import BaseExtractor
|
||||
@@ -25,15 +26,13 @@ class TextExtractor(BaseExtractor):
|
||||
"""Load from file path."""
|
||||
text = ""
|
||||
try:
|
||||
-with open(self._file_path, encoding=self._encoding) as f:
-    text = f.read()
+text = Path(self._file_path).read_text(encoding=self._encoding)
|
||||
except UnicodeDecodeError as e:
|
||||
if self._autodetect_encoding:
|
||||
detected_encodings = detect_file_encodings(self._file_path)
|
||||
for encoding in detected_encodings:
|
||||
try:
|
||||
-with open(self._file_path, encoding=encoding.encoding) as f:
-    text = f.read()
+text = Path(self._file_path).read_text(encoding=encoding.encoding)
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
|
@@ -153,7 +153,7 @@ class WordExtractor(BaseExtractor):
|
||||
if col_index >= total_cols:
|
||||
break
|
||||
cell_content = self._parse_cell(cell, image_map).strip()
|
||||
-cell_colspan = cell.grid_span if cell.grid_span else 1
+cell_colspan = cell.grid_span or 1
|
||||
for i in range(cell_colspan):
|
||||
if col_index + i < total_cols:
|
||||
row_cells[col_index + i] = cell_content if i == 0 else ""
|
||||
|
Reference in New Issue
Block a user