diff --git a/src/backend/langflow/api/base.py b/src/backend/langflow/api/base.py deleted file mode 100644 index 8cddc52e4..000000000 --- a/src/backend/langflow/api/base.py +++ /dev/null @@ -1,84 +0,0 @@ -from pydantic import BaseModel, validator - -from langflow.graph.utils import extract_input_variables_from_prompt - - -class CacheResponse(BaseModel): - data: dict - - -class Code(BaseModel): - code: str - - -class Prompt(BaseModel): - template: str - - -# Build ValidationResponse class for {"imports": {"errors": []}, "function": {"errors": []}} -class CodeValidationResponse(BaseModel): - imports: dict - function: dict - - @validator("imports") - def validate_imports(cls, v): - return v or {"errors": []} - - @validator("function") - def validate_function(cls, v): - return v or {"errors": []} - - -class PromptValidationResponse(BaseModel): - input_variables: list - - -INVALID_CHARACTERS = { - " ", - ",", - ".", - ":", - ";", - "!", - "?", - "/", - "\\", - "(", - ")", - "[", - "]", - "{", - "}", -} - - -def validate_prompt(template: str): - input_variables = extract_input_variables_from_prompt(template) - - # Check if there are invalid characters in the input_variables - input_variables = check_input_variables(input_variables) - - return PromptValidationResponse(input_variables=input_variables) - - -def check_input_variables(input_variables: list): - invalid_chars = [] - fixed_variables = [] - for variable in input_variables: - new_var = variable - for char in INVALID_CHARACTERS: - if char in variable: - invalid_chars.append(char) - new_var = new_var.replace(char, "") - fixed_variables.append(new_var) - if new_var != variable: - input_variables.remove(variable) - input_variables.append(new_var) - # If any of the input_variables is not in the fixed_variables, then it means that - # there are invalid characters in the input_variables - if any(var not in fixed_variables for var in input_variables): - raise ValueError( - f"Invalid input variables: {input_variables}. Please, use something like {fixed_variables} instead." - ) - - return input_variables diff --git a/src/backend/langflow/api/callback.py b/src/backend/langflow/api/callback.py deleted file mode 100644 index d63e107c4..000000000 --- a/src/backend/langflow/api/callback.py +++ /dev/null @@ -1,32 +0,0 @@ -import asyncio -from typing import Any - -from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler - -from langflow.api.schemas import ChatResponse - - -# https://github.com/hwchase17/chat-langchain/blob/master/callback.py -class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler): - """Callback handler for streaming LLM responses.""" - - def __init__(self, websocket): - self.websocket = websocket - - async def on_llm_new_token(self, token: str, **kwargs: Any) -> None: - resp = ChatResponse(message=token, type="stream", intermediate_steps="") - await self.websocket.send_json(resp.dict()) - - -class StreamingLLMCallbackHandler(BaseCallbackHandler): - """Callback handler for streaming LLM responses.""" - - def __init__(self, websocket): - self.websocket = websocket - - def on_llm_new_token(self, token: str, **kwargs: Any) -> None: - resp = ChatResponse(message=token, type="stream", intermediate_steps="") - - loop = asyncio.get_event_loop() - coroutine = self.websocket.send_json(resp.dict()) - asyncio.run_coroutine_threadsafe(coroutine, loop) diff --git a/src/backend/langflow/api/chat.py b/src/backend/langflow/api/chat.py deleted file mode 100644 index 4afa6c22f..000000000 --- a/src/backend/langflow/api/chat.py +++ /dev/null @@ -1,26 +0,0 @@ -from fastapi import ( - APIRouter, - WebSocket, - WebSocketDisconnect, - WebSocketException, - status, -) - -from langflow.api.chat_manager import ChatManager -from langflow.utils.logger import logger - -router = APIRouter() -chat_manager = ChatManager() - - -@router.websocket("/chat/{client_id}") -async def websocket_endpoint(client_id: str, websocket: WebSocket): - """Websocket endpoint for chat.""" - try: - await chat_manager.handle_websocket(client_id, websocket) - except WebSocketException as exc: - logger.error(exc) - await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc)) - except WebSocketDisconnect as exc: - logger.error(exc) - await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc)) diff --git a/src/backend/langflow/api/chat_manager.py b/src/backend/langflow/api/chat_manager.py deleted file mode 100644 index 8b1c7a621..000000000 --- a/src/backend/langflow/api/chat_manager.py +++ /dev/null @@ -1,223 +0,0 @@ -import asyncio -import json -from collections import defaultdict -from typing import Dict, List - -from fastapi import WebSocket, status - -from langflow.api.schemas import ChatMessage, ChatResponse, FileResponse -from langflow.cache import cache_manager -from langflow.cache.manager import Subject -from langflow.interface.run import ( - get_result_and_steps, - load_or_build_langchain_object, -) -from langflow.interface.utils import pil_to_base64, try_setting_streaming_options -from langflow.utils.logger import logger - - -class ChatHistory(Subject): - def __init__(self): - super().__init__() - self.history: Dict[str, List[ChatMessage]] = defaultdict(list) - - def add_message(self, client_id: str, message: ChatMessage): - """Add a message to the chat history.""" - - self.history[client_id].append(message) - - if not isinstance(message, FileResponse): - self.notify() - - def get_history(self, client_id: str, filter_messages=True) -> List[ChatMessage]: - """Get the chat history for a client.""" - if history := self.history.get(client_id, []): - if filter_messages: - return [msg for msg in history if msg.type not in ["start", "stream"]] - return history - else: - return [] - - def empty_history(self, client_id: str): - """Empty the chat history for a client.""" - self.history[client_id] = [] - - -class ChatManager: - def __init__(self): - self.active_connections: Dict[str, WebSocket] = {} - self.chat_history = ChatHistory() - self.cache_manager = cache_manager - self.cache_manager.attach(self.update) - - def on_chat_history_update(self): - """Send the last chat message to the client.""" - client_id = self.cache_manager.current_client_id - if client_id in self.active_connections: - chat_response = self.chat_history.get_history( - client_id, filter_messages=False - )[-1] - if chat_response.is_bot: - # Process FileResponse - if isinstance(chat_response, FileResponse): - # If data_type is pandas, convert to csv - if chat_response.data_type == "pandas": - chat_response.data = chat_response.data.to_csv() - elif chat_response.data_type == "image": - # Base64 encode the image - chat_response.data = pil_to_base64(chat_response.data) - # get event loop - loop = asyncio.get_event_loop() - - coroutine = self.send_json(client_id, chat_response) - asyncio.run_coroutine_threadsafe(coroutine, loop) - - def update(self): - if self.cache_manager.current_client_id in self.active_connections: - self.last_cached_object_dict = self.cache_manager.get_last() - # Add a new ChatResponse with the data - chat_response = FileResponse( - message=None, - type="file", - data=self.last_cached_object_dict["obj"], - data_type=self.last_cached_object_dict["type"], - ) - - self.chat_history.add_message( - self.cache_manager.current_client_id, chat_response - ) - - async def connect(self, client_id: str, websocket: WebSocket): - await websocket.accept() - self.active_connections[client_id] = websocket - - def disconnect(self, client_id: str): - self.active_connections.pop(client_id, None) - - async def send_message(self, client_id: str, message: str): - websocket = self.active_connections[client_id] - await websocket.send_text(message) - - async def send_json(self, client_id: str, message: ChatMessage): - websocket = self.active_connections[client_id] - await websocket.send_json(message.dict()) - - async def process_message(self, client_id: str, payload: Dict): - # Process the graph data and chat message - chat_message = payload.pop("message", "") - chat_message = ChatMessage(message=chat_message) - self.chat_history.add_message(client_id, chat_message) - - graph_data = payload - start_resp = ChatResponse(message=None, type="start", intermediate_steps="") - await self.send_json(client_id, start_resp) - - is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1 - # Generate result and thought - try: - logger.debug("Generating result and thought") - - result, intermediate_steps = await process_graph( - graph_data=graph_data, - is_first_message=is_first_message, - chat_message=chat_message, - websocket=self.active_connections[client_id], - ) - except Exception as e: - # Log stack trace - logger.exception(e) - self.chat_history.empty_history(client_id) - raise e - # Send a response back to the frontend, if needed - intermediate_steps = intermediate_steps or "" - history = self.chat_history.get_history(client_id, filter_messages=False) - file_responses = [] - if history: - # Iterate backwards through the history - for msg in reversed(history): - if isinstance(msg, FileResponse): - if msg.data_type == "image": - # Base64 encode the image - msg.data = pil_to_base64(msg.data) - file_responses.append(msg) - if msg.type == "start": - break - - response = ChatResponse( - message=result, - intermediate_steps=intermediate_steps.strip(), - type="end", - files=file_responses, - ) - await self.send_json(client_id, response) - self.chat_history.add_message(client_id, response) - - async def handle_websocket(self, client_id: str, websocket: WebSocket): - await self.connect(client_id, websocket) - - try: - chat_history = self.chat_history.get_history(client_id) - # iterate and make BaseModel into dict - chat_history = [chat.dict() for chat in chat_history] - await websocket.send_json(chat_history) - - while True: - json_payload = await websocket.receive_json() - try: - payload = json.loads(json_payload) - except TypeError: - payload = json_payload - if "clear_history" in payload: - self.chat_history.history[client_id] = [] - continue - - with self.cache_manager.set_client_id(client_id): - await self.process_message(client_id, payload) - - except Exception as e: - # Handle any exceptions that might occur - logger.exception(e) - # send a message to the client - await self.active_connections[client_id].close( - code=status.WS_1011_INTERNAL_ERROR, reason=str(e)[:120] - ) - self.disconnect(client_id) - finally: - try: - connection = self.active_connections.get(client_id) - if connection: - await connection.close(code=1000, reason="Client disconnected") - self.disconnect(client_id) - except Exception as e: - logger.exception(e) - self.disconnect(client_id) - - -async def process_graph( - graph_data: Dict, - is_first_message: bool, - chat_message: ChatMessage, - websocket: WebSocket, -): - langchain_object = load_or_build_langchain_object(graph_data, is_first_message) - langchain_object = try_setting_streaming_options(langchain_object, websocket) - logger.debug("Loaded langchain object") - - if langchain_object is None: - # Raise user facing error - raise ValueError( - "There was an error loading the langchain_object. Please, check all the nodes and try again." - ) - - # Generate result and thought - try: - logger.debug("Generating result and thought") - result, intermediate_steps = await get_result_and_steps( - langchain_object, chat_message.message or "", websocket=websocket - ) - logger.debug("Generated result and intermediate_steps") - return result, intermediate_steps - except Exception as e: - # Log stack trace - logger.exception(e) - raise e diff --git a/src/backend/langflow/api/endpoints.py b/src/backend/langflow/api/endpoints.py deleted file mode 100644 index 021a81ca8..000000000 --- a/src/backend/langflow/api/endpoints.py +++ /dev/null @@ -1,47 +0,0 @@ -import logging -from importlib.metadata import version - -from fastapi import APIRouter, HTTPException - -from langflow.api.schemas import ( - ExportedFlow, - GraphData, - PredictRequest, - PredictResponse, -) -from langflow.interface.run import process_graph_cached -from langflow.interface.types import build_langchain_types_dict - -# build router -router = APIRouter() -logger = logging.getLogger(__name__) - - -@router.get("/all") -def get_all(): - return build_langchain_types_dict() - - -@router.post("/predict", response_model=PredictResponse) -async def get_load(predict_request: PredictRequest): - try: - exported_flow: ExportedFlow = predict_request.exported_flow - graph_data: GraphData = exported_flow.data - data = graph_data.dict() - response = process_graph_cached(data, predict_request.message) - return PredictResponse(result=response.get("result", "")) - except Exception as e: - # Log stack trace - logger.exception(e) - raise HTTPException(status_code=500, detail=str(e)) from e - - -# get endpoint to return version of langflow -@router.get("/version") -def get_version(): - return {"version": version("langflow")} - - -@router.get("/health") -def get_health(): - return {"status": "OK"} diff --git a/src/backend/langflow/api/schemas.py b/src/backend/langflow/api/schemas.py deleted file mode 100644 index f73b0642d..000000000 --- a/src/backend/langflow/api/schemas.py +++ /dev/null @@ -1,70 +0,0 @@ -from typing import Any, Dict, List, Union - -from pydantic import BaseModel, validator - - -class GraphData(BaseModel): - """Data inside the exported flow.""" - - nodes: List[Dict[str, Any]] - edges: List[Dict[str, Any]] - - -class ExportedFlow(BaseModel): - """Exported flow from LangFlow.""" - - description: str - name: str - id: str - data: GraphData - - -class PredictRequest(BaseModel): - """Predict request schema.""" - - message: str - exported_flow: ExportedFlow - - -class PredictResponse(BaseModel): - """Predict response schema.""" - - result: str - - -class ChatMessage(BaseModel): - """Chat message schema.""" - - is_bot: bool = False - message: Union[str, None] = None - type: str = "human" - - -class ChatResponse(ChatMessage): - """Chat response schema.""" - - intermediate_steps: str - type: str - is_bot: bool = True - files: list = [] - - @validator("type") - def validate_message_type(cls, v): - if v not in ["start", "stream", "end", "error", "info", "file"]: - raise ValueError("type must be start, stream, end, error, info, or file") - return v - - -class FileResponse(ChatMessage): - """File response schema.""" - - data: Any - data_type: str - type: str = "file" - is_bot: bool = True - - @validator("data_type") - def validate_data_type(cls, v): - if v not in ["image", "csv"]: - raise ValueError("data_type must be image or csv") - return v diff --git a/src/backend/langflow/api/validate.py b/src/backend/langflow/api/validate.py deleted file mode 100644 index e90e554f0..000000000 --- a/src/backend/langflow/api/validate.py +++ /dev/null @@ -1,57 +0,0 @@ -import json - -from fastapi import APIRouter, HTTPException - -from langflow.api.base import ( - Code, - CodeValidationResponse, - Prompt, - PromptValidationResponse, - validate_prompt, -) -from langflow.graph.vertex.types import VectorStoreVertex -from langflow.interface.run import build_graph -from langflow.utils.logger import logger -from langflow.utils.validate import validate_code - -# build router -router = APIRouter(prefix="/validate", tags=["validate"]) - - -@router.post("/code", status_code=200, response_model=CodeValidationResponse) -def post_validate_code(code: Code): - try: - errors = validate_code(code.code) - return CodeValidationResponse( - imports=errors.get("imports", {}), - function=errors.get("function", {}), - ) - except Exception as e: - return HTTPException(status_code=500, detail=str(e)) - - -@router.post("/prompt", status_code=200, response_model=PromptValidationResponse) -def post_validate_prompt(prompt: Prompt): - try: - return validate_prompt(prompt.template) - except Exception as e: - logger.exception(e) - raise HTTPException(status_code=500, detail=str(e)) from e - - -# validate node -@router.post("/node/{node_id}", status_code=200) -def post_validate_node(node_id: str, data: dict): - try: - # build graph - graph = build_graph(data) - # validate node - node = graph.get_node(node_id) - if node is None: - raise ValueError(f"Node {node_id} not found") - if not isinstance(node, VectorStoreVertex): - node.build() - return json.dumps({"valid": True, "params": str(node._built_object_repr())}) - except Exception as e: - logger.exception(e) - return json.dumps({"valid": False, "params": str(e)})