feat: integrate opendal storage (#11508)
Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
@@ -1,31 +1,43 @@
|
||||
import logging
|
||||
from collections.abc import Generator
|
||||
from collections.abc import Callable, Generator, Mapping
|
||||
from typing import Union
|
||||
|
||||
from flask import Flask
|
||||
|
||||
from configs import dify_config
|
||||
from configs.middleware.storage.opendal_storage_config import OpenDALScheme
|
||||
from dify_app import DifyApp
|
||||
from extensions.storage.base_storage import BaseStorage
|
||||
from extensions.storage.storage_type import StorageType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Storage:
|
||||
def __init__(self):
|
||||
self.storage_runner = None
|
||||
|
||||
def init_app(self, app: Flask):
|
||||
storage_factory = self.get_storage_factory(dify_config.STORAGE_TYPE)
|
||||
with app.app_context():
|
||||
self.storage_runner = storage_factory()
|
||||
|
||||
@staticmethod
|
||||
def get_storage_factory(storage_type: str) -> type[BaseStorage]:
|
||||
def get_storage_factory(storage_type: str) -> Callable[[], BaseStorage]:
|
||||
match storage_type:
|
||||
case StorageType.S3:
|
||||
from extensions.storage.aws_s3_storage import AwsS3Storage
|
||||
from extensions.storage.opendal_storage import OpenDALStorage
|
||||
|
||||
return AwsS3Storage
|
||||
kwargs = _load_s3_storage_kwargs()
|
||||
return lambda: OpenDALStorage(scheme=OpenDALScheme.S3, **kwargs)
|
||||
case StorageType.OPENDAL:
|
||||
from extensions.storage.opendal_storage import OpenDALStorage
|
||||
|
||||
scheme = OpenDALScheme(dify_config.STORAGE_OPENDAL_SCHEME)
|
||||
kwargs = _load_opendal_storage_kwargs(scheme)
|
||||
return lambda: OpenDALStorage(scheme=scheme, **kwargs)
|
||||
case StorageType.LOCAL:
|
||||
from extensions.storage.opendal_storage import OpenDALStorage
|
||||
|
||||
kwargs = _load_local_storage_kwargs()
|
||||
return lambda: OpenDALStorage(scheme=OpenDALScheme.FS, **kwargs)
|
||||
case StorageType.AZURE_BLOB:
|
||||
from extensions.storage.azure_blob_storage import AzureBlobStorage
|
||||
|
||||
@@ -62,16 +74,14 @@ class Storage:
|
||||
from extensions.storage.supabase_storage import SupabaseStorage
|
||||
|
||||
return SupabaseStorage
|
||||
case StorageType.LOCAL | _:
|
||||
from extensions.storage.local_fs_storage import LocalFsStorage
|
||||
|
||||
return LocalFsStorage
|
||||
case _:
|
||||
raise ValueError(f"Unsupported storage type {storage_type}")
|
||||
|
||||
def save(self, filename, data):
|
||||
try:
|
||||
self.storage_runner.save(filename, data)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to save file {filename}")
|
||||
logger.exception(f"Failed to save file {filename}")
|
||||
raise e
|
||||
|
||||
def load(self, filename: str, /, *, stream: bool = False) -> Union[bytes, Generator]:
|
||||
@@ -81,45 +91,120 @@ class Storage:
|
||||
else:
|
||||
return self.load_once(filename)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to load file {filename}")
|
||||
logger.exception(f"Failed to load file {filename}")
|
||||
raise e
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
try:
|
||||
return self.storage_runner.load_once(filename)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to load_once file {filename}")
|
||||
logger.exception(f"Failed to load_once file {filename}")
|
||||
raise e
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
try:
|
||||
return self.storage_runner.load_stream(filename)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to load_stream file {filename}")
|
||||
logger.exception(f"Failed to load_stream file {filename}")
|
||||
raise e
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
try:
|
||||
self.storage_runner.download(filename, target_filepath)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to download file {filename}")
|
||||
logger.exception(f"Failed to download file {filename}")
|
||||
raise e
|
||||
|
||||
def exists(self, filename):
|
||||
try:
|
||||
return self.storage_runner.exists(filename)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to check file exists {filename}")
|
||||
logger.exception(f"Failed to check file exists {filename}")
|
||||
raise e
|
||||
|
||||
def delete(self, filename):
|
||||
try:
|
||||
return self.storage_runner.delete(filename)
|
||||
except Exception as e:
|
||||
logging.exception(f"Failed to delete file {filename}")
|
||||
logger.exception(f"Failed to delete file {filename}")
|
||||
raise e
|
||||
|
||||
|
||||
def _load_s3_storage_kwargs() -> Mapping[str, str]:
|
||||
"""
|
||||
Load the kwargs for S3 storage based on dify_config.
|
||||
Handles special cases like AWS managed IAM and R2.
|
||||
"""
|
||||
kwargs = {
|
||||
"root": "/",
|
||||
"bucket": dify_config.S3_BUCKET_NAME,
|
||||
"endpoint": dify_config.S3_ENDPOINT,
|
||||
"access_key_id": dify_config.S3_ACCESS_KEY,
|
||||
"secret_access_key": dify_config.S3_SECRET_KEY,
|
||||
"region": dify_config.S3_REGION,
|
||||
}
|
||||
kwargs = {k: v for k, v in kwargs.items() if isinstance(v, str)}
|
||||
|
||||
# For AWS managed IAM
|
||||
if dify_config.S3_USE_AWS_MANAGED_IAM:
|
||||
from extensions.storage.opendal_storage import S3_SSE_WITH_AWS_MANAGED_IAM_KWARGS
|
||||
|
||||
logger.debug("Using AWS managed IAM role for S3")
|
||||
kwargs = {**kwargs, **{k: v for k, v in S3_SSE_WITH_AWS_MANAGED_IAM_KWARGS.items() if k not in kwargs}}
|
||||
|
||||
# For Cloudflare R2
|
||||
if kwargs.get("endpoint"):
|
||||
from extensions.storage.opendal_storage import S3_R2_COMPATIBLE_KWARGS, is_r2_endpoint
|
||||
|
||||
if is_r2_endpoint(kwargs["endpoint"]):
|
||||
logger.debug("Using R2 for OpenDAL S3")
|
||||
kwargs = {**kwargs, **{k: v for k, v in S3_R2_COMPATIBLE_KWARGS.items() if k not in kwargs}}
|
||||
|
||||
return kwargs
|
||||
|
||||
|
||||
def _load_local_storage_kwargs() -> Mapping[str, str]:
|
||||
"""
|
||||
Load the kwargs for local storage based on dify_config.
|
||||
"""
|
||||
return {
|
||||
"root": dify_config.STORAGE_LOCAL_PATH,
|
||||
}
|
||||
|
||||
|
||||
def _load_opendal_storage_kwargs(scheme: OpenDALScheme) -> Mapping[str, str]:
|
||||
"""
|
||||
Load the kwargs for OpenDAL storage based on the given scheme.
|
||||
"""
|
||||
match scheme:
|
||||
case OpenDALScheme.FS:
|
||||
kwargs = {
|
||||
"root": dify_config.OPENDAL_FS_ROOT,
|
||||
}
|
||||
case OpenDALScheme.S3:
|
||||
# Load OpenDAL S3-related configs
|
||||
kwargs = {
|
||||
"root": dify_config.OPENDAL_S3_ROOT,
|
||||
"bucket": dify_config.OPENDAL_S3_BUCKET,
|
||||
"endpoint": dify_config.OPENDAL_S3_ENDPOINT,
|
||||
"access_key_id": dify_config.OPENDAL_S3_ACCESS_KEY_ID,
|
||||
"secret_access_key": dify_config.OPENDAL_S3_SECRET_ACCESS_KEY,
|
||||
"region": dify_config.OPENDAL_S3_REGION,
|
||||
}
|
||||
|
||||
# For Cloudflare R2
|
||||
if kwargs.get("endpoint"):
|
||||
from extensions.storage.opendal_storage import S3_R2_COMPATIBLE_KWARGS, is_r2_endpoint
|
||||
|
||||
if is_r2_endpoint(kwargs["endpoint"]):
|
||||
logger.debug("Using R2 for OpenDAL S3")
|
||||
kwargs = {**kwargs, **{k: v for k, v in S3_R2_COMPATIBLE_KWARGS.items() if k not in kwargs}}
|
||||
case _:
|
||||
logger.warning(f"Unrecognized OpenDAL scheme: {scheme}, will fall back to default.")
|
||||
kwargs = {}
|
||||
return kwargs
|
||||
|
||||
|
||||
storage = Storage()
|
||||
|
||||
|
||||
|
@@ -7,9 +7,6 @@ from collections.abc import Generator
|
||||
class BaseStorage(ABC):
|
||||
"""Interface for file storage."""
|
||||
|
||||
def __init__(self): # noqa: B027
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def save(self, filename, data):
|
||||
raise NotImplementedError
|
||||
|
@@ -1,62 +0,0 @@
|
||||
import os
|
||||
import shutil
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
from flask import current_app
|
||||
|
||||
from configs import dify_config
|
||||
from extensions.storage.base_storage import BaseStorage
|
||||
|
||||
|
||||
class LocalFsStorage(BaseStorage):
|
||||
"""Implementation for local filesystem storage."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
folder = dify_config.STORAGE_LOCAL_PATH
|
||||
if not os.path.isabs(folder):
|
||||
folder = os.path.join(current_app.root_path, folder)
|
||||
self.folder = folder
|
||||
|
||||
def _build_filepath(self, filename: str) -> str:
|
||||
"""Build the full file path based on the folder and filename."""
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
return self.folder + filename
|
||||
else:
|
||||
return self.folder + "/" + filename
|
||||
|
||||
def save(self, filename, data):
|
||||
filepath = self._build_filepath(filename)
|
||||
folder = os.path.dirname(filepath)
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
Path(os.path.join(os.getcwd(), filepath)).write_bytes(data)
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
filepath = self._build_filepath(filename)
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError("File not found")
|
||||
return Path(filepath).read_bytes()
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
filepath = self._build_filepath(filename)
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError("File not found")
|
||||
with open(filepath, "rb") as f:
|
||||
while chunk := f.read(4096): # Read in chunks of 4KB
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
filepath = self._build_filepath(filename)
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError("File not found")
|
||||
shutil.copyfile(filepath, target_filepath)
|
||||
|
||||
def exists(self, filename):
|
||||
filepath = self._build_filepath(filename)
|
||||
return os.path.exists(filepath)
|
||||
|
||||
def delete(self, filename):
|
||||
filepath = self._build_filepath(filename)
|
||||
if os.path.exists(filepath):
|
||||
os.remove(filepath)
|
66
api/extensions/storage/opendal_storage.py
Normal file
66
api/extensions/storage/opendal_storage.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import opendal
|
||||
|
||||
from configs.middleware.storage.opendal_storage_config import OpenDALScheme
|
||||
from extensions.storage.base_storage import BaseStorage
|
||||
|
||||
S3_R2_HOSTNAME = "r2.cloudflarestorage.com"
|
||||
S3_R2_COMPATIBLE_KWARGS = {
|
||||
"delete_max_size": "700",
|
||||
"disable_stat_with_override": "true",
|
||||
"region": "auto",
|
||||
}
|
||||
S3_SSE_WITH_AWS_MANAGED_IAM_KWARGS = {
|
||||
"server_side_encryption": "aws:kms",
|
||||
}
|
||||
|
||||
|
||||
def is_r2_endpoint(endpoint: str) -> bool:
|
||||
if not endpoint:
|
||||
return False
|
||||
|
||||
parsed_url = urlparse(endpoint)
|
||||
return bool(parsed_url.hostname and parsed_url.hostname.endswith(S3_R2_HOSTNAME))
|
||||
|
||||
|
||||
class OpenDALStorage(BaseStorage):
|
||||
def __init__(self, scheme: OpenDALScheme, **kwargs):
|
||||
if scheme == OpenDALScheme.FS:
|
||||
Path(kwargs["root"]).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.op = opendal.Operator(scheme=scheme, **kwargs)
|
||||
|
||||
def save(self, filename: str, data: bytes) -> None:
|
||||
self.op.write(path=filename, bs=data)
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
|
||||
return self.op.read(path=filename)
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
if not self.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
|
||||
batch_size = 4096
|
||||
file = self.op.open(path=filename, mode="rb")
|
||||
while chunk := file.read(batch_size):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename: str, target_filepath: str):
|
||||
if not self.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
|
||||
with Path(target_filepath).open("wb") as f:
|
||||
f.write(self.op.read(path=filename))
|
||||
|
||||
def exists(self, filename: str):
|
||||
return self.op.stat(path=filename).mode.is_file()
|
||||
|
||||
def delete(self, filename: str):
|
||||
if self.exists(filename):
|
||||
self.op.delete(path=filename)
|
@@ -9,6 +9,7 @@ class StorageType(StrEnum):
|
||||
HUAWEI_OBS = "huawei-obs"
|
||||
LOCAL = "local"
|
||||
OCI_STORAGE = "oci-storage"
|
||||
OPENDAL = "opendal"
|
||||
S3 = "s3"
|
||||
TENCENT_COS = "tencent-cos"
|
||||
VOLCENGINE_TOS = "volcengine-tos"
|
||||
|
Reference in New Issue
Block a user