""" title: File System author: Jojomaw author_url: https://github.com/jyapayne git_url: https://github.com/jyapayne/openwebui-filesystem.git description: This tool provides a comprehensive set of file system operations, including file and directory management, search, and analysis. required_open_webui_version: 0.1.0 requirements: version: 0.2.0 licence: MIT """ import os import json import shutil import logging import hashlib import datetime import zipfile import tarfile import stat as pystat import tempfile import mimetypes import base64 import asyncio import aiofiles import aiofiles.os import aiohttp from collections import defaultdict from pathlib import Path from pydantic import BaseModel, Field from typing import Dict, Any, List, Optional, Union # Use module logger instead of global basicConfig logger = logging.getLogger(__name__) class Tools: class Valves(BaseModel): root_restriction_directory: str = Field( default=os.getcwd(), description="Which folder the filesystem tool can access", ) verbose_logging: bool = Field( default=False, description="Enable verbose logging for filesystem operations", ) max_search_file_size: int = Field( default=10 * 1024 * 1024, # 10MB description="Maximum file size to search through (bytes)", ) return_relative_paths: bool = Field( default=True, description="Return paths relative to restriction root instead of absolute paths", ) openrouter_api_key: str = Field( default="", description="OpenRouter API key for file uploads to models", ) openrouter_model: str = Field( default="google/gemini-2.5-flash", description="Default OpenRouter model for file processing", ) openrouter_base_url: str = Field( default="https://openrouter.ai/api/v1", description="OpenRouter API base URL", ) spoof_directory_root: str = Field( default="", description="Display directory root for results (defaults to root_restriction_directory if empty)", ) debug: bool = Field( default=False, description="Enable debug mode to show internal information like actual paths 
and spoofing details", ) def __init__(self, base_path: Optional[str] = None): self.valves = self.Valves() # Use base_path as restriction root if provided, otherwise use valves default if base_path: self.valves.root_restriction_directory = base_path self.tags = defaultdict(list) # Will be keyed by resolved path self.versions = defaultdict(list) # Will be keyed by resolved path # Configure logging level based on valves if self.valves.verbose_logging: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) async def _emit_status(self, __event_emitter__, description: str, done: bool = False, **extra: Any) -> None: """Safely emit a status event if an emitter is provided. The emitter expects a payload of the form: {"type": "status", "data": {"description": str, "done": bool, ...}} """ if __event_emitter__ is None: return try: payload: Dict[str, Any] = { "type": "status", "data": {"description": description, "done": done}, } if extra: payload["data"].update(extra) await __event_emitter__(payload) except Exception: # Never allow status emission failures to interrupt tool execution pass def _result(self, ok: bool, action: str = "", subject_type: str = "", **kwargs) -> Dict[str, Any]: """Helper function to create consistent result format.""" result = {"ok": ok, "action": action, "subject_type": subject_type} result.update(kwargs) # Add debug information if debug mode is enabled if self.valves.debug: actual_root = Path(self.valves.root_restriction_directory).resolve() display_root = self.valves.spoof_directory_root if self.valves.spoof_directory_root else self.valves.root_restriction_directory result.update({ "debug_info": { "actual_path": str(actual_root), "display_path": display_root, "is_spoofed": display_root != str(actual_root), "spoof_directory_root": self.valves.spoof_directory_root, "root_restriction_directory": self.valves.root_restriction_directory } }) return result def _resolve_under_restriction(self, path: str) -> str: """Resolve path safely under the 
restriction root to prevent traversal outside.""" base = Path(self.valves.root_restriction_directory).resolve() p = (base / path).resolve() if not str(p).startswith(str(base)): raise ValueError(f"Path escapes restriction root: {path}") return str(p) def _get_relative_path(self, absolute_path: str) -> str: """Format a path for display, honoring spoof directory regardless of relative flag. - When valves.return_relative_paths is True: return a path scoped to the display root. If a spoof directory is configured, the returned path is shown under that root. Otherwise, it is relative to the restriction root. - When valves.return_relative_paths is False: return an absolute path, but if a spoof directory is configured, replace the restriction root prefix with the spoof root so that paths appear under the spoof directory. """ try: base = Path(self.valves.root_restriction_directory).resolve() display_root = self.valves.spoof_directory_root or self.valves.root_restriction_directory spoof_base = Path(display_root).resolve() resolved_absolute = str(Path(absolute_path).resolve()) # Compute the path relative to the actual restriction root (may raise ValueError) relative_from_actual = str(Path(resolved_absolute).relative_to(base)) if self.valves.return_relative_paths: # Show as if rooted at the display root. If spoof == base, keep purely relative. 
if str(spoof_base) == str(base): return relative_from_actual return str(spoof_base / relative_from_actual) # Absolute mode: return absolute path, mapped to spoof root when spoofing if str(spoof_base) == str(base): return resolved_absolute return str(spoof_base / relative_from_actual) except ValueError: # If the path is not under the restriction root, return as-is return absolute_path async def _ensure_parent_dir(self, file_path: str) -> None: """Ensure parent directory exists for the given file path.""" parent_dir = os.path.dirname(file_path) if parent_dir: await aiofiles.os.makedirs(parent_dir, exist_ok=True) async def _atomic_write(self, file_path: str, content: str, encoding: str = "utf-8") -> None: """Write content to file atomically using temporary file.""" await self._ensure_parent_dir(file_path) temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(file_path), text=True) try: # Close the file descriptor immediately since we'll use aiofiles to write os.close(temp_fd) async with aiofiles.open(temp_path, 'w', encoding=encoding) as temp_file: await temp_file.write(content) await aiofiles.os.replace(temp_path, file_path) except Exception: # Clean up temp file if something went wrong try: await aiofiles.os.unlink(temp_path) except OSError: pass raise async def _is_binary_file(self, file_path: str) -> bool: """Check if file is likely binary by examining first chunk.""" try: async with aiofiles.open(file_path, 'rb') as f: chunk = await f.read(1024) return b'\0' in chunk except Exception: return True # Assume binary if can't read def _sanitize_archive_path(self, member_path: str, output_dir: str) -> str: """Sanitize archive member path to prevent directory traversal.""" # Remove leading slashes and resolve any .. 
components clean_path = os.path.normpath(member_path.lstrip('/')) if clean_path.startswith('..') or os.path.isabs(clean_path): raise ValueError(f"Unsafe archive member path: {member_path}") # Ensure the final path is under output directory final_path = os.path.join(output_dir, clean_path) if not os.path.abspath(final_path).startswith(os.path.abspath(output_dir)): raise ValueError(f"Archive member would extract outside target directory: {member_path}") return final_path async def cwd(self) -> Dict[str, Any]: """ Get the current working directory (returns the spoofed directory root for display). :return: The current working directory path as it should be displayed to users. """ try: # Use spoof directory root for display if set, otherwise use restriction root display_root = self.valves.spoof_directory_root if self.valves.spoof_directory_root else self.valves.root_restriction_directory # Ensure the display root path exists (for validation) actual_root = Path(self.valves.root_restriction_directory).resolve() if not await aiofiles.os.path.exists(str(actual_root)): return self._result( False, action="cwd", subject_type="directory", error="Current working directory does not exist", path=display_root ) # Return the spoofed directory as the current working directory logger.info(f"Current working directory: {display_root}") return self._result( True, action="cwd", subject_type="directory", path=display_root ) except Exception as e: return self._result( False, action="cwd", subject_type="directory", error=f"Failed to get current working directory: {str(e)}" ) async def create_folder(self, folder_name: str, base_dir: Optional[str] = None) -> Dict[str, Any]: """ Create a new folder. :param folder_name: The name of the folder to create. :param base_dir: The base directory where the folder should be created. :return: A success message if the folder is created successfully. """ try: base_path = base_dir if base_dir else "." 
folder_path = self._resolve_under_restriction(os.path.join(base_path, folder_name)) if not await aiofiles.os.path.exists(folder_path): await aiofiles.os.makedirs(folder_path) logger.info(f"Folder '{folder_name}' created successfully at {folder_path}") return self._result( True, action="create", subject_type="folder", message=f"Folder '{folder_name}' created successfully", path=self._get_relative_path(folder_path) ) else: logger.warning(f"Folder '{folder_name}' already exists at {folder_path}") return self._result( True, action="create", subject_type="folder", message=f"Folder '{folder_name}' already exists", path=self._get_relative_path(folder_path) ) except ValueError as e: return self._result(False, action="create", subject_type="folder", error=str(e)) except OSError as e: return self._result(False, action="create", subject_type="folder", error=f"Failed to create folder: {str(e)}") async def delete_folder(self, folder_name: str, base_dir: Optional[str] = None) -> Dict[str, Any]: """ Delete a folder. :param folder_name: The name of the folder to delete. :param base_dir: The base directory where the folder is located. :return: A success message if the folder is deleted successfully. """ try: base_path = base_dir if base_dir else "." 
folder_path = self._resolve_under_restriction(os.path.join(base_path, folder_name)) if await aiofiles.os.path.exists(folder_path): if not await aiofiles.os.path.isdir(folder_path): return self._result(False, action="delete", subject_type="folder", error="Path is not a directory") await asyncio.to_thread(shutil.rmtree, folder_path) logger.info(f"Folder '{folder_name}' deleted successfully from {folder_path}") return self._result( True, action="delete", subject_type="folder", message=f"Folder '{folder_name}' deleted successfully", path=self._get_relative_path(folder_path) ) else: return self._result( False, action="delete", subject_type="folder", error="Folder does not exist", path=self._get_relative_path(folder_path) ) except ValueError as e: return self._result(False, action="delete", subject_type="folder", error=str(e)) except OSError as e: return self._result(False, action="delete", subject_type="folder", error=f"Failed to delete folder: {str(e)}") async def create_file(self, file_name: str, content: str = "", base_dir: Optional[str] = None) -> Dict[str, Any]: """ Create a new file. :param file_name: The name of the file to create. :param content: The content to write to the file. :param base_dir: The base directory where the file should be created. :return: A success message if the file is created successfully. """ try: base_path = base_dir if base_dir else "." 
file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) await self._atomic_write(file_path, content) logger.info(f"File '{file_name}' created successfully at {file_path}") return self._result( True, action="create", subject_type="file", message=f"File '{file_name}' created successfully", path=self._get_relative_path(file_path), bytes=len(content.encode('utf-8')) ) except ValueError as e: return self._result(False, action="create", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="create", subject_type="file", error=f"Failed to create file: {str(e)}") async def delete_file(self, file_name: str, base_dir: Optional[str] = None) -> Dict[str, Any]: """ Delete a file. :param file_name: The name of the file to delete. :param base_dir: The base directory where the file is located. :return: A success message if the file is deleted successfully. """ try: base_path = base_dir if base_dir else "." file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) if not await aiofiles.os.path.exists(file_path): return self._result( False, action="delete", subject_type="file", error="File does not exist", path=self._get_relative_path(file_path) ) if await aiofiles.os.path.isdir(file_path): return self._result(False, action="delete", subject_type="file", error="Path is a directory") await aiofiles.os.remove(file_path) logger.info(f"File '{file_name}' deleted successfully from {file_path}") return self._result( True, action="delete", subject_type="file", message=f"File '{file_name}' deleted successfully", path=self._get_relative_path(file_path) ) except ValueError as e: return self._result(False, action="delete", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="delete", subject_type="file", error=f"Failed to delete file: {str(e)}") async def read_file( self, file_name: str, base_dir: Optional[str] = None, auto_transcribe_binary: bool = True, 
    async def read_file(
        self,
        file_name: str,
        base_dir: Optional[str] = None,
        auto_transcribe_binary: bool = True,
        auto_describe_images: bool = True,
        image_description_type: str = "brief",
        force_binary: bool = False
    ) -> Dict[str, Any]:
        """
        Read the content of a file. Automatically transcribes binary files (images, PDFs) to text.
        For images, can provide AI-generated descriptions instead of base64 content.

        :param file_name: The name of the file to read.
        :param base_dir: The base directory where the file is located.
        :param auto_transcribe_binary: Whether to automatically transcribe binary files to text.
        :param auto_describe_images: Whether to automatically describe image files using AI.
        :param image_description_type: Type of image description ('detailed', 'brief', 'technical', 'creative').
        :param force_binary: If True, always return binary files as base64, bypassing AI processing.
        :return: The content of the file (text for text files, descriptions/transcribed text for binary files).
        """
        try:
            base_path = base_dir if base_dir else "."
            file_path = self._resolve_under_restriction(os.path.join(base_path, file_name))
            if not await aiofiles.os.path.exists(file_path):
                return self._result(
                    False,
                    action="read",
                    subject_type="file",
                    error="File not found",
                    path=self._get_relative_path(file_path)
                )
            if not await aiofiles.os.path.isfile(file_path):
                return self._result(False, action="read", subject_type="file", error="Path is not a file")

            # Check if file is binary first; the probe is skipped entirely when no
            # binary handling mode is requested.
            is_binary = await self._is_binary_file(file_path) if (auto_transcribe_binary or auto_describe_images or force_binary) else False

            # If force_binary is True, skip all AI processing and return base64
            if force_binary and is_binary:
                mime_type, _ = mimetypes.guess_type(file_path)
                if mime_type is None:
                    mime_type = "application/octet-stream"
                async with aiofiles.open(file_path, "rb") as file:
                    binary_content = await file.read()
                base64_content = base64.b64encode(binary_content).decode('utf-8')
                logger.info(f"Binary file '{file_name}' returned as base64 (force_binary=True)")
                return self._result(
                    True,
                    action="read",
                    subject_type="file",
                    path=self._get_relative_path(file_path),
                    content=base64_content,
                    encoding="base64",
                    mime_type=mime_type,
                    is_binary=True,
                    force_binary=True,
                    bytes=len(binary_content)
                )

            if is_binary and (auto_transcribe_binary or auto_describe_images):
                # Get MIME type for processing decisions
                mime_type, _ = mimetypes.guess_type(file_path)
                if mime_type is None:
                    mime_type = "application/octet-stream"

                # Check if it's an image and auto_describe_images is enabled.
                # NOTE(review): describe_image is defined elsewhere in this file and
                # is assumed to return a dict with "ok"/"description" keys — confirm.
                if mime_type.startswith('image/') and auto_describe_images and self.valves.openrouter_api_key:
                    logger.info(f"Describing image file '{file_name}' ({mime_type})")
                    description_result = await self.describe_image(
                        file_name=file_name,
                        base_dir=base_dir,
                        description_type=image_description_type
                    )
                    if description_result["ok"]:
                        description_text = description_result.get("description", "")
                        logger.info(f"Image file '{file_name}' successfully described")
                        return self._result(
                            True,
                            action="read",
                            subject_type="file",
                            path=self._get_relative_path(file_path),
                            content=description_text,
                            encoding="image_description",
                            mime_type=mime_type,
                            is_binary=True,
                            is_image=True,
                            description_attempted=True,
                            description_successful=True,
                            description_type=image_description_type,
                            model=description_result.get("model", ""),
                            word_count=description_result.get("word_count", 0),
                            character_count=description_result.get("character_count", 0),
                            bytes=description_result.get("file_size", 0)
                        )
                    else:
                        logger.warning(f"Image description failed for '{file_name}', falling back to base64")
                        # Fall through to base64 handling below

                # Handle transcription for non-images or when image description is disabled/failed
                if auto_transcribe_binary:
                    # Check if we have OpenRouter API key for transcription
                    if not self.valves.openrouter_api_key:
                        # Fallback to base64 if no API key
                        async with aiofiles.open(file_path, "rb") as file:
                            binary_content = await file.read()
                        base64_content = base64.b64encode(binary_content).decode('utf-8')
                        logger.warning(f"Binary file '{file_name}' returned as base64 - no OpenRouter API key for transcription")
                        return self._result(
                            True,
                            action="read",
                            subject_type="file",
                            path=self._get_relative_path(file_path),
                            content=base64_content,
                            encoding="base64",
                            mime_type=mime_type,
                            is_binary=True,
                            transcription_attempted=False,
                            note="Returned as base64 - set openrouter_api_key for automatic transcription",
                            bytes=len(binary_content)
                        )

                    # Check if file type is supported for transcription.
                    # NOTE(review): _is_file_supported_by_openrouter and transcribe_file
                    # are defined elsewhere in this file — not visible in this chunk.
                    if mime_type and self._is_file_supported_by_openrouter(mime_type):
                        # Transcribe the binary file
                        logger.info(f"Transcribing binary file '{file_name}' ({mime_type})")
                        transcription_result = await self.transcribe_file(
                            file_name=file_name,
                            base_dir=base_dir,
                            transcription_mode="auto",
                            output_format="text"
                        )
                        if transcription_result["ok"]:
                            transcribed_text = transcription_result.get("transcribed_text", "")
                            logger.info(f"Binary file '{file_name}' successfully transcribed to text")
                            return self._result(
                                True,
                                action="read",
                                subject_type="file",
                                path=self._get_relative_path(file_path),
                                content=transcribed_text,
                                encoding="transcribed_text",
                                mime_type=mime_type,
                                is_binary=True,
                                transcription_attempted=True,
                                transcription_successful=True,
                                transcription_mode=transcription_result.get("transcription_mode", "auto"),
                                model=transcription_result.get("model", ""),
                                word_count=transcription_result.get("word_count", 0),
                                character_count=transcription_result.get("character_count", 0),
                                bytes=transcription_result.get("file_size", 0)
                            )
                        else:
                            # Transcription failed, fallback to base64
                            logger.warning(f"Transcription failed for '{file_name}', falling back to base64")
                            async with aiofiles.open(file_path, "rb") as file:
                                binary_content = await file.read()
                            base64_content = base64.b64encode(binary_content).decode('utf-8')
                            return self._result(
                                True,
                                action="read",
                                subject_type="file",
                                path=self._get_relative_path(file_path),
                                content=base64_content,
                                encoding="base64",
                                mime_type=mime_type,
                                is_binary=True,
                                transcription_attempted=True,
                                transcription_successful=False,
                                transcription_error=transcription_result.get("error", "Unknown error"),
                                bytes=len(binary_content)
                            )
                    else:
                        # Unsupported file type for transcription, return base64
                        async with aiofiles.open(file_path, "rb") as file:
                            binary_content = await file.read()
                        base64_content = base64.b64encode(binary_content).decode('utf-8')
                        logger.info(f"Binary file '{file_name}' returned as base64 - unsupported type for transcription")
                        return self._result(
                            True,
                            action="read",
                            subject_type="file",
                            path=self._get_relative_path(file_path),
                            content=base64_content,
                            encoding="base64",
                            mime_type=mime_type or "application/octet-stream",
                            is_binary=True,
                            transcription_attempted=False,
                            note="File type not supported for transcription",
                            bytes=len(binary_content)
                        )
                else:
                    # Neither transcription nor description enabled/available, return base64
                    async with aiofiles.open(file_path, "rb") as file:
                        binary_content = await file.read()
                    base64_content = base64.b64encode(binary_content).decode('utf-8')
                    return self._result(
                        True,
                        action="read",
                        subject_type="file",
                        path=self._get_relative_path(file_path),
                        content=base64_content,
                        encoding="base64",
                        mime_type=mime_type,
                        is_binary=True,
                        bytes=len(binary_content)
                    )
            else:
                # Handle as text file, but check if force_binary is requested
                if force_binary:
                    # Force binary mode even for text files
                    mime_type, _ = mimetypes.guess_type(file_path)
                    if mime_type is None:
                        mime_type = "text/plain"
                    async with aiofiles.open(file_path, "rb") as file:
                        binary_content = await file.read()
                    base64_content = base64.b64encode(binary_content).decode('utf-8')
                    logger.info(f"Text file '{file_name}' returned as base64 (force_binary=True)")
                    return self._result(
                        True,
                        action="read",
                        subject_type="file",
                        path=self._get_relative_path(file_path),
                        content=base64_content,
                        encoding="base64",
                        mime_type=mime_type,
                        is_binary=False,
                        force_binary=True,
                        bytes=len(binary_content)
                    )
                else:
                    # Normal text file handling
                    async with aiofiles.open(file_path, "r", encoding="utf-8") as file:
                        content = await file.read()
                    logger.info(f"Text file '{file_name}' read successfully from {file_path}")
                    return self._result(
                        True,
                        action="read",
                        subject_type="file",
                        path=self._get_relative_path(file_path),
                        content=content,
                        encoding="utf-8",
                        is_binary=False,
                        bytes=len(content.encode('utf-8'))
                    )
        except UnicodeDecodeError as e:
            # Fallback: if UTF-8 decoding fails, treat as binary
            if auto_transcribe_binary:
                logger.warning(f"UTF-8 decode failed for '{file_name}', treating as binary file")
                try:
                    # Re-resolve file path for fallback (should be same as above)
                    base_path = base_dir if base_dir else "."
                    fallback_file_path = self._resolve_under_restriction(os.path.join(base_path, file_name))
                    # Try transcription first if API key is available
                    if self.valves.openrouter_api_key:
                        mime_type, _ = mimetypes.guess_type(fallback_file_path)
                        if mime_type and self._is_file_supported_by_openrouter(mime_type):
                            transcription_result = await self.transcribe_file(
                                file_name=file_name,
                                base_dir=base_dir,
                                transcription_mode="auto",
                                output_format="text"
                            )
                            if transcription_result["ok"]:
                                transcribed_text = transcription_result.get("transcribed_text", "")
                                return self._result(
                                    True,
                                    action="read",
                                    subject_type="file",
                                    path=self._get_relative_path(fallback_file_path),
                                    content=transcribed_text,
                                    encoding="transcribed_text",
                                    mime_type=mime_type,
                                    is_binary=True,
                                    transcription_attempted=True,
                                    transcription_successful=True,
                                    note="Transcribed due to UTF-8 decode error",
                                    bytes=transcription_result.get("file_size", 0)
                                )
                    # Fallback to base64 if transcription not available or failed
                    mime_type, _ = mimetypes.guess_type(fallback_file_path)
                    if mime_type is None:
                        mime_type = "application/octet-stream"
                    async with aiofiles.open(fallback_file_path, "rb") as file:
                        binary_content = await file.read()
                    base64_content = base64.b64encode(binary_content).decode('utf-8')
                    return self._result(
                        True,
                        action="read",
                        subject_type="file",
                        path=self._get_relative_path(fallback_file_path),
                        content=base64_content,
                        encoding="base64",
                        mime_type=mime_type,
                        is_binary=True,
                        bytes=len(binary_content),
                        note="Treated as binary due to encoding error"
                    )
                except Exception as fallback_error:
                    return self._result(False, action="read", subject_type="file", error=f"Failed to read as binary fallback: {str(fallback_error)}")
            else:
                return self._result(False, action="read", subject_type="file", error=f"File encoding error: {str(e)}")
        except ValueError as e:
            return self._result(False, action="read", subject_type="file", error=str(e))
        except OSError as e:
            return self._result(False, action="read", subject_type="file", error=f"Failed to read file: {str(e)}")
file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) if not await aiofiles.os.path.exists(file_path): return self._result( False, action="file_info_extended", subject_type="file", error="File does not exist", path=self._get_relative_path(file_path) ) # Get basic file stats st = await aiofiles.os.stat(file_path) # Get MIME type mime_type, encoding = mimetypes.guess_type(file_path) if mime_type is None: mime_type = "application/octet-stream" # Check if file is binary is_binary = await self._is_binary_file(file_path) if await aiofiles.os.path.isfile(file_path) else False # Calculate file hash for integrity checking file_hash = None if await aiofiles.os.path.isfile(file_path) and st.st_size < 100 * 1024 * 1024: # Only hash files < 100MB try: async with aiofiles.open(file_path, 'rb') as f: content = await f.read() file_hash = hashlib.sha256(content).hexdigest() except Exception: pass # Skip hash if can't read file return self._result( True, action="file_info_extended", subject_type="file", path=self._get_relative_path(file_path), name=os.path.basename(file_path), is_dir=await aiofiles.os.path.isdir(file_path), is_file=await aiofiles.os.path.isfile(file_path), is_symlink=await aiofiles.os.path.islink(file_path), is_binary=is_binary, size=st.st_size, mime_type=mime_type, encoding=encoding, mode=pystat.filemode(st.st_mode), mtime=st.st_mtime, ctime=st.st_ctime, atime=st.st_atime, created=datetime.datetime.utcfromtimestamp(st.st_ctime).strftime("%Y-%m-%d %H:%M:%S UTC"), modified=datetime.datetime.utcfromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S UTC"), accessed=datetime.datetime.utcfromtimestamp(st.st_atime).strftime("%Y-%m-%d %H:%M:%S UTC"), hash_sha256=file_hash ) except ValueError as e: return self._result(False, action="file_info_extended", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="file_info_extended", subject_type="file", error=f"Failed to get file info: {str(e)}") async def 
write_to_file(self, file_name: str, content: str, base_dir: Optional[str] = None) -> Dict[str, Any]: """ Write content to a file. :param file_name: The name of the file to write to. :param content: The content to write to the file. :param base_dir: The base directory where the file is located. :return: A success message if the content is written successfully. """ try: base_path = base_dir if base_dir else "." file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) await self._atomic_write(file_path, content) logger.info(f"Content written to file '{file_name}' successfully at {file_path}") return self._result( True, action="write", subject_type="file", path=self._get_relative_path(file_path), bytes=len(content.encode('utf-8')) ) except ValueError as e: return self._result(False, action="write", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="write", subject_type="file", error=f"Failed to write file: {str(e)}") async def list_files(self, base_dir: Optional[str] = None, include_hidden: bool = False) -> Dict[str, Any]: """ List all files in the specified directory. :param base_dir: The base directory where the files should be listed. :param include_hidden: Whether to include hidden files (starting with .). :return: A list of files in the specified directory. """ try: base_path = base_dir if base_dir else "." 
directory_path = self._resolve_under_restriction(base_path) if not await aiofiles.os.path.exists(directory_path): return self._result( False, action="list", subject_type="directory", error="Path does not exist", path=self._get_relative_path(directory_path) ) if not await aiofiles.os.path.isdir(directory_path): return self._result( False, action="list", subject_type="directory", error="Path is not a directory", path=self._get_relative_path(directory_path) ) entries = [] dir_entries = await aiofiles.os.listdir(directory_path) for entry in sorted(dir_entries, key=lambda x: x.lower()): if not include_hidden and entry.startswith('.'): continue entry_path = os.path.join(directory_path, entry) try: st = await aiofiles.os.stat(entry_path) is_dir = await aiofiles.os.path.isdir(entry_path) entries.append({ "name": entry, "path": self._get_relative_path(entry_path), "type": "dir" if is_dir else "file", "size": None if is_dir else st.st_size, "modified": datetime.datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S"), "permissions": pystat.filemode(st.st_mode), }) except Exception as e: entries.append({ "name": entry, "path": self._get_relative_path(entry_path), "type": "unknown", "error": str(e) }) logger.info(f"Files listed successfully from {directory_path}") return self._result( True, action="list", subject_type="directory", entries=entries, path=self._get_relative_path(directory_path), count=len(entries) ) except ValueError as e: return self._result(False, action="list", subject_type="directory", error=str(e)) except OSError as e: return self._result(False, action="list", subject_type="directory", error=f"Failed to list directory: {str(e)}") async def is_file(self, path: str) -> bool: """ Check if the given path is a file. :param path: The path to check. :return: True if the path is a file, False otherwise. 
""" try: resolved_path = self._resolve_under_restriction(path) return await aiofiles.os.path.isfile(resolved_path) except (ValueError, OSError): return False async def is_directory(self, path: str) -> bool: """ Check if the given path is a directory. :param path: The path to check. :return: True if the path is a directory, False otherwise. """ try: resolved_path = self._resolve_under_restriction(path) return await aiofiles.os.path.isdir(resolved_path) except (ValueError, OSError): return False async def get_file_metadata(self, file_name: str, base_dir: Optional[str] = None) -> Dict[str, Any]: """ Get metadata of a file. :param file_name: The name of the file to get metadata for. :param base_dir: The base directory where the file is located. :return: A dictionary containing the file's metadata. """ try: base_path = base_dir if base_dir else "." file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) if not await aiofiles.os.path.exists(file_path): return self._result( False, action="metadata", subject_type="file", error="Path does not exist", path=self._get_relative_path(file_path) ) st = await aiofiles.os.stat(file_path) return self._result( True, action="metadata", subject_type="file", path=self._get_relative_path(file_path), is_dir=await aiofiles.os.path.isdir(file_path), is_file=await aiofiles.os.path.isfile(file_path), is_symlink=await aiofiles.os.path.islink(file_path), size=st.st_size, mode=pystat.filemode(st.st_mode), mtime=st.st_mtime, ctime=st.st_ctime, atime=st.st_atime, created=datetime.datetime.utcfromtimestamp(st.st_ctime).strftime("%Y-%m-%d %H:%M:%S UTC"), modified=datetime.datetime.utcfromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S UTC"), accessed=datetime.datetime.utcfromtimestamp(st.st_atime).strftime("%Y-%m-%d %H:%M:%S UTC"), ) except ValueError as e: return self._result(False, action="metadata", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="metadata", subject_type="file", 
error=f"Failed to get metadata: {str(e)}") async def copy_file( self, src_file: str, dest_file: str, src_base_dir: Optional[str] = None, dest_base_dir: Optional[str] = None ) -> Dict[str, Any]: """ Copy a file from source to destination. :param src_file: The name of the source file. :param dest_file: The name of the destination file. :param src_base_dir: The base directory where the source file is located. :param dest_base_dir: The base directory where the destination file should be created. :return: A success message if the file is copied successfully. """ try: src_base = src_base_dir if src_base_dir else "." dest_base = dest_base_dir if dest_base_dir else "." src_file_path = self._resolve_under_restriction(os.path.join(src_base, src_file)) dest_file_path = self._resolve_under_restriction(os.path.join(dest_base, dest_file)) if not await aiofiles.os.path.exists(src_file_path): return self._result( False, action="copy", subject_type="file", error="Source file does not exist", src=self._get_relative_path(src_file_path) ) if not await aiofiles.os.path.isfile(src_file_path): return self._result(False, action="copy", subject_type="file", error="Source path is not a file") # Skip symlinks for security if await aiofiles.os.path.islink(src_file_path): return self._result(False, action="copy", subject_type="file", error="Cannot copy symlinks") await self._ensure_parent_dir(dest_file_path) await asyncio.to_thread(shutil.copy2, src_file_path, dest_file_path) logger.info(f"File '{src_file}' copied successfully to {dest_file_path}") return self._result( True, action="copy", subject_type="file", src=self._get_relative_path(src_file_path), dst=self._get_relative_path(dest_file_path) ) except ValueError as e: return self._result(False, action="copy", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="copy", subject_type="file", error=f"Failed to copy file: {str(e)}") async def copy_folder( self, src_folder: str, dest_folder: str, 
src_base_dir: Optional[str] = None, dest_base_dir: Optional[str] = None, ) -> Dict[str, Any]: """ Copy a folder from source to destination. :param src_folder: The name of the source folder. :param dest_folder: The name of the destination folder. :param src_base_dir: The base directory where the source folder is located. :param dest_base_dir: The base directory where the destination folder should be created. :return: A success message if the folder is copied successfully. """ try: src_base = src_base_dir if src_base_dir else "." dest_base = dest_base_dir if dest_base_dir else "." src_folder_path = self._resolve_under_restriction(os.path.join(src_base, src_folder)) dest_folder_path = self._resolve_under_restriction(os.path.join(dest_base, dest_folder)) if not await aiofiles.os.path.exists(src_folder_path): return self._result( False, action="copy", subject_type="folder", error="Source folder does not exist", src=self._get_relative_path(src_folder_path) ) if not await aiofiles.os.path.isdir(src_folder_path): return self._result(False, action="copy", subject_type="folder", error="Source path is not a directory") # Skip symlinks for security if await aiofiles.os.path.islink(src_folder_path): return self._result(False, action="copy", subject_type="folder", error="Cannot copy symlinked directories") # Use dirs_exist_ok=True to handle existing destinations await asyncio.to_thread(shutil.copytree, src_folder_path, dest_folder_path, dirs_exist_ok=True, symlinks=False) logger.info(f"Folder '{src_folder}' copied successfully to {dest_folder_path}") return self._result( True, action="copy", subject_type="folder", src=self._get_relative_path(src_folder_path), dst=self._get_relative_path(dest_folder_path) ) except ValueError as e: return self._result(False, action="copy", subject_type="folder", error=str(e)) except OSError as e: return self._result(False, action="copy", subject_type="folder", error=f"Failed to copy folder: {str(e)}") async def move_file( self, src_file: str, 
dest_file: str, src_base_dir: Optional[str] = None, dest_base_dir: Optional[str] = None ) -> Dict[str, Any]: """ Move a file from source to destination. :param src_file: The name of the source file. :param dest_file: The name of the destination file. :param src_base_dir: The base directory where the source file is located. :param dest_base_dir: The base directory where the destination file should be created. :return: A success message if the file is moved successfully. """ try: src_base = src_base_dir if src_base_dir else "." dest_base = dest_base_dir if dest_base_dir else "." src_file_path = self._resolve_under_restriction(os.path.join(src_base, src_file)) dest_file_path = self._resolve_under_restriction(os.path.join(dest_base, dest_file)) if not await aiofiles.os.path.exists(src_file_path): return self._result( False, action="move", subject_type="file", error="Source file does not exist", src=self._get_relative_path(src_file_path) ) if not await aiofiles.os.path.isfile(src_file_path): return self._result(False, action="move", subject_type="file", error="Source path is not a file") # Skip symlinks for security if await aiofiles.os.path.islink(src_file_path): return self._result(False, action="move", subject_type="file", error="Cannot move symlinks") await self._ensure_parent_dir(dest_file_path) await asyncio.to_thread(shutil.move, src_file_path, dest_file_path) logger.info(f"File '{src_file}' moved successfully to {dest_file_path}") return self._result( True, action="move", subject_type="file", src=self._get_relative_path(src_file_path), dst=self._get_relative_path(dest_file_path) ) except ValueError as e: return self._result(False, action="move", subject_type="file", error=str(e)) except OSError as e: return self._result(False, action="move", subject_type="file", error=f"Failed to move file: {str(e)}") async def move_folder( self, src_folder: str, dest_folder: str, src_base_dir: Optional[str] = None, dest_base_dir: Optional[str] = None, ) -> Dict[str, Any]: """ 
Move a folder from source to destination. :param src_folder: The name of the source folder. :param dest_folder: The name of the destination folder. :param src_base_dir: The base directory where the source folder is located. :param dest_base_dir: The base directory where the destination folder should be created. :return: A success message if the folder is moved successfully. """ try: src_base = src_base_dir if src_base_dir else "." dest_base = dest_base_dir if dest_base_dir else "." src_folder_path = self._resolve_under_restriction(os.path.join(src_base, src_folder)) dest_folder_path = self._resolve_under_restriction(os.path.join(dest_base, dest_folder)) if not await aiofiles.os.path.exists(src_folder_path): return self._result( False, action="move", subject_type="folder", error="Source folder does not exist", src=self._get_relative_path(src_folder_path) ) if not await aiofiles.os.path.isdir(src_folder_path): return self._result(False, action="move", subject_type="folder", error="Source path is not a directory") # Skip symlinks for security if await aiofiles.os.path.islink(src_folder_path): return self._result(False, action="move", subject_type="folder", error="Cannot move symlinked directories") await self._ensure_parent_dir(dest_folder_path) await asyncio.to_thread(shutil.move, src_folder_path, dest_folder_path) logger.info(f"Folder '{src_folder}' moved successfully to {dest_folder_path}") return self._result( True, action="move", subject_type="folder", src=self._get_relative_path(src_folder_path), dst=self._get_relative_path(dest_folder_path) ) except ValueError as e: return self._result(False, action="move", subject_type="folder", error=str(e)) except OSError as e: return self._result(False, action="move", subject_type="folder", error=f"Failed to move folder: {str(e)}") async def batch_rename_files( self, directory: str, old_pattern: str, new_pattern: str ) -> Dict[str, Any]: """ Batch rename files in a directory. 
:param directory: The directory containing the files to rename. :param old_pattern: The old pattern in the file names to replace. :param new_pattern: The new pattern to replace the old pattern with. :return: A success message if the files are renamed successfully. """ try: directory_path = self._resolve_under_restriction(directory) if not await aiofiles.os.path.exists(directory_path): return self._result( False, action="batch_rename", subject_type="directory", error="Directory does not exist", path=self._get_relative_path(directory_path) ) if not await aiofiles.os.path.isdir(directory_path): return self._result(False, action="batch_rename", subject_type="directory", error="Path is not a directory") renamed = [] failed = [] dir_entries = await aiofiles.os.listdir(directory_path) for filename in dir_entries: if old_pattern in filename: new_filename = filename.replace(old_pattern, new_pattern) old_path = os.path.join(directory_path, filename) new_path = os.path.join(directory_path, new_filename) # Skip symlinks for security if await aiofiles.os.path.islink(old_path): failed.append({"from": filename, "error": "Cannot rename symlinks"}) continue try: await aiofiles.os.rename(old_path, new_path) renamed.append({ "from": self._get_relative_path(old_path), "to": self._get_relative_path(new_path) }) except OSError as e: failed.append({"from": filename, "error": str(e)}) logger.info(f"Batch rename in directory '{directory}' completed: {len(renamed)} renamed, {len(failed)} failed") return self._result( True, action="batch_rename", subject_type="directory", message=f"Batch rename completed: {len(renamed)} renamed, {len(failed)} failed", renamed=renamed, failed=failed, path=self._get_relative_path(directory_path) ) except ValueError as e: return self._result(False, action="batch_rename", subject_type="directory", error=str(e)) except OSError as e: return self._result(False, action="batch_rename", subject_type="directory", error=f"Failed to access directory: {str(e)}") async def 
compress_file(
        self,
        file_name: Union[str, List[str]],
        output_filename: str,
        format: str = "zip",
        base_dir: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Compress a single file, an entire directory, or a list of files into the specified format.

        :param file_name: A path to a file or directory, or a list of file paths to include.
        :param output_filename: The name of the output compressed file.
        :param format: The compression format ('zip', 'tar', 'gztar').
        :param base_dir: The base directory where the input path(s) are located.
        :return: A success message if compression succeeds.
        """
        try:
            base_path = base_dir if base_dir else "."
            output_path = self._resolve_under_restriction(os.path.join(base_path, output_filename))

            # Build list of (absolute_file_path, arcname) to include in archive
            def iter_items_for_source(abs_path: str) -> List[tuple[str, str]]:
                items: List[tuple[str, str]] = []
                # Skip symlinks entirely
                if os.path.islink(abs_path):
                    return items
                if os.path.isfile(abs_path):
                    # Single file: archived flat under its own basename.
                    items.append((abs_path, os.path.basename(abs_path)))
                elif os.path.isdir(abs_path):
                    # Directory: archive members are prefixed with the
                    # directory's basename so extraction recreates the tree.
                    root_name = os.path.basename(abs_path.rstrip(os.sep))
                    for walk_root, dirnames, filenames in os.walk(abs_path, followlinks=False):
                        # Prevent following symlinked directories
                        dirnames[:] = [d for d in dirnames
                                       if not os.path.islink(os.path.join(walk_root, d))]
                        for fname in filenames:
                            file_abs = os.path.join(walk_root, fname)
                            if os.path.islink(file_abs):
                                continue
                            rel_inside = os.path.relpath(file_abs, abs_path)
                            arcname = os.path.join(root_name, rel_inside)
                            items.append((file_abs, arcname))
                return items

            resolved_inputs: List[str] = []
            # Normalize inputs to a list of absolute paths
            if isinstance(file_name, list):
                # In list mode, expect files only
                for name in file_name:
                    candidate = self._resolve_under_restriction(os.path.join(base_path, name))
                    if not os.path.exists(candidate):
                        return self._result(
                            False,
                            action="compress",
                            subject_type="file",
                            error=f"Source does not exist: {name}",
                            path=self._get_relative_path(candidate)
                        )
                    if os.path.islink(candidate):
                        return self._result(False, action="compress", subject_type="file",
                                            error=f"Cannot compress symlink: {name}")
                    if os.path.isdir(candidate):
                        return self._result(False, action="compress", subject_type="file",
                                            error=f"List mode only supports files (got directory: {name})")
                    resolved_inputs.append(candidate)
            else:
                single_path = self._resolve_under_restriction(os.path.join(base_path, file_name))
                if not os.path.exists(single_path):
                    return self._result(
                        False,
                        action="compress",
                        subject_type="file",
                        error="Source path does not exist",
                        path=self._get_relative_path(single_path)
                    )
                if os.path.islink(single_path):
                    return self._result(False, action="compress", subject_type="file",
                                        error="Cannot compress symlinks")
                resolved_inputs.append(single_path)

            # Gather items
            items_to_add: List[tuple[str, str]] = []
            for src in resolved_inputs:
                items_to_add.extend(iter_items_for_source(src))
            if not items_to_add:
                return self._result(False, action="compress", subject_type="file",
                                    error="No files to compress")

            await self._ensure_parent_dir(output_path)
            # Perform compression using CPU-bound threads
            if format == "zip":
                await asyncio.to_thread(self._compress_zip_items, items_to_add, output_path)
            elif format == "tar":
                await asyncio.to_thread(self._compress_tar_items, items_to_add, output_path, False)
            elif format == "gztar":
                await asyncio.to_thread(self._compress_tar_items, items_to_add, output_path, True)
            else:
                return self._result(False, action="compress", subject_type="file",
                                    error=f"Unsupported compression format: {format}")

            # For result metadata
            display_sources = [self._get_relative_path(p) for p in resolved_inputs]
            logger.info(f"Compressed {len(items_to_add)} item(s) to {output_path}")
            return self._result(
                True,
                action="compress",
                subject_type="file",
                message="Compressed successfully",
                output=self._get_relative_path(output_path),
                format=format,
                sources=display_sources,
                file_count=len(items_to_add)
            )
        except ValueError as e:
            return self._result(False,
                                action="compress", subject_type="file", error=str(e))
        except (zipfile.BadZipFile, tarfile.TarError) as e:
            return self._result(False, action="compress", subject_type="file",
                                error=f"Compression error: {str(e)}")
        except OSError as e:
            return self._result(False, action="compress", subject_type="file",
                                error=f"Failed to compress file: {str(e)}")

    def _compress_zip_items(self, items: List[tuple[str, str]], output_path: str) -> None:
        """Helper to create a ZIP archive from a list of (abs_path, arcname)."""
        with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
            for abs_path, arcname in items:
                zipf.write(abs_path, arcname)

    def _compress_tar_items(self, items: List[tuple[str, str]], output_path: str, gzip: bool) -> None:
        """Helper to create a TAR/TAR.GZ archive from a list of (abs_path, arcname)."""
        mode = "w:gz" if gzip else "w"
        with tarfile.open(output_path, mode) as tarf:
            for abs_path, arcname in items:
                tarf.add(abs_path, arcname)

    async def decompress_file(
        self, file_name: str, output_directory: str, base_dir: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Decompress a file into the specified directory.

        :param file_name: The name of the file to decompress.
        :param output_directory: The directory where the decompressed files will be stored.
        :param base_dir: The base directory where the file is located.
        :return: A success message if the file is decompressed successfully.
        """
        try:
            base_path = base_dir if base_dir else "."
file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) output_path = self._resolve_under_restriction(os.path.join(base_path, output_directory)) if not await aiofiles.os.path.exists(file_path): return self._result( False, action="decompress", subject_type="file", error="Archive file does not exist", path=self._get_relative_path(file_path) ) if not await aiofiles.os.path.isfile(file_path): return self._result(False, action="decompress", subject_type="file", error="Archive path is not a file") # Skip symlinks for security if await aiofiles.os.path.islink(file_path): return self._result(False, action="decompress", subject_type="file", error="Cannot decompress symlinks") await self._ensure_parent_dir(output_path) await aiofiles.os.makedirs(output_path, exist_ok=True) # Use asyncio.to_thread for CPU-bound decompression operations extracted_files = await asyncio.to_thread(self._decompress_archive, file_path, output_path, file_name) logger.info(f"File '{file_name}' decompressed successfully to {output_path}") return self._result( True, action="decompress", subject_type="file", message="Decompressed successfully", output=self._get_relative_path(output_path), extracted_files=extracted_files, count=len(extracted_files) ) except ValueError as e: return self._result(False, action="decompress", subject_type="file", error=str(e)) except (zipfile.BadZipFile, tarfile.TarError) as e: return self._result(False, action="decompress", subject_type="file", error=f"Archive error: {str(e)}") except OSError as e: return self._result(False, action="decompress", subject_type="file", error=f"Failed to decompress file: {str(e)}") def _decompress_archive(self, file_path: str, output_path: str, file_name: str) -> List[str]: """Helper method for archive decompression.""" extracted_files = [] if file_name.endswith(".zip"): with zipfile.ZipFile(file_path, "r") as zipf: for member in zipf.namelist(): # Sanitize each member path to prevent directory traversal safe_path = 
self._sanitize_archive_path(member, output_path) # Extract to the sanitized path member_data = zipf.read(member) os.makedirs(os.path.dirname(safe_path), exist_ok=True) with open(safe_path, 'wb') as f: f.write(member_data) extracted_files.append(self._get_relative_path(safe_path)) elif ( file_name.endswith(".tar") or file_name.endswith(".tar.gz") or file_name.endswith(".tgz") ): mode = "r:gz" if file_name.endswith((".tar.gz", ".tgz")) else "r" with tarfile.open(file_path, mode) as tarf: for member in tarf.getmembers(): # Sanitize each member path to prevent directory traversal safe_path = self._sanitize_archive_path(member.name, output_path) if member.isfile(): # Extract file to sanitized path member_data = tarf.extractfile(member) if member_data: os.makedirs(os.path.dirname(safe_path), exist_ok=True) with open(safe_path, 'wb') as f: shutil.copyfileobj(member_data, f) extracted_files.append(self._get_relative_path(safe_path)) elif member.isdir(): # Create directory os.makedirs(safe_path, exist_ok=True) else: raise ValueError("Unsupported archive format") return extracted_files async def save_file_version(self, file_name: str, base_dir: Optional[str] = None) -> Dict[str, Any]: """ Save a version of the file. :param file_name: The name of the file to save a version of. :param base_dir: The base directory where the file is located. :return: A success message if the version is saved successfully. """ try: base_path = base_dir if base_dir else "." 
            file_path = self._resolve_under_restriction(os.path.join(base_path, file_name))
            if not await aiofiles.os.path.exists(file_path):
                return self._result(
                    False,
                    action="version_save",
                    subject_type="file",
                    error="File does not exist",
                    path=self._get_relative_path(file_path)
                )
            if not await aiofiles.os.path.isfile(file_path):
                return self._result(False, action="version_save", subject_type="file",
                                    error="Path is not a file")
            # Skip symlinks for security
            if await aiofiles.os.path.islink(file_path):
                return self._result(False, action="version_save", subject_type="file",
                                    error="Cannot version symlinks")
            # Use resolved path as key to avoid collisions.
            # NOTE(review): self.versions is presumably a defaultdict(list)
            # created in __init__ (not visible here) — indexing a missing key
            # would otherwise raise KeyError; confirm against __init__.
            version_key = file_path
            version_index = len(self.versions[version_key]) + 1
            # Create version filename with timestamp to avoid collisions
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            base_name, ext = os.path.splitext(file_name)
            version_name = f"{base_name}_v{version_index}_{timestamp}{ext}"
            version_path = self._resolve_under_restriction(os.path.join(base_path, version_name))
            # Version copies live next to the original, inside the restriction
            # root; copy2 preserves metadata and runs in a worker thread.
            await asyncio.to_thread(shutil.copy2, file_path, version_path)
            self.versions[version_key].append(version_path)
            logger.info(f"Version saved for file '{file_name}' at {version_path}")
            return self._result(
                True,
                action="version_save",
                subject_type="file",
                message="Version saved",
                version=version_index,
                version_path=self._get_relative_path(version_path),
                original_path=self._get_relative_path(file_path)
            )
        except ValueError as e:
            return self._result(False, action="version_save", subject_type="file", error=str(e))
        except OSError as e:
            return self._result(False, action="version_save", subject_type="file",
                                error=f"Failed to save version: {str(e)}")

    async def restore_file_version(
        self, file_name: str, version: int, base_dir: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Restore a file to a previous version.

        :param file_name: The name of the file to restore.
        :param version: The version number to restore.
        :param base_dir: The base directory where the file is located.
        :return: A success message if the file is restored successfully.
        """
        try:
            base_path = base_dir if base_dir else "."
            file_path = self._resolve_under_restriction(os.path.join(base_path, file_name))
            # Use resolved path as key
            version_key = file_path
            available_versions = len(self.versions[version_key])
            # Versions are 1-based from the caller's perspective.
            if version < 1 or version > available_versions:
                return self._result(
                    False,
                    action="version_restore",
                    subject_type="file",
                    error="Version does not exist",
                    available=available_versions,
                    requested=version
                )
            version_path = self.versions[version_key][version - 1]
            if not await aiofiles.os.path.exists(version_path):
                # The recorded copy may have been deleted from disk since it
                # was saved; report it rather than copying a missing file.
                return self._result(
                    False,
                    action="version_restore",
                    subject_type="file",
                    error="Version file no longer exists",
                    version_path=self._get_relative_path(version_path)
                )
            # Restore even if original doesn't exist (recreate it)
            await self._ensure_parent_dir(file_path)
            await asyncio.to_thread(shutil.copy2, version_path, file_path)
            logger.info(f"File '{file_name}' restored to version {version} at {file_path}")
            return self._result(
                True,
                action="version_restore",
                subject_type="file",
                message="Restored",
                path=self._get_relative_path(file_path),
                version=version,
                version_path=self._get_relative_path(version_path)
            )
        except ValueError as e:
            return self._result(False, action="version_restore", subject_type="file", error=str(e))
        except OSError as e:
            return self._result(False, action="version_restore", subject_type="file",
                                error=f"Failed to restore version: {str(e)}")

    async def search_files(
        self,
        keyword: str,
        base_dir: Optional[str] = None,
        case_sensitive: bool = True,
        include_content: bool = True,
        max_results: int = 100,
        __event_emitter__=None
    ) -> Dict[str, Any]:
        """
        Search for files containing the keyword in their names or content.

        :param keyword: The keyword to search for.
        :param base_dir: The base directory where to search for files.
        :param case_sensitive: Whether the search should be case sensitive.
        :param include_content: Whether to search file contents (only text files).
        :param max_results: Maximum number of results to return.
        :return: A list of file paths that match the search criteria.
        """
        try:
            base_path = base_dir if base_dir else "."
            search_path = self._resolve_under_restriction(base_path)
            await self._emit_status(__event_emitter__, f"Searching for '{keyword}' in {self._get_relative_path(search_path)}")
            if not await aiofiles.os.path.exists(search_path):
                return self._result(
                    False,
                    action="search",
                    subject_type="directory",
                    error="Path must be an existing directory",
                    path=self._get_relative_path(search_path)
                )
            if not await aiofiles.os.path.isdir(search_path):
                return self._result(False, action="search", subject_type="directory",
                                    error="Path is not a directory")
            # For case-insensitive search both needle and haystack are lowered.
            search_keyword = keyword if case_sensitive else keyword.lower()
            matches = []
            # Use asyncio.to_thread for the file walking operation
            matches = await asyncio.to_thread(
                self._search_files_sync, search_path, search_keyword,
                case_sensitive, include_content, max_results
            )
            logger.info(f"Search for keyword '{keyword}' completed with {len(matches)} matches")
            result = self._result(
                True,
                action="search",
                subject_type="directory",
                count=len(matches),
                matches=matches,
                keyword=keyword,
                case_sensitive=case_sensitive,
                # truncated signals that the cap was hit; more matches may exist.
                truncated=len(matches) >= max_results
            )
            await self._emit_status(__event_emitter__, f"Search complete: {len(matches)} match(es)", done=True)
            return result
        except ValueError as e:
            return self._result(False, action="search", subject_type="directory", error=str(e))
        except OSError as e:
            return self._result(False, action="search", subject_type="directory",
                                error=f"Failed to search: {str(e)}")

    def _search_files_sync(self, search_path: str, search_keyword: str, case_sensitive: bool,
                           include_content: bool, max_results: int) -> List[Dict[str, Any]]:
        """
        Synchronous helper method for file searching.

        Walks ``search_path`` (never following symlinks), matching the keyword
        against file names first and, optionally, against the content of
        non-binary files up to ``valves.max_search_file_size`` bytes.
        """
        matches = []
        for root, dirs, files in os.walk(search_path, followlinks=False):
            # Remove symlinked directories from dirs to prevent following them
            dirs[:] = [d for d in dirs if not os.path.islink(os.path.join(root, d))]
            for fname in files:
                if len(matches) >= max_results:
                    break
                fpath = os.path.join(root, fname)
                # Skip symlinks for security
                if os.path.islink(fpath):
                    continue
                try:
                    # Check filename match
                    check_name = fname if case_sensitive else fname.lower()
                    name_match = search_keyword in check_name
                    content_match = False
                    # Check content match if requested and file is not too large
                    if include_content and not name_match:
                        try:
                            file_size = os.path.getsize(fpath)
                            if file_size <= self.valves.max_search_file_size:
                                # Check if file is binary first: a NUL byte in
                                # the first 1 KiB is treated as binary.
                                with open(fpath, 'rb') as f:
                                    chunk = f.read(1024)
                                is_binary = b'\0' in chunk
                                if not is_binary:
                                    with open(fpath, "r", encoding="utf-8", errors="ignore") as f:
                                        content = f.read()
                                    check_content = content if case_sensitive else content.lower()
                                    content_match = search_keyword in check_content
                        except Exception:
                            # Skip files that can't be read
                            continue
                    if name_match or content_match:
                        matches.append({
                            "path": self._get_relative_path(fpath),
                            "name": fname,
                            "match_type": "name" if name_match else "content",
                            "size": os.path.getsize(fpath)
                        })
                except Exception:
                    # Skip files that cause errors
                    continue
            if len(matches) >= max_results:
                break
        return matches

    async def search_file_names(
        self,
        pattern: str,
        base_dir: Optional[str] = None,
        case_sensitive: bool = True,
        include_extensions: bool = True,
        max_results: int = 100,
        __event_emitter__=None
    ) -> Dict[str, Any]:
        """
        Search for files by name and/or extension only (no content search).

        :param pattern: The pattern to search for in file names/extensions.
        :param base_dir: The base directory where to search for files.
        :param case_sensitive: Whether the search should be case sensitive.
        :param include_extensions: Whether to search in file extensions as well as names.
        :param max_results: Maximum number of results to return.
        :return: A list of file paths that match the search criteria.
""" try: base_path = base_dir if base_dir else "." search_path = self._resolve_under_restriction(base_path) await self._emit_status(__event_emitter__, f"Searching names for '{pattern}' in {self._get_relative_path(search_path)}") if not await aiofiles.os.path.exists(search_path): return self._result( False, action="search_file_names", subject_type="directory", error="Path must be an existing directory", path=self._get_relative_path(search_path) ) if not await aiofiles.os.path.isdir(search_path): return self._result(False, action="search_file_names", subject_type="directory", error="Path is not a directory") search_pattern = pattern if case_sensitive else pattern.lower() matches = [] # Use asyncio.to_thread for the file walking operation matches = await asyncio.to_thread( self._search_file_names_sync, search_path, search_pattern, case_sensitive, include_extensions, max_results ) logger.info(f"File name search for pattern '{pattern}' completed with {len(matches)} matches") result = self._result( True, action="search_file_names", subject_type="directory", count=len(matches), matches=matches, pattern=pattern, case_sensitive=case_sensitive, include_extensions=include_extensions, truncated=len(matches) >= max_results ) await self._emit_status(__event_emitter__, f"Name search complete: {len(matches)} match(es)", done=True) return result except ValueError as e: return self._result(False, action="search_file_names", subject_type="directory", error=str(e)) except OSError as e: return self._result(False, action="search_file_names", subject_type="directory", error=f"Failed to search: {str(e)}") def _search_file_names_sync(self, search_path: str, search_pattern: str, case_sensitive: bool, include_extensions: bool, max_results: int) -> List[Dict[str, Any]]: """Synchronous helper method for file name searching.""" matches = [] for root, dirs, files in os.walk(search_path, followlinks=False): # Remove symlinked directories from dirs to prevent following them dirs[:] = [d for d in 
dirs if not os.path.islink(os.path.join(root, d))] for fname in files: if len(matches) >= max_results: break fpath = os.path.join(root, fname) # Skip symlinks for security if os.path.islink(fpath): continue try: # Check filename match check_name = fname if case_sensitive else fname.lower() name_match = search_pattern in check_name # Check extension match if requested extension_match = False if include_extensions and not name_match: # Get file extension (including the dot) _, ext = os.path.splitext(fname) if ext: # Only check if there's an extension check_ext = ext if case_sensitive else ext.lower() extension_match = search_pattern in check_ext if name_match or extension_match: # Get file size and other info file_size = os.path.getsize(fpath) _, ext = os.path.splitext(fname) matches.append({ "path": self._get_relative_path(fpath), "name": fname, "extension": ext if ext else None, "match_type": "name" if name_match else "extension", "size": file_size }) except Exception: # Skip files that cause errors continue if len(matches) >= max_results: break return matches async def synchronize_files(self, source_path: str, destination_path: str) -> Dict[str, Any]: """ Synchronize files between two directories. :param source_path: The source directory to synchronize from. :param destination_path: The destination directory to synchronize to. :return: A success message if the synchronization is completed successfully. 
""" try: source_path = self._resolve_under_restriction(source_path) destination_path = self._resolve_under_restriction(destination_path) if not await aiofiles.os.path.exists(source_path): return self._result( False, action="sync", subject_type="directory", error="Source directory does not exist", src=self._get_relative_path(source_path) ) if not await aiofiles.os.path.exists(destination_path): return self._result( False, action="sync", subject_type="directory", error="Destination directory does not exist", dst=self._get_relative_path(destination_path) ) if not await aiofiles.os.path.isdir(source_path) or not await aiofiles.os.path.isdir(destination_path): return self._result(False, action="sync", subject_type="directory", error="Both paths must be directories") # Use asyncio.to_thread for the synchronization operation copied, failed = await asyncio.to_thread(self._sync_files, source_path, destination_path) logger.info(f"Synchronization from '{source_path}' to '{destination_path}' completed: {copied} copied, {len(failed)} failed") return self._result( True, action="sync", subject_type="directory", message="Synchronized", copied=copied, failed=failed, src=self._get_relative_path(source_path), dst=self._get_relative_path(destination_path) ) except ValueError as e: return self._result(False, action="sync", subject_type="directory", error=str(e)) except OSError as e: return self._result(False, action="sync", subject_type="directory", error=f"Failed to synchronize: {str(e)}") def _sync_files(self, source_path: str, destination_path: str) -> tuple[int, List[Dict[str, str]]]: """Synchronous helper method for file synchronization.""" copied = 0 failed = [] for root, dirs, files in os.walk(source_path, followlinks=False): # Remove symlinked directories from dirs to prevent following them dirs[:] = [d for d in dirs if not os.path.islink(os.path.join(root, d))] for fname in files: sfile = os.path.join(root, fname) # Skip symlinks for security if os.path.islink(sfile): continue 
try: rel = os.path.relpath(sfile, source_path) dfile = os.path.join(destination_path, rel) # Ensure parent directory exists parent_dir = os.path.dirname(dfile) if parent_dir: os.makedirs(parent_dir, exist_ok=True) # Copy if destination doesn't exist or source is newer/different size should_copy = False if not os.path.exists(dfile): should_copy = True else: src_stat = os.stat(sfile) dst_stat = os.stat(dfile) # Copy if source is newer or different size if src_stat.st_mtime > dst_stat.st_mtime or src_stat.st_size != dst_stat.st_size: should_copy = True if should_copy: shutil.copy2(sfile, dfile) copied += 1 except OSError as e: failed.append({"file": self._get_relative_path(sfile), "error": str(e)}) return copied, failed async def backup_files(self, source_path: str, backup_path: str) -> Dict[str, Any]: """ Backup files from the source directory to the backup directory. :param source_path: The source directory to backup from. :param backup_path: The backup directory to backup to. :return: A success message if the backup is completed successfully. 
""" try: source_path = self._resolve_under_restriction(source_path) backup_path = self._resolve_under_restriction(backup_path) if not await aiofiles.os.path.exists(source_path): return self._result( False, action="backup", subject_type="directory", error="Source directory does not exist", src=self._get_relative_path(source_path) ) if not await aiofiles.os.path.isdir(source_path): return self._result(False, action="backup", subject_type="directory", error="Source path is not a directory") # Create backup directory if it doesn't exist await aiofiles.os.makedirs(backup_path, exist_ok=True) # Use asyncio.to_thread for the backup operation count, failed = await asyncio.to_thread(self._backup_files, source_path, backup_path) logger.info(f"Backup from '{source_path}' to '{backup_path}' completed: {count} files, {len(failed)} failed") return self._result( True, action="backup", subject_type="directory", message="Backup completed", files=count, failed=failed, src=self._get_relative_path(source_path), backup=self._get_relative_path(backup_path) ) except ValueError as e: return self._result(False, action="backup", subject_type="directory", error=str(e)) except OSError as e: return self._result(False, action="backup", subject_type="directory", error=f"Failed to backup: {str(e)}") def _backup_files(self, source_path: str, backup_path: str) -> tuple[int, List[Dict[str, str]]]: """Synchronous helper method for file backup.""" count = 0 failed = [] for root, dirs, files in os.walk(source_path, followlinks=False): # Remove symlinked directories from dirs to prevent following them dirs[:] = [d for d in dirs if not os.path.islink(os.path.join(root, d))] for fname in files: sfile = os.path.join(root, fname) # Skip symlinks for security if os.path.islink(sfile): continue try: rel = os.path.relpath(sfile, source_path) bfile = os.path.join(backup_path, rel) parent_dir = os.path.dirname(bfile) if parent_dir: os.makedirs(parent_dir, exist_ok=True) shutil.copy2(sfile, bfile) count += 1 
except OSError as e: failed.append({"file": self._get_relative_path(sfile), "error": str(e)}) return count, failed async def recover_files(self, backup_path: str, destination_path: str) -> Dict[str, Any]: """ Recover files from the backup directory to the destination directory. :param backup_path: The backup directory to recover from. :param destination_path: The destination directory to recover to. :return: A success message if the recovery is completed successfully. """ try: backup_path = self._resolve_under_restriction(backup_path) destination_path = self._resolve_under_restriction(destination_path) if not await aiofiles.os.path.exists(backup_path): return self._result( False, action="recover", subject_type="directory", error="Backup directory does not exist", backup=self._get_relative_path(backup_path) ) if not await aiofiles.os.path.isdir(backup_path): return self._result(False, action="recover", subject_type="directory", error="Backup path is not a directory") # Create destination directory if it doesn't exist await aiofiles.os.makedirs(destination_path, exist_ok=True) # Use asyncio.to_thread for the recovery operation count, failed = await asyncio.to_thread(self._recover_files, backup_path, destination_path) logger.info(f"Recovery from '{backup_path}' to '{destination_path}' completed: {count} files, {len(failed)} failed") return self._result( True, action="recover", subject_type="directory", message="Recovery completed", files=count, failed=failed, backup=self._get_relative_path(backup_path), dst=self._get_relative_path(destination_path) ) except ValueError as e: return self._result(False, action="recover", subject_type="directory", error=str(e)) except OSError as e: return self._result(False, action="recover", subject_type="directory", error=f"Failed to recover: {str(e)}") def _recover_files(self, backup_path: str, destination_path: str) -> tuple[int, List[Dict[str, str]]]: """Synchronous helper method for file recovery.""" count = 0 failed = [] for root, 
dirs, files in os.walk(backup_path, followlinks=False): # Remove symlinked directories from dirs to prevent following them dirs[:] = [d for d in dirs if not os.path.islink(os.path.join(root, d))] for fname in files: bfile = os.path.join(root, fname) # Skip symlinks for security if os.path.islink(bfile): continue try: rel = os.path.relpath(bfile, backup_path) dfile = os.path.join(destination_path, rel) parent_dir = os.path.dirname(dfile) if parent_dir: os.makedirs(parent_dir, exist_ok=True) shutil.copy2(bfile, dfile) count += 1 except OSError as e: failed.append({"file": self._get_relative_path(bfile), "error": str(e)}) return count, failed def _get_supported_file_types(self) -> List[str]: """Get list of file types supported by OpenRouter vision models.""" return [ "image/jpeg", "image/jpg", "image/png", "image/gif", "image/webp", "application/pdf", "text/plain", "text/markdown", "text/csv", "application/json", "text/html", "text/xml", "audio/wav", "audio/mp3" ] def _is_file_supported_by_openrouter(self, mime_type: str) -> bool: """Check if file type is supported by OpenRouter models.""" return mime_type in self._get_supported_file_types() async def _upload_file_to_openrouter( self, file_name: str, prompt: str = "Please analyze this file", base_dir: Optional[str] = None, model: Optional[str] = None, max_tokens: int = 1000, __event_emitter__=None ) -> Dict[str, Any]: """ Upload a file to OpenRouter for analysis by a vision-capable model. :param file_name: The name of the file to upload and analyze. :param prompt: The prompt to send along with the file. :param base_dir: The base directory where the file is located. :param model: The OpenRouter model to use (overrides default). :param max_tokens: Maximum tokens for the response. :return: The model's analysis of the file. """ try: # Check if API key is configured if not self.valves.openrouter_api_key: return self._result( False, action="openrouter_upload", subject_type="file", error="OpenRouter API key not configured. 
Please set openrouter_api_key in valves." ) base_path = base_dir if base_dir else "." file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) await self._emit_status(__event_emitter__, f"Uploading to OpenRouter: {self._get_relative_path(file_path)}") if not await aiofiles.os.path.exists(file_path): return self._result( False, action="openrouter_upload", subject_type="file", error="File not found", path=self._get_relative_path(file_path) ) if not await aiofiles.os.path.isfile(file_path): return self._result( False, action="openrouter_upload", subject_type="file", error="Path is not a file" ) # Get file info and check if supported mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: mime_type = "application/octet-stream" if not self._is_file_supported_by_openrouter(mime_type): return self._result( False, action="openrouter_upload", subject_type="file", error=f"File type '{mime_type}' not supported by OpenRouter models", supported_types=self._get_supported_file_types() ) # Read file content async with aiofiles.open(file_path, "rb") as f: file_content = await f.read() # Prepare the request model_name = model or self.valves.openrouter_model base64_content = base64.b64encode(file_content).decode('utf-8') # Construct message based on file type if mime_type.startswith('image/'): # For images, use vision format messages = [ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:{mime_type};base64,{base64_content}" } } ] } ] elif mime_type == "application/pdf": # For PDFs, use the file format with plugin support data_url = f"data:application/pdf;base64,{base64_content}" messages = [ { "role": "user", "content": [ { "type": "text", "text": prompt }, { "type": "file", "file": { "filename": file_name, "file_data": data_url } } ] } ] elif mime_type.startswith('audio/'): # For audio files, use input_audio format # Extract format from mime type (e.g., "audio/wav" -> "wav") 
audio_format = mime_type.split('/')[-1] # Handle common audio format mappings if audio_format == "mpeg": audio_format = "mp3" elif audio_format == "x-m4a": audio_format = "m4a" messages = [ { "role": "user", "content": [ { "type": "text", "text": prompt }, { "type": "input_audio", "input_audio": { "data": base64_content, "format": audio_format } } ] } ] else: # For other documents, include content as text context try: content_text = file_content.decode('utf-8') except UnicodeDecodeError: content_text = f"[Binary File: {file_name}]\nBase64 Content: {base64_content[:100]}..." messages = [ { "role": "user", "content": f"{prompt}\n\nFile: {file_name}\nContent:\n{content_text}" } ] # Make API request headers = { "Authorization": f"Bearer {self.valves.openrouter_api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://github.com/jojomaw/filesystem", # Required by OpenRouter "X-Title": "Filesystem Tool" # Optional but recommended } payload = { "model": model_name, "messages": messages, "max_tokens": max_tokens, "temperature": 0.7 } # Add plugins configuration for PDF files if mime_type == "application/pdf": payload["plugins"] = [ { "id": "file-parser", "pdf": { "engine": "pdf-text" # defaults to "mistral-ocr" } } ] async with aiohttp.ClientSession() as session: async with session.post( f"{self.valves.openrouter_base_url}/chat/completions", headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=60) ) as response: if response.status == 200: result = await response.json() # Extract the response content if "choices" in result and len(result["choices"]) > 0: analysis = result["choices"][0]["message"]["content"] logger.info(f"File '{file_name}' successfully analyzed by OpenRouter model '{model_name}'") final = self._result( True, action="openrouter_upload", subject_type="file", path=self._get_relative_path(file_path), model=model_name, mime_type=mime_type, file_size=len(file_content), prompt=prompt, analysis=analysis, usage=result.get("usage", {}), 
response_id=result.get("id", "") ) await self._emit_status(__event_emitter__, "OpenRouter analysis complete", done=True) return final else: return self._result( False, action="openrouter_upload", subject_type="file", error="No response content received from OpenRouter", response=result ) else: error_msg = f"OpenRouter API error: {response.status}" try: error_detail = await response.json() error_msg += f" - {error_detail.get('error', {}).get('message', 'Unknown error')}" except: error_text = await response.text() error_msg += f" - {error_text}" failed = self._result( False, action="openrouter_upload", subject_type="file", error=error_msg, status_code=response.status ) await self._emit_status(__event_emitter__, f"OpenRouter request failed: {response.status}", done=True) return failed except aiohttp.ClientError as e: return self._result( False, action="openrouter_upload", subject_type="file", error=f"Network error: {str(e)}" ) except ValueError as e: return self._result( False, action="openrouter_upload", subject_type="file", error=str(e) ) except Exception as e: return self._result( False, action="openrouter_upload", subject_type="file", error=f"Unexpected error: {str(e)}" ) async def batch_upload_files_to_openrouter( self, file_names: List[str], prompt: str = "Please analyze these files", base_dir: Optional[str] = None, model: Optional[str] = None, max_tokens: int = 1000, __event_emitter__=None ) -> Dict[str, Any]: """ Upload multiple files to OpenRouter for batch analysis. :param file_names: List of file names to upload and analyze. :param prompt: The prompt to send along with the files. :param base_dir: The base directory where the files are located. :param model: The OpenRouter model to use (overrides default). :param max_tokens: Maximum tokens for the response. :return: Results for each file analysis. 
""" try: if not file_names: return self._result( False, action="openrouter_batch_upload", subject_type="files", error="No files specified for batch upload" ) results = [] successful = 0 failed = 0 total = len(file_names) for index, file_name in enumerate(file_names, start=1): await self._emit_status(__event_emitter__, f"[{index}/{total}] Uploading: {file_name}") result = await self._upload_file_to_openrouter( file_name=file_name, prompt=prompt, base_dir=base_dir, model=model, max_tokens=max_tokens, __event_emitter__=__event_emitter__ ) results.append({ "file": file_name, "success": result["ok"], "result": result }) if result["ok"]: successful += 1 else: failed += 1 logger.info(f"Batch upload completed: {successful} successful, {failed} failed") final = self._result( True, action="openrouter_batch_upload", subject_type="files", total_files=len(file_names), successful=successful, failed=failed, results=results ) await self._emit_status(__event_emitter__, f"Batch upload complete: {successful} successful, {failed} failed", done=True) return final except Exception as e: return self._result( False, action="openrouter_batch_upload", subject_type="files", error=f"Batch upload error: {str(e)}" ) async def transcribe_file( self, file_name: str, base_dir: Optional[str] = None, transcription_mode: str = "auto", language: Optional[str] = None, output_format: str = "text", __event_emitter__=None ) -> Dict[str, Any]: """ Transcribe text content from any supported file type using OpenRouter models. Supports OCR for images, text extraction from PDFs, and direct text reading. :param file_name: The name of the file to transcribe. :param base_dir: The base directory where the file is located. :param transcription_mode: Mode of transcription ('auto', 'ocr', 'extract', 'read'). :param language: Expected language of the text (optional hint). :param output_format: Output format ('text', 'markdown', 'structured'). :return: Transcribed text content and metadata. 
""" try: # Check if API key is configured if not self.valves.openrouter_api_key: return self._result( False, action="transcribe", subject_type="file", error="OpenRouter API key not configured. Please set openrouter_api_key in valves." ) base_path = base_dir if base_dir else "." file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) if not await aiofiles.os.path.exists(file_path): return self._result( False, action="transcribe", subject_type="file", error="File not found", path=self._get_relative_path(file_path) ) if not await aiofiles.os.path.isfile(file_path): return self._result( False, action="transcribe", subject_type="file", error="Path is not a file" ) # Get file info mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: mime_type = "application/octet-stream" # Determine transcription strategy based on file type and mode if transcription_mode == "auto": if mime_type.startswith('image/'): transcription_mode = "ocr" elif mime_type == "application/pdf": transcription_mode = "extract" elif mime_type.startswith('text/'): transcription_mode = "read" else: transcription_mode = "extract" # Build appropriate prompt based on mode and format if transcription_mode == "ocr": if output_format == "markdown": prompt = "Please perform OCR on this image and return the text in clean Markdown format. Preserve any formatting, headers, lists, and structure you can identify." elif output_format == "structured": prompt = "Please perform OCR on this image and return the text in a structured format with clear sections, headers, and organization. Include any tables, lists, or special formatting." else: prompt = "Please perform OCR on this image and extract all visible text. Return only the text content, preserving line breaks and basic formatting." elif transcription_mode == "extract": if output_format == "markdown": prompt = "Please extract and transcribe all text content from this document and format it as clean Markdown. 
Preserve headings, lists, tables, and document structure." elif output_format == "structured": prompt = "Please extract all text from this document and organize it in a clear, structured format with proper sections, headings, and formatting." else: prompt = "Please extract and transcribe all text content from this document. Return the complete text preserving paragraphs and basic formatting." elif transcription_mode == "read": # For text files, we can read directly but still use AI for formatting if output_format == "markdown": prompt = "Please convert this text content to clean Markdown format, adding appropriate headers and formatting where suitable." elif output_format == "structured": prompt = "Please organize this text content into a well-structured format with clear sections and formatting." else: prompt = "Please clean up and format this text content, preserving the original meaning and structure." else: return self._result( False, action="transcribe", subject_type="file", error=f"Unsupported transcription mode: {transcription_mode}" ) # Add language hint if provided if language: prompt += f" The text is expected to be in {language}." 
# Use the existing OpenRouter upload method await self._emit_status(__event_emitter__, f"Transcribing via OpenRouter: {file_name}") result = await self._upload_file_to_openrouter( file_name=file_name, prompt=prompt, base_dir=base_dir, max_tokens=10000, # Higher token limit for transcription __event_emitter__=__event_emitter__ ) if result["ok"]: transcribed_text = result.get("analysis", "") # Post-process the transcribed text if output_format == "text": # Clean up any markdown formatting if we just want plain text import re transcribed_text = re.sub(r'[#*_`]', '', transcribed_text) transcribed_text = re.sub(r'\n\s*\n', '\n\n', transcribed_text) logger.info(f"File '{file_name}' transcribed successfully using mode '{transcription_mode}'") final = self._result( True, action="transcribe", subject_type="file", path=self._get_relative_path(file_path), transcribed_text=transcribed_text, transcription_mode=transcription_mode, output_format=output_format, mime_type=mime_type, model=result.get("model", ""), file_size=result.get("file_size", 0), language=language, word_count=len(transcribed_text.split()) if transcribed_text else 0, character_count=len(transcribed_text) if transcribed_text else 0, usage=result.get("usage", {}) ) await self._emit_status(__event_emitter__, "Transcription complete", done=True) return final else: return self._result( False, action="transcribe", subject_type="file", error=f"Transcription failed: {result.get('error', 'Unknown error')}", path=self._get_relative_path(file_path) ) except ValueError as e: return self._result( False, action="transcribe", subject_type="file", error=str(e) ) except Exception as e: return self._result( False, action="transcribe", subject_type="file", error=f"Unexpected error during transcription: {str(e)}" ) async def batch_transcribe_files( self, file_names: List[str], base_dir: Optional[str] = None, transcription_mode: str = "auto", language: Optional[str] = None, output_format: str = "text", __event_emitter__=None ) -> 
Dict[str, Any]: """ Transcribe multiple files in batch using OpenRouter models. :param file_names: List of file names to transcribe. :param base_dir: The base directory where the files are located. :param transcription_mode: Mode of transcription ('auto', 'ocr', 'extract', 'read'). :param language: Expected language of the text (optional hint). :param output_format: Output format ('text', 'markdown', 'structured'). :return: Results for each file transcription. """ try: if not file_names: return self._result( False, action="batch_transcribe", subject_type="files", error="No files specified for batch transcription" ) results = [] successful = 0 failed = 0 total_words = 0 total_characters = 0 total = len(file_names) for index, file_name in enumerate(file_names, start=1): await self._emit_status(__event_emitter__, f"[{index}/{total}] Transcribing: {file_name}") result = await self.transcribe_file( file_name=file_name, base_dir=base_dir, transcription_mode=transcription_mode, language=language, output_format=output_format, __event_emitter__=__event_emitter__ ) results.append({ "file": file_name, "success": result["ok"], "result": result }) if result["ok"]: successful += 1 total_words += result.get("word_count", 0) total_characters += result.get("character_count", 0) else: failed += 1 logger.info(f"Batch transcription completed: {successful} successful, {failed} failed") final = self._result( True, action="batch_transcribe", subject_type="files", total_files=len(file_names), successful=successful, failed=failed, total_words=total_words, total_characters=total_characters, transcription_mode=transcription_mode, output_format=output_format, language=language, results=results ) await self._emit_status(__event_emitter__, f"Batch transcription complete: {successful} successful, {failed} failed", done=True) return final except Exception as e: return self._result( False, action="batch_transcribe", subject_type="files", error=f"Batch transcription error: {str(e)}" ) async def 
describe_image( self, file_name: str, base_dir: Optional[str] = None, description_type: str = "detailed", model: Optional[str] = None, max_tokens: int = 1000, __event_emitter__=None ) -> Dict[str, Any]: """ Describe an image using OpenRouter vision models. :param file_name: The name of the image file to describe. :param base_dir: The base directory where the image is located. :param description_type: Type of description ('detailed', 'brief', 'technical', 'creative'). :param model: The OpenRouter model to use (overrides default). :param max_tokens: Maximum tokens for the response. :return: Image description and metadata. """ try: # Check if API key is configured if not self.valves.openrouter_api_key: return self._result( False, action="describe_image", subject_type="file", error="OpenRouter API key not configured. Please set openrouter_api_key in valves." ) base_path = base_dir if base_dir else "." file_path = self._resolve_under_restriction(os.path.join(base_path, file_name)) if not await aiofiles.os.path.exists(file_path): return self._result( False, action="describe_image", subject_type="file", error="Image file not found", path=self._get_relative_path(file_path) ) if not await aiofiles.os.path.isfile(file_path): return self._result( False, action="describe_image", subject_type="file", error="Path is not a file" ) # Check if file is an image mime_type, _ = mimetypes.guess_type(file_path) if not mime_type or not mime_type.startswith('image/'): return self._result( False, action="describe_image", subject_type="file", error=f"File is not an image. Detected type: {mime_type or 'unknown'}", path=self._get_relative_path(file_path) ) # Build appropriate prompt based on description type if description_type == "detailed": prompt = "Please provide a detailed description of this image. Include information about the main subjects, setting, colors, composition, mood, and any notable details or elements you observe." 
elif description_type == "brief": prompt = "Please provide a brief, concise description of what you see in this image." elif description_type == "technical": prompt = "Please provide a technical analysis of this image, including composition, lighting, color palette, photographic techniques, and visual elements." elif description_type == "creative": prompt = "Please provide a creative, artistic description of this image. Focus on the mood, atmosphere, story, and emotional impact of the visual elements." else: prompt = f"Please describe this image with a focus on: {description_type}" # Use the existing OpenRouter upload method await self._emit_status(__event_emitter__, f"Describing image via OpenRouter: {file_name}") result = await self._upload_file_to_openrouter( file_name=file_name, prompt=prompt, base_dir=base_dir, model=model, max_tokens=max_tokens, __event_emitter__=__event_emitter__ ) if result["ok"]: description = result.get("analysis", "") logger.info(f"Image '{file_name}' described successfully using '{description_type}' style") final = self._result( True, action="describe_image", subject_type="file", path=self._get_relative_path(file_path), description=description, description_type=description_type, mime_type=mime_type, model=result.get("model", ""), file_size=result.get("file_size", 0), word_count=len(description.split()) if description else 0, character_count=len(description) if description else 0, usage=result.get("usage", {}), response_id=result.get("response_id", "") ) await self._emit_status(__event_emitter__, "Image description complete", done=True) return final else: return self._result( False, action="describe_image", subject_type="file", error=f"Image description failed: {result.get('error', 'Unknown error')}", path=self._get_relative_path(file_path) ) except ValueError as e: return self._result( False, action="describe_image", subject_type="file", error=str(e) ) except Exception as e: return self._result( False, action="describe_image", 
subject_type="file", error=f"Unexpected error during image description: {str(e)}" ) async def batch_describe_images( self, file_names: List[str], base_dir: Optional[str] = None, description_type: str = "detailed", model: Optional[str] = None, max_tokens: int = 1000, __event_emitter__=None ) -> Dict[str, Any]: """ Describe multiple images in batch using OpenRouter vision models. :param file_names: List of image file names to describe. :param base_dir: The base directory where the images are located. :param description_type: Type of description ('detailed', 'brief', 'technical', 'creative'). :param model: The OpenRouter model to use (overrides default). :param max_tokens: Maximum tokens for each response. :return: Results for each image description. """ try: if not file_names: return self._result( False, action="batch_describe_images", subject_type="files", error="No image files specified for batch description" ) results = [] successful = 0 failed = 0 total_words = 0 total_characters = 0 total = len(file_names) for index, file_name in enumerate(file_names, start=1): await self._emit_status(__event_emitter__, f"[{index}/{total}] Describing image: {file_name}") result = await self.describe_image( file_name=file_name, base_dir=base_dir, description_type=description_type, model=model, max_tokens=max_tokens, __event_emitter__=__event_emitter__ ) results.append({ "file": file_name, "success": result["ok"], "result": result }) if result["ok"]: successful += 1 total_words += result.get("word_count", 0) total_characters += result.get("character_count", 0) else: failed += 1 logger.info(f"Batch image description completed: {successful} successful, {failed} failed") final = self._result( True, action="batch_describe_images", subject_type="files", total_files=len(file_names), successful=successful, failed=failed, total_words=total_words, total_characters=total_characters, description_type=description_type, model=model or self.valves.openrouter_model, results=results ) await 
self._emit_status(__event_emitter__, f"Batch image description complete: {successful} successful, {failed} failed", done=True) return final except Exception as e: return self._result( False, action="batch_describe_images", subject_type="files", error=f"Batch image description error: {str(e)}" )