Refactor vertex building and streaming endpoints

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-03-24 17:55:38 -03:00
commit f20feebb5c

View file

@ -56,7 +56,22 @@ async def get_vertices(
chat_service: "ChatService" = Depends(get_chat_service),
session=Depends(get_session),
):
"""Check the flow_id is in the flow_data_store."""
"""
Retrieve the vertices order for a given flow.
Args:
flow_id (str): The ID of the flow.
stop_component_id (str, optional): The ID of the stop component. Defaults to None.
start_component_id (str, optional): The ID of the start component. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
session (Session, optional): The session dependency. Defaults to Depends(get_session).
Returns:
VerticesOrderResponse: The response containing the ordered vertex IDs and the run ID.
Raises:
HTTPException: If there is an error checking the build status.
"""
try:
# First, we need to check if the flow_id is in the cache
graph = None
@ -100,7 +115,23 @@ async def build_vertex(
chat_service: "ChatService" = Depends(get_chat_service),
current_user=Depends(get_current_active_user),
):
"""Build a vertex instead of the entire graph."""
"""Build a vertex instead of the entire graph.
Args:
flow_id (str): The ID of the flow.
vertex_id (str): The ID of the vertex to build.
background_tasks (BackgroundTasks): The background tasks object for logging.
inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
Returns:
VertexBuildResponse: The response containing the built vertex information.
Raises:
HTTPException: If there is an error building the vertex.
"""
start_time = time.perf_counter()
next_runnable_vertices = []
@ -191,9 +222,6 @@ async def build_vertex(
raise HTTPException(status_code=500, detail=str(exc)) from exc
# Now onto an endpoint that is an SSE endpoint
# it will receive a component_id and a flow_id
#
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
flow_id: str,
@ -202,7 +230,31 @@ async def build_vertex_stream(
chat_service: "ChatService" = Depends(get_chat_service),
session_service: "SessionService" = Depends(get_session_service),
):
"""Build a vertex instead of the entire graph."""
"""Build a vertex instead of the entire graph.
This function is responsible for building a single vertex instead of the entire graph.
It takes the `flow_id` and `vertex_id` as required parameters, and an optional `session_id`.
It also depends on the `ChatService` and `SessionService` services.
If `session_id` is not provided, it retrieves the graph from the cache using the `chat_service`.
If `session_id` is provided, it loads the session data using the `session_service`.
Once the graph is obtained, it retrieves the specified vertex using the `vertex_id`.
If the vertex does not support streaming, an error is raised.
If the vertex has a built result, it sends the result as a chunk.
If the vertex is not frozen or not built, it streams the vertex data.
If the vertex has a result, it sends the result as a chunk.
If none of the above conditions are met, an error is raised.
If any exception occurs during the process, an error message is sent.
Finally, the stream is closed.
Returns:
A `StreamingResponse` object with the streamed vertex data in text/event-stream format.
Raises:
HTTPException: If an error occurs while building the vertex.
"""
try:
async def stream_vertex():