"""ClickZetta Volume file lifecycle management This module provides file lifecycle management features including version control, automatic cleanup, backup and restore. Supports complete lifecycle management for knowledge base files. """ import json import logging from dataclasses import asdict, dataclass from datetime import datetime from enum import Enum from typing import Any, Optional logger = logging.getLogger(__name__) class FileStatus(Enum): """File status enumeration""" ACTIVE = "active" # Active status ARCHIVED = "archived" # Archived DELETED = "deleted" # Deleted (soft delete) BACKUP = "backup" # Backup file @dataclass class FileMetadata: """File metadata""" filename: str size: int | None created_at: datetime modified_at: datetime version: int | None status: FileStatus checksum: Optional[str] = None tags: Optional[dict[str, str]] = None parent_version: Optional[int] = None def to_dict(self) -> dict: """Convert to dictionary format""" data = asdict(self) data["created_at"] = self.created_at.isoformat() data["modified_at"] = self.modified_at.isoformat() data["status"] = self.status.value return data @classmethod def from_dict(cls, data: dict) -> "FileMetadata": """Create instance from dictionary""" data = data.copy() data["created_at"] = datetime.fromisoformat(data["created_at"]) data["modified_at"] = datetime.fromisoformat(data["modified_at"]) data["status"] = FileStatus(data["status"]) return cls(**data) class FileLifecycleManager: """File lifecycle manager""" def __init__(self, storage, dataset_id: Optional[str] = None): """Initialize lifecycle manager Args: storage: ClickZetta Volume storage instance dataset_id: Dataset ID (for Table Volume) """ self._storage = storage self._dataset_id = dataset_id self._metadata_file = ".dify_file_metadata.json" self._version_prefix = ".versions/" self._backup_prefix = ".backups/" self._deleted_prefix = ".deleted/" # Get permission manager (if exists) self._permission_manager: Optional[Any] = getattr(storage, "_permission_manager", None) def save_with_lifecycle(self, filename: str, data: bytes, tags: Optional[dict[str, str]] = None) -> FileMetadata: """Save file and manage lifecycle Args: filename: File name data: File content tags: File tags Returns: File metadata """ # Permission check if not self._check_permission(filename, "save"): from .volume_permissions import VolumePermissionError raise VolumePermissionError( f"Permission denied for lifecycle save operation on file: {filename}", operation="save", volume_type=getattr(self._storage, "_config", {}).get("volume_type", "unknown"), dataset_id=self._dataset_id, ) try: # 1. Check if old version exists metadata_dict = self._load_metadata() current_metadata = metadata_dict.get(filename) # 2. If old version exists, create version backup if current_metadata: self._create_version_backup(filename, current_metadata) # 3. Calculate file information now = datetime.now() checksum = self._calculate_checksum(data) new_version = (current_metadata["version"] + 1) if current_metadata else 1 # 4. Save new file self._storage.save(filename, data) # 5. Create metadata created_at = now parent_version = None if current_metadata: # If created_at is string, convert to datetime if isinstance(current_metadata["created_at"], str): created_at = datetime.fromisoformat(current_metadata["created_at"]) else: created_at = current_metadata["created_at"] parent_version = current_metadata["version"] file_metadata = FileMetadata( filename=filename, size=len(data), created_at=created_at, modified_at=now, version=new_version, status=FileStatus.ACTIVE, checksum=checksum, tags=tags or {}, parent_version=parent_version, ) # 6. Update metadata metadata_dict[filename] = file_metadata.to_dict() self._save_metadata(metadata_dict) logger.info("File %s saved with lifecycle management, version %s", filename, new_version) return file_metadata except Exception: logger.exception("Failed to save file with lifecycle") raise def get_file_metadata(self, filename: str) -> Optional[FileMetadata]: """Get file metadata Args: filename: File name Returns: File metadata, returns None if not exists """ try: metadata_dict = self._load_metadata() if filename in metadata_dict: return FileMetadata.from_dict(metadata_dict[filename]) return None except Exception: logger.exception("Failed to get file metadata for %s", filename) return None def list_file_versions(self, filename: str) -> list[FileMetadata]: """List all versions of a file Args: filename: File name Returns: File version list, sorted by version number """ try: versions = [] # Get current version current_metadata = self.get_file_metadata(filename) if current_metadata: versions.append(current_metadata) # Get historical versions try: version_files = self._storage.scan(self._dataset_id or "", files=True) for file_path in version_files: if file_path.startswith(f"{self._version_prefix}{filename}.v"): # Parse version number version_str = file_path.split(".v")[-1].split(".")[0] try: _ = int(version_str) # Simplified processing here, should actually read metadata from version file # Temporarily create basic metadata information except ValueError: continue except: # If cannot scan version files, only return current version pass return sorted(versions, key=lambda x: x.version or 0, reverse=True) except Exception: logger.exception("Failed to list file versions for %s", filename) return [] def restore_version(self, filename: str, version: int) -> bool: """Restore file to specified version Args: filename: File name version: Version number to restore Returns: Whether restore succeeded """ try: version_filename = f"{self._version_prefix}{filename}.v{version}" # Check if version file exists if not self._storage.exists(version_filename): logger.warning("Version %s of %s not found", version, filename) return False # Read version file content version_data = self._storage.load_once(version_filename) # Save current version as backup current_metadata = self.get_file_metadata(filename) if current_metadata: self._create_version_backup(filename, current_metadata.to_dict()) # Restore file self.save_with_lifecycle(filename, version_data, {"restored_from": str(version)}) return True except Exception: logger.exception("Failed to restore %s to version %s", filename, version) return False def archive_file(self, filename: str) -> bool: """Archive file Args: filename: File name Returns: Whether archive succeeded """ # Permission check if not self._check_permission(filename, "archive"): logger.warning("Permission denied for archive operation on file: %s", filename) return False try: # Update file status to archived metadata_dict = self._load_metadata() if filename not in metadata_dict: logger.warning("File %s not found in metadata", filename) return False metadata_dict[filename]["status"] = FileStatus.ARCHIVED.value metadata_dict[filename]["modified_at"] = datetime.now().isoformat() self._save_metadata(metadata_dict) logger.info("File %s archived successfully", filename) return True except Exception: logger.exception("Failed to archive file %s", filename) return False def soft_delete_file(self, filename: str) -> bool: """Soft delete file (move to deleted directory) Args: filename: File name Returns: Whether delete succeeded """ # Permission check if not self._check_permission(filename, "delete"): logger.warning("Permission denied for soft delete operation on file: %s", filename) return False try: # Check if file exists if not self._storage.exists(filename): logger.warning("File %s not found", filename) return False # Read file content file_data = self._storage.load_once(filename) # Move to deleted directory deleted_filename = f"{self._deleted_prefix}{filename}.{datetime.now().strftime('%Y%m%d_%H%M%S')}" self._storage.save(deleted_filename, file_data) # Delete original file self._storage.delete(filename) # Update metadata metadata_dict = self._load_metadata() if filename in metadata_dict: metadata_dict[filename]["status"] = FileStatus.DELETED.value metadata_dict[filename]["modified_at"] = datetime.now().isoformat() self._save_metadata(metadata_dict) logger.info("File %s soft deleted successfully", filename) return True except Exception: logger.exception("Failed to soft delete file %s", filename) return False def cleanup_old_versions(self, max_versions: int = 5, max_age_days: int = 30) -> int: """Cleanup old version files Args: max_versions: Maximum number of versions to keep max_age_days: Maximum retention days for version files Returns: Number of files cleaned """ try: cleaned_count = 0 # Get all version files try: all_files = self._storage.scan(self._dataset_id or "", files=True) version_files = [f for f in all_files if f.startswith(self._version_prefix)] # Group by file file_versions: dict[str, list[tuple[int, str]]] = {} for version_file in version_files: # Parse filename and version parts = version_file[len(self._version_prefix) :].split(".v") if len(parts) >= 2: base_filename = parts[0] version_part = parts[1].split(".")[0] try: version_num = int(version_part) if base_filename not in file_versions: file_versions[base_filename] = [] file_versions[base_filename].append((version_num, version_file)) except ValueError: continue # Cleanup old versions for each file for base_filename, versions in file_versions.items(): # Sort by version number versions.sort(key=lambda x: x[0], reverse=True) # Keep the newest max_versions versions, delete the rest if len(versions) > max_versions: to_delete = versions[max_versions:] for version_num, version_file in to_delete: self._storage.delete(version_file) cleaned_count += 1 logger.debug("Cleaned old version: %s", version_file) logger.info("Cleaned %d old version files", cleaned_count) except Exception as e: logger.warning("Could not scan for version files: %s", e) return cleaned_count except Exception: logger.exception("Failed to cleanup old versions") return 0 def get_storage_statistics(self) -> dict[str, Any]: """Get storage statistics Returns: Storage statistics dictionary """ try: metadata_dict = self._load_metadata() stats: dict[str, Any] = { "total_files": len(metadata_dict), "active_files": 0, "archived_files": 0, "deleted_files": 0, "total_size": 0, "versions_count": 0, "oldest_file": None, "newest_file": None, } oldest_date = None newest_date = None for filename, metadata in metadata_dict.items(): file_meta = FileMetadata.from_dict(metadata) # Count file status if file_meta.status == FileStatus.ACTIVE: stats["active_files"] = (stats["active_files"] or 0) + 1 elif file_meta.status == FileStatus.ARCHIVED: stats["archived_files"] = (stats["archived_files"] or 0) + 1 elif file_meta.status == FileStatus.DELETED: stats["deleted_files"] = (stats["deleted_files"] or 0) + 1 # Count size stats["total_size"] = (stats["total_size"] or 0) + (file_meta.size or 0) # Count versions stats["versions_count"] = (stats["versions_count"] or 0) + (file_meta.version or 0) # Find newest and oldest files if oldest_date is None or file_meta.created_at < oldest_date: oldest_date = file_meta.created_at stats["oldest_file"] = filename if newest_date is None or file_meta.modified_at > newest_date: newest_date = file_meta.modified_at stats["newest_file"] = filename return stats except Exception: logger.exception("Failed to get storage statistics") return {} def _create_version_backup(self, filename: str, metadata: dict): """Create version backup""" try: # Read current file content current_data = self._storage.load_once(filename) # Save as version file version_filename = f"{self._version_prefix}{filename}.v{metadata['version']}" self._storage.save(version_filename, current_data) logger.debug("Created version backup: %s", version_filename) except Exception as e: logger.warning("Failed to create version backup for %s: %s", filename, e) def _load_metadata(self) -> dict[str, Any]: """Load metadata file""" try: if self._storage.exists(self._metadata_file): metadata_content = self._storage.load_once(self._metadata_file) result = json.loads(metadata_content.decode("utf-8")) return dict(result) if result else {} else: return {} except Exception as e: logger.warning("Failed to load metadata: %s", e) return {} def _save_metadata(self, metadata_dict: dict): """Save metadata file""" try: metadata_content = json.dumps(metadata_dict, indent=2, ensure_ascii=False) self._storage.save(self._metadata_file, metadata_content.encode("utf-8")) logger.debug("Metadata saved successfully") except Exception: logger.exception("Failed to save metadata") raise def _calculate_checksum(self, data: bytes) -> str: """Calculate file checksum""" import hashlib return hashlib.md5(data).hexdigest() def _check_permission(self, filename: str, operation: str) -> bool: """Check file operation permission Args: filename: File name operation: Operation type Returns: True if permission granted, False otherwise """ # If no permission manager, allow by default if not self._permission_manager: return True try: # Map operation type to permission operation_mapping = { "save": "save", "load": "load_once", "delete": "delete", "archive": "delete", # Archive requires delete permission "restore": "save", # Restore requires write permission "cleanup": "delete", # Cleanup requires delete permission "read": "load_once", "write": "save", } mapped_operation = operation_mapping.get(operation, operation) # Check permission result = self._permission_manager.validate_operation(mapped_operation, self._dataset_id) return bool(result) except Exception: logger.exception("Permission check failed for %s operation %s", filename, operation) # Safe default: deny access when permission check fails return False