diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b0f43ed42..bd91d3895 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,12 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true +env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} + STORE_API_KEY: ${{ secrets.STORE_API_KEY }} + TAVILY_API_KEY: ${{ secrets.TAVILY_API_KEY }} + jobs: check-nightly-status: name: Check PyPI Version Update diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index 9fb41a4ba..660ce99bc 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -33,6 +33,8 @@ env: POETRY_VERSION: "1.8.2" NODE_VERSION: "21" PYTEST_RUN_PATH: "src/backend/tests" + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} jobs: build: @@ -69,6 +71,8 @@ jobs: max_attempts: 2 command: make unit_tests args="-x -vv --splits ${{ matrix.splitCount }} --group ${{ matrix.group }} --reruns 5" + + integration-tests: name: Integration Tests - Python ${{ matrix.python-version }} runs-on: ubuntu-latest diff --git a/src/backend/base/langflow/api/v2/files.py b/src/backend/base/langflow/api/v2/files.py index 391e0e2ce..879ff015d 100644 --- a/src/backend/base/langflow/api/v2/files.py +++ b/src/backend/base/langflow/api/v2/files.py @@ -11,7 +11,8 @@ from zoneinfo import ZoneInfo from fastapi import APIRouter, Depends, File, HTTPException, UploadFile from fastapi.responses import StreamingResponse -from sqlmodel import String, cast, col, select +from loguru import logger +from sqlmodel import col, select from langflow.api.schemas import UploadFileResponse from langflow.api.utils import CurrentActiveUser, DbSession @@ -112,35 +113,36 @@ async def upload_user_file( # Create a new database record for the uploaded file. 
try: - # Enforce unique constraint on name - # Name it as filename (1), (2), etc. - # Check if the file name already exists + # Enforce unique constraint on name, except for the special _mcp_servers file new_filename = file.filename try: root_filename, _ = new_filename.rsplit(".", 1) except ValueError: root_filename, _ = new_filename, "" - # Check if there are files with the same name - stmt = select(UserFile).where(cast(UserFile.name, String).like(f"{root_filename}%")) - existing_files = await session.exec(stmt) - files = existing_files.all() # Fetch all matching records + # Special handling for the MCP servers config file: always keep the same root filename + if root_filename == MCP_SERVERS_FILE: + # Check if an existing record exists; if so, delete it to replace with the new one + existing_mcp_file = await get_file_by_name(root_filename, current_user, session) + if existing_mcp_file: + await delete_file(existing_mcp_file.id, current_user, session, storage_service) + else: + # For normal files, ensure unique name by appending a count if necessary + stmt = select(UserFile).where(col(UserFile.name).like(f"{root_filename}%")) + existing_files = await session.exec(stmt) + files = existing_files.all() # Fetch all matching records - # If there are files with the same name, append a count to the filename - if files: - counts = [] + if files: + counts = [] - # Extract the count from the filename - for my_file in files: - match = re.search(r"\((\d+)\)(?=\.\w+$|$)", my_file.name) # Match (number) before extension or at end - if match: - counts.append(int(match.group(1))) + # Extract the count from the filename + for my_file in files: + match = re.search(r"\((\d+)\)(?=\.\w+$|$)", my_file.name) + if match: + counts.append(int(match.group(1))) - # Get the max count and increment by 1 - count = max(counts) if counts else 0 # Default to 0 if no matches found - - # Split the extension from the filename - root_filename = f"{root_filename} ({count + 1})" + count = max(counts) if 
counts else 0 + root_filename = f"{root_filename} ({count + 1})" # Compute the file size based on the path file_size = await storage_service.get_file_size( @@ -462,24 +464,26 @@ async def delete_file( ): """Delete a file by its ID.""" try: - # Fetch the file from the DB - file = await fetch_file_object(file_id, current_user, session) - if not file: + # Fetch the file object + file_to_delete = await fetch_file_object(file_id, current_user, session) + if not file_to_delete: raise HTTPException(status_code=404, detail="File not found") # Delete the file from the storage service - await storage_service.delete_file(flow_id=str(current_user.id), file_name=file.path) + await storage_service.delete_file(flow_id=str(current_user.id), file_name=file_to_delete.path) # Delete from the database - await session.delete(file) + await session.delete(file_to_delete) await session.flush() # Ensures delete is staged - await session.commit() # Commit deletion + except HTTPException: + # Re-raise HTTPException to avoid being caught by the generic exception handler + raise except Exception as e: - await session.rollback() # Rollback on failure + # Log and return a generic server error + logger.error("Error deleting file {}: {}", file_id, e) raise HTTPException(status_code=500, detail=f"Error deleting file: {e}") from e - - return {"message": "File deleted successfully"} + return {"detail": f"File {file_to_delete.name} deleted successfully"} @router.delete("") diff --git a/src/backend/base/langflow/api/v2/mcp.py b/src/backend/base/langflow/api/v2/mcp.py index 301b017a0..84e18bbdf 100644 --- a/src/backend/base/langflow/api/v2/mcp.py +++ b/src/backend/base/langflow/api/v2/mcp.py @@ -1,3 +1,4 @@ +import contextlib import json from io import BytesIO @@ -43,8 +44,22 @@ async def get_server_list( # Read the server configuration from a file using the files api server_config_file = await get_file_by_name(MCP_SERVERS_FILE, current_user, session) - # If the file does not exist, create a new one
with an empty configuration - if not server_config_file: + # Attempt to download the configuration file content + try: + server_config_bytes = await download_file( + server_config_file.id if server_config_file else None, + current_user, + session, + storage_service=storage_service, + return_content=True, + ) + except (FileNotFoundError, HTTPException): + # Storage file missing - DB entry may be stale. Remove it and recreate. + if server_config_file: + with contextlib.suppress(Exception): + await delete_file(server_config_file.id, current_user, session, storage_service) + + # Create a fresh empty config await upload_server_config( {"mcpServers": {}}, current_user, @@ -52,24 +67,23 @@ async def get_server_list( storage_service=storage_service, settings_service=settings_service, ) + + # Fetch and download again server_config_file = await get_file_by_name(MCP_SERVERS_FILE, current_user, session) + if not server_config_file: + raise HTTPException(status_code=500, detail="Failed to create _mcp_servers.json") from None - # Make sure we have it now - if not server_config_file: - raise HTTPException(status_code=500, detail="Server configuration file not found.") + server_config_bytes = await download_file( + server_config_file.id, + current_user, + session, + storage_service=storage_service, + return_content=True, + ) - # Download the server configuration file content - server_config = await download_file( - server_config_file.id, - current_user, - session, - storage_service=storage_service, - return_content=True, - ) - - # Parse the JSON content + # Parse JSON content try: - servers = json.loads(server_config) + servers = json.loads(server_config_bytes) except json.JSONDecodeError: raise HTTPException(status_code=500, detail="Invalid server configuration file format.") from None diff --git a/src/backend/tests/unit/api/v2/test_files.py b/src/backend/tests/unit/api/v2/test_files.py index 599604b5a..3a8154052 100644 --- a/src/backend/tests/unit/api/v2/test_files.py +++ 
b/src/backend/tests/unit/api/v2/test_files.py @@ -187,7 +187,7 @@ async def test_delete_file(files_client, files_created_api_key): response = await files_client.delete(f"api/v2/files/{upload_response['id']}", headers=headers) assert response.status_code == 200 - assert response.json() == {"message": "File deleted successfully"} + assert response.json() == {"detail": "File test deleted successfully"} async def test_edit_file(files_client, files_created_api_key): diff --git a/src/backend/tests/unit/api/v2/test_mcp_servers_file.py b/src/backend/tests/unit/api/v2/test_mcp_servers_file.py new file mode 100644 index 000000000..9a68bcbd3 --- /dev/null +++ b/src/backend/tests/unit/api/v2/test_mcp_servers_file.py @@ -0,0 +1,164 @@ +import io +import uuid +from types import SimpleNamespace +from typing import TYPE_CHECKING + +import pytest +from fastapi import UploadFile + +# Module under test +from langflow.api.v2.files import MCP_SERVERS_FILE, upload_user_file + +if TYPE_CHECKING: + from langflow.services.database.models.file.model import File as UserFile + + +class FakeStorageService: # Minimal stub for storage interactions + def __init__(self): + # key -> bytes + self._store: dict[str, bytes] = {} + + async def save_file(self, flow_id: str, file_name: str, data: bytes): + self._store[f"{flow_id}/{file_name}"] = data + + async def get_file_size(self, flow_id: str, file_name: str): + return len(self._store.get(f"{flow_id}/{file_name}", b"")) + + async def delete_file(self, flow_id: str, file_name: str): + self._store.pop(f"{flow_id}/{file_name}", None) + + +class FakeResult: # Helper for Session.exec return + def __init__(self, rows): + self._rows = rows + + def first(self): + return self._rows[0] if self._rows else None + + def all(self): + return list(self._rows) + + +class FakeSession: # Minimal async session stub + def __init__(self): + self._db: dict[str, UserFile] = {} + + async def exec(self, stmt): + # Extremely simplified: detect by LIKE pattern or equality 
against name/id + # We only support SELECT UserFile WHERE name LIKE pattern or id equality + stmt_str = str(stmt) + if "user_file.name" in stmt_str: + # LIKE pattern extraction + pattern = stmt_str.split("like(")[-1].split(")")[0].strip('"%') + rows = [f for name, f in self._db.items() if name.startswith(pattern)] + return FakeResult(rows) + if "user_file.id" in stmt_str: + uid = stmt_str.split("=")[-1].strip().strip("'") + rows = [f for f in self._db.values() if str(f.id) == uid] + return FakeResult(rows) + return FakeResult([]) + + def add(self, obj): + self._db[obj.name] = obj + + async def commit(self): + return + + async def refresh(self, obj): # noqa: ARG002 + return + + async def delete(self, obj): + self._db.pop(obj.name, None) + + async def flush(self): + return + + +class FakeSettings: + max_file_size_upload: int = 10 # MB + + +@pytest.fixture +def current_user(): + class User(SimpleNamespace): + id: str + + return User(id=str(uuid.uuid4())) + + +@pytest.fixture +def storage_service(): + return FakeStorageService() + + +@pytest.fixture +def settings_service(): + return SimpleNamespace(settings=FakeSettings()) + + +@pytest.fixture +def session(): + return FakeSession() + + +@pytest.mark.asyncio +async def test_mcp_servers_upload_replace(session, storage_service, settings_service, current_user): + """Uploading _mcp_servers.json twice should keep single DB record and no rename.""" + content1 = b'{"mcpServers": {}}' + file1 = UploadFile(filename=f"{MCP_SERVERS_FILE}.json", file=io.BytesIO(content1)) + file1.size = len(content1) + + # First upload + await upload_user_file( + file=file1, + session=session, + current_user=current_user, + storage_service=storage_service, + settings_service=settings_service, + ) + + # DB should contain single entry named _mcp_servers + assert list(session._db.keys()) == [MCP_SERVERS_FILE] + + # Upload again with different content + content2 = b'{"mcpServers": {"everything": {}}}' + file2 = 
UploadFile(filename=f"{MCP_SERVERS_FILE}.json", file=io.BytesIO(content2)) + file2.size = len(content2) + + await upload_user_file( + file=file2, + session=session, + current_user=current_user, + storage_service=storage_service, + settings_service=settings_service, + ) + + # Still single record, same name + assert list(session._db.keys()) == [MCP_SERVERS_FILE] + + record = session._db[MCP_SERVERS_FILE] + # Storage path should match user_id/_mcp_servers.json + expected_path = f"{current_user.id}/{MCP_SERVERS_FILE}.json" + assert record.path == expected_path + + # Storage should have updated content + stored_bytes = storage_service._store[expected_path] + assert stored_bytes == content2 + + # Third upload with server config provided by user + content3 = ( + b'{"mcpServers": {"everything": {"command": "npx", "args": ["-y", "@modelcontextprotocol/server-everything"]}}}' + ) + file3 = UploadFile(filename=f"{MCP_SERVERS_FILE}.json", file=io.BytesIO(content3)) + file3.size = len(content3) + + await upload_user_file( + file=file3, + session=session, + current_user=current_user, + storage_service=storage_service, + settings_service=settings_service, + ) + + stored_bytes = storage_service._store[expected_path] + assert stored_bytes == content3