🐛 fix(chat.py): rename flow_data_store variable to cache_manager for better clarity and consistency

 feat(chat.py): update references to flow_data_store to cache_manager for improved semantics and readability
🐛 fix(chat.py): fix incorrect cache key in chat_manager.set_cache() method call
🐛 fix(chat.py): fix incorrect cache key in chat_manager.chat_history.empty_history() method call
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-14 12:23:21 -03:00
commit 5df7341eec

View file

@ -31,15 +31,15 @@ async def chat(client_id: str, websocket: WebSocket):
@router.post("/build/init/{flow_id}", response_model=InitResponse, status_code=201)
async def init_build(graph_data: dict, flow_id: str):
"""Initialize the build by storing graph data and returning a unique session ID."""
flow_data_store = service_manager.get(ServiceType.CACHE_MANAGER)
cache_manager = service_manager.get(ServiceType.CACHE_MANAGER)
try:
if flow_id is None:
raise ValueError("No ID provided")
# Check if already building
if (
flow_id in flow_data_store
and isinstance(flow_data_store[flow_id], dict)
and flow_data_store[flow_id].get("status") == BuildStatus.IN_PROGRESS
flow_id in cache_manager
and isinstance(cache_manager[flow_id], dict)
and cache_manager[flow_id].get("status") == BuildStatus.IN_PROGRESS
):
return InitResponse(flowId=flow_id)
@ -48,7 +48,7 @@ async def init_build(graph_data: dict, flow_id: str):
if flow_id in chat_manager.cache_manager:
chat_manager.cache_manager.delete(flow_id)
logger.debug(f"Deleted flow {flow_id} from cache")
flow_data_store[flow_id] = {
cache_manager[flow_id] = {
"graph_data": graph_data,
"status": BuildStatus.STARTED,
}
@ -61,12 +61,12 @@ async def init_build(graph_data: dict, flow_id: str):
@router.get("/build/{flow_id}/status", response_model=BuiltResponse)
async def build_status(flow_id: str):
"""Check the flow_id is in the flow_data_store."""
flow_data_store = service_manager.get(ServiceType.CACHE_MANAGER)
"""Check the flow_id is in the cache_manager."""
cache_manager = service_manager.get(ServiceType.CACHE_MANAGER)
try:
built = (
flow_id in flow_data_store
and flow_data_store[flow_id]["status"] == BuildStatus.SUCCESS
flow_id in cache_manager
and cache_manager[flow_id]["status"] == BuildStatus.SUCCESS
)
return BuiltResponse(
@ -81,23 +81,23 @@ async def build_status(flow_id: str):
@router.get("/build/stream/{flow_id}", response_class=StreamingResponse)
async def stream_build(flow_id: str):
"""Stream the build process based on stored flow data."""
flow_data_store = service_manager.get(ServiceType.CACHE_MANAGER)
cache_manager = service_manager.get(ServiceType.CACHE_MANAGER)
async def event_stream(flow_id):
final_response = {"end_of_stream": True}
artifacts = {}
try:
if flow_id not in flow_data_store:
if flow_id not in cache_manager:
error_message = "Invalid session ID"
yield str(StreamData(event="error", data={"error": error_message}))
return
if flow_data_store[flow_id].get("status") == BuildStatus.IN_PROGRESS:
if cache_manager[flow_id].get("status") == BuildStatus.IN_PROGRESS:
error_message = "Already building"
yield str(StreamData(event="error", data={"error": error_message}))
return
graph_data = flow_data_store[flow_id].get("graph_data")
graph_data = cache_manager[flow_id].get("graph_data")
if not graph_data:
error_message = "No data provided"
@ -115,7 +115,7 @@ async def stream_build(flow_id: str):
return
number_of_nodes = len(graph.nodes)
flow_data_store[flow_id]["status"] = BuildStatus.IN_PROGRESS
cache_manager[flow_id]["status"] = BuildStatus.IN_PROGRESS
for i, vertex in enumerate(graph.generator_build(), 1):
try:
@ -137,7 +137,7 @@ async def stream_build(flow_id: str):
logger.exception(exc)
params = str(exc)
valid = False
flow_data_store[flow_id]["status"] = BuildStatus.FAILURE
cache_manager[flow_id]["status"] = BuildStatus.FAILURE
response = {
"valid": valid,
@ -162,14 +162,14 @@ async def stream_build(flow_id: str):
}
yield str(StreamData(event="message", data=input_keys_response))
chat_manager = service_manager.get(ServiceType.CHAT_MANAGER)
chat_manager.set_cache(f"{flow_id}_chat", langchain_object)
chat_manager.set_cache(flow_id, langchain_object)
# We need to reset the chat history
chat_manager.chat_history.empty_history(flow_id)
flow_data_store[flow_id]["status"] = BuildStatus.SUCCESS
cache_manager[flow_id]["status"] = BuildStatus.SUCCESS
except Exception as exc:
logger.exception(exc)
logger.error("Error while building the flow: %s", exc)
flow_data_store[flow_id]["status"] = BuildStatus.FAILURE
cache_manager[flow_id]["status"] = BuildStatus.FAILURE
yield str(StreamData(event="error", data={"error": str(exc)}))
finally:
yield str(StreamData(event="message", data=final_response))