chore: remove refactored files
This commit is contained in:
parent
3342e03a2c
commit
ac42e8a66c
7 changed files with 0 additions and 539 deletions
|
|
@ -1,84 +0,0 @@
|
|||
from pydantic import BaseModel, validator
|
||||
|
||||
from langflow.graph.utils import extract_input_variables_from_prompt
|
||||
|
||||
|
||||
class CacheResponse(BaseModel):
|
||||
data: dict
|
||||
|
||||
|
||||
class Code(BaseModel):
|
||||
code: str
|
||||
|
||||
|
||||
class Prompt(BaseModel):
|
||||
template: str
|
||||
|
||||
|
||||
# Build ValidationResponse class for {"imports": {"errors": []}, "function": {"errors": []}}
|
||||
class CodeValidationResponse(BaseModel):
|
||||
imports: dict
|
||||
function: dict
|
||||
|
||||
@validator("imports")
|
||||
def validate_imports(cls, v):
|
||||
return v or {"errors": []}
|
||||
|
||||
@validator("function")
|
||||
def validate_function(cls, v):
|
||||
return v or {"errors": []}
|
||||
|
||||
|
||||
class PromptValidationResponse(BaseModel):
|
||||
input_variables: list
|
||||
|
||||
|
||||
INVALID_CHARACTERS = {
|
||||
" ",
|
||||
",",
|
||||
".",
|
||||
":",
|
||||
";",
|
||||
"!",
|
||||
"?",
|
||||
"/",
|
||||
"\\",
|
||||
"(",
|
||||
")",
|
||||
"[",
|
||||
"]",
|
||||
"{",
|
||||
"}",
|
||||
}
|
||||
|
||||
|
||||
def validate_prompt(template: str):
|
||||
input_variables = extract_input_variables_from_prompt(template)
|
||||
|
||||
# Check if there are invalid characters in the input_variables
|
||||
input_variables = check_input_variables(input_variables)
|
||||
|
||||
return PromptValidationResponse(input_variables=input_variables)
|
||||
|
||||
|
||||
def check_input_variables(input_variables: list):
|
||||
invalid_chars = []
|
||||
fixed_variables = []
|
||||
for variable in input_variables:
|
||||
new_var = variable
|
||||
for char in INVALID_CHARACTERS:
|
||||
if char in variable:
|
||||
invalid_chars.append(char)
|
||||
new_var = new_var.replace(char, "")
|
||||
fixed_variables.append(new_var)
|
||||
if new_var != variable:
|
||||
input_variables.remove(variable)
|
||||
input_variables.append(new_var)
|
||||
# If any of the input_variables is not in the fixed_variables, then it means that
|
||||
# there are invalid characters in the input_variables
|
||||
if any(var not in fixed_variables for var in input_variables):
|
||||
raise ValueError(
|
||||
f"Invalid input variables: {input_variables}. Please, use something like {fixed_variables} instead."
|
||||
)
|
||||
|
||||
return input_variables
|
||||
|
|
@ -1,32 +0,0 @@
|
|||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
|
||||
|
||||
from langflow.api.schemas import ChatResponse
|
||||
|
||||
|
||||
# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
|
||||
class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
|
||||
"""Callback handler for streaming LLM responses."""
|
||||
|
||||
def __init__(self, websocket):
|
||||
self.websocket = websocket
|
||||
|
||||
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
|
||||
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
|
||||
await self.websocket.send_json(resp.dict())
|
||||
|
||||
|
||||
class StreamingLLMCallbackHandler(BaseCallbackHandler):
|
||||
"""Callback handler for streaming LLM responses."""
|
||||
|
||||
def __init__(self, websocket):
|
||||
self.websocket = websocket
|
||||
|
||||
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
|
||||
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
coroutine = self.websocket.send_json(resp.dict())
|
||||
asyncio.run_coroutine_threadsafe(coroutine, loop)
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
from fastapi import (
|
||||
APIRouter,
|
||||
WebSocket,
|
||||
WebSocketDisconnect,
|
||||
WebSocketException,
|
||||
status,
|
||||
)
|
||||
|
||||
from langflow.api.chat_manager import ChatManager
|
||||
from langflow.utils.logger import logger
|
||||
|
||||
router = APIRouter()
|
||||
chat_manager = ChatManager()
|
||||
|
||||
|
||||
@router.websocket("/chat/{client_id}")
|
||||
async def websocket_endpoint(client_id: str, websocket: WebSocket):
|
||||
"""Websocket endpoint for chat."""
|
||||
try:
|
||||
await chat_manager.handle_websocket(client_id, websocket)
|
||||
except WebSocketException as exc:
|
||||
logger.error(exc)
|
||||
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
|
||||
except WebSocketDisconnect as exc:
|
||||
logger.error(exc)
|
||||
await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc))
|
||||
|
|
@ -1,223 +0,0 @@
|
|||
import asyncio
|
||||
import json
|
||||
from collections import defaultdict
|
||||
from typing import Dict, List
|
||||
|
||||
from fastapi import WebSocket, status
|
||||
|
||||
from langflow.api.schemas import ChatMessage, ChatResponse, FileResponse
|
||||
from langflow.cache import cache_manager
|
||||
from langflow.cache.manager import Subject
|
||||
from langflow.interface.run import (
|
||||
get_result_and_steps,
|
||||
load_or_build_langchain_object,
|
||||
)
|
||||
from langflow.interface.utils import pil_to_base64, try_setting_streaming_options
|
||||
from langflow.utils.logger import logger
|
||||
|
||||
|
||||
class ChatHistory(Subject):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.history: Dict[str, List[ChatMessage]] = defaultdict(list)
|
||||
|
||||
def add_message(self, client_id: str, message: ChatMessage):
|
||||
"""Add a message to the chat history."""
|
||||
|
||||
self.history[client_id].append(message)
|
||||
|
||||
if not isinstance(message, FileResponse):
|
||||
self.notify()
|
||||
|
||||
def get_history(self, client_id: str, filter_messages=True) -> List[ChatMessage]:
|
||||
"""Get the chat history for a client."""
|
||||
if history := self.history.get(client_id, []):
|
||||
if filter_messages:
|
||||
return [msg for msg in history if msg.type not in ["start", "stream"]]
|
||||
return history
|
||||
else:
|
||||
return []
|
||||
|
||||
def empty_history(self, client_id: str):
|
||||
"""Empty the chat history for a client."""
|
||||
self.history[client_id] = []
|
||||
|
||||
|
||||
class ChatManager:
|
||||
def __init__(self):
|
||||
self.active_connections: Dict[str, WebSocket] = {}
|
||||
self.chat_history = ChatHistory()
|
||||
self.cache_manager = cache_manager
|
||||
self.cache_manager.attach(self.update)
|
||||
|
||||
def on_chat_history_update(self):
|
||||
"""Send the last chat message to the client."""
|
||||
client_id = self.cache_manager.current_client_id
|
||||
if client_id in self.active_connections:
|
||||
chat_response = self.chat_history.get_history(
|
||||
client_id, filter_messages=False
|
||||
)[-1]
|
||||
if chat_response.is_bot:
|
||||
# Process FileResponse
|
||||
if isinstance(chat_response, FileResponse):
|
||||
# If data_type is pandas, convert to csv
|
||||
if chat_response.data_type == "pandas":
|
||||
chat_response.data = chat_response.data.to_csv()
|
||||
elif chat_response.data_type == "image":
|
||||
# Base64 encode the image
|
||||
chat_response.data = pil_to_base64(chat_response.data)
|
||||
# get event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
coroutine = self.send_json(client_id, chat_response)
|
||||
asyncio.run_coroutine_threadsafe(coroutine, loop)
|
||||
|
||||
def update(self):
|
||||
if self.cache_manager.current_client_id in self.active_connections:
|
||||
self.last_cached_object_dict = self.cache_manager.get_last()
|
||||
# Add a new ChatResponse with the data
|
||||
chat_response = FileResponse(
|
||||
message=None,
|
||||
type="file",
|
||||
data=self.last_cached_object_dict["obj"],
|
||||
data_type=self.last_cached_object_dict["type"],
|
||||
)
|
||||
|
||||
self.chat_history.add_message(
|
||||
self.cache_manager.current_client_id, chat_response
|
||||
)
|
||||
|
||||
async def connect(self, client_id: str, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
self.active_connections[client_id] = websocket
|
||||
|
||||
def disconnect(self, client_id: str):
|
||||
self.active_connections.pop(client_id, None)
|
||||
|
||||
async def send_message(self, client_id: str, message: str):
|
||||
websocket = self.active_connections[client_id]
|
||||
await websocket.send_text(message)
|
||||
|
||||
async def send_json(self, client_id: str, message: ChatMessage):
|
||||
websocket = self.active_connections[client_id]
|
||||
await websocket.send_json(message.dict())
|
||||
|
||||
async def process_message(self, client_id: str, payload: Dict):
|
||||
# Process the graph data and chat message
|
||||
chat_message = payload.pop("message", "")
|
||||
chat_message = ChatMessage(message=chat_message)
|
||||
self.chat_history.add_message(client_id, chat_message)
|
||||
|
||||
graph_data = payload
|
||||
start_resp = ChatResponse(message=None, type="start", intermediate_steps="")
|
||||
await self.send_json(client_id, start_resp)
|
||||
|
||||
is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1
|
||||
# Generate result and thought
|
||||
try:
|
||||
logger.debug("Generating result and thought")
|
||||
|
||||
result, intermediate_steps = await process_graph(
|
||||
graph_data=graph_data,
|
||||
is_first_message=is_first_message,
|
||||
chat_message=chat_message,
|
||||
websocket=self.active_connections[client_id],
|
||||
)
|
||||
except Exception as e:
|
||||
# Log stack trace
|
||||
logger.exception(e)
|
||||
self.chat_history.empty_history(client_id)
|
||||
raise e
|
||||
# Send a response back to the frontend, if needed
|
||||
intermediate_steps = intermediate_steps or ""
|
||||
history = self.chat_history.get_history(client_id, filter_messages=False)
|
||||
file_responses = []
|
||||
if history:
|
||||
# Iterate backwards through the history
|
||||
for msg in reversed(history):
|
||||
if isinstance(msg, FileResponse):
|
||||
if msg.data_type == "image":
|
||||
# Base64 encode the image
|
||||
msg.data = pil_to_base64(msg.data)
|
||||
file_responses.append(msg)
|
||||
if msg.type == "start":
|
||||
break
|
||||
|
||||
response = ChatResponse(
|
||||
message=result,
|
||||
intermediate_steps=intermediate_steps.strip(),
|
||||
type="end",
|
||||
files=file_responses,
|
||||
)
|
||||
await self.send_json(client_id, response)
|
||||
self.chat_history.add_message(client_id, response)
|
||||
|
||||
async def handle_websocket(self, client_id: str, websocket: WebSocket):
|
||||
await self.connect(client_id, websocket)
|
||||
|
||||
try:
|
||||
chat_history = self.chat_history.get_history(client_id)
|
||||
# iterate and make BaseModel into dict
|
||||
chat_history = [chat.dict() for chat in chat_history]
|
||||
await websocket.send_json(chat_history)
|
||||
|
||||
while True:
|
||||
json_payload = await websocket.receive_json()
|
||||
try:
|
||||
payload = json.loads(json_payload)
|
||||
except TypeError:
|
||||
payload = json_payload
|
||||
if "clear_history" in payload:
|
||||
self.chat_history.history[client_id] = []
|
||||
continue
|
||||
|
||||
with self.cache_manager.set_client_id(client_id):
|
||||
await self.process_message(client_id, payload)
|
||||
|
||||
except Exception as e:
|
||||
# Handle any exceptions that might occur
|
||||
logger.exception(e)
|
||||
# send a message to the client
|
||||
await self.active_connections[client_id].close(
|
||||
code=status.WS_1011_INTERNAL_ERROR, reason=str(e)[:120]
|
||||
)
|
||||
self.disconnect(client_id)
|
||||
finally:
|
||||
try:
|
||||
connection = self.active_connections.get(client_id)
|
||||
if connection:
|
||||
await connection.close(code=1000, reason="Client disconnected")
|
||||
self.disconnect(client_id)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
self.disconnect(client_id)
|
||||
|
||||
|
||||
async def process_graph(
|
||||
graph_data: Dict,
|
||||
is_first_message: bool,
|
||||
chat_message: ChatMessage,
|
||||
websocket: WebSocket,
|
||||
):
|
||||
langchain_object = load_or_build_langchain_object(graph_data, is_first_message)
|
||||
langchain_object = try_setting_streaming_options(langchain_object, websocket)
|
||||
logger.debug("Loaded langchain object")
|
||||
|
||||
if langchain_object is None:
|
||||
# Raise user facing error
|
||||
raise ValueError(
|
||||
"There was an error loading the langchain_object. Please, check all the nodes and try again."
|
||||
)
|
||||
|
||||
# Generate result and thought
|
||||
try:
|
||||
logger.debug("Generating result and thought")
|
||||
result, intermediate_steps = await get_result_and_steps(
|
||||
langchain_object, chat_message.message or "", websocket=websocket
|
||||
)
|
||||
logger.debug("Generated result and intermediate_steps")
|
||||
return result, intermediate_steps
|
||||
except Exception as e:
|
||||
# Log stack trace
|
||||
logger.exception(e)
|
||||
raise e
|
||||
|
|
@ -1,47 +0,0 @@
|
|||
import logging
|
||||
from importlib.metadata import version
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
|
||||
from langflow.api.schemas import (
|
||||
ExportedFlow,
|
||||
GraphData,
|
||||
PredictRequest,
|
||||
PredictResponse,
|
||||
)
|
||||
from langflow.interface.run import process_graph_cached
|
||||
from langflow.interface.types import build_langchain_types_dict
|
||||
|
||||
# build router
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@router.get("/all")
|
||||
def get_all():
|
||||
return build_langchain_types_dict()
|
||||
|
||||
|
||||
@router.post("/predict", response_model=PredictResponse)
|
||||
async def get_load(predict_request: PredictRequest):
|
||||
try:
|
||||
exported_flow: ExportedFlow = predict_request.exported_flow
|
||||
graph_data: GraphData = exported_flow.data
|
||||
data = graph_data.dict()
|
||||
response = process_graph_cached(data, predict_request.message)
|
||||
return PredictResponse(result=response.get("result", ""))
|
||||
except Exception as e:
|
||||
# Log stack trace
|
||||
logger.exception(e)
|
||||
raise HTTPException(status_code=500, detail=str(e)) from e
|
||||
|
||||
|
||||
# get endpoint to return version of langflow
|
||||
@router.get("/version")
|
||||
def get_version():
|
||||
return {"version": version("langflow")}
|
||||
|
||||
|
||||
@router.get("/health")
|
||||
def get_health():
|
||||
return {"status": "OK"}
|
||||
|
|
@ -1,70 +0,0 @@
|
|||
from typing import Any, Dict, List, Union
|
||||
|
||||
from pydantic import BaseModel, validator
|
||||
|
||||
|
||||
class GraphData(BaseModel):
|
||||
"""Data inside the exported flow."""
|
||||
|
||||
nodes: List[Dict[str, Any]]
|
||||
edges: List[Dict[str, Any]]
|
||||
|
||||
|
||||
class ExportedFlow(BaseModel):
|
||||
"""Exported flow from LangFlow."""
|
||||
|
||||
description: str
|
||||
name: str
|
||||
id: str
|
||||
data: GraphData
|
||||
|
||||
|
||||
class PredictRequest(BaseModel):
|
||||
"""Predict request schema."""
|
||||
|
||||
message: str
|
||||
exported_flow: ExportedFlow
|
||||
|
||||
|
||||
class PredictResponse(BaseModel):
|
||||
"""Predict response schema."""
|
||||
|
||||
result: str
|
||||
|
||||
|
||||
class ChatMessage(BaseModel):
|
||||
"""Chat message schema."""
|
||||
|
||||
is_bot: bool = False
|
||||
message: Union[str, None] = None
|
||||
type: str = "human"
|
||||
|
||||
|
||||
class ChatResponse(ChatMessage):
|
||||
"""Chat response schema."""
|
||||
|
||||
intermediate_steps: str
|
||||
type: str
|
||||
is_bot: bool = True
|
||||
files: list = []
|
||||
|
||||
@validator("type")
|
||||
def validate_message_type(cls, v):
|
||||
if v not in ["start", "stream", "end", "error", "info", "file"]:
|
||||
raise ValueError("type must be start, stream, end, error, info, or file")
|
||||
return v
|
||||
|
||||
|
||||
class FileResponse(ChatMessage):
|
||||
"""File response schema."""
|
||||
|
||||
data: Any
|
||||
data_type: str
|
||||
type: str = "file"
|
||||
is_bot: bool = True
|
||||
|
||||
@validator("data_type")
|
||||
def validate_data_type(cls, v):
|
||||
if v not in ["image", "csv"]:
|
||||
raise ValueError("data_type must be image or csv")
|
||||
return v
|
||||
|
|
@ -1,57 +0,0 @@
|
|||
import json
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
|
||||
from langflow.api.base import (
|
||||
Code,
|
||||
CodeValidationResponse,
|
||||
Prompt,
|
||||
PromptValidationResponse,
|
||||
validate_prompt,
|
||||
)
|
||||
from langflow.graph.vertex.types import VectorStoreVertex
|
||||
from langflow.interface.run import build_graph
|
||||
from langflow.utils.logger import logger
|
||||
from langflow.utils.validate import validate_code
|
||||
|
||||
# build router
|
||||
router = APIRouter(prefix="/validate", tags=["validate"])
|
||||
|
||||
|
||||
@router.post("/code", status_code=200, response_model=CodeValidationResponse)
|
||||
def post_validate_code(code: Code):
|
||||
try:
|
||||
errors = validate_code(code.code)
|
||||
return CodeValidationResponse(
|
||||
imports=errors.get("imports", {}),
|
||||
function=errors.get("function", {}),
|
||||
)
|
||||
except Exception as e:
|
||||
return HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post("/prompt", status_code=200, response_model=PromptValidationResponse)
|
||||
def post_validate_prompt(prompt: Prompt):
|
||||
try:
|
||||
return validate_prompt(prompt.template)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
raise HTTPException(status_code=500, detail=str(e)) from e
|
||||
|
||||
|
||||
# validate node
|
||||
@router.post("/node/{node_id}", status_code=200)
|
||||
def post_validate_node(node_id: str, data: dict):
|
||||
try:
|
||||
# build graph
|
||||
graph = build_graph(data)
|
||||
# validate node
|
||||
node = graph.get_node(node_id)
|
||||
if node is None:
|
||||
raise ValueError(f"Node {node_id} not found")
|
||||
if not isinstance(node, VectorStoreVertex):
|
||||
node.build()
|
||||
return json.dumps({"valid": True, "params": str(node._built_object_repr())})
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
return json.dumps({"valid": False, "params": str(e)})
|
||||
Loading…
Add table
Add a link
Reference in a new issue