Remove unused endpoints

Gabriel Luiz Freitas Almeida 2024-02-21 09:58:04 -03:00
commit ccd2355ca1

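This commit deletes the unused /build/init/{flow_id}, /build/{flow_id}/status, and /build/stream/{flow_id} endpoints, trims the imports only they used (build_input_keys_response, BaseCacheService, update_build_status, get_cache_service), and updates build_vertex to capture vertex.artifacts and to log the vertex build via a FastAPI BackgroundTasks task instead of awaiting log_vertex_build inline.
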
@@ -3,24 +3,16 @@ from typing import Optional
from fastapi import (
    APIRouter,
    BackgroundTasks,
    Depends,
    HTTPException,
    WebSocket,
    WebSocketException,
    status,
)
from fastapi.responses import StreamingResponse
from langflow.api.utils import (
    build_and_cache_graph,
    build_input_keys_response,
    format_elapsed_time,
)
from langflow.api.utils import build_and_cache_graph, format_elapsed_time
from langflow.api.v1.schemas import (
    BuildStatus,
    BuiltResponse,
    InitResponse,
    ResultData,
    StreamData,
    VertexBuildResponse,
    VerticesOrderResponse,
)
@@ -29,10 +21,8 @@ from langflow.services.auth.utils import (
    get_current_active_user,
    get_current_user_for_websocket,
)
from langflow.services.cache.service import BaseCacheService
from langflow.services.cache.utils import update_build_status
from langflow.services.chat.service import ChatService
from langflow.services.deps import get_cache_service, get_chat_service, get_session
from langflow.services.deps import get_chat_service, get_session
from langflow.services.monitor.utils import log_vertex_build
from loguru import logger
from sqlmodel import Session
@@ -81,184 +71,6 @@ async def chat(
        await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)


@router.post("/build/init/{flow_id}", response_model=InitResponse, status_code=201)
async def init_build(
    graph_data: dict,
    flow_id: str,
    current_user=Depends(get_current_active_user),
    chat_service: "ChatService" = Depends(get_chat_service),
    cache_service: "BaseCacheService" = Depends(get_cache_service),
):
    """Initialize the build by storing graph data and returning a unique session ID."""
    try:
        if flow_id is None:
            raise ValueError("No ID provided")
        # Check if already building
        if (
            flow_id in cache_service
            and isinstance(cache_service[flow_id], dict)
            and cache_service[flow_id].get("status") == BuildStatus.IN_PROGRESS
        ):
            return InitResponse(flowId=flow_id)
        # Delete from cache if already exists
        if flow_id in chat_service.cache_service:
            chat_service.cache_service.delete(flow_id)
            logger.debug(f"Deleted flow {flow_id} from cache")
        cache_service[flow_id] = {
            "graph_data": graph_data,
            "status": BuildStatus.STARTED,
            "user_id": current_user.id,
        }
        return InitResponse(flowId=flow_id)
    except Exception as exc:
        logger.error(f"Error initializing build: {exc}")
        return HTTPException(status_code=500, detail=str(exc))


@router.get("/build/{flow_id}/status", response_model=BuiltResponse)
async def build_status(
    flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)
):
    """Check the flow_id is in the cache_service."""
    try:
        built = (
            flow_id in cache_service
            and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
        )
        return BuiltResponse(
            built=built,
        )
    except Exception as exc:
        logger.error(f"Error checking build status: {exc}")
        return HTTPException(status_code=500, detail=str(exc))


@router.get("/build/stream/{flow_id}", response_class=StreamingResponse)
async def stream_build(
    flow_id: str,
    chat_service: "ChatService" = Depends(get_chat_service),
    cache_service: "BaseCacheService" = Depends(get_cache_service),
):
    """Stream the build process based on stored flow data."""

    async def event_stream(flow_id):
        final_response = {"end_of_stream": True}
        artifacts = {}
        flow_cache = cache_service[flow_id]
        flow_cache = flow_cache if isinstance(flow_cache, dict) else {}
        try:
            if flow_id not in cache_service:
                error_message = "Invalid session ID"
                yield str(StreamData(event="error", data={"error": error_message}))
                return
            if flow_cache.get("status") == BuildStatus.IN_PROGRESS:
                error_message = "Already building"
                yield str(StreamData(event="error", data={"error": error_message}))
                return
            graph_data = flow_cache.get("graph_data")
            if not graph_data:
                error_message = "No data provided"
                yield str(StreamData(event="error", data={"error": error_message}))
                return
            logger.debug("Building langchain object")
            # Some error could happen when building the graph
            graph = Graph.from_payload(graph_data)
            number_of_nodes = len(graph.vertices)
            update_build_status(cache_service, flow_id, BuildStatus.IN_PROGRESS)
            time_elapsed = ""
            try:
                user_id = flow_cache["user_id"]
            except KeyError:
                logger.debug("No user_id found in cache_service")
                user_id = None
            for i, vertex in enumerate(graph.generator_build(), 1):
                start_time = time.perf_counter()
                try:
                    log_dict = {
                        "log": f"Building node {vertex.vertex_type}",
                    }
                    yield str(StreamData(event="log", data=log_dict))
                    if vertex.is_task:
                        vertex = await try_running_celery_task(vertex, user_id)
                    else:
                        await vertex.build(user_id=user_id)
                    time_elapsed = format_elapsed_time(time.perf_counter() - start_time)
                    params = vertex._built_object_repr()
                    valid = True
                    logger.debug(f"Building node {str(vertex.vertex_type)}")
                    logger.debug(
                        f"Output: {params[:100]}{'...' if len(params) > 100 else ''}"
                    )
                    if vertex.artifacts:
                        # The artifacts will be prompt variables
                        # passed to build_input_keys_response
                        # to set the input_keys values
                        artifacts.update(vertex.artifacts)
                except Exception as exc:
                    logger.exception(exc)
                    params = str(exc)
                    valid = False
                    time_elapsed = format_elapsed_time(time.perf_counter() - start_time)
                    update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
                vertex_id = (
                    vertex.parent_node_id if vertex.parent_is_top_level else vertex.id
                )
                if vertex_id in graph.top_level_vertices:
                    response = {
                        "valid": valid,
                        "params": params,
                        "id": vertex_id,
                        "progress": round(i / number_of_nodes, 2),
                        "duration": time_elapsed,
                    }
                    yield str(StreamData(event="message", data=response))
            langchain_object = await graph.build()
            # Now we need to check the input_keys to send them to the client
            if hasattr(langchain_object, "input_keys"):
                input_keys_response = build_input_keys_response(
                    langchain_object, artifacts
                )
            else:
                input_keys_response = {
                    "input_keys": None,
                    "memory_keys": [],
                    "handle_keys": [],
                }
            yield str(StreamData(event="message", data=input_keys_response))
            chat_service.set_cache(flow_id, langchain_object)
            # We need to reset the chat history
            chat_service.chat_history.empty_history(flow_id)
            update_build_status(cache_service, flow_id, BuildStatus.SUCCESS)
        except Exception as exc:
            logger.exception(exc)
            logger.error("Error while building the flow: %s", exc)
            update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
            yield str(StreamData(event="error", data={"error": str(exc)}))
        finally:
            yield str(StreamData(event="message", data=final_response))

    try:
        return StreamingResponse(event_stream(flow_id), media_type="text/event-stream")
    except Exception as exc:
        logger.error(f"Error streaming build: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


async def try_running_celery_task(vertex, user_id):
    # Try running the task in celery
    # and set the task_id to the local vertex
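
The removed stream_build endpoint is the only consumer of the pattern above: an async generator yields event frames while a StreamingResponse drains it. For reference, a minimal, self-contained sketch of that server-sent-events pattern, assuming StreamData serializes to standard "event:/data:" frames; the app, sse_event, and demo_stream names here are hypothetical, not part of langflow:

import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def sse_event(event: str, data: dict) -> str:
    # Serialize one server-sent-events frame; the removed endpoint
    # obtained the same shape from str(StreamData(...)).
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


@app.get("/demo/stream/{flow_id}")
async def demo_stream(flow_id: str):
    async def event_stream():
        try:
            for i in range(1, 4):
                # One "message" frame per fake build step, mirroring the
                # per-vertex progress events in the removed endpoint.
                yield sse_event("message", {"id": f"node-{i}", "progress": round(i / 3, 2)})
                await asyncio.sleep(0.1)
        finally:
            # Always close the stream with an end-of-stream marker,
            # as the removed generator did in its finally block.
            yield sse_event("message", {"end_of_stream": True})

    return StreamingResponse(event_stream(), media_type="text/event-stream")
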
@@ -314,6 +126,7 @@ async def get_vertices(
async def build_vertex(
    flow_id: str,
    vertex_id: str,
    background_tasks: BackgroundTasks,
    chat_service: "ChatService" = Depends(get_chat_service),
    current_user=Depends(get_current_active_user),
):
@@ -355,6 +168,7 @@ async def build_vertex(
            params = vertex._built_object_repr()
            valid = True
            result_dict = vertex.result
            artifacts = vertex.artifacts
        else:
            raise ValueError(f"No result found for vertex {vertex_id}")
        chat_service.set_cache(flow_id, graph)
@@ -366,7 +180,10 @@ async def build_vertex(
        # If there's an error building the vertex
        # we need to clear the cache
        chat_service.clear_cache(flow_id)

    await log_vertex_build(
    # Log the vertex build
    background_tasks.add_task(
        log_vertex_build,
        flow_id=flow_id,
        vertex_id=vertex_id,
        valid=valid,
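
The replacement in the final hunk defers logging with FastAPI's BackgroundTasks, which runs the task after the response is sent instead of blocking the request on an awaited call. A minimal sketch of that pattern, with log_build standing in for log_vertex_build (app, log_build, and demo_build are hypothetical names for illustration):

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


async def log_build(flow_id: str, valid: bool) -> None:
    # Hypothetical stand-in for log_vertex_build: any logging or
    # monitoring call that should not delay the response.
    print(f"build of {flow_id} logged: valid={valid}")


@app.post("/demo/build/{flow_id}")
async def demo_build(flow_id: str, background_tasks: BackgroundTasks) -> dict:
    # add_task schedules log_build to run after the response is sent,
    # so the client no longer waits on it (unlike `await log_build(...)`).
    background_tasks.add_task(log_build, flow_id=flow_id, valid=True)
    return {"flow_id": flow_id, "built": True}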