🔧 refactor(chat.py): rename post_build to stream_build to improve semantics

🐛 fix(chat.py): fix generator_build method to yield node_repr and node_id
 feat(chat.py): add valid and id fields to error response to improve error handling
🔧 refactor(manager.py): change logger.exception to logger.error to log exceptions
🔧 refactor(graph/base.py): add logging to generator_build method to improve debugging
🔧 refactor(vertex/base.py): rename cache.base to cache.utils to improve semantics
The post_build method in chat.py was renamed to stream_build to better reflect its functionality. The generator_build method in graph/base.py was fixed to yield node_repr and node_id instead of node._built_object_repr() and node.id. The error response in chat.py now includes valid and id fields to improve error handling. logger.exception in manager.py was changed to logger.error to log exceptions. The generator_build method in graph/base.py now logs the sorted vertices to improve debugging. The cache.base module in vertex/base.py was renamed to cache.utils to better reflect its functionality.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-06-12 09:30:20 -03:00
commit 036818e9b9
4 changed files with 18 additions and 11 deletions

View file

@ -31,7 +31,7 @@ async def websocket_endpoint(client_id: str, websocket: WebSocket):
@router.post("/build/{client_id}", response_class=StreamingResponse)
async def post_build(client_id: str, graph_data: dict):
async def stream_build(client_id: str, graph_data: dict):
"""Build langchain object from data_graph."""
async def event_stream(graph_data):
@ -42,13 +42,15 @@ async def post_build(client_id: str, graph_data: dict):
logger.debug("Building langchain object")
graph = Graph.from_payload(graph_data)
for node in graph.generator_build():
logger.debug(f"Building node {node.name}")
for node_repr, node_id in graph.generator_build():
logger.debug(
f"Building node {node_repr[:50]}{'...' if len(node_repr) > 50 else ''}"
)
response = json.dumps(
{
"valid": True,
"params": str(node._built_object_repr()),
"id": node.id,
"params": node_repr,
"id": node_id,
}
)
yield f"data: {response}\n\n" # SSE format
@ -57,7 +59,9 @@ async def post_build(client_id: str, graph_data: dict):
except Exception as exc:
logger.exception(exc)
error_response = json.dumps({"error": str(exc)})
error_response = json.dumps(
{"valid": False, "params": str(exc), "id": node_id}
)
yield f"data: {error_response}\n\n" # SSE format
return StreamingResponse(event_stream(graph_data), media_type="text/event-stream")

View file

@ -199,8 +199,8 @@ class ChatManager:
except Exception as e:
# Handle any exceptions that might occur
logger.exception(e)
self.close_connection(
logger.error(e)
await self.close_connection(
client_id=client_id,
code=status.WS_1011_INTERNAL_ERROR,
reason=str(e)[:120],
@ -213,5 +213,5 @@ class ChatManager:
reason="Client disconnected",
)
except Exception as e:
logger.exception(e)
logger.error(e)
self.disconnect(client_id)

View file

@ -10,6 +10,7 @@ from langflow.graph.vertex.types import (
)
from langflow.interface.tools.constants import FILE_TOOLS
from langflow.utils import payload
from langflow.utils.logger import logger
class Graph:
@ -146,8 +147,10 @@ class Graph:
"""Builds each
node in the graph and yields it."""
sorted_vertices = self.topological_sort()
logger.info("Sorted vertices: %s", sorted_vertices)
for node in sorted_vertices:
yield node.build()
node.build()
yield node._built_object_repr(), node.id
def get_node_neighbors(self, node: Vertex) -> Dict[Vertex, int]:
"""Returns the neighbors of a node."""

View file

@ -1,4 +1,4 @@
from langflow.cache import base as cache_utils
from langflow.cache import utils as cache_utils
from langflow.graph.vertex.constants import DIRECT_TYPES
from langflow.interface import loading
from langflow.interface.listing import ALL_TYPES_DICT