Refactor vertex streaming logic in build_vertex_stream function

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-02-27 21:25:27 -03:00
commit 116518202c

View file

@ -185,15 +185,16 @@ async def build_vertex(
chat_service.clear_cache(flow_id)
# Log the vertex build
background_tasks.add_task(
log_vertex_build,
flow_id=flow_id,
vertex_id=vertex_id,
valid=valid,
params=params,
data=result_data_response,
artifacts=artifacts,
)
if not vertex.will_stream:
background_tasks.add_task(
log_vertex_build,
flow_id=flow_id,
vertex_id=vertex_id,
valid=valid,
params=params,
data=result_data_response,
artifacts=artifacts,
)
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
@ -243,22 +244,31 @@ async def build_vertex_stream(
vertex: "ChatVertex" = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
raise ValueError(f"Vertex {vertex_id} does not support streaming")
if not vertex.pinned or not vertex._built:
if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
elif not vertex.pinned or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
number_of_chunks = 0
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
number_of_chunks += 1
yield str(stream_data)
logger.debug(f"Number of chunks: {number_of_chunks}")
elif vertex.result is not None:
stream_data = StreamData(
event="message",